threadpool.c
/*
 * libwebsockets - small server side websockets and web server implementation
 *
 * Copyright (C) 2010 - 2020 Andy Green <[email protected]>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#if !defined(_GNU_SOURCE)
#define _GNU_SOURCE
#endif

#if defined(WIN32)
#define HAVE_STRUCT_TIMESPEC
#if defined(pid_t)
#undef pid_t
#endif
#endif
#include <pthread.h>

#include "private-lib-core.h"

#include <string.h>
#include <stdio.h>

struct lws_threadpool;

struct lws_threadpool_task {
	struct lws_threadpool_task *task_queue_next;

	struct lws_threadpool *tp;
	char name[32];
	struct lws_threadpool_task_args args;

	lws_dll2_t list;

	lws_usec_t created;
	lws_usec_t acquired;
	lws_usec_t done;
	lws_usec_t entered_state;

	lws_usec_t acc_running;
	lws_usec_t acc_syncing;

	pthread_cond_t wake_idle;

	enum lws_threadpool_task_status status;

	int late_sync_retries;

	char wanted_writeable_cb;
	char outlive;
};

struct lws_pool {
	struct lws_threadpool *tp;

	pthread_t thread;
	pthread_mutex_t lock; /* part of task wake_idle */
	struct lws_threadpool_task *task;
	lws_usec_t acquired;
	int worker_index;
};

struct lws_threadpool {
	pthread_mutex_t lock; /* protects all pool lists */
	pthread_cond_t wake_idle;
	struct lws_pool *pool_list;

	struct lws_context *context;
	struct lws_threadpool *tp_list; /* context list of threadpools */

	struct lws_threadpool_task *task_queue_head;
	struct lws_threadpool_task *task_done_head;

	char name[32];

	int threads_in_pool;
	int queue_depth;
	int done_queue_depth;
	int max_queue_depth;
	int running_tasks;

	unsigned int destroying:1;
};
static int
ms_delta(lws_usec_t now, lws_usec_t then)
{
	return (int)((now - then) / 1000);
}

static void
us_accrue(lws_usec_t *acc, lws_usec_t then)
{
	lws_usec_t now = lws_now_usecs();

	*acc += now - then;
}

static int
pc_delta(lws_usec_t now, lws_usec_t then, lws_usec_t us)
{
	lws_usec_t delta = (now - then) + 1;

	return (int)((us * 100) / delta);
}

static void
__lws_threadpool_task_dump(struct lws_threadpool_task *task, char *buf, int len)
{
	lws_usec_t now = lws_now_usecs();
	char *end = buf + len - 1;
	int syncms = 0, runms = 0;

	if (!task->acquired) {
		buf += lws_snprintf(buf, end - buf,
				    "task: %s, QUEUED queued: %dms",
				    task->name, ms_delta(now, task->created));

		return;
	}

	if (task->acc_running)
		runms = (int)task->acc_running;

	if (task->acc_syncing)
		syncms = (int)task->acc_syncing;

	if (!task->done) {
		buf += lws_snprintf(buf, end - buf,
			"task: %s, ONGOING state %d (%dms) alive: %dms "
			"(queued %dms, acquired: %dms, "
			"run: %d%%, sync: %d%%)", task->name, task->status,
			ms_delta(now, task->entered_state),
			ms_delta(now, task->created),
			ms_delta(task->acquired, task->created),
			ms_delta(now, task->acquired),
			pc_delta(now, task->acquired, runms),
			pc_delta(now, task->acquired, syncms));

		return;
	}

	lws_snprintf(buf, end - buf,
		"task: %s, DONE state %d lived: %dms "
		"(queued %dms, on thread: %dms, "
		"ran: %d%%, synced: %d%%)", task->name, task->status,
		ms_delta(task->done, task->created),
		ms_delta(task->acquired, task->created),
		ms_delta(task->done, task->acquired),
		pc_delta(task->done, task->acquired, runms),
		pc_delta(task->done, task->acquired, syncms));
}

void
lws_threadpool_dump(struct lws_threadpool *tp)
{
#if 0
	//defined(_DEBUG)
	struct lws_threadpool_task **c;
	char buf[160];
	int n, count;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	lwsl_thread("%s: tp: %s, Queued: %d, Run: %d, Done: %d\n", __func__,
		    tp->name, tp->queue_depth, tp->running_tasks,
		    tp->done_queue_depth);

	count = 0;
	c = &tp->task_queue_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;

		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread(" - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->queue_depth)
		lwsl_err("%s: tp says queue depth %d, but actually %d\n",
			 __func__, tp->queue_depth, count);

	count = 0;
	for (n = 0; n < tp->threads_in_pool; n++) {
		struct lws_pool *pool = &tp->pool_list[n];
		struct lws_threadpool_task *task = pool->task;

		if (task) {
			__lws_threadpool_task_dump(task, buf, sizeof(buf));
			lwsl_thread(" - worker %d: %s\n", n, buf);
			count++;
		}
	}

	if (count != tp->running_tasks)
		lwsl_err("%s: tp says %d running_tasks, but actually %d\n",
			 __func__, tp->running_tasks, count);

	count = 0;
	c = &tp->task_done_head;
	while (*c) {
		struct lws_threadpool_task *task = *c;

		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread(" - %s\n", buf);
		count++;

		c = &(*c)->task_queue_next;
	}

	if (count != tp->done_queue_depth)
		lwsl_err("%s: tp says done_queue_depth %d, but actually %d\n",
			 __func__, tp->done_queue_depth, count);

	pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
#endif
}
static void
state_transition(struct lws_threadpool_task *task,
		 enum lws_threadpool_task_status status)
{
	task->entered_state = lws_now_usecs();
	task->status = status;
}

static struct lws *
task_to_wsi(struct lws_threadpool_task *task)
{
#if defined(LWS_WITH_SECURE_STREAMS)
	if (task->args.ss)
		return task->args.ss->wsi;
#endif

	return task->args.wsi;
}

static void
lws_threadpool_task_cleanup_destroy(struct lws_threadpool_task *task)
{
	if (task->args.cleanup)
		task->args.cleanup(task_to_wsi(task), task->args.user);

	lws_dll2_remove(&task->list);

	lwsl_thread("%s: tp %p: cleaned finished task for wsi %p\n",
		    __func__, task->tp, task_to_wsi(task));

	lws_free(task);
}

static void
__lws_threadpool_reap(struct lws_threadpool_task *task)
{
	struct lws_threadpool_task **c, *t = NULL;
	struct lws_threadpool *tp = task->tp;

	/* remove the task from the done queue */

	if (tp) {
		c = &tp->task_done_head;

		while (*c) {
			if ((*c) == task) {
				t = *c;
				*c = t->task_queue_next;
				t->task_queue_next = NULL;
				tp->done_queue_depth--;
				lwsl_thread("%s: tp %s: reaped task wsi %p\n",
					    __func__, tp->name,
					    task_to_wsi(task));
				break;
			}
			c = &(*c)->task_queue_next;
		}

		if (!t) {
			lwsl_err("%s: task %p not in done queue\n", __func__,
				 task);

			/*
			 * This shouldn't occur, but in this case not really
			 * safe to assume there's a task to destroy
			 */
			return;
		}
	} else
		lwsl_err("%s: task->tp NULL already\n", __func__);

	/* call the task's cleanup and delete the task itself */

	lws_threadpool_task_cleanup_destroy(task);
}
/*
 * this gets called from each tsi service context after the service was
 * cancelled... we need to ask for the writable callback from the matching
 * tsi context for any wsis bound to a worker thread that need it
 */

int
lws_threadpool_tsi_context(struct lws_context *context, int tsi)
{
	struct lws_threadpool_task **c, *task = NULL;
	struct lws_threadpool *tp;
	struct lws *wsi;

	lws_context_lock(context, __func__);

	tp = context->tp_list_head;
	while (tp) {
		int n;

		/* for the running (syncing...) tasks... */

		for (n = 0; n < tp->threads_in_pool; n++) {
			struct lws_pool *pool = &tp->pool_list[n];

			task = pool->task;
			if (!task)
				continue;

			wsi = task_to_wsi(task);
			if (!wsi || wsi->tsi != tsi ||
			    (!task->wanted_writeable_cb &&
			     task->status != LWS_TP_STATUS_SYNCING))
				continue;

			task->wanted_writeable_cb = 0;
			lws_memory_barrier();

			/*
			 * finally... we can ask for the callback on
			 * writable from the correct service thread
			 * context
			 */
			lws_callback_on_writable(wsi);
		}

		/* for the done tasks... */

		c = &tp->task_done_head;
		while (*c) {
			task = *c;

			wsi = task_to_wsi(task);
			if (wsi && wsi->tsi == tsi &&
			    (task->wanted_writeable_cb ||
			     task->status == LWS_TP_STATUS_SYNCING)) {
				task->wanted_writeable_cb = 0;
				lws_memory_barrier();

				/*
				 * finally... we can ask for the callback on
				 * writable from the correct service thread
				 * context
				 */
				lws_callback_on_writable(wsi);
			}

			c = &task->task_queue_next;
		}

		tp = tp->tp_list;
	}

	lws_context_unlock(context);

	return 0;
}
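
/*
 * Sketch of how the flow above is driven, assuming a single service thread
 * (tsi 0): the worker sets wanted_writeable_cb and calls lws_cancel_service(),
 * which surfaces as LWS_CALLBACK_EVENT_WAIT_CANCELLED in each service thread;
 * that is a safe place to route back into lws_threadpool_tsi_context().
 * Depending on the lws version this plumbing may already be done by the core,
 * so this is illustrative only:
 *
 *	case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
 *		lws_threadpool_tsi_context(lws_get_context(wsi), 0);
 *		break;
 */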
static int
lws_threadpool_worker_sync(struct lws_pool *pool,
			   struct lws_threadpool_task *task)
{
	enum lws_threadpool_task_status temp;
	struct timespec abstime;
	struct lws *wsi;
	int tries = 15;

	/* block until writable acknowledges */
	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC in\n", __func__, task);
	pthread_mutex_lock(&pool->lock); /* ======================= pool lock */

	lwsl_info("%s: %s: task %p (%s): syncing with wsi %p\n", __func__,
		  pool->tp->name, task, task->name, task_to_wsi(task));

	temp = task->status;
	state_transition(task, LWS_TP_STATUS_SYNCING);
	while (tries--) {
		wsi = task_to_wsi(task);

		/*
		 * if the wsi is no longer attached to this task, there is
		 * nothing we can sync to usefully.  Since the work wants to
		 * sync, we should react by telling the task it can't usefully
		 * continue, by stopping it.
		 */
		if (!wsi) {
			lwsl_thread("%s: %s: task %p (%s): No longer bound to "
				    "any wsi to sync to\n", __func__,
				    pool->tp->name, task, task->name);

			state_transition(task, LWS_TP_STATUS_STOPPING);
			goto done;
		}

		/*
		 * Each of the up-to-15 tries waits at most 2s, bounding how
		 * long we are willing to sit still between SYNC asking for a
		 * callback on writable and actually getting it (~30s total).
		 *
		 * If every try times out, we will stop the task.
		 */
		abstime.tv_sec = time(NULL) + 2;
		abstime.tv_nsec = 0;

		task->wanted_writeable_cb = 1;
		lws_memory_barrier();

		/*
		 * This will cause lws_threadpool_tsi_context() to get called
		 * from each tsi service context, where we can safely ask for
		 * a callback on writeable on the wsi we are associated with.
		 */
		lws_cancel_service(lws_get_context(wsi));

		/*
		 * so the danger here is that we asked for a writable callback
		 * on the wsi, but for whatever reason, we are never going to
		 * get one.  To avoid deadlocking forever, we allow a set time
		 * for the sync to happen naturally, otherwise the cond wait
		 * times out and we stop the task.
		 */
		if (pthread_cond_timedwait(&task->wake_idle, &pool->lock,
					   &abstime) == ETIMEDOUT) {
			task->late_sync_retries++;
			if (!tries) {
				lwsl_err("%s: %s: task %p (%s): SYNC timed out "
					 "(associated wsi %p)\n",
					 __func__, pool->tp->name, task,
					 task->name, task_to_wsi(task));

				state_transition(task, LWS_TP_STATUS_STOPPING);
				goto done;
			}

			continue;
		} else
			break;
	}

	if (task->status == LWS_TP_STATUS_SYNCING)
		state_transition(task, temp);

	lwsl_debug("%s: %p: LWS_TP_RETURN_SYNC out\n", __func__, task);

done:
	pthread_mutex_unlock(&pool->lock); /* ----------------- - pool unlock */

	return 0;
}
#if !defined(WIN32)
static int dummy;
#endif

static void *
lws_threadpool_worker(void *d)
{
	struct lws_threadpool_task **c, **c2, *task;
	struct lws_pool *pool = d;
	struct lws_threadpool *tp = pool->tp;
	char buf[160];

	while (!tp->destroying) {

		/* we have no running task... wait and get one from the queue */

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		/*
		 * if there's no task already waiting in the queue, wait for
		 * the wake_idle condition to signal us that might have changed
		 */
		while (!tp->task_queue_head && !tp->destroying)
			pthread_cond_wait(&tp->wake_idle, &tp->lock);

		if (tp->destroying) {
			lwsl_notice("%s: bailing\n", __func__);
			goto doneski;
		}

		c = &tp->task_queue_head;
		c2 = NULL;
		task = NULL;
		pool->task = NULL;

		/* look at the queue tail */
		while (*c) {
			c2 = c;
			c = &(*c)->task_queue_next;
		}

		/* is there a task at the queue tail? */
		if (c2 && *c2) {
			pool->task = task = *c2;
			task->acquired = pool->acquired = lws_now_usecs();
			/* remove it from the queue */
			*c2 = task->task_queue_next;
			task->task_queue_next = NULL;
			tp->queue_depth--;
			/* mark it as running */
			state_transition(task, LWS_TP_STATUS_RUNNING);
		}

		/* someone else got it first... wait and try again */
		if (!task) {
			pthread_mutex_unlock(&tp->lock); /* ------ tp unlock */
			continue;
		}

		task->wanted_writeable_cb = 0;

		/* we have acquired a new task */

		__lws_threadpool_task_dump(task, buf, sizeof(buf));

		lwsl_thread("%s: %s: worker %d ACQUIRING: %s\n",
			    __func__, tp->name, pool->worker_index, buf);
		tp->running_tasks++;

		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */

		/*
		 * 1) The task can return with LWS_TP_RETURN_CHECKING_IN to
		 * "resurface" periodically, and get called again with
		 * cont = 1 immediately to indicate it is picking up where it
		 * left off if the task is not being "stopped".
		 *
		 * This allows long tasks to respond to requests to stop in
		 * a clean and opaque way.
		 *
		 * 2) The task can return with LWS_TP_RETURN_SYNC to register
		 * a "callback on writable" request on the service thread and
		 * block until it hears back from the WRITABLE handler.
		 *
		 * This allows the work on the thread to be synchronized to the
		 * previous work being dispatched cleanly.
		 *
		 * 3) The task can return with LWS_TP_RETURN_FINISHED to
		 * indicate its work is completed nicely.
		 *
		 * 4) The task can return with LWS_TP_RETURN_STOPPED to
		 * indicate it stopped and cleaned up after incomplete work.
		 */
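
		/*
		 * A minimal sketch of a task callback honoring this return
		 * protocol (my_task and its priv fields are hypothetical,
		 * not part of lws):
		 *
		 *	static enum lws_threadpool_return
		 *	my_task(void *user, enum lws_threadpool_task_status s)
		 *	{
		 *		struct my_priv *p = user;
		 *
		 *		if (s == LWS_TP_STATUS_STOPPING)
		 *			return LWS_TP_RETURN_STOPPED;
		 *
		 *		do_a_bounded_slice_of_work(p);
		 *		if (p->result_ready)
		 *			// blocks here until the service side
		 *			// syncs us from its WRITEABLE handler
		 *			return LWS_TP_RETURN_SYNC;
		 *		if (p->all_done)
		 *			return LWS_TP_RETURN_FINISHED;
		 *
		 *		return LWS_TP_RETURN_CHECKING_IN;
		 *	}
		 */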
		do {
			lws_usec_t then;
			int n;

			if (tp->destroying || !task_to_wsi(task)) {
				lwsl_info("%s: stopping on wsi gone\n",
					  __func__);
				state_transition(task, LWS_TP_STATUS_STOPPING);
			}

			then = lws_now_usecs();
			n = task->args.task(task->args.user, task->status);
			lwsl_debug(" %d, status %d\n", n, task->status);
			us_accrue(&task->acc_running, then);
			if (n & LWS_TP_RETURN_FLAG_OUTLIVE)
				task->outlive = 1;
			switch (n & 7) {
			case LWS_TP_RETURN_CHECKING_IN:
				/* if not destroying the tp, continue */
				break;
			case LWS_TP_RETURN_SYNC:
				if (!task_to_wsi(task)) {
					lwsl_debug("%s: task that wants to "
						   "outlive lost wsi asked "
						   "to sync: bypassed\n",
						   __func__);
					break;
				}
				/* block until writable acknowledges */
				then = lws_now_usecs();
				lws_threadpool_worker_sync(pool, task);
				us_accrue(&task->acc_syncing, then);
				break;
			case LWS_TP_RETURN_FINISHED:
				state_transition(task, LWS_TP_STATUS_FINISHED);
				break;
			case LWS_TP_RETURN_STOPPED:
				state_transition(task, LWS_TP_STATUS_STOPPED);
				break;
			}
		} while (task->status == LWS_TP_STATUS_RUNNING);

		pthread_mutex_lock(&tp->lock); /* =================== tp lock */

		tp->running_tasks--;

		if (pool->task->status == LWS_TP_STATUS_STOPPING)
			state_transition(task, LWS_TP_STATUS_STOPPED);

		/* move the task to the done queue */

		pool->task->task_queue_next = tp->task_done_head;
		tp->task_done_head = task;
		tp->done_queue_depth++;
		pool->task->done = lws_now_usecs();

		if (!pool->task->args.wsi &&
		    (pool->task->status == LWS_TP_STATUS_STOPPED ||
		     pool->task->status == LWS_TP_STATUS_FINISHED)) {

			__lws_threadpool_task_dump(pool->task, buf,
						   sizeof(buf));
			lwsl_thread("%s: %s: worker %d REAPING: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/*
			 * there is no longer any wsi attached, so nothing is
			 * going to take care of reaping us.  So we must take
			 * care of it ourselves.
			 */
			__lws_threadpool_reap(pool->task);
		} else {

			__lws_threadpool_task_dump(pool->task, buf,
						   sizeof(buf));
			lwsl_thread("%s: %s: worker %d DONE: %s\n",
				    __func__, tp->name, pool->worker_index,
				    buf);

			/*
			 * signal the associated wsi to take a fresh look at
			 * task status
			 */
			if (task_to_wsi(pool->task)) {
				task->wanted_writeable_cb = 1;

				lws_cancel_service(
				      lws_get_context(task_to_wsi(pool->task)));
			}
		}

doneski:
		pool->task = NULL;
		pthread_mutex_unlock(&tp->lock); /* --------------- tp unlock */
	}

	lwsl_notice("%s: Exiting\n", __func__);

	/* threadpool is being destroyed */
#if !defined(WIN32)
	pthread_exit(&dummy);
#endif

	return NULL;
}
struct lws_threadpool *
lws_threadpool_create(struct lws_context *context,
		      const struct lws_threadpool_create_args *args,
		      const char *format, ...)
{
	struct lws_threadpool *tp;
	va_list ap;
	int n;

	tp = lws_malloc(sizeof(*tp) + (sizeof(struct lws_pool) * args->threads),
			"threadpool alloc");
	if (!tp)
		return NULL;

	memset(tp, 0, sizeof(*tp) + (sizeof(struct lws_pool) * args->threads));
	tp->pool_list = (struct lws_pool *)(tp + 1);
	tp->max_queue_depth = args->max_queue_depth;

	va_start(ap, format);
	n = vsnprintf(tp->name, sizeof(tp->name) - 1, format, ap);
	va_end(ap);

	lws_context_lock(context, __func__);

	tp->context = context;
	tp->tp_list = context->tp_list_head;
	context->tp_list_head = tp;

	lws_context_unlock(context);

	pthread_mutex_init(&tp->lock, NULL);
	pthread_cond_init(&tp->wake_idle, NULL);

	for (n = 0; n < args->threads; n++) {
#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
		char name[16];
#endif
		tp->pool_list[n].tp = tp;
		tp->pool_list[n].worker_index = n;
		pthread_mutex_init(&tp->pool_list[n].lock, NULL);
		if (pthread_create(&tp->pool_list[n].thread, NULL,
				   lws_threadpool_worker, &tp->pool_list[n])) {
			lwsl_err("thread creation failed\n");
		} else {
#if defined(LWS_HAS_PTHREAD_SETNAME_NP)
			lws_snprintf(name, sizeof(name), "%s-%d", tp->name, n);
			pthread_setname_np(tp->pool_list[n].thread, name);
#endif
			tp->threads_in_pool++;
		}
	}

	return tp;
}
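
/*
 * Creation sketch, assuming an existing struct lws_context *context (the
 * values and pool name are illustrative only):
 *
 *	struct lws_threadpool_create_args cargs;
 *	struct lws_threadpool *tp;
 *
 *	memset(&cargs, 0, sizeof(cargs));
 *	cargs.threads = 4;		// worker threads in the pool
 *	cargs.max_queue_depth = 8;	// enqueue fails beyond this
 *
 *	tp = lws_threadpool_create(context, &cargs, "%s", "mypool");
 *	if (!tp)
 *		// handle failure
 */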
void
lws_threadpool_finish(struct lws_threadpool *tp)
{
	struct lws_threadpool_task **c, *task;

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	/*
	 * nothing new can start, running jobs will abort as STOPPED and the
	 * pool threads will exit ASAP (they are joined in destroy)
	 */
	tp->destroying = 1;

	/* stop everyone in the pending queue and move to the done queue */

	c = &tp->task_queue_head;
	while (*c) {
		task = *c;
		*c = task->task_queue_next;
		task->task_queue_next = tp->task_done_head;
		tp->task_done_head = task;
		state_transition(task, LWS_TP_STATUS_STOPPED);
		tp->queue_depth--;
		tp->done_queue_depth++;
		task->done = lws_now_usecs();

		c = &task->task_queue_next;
	}

	pthread_cond_broadcast(&tp->wake_idle);
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */
}

void
lws_threadpool_destroy(struct lws_threadpool *tp)
{
	struct lws_threadpool_task *task, *next;
	struct lws_threadpool **ptp;
	void *retval;
	int n;

	/* remove us from the context list of threadpools */

	lws_context_lock(tp->context, __func__);
	ptp = &tp->context->tp_list_head;

	while (*ptp) {
		if (*ptp == tp) {
			*ptp = tp->tp_list;
			break;
		}
		ptp = &(*ptp)->tp_list;
	}

	lws_context_unlock(tp->context);

	/*
	 * Wake up the threadpool guys and tell them to exit
	 */
	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
	tp->destroying = 1;
	pthread_cond_broadcast(&tp->wake_idle);
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	lws_threadpool_dump(tp);

	lwsl_info("%s: waiting for threads to rejoin\n", __func__);
#if defined(WIN32)
	Sleep(1000);
#endif

	for (n = 0; n < tp->threads_in_pool; n++) {
		task = tp->pool_list[n].task;
		pthread_join(tp->pool_list[n].thread, &retval);
		pthread_mutex_destroy(&tp->pool_list[n].lock);
	}
	lwsl_info("%s: all pool threads exited\n", __func__);
#if defined(WIN32)
	Sleep(1000);
#endif

	task = tp->task_done_head;
	while (task) {
		next = task->task_queue_next;
		lws_threadpool_task_cleanup_destroy(task);
		tp->done_queue_depth--;
		task = next;
	}

	pthread_mutex_destroy(&tp->lock);

	memset(tp, 0xdd, sizeof(*tp));
	lws_free(tp);
}
/*
 * We want to stop and destroy the tasks and related priv.
 */

int
lws_threadpool_dequeue_task(struct lws_threadpool_task *task)
{
	struct lws_threadpool *tp;
	struct lws_threadpool_task **c;
	int n;

	tp = task->tp;
	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	if (task->outlive && !tp->destroying) {

		/* disconnect from wsi, and wsi from task */

		lws_dll2_remove(&task->list);
		task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
		task->args.ss = NULL;
#endif

		goto bail;
	}

	c = &tp->task_queue_head;

	/*
	 * is the task queued waiting for a chance to run?  Mark it as stopped
	 * and move it on to the done queue
	 */
	while (*c) {
		if ((*c) == task) {
			*c = task->task_queue_next;
			task->task_queue_next = tp->task_done_head;
			tp->task_done_head = task;
			state_transition(task, LWS_TP_STATUS_STOPPED);
			tp->queue_depth--;
			tp->done_queue_depth++;
			task->done = lws_now_usecs();

			lwsl_debug("%s: tp %p: removed queued task wsi %p\n",
				   __func__, tp, task_to_wsi(task));

			break;
		}
		c = &(*c)->task_queue_next;
	}

	/* is the task on the done queue? */

	c = &tp->task_done_head;
	while (*c) {
		if ((*c) == task) {
			*c = task->task_queue_next;
			task->task_queue_next = NULL;
			lws_threadpool_task_cleanup_destroy(task);
			tp->done_queue_depth--;
			goto bail;
		}
		c = &(*c)->task_queue_next;
	}

	/*
	 * the task is in neither queue... is it already running on a thread?
	 */
	for (n = 0; n < tp->threads_in_pool; n++) {
		if (!tp->pool_list[n].task || tp->pool_list[n].task != task)
			continue;

		/*
		 * ensure we don't collide with tests or changes in the
		 * worker thread
		 */
		pthread_mutex_lock(&tp->pool_list[n].lock);

		/*
		 * mark the task as having been requested to stop...
		 * the caller will hear about it in his service thread
		 * context as a request to close
		 */
		state_transition(task, LWS_TP_STATUS_STOPPING);

		/* disconnect from wsi, and wsi from task */

		lws_dll2_remove(&task->list);
		task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
		task->args.ss = NULL;
#endif

		pthread_mutex_unlock(&tp->pool_list[n].lock);

		lwsl_debug("%s: tp %p: request stop running task "
			   "for wsi %p\n", __func__, tp, task_to_wsi(task));

		break;
	}

	if (n == tp->threads_in_pool) {
		/* can't find it */
		lwsl_notice("%s: tp %p: no task for wsi %p, decoupling\n",
			    __func__, tp, task_to_wsi(task));
		lws_dll2_remove(&task->list);
		task->args.wsi = NULL;
#if defined(LWS_WITH_SECURE_STREAMS)
		task->args.ss = NULL;
#endif
	}

bail:
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	return 0;
}
int
lws_threadpool_dequeue(struct lws *wsi) /* deprecated */
{
	struct lws_threadpool_task *task;

	if (!wsi->tp_task_owner.count)
		return 0;
	assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */

	task = lws_container_of(wsi->tp_task_owner.head,
				struct lws_threadpool_task, list);

	return lws_threadpool_dequeue_task(task);
}
struct lws_threadpool_task *
lws_threadpool_enqueue(struct lws_threadpool *tp,
		       const struct lws_threadpool_task_args *args,
		       const char *format, ...)
{
	struct lws_threadpool_task *task = NULL;
	va_list ap;

	if (tp->destroying)
		return NULL;

#if defined(LWS_WITH_SECURE_STREAMS)
	assert(args->ss || args->wsi);
#endif

	pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */

	/*
	 * if there's room on the queue, the job always goes on the queue
	 * first, then any free thread may pick it up after the wake_idle
	 */
	if (tp->queue_depth == tp->max_queue_depth) {
		lwsl_notice("%s: queue reached limit %d\n", __func__,
			    tp->max_queue_depth);

		goto bail;
	}

	/*
	 * create the task object
	 */
	task = lws_malloc(sizeof(*task), __func__);
	if (!task)
		goto bail;

	memset(task, 0, sizeof(*task));
	pthread_cond_init(&task->wake_idle, NULL);
	task->args = *args;
	task->tp = tp;
	task->created = lws_now_usecs();

	va_start(ap, format);
	vsnprintf(task->name, sizeof(task->name) - 1, format, ap);
	va_end(ap);

	/*
	 * add the task to the tp task queue
	 */
	task->task_queue_next = tp->task_queue_head;
	state_transition(task, LWS_TP_STATUS_QUEUED);
	tp->task_queue_head = task;
	tp->queue_depth++;

	/*
	 * mark the wsi itself as depending on this tp (so wsi close for
	 * whatever reason can clean up)
	 */
#if defined(LWS_WITH_SECURE_STREAMS)
	if (args->ss)
		lws_dll2_add_tail(&task->list, &args->ss->wsi->tp_task_owner);
	else
#endif
		lws_dll2_add_tail(&task->list, &args->wsi->tp_task_owner);

	lwsl_thread("%s: tp %s: enqueued task %p (%s) for wsi %p, depth %d\n",
		    __func__, tp->name, task, task->name, task_to_wsi(task),
		    tp->queue_depth);

	/* alert any idle thread there's something new on the task list */

	lws_memory_barrier();
	pthread_cond_signal(&tp->wake_idle);

bail:
	pthread_mutex_unlock(&tp->lock); /* -------------------- tpool unlock */

	return task;
}
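
/*
 * Enqueue sketch, pairing a task with the wsi that owns it (my_task,
 * my_cleanup and priv are hypothetical caller-side names):
 *
 *	struct lws_threadpool_task_args args;
 *
 *	memset(&args, 0, sizeof(args));
 *	args.wsi = wsi;		// or args.ss with SECURE_STREAMS
 *	args.user = priv;	// handed back to task and cleanup
 *	args.task = my_task;
 *	args.cleanup = my_cleanup;
 *
 *	if (!lws_threadpool_enqueue(tp, &args, "conn-%d", counter++))
 *		// queue full, or the pool is being destroyed
 */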
/* this should be called from the service thread */

enum lws_threadpool_task_status
lws_threadpool_task_status(struct lws_threadpool_task *task, void **user)
{
	enum lws_threadpool_task_status status;
	struct lws_threadpool *tp = task->tp;

	if (!tp)
		return LWS_TP_STATUS_FINISHED;

	*user = task->args.user;
	status = task->status;

	if (status == LWS_TP_STATUS_FINISHED ||
	    status == LWS_TP_STATUS_STOPPED) {
		char buf[160];

		pthread_mutex_lock(&tp->lock); /* ================ tpool lock */
		__lws_threadpool_task_dump(task, buf, sizeof(buf));
		lwsl_thread("%s: %s: service thread REAPING: %s\n",
			    __func__, tp->name, buf);
		__lws_threadpool_reap(task);
		lws_memory_barrier();
		pthread_mutex_unlock(&tp->lock); /* ------------ tpool unlock */
	}

	return status;
}
enum lws_threadpool_task_status
lws_threadpool_task_status_noreap(struct lws_threadpool_task *task)
{
	return task->status;
}

enum lws_threadpool_task_status
lws_threadpool_task_status_wsi(struct lws *wsi,
			       struct lws_threadpool_task **_task, void **user)
{
	struct lws_threadpool_task *task;

	if (!wsi->tp_task_owner.count) {
		lwsl_notice("%s: wsi has no task, ~=FINISHED\n", __func__);
		return LWS_TP_STATUS_FINISHED;
	}

	assert(wsi->tp_task_owner.count == 1); /* see deprecation docs in hdr */

	task = lws_container_of(wsi->tp_task_owner.head,
				struct lws_threadpool_task, list);
	*_task = task;

	return lws_threadpool_task_status(task, user);
}

void
lws_threadpool_task_sync(struct lws_threadpool_task *task, int stop)
{
	lwsl_debug("%s\n", __func__);

	if (!task)
		return;

	if (stop)
		state_transition(task, LWS_TP_STATUS_STOPPING);

	pthread_mutex_lock(&task->tp->lock);
	pthread_cond_signal(&task->wake_idle);
	pthread_mutex_unlock(&task->tp->lock);
}
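
/*
 * Sketch of the service-thread side: from the wsi's WRITEABLE callback, poll
 * the task status and, if the worker is blocked in SYNC, release it with
 * lws_threadpool_task_sync() once any pending result has been consumed
 * (error handling elided, names illustrative):
 *
 *	case LWS_CALLBACK_SERVER_WRITEABLE:
 *		switch (lws_threadpool_task_status_wsi(wsi, &task, &user)) {
 *		case LWS_TP_STATUS_FINISHED:
 *		case LWS_TP_STATUS_STOPPED:
 *			// the status call above already reaped the task
 *			return -1; // close the connection
 *		case LWS_TP_STATUS_SYNCING:
 *			// emit the produced data, then unblock the worker;
 *			// pass 1 instead to ask it to stop
 *			lws_threadpool_task_sync(task, 0);
 *			break;
 *		default:
 *			break;
 *		}
 *		return 0;
 */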
int
lws_threadpool_foreach_task_wsi(struct lws *wsi, void *user,
				int (*cb)(struct lws_threadpool_task *task,
					  void *user))
{
	struct lws_threadpool_task *task1;

	if (wsi->tp_task_owner.head == NULL)
		return 0;

	task1 = lws_container_of(wsi->tp_task_owner.head,
				 struct lws_threadpool_task, list);

	pthread_mutex_lock(&task1->tp->lock); /* ================ tpool lock */

	lws_start_foreach_dll_safe(struct lws_dll2 *, d, d1,
				   wsi->tp_task_owner.head) {
		struct lws_threadpool_task *task = lws_container_of(d,
					struct lws_threadpool_task, list);

		if (cb(task, user)) {
			pthread_mutex_unlock(&task1->tp->lock); /* tp unlock */
			return 1;
		}

	} lws_end_foreach_dll_safe(d, d1);

	pthread_mutex_unlock(&task1->tp->lock); /* ------------ tpool unlock */

	return 0;
}

#if defined(LWS_WITH_SECURE_STREAMS)
int
lws_threadpool_foreach_task_ss(struct lws_ss_handle *ss, void *user,
			       int (*cb)(struct lws_threadpool_task *task,
					 void *user))
{
	if (!ss->wsi)
		return 0;

	return lws_threadpool_foreach_task_wsi(ss->wsi, user, cb);
}
#endif

struct lws_threadpool_task *
lws_threadpool_get_task_wsi(struct lws *wsi)
{
	if (wsi->tp_task_owner.head == NULL)
		return NULL;

	return lws_container_of(wsi->tp_task_owner.head,
				struct lws_threadpool_task, list);
}

#if defined(LWS_WITH_SECURE_STREAMS)
struct lws_threadpool_task *
lws_threadpool_get_task_ss(struct lws_ss_handle *ss)
{
	return lws_threadpool_get_task_wsi(ss->wsi);
}
#endif