ss-mqtt.c 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349
  1. /*
  2. * libwebsockets - small server side websockets and web server implementation
  3. *
  4. * Copyright (C) 2019 - 2020 Andy Green <[email protected]>
  5. *
  6. * Permission is hereby granted, free of charge, to any person obtaining a copy
  7. * of this software and associated documentation files (the "Software"), to
  8. * deal in the Software without restriction, including without limitation the
  9. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  10. * sell copies of the Software, and to permit persons to whom the Software is
  11. * furnished to do so, subject to the following conditions:
  12. *
  13. * The above copyright notice and this permission notice shall be included in
  14. * all copies or substantial portions of the Software.
  15. *
  16. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  21. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  22. * IN THE SOFTWARE.
  23. */
  24. #include <private-lib-core.h>
  25. static int
  26. secstream_mqtt(struct lws *wsi, enum lws_callback_reasons reason, void *user,
  27. void *in, size_t len)
  28. {
  29. lws_ss_handle_t *h = (lws_ss_handle_t *)lws_get_opaque_user_data(wsi);
  30. lws_mqtt_publish_param_t mqpp, *pmqpp;
  31. uint8_t buf[LWS_PRE + 1400];
  32. lws_ss_state_return_t r;
  33. size_t buflen;
  34. int f = 0;
  35. switch (reason) {
  36. /* because we are protocols[0] ... */
  37. case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
  38. lwsl_info("%s: CLIENT_CONNECTION_ERROR: %s\n", __func__,
  39. in ? (char *)in : "(null)");
  40. if (!h)
  41. break;
  42. r = lws_ss_event_helper(h, LWSSSCS_UNREACHABLE);
  43. h->wsi = NULL;
  44. if (h->u.mqtt.heap_baggage) {
  45. lws_free(h->u.mqtt.heap_baggage);
  46. h->u.mqtt.heap_baggage = NULL;
  47. }
  48. if (r == LWSSSSRET_DESTROY_ME)
  49. return _lws_ss_handle_state_ret(r, wsi, &h);
  50. r = lws_ss_backoff(h);
  51. if (r != LWSSSSRET_OK)
  52. return _lws_ss_handle_state_ret(r, wsi, &h);
  53. break;
  54. case LWS_CALLBACK_MQTT_CLIENT_CLOSED:
  55. if (!h)
  56. break;
  57. lws_sul_cancel(&h->sul_timeout);
  58. r= lws_ss_event_helper(h, LWSSSCS_DISCONNECTED);
  59. if (h->wsi)
  60. lws_set_opaque_user_data(h->wsi, NULL);
  61. h->wsi = NULL;
  62. if (h->u.mqtt.heap_baggage) {
  63. lws_free(h->u.mqtt.heap_baggage);
  64. h->u.mqtt.heap_baggage = NULL;
  65. }
  66. if (r)
  67. return _lws_ss_handle_state_ret(r, wsi, &h);
  68. if (h->policy && !(h->policy->flags & LWSSSPOLF_OPPORTUNISTIC) &&
  69. !h->txn_ok && !wsi->a.context->being_destroyed) {
  70. r = lws_ss_backoff(h);
  71. if (r != LWSSSSRET_OK)
  72. return _lws_ss_handle_state_ret(r, wsi, &h);
  73. }
  74. break;
  75. case LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED:
  76. /*
  77. * Make sure the handle wsi points to the stream wsi not the
  78. * original nwsi, in the case it was migrated
  79. */
  80. h->wsi = wsi;
  81. h->retry = 0;
  82. h->seqstate = SSSEQ_CONNECTED;
  83. lws_sul_cancel(&h->sul);
  84. r = lws_ss_event_helper(h, LWSSSCS_CONNECTED);
  85. if (r != LWSSSSRET_OK)
  86. return _lws_ss_handle_state_ret(r, wsi, &h);
  87. if (h->policy->u.mqtt.topic)
  88. lws_callback_on_writable(wsi);
  89. break;
  90. case LWS_CALLBACK_MQTT_CLIENT_RX:
  91. // lwsl_user("LWS_CALLBACK_CLIENT_RECEIVE: read %d\n", (int)len);
  92. if (!h || !h->info.rx)
  93. return 0;
  94. pmqpp = (lws_mqtt_publish_param_t *)in;
  95. f = 0;
  96. if (!pmqpp->payload_pos)
  97. f |= LWSSS_FLAG_SOM;
  98. if (pmqpp->payload_pos + len == pmqpp->payload_len)
  99. f |= LWSSS_FLAG_EOM;
  100. h->subseq = 1;
  101. r = h->info.rx(ss_to_userobj(h), (const uint8_t *)pmqpp->payload,
  102. len, f);
  103. if (r != LWSSSSRET_OK)
  104. return _lws_ss_handle_state_ret(r, wsi, &h);
  105. return 0; /* don't passthru */
  106. case LWS_CALLBACK_MQTT_SUBSCRIBED:
  107. wsi->mqtt->done_subscribe = 1;
  108. lws_callback_on_writable(wsi);
  109. break;
  110. case LWS_CALLBACK_MQTT_ACK:
  111. lws_sul_cancel(&h->sul_timeout);
  112. r = lws_ss_event_helper(h, LWSSSCS_QOS_ACK_REMOTE);
  113. if (r != LWSSSSRET_OK)
  114. return _lws_ss_handle_state_ret(r, wsi, &h);
  115. break;
  116. case LWS_CALLBACK_MQTT_CLIENT_WRITEABLE:
  117. if (!h || !h->info.tx)
  118. return 0;
  119. lwsl_notice("%s: ss %p: WRITEABLE\n", __func__, h);
  120. if (h->seqstate != SSSEQ_CONNECTED) {
  121. lwsl_warn("%s: seqstate %d\n", __func__, h->seqstate);
  122. break;
  123. }
  124. if (h->policy->u.mqtt.subscribe &&
  125. !wsi->mqtt->done_subscribe) {
  126. /*
  127. * The policy says to subscribe to something, and we
  128. * haven't done it yet. Do it using the pre-prepared
  129. * string-substituted version of the policy string.
  130. */
  131. lwsl_notice("%s: subscribing %s\n", __func__,
  132. h->u.mqtt.subscribe_to);
  133. memset(&h->u.mqtt.sub_top, 0, sizeof(h->u.mqtt.sub_top));
  134. h->u.mqtt.sub_top.name = h->u.mqtt.subscribe_to;
  135. h->u.mqtt.sub_top.qos = h->policy->u.mqtt.qos;
  136. memset(&h->u.mqtt.sub_info, 0, sizeof(h->u.mqtt.sub_info));
  137. h->u.mqtt.sub_info.num_topics = 1;
  138. h->u.mqtt.sub_info.topic = &h->u.mqtt.sub_top;
  139. if (lws_mqtt_client_send_subcribe(wsi, &h->u.mqtt.sub_info)) {
  140. lwsl_notice("%s: unable to subscribe", __func__);
  141. return -1;
  142. }
  143. return 0;
  144. }
  145. buflen = sizeof(buf) - LWS_PRE;
  146. r = h->info.tx(ss_to_userobj(h), h->txord++, buf + LWS_PRE,
  147. &buflen, &f);
  148. if (r == LWSSSSRET_TX_DONT_SEND)
  149. return 0;
  150. if (r < 0)
  151. return _lws_ss_handle_state_ret(r, wsi, &h);
  152. memset(&mqpp, 0, sizeof(mqpp));
  153. /* this is the string-substituted h->policy->u.mqtt.topic */
  154. mqpp.topic = (char *)h->u.mqtt.topic_qos.name;
  155. mqpp.topic_len = strlen(mqpp.topic);
  156. mqpp.packet_id = h->txord - 1;
  157. mqpp.payload = buf + LWS_PRE;
  158. if (h->writeable_len)
  159. mqpp.payload_len = h->writeable_len;
  160. else
  161. mqpp.payload_len = buflen;
  162. lwsl_notice("%s: payload len %d\n", __func__,
  163. (int)mqpp.payload_len);
  164. mqpp.qos = h->policy->u.mqtt.qos;
  165. if (lws_mqtt_client_send_publish(wsi, &mqpp,
  166. (const char *)buf + LWS_PRE, buflen,
  167. f & LWSSS_FLAG_EOM)) {
  168. lwsl_notice("%s: failed to publish\n", __func__);
  169. return -1;
  170. }
  171. return 0;
  172. default:
  173. break;
  174. }
  175. return lws_callback_http_dummy(wsi, reason, user, in, len);
  176. }
  177. const struct lws_protocols protocol_secstream_mqtt = {
  178. "lws-secstream-mqtt",
  179. secstream_mqtt,
  180. 0,
  181. 0,
  182. };
