|
@@ -50,7 +50,6 @@ namespace ZeroTier {
|
|
Switch::Switch(const RuntimeEnvironment *renv) :
|
|
Switch::Switch(const RuntimeEnvironment *renv) :
|
|
RR(renv),
|
|
RR(renv),
|
|
_lastBeaconResponse(0),
|
|
_lastBeaconResponse(0),
|
|
- _outstandingWhoisRequests(32),
|
|
|
|
_lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine
|
|
_lastUniteAttempt(8) // only really used on root servers and upstreams, and it'll grow there just fine
|
|
{
|
|
{
|
|
}
|
|
}
|
|
@@ -229,8 +228,8 @@ void Switch::onRemotePacket(void *tPtr,const int64_t localSocket,const InetAddre
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- relayTo = RR->topology->getUpstreamPeer(&source,1,true);
|
|
|
|
- if (relayTo)
|
|
|
|
|
|
+ relayTo = RR->topology->getUpstreamPeer();
|
|
|
|
+ if ((relayTo)&&(relayTo->address() != source))
|
|
relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,true);
|
|
relayTo->sendDirect(tPtr,packet.data(),packet.size(),now,true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -553,33 +552,35 @@ void Switch::send(void *tPtr,Packet &packet,bool encrypt)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
-void Switch::requestWhois(void *tPtr,const Address &addr)
|
|
|
|
|
|
+void Switch::requestWhois(void *tPtr,const uint64_t now,const Address &addr)
|
|
{
|
|
{
|
|
if (addr == RR->identity.address())
|
|
if (addr == RR->identity.address())
|
|
return;
|
|
return;
|
|
- bool inserted = false;
|
|
|
|
|
|
+
|
|
{
|
|
{
|
|
- Mutex::Lock _l(_outstandingWhoisRequests_m);
|
|
|
|
- WhoisRequest &r = _outstandingWhoisRequests[addr];
|
|
|
|
- if (r.lastSent) {
|
|
|
|
- r.retries = 0; // reset retry count if entry already existed, but keep waiting and retry again after normal timeout
|
|
|
|
- } else {
|
|
|
|
- r.lastSent = RR->node->now();
|
|
|
|
- inserted = true;
|
|
|
|
- }
|
|
|
|
|
|
+ Mutex::Lock _l(_lastSentWhoisRequest_m);
|
|
|
|
+ uint64_t &last = _lastSentWhoisRequest[addr];
|
|
|
|
+ if ((now - last) < ZT_WHOIS_RETRY_DELAY)
|
|
|
|
+ return;
|
|
|
|
+ else last = now;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ const SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer());
|
|
|
|
+ if (upstream) {
|
|
|
|
+ Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS);
|
|
|
|
+ addr.appendTo(outp);
|
|
|
|
+ RR->node->expectReplyTo(outp.packetId());
|
|
|
|
+ send(tPtr,outp,true);
|
|
}
|
|
}
|
|
- if (inserted)
|
|
|
|
- _sendWhoisRequest(tPtr,addr,(const Address *)0,0);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
|
|
void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
|
|
{
|
|
{
|
|
- { // cancel pending WHOIS since we now know this peer
|
|
|
|
- Mutex::Lock _l(_outstandingWhoisRequests_m);
|
|
|
|
- _outstandingWhoisRequests.erase(peer->address());
|
|
|
|
|
|
+ {
|
|
|
|
+ Mutex::Lock _l(_lastSentWhoisRequest_m);
|
|
|
|
+ _lastSentWhoisRequest.erase(peer->address());
|
|
}
|
|
}
|
|
|
|
|
|
- // finish processing any packets waiting on peer's public key / identity
|
|
|
|
const uint64_t now = RR->node->now();
|
|
const uint64_t now = RR->node->now();
|
|
for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
|
|
for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
|
|
RXQueueEntry *const rq = &(_rxQueue[ptr]);
|
|
RXQueueEntry *const rq = &(_rxQueue[ptr]);
|
|
@@ -589,57 +590,61 @@ void Switch::doAnythingWaitingForPeer(void *tPtr,const SharedPtr<Peer> &peer)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- { // finish sending any packets waiting on peer's public key / identity
|
|
|
|
|
|
+ {
|
|
Mutex::Lock _l(_txQueue_m);
|
|
Mutex::Lock _l(_txQueue_m);
|
|
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
|
|
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
|
|
if (txi->dest == peer->address()) {
|
|
if (txi->dest == peer->address()) {
|
|
- if (_trySend(tPtr,txi->packet,txi->encrypt))
|
|
|
|
|
|
+ if (_trySend(tPtr,txi->packet,txi->encrypt)) {
|
|
_txQueue.erase(txi++);
|
|
_txQueue.erase(txi++);
|
|
- else ++txi;
|
|
|
|
- } else ++txi;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-unsigned long Switch::doTimerTasks(void *tPtr,uint64_t now)
|
|
|
|
-{
|
|
|
|
- unsigned long nextDelay = 0xffffffff; // ceiling delay, caller will cap to minimum
|
|
|
|
-
|
|
|
|
- { // Retry outstanding WHOIS requests
|
|
|
|
- Mutex::Lock _l(_outstandingWhoisRequests_m);
|
|
|
|
- Hashtable< Address,WhoisRequest >::Iterator i(_outstandingWhoisRequests);
|
|
|
|
- Address *a = (Address *)0;
|
|
|
|
- WhoisRequest *r = (WhoisRequest *)0;
|
|
|
|
- while (i.next(a,r)) {
|
|
|
|
- const unsigned long since = (unsigned long)(now - r->lastSent);
|
|
|
|
- if (since >= ZT_WHOIS_RETRY_DELAY) {
|
|
|
|
- if (r->retries >= ZT_MAX_WHOIS_RETRIES) {
|
|
|
|
- _outstandingWhoisRequests.erase(*a);
|
|
|
|
} else {
|
|
} else {
|
|
- r->lastSent = now;
|
|
|
|
- r->peersConsulted[r->retries] = _sendWhoisRequest(tPtr,*a,r->peersConsulted,(r->retries > 1) ? r->retries : 0);
|
|
|
|
- ++r->retries;
|
|
|
|
- nextDelay = std::min(nextDelay,(unsigned long)ZT_WHOIS_RETRY_DELAY);
|
|
|
|
|
|
+ ++txi;
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- nextDelay = std::min(nextDelay,ZT_WHOIS_RETRY_DELAY - since);
|
|
|
|
|
|
+ ++txi;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+}
|
|
|
|
|
|
- { // Time out TX queue packets that never got WHOIS lookups or other info.
|
|
|
|
|
|
+unsigned long Switch::doTimerTasks(void *tPtr,uint64_t now)
|
|
|
|
+{
|
|
|
|
+ const uint64_t timeSinceLastCheck = now - _lastCheckedQueues;
|
|
|
|
+ if (timeSinceLastCheck < ZT_WHOIS_RETRY_DELAY)
|
|
|
|
+ return (unsigned long)(ZT_WHOIS_RETRY_DELAY - timeSinceLastCheck);
|
|
|
|
+ _lastCheckedQueues = now;
|
|
|
|
+
|
|
|
|
+ {
|
|
Mutex::Lock _l(_txQueue_m);
|
|
Mutex::Lock _l(_txQueue_m);
|
|
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
|
|
for(std::list< TXQueueEntry >::iterator txi(_txQueue.begin());txi!=_txQueue.end();) {
|
|
if (_trySend(tPtr,txi->packet,txi->encrypt)) {
|
|
if (_trySend(tPtr,txi->packet,txi->encrypt)) {
|
|
_txQueue.erase(txi++);
|
|
_txQueue.erase(txi++);
|
|
} else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
|
|
} else if ((now - txi->creationTime) > ZT_TRANSMIT_QUEUE_TIMEOUT) {
|
|
RR->t->txTimedOut(tPtr,txi->dest);
|
|
RR->t->txTimedOut(tPtr,txi->dest);
|
|
- _txQueue.erase(txi++);
|
|
|
|
- } else ++txi;
|
|
|
|
|
|
+ _txQueue.erase(txi);
|
|
|
|
+ ++txi;
|
|
|
|
+ } else if (!RR->topology->getPeer(tPtr,txi->dest)) {
|
|
|
|
+ requestWhois(tPtr,now,txi->dest);
|
|
|
|
+ ++txi;
|
|
|
|
+ } else {
|
|
|
|
+ ++txi;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for(unsigned int ptr=0;ptr<ZT_RX_QUEUE_SIZE;++ptr) {
|
|
|
|
+ RXQueueEntry *const rq = &(_rxQueue[ptr]);
|
|
|
|
+ if ((rq->timestamp)&&(rq->complete)) {
|
|
|
|
+ if ((rq->frag0.tryDecode(RR,tPtr))||((now - rq->timestamp) > ZT_RECEIVE_QUEUE_TIMEOUT)) {
|
|
|
|
+ rq->timestamp = 0;
|
|
|
|
+ } else {
|
|
|
|
+ const Address src(rq->frag0.source());
|
|
|
|
+ if (!RR->topology->getPeer(tPtr,src))
|
|
|
|
+ requestWhois(tPtr,now,src);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- { // Remove really old last unite attempt entries to keep table size controlled
|
|
|
|
|
|
+ {
|
|
Mutex::Lock _l(_lastUniteAttempt_m);
|
|
Mutex::Lock _l(_lastUniteAttempt_m);
|
|
Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt);
|
|
Hashtable< _LastUniteKey,uint64_t >::Iterator i(_lastUniteAttempt);
|
|
_LastUniteKey *k = (_LastUniteKey *)0;
|
|
_LastUniteKey *k = (_LastUniteKey *)0;
|
|
@@ -650,7 +655,18 @@ unsigned long Switch::doTimerTasks(void *tPtr,uint64_t now)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- return nextDelay;
|
|
|
|
|
|
+ {
|
|
|
|
+ Mutex::Lock _l(_lastSentWhoisRequest_m);
|
|
|
|
+ Hashtable< Address,uint64_t >::Iterator i(_lastSentWhoisRequest);
|
|
|
|
+ Address *a = (Address *)0;
|
|
|
|
+ uint64_t *ts = (uint64_t *)0;
|
|
|
|
+ while (i.next(a,ts)) {
|
|
|
|
+ if ((now - *ts) > (ZT_WHOIS_RETRY_DELAY * 2))
|
|
|
|
+ _lastSentWhoisRequest.erase(*a);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return ZT_WHOIS_RETRY_DELAY;
|
|
}
|
|
}
|
|
|
|
|
|
bool Switch::_shouldUnite(const uint64_t now,const Address &source,const Address &destination)
|
|
bool Switch::_shouldUnite(const uint64_t now,const Address &source,const Address &destination)
|
|
@@ -664,18 +680,6 @@ bool Switch::_shouldUnite(const uint64_t now,const Address &source,const Address
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
-Address Switch::_sendWhoisRequest(void *tPtr,const Address &addr,const Address *peersAlreadyConsulted,unsigned int numPeersAlreadyConsulted)
|
|
|
|
-{
|
|
|
|
- SharedPtr<Peer> upstream(RR->topology->getUpstreamPeer(peersAlreadyConsulted,numPeersAlreadyConsulted,false));
|
|
|
|
- if (upstream) {
|
|
|
|
- Packet outp(upstream->address(),RR->identity.address(),Packet::VERB_WHOIS);
|
|
|
|
- addr.appendTo(outp);
|
|
|
|
- RR->node->expectReplyTo(outp.packetId());
|
|
|
|
- send(tPtr,outp,true);
|
|
|
|
- }
|
|
|
|
- return Address();
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
|
|
bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
|
|
{
|
|
{
|
|
SharedPtr<Path> viaPath;
|
|
SharedPtr<Path> viaPath;
|
|
@@ -709,7 +713,7 @@ bool Switch::_trySend(void *tPtr,Packet &packet,bool encrypt)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
- requestWhois(tPtr,destination);
|
|
|
|
|
|
+ requestWhois(tPtr,now,destination);
|
|
return false; // if we are not in cluster mode, there is no way we can send without knowing the peer directly
|
|
return false; // if we are not in cluster mode, there is no way we can send without knowing the peer directly
|
|
}
|
|
}
|
|
|
|
|