// Defragmenter.hpp
  1. /*
  2. * Copyright (c)2013-2020 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2025-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #ifndef ZT_DEFRAGMENTER_HPP
  14. #define ZT_DEFRAGMENTER_HPP
  15. #include "Constants.hpp"
  16. #include "Buf.hpp"
  17. #include "SharedPtr.hpp"
  18. #include "Mutex.hpp"
  19. #include "Path.hpp"
  20. #include "FCV.hpp"
  21. #include "Containers.hpp"
  22. namespace ZeroTier {
  23. /**
  24. * Generalized putter back together-er for fragmented messages
  25. *
  26. * This is used both for packet fragment assembly and multi-chunk network config
  27. * assembly. This is abstracted out of the code that uses it because it's a bit of
  28. * a hairy and difficult thing to get both correct and fast, and because its
  29. * hairiness makes it very desirable to be able to test and fuzz this code
  30. * independently.
  31. *
  32. * This class is thread-safe and handles locking internally.
  33. *
  34. * Templating is so that this class can be placed in a test harness and tested
  35. * without dependencies on external code. The default template parameters are
  36. * the ones used throughout the ZeroTier core.
  37. *
  38. * @tparam MF Maximum number of fragments that each message can possess (default: ZT_MAX_PACKET_FRAGMENTS)
  39. * @tparam MFP Maximum number of incoming fragments per path (if paths are specified) (default: ZT_MAX_INCOMING_FRAGMENTS_PER_PATH)
  40. * @tparam GCS Garbage collection target size for the incoming message queue (default: ZT_MAX_PACKET_FRAGMENTS * 2)
  41. * @tparam GCT Garbage collection trigger threshold, usually 2X GCS (default: ZT_MAX_PACKET_FRAGMENTS * 4)
  42. * @tparam P Type for pointer to a path object (default: SharedPtr<Path>)
  43. */
template<
    unsigned int MF = ZT_MAX_PACKET_FRAGMENTS,
    unsigned int MFP = ZT_MAX_INCOMING_FRAGMENTS_PER_PATH,
    unsigned int GCS = (ZT_MAX_PACKET_FRAGMENTS * 2),
    unsigned int GCT = (ZT_MAX_PACKET_FRAGMENTS * 4),
    typename P = SharedPtr <Path> >
class Defragmenter
{
public:
    /**
     * Return values from assemble()
     */
    enum ResultCode
    {
        /**
         * No error occurred, fragment accepted
         */
        OK,

        /**
         * Message fully assembled and placed in message vector
         */
        COMPLETE,

        /**
         * We already have this fragment number or the message is complete
         */
        ERR_DUPLICATE_FRAGMENT,

        /**
         * The fragment is invalid, such as e.g. having a fragment number beyond the expected count.
         */
        ERR_INVALID_FRAGMENT,

        /**
         * Too many fragments are in flight for this path
         *
         * The message will be marked as if it's done (all fragments received) but will
         * be abandoned. Subsequent fragments will generate a DUPLICATE_FRAGMENT error.
         *
         * This is an anti-denial-of-service feature to limit the number of inbound
         * fragments that can be in flight over a given physical network path.
         */
        ERR_TOO_MANY_FRAGMENTS_FOR_PATH,

        /**
         * Memory (or some other limit) exhausted
         */
        ERR_OUT_OF_MEMORY
    };

    ZT_INLINE Defragmenter()
    {}

