Multicaster.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. /*
  2. * Copyright (c)2019 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2023-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #include <algorithm>
  14. #include "Constants.hpp"
  15. #include "RuntimeEnvironment.hpp"
  16. #include "Multicaster.hpp"
  17. #include "Network.hpp"
  18. #include "Membership.hpp"
  19. #include "Topology.hpp"
  20. #include "Switch.hpp"
  21. namespace ZeroTier {
  22. Multicaster::Multicaster(const RuntimeEnvironment *renv) :
  23. RR(renv),
  24. _groups(32) {}
  25. Multicaster::~Multicaster() {}
  26. unsigned int Multicaster::send(
  27. void *tPtr,
  28. int64_t now,
  29. const SharedPtr<Network> &network,
  30. const MulticastGroup &mg,
  31. const MAC &src,
  32. unsigned int etherType,
  33. const unsigned int existingBloomMultiplier,
  34. const uint8_t existingBloom[ZT_MULTICAST_BLOOM_FILTER_SIZE_BITS / 8],
  35. const void *const data,
  36. unsigned int len)
  37. {
  38. static const unsigned int PRIMES[16] = { 3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59 }; // 2 is skipped as it's even
  39. std::vector< std::pair<int64_t,Address> > recipients;
  40. const NetworkConfig &config = network->config();
  41. if (config.multicastLimit == 0) return 0; // multicast disabled
  42. Address specialists[ZT_MAX_NETWORK_SPECIALISTS],multicastReplicators[ZT_MAX_NETWORK_SPECIALISTS];
  43. unsigned int specialistCount = 0,multicastReplicatorCount = 0,bridgeCount = 0;
  44. bool amMulticastReplicator = false;
  45. for(unsigned int i=0;i<config.specialistCount;++i) {
  46. if (RR->identity.address() == config.specialists[i]) {
  47. amMulticastReplicator |= ((config.specialists[i] & ZT_NETWORKCONFIG_SPECIALIST_TYPE_MULTICAST_REPLICATOR) != 0);
  48. } else {
  49. specialists[specialistCount++] = config.specialists[i];
  50. if ((config.specialists[i] & ZT_NETWORKCONFIG_SPECIALIST_TYPE_ACTIVE_BRIDGE) != 0) {
  51. recipients.push_back(std::pair<int64_t,Address>(0,config.specialists[i]));
  52. ++bridgeCount;
  53. } if ((config.specialists[i] & ZT_NETWORKCONFIG_SPECIALIST_TYPE_MULTICAST_REPLICATOR) != 0) {
  54. multicastReplicators[multicastReplicatorCount++] = config.specialists[i];
  55. }
  56. }
  57. }
  58. std::sort(&(specialists[0]),&(specialists[specialistCount])); // for binary search
  59. int64_t lastGather = 0;
  60. _K groupKey(network->id(),mg);
  61. {
  62. Mutex::Lock l(_groups_l);
  63. const _G *const g = _groups.get(groupKey);
  64. if (g) {
  65. lastGather = g->lastGather;
  66. recipients.reserve(recipients.size() + g->members.size());
  67. Hashtable< Address,int64_t >::Iterator mi(const_cast<_G *>(g)->members);
  68. Address *mik = nullptr;
  69. int64_t *miv = nullptr;
  70. while (mi.next(mik,miv)) {
  71. if (!std::binary_search(&(specialists[0]),&(specialists[specialistCount]),*mik))
  72. recipients.push_back(std::pair<int64_t,Address>(*miv,*mik));
  73. }
  74. }
  75. }
  76. // Sort recipients, maintaining bridges first in list
  77. std::sort(recipients.begin() + bridgeCount,recipients.end(),std::greater< std::pair<int64_t,Address> >());
  78. // Gather new recipients periodically, being more aggressive if we have none.
  79. if ((now - lastGather) > (recipients.empty() ? 5000 : ZT_MULTICAST_GATHER_PERIOD)) {
  80. {
  81. Mutex::Lock l(_groups_l);
  82. _groups[groupKey].lastGather = now;
  83. }
  84. Packet outp(network->controller(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
  85. outp.append(network->id());
  86. outp.append((uint8_t)0);
  87. mg.mac().appendTo(outp);
  88. outp.append(mg.adi());
  89. outp.append((uint32_t)0xffffffff);
  90. RR->sw->send(tPtr,outp,true);
  91. for(unsigned int i=0;i<specialistCount;++i) {
  92. outp.newInitializationVector();
  93. outp.setDestination(specialists[i]);
  94. RR->sw->send(tPtr,outp,true);
  95. }
  96. // LEGACY: roots may know about older versions' multicast subscriptions but
  97. // the root's role here is being phased out.
  98. SharedPtr<Peer> root(RR->topology->root(now));
  99. if (root) {
  100. outp.newInitializationVector();
  101. outp.setDestination(root->address());
  102. outp.armor(root->key(),true);
  103. root->sendDirect(tPtr,outp.data(),outp.size(),now,true);
  104. }
  105. }
  106. if (recipients.empty())
  107. return 0;
  108. unsigned int sentCount = 0;
  109. uint64_t bloomFilter[ZT_MULTICAST_BLOOM_FILTER_SIZE_BITS / 64];
  110. unsigned int bloomMultiplier;
  111. if (existingBloom) {
  112. memcpy(bloomFilter,existingBloom,sizeof(bloomFilter));
  113. bloomMultiplier = existingBloomMultiplier;
  114. } else {
  115. memset(bloomFilter,0,sizeof(bloomFilter));
  116. bloomMultiplier = 1;
  117. // Iteratively search for a bloom multiplier that results in no collisions
  118. // among known recipients. Usually the first iteration is good unless
  119. // the recipient set is quite large.
  120. if (recipients.size() > 1) {
  121. unsigned long bestMultColl = 0xffffffff;
  122. for(int k=0;k<16;++k) { // 16 == arbitrary limit on iterations for this search, also must be <= size of PRIMES
  123. const unsigned int mult = PRIMES[k];
  124. unsigned long coll = 0;
  125. for(std::vector< std::pair<int64_t,Address> >::const_iterator r(recipients.begin());r!=recipients.end();++r) {
  126. const unsigned int bfi = mult * (unsigned int)r->second.toInt();
  127. const unsigned int byte = (bfi >> 3) % sizeof(bloomFilter);
  128. const uint8_t bit = 1 << (bfi & 7);
  129. coll += ((((uint8_t *)bloomFilter)[byte] & bit) != 0);
  130. ((uint8_t *)bloomFilter)[byte] |= bit;
  131. }
  132. memset(bloomFilter,0,sizeof(bloomFilter));
  133. if (coll <= bestMultColl) {
  134. bloomMultiplier = mult;
  135. if (coll == 0) // perfect score, no need to continue searching
  136. break;
  137. bestMultColl = coll;
  138. }
  139. }
  140. }
  141. }
  142. // See if there is a multicast replicator, trying to pick the fastest/best one.
  143. Address bestReplicator;
  144. if (multicastReplicatorCount > 0) {
  145. unsigned int bestReplicatorLatency = 0xffff;
  146. for(unsigned int i=0;i<multicastReplicatorCount;++i) {
  147. const unsigned int bfi = bloomMultiplier * (unsigned int)multicastReplicators[i].toInt();
  148. if ((((uint8_t *)bloomFilter)[(bfi >> 3) % sizeof(bloomFilter)] & (1 << (bfi & 7))) == 0) {
  149. SharedPtr<Peer> peer(RR->topology->get(multicastReplicators[i]));
  150. if (peer) {
  151. const unsigned int lat = peer->latency(now);
  152. if (lat <= bestReplicatorLatency) {
  153. bestReplicator = peer->address();
  154. bestReplicatorLatency = lat;
  155. }
  156. } else if (!bestReplicator) {
  157. bestReplicator = multicastReplicators[i];
  158. }
  159. }
  160. }
  161. }
  162. // If this is a multicast replicator, aggressively replicate. Multicast
  163. // replicators are not subject to send count limits.
  164. if (amMulticastReplicator) {
  165. std::vector< std::pair< int,Address > > byLatency;
  166. for(std::vector< std::pair<int64_t,Address> >::const_iterator r(recipients.begin());r!=recipients.end();++r) {
  167. const unsigned int bfi = bloomMultiplier * (unsigned int)r->second.toInt();
  168. if ((((uint8_t *)bloomFilter)[(bfi >> 3) % sizeof(bloomFilter)] & (1 << (bfi & 7))) == 0) {
  169. SharedPtr<Peer> peer(RR->topology->get(r->second));
  170. byLatency.push_back(std::pair< int,Address >((peer) ? (int)peer->latency(now) : 0xffff,r->second));
  171. }
  172. }
  173. std::sort(byLatency.begin(),byLatency.end());
  174. unsigned long cnt = byLatency.size();
  175. if (bestReplicator)
  176. cnt /= 2; // send to only the best half of the latency-sorted population if there are more replicators
  177. for(unsigned long i=0;i<cnt;++i) {
  178. const unsigned int bfi = bloomMultiplier * (unsigned int)byLatency[i].second.toInt();
  179. ((uint8_t *)bloomFilter)[(bfi >> 3) % sizeof(bloomFilter)] |= 1 << (bfi & 7);
  180. Packet outp(byLatency[i].second,RR->identity.address(),Packet::VERB_MULTICAST_FRAME);
  181. outp.append(network->id());
  182. outp.append((uint8_t)0x04);
  183. src.appendTo(outp);
  184. mg.mac().appendTo(outp);
  185. outp.append(mg.adi());
  186. outp.append((uint16_t)etherType);
  187. outp.append(data,len);
  188. outp.compress();
  189. RR->sw->send(tPtr,outp,true);
  190. ++sentCount;
  191. }
  192. }
  193. // Forward to the next multicast replicator, if any.
  194. if (bestReplicator) {
  195. const unsigned int bfi = bloomMultiplier * (unsigned int)bestReplicator.toInt();
  196. ((uint8_t *)bloomFilter)[(bfi >> 3) % sizeof(bloomFilter)] |= 1 << (bfi & 7);
  197. Packet outp(bestReplicator,RR->identity.address(),Packet::VERB_MULTICAST_FRAME);
  198. outp.append((uint8_t)(0x04 | 0x08));
  199. RR->identity.address().appendTo(outp);
  200. outp.append((uint16_t)bloomMultiplier);
  201. outp.append((uint16_t)sizeof(bloomFilter));
  202. outp.append(((uint8_t *)bloomFilter),sizeof(bloomFilter));
  203. src.appendTo(outp);
  204. mg.mac().appendTo(outp);
  205. outp.append(mg.adi());
  206. outp.append((uint16_t)etherType);
  207. outp.append(data,len);
  208. outp.compress();
  209. RR->sw->send(tPtr,outp,true);
  210. ++sentCount;
  211. }
  212. // If this is a multicast replicator, we've already replicated.
  213. if (amMulticastReplicator)
  214. return (unsigned int)recipients.size();
  215. // Find the two best next hops (that have never seen this multicast)
  216. // that are newer version nodes.
  217. SharedPtr<Peer> nextHops[2];
  218. unsigned int nextHopsBestLatency[2] = { 0xffff,0xffff };
  219. for(std::vector< std::pair<int64_t,Address> >::iterator r(recipients.begin());r!=recipients.end();++r) {
  220. if (r->first >= 0) {
  221. const unsigned int bfi = bloomMultiplier * (unsigned int)r->second.toInt();
  222. if ((((uint8_t *)bloomFilter)[(bfi >> 3) % sizeof(bloomFilter)] & (1 << (bfi & 7))) == 0) {
  223. const SharedPtr<Peer> peer(RR->topology->get(r->second));
  224. if ((peer)&&(peer->remoteVersionProtocol() >= 11)) {
  225. r->first = -1; // use this field now to flag as non-legacy
  226. const unsigned int lat = peer->latency(now);
  227. for(unsigned int nh=0;nh<2;++nh) {
  228. if (lat <= nextHopsBestLatency[nh]) {
  229. nextHopsBestLatency[nh] = lat;
  230. nextHops[nh] = peer;
  231. break;
  232. }
  233. }
  234. }
  235. }
  236. }
  237. }
  238. // Set bits for next hops in bloom filter
  239. for(unsigned int nh=0;nh<2;++nh) {
  240. if (nextHops[nh]) {
  241. const unsigned int bfi = bloomMultiplier * (unsigned int)nextHops[nh]->address().toInt();
  242. ((uint8_t *)bloomFilter)[(bfi >> 3) % sizeof(bloomFilter)] |= 1 << (bfi & 7);
  243. ++sentCount;
  244. }
  245. }
  246. // Send to legacy peers and flag these in bloom filter
  247. const unsigned int limit = config.multicastLimit + bridgeCount;
  248. for(std::vector< std::pair<int64_t,Address> >::const_iterator r(recipients.begin());(r!=recipients.end())&&(sentCount<limit);++r) {
  249. if (r->first >= 0) {
  250. const unsigned int bfi = bloomMultiplier * (unsigned int)r->second.toInt();
  251. ((uint8_t *)bloomFilter)[(bfi >> 3) % sizeof(bloomFilter)] |= 1 << (bfi & 7);
  252. Packet outp(r->second,RR->identity.address(),Packet::VERB_MULTICAST_FRAME);
  253. outp.append(network->id());
  254. outp.append((uint8_t)0x04);
  255. src.appendTo(outp);
  256. mg.mac().appendTo(outp);
  257. outp.append(mg.adi());
  258. outp.append((uint16_t)etherType);
  259. outp.append(data,len);
  260. outp.compress();
  261. RR->sw->send(tPtr,outp,true);
  262. ++sentCount;
  263. }
  264. }
  265. // Send to next hops for P2P propagation
  266. for(unsigned int nh=0;nh<2;++nh) {
  267. if (nextHops[nh]) {
  268. Packet outp(nextHops[nh]->address(),RR->identity.address(),Packet::VERB_MULTICAST_FRAME);
  269. outp.append((uint8_t)(0x04 | 0x08));
  270. RR->identity.address().appendTo(outp);
  271. outp.append((uint16_t)bloomMultiplier);
  272. outp.append((uint16_t)sizeof(bloomFilter));
  273. outp.append(((uint8_t *)bloomFilter),sizeof(bloomFilter));
  274. src.appendTo(outp);
  275. mg.mac().appendTo(outp);
  276. outp.append(mg.adi());
  277. outp.append((uint16_t)etherType);
  278. outp.append(data,len);
  279. outp.compress();
  280. RR->sw->send(tPtr,outp,true);
  281. }
  282. }
  283. return (unsigned int)recipients.size();
  284. }
  285. void Multicaster::clean(int64_t now)
  286. {
  287. }
  288. } // namespace ZeroTier