Cluster.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. /*
  2. * ZeroTier One - Network Virtualization Everywhere
  3. * Copyright (C) 2011-2015 ZeroTier, Inc.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. * --
  19. *
  20. * ZeroTier may be used and distributed under the terms of the GPLv3, which
  21. * are available at: http://www.gnu.org/licenses/gpl-3.0.html
  22. *
  23. * If you would like to embed ZeroTier into a commercial application or
  24. * redistribute it in a modified binary form, please contact ZeroTier Networks
  25. * LLC. Start here: http://www.zerotier.com/
  26. */
  27. #ifdef ZT_ENABLE_CLUSTER
  28. #include <stdint.h>
  29. #include <stdio.h>
  30. #include <stdlib.h>
  31. #include <string.h>
  32. #include <algorithm>
  33. #include <utility>
  34. #include "Cluster.hpp"
  35. #include "RuntimeEnvironment.hpp"
  36. #include "MulticastGroup.hpp"
  37. #include "CertificateOfMembership.hpp"
  38. #include "Salsa20.hpp"
  39. #include "Poly1305.hpp"
  40. #include "Packet.hpp"
  41. #include "Peer.hpp"
  42. #include "Switch.hpp"
  43. #include "Node.hpp"
  44. namespace ZeroTier {
  45. Cluster::Cluster(const RuntimeEnvironment *renv,uint16_t id,DistanceAlgorithm da,int32_t x,int32_t y,int32_t z,void (*sendFunction)(void *,uint16_t,const void *,unsigned int),void *arg) :
  46. RR(renv),
  47. _sendFunction(sendFunction),
  48. _arg(arg),
  49. _x(x),
  50. _y(y),
  51. _z(z),
  52. _da(da),
  53. _id(id),
  54. _members(new _Member[65536])
  55. {
  56. uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
  57. // Generate master secret by hashing the secret from our Identity key pair
  58. RR->identity.sha512PrivateKey(_masterSecret);
  59. // Generate our inbound message key, which is the master secret XORed with our ID and hashed twice
  60. memcpy(stmp,_masterSecret,sizeof(stmp));
  61. stmp[0] ^= Utils::hton(id);
  62. SHA512::hash(stmp,stmp,sizeof(stmp));
  63. SHA512::hash(stmp,stmp,sizeof(stmp));
  64. memcpy(_key,stmp,sizeof(_key));
  65. Utils::burn(stmp,sizeof(stmp));
  66. }
  67. Cluster::~Cluster()
  68. {
  69. Utils::burn(_masterSecret,sizeof(_masterSecret));
  70. Utils::burn(_key,sizeof(_key));
  71. delete [] _members;
  72. }
  73. void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
  74. {
  75. Buffer<ZT_CLUSTER_MAX_MESSAGE_LENGTH> dmsg;
  76. {
  77. // FORMAT: <[16] iv><[8] MAC><... data>
  78. if ((len < 24)||(len > ZT_CLUSTER_MAX_MESSAGE_LENGTH))
  79. return;
  80. // 16-byte IV: first 8 bytes XORed with key, last 8 bytes used as Salsa20 64-bit IV
  81. char keytmp[32];
  82. memcpy(keytmp,_key,32);
  83. for(int i=0;i<8;++i)
  84. keytmp[i] ^= reinterpret_cast<const char *>(msg)[i];
  85. Salsa20 s20(keytmp,256,reinterpret_cast<const char *>(msg) + 8);
  86. Utils::burn(keytmp,sizeof(keytmp));
  87. // One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
  88. char polykey[ZT_POLY1305_KEY_LEN];
  89. memset(polykey,0,sizeof(polykey));
  90. s20.encrypt12(polykey,polykey,sizeof(polykey));
  91. // Compute 16-byte MAC
  92. char mac[ZT_POLY1305_MAC_LEN];
  93. Poly1305::compute(mac,reinterpret_cast<const char *>(msg) + 24,len - 24,polykey);
  94. // Check first 8 bytes of MAC against 64-bit MAC in stream
  95. if (!Utils::secureEq(mac,reinterpret_cast<const char *>(msg) + 16,8))
  96. return;
  97. // Decrypt!
  98. dmsg.setSize(len - 24);
  99. s20.decrypt12(reinterpret_cast<const char *>(msg) + 24,const_cast<void *>(dmsg.data()),dmsg.size());
  100. }
  101. if (dmsg.size() < 2)
  102. return;
  103. const uint16_t fromMemberId = dmsg.at<uint16_t>(0);
  104. unsigned int ptr = 2;
  105. _Member &m = _members[fromMemberId];
  106. Mutex::Lock mlck(m.lock);
  107. m.lastReceivedFrom = RR->node->now();
  108. try {
  109. while (ptr < dmsg.size()) {
  110. const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
  111. const unsigned int nextPtr = ptr + mlen;
  112. int mtype = -1;
  113. try {
  114. switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
  115. default:
  116. break;
  117. case STATE_MESSAGE_ALIVE: {
  118. ptr += 7; // skip version stuff, not used yet
  119. m.x = dmsg.at<int32_t>(ptr); ptr += 4;
  120. m.y = dmsg.at<int32_t>(ptr); ptr += 4;
  121. m.z = dmsg.at<int32_t>(ptr); ptr += 4;
  122. ptr += 8; // skip local clock, not used
  123. m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
  124. ptr += 8; // skip flags, unused
  125. m.physicalAddressCount = dmsg[ptr++];
  126. if (m.physicalAddressCount > ZT_CLUSTER_MEMBER_MAX_PHYSICAL_ADDRS)
  127. m.physicalAddressCount = ZT_CLUSTER_MEMBER_MAX_PHYSICAL_ADDRS;
  128. for(unsigned int i=0;i<m.physicalAddressCount;++i)
  129. ptr += m.physicalAddresses[i].deserialize(dmsg,ptr);
  130. m.lastReceivedAliveAnnouncement = RR->node->now();
  131. } break;
  132. case STATE_MESSAGE_HAVE_PEER: {
  133. try {
  134. Identity id;
  135. ptr += id.deserialize(dmsg,ptr);
  136. RR->topology->saveIdentity(id);
  137. { // Add or update peer affinity entry
  138. _PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
  139. Mutex::Lock _l2(_peerAffinities_m);
  140. std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
  141. if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
  142. i->timestamp = pa.timestamp;
  143. } else {
  144. _peerAffinities.push_back(pa);
  145. std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
  146. }
  147. }
  148. } catch ( ... ) {
  149. // ignore invalid identities
  150. }
  151. } break;
  152. case STATE_MESSAGE_MULTICAST_LIKE: {
  153. const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
  154. const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
  155. const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
  156. const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
  157. RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
  158. } break;
  159. case STATE_MESSAGE_COM: {
  160. // TODO: not used yet
  161. } break;
  162. case STATE_MESSAGE_RELAY: {
  163. const unsigned int numRemotePeerPaths = dmsg[ptr++];
  164. InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
  165. for(unsigned int i=0;i<numRemotePeerPaths;++i)
  166. ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
  167. const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
  168. const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
  169. if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
  170. const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
  171. SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
  172. if (destinationPeer) {
  173. if (
  174. (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
  175. (numRemotePeerPaths > 0)&&
  176. (packetLen >= 18)&&
  177. (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
  178. ) {
  179. // If remote peer paths were sent with this relayed packet, we do
  180. // RENDEZVOUS. It's handled here for cluster-relayed packets since
  181. // we don't have both Peer records so this is a different path.
  182. const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
  183. InetAddress bestDestV4,bestDestV6;
  184. destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
  185. InetAddress bestRemoteV4,bestRemoteV6;
  186. for(unsigned int i=0;i<numRemotePeerPaths;++i) {
  187. if ((bestRemoteV4)&&(bestRemoteV6))
  188. break;
  189. switch(remotePeerPaths[i].ss_family) {
  190. case AF_INET:
  191. if (!bestRemoteV4)
  192. bestRemoteV4 = remotePeerPaths[i];
  193. break;
  194. case AF_INET6:
  195. if (!bestRemoteV6)
  196. bestRemoteV6 = remotePeerPaths[i];
  197. break;
  198. }
  199. }
  200. Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
  201. rendezvousForDest.append((uint8_t)0);
  202. remotePeerAddress.appendTo(rendezvousForDest);
  203. Buffer<2048> rendezvousForOtherEnd;
  204. rendezvousForOtherEnd.addSize(2); // leave room for payload size
  205. rendezvousForOtherEnd.append((uint8_t)STATE_MESSAGE_PROXY_SEND);
  206. remotePeerAddress.appendTo(rendezvousForOtherEnd);
  207. rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
  208. const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
  209. rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
  210. rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
  211. destinationAddress.appendTo(rendezvousForOtherEnd);
  212. bool haveMatch = false;
  213. if ((bestDestV6)&&(bestRemoteV6)) {
  214. haveMatch = true;
  215. rendezvousForDest.append((uint16_t)bestRemoteV6.port());
  216. rendezvousForDest.append((uint8_t)16);
  217. rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
  218. rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
  219. rendezvousForOtherEnd.append((uint8_t)16);
  220. rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
  221. rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
  222. } else if ((bestDestV4)&&(bestRemoteV4)) {
  223. haveMatch = true;
  224. rendezvousForDest.append((uint16_t)bestRemoteV4.port());
  225. rendezvousForDest.append((uint8_t)4);
  226. rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
  227. rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
  228. rendezvousForOtherEnd.append((uint8_t)4);
  229. rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
  230. rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
  231. }
  232. if (haveMatch) {
  233. RR->sw->send(rendezvousForDest,true,0);
  234. rendezvousForOtherEnd.setAt<uint16_t>(0,(uint16_t)(rendezvousForOtherEnd.size() - 2));
  235. _send(fromMemberId,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
  236. }
  237. }
  238. }
  239. }
  240. } break;
  241. case STATE_MESSAGE_PROXY_SEND: {
  242. const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
  243. const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
  244. const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
  245. Packet outp(rcpt,RR->identity.address(),verb);
  246. outp.append(dmsg.field(ptr,len),len);
  247. RR->sw->send(outp,true,0);
  248. } break;
  249. }
  250. } catch ( ... ) {
  251. TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
  252. // drop invalids
  253. }
  254. ptr = nextPtr;
  255. }
  256. } catch ( ... ) {
  257. TRACE("invalid message (outer loop), discarding");
  258. // drop invalids
  259. }
  260. }
  261. void Cluster::replicateHavePeer(const Address &peerAddress)
  262. {
  263. }
  264. void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group)
  265. {
  266. }
  267. void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com)
  268. {
  269. }
  270. void Cluster::doPeriodicTasks()
  271. {
  272. // Go ahead and flush whenever possible right now
  273. {
  274. Mutex::Lock _l(_memberIds_m);
  275. for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
  276. Mutex::Lock _l2(_members[*mid].lock);
  277. _flush(*mid);
  278. }
  279. }
  280. }
  281. void Cluster::addMember(uint16_t memberId)
  282. {
  283. Mutex::Lock _l2(_members[memberId].lock);
  284. Mutex::Lock _l(_memberIds_m);
  285. _memberIds.push_back(memberId);
  286. std::sort(_memberIds.begin(),_memberIds.end());
  287. // Generate this member's message key from the master and its ID
  288. uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
  289. memcpy(stmp,_masterSecret,sizeof(stmp));
  290. stmp[0] ^= Utils::hton(memberId);
  291. SHA512::hash(stmp,stmp,sizeof(stmp));
  292. SHA512::hash(stmp,stmp,sizeof(stmp));
  293. memcpy(_members[memberId].key,stmp,sizeof(_members[memberId].key));
  294. Utils::burn(stmp,sizeof(stmp));
  295. // Prepare q
  296. _members[memberId].q.clear();
  297. char iv[16];
  298. Utils::getSecureRandom(iv,16);
  299. _members[memberId].q.append(iv,16);
  300. _members[memberId].q.addSize(8); // room for MAC
  301. _members[memberId].q.append((uint16_t)_id);
  302. }
  303. void Cluster::_send(uint16_t memberId,const void *msg,unsigned int len)
  304. {
  305. _Member &m = _members[memberId];
  306. // assumes m.lock is locked!
  307. for(;;) {
  308. if ((m.q.size() + len) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)
  309. _flush(memberId);
  310. else {
  311. m.q.append(msg,len);
  312. break;
  313. }
  314. }
  315. }
  316. void Cluster::_flush(uint16_t memberId)
  317. {
  318. _Member &m = _members[memberId];
  319. // assumes m.lock is locked!
  320. if (m.q.size() > 26) { // 16-byte IV + 8-byte MAC + 2-byte cluster member ID (latter two bytes are inside crypto envelope)
  321. // Create key from member's key and IV
  322. char keytmp[32];
  323. memcpy(keytmp,m.key,32);
  324. for(int i=0;i<8;++i)
  325. keytmp[i] ^= m.q[i];
  326. Salsa20 s20(keytmp,256,m.q.field(8,8));
  327. Utils::burn(keytmp,sizeof(keytmp));
  328. // One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
  329. char polykey[ZT_POLY1305_KEY_LEN];
  330. memset(polykey,0,sizeof(polykey));
  331. s20.encrypt12(polykey,polykey,sizeof(polykey));
  332. // Encrypt m.q in place
  333. s20.encrypt12(reinterpret_cast<const char *>(m.q.data()) + 24,const_cast<char *>(reinterpret_cast<const char *>(m.q.data())) + 24,m.q.size() - 24);
  334. // Add MAC for authentication (encrypt-then-MAC)
  335. char mac[ZT_POLY1305_MAC_LEN];
  336. Poly1305::compute(mac,reinterpret_cast<const char *>(m.q.data()) + 24,m.q.size() - 24,polykey);
  337. memcpy(m.q.field(16,8),mac,8);
  338. // Send!
  339. _sendFunction(_arg,memberId,m.q.data(),m.q.size());
  340. // Prepare for more
  341. m.q.clear();
  342. char iv[16];
  343. Utils::getSecureRandom(iv,16);
  344. m.q.append(iv,16);
  345. m.q.addSize(8); // room for MAC
  346. m.q.append((uint16_t)_id);
  347. }
  348. }
  349. } // namespace ZeroTier
  350. #endif // ZT_ENABLE_CLUSTER