Cluster.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. /*
  2. * ZeroTier One - Network Virtualization Everywhere
  3. * Copyright (C) 2011-2015 ZeroTier, Inc.
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. * --
  19. *
  20. * ZeroTier may be used and distributed under the terms of the GPLv3, which
  21. * are available at: http://www.gnu.org/licenses/gpl-3.0.html
  22. *
  23. * If you would like to embed ZeroTier into a commercial application or
  24. * redistribute it in a modified binary form, please contact ZeroTier Networks
  25. * LLC. Start here: http://www.zerotier.com/
  26. */
  27. #ifdef ZT_ENABLE_CLUSTER
  28. #include <stdint.h>
  29. #include <stdio.h>
  30. #include <stdlib.h>
  31. #include <string.h>
  32. #include <algorithm>
  33. #include <utility>
  34. #include "Cluster.hpp"
  35. #include "RuntimeEnvironment.hpp"
  36. #include "MulticastGroup.hpp"
  37. #include "CertificateOfMembership.hpp"
  38. #include "Salsa20.hpp"
  39. #include "Poly1305.hpp"
  40. #include "Packet.hpp"
  41. #include "Peer.hpp"
  42. #include "Switch.hpp"
  43. #include "Node.hpp"
  44. namespace ZeroTier {
  45. Cluster::Cluster(const RuntimeEnvironment *renv,uint16_t id,DistanceAlgorithm da,int32_t x,int32_t y,int32_t z,void (*sendFunction)(void *,uint16_t,const void *,unsigned int),void *arg) :
  46. RR(renv),
  47. _sendFunction(sendFunction),
  48. _arg(arg),
  49. _x(x),
  50. _y(y),
  51. _z(z),
  52. _da(da),
  53. _id(id)
  54. {
  55. uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
  56. // Generate master secret by hashing the secret from our Identity key pair
  57. RR->identity.sha512PrivateKey(_masterSecret);
  58. // Generate our inbound message key, which is the master secret XORed with our ID and hashed twice
  59. memcpy(stmp,_masterSecret,sizeof(stmp));
  60. stmp[0] ^= Utils::hton(id);
  61. SHA512::hash(stmp,stmp,sizeof(stmp));
  62. SHA512::hash(stmp,stmp,sizeof(stmp));
  63. memcpy(_key,stmp,sizeof(_key));
  64. Utils::burn(stmp,sizeof(stmp));
  65. }
  66. Cluster::~Cluster()
  67. {
  68. Utils::burn(_masterSecret,sizeof(_masterSecret));
  69. Utils::burn(_key,sizeof(_key));
  70. }
  71. void Cluster::handleIncomingStateMessage(const void *msg,unsigned int len)
  72. {
  73. Buffer<ZT_CLUSTER_MAX_MESSAGE_LENGTH> dmsg;
  74. {
  75. // FORMAT: <[16] iv><[8] MAC><... data>
  76. if ((len < 24)||(len > ZT_CLUSTER_MAX_MESSAGE_LENGTH))
  77. return;
  78. // 16-byte IV: first 8 bytes XORed with key, last 8 bytes used as Salsa20 64-bit IV
  79. char keytmp[32];
  80. memcpy(keytmp,_key,32);
  81. for(int i=0;i<8;++i)
  82. keytmp[i] ^= reinterpret_cast<const char *>(msg)[i];
  83. Salsa20 s20(keytmp,256,reinterpret_cast<const char *>(msg) + 8);
  84. Utils::burn(keytmp,sizeof(keytmp));
  85. // One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
  86. char polykey[ZT_POLY1305_KEY_LEN];
  87. memset(polykey,0,sizeof(polykey));
  88. s20.encrypt12(polykey,polykey,sizeof(polykey));
  89. // Compute 16-byte MAC
  90. char mac[ZT_POLY1305_MAC_LEN];
  91. Poly1305::compute(mac,reinterpret_cast<const char *>(msg) + 24,len - 24,polykey);
  92. // Check first 8 bytes of MAC against 64-bit MAC in stream
  93. if (!Utils::secureEq(mac,reinterpret_cast<const char *>(msg) + 16,8))
  94. return;
  95. // Decrypt!
  96. dmsg.setSize(len - 24);
  97. s20.decrypt12(reinterpret_cast<const char *>(msg) + 24,const_cast<void *>(dmsg.data()),dmsg.size());
  98. }
  99. if (dmsg.size() < 2)
  100. return;
  101. const uint16_t fromMemberId = dmsg.at<uint16_t>(0);
  102. unsigned int ptr = 2;
  103. _Member &m = _members[fromMemberId];
  104. Mutex::Lock mlck(m.lock);
  105. m.lastReceivedFrom = RR->node->now();
  106. try {
  107. while (ptr < dmsg.size()) {
  108. const unsigned int mlen = dmsg.at<uint16_t>(ptr); ptr += 2;
  109. const unsigned int nextPtr = ptr + mlen;
  110. int mtype = -1;
  111. try {
  112. switch((StateMessageType)(mtype = (int)dmsg[ptr++])) {
  113. default:
  114. break;
  115. case STATE_MESSAGE_ALIVE: {
  116. ptr += 7; // skip version stuff, not used yet
  117. m.x = dmsg.at<int32_t>(ptr); ptr += 4;
  118. m.y = dmsg.at<int32_t>(ptr); ptr += 4;
  119. m.z = dmsg.at<int32_t>(ptr); ptr += 4;
  120. ptr += 8; // skip local clock, not used
  121. m.load = dmsg.at<uint64_t>(ptr); ptr += 8;
  122. ptr += 8; // skip flags, unused
  123. m.physicalAddressCount = dmsg[ptr++];
  124. if (m.physicalAddressCount > ZT_CLUSTER_MEMBER_MAX_PHYSICAL_ADDRS)
  125. m.physicalAddressCount = ZT_CLUSTER_MEMBER_MAX_PHYSICAL_ADDRS;
  126. for(unsigned int i=0;i<m.physicalAddressCount;++i)
  127. ptr += m.physicalAddresses[i].deserialize(dmsg,ptr);
  128. m.lastReceivedAliveAnnouncement = RR->node->now();
  129. } break;
  130. case STATE_MESSAGE_HAVE_PEER: {
  131. try {
  132. Identity id;
  133. ptr += id.deserialize(dmsg,ptr);
  134. RR->topology->saveIdentity(id);
  135. { // Add or update peer affinity entry
  136. _PeerAffinity pa(id.address(),fromMemberId,RR->node->now());
  137. Mutex::Lock _l2(_peerAffinities_m);
  138. std::vector<_PeerAffinity>::iterator i(std::lower_bound(_peerAffinities.begin(),_peerAffinities.end(),pa)); // O(log(n))
  139. if ((i != _peerAffinities.end())&&(i->key == pa.key)) {
  140. i->timestamp = pa.timestamp;
  141. } else {
  142. _peerAffinities.push_back(pa);
  143. std::sort(_peerAffinities.begin(),_peerAffinities.end()); // probably a more efficient way to insert but okay for now
  144. }
  145. }
  146. } catch ( ... ) {
  147. // ignore invalid identities
  148. }
  149. } break;
  150. case STATE_MESSAGE_MULTICAST_LIKE: {
  151. const uint64_t nwid = dmsg.at<uint64_t>(ptr); ptr += 8;
  152. const Address address(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH); ptr += ZT_ADDRESS_LENGTH;
  153. const MAC mac(dmsg.field(ptr,6),6); ptr += 6;
  154. const uint32_t adi = dmsg.at<uint32_t>(ptr); ptr += 4;
  155. RR->mc->add(RR->node->now(),nwid,MulticastGroup(mac,adi),address);
  156. } break;
  157. case STATE_MESSAGE_COM: {
  158. // TODO: not used yet
  159. } break;
  160. case STATE_MESSAGE_RELAY: {
  161. const unsigned int numRemotePeerPaths = dmsg[ptr++];
  162. InetAddress remotePeerPaths[256]; // size is 8-bit, so 256 is max
  163. for(unsigned int i=0;i<numRemotePeerPaths;++i)
  164. ptr += remotePeerPaths[i].deserialize(dmsg,ptr);
  165. const unsigned int packetLen = dmsg.at<uint16_t>(ptr); ptr += 2;
  166. const void *packet = (const void *)dmsg.field(ptr,packetLen); ptr += packetLen;
  167. if (packetLen >= ZT_PROTO_MIN_FRAGMENT_LENGTH) { // ignore anything too short to contain a dest address
  168. const Address destinationAddress(reinterpret_cast<const char *>(packet) + 8,ZT_ADDRESS_LENGTH);
  169. SharedPtr<Peer> destinationPeer(RR->topology->getPeer(destinationAddress));
  170. if (destinationPeer) {
  171. if (
  172. (destinationPeer->send(RR,packet,packetLen,RR->node->now()))&&
  173. (numRemotePeerPaths > 0)&&
  174. (packetLen >= 18)&&
  175. (reinterpret_cast<const unsigned char *>(packet)[ZT_PACKET_FRAGMENT_IDX_FRAGMENT_INDICATOR] == ZT_PACKET_FRAGMENT_INDICATOR)
  176. ) {
  177. // If remote peer paths were sent with this relayed packet, we do
  178. // RENDEZVOUS. It's handled here for cluster-relayed packets since
  179. // we don't have both Peer records so this is a different path.
  180. const Address remotePeerAddress(reinterpret_cast<const char *>(packet) + 13,ZT_ADDRESS_LENGTH);
  181. InetAddress bestDestV4,bestDestV6;
  182. destinationPeer->getBestActiveAddresses(RR->node->now(),bestDestV4,bestDestV6);
  183. InetAddress bestRemoteV4,bestRemoteV6;
  184. for(unsigned int i=0;i<numRemotePeerPaths;++i) {
  185. if ((bestRemoteV4)&&(bestRemoteV6))
  186. break;
  187. switch(remotePeerPaths[i].ss_family) {
  188. case AF_INET:
  189. if (!bestRemoteV4)
  190. bestRemoteV4 = remotePeerPaths[i];
  191. break;
  192. case AF_INET6:
  193. if (!bestRemoteV6)
  194. bestRemoteV6 = remotePeerPaths[i];
  195. break;
  196. }
  197. }
  198. Packet rendezvousForDest(destinationAddress,RR->identity.address(),Packet::VERB_RENDEZVOUS);
  199. rendezvousForDest.append((uint8_t)0);
  200. remotePeerAddress.appendTo(rendezvousForDest);
  201. Buffer<2048> rendezvousForOtherEnd;
  202. rendezvousForOtherEnd.addSize(2); // leave room for payload size
  203. rendezvousForOtherEnd.append((uint8_t)STATE_MESSAGE_PROXY_SEND);
  204. remotePeerAddress.appendTo(rendezvousForOtherEnd);
  205. rendezvousForOtherEnd.append((uint8_t)Packet::VERB_RENDEZVOUS);
  206. const unsigned int rendezvousForOtherEndPayloadSizePtr = rendezvousForOtherEnd.size();
  207. rendezvousForOtherEnd.addSize(2); // space for actual packet payload length
  208. rendezvousForOtherEnd.append((uint8_t)0); // flags == 0
  209. destinationAddress.appendTo(rendezvousForOtherEnd);
  210. bool haveMatch = false;
  211. if ((bestDestV6)&&(bestRemoteV6)) {
  212. haveMatch = true;
  213. rendezvousForDest.append((uint16_t)bestRemoteV6.port());
  214. rendezvousForDest.append((uint8_t)16);
  215. rendezvousForDest.append(bestRemoteV6.rawIpData(),16);
  216. rendezvousForOtherEnd.append((uint16_t)bestDestV6.port());
  217. rendezvousForOtherEnd.append((uint8_t)16);
  218. rendezvousForOtherEnd.append(bestDestV6.rawIpData(),16);
  219. rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 16));
  220. } else if ((bestDestV4)&&(bestRemoteV4)) {
  221. haveMatch = true;
  222. rendezvousForDest.append((uint16_t)bestRemoteV4.port());
  223. rendezvousForDest.append((uint8_t)4);
  224. rendezvousForDest.append(bestRemoteV4.rawIpData(),4);
  225. rendezvousForOtherEnd.append((uint16_t)bestDestV4.port());
  226. rendezvousForOtherEnd.append((uint8_t)4);
  227. rendezvousForOtherEnd.append(bestDestV4.rawIpData(),4);
  228. rendezvousForOtherEnd.setAt<uint16_t>(rendezvousForOtherEndPayloadSizePtr,(uint16_t)(9 + 4));
  229. }
  230. if (haveMatch) {
  231. RR->sw->send(rendezvousForDest,true,0);
  232. rendezvousForOtherEnd.setAt<uint16_t>(0,(uint16_t)(rendezvousForOtherEnd.size() - 2));
  233. _send(fromMemberId,rendezvousForOtherEnd.data(),rendezvousForOtherEnd.size());
  234. }
  235. }
  236. }
  237. }
  238. } break;
  239. case STATE_MESSAGE_PROXY_SEND: {
  240. const Address rcpt(dmsg.field(ptr,ZT_ADDRESS_LENGTH),ZT_ADDRESS_LENGTH);
  241. const Packet::Verb verb = (Packet::Verb)dmsg[ptr++];
  242. const unsigned int len = dmsg.at<uint16_t>(ptr); ptr += 2;
  243. Packet outp(rcpt,RR->identity.address(),verb);
  244. outp.append(dmsg.field(ptr,len),len);
  245. RR->sw->send(outp,true,0);
  246. } break;
  247. }
  248. } catch ( ... ) {
  249. TRACE("invalid message of size %u type %d (inner decode), discarding",mlen,mtype);
  250. // drop invalids
  251. }
  252. ptr = nextPtr;
  253. }
  254. } catch ( ... ) {
  255. TRACE("invalid message (outer loop), discarding");
  256. // drop invalids
  257. }
  258. }
  259. void Cluster::replicateHavePeer(const Address &peerAddress)
  260. {
  261. }
  262. void Cluster::replicateMulticastLike(uint64_t nwid,const Address &peerAddress,const MulticastGroup &group)
  263. {
  264. }
  265. void Cluster::replicateCertificateOfNetworkMembership(const CertificateOfMembership &com)
  266. {
  267. }
  268. void Cluster::doPeriodicTasks()
  269. {
  270. // Go ahead and flush whenever possible right now
  271. {
  272. Mutex::Lock _l(_memberIds_m);
  273. for(std::vector<uint16_t>::const_iterator mid(_memberIds.begin());mid!=_memberIds.end();++mid) {
  274. Mutex::Lock _l2(_members[*mid].lock);
  275. _flush(*mid);
  276. }
  277. }
  278. }
  279. void Cluster::addMember(uint16_t memberId)
  280. {
  281. Mutex::Lock _l2(_members[memberId].lock);
  282. Mutex::Lock _l(_memberIds_m);
  283. _memberIds.push_back(memberId);
  284. std::sort(_memberIds.begin(),_memberIds.end());
  285. // Generate this member's message key from the master and its ID
  286. uint16_t stmp[ZT_SHA512_DIGEST_LEN / sizeof(uint16_t)];
  287. memcpy(stmp,_masterSecret,sizeof(stmp));
  288. stmp[0] ^= Utils::hton(memberId);
  289. SHA512::hash(stmp,stmp,sizeof(stmp));
  290. SHA512::hash(stmp,stmp,sizeof(stmp));
  291. memcpy(_members[memberId].key,stmp,sizeof(_members[memberId].key));
  292. Utils::burn(stmp,sizeof(stmp));
  293. // Prepare q
  294. _members[memberId].q.clear();
  295. char iv[16];
  296. Utils::getSecureRandom(iv,16);
  297. _members[memberId].q.append(iv,16);
  298. _members[memberId].q.addSize(8); // room for MAC
  299. _members[memberId].q.append((uint16_t)_id);
  300. }
  301. void Cluster::_send(uint16_t memberId,const void *msg,unsigned int len)
  302. {
  303. _Member &m = _members[memberId];
  304. // assumes m.lock is locked!
  305. for(;;) {
  306. if ((m.q.size() + len) > ZT_CLUSTER_MAX_MESSAGE_LENGTH)
  307. _flush(memberId);
  308. else {
  309. m.q.append(msg,len);
  310. break;
  311. }
  312. }
  313. }
  314. void Cluster::_flush(uint16_t memberId)
  315. {
  316. _Member &m = _members[memberId];
  317. // assumes m.lock is locked!
  318. if (m.q.size() > 26) { // 16-byte IV + 8-byte MAC + 2-byte cluster member ID (latter two bytes are inside crypto envelope)
  319. // Create key from member's key and IV
  320. char keytmp[32];
  321. memcpy(keytmp,m.key,32);
  322. for(int i=0;i<8;++i)
  323. keytmp[i] ^= m.q[i];
  324. Salsa20 s20(keytmp,256,m.q.field(8,8));
  325. Utils::burn(keytmp,sizeof(keytmp));
  326. // One-time-use Poly1305 key from first 32 bytes of Salsa20 keystream (as per DJB/NaCl "standard")
  327. char polykey[ZT_POLY1305_KEY_LEN];
  328. memset(polykey,0,sizeof(polykey));
  329. s20.encrypt12(polykey,polykey,sizeof(polykey));
  330. // Encrypt m.q in place
  331. s20.encrypt12(reinterpret_cast<const char *>(m.q.data()) + 24,const_cast<char *>(reinterpret_cast<const char *>(m.q.data())) + 24,m.q.size() - 24);
  332. // Add MAC for authentication (encrypt-then-MAC)
  333. char mac[ZT_POLY1305_MAC_LEN];
  334. Poly1305::compute(mac,reinterpret_cast<const char *>(m.q.data()) + 24,m.q.size() - 24,polykey);
  335. memcpy(m.q.field(16,8),mac,8);
  336. // Send!
  337. _sendFunction(_arg,memberId,m.q.data(),m.q.size());
  338. // Prepare for more
  339. m.q.clear();
  340. char iv[16];
  341. Utils::getSecureRandom(iv,16);
  342. m.q.append(iv,16);
  343. m.q.addSize(8); // room for MAC
  344. m.q.append((uint16_t)_id);
  345. }
  346. }
  347. } // namespace ZeroTier
  348. #endif // ZT_ENABLE_CLUSTER