    /**
     * Process a fragment of a multi-part message
     *
     * The message ID is arbitrary but must be something that can uniquely
     * group fragments for a given final message. The total fragments
     * value is expected to be the same for all fragments in a message. Results
     * are undefined and probably wrong if this value changes across a message.
     * Fragment numbers must be sequential starting with 0 and going up to
     * one minus total fragments expected (non-inclusive range).
     *
     * Fragments can arrive in any order. Duplicates are dropped and ignored.
     *
     * It's the responsibility of the caller to do whatever validation needs to
     * be done before considering a fragment valid and to make sure the fragment
     * data index and size parameters are valid.
     *
     * The fragment supplied to this function is kept and held under the supplied
     * message ID until or unless (1) the message is fully assembled, (2) the
     * message is orphaned and its entry is taken by a new message, or (3) the
     * clear() function is called to forget all incoming messages. The pointer
     * at the 'fragment' reference will be zeroed since this pointer is handed
     * off, so the SharedPtr<> passed in as 'fragment' will be NULL after this
     * function is called.
     *
     * The 'via' parameter causes this fragment to be registered with a path and
     * unregistered when done or abandoned. It's only used the first time it's
     * supplied (the first non-NULL) for a given message ID. This is a mitigation
     * against memory exhausting DOS attacks.
     *
     * @tparam X Template parameter type for Buf<> containing fragment (inferred)
     * @param messageId Message ID (a unique ID identifying this message)
     * @param message Fixed capacity vector that will be filled with the result if result code is COMPLETE
     * @param fragment Buffer containing fragment that will be filed under this message's ID
     * @param fragmentDataIndex Index of data in fragment's data.bytes (fragment's data.fields type is ignored)
     * @param fragmentDataSize Length of data in fragment's data.bytes (fragment's data.fields type is ignored)
     * @param fragmentNo Number of fragment (0..totalFragmentsExpected, non-inclusive)
     * @param totalFragmentsExpected Total number of expected fragments in this message or 0 to use cached value
     * @param now Current time
     * @param via If non-NULL this is the path on which this message fragment was received
     * @return Result code
     */
    ZT_INLINE ResultCode assemble(
        const uint64_t messageId,
        FCV <Buf::Slice, MF> &message,
        SharedPtr <Buf> &fragment,
        const unsigned int fragmentDataIndex,
        const unsigned int fragmentDataSize,
        const unsigned int fragmentNo,
        const unsigned int totalFragmentsExpected,
        const int64_t now,
        const P &via)
    {
        // Sanity checks for malformed fragments or invalid input parameters.
        // NOTE(review): the @param doc above says 0 means "use cached value",
        // but this check rejects totalFragmentsExpected == 0 outright, which
        // makes the cached-total branch below unreachable through this
        // parameter — confirm intended semantics against the protocol callers.
        if ((fragmentNo >= totalFragmentsExpected) || (totalFragmentsExpected > MF) || (totalFragmentsExpected == 0))
            return ERR_INVALID_FRAGMENT;

        // We hold the read lock on _messages unless we need to add a new entry or do GC.
        // RMaybeWLock starts as a read lock and can be upgraded in place via writing().
        RWMutex::RMaybeWLock ml(m_messages_l);

        // Check message hash table size and perform GC if necessary. GCT is the
        // trigger threshold; GCS is the target size to shrink back down toward.
        if (m_messages.size() >= GCT) {
            try {
                // Scan messages with read lock still locked first and make a sorted list of
                // message entries by last modified time. Then lock for writing and delete
                // the oldest entries to bring the size of the messages hash table down to
                // under the target size. This tries to minimize the amount of time the write
                // lock is held since many threads can hold the read lock but all threads must
                // wait if someone holds the write lock.
                std::vector< std::pair< int64_t, uint64_t > > messagesByLastUsedTime;
                messagesByLastUsedTime.reserve(m_messages.size());
                for (typename Map< uint64_t, p_E >::const_iterator i(m_messages.begin()); i != m_messages.end(); ++i)
                    messagesByLastUsedTime.push_back(std::pair< int64_t, uint64_t >(i->second.lastUsed, i->first));
                // Sorting ascending by lastUsed puts the oldest (and any entries marked
                // done with lastUsed == -1) first, so those are what get erased.
                std::sort(messagesByLastUsedTime.begin(), messagesByLastUsedTime.end());
                ml.writing(); // acquire write lock on _messages
                for (unsigned long x = 0, y = (messagesByLastUsedTime.size() - GCS); x <= y; ++x)
                    m_messages.erase(messagesByLastUsedTime[x].second);
            } catch (...) {
                // bad_alloc from the temporary vector; report as memory exhaustion.
                return ERR_OUT_OF_MEMORY;
            }
        }

        // Get or create message fragment entry. Creation requires upgrading to
        // the write lock; a plain lookup proceeds under the read lock.
        Defragmenter< MF, MFP, GCS, GCT, P >::p_E *e;
        {
            typename Map< uint64_t, Defragmenter< MF, MFP, GCS, GCT, P >::p_E >::iterator ee(m_messages.find(messageId));
            if (ee == m_messages.end()) {
                ml.writing(); // acquire write lock on _messages if not already
                try {
                    e = &(m_messages[messageId]);
                } catch (...) {
                    return ERR_OUT_OF_MEMORY;
                }
                e->id = messageId;
            } else {
                e = &(ee->second);
            }
        }

        // Switch back to holding only the read lock on _messages if we have locked for write.
        // The read lock must stay held from here on so 'e' cannot be erased by a
        // concurrent GC while we work on it.
        ml.reading();

        // Acquire lock on entry itself to serialize concurrent fragments of the same message.
        Mutex::Lock el(e->lock);

        // This magic value means this message has already been assembled and is done
        // (or was abandoned); see the lastUsed = -1 assignment below.
        if (e->lastUsed < 0)
            return ERR_DUPLICATE_FRAGMENT;

        // Update last-activity timestamp for this entry, delaying GC.
        e->lastUsed = now;

        // Learn total fragments expected if a value is given. Otherwise the cached
        // value gets used. This is to support the implementation of fragmentation
        // in the ZT protocol where only fragments carry the total.
        if (totalFragmentsExpected > 0)
            e->totalFragmentsExpected = totalFragmentsExpected;

        // If there is a path associated with this fragment make sure we've registered
        // ourselves as in flight, check the limit, and abort if exceeded. Only the
        // first non-NULL 'via' for a message is used (see method doc above).
        if ((via) && (!e->via)) {
            e->via = via;
            bool tooManyPerPath = false;
            via->m_inboundFragmentedMessages_l.lock();
            try {
                if (via->m_inboundFragmentedMessages.size() < MFP) {
                    via->m_inboundFragmentedMessages.insert(messageId);
                } else {
                    tooManyPerPath = true;
                }
            } catch (...) {
                // This would indicate something like bad_alloc thrown by the set. Treat
                // it as limit exceeded.
                tooManyPerPath = true;
            }
            via->m_inboundFragmentedMessages_l.unlock();
            if (tooManyPerPath)
                return ERR_TOO_MANY_FRAGMENTS_FOR_PATH;
        }

        // If we already have fragment number X, abort. Note that we do not
        // actually compare data here. Two same-numbered fragments with different
        // data would just mean the transfer is corrupt and would be detected
        // later e.g. by packet MAC check. Other use cases of this code like
        // network configs check each fragment so this basically can't happen.
        Buf::Slice &s = e->message.at(fragmentNo);
        if (s.b)
            return ERR_DUPLICATE_FRAGMENT;

        // Take ownership of fragment, setting 'fragment' pointer to NULL. The simple
        // transfer of the pointer avoids a synchronized increment/decrement of the object's
        // reference count.
        s.b.move(fragment);
        s.s = fragmentDataIndex;
        s.e = fragmentDataIndex + fragmentDataSize;
        ++e->fragmentsReceived;

        // If we now have all fragments then assemble them.
        if ((e->fragmentsReceived >= e->totalFragmentsExpected) && (e->totalFragmentsExpected > 0)) {
            // This message is done so de-register it with its path if one is associated.
            if (e->via) {
                e->via->m_inboundFragmentedMessages_l.lock();
                e->via->m_inboundFragmentedMessages.erase(messageId);
                e->via->m_inboundFragmentedMessages_l.unlock();
                e->via.zero();
            }

            // Slices are TriviallyCopyable and so may be raw copied from e->message to
            // the result parameter. This is fast.
            e->message.unsafeMoveTo(message);
            e->lastUsed = -1; // mark as "done" and force GC to collect
            return COMPLETE;
        }
        return OK;
    }

