WSClient.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. //
  2. // Copyright (c) 2008-2020 the Urho3D project.
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to deal
  6. // in the Software without restriction, including without limitation the rights
  7. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  8. // copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  19. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  20. // THE SOFTWARE.
  21. //
  22. #ifdef URHO3D_WEBSOCKETS
  23. #include "../../Core/WorkQueue.h"
  24. #include "../../IO/MemoryBuffer.h"
  25. #include "../../IO/Log.h"
  26. #include "../Network.h"
  27. #include "WSClient.h"
  28. #include "WSHandler.h"
  29. #include <libwebsockets.h>
  30. #include <string.h>
  31. #ifdef __EMSCRIPTEN__
  32. #include <emscripten.h>
  33. #endif
  34. static Urho3D::WSClient* WSClientInstance = nullptr;
  35. #if defined(WIN32)
  36. #define HAVE_STRUCT_TIMESPEC
  37. #if defined(pid_t)
  38. #undef pid_t
  39. #endif
  40. #endif
  41. #ifndef __EMSCRIPTEN__
  42. static struct lws *client_wsi;
  43. static lws_sorted_usec_list_t sul;
  44. static struct lws_context *context;
  45. static const lws_retry_bo_t retry = {
  46. .secs_since_valid_ping = 3,
  47. .secs_since_valid_hangup = 10,
  48. };
  49. #endif
  50. #ifndef __EMSCRIPTEN__
  51. static void connect_cb(lws_sorted_usec_list_t *_sul)
  52. {
  53. struct lws_client_connect_info i;
  54. lwsl_notice("%s: connecting\n", __func__);
  55. memset(&i, 0, sizeof(i));
  56. i.context = context;
  57. i.port = WSClientInstance->GetPort();
  58. i.address = WSClientInstance->GetAddress().CString();
  59. i.path = "/";
  60. i.host = i.address;
  61. i.origin = i.address;
  62. i.protocol = WSClientInstance->GetServerProtocol().CString();
  63. i.alpn = "http/1.1";
  64. i.local_protocol_name = "ws_client";
  65. i.pwsi = &client_wsi;
  66. i.retry_and_idle_policy = &retry;
  67. if (!lws_client_connect_via_info(&i))
  68. lws_sul_schedule(context, 0, _sul, connect_cb, 5 * LWS_USEC_PER_SEC);
  69. }
  70. #endif
  71. static int WSCallback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
  72. {
  73. // URHO3D_LOGINFOF("Incoming client messsage reason %d", reason);
  74. switch (reason) {
  75. case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
  76. if (WSClientInstance) {
  77. WSClientInstance->SetState(Urho3D::WCS_CONNECTION_FAILED);
  78. }
  79. break;
  80. case LWS_CALLBACK_CLIENT_ESTABLISHED:
  81. if (WSClientInstance) {
  82. WSClientInstance->SetWSConnection(wsi);
  83. WSClientInstance->SetState(Urho3D::WCS_CONNECTED);
  84. }
  85. lwsl_user("%s: established\n", __func__);
  86. break;
  87. case LWS_CALLBACK_CLIENT_RECEIVE: {
  88. URHO3D_LOGINFOF("Received buffer of size %d", len);
  89. Urho3D::VectorBuffer b((unsigned char*)in, len);
  90. if (b.GetData()[0] == URHO3D_MESSAGE) {
  91. WSPacket packet(wsi, b);
  92. if (WSClientInstance) {
  93. WSClientInstance->AddIncomingPacket(packet);
  94. }
  95. } else {
  96. URHO3D_LOGINFOF("Received message that is not part of the engine %d", b.GetData()[0]);
  97. }
  98. lws_callback_on_writable(wsi);
  99. break;
  100. }
  101. case LWS_CALLBACK_CLIENT_WRITEABLE:
  102. if (WSClientInstance) {
  103. #ifdef __EMSCRIPTEN__
  104. if(WSClientInstance->GetNumOutgoingPackets(wsi))
  105. {
  106. auto packet = WSClientInstance->GetOutgoingPacket(wsi);
  107. if (packet)
  108. {
  109. int retval = EM_ASM_INT({
  110. var socket = Module.__libwebsocket.socket;
  111. if( ! socket ) {
  112. return -1;
  113. }
  114. // alloc a Uint8Array backed by the incoming data.
  115. var data_in = new Uint8Array(Module.HEAPU8.buffer, $1, $2 );
  116. // allow the dest array
  117. var data = new Uint8Array($2);
  118. // set the dest from the src
  119. data.set(data_in);
  120. socket.send( data );
  121. return $2;
  122. }, packet->second_.GetData(), packet->second_.GetSize());
  123. if (retval < packet->second_.GetSize()) {
  124. URHO3D_LOGERRORF("Failed to write to WS, bytes written = %d", retval);
  125. break;
  126. }
  127. WSClientInstance->RemoveOutgoingPacket(wsi);
  128. }
  129. }
  130. #else
  131. if(WSClientInstance->GetNumOutgoingPackets(wsi)) {
  132. auto packet = WSClientInstance->GetOutgoingPacket(wsi);
  133. if (packet) {
  134. unsigned char buf[LWS_PRE + packet->second_.GetSize()];
  135. memcpy(&buf[LWS_PRE], packet->second_.GetData(), packet->second_.GetSize());
  136. int retval = lws_write(wsi, &buf[LWS_PRE], packet->second_.GetSize(), LWS_WRITE_BINARY);
  137. if (retval < packet->second_.GetSize()) {
  138. URHO3D_LOGERRORF("Failed to write to WS, bytes written = %d", retval);
  139. break;
  140. }
  141. WSClientInstance->RemoveOutgoingPacket(wsi);
  142. }
  143. }
  144. lws_callback_on_writable(wsi);
  145. #endif
  146. }
  147. break;
  148. case LWS_CALLBACK_CLIENT_CLOSED:
  149. if (WSClientInstance) {
  150. WSClientInstance->Disconnect();
  151. }
  152. URHO3D_LOGINFOF("LWS_CALLBACK_CLIENT_CLOSED");
  153. break;
  154. default:
  155. break;
  156. }
  157. return 0;
  158. }
  #ifdef __EMSCRIPTEN__
  /// C-linkage entry point kept alive for the JavaScript side; forwards every argument
  /// unchanged to WSCallback and returns its result.
  extern "C" int EMSCRIPTEN_KEEPALIVE WSHelper(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
  {
      const int status = WSCallback(wsi, reason, user, in, len);
      return status;
  }
  #endif
  164. #ifndef __EMSCRIPTEN__
  165. static const struct lws_protocols protocols[] = {
  166. {"ws_client", WSCallback,0,0,},
  167. { NULL, NULL, 0, 0 }
  168. };
  169. #endif
  170. using namespace Urho3D;
  171. static void RunService(const WorkItem* item, unsigned threadIndex) {
  172. #ifdef __EMSCRIPTEN
  173. // Trigger writing on the socket
  174. WSCallback(nullptr, LWS_CALLBACK_CLIENT_WRITEABLE, nullptr, nullptr, 0);
  175. #else
  176. int result = lws_service(context, 0);
  177. if (result < 0 && WSClientInstance) {
  178. WSClientInstance->Disconnect();
  179. }
  180. URHO3D_LOGINFOF("Running client service");
  181. #endif
  182. }
  183. WSClient::WSClient(Context* context):
  184. Object(context),
  185. ws_(nullptr)
  186. {
  187. SetState(WCS_DISCONNECTED);
  188. WSClientInstance = this;
  189. }
  190. WSClient::~WSClient()
  191. {
  192. WSClientInstance = nullptr;
  193. }
  194. int WSClient::Connect(const String& address, unsigned short port)
  195. {
  196. address_ = address;
  197. port_ = port;
  198. serverProtocol_ = "ws-server";
  199. SetState(WCS_CONNECTING);
  200. SubscribeToEvent(E_WORKITEMCOMPLETED, URHO3D_HANDLER(WSClient, HandleWorkItemFinished));
  201. struct lws_context_creation_info info;
  202. lws_set_log_level(LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE, OutputWSLog);
  203. memset(&info, 0, sizeof info);
  204. info.port = CONTEXT_PORT_NO_LISTEN;
  205. info.protocols = protocols;
  206. context = lws_create_context(&info);
  207. if (!context) {
  208. URHO3D_LOGERROR("Failed to start Websocket client");
  209. return 1;
  210. }
  211. #ifndef __EMSCRIPTEN__
  212. lws_sul_schedule(context, 0, &sul, connect_cb, 50);
  213. #else
  214. EM_ASM({
  215. var libwebsocket = {};
  216. libwebsocket.socket = false;
  217. libwebsocket.on_event = Module.cwrap('WSHelper', 'number', ['number', 'number', 'number', 'number', 'number']);
  218. libwebsocket.connect = function( url, protocol, user_data ) {
  219. try {
  220. var socket = new WebSocket(url,protocol);
  221. socket.binaryType = "arraybuffer";
  222. socket.user_data = user_data;
  223. socket.onopen = this.on_connect;
  224. socket.onmessage = this.on_message;
  225. socket.onclose = this.on_close;
  226. socket.onerror = this.on_error;
  227. socket.destroy = this.destroy;
  228. this.socket = socket;
  229. return socket;
  230. } catch(e) {
  231. Module.print("Socket creation failed:" + e);
  232. return 0;
  233. }
  234. };
  235. libwebsocket.on_connect = function() {
  236. var stack = stackSave();
  237. // filter protocol //
  238. var ret = libwebsocket.on_event(this.id, 9, this.user_data, allocate(intArrayFromString(this.protocol), 'i8', ALLOC_STACK), this.protocol.length);
  239. if( !ret ) {
  240. // client established
  241. ret = libwebsocket.on_event(this.id, 3, this.user_data, 0, 0);
  242. }
  243. if( ret ) {
  244. this.close();
  245. }
  246. stackRestore(stack);
  247. };
  248. libwebsocket.on_message = function(event) {
  249. var stack = stackSave();
  250. var len = event.data.byteLength;
  251. var ptr = allocate( len, 'i8', ALLOC_STACK);
  252. var data = new Uint8Array( event.data );
  253. for(var i =0, buf = ptr; i < len; ++i ) {
  254. setValue(buf, data[i], 'i8');
  255. buf++;
  256. }
  257. // client receive //
  258. if(libwebsocket.on_event(this.id, 6, this.user_data, ptr, len)) {
  259. this.close();
  260. }
  261. stackRestore(stack);
  262. };
  263. libwebsocket.on_close = function() {
  264. // closed //
  265. libwebsocket.on_event(this.protocol_id, ctx, this.id, 4, this.user_data, 0, 0);
  266. this.destroy();
  267. };
  268. libwebsocket.on_error = function() {
  269. // client connection error //
  270. libwebsocket.on_event(this.protocol_id, ctx, this.id, 2, this.user_data, 0, 0);
  271. this.destroy();
  272. };
  273. libwebsocket.destroy = function() {
  274. libwebsocket.socket = false;
  275. libwebsocket.on_event(this.protocol_id, ctx, this.id, 11, this.user_data, 0, 0);
  276. };
  277. Module.__libwebsocket = libwebsocket;
  278. });
  279. #endif
  280. return 0;
  281. }
  282. void WSClient::Update(float timestep)
  283. {
  284. if (currentState_ == WCS_CONNECTING && nextState_ == WCS_CONNECTED) {
  285. GetSubsystem<Network>()->OnServerConnected(GetWSConnection());
  286. }
  287. if (nextState_ == WCS_DISCONNECTED) {
  288. GetSubsystem<Network>()->OnServerDisconnected(GetWSConnection(), false);
  289. }
  290. if (nextState_ == WCS_CONNECTION_FAILED) {
  291. GetSubsystem<Network>()->OnServerDisconnected(GetWSConnection(), true);
  292. }
  293. if (context) {
  294. if (!serviceWorkItem_ && currentState_ != WCS_DISCONNECTED) {
  295. WorkQueue *workQueue = GetSubsystem<WorkQueue>();
  296. serviceWorkItem_ = workQueue->GetFreeItem();
  297. serviceWorkItem_->priority_ = M_MAX_INT;
  298. serviceWorkItem_->workFunction_ = RunService;
  299. serviceWorkItem_->aux_ = this;
  300. serviceWorkItem_->sendEvent_ = true;
  301. workQueue->AddWorkItem(serviceWorkItem_);
  302. }
  303. }
  304. while (GetNumIncomingPackets()) {
  305. auto packet = GetIncomingPacket();
  306. GetSubsystem<Network>()->HandleIncomingPacket(packet, false);
  307. RemoveIncomingPacket();
  308. }
  309. if (currentState_ != nextState_) {
  310. currentState_ = nextState_;
  311. }
  312. }
  313. void WSClient::Disconnect()
  314. {
  315. SetState(WCS_DISCONNECTED);
  316. #ifndef __EMSCRIPTEN__
  317. if (context)
  318. {
  319. lws_context_destroy(context);
  320. context = nullptr;
  321. }
  322. #endif
  323. }
  324. void WSClient::SetWSConnection(lws *ws)
  325. {
  326. ws_ = ws;
  327. }
  328. void WSClient::SetState(WSClientState state)
  329. {
  330. nextState_ = state;
  331. }
  332. void WSClient::HandleWorkItemFinished(StringHash eventType, VariantMap& eventData)
  333. {
  334. using namespace WorkItemCompleted;
  335. WorkItem *workItem = reinterpret_cast<WorkItem *>(eventData[P_ITEM].GetPtr());
  336. if (workItem->aux_ != this) {
  337. return;
  338. }
  339. if (workItem->workFunction_ == RunService) {
  340. serviceWorkItem_.Reset();
  341. }
  342. }
  343. #endif