PacketMultiplexer.cpp 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. /*
  2. * Copyright (c)2013-2021 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2026-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #include "PacketMultiplexer.hpp"
  14. #include "Node.hpp"
  15. #include "RuntimeEnvironment.hpp"
  16. #include "Constants.hpp"
  17. #include <stdio.h>
  18. #include <stdlib.h>
  19. namespace ZeroTier {
  20. PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv)
  21. {
  22. RR = renv;
  23. };
  24. void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId)
  25. {
  26. #if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
  27. RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len);
  28. return;
  29. #endif
  30. if (!_enabled) {
  31. RR->node->putFrame(tPtr,nwid,nuptr,source,dest,etherType,vlanId,(const void *)data,len);
  32. return;
  33. }
  34. PacketRecord* packet;
  35. _rxPacketVector_m.lock();
  36. if (_rxPacketVector.empty()) {
  37. packet = new PacketRecord;
  38. }
  39. else {
  40. packet = _rxPacketVector.back();
  41. _rxPacketVector.pop_back();
  42. }
  43. _rxPacketVector_m.unlock();
  44. packet->tPtr = tPtr;
  45. packet->nwid = nwid;
  46. packet->nuptr = nuptr;
  47. packet->source = source.toInt();
  48. packet->dest = dest.toInt();
  49. packet->etherType = etherType;
  50. packet->vlanId = vlanId;
  51. packet->len = len;
  52. packet->flowId = flowId;
  53. memcpy(packet->data, data, len);
  54. int bucket = flowId % _concurrency;
  55. _rxPacketQueues[bucket]->postLimit(packet, 2048);
  56. }
  57. void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled)
  58. {
  59. #if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
  60. return;
  61. #endif
  62. _enabled = true;
  63. _concurrency = concurrency;
  64. bool _enablePinning = cpuPinningEnabled;
  65. for (unsigned int i = 0; i < _concurrency; ++i) {
  66. fprintf(stderr, "Reserved queue for thread %d\n", i);
  67. _rxPacketQueues.push_back(new BlockingQueue<PacketRecord*>());
  68. }
  69. // Each thread picks from its own queue to feed into the core
  70. for (unsigned int i = 0; i < _concurrency; ++i) {
  71. _rxThreads.push_back(std::thread([this, i, _enablePinning]() {
  72. fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i);
  73. PacketRecord* packet = nullptr;
  74. for (;;) {
  75. if (! _rxPacketQueues[i]->get(packet)) {
  76. break;
  77. }
  78. if (! packet) {
  79. break;
  80. }
  81. // fprintf(stderr, "popped packet from queue %d\n", i);
  82. MAC sourceMac = MAC(packet->source);
  83. MAC destMac = MAC(packet->dest);
  84. RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len);
  85. {
  86. Mutex::Lock l(_rxPacketVector_m);
  87. _rxPacketVector.push_back(packet);
  88. }
  89. /*
  90. if (ZT_ResultCode_isFatal(err)) {
  91. char tmp[256];
  92. OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
  93. Mutex::Lock _l(_termReason_m);
  94. _termReason = ONE_UNRECOVERABLE_ERROR;
  95. _fatalErrorMessage = tmp;
  96. this->terminate();
  97. break;
  98. }
  99. */
  100. }
  101. }));
  102. }
  103. }
  104. } // namespace ZeroTier