    /**
     * Erase all message entries in the internal queue
     */
    ZT_INLINE void clear()
    {
        RWMutex::Lock ml(m_messages_l);
        m_messages.clear();
    }

    /**
     * @return Number of entries currently in message defragmentation cache
     */
    ZT_INLINE unsigned int cacheSize() noexcept
    {
        RWMutex::RLock ml(m_messages_l);
        return m_messages.size();
    }

private:
    // p_E is an entry in the message queue: one in-progress (or done) message.
    struct p_E
    {
        ZT_INLINE p_E() noexcept:
            id(0),
            lastUsed(0),
            totalFragmentsExpected(0),
            fragmentsReceived(0)
        {}

        // Copy constructor: copies the message state but gives the copy a fresh
        // (unlocked) Mutex, since Mutex itself is not copyable.
        ZT_INLINE p_E(const p_E &e) noexcept:
            id(e.id),
            lastUsed(e.lastUsed),
            totalFragmentsExpected(e.totalFragmentsExpected),
            fragmentsReceived(e.fragmentsReceived),
            via(e.via),
            message(e.message),
            lock()
        {}

        // Destructor de-registers this message from its path's in-flight set
        // (if a path was associated), so abandoned/GC'd entries release their
        // per-path quota slot.
        ZT_INLINE ~p_E()
        {
            if (via) {
                via->m_inboundFragmentedMessages_l.lock();
                via->m_inboundFragmentedMessages.erase(id);
                via->m_inboundFragmentedMessages_l.unlock();
            }
        }

        // Assignment copies message state; the entry's own Mutex is deliberately
        // left untouched.
        ZT_INLINE p_E &operator=(const p_E &e)
        {
            if (this != &e) {
                id = e.id;
                lastUsed = e.lastUsed;
                totalFragmentsExpected = e.totalFragmentsExpected;
                fragmentsReceived = e.fragmentsReceived;
                via = e.via;
                message = e.message;
            }
            return *this;
        }

        uint64_t id;                        // message ID this entry is filed under
        int64_t lastUsed;                   // last-activity time; -1 == done, collect at next GC
        unsigned int totalFragmentsExpected; // learned total fragment count (0 until known)
        unsigned int fragmentsReceived;     // how many distinct fragments have arrived
        P via;                              // path this message is registered with (may be NULL)
        FCV <Buf::Slice, MF> message;       // received fragment slices, indexed by fragment number
        Mutex lock;                         // per-entry lock serializing assemble() on this message
    };

    // Message ID -> in-progress entry. Guarded by m_messages_l (read lock for
    // lookups, write lock for insert/erase/GC).
    Map <uint64_t, Defragmenter< MF, MFP, GCS, GCT, P >::p_E> m_messages;
    RWMutex m_messages_l;
};
  318. } // namespace ZeroTier
  319. #endif