mqtt.c 56 KB


  1. /*
  2. * libwebsockets - small server side websockets and web server implementation
  3. *
  4. * Copyright (C) 2010 - 2020 Andy Green <[email protected]>
  5. *
  6. * Permission is hereby granted, free of charge, to any person obtaining a copy
  7. * of this software and associated documentation files (the "Software"), to
  8. * deal in the Software without restriction, including without limitation the
  9. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  10. * sell copies of the Software, and to permit persons to whom the Software is
  11. * furnished to do so, subject to the following conditions:
  12. *
  13. * The above copyright notice and this permission notice shall be included in
  14. * all copies or substantial portions of the Software.
  15. *
  16. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  21. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  22. * IN THE SOFTWARE.
  23. *
  24. * MQTT v5
  25. *
  26. * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
  27. *
  28. * Control Packet structure
  29. *
  30. * - Always: 2+ byte: Fixed Hdr
  31. * - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props
  32. * - Required in some: variable: Payload
  33. *
  34. * For CONNECT, the payload fields, if present, MUST be in the order [MQTT-3.1.3-1]
  35. *
  36. * - Client Identifier
  37. * - Will Properties
  38. * - Will Topic
  39. * - Will Payload
  40. * - User Name
  41. * - Password
  42. */
  43. #include "private-lib-core.h"
  44. #include <string.h>
  45. #include <sys/types.h>
  46. #include <assert.h>
  47. typedef enum {
  48. LMQPRS_AWAITING_CONNECT,
  49. } lws_mqtt_protocol_server_connstate_t;
  50. const char * const reason_names_g1[] = {
  51. "Success / Normal disconnection / QoS0",
  52. "QoS1",
  53. "QoS2",
  54. "Disconnect Will",
  55. "No matching subscriber",
  56. "No subscription existed",
  57. "Continue authentication",
  58. "Re-authenticate"
  59. };
  60. const char * const reason_names_g2[] = {
  61. "Unspecified error",
  62. "Malformed packet",
  63. "Protocol error",
  64. "Implementation specific error",
  65. "Unsupported protocol",
  66. "Client ID invalid",
  67. "Bad credentials",
  68. "Not Authorized",
  69. "Server Unavailable",
  70. "Server Busy",
  71. "Banned",
  72. "Server Shutting Down",
  73. "Bad Authentication Method",
  74. "Keepalive Timeout",
  75. "Session taken over",
  76. "Topic Filter Invalid",
  77. "Packet ID in use",
  78. "Packet ID not found",
  79. "Max RX Exceeded",
  80. "Topic Alias Invalid",
  81. "Packet too large",
  82. "Ratelimit",
  83. "Quota Exceeded",
  84. "Administrative Action",
  85. "Payload format invalid",
  86. "Retain not supported",
  87. "QoS not supported",
  88. "Use another server",
  89. "Server Moved",
  90. "Shared subscriptions not supported",
  91. "Connection rate exceeded",
  92. "Maximum Connect Time",
  93. "Subscription IDs not supported",
  94. "Wildcard subscriptions not supported"
  95. };
  96. #define LMQCP_WILL_PROPERTIES 0
  97. /* For each property, a bitmap describing which commands it is valid for */
  98. static const uint16_t property_valid[] = {
  99. [LMQPROP_PAYLOAD_FORMAT_INDICATOR] = (1 << LMQCP_PUBLISH) |
  100. (1 << LMQCP_WILL_PROPERTIES),
  101. [LMQPROP_MESSAGE_EXPIRY_INTERVAL] = (1 << LMQCP_PUBLISH) |
  102. (1 << LMQCP_WILL_PROPERTIES),
  103. [LMQPROP_CONTENT_TYPE] = (1 << LMQCP_PUBLISH) |
  104. (1 << LMQCP_WILL_PROPERTIES),
  105. [LMQPROP_RESPONSE_TOPIC] = (1 << LMQCP_PUBLISH) |
  106. (1 << LMQCP_WILL_PROPERTIES),
  107. [LMQPROP_CORRELATION_DATA] = (1 << LMQCP_PUBLISH) |
  108. (1 << LMQCP_WILL_PROPERTIES),
  109. [LMQPROP_SUBSCRIPTION_IDENTIFIER] = (1 << LMQCP_PUBLISH) |
  110. (1 << LMQCP_CTOS_SUBSCRIBE),
  111. [LMQPROP_SESSION_EXPIRY_INTERVAL] = (1 << LMQCP_CTOS_CONNECT) |
  112. (1 << LMQCP_STOC_CONNACK) |
  113. (1 << LMQCP_DISCONNECT),
  114. [LMQPROP_ASSIGNED_CLIENT_IDENTIFIER] = (1 << LMQCP_STOC_CONNACK),
  115. [LMQPROP_SERVER_KEEP_ALIVE] = (1 << LMQCP_STOC_CONNACK),
  116. [LMQPROP_AUTHENTICATION_METHOD] = (1 << LMQCP_CTOS_CONNECT) |
  117. (1 << LMQCP_STOC_CONNACK) |
  118. (1 << LMQCP_AUTH),
  119. [LMQPROP_AUTHENTICATION_DATA] = (1 << LMQCP_CTOS_CONNECT) |
  120. (1 << LMQCP_STOC_CONNACK) |
  121. (1 << LMQCP_AUTH),
  122. [LMQPROP_REQUEST_PROBLEM_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
  123. [LMQPROP_WILL_DELAY_INTERVAL] = (1 << LMQCP_WILL_PROPERTIES),
  124. [LMQPROP_REQUEST_RESPONSE_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
  125. [LMQPROP_RESPONSE_INFORMATION] = (1 << LMQCP_STOC_CONNACK),
  126. [LMQPROP_SERVER_REFERENCE] = (1 << LMQCP_STOC_CONNACK) |
  127. (1 << LMQCP_DISCONNECT),
  128. [LMQPROP_REASON_STRING] = (1 << LMQCP_STOC_CONNACK) |
  129. (1 << LMQCP_PUBACK) |
  130. (1 << LMQCP_PUBREC) |
  131. (1 << LMQCP_PUBREL) |
  132. (1 << LMQCP_PUBCOMP) |
  133. (1 << LMQCP_STOC_SUBACK) |
  134. (1 << LMQCP_STOC_UNSUBACK) |
  135. (1 << LMQCP_DISCONNECT) |
  136. (1 << LMQCP_AUTH),
  137. [LMQPROP_RECEIVE_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
  138. (1 << LMQCP_STOC_CONNACK),
  139. [LMQPROP_TOPIC_ALIAS_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
  140. (1 << LMQCP_STOC_CONNACK),
  141. [LMQPROP_TOPIC_ALIAS] = (1 << LMQCP_PUBLISH),
  142. [LMQPROP_MAXIMUM_QOS] = (1 << LMQCP_STOC_CONNACK),
  143. [LMQPROP_RETAIN_AVAILABLE] = (1 << LMQCP_STOC_CONNACK),
  144. [LMQPROP_USER_PROPERTY] = (1 << LMQCP_CTOS_CONNECT) |
  145. (1 << LMQCP_STOC_CONNACK) |
  146. (1 << LMQCP_PUBLISH) |
  147. (1 << LMQCP_WILL_PROPERTIES) |
  148. (1 << LMQCP_PUBACK) |
  149. (1 << LMQCP_PUBREC) |
  150. (1 << LMQCP_PUBREL) |
  151. (1 << LMQCP_PUBCOMP) |
  152. (1 << LMQCP_CTOS_SUBSCRIBE) |
  153. (1 << LMQCP_STOC_SUBACK) |
  154. (1 << LMQCP_CTOS_UNSUBSCRIBE) |
  155. (1 << LMQCP_STOC_UNSUBACK) |
  156. (1 << LMQCP_DISCONNECT) |
  157. (1 << LMQCP_AUTH),
  158. [LMQPROP_MAXIMUM_PACKET_SIZE] = (1 << LMQCP_CTOS_CONNECT) |
  159. (1 << LMQCP_STOC_CONNACK),
  160. [LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK),
  161. [LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL] = (1 << LMQCP_STOC_CONNACK),
  162. [LMQPROP_SHARED_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK)
  163. };
  164. /*
  165. * For each command index, maps flags, id, qos and payload legality
  166. * notice in most cases PUBLISH requires further processing
  167. */
  168. static const uint8_t map_flags[] = {
  169. [LMQCP_RESERVED] = 0x00,
  170. [LMQCP_CTOS_CONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  171. LMQCP_LUT_FLAG_PAYLOAD |
  172. LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
  173. [LMQCP_STOC_CONNACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  174. LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
  175. [LMQCP_PUBLISH] = LMQCP_LUT_FLAG_PAYLOAD | /* option */
  176. LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00,
  177. [LMQCP_PUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  178. LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
  179. [LMQCP_PUBREC] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  180. LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
  181. [LMQCP_PUBREL] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  182. LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
  183. [LMQCP_PUBCOMP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  184. LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
  185. [LMQCP_CTOS_SUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  186. LMQCP_LUT_FLAG_PAYLOAD |
  187. LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
  188. [LMQCP_STOC_SUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  189. LMQCP_LUT_FLAG_PAYLOAD |
  190. LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
  191. [LMQCP_CTOS_UNSUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  192. LMQCP_LUT_FLAG_PAYLOAD |
  193. LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
  194. [LMQCP_STOC_UNSUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  195. LMQCP_LUT_FLAG_PAYLOAD |
  196. LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
  197. [LMQCP_CTOS_PINGREQ] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  198. LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
  199. [LMQCP_STOC_PINGRESP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  200. LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
  201. [LMQCP_DISCONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  202. LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
  203. [LMQCP_AUTH] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
  204. LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
  205. };
  206. void
  207. lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
  208. {
  209. lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
  210. c->estate = s;
  211. }
  212. static int
  213. lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
  214. {
  215. par->consumed += consumed;
  216. if (par->consumed > par->props_len)
  217. return -1;
  218. /* more properties coming */
  219. if (par->consumed < par->props_len) {
  220. par->state = LMQCPP_PROP_ID_VBI;
  221. return 0;
  222. }
  223. /* properties finished: are we headed for payload or idle? */
  224. if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) &&
  225. /* A PUBLISH packet MUST NOT contain a Packet Identifier if
  226. * its QoS value is set to 0 [MQTT-2.2.1-2]. */
  227. (ctl_pkt_type(par) != LMQCP_PUBLISH ||
  228. (par->packet_type_flags & 6))) {
  229. par->state = LMQCPP_PAYLOAD;
  230. return 0;
  231. }
  232. par->state = LMQCPP_IDLE;
  233. return 0;
  234. }
  235. static int
  236. lws_mqtt_set_client_established(struct lws *wsi)
  237. {
  238. lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED,
  239. &role_ops_mqtt);
  240. if (user_callback_handle_rxflow(wsi->a.protocol->callback,
  241. wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED,
  242. wsi->user_space, NULL, 0) < 0) {
  243. lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__);
  244. return -1;
  245. }
  246. /*
  247. * If we made a new connection and got the ACK, our connection is
  248. * definitely working in both directions at the moment
  249. */
  250. lws_validity_confirmed(wsi);
  251. /* clear connection timeout */
  252. lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
  253. return 0;
  254. }
  255. lws_mqtt_subs_t *
  256. lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic)
  257. {
  258. lws_mqtt_subs_t *s = mqtt->subs_head;
  259. while (s) {
  260. if (!strcmp((const char *)s->topic, topic))
  261. return s;
  262. s = s->next;
  263. }
  264. return NULL;
  265. }
  266. static lws_mqtt_subs_t *
  267. lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
  268. {
  269. lws_mqtt_subs_t *mysub;
  270. mysub = lws_malloc(sizeof(*mysub) + strlen(topic) + 1, "sub");
  271. if (!mysub)
  272. return NULL;
  273. mysub->next = mqtt->subs_head;
  274. mqtt->subs_head = mysub;
  275. memcpy(mysub->topic, topic, strlen(topic) + 1);
  276. mysub->ref_count = 1;
  277. lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n",
  278. __func__, mysub, mqtt);
  279. return mysub;
  280. }
  281. static int
  282. lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
  283. {
  284. lws_mqtt_subs_t *s = mqtt->subs_head;
  285. lws_mqtt_subs_t *temp = NULL;
  286. lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n",
  287. __func__, mqtt);
  288. while (s && s->next) {
  289. if (s->next->ref_count == 0)
  290. break;
  291. s = s->next;
  292. }
  293. if (s && s->next) {
  294. temp = s->next;
  295. lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n",
  296. __func__, temp, mqtt);
  297. s->next = temp->next;
  298. lws_free(temp);
  299. return 0;
  300. }
  301. return 1;
  302. }
  303. int
  304. _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
  305. const uint8_t *buf, size_t len)
  306. {
  307. struct lws *w;
  308. int n;
  309. if (par->flag_pending_send_reason_close)
  310. return 0;
  311. /*
  312. * Stateful, fragmentation-immune parser
  313. *
  314. * Notice that len can always be 1 if under attack, even over tls if
  315. * the server is compromised or malicious.
  316. */
  317. while (len) {
  318. lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len);
  319. switch (par->state) {
  320. case LMQCPP_IDLE:
  321. par->packet_type_flags = *buf++;
  322. len--;
  323. #if defined(LWS_WITH_CLIENT)
  324. /*
  325. * The case where we sent the connect, but we received
  326. * something else before any CONNACK
  327. */
  328. if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK &&
  329. par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) {
  330. lwsl_notice("%s: server sent non-CONNACK\n",
  331. __func__);
  332. goto send_protocol_error_and_close;
  333. }
  334. #endif /* LWS_WITH_CLIENT */
  335. n = map_flags[par->packet_type_flags >> 4];
  336. /*
  337. * Where a flag bit is marked as “Reserved”, it is
  338. * reserved for future use and MUST be set to the value
  339. * listed [MQTT-2.1.3-1].
  340. */
  341. if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) &&
  342. ((par->packet_type_flags & 0x0f) != (n & 0x0f))) {
  343. lwsl_notice("%s: wsi %p: bad flags, 0x%02x mask 0x%02x (len %d)\n",
  344. __func__, wsi, par->packet_type_flags, n, (int)len + 1);
  345. lwsl_hexdump_err(buf - 1, len + 1);
  346. goto send_protocol_error_and_close;
  347. }
  348. lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n",
  349. __func__, par->packet_type_flags >> 4,
  350. par->packet_type_flags & 0xf);
  351. /* allows us to know if a property that can only be
  352. * given once, appears twice */
  353. memset(par->props_seen, 0, sizeof(par->props_seen));
  354. par->state = par->packet_type_flags & 0xf0;
  355. break;
  356. case LMQCPP_CONNECT_PACKET:
  357. lwsl_debug("%s: received CONNECT pkt\n", __func__);
  358. par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI;
  359. lws_mqtt_vbi_init(&par->vbit);
  360. break;
  361. case LMQCPP_CONNECT_REMAINING_LEN_VBI:
  362. switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
  363. case LMSPR_NEED_MORE:
  364. break;
  365. case LMSPR_COMPLETED:
  366. par->cpkt_remlen = par->vbit.value;
  367. n = map_flags[ctl_pkt_type(par)];
  368. lws_mqtt_str_init(&par->s_temp, par->temp,
  369. sizeof(par->temp), 0);
  370. par->state = LMQCPP_CONNECT_VH_PNAME;
  371. break;
  372. default:
  373. lwsl_notice("%s: bad vbi\n", __func__);
  374. goto send_protocol_error_and_close;
  375. }
  376. break;
  377. case LMQCPP_CONNECT_VH_PNAME:
  378. switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
  379. case LMSPR_NEED_MORE:
  380. break;
  381. case LMSPR_COMPLETED:
  382. if (par->s_temp.len != 4 ||
  383. memcmp(par->s_temp.buf, "MQTT",
  384. par->s_temp.len)) {
  385. lwsl_notice("%s: protocol name: %.*s\n",
  386. __func__, par->s_temp.len,
  387. par->s_temp.buf);
  388. goto send_unsupp_connack_and_close;
  389. }
  390. par->state = LMQCPP_CONNECT_VH_PVERSION;
  391. break;
  392. default:
  393. lwsl_notice("%s: bad protocol name\n", __func__);
  394. goto send_protocol_error_and_close;
  395. }
  396. break;
  397. case LMQCPP_CONNECT_VH_PVERSION:
  398. par->conn_protocol_version = *buf++;
  399. len--;
  400. if (par->conn_protocol_version != 5) {
  401. lwsl_info("%s: unsupported MQTT version %d\n",
  402. __func__, par->conn_protocol_version);
  403. goto send_unsupp_connack_and_close;
  404. }
  405. par->state = LMQCPP_CONNECT_VH_FLAGS;
  406. break;
  407. case LMQCPP_CONNECT_VH_FLAGS:
  408. par->cpkt_flags = *buf++;
  409. len--;
  410. if (par->cpkt_flags & 1) {
  411. /*
  412. * The Server MUST validate that the reserved
  413. * flag in the CONNECT packet is set to 0
  414. * [MQTT-3.1.2-3].
  415. */
  416. par->reason = LMQCP_REASON_MALFORMED_PACKET;
  417. goto send_reason_and_close;
  418. }
  419. /*
  420. * conn_flags specifies the Will Properties that should
  421. * appear in the payload section
  422. */
  423. lws_mqtt_2byte_init(&par->vbit);
  424. par->state = LMQCPP_CONNECT_VH_KEEPALIVE;
  425. break;
  426. case LMQCPP_CONNECT_VH_KEEPALIVE:
  427. switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
  428. case LMSPR_NEED_MORE:
  429. break;
  430. case LMSPR_COMPLETED:
  431. par->keepalive = (uint16_t)par->vbit.value;
  432. lws_mqtt_vbi_init(&par->vbit);
  433. par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN;
  434. break;
  435. default:
  436. lwsl_notice("%s: ka bad vbi\n", __func__);
  437. goto send_protocol_error_and_close;
  438. }
  439. break;
  440. case LMQCPP_PINGRESP_ZERO:
  441. len--;
  442. /* second byte of PINGRESP must be zero */
  443. if (*buf++)
  444. goto send_protocol_error_and_close;
  445. goto cmd_completion;
  446. case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN:
  447. switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
  448. case LMSPR_NEED_MORE:
  449. break;
  450. case LMSPR_COMPLETED:
  451. /* reset consumption counter */
  452. par->consumed = 0;
  453. par->props_len = par->vbit.value;
  454. lws_mqtt_vbi_init(&par->vbit);
  455. par->state = LMQCPP_PROP_ID_VBI;
  456. break;
  457. default:
  458. lwsl_notice("%s: connpr bad vbi\n", __func__);
  459. goto send_protocol_error_and_close;
  460. }
  461. break;
  462. case LMQCPP_PUBLISH_PACKET:
  463. if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
  464. lwsl_notice("%s: Topic rx before subscribing\n",
  465. __func__);
  466. goto send_protocol_error_and_close;
  467. }
  468. lwsl_info("%s: received PUBLISH pkt\n", __func__);
  469. par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI;
  470. lws_mqtt_vbi_init(&par->vbit);
  471. break;
  472. case LMQCPP_PUBLISH_REMAINING_LEN_VBI:
  473. switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
  474. case LMSPR_NEED_MORE:
  475. break;
  476. case LMSPR_COMPLETED:
  477. par->cpkt_remlen = par->vbit.value;
  478. lwsl_debug("%s: PUBLISH pkt len = %d\n",
  479. __func__, (int)par->cpkt_remlen);
  480. /* Move on to PUBLISH's variable header */
  481. par->state = LMQCPP_PUBLISH_VH_TOPIC;
  482. break;
  483. default:
  484. lwsl_notice("%s: pubrem bad vbi\n", __func__);
  485. goto send_protocol_error_and_close;
  486. }
  487. break;
  488. case LMQCPP_PUBLISH_VH_TOPIC:
  489. {
  490. lws_mqtt_publish_param_t *pub = NULL;
  491. if (len < 2) {
  492. lwsl_notice("%s: topic too short\n", __func__);
  493. return -1;
  494. }
  495. /* Topic len */
  496. par->n = lws_ser_ru16be(buf);
  497. buf += 2;
  498. len -= 2;
  499. if (len < par->n) {/* the way this is written... */
  500. lwsl_notice("%s: len breakage\n", __func__);
  501. return -1;
  502. }
  503. /* Invalid topic len */
  504. if (par->n == 0) {
  505. lwsl_notice("%s: zero topic len\n", __func__);
  506. par->reason = LMQCP_REASON_MALFORMED_PACKET;
  507. goto send_reason_and_close;
  508. }
  509. lwsl_debug("%s: PUBLISH topic len %d\n",
  510. __func__, (int)par->n);
  511. assert(!wsi->mqtt->rx_cpkt_param);
  512. wsi->mqtt->rx_cpkt_param = lws_zalloc(
  513. sizeof(lws_mqtt_publish_param_t), "rx pub param");
  514. if (!wsi->mqtt->rx_cpkt_param)
  515. goto oom;
  516. pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
  517. pub->topic_len = par->n;
  518. /* Topic Name */
  519. pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1,
  520. "rx publish topic");
  521. if (!pub->topic)
  522. goto oom;
  523. lws_strncpy(pub->topic, (const char *)buf,
  524. (size_t)pub->topic_len + 1);
  525. buf += pub->topic_len;
  526. len -= pub->topic_len;
  527. /* Extract QoS Level from Fixed Header Flags */
  528. pub->qos = (lws_mqtt_qos_levels_t)
  529. ((par->packet_type_flags >> 1) & 0x3);
  530. pub->payload_pos = 0;
  531. pub->payload_len = par->cpkt_remlen -
  532. (2 + pub->topic_len + ((pub->qos) ? 2 : 0));
  533. switch (pub->qos) {
  534. case QOS0:
  535. par->state = LMQCPP_PAYLOAD;
  536. if (pub->payload_len == 0)
  537. goto cmd_completion;
  538. break;
  539. case QOS1:
  540. case QOS2:
  541. par->state = LMQCPP_PUBLISH_VH_PKT_ID;
  542. break;
  543. default:
  544. par->reason = LMQCP_REASON_MALFORMED_PACKET;
  545. lws_free_set_NULL(pub->topic);
  546. lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
  547. goto send_reason_and_close;
  548. }
  549. break;
  550. }
  551. case LMQCPP_PUBLISH_VH_PKT_ID:
  552. {
  553. lws_mqtt_publish_param_t *pub =
  554. (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
  555. if (len < 2) {
  556. lwsl_notice("%s: len breakage 2\n", __func__);
  557. return -1;
  558. }
  559. par->cpkt_id = lws_ser_ru16be(buf);
  560. buf += 2;
  561. len -= 2;
  562. wsi->mqtt->ack_pkt_id = par->cpkt_id;
  563. lwsl_debug("%s: Packet ID %d\n",
  564. __func__, (int)par->cpkt_id);
  565. par->state = LMQCPP_PAYLOAD;
  566. pub->payload_pos = 0;
  567. pub->payload_len = par->cpkt_remlen -
  568. (2 + pub->topic_len + ((pub->qos) ? 2 : 0));
  569. if (pub->payload_len == 0)
  570. goto cmd_completion;
  571. break;
  572. }
  573. case LMQCPP_PAYLOAD:
  574. {
  575. lws_mqtt_publish_param_t *pub =
  576. (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
  577. if (pub == NULL) {
  578. lwsl_err("%s: Uninitialized pub_param\n",
  579. __func__);
  580. goto send_protocol_error_and_close;
  581. }
  582. pub->payload = buf;
  583. goto cmd_completion;
  584. }
  585. case LMQCPP_CONNACK_PACKET:
  586. if (!lwsi_role_client(wsi)) {
  587. lwsl_err("%s: CONNACK is only Server to Client",
  588. __func__);
  589. goto send_unsupp_connack_and_close;
  590. }
  591. lwsl_debug("%s: received CONNACK pkt\n", __func__);
  592. lws_mqtt_vbi_init(&par->vbit);
  593. switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
  594. case LMSPR_NEED_MORE:
  595. break;
  596. case LMSPR_COMPLETED:
  597. par->cpkt_remlen = par->vbit.value;
  598. lwsl_debug("%s: CONNACK pkt len = %d\n",
  599. __func__, (int)par->cpkt_remlen);
  600. if (par->cpkt_remlen != 2)
  601. goto send_protocol_error_and_close;
  602. par->state = LMQCPP_CONNACK_VH_FLAGS;
  603. break;
  604. default:
  605. lwsl_notice("%s: connack bad vbi\n", __func__);
  606. goto send_protocol_error_and_close;
  607. }
  608. break;
  609. case LMQCPP_CONNACK_VH_FLAGS:
  610. {
  611. lws_mqttc_t *c = &wsi->mqtt->client;
  612. par->cpkt_flags = *buf++;
  613. len--;
  614. if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) {
  615. /*
  616. * Byte 1 is the "Connect Acknowledge
  617. * Flags". Bits 7-1 are reserved and
  618. * MUST be set to 0.
  619. */
  620. par->reason = LMQCP_REASON_MALFORMED_PACKET;
  621. goto send_reason_and_close;
  622. }
  623. /*
  624. * If the Server accepts a connection with
  625. * CleanSession set to 1, the Server MUST set
  626. * Session Present to 0 in the CONNACK packet
  627. * in addition to setting a zero return code
  628. * in the CONNACK packet [MQTT-3.2.2-1]. If
  629. * the Server accepts a connection with
  630. * CleanSession set to 0, the value set in
  631. * Session Present depends on whether the
  632. * Server already has stored Session state for
  633. * the supplied client ID. If the Server has
  634. * stored Session state, it MUST set
  635. * SessionPresent to 1 in the CONNACK packet
  636. * [MQTT-3.2.2-2]. If the Server does not have
  637. * stored Session state, it MUST set Session
  638. * Present to 0 in the CONNACK packet. This is
  639. * in addition to setting a zero return code
  640. * in the CONNACK packet [MQTT-3.2.2-3].
  641. */
  642. if ((c->conn_flags & LMQCFT_CLEAN_START) &&
  643. (par->cpkt_flags & LMQCFT_SESSION_PRESENT))
  644. goto send_protocol_error_and_close;
  645. wsi->mqtt->session_resumed = (par->cpkt_flags &
  646. LMQCFT_SESSION_PRESENT);
  647. /* Move on to Connect Return Code */
  648. par->state = LMQCPP_CONNACK_VH_RETURN_CODE;
  649. break;
  650. }
  651. case LMQCPP_CONNACK_VH_RETURN_CODE:
  652. par->conn_rc = *buf++;
  653. len--;
  654. /*
  655. * If a server sends a CONNACK packet containing a
  656. * non-zero return code it MUST then close the Network
  657. * Connection [MQTT-3.2.2-5]
  658. */
  659. switch (par->conn_rc) {
  660. case 0:
  661. goto cmd_completion;
  662. case 1:
  663. case 2:
  664. case 3:
  665. case 4:
  666. case 5:
  667. par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL +
  668. par->conn_rc - 1;
  669. goto send_reason_and_close;
  670. default:
  671. lwsl_notice("%s: bad connack retcode\n", __func__);
  672. goto send_protocol_error_and_close;
  673. }
  674. break;
  675. /* SUBACK */
  676. case LMQCPP_SUBACK_PACKET:
  677. if (!lwsi_role_client(wsi)) {
  678. lwsl_err("%s: SUBACK is only Server to Client",
  679. __func__);
  680. goto send_unsupp_connack_and_close;
  681. }
  682. lwsl_debug("%s: received SUBACK pkt\n", __func__);
  683. lws_mqtt_vbi_init(&par->vbit);
  684. switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
  685. case LMSPR_NEED_MORE:
  686. break;
  687. case LMSPR_COMPLETED:
  688. par->cpkt_remlen = par->vbit.value;
  689. lwsl_debug("%s: SUBACK pkt len = %d\n",
  690. __func__, (int)par->cpkt_remlen);
  691. if (par->cpkt_remlen <= 2)
  692. goto send_protocol_error_and_close;
  693. par->state = LMQCPP_SUBACK_VH_PKT_ID;
  694. break;
  695. default:
  696. lwsl_notice("%s: suback bad vbi\n", __func__);
  697. goto send_protocol_error_and_close;
  698. }
  699. break;
  700. case LMQCPP_SUBACK_VH_PKT_ID:
  701. if (len < 2) {
  702. lwsl_notice("%s: len breakage 4\n", __func__);
  703. return -1;
  704. }
  705. par->cpkt_id = lws_ser_ru16be(buf);
  706. wsi->mqtt->ack_pkt_id = par->cpkt_id;
  707. buf += 2;
  708. len -= 2;
  709. par->cpkt_remlen -= 2;
  710. par->n = 0;
  711. par->state = LMQCPP_SUBACK_PAYLOAD;
  712. *par->temp = 0;
  713. break;
  714. case LMQCPP_SUBACK_PAYLOAD:
  715. {
  716. lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++;
  717. len--;
  718. switch (qos) {
  719. case QOS0:
  720. case QOS1:
  721. case QOS2:
  722. break;
  723. case FAILURE_QOS_LEVEL:
  724. goto send_protocol_error_and_close;
  725. default:
  726. par->reason = LMQCP_REASON_MALFORMED_PACKET;
  727. goto send_reason_and_close;
  728. }
  729. if (++(par->n) == par->cpkt_remlen) {
  730. par->n = 0;
  731. goto cmd_completion;
  732. }
  733. break;
  734. }
  735. /* UNSUBACK */
  736. case LMQCPP_UNSUBACK_PACKET:
  737. if (!lwsi_role_client(wsi)) {
  738. lwsl_err("%s: UNSUBACK is only Server to Client",
  739. __func__);
  740. goto send_unsupp_connack_and_close;
  741. }
  742. lwsl_debug("%s: received UNSUBACK pkt\n", __func__);
  743. lws_mqtt_vbi_init(&par->vbit);
  744. switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
  745. case LMSPR_NEED_MORE:
  746. break;
  747. case LMSPR_COMPLETED:
  748. par->cpkt_remlen = par->vbit.value;
  749. lwsl_debug("%s: UNSUBACK pkt len = %d\n",
  750. __func__, (int)par->cpkt_remlen);
  751. if (par->cpkt_remlen < 2)
  752. goto send_protocol_error_and_close;
  753. par->state = LMQCPP_UNSUBACK_VH_PKT_ID;
  754. break;
  755. default:
  756. lwsl_notice("%s: unsuback bad vbi\n", __func__);
  757. goto send_protocol_error_and_close;
  758. }
  759. break;
  760. case LMQCPP_UNSUBACK_VH_PKT_ID:
  761. if (len < 2) {
  762. lwsl_notice("%s: len breakage 3\n", __func__);
  763. return -1;
  764. }
  765. par->cpkt_id = lws_ser_ru16be(buf);
  766. wsi->mqtt->ack_pkt_id = par->cpkt_id;
  767. buf += 2;
  768. len -= 2;
  769. par->cpkt_remlen -= 2;
  770. par->n = 0;
  771. goto cmd_completion;
  772. case LMQCPP_PUBACK_PACKET:
  773. lws_mqtt_vbi_init(&par->vbit);
  774. switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
  775. case LMSPR_NEED_MORE:
  776. break;
  777. case LMSPR_COMPLETED:
  778. par->cpkt_remlen = par->vbit.value;
  779. lwsl_info("%s: PUBACK pkt len = %d\n", __func__,
  780. (int)par->cpkt_remlen);
  781. /*
  782. * must be 4 or more, with special case that 2
  783. * means success with no reason code or props
  784. */
  785. if (par->cpkt_remlen <= 1 ||
  786. par->cpkt_remlen == 3)
  787. goto send_protocol_error_and_close;
  788. par->state = LMQCPP_PUBACK_VH_PKT_ID;
  789. par->fixed_seen[2] = par->fixed_seen[3] = 0;
  790. par->fixed = 0;
  791. par->n = 0;
  792. break;
  793. default:
  794. lwsl_notice("%s: puback bad vbi\n", __func__);
  795. goto send_protocol_error_and_close;
  796. }
  797. break;
  798. case LMQCPP_PUBACK_VH_PKT_ID:
  799. /*
  800. * There are 3 fixed bytes and then a VBI for the
  801. * property section length
  802. */
  803. par->fixed_seen[par->fixed++] = *buf++;
  804. if (len < par->cpkt_remlen - par->n) {
  805. lwsl_notice("%s: len breakage 4\n", __func__);
  806. return -1;
  807. }
  808. len--;
  809. par->n++;
  810. if (par->fixed == 2)
  811. par->cpkt_id = lws_ser_ru16be(par->fixed_seen);
  812. if (par->fixed == 3) {
  813. lws_mqtt_vbi_init(&par->vbit);
  814. par->props_consumed = 0;
  815. par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI;
  816. }
  817. /* length of 2 is truncated packet and we completed it */
  818. if (par->cpkt_remlen == par->fixed)
  819. goto cmd_completion;
  820. break;
  821. case LMQCPP_PUBACK_PROPERTIES_LEN_VBI:
  822. switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
  823. case LMSPR_NEED_MORE:
  824. break;
  825. case LMSPR_COMPLETED:
  826. par->props_len = par->vbit.value;
  827. lwsl_info("%s: PUBACK props len = %d\n",
  828. __func__, (int)par->cpkt_remlen);
  829. /*
  830. * If there are no properties, this is a
  831. * command completion event in itself
  832. */
  833. if (!par->props_len)
  834. goto cmd_completion;
  835. /*
  836. * Otherwise consume the properties before
  837. * completing the command
  838. */
  839. lws_mqtt_vbi_init(&par->vbit);
  840. par->state = LMQCPP_PUBACK_VH_PKT_ID;
  841. break;
  842. default:
  843. lwsl_notice("%s: puback pr bad vbi\n", __func__);
  844. goto send_protocol_error_and_close;
  845. }
  846. break;
  847. case LMQCPP_EAT_PROPERTIES_AND_COMPLETE:
  848. /*
  849. * TODO: stash the props
  850. */
  851. par->props_consumed++;
  852. len--;
  853. buf++;
  854. if (par->props_len != par->props_consumed)
  855. break;
  856. cmd_completion:
  857. /*
  858. * We come here when we understood we just processed
  859. * the last byte of a command packet, regardless of the
  860. * packet type
  861. */
  862. par->state = LMQCPP_IDLE;
  863. switch (par->packet_type_flags >> 4) {
  864. case LMQCP_STOC_CONNACK:
  865. lwsl_info("%s: cmd_completion: CONNACK\n",
  866. __func__);
  867. /*
  868. * Getting the CONNACK means we are the first,
  869. * the nwsi, and we succeeded to create a new
  870. * network connection ourselves.
  871. *
  872. * Since others may join us sharing the nwsi,
  873. * and we may close while they still want to use
  874. * it, our wsi lifecycle alone can no longer
  875. * define the lifecycle of the nwsi... it means
  876. * we need to do a "magic trick" and instead of
  877. * being both the nwsi and act like a child
  878. * stream, create a new wsi to take over the
  879. * nwsi duties and turn our wsi into a child of
  880. * the nwsi with its own lifecycle.
  881. *
  882. * The nwsi gets a mostly empty wsi->nwsi used
  883. * to track already-subscribed topics globally
  884. * for the connection.
  885. */
  886. /* we were under SENT_CLIENT_HANDSHAKE timeout */
  887. lws_set_timeout(wsi, 0, 0);
  888. w = lws_create_new_server_wsi(wsi->a.vhost,
  889. wsi->tsi);
  890. if (!w) {
  891. lwsl_notice("%s: sid 1 migrate failed\n",
  892. __func__);
  893. return -1;
  894. }
  895. wsi->mux.highest_sid = 1;
  896. lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++);
  897. wsi->mux_substream = 1;
  898. w->mux_substream = 1;
  899. w->client_mux_substream = 1;
  900. wsi->client_mux_migrated = 1;
  901. wsi->told_user_closed = 1; /* don't tell nwsi closed */
  902. lwsi_set_state(w, LRS_ESTABLISHED);
  903. lwsi_set_state(wsi, LRS_ESTABLISHED);
  904. lwsi_set_role(w, lwsi_role(wsi));
  905. #if defined(LWS_WITH_CLIENT)
  906. w->flags = wsi->flags;
  907. #endif
  908. w->mqtt = wsi->mqtt;
  909. wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt");
  910. if (!wsi->mqtt)
  911. return -1;
  912. w->mqtt->wsi = w;
  913. w->a.protocol = wsi->a.protocol;
  914. if (w->user_space &&
  915. !w->user_space_externally_allocated)
  916. lws_free_set_NULL(w->user_space);
  917. w->user_space = wsi->user_space;
  918. wsi->user_space = NULL;
  919. w->user_space_externally_allocated =
  920. wsi->user_space_externally_allocated;
  921. if (lws_ensure_user_space(w))
  922. goto bail1;
  923. w->a.opaque_user_data = wsi->a.opaque_user_data;
  924. wsi->a.opaque_user_data = NULL;
  925. w->stash = wsi->stash;
  926. wsi->stash = NULL;
  927. lws_mux_mark_immortal(w);
  928. lwsl_notice("%s: migrated nwsi %p to sid 1 %p\n",
  929. __func__, wsi, w);
  930. #if defined(LWS_WITH_SERVER_STATUS)
  931. wsi->a.vhost->conn_stats.h2_subs++;
  932. #endif
  933. /*
  934. * It was the last thing we were waiting for
  935. * before we can be fully ESTABLISHED
  936. */
  937. if (lws_mqtt_set_client_established(w)) {
  938. lwsl_notice("%s: set EST fail\n", __func__);
  939. return -1;
  940. }
  941. /* get the ball rolling */
  942. lws_validity_confirmed(wsi);
  943. /* well, add the queued guys as children */
  944. lws_wsi_mux_apply_queue(wsi);
  945. break;
  946. bail1:
  947. /* undo the insert */
  948. wsi->mux.child_list = w->mux.sibling_list;
  949. wsi->mux.child_count--;
  950. w->a.context->count_wsi_allocated--;
  951. if (w->user_space)
  952. lws_free_set_NULL(w->user_space);
  953. w->a.vhost->protocols[0].callback(w,
  954. LWS_CALLBACK_WSI_DESTROY,
  955. NULL, NULL, 0);
  956. lws_vhost_unbind_wsi(w);
  957. lws_free(w);
  958. return 0;
  959. case LMQCP_PUBACK:
  960. lwsl_info("%s: cmd_completion: PUBACK\n",
  961. __func__);
  962. /*
  963. * Figure out which child asked for this
  964. */
  965. n = 0;
  966. lws_start_foreach_ll(struct lws *, w,
  967. wsi->mux.child_list) {
  968. if (w->mqtt->unacked_publish &&
  969. w->mqtt->ack_pkt_id == par->cpkt_id) {
  970. char requested_close = 0;
  971. w->mqtt->unacked_publish = 0;
  972. if (user_callback_handle_rxflow(
  973. w->a.protocol->callback,
  974. w, LWS_CALLBACK_MQTT_ACK,
  975. w->user_space, NULL, 0) < 0) {
  976. lwsl_info("%s: MQTT_ACK requests close\n",
  977. __func__);
  978. requested_close = 1;
  979. }
  980. n = 1;
  981. /*
  982. * We got an assertive PUBACK,
  983. * no need for ACK timeout wait
  984. * any more
  985. */
  986. lws_sul_cancel(&w->mqtt->sul_qos1_puback_wait);
  987. if (requested_close) {
  988. __lws_close_free_wsi(w,
  989. 0, "ack cb");
  990. break;
  991. }
  992. break;
  993. }
  994. } lws_end_foreach_ll(w, mux.sibling_list);
  995. if (!n) {
  996. lwsl_err("%s: unsolicited PUBACK\n",
  997. __func__);
  998. return -1;
  999. }
  1000. /*
  1001. * If we published something and it was acked,
  1002. * our connection is definitely working in both
  1003. * directions at the moment.
  1004. */
  1005. lws_validity_confirmed(wsi);
  1006. break;
  1007. case LMQCP_STOC_PINGRESP:
  1008. lwsl_info("%s: cmd_completion: PINGRESP\n",
  1009. __func__);
  1010. /*
  1011. * If we asked for a PINGRESP and it came,
  1012. * our connection is definitely working in both
  1013. * directions at the moment.
  1014. */
  1015. lws_validity_confirmed(wsi);
  1016. break;
  1017. case LMQCP_STOC_SUBACK:
  1018. lwsl_info("%s: cmd_completion: SUBACK\n",
  1019. __func__);
  1020. /*
  1021. * Figure out which child asked for this
  1022. */
  1023. n = 0;
  1024. lws_start_foreach_ll(struct lws *, w,
  1025. wsi->mux.child_list) {
  1026. if (w->mqtt->inside_subscribe &&
  1027. w->mqtt->ack_pkt_id == par->cpkt_id) {
  1028. w->mqtt->inside_subscribe = 0;
  1029. if (user_callback_handle_rxflow(
  1030. w->a.protocol->callback,
  1031. w, LWS_CALLBACK_MQTT_SUBSCRIBED,
  1032. w->user_space, NULL, 0) < 0) {
  1033. lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
  1034. __func__);
  1035. return -1;
  1036. }
  1037. n = 1;
  1038. break;
  1039. }
  1040. } lws_end_foreach_ll(w, mux.sibling_list);
  1041. if (!n) {
  1042. lwsl_err("%s: unsolicited SUBACK\n",
  1043. __func__);
  1044. return -1;
  1045. }
  1046. /*
  1047. * If we subscribed to something and SUBACK came,
  1048. * our connection is definitely working in both
  1049. * directions at the moment.
  1050. */
  1051. lws_validity_confirmed(wsi);
  1052. break;
  1053. case LMQCP_STOC_UNSUBACK:
  1054. {
  1055. char requested_close = 0;
  1056. lwsl_info("%s: cmd_completion: UNSUBACK\n",
  1057. __func__);
  1058. /*
  1059. * Figure out which child asked for this
  1060. */
  1061. n = 0;
  1062. lws_start_foreach_ll(struct lws *, w,
  1063. wsi->mux.child_list) {
  1064. if (w->mqtt->inside_unsubscribe &&
  1065. w->mqtt->ack_pkt_id == par->cpkt_id) {
  1066. struct lws *nwsi = lws_get_network_wsi(w);
  1067. /*
  1068. * No more subscribers left,
  1069. * remove the topic from nwsi
  1070. */
  1071. lws_mqtt_client_remove_subs(nwsi->mqtt);
  1072. w->mqtt->inside_unsubscribe = 0;
  1073. if (user_callback_handle_rxflow(
  1074. w->a.protocol->callback,
  1075. w, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
  1076. w->user_space, NULL, 0) < 0) {
  1077. lwsl_info("%s: MQTT_UNSUBACK requests close\n",
  1078. __func__);
  1079. requested_close = 1;
  1080. }
  1081. n = 1;
  1082. if (requested_close) {
  1083. __lws_close_free_wsi(w,
  1084. 0, "unsub ack cb");
  1085. break;
  1086. }
  1087. break;
  1088. }
  1089. } lws_end_foreach_ll(w, mux.sibling_list);
  1090. if (!n) {
  1091. lwsl_err("%s: unsolicited UNSUBACK\n",
  1092. __func__);
  1093. return -1;
  1094. }
  1095. /*
  1096. * If we unsubscribed to something and
  1097. * UNSUBACK came, our connection is
  1098. * definitely working in both
  1099. * directions at the moment.
  1100. */
  1101. lws_validity_confirmed(wsi);
  1102. break;
  1103. }
  1104. case LMQCP_PUBLISH:
  1105. {
  1106. lws_mqtt_publish_param_t *pub =
  1107. (lws_mqtt_publish_param_t *)
  1108. wsi->mqtt->rx_cpkt_param;
  1109. size_t chunk;
  1110. if (pub == NULL) {
  1111. lwsl_notice("%s: no pub\n", __func__);
  1112. return -1;
  1113. }
  1114. /*
  1115. * RX PUBLISH is delivered to any children that
  1116. * registered for the related topic
  1117. */
  1118. n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)];
  1119. chunk = pub->payload_len - pub->payload_pos;
  1120. if (chunk > len)
  1121. chunk = len;
  1122. lws_start_foreach_ll(struct lws *, w,
  1123. wsi->mux.child_list) {
  1124. if (lws_mqtt_find_sub(w->mqtt,
  1125. pub->topic))
  1126. if (w->a.protocol->callback(
  1127. w, n,
  1128. w->user_space,
  1129. (void *)pub,
  1130. chunk))
  1131. return 1;
  1132. } lws_end_foreach_ll(w, mux.sibling_list);
  1133. pub->payload_pos += (uint32_t)chunk;
  1134. len -= chunk;
  1135. buf += chunk;
  1136. lwsl_debug("%s: post pos %d, plen %d, len %d\n",
  1137. __func__, (int)pub->payload_pos,
  1138. (int)pub->payload_len, (int)len);
  1139. if (pub->payload_pos != pub->payload_len) {
  1140. /*
  1141. * More chunks of the payload pending,
  1142. * blocking this connection from doing
  1143. * anything else
  1144. */
  1145. par->state = LMQCPP_PAYLOAD;
  1146. break;
  1147. }
  1148. /* For QOS>0, send out PUBACK */
  1149. if (pub->qos) {
  1150. wsi->mqtt->send_puback = 1;
  1151. lws_callback_on_writable(wsi);
  1152. }
  1153. par->payload_consumed = 0;
  1154. lws_free_set_NULL(pub->topic);
  1155. lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
  1156. break;
  1157. }
  1158. default:
  1159. break;
  1160. }
  1161. break;
  1162. case LMQCPP_PROP_ID_VBI:
  1163. switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
  1164. case LMSPR_NEED_MORE:
  1165. break;
  1166. case LMSPR_COMPLETED:
  1167. par->consumed += par->vbit.consumed;
  1168. if (par->vbit.value >
  1169. LWS_ARRAY_SIZE(property_valid)) {
  1170. lwsl_notice("%s: undef prop id 0x%x\n",
  1171. __func__, (int)par->vbit.value);
  1172. goto send_protocol_error_and_close;
  1173. }
  1174. if (!(property_valid[par->vbit.value] &
  1175. (1 << ctl_pkt_type(par)))) {
  1176. lwsl_notice("%s: prop id 0x%x invalid for"
  1177. " control pkt %d\n", __func__,
  1178. (int)par->vbit.value,
  1179. ctl_pkt_type(par));
  1180. goto send_protocol_error_and_close;
  1181. }
  1182. par->prop_id = par->vbit.value;
  1183. par->flag_prop_multi =
  1184. par->props_seen[par->prop_id >> 3] &
  1185. (1 << (par->prop_id & 7));
  1186. par->props_seen[par->prop_id >> 3] |=
  1187. (1 << (par->prop_id & 7));
  1188. /*
  1189. * even if it's not a vbi property arg,
  1190. * .consumed of this will be zero the first time
  1191. */
  1192. lws_mqtt_vbi_init(&par->vbit);
  1193. /*
  1194. * if it's a string, next state must set the
  1195. * destination and size limit itself. But
  1196. * resetting it generically here lets it use
  1197. * lws_mqtt_str_first() to understand it's the
  1198. * first time around.
  1199. */
  1200. lws_mqtt_str_init(&par->s_temp, NULL, 0, 0);
  1201. /* property arg state enums are so encoded */
  1202. par->state = 0x100 | par->vbit.value;
  1203. break;
  1204. default:
  1205. lwsl_notice("%s: prop id bad vbi\n", __func__);
  1206. goto send_protocol_error_and_close;
  1207. }
  1208. break;
  1209. /*
  1210. * All possible property payloads... restricting which ones
  1211. * can appear in which control packets is already done above
  1212. * in LMQCPP_PROP_ID_VBI
  1213. */
  1214. case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE:
  1215. case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE:
  1216. case LMQCPP_PROP_MAXIMUM_QOS_1BYTE:
  1217. case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE:
  1218. case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE:
  1219. case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE:
  1220. case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE:
  1221. case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */
  1222. if (par->flag_prop_multi)
  1223. goto singular_prop_seen_twice;
  1224. par->payload_format = *buf++;
  1225. len--;
  1226. if (lws_mqtt_pconsume(par, 1))
  1227. goto send_protocol_error_and_close;
  1228. break;
  1229. case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE:
  1230. case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE:
  1231. case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE:
  1232. case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE:
  1233. if (par->flag_prop_multi)
  1234. goto singular_prop_seen_twice;
  1235. if (lws_mqtt_mb_first(&par->vbit))
  1236. lws_mqtt_4byte_init(&par->vbit);
  1237. switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
  1238. case LMSPR_NEED_MORE:
  1239. break;
  1240. case LMSPR_COMPLETED:
  1241. if (lws_mqtt_pconsume(par, par->vbit.consumed))
  1242. goto send_protocol_error_and_close;
  1243. break;
  1244. default:
  1245. goto send_protocol_error_and_close;
  1246. }
  1247. break;
  1248. case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE:
  1249. case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE:
  1250. case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE:
  1251. case LMQCPP_PROP_TOPIC_ALIAS_2BYTE:
  1252. if (par->flag_prop_multi)
  1253. goto singular_prop_seen_twice;
  1254. if (lws_mqtt_mb_first(&par->vbit))
  1255. lws_mqtt_2byte_init(&par->vbit);
  1256. switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
  1257. case LMSPR_NEED_MORE:
  1258. break;
  1259. case LMSPR_COMPLETED:
  1260. if (lws_mqtt_pconsume(par, par->vbit.consumed))
  1261. goto send_protocol_error_and_close;
  1262. break;
  1263. default:
  1264. goto send_protocol_error_and_close;
  1265. }
  1266. break;
  1267. case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S:
  1268. case LMQCPP_PROP_AUTH_METHOD_UTF8S:
  1269. case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S:
  1270. case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S:
  1271. case LMQCPP_PROP_RESPONSE_INFO_UTF8S:
  1272. case LMQCPP_PROP_SERVER_REFERENCE_UTF8S:
  1273. case LMQCPP_PROP_REASON_STRING_UTF8S:
  1274. case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S:
  1275. case LMQCPP_PROP_CONTENT_TYPE_UTF8S:
  1276. if (par->flag_prop_multi)
  1277. goto singular_prop_seen_twice;
  1278. if (lws_mqtt_str_first(&par->s_temp))
  1279. lws_mqtt_str_init(&par->s_temp, par->temp,
  1280. sizeof(par->temp), 0);
  1281. switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
  1282. case LMSPR_NEED_MORE:
  1283. break;
  1284. case LMSPR_COMPLETED:
  1285. if (lws_mqtt_pconsume(par, par->s_temp.len))
  1286. goto send_protocol_error_and_close;
  1287. break;
  1288. default:
  1289. lwsl_info("%s: bad protocol name\n", __func__);
  1290. goto send_protocol_error_and_close;
  1291. }
  1292. break;
  1293. case LMQCPP_PROP_SUBSCRIPTION_ID_VBI:
  1294. case LMQCPP_PROP_CORRELATION_BINDATA:
  1295. case LMQCPP_PROP_AUTH_DATA_BINDATA:
  1296. /* TODO */
  1297. lwsl_err("%s: Unimplemented packet state 0x%x\n",
  1298. __func__, par->state);
  1299. return -1;
  1300. }
  1301. }
  1302. return 0;
  1303. oom:
  1304. lwsl_err("%s: OOM!\n", __func__);
  1305. goto send_protocol_error_and_close;
  1306. singular_prop_seen_twice:
  1307. lwsl_info("%s: property appears twice\n", __func__);
  1308. send_protocol_error_and_close:
  1309. lwsl_notice("%s: peac\n", __func__);
  1310. par->reason = LMQCP_REASON_PROTOCOL_ERROR;
  1311. send_reason_and_close:
  1312. lwsl_notice("%s: srac\n", __func__);
  1313. par->flag_pending_send_reason_close = 1;
  1314. goto ask;
  1315. send_unsupp_connack_and_close:
  1316. lwsl_notice("%s: unsupac\n", __func__);
  1317. par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL;
  1318. par->flag_pending_send_connack_close = 1;
  1319. ask:
  1320. /* Should we ask for clients? */
  1321. lws_callback_on_writable(wsi);
  1322. return -1;
  1323. }
  1324. int
  1325. lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
  1326. uint8_t dup, lws_mqtt_qos_levels_t qos,
  1327. uint8_t retain)
  1328. {
  1329. lws_mqtt_fixed_hdr_t hdr;
  1330. hdr.bits = 0;
  1331. hdr.flags.ctrl_pkt_type = (uint8_t) ctrl_pkt_type;
  1332. switch(ctrl_pkt_type) {
  1333. case LMQCP_PUBLISH:
  1334. hdr.flags.dup = !!dup;
  1335. /*
  1336. * A PUBLISH Packet MUST NOT have both QoS bits set to
  1337. * 1. If a Server or Client receives a PUBLISH Packet
  1338. * which has both QoS bits set to 1 it MUST close the
  1339. * Network Connection [MQTT-3.3.1-4].
  1340. */
  1341. if (qos >= RESERVED_QOS_LEVEL) {
  1342. lwsl_err("%s: Unsupport QoS level 0x%x\n",
  1343. __func__, qos);
  1344. return -1;
  1345. }
  1346. hdr.flags.qos = (uint8_t)qos;
  1347. hdr.flags.retain = !!retain;
  1348. break;
  1349. case LMQCP_CTOS_CONNECT:
  1350. case LMQCP_STOC_CONNACK:
  1351. case LMQCP_PUBACK:
  1352. case LMQCP_PUBREC:
  1353. case LMQCP_PUBCOMP:
  1354. case LMQCP_STOC_SUBACK:
  1355. case LMQCP_STOC_UNSUBACK:
  1356. case LMQCP_CTOS_PINGREQ:
  1357. case LMQCP_STOC_PINGRESP:
  1358. case LMQCP_DISCONNECT:
  1359. case LMQCP_AUTH:
  1360. hdr.bits &= 0xf0;
  1361. break;
  1362. /*
  1363. * Bits 3,2,1 and 0 of the fixed header of the PUBREL,
  1364. * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and
  1365. * MUST be set to 0,0,1 and 0 respectively. The Server MUST
  1366. * treat any other value as malformed and close the Network
  1367. * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1].
  1368. */
  1369. case LMQCP_PUBREL:
  1370. case LMQCP_CTOS_SUBSCRIBE:
  1371. case LMQCP_CTOS_UNSUBSCRIBE:
  1372. hdr.bits |= 0x02;
  1373. break;
  1374. default:
  1375. return -1;
  1376. }
  1377. *p = hdr.bits;
  1378. return 0;
  1379. }
  1380. /*
  1381. * This fires if the wsi did a PUBLISH under QoS1, but no PUBACK came before
  1382. * the timeout period
  1383. */
  1384. static void
  1385. lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
  1386. {
  1387. struct _lws_mqtt_related *mqtt = lws_container_of(sul,
  1388. struct _lws_mqtt_related, sul_qos1_puback_wait);
  1389. lwsl_notice("%s: wsi %p\n", __func__, mqtt->wsi);
  1390. if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
  1391. mqtt->wsi->user_space, NULL, 0))
  1392. lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
  1393. }
  1394. int
  1395. lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
  1396. const void *buf, uint32_t len, int is_complete)
  1397. {
  1398. struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
  1399. uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p;
  1400. struct lws *nwsi = lws_get_network_wsi(wsi);
  1401. lws_mqtt_str_t mqtt_vh_payload;
  1402. uint32_t vh_len, rem_len;
  1403. assert(pub->topic);
  1404. lwsl_debug("%s: len = %d, is_complete = %d\n",
  1405. __func__, (int)len, (int)is_complete);
  1406. if (lwsi_state(wsi) != LRS_ESTABLISHED) {
  1407. lwsl_err("%s: wsi %p: unknown state 0x%x\n", __func__, wsi,
  1408. lwsi_state(wsi));
  1409. assert(0);
  1410. return 1;
  1411. }
  1412. if (wsi->mqtt->inside_payload) {
  1413. /*
  1414. * Headers are filled, we are sending
  1415. * the payload - a buffer with LWS_PRE
  1416. * in front it.
  1417. */
  1418. start = (uint8_t *)buf;
  1419. p = start + len;
  1420. if (is_complete)
  1421. wsi->mqtt->inside_payload = 0;
  1422. goto do_write;
  1423. }
  1424. start = b + LWS_PRE;
  1425. p = start;
  1426. /*
  1427. * Fill headers and the first chunk of the
  1428. * payload (if any)
  1429. */
  1430. if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
  1431. 0, pub->qos, 0)) {
  1432. lwsl_err("%s: Failed to fill fixed header\n", __func__);
  1433. return 1;
  1434. }
  1435. /*
  1436. * Topic len field + Topic len + Packet ID
  1437. * (for QOS>0) + Payload len
  1438. */
  1439. vh_len = 2 + pub->topic_len + ((pub->qos) ? 2 : 0);
  1440. rem_len = vh_len + pub->payload_len;
  1441. lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len);
  1442. /* Will the chunk of payload fit? */
  1443. if ((vh_len + len) >=
  1444. (wsi->a.context->pt_serv_buf_size - LWS_PRE)) {
  1445. lwsl_err("%s: Payload is too big\n", __func__);
  1446. return 1;
  1447. }
  1448. p += lws_mqtt_vbi_encode(rem_len, p);
  1449. /* Topic's Len */
  1450. lws_ser_wu16be(p, pub->topic_len);
  1451. p += 2;
  1452. /*
  1453. * Init lws_mqtt_str for "MQTT Variable
  1454. * Headers + payload" (only the supplied
  1455. * chuncked payload)
  1456. */
  1457. lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p,
  1458. (pub->topic_len + ((pub->qos) ? 2 : 0) + len),
  1459. 0);
  1460. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1461. lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1);
  1462. if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) {
  1463. lwsl_err("%s: a\n", __func__);
  1464. return 1;
  1465. }
  1466. /* Packet ID */
  1467. if (pub->qos != QOS0) {
  1468. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1469. wsi->mqtt->ack_pkt_id = pub->packet_id = ++nwsi->mqtt->pkt_id;
  1470. lwsl_debug("%s: pkt_id = %d\n", __func__,
  1471. (int)wsi->mqtt->ack_pkt_id);
  1472. lws_ser_wu16be(p, pub->packet_id);
  1473. if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) {
  1474. lwsl_err("%s: b\n", __func__);
  1475. return 1;
  1476. }
  1477. }
  1478. /*
  1479. * A non-empty Payload is expected and a chunk
  1480. * is present
  1481. */
  1482. if (pub->payload_len && len) {
  1483. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1484. memcpy(p, buf, len);
  1485. if (lws_mqtt_str_advance(&mqtt_vh_payload, len))
  1486. return 1;
  1487. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1488. }
  1489. if (!is_complete)
  1490. nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
  1491. do_write:
  1492. // lwsl_hexdump_err(start, lws_ptr_diff(p, start));
  1493. if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
  1494. lws_ptr_diff(p, start)) {
  1495. lwsl_err("%s: write failed\n", __func__);
  1496. return 1;
  1497. }
  1498. if (!is_complete) {
  1499. /* still some more chunks to come... */
  1500. lws_callback_on_writable(wsi);
  1501. return 0;
  1502. }
  1503. wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0;
  1504. if (pub->qos != QOS0)
  1505. wsi->mqtt->unacked_publish = 1;
  1506. /* this was the last part of the publish message */
  1507. if (pub->qos == QOS0) {
  1508. /*
  1509. * There won't be any real PUBACK, act like we got one
  1510. * so the user callback logic is the same for QoS0 or
  1511. * QoS1
  1512. */
  1513. if (wsi->a.protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK,
  1514. wsi->user_space, NULL, 0)) {
  1515. lwsl_err("%s: ACK callback exited\n", __func__);
  1516. return 1;
  1517. }
  1518. return 0;
  1519. }
  1520. /* For QoS1, if no PUBACK coming after 3s, we must RETRY the publish */
  1521. wsi->mqtt->sul_qos1_puback_wait.cb = lws_mqtt_publish_resend;
  1522. __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
  1523. &wsi->mqtt->sul_qos1_puback_wait,
  1524. 3 * LWS_USEC_PER_SEC);
  1525. return 0;
  1526. }
  1527. int
  1528. lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
  1529. {
  1530. struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
  1531. uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
  1532. struct lws *nwsi = lws_get_network_wsi(wsi);
  1533. lws_mqtt_str_t mqtt_vh_payload;
  1534. uint8_t exists[8], extant;
  1535. lws_mqtt_subs_t *mysub;
  1536. uint32_t rem_len;
  1537. #if defined(_DEBUG)
  1538. uint32_t tops;
  1539. #endif
  1540. uint32_t n;
  1541. assert(sub->num_topics);
  1542. assert(sub->num_topics < sizeof(exists));
  1543. switch (lwsi_state(wsi)) {
  1544. case LRS_ESTABLISHED: /* Protocol connection established */
  1545. if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE,
  1546. 0, 0, 0)) {
  1547. lwsl_err("%s: Failed to fill fixed header\n", __func__);
  1548. return 1;
  1549. }
  1550. /*
  1551. * The stream wants to subscribe to one or more topic, but
  1552. * the shared nwsi may already be subscribed to some or all of
  1553. * them from interactions with other streams. For those cases,
  1554. * we filter them from the list the child wants until we just
  1555. * have ones that are new to the nwsi. If nothing left, we just
  1556. * synthesize the callback to the child as if SUBACK had come
  1557. * and we're done, otherwise just ask the server for topics that
  1558. * are new to the wsi.
  1559. */
  1560. extant = 0;
  1561. memset(&exists, 0, sizeof(exists));
  1562. for (n = 0; n < sub->num_topics; n++) {
  1563. lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n",
  1564. __func__, (int)n, sub->topic[n].name);
  1565. mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name);
  1566. if (mysub && mysub->ref_count) {
  1567. mysub->ref_count++; /* another stream using it */
  1568. exists[n] = 1;
  1569. extant++;
  1570. }
  1571. /*
  1572. * Attach the topic we're subscribing to, to wsi->mqtt
  1573. */
  1574. if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) {
  1575. lwsl_err("%s: create sub fail\n", __func__);
  1576. return 1;
  1577. }
  1578. }
  1579. if (extant == sub->num_topics) {
  1580. /*
  1581. * It turns out there's nothing to do here, the nwsi has
  1582. * already subscribed to all the topics this stream
  1583. * wanted. Just tell it it can have them.
  1584. */
  1585. lwsl_notice("%s: all topics already subscribed\n", __func__);
  1586. if (user_callback_handle_rxflow(
  1587. wsi->a.protocol->callback,
  1588. wsi, LWS_CALLBACK_MQTT_SUBSCRIBED,
  1589. wsi->user_space, NULL, 0) < 0) {
  1590. lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
  1591. __func__);
  1592. return -1;
  1593. }
  1594. return 0;
  1595. }
  1596. #if defined(_DEBUG)
  1597. /*
  1598. * zero or more of the topics already existed, but not all,
  1599. * so we must go to the server with a filtered list of the
  1600. * new ones only
  1601. */
  1602. tops = sub->num_topics - extant;
  1603. #endif
  1604. /*
  1605. * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics
  1606. */
  1607. rem_len = 2;
  1608. for (n = 0; n < sub->num_topics; n++)
  1609. if (!exists[n])
  1610. rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1);
  1611. wsi->mqtt->sub_size = rem_len;
  1612. #if defined(_DEBUG)
  1613. lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
  1614. __func__, (int)tops, (int)rem_len);
  1615. #endif
  1616. p += lws_mqtt_vbi_encode(rem_len, p);
  1617. if ((rem_len + lws_ptr_diff(p, start)) >=
  1618. wsi->a.context->pt_serv_buf_size) {
  1619. lwsl_err("%s: Payload is too big\n", __func__);
  1620. return 1;
  1621. }
  1622. /* Init lws_mqtt_str */
  1623. lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, rem_len, 0);
  1624. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1625. /* Packet ID */
  1626. wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
  1627. lwsl_debug("%s: pkt_id = %d\n", __func__,
  1628. (int)wsi->mqtt->ack_pkt_id);
  1629. lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
  1630. if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
  1631. return 1;
  1632. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1633. for (n = 0; n < sub->num_topics; n++) {
  1634. lwsl_info("%s: topics[%d] = %s\n", __func__,
  1635. (int)n, sub->topic[n].name);
  1636. /* if the nwsi already has it, don't ask server for it */
  1637. if (exists[n]) {
  1638. lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n",
  1639. __func__, (int)n, sub->topic[n].name);
  1640. continue;
  1641. }
  1642. /*
  1643. * Attach the topic we're subscribing to, to nwsi->mqtt
  1644. * so we know the nwsi itself has a subscription to it
  1645. */
  1646. if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name))
  1647. return 1;
  1648. /* Topic's Len */
  1649. lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name));
  1650. if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
  1651. return 1;
  1652. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1653. /* Topic Name */
  1654. lws_strncpy((char *)p, sub->topic[n].name,
  1655. strlen(sub->topic[n].name) + 1);
  1656. if (lws_mqtt_str_advance(&mqtt_vh_payload,
  1657. (int)strlen(sub->topic[n].name)))
  1658. return 1;
  1659. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1660. /* QoS */
  1661. *p = sub->topic[n].qos;
  1662. if (lws_mqtt_str_advance(&mqtt_vh_payload, 1))
  1663. return 1;
  1664. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1665. }
  1666. break;
  1667. default:
  1668. return 1;
  1669. }
  1670. if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
  1671. lws_ptr_diff(p, start))
  1672. return 1;
  1673. wsi->mqtt->inside_subscribe = 1;
  1674. return 0;
  1675. }
  1676. int
  1677. lws_mqtt_client_send_unsubcribe(struct lws *wsi,
  1678. const lws_mqtt_subscribe_param_t *unsub)
  1679. {
  1680. struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
  1681. uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
  1682. struct lws *nwsi = lws_get_network_wsi(wsi);
  1683. lws_mqtt_str_t mqtt_vh_payload;
  1684. uint8_t send_unsub[8], orphaned;
  1685. uint32_t rem_len, n;
  1686. lws_mqtt_subs_t *mysub;
  1687. #if defined(_DEBUG)
  1688. uint32_t tops;
  1689. #endif
  1690. lwsl_info("%s: Enter\n", __func__);
  1691. switch (lwsi_state(wsi)) {
  1692. case LRS_ESTABLISHED: /* Protocol connection established */
  1693. orphaned = 0;
  1694. memset(&send_unsub, 0, sizeof(send_unsub));
  1695. for (n = 0; n < unsub->num_topics; n++) {
  1696. mysub = lws_mqtt_find_sub(nwsi->mqtt,
  1697. unsub->topic[n].name);
  1698. assert(mysub);
  1699. if (mysub && --mysub->ref_count == 0) {
  1700. lwsl_notice("%s: Need to send UNSUB\n", __func__);
  1701. send_unsub[n] = 1;
  1702. orphaned++;
  1703. }
  1704. }
  1705. if (!orphaned) {
  1706. /*
  1707. * The nwsi still has other subscribers bound to the
  1708. * topics.
  1709. *
  1710. * So, don't send UNSUB to server, and just fake the
  1711. * UNSUB ACK event for the guy going away.
  1712. */
  1713. lwsl_notice("%s: unsubscribed!\n", __func__);
  1714. if (user_callback_handle_rxflow(
  1715. wsi->a.protocol->callback,
  1716. wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
  1717. wsi->user_space, NULL, 0) < 0) {
  1718. /*
  1719. * We can't directly close here, because the
  1720. * caller still has the wsi. Inform the
  1721. * caller that we want to close
  1722. */
  1723. return 1;
  1724. }
  1725. return 0;
  1726. }
  1727. #if defined(_DEBUG)
  1728. /*
  1729. * one or more of the topics needs to be unsubscribed
  1730. * from, so we must go to the server with a filtered
  1731. * list of the new ones only
  1732. */
  1733. tops = orphaned;
  1734. #endif
  1735. if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE,
  1736. 0, 0, 0)) {
  1737. lwsl_err("%s: Failed to fill fixed header\n", __func__);
  1738. return 1;
  1739. }
  1740. /*
  1741. * Pid + (Topic len field + Topic len) x Num of Topics
  1742. */
  1743. rem_len = 2;
  1744. for (n = 0; n < unsub->num_topics; n++)
  1745. if (send_unsub[n])
  1746. rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name));
  1747. wsi->mqtt->sub_size = rem_len;
  1748. lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
  1749. __func__, (int)tops, (int)rem_len);
  1750. p += lws_mqtt_vbi_encode(rem_len, p);
  1751. if ((rem_len + lws_ptr_diff(p, start)) >=
  1752. wsi->a.context->pt_serv_buf_size) {
  1753. lwsl_err("%s: Payload is too big\n", __func__);
  1754. return 1;
  1755. }
  1756. /* Init lws_mqtt_str */
  1757. lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, rem_len, 0);
  1758. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1759. /* Packet ID */
  1760. wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
  1761. lwsl_debug("%s: pkt_id = %d\n", __func__,
  1762. (int)wsi->mqtt->ack_pkt_id);
  1763. lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
  1764. if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
  1765. return 1;
  1766. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1767. for (n = 0; n < unsub->num_topics; n++) {
  1768. lwsl_info("%s: topics[%d] = %s\n", __func__,
  1769. (int)n, unsub->topic[n].name);
  1770. /*
  1771. * Subscriber still bound to it, don't UBSUB
  1772. * from the server
  1773. */
  1774. if (!send_unsub[n])
  1775. continue;
  1776. /* Topic's Len */
  1777. lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name));
  1778. if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
  1779. return 1;
  1780. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1781. /* Topic Name */
  1782. lws_strncpy((char *)p, unsub->topic[n].name,
  1783. strlen(unsub->topic[n].name) + 1);
  1784. if (lws_mqtt_str_advance(&mqtt_vh_payload,
  1785. (int)strlen(unsub->topic[n].name)))
  1786. return 1;
  1787. p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
  1788. }
  1789. break;
  1790. default:
  1791. return 1;
  1792. }
  1793. if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
  1794. lws_ptr_diff(p, start))
  1795. return 1;
  1796. wsi->mqtt->inside_unsubscribe = 1;
  1797. return 0;
  1798. }
  1799. /*
  1800. * This is called when child streams bind to an already-existing and compatible
  1801. * MQTT stream
  1802. */
  1803. struct lws *
  1804. lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)
  1805. {
  1806. /* no more children allowed by parent? */
  1807. if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) {
  1808. lwsl_err("%s: reached concurrent stream limit\n", __func__);
  1809. return NULL;
  1810. }
  1811. #if defined(LWS_WITH_CLIENT)
  1812. wsi->client_mux_substream = 1;
  1813. #endif
  1814. lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid);
  1815. if (lws_ensure_user_space(wsi))
  1816. goto bail1;
  1817. lws_mqtt_set_client_established(wsi);
  1818. lws_callback_on_writable(wsi);
  1819. #if defined(LWS_WITH_SERVER_STATUS)
  1820. wsi->a.vhost->conn_stats.mqtt_subs++;
  1821. #endif
  1822. return wsi;
  1823. bail1:
  1824. /* undo the insert */
  1825. parent_wsi->mux.child_list = wsi->mux.sibling_list;
  1826. parent_wsi->mux.child_count--;
  1827. if (wsi->user_space)
  1828. lws_free_set_NULL(wsi->user_space);
  1829. wsi->a.protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
  1830. lws_free(wsi);
  1831. return NULL;
  1832. }