| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125 |
- /*
- * libwebsockets - small server side websockets and web server implementation
- *
- * Copyright (C) 2010 - 2020 Andy Green <[email protected]>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to
- * deal in the Software without restriction, including without limitation the
- * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- * IN THE SOFTWARE.
- *
- * MQTT v5
- *
- * http://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
- *
- * Control Packet structure
- *
- * - Always: 2+ byte: Fixed Hdr
- * - Required in some: variable: Variable Hdr + [(CONNECT)Will Props] + Props
- * - Required in some: variable: Payload
- *
- * For CONNECT, the payload fields, if present, MUST be in the order [MQTT-3.1.3-1]
- *
- * - Client Identifier
- * - Will Properties
- * - Will Topic
- * - Will Payload
- * - User Name
- * - Password
- */
- #include "private-lib-core.h"
- #include <string.h>
- #include <sys/types.h>
- #include <assert.h>
/*
 * Server-side MQTT protocol connection states.
 *
 * Only one state is defined at present; going by its name it is the state
 * in which the peer's CONNECT has not been seen yet.
 */
typedef enum {
	LMQPRS_AWAITING_CONNECT = 0,
} lws_mqtt_protocol_server_connstate_t;
/*
 * Printable names for the first group of MQTT reason codes (success and
 * other non-error outcomes).
 *
 * NOTE(review): the entries look like a dense list that follows the MQTT v5
 * non-error reason codes in ascending order (0x00, 0x01, 0x02, 0x04, 0x10,
 * 0x11, 0x18, 0x19) rather than a table indexable by the raw code value --
 * confirm against whatever consumes it.
 */
const char * const reason_names_g1[] = {
	[0] = "Success / Normal disconnection / QoS0",
	[1] = "QoS1",
	[2] = "QoS2",
	[3] = "Disconnect Will",
	[4] = "No matching subscriber",
	[5] = "No subscription existed",
	[6] = "Continue authentication",
	[7] = "Re-authenticate",
};
/*
 * Printable names for the second group of MQTT reason codes (the error
 * outcomes), 34 entries in total.
 *
 * NOTE(review): the MQTT v5 reason code list has "Topic Name invalid" (0x90)
 * between "Topic Filter invalid" (0x8f) and "Packet Identifier in use"
 * (0x91); there is no entry for it here, so entries from index 16 onward
 * would be one slot early if this table is ever indexed by (reason - 0x80).
 * Confirm how the table is consumed before relying on that indexing.
 */
