ops-mqtt.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. /*
  2. * libwebsockets - small server side websockets and web server implementation
  3. *
  4. * Copyright (C) 2010 - 2020 Andy Green <[email protected]>
  5. *
  6. * Permission is hereby granted, free of charge, to any person obtaining a copy
  7. * of this software and associated documentation files (the "Software"), to
  8. * deal in the Software without restriction, including without limitation the
  9. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  10. * sell copies of the Software, and to permit persons to whom the Software is
  11. * furnished to do so, subject to the following conditions:
  12. *
  13. * The above copyright notice and this permission notice shall be included in
  14. * all copies or substantial portions of the Software.
  15. *
  16. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  21. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  22. * IN THE SOFTWARE.
  23. */
  24. #include "private-lib-core.h"
  25. static int
  26. rops_handle_POLLIN_mqtt(struct lws_context_per_thread *pt, struct lws *wsi,
  27. struct lws_pollfd *pollfd)
  28. {
  29. unsigned int pending = 0;
  30. struct lws_tokens ebuf;
  31. int n = 0;
  32. char buffered = 0;
  33. lwsl_debug("%s: wsistate 0x%x, %s pollout %d\n", __func__,
  34. (unsigned int)wsi->wsistate, wsi->a.protocol->name,
  35. pollfd->revents);
  36. /*
  37. * After the CONNACK and nwsi establishment, the first logical
  38. * stream is migrated out of the nwsi to be child sid 1, and the
  39. * nwsi no longer has a wsi->mqtt of its own.
  40. *
  41. * RX events on the nwsi must be converted to events seen or not
  42. * seen by one or more child streams.
  43. *
  44. * SUBACK - reflected to child stream that asked for it
  45. * PUBACK - routed to child that did the related publish
  46. */
  47. ebuf.token = NULL;
  48. ebuf.len = 0;
  49. if (lwsi_state(wsi) != LRS_ESTABLISHED) {
  50. #if defined(LWS_WITH_CLIENT)
  51. if (lwsi_state(wsi) == LRS_WAITING_SSL &&
  52. ((pollfd->revents & LWS_POLLOUT)) &&
  53. lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
  54. lwsl_info("failed at set pollfd\n");
  55. return LWS_HPI_RET_PLEASE_CLOSE_ME;
  56. }
  57. if ((pollfd->revents & LWS_POLLOUT) &&
  58. lws_handle_POLLOUT_event(wsi, pollfd)) {
  59. lwsl_debug("POLLOUT event closed it\n");
  60. return LWS_HPI_RET_PLEASE_CLOSE_ME;
  61. }
  62. n = lws_mqtt_client_socket_service(wsi, pollfd, NULL);
  63. if (n)
  64. return LWS_HPI_RET_WSI_ALREADY_DIED;
  65. #endif
  66. return LWS_HPI_RET_HANDLED;
  67. }
  68. /* 1: something requested a callback when it was OK to write */
  69. if ((pollfd->revents & LWS_POLLOUT) &&
  70. lwsi_state_can_handle_POLLOUT(wsi) &&
  71. lws_handle_POLLOUT_event(wsi, pollfd)) {
  72. if (lwsi_state(wsi) == LRS_RETURNED_CLOSE)
  73. lwsi_set_state(wsi, LRS_FLUSHING_BEFORE_CLOSE);
  74. return LWS_HPI_RET_PLEASE_CLOSE_ME;
  75. }
  76. /* 3: buflist needs to be drained
  77. */
  78. read:
  79. // lws_buflist_describe(&wsi->buflist, wsi, __func__);
  80. ebuf.len = (int)lws_buflist_next_segment_len(&wsi->buflist, &ebuf.token);
  81. if (ebuf.len) {
  82. lwsl_info("draining buflist (len %d)\n", ebuf.len);
  83. buffered = 1;
  84. goto drain;
  85. }
  86. if (!(pollfd->revents & pollfd->events & LWS_POLLIN))
  87. return LWS_HPI_RET_HANDLED;
  88. /* if (lws_is_flowcontrolled(wsi)) { */
  89. /* lwsl_info("%s: %p should be rxflow (bm 0x%x)..\n", */
  90. /* __func__, wsi, wsi->rxflow_bitmap); */
  91. /* return LWS_HPI_RET_HANDLED; */
  92. /* } */
  93. if (!(lwsi_role_client(wsi) && lwsi_state(wsi) != LRS_ESTABLISHED)) {
  94. /*
  95. * In case we are going to react to this rx by scheduling
  96. * writes, we need to restrict the amount of rx to the size
  97. * the protocol reported for rx buffer.
  98. *
  99. * Otherwise we get a situation we have to absorb possibly a
  100. * lot of reads before we get a chance to drain them by writing
  101. * them, eg, with echo type tests in autobahn.
  102. */
  103. buffered = 0;
  104. ebuf.token = pt->serv_buf;
  105. ebuf.len = wsi->a.context->pt_serv_buf_size;
  106. if ((unsigned int)ebuf.len > wsi->a.context->pt_serv_buf_size)
  107. ebuf.len = wsi->a.context->pt_serv_buf_size;
  108. if ((int)pending > ebuf.len)
  109. pending = ebuf.len;
  110. ebuf.len = lws_ssl_capable_read(wsi, ebuf.token,
  111. pending ? (int)pending :
  112. ebuf.len);
  113. switch (ebuf.len) {
  114. case 0:
  115. lwsl_info("%s: zero length read\n",
  116. __func__);
  117. return LWS_HPI_RET_PLEASE_CLOSE_ME;
  118. case LWS_SSL_CAPABLE_MORE_SERVICE:
  119. lwsl_info("SSL Capable more service\n");
  120. return LWS_HPI_RET_HANDLED;
  121. case LWS_SSL_CAPABLE_ERROR:
  122. lwsl_info("%s: LWS_SSL_CAPABLE_ERROR\n",
  123. __func__);
  124. return LWS_HPI_RET_PLEASE_CLOSE_ME;
  125. }
  126. /*
  127. * coverity thinks ssl_capable_read() may read over
  128. * 2GB. Dissuade it...
  129. */
  130. ebuf.len &= 0x7fffffff;
  131. }
  132. drain:
  133. /* service incoming data */
  134. //lws_buflist_describe(&wsi->buflist, wsi, __func__);
  135. if (ebuf.len) {
  136. n = lws_read_mqtt(wsi, ebuf.token, ebuf.len);
  137. if (n < 0) {
  138. lwsl_notice("%s: lws_read_mqtt returned %d\n",
  139. __func__, n);
  140. /* we closed wsi */
  141. goto fail;
  142. }
  143. // lws_buflist_describe(&wsi->buflist, wsi, __func__);
  144. lwsl_debug("%s: consuming %d / %d\n", __func__, n, ebuf.len);
  145. if (lws_buflist_aware_finished_consuming(wsi, &ebuf, ebuf.len,
  146. buffered, __func__))
  147. return LWS_HPI_RET_PLEASE_CLOSE_ME;
  148. }
  149. ebuf.token = NULL;
  150. ebuf.len = 0;
  151. pending = lws_ssl_pending(wsi);
  152. if (pending) {
  153. pending = pending > wsi->a.context->pt_serv_buf_size ?
  154. wsi->a.context->pt_serv_buf_size : pending;
  155. goto read;
  156. }
  157. if (buffered && /* were draining, now nothing left */
  158. !lws_buflist_next_segment_len(&wsi->buflist, NULL)) {
  159. lwsl_info("%s: %p flow buf: drained\n", __func__, wsi);
  160. /* having drained the rxflow buffer, can rearm POLLIN */
  161. #if !defined(LWS_WITH_SERVER)
  162. n =
  163. #endif
  164. __lws_rx_flow_control(wsi);
  165. /* n ignored, needed for NO_SERVER case */
  166. }
  167. /* n = 0 */
  168. return LWS_HPI_RET_HANDLED;
  169. fail:
  170. lwsl_err("%s: Failed, bailing\n", __func__);
  171. lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS, "mqtt svc fail");
  172. return LWS_HPI_RET_WSI_ALREADY_DIED;
  173. }
  174. #if 0 /* defined(LWS_WITH_SERVER) */
  175. static int
  176. rops_adoption_bind_mqtt(struct lws *wsi, int type, const char *vh_prot_name)
  177. {
  178. /* no http but socket... must be mqtt */
  179. if ((type & LWS_ADOPT_HTTP) || !(type & LWS_ADOPT_SOCKET) ||
  180. (type & _LWS_ADOPT_FINISH))
  181. return 0; /* no match */
  182. lws_role_transition(wsi, 0, (type & LWS_ADOPT_ALLOW_SSL) ? LRS_SSL_INIT :
  183. LRS_ESTABLISHED, &role_ops_mqtt);
  184. if (vh_prot_name)
  185. lws_bind_protocol(wsi, wsi->a.protocol, __func__);
  186. else
  187. /* this is the only time he will transition */
  188. lws_bind_protocol(wsi,
  189. &wsi->a.vhost->protocols[wsi->a.vhost->mqtt_protocol_index],
  190. __func__);
  191. return 1; /* bound */
  192. }
  193. #endif
  194. static int
  195. rops_client_bind_mqtt(struct lws *wsi, const struct lws_client_connect_info *i)
  196. {
  197. lwsl_debug("%s: i = %p\n", __func__, i);
  198. if (!i) {
  199. /* finalize */
  200. if (!wsi->user_space && wsi->stash->cis[CIS_METHOD])
  201. if (lws_ensure_user_space(wsi))
  202. return 1;
  203. if (!wsi->stash->cis[CIS_METHOD] && !wsi->stash->cis[CIS_ALPN])
  204. wsi->stash->cis[CIS_ALPN] = "x-amzn-mqtt-ca";
  205. /* if we went on the ah waiting list, it's ok, we can
  206. * wait.
  207. *
  208. * When we do get the ah, now or later, he will end up
  209. * at lws_http_client_connect_via_info2().
  210. */
  211. #if defined(LWS_WITH_CLIENT)
  212. if (lws_header_table_attach(wsi, 0) < 0)
  213. /*
  214. * if we failed here, the connection is already closed
  215. * and freed.
  216. */
  217. return -1;
  218. #else
  219. if (lws_header_table_attach(wsi, 0))
  220. return 0;
  221. #endif
  222. return 0;
  223. }
  224. /* if a recognized mqtt method, bind to it */
  225. if (strcmp(i->method, "MQTT"))
  226. return 0; /* no match */
  227. if (lws_create_client_mqtt_object(i, wsi))
  228. return 1;
  229. lws_role_transition(wsi, LWSIFR_CLIENT, LRS_UNCONNECTED,
  230. &role_ops_mqtt);
  231. return 1; /* matched */
  232. }
  233. static int
  234. rops_handle_POLLOUT_mqtt(struct lws *wsi)
  235. {
  236. struct lws **wsi2;
  237. lwsl_debug("%s\n", __func__);
  238. #if defined(LWS_WITH_CLIENT)
  239. if (wsi->mqtt && wsi->mqtt->send_pingreq && !wsi->mqtt->inside_payload) {
  240. uint8_t buf[LWS_PRE + 2];
  241. /*
  242. * We are swallowing this POLLOUT in order to send a PINGREQ
  243. * autonomously
  244. */
  245. wsi->mqtt->send_pingreq = 0;
  246. lwsl_notice("%s: issuing PINGREQ\n", __func__);
  247. buf[LWS_PRE] = LMQCP_CTOS_PINGREQ << 4;
  248. buf[LWS_PRE + 1] = 0;
  249. if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 2,
  250. LWS_WRITE_BINARY) != 2)
  251. return LWS_HP_RET_BAIL_DIE;
  252. return LWS_HP_RET_BAIL_OK;
  253. }
  254. #endif
  255. wsi = lws_get_network_wsi(wsi);
  256. wsi->mux.requested_POLLOUT = 0;
  257. wsi2 = &wsi->mux.child_list;
  258. if (!*wsi2) {
  259. lwsl_debug("%s: no children\n", __func__);
  260. return LWS_HP_RET_DROP_POLLOUT;
  261. }
  262. lws_wsi_mux_dump_waiting_children(wsi);
  263. do {
  264. struct lws *w, **wa;
  265. wa = &(*wsi2)->mux.sibling_list;
  266. if (!(*wsi2)->mux.requested_POLLOUT)
  267. goto next_child;
  268. if (!lwsi_state_can_handle_POLLOUT(wsi))
  269. goto next_child;
  270. /*
  271. * If the nwsi is in the middle of a frame, we can only
  272. * continue to send that
  273. */
  274. if (wsi->mqtt->inside_payload && !(*wsi2)->mqtt->inside_payload)
  275. goto next_child;
  276. /*
  277. * we're going to do writable callback for this child.
  278. * move him to be the last child
  279. */
  280. w = lws_wsi_mux_move_child_to_tail(wsi2);
  281. if (!w) {
  282. wa = &wsi->mux.child_list;
  283. goto next_child;
  284. }
  285. lwsl_debug("%s: child %p (wsistate 0x%x)\n", __func__, w,
  286. (unsigned int)w->wsistate);
  287. if (lwsi_state(wsi) == LRS_ESTABLISHED &&
  288. !wsi->mqtt->inside_payload &&
  289. wsi->mqtt->send_puback) {
  290. uint8_t buf[LWS_PRE + 4];
  291. lwsl_notice("%s: issuing PUBACK for pkt id: %d\n",
  292. __func__, wsi->mqtt->ack_pkt_id);
  293. /* Fixed header */
  294. buf[LWS_PRE] = LMQCP_PUBACK << 4;
  295. /* Remaining len = 2 */
  296. buf[LWS_PRE + 1] = 2;
  297. /* Packet ID */
  298. lws_ser_wu16be(&buf[LWS_PRE + 2], wsi->mqtt->ack_pkt_id);
  299. if (lws_write(wsi, (uint8_t *)&buf[LWS_PRE], 4,
  300. LWS_WRITE_BINARY) != 4)
  301. return LWS_HP_RET_BAIL_DIE;
  302. wsi->mqtt->send_puback = 0;
  303. w->mux.requested_POLLOUT = 1;
  304. wa = &wsi->mux.child_list;
  305. goto next_child;
  306. }
  307. if (lws_callback_as_writeable(w)) {
  308. lwsl_notice("%s: Closing child %p\n", __func__, w);
  309. lws_close_free_wsi(w, LWS_CLOSE_STATUS_NOSTATUS,
  310. "mqtt pollout handle");
  311. wa = &wsi->mux.child_list;
  312. }
  313. next_child:
  314. wsi2 = wa;
  315. } while (wsi2 && *wsi2 && !lws_send_pipe_choked(wsi));
  316. // lws_wsi_mux_dump_waiting_children(wsi);
  317. if (lws_wsi_mux_action_pending_writeable_reqs(wsi))
  318. return LWS_HP_RET_BAIL_DIE;
  319. return LWS_HP_RET_BAIL_OK;
  320. }
  321. #if defined(LWS_WITH_CLIENT)
  322. static int
  323. rops_issue_keepalive_mqtt(struct lws *wsi, int isvalid)
  324. {
  325. struct lws *nwsi = lws_get_network_wsi(wsi);
  326. if (isvalid) {
  327. _lws_validity_confirmed_role(nwsi);
  328. return 0;
  329. }
  330. nwsi->mqtt->send_pingreq = 1;
  331. lws_callback_on_writable(nwsi);
  332. return 0;
  333. }
  334. #endif
  335. static int
  336. rops_close_role_mqtt(struct lws_context_per_thread *pt, struct lws *wsi)
  337. {
  338. struct lws *nwsi = lws_get_network_wsi(wsi);
  339. lws_mqtt_subs_t *s, *s1, *mysub;
  340. lws_mqttc_t *c;
  341. if (!wsi->mqtt)
  342. return 0;
  343. c = &wsi->mqtt->client;
  344. lws_sul_cancel(&wsi->mqtt->sul_qos1_puback_wait);
  345. lws_mqtt_str_free(&c->username);
  346. lws_mqtt_str_free(&c->password);
  347. lws_mqtt_str_free(&c->will.message);
  348. lws_mqtt_str_free(&c->will.topic);
  349. lws_mqtt_str_free(&c->id);
  350. /* clean up any subscription allocations */
  351. s = wsi->mqtt->subs_head;
  352. wsi->mqtt->subs_head = NULL;
  353. while (s) {
  354. s1 = s->next;
  355. /*
  356. * Account for children no longer using nwsi subscription
  357. */
  358. mysub = lws_mqtt_find_sub(nwsi->mqtt, (const char *)&s[1]);
  359. // assert(mysub); /* if child subscribed, nwsi must feel the same */
  360. if (mysub) {
  361. assert(mysub->ref_count);
  362. mysub->ref_count--;
  363. }
  364. lws_free(s);
  365. s = s1;
  366. }
  367. lws_mqtt_publish_param_t *pub =
  368. (lws_mqtt_publish_param_t *)
  369. wsi->mqtt->rx_cpkt_param;
  370. if (pub)
  371. lws_free_set_NULL(pub->topic);
  372. lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
  373. lws_free_set_NULL(wsi->mqtt);
  374. return 0;
  375. }
  376. static int
  377. rops_callback_on_writable_mqtt(struct lws *wsi)
  378. {
  379. #if defined(LWS_WITH_CLIENT)
  380. struct lws *network_wsi;
  381. #endif
  382. int already;
  383. lwsl_debug("%s: %p (wsistate 0x%x)\n", __func__, wsi, (unsigned int)wsi->wsistate);
  384. if (wsi->mux.requested_POLLOUT
  385. #if defined(LWS_WITH_CLIENT)
  386. && !wsi->client_h2_alpn
  387. #endif
  388. ) {
  389. lwsl_debug("already pending writable\n");
  390. return 1;
  391. }
  392. #if 0
  393. /* is this for DATA or for control messages? */
  394. if (wsi->upgraded_to_http2 && !wsi->h2.h2n->pps &&
  395. !lws_h2_tx_cr_get(wsi)) {
  396. /*
  397. * other side is not able to cope with us sending DATA
  398. * anything so no matter if we have POLLOUT on our side if it's
  399. * DATA we want to send.
  400. *
  401. * Delay waiting for our POLLOUT until peer indicates he has
  402. * space for more using tx window command in http2 layer
  403. */
  404. lwsl_notice("%s: %p: skint (%d)\n", __func__, wsi,
  405. wsi->h2.tx_cr);
  406. wsi->h2.skint = 1;
  407. return 0;
  408. }
  409. wsi->h2.skint = 0;
  410. #endif
  411. #if defined(LWS_WITH_CLIENT)
  412. network_wsi = lws_get_network_wsi(wsi);
  413. #endif
  414. already = lws_wsi_mux_mark_parents_needing_writeable(wsi);
  415. /* for network action, act only on the network wsi */
  416. if (already
  417. #if defined(LWS_WITH_CLIENT)
  418. && !network_wsi->client_mux_substream
  419. #endif
  420. )
  421. return 1;
  422. return 0;
  423. }
  424. static int
  425. rops_close_kill_connection_mqtt(struct lws *wsi, enum lws_close_status reason)
  426. {
  427. lwsl_info(" wsi: %p, his parent %p: child list %p, siblings:\n", wsi,
  428. wsi->mux.parent_wsi, wsi->mux.child_list);
  429. //lws_wsi_mux_dump_children(wsi);
  430. if (wsi->mux_substream
  431. #if defined(LWS_WITH_CLIENT)
  432. || wsi->client_mux_substream
  433. #endif
  434. ) {
  435. lwsl_info("closing %p: parent %p: first child %p\n", wsi,
  436. wsi->mux.parent_wsi, wsi->mux.child_list);
  437. if (wsi->mux.child_list && lwsl_visible(LLL_INFO)) {
  438. lwsl_info(" parent %p: closing children: list:\n", wsi);
  439. lws_wsi_mux_dump_children(wsi);
  440. }
  441. lws_wsi_mux_close_children(wsi, reason);
  442. }
  443. if ((
  444. #if defined(LWS_WITH_CLIENT)
  445. wsi->client_mux_substream ||
  446. #endif
  447. wsi->mux_substream) &&
  448. wsi->mux.parent_wsi) {
  449. lws_wsi_mux_sibling_disconnect(wsi);
  450. }
  451. return 0;
  452. }
  453. struct lws_role_ops role_ops_mqtt = {
  454. /* role name */ "mqtt",
  455. /* alpn id */ "x-amzn-mqtt-ca", /* "mqtt/3.1.1" */
  456. /* check_upgrades */ NULL,
  457. /* pt_init_destroy */ NULL,
  458. /* init_vhost */ NULL,
  459. /* destroy_vhost */ NULL,
  460. /* service_flag_pending */ NULL,
  461. .handle_POLLIN = rops_handle_POLLIN_mqtt,
  462. .handle_POLLOUT = rops_handle_POLLOUT_mqtt,
  463. /* perform_user_POLLOUT */ NULL,
  464. /* callback_on_writable */ rops_callback_on_writable_mqtt,
  465. /* tx_credit */ NULL,
  466. .write_role_protocol = NULL,
  467. /* encapsulation_parent */ NULL,
  468. /* alpn_negotiated */ NULL,
  469. /* close_via_role_protocol */ NULL,
  470. .close_role = rops_close_role_mqtt,
  471. .close_kill_connection = rops_close_kill_connection_mqtt,
  472. /* destroy_role */ NULL,
  473. #if 0 /* defined(LWS_WITH_SERVER) */
  474. /* adoption_bind */ rops_adoption_bind_mqtt,
  475. #else
  476. NULL,
  477. #endif
  478. #if defined(LWS_WITH_CLIENT)
  479. .client_bind = rops_client_bind_mqtt,
  480. .issue_keepalive = rops_issue_keepalive_mqtt,
  481. #else
  482. .client_bind = NULL,
  483. .issue_keepalive = NULL,
  484. #endif
  485. .adoption_cb = { LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED,
  486. LWS_CALLBACK_MQTT_NEW_CLIENT_INSTANTIATED },
  487. .rx_cb = { LWS_CALLBACK_MQTT_CLIENT_RX,
  488. LWS_CALLBACK_MQTT_CLIENT_RX },
  489. .writeable_cb = { LWS_CALLBACK_MQTT_CLIENT_WRITEABLE,
  490. LWS_CALLBACK_MQTT_CLIENT_WRITEABLE },
  491. .close_cb = { LWS_CALLBACK_MQTT_CLIENT_CLOSED,
  492. LWS_CALLBACK_MQTT_CLIENT_CLOSED },
  493. .protocol_bind_cb = { LWS_CALLBACK_MQTT_IDLE,
  494. LWS_CALLBACK_MQTT_IDLE },
  495. .protocol_unbind_cb = { LWS_CALLBACK_MQTT_DROP_PROTOCOL,
  496. LWS_CALLBACK_MQTT_DROP_PROTOCOL },
  497. .file_handle = 0,
  498. };