Defragmenter.hpp
  1. /*
  2. * Copyright (c)2013-2020 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2024-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #ifndef ZT_DEFRAGMENTER_HPP
  14. #define ZT_DEFRAGMENTER_HPP
#include "Constants.hpp"
#include "Buf.hpp"
#include "SharedPtr.hpp"
#include "Hashtable.hpp"
#include "Mutex.hpp"
#include "Path.hpp"
#include "FCV.hpp"

#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <utility>
#include <vector>
  25. namespace ZeroTier {
/**
 * Generalized putter back together-er for fragmented messages
 *
 * This is used both for packet fragment assembly and multi-chunk network config
 * assembly. This is abstracted out of the code that uses it because it's a bit of
 * a hairy and difficult thing to get both correct and fast, and because its
 * hairiness makes it very desirable to be able to test and fuzz this code
 * independently.
 *
 * This class is thread-safe and handles locking internally.
 *
 * @tparam MF Maximum number of fragments that each message can possess
 * @tparam GCS Garbage collection target size for the incoming message queue
 * @tparam GCT Garbage collection trigger threshold, usually 2X GCS
 */
template<unsigned int MF = 16,unsigned int GCS = 32,unsigned int GCT = 64>
class Defragmenter
{
public:
	/**
	 * Return values from assemble()
	 */
	enum ResultCode
	{
		/**
		 * No error occurred, fragment accepted
		 */
		OK,

		/**
		 * Message fully assembled and placed in message vector
		 */
		COMPLETE,

		/**
		 * We already have this fragment number or the message is complete
		 */
		ERR_DUPLICATE_FRAGMENT,

		/**
		 * The fragment is invalid, such as e.g. having a fragment number beyond the expected count.
		 */
		ERR_INVALID_FRAGMENT,

		/**
		 * Too many fragments are in flight for this path
		 *
		 * The message will be marked as if it's done (all fragments received) but will
		 * be abandoned. Subsequent fragments will generate a DUPLICATE_FRAGMENT error.
		 *
		 * This is an anti-denial-of-service feature to limit the number of inbound
		 * fragments that can be in flight over a given physical network path.
		 */
		ERR_TOO_MANY_FRAGMENTS_FOR_PATH,

		/**
		 * Memory (or some other limit) exhausted
		 */
		ERR_OUT_OF_MEMORY
	};

	// GCT * 2 is passed to the Hashtable constructor — presumably an initial
	// capacity/bucket hint sized so normal operation never rehashes; confirm
	// against Hashtable's constructor contract.
	ZT_INLINE Defragmenter() :
		_messages(GCT * 2)
	{
	}