const char * const reason_names_g2[] = {
	[0]  = "Unspecified error",
	[1]  = "Malformed packet",
	[2]  = "Protocol error",
	[3]  = "Implementation specific error",
	[4]  = "Unsupported protocol",
	[5]  = "Client ID invalid",
	[6]  = "Bad credentials",
	[7]  = "Not Authorized",
	[8]  = "Server Unavailable",
	[9]  = "Server Busy",
	[10] = "Banned",
	[11] = "Server Shutting Down",
	[12] = "Bad Authentication Method",
	[13] = "Keepalive Timeout",
	[14] = "Session taken over",
	[15] = "Topic Filter Invalid",
	[16] = "Packet ID in use",
	[17] = "Packet ID not found",
	[18] = "Max RX Exceeded",
	[19] = "Topic Alias Invalid",
	[20] = "Packet too large",
	[21] = "Ratelimit",
	[22] = "Quota Exceeded",
	[23] = "Administrative Action",
	[24] = "Payload format invalid",
	[25] = "Retain not supported",
	[26] = "QoS not supported",
	[27] = "Use another server",
	[28] = "Server Moved",
	[29] = "Shared subscriptions not supported",
	[30] = "Connection rate exceeded",
	[31] = "Maximum Connect Time",
	[32] = "Subscription IDs not supported",
	[33] = "Wildcard subscriptions not supported",
};
- #define LMQCP_WILL_PROPERTIES 0
- /* For each property, a bitmap describing which commands it is valid for */
- static const uint16_t property_valid[] = {
- [LMQPROP_PAYLOAD_FORMAT_INDICATOR] = (1 << LMQCP_PUBLISH) |
- (1 << LMQCP_WILL_PROPERTIES),
- [LMQPROP_MESSAGE_EXPIRY_INTERVAL] = (1 << LMQCP_PUBLISH) |
- (1 << LMQCP_WILL_PROPERTIES),
- [LMQPROP_CONTENT_TYPE] = (1 << LMQCP_PUBLISH) |
- (1 << LMQCP_WILL_PROPERTIES),
- [LMQPROP_RESPONSE_TOPIC] = (1 << LMQCP_PUBLISH) |
- (1 << LMQCP_WILL_PROPERTIES),
- [LMQPROP_CORRELATION_DATA] = (1 << LMQCP_PUBLISH) |
- (1 << LMQCP_WILL_PROPERTIES),
- [LMQPROP_SUBSCRIPTION_IDENTIFIER] = (1 << LMQCP_PUBLISH) |
- (1 << LMQCP_CTOS_SUBSCRIBE),
- [LMQPROP_SESSION_EXPIRY_INTERVAL] = (1 << LMQCP_CTOS_CONNECT) |
- (1 << LMQCP_STOC_CONNACK) |
- (1 << LMQCP_DISCONNECT),
- [LMQPROP_ASSIGNED_CLIENT_IDENTIFIER] = (1 << LMQCP_STOC_CONNACK),
- [LMQPROP_SERVER_KEEP_ALIVE] = (1 << LMQCP_STOC_CONNACK),
- [LMQPROP_AUTHENTICATION_METHOD] = (1 << LMQCP_CTOS_CONNECT) |
- (1 << LMQCP_STOC_CONNACK) |
- (1 << LMQCP_AUTH),
- [LMQPROP_AUTHENTICATION_DATA] = (1 << LMQCP_CTOS_CONNECT) |
- (1 << LMQCP_STOC_CONNACK) |
- (1 << LMQCP_AUTH),
- [LMQPROP_REQUEST_PROBLEM_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
- [LMQPROP_WILL_DELAY_INTERVAL] = (1 << LMQCP_WILL_PROPERTIES),
- [LMQPROP_REQUEST_RESPONSE_INFORMATION] = (1 << LMQCP_CTOS_CONNECT),
- [LMQPROP_RESPONSE_INFORMATION] = (1 << LMQCP_STOC_CONNACK),
- [LMQPROP_SERVER_REFERENCE] = (1 << LMQCP_STOC_CONNACK) |
- (1 << LMQCP_DISCONNECT),
- [LMQPROP_REASON_STRING] = (1 << LMQCP_STOC_CONNACK) |
- (1 << LMQCP_PUBACK) |
- (1 << LMQCP_PUBREC) |
- (1 << LMQCP_PUBREL) |
- (1 << LMQCP_PUBCOMP) |
- (1 << LMQCP_STOC_SUBACK) |
- (1 << LMQCP_STOC_UNSUBACK) |
- (1 << LMQCP_DISCONNECT) |
- (1 << LMQCP_AUTH),
- [LMQPROP_RECEIVE_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
- (1 << LMQCP_STOC_CONNACK),
- [LMQPROP_TOPIC_ALIAS_MAXIMUM] = (1 << LMQCP_CTOS_CONNECT) |
- (1 << LMQCP_STOC_CONNACK),
- [LMQPROP_TOPIC_ALIAS] = (1 << LMQCP_PUBLISH),
- [LMQPROP_MAXIMUM_QOS] = (1 << LMQCP_STOC_CONNACK),
- [LMQPROP_RETAIN_AVAILABLE] = (1 << LMQCP_STOC_CONNACK),
- [LMQPROP_USER_PROPERTY] = (1 << LMQCP_CTOS_CONNECT) |
- (1 << LMQCP_STOC_CONNACK) |
- (1 << LMQCP_PUBLISH) |
- (1 << LMQCP_WILL_PROPERTIES) |
- (1 << LMQCP_PUBACK) |
- (1 << LMQCP_PUBREC) |
- (1 << LMQCP_PUBREL) |
- (1 << LMQCP_PUBCOMP) |
- (1 << LMQCP_CTOS_SUBSCRIBE) |
- (1 << LMQCP_STOC_SUBACK) |
- (1 << LMQCP_CTOS_UNSUBSCRIBE) |
- (1 << LMQCP_STOC_UNSUBACK) |
- (1 << LMQCP_DISCONNECT) |
- (1 << LMQCP_AUTH),
- [LMQPROP_MAXIMUM_PACKET_SIZE] = (1 << LMQCP_CTOS_CONNECT) |
- (1 << LMQCP_STOC_CONNACK),
- [LMQPROP_WILDCARD_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK),
- [LMQPROP_SUBSCRIPTION_IDENTIFIER_AVAIL] = (1 << LMQCP_STOC_CONNACK),
- [LMQPROP_SHARED_SUBSCRIPTION_AVAIL] = (1 << LMQCP_STOC_CONNACK)
- };
- /*
- * For each command index, maps flags, id, qos and payload legality
- * notice in most cases PUBLISH requires further processing
- */
- static const uint8_t map_flags[] = {
- [LMQCP_RESERVED] = 0x00,
- [LMQCP_CTOS_CONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PAYLOAD |
- LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
- [LMQCP_STOC_CONNACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
- [LMQCP_PUBLISH] = LMQCP_LUT_FLAG_PAYLOAD | /* option */
- LMQCP_LUT_FLAG_PACKET_ID_QOS12 | 0x00,
- [LMQCP_PUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
- [LMQCP_PUBREC] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
- [LMQCP_PUBREL] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
- [LMQCP_PUBCOMP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
- [LMQCP_CTOS_SUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PAYLOAD |
- LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
- [LMQCP_STOC_SUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PAYLOAD |
- LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x00,
- [LMQCP_CTOS_UNSUBSCRIBE] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PAYLOAD |
- LMQCP_LUT_FLAG_PACKET_ID_HAS | 0x02,
- [LMQCP_STOC_UNSUBACK] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PAYLOAD |
- LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
- [LMQCP_CTOS_PINGREQ] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
- [LMQCP_STOC_PINGRESP] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
- [LMQCP_DISCONNECT] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
- [LMQCP_AUTH] = LMQCP_LUT_FLAG_RESERVED_FLAGS |
- LMQCP_LUT_FLAG_PACKET_ID_NONE | 0x00,
- };
- void
- lws_mqttc_state_transition(lws_mqttc_t *c, lwsgs_mqtt_states_t s)
- {
- lwsl_debug("%s: ep %p: state %d -> %d\n", __func__, c, c->estate, s);
- c->estate = s;
- }
- static int
- lws_mqtt_pconsume(lws_mqtt_parser_t *par, int consumed)
- {
- par->consumed += consumed;
- if (par->consumed > par->props_len)
- return -1;
- /* more properties coming */
- if (par->consumed < par->props_len) {
- par->state = LMQCPP_PROP_ID_VBI;
- return 0;
- }
- /* properties finished: are we headed for payload or idle? */
- if ((map_flags[ctl_pkt_type(par)] & LMQCP_LUT_FLAG_PAYLOAD) &&
- /* A PUBLISH packet MUST NOT contain a Packet Identifier if
- * its QoS value is set to 0 [MQTT-2.2.1-2]. */
- (ctl_pkt_type(par) != LMQCP_PUBLISH ||
- (par->packet_type_flags & 6))) {
- par->state = LMQCPP_PAYLOAD;
- return 0;
- }
- par->state = LMQCPP_IDLE;
- return 0;
- }
- static int
- lws_mqtt_set_client_established(struct lws *wsi)
- {
- lws_role_transition(wsi, LWSIFR_CLIENT, LRS_ESTABLISHED,
- &role_ops_mqtt);
- if (user_callback_handle_rxflow(wsi->a.protocol->callback,
- wsi, LWS_CALLBACK_MQTT_CLIENT_ESTABLISHED,
- wsi->user_space, NULL, 0) < 0) {
- lwsl_err("%s: MQTT_ESTABLISHED failed\n", __func__);
- return -1;
- }
- /*
- * If we made a new connection and got the ACK, our connection is
- * definitely working in both directions at the moment
- */
- lws_validity_confirmed(wsi);
- /* clear connection timeout */
- lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
- return 0;
- }
- lws_mqtt_subs_t *
- lws_mqtt_find_sub(struct _lws_mqtt_related *mqtt, const char *topic)
- {
- lws_mqtt_subs_t *s = mqtt->subs_head;
- while (s) {
- if (!strcmp((const char *)s->topic, topic))
- return s;
- s = s->next;
- }
- return NULL;
- }
- static lws_mqtt_subs_t *
- lws_mqtt_create_sub(struct _lws_mqtt_related *mqtt, const char *topic)
- {
- lws_mqtt_subs_t *mysub;
- mysub = lws_malloc(sizeof(*mysub) + strlen(topic) + 1, "sub");
- if (!mysub)
- return NULL;
- mysub->next = mqtt->subs_head;
- mqtt->subs_head = mysub;
- memcpy(mysub->topic, topic, strlen(topic) + 1);
- mysub->ref_count = 1;
- lwsl_info("%s: Created mysub %p for wsi->mqtt %p\n",
- __func__, mysub, mqtt);
- return mysub;
- }
- static int
- lws_mqtt_client_remove_subs(struct _lws_mqtt_related *mqtt)
- {
- lws_mqtt_subs_t *s = mqtt->subs_head;
- lws_mqtt_subs_t *temp = NULL;
- lwsl_info("%s: Called to remove subs from wsi->mqtt %p\n",
- __func__, mqtt);
- while (s && s->next) {
- if (s->next->ref_count == 0)
- break;
- s = s->next;
- }
- if (s && s->next) {
- temp = s->next;
- lwsl_info("%s: Removing sub %p from wsi->mqtt %p\n",
- __func__, temp, mqtt);
- s->next = temp->next;
- lws_free(temp);
- return 0;
- }
- return 1;
- }
- int
- _lws_mqtt_rx_parser(struct lws *wsi, lws_mqtt_parser_t *par,
- const uint8_t *buf, size_t len)
- {
- struct lws *w;
- int n;
- if (par->flag_pending_send_reason_close)
- return 0;
- /*
- * Stateful, fragmentation-immune parser
- *
- * Notice that len can always be 1 if under attack, even over tls if
- * the server is compromised or malicious.
- */
- while (len) {
- lwsl_debug("%s: %d, len = %d\n", __func__, par->state, (int)len);
- switch (par->state) {
- case LMQCPP_IDLE:
- par->packet_type_flags = *buf++;
- len--;
- #if defined(LWS_WITH_CLIENT)
- /*
- * The case where we sent the connect, but we received
- * something else before any CONNACK
- */
- if (lwsi_state(wsi) == LRS_MQTTC_AWAIT_CONNACK &&
- par->packet_type_flags >> 4 != LMQCP_STOC_CONNACK) {
- lwsl_notice("%s: server sent non-CONNACK\n",
- __func__);
- goto send_protocol_error_and_close;
- }
- #endif /* LWS_WITH_CLIENT */
- n = map_flags[par->packet_type_flags >> 4];
- /*
- * Where a flag bit is marked as “Reserved”, it is
- * reserved for future use and MUST be set to the value
- * listed [MQTT-2.1.3-1].
- */
- if ((n & LMQCP_LUT_FLAG_RESERVED_FLAGS) &&
- ((par->packet_type_flags & 0x0f) != (n & 0x0f))) {
- lwsl_notice("%s: wsi %p: bad flags, 0x%02x mask 0x%02x (len %d)\n",
- __func__, wsi, par->packet_type_flags, n, (int)len + 1);
- lwsl_hexdump_err(buf - 1, len + 1);
- goto send_protocol_error_and_close;
- }
- lwsl_debug("%s: received pkt type 0x%x / flags 0x%x\n",
- __func__, par->packet_type_flags >> 4,
- par->packet_type_flags & 0xf);
- /* allows us to know if a property that can only be
- * given once, appears twice */
- memset(par->props_seen, 0, sizeof(par->props_seen));
- par->state = par->packet_type_flags & 0xf0;
- break;
- case LMQCPP_CONNECT_PACKET:
- lwsl_debug("%s: received CONNECT pkt\n", __func__);
- par->state = LMQCPP_CONNECT_REMAINING_LEN_VBI;
- lws_mqtt_vbi_init(&par->vbit);
- break;
- case LMQCPP_CONNECT_REMAINING_LEN_VBI:
- switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- par->cpkt_remlen = par->vbit.value;
- n = map_flags[ctl_pkt_type(par)];
- lws_mqtt_str_init(&par->s_temp, par->temp,
- sizeof(par->temp), 0);
- par->state = LMQCPP_CONNECT_VH_PNAME;
- break;
- default:
- lwsl_notice("%s: bad vbi\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_CONNECT_VH_PNAME:
- switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- if (par->s_temp.len != 4 ||
- memcmp(par->s_temp.buf, "MQTT",
- par->s_temp.len)) {
- lwsl_notice("%s: protocol name: %.*s\n",
- __func__, par->s_temp.len,
- par->s_temp.buf);
- goto send_unsupp_connack_and_close;
- }
- par->state = LMQCPP_CONNECT_VH_PVERSION;
- break;
- default:
- lwsl_notice("%s: bad protocol name\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_CONNECT_VH_PVERSION:
- par->conn_protocol_version = *buf++;
- len--;
- if (par->conn_protocol_version != 5) {
- lwsl_info("%s: unsupported MQTT version %d\n",
- __func__, par->conn_protocol_version);
- goto send_unsupp_connack_and_close;
- }
- par->state = LMQCPP_CONNECT_VH_FLAGS;
- break;
- case LMQCPP_CONNECT_VH_FLAGS:
- par->cpkt_flags = *buf++;
- len--;
- if (par->cpkt_flags & 1) {
- /*
- * The Server MUST validate that the reserved
- * flag in the CONNECT packet is set to 0
- * [MQTT-3.1.2-3].
- */
- par->reason = LMQCP_REASON_MALFORMED_PACKET;
- goto send_reason_and_close;
- }
- /*
- * conn_flags specifies the Will Properties that should
- * appear in the payload section
- */
- lws_mqtt_2byte_init(&par->vbit);
- par->state = LMQCPP_CONNECT_VH_KEEPALIVE;
- break;
- case LMQCPP_CONNECT_VH_KEEPALIVE:
- switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- par->keepalive = (uint16_t)par->vbit.value;
- lws_mqtt_vbi_init(&par->vbit);
- par->state = LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN;
- break;
- default:
- lwsl_notice("%s: ka bad vbi\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_PINGRESP_ZERO:
- len--;
- /* second byte of PINGRESP must be zero */
- if (*buf++)
- goto send_protocol_error_and_close;
- goto cmd_completion;
- case LMQCPP_CONNECT_VH_PROPERTIES_VBI_LEN:
- switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- /* reset consumption counter */
- par->consumed = 0;
- par->props_len = par->vbit.value;
- lws_mqtt_vbi_init(&par->vbit);
- par->state = LMQCPP_PROP_ID_VBI;
- break;
- default:
- lwsl_notice("%s: connpr bad vbi\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_PUBLISH_PACKET:
- if (lwsi_role_client(wsi) && wsi->mqtt->inside_subscribe) {
- lwsl_notice("%s: Topic rx before subscribing\n",
- __func__);
- goto send_protocol_error_and_close;
- }
- lwsl_info("%s: received PUBLISH pkt\n", __func__);
- par->state = LMQCPP_PUBLISH_REMAINING_LEN_VBI;
- lws_mqtt_vbi_init(&par->vbit);
- break;
- case LMQCPP_PUBLISH_REMAINING_LEN_VBI:
- switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- par->cpkt_remlen = par->vbit.value;
- lwsl_debug("%s: PUBLISH pkt len = %d\n",
- __func__, (int)par->cpkt_remlen);
- /* Move on to PUBLISH's variable header */
- par->state = LMQCPP_PUBLISH_VH_TOPIC;
- break;
- default:
- lwsl_notice("%s: pubrem bad vbi\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_PUBLISH_VH_TOPIC:
- {
- lws_mqtt_publish_param_t *pub = NULL;
- if (len < 2) {
- lwsl_notice("%s: topic too short\n", __func__);
- return -1;
- }
- /* Topic len */
- par->n = lws_ser_ru16be(buf);
- buf += 2;
- len -= 2;
- if (len < par->n) {/* the way this is written... */
- lwsl_notice("%s: len breakage\n", __func__);
- return -1;
- }
- /* Invalid topic len */
- if (par->n == 0) {
- lwsl_notice("%s: zero topic len\n", __func__);
- par->reason = LMQCP_REASON_MALFORMED_PACKET;
- goto send_reason_and_close;
- }
- lwsl_debug("%s: PUBLISH topic len %d\n",
- __func__, (int)par->n);
- assert(!wsi->mqtt->rx_cpkt_param);
- wsi->mqtt->rx_cpkt_param = lws_zalloc(
- sizeof(lws_mqtt_publish_param_t), "rx pub param");
- if (!wsi->mqtt->rx_cpkt_param)
- goto oom;
- pub = (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
- pub->topic_len = par->n;
- /* Topic Name */
- pub->topic = (char *)lws_zalloc((size_t)pub->topic_len + 1,
- "rx publish topic");
- if (!pub->topic)
- goto oom;
- lws_strncpy(pub->topic, (const char *)buf,
- (size_t)pub->topic_len + 1);
- buf += pub->topic_len;
- len -= pub->topic_len;
- /* Extract QoS Level from Fixed Header Flags */
- pub->qos = (lws_mqtt_qos_levels_t)
- ((par->packet_type_flags >> 1) & 0x3);
- pub->payload_pos = 0;
- pub->payload_len = par->cpkt_remlen -
- (2 + pub->topic_len + ((pub->qos) ? 2 : 0));
- switch (pub->qos) {
- case QOS0:
- par->state = LMQCPP_PAYLOAD;
- if (pub->payload_len == 0)
- goto cmd_completion;
- break;
- case QOS1:
- case QOS2:
- par->state = LMQCPP_PUBLISH_VH_PKT_ID;
- break;
- default:
- par->reason = LMQCP_REASON_MALFORMED_PACKET;
- lws_free_set_NULL(pub->topic);
- lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
- goto send_reason_and_close;
- }
- break;
- }
- case LMQCPP_PUBLISH_VH_PKT_ID:
- {
- lws_mqtt_publish_param_t *pub =
- (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
- if (len < 2) {
- lwsl_notice("%s: len breakage 2\n", __func__);
- return -1;
- }
- par->cpkt_id = lws_ser_ru16be(buf);
- buf += 2;
- len -= 2;
- wsi->mqtt->ack_pkt_id = par->cpkt_id;
- lwsl_debug("%s: Packet ID %d\n",
- __func__, (int)par->cpkt_id);
- par->state = LMQCPP_PAYLOAD;
- pub->payload_pos = 0;
- pub->payload_len = par->cpkt_remlen -
- (2 + pub->topic_len + ((pub->qos) ? 2 : 0));
- if (pub->payload_len == 0)
- goto cmd_completion;
- break;
- }
- case LMQCPP_PAYLOAD:
- {
- lws_mqtt_publish_param_t *pub =
- (lws_mqtt_publish_param_t *)wsi->mqtt->rx_cpkt_param;
- if (pub == NULL) {
- lwsl_err("%s: Uninitialized pub_param\n",
- __func__);
- goto send_protocol_error_and_close;
- }
- pub->payload = buf;
- goto cmd_completion;
- }
- case LMQCPP_CONNACK_PACKET:
- if (!lwsi_role_client(wsi)) {
- lwsl_err("%s: CONNACK is only Server to Client",
- __func__);
- goto send_unsupp_connack_and_close;
- }
- lwsl_debug("%s: received CONNACK pkt\n", __func__);
- lws_mqtt_vbi_init(&par->vbit);
- switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- par->cpkt_remlen = par->vbit.value;
- lwsl_debug("%s: CONNACK pkt len = %d\n",
- __func__, (int)par->cpkt_remlen);
- if (par->cpkt_remlen != 2)
- goto send_protocol_error_and_close;
- par->state = LMQCPP_CONNACK_VH_FLAGS;
- break;
- default:
- lwsl_notice("%s: connack bad vbi\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_CONNACK_VH_FLAGS:
- {
- lws_mqttc_t *c = &wsi->mqtt->client;
- par->cpkt_flags = *buf++;
- len--;
- if (par->cpkt_flags & ~LMQCFT_SESSION_PRESENT) {
- /*
- * Byte 1 is the "Connect Acknowledge
- * Flags". Bits 7-1 are reserved and
- * MUST be set to 0.
- */
- par->reason = LMQCP_REASON_MALFORMED_PACKET;
- goto send_reason_and_close;
- }
- /*
- * If the Server accepts a connection with
- * CleanSession set to 1, the Server MUST set
- * Session Present to 0 in the CONNACK packet
- * in addition to setting a zero return code
- * in the CONNACK packet [MQTT-3.2.2-1]. If
- * the Server accepts a connection with
- * CleanSession set to 0, the value set in
- * Session Present depends on whether the
- * Server already has stored Session state for
- * the supplied client ID. If the Server has
- * stored Session state, it MUST set
- * SessionPresent to 1 in the CONNACK packet
- * [MQTT-3.2.2-2]. If the Server does not have
- * stored Session state, it MUST set Session
- * Present to 0 in the CONNACK packet. This is
- * in addition to setting a zero return code
- * in the CONNACK packet [MQTT-3.2.2-3].
- */
- if ((c->conn_flags & LMQCFT_CLEAN_START) &&
- (par->cpkt_flags & LMQCFT_SESSION_PRESENT))
- goto send_protocol_error_and_close;
- wsi->mqtt->session_resumed = (par->cpkt_flags &
- LMQCFT_SESSION_PRESENT);
- /* Move on to Connect Return Code */
- par->state = LMQCPP_CONNACK_VH_RETURN_CODE;
- break;
- }
- case LMQCPP_CONNACK_VH_RETURN_CODE:
- par->conn_rc = *buf++;
- len--;
- /*
- * If a server sends a CONNACK packet containing a
- * non-zero return code it MUST then close the Network
- * Connection [MQTT-3.2.2-5]
- */
- switch (par->conn_rc) {
- case 0:
- goto cmd_completion;
- case 1:
- case 2:
- case 3:
- case 4:
- case 5:
- par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL +
- par->conn_rc - 1;
- goto send_reason_and_close;
- default:
- lwsl_notice("%s: bad connack retcode\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- /* SUBACK */
- case LMQCPP_SUBACK_PACKET:
- if (!lwsi_role_client(wsi)) {
- lwsl_err("%s: SUBACK is only Server to Client",
- __func__);
- goto send_unsupp_connack_and_close;
- }
- lwsl_debug("%s: received SUBACK pkt\n", __func__);
- lws_mqtt_vbi_init(&par->vbit);
- switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- par->cpkt_remlen = par->vbit.value;
- lwsl_debug("%s: SUBACK pkt len = %d\n",
- __func__, (int)par->cpkt_remlen);
- if (par->cpkt_remlen <= 2)
- goto send_protocol_error_and_close;
- par->state = LMQCPP_SUBACK_VH_PKT_ID;
- break;
- default:
- lwsl_notice("%s: suback bad vbi\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_SUBACK_VH_PKT_ID:
- if (len < 2) {
- lwsl_notice("%s: len breakage 4\n", __func__);
- return -1;
- }
- par->cpkt_id = lws_ser_ru16be(buf);
- wsi->mqtt->ack_pkt_id = par->cpkt_id;
- buf += 2;
- len -= 2;
- par->cpkt_remlen -= 2;
- par->n = 0;
- par->state = LMQCPP_SUBACK_PAYLOAD;
- *par->temp = 0;
- break;
- case LMQCPP_SUBACK_PAYLOAD:
- {
- lws_mqtt_qos_levels_t qos = (lws_mqtt_qos_levels_t)*buf++;
- len--;
- switch (qos) {
- case QOS0:
- case QOS1:
- case QOS2:
- break;
- case FAILURE_QOS_LEVEL:
- goto send_protocol_error_and_close;
- default:
- par->reason = LMQCP_REASON_MALFORMED_PACKET;
- goto send_reason_and_close;
- }
- if (++(par->n) == par->cpkt_remlen) {
- par->n = 0;
- goto cmd_completion;
- }
- break;
- }
- /* UNSUBACK */
- case LMQCPP_UNSUBACK_PACKET:
- if (!lwsi_role_client(wsi)) {
- lwsl_err("%s: UNSUBACK is only Server to Client",
- __func__);
- goto send_unsupp_connack_and_close;
- }
- lwsl_debug("%s: received UNSUBACK pkt\n", __func__);
- lws_mqtt_vbi_init(&par->vbit);
- switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- par->cpkt_remlen = par->vbit.value;
- lwsl_debug("%s: UNSUBACK pkt len = %d\n",
- __func__, (int)par->cpkt_remlen);
- if (par->cpkt_remlen < 2)
- goto send_protocol_error_and_close;
- par->state = LMQCPP_UNSUBACK_VH_PKT_ID;
- break;
- default:
- lwsl_notice("%s: unsuback bad vbi\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_UNSUBACK_VH_PKT_ID:
- if (len < 2) {
- lwsl_notice("%s: len breakage 3\n", __func__);
- return -1;
- }
- par->cpkt_id = lws_ser_ru16be(buf);
- wsi->mqtt->ack_pkt_id = par->cpkt_id;
- buf += 2;
- len -= 2;
- par->cpkt_remlen -= 2;
- par->n = 0;
- goto cmd_completion;
- case LMQCPP_PUBACK_PACKET:
- lws_mqtt_vbi_init(&par->vbit);
- switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- par->cpkt_remlen = par->vbit.value;
- lwsl_info("%s: PUBACK pkt len = %d\n", __func__,
- (int)par->cpkt_remlen);
- /*
- * must be 4 or more, with special case that 2
- * means success with no reason code or props
- */
- if (par->cpkt_remlen <= 1 ||
- par->cpkt_remlen == 3)
- goto send_protocol_error_and_close;
- par->state = LMQCPP_PUBACK_VH_PKT_ID;
- par->fixed_seen[2] = par->fixed_seen[3] = 0;
- par->fixed = 0;
- par->n = 0;
- break;
- default:
- lwsl_notice("%s: puback bad vbi\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_PUBACK_VH_PKT_ID:
- /*
- * There are 3 fixed bytes and then a VBI for the
- * property section length
- */
- par->fixed_seen[par->fixed++] = *buf++;
- if (len < par->cpkt_remlen - par->n) {
- lwsl_notice("%s: len breakage 4\n", __func__);
- return -1;
- }
- len--;
- par->n++;
- if (par->fixed == 2)
- par->cpkt_id = lws_ser_ru16be(par->fixed_seen);
- if (par->fixed == 3) {
- lws_mqtt_vbi_init(&par->vbit);
- par->props_consumed = 0;
- par->state = LMQCPP_PUBACK_PROPERTIES_LEN_VBI;
- }
- /* length of 2 is truncated packet and we completed it */
- if (par->cpkt_remlen == par->fixed)
- goto cmd_completion;
- break;
- case LMQCPP_PUBACK_PROPERTIES_LEN_VBI:
- switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- par->props_len = par->vbit.value;
- lwsl_info("%s: PUBACK props len = %d\n",
- __func__, (int)par->cpkt_remlen);
- /*
- * If there are no properties, this is a
- * command completion event in itself
- */
- if (!par->props_len)
- goto cmd_completion;
- /*
- * Otherwise consume the properties before
- * completing the command
- */
- lws_mqtt_vbi_init(&par->vbit);
- par->state = LMQCPP_PUBACK_VH_PKT_ID;
- break;
- default:
- lwsl_notice("%s: puback pr bad vbi\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_EAT_PROPERTIES_AND_COMPLETE:
- /*
- * TODO: stash the props
- */
- par->props_consumed++;
- len--;
- buf++;
- if (par->props_len != par->props_consumed)
- break;
- cmd_completion:
- /*
- * We come here when we understood we just processed
- * the last byte of a command packet, regardless of the
- * packet type
- */
- par->state = LMQCPP_IDLE;
- switch (par->packet_type_flags >> 4) {
- case LMQCP_STOC_CONNACK:
- lwsl_info("%s: cmd_completion: CONNACK\n",
- __func__);
- /*
- * Getting the CONNACK means we are the first,
- * the nwsi, and we succeeded to create a new
- * network connection ourselves.
- *
- * Since others may join us sharing the nwsi,
- * and we may close while they still want to use
- * it, our wsi lifecycle alone can no longer
- * define the lifecycle of the nwsi... it means
- * we need to do a "magic trick" and instead of
- * being both the nwsi and act like a child
- * stream, create a new wsi to take over the
- * nwsi duties and turn our wsi into a child of
- * the nwsi with its own lifecycle.
- *
- * The nwsi gets a mostly empty wsi->nwsi used
- * to track already-subscribed topics globally
- * for the connection.
- */
- /* we were under SENT_CLIENT_HANDSHAKE timeout */
- lws_set_timeout(wsi, 0, 0);
- w = lws_create_new_server_wsi(wsi->a.vhost,
- wsi->tsi);
- if (!w) {
- lwsl_notice("%s: sid 1 migrate failed\n",
- __func__);
- return -1;
- }
- wsi->mux.highest_sid = 1;
- lws_wsi_mux_insert(w, wsi, wsi->mux.highest_sid++);
- wsi->mux_substream = 1;
- w->mux_substream = 1;
- w->client_mux_substream = 1;
- wsi->client_mux_migrated = 1;
- wsi->told_user_closed = 1; /* don't tell nwsi closed */
- lwsi_set_state(w, LRS_ESTABLISHED);
- lwsi_set_state(wsi, LRS_ESTABLISHED);
- lwsi_set_role(w, lwsi_role(wsi));
- #if defined(LWS_WITH_CLIENT)
- w->flags = wsi->flags;
- #endif
- w->mqtt = wsi->mqtt;
- wsi->mqtt = lws_zalloc(sizeof(*wsi->mqtt), "nwsi mqtt");
- if (!wsi->mqtt)
- return -1;
- w->mqtt->wsi = w;
- w->a.protocol = wsi->a.protocol;
- if (w->user_space &&
- !w->user_space_externally_allocated)
- lws_free_set_NULL(w->user_space);
- w->user_space = wsi->user_space;
- wsi->user_space = NULL;
- w->user_space_externally_allocated =
- wsi->user_space_externally_allocated;
- if (lws_ensure_user_space(w))
- goto bail1;
- w->a.opaque_user_data = wsi->a.opaque_user_data;
- wsi->a.opaque_user_data = NULL;
- w->stash = wsi->stash;
- wsi->stash = NULL;
- lws_mux_mark_immortal(w);
- lwsl_notice("%s: migrated nwsi %p to sid 1 %p\n",
- __func__, wsi, w);
- #if defined(LWS_WITH_SERVER_STATUS)
- wsi->a.vhost->conn_stats.h2_subs++;
- #endif
- /*
- * It was the last thing we were waiting for
- * before we can be fully ESTABLISHED
- */
- if (lws_mqtt_set_client_established(w)) {
- lwsl_notice("%s: set EST fail\n", __func__);
- return -1;
- }
- /* get the ball rolling */
- lws_validity_confirmed(wsi);
- /* well, add the queued guys as children */
- lws_wsi_mux_apply_queue(wsi);
- break;
- bail1:
- /* undo the insert */
- wsi->mux.child_list = w->mux.sibling_list;
- wsi->mux.child_count--;
- w->a.context->count_wsi_allocated--;
- if (w->user_space)
- lws_free_set_NULL(w->user_space);
- w->a.vhost->protocols[0].callback(w,
- LWS_CALLBACK_WSI_DESTROY,
- NULL, NULL, 0);
- lws_vhost_unbind_wsi(w);
- lws_free(w);
- return 0;
- case LMQCP_PUBACK:
- lwsl_info("%s: cmd_completion: PUBACK\n",
- __func__);
- /*
- * Figure out which child asked for this
- */
- n = 0;
- lws_start_foreach_ll(struct lws *, w,
- wsi->mux.child_list) {
- if (w->mqtt->unacked_publish &&
- w->mqtt->ack_pkt_id == par->cpkt_id) {
- char requested_close = 0;
- w->mqtt->unacked_publish = 0;
- if (user_callback_handle_rxflow(
- w->a.protocol->callback,
- w, LWS_CALLBACK_MQTT_ACK,
- w->user_space, NULL, 0) < 0) {
- lwsl_info("%s: MQTT_ACK requests close\n",
- __func__);
- requested_close = 1;
- }
- n = 1;
- /*
- * We got an assertive PUBACK,
- * no need for ACK timeout wait
- * any more
- */
- lws_sul_cancel(&w->mqtt->sul_qos1_puback_wait);
- if (requested_close) {
- __lws_close_free_wsi(w,
- 0, "ack cb");
- break;
- }
- break;
- }
- } lws_end_foreach_ll(w, mux.sibling_list);
- if (!n) {
- lwsl_err("%s: unsolicited PUBACK\n",
- __func__);
- return -1;
- }
- /*
- * If we published something and it was acked,
- * our connection is definitely working in both
- * directions at the moment.
- */
- lws_validity_confirmed(wsi);
- break;
- case LMQCP_STOC_PINGRESP:
- lwsl_info("%s: cmd_completion: PINGRESP\n",
- __func__);
- /*
- * If we asked for a PINGRESP and it came,
- * our connection is definitely working in both
- * directions at the moment.
- */
- lws_validity_confirmed(wsi);
- break;
- case LMQCP_STOC_SUBACK:
- lwsl_info("%s: cmd_completion: SUBACK\n",
- __func__);
- /*
- * Figure out which child asked for this
- */
- n = 0;
- lws_start_foreach_ll(struct lws *, w,
- wsi->mux.child_list) {
- if (w->mqtt->inside_subscribe &&
- w->mqtt->ack_pkt_id == par->cpkt_id) {
- w->mqtt->inside_subscribe = 0;
- if (user_callback_handle_rxflow(
- w->a.protocol->callback,
- w, LWS_CALLBACK_MQTT_SUBSCRIBED,
- w->user_space, NULL, 0) < 0) {
- lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
- __func__);
- return -1;
- }
- n = 1;
- break;
- }
- } lws_end_foreach_ll(w, mux.sibling_list);
- if (!n) {
- lwsl_err("%s: unsolicited SUBACK\n",
- __func__);
- return -1;
- }
- /*
- * If we subscribed to something and SUBACK came,
- * our connection is definitely working in both
- * directions at the moment.
- */
- lws_validity_confirmed(wsi);
- break;
- case LMQCP_STOC_UNSUBACK:
- {
- char requested_close = 0;
- lwsl_info("%s: cmd_completion: UNSUBACK\n",
- __func__);
- /*
- * Figure out which child asked for this
- */
- n = 0;
- lws_start_foreach_ll(struct lws *, w,
- wsi->mux.child_list) {
- if (w->mqtt->inside_unsubscribe &&
- w->mqtt->ack_pkt_id == par->cpkt_id) {
- struct lws *nwsi = lws_get_network_wsi(w);
- /*
- * No more subscribers left,
- * remove the topic from nwsi
- */
- lws_mqtt_client_remove_subs(nwsi->mqtt);
- w->mqtt->inside_unsubscribe = 0;
- if (user_callback_handle_rxflow(
- w->a.protocol->callback,
- w, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
- w->user_space, NULL, 0) < 0) {
- lwsl_info("%s: MQTT_UNSUBACK requests close\n",
- __func__);
- requested_close = 1;
- }
- n = 1;
- if (requested_close) {
- __lws_close_free_wsi(w,
- 0, "unsub ack cb");
- break;
- }
- break;
- }
- } lws_end_foreach_ll(w, mux.sibling_list);
- if (!n) {
- lwsl_err("%s: unsolicited UNSUBACK\n",
- __func__);
- return -1;
- }
- /*
- * If we unsubscribed to something and
- * UNSUBACK came, our connection is
- * definitely working in both
- * directions at the moment.
- */
- lws_validity_confirmed(wsi);
- break;
- }
- case LMQCP_PUBLISH:
- {
- lws_mqtt_publish_param_t *pub =
- (lws_mqtt_publish_param_t *)
- wsi->mqtt->rx_cpkt_param;
- size_t chunk;
- if (pub == NULL) {
- lwsl_notice("%s: no pub\n", __func__);
- return -1;
- }
- /*
- * RX PUBLISH is delivered to any children that
- * registered for the related topic
- */
- n = wsi->role_ops->rx_cb[lwsi_role_server(wsi)];
- chunk = pub->payload_len - pub->payload_pos;
- if (chunk > len)
- chunk = len;
- lws_start_foreach_ll(struct lws *, w,
- wsi->mux.child_list) {
- if (lws_mqtt_find_sub(w->mqtt,
- pub->topic))
- if (w->a.protocol->callback(
- w, n,
- w->user_space,
- (void *)pub,
- chunk))
- return 1;
- } lws_end_foreach_ll(w, mux.sibling_list);
- pub->payload_pos += (uint32_t)chunk;
- len -= chunk;
- buf += chunk;
- lwsl_debug("%s: post pos %d, plen %d, len %d\n",
- __func__, (int)pub->payload_pos,
- (int)pub->payload_len, (int)len);
- if (pub->payload_pos != pub->payload_len) {
- /*
- * More chunks of the payload pending,
- * blocking this connection from doing
- * anything else
- */
- par->state = LMQCPP_PAYLOAD;
- break;
- }
- /* For QOS>0, send out PUBACK */
- if (pub->qos) {
- wsi->mqtt->send_puback = 1;
- lws_callback_on_writable(wsi);
- }
- par->payload_consumed = 0;
- lws_free_set_NULL(pub->topic);
- lws_free_set_NULL(wsi->mqtt->rx_cpkt_param);
- break;
- }
- default:
- break;
- }
- break;
- case LMQCPP_PROP_ID_VBI:
- switch (lws_mqtt_vbi_r(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- par->consumed += par->vbit.consumed;
- if (par->vbit.value >
- LWS_ARRAY_SIZE(property_valid)) {
- lwsl_notice("%s: undef prop id 0x%x\n",
- __func__, (int)par->vbit.value);
- goto send_protocol_error_and_close;
- }
- if (!(property_valid[par->vbit.value] &
- (1 << ctl_pkt_type(par)))) {
- lwsl_notice("%s: prop id 0x%x invalid for"
- " control pkt %d\n", __func__,
- (int)par->vbit.value,
- ctl_pkt_type(par));
- goto send_protocol_error_and_close;
- }
- par->prop_id = par->vbit.value;
- par->flag_prop_multi =
- par->props_seen[par->prop_id >> 3] &
- (1 << (par->prop_id & 7));
- par->props_seen[par->prop_id >> 3] |=
- (1 << (par->prop_id & 7));
- /*
- * even if it's not a vbi property arg,
- * .consumed of this will be zero the first time
- */
- lws_mqtt_vbi_init(&par->vbit);
- /*
- * if it's a string, next state must set the
- * destination and size limit itself. But
- * resetting it generically here lets it use
- * lws_mqtt_str_first() to understand it's the
- * first time around.
- */
- lws_mqtt_str_init(&par->s_temp, NULL, 0, 0);
- /* property arg state enums are so encoded */
- par->state = 0x100 | par->vbit.value;
- break;
- default:
- lwsl_notice("%s: prop id bad vbi\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- /*
- * All possible property payloads... restricting which ones
- * can appear in which control packets is already done above
- * in LMQCPP_PROP_ID_VBI
- */
- case LMQCPP_PROP_REQUEST_PROBLEM_INFO_1BYTE:
- case LMQCPP_PROP_REQUEST_REPSONSE_INFO_1BYTE:
- case LMQCPP_PROP_MAXIMUM_QOS_1BYTE:
- case LMQCPP_PROP_RETAIN_AVAILABLE_1BYTE:
- case LMQCPP_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE_1BYTE:
- case LMQCPP_PROP_SUBSCRIPTION_IDENTIFIER_AVAILABLE_1BYTE:
- case LMQCPP_PROP_SHARED_SUBSCRIPTION_AVAILABLE_1BYTE:
- case LMQCPP_PROP_PAYLOAD_FORMAT_INDICATOR_1BYTE: /* 3.3.2.3.2 */
- if (par->flag_prop_multi)
- goto singular_prop_seen_twice;
- par->payload_format = *buf++;
- len--;
- if (lws_mqtt_pconsume(par, 1))
- goto send_protocol_error_and_close;
- break;
- case LMQCPP_PROP_MAXIMUM_PACKET_SIZE_4BYTE:
- case LMQCPP_PROP_WILL_DELAY_INTERVAL_4BYTE:
- case LMQCPP_PROP_SESSION_EXPIRY_INTERVAL_4BYTE:
- case LMQCPP_PROP_MSG_EXPIRY_INTERVAL_4BYTE:
- if (par->flag_prop_multi)
- goto singular_prop_seen_twice;
- if (lws_mqtt_mb_first(&par->vbit))
- lws_mqtt_4byte_init(&par->vbit);
- switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- if (lws_mqtt_pconsume(par, par->vbit.consumed))
- goto send_protocol_error_and_close;
- break;
- default:
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_PROP_SERVER_KEEPALIVE_2BYTE:
- case LMQCPP_PROP_RECEIVE_MAXIMUM_2BYTE:
- case LMQCPP_PROP_TOPIC_MAXIMUM_2BYTE:
- case LMQCPP_PROP_TOPIC_ALIAS_2BYTE:
- if (par->flag_prop_multi)
- goto singular_prop_seen_twice;
- if (lws_mqtt_mb_first(&par->vbit))
- lws_mqtt_2byte_init(&par->vbit);
- switch (lws_mqtt_mb_parse(&par->vbit, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- if (lws_mqtt_pconsume(par, par->vbit.consumed))
- goto send_protocol_error_and_close;
- break;
- default:
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_PROP_ASSIGNED_CLIENTID_UTF8S:
- case LMQCPP_PROP_AUTH_METHOD_UTF8S:
- case LMQCPP_PROP_USER_PROPERTY_NAME_UTF8S:
- case LMQCPP_PROP_USER_PROPERTY_VALUE_UTF8S:
- case LMQCPP_PROP_RESPONSE_INFO_UTF8S:
- case LMQCPP_PROP_SERVER_REFERENCE_UTF8S:
- case LMQCPP_PROP_REASON_STRING_UTF8S:
- case LMQCPP_PROP_RESPONSE_TOPIC_UTF8S:
- case LMQCPP_PROP_CONTENT_TYPE_UTF8S:
- if (par->flag_prop_multi)
- goto singular_prop_seen_twice;
- if (lws_mqtt_str_first(&par->s_temp))
- lws_mqtt_str_init(&par->s_temp, par->temp,
- sizeof(par->temp), 0);
- switch (lws_mqtt_str_parse(&par->s_temp, &buf, &len)) {
- case LMSPR_NEED_MORE:
- break;
- case LMSPR_COMPLETED:
- if (lws_mqtt_pconsume(par, par->s_temp.len))
- goto send_protocol_error_and_close;
- break;
- default:
- lwsl_info("%s: bad protocol name\n", __func__);
- goto send_protocol_error_and_close;
- }
- break;
- case LMQCPP_PROP_SUBSCRIPTION_ID_VBI:
- case LMQCPP_PROP_CORRELATION_BINDATA:
- case LMQCPP_PROP_AUTH_DATA_BINDATA:
- /* TODO */
- lwsl_err("%s: Unimplemented packet state 0x%x\n",
- __func__, par->state);
- return -1;
- }
- }
- return 0;
- oom:
- lwsl_err("%s: OOM!\n", __func__);
- goto send_protocol_error_and_close;
- singular_prop_seen_twice:
- lwsl_info("%s: property appears twice\n", __func__);
- send_protocol_error_and_close:
- lwsl_notice("%s: peac\n", __func__);
- par->reason = LMQCP_REASON_PROTOCOL_ERROR;
- send_reason_and_close:
- lwsl_notice("%s: srac\n", __func__);
- par->flag_pending_send_reason_close = 1;
- goto ask;
- send_unsupp_connack_and_close:
- lwsl_notice("%s: unsupac\n", __func__);
- par->reason = LMQCP_REASON_UNSUPPORTED_PROTOCOL;
- par->flag_pending_send_connack_close = 1;
- ask:
- /* Should we ask for clients? */
- lws_callback_on_writable(wsi);
- return -1;
- }
- int
- lws_mqtt_fill_fixed_header(uint8_t *p, lws_mqtt_control_packet_t ctrl_pkt_type,
- uint8_t dup, lws_mqtt_qos_levels_t qos,
- uint8_t retain)
- {
- lws_mqtt_fixed_hdr_t hdr;
- hdr.bits = 0;
- hdr.flags.ctrl_pkt_type = (uint8_t) ctrl_pkt_type;
- switch(ctrl_pkt_type) {
- case LMQCP_PUBLISH:
- hdr.flags.dup = !!dup;
- /*
- * A PUBLISH Packet MUST NOT have both QoS bits set to
- * 1. If a Server or Client receives a PUBLISH Packet
- * which has both QoS bits set to 1 it MUST close the
- * Network Connection [MQTT-3.3.1-4].
- */
- if (qos >= RESERVED_QOS_LEVEL) {
- lwsl_err("%s: Unsupport QoS level 0x%x\n",
- __func__, qos);
- return -1;
- }
- hdr.flags.qos = (uint8_t)qos;
- hdr.flags.retain = !!retain;
- break;
- case LMQCP_CTOS_CONNECT:
- case LMQCP_STOC_CONNACK:
- case LMQCP_PUBACK:
- case LMQCP_PUBREC:
- case LMQCP_PUBCOMP:
- case LMQCP_STOC_SUBACK:
- case LMQCP_STOC_UNSUBACK:
- case LMQCP_CTOS_PINGREQ:
- case LMQCP_STOC_PINGRESP:
- case LMQCP_DISCONNECT:
- case LMQCP_AUTH:
- hdr.bits &= 0xf0;
- break;
- /*
- * Bits 3,2,1 and 0 of the fixed header of the PUBREL,
- * SUBSCRIBE, UNSUBSCRIBE Control Packets are reserved and
- * MUST be set to 0,0,1 and 0 respectively. The Server MUST
- * treat any other value as malformed and close the Network
- * Connection [MQTT-3.6.1-1], [MQTT-3.8.1-1], [MQTT-3.10.1-1].
- */
- case LMQCP_PUBREL:
- case LMQCP_CTOS_SUBSCRIBE:
- case LMQCP_CTOS_UNSUBSCRIBE:
- hdr.bits |= 0x02;
- break;
- default:
- return -1;
- }
- *p = hdr.bits;
- return 0;
- }
- /*
- * This fires if the wsi did a PUBLISH under QoS1, but no PUBACK came before
- * the timeout period
- */
- static void
- lws_mqtt_publish_resend(struct lws_sorted_usec_list *sul)
- {
- struct _lws_mqtt_related *mqtt = lws_container_of(sul,
- struct _lws_mqtt_related, sul_qos1_puback_wait);
- lwsl_notice("%s: wsi %p\n", __func__, mqtt->wsi);
- if (mqtt->wsi->a.protocol->callback(mqtt->wsi, LWS_CALLBACK_MQTT_RESEND,
- mqtt->wsi->user_space, NULL, 0))
- lws_set_timeout(mqtt->wsi, 1, LWS_TO_KILL_ASYNC);
- }
- int
- lws_mqtt_client_send_publish(struct lws *wsi, lws_mqtt_publish_param_t *pub,
- const void *buf, uint32_t len, int is_complete)
- {
- struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
- uint8_t *b = (uint8_t *)pt->serv_buf, *start, *p;
- struct lws *nwsi = lws_get_network_wsi(wsi);
- lws_mqtt_str_t mqtt_vh_payload;
- uint32_t vh_len, rem_len;
- assert(pub->topic);
- lwsl_debug("%s: len = %d, is_complete = %d\n",
- __func__, (int)len, (int)is_complete);
- if (lwsi_state(wsi) != LRS_ESTABLISHED) {
- lwsl_err("%s: wsi %p: unknown state 0x%x\n", __func__, wsi,
- lwsi_state(wsi));
- assert(0);
- return 1;
- }
- if (wsi->mqtt->inside_payload) {
- /*
- * Headers are filled, we are sending
- * the payload - a buffer with LWS_PRE
- * in front it.
- */
- start = (uint8_t *)buf;
- p = start + len;
- if (is_complete)
- wsi->mqtt->inside_payload = 0;
- goto do_write;
- }
- start = b + LWS_PRE;
- p = start;
- /*
- * Fill headers and the first chunk of the
- * payload (if any)
- */
- if (lws_mqtt_fill_fixed_header(p++, LMQCP_PUBLISH,
- 0, pub->qos, 0)) {
- lwsl_err("%s: Failed to fill fixed header\n", __func__);
- return 1;
- }
- /*
- * Topic len field + Topic len + Packet ID
- * (for QOS>0) + Payload len
- */
- vh_len = 2 + pub->topic_len + ((pub->qos) ? 2 : 0);
- rem_len = vh_len + pub->payload_len;
- lwsl_debug("%s: Remaining len = %d\n", __func__, (int) rem_len);
- /* Will the chunk of payload fit? */
- if ((vh_len + len) >=
- (wsi->a.context->pt_serv_buf_size - LWS_PRE)) {
- lwsl_err("%s: Payload is too big\n", __func__);
- return 1;
- }
- p += lws_mqtt_vbi_encode(rem_len, p);
- /* Topic's Len */
- lws_ser_wu16be(p, pub->topic_len);
- p += 2;
- /*
- * Init lws_mqtt_str for "MQTT Variable
- * Headers + payload" (only the supplied
- * chuncked payload)
- */
- lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p,
- (pub->topic_len + ((pub->qos) ? 2 : 0) + len),
- 0);
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- lws_strncpy((char *)p, pub->topic, (size_t)pub->topic_len+1);
- if (lws_mqtt_str_advance(&mqtt_vh_payload, pub->topic_len)) {
- lwsl_err("%s: a\n", __func__);
- return 1;
- }
- /* Packet ID */
- if (pub->qos != QOS0) {
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- wsi->mqtt->ack_pkt_id = pub->packet_id = ++nwsi->mqtt->pkt_id;
- lwsl_debug("%s: pkt_id = %d\n", __func__,
- (int)wsi->mqtt->ack_pkt_id);
- lws_ser_wu16be(p, pub->packet_id);
- if (lws_mqtt_str_advance(&mqtt_vh_payload, 2)) {
- lwsl_err("%s: b\n", __func__);
- return 1;
- }
- }
- /*
- * A non-empty Payload is expected and a chunk
- * is present
- */
- if (pub->payload_len && len) {
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- memcpy(p, buf, len);
- if (lws_mqtt_str_advance(&mqtt_vh_payload, len))
- return 1;
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- }
- if (!is_complete)
- nwsi->mqtt->inside_payload = wsi->mqtt->inside_payload = 1;
- do_write:
- // lwsl_hexdump_err(start, lws_ptr_diff(p, start));
- if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
- lws_ptr_diff(p, start)) {
- lwsl_err("%s: write failed\n", __func__);
- return 1;
- }
- if (!is_complete) {
- /* still some more chunks to come... */
- lws_callback_on_writable(wsi);
- return 0;
- }
- wsi->mqtt->inside_payload = nwsi->mqtt->inside_payload = 0;
- if (pub->qos != QOS0)
- wsi->mqtt->unacked_publish = 1;
- /* this was the last part of the publish message */
- if (pub->qos == QOS0) {
- /*
- * There won't be any real PUBACK, act like we got one
- * so the user callback logic is the same for QoS0 or
- * QoS1
- */
- if (wsi->a.protocol->callback(wsi, LWS_CALLBACK_MQTT_ACK,
- wsi->user_space, NULL, 0)) {
- lwsl_err("%s: ACK callback exited\n", __func__);
- return 1;
- }
- return 0;
- }
- /* For QoS1, if no PUBACK coming after 3s, we must RETRY the publish */
- wsi->mqtt->sul_qos1_puback_wait.cb = lws_mqtt_publish_resend;
- __lws_sul_insert_us(&pt->pt_sul_owner[wsi->conn_validity_wakesuspend],
- &wsi->mqtt->sul_qos1_puback_wait,
- 3 * LWS_USEC_PER_SEC);
- return 0;
- }
- int
- lws_mqtt_client_send_subcribe(struct lws *wsi, lws_mqtt_subscribe_param_t *sub)
- {
- struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
- uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
- struct lws *nwsi = lws_get_network_wsi(wsi);
- lws_mqtt_str_t mqtt_vh_payload;
- uint8_t exists[8], extant;
- lws_mqtt_subs_t *mysub;
- uint32_t rem_len;
- #if defined(_DEBUG)
- uint32_t tops;
- #endif
- uint32_t n;
- assert(sub->num_topics);
- assert(sub->num_topics < sizeof(exists));
- switch (lwsi_state(wsi)) {
- case LRS_ESTABLISHED: /* Protocol connection established */
- if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_SUBSCRIBE,
- 0, 0, 0)) {
- lwsl_err("%s: Failed to fill fixed header\n", __func__);
- return 1;
- }
- /*
- * The stream wants to subscribe to one or more topic, but
- * the shared nwsi may already be subscribed to some or all of
- * them from interactions with other streams. For those cases,
- * we filter them from the list the child wants until we just
- * have ones that are new to the nwsi. If nothing left, we just
- * synthesize the callback to the child as if SUBACK had come
- * and we're done, otherwise just ask the server for topics that
- * are new to the wsi.
- */
- extant = 0;
- memset(&exists, 0, sizeof(exists));
- for (n = 0; n < sub->num_topics; n++) {
- lwsl_info("%s: Subscribing to topic[%d] = \"%s\"\n",
- __func__, (int)n, sub->topic[n].name);
- mysub = lws_mqtt_find_sub(nwsi->mqtt, sub->topic[n].name);
- if (mysub && mysub->ref_count) {
- mysub->ref_count++; /* another stream using it */
- exists[n] = 1;
- extant++;
- }
- /*
- * Attach the topic we're subscribing to, to wsi->mqtt
- */
- if (!lws_mqtt_create_sub(wsi->mqtt, sub->topic[n].name)) {
- lwsl_err("%s: create sub fail\n", __func__);
- return 1;
- }
- }
- if (extant == sub->num_topics) {
- /*
- * It turns out there's nothing to do here, the nwsi has
- * already subscribed to all the topics this stream
- * wanted. Just tell it it can have them.
- */
- lwsl_notice("%s: all topics already subscribed\n", __func__);
- if (user_callback_handle_rxflow(
- wsi->a.protocol->callback,
- wsi, LWS_CALLBACK_MQTT_SUBSCRIBED,
- wsi->user_space, NULL, 0) < 0) {
- lwsl_err("%s: MQTT_SUBSCRIBE failed\n",
- __func__);
- return -1;
- }
- return 0;
- }
- #if defined(_DEBUG)
- /*
- * zero or more of the topics already existed, but not all,
- * so we must go to the server with a filtered list of the
- * new ones only
- */
- tops = sub->num_topics - extant;
- #endif
- /*
- * Pid + (Topic len field + Topic len + Req. QoS) x Num of Topics
- */
- rem_len = 2;
- for (n = 0; n < sub->num_topics; n++)
- if (!exists[n])
- rem_len += (2 + (uint32_t)strlen(sub->topic[n].name) + (uint32_t)1);
- wsi->mqtt->sub_size = rem_len;
- #if defined(_DEBUG)
- lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
- __func__, (int)tops, (int)rem_len);
- #endif
- p += lws_mqtt_vbi_encode(rem_len, p);
- if ((rem_len + lws_ptr_diff(p, start)) >=
- wsi->a.context->pt_serv_buf_size) {
- lwsl_err("%s: Payload is too big\n", __func__);
- return 1;
- }
- /* Init lws_mqtt_str */
- lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, rem_len, 0);
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- /* Packet ID */
- wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
- lwsl_debug("%s: pkt_id = %d\n", __func__,
- (int)wsi->mqtt->ack_pkt_id);
- lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
- if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
- return 1;
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- for (n = 0; n < sub->num_topics; n++) {
- lwsl_info("%s: topics[%d] = %s\n", __func__,
- (int)n, sub->topic[n].name);
- /* if the nwsi already has it, don't ask server for it */
- if (exists[n]) {
- lwsl_info("%s: topics[%d] \"%s\" exists in nwsi\n",
- __func__, (int)n, sub->topic[n].name);
- continue;
- }
- /*
- * Attach the topic we're subscribing to, to nwsi->mqtt
- * so we know the nwsi itself has a subscription to it
- */
- if (!lws_mqtt_create_sub(nwsi->mqtt, sub->topic[n].name))
- return 1;
- /* Topic's Len */
- lws_ser_wu16be(p, (uint16_t)strlen(sub->topic[n].name));
- if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
- return 1;
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- /* Topic Name */
- lws_strncpy((char *)p, sub->topic[n].name,
- strlen(sub->topic[n].name) + 1);
- if (lws_mqtt_str_advance(&mqtt_vh_payload,
- (int)strlen(sub->topic[n].name)))
- return 1;
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- /* QoS */
- *p = sub->topic[n].qos;
- if (lws_mqtt_str_advance(&mqtt_vh_payload, 1))
- return 1;
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- }
- break;
- default:
- return 1;
- }
- if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
- lws_ptr_diff(p, start))
- return 1;
- wsi->mqtt->inside_subscribe = 1;
- return 0;
- }
- int
- lws_mqtt_client_send_unsubcribe(struct lws *wsi,
- const lws_mqtt_subscribe_param_t *unsub)
- {
- struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
- uint8_t *b = (uint8_t *)pt->serv_buf + LWS_PRE, *start = b, *p = start;
- struct lws *nwsi = lws_get_network_wsi(wsi);
- lws_mqtt_str_t mqtt_vh_payload;
- uint8_t send_unsub[8], orphaned;
- uint32_t rem_len, n;
- lws_mqtt_subs_t *mysub;
- #if defined(_DEBUG)
- uint32_t tops;
- #endif
- lwsl_info("%s: Enter\n", __func__);
- switch (lwsi_state(wsi)) {
- case LRS_ESTABLISHED: /* Protocol connection established */
- orphaned = 0;
- memset(&send_unsub, 0, sizeof(send_unsub));
- for (n = 0; n < unsub->num_topics; n++) {
- mysub = lws_mqtt_find_sub(nwsi->mqtt,
- unsub->topic[n].name);
- assert(mysub);
- if (mysub && --mysub->ref_count == 0) {
- lwsl_notice("%s: Need to send UNSUB\n", __func__);
- send_unsub[n] = 1;
- orphaned++;
- }
- }
- if (!orphaned) {
- /*
- * The nwsi still has other subscribers bound to the
- * topics.
- *
- * So, don't send UNSUB to server, and just fake the
- * UNSUB ACK event for the guy going away.
- */
- lwsl_notice("%s: unsubscribed!\n", __func__);
- if (user_callback_handle_rxflow(
- wsi->a.protocol->callback,
- wsi, LWS_CALLBACK_MQTT_UNSUBSCRIBED,
- wsi->user_space, NULL, 0) < 0) {
- /*
- * We can't directly close here, because the
- * caller still has the wsi. Inform the
- * caller that we want to close
- */
- return 1;
- }
- return 0;
- }
- #if defined(_DEBUG)
- /*
- * one or more of the topics needs to be unsubscribed
- * from, so we must go to the server with a filtered
- * list of the new ones only
- */
- tops = orphaned;
- #endif
- if (lws_mqtt_fill_fixed_header(p++, LMQCP_CTOS_UNSUBSCRIBE,
- 0, 0, 0)) {
- lwsl_err("%s: Failed to fill fixed header\n", __func__);
- return 1;
- }
- /*
- * Pid + (Topic len field + Topic len) x Num of Topics
- */
- rem_len = 2;
- for (n = 0; n < unsub->num_topics; n++)
- if (send_unsub[n])
- rem_len += (2 + (uint32_t)strlen(unsub->topic[n].name));
- wsi->mqtt->sub_size = rem_len;
- lwsl_debug("%s: Number of topics = %d, Remaining len = %d\n",
- __func__, (int)tops, (int)rem_len);
- p += lws_mqtt_vbi_encode(rem_len, p);
- if ((rem_len + lws_ptr_diff(p, start)) >=
- wsi->a.context->pt_serv_buf_size) {
- lwsl_err("%s: Payload is too big\n", __func__);
- return 1;
- }
- /* Init lws_mqtt_str */
- lws_mqtt_str_init(&mqtt_vh_payload, (uint8_t *)p, rem_len, 0);
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- /* Packet ID */
- wsi->mqtt->ack_pkt_id = ++nwsi->mqtt->pkt_id;
- lwsl_debug("%s: pkt_id = %d\n", __func__,
- (int)wsi->mqtt->ack_pkt_id);
- lws_ser_wu16be(p, wsi->mqtt->ack_pkt_id);
- if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
- return 1;
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- for (n = 0; n < unsub->num_topics; n++) {
- lwsl_info("%s: topics[%d] = %s\n", __func__,
- (int)n, unsub->topic[n].name);
- /*
- * Subscriber still bound to it, don't UBSUB
- * from the server
- */
- if (!send_unsub[n])
- continue;
- /* Topic's Len */
- lws_ser_wu16be(p, (uint16_t)strlen(unsub->topic[n].name));
- if (lws_mqtt_str_advance(&mqtt_vh_payload, 2))
- return 1;
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- /* Topic Name */
- lws_strncpy((char *)p, unsub->topic[n].name,
- strlen(unsub->topic[n].name) + 1);
- if (lws_mqtt_str_advance(&mqtt_vh_payload,
- (int)strlen(unsub->topic[n].name)))
- return 1;
- p = lws_mqtt_str_next(&mqtt_vh_payload, NULL);
- }
- break;
- default:
- return 1;
- }
- if (lws_write(nwsi, start, lws_ptr_diff(p, start), LWS_WRITE_BINARY) !=
- lws_ptr_diff(p, start))
- return 1;
- wsi->mqtt->inside_unsubscribe = 1;
- return 0;
- }
- /*
- * This is called when child streams bind to an already-existing and compatible
- * MQTT stream
- */
- struct lws *
- lws_wsi_mqtt_adopt(struct lws *parent_wsi, struct lws *wsi)
- {
- /* no more children allowed by parent? */
- if (parent_wsi->mux.child_count + 1 > LWS_MQTT_MAX_CHILDREN) {
- lwsl_err("%s: reached concurrent stream limit\n", __func__);
- return NULL;
- }
- #if defined(LWS_WITH_CLIENT)
- wsi->client_mux_substream = 1;
- #endif
- lws_wsi_mux_insert(wsi, parent_wsi, wsi->mux.my_sid);
- if (lws_ensure_user_space(wsi))
- goto bail1;
- lws_mqtt_set_client_established(wsi);
- lws_callback_on_writable(wsi);
- #if defined(LWS_WITH_SERVER_STATUS)
- wsi->a.vhost->conn_stats.mqtt_subs++;
- #endif
- return wsi;
- bail1:
- /* undo the insert */
- parent_wsi->mux.child_list = wsi->mux.sibling_list;
- parent_wsi->mux.child_count--;
- if (wsi->user_space)
- lws_free_set_NULL(wsi->user_space);
- wsi->a.protocol->callback(wsi, LWS_CALLBACK_WSI_DESTROY, NULL, NULL, 0);
- lws_free(wsi);
- return NULL;
- }
|