@@ -175,128 +175,130 @@ void Multicaster::send(
 	unsigned long idxbuf[8194];
 	unsigned long *indexes = idxbuf;
 
-	Mutex::Lock _l(_groups_m);
-	MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)];
-
-	if (!gs.members.empty()) {
-		// Allocate a memory buffer if group is monstrous
-		if (gs.members.size() > (sizeof(idxbuf) / sizeof(unsigned long)))
-			indexes = new unsigned long[gs.members.size()];
-
-		// Generate a random permutation of member indexes
-		for(unsigned long i=0;i<gs.members.size();++i)
-			indexes[i] = i;
-		for(unsigned long i=(unsigned long)gs.members.size()-1;i>0;--i) {
-			unsigned long j = (unsigned long)RR->node->prng() % (i + 1);
-			unsigned long tmp = indexes[j];
-			indexes[j] = indexes[i];
-			indexes[i] = tmp;
+	try {
+		Mutex::Lock _l(_groups_m);
+		MulticastGroupStatus &gs = _groups[Multicaster::Key(nwid,mg)];
+
+		if (!gs.members.empty()) {
+			// Allocate a memory buffer if group is monstrous
+			if (gs.members.size() > (sizeof(idxbuf) / sizeof(unsigned long)))
+				indexes = new unsigned long[gs.members.size()];
+
+			// Generate a random permutation of member indexes
+			for(unsigned long i=0;i<gs.members.size();++i)
+				indexes[i] = i;
+			for(unsigned long i=(unsigned long)gs.members.size()-1;i>0;--i) {
+				unsigned long j = (unsigned long)RR->node->prng() % (i + 1);
+				unsigned long tmp = indexes[j];
+				indexes[j] = indexes[i];
+				indexes[i] = tmp;
+			}
 		}
-	}
 
-	if (gs.members.size() >= limit) {
-		// Skip queue if we already have enough members to complete the send operation
-		OutboundMulticast out;
-
-		out.init(
-			RR,
-			now,
-			nwid,
-			com,
-			limit,
-			1, // we'll still gather a little from peers to keep multicast list fresh
-			src,
-			mg,
-			etherType,
-			data,
-			len);
-
-		unsigned int count = 0;
-
-		for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
-			if (*ast != RR->identity.address()) {
-				out.sendOnly(RR,*ast);
-				if (++count >= limit)
-					break;
+		if (gs.members.size() >= limit) {
+			// Skip queue if we already have enough members to complete the send operation
+			OutboundMulticast out;
+
+			out.init(
+				RR,
+				now,
+				nwid,
+				com,
+				limit,
+				1, // we'll still gather a little from peers to keep multicast list fresh
+				src,
+				mg,
+				etherType,
+				data,
+				len);
+
+			unsigned int count = 0;
+
+			for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
+				if (*ast != RR->identity.address()) {
+					out.sendOnly(RR,*ast); // optimization: don't use dedup log if it's a one-pass send
+					if (++count >= limit)
+						break;
+				}
 			}
-		}
 
-		unsigned long idx = 0;
-		while ((count < limit)&&(idx < gs.members.size())) {
-			Address ma(gs.members[indexes[idx++]].address);
-			if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) {
-				out.sendOnly(RR,ma);
-				++count;
+			unsigned long idx = 0;
+			while ((count < limit)&&(idx < gs.members.size())) {
+				Address ma(gs.members[indexes[idx++]].address);
+				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) {
+					out.sendOnly(RR,ma); // optimization: don't use dedup log if it's a one-pass send
+					++count;
+				}
 			}
-		}
-	} else {
-		unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
-
-		if ((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY) {
-			gs.lastExplicitGather = now;
-			SharedPtr<Peer> r(RR->topology->getBestRoot());
-			if (r) {
-				TRACE(">>MC upstream GATHER up to %u for group %.16llx/%s",gatherLimit,nwid,mg.toString().c_str());
-
-				const CertificateOfMembership *com = (CertificateOfMembership *)0;
-				{
-					SharedPtr<Network> nw(RR->node->network(nwid));
-					if (nw) {
-						SharedPtr<NetworkConfig> nconf(nw->config2());
-						if ((nconf)&&(nconf->com())&&(nconf->isPrivate())&&(r->needsOurNetworkMembershipCertificate(nwid,now,true)))
-							com = &(nconf->com());
+		} else {
+			unsigned int gatherLimit = (limit - (unsigned int)gs.members.size()) + 1;
+
+			if ((gs.members.empty())||((now - gs.lastExplicitGather) >= ZT_MULTICAST_EXPLICIT_GATHER_DELAY)) {
+				gs.lastExplicitGather = now;
+				SharedPtr<Peer> r(RR->topology->getBestRoot());
+				if (r) {
+					TRACE(">>MC upstream GATHER up to %u for group %.16llx/%s",gatherLimit,nwid,mg.toString().c_str());
+
+					const CertificateOfMembership *com = (CertificateOfMembership *)0;
+					{
+						SharedPtr<Network> nw(RR->node->network(nwid));
+						if (nw) {
+							SharedPtr<NetworkConfig> nconf(nw->config2());
+							if ((nconf)&&(nconf->com())&&(nconf->isPrivate())&&(r->needsOurNetworkMembershipCertificate(nwid,now,true)))
+								com = &(nconf->com());
+						}
 					}
-				}
 
-				Packet outp(r->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
-				outp.append(nwid);
-				outp.append((uint8_t)(com ? 0x01 : 0x00));
-				mg.mac().appendTo(outp);
-				outp.append((uint32_t)mg.adi());
-				outp.append((uint32_t)gatherLimit);
-				if (com)
-					com->serialize(outp);
-				outp.armor(r->key(),true);
-				r->send(RR,outp.data(),outp.size(),now);
+					Packet outp(r->address(),RR->identity.address(),Packet::VERB_MULTICAST_GATHER);
+					outp.append(nwid);
+					outp.append((uint8_t)(com ? 0x01 : 0x00));
+					mg.mac().appendTo(outp);
+					outp.append((uint32_t)mg.adi());
+					outp.append((uint32_t)gatherLimit);
+					if (com)
+						com->serialize(outp);
+					outp.armor(r->key(),true);
+					r->send(RR,outp.data(),outp.size(),now);
+				}
+				gatherLimit = 0;
 			}
-			gatherLimit = 0;
-		}
 
-		gs.txQueue.push_back(OutboundMulticast());
-		OutboundMulticast &out = gs.txQueue.back();
-
-		out.init(
-			RR,
-			now,
-			nwid,
-			com,
-			limit,
-			gatherLimit,
-			src,
-			mg,
-			etherType,
-			data,
-			len);
-
-		unsigned int count = 0;
-
-		for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
-			if (*ast != RR->identity.address()) {
-				out.sendAndLog(RR,*ast);
-				if (++count >= limit)
-					break;
+			gs.txQueue.push_back(OutboundMulticast());
+			OutboundMulticast &out = gs.txQueue.back();
+
+			out.init(
+				RR,
+				now,
+				nwid,
+				com,
+				limit,
+				gatherLimit,
+				src,
+				mg,
+				etherType,
+				data,
+				len);
+
+			unsigned int count = 0;
+
+			for(std::vector<Address>::const_iterator ast(alwaysSendTo.begin());ast!=alwaysSendTo.end();++ast) {
+				if (*ast != RR->identity.address()) {
+					out.sendAndLog(RR,*ast);
+					if (++count >= limit)
+						break;
+				}
 			}
-		}
 
-		unsigned long idx = 0;
-		while ((count < limit)&&(idx < gs.members.size())) {
-			Address ma(gs.members[indexes[idx++]].address);
-			if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) {
-				out.sendAndLog(RR,ma);
-				++count;
+			unsigned long idx = 0;
+			while ((count < limit)&&(idx < gs.members.size())) {
+				Address ma(gs.members[indexes[idx++]].address);
+				if (std::find(alwaysSendTo.begin(),alwaysSendTo.end(),ma) == alwaysSendTo.end()) {
+					out.sendAndLog(RR,ma);
+					++count;
+				}
 			}
 		}
-	}
+	} catch ( ... ) {} // this is a sanity check to catch any failures and make sure indexes[] still gets deleted
 
 	// Free allocated memory buffer if any
 	if (indexes != idxbuf)
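
The functional changes in this hunk are small: the body of Multicaster::send() is wrapped in try { ... } catch ( ... ) {} so that the indexes array, which is heap-allocated whenever a group outgrows the 8194-entry stack buffer, is still released by the delete [] that follows the hunk; the explicit-GATHER condition now also fires when gs.members is empty; and two comments note that sendOnly() skips the dedup log for one-pass sends. Everything else is the re-indentation forced by the new try block. Below is a minimal standalone sketch of the buffer-spill-plus-shuffle pattern the function uses; the names (visitShuffled, visit) are illustrative, not ZeroTier's API, and std::rand() stands in for RR->node->prng().

#include <cstdlib>
#include <cstddef>

// Minimal sketch of the pattern in the hunk (illustrative names, not ZeroTier's API):
// a fixed stack buffer for the common case, a heap spill for huge groups, a
// Fisher-Yates shuffle of the indexes, and a catch-all so the delete [] below
// runs even if visiting a member throws.
static void visitShuffled(std::size_t n, void (*visit)(unsigned long))
{
	unsigned long idxbuf[8194];
	unsigned long *indexes = idxbuf;

	try {
		if (n > 0) { // mirrors "if (!gs.members.empty())" in the hunk
			if (n > (sizeof(idxbuf) / sizeof(unsigned long)))
				indexes = new unsigned long[n]; // spill to the heap for monstrous groups

			// Fisher-Yates: a uniform random permutation of 0..n-1
			for(unsigned long i=0;i<(unsigned long)n;++i)
				indexes[i] = i;
			for(unsigned long i=(unsigned long)n-1;i>0;--i) {
				unsigned long j = (unsigned long)std::rand() % (i + 1); // stand-in for RR->node->prng()
				unsigned long tmp = indexes[j];
				indexes[j] = indexes[i];
				indexes[i] = tmp;
			}

			for(unsigned long i=0;i<(unsigned long)n;++i)
				visit(indexes[i]); // may throw; the cleanup below must still run
		}
	} catch ( ... ) {} // as in the hunk: swallow so the spill buffer is always freed

	if (indexes != idxbuf)
		delete [] indexes;
}

The stack buffer keeps the common case allocation-free; the catch-all is what guarantees the heap spill is reclaimed even on an error path.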