/*
 * Munge connect info according to protocol-specific considerations... this
 * usually means interpreting aux in a protocol-specific way and using the
 * pieces at connection setup time, eg, http url pieces.
 *
 * len bytes of buf can be used for things with scope until after the actual
 * connect.
 *
 * For comparison: for ws, protocol aux is <url path>;<ws subprotocol name>.
 * The MQTT munger below instead takes its connection parameters from the
 * policy's mqtt settings, after string substitution.
 */
  193. enum {
  194. SSCMM_STRSUB_WILL_TOPIC,
  195. SSCMM_STRSUB_WILL_MESSAGE,
  196. SSCMM_STRSUB_SUBSCRIBE,
  197. SSCMM_STRSUB_TOPIC
  198. };
  199. static int
  200. secstream_connect_munge_mqtt(lws_ss_handle_t *h, char *buf, size_t len,
  201. struct lws_client_connect_info *i,
  202. union lws_ss_contemp *ct)
  203. {
  204. const char *sources[4] = {
  205. /* we're going to string-substitute these before use */
  206. h->policy->u.mqtt.will_topic,
  207. h->policy->u.mqtt.will_message,
  208. h->policy->u.mqtt.subscribe,
  209. h->policy->u.mqtt.topic
  210. };
  211. size_t used_in, olen[4] = { 0, 0, 0, 0 }, tot = 0;
  212. lws_strexp_t exp;
  213. char *p, *ps[4];
  214. int n;
  215. memset(&ct->ccp, 0, sizeof(ct->ccp));
  216. ct->ccp.client_id = "lwsMqttClient";
  217. ct->ccp.keep_alive = h->policy->u.mqtt.keep_alive;
  218. ct->ccp.clean_start = h->policy->u.mqtt.clean_start;
  219. ct->ccp.will_param.qos = h->policy->u.mqtt.will_qos;
  220. ct->ccp.will_param.retain = h->policy->u.mqtt.will_retain;
  221. h->u.mqtt.topic_qos.qos = h->policy->u.mqtt.qos;
  222. /*
  223. * We're going to string-substitute several of these parameters, which
  224. * have unknown, possibly large size. And, as their usage is deferred
  225. * inside the asynchronous lifetime of the MQTT connection, they need
  226. * to live on the heap.
  227. *
  228. * Notice these allocations at h->u.mqtt.heap_baggage belong to the
  229. * underlying MQTT stream lifetime, not the logical SS lifetime, and
  230. * are destroyed if present at connection error or close of the
  231. * underlying connection.
  232. *
  233. *
  234. * First, compute the length of each without producing strsubst output,
  235. * and keep a running total.
  236. */
  237. for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
  238. lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
  239. NULL, (size_t)-1);
  240. if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
  241. &used_in, &olen[n]) != LSTRX_DONE) {
  242. lwsl_err("%s: failed to subsitute %s\n", __func__,
  243. sources[n]);
  244. return 1;
  245. }
  246. tot += olen[n] + 1;
  247. }
  248. /*
  249. * Then, allocate enough space on the heap for the total of the
  250. * substituted results
  251. */
  252. h->u.mqtt.heap_baggage = lws_malloc(tot, __func__);
  253. if (!h->u.mqtt.heap_baggage)
  254. return 1;
  255. /*
  256. * Finally, issue the subsitutions one after the other into the single
  257. * allocated result buffer and prepare pointers into them
  258. */
  259. p = h->u.mqtt.heap_baggage;
  260. for (n = 0; n < (int)LWS_ARRAY_SIZE(sources); n++) {
  261. lws_strexp_init(&exp, (void *)h, lws_ss_exp_cb_metadata,
  262. p, (size_t)-1);
  263. ps[n] = p;
  264. if (lws_strexp_expand(&exp, sources[n], strlen(sources[n]),
  265. &used_in, &olen[n]) != LSTRX_DONE)
  266. return 1;
  267. p += olen[n] + 1;
  268. }
  269. /*
  270. * Point the guys who want the substituted content at the substituted
  271. * strings
  272. */
  273. ct->ccp.will_param.topic = ps[SSCMM_STRSUB_WILL_TOPIC];
  274. ct->ccp.will_param.message = ps[SSCMM_STRSUB_WILL_MESSAGE];
  275. h->u.mqtt.subscribe_to = ps[SSCMM_STRSUB_SUBSCRIBE];
  276. h->u.mqtt.subscribe_to_len = olen[SSCMM_STRSUB_SUBSCRIBE];
  277. h->u.mqtt.topic_qos.name = ps[SSCMM_STRSUB_TOPIC];
  278. i->method = "MQTT";
  279. i->mqtt_cp = &ct->ccp;
  280. i->alpn = "x-amzn-mqtt-ca";
  281. /* share connections where possible */
  282. i->ssl_connection |= LCCSCF_PIPELINE;
  283. return 0;
  284. }
/*
 * Secure Streams protocol descriptor for MQTT.  The initializer is
 * positional: the two strings match the method ("MQTT") and the alpn
 * ("x-amzn-mqtt-ca") that secstream_connect_munge_mqtt() sets on the connect
 * info, followed by the lws protocol whose callback handles the connection
 * and the connect-info munging function itself.
 * NOTE(review): struct ss_pcols is declared elsewhere; confirm the member
 * order against its definition before adding or reordering entries.
 */
const struct ss_pcols ss_pcol_mqtt = {
	"MQTT",
	"x-amzn-mqtt-ca", //"mqtt/3.1.1",
	&protocol_secstream_mqtt,
	secstream_connect_munge_mqtt
};