/*
 * lws System Message Distribution
 *
 * Copyright (C) 2019 - 2020 Andy Green <[email protected]>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "private-lib-core.h"
#include <assert.h>

void *
lws_smd_msg_alloc(struct lws_context *ctx, lws_smd_class_t _class, size_t len)
{
        lws_smd_msg_t *msg;

        /* only allow it if someone wants to consume this class of event */

        if (!(ctx->smd._class_filter & _class)) {
                lwsl_info("%s: rejecting class 0x%x as no participant wants it\n",
                          __func__, (unsigned int)_class);
                return NULL;
        }

        assert(len <= LWS_SMD_MAX_PAYLOAD);

        /*
         * If SS configured, over-allocate LWS_SMD_SS_RX_HEADER_LEN behind
         * payload, ie, msg_t (gap LWS_SMD_SS_RX_HEADER_LEN) payload
         */
        msg = lws_malloc(sizeof(*msg) + LWS_SMD_SS_RX_HEADER_LEN_EFF + len,
                         __func__);
        if (!msg)
                return NULL;

        memset(msg, 0, sizeof(*msg));
        msg->timestamp = lws_now_usecs();
        msg->length = (uint16_t)len;
        msg->_class = _class;

        return ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF;
}
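
/*
 * The allocation above has this layout; callers receive and pass around only
 * the payload pointer (sketch, derived from the pointer arithmetic in this
 * file):
 *
 *   [ lws_smd_msg_t ][ LWS_SMD_SS_RX_HEADER_LEN_EFF gap ][ payload (len) ]
 *                                                          ^-- returned ptr
 */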

void
lws_smd_msg_free(void **ppay)
{
        lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)*ppay) -
                                LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));

        /* if SS configured, actual alloc is LWS_SMD_SS_RX_HEADER_LEN behind */

        lws_free(msg);
        *ppay = NULL;
}

/*
 * Figure out what to set the initial refcount for the message to
 */

static int
_lws_smd_msg_assess_peers_interested(lws_smd_t *smd, lws_smd_msg_t *msg)
{
        struct lws_context *ctx = lws_container_of(smd, struct lws_context, smd);
        int interested = 0;

        lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
                lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

                /*
                 * In order to optimize the tail management into a refcount,
                 * we have to account exactly for when peers arrived and
                 * departed (including deferring the logical peer destruction
                 * until no pending message remains that the peer may have
                 * contributed to the refcount of)
                 */

                if (pr->timestamp_joined <= msg->timestamp &&
                    (!pr->timestamp_left || /* if zombie, only contribute to
                                             * refcount if msg from before we
                                             * left */
                     pr->timestamp_left >= msg->timestamp) &&
                    (msg->_class & pr->_class_filter))
                        /*
                         * This peer wants to consume it
                         */
                        interested++;

        } lws_end_foreach_dll(p);

        return interested;
}
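
/*
 * Worked example (sketch): a peer that joined at t=100 and zombified at
 * t=200 is counted in the refcount of a message stamped t=150, but not in
 * one stamped t=250, since it could never have been scheduled to receive
 * the later message.
 */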

static int
_lws_smd_class_mask_union(lws_smd_t *smd)
{
        uint32_t mask = 0;

        lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
                                   smd->owner_peers.head) {
                lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

                /* may destroy pr if zombie */

                mask |= pr->_class_filter;

        } lws_end_foreach_dll_safe(p, p1);

        smd->_class_filter = mask;

        return 0;
}
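
/*
 * For illustration: with one peer filtering for class 0x1 and another for
 * 0x6, the union becomes 0x7, so lws_smd_msg_alloc() accepts classes 0x1,
 * 0x2 and 0x4 and rejects everything else at source.
 */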

int
lws_smd_msg_send(struct lws_context *ctx, void *pay)
{
        lws_smd_msg_t *msg = (lws_smd_msg_t *)(((uint8_t *)pay) -
                                LWS_SMD_SS_RX_HEADER_LEN_EFF - sizeof(*msg));

        if (ctx->smd.owner_messages.count >= LWS_SMD_MAX_QUEUE_DEPTH)
                /* reject the message due to max queue depth reached */
                return 1;

        if (!ctx->smd.delivering)
                lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */

        msg->refcount = _lws_smd_msg_assess_peers_interested(&ctx->smd, msg);

        lws_mutex_lock(ctx->smd.lock_messages); /* +++++++++++++++++ messages */
        lws_dll2_add_tail(&msg->list, &ctx->smd.owner_messages);
        lws_mutex_unlock(ctx->smd.lock_messages); /* --------------- messages */

        /*
         * Any peer with no active tail needs to check our class to see if we
         * should become his tail
         */

        lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
                lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

                if (!pr->tail && (pr->_class_filter & msg->_class))
                        pr->tail = msg;

        } lws_end_foreach_dll(p);

        if (!ctx->smd.delivering)
                lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

        /* we may be happening from another thread context */

        lws_cancel_service(ctx);

        return 0;
}
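
/*
 * Usage sketch (illustrative, not part of this file): allocate a payload,
 * fill it, then hand ownership to SMD; on send failure the caller still owns
 * the allocation and must free it.  LWSSMDCL_INTERACTION is just an example
 * class bit here.
 *
 *      void *p = lws_smd_msg_alloc(ctx, LWSSMDCL_INTERACTION, 3);
 *      if (p) {
 *              memcpy(p, "hi", 3);
 *              if (lws_smd_msg_send(ctx, p))
 *                      lws_smd_msg_free(&p);
 *      }
 */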

int
lws_smd_msg_printf(struct lws_context *ctx, lws_smd_class_t _class,
                   const char *format, ...)
{
        lws_smd_msg_t *msg;
        va_list ap;
        void *p;
        int n;

        if (!(ctx->smd._class_filter & _class))
                /*
                 * There's nobody interested in messages of this class atm.
                 * Don't bother generating it, and act like all is well.
                 */
                return 0;

        va_start(ap, format);
        n = vsnprintf(NULL, 0, format, ap);
        va_end(ap);
        if (n > LWS_SMD_MAX_PAYLOAD)
                /* too large to send */
                return 1;

        p = lws_smd_msg_alloc(ctx, _class, (size_t)n + 2);
        if (!p)
                return 1;
        msg = (lws_smd_msg_t *)(((uint8_t *)p) - LWS_SMD_SS_RX_HEADER_LEN_EFF -
                                sizeof(*msg));
        msg->length = (uint16_t)n;
        va_start(ap, format);
        vsnprintf((char *)p, (size_t)n + 2, format, ap);
        va_end(ap);

        /*
         * locks taken and released in here
         */

        if (lws_smd_msg_send(ctx, p)) {
                lws_smd_msg_free(&p);
                return 1;
        }

        return 0;
}
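
/*
 * Usage sketch (illustrative): format a message directly into an SMD
 * payload, eg,
 *
 *      (void)lws_smd_msg_printf(ctx, LWSSMDCL_INTERACTION,
 *                               "{\"button\":%d}", 4);
 *
 * If nobody currently subscribes to the class, this cheaply returns 0
 * without allocating anything.
 */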

static void
_lws_smd_peer_finalize_destroy(lws_smd_peer_t *pr)
{
        lws_dll2_remove(&pr->list);
        lws_free(pr);
}

/*
 * Peers that deregister may need to hang around as zombies, so they account
 * for refcounts on messages they already contributed to.  Because older
 * messages may be in flight over UDS links, we have to stick around and make
 * sure all cases have their refcount handled correctly.
 */

static void
_lws_smd_peer_zombify(lws_smd_peer_t *pr)
{
        lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
                                          owner_peers);

        /* update the class mask union to reflect this peer no longer active */
        _lws_smd_class_mask_union(smd);

        pr->timestamp_left = lws_now_usecs();
}

static lws_smd_msg_t *
_lws_smd_msg_next_matching_filter(lws_dll2_t *tail, lws_smd_class_t filter)
{
        lws_smd_msg_t *msg;

        do {
                tail = tail->next;
                if (!tail)
                        return NULL;

                msg = lws_container_of(tail, lws_smd_msg_t, list);
                if (msg->_class & filter)
                        return msg;
        } while (1);

        return NULL; /* not reached */
}

/*
 * Note: may destroy zombie peers when it sees their grace period has expired.
 *
 * Delivers only one message to the peer and advances the tail, or sets it to
 * NULL if no more filtered messages are queued.  Returns nonzero if the tail
 * is non-NULL.
 *
 * For Proxied SS, only asks for writeable and does not advance or change the
 * tail.
 *
 * This is done so that if multiple messages are queued, we don't get a
 * situation where one participant gets them all spammed, then the next, etc.
 * Instead they are delivered round-robin.
 */

static int
_lws_smd_msg_deliver_peer(struct lws_context *ctx, lws_smd_peer_t *pr)
{
        lws_smd_msg_t *msg;

        if (!pr->tail)
                return 0;

        msg = lws_container_of(pr->tail, lws_smd_msg_t, list);

        /*
         * Check if zombie peer and the message predates our leaving
         */

        if (pr->timestamp_left &&
            msg->timestamp > pr->timestamp_left) {
                /*
                 * We do not need to modify the message refcount: it was
                 * generated after we became a zombie, so we definitely did
                 * not contribute to its refcount...
                 *
                 * ...have we waited out the grace period?
                 */

                if (lws_now_usecs() - pr->timestamp_left >
                    LWS_SMD_INFLIGHT_GRACE_SECS * LWS_US_PER_SEC)
                        /*
                         * ... ok, it's time for the zombie to abandon
                         * its attachment to the Earth and rejoin the
                         * cosmic mandala
                         */
                        _lws_smd_peer_finalize_destroy(pr);

                /* ... either way, nothing further to do for this guy */

                return 0;
        }

        if (!pr->timestamp_left) {

                /*
                 * Peer is not a zombie... deliver the tail
                 */
#if 0
                if (pr->type == LSMDT_SECURE_STREAMS_PROXIED) {
#if defined(LWS_WITH_SECURE_STREAMS)
                        if (pr->ss_handle)
                                lws_ss_request_tx(pr->ss_handle);
#endif
                        return 0;
                }
#endif

                pr->cb(pr->opaque, msg->_class, msg->timestamp,
                       ((uint8_t *)&msg[1]) + LWS_SMD_SS_RX_HEADER_LEN_EFF,
                       (size_t)msg->length);
        }

        assert(msg->refcount);

        /*
         * If there is one, move forward to the next queued
         * message that meets our filters
         */
        pr->tail = _lws_smd_msg_next_matching_filter(&pr->tail->list,
                                                     pr->_class_filter);

        if (!--msg->refcount) {
                /*
                 * We have fully delivered the message now, it
                 * can be unlinked and destroyed
                 */
                lws_dll2_remove(&msg->list);
                lws_free(msg);
        }

        /*
         * Wait out the grace period even if no live messages
         * for a zombie peer... there may be some in flight
         */

        return !!pr->tail;
}

/*
 * Called when the event loop could deliver messages synchronously, eg, on
 * entry to idle
 */

int
lws_smd_msg_distribute(struct lws_context *ctx)
{
        char more;

        /* commonly, no messages and nothing to do... */

        if (!ctx->smd.owner_messages.count)
                return 0;

        ctx->smd.delivering = 1;

        do {
                more = 0;
                lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */

                lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
                                           ctx->smd.owner_peers.head) {
                        lws_smd_peer_t *pr =
                                     lws_container_of(p, lws_smd_peer_t, list);

                        /* may destroy pr if zombie, hence _safe iterator */
                        more |= _lws_smd_msg_deliver_peer(ctx, pr);

                } lws_end_foreach_dll_safe(p, p1);

                lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

        } while (more);

        ctx->smd.delivering = 0;

        return 0;
}
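
/*
 * Flow summary (sketch): lws_smd_msg_send() queues the message and calls
 * lws_cancel_service(), which wakes the event loop; when the loop goes idle
 * it calls lws_smd_msg_distribute(), which round-robins one message per peer
 * per pass until every peer's tail is drained.
 */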

struct lws_smd_peer *
lws_smd_register(struct lws_context *ctx, void *opaque, int flags,
                 lws_smd_class_t _class_filter, lws_smd_notification_cb_t cb)
{
        lws_smd_peer_t *pr = lws_zalloc(sizeof(*pr), __func__);

        if (!pr)
                return NULL;

        pr->cb = cb;
        pr->opaque = opaque;
        pr->_class_filter = _class_filter;
        pr->timestamp_joined = lws_now_usecs();

        /*
         * Figure out the type of peer from the situation...
         */

#if 0
#if defined(LWS_WITH_SECURE_STREAMS)
        if (!ctx->smd.listen_vh) {
                /*
                 * The guy who is registering is actually a SS proxy link
                 * between a client and SMD
                 */
        } else
#endif
#endif
                pr->type = LSMDT_SAME_PROCESS;

        if (!ctx->smd.delivering)
                lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++ peers */

        lws_dll2_add_tail(&pr->list, &ctx->smd.owner_peers);

        /* update the global class mask union to account for new peer mask */
        _lws_smd_class_mask_union(&ctx->smd);

        if (!ctx->smd.delivering)
                lws_mutex_unlock(ctx->smd.lock_peers); /* ------------- peers */

        lwsl_debug("%s: registered\n", __func__);

        return pr;
}
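
/*
 * Usage sketch (illustrative): subscribe to a class and receive payloads via
 * the notification callback.  The callback prototype here is inferred from
 * the pr->cb() call in _lws_smd_msg_deliver_peer() above (the authoritative
 * declaration lives in lws-smd.h), and LWSSMDCL_INTERACTION is just an
 * example class bit.
 *
 *      static int
 *      my_smd_cb(void *opaque, lws_smd_class_t _class, lws_usec_t ts,
 *                void *buf, size_t len)
 *      {
 *              lwsl_notice("smd class 0x%x, %u bytes\n",
 *                          (unsigned int)_class, (unsigned int)len);
 *              return 0;
 *      }
 *
 *      struct lws_smd_peer *pr = lws_smd_register(ctx, NULL, 0,
 *                                                 LWSSMDCL_INTERACTION,
 *                                                 my_smd_cb);
 */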

void
lws_smd_unregister(struct lws_smd_peer *pr)
{
        lws_smd_t *smd = lws_container_of(pr->list.owner, lws_smd_t,
                                          owner_peers);

        lws_mutex_lock(smd->lock_peers); /* +++++++++++++++++++++++++++ peers */
        _lws_smd_peer_zombify(pr);
        lws_mutex_unlock(smd->lock_peers); /* ------------------------- peers */
}

int
lws_smd_message_pending(struct lws_context *ctx)
{
        int ret = 1;

        /*
         * First cheaply check the common case, no messages pending, so
         * there's definitely nothing for this tsi or anything else
         */

        if (!ctx->smd.owner_messages.count)
                return 0;

        /*
         * Walk the peer list
         */

        lws_mutex_lock(ctx->smd.lock_peers); /* +++++++++++++++++++++++ peers */
        lws_start_foreach_dll(struct lws_dll2 *, p, ctx->smd.owner_peers.head) {
                lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

                if (pr->tail && pr->type == LSMDT_SAME_PROCESS)
                        goto bail;

        } lws_end_foreach_dll(p);

        /*
         * There's no message pending that we need to handle
         */

        ret = 0;

bail:
        lws_mutex_unlock(ctx->smd.lock_peers); /* --------------------- peers */

        return ret;
}

int
_lws_smd_destroy(struct lws_context *ctx)
{
        /* stop any message creation */

        ctx->smd._class_filter = 0;

        /*
         * Walk the message list, destroying them
         */

        lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
                                   ctx->smd.owner_messages.head) {
                lws_smd_msg_t *msg = lws_container_of(p, lws_smd_msg_t, list);

                lws_free(msg);

        } lws_end_foreach_dll_safe(p, p1);

        lws_mutex_destroy(ctx->smd.lock_messages);

        /*
         * Walk the peer list, destroying them
         */

        lws_start_foreach_dll_safe(struct lws_dll2 *, p, p1,
                                   ctx->smd.owner_peers.head) {
                lws_smd_peer_t *pr = lws_container_of(p, lws_smd_peer_t, list);

                _lws_smd_peer_finalize_destroy(pr);

        } lws_end_foreach_dll_safe(p, p1);

        lws_mutex_destroy(ctx->smd.lock_peers);

        return 0;
}