sequencer.c 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. /*
  2. * libwebsockets - small server side websockets and web server implementation
  3. *
  4. * Copyright (C) 2010 - 2019 Andy Green <[email protected]>
  5. *
  6. * Permission is hereby granted, free of charge, to any person obtaining a copy
  7. * of this software and associated documentation files (the "Software"), to
  8. * deal in the Software without restriction, including without limitation the
  9. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  10. * sell copies of the Software, and to permit persons to whom the Software is
  11. * furnished to do so, subject to the following conditions:
  12. *
  13. * The above copyright notice and this permission notice shall be included in
  14. * all copies or substantial portions of the Software.
  15. *
  16. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  21. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  22. * IN THE SOFTWARE.
  23. */
  24. #include "private-lib-core.h"
  25. /*
  26. * per pending event
  27. */
  28. typedef struct lws_seq_event {
  29. struct lws_dll2 seq_event_list;
  30. void *data;
  31. void *aux;
  32. lws_seq_events_t e;
  33. } lws_seq_event_t;
  34. /*
  35. * per sequencer
  36. */
  37. typedef struct lws_sequencer {
  38. struct lws_dll2 seq_list;
  39. lws_sorted_usec_list_t sul_timeout;
  40. lws_sorted_usec_list_t sul_pending;
  41. struct lws_dll2_owner seq_event_owner;
  42. struct lws_context_per_thread *pt;
  43. lws_seq_event_cb cb;
  44. const char *name;
  45. const lws_retry_bo_t *retry;
  46. lws_usec_t time_created;
  47. lws_usec_t timeout; /* 0 or time we timeout */
  48. uint8_t going_down:1;
  49. uint8_t wakesuspend:1;
  50. } lws_seq_t;
  51. #define QUEUE_SANITY_LIMIT 10
  52. static void
  53. lws_sul_seq_heartbeat_cb(lws_sorted_usec_list_t *sul)
  54. {
  55. struct lws_context_per_thread *pt = lws_container_of(sul,
  56. struct lws_context_per_thread, sul_seq_heartbeat);
  57. /* send every sequencer a heartbeat message... it can ignore it */
  58. lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp,
  59. lws_dll2_get_head(&pt->seq_owner)) {
  60. lws_seq_t *s = lws_container_of(p, lws_seq_t, seq_list);
  61. /* queue the message to inform the sequencer */
  62. lws_seq_queue_event(s, LWSSEQ_HEARTBEAT, NULL, NULL);
  63. } lws_end_foreach_dll_safe(p, tp);
  64. /* schedule the next one */
  65. __lws_sul_insert_us(&pt->pt_sul_owner[LWSSULLI_MISS_IF_SUSPENDED],
  66. &pt->sul_seq_heartbeat, LWS_US_PER_SEC);
  67. }
  68. int
  69. lws_seq_pt_init(struct lws_context_per_thread *pt)
  70. {
  71. pt->sul_seq_heartbeat.cb = lws_sul_seq_heartbeat_cb;
  72. /* schedule the first heartbeat */
  73. __lws_sul_insert_us(&pt->pt_sul_owner[LWSSULLI_MISS_IF_SUSPENDED],
  74. &pt->sul_seq_heartbeat, LWS_US_PER_SEC);
  75. return 0;
  76. }
  77. lws_seq_t *
  78. lws_seq_create(lws_seq_info_t *i)
  79. {
  80. struct lws_context_per_thread *pt = &i->context->pt[i->tsi];
  81. lws_seq_t *seq = lws_zalloc(sizeof(*seq) + i->user_size, __func__);
  82. if (!seq)
  83. return NULL;
  84. seq->cb = i->cb;
  85. seq->pt = pt;
  86. seq->name = i->name;
  87. seq->retry = i->retry;
  88. seq->wakesuspend = i->wakesuspend;
  89. *i->puser = (void *)&seq[1];
  90. /* add the sequencer to the pt */
  91. lws_pt_lock(pt, __func__); /* ---------------------------------- pt { */
  92. lws_dll2_add_tail(&seq->seq_list, &pt->seq_owner);
  93. lws_pt_unlock(pt); /* } pt ------------------------------------------ */
  94. seq->time_created = lws_now_usecs();
  95. /* try to queue the creation cb */
  96. if (lws_seq_queue_event(seq, LWSSEQ_CREATED, NULL, NULL)) {
  97. lws_dll2_remove(&seq->seq_list);
  98. lws_free(seq);
  99. return NULL;
  100. }
  101. return seq;
  102. }
  103. static int
  104. seq_ev_destroy(struct lws_dll2 *d, void *user)
  105. {
  106. lws_seq_event_t *seqe = lws_container_of(d, lws_seq_event_t,
  107. seq_event_list);
  108. lws_dll2_remove(&seqe->seq_event_list);
  109. lws_free(seqe);
  110. return 0;
  111. }
  112. void
  113. lws_seq_destroy(lws_seq_t **pseq)
  114. {
  115. lws_seq_t *seq = *pseq;
  116. /* defeat another thread racing to add events while we are destroying */
  117. seq->going_down = 1;
  118. seq->cb(seq, (void *)&seq[1], LWSSEQ_DESTROYED, NULL, NULL);
  119. lws_pt_lock(seq->pt, __func__); /* -------------------------- pt { */
  120. lws_dll2_remove(&seq->seq_list);
  121. lws_dll2_remove(&seq->sul_timeout.list);
  122. lws_dll2_remove(&seq->sul_pending.list);
  123. /* remove and destroy any pending events */
  124. lws_dll2_foreach_safe(&seq->seq_event_owner, NULL, seq_ev_destroy);
  125. lws_pt_unlock(seq->pt); /* } pt ---------------------------------- */
  126. lws_free_set_NULL(seq);
  127. }
  128. void
  129. lws_seq_destroy_all_on_pt(struct lws_context_per_thread *pt)
  130. {
  131. lws_start_foreach_dll_safe(struct lws_dll2 *, p, tp,
  132. pt->seq_owner.head) {
  133. lws_seq_t *s = lws_container_of(p, lws_seq_t,
  134. seq_list);
  135. lws_seq_destroy(&s);
  136. } lws_end_foreach_dll_safe(p, tp);
  137. }
  138. static void
  139. lws_seq_sul_pending_cb(lws_sorted_usec_list_t *sul)
  140. {
  141. lws_seq_t *seq = lws_container_of(sul, lws_seq_t, sul_pending);
  142. lws_seq_event_t *seqe;
  143. struct lws_dll2 *dh;
  144. int n;
  145. if (!seq->seq_event_owner.count)
  146. return;
  147. /* events are only added at tail, so no race possible yet... */
  148. dh = lws_dll2_get_head(&seq->seq_event_owner);
  149. seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list);
  150. n = seq->cb(seq, (void *)&seq[1], seqe->e, seqe->data, seqe->aux);
  151. /* ... have to lock here though, because we will change the list */
  152. lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */
  153. /* detach event from sequencer event list and free it */
  154. lws_dll2_remove(&seqe->seq_event_list);
  155. lws_free(seqe);
  156. lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */
  157. if (n) {
  158. lwsl_info("%s: destroying seq '%s' by request\n", __func__,
  159. seq->name);
  160. lws_seq_destroy(&seq);
  161. }
  162. }
  163. int
  164. lws_seq_queue_event(lws_seq_t *seq, lws_seq_events_t e, void *data, void *aux)
  165. {
  166. lws_seq_event_t *seqe;
  167. if (!seq || seq->going_down)
  168. return 1;
  169. seqe = lws_zalloc(sizeof(*seqe), __func__);
  170. if (!seqe)
  171. return 1;
  172. seqe->e = e;
  173. seqe->data = data;
  174. seqe->aux = aux;
  175. // lwsl_notice("%s: seq %s: event %d\n", __func__, seq->name, e);
  176. lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */
  177. if (seq->seq_event_owner.count > QUEUE_SANITY_LIMIT) {
  178. lwsl_err("%s: more than %d events queued\n", __func__,
  179. QUEUE_SANITY_LIMIT);
  180. }
  181. lws_dll2_add_tail(&seqe->seq_event_list, &seq->seq_event_owner);
  182. seq->sul_pending.cb = lws_seq_sul_pending_cb;
  183. __lws_sul_insert_us(&seq->pt->pt_sul_owner[seq->wakesuspend],
  184. &seq->sul_pending, 1);
  185. lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */
  186. return 0;
  187. }
  188. /*
  189. * Check if wsi still extant, by peeking in the message queue for a
  190. * LWSSEQ_WSI_CONN_CLOSE message about wsi. (Doesn't need to do the same for
  191. * CONN_FAIL since that will never have produced any messages prior to that).
  192. *
  193. * Use this to avoid trying to perform operations on wsi that have already
  194. * closed but we didn't get to that message yet.
  195. *
  196. * Returns 0 if not closed yet or 1 if it has closed but we didn't process the
  197. * close message yet.
  198. */
  199. int
  200. lws_seq_check_wsi(lws_seq_t *seq, struct lws *wsi)
  201. {
  202. lws_seq_event_t *seqe;
  203. struct lws_dll2 *dh;
  204. lws_pt_lock(seq->pt, __func__); /* ----------------------------- pt { */
  205. dh = lws_dll2_get_head(&seq->seq_event_owner);
  206. while (dh) {
  207. seqe = lws_container_of(dh, lws_seq_event_t, seq_event_list);
  208. if (seqe->e == LWSSEQ_WSI_CONN_CLOSE && seqe->data == wsi)
  209. break;
  210. dh = dh->next;
  211. }
  212. lws_pt_unlock(seq->pt); /* } pt ------------------------------------- */
  213. return !!dh;
  214. }
  215. static void
  216. lws_seq_sul_timeout_cb(lws_sorted_usec_list_t *sul)
  217. {
  218. lws_seq_t *s = lws_container_of(sul, lws_seq_t, sul_timeout);
  219. lws_seq_queue_event(s, LWSSEQ_TIMED_OUT, NULL, NULL);
  220. }
  221. /* set us to LWS_SET_TIMER_USEC_CANCEL to remove timeout */
  222. int
  223. lws_seq_timeout_us(lws_seq_t *seq, lws_usec_t us)
  224. {
  225. seq->sul_timeout.cb = lws_seq_sul_timeout_cb;
  226. /* list is always at the very top of the sul */
  227. __lws_sul_insert_us(&seq->pt->pt_sul_owner[seq->wakesuspend],
  228. (lws_sorted_usec_list_t *)&seq->sul_timeout.list, us);
  229. return 0;
  230. }
  231. lws_seq_t *
  232. lws_seq_from_user(void *u)
  233. {
  234. return &((lws_seq_t *)u)[-1];
  235. }
  236. const char *
  237. lws_seq_name(lws_seq_t *seq)
  238. {
  239. return seq->name;
  240. }
  241. lws_usec_t
  242. lws_seq_us_since_creation(lws_seq_t *seq)
  243. {
  244. return lws_now_usecs() - seq->time_created;
  245. }
  246. struct lws_context *
  247. lws_seq_get_context(lws_seq_t *seq)
  248. {
  249. return seq->pt->context;
  250. }