	/**
	 * Process a fragment of a multi-part message
	 *
	 * The message ID is arbitrary but must be something that can uniquely
	 * group fragments for a given final message. The total fragments expected
	 * value is expected to be the same for all fragments in a message. Results
	 * are undefined and probably wrong if this value changes across a message.
	 * Fragment numbers must be sequential starting with 0 and going up to
	 * one minus total fragments expected (non-inclusive range).
	 *
	 * Fragments can arrive in any order. Duplicates are dropped and ignored.
	 *
	 * It's the responsibility of the caller to do whatever validation needs to
	 * be done before considering a fragment valid and to make sure the fragment
	 * data index and size parameters are valid.
	 *
	 * The fragment supplied to this function is kept and held under the supplied
	 * message ID until or unless (1) the message is fully assembled, (2) the
	 * message is orphaned and its entry is taken by a new message, or (3) the
	 * clear() function is called to forget all incoming messages. The pointer
	 * at the 'fragment' reference will be zeroed since this pointer is handed
	 * off, so the SharedPtr<> passed in as 'fragment' will be NULL after this
	 * function is called.
	 *
	 * The 'via' parameter causes this fragment to be registered with a path and
	 * unregistered when done or abandoned. It's only used the first time it's
	 * supplied (the first non-NULL) for a given message ID. This is a mitigation
	 * against memory exhausting DOS attacks.
	 *
	 * @param messageId Message ID (a unique ID identifying this message)
	 * @param message Fixed capacity vector that will be filled with the result if result code is COMPLETE
	 * @param fragment Buffer containing fragment that will be filed under this message's ID
	 * @param fragmentDataIndex Index of data in fragment's data.bytes (fragment's data.fields type is ignored)
	 * @param fragmentDataSize Length of data in fragment's data.bytes (fragment's data.fields type is ignored)
	 * @param fragmentNo Number of fragment (0..totalFragmentsExpected, non-inclusive)
	 * @param totalFragmentsExpected Total number of expected fragments in this message or 0 to use cached value
	 * @param now Current time
	 * @param via If non-NULL this is the path on which this message fragment was received
	 * @param maxIncomingFragmentsPerPath If via is non-NULL this is a cutoff for maximum fragments in flight via this path
	 * @return Result code
	 */
	ZT_INLINE ResultCode assemble(
		const uint64_t messageId,
		FCV< Buf::Slice,MF > &message,
		SharedPtr<Buf> &fragment,
		const unsigned int fragmentDataIndex,
		const unsigned int fragmentDataSize,
		const unsigned int fragmentNo,
		const unsigned int totalFragmentsExpected,
		const int64_t now,
		const SharedPtr< Path > &via,
		const unsigned int maxIncomingFragmentsPerPath)
	{
		// Sanity checks for malformed fragments or invalid input parameters.
		// Note: totalFragmentsExpected == 0 is rejected here, so the "use cached
		// value" case documented above relies on the caller having supplied the
		// real total at least once for this message ID.
		if ((fragmentNo >= totalFragmentsExpected)||(totalFragmentsExpected > MF)||(totalFragmentsExpected == 0))
			return ERR_INVALID_FRAGMENT;

		// We hold the read lock on _messages unless we need to add a new entry or do GC.
		RWMutex::RMaybeWLock ml(_messages_l);

		// Check message hash table size and perform GC if necessary.
		if (_messages.size() >= GCT) {
			try {
				// Scan messages with read lock still locked first and make a sorted list of
				// message entries by last modified time. Then lock for writing and delete
				// the oldest entries to bring the size of the messages hash table down to
				// under the target size. This tries to minimize the amount of time the write
				// lock is held since many threads can hold the read lock but all threads must
				// wait if someone holds the write lock.
				std::vector<std::pair<int64_t,uint64_t> > messagesByLastUsedTime;
				messagesByLastUsedTime.reserve(_messages.size());
				typename Hashtable<uint64_t,_E>::Iterator i(_messages);
				uint64_t *mk = nullptr;
				_E *mv = nullptr;
				while (i.next(mk,mv))
					messagesByLastUsedTime.push_back(std::pair<int64_t,uint64_t>(mv->lastUsed,*mk));
				// Pairs sort by lastUsed first; completed messages (lastUsed == -1)
				// therefore sort to the front and are collected first.
				std::sort(messagesByLastUsedTime.begin(),messagesByLastUsedTime.end());

				ml.writing(); // acquire write lock on _messages
				// Erases indices 0..(size - GCS) inclusive, i.e. size - GCS + 1
				// entries, leaving GCS - 1 — strictly under the GCS target.
				for (unsigned long x = 0,y = (messagesByLastUsedTime.size() - GCS); x <= y; ++x)
					_messages.erase(messagesByLastUsedTime[x].second);
			} catch (...) {
				return ERR_OUT_OF_MEMORY;
			}
		}

		// Get or create message fragment.
		_E *e = _messages.get(messageId);
		if (!e) {
			ml.writing(); // acquire write lock on _messages if not already
			try {
				e = &(_messages[messageId]);
			} catch (...) {
				return ERR_OUT_OF_MEMORY;
			}
			e->id = messageId;
		}

		// Switch back to holding only the read lock on _messages if we have locked for write.
		// NOTE(review): 'e' points into _messages and is used after this downgrade;
		// this is safe only if no other thread can erase this entry between the
		// downgrade and taking e->lock below — confirm against RMaybeWLock's
		// downgrade semantics (atomic vs unlock-then-relock).
		ml.reading();

		// Acquire lock on entry itself
		Mutex::Lock el(e->lock);

		// This magic value means this message has already been assembled and is done.
		if (e->lastUsed < 0)
			return ERR_DUPLICATE_FRAGMENT;

		// Update last-activity timestamp for this entry, delaying GC.
		e->lastUsed = now;

		// Learn total fragments expected if a value is given. Otherwise the cached
		// value gets used. This is to support the implementation of fragmentation
		// in the ZT protocol where only fragments carry the total.
		if (totalFragmentsExpected > 0)
			e->totalFragmentsExpected = totalFragmentsExpected;

		// If there is a path associated with this fragment make sure we've registered
		// ourselves as in flight, check the limit, and abort if exceeded.
		if ((via)&&(!e->via)) {
			e->via = via;
			bool tooManyPerPath = false;
			via->_inboundFragmentedMessages_l.lock();
			try {
				if (via->_inboundFragmentedMessages.size() < maxIncomingFragmentsPerPath) {
					via->_inboundFragmentedMessages.insert(messageId);
				} else {
					tooManyPerPath = true;
				}
			} catch ( ... ) {
				// This would indicate something like bad_alloc thrown by the set. Treat
				// it as limit exceeded.
				tooManyPerPath = true;
			}
			via->_inboundFragmentedMessages_l.unlock();
			if (tooManyPerPath)
				return ERR_TOO_MANY_FRAGMENTS_FOR_PATH;
		}

		// If we already have fragment number X, abort. Note that we do not
		// actually compare data here. Two same-numbered fragments with different
		// data would just mean the transfer is corrupt and would be detected
		// later e.g. by packet MAC check. Other use cases of this code like
		// network configs check each fragment so this basically can't happen.
		Buf::Slice &s = e->message.at(fragmentNo);
		if (s.b)
			return ERR_DUPLICATE_FRAGMENT;

		// Take ownership of fragment, setting 'fragment' pointer to NULL. The simple
		// transfer of the pointer avoids a synchronized increment/decrement of the object's
		// reference count.
		s.b.move(fragment);
		s.s = fragmentDataIndex;
		s.e = fragmentDataIndex + fragmentDataSize;
		++e->fragmentsReceived;

		// If we now have all fragments then assemble them.
		if ((e->fragmentsReceived >= e->totalFragmentsExpected)&&(e->totalFragmentsExpected > 0)) {
			// This message is done so de-register it with its path if one is associated.
			if (e->via) {
				e->via->_inboundFragmentedMessages_l.lock();
				e->via->_inboundFragmentedMessages.erase(messageId);
				e->via->_inboundFragmentedMessages_l.unlock();
				e->via.zero();
			}

			// Slices are TriviallyCopyable and so may be memcpy'd from e->message to
			// the result parameter. This is fast.
			e->message.unsafeMoveTo(message);
			e->lastUsed = -1; // mark as "done" and force GC to collect

			return COMPLETE;
		}
		return OK;
	}

