Switch.cpp

/*
 * Copyright (c)2019 ZeroTier, Inc.
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file in the project's root directory.
 *
 * Change Date: 2023-01-01
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2.0 of the Apache License.
 */
/****/

#include <stdio.h>
#include <stdlib.h>
#include <math.h> // for sqrt() used by the CoDel control law below

#include <algorithm>
#include <utility>
#include <stdexcept>

#include "../include/ZeroTierOne.h"
#include "Constants.hpp"
#include "RuntimeEnvironment.hpp"
#include "Switch.hpp"
#include "Node.hpp"
#include "InetAddress.hpp"
#include "Topology.hpp"
#include "Peer.hpp"
#include "SelfAwareness.hpp"
#include "Packet.hpp"
#include "Trace.hpp"

namespace ZeroTier {
Switch::Switch(const RuntimeEnvironment *renv) :
    RR(renv),
    _lastBeaconResponse(0),
    _lastCheckedQueues(0),
    _lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine
{
}

void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddress &fromAddr,const void *data,unsigned int len)
{
    try {
        const int64_t now = RR->node->now();

        const SharedPtr<Path> path(RR->topology->getPath(localSocket,fromAddr));
        path->received(now);

        if (len == 13) {
            /* LEGACY: before VERB_PUSH_DIRECT_PATHS, peers used broadcast
             * announcements on the LAN to solve the 'same network problem.' We
             * no longer send these, but we'll listen for them for a while to
             * locate peers with versions <1.0.4. */
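            // Beacon layout (13 bytes): only the trailing 5 bytes, read at
            // offset 8 below, carry the sender's ZeroTier address; the first
            // 8 bytes are not examined here.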
            const Address beaconAddr(reinterpret_cast<const char *>(data) + 8,5);
            if (beaconAddr == RR->identity.address())
                return;
            if (!RR->node->shouldUsePathForZeroTierTraffic(tPtr,beaconAddr,localSocket,fromAddr))
                return;
            const SharedPtr<Peer> peer(RR->topology->get(beaconAddr));
            if (peer) { // we'll only respond to beacons from known peers
                if ((now - _lastBeaconResponse) >= 2500) { // limit rate of responses
                    _lastBeaconResponse = now;
                    Packet outp(peer->address(),RR->identity.address(),Packet::VERB_NOP);
                    outp.armor(peer->key(),true);
                    path->send(RR,tPtr,outp.data(),outp.size(),now);
                }
            }
        } else if (len > ZT_PROTO_MIN_FRAGMENT_LENGTH) { // SECURITY: min length check is important since we do some C-style stuff below!
            if (reinterpret_cast<const uint8_t *>(data)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR) {
                // Handle fragment ----------------------------------------------------

                Packet::Fragment fragment(data,len);
                const Address destination(fragment.destination());

                if (destination != RR->identity.address()) {
                    if (fragment.hops() < ZT_RELAY_MAX_HOPS) {
                        fragment.incrementHops();

                        // Note: we don't bother initiating NAT-t for fragments, since heads will set that off.
                        // It wouldn't hurt anything, just redundant and unnecessary.
                        SharedPtr<Peer> relayTo = RR->topology->get(destination);
                        if ((!relayTo)||(!relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now,false))) {
                            // Don't know peer or no direct path -- so relay via someone upstream
                            relayTo = RR->topology->findRelayTo(now,destination);
                            if (relayTo)
                                relayTo->sendDirect(tPtr,fragment.data(),fragment.size(),now,true);
                        }
                    }
                } else {
                    // Fragment looks like ours
                    const uint64_t fragmentPacketId = fragment.packetId();
                    const unsigned int fragmentNumber = fragment.fragmentNumber();
                    const unsigned int totalFragments = fragment.totalFragments();

                    if ((totalFragments <= ZT_MAX_PACKET_FRAGMENTS)&&(fragmentNumber < ZT_MAX_PACKET_FRAGMENTS)&&(fragmentNumber > 0)&&(totalFragments > 1)) {
                        // Fragment appears basically sane. Its fragment number must be
                        // 1 or more, since a Packet with fragmented bit set is fragment 0.
                        // Total fragments must be more than 1, otherwise why are we
                        // seeing a Packet::Fragment?
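                        // haveFragments is a bitmask over the reassembly set:
                        // bit 0 is the unfragmented head (fragment 0) and bit n
                        // is fragment n. The packet is complete when the number
                        // of set bits equals totalFragments.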
                        RXQueueEntry *const rq = _findRXQueueEntry(fragmentPacketId);
                        Mutex::Lock rql(rq->lock);
                        if (rq->packetId != fragmentPacketId) {
                            // No packet found, so we received a fragment without its head.
                            rq->timestamp = now;
                            rq->packetId = fragmentPacketId;
                            rq->frags[fragmentNumber - 1] = fragment;
                            rq->totalFragments = totalFragments; // total fragment count is known
                            rq->haveFragments = 1 << fragmentNumber; // we have only this fragment
                            rq->complete = false;
                        } else if (!(rq->haveFragments & (1 << fragmentNumber))) {
                            // We have other fragments and maybe the head, so add this one and check
                            rq->frags[fragmentNumber - 1] = fragment;
                            rq->totalFragments = totalFragments;

                            if (Utils::countBits(rq->haveFragments |= (1 << fragmentNumber)) == totalFragments) {
                                // We have all fragments -- assemble and process full Packet
                                for(unsigned int f=1;f<totalFragments;++f)
                                    rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength());

                                if (rq->frag0.tryDecode(RR,tPtr)) {
                                    rq->timestamp = 0; // packet decoded, free entry
                                } else {
                                    rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something
                                }
                            }
                        } // else this is a duplicate fragment, ignore
                    }
                }

                // --------------------------------------------------------------------
            } else if (len >= ZT_PROTO_MIN_PACKET_LENGTH) { // min length check is important!
                // Handle packet head -------------------------------------------------

                const Address destination(reinterpret_cast<const uint8_t *>(data) + 8,ZT_ADDRESS_LENGTH);
                const Address source(reinterpret_cast<const uint8_t *>(data) + 13,ZT_ADDRESS_LENGTH);

                if (source == RR->identity.address())
                    return;

                if (destination != RR->identity.address()) {
                    Packet packet(data,len);

                    if (packet.hops() < ZT_RELAY_MAX_HOPS) {
                        packet.incrementHops();
                        SharedPtr<Peer> relayTo = RR->topology->get(destination);
                        if ((relayTo)&&(relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,false))) {
                            if ((source != RR->identity.address())&&(_shouldUnite(now,source,destination))) {
                                const SharedPtr<Peer> sourcePeer(RR->topology->get(source));
                                if (sourcePeer)
                                    relayTo->introduce(tPtr,now,sourcePeer);
                            }
                        } else {
                            relayTo = RR->topology->findRelayTo(now,destination);
                            if ((relayTo)&&(relayTo->address() != source)) {
                                if (relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,true)) {
                                    const SharedPtr<Peer> sourcePeer(RR->topology->get(source));
                                    if (sourcePeer)
                                        relayTo->introduce(tPtr,now,sourcePeer);
                                }
                            }
                        }
                    }
                } else if ((reinterpret_cast<const uint8_t *>(data)[ZT_PACKET_IDX_FLAGS] & ZT_PROTO_FLAG_FRAGMENTED) != 0) {
                    // Packet is the head of a fragmented packet series
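                    // The 64-bit packet ID is simply the first 8 bytes of the
                    // head, parsed big-endian below.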
                    const uint64_t packetId = (
                        (((uint64_t)reinterpret_cast<const uint8_t *>(data)[0]) << 56) |
                        (((uint64_t)reinterpret_cast<const uint8_t *>(data)[1]) << 48) |
                        (((uint64_t)reinterpret_cast<const uint8_t *>(data)[2]) << 40) |
                        (((uint64_t)reinterpret_cast<const uint8_t *>(data)[3]) << 32) |
                        (((uint64_t)reinterpret_cast<const uint8_t *>(data)[4]) << 24) |
                        (((uint64_t)reinterpret_cast<const uint8_t *>(data)[5]) << 16) |
                        (((uint64_t)reinterpret_cast<const uint8_t *>(data)[6]) << 8) |
                        ((uint64_t)reinterpret_cast<const uint8_t *>(data)[7])
                    );

                    RXQueueEntry *const rq = _findRXQueueEntry(packetId);
                    Mutex::Lock rql(rq->lock);
                    if (rq->packetId != packetId) {
                        // If we have no other fragments yet, create an entry and save the head
                        rq->timestamp = now;
                        rq->packetId = packetId;
                        rq->frag0.init(data,len,path,now);
                        rq->totalFragments = 0;
                        rq->haveFragments = 1;
                        rq->complete = false;
                    } else if (!(rq->haveFragments & 1)) {
                        // If we have other fragments but no head, see if we are complete with the head
                        if ((rq->totalFragments > 1)&&(Utils::countBits(rq->haveFragments |= 1) == rq->totalFragments)) {
                            // We have all fragments -- assemble and process full Packet
                            rq->frag0.init(data,len,path,now);
                            for(unsigned int f=1;f<rq->totalFragments;++f)
                                rq->frag0.append(rq->frags[f - 1].payload(),rq->frags[f - 1].payloadLength());

                            if (rq->frag0.tryDecode(RR,tPtr)) {
                                rq->timestamp = 0; // packet decoded, free entry
                            } else {
                                rq->complete = true; // set complete flag but leave entry since it probably needs WHOIS or something
                            }
                        } else {
                            // Still waiting on more fragments, but keep the head
                            rq->frag0.init(data,len,path,now);
                        }
                    } // else this is a duplicate head, ignore
                } else {
                    // Packet is unfragmented, so just process it
                    IncomingPacket packet(data,len,path,now);
                    if (!packet.tryDecode(RR,tPtr)) {
                        RXQueueEntry *const rq = _nextRXQueueEntry();
                        Mutex::Lock rql(rq->lock);
                        rq->timestamp = now;
                        rq->packetId = packet.packetId();
                        rq->frag0 = packet;
                        rq->totalFragments = 1;
                        rq->haveFragments = 1;
                        rq->complete = true;
                    }
                }

                // --------------------------------------------------------------------
            }
        }
    } catch ( ... ) {} // sanity check, should be caught elsewhere
}

void Switch::onLocalEthernet(void *tPtr,const SharedPtr<Network> &network,const MAC &from,const MAC &to,unsigned int etherType,unsigned int vlanId,const void *data,unsigned int len)
{
    if (!network->hasConfig())
        return;

    // Check if this packet is from someone other than the tap -- i.e. bridged in
    bool fromBridged;
    if ((fromBridged = (from != network->mac()))) {
        if (!network->config().permitsBridging(RR->identity.address())) {
            RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"not a bridge");
            return;
        }
    }

    uint8_t qosBucket = ZT_QOS_DEFAULT_BUCKET;

    if (to.isMulticast()) {
        MulticastGroup multicastGroup(to,0);

        if (to.isBroadcast()) {
            if ( (etherType == ZT_ETHERTYPE_ARP) && (len >= 28) && ((((const uint8_t *)data)[2] == 0x08)&&(((const uint8_t *)data)[3] == 0x00)&&(((const uint8_t *)data)[4] == 6)&&(((const uint8_t *)data)[5] == 4)&&(((const uint8_t *)data)[7] == 0x01)) ) {
                /* IPv4 ARP is one of the few special cases that we impose upon what is
                 * otherwise a straightforward Ethernet switch emulation. Vanilla ARP
                 * is dumb old broadcast and simply doesn't scale. ZeroTier multicast
                 * groups have an additional field called ADI (additional distinguishing
                 * information) which was added specifically for ARP though it could
                 * be used for other things too. We then take ARP broadcasts and turn
                 * them into multicasts by stuffing the IP address being queried into
                 * the 32-bit ADI field. In practice this uses our multicast pub/sub
                 * system to implement a kind of extended/distributed ARP table. */
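                // The guard above checks the ARP header fields: protocol type
                // 0x0800 (IPv4) at bytes 2-3, hardware address length 6 at byte 4,
                // protocol address length 4 at byte 5, and the low opcode byte
                // 0x01 (request) at byte 7. The target IP being queried starts at
                // byte offset 24 and becomes the multicast ADI below.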
                multicastGroup = MulticastGroup::deriveMulticastGroupForAddressResolution(InetAddress(((const unsigned char *)data) + 24,4,0));
            } else if (!network->config().enableBroadcast()) {
                // Don't transmit broadcasts if this network doesn't want them
                RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"broadcast disabled");
                return;
            }
        } else if ((etherType == ZT_ETHERTYPE_IPV6)&&(len >= (40 + 8 + 16))) {
            // IPv6 NDP emulation for certain very special patterns of private IPv6 addresses -- if enabled
            if ((network->config().ndpEmulation())&&(reinterpret_cast<const uint8_t *>(data)[6] == 0x3a)&&(reinterpret_cast<const uint8_t *>(data)[40] == 0x87)) { // ICMPv6 neighbor solicitation
                Address v6EmbeddedAddress;
                const uint8_t *const pkt6 = reinterpret_cast<const uint8_t *>(data) + 40 + 8;
                const uint8_t *my6 = (const uint8_t *)0;

                // ZT-RFC4193 address: fdNN:NNNN:NNNN:NNNN:NN99:93DD:DDDD:DDDD / 88 (one /128 per actual host)
                // ZT-6PLANE address: fcXX:XXXX:XXDD:DDDD:DDDD:####:####:#### / 40 (one /80 per actual host)
                // (XX - lower 32 bits of network ID XORed with higher 32 bits)
                // For these to work, we must have a ZT-managed address assigned in one of the
                // above formats, and the query must match its prefix.
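                // Note: ZT-managed static IPs carry their prefix length where a
                // port would normally go, which is why sin6_port (byte-swapped)
                // is read below as the netmask bit count.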
                for(unsigned int sipk=0;sipk<network->config().staticIpCount;++sipk) {
                    const InetAddress *const sip = &(network->config().staticIps[sipk]);
                    if (sip->ss_family == AF_INET6) {
                        my6 = reinterpret_cast<const uint8_t *>(reinterpret_cast<const struct sockaddr_in6 *>(&(*sip))->sin6_addr.s6_addr);
                        const unsigned int sipNetmaskBits = Utils::ntoh((uint16_t)reinterpret_cast<const struct sockaddr_in6 *>(&(*sip))->sin6_port);
                        if ((sipNetmaskBits == 88)&&(my6[0] == 0xfd)&&(my6[9] == 0x99)&&(my6[10] == 0x93)) { // ZT-RFC4193 /88 ???
                            unsigned int ptr = 0;
                            while (ptr != 11) {
                                if (pkt6[ptr] != my6[ptr])
                                    break;
                                ++ptr;
                            }
                            if (ptr == 11) { // prefix match!
                                v6EmbeddedAddress.setTo(pkt6 + ptr,5);
                                break;
                            }
                        } else if (sipNetmaskBits == 40) { // ZT-6PLANE /40 ???
                            const uint32_t nwid32 = (uint32_t)((network->id() ^ (network->id() >> 32)) & 0xffffffff);
                            if ( (my6[0] == 0xfc) && (my6[1] == (uint8_t)((nwid32 >> 24) & 0xff)) && (my6[2] == (uint8_t)((nwid32 >> 16) & 0xff)) && (my6[3] == (uint8_t)((nwid32 >> 8) & 0xff)) && (my6[4] == (uint8_t)(nwid32 & 0xff))) {
                                unsigned int ptr = 0;
                                while (ptr != 5) {
                                    if (pkt6[ptr] != my6[ptr])
                                        break;
                                    ++ptr;
                                }
                                if (ptr == 5) { // prefix match!
                                    v6EmbeddedAddress.setTo(pkt6 + ptr,5);
                                    break;
                                }
                            }
                        }
                    }
                }

                if ((v6EmbeddedAddress)&&(v6EmbeddedAddress != RR->identity.address())) {
                    const MAC peerMac(v6EmbeddedAddress,network->id());

                    uint8_t adv[72];
                    adv[0] = 0x60; adv[1] = 0x00; adv[2] = 0x00; adv[3] = 0x00;
                    adv[4] = 0x00; adv[5] = 0x20;
                    adv[6] = 0x3a; adv[7] = 0xff;
                    for(int i=0;i<16;++i) adv[8 + i] = pkt6[i];
                    for(int i=0;i<16;++i) adv[24 + i] = my6[i];
                    adv[40] = 0x88; adv[41] = 0x00;
                    adv[42] = 0x00; adv[43] = 0x00; // future home of checksum
                    adv[44] = 0x60; adv[45] = 0x00; adv[46] = 0x00; adv[47] = 0x00;
                    for(int i=0;i<16;++i) adv[48 + i] = pkt6[i];
                    adv[64] = 0x02; adv[65] = 0x01;
                    adv[66] = peerMac[0]; adv[67] = peerMac[1]; adv[68] = peerMac[2]; adv[69] = peerMac[3]; adv[70] = peerMac[4]; adv[71] = peerMac[5];
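                    // Compute the ICMPv6 checksum over the IPv6 pseudo-header:
                    // source and destination addresses (32 bytes), upper-layer
                    // length (0x20 = 32), next header (0x3a = ICMPv6), then the
                    // 32-byte neighbor advertisement itself.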
                    uint16_t pseudo_[36];
                    uint8_t *const pseudo = reinterpret_cast<uint8_t *>(pseudo_);
                    for(int i=0;i<32;++i) pseudo[i] = adv[8 + i];
                    pseudo[32] = 0x00; pseudo[33] = 0x00; pseudo[34] = 0x00; pseudo[35] = 0x20;
                    pseudo[36] = 0x00; pseudo[37] = 0x00; pseudo[38] = 0x00; pseudo[39] = 0x3a;
                    for(int i=0;i<32;++i) pseudo[40 + i] = adv[40 + i];
                    uint32_t checksum = 0;
                    for(int i=0;i<36;++i) checksum += Utils::hton(pseudo_[i]);
                    while ((checksum >> 16)) checksum = (checksum & 0xffff) + (checksum >> 16);
                    checksum = ~checksum;
                    adv[42] = (checksum >> 8) & 0xff;
                    adv[43] = checksum & 0xff;

                    RR->node->putFrame(tPtr,network->id(),network->userPtr(),peerMac,from,ZT_ETHERTYPE_IPV6,0,adv,72);
                    return; // NDP emulation done. We have forged a "fake" reply, so no need to send actual NDP query.
                } // else no NDP emulation
            } // else no NDP emulation
        }

        // Check this after NDP emulation, since that has to be allowed in exactly this case
        if (network->config().multicastLimit == 0) {
            RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"multicast disabled");
            return;
        }

        /* Learn multicast groups for bridged-in hosts.
         * Note that some OSes, most notably Linux, do this for you by learning
         * multicast addresses on bridge interfaces and subscribing each slave.
         * But in that case this does no harm, as the sets are just merged. */
        if (fromBridged)
            network->learnBridgedMulticastGroup(tPtr,multicastGroup,RR->node->now());

        // First pass sets noTee to false, but noTee is set to true in OutboundMulticast to prevent duplicates.
        if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
            RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
            return;
        }

        RR->mc->send(
            tPtr,
            RR->node->now(),
            network,
            Address(),
            multicastGroup,
            (fromBridged) ? from : MAC(),
            etherType,
            data,
            len);
    } else if (to == network->mac()) {
        // Destination is this node, so just reinject it
        RR->node->putFrame(tPtr,network->id(),network->userPtr(),from,to,etherType,vlanId,data,len);
    } else if (to[0] == MAC::firstOctetForNetwork(network->id())) {
        // Destination is another ZeroTier peer on the same network
        Address toZT(to.toAddress(network->id())); // since in-network MACs are derived from addresses and network IDs, we can reverse this
        SharedPtr<Peer> toPeer(RR->topology->get(toZT));

        if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),toZT,from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
            RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
            return;
        }

        network->pushCredentialsIfNeeded(tPtr,toZT,RR->node->now());

        if (fromBridged) {
            Packet outp(toZT,RR->identity.address(),Packet::VERB_EXT_FRAME);
            outp.append(network->id());
            outp.append((unsigned char)0x00);
            to.appendTo(outp);
            from.appendTo(outp);
            outp.append((uint16_t)etherType);
            outp.append(data,len);
            if (!network->config().disableCompression())
                outp.compress();
            aqm_enqueue(tPtr,network,outp,true,qosBucket);
        } else {
            Packet outp(toZT,RR->identity.address(),Packet::VERB_FRAME);
            outp.append(network->id());
            outp.append((uint16_t)etherType);
            outp.append(data,len);
            if (!network->config().disableCompression())
                outp.compress();
            aqm_enqueue(tPtr,network,outp,true,qosBucket);
        }
    } else {
        // Destination is bridged behind a remote peer

        // We filter with a NULL destination ZeroTier address first. Filtrations
        // for each ZT destination are also done below. This is the same rationale
        // and design as for multicast.
        if (!network->filterOutgoingPacket(tPtr,false,RR->identity.address(),Address(),from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
            RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked");
            return;
        }

        Address bridges[ZT_MAX_BRIDGE_SPAM];
        unsigned int numBridges = 0;

        /* Create an array of up to ZT_MAX_BRIDGE_SPAM recipients for this bridged frame. */
        bridges[0] = network->findBridgeTo(to);
        std::vector<Address> activeBridges(network->config().activeBridges());
        if ((bridges[0])&&(bridges[0] != RR->identity.address())&&(network->config().permitsBridging(bridges[0]))) {
            /* We have a known bridge route for this MAC, send it there. */
            ++numBridges;
        } else if (!activeBridges.empty()) {
            /* If there is no known route, spam to up to ZT_MAX_BRIDGE_SPAM active
             * bridges. If someone responds, we'll learn the route. */
            std::vector<Address>::const_iterator ab(activeBridges.begin());
            if (activeBridges.size() <= ZT_MAX_BRIDGE_SPAM) {
                // If there are <= ZT_MAX_BRIDGE_SPAM active bridges, spam them all
                while (ab != activeBridges.end()) {
                    bridges[numBridges++] = *ab;
                    ++ab;
                }
            } else {
                // Otherwise pick a random set of them
                while (numBridges < ZT_MAX_BRIDGE_SPAM) {
                    if (ab == activeBridges.end())
                        ab = activeBridges.begin();
                    if (((unsigned long)Utils::random() % (unsigned long)activeBridges.size()) == 0) {
                        bridges[numBridges++] = *ab;
                        ++ab;
                    } else ++ab;
                }
            }
        }

        for(unsigned int b=0;b<numBridges;++b) {
            if (network->filterOutgoingPacket(tPtr,true,RR->identity.address(),bridges[b],from,to,(const uint8_t *)data,len,etherType,vlanId,qosBucket)) {
                Packet outp(bridges[b],RR->identity.address(),Packet::VERB_EXT_FRAME);
                outp.append(network->id());
                outp.append((uint8_t)0x00);
                to.appendTo(outp);
                from.appendTo(outp);
                outp.append((uint16_t)etherType);
                outp.append(data,len);
                if (!network->config().disableCompression())
                    outp.compress();
                aqm_enqueue(tPtr,network,outp,true,qosBucket);
            } else {
                RR->t->outgoingNetworkFrameDropped(tPtr,network,from,to,etherType,vlanId,len,"filter blocked (bridge replication)");
            }
        }
    }
}

void Switch::aqm_enqueue(void *tPtr, const SharedPtr<Network> &network, Packet &packet,bool encrypt,int qosBucket)
{
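    // Reading of the code below: each network gets a NetworkQoSControlBlock
    // holding ZT_QOS_NUM_BUCKETS managed queues. Queues rotate between NEW,
    // OLD, and INACTIVE lists and are drained by CoDel with a DRR-style byte
    // credit, in the general style of FQ-CoDel.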
    if(!network->qosEnabled()) {
        send(tPtr, packet, encrypt);
        return;
    }
    NetworkQoSControlBlock *nqcb = _netQueueControlBlock[network->id()];
    if (!nqcb) {
        // DEBUG_INFO("creating network QoS control block (NQCB) for network %llx", network->id());
        nqcb = new NetworkQoSControlBlock();
        _netQueueControlBlock[network->id()] = nqcb;
        // Initialize ZT_QOS_NUM_BUCKETS queues and place them in the INACTIVE list.
        // These queues will be shuffled between the new/old/inactive lists by the enqueue/dequeue algorithm.
        for (int i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
            nqcb->inactiveQueues.push_back(new ManagedQueue(i));
        }
    }

    if (packet.verb() != Packet::VERB_FRAME && packet.verb() != Packet::VERB_EXT_FRAME) {
        // DEBUG_INFO("skipping, no QoS for this packet, verb=%x", packet.verb());
        // Just send the packet normally -- no QoS for ZT protocol traffic.
        // Return here so the packet isn't also enqueued (and sent twice).
        send(tPtr, packet, encrypt);
        return;
    }
    _aqm_m.lock();

    // Enqueue packet and move queue to appropriate list
    const Address dest(packet.destination());
    TXQueueEntry *txEntry = new TXQueueEntry(dest,RR->node->now(),packet,encrypt);

    ManagedQueue *selectedQueue = nullptr;
    for (size_t i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
        if (i < nqcb->oldQueues.size()) { // search old queues first (I think this is best since old would imply most recent usage of the queue)
            if (nqcb->oldQueues[i]->id == qosBucket) {
                selectedQueue = nqcb->oldQueues[i];
            }
        }
        if (i < nqcb->newQueues.size()) { // search new queues (this would imply not often-used queues)
            if (nqcb->newQueues[i]->id == qosBucket) {
                selectedQueue = nqcb->newQueues[i];
            }
        }
        if (i < nqcb->inactiveQueues.size()) { // search inactive queues
            if (nqcb->inactiveQueues[i]->id == qosBucket) {
                selectedQueue = nqcb->inactiveQueues[i];
                // Replenish the queue's credit and move it to the end of the NEW queue list
                selectedQueue->byteCredit = ZT_QOS_QUANTUM;
                // DEBUG_INFO("moving q=%p from INACTIVE to NEW list", selectedQueue);
                nqcb->newQueues.push_back(selectedQueue);
                nqcb->inactiveQueues.erase(nqcb->inactiveQueues.begin() + i);
            }
        }
    }

    if (!selectedQueue) {
        // No queue exists for this bucket -- don't leak the entry or hold the lock
        delete txEntry;
        _aqm_m.unlock();
        return;
    }
    selectedQueue->q.push_back(txEntry);
    selectedQueue->byteLength += txEntry->packet.payloadLength();
    nqcb->_currEnqueuedPackets++;

    // DEBUG_INFO("nq=%2lu, oq=%2lu, iq=%2lu, nqcb.size()=%3d, bucket=%2d, q=%p", nqcb->newQueues.size(), nqcb->oldQueues.size(), nqcb->inactiveQueues.size(), nqcb->_currEnqueuedPackets, qosBucket, selectedQueue);

    // Drop a packet if necessary
    ManagedQueue *selectedQueueToDropFrom = nullptr;
    if (nqcb->_currEnqueuedPackets > ZT_QOS_MAX_ENQUEUED_PACKETS) {
        // DEBUG_INFO("too many enqueued packets (%d), finding packet to drop", nqcb->_currEnqueuedPackets);
        int maxQueueLength = 0;
        for (size_t i=0; i<ZT_QOS_NUM_BUCKETS; i++) {
            if (i < nqcb->oldQueues.size()) {
                if (nqcb->oldQueues[i]->byteLength > maxQueueLength) {
                    maxQueueLength = nqcb->oldQueues[i]->byteLength;
                    selectedQueueToDropFrom = nqcb->oldQueues[i];
                }
            }
            if (i < nqcb->newQueues.size()) {
                if (nqcb->newQueues[i]->byteLength > maxQueueLength) {
                    maxQueueLength = nqcb->newQueues[i]->byteLength;
                    selectedQueueToDropFrom = nqcb->newQueues[i];
                }
            }
            if (i < nqcb->inactiveQueues.size()) {
                if (nqcb->inactiveQueues[i]->byteLength > maxQueueLength) {
                    maxQueueLength = nqcb->inactiveQueues[i]->byteLength;
                    selectedQueueToDropFrom = nqcb->inactiveQueues[i];
                }
            }
        }
        if (selectedQueueToDropFrom) {
            // DEBUG_INFO("dropping packet from head of largest queue (%d payload bytes)", maxQueueLength);
            int sizeOfDroppedPacket = selectedQueueToDropFrom->q.front()->packet.payloadLength();
            delete selectedQueueToDropFrom->q.front();
            selectedQueueToDropFrom->q.pop_front();
            selectedQueueToDropFrom->byteLength -= sizeOfDroppedPacket;
            nqcb->_currEnqueuedPackets--;
        }
    }
    _aqm_m.unlock();
    aqm_dequeue(tPtr);
}
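// CoDel "control law": with each consecutive drop, schedule the next drop
// sooner, at t + INTERVAL/sqrt(count), so the drop rate rises gently under
// persistent queueing delay.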
uint64_t Switch::control_law(uint64_t t, int count)
{
    return (uint64_t)(t + ZT_QOS_INTERVAL / sqrt(count));
}
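// Dequeue helper for CoDel: peek the head packet and measure its sojourn
// time in the queue. ok_to_drop becomes true only after the queue has
// stayed above ZT_QOS_TARGET for at least ZT_QOS_INTERVAL.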
Switch::dqr Switch::dodequeue(ManagedQueue *q, uint64_t now)
{
    dqr r;
    r.ok_to_drop = false;
    r.p = q->q.empty() ? nullptr : q->q.front(); // front() on an empty container is undefined behavior
    if (r.p == nullptr) {
        q->first_above_time = 0;
        return r;
    }

    uint64_t sojourn_time = now - r.p->creationTime;
    if (sojourn_time < ZT_QOS_TARGET || q->byteLength <= ZT_DEFAULT_MTU) {
        // went below - stay below for at least interval
        q->first_above_time = 0;
    } else {
        if (q->first_above_time == 0) {
            // just went above from below. if still above at
            // first_above_time, will say it's ok to drop.
            q->first_above_time = now + ZT_QOS_INTERVAL;
        } else if (now >= q->first_above_time) {
            r.ok_to_drop = true;
        }
    }
    return r;
}
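// CoDel dequeue proper: while in the dropping state, drop head packets and
// tighten the drop schedule via control_law(); enter the dropping state when
// dodequeue() first reports ok_to_drop.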
Switch::TXQueueEntry * Switch::CoDelDequeue(ManagedQueue *q, bool isNew, uint64_t now)
{
    dqr r = dodequeue(q, now);

    if (q->dropping) {
        if (!r.ok_to_drop) {
            q->dropping = false;
        }
        while (now >= q->drop_next && q->dropping) {
            // Drop the head packet (r.p), freeing it and its byte accounting
            q->q.pop_front();
            q->byteLength -= r.p->packet.payloadLength();
            delete r.p;
            r = dodequeue(q, now);
            if (!r.ok_to_drop) {
                // leave dropping state
                q->dropping = false;
            } else {
                ++(q->count);
                // schedule the next drop.
                q->drop_next = control_law(q->drop_next, q->count);
            }
        }
    } else if (r.ok_to_drop) {
        // Drop the head packet (r.p) and enter the dropping state
        q->q.pop_front();
        q->byteLength -= r.p->packet.payloadLength();
        delete r.p;
        r = dodequeue(q, now);
        q->dropping = true;
        q->count = (q->count > 2 && now - q->drop_next < 8*ZT_QOS_INTERVAL) ?
            q->count - 2 : 1;
        q->drop_next = control_law(now, q->count);
    }
    return r.p;
}
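// Dequeue side of the scheduler: service queues from the NEW list, then the
// OLD list, deficit-round-robin style. A queue whose byteCredit has gone
// negative is given another ZT_QOS_QUANTUM of credit and rotated; a queue
// that yields no packet is demoted (NEW -> OLD, OLD -> INACTIVE).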
void Switch::aqm_dequeue(void *tPtr)
{
    // Cycle through network-specific QoS control blocks
    for(std::map<uint64_t,NetworkQoSControlBlock*>::iterator nqcb(_netQueueControlBlock.begin());nqcb!=_netQueueControlBlock.end();) {
        if (!(*nqcb).second->_currEnqueuedPackets) {
            // Nothing enqueued for this network; move on to the next one
            ++nqcb;
            continue;
        }

        uint64_t now = RR->node->now();
        TXQueueEntry *entryToEmit = nullptr;
        std::vector<ManagedQueue*> *currQueues = &((*nqcb).second->newQueues);
        std::vector<ManagedQueue*> *oldQueues = &((*nqcb).second->oldQueues);
        std::vector<ManagedQueue*> *inactiveQueues = &((*nqcb).second->inactiveQueues);

        _aqm_m.lock();

        // Attempt dequeue from queues in NEW list
        bool examiningNewQueues = true;
        while (currQueues->size()) {
            ManagedQueue *queueAtFrontOfList = currQueues->front();
            if (queueAtFrontOfList->byteCredit < 0) {
                queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
                // Move to list of OLD queues
                // DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
                oldQueues->push_back(queueAtFrontOfList);
                currQueues->erase(currQueues->begin());
            } else {
                entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
                if (!entryToEmit) {
                    // Move to end of list of OLD queues
                    // DEBUG_INFO("moving q=%p from NEW to OLD list", queueAtFrontOfList);
                    oldQueues->push_back(queueAtFrontOfList);
                    currQueues->erase(currQueues->begin());
                } else {
                    int len = entryToEmit->packet.payloadLength();
                    queueAtFrontOfList->byteLength -= len;
                    queueAtFrontOfList->byteCredit -= len;
                    // Send the packet!
                    queueAtFrontOfList->q.pop_front();
                    send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
                    delete entryToEmit; // entry was heap-allocated at enqueue time
                    (*nqcb).second->_currEnqueuedPackets--;
                }
                // DEBUG_INFO("dequeuing from q=%p in NEW list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->byteCredit);
                break;
            }
        }

        // Attempt dequeue from queues in OLD list
        examiningNewQueues = false;
        currQueues = &((*nqcb).second->oldQueues);
        while (currQueues->size()) {
            ManagedQueue *queueAtFrontOfList = currQueues->front();
            if (queueAtFrontOfList->byteCredit < 0) {
                queueAtFrontOfList->byteCredit += ZT_QOS_QUANTUM;
                oldQueues->push_back(queueAtFrontOfList);
                currQueues->erase(currQueues->begin());
            } else {
                entryToEmit = CoDelDequeue(queueAtFrontOfList, examiningNewQueues, now);
                if (!entryToEmit) {
                    // DEBUG_INFO("moving q=%p from OLD to INACTIVE list", queueAtFrontOfList);
                    // Move to inactive list of queues
                    inactiveQueues->push_back(queueAtFrontOfList);
                    currQueues->erase(currQueues->begin());
                } else {
                    int len = entryToEmit->packet.payloadLength();
                    queueAtFrontOfList->byteLength -= len;
                    queueAtFrontOfList->byteCredit -= len;
                    queueAtFrontOfList->q.pop_front();
                    send(tPtr, entryToEmit->packet, entryToEmit->encrypt);
                    delete entryToEmit; // entry was heap-allocated at enqueue time
                    (*nqcb).second->_currEnqueuedPackets--;
                }
                // DEBUG_INFO("dequeuing from q=%p in OLD list (byteCredit=%d)", queueAtFrontOfList, queueAtFrontOfList->byteCredit);
                break;
            }
        }
        ++nqcb;
        _aqm_m.unlock();
    }
}
void Switch::removeNetworkQoSControlBlock(uint64_t nwid)
{
    // Use find() so a lookup for an absent network doesn't insert a NULL entry
    std::map<uint64_t,NetworkQoSControlBlock*>::iterator nq(_netQueueControlBlock.find(nwid));
    if (nq != _netQueueControlBlock.end()) {
        delete nq->second;
        _netQueueControlBlock.erase(nq);
    }
}
void Switch::send(void *tPtr,Packet &packet,bool encrypt)
{
    const Address dest(packet.destination());
    if (dest == RR->identity.address())
        return;

    if (!_trySend(tPtr,packet,encrypt)) {
        {
            Mutex::Lock _l(_txQueue_m);
            if (_txQueue.size() >= ZT_TX_QUEUE_SIZE) {
                _txQueue.pop_front();
            }
            _txQueue.push_back(TXQueueEntry(dest,RR->node->now(),packet,encrypt));
        }
        if (!RR->topology->get(dest))
            requestWhois(tPtr,RR->node->now(),dest);
    }
}

void Switch::requestWhois(void *tPtr,const int64_t now,const Address &addr)
{
    if (addr == RR->identity.address())
        return;

    {
        Mutex::Lock _l(_lastSentWhoisRequest_m);
        int64_t &last = _lastSentWhoisRequest[addr];
        if ((now - last) < ZT_WHOIS_RETRY_DELAY)
            return;
        else last = now;
    }

    const SharedPtr<Peer> root(RR->topology->root(now));
    if (root) {
        Packet outp(root->address(),RR->identity.address(),Packet::VERB_WHOIS);
        addr.appendTo(outp);
        RR->node->expectReplyTo(outp.packetId());
        root->sendDirect(tPtr,outp.data(),outp.size(),now,true);
    }
}

void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
{
    {
        Mutex::Lock _l(_lastSentWhoisRequest_m);
        _lastSentWhoisRequest.erase(peer->address());
    }

    const int64_t now = RR->node->now();
    for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
        RXQueueEntry *const rq = &(_rxQueue[ptr]);
        Mutex::Lock rql(rq->lock);
        if ((rq->timestamp)&&(rq->complete)) {
            if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT))
                rq->timestamp = 0;
        }
    }

    {
        Mutex::Lock _l(_txQueue_m);
        for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
            if (txi->dest == peer->address()) {
                if (_trySend(tPtr,txi->packet,txi->encrypt)) {
                    _txQueue.erase(txi++);
                } else {
                    ++txi;
                }
            } else {
                ++txi;
            }
        }
    }
}
unsigned long Switch::doTimerTasks(void *tPtr,int64_t now)
{
    const uint64_t timeSinceLastCheck = now - _lastCheckedQueues;
    if (timeSinceLastCheck < ZT_WHOIS_RETRY_DELAY)
        return (unsigned long)(ZT_WHOIS_RETRY_DELAY - timeSinceLastCheck);
    _lastCheckedQueues = now;

    std::vector<Address> needWhois;
    {
        Mutex::Lock _l(_txQueue_m);
        for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
            if (_trySend(tPtr,txi->packet,txi->encrypt)) {
                _txQueue.erase(txi++);
            } else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
                _txQueue.erase(txi++);
            } else {
                if (!RR->topology->get(txi->dest))
                    needWhois.push_back(txi->dest);
                ++txi;
            }
        }
    }
    for(std::vector<Address>::const_iterator i(needWhois.begin());i!=needWhois.end();++i)
        requestWhois(tPtr,now,*i);

    for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
        RXQueueEntry *const rq = &(_rxQueue[ptr]);
        Mutex::Lock rql(rq->lock);
        if ((rq->timestamp)&&(rq->complete)) {
            if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT)) {
                rq->timestamp = 0;
            } else {
                const Address src(rq->frag0.source());
                if (!RR->topology->get(src))
                    requestWhois(tPtr,now,src);
            }
        }
    }

    {
        Mutex::Lock _l(_lastUniteAttempt_m);
        Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt);
        _LastUniteKey *k = (_LastUniteKey *)0;
        uint64_t *v = (uint64_t *)0;
        while (i.next(k,v)) {
            if ((now - *v) >= (ZT_MIN_UNITE_INTERVAL * 8))
                _lastUniteAttempt.erase(*k);
        }
    }

    {
        Mutex::Lock _l(_lastSentWhoisRequest_m);
        Hashtable< Address,int64_t >::Iterator i(_lastSentWhoisRequest);
        Address *a = (Address *)0;
        int64_t *ts = (int64_t *)0;
        while (i.next(a,ts)) {
            if ((now - *ts) > (ZT_WHOIS_RETRY_DELAY * 2))
                _lastSentWhoisRequest.erase(*a);
        }
    }

    return ZT_WHOIS_RETRY_DELAY;
}

bool Switch::_shouldUnite(const int64_t now,const Address &source,const Address &destination)
{
    Mutex::Lock _l(_lastUniteAttempt_m);
    uint64_t &ts = _lastUniteAttempt[_LastUniteKey(source,destination)];
    if ((now - ts) >= ZT_MIN_UNITE_INTERVAL) {
        ts = now;
        return true;
    }
    return false;
}
bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
{
    SharedPtr<Path> viaPath;
    const int64_t now = RR->node->now();
    const Address destination(packet.destination());

    const SharedPtr<Peer> peer(RR->topology->get(destination));
    if (peer) {
        viaPath = peer->getAppropriatePath(now,false);
        if (!viaPath) {
            if (peer->rateGateTryStaticPath(now)) {
                InetAddress tryAddr;
                bool gotPath = RR->node->externalPathLookup(tPtr,peer->address(),AF_INET6,tryAddr);
                if ((gotPath)&&(tryAddr)) {
                    peer->sendHELLO(tPtr,-1,tryAddr,now);
                } else {
                    gotPath = RR->node->externalPathLookup(tPtr,peer->address(),AF_INET,tryAddr);
                    if ((gotPath)&&(tryAddr))
                        peer->sendHELLO(tPtr,-1,tryAddr,now);
                }
            }

            const SharedPtr<Peer> relay(RR->topology->findRelayTo(now,destination));
            if (relay) {
                viaPath = relay->getAppropriatePath(now,true);
                if (!viaPath)
                    return false;
            } else {
                // No relay available either; give up and let the caller queue the packet
                return false;
            }
        }
    } else {
        return false;
    }

    unsigned int mtu = ZT_DEFAULT_PHYSMTU;
    uint64_t trustedPathId = 0;
    RR->topology->getOutboundPathInfo(viaPath->address(),mtu,trustedPathId);

    unsigned int chunkSize = std::min(packet.size(),mtu);
    packet.setFragmented(chunkSize < packet.size());
    peer->recordOutgoingPacket(viaPath, packet.packetId(), packet.payloadLength(), packet.verb(), now);

    if (trustedPathId) {
        packet.setTrusted(trustedPathId);
    } else {
        packet.armor(peer->key(),encrypt);
    }

    if (viaPath->send(RR,tPtr,packet.data(),chunkSize,now)) {
        if (chunkSize < packet.size()) {
            // Too big for one packet, fragment the rest
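            // Each fragment after the head carries up to
            // (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH) payload bytes, the
            // difference being the fragment header. fragsRemaining is the
            // ceiling of remaining/capacity, so totalFragments counts the
            // head plus every trailing fragment.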
            unsigned int fragStart = chunkSize;
            unsigned int remaining = packet.size() - chunkSize;
            unsigned int fragsRemaining = (remaining / (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
            if ((fragsRemaining * (mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH)) < remaining)
                ++fragsRemaining;
            const unsigned int totalFragments = fragsRemaining + 1;

            for(unsigned int fno=1;fno<totalFragments;++fno) {
                chunkSize = std::min(remaining,(unsigned int)(mtu - ZT_PROTO_MIN_FRAGMENT_LENGTH));
                Packet::Fragment frag(packet,fragStart,chunkSize,fno,totalFragments);
                viaPath->send(RR,tPtr,frag.data(),frag.size(),now);
                fragStart += chunkSize;
                remaining -= chunkSize;
            }
        }
    }

    return true;
}

} // namespace ZeroTier