private-lib-roles-mqtt.h 10 KB


  1. /*
  2. * libwebsockets - small server side websockets and web server implementation
  3. *
  4. * Copyright (C) 2010 - 2020 Andy Green <[email protected]>
  5. *
  6. * Permission is hereby granted, free of charge, to any person obtaining a copy
  7. * of this software and associated documentation files (the "Software"), to
  8. * deal in the Software without restriction, including without limitation the
  9. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  10. * sell copies of the Software, and to permit persons to whom the Software is
  11. * furnished to do so, subject to the following conditions:
  12. *
  13. * The above copyright notice and this permission notice shall be included in
  14. * all copies or substantial portions of the Software.
  15. *
  16. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  21. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  22. * IN THE SOFTWARE.
  23. */
  24. #ifndef _PRIVATE_LIB_ROLES_MQTT
  25. #define _PRIVATE_LIB_ROLES_MQTT 1
  26. extern struct lws_role_ops role_ops_mqtt;
  27. #define lwsi_role_mqtt(wsi) (wsi->role_ops == &role_ops_mqtt)
  28. #define LWS_MQTT_MAX_CHILDREN 8 /* max child streams on same parent */
  29. #define LMQCP_LUT_FLAG_RESERVED_FLAGS 0x10
  30. #define LMQCP_LUT_FLAG_PACKET_ID_NONE 0x00
  31. #define LMQCP_LUT_FLAG_PACKET_ID_HAS 0x20
  32. #define LMQCP_LUT_FLAG_PACKET_ID_QOS12 0x40
  33. #define LMQCP_LUT_FLAG_PACKET_ID_MASK 0x60
  34. #define LMQCP_LUT_FLAG_PAYLOAD 0x80 /* payload req (publish = opt)*/
  35. #define lws_mqtt_str_is_not_empty(s) ( ((s)) && \
  36. ((s))->len && \
  37. ((s))->buf && \
  38. *((s))->buf )
  39. #define LWS_MQTT_RESPONSE_TIMEOUT (3 * LWS_US_PER_SEC)
  40. #define LWS_MQTT_RETRY_CEILING (60 * LWS_US_PER_SEC)
  41. typedef enum {
  42. LMSPR_COMPLETED = 0,
  43. LMSPR_NEED_MORE = 1,
  44. LMSPR_FAILED_OOM = -1,
  45. LMSPR_FAILED_OVERSIZE = -2,
  46. LMSPR_FAILED_FORMAT = -3,
  47. LMSPR_FAILED_ALREADY_COMPLETED = -4,
  48. } lws_mqtt_stateful_primitive_return_t;
  49. typedef struct {
  50. uint32_t value;
  51. char budget;
  52. char consumed;
  53. } lws_mqtt_vbi;
  54. /* works for vbi, 2-byte and 4-byte fixed length */
  55. static inline int
  56. lws_mqtt_mb_first(lws_mqtt_vbi *vbi) { return !vbi->consumed; }
  57. int
  58. lws_mqtt_vbi_encode(uint32_t value, void *buf);
  59. /*
  60. * Decode is done statefully on an arbitrary amount of input data (which may
  61. * be one byte). It's like this so it can continue seamlessly if a buffer ends
  62. * partway through the primitive, and the api matches the bulk binary data case.
  63. *
  64. * VBI decode:
  65. *
  66. * Initialize the lws_mqtt_vbi state by calling lws_mqtt_vbi_init() on it, then
  67. * feed lws_mqtt_vbi_r() bytes to decode.
  68. *
  69. * Returns <0 for error, LMSPR_COMPLETED if done (vbi->value is valid), or
  70. * LMSPR_NEED_MORE if more calls to lws_mqtt_vbi_r() with subsequent bytes
  71. * needed.
  72. *
  73. * *in and *len are updated accordingly.
  74. *
  75. * 2-byte and 4-byte decode:
  76. *
  77. * Initialize the lws_mqtt_vbi state by calling lws_mqtt_2byte_init() or
  78. * lws_mqtt_4byte_init() on it, then feed lws_mqtt_mb_parse() bytes
  79. * to decode.
  80. *
  81. * Returns <0 for error, LMSPR_COMPLETED if done (vbi->value is valid), or
  82. * LMSPR_NEED_MORE if more calls to lws_mqtt_mb_parse() with subsequent
  83. * bytes needed.
  84. *
  85. * *in and *len are updated accordingly.
  86. */
  87. void
  88. lws_mqtt_vbi_init(lws_mqtt_vbi *vbi);
  89. void
  90. lws_mqtt_2byte_init(lws_mqtt_vbi *vbi);
  91. void
  92. lws_mqtt_4byte_init(lws_mqtt_vbi *vbi);
  93. lws_mqtt_stateful_primitive_return_t
  94. lws_mqtt_vbi_r(lws_mqtt_vbi *vbi, const uint8_t **in, size_t *len);
  95. lws_mqtt_stateful_primitive_return_t
  96. lws_mqtt_mb_parse(lws_mqtt_vbi *vbi, const uint8_t **in, size_t *len);
  97. struct lws_mqtt_str_st {
  98. uint8_t *buf;
  99. uint16_t len;
  100. uint16_t limit; /* it's cheaper to add the state here than
  101. * the pointer to point to it elsewhere */
  102. uint16_t pos;
  103. char len_valid;
  104. char needs_freeing;
  105. };
  106. static inline int
  107. lws_mqtt_str_first(struct lws_mqtt_str_st *s) { return !s->buf && !s->pos; }
  108. lws_mqtt_stateful_primitive_return_t
  109. lws_mqtt_str_parse(struct lws_mqtt_str_st *bd, const uint8_t **in, size_t *len);
  110. typedef enum {
  111. LMQCPP_IDLE,
  112. /* receive packet type part of fixed header took us out of idle... */
  113. LMQCPP_CONNECT_PACKET = LMQCP_CTOS_CONNECT << 4,
  114. LMQCPP_CONNECT_REMAINING_LEN_VBI,
  115. LMQCPP_CONNECT_VH_PNAME,
  116. LMQCPP_CONNECT_VH_PVERSION,
  117. LMQCPP_CONNECT_VH_FLAGS,
  118. LMQCPP_CONNECT_VH_KEEPALIVE,
  119. LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN,
  120. LMQCPP_CONNACK_PACKET = LMQCP_STOC_CONNACK << 4,
  121. LMQCPP_CONNACK_VH_FLAGS,
  122. LMQCPP_CONNACK_VH_RETURN_CODE,
  123. LMQCPP_PUBLISH_PACKET = LMQCP_PUBLISH << 4,
  124. LMQCPP_PUBLISH_REMAINING_LEN_VBI,
  125. LMQCPP_PUBLISH_VH_TOPIC,
  126. LMQCPP_PUBLISH_VH_PKT_ID,
  127. LMQCPP_PUBACK_PACKET = LMQCP_PUBACK << 4,
  128. LMQCPP_PUBACK_VH_PKT_ID,
  129. LMQCPP_PUBACK_PROPERTIES_LEN_VBI,
  130. LMQCPP_SUBACK_PACKET = LMQCP_STOC_SUBACK << 4,
  131. LMQCPP_SUBACK_VH_PKT_ID,
  132. LMQCPP_SUBACK_PAYLOAD,
  133. LMQCPP_UNSUBACK_PACKET = LMQCP_STOC_UNSUBACK << 4,
  134. LMQCPP_UNSUBACK_VH_PKT_ID,
  135. LMQCPP_PINGRESP_ZERO = LMQCP_STOC_PINGRESP << 4,
  136. LMQCPP_PAYLOAD,
  137. LMQCPP_EAT_PROPERTIES_AND_COMPLETE,
  138. LMQCPP_PROP_ID_VBI,
  139. /* all possible property payloads */
  140. /* 3.3.2.3.2 */
  141. LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE = 0x101,
  142. LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE = 0x102,
  143. LMQCPP_PROP_CONTENT_TYPE_UTF8S = 0x103,
  144. LMQCPP_PROP_RESPONSE_TOPIC_UTF8S = 0x108,
  145. LMQCPP_PROP_CORRELATION_BINDATA = 0x109,
  146. LMQCPP_PROP_SUBSCRIPTION_ID_VBI = 0x10b,
  147. LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE = 0x111,
  148. LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S = 0x112,
  149. LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE = 0x113,
  150. LMQCPP_PROP_AUTH_METHOD_UTF8S = 0x115,
  151. LMQCPP_PROP_AUTH_DATA_BINDATA = 0x116,
  152. LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE = 0x117,
  153. LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE = 0x118,
  154. LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE = 0x119,
  155. LMQCPP_PROP_RESPONSE_INFO_UTF8S = 0x11a,
  156. LMQCPP_PROP_SERVER_REFERENCE_UTF8S = 0x11c,
  157. LMQCPP_PROP_REASON_STRING_UTF8S = 0x11f,
  158. LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE = 0x121,
  159. LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE = 0x122,
  160. LMQCPP_PROP_TOPIC_ALIAS_2BYTE = 0x123,
  161. LMQCPP_PROP_MAXIMUM_QOS_1BYTE = 0x124,
  162. LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE = 0x125,
  163. LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S = 0x126,
  164. LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S = 0x226,
  165. LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE = 0x127,
  166. LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE = 0x128,
  167. LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE = 0x129,
  168. LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE = 0x12a,
  169. } lws_mqtt_packet_parse_state_t;
  170. /*
  171. * the states an MQTT connection can be in
  172. */
  173. typedef enum {
  174. LGSMQTT_UNKNOWN,
  175. LGSMQTT_IDLE,
  176. LGSMQTT_TRANSPORT_CONNECTED,
  177. LGSMQTT_SENT_CONNECT,
  178. LGSMQTT_ESTABLISHED,
  179. LGSMQTT_SENT_SUBSCRIBE,
  180. LGSMQTT_SUBSCRIBED,
  181. } lwsgs_mqtt_states_t;
  182. typedef struct lws_mqtt_parser_st {
  183. /* struct lws_mqtt_str_st s_content_type; */
  184. lws_mqtt_packet_parse_state_t state;
  185. lws_mqtt_vbi vbit;
  186. lws_mqtt_reason_t reason;
  187. lws_mqtt_str_t s_temp;
  188. uint8_t fixed_seen[4];
  189. uint8_t props_seen[8];
  190. uint8_t cpkt_flags;
  191. uint32_t cpkt_remlen;
  192. uint32_t props_len;
  193. uint32_t consumed;
  194. uint32_t prop_id;
  195. uint32_t props_consumed;
  196. uint32_t payload_consumed;
  197. uint16_t keepalive;
  198. uint16_t cpkt_id;
  199. uint32_t n;
  200. uint8_t temp[32];
  201. uint8_t conn_rc;
  202. uint8_t payload_format;
  203. uint8_t packet_type_flags;
  204. uint8_t conn_protocol_version;
  205. uint8_t fixed;
  206. uint8_t flag_pending_send_connack_close:1;
  207. uint8_t flag_pending_send_reason_close:1;
  208. uint8_t flag_prop_multi:1;
  209. uint8_t flag_server:1;
  210. } lws_mqtt_parser_t;
  211. typedef struct lws_mqtt_subs {
  212. struct lws_mqtt_subs *next;
  213. uint8_t ref_count; /* number of children referencing */
  214. /* subscription name + NUL overallocated here */
  215. char topic[];
  216. } lws_mqtt_subs_t;
  217. typedef struct lws_mqtts {
  218. lws_mqtt_parser_t par;
  219. lwsgs_mqtt_states_t estate;
  220. struct lws_dll2 active_session_list_head;
  221. struct lws_dll2 limbo_session_list_head;
  222. } lws_mqtts_t;
  223. typedef struct lws_mqttc {
  224. lws_mqtt_parser_t par;
  225. lwsgs_mqtt_states_t estate;
  226. struct lws_mqtt_str_st *id;
  227. struct lws_mqtt_str_st *username;
  228. struct lws_mqtt_str_st *password;
  229. struct {
  230. struct lws_mqtt_str_st *topic;
  231. struct lws_mqtt_str_st *message;
  232. lws_mqtt_qos_levels_t qos;
  233. uint8_t retain;
  234. } will;
  235. uint16_t keep_alive_secs;
  236. uint8_t conn_flags;
  237. } lws_mqttc_t;
  238. struct _lws_mqtt_related {
  239. lws_mqttc_t client;
  240. lws_sorted_usec_list_t sul_qos1_puback_wait; /* QoS1 puback wait TO */
  241. struct lws *wsi; /**< so sul can use lws_container_of */
  242. lws_mqtt_subs_t *subs_head; /**< Linked-list of heap-allocated subscription objects */
  243. void *rx_cpkt_param;
  244. uint16_t pkt_id;
  245. uint16_t ack_pkt_id;
  246. uint16_t sub_size;
  247. #if defined(LWS_WITH_CLIENT)
  248. uint8_t send_pingreq:1;
  249. uint8_t session_resumed:1;
  250. #endif
  251. uint8_t inside_payload:1;
  252. uint8_t inside_subscribe:1;
  253. uint8_t inside_unsubscribe:1;
  254. uint8_t send_puback:1;
  255. uint8_t unacked_publish:1;
  256. uint8_t done_subscribe:1;
  257. };
  258. /*
  259. * New sessions are created by starting CONNECT. If the ClientID sent
  260. * by the client matches a different, extant session, then the
  261. * existing one is taken over and the new one created for duration of
  262. * CONNECT processing is destroyed.
  263. *
  264. * On the server side, bearing in mind multiple simultaneous,
  265. * fragmented CONNECTs may be interleaved ongoing, all state and
  266. * parsing temps for a session must live in the session object.
  267. */
  268. struct lws_mqtt_endpoint_st;
  269. typedef struct lws_mqtts_session_st {
  270. struct lws_dll2 session_list;
  271. } lws_mqtts_session_t;
  272. #define ctl_pkt_type(x) (x->packet_type_flags >> 4)
  273. void
  274. lws_mqttc_state_transition(lws_mqttc_t *ep, lwsgs_mqtt_states_t s);
  275. int
  276. _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
  277. const uint8_t *buf, size_t len);
  278. int
  279. lws_mqtt_client_socket_service(struct lws *wsi, struct lws_pollfd *pollfd,
  280. struct lws *wsi_conn);
  281. int
  282. lws_create_client_mqtt_object(const struct lws_client_connect_info *i,
  283. struct lws *wsi);
  284. struct lws *
  285. lws_mqtt_client_send_connect(struct lws *wsi);
  286. int
  287. lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
  288. uint8_t dup, lws_mqtt_qos_levels_t qos,
  289. uint8_t retain);
  290. struct lws *
  291. lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi);
  292. lws_mqtt_subs_t *
  293. lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic);
  294. #endif /* _PRIVATE_LIB_ROLES_MQTT */