/*
 * Copyright (c)2013-2020 ZeroTier, Inc.
 *
 * Use of this software is governed by the Business Source License included
 * in the LICENSE.TXT file in the project's root directory.
 *
 * Change Date: 2024-01-01
 *
 * On the date above, in accordance with the Business Source License, use
 * of this software will be governed by version 2.0 of the Apache License.
 */
/****/
#ifndef ZT_DEFRAGMENTER_HPP
#define ZT_DEFRAGMENTER_HPP

#include "Constants.hpp"
#include "Buf.hpp"
#include "AtomicCounter.hpp"
#include "SharedPtr.hpp"
#include "Hashtable.hpp"
#include "Mutex.hpp"
#include "Path.hpp"

#include <cstring>
#include <cstdlib>
#include <vector>
#include <utility>   // std::pair, used by the GC pass in assemble()
#include <algorithm> // std::sort, used by the GC pass in assemble()

namespace ZeroTier {
/**
 * Generalized putter back together-er for fragmented messages
 *
 * This is used both for packet fragment assembly and multi-chunk network config
 * assembly. This is abstracted out of the code that uses it because it's a bit of
 * a hairy and difficult thing to get both correct and fast, and because its
 * hairiness makes it very desirable to be able to test and fuzz this code
 * independently.
 *
 * Here be dragons!
 *
 * @tparam MF Maximum number of fragments that each message can possess
 */
template<unsigned int MF>
class Defragmenter
{
public:
    /**
     * Error codes for assemble()
     */
    enum ErrorCode
    {
        /**
         * No error occurred
         */
        ERR_NONE,

        /**
         * This fragment duplicates another with the same fragment number for this message
         */
        ERR_DUPLICATE_FRAGMENT,
        /**
         * The fragment is invalid, e.g. its fragment number is beyond the expected count.
         */
        ERR_INVALID_FRAGMENT,
        /**
         * Too many fragments are in flight for this path
         *
         * The message will be marked as if it's done (all fragments received) but will
         * be abandoned. Subsequent fragments will generate a DUPLICATE_FRAGMENT error.
         *
         * This is an anti-denial-of-service feature to limit the number of inbound
         * fragments that can be in flight over a given physical network path.
         */
        ERR_TOO_MANY_FRAGMENTS_FOR_PATH,

        /**
         * Memory (or some other limit) exhausted
         */
        ERR_OUT_OF_MEMORY
    };
    /**
     * Return tuple for assemble()
     */
    struct Result
    {
        ZT_ALWAYS_INLINE Result() : message(),messageFragmentCount(0),error(Defragmenter::ERR_NONE) {}

        /**
         * Fully assembled message as a series of slices of fragments
         */
        Buf<>::Slice message[MF];

        /**
         * Fully assembled message fragment count (number of slices)
         *
         * This will be nonzero if the message is fully assembled.
         */
        unsigned int messageFragmentCount;

        /**
         * Error code or ERR_NONE if none
         */
        Defragmenter::ErrorCode error;
    };
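
    // Illustrative sketch (not part of the original header): one way a caller might
    // flatten a completed Result into a single contiguous buffer. The names 'result',
    // 'out', and 'outSize' are hypothetical, and each fragment's raw bytes are assumed
    // to be reachable via data.bytes as described in the assemble() documentation below.
    // Bounds checking of the destination buffer is omitted for brevity.
    //
    //   uint8_t out[16384]; // hypothetical destination sized by the caller
    //   unsigned int outSize = 0;
    //   for (unsigned int i = 0; i < result.messageFragmentCount; ++i) {
    //     const Buf<>::Slice &s = result.message[i];
    //     memcpy(out + outSize, s.b->data.bytes + s.s, s.e - s.s);
    //     outSize += (s.e - s.s);
    //   }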
    /**
     * Process a fragment of a multi-part message
     *
     * The message ID is arbitrary but must be something that can uniquely
     * group fragments for a given final message. The total fragments expected
     * value is expected to be the same for all fragments in a message. Results
     * are undefined and probably wrong if this value changes across a message.
     * Fragment numbers must be sequential starting with 0 and going up to
     * total fragments expected minus one.
     *
     * Fragments can arrive in any order. Duplicates are dropped and ignored.
     *
     * It's the responsibility of the caller to do whatever validation needs to
     * be done before considering a fragment valid and to make sure the fragment
     * data index and size parameters are valid.
     *
     * The fragment supplied to this function is kept and held under the supplied
     * message ID until or unless (1) the message is fully assembled, (2) the
     * message is orphaned and its entry is taken by a new message, or (3) the
     * clear() function is called to forget all incoming messages. The pointer
     * at the 'fragment' reference will be zeroed since this pointer is handed
     * off, so the SharedPtr<> passed in as 'fragment' will be NULL after this
     * function is called.
     *
     * The result returned by this function is a structure containing a series
     * of assembled and complete fragments, a fragment count, and an error.
     * If the message fragment count is non-zero then the message has been
     * successfully assembled. If the fragment count is zero then an error may
     * have occurred or the message may simply not yet be complete.
     *
     * The calling code must decide what to do with the assembled and ordered
     * fragments, such as memcpy'ing them into a contiguous buffer or handling
     * them as a vector of fragments.
     *
     * The 'via' parameter causes this fragment to be registered with a path and
     * unregistered when done or abandoned. It's only used the first time it's
     * supplied (the first non-NULL) for a given message ID. This is a mitigation
     * against memory-exhausting DoS attacks.
     *
     * Lastly the message queue size target and GC trigger parameters control
     * garbage collection of defragmenter message queue entries. If the size
     * target parameter is non-zero then the message queue is cleaned when its
     * size reaches the GC trigger parameter, which MUST be larger than the size
     * target. Cleaning is done by sorting all entries by their last modified
     * timestamp and removing the oldest N entries so as to bring the size down
     * to under the size target. The use of a trigger size that is larger than
     * the size target reduces CPU-wasting thrashing. A good value for the trigger
     * is 2X the size target, causing cleanups to happen only occasionally.
     *
     * If the GC parameters are set to zero then clear() must be called from time
     * to time or memory use will grow without bound.
     *
     * @param messageId Message ID (a unique ID identifying this message)
     * @param fragment Buffer containing fragment that will be filed under this message's ID
     * @param fragmentDataIndex Index of data in fragment's data.bytes (fragment's data.fields type is ignored)
     * @param fragmentDataSize Length of data in fragment's data.bytes (fragment's data.fields type is ignored)
     * @param fragmentNo Fragment number (0..totalFragmentsExpected-1)
     * @param totalFragmentsExpected Total number of expected fragments in this message
     * @param now Current time
     * @param via If non-NULL this is the path on which this message fragment was received
     * @param maxIncomingFragmentsPerPath If via is non-NULL this is a cutoff for maximum fragments in flight via this path
     * @param messageQueueSizeTarget If non-zero periodically clean the message queue to bring it under this size
     * @param messageQueueSizeGCTrigger A value larger than messageQueueSizeTarget that is when cleaning is performed
     * @return Result containing the ordered message slices and their count if the message is complete, or an error code; both the count and error are zero if the message is simply not yet complete
     */
    ZT_ALWAYS_INLINE Result assemble(
        const uint64_t messageId,
        SharedPtr< Buf<> > &fragment,
        const unsigned int fragmentDataIndex,
        const unsigned int fragmentDataSize,
        const unsigned int fragmentNo,
        const unsigned int totalFragmentsExpected,
        const int64_t now,
        const SharedPtr< Path > &via,
        const unsigned int maxIncomingFragmentsPerPath,
        const unsigned long messageQueueSizeTarget,
        const unsigned long messageQueueSizeGCTrigger)
    {
        Result r;

        // Sanity checks for malformed fragments or invalid input parameters.
        if ((fragmentNo >= totalFragmentsExpected)||(totalFragmentsExpected > MF)||(totalFragmentsExpected == 0)) {
            r.error = ERR_INVALID_FRAGMENT;
            return r;
        }

        // If there is only one fragment just return that fragment and we are done.
        if (totalFragmentsExpected < 2) {
            if (fragmentNo == 0) {
                r.message[0].b.move(fragment);
                r.message[0].s = fragmentDataIndex;
                r.message[0].e = fragmentDataIndex + fragmentDataSize; // end offset, consistent with the multi-fragment path below
                r.messageFragmentCount = 1;
                return r;
            } else {
                r.error = ERR_INVALID_FRAGMENT;
                return r;
            }
        }
        // Lock messages for read and look up current entry. Also check the
        // GC trigger and if we've exceeded that threshold then older message
        // entries are garbage collected.
        _messages_l.rlock();
        if (messageQueueSizeTarget > 0) {
            if (_messages.size() >= messageQueueSizeGCTrigger) {
                try {
                    // Scan messages with read lock still locked first and make a sorted list of
                    // message entries by last modified time. Then lock for writing and delete
                    // the oldest entries to bring the size of the messages hash table down to
                    // under the target size. This tries to minimize the amount of time the write
                    // lock is held since many threads can hold the read lock but all threads must
                    // wait if someone holds the write lock.
                    std::vector< std::pair<int64_t,uint64_t> > messagesByLastUsedTime;
                    messagesByLastUsedTime.reserve(_messages.size());

                    typename Hashtable<uint64_t,_E>::Iterator i(_messages);
                    uint64_t *mk = nullptr;
                    _E *mv = nullptr;
                    while (i.next(mk,mv))
                        messagesByLastUsedTime.push_back(std::pair<int64_t,uint64_t>(mv->lastUsed,*mk));

                    std::sort(messagesByLastUsedTime.begin(),messagesByLastUsedTime.end());

                    _messages_l.runlock();
                    _messages_l.lock();
                    for (unsigned long x = 0,y = (messagesByLastUsedTime.size() - messageQueueSizeTarget); x <= y; ++x)
                        _messages.erase(messagesByLastUsedTime[x].second);
                    _messages_l.unlock();
                    _messages_l.rlock();
                } catch (...) {
                    // The only way something in that code can throw is if a bad_alloc occurs when
                    // reserve() is called in the vector. In this case we flush the entire queue
                    // and error out. This is very rare and on some platforms impossible.
                    _messages_l.runlock();
                    _messages_l.lock();
                    _messages.clear();
                    _messages_l.unlock();
                    r.error = ERR_OUT_OF_MEMORY;
                    return r;
                }
            }
        }
        _E *e = _messages.get(messageId);
        _messages_l.runlock();

        // If no entry exists we must briefly lock messages for write and create a new one.
        if (!e) {
            try {
                RWMutex::Lock ml(_messages_l);
                e = &(_messages[messageId]);
            } catch (...) {
                r.error = ERR_OUT_OF_MEMORY;
                return r;
            }
            e->id = messageId;
        }
        // Now handle this fragment within this individual message entry.
        Mutex::Lock el(e->lock);

        // Note: it's important that _messages_l is not locked while the entry
        // is locked or a deadlock could occur due to GC or clear() being called
        // in another thread.

        // If there is a path associated with this fragment make sure we've registered
        // ourselves as in flight, check the limit, and abort if exceeded.
        if ((via)&&(!e->via)) {
            e->via = via;
            bool tooManyPerPath = false;
            via->_inboundFragmentedMessages_l.lock();
            try {
                if (via->_inboundFragmentedMessages.size() < maxIncomingFragmentsPerPath) {
                    via->_inboundFragmentedMessages.insert(messageId);
                } else {
                    tooManyPerPath = true;
                }
            } catch (...) {
                // This would indicate something like bad_alloc thrown by the set. Treat
                // it as limit exceeded.
                tooManyPerPath = true;
            }
            via->_inboundFragmentedMessages_l.unlock();
            if (tooManyPerPath) {
                r.error = ERR_TOO_MANY_FRAGMENTS_FOR_PATH;
                return r;
            }
        }
        // Update last-activity timestamp for this entry.
        e->lastUsed = now;

        // If we already have fragment number X, abort. Note that we do not
        // actually compare data here. Two same-numbered fragments with different
        // data would just mean the transfer is corrupt and would be detected
        // later e.g. by packet MAC check. Other use cases of this code like
        // network configs check each fragment so this basically can't happen.
        Buf<>::Slice &s = e->fragment[fragmentNo];
        if (s.b) {
            r.error = ERR_DUPLICATE_FRAGMENT;
            return r;
        }

        // Take ownership of fragment, setting 'fragment' pointer to NULL. The simple
        // transfer of the pointer avoids a synchronized increment/decrement of the object's
        // reference count.
        s.b.move(fragment);
        s.s = fragmentDataIndex;
        s.e = fragmentDataIndex + fragmentDataSize;
        // If we now have all fragments then assemble them.
        if (++e->fragmentCount >= totalFragmentsExpected) {
            // This message is done so de-register it with its path if one is associated.
            if (e->via) {
                e->via->_inboundFragmentedMessages_l.lock();
                e->via->_inboundFragmentedMessages.erase(messageId);
                e->via->_inboundFragmentedMessages_l.unlock();
                e->via.zero();
            }

            // PERFORMANCE HACK: SharedPtr<> is introspective and only holds a pointer, so we
            // can 'move' the pointers it holds very quickly by bulk copying the source
            // slices and then zeroing the originals. This is only okay if the destination
            // currently holds no pointers, which should always be the case. Don't try this
            // at home kids.
            unsigned int msize = e->fragmentCount * sizeof(Buf<>::Slice);
            memcpy(reinterpret_cast<void *>(r.message),reinterpret_cast<const void *>(e->fragment),msize);
            memset(reinterpret_cast<void *>(e->fragment),0,msize);
            r.messageFragmentCount = e->fragmentCount;
        }

        return r;
    }
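
    // Illustrative call sketch (not part of the original header): how a receive path
    // might feed a fragment into assemble() and act on the Result. The names 'defrag',
    // 'pkt', 'path', and the parsed header fields below are hypothetical caller-side
    // values; the numeric limits are example choices, with the GC trigger set to 2x
    // the target as recommended in the documentation above.
    //
    //   Defragmenter<16> defrag; // up to 16 fragments per message in this example
    //   Defragmenter<16>::Result res = defrag.assemble(
    //     msgId,          // unique 64-bit message ID parsed from the fragment header
    //     pkt,            // SharedPtr< Buf<> >; zeroed on return since ownership is handed off
    //     payloadStart,   // index of this fragment's payload within pkt->data.bytes
    //     payloadSize,    // length of this fragment's payload
    //     fragNo,         // this fragment's number, 0..totalFragments-1
    //     totalFragments, // total fragments expected for this message
    //     now,            // current time
    //     path,           // SharedPtr<Path> the fragment arrived on (may be NULL)
    //     32,             // example cap on in-flight fragmented messages per path
    //     128,            // example message queue size target
    //     256);           // example GC trigger (2x the target)
    //   if (res.error != Defragmenter<16>::ERR_NONE) {
    //     // drop the fragment and optionally count the error
    //   } else if (res.messageFragmentCount > 0) {
    //     // complete: res.message[0..messageFragmentCount-1] hold the ordered slices
    //   } // else the message is not yet complete; nothing to do yet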
    /**
     * Erase all message entries in the internal queue
     */
    ZT_ALWAYS_INLINE void clear()
    {
        RWMutex::Lock ml(_messages_l);
        _messages.clear();
    }
private:
    struct _E
    {
        ZT_ALWAYS_INLINE _E() : id(0),lastUsed(0),via(),fragmentCount(0) {}

        ZT_ALWAYS_INLINE ~_E()
        {
            // Ensure that this entry is not in use while it is being deleted!
            lock.lock();
            if (via) {
                via->_inboundFragmentedMessages_l.lock();
                via->_inboundFragmentedMessages.erase(id);
                via->_inboundFragmentedMessages_l.unlock();
            }
            lock.unlock();
        }

        uint64_t id;
        volatile int64_t lastUsed;
        SharedPtr<Path> via;
        Buf<>::Slice fragment[MF];
        unsigned int fragmentCount;
        Mutex lock;
    };

    Hashtable< uint64_t,_E > _messages;
    RWMutex _messages_l;
};
} // namespace ZeroTier

#endif