FragmentedTransferManager.cpp 7.8 KB


  1. /* Copyright The kNet Project.
  2. Licensed under the Apache License, Version 2.0 (the "License");
  3. you may not use this file except in compliance with the License.
  4. You may obtain a copy of the License at
  5. http://www.apache.org/licenses/LICENSE-2.0
  6. Unless required by applicable law or agreed to in writing, software
  7. distributed under the License is distributed on an "AS IS" BASIS,
  8. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  9. See the License for the specific language governing permissions and
  10. limitations under the License. */
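  /** @file FragmentedTransferManager.cpp
  @brief Implements FragmentedSendManager, which tracks outbound fragmented transfers and their transfer IDs, and FragmentedReceiveManager, which collects inbound fragments and assembles them into complete messages. */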
  #include <algorithm>
  #include <cstring>
  #include <utility>
  #ifdef KNET_USE_BOOST
  #include <boost/thread/thread.hpp>
  #endif
  #include "kNet/DebugMemoryLeakCheck.h"
  #include "kNet/MessageConnection.h"
  #include "kNet/FragmentedTransferManager.h"
  #include "kNet/NetworkLogging.h"
  21. using namespace std;
  22. namespace kNet
  23. {
  24. void FragmentedSendManager::FragmentedTransfer::AddMessage(NetworkMessage *message)
  25. {
  26. fragments.push_back(message);
  27. message->transfer = this;
  28. }
  29. bool FragmentedSendManager::FragmentedTransfer::RemoveMessage(NetworkMessage *message)
  30. {
  31. for(std::list<NetworkMessage*>::iterator iter = fragments.begin(); iter != fragments.end(); ++iter)
  32. if (*iter == message)
  33. {
  34. message->transfer = 0;
  35. fragments.erase(iter);
  36. KNET_LOG(LogVerbose, "Removing message with seqnum %d (fragnum %d) from transfer ID %d (%p).", (int)message->messageNumber, (int)message->fragmentIndex, id, this);
  37. return true;
  38. }
  39. return false;
  40. }
  41. FragmentedSendManager::FragmentedTransfer *FragmentedSendManager::AllocateNewFragmentedTransfer()
  42. {
  43. transfers.push_back(FragmentedTransfer());
  44. FragmentedTransfer *transfer = &transfers.back();
  45. transfer->id = -1;
  46. transfer->totalNumFragments = 0;
  47. KNET_LOG(LogObjectAlloc, "Allocated new fragmented transfer %p.", transfer);
  48. return transfer;
  49. }
  50. void FragmentedSendManager::FreeFragmentedTransfer(FragmentedTransfer *transfer)
  51. {
  52. // Remove all references from any NetworkMessages to this structure.
  53. for(std::list<NetworkMessage*>::iterator iter = transfer->fragments.begin(); iter != transfer->fragments.end(); ++iter)
  54. (*iter)->transfer = 0;
  55. for(TransferList::iterator iter = transfers.begin(); iter != transfers.end(); ++iter)
  56. if (&*iter == transfer)
  57. {
  58. transfers.erase(iter);
  59. KNET_LOG(LogObjectAlloc, "Freed fragmented transfer ID=%d, numFragments: %d (%p).", transfer->id, (int)transfer->totalNumFragments, transfer);
  60. return;
  61. }
  62. KNET_LOG(LogError, "Tried to free a fragmented send struct that didn't exist!");
  63. }
  64. void FragmentedSendManager::RemoveMessage(FragmentedTransfer *transfer, NetworkMessage *message)
  65. {
  66. bool success = transfer->RemoveMessage(message);
  67. if (!success)
  68. {
  69. KNET_LOG(LogError, "Tried to remove a nonexisting message from a fragmented send struct!");
  70. return;
  71. }
  72. if (transfer->fragments.size() == 0)
  73. FreeFragmentedTransfer(transfer);
  74. }
  75. bool FragmentedSendManager::AllocateFragmentedTransferID(FragmentedTransfer &transfer)
  76. {
  77. assert(transfer.id == -1); // The FragmentedTransfer object must not have a previously allocated transfer ID at all.
  78. // We start allocating the ID's from number 1, and the number 0 is never used, so that we get some redundancy in the protocol
  79. // and are able to detect badly formed input.
  80. int transferID = 1;
  81. ///\todo Maintain a sorted order in Insert() instead of doing a search here - better for performance.
  82. bool used = true;
  83. while(used)
  84. {
  85. used = false;
  86. for(TransferList::iterator iter = transfers.begin(); iter != transfers.end(); ++iter)
  87. {
  88. if (iter->id == transferID)
  89. {
  90. ++transferID;
  91. used = true;
  92. }
  93. }
  94. }
  95. if (transferID >= 256)
  96. return false;
  97. transfer.id = transferID;
  98. KNET_LOG(LogObjectAlloc, "Allocated a transferID %d to a transfer of %d fragments.", transfer.id, (int)transfer.totalNumFragments);
  99. return true;
  100. }
  101. void FragmentedSendManager::FreeAllTransfers()
  102. {
  103. while(!transfers.empty())
  104. FreeFragmentedTransfer(&transfers.front());
  105. }
  106. void FragmentedReceiveManager::NewFragmentStartReceived(int transferID, int numTotalFragments, const char *data, size_t numBytes)
  107. {
  108. assert(data);
  109. KNET_LOG(LogVerbose, "Received a fragmentStart of size %db (#total fragments %d) for a transfer with ID %d.", (int)numBytes, numTotalFragments, transferID);
  110. if (numBytes == 0 || numTotalFragments <= 1)
  111. {
  112. KNET_LOG(LogError, "Discarding degenerate fragmentStart of size %db and numTotalFragments=%db!", (int)numBytes, numTotalFragments);
  113. return;
  114. }
  115. for(size_t i = 0; i < transfers.size(); ++i)
  116. if (transfers[i].transferID == transferID)
  117. {
  118. KNET_LOG(LogError, "An existing transfer with ID %d existed! Deleting it.", transferID);
  119. transfers.erase(transfers.begin() + i);
  120. --i;
  121. }
  122. transfers.push_back(ReceiveTransfer());
  123. ReceiveTransfer &transfer = transfers.back();
  124. transfer.transferID = transferID;
  125. transfer.numTotalFragments = numTotalFragments;
  126. ///\todo Can optimize by passing the pre-searched transfer struct.
  127. NewFragmentReceived(transferID, 0, data, numBytes);
  128. }
  129. bool FragmentedReceiveManager::NewFragmentReceived(int transferID, int fragmentNumber, const char *data, size_t numBytes)
  130. {
  131. KNET_LOG(LogVerbose, "Received a fragment of size %db (index %d) for a transfer with ID %d.", (int)numBytes, fragmentNumber, transferID);
  132. if (numBytes == 0)
  133. {
  134. KNET_LOG(LogError, "Discarding fragment of size 0!");
  135. return false;
  136. }
  137. for(size_t i = 0; i < transfers.size(); ++i)
  138. if (transfers[i].transferID == transferID)
  139. {
  140. ReceiveTransfer &transfer = transfers[i];
  141. for(size_t j = 0; j < transfer.fragments.size(); ++j)
  142. if (transfer.fragments[j].fragmentIndex == fragmentNumber)
  143. {
  144. KNET_LOG(LogError, "A fragment with fragmentNumber %d already exists for transferID %d. Discarding the new fragment! Old size: %db, discarded size: %db",
  145. fragmentNumber, transferID, (int)transfer.fragments[j].data.size(), (int)numBytes);
  146. return false;
  147. }
  148. transfer.fragments.push_back(ReceiveFragment());
  149. ReceiveFragment &fragment = transfer.fragments.back();
  150. fragment.fragmentIndex = fragmentNumber;
  151. fragment.data.insert(fragment.data.end(), data, data + numBytes);
  152. if (transfer.fragments.size() >= (size_t)transfer.numTotalFragments)
  153. {
  154. KNET_LOG(LogData, "Finished receiving a fragmented transfer that consisted of %d fragments (transferID=%d).",
  155. (int)transfer.fragments.size(), transfer.transferID);
  156. return true;
  157. }
  158. else
  159. return false;
  160. }
  161. KNET_LOG(LogError, "Received a fragment of size %db (index %d) for a transfer with ID %d, but that transfer had not been initiated!",
  162. (int)numBytes, fragmentNumber, transferID);
  163. return false;
  164. }
  165. void FragmentedReceiveManager::AssembleMessage(int transferID, std::vector<char> &assembledData)
  166. {
  167. for(size_t i = 0; i < transfers.size(); ++i)
  168. if (transfers[i].transferID == transferID)
  169. {
  170. ReceiveTransfer &transfer = transfers[i];
  171. size_t totalSize = 0;
  172. for(size_t j = 0; j < transfer.fragments.size(); ++j)
  173. totalSize += transfer.fragments[j].data.size();
  174. assembledData.resize(totalSize);
  175. ///\todo Sort by fragmentIndex.
  176. size_t offset = 0;
  177. for(size_t j = 0; j < transfer.fragments.size(); ++j)
  178. {
  179. assert(transfer.fragments[j].data.size() > 0);
  180. memcpy(&assembledData[offset], &transfer.fragments[j].data[0], transfer.fragments[j].data.size());
  181. offset += transfer.fragments[j].data.size();
  182. assert(offset <= assembledData.size());
  183. }
  184. }
  185. }
  186. void FragmentedReceiveManager::FreeMessage(int transferID)
  187. {
  188. for(size_t i = 0; i < transfers.size(); ++i)
  189. if (transfers[i].transferID == transferID)
  190. {
  191. transfers.erase(transfers.begin() + i);
  192. return;
  193. }
  194. }
  195. } // ~kNet