WSServer.cpp 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. //
  2. // Copyright (c) 2008-2020 the Urho3D project.
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to deal
  6. // in the Software without restriction, including without limitation the rights
  7. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. // copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  20. // THE SOFTWARE.
  21. //
  22. #if defined(URHO3D_WEBSOCKETS) && !defined(__EMSCRIPTEN__)
  23. #include "../Network.h"
  24. #include "../../Core/WorkQueue.h"
  25. #include "../../IO/Log.h"
  26. #include "../../IO/MemoryBuffer.h"
  27. #include "WSHandler.h"
  28. #include "WSServer.h"
  29. #include <libwebsockets.h>
  30. #include <string.h>
  31. static Urho3D::WSServer* WSServerInstance = nullptr;
  32. static struct lws_context *context;
  33. static int WSCallback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len) {
  34. // URHO3D_LOGINFOF("Incoming server messsage reason %d", reason);
  35. switch (reason) {
  36. case LWS_CALLBACK_PROTOCOL_INIT:
  37. break;
  38. case LWS_CALLBACK_ESTABLISHED:
  39. if (WSServerInstance) {
  40. int currentConnectionCount = WSServerInstance->GetSubsystem<Urho3D::Network>()->GetClientConnections().Size();
  41. if (currentConnectionCount >= WSServerInstance->GetMaxConnections()) {
  42. // Drop the connection, server is full
  43. return -1;
  44. }
  45. WSServerInstance->AddPendingConnection(wsi);
  46. }
  47. break;
  48. case LWS_CALLBACK_SERVER_WRITEABLE:
  49. case LWS_CALLBACK_CLIENT_WRITEABLE:
  50. if (WSServerInstance) {
  51. if (WSServerInstance->IsMarkedForDisconnect(wsi)) {
  52. WSServerInstance->RemoveDisconnected(wsi);
  53. return -1;
  54. }
  55. Urho3D::MutexLock lock(WSServerInstance->GetOutgoingMutex());
  56. auto* packets = WSServerInstance->GetOutgoingPackets(wsi);
  57. for (auto it = packets->Begin(); it != packets->End(); ++it) {
  58. WSPacket& packet = (*it);
  59. unsigned char buf[LWS_PRE + packet.second_.GetSize()];
  60. memcpy(&buf[LWS_PRE], packet.second_.GetData(), packet.second_.GetSize());
  61. int retval = lws_write(packet.first_.GetWS(), &buf[LWS_PRE], packet.second_.GetSize(), LWS_WRITE_BINARY);
  62. if (retval < packet.second_.GetSize()) {
  63. URHO3D_LOGERRORF("Failed to write to WS, ret = %d", retval);
  64. break;
  65. }
  66. packets->Erase(it);
  67. lws_callback_on_writable(wsi);
  68. break;
  69. }
  70. }
  71. break;
  72. case LWS_CALLBACK_RECEIVE: {
  73. Urho3D::VectorBuffer b((unsigned char*)in, len);
  74. if (b.GetData()[0] == URHO3D_MESSAGE) {
  75. WSPacket packet(wsi, b);
  76. if (WSServerInstance) {
  77. WSServerInstance->AddIncomingPacket(packet);
  78. }
  79. } else {
  80. URHO3D_LOGINFOF("Received message that is not part of the engine %d", b.GetData()[0]);
  81. }
  82. break;
  83. }
  84. case LWS_CALLBACK_CLOSED: {
  85. if (WSServerInstance) {
  86. WSServerInstance->AddClosedConnection(wsi);
  87. }
  88. URHO3D_LOGINFOF("LWS_CALLBACK_CLOSED");
  89. break;
  90. }
  91. default:
  92. break;
  93. }
  94. return 0;
  95. }
  96. static struct lws_protocols protocols[] = {
  97. { "ws-server", WSCallback, 0, 0 },
  98. { NULL, NULL, 0, 0 } /* terminator */
  99. };
  100. using namespace Urho3D;
  101. static void RunService(const WorkItem* item, unsigned threadIndex) {
  102. auto packets = WSServerInstance->GetAllOutgoingPackets();
  103. for (auto it = packets->Begin(); it != packets->End(); ++it) {
  104. if (!(*it).second_.Empty()) {
  105. auto ws = (*it).second_.Front().first_;
  106. // URHO3D_LOGINFOF("Outgoing packet count (server) %d", WSServerInstance->GetNumOutgoingPackets(ws));
  107. lws_callback_on_writable(ws.GetWS());
  108. }
  109. }
  110. int result = lws_service(context, 0);
  111. if (result < 0 && WSServerInstance) {
  112. WSServerInstance->StopServer();
  113. }
  114. }
  115. WSServer::WSServer(Context* context):
  116. Object(context)
  117. {
  118. WSServerInstance = this;
  119. }
  120. WSServer::~WSServer()
  121. {
  122. WSServerInstance = nullptr;
  123. }
  124. int WSServer::StartServer(unsigned short port, unsigned int maxConnections)
  125. {
  126. maxConnections_ = maxConnections;
  127. struct lws_context_creation_info info;
  128. lws_set_log_level(LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE, OutputWSLog);
  129. memset(&info, 0, sizeof info);
  130. info.port = port;
  131. info.protocols = protocols;
  132. info.options = LWS_SERVER_OPTION_HTTP_HEADERS_SECURITY_BEST_PRACTICES_ENFORCE;
  133. info.gid = -1;
  134. info.uid = -1;
  135. context = lws_create_context(&info);
  136. if (!context) {
  137. URHO3D_LOGERROR("Failed to start websocket server");
  138. return 1;
  139. }
  140. URHO3D_LOGINFOF("Websockets server started on port %d", port);
  141. SubscribeToEvent(E_WORKITEMCOMPLETED, URHO3D_HANDLER(WSServer, HandleWorkItemFinished));
  142. return 0;
  143. }
  144. void WSServer::Update(float timestep)
  145. {
  146. if (currentState_ == WSS_RUNNING && nextState_ == WSS_STOPPED) {
  147. GetSubsystem<Network>()->StopServer();
  148. }
  149. while(!closedConnections.Empty()) {
  150. auto ws = closedConnections.Front().GetWS();
  151. GetSubsystem<Network>()->ClientDisconnected(closedConnections.Front());
  152. auto packets = GetAllOutgoingPackets();
  153. packets->Erase(ws);
  154. closedConnections.PopFront();
  155. }
  156. while(!pendingConnections_.Empty()) {
  157. auto ws = pendingConnections_.Front();
  158. GetSubsystem<Network>()->NewConnectionEstablished(ws);
  159. pendingConnections_.PopFront();
  160. }
  161. if (context) {
  162. if (!serviceWorkItem_) {
  163. WorkQueue *workQueue = GetSubsystem<WorkQueue>();
  164. serviceWorkItem_ = workQueue->GetFreeItem();
  165. serviceWorkItem_->priority_ = M_MAX_INT;
  166. serviceWorkItem_->workFunction_ = RunService;
  167. serviceWorkItem_->aux_ = this;
  168. serviceWorkItem_->sendEvent_ = true;
  169. workQueue->AddWorkItem(serviceWorkItem_);
  170. }
  171. }
  172. while (GetNumIncomingPackets()) {
  173. auto packet = GetIncomingPacket();
  174. GetSubsystem<Network>()->HandleIncomingPacket(packet, true);
  175. RemoveIncomingPacket();
  176. }
  177. if (currentState_ != nextState_) {
  178. currentState_ = nextState_;
  179. }
  180. }
  181. void WSServer::StopServer()
  182. {
  183. SetState(WSS_STOPPED);
  184. if (context)
  185. {
  186. lws_context_destroy(context);
  187. context = nullptr;
  188. }
  189. }
  190. void WSServer::HandleWorkItemFinished(StringHash eventType, VariantMap& eventData)
  191. {
  192. using namespace WorkItemCompleted;
  193. WorkItem *workItem = reinterpret_cast<WorkItem *>(eventData[P_ITEM].GetPtr());
  194. if (workItem->aux_ != this)
  195. return;
  196. if (workItem->workFunction_ == RunService)
  197. serviceWorkItem_.Reset();
  198. }
  199. void WSServer::AddPendingConnection(lws* ws)
  200. {
  201. pendingConnections_.Push(ws);
  202. }
  203. void WSServer::AddClosedConnection(lws* ws)
  204. {
  205. closedConnections.Push(ws);
  206. }
  207. void WSServer::SetState(WSServerState state)
  208. {
  209. nextState_ = state;
  210. }
  211. void WSServer::MarkForDisconnect(const WSConnection& ws)
  212. {
  213. MutexLock lock(GetOutgoingMutex());
  214. pendingDisconnects_.Push(ws);
  215. }
  216. bool WSServer::IsMarkedForDisconnect(lws* ws)
  217. {
  218. MutexLock lock(GetOutgoingMutex());
  219. if (pendingDisconnects_.Contains(ws))
  220. return true;
  221. return false;
  222. }
  223. void WSServer::RemoveDisconnected(lws* ws)
  224. {
  225. MutexLock lock(GetOutgoingMutex());
  226. auto pIt = pendingDisconnects_.Find(ws);
  227. if (pIt != pendingDisconnects_.End())
  228. pendingDisconnects_.Erase(pIt);
  229. auto oIt = outgoingPackets_.Find(ws);
  230. if (oIt != outgoingPackets_.End())
  231. outgoingPackets_.Erase(oIt);
  232. }
  233. #endif