	/**
	 * Erase all message entries in the internal queue
	 */
	ZT_INLINE void clear()
	{
		RWMutex::Lock ml(_messages_l);
		_messages.clear();
	}

	/**
	 * @return Number of entries currently in message defragmentation cache
	 */
	ZT_INLINE unsigned int cacheSize() noexcept
	{
		RWMutex::RLock ml(_messages_l);
		return _messages.size();
	}

private:
	// One in-progress (or completed-and-awaiting-GC) message.
	struct _E
	{
		ZT_INLINE _E() noexcept : id(0),lastUsed(0),totalFragmentsExpected(0),fragmentsReceived(0),via(),message(),lock() {}

		// De-register this message from its path (if any) so the per-path
		// in-flight count is released even when the entry is GC'd or cleared
		// before completion.
		ZT_INLINE ~_E()
		{
			if (via) {
				via->_inboundFragmentedMessages_l.lock();
				via->_inboundFragmentedMessages.erase(id);
				via->_inboundFragmentedMessages_l.unlock();
			}
		}

		uint64_t id;                         // message ID this entry is filed under
		// Last-activity timestamp; -1 is the "assembled and done" sentinel.
		// NOTE(review): declared volatile, apparently so GC's unlocked reads see
		// recent values — volatile is not a synchronization primitive; writes
		// happen under 'lock'. Confirm this is the intended memory model.
		volatile int64_t lastUsed;
		unsigned int totalFragmentsExpected; // 0 until learned from a fragment
		unsigned int fragmentsReceived;      // count of distinct fragments stored
		SharedPtr<Path> via;                 // path this message is registered with, if any
		FCV< Buf::Slice,MF > message;        // fragment slices indexed by fragment number
		Mutex lock;                          // guards this entry's mutable state
	};

	Hashtable< uint64_t,_E > _messages;
	RWMutex _messages_l; // read/write lock over _messages (entries have their own locks)
};
  285. } // namespace ZeroTier
  286. #endif