PacketMultiplexer.cpp 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  4. *
  5. * (c) ZeroTier, Inc.
  6. * https://www.zerotier.com/
  7. */
  8. #include "PacketMultiplexer.hpp"
  9. #include "Constants.hpp"
  10. #include "Node.hpp"
  11. #include "RuntimeEnvironment.hpp"
  12. #include <stdio.h>
  13. #include <stdlib.h>
  14. namespace ZeroTier {
  15. PacketMultiplexer::PacketMultiplexer(const RuntimeEnvironment* renv)
  16. {
  17. RR = renv;
  18. };
  19. void PacketMultiplexer::putFrame(void* tPtr, uint64_t nwid, void** nuptr, const MAC& source, const MAC& dest, unsigned int etherType, unsigned int vlanId, const void* data, unsigned int len, unsigned int flowId)
  20. {
  21. #if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
  22. RR->node->putFrame(tPtr, nwid, nuptr, source, dest, etherType, vlanId, (const void*)data, len);
  23. return;
  24. #endif
  25. if (! _enabled) {
  26. RR->node->putFrame(tPtr, nwid, nuptr, source, dest, etherType, vlanId, (const void*)data, len);
  27. return;
  28. }
  29. PacketRecord* packet;
  30. _rxPacketVector_m.lock();
  31. if (_rxPacketVector.empty()) {
  32. packet = new PacketRecord;
  33. }
  34. else {
  35. packet = _rxPacketVector.back();
  36. _rxPacketVector.pop_back();
  37. }
  38. _rxPacketVector_m.unlock();
  39. packet->tPtr = tPtr;
  40. packet->nwid = nwid;
  41. packet->nuptr = nuptr;
  42. packet->source = source.toInt();
  43. packet->dest = dest.toInt();
  44. packet->etherType = etherType;
  45. packet->vlanId = vlanId;
  46. packet->len = len;
  47. packet->flowId = flowId;
  48. memcpy(packet->data, data, len);
  49. int bucket = flowId % _concurrency;
  50. _rxPacketQueues[bucket]->postLimit(packet, 2048);
  51. }
  52. void PacketMultiplexer::setUpPostDecodeReceiveThreads(unsigned int concurrency, bool cpuPinningEnabled)
  53. {
  54. #if defined(__APPLE__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__WINDOWS__)
  55. return;
  56. #endif
  57. _enabled = true;
  58. _concurrency = concurrency;
  59. bool _enablePinning = cpuPinningEnabled;
  60. for (unsigned int i = 0; i < _concurrency; ++i) {
  61. fprintf(stderr, "Reserved queue for thread %d\n", i);
  62. _rxPacketQueues.push_back(new BlockingQueue<PacketRecord*>());
  63. }
  64. // Each thread picks from its own queue to feed into the core
  65. for (unsigned int i = 0; i < _concurrency; ++i) {
  66. _rxThreads.push_back(std::thread([this, i, _enablePinning]() {
  67. fprintf(stderr, "Created post-decode packet ingestion thread %d\n", i);
  68. PacketRecord* packet = nullptr;
  69. for (;;) {
  70. if (! _rxPacketQueues[i]->get(packet)) {
  71. break;
  72. }
  73. if (! packet) {
  74. break;
  75. }
  76. // fprintf(stderr, "popped packet from queue %d\n", i);
  77. MAC sourceMac = MAC(packet->source);
  78. MAC destMac = MAC(packet->dest);
  79. RR->node->putFrame(packet->tPtr, packet->nwid, packet->nuptr, sourceMac, destMac, packet->etherType, 0, (const void*)packet->data, packet->len);
  80. {
  81. Mutex::Lock l(_rxPacketVector_m);
  82. _rxPacketVector.push_back(packet);
  83. }
  84. /*
  85. if (ZT_ResultCode_isFatal(err)) {
  86. char tmp[256];
  87. OSUtils::ztsnprintf(tmp, sizeof(tmp), "error processing packet: %d", (int)err);
  88. Mutex::Lock _l(_termReason_m);
  89. _termReason = ONE_UNRECOVERABLE_ERROR;
  90. _fatalErrorMessage = tmp;
  91. this->terminate();
  92. break;
  93. }
  94. */
  95. }
  96. }));
  97. }
  98. }
  99. } // namespace ZeroTier