secure-streams-process.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
  1. /*
  2. * libwebsockets - small server side websockets and web server implementation
  3. *
  4. * Copyright (C) 2019 - 2020 Andy Green <[email protected]>
  5. *
  6. * Permission is hereby granted, free of charge, to any person obtaining a copy
  7. * of this software and associated documentation files (the "Software"), to
  8. * deal in the Software without restriction, including without limitation the
  9. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  10. * sell copies of the Software, and to permit persons to whom the Software is
  11. * furnished to do so, subject to the following conditions:
  12. *
  13. * The above copyright notice and this permission notice shall be included in
  14. * all copies or substantial portions of the Software.
  15. *
  16. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  21. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  22. * IN THE SOFTWARE.
  23. *
  24. *
  25. * When the user code is in a different process, a non-tls unix domain socket
  26. * proxy is used to asynchronously transfer buffers in each direction via the
  27. * network stack, without explicit IPC
  28. *
  29. * user_process{ [user code] | shim | socket-}------ lws_process{ lws }
  30. *
  31. * Lws exposes a listening unix domain socket in this case, the user processes
  32. * connect to it and pass just info.streamtype in an initial tx packet. All
  33. * packets are prepended by a 1-byte type field when used in this mode. See
  34. * lws-secure-streams.h for documentation and definitions.
  35. *
  36. * Proxying in either direction can face the situation it cannot send the onward
  37. * packet immediately and is subject to separating the write request from the
  38. * write action. To make the best use of memory, a single preallocated buffer
  39. * stashes pending packets in all four directions (c->p, p->c, p->ss, ss->p).
  40. * This allows it to adapt to different traffic patterns without wasted areas
  41. * dedicated to traffic that isn't coming in a particular application.
  42. *
  43. * A shim is provided to monitor the process' unix domain socket and regenerate
  44. * the secure sockets api there with callbacks happening in the process thread
  45. * context.
  46. *
  47. * This file implements the listening unix domain socket proxy... this code is
  48. * only going to run on a Linux-class device with its implications about memory
  49. * availability.
  50. */
  51. #include <private-lib-core.h>
/*
 * Because both sides of the connection share the conn, we allocate it
 * during accepted adoption, and both sides point to it.
 *
 * The last one of the accepted side and the onward side to close frees it:
 * either the client wsi's RAW_CLOSE handling when there is no onward ss, or
 * the onward ss DESTROYING state handler when the client wsi has already gone
 * (conn->wsi == NULL).
 */
struct conn {
	/* parse state handed to lws_ss_deserialize_parse() for client rx */
	struct lws_ss_serialization_parser parser;
	lws_dsh_t *dsh; /* unified buffer for both sides */
	struct lws *wsi; /* the client side; NULLed once the client has closed */
	lws_ss_handle_t *ss; /* the onward, ss side */
	/* LPCSPROX_* proxy connection state, starts at WAIT_INITIAL_TX */
	lws_ss_conn_states_t state;
};
/*
 * Per-session storage for the accepted client connection (its size is given
 * in the protocols table).  It only carries the pointer to the shared conn,
 * which is allocated at LWS_CALLBACK_RAW_ADOPT.
 */
struct raw_pss {
	struct conn *conn; /* shared with the onward ss side, may be NULL */
};
/*
 * Proxy - onward secure-stream handler
 *
 * This is the user object of the onward secure stream: callback_ss_proxy()
 * sets ssi.user_alloc to its size and gives the offsets of both members
 * (handle_offset -> ss, opaque_user_data_offset -> conn), so the member
 * layout is referenced by offsetof() and must stay in step with that code.
 */
typedef struct ss_proxy_onward {
	lws_ss_handle_t *ss;	/* the onward secure stream handle */
	struct conn *conn;	/* shared conn; NULL once it has been destroyed */
} ss_proxy_t;
  75. /* secure streams payload interface */
  76. static lws_ss_state_return_t
  77. ss_proxy_onward_rx(void *userobj, const uint8_t *buf, size_t len, int flags)
  78. {
  79. ss_proxy_t *m = (ss_proxy_t *)userobj;
  80. const char *rsp = NULL;
  81. int n;
  82. // lwsl_notice("%s: len %d\n", __func__, (int)len);
  83. /*
  84. * The onward secure stream connection has received something.
  85. */
  86. if (m->ss->rideshare != m->ss->policy && m->ss->rideshare) {
  87. rsp = m->ss->rideshare->streamtype;
  88. flags |= LWSSS_FLAG_RIDESHARE;
  89. }
  90. n = lws_ss_serialize_rx_payload(m->conn->dsh, buf, len, flags, rsp);
  91. if (n)
  92. return n;
  93. if (m->conn->wsi) /* if possible, request client conn write */
  94. lws_callback_on_writable(m->conn->wsi);
  95. return 0;
  96. }
  97. /*
  98. * we are transmitting buffered payload originally from the client on to the ss
  99. */
  100. static lws_ss_state_return_t
  101. ss_proxy_onward_tx(void *userobj, lws_ss_tx_ordinal_t ord, uint8_t *buf,
  102. size_t *len, int *flags)
  103. {
  104. ss_proxy_t *m = (ss_proxy_t *)userobj;
  105. void *p;
  106. size_t si;
  107. if (!m->conn->ss || m->conn->state != LPCSPROX_OPERATIONAL) {
  108. lwsl_notice("%s: ss not ready\n", __func__);
  109. *len = 0;
  110. return 1;
  111. }
  112. /*
  113. * The onward secure stream says that we could send something to it
  114. * (by putting it in buf, and setting *len and *flags)
  115. */
  116. if (lws_ss_deserialize_tx_payload(m->conn->dsh, m->ss->wsi,
  117. ord, buf, len, flags))
  118. return 1;
  119. if (!lws_dsh_get_head(m->conn->dsh, KIND_C_TO_P, (void **)&p, &si))
  120. lws_ss_request_tx(m->conn->ss);
  121. if (!*len && !*flags)
  122. return 1; /* we don't actually want to send anything */
  123. lwsl_info("%s: onward tx %d fl 0x%x\n", __func__, (int)*len, *flags);
  124. #if 0
  125. {
  126. int ff = open("/tmp/z", O_RDWR | O_CREAT | O_APPEND, 0666);
  127. if (ff == -1)
  128. lwsl_err("%s: errno %d\n", __func__, errno);
  129. write(ff, buf, *len);
  130. close(ff);
  131. }
  132. #endif
  133. return 0;
  134. }
  135. static lws_ss_state_return_t
  136. ss_proxy_onward_state(void *userobj, void *sh,
  137. lws_ss_constate_t state, lws_ss_tx_ordinal_t ack)
  138. {
  139. ss_proxy_t *m = (ss_proxy_t *)userobj;
  140. switch (state) {
  141. case LWSSSCS_CREATING:
  142. break;
  143. case LWSSSCS_DESTROYING:
  144. if (!m->conn)
  145. break;
  146. if (!m->conn->wsi) {
  147. /*
  148. * Our onward secure stream is closing and our client
  149. * connection has already gone away... destroy the conn.
  150. */
  151. lwsl_info("%s: Destroying conn\n", __func__);
  152. lws_dsh_destroy(&m->conn->dsh);
  153. free(m->conn);
  154. m->conn = NULL;
  155. return 0;
  156. } else
  157. lwsl_info("%s: ss DESTROYING, wsi up\n", __func__);
  158. break;
  159. default:
  160. break;
  161. }
  162. if (!m->conn) {
  163. lwsl_warn("%s: dropping state due to conn not up\n", __func__);
  164. return 0;
  165. }
  166. lws_ss_serialize_state(m->conn->dsh, state, ack);
  167. if (m->conn->wsi) /* if possible, request client conn write */
  168. lws_callback_on_writable(m->conn->wsi);
  169. return 0;
  170. }
  171. void
  172. ss_proxy_onward_txcr(void *userobj, int bump)
  173. {
  174. ss_proxy_t *m = (ss_proxy_t *)userobj;
  175. if (!m->conn)
  176. return;
  177. lws_ss_serialize_txcr(m->conn->dsh, bump);
  178. if (m->conn->wsi) /* if possible, request client conn write */
  179. lws_callback_on_writable(m->conn->wsi);
  180. }
  181. /*
  182. * Client - Proxy connection on unix domain socket
  183. */
  184. static int
  185. callback_ss_proxy(struct lws *wsi, enum lws_callback_reasons reason,
  186. void *user, void *in, size_t len)
  187. {
  188. struct lws_context_per_thread *pt = &wsi->a.context->pt[(int)wsi->tsi];
  189. struct raw_pss *pss = (struct raw_pss *)user;
  190. const lws_ss_policy_t *rsp;
  191. struct conn *conn = NULL;
  192. lws_ss_info_t ssi;
  193. const uint8_t *cp;
  194. #if defined(LWS_WITH_DETAILED_LATENCY)
  195. lws_usec_t us;
  196. #endif
  197. char s[128];
  198. uint8_t *p;
  199. size_t si;
  200. char pay;
  201. int n;
  202. if (pss)
  203. conn = pss->conn;
  204. switch (reason) {
  205. case LWS_CALLBACK_PROTOCOL_INIT:
  206. break;
  207. case LWS_CALLBACK_PROTOCOL_DESTROY:
  208. break;
  209. /* callbacks related to raw socket descriptor "accepted side" */
  210. case LWS_CALLBACK_RAW_ADOPT:
  211. lwsl_info("LWS_CALLBACK_RAW_ADOPT\n");
  212. if (!pss)
  213. return -1;
  214. pss->conn = malloc(sizeof(struct conn));
  215. if (!pss->conn)
  216. return -1;
  217. memset(pss->conn, 0, sizeof(*pss->conn));
  218. pss->conn->dsh = lws_dsh_create(&pt->ss_dsh_owner,
  219. LWS_SS_MTU * 160, 2);
  220. if (!pss->conn->dsh) {
  221. free(pss->conn);
  222. return -1;
  223. }
  224. pss->conn->wsi = wsi;
  225. pss->conn->state = LPCSPROX_WAIT_INITIAL_TX;
  226. /*
  227. * Client is expected to follow the unix domain socket
  228. * acceptance up rapidly with an initial tx containing the
  229. * streamtype name. We can't create the stream until then.
  230. */
  231. lws_set_timeout(wsi,
  232. PENDING_TIMEOUT_AWAITING_CLIENT_HS_SEND, 3);
  233. break;
  234. case LWS_CALLBACK_RAW_CLOSE:
  235. lwsl_info("LWS_CALLBACK_RAW_CLOSE:\n");
  236. if (!conn)
  237. break;
  238. /*
  239. * the client unix domain socket connection (wsi / conn->wsi)
  240. * has closed... eg, client has exited or otherwise has
  241. * definitively finished with the proxying and onward connection
  242. *
  243. * But right now, the SS and possibly the SS onward wsi are
  244. * still live...
  245. */
  246. if (conn->ss) {
  247. struct lws *cw = conn->ss->wsi;
  248. /*
  249. * The onward connection is around
  250. */
  251. lwsl_info("%s: destroying ss.h=%p, ss.wsi=%p\n",
  252. __func__, conn->ss, conn->ss->wsi);
  253. /* sever relationship with ss about to be deleted */
  254. lws_set_opaque_user_data(wsi, NULL);
  255. if (wsi != cw)
  256. /*
  257. * The wsi doing the onward connection can no
  258. * longer relate to the conn... otherwise when
  259. * he gets callbacks he wants to bind to
  260. * the ss we are about to delete
  261. */
  262. lws_wsi_close(cw, LWS_TO_KILL_ASYNC);
  263. conn->wsi = NULL;
  264. lws_ss_destroy(&conn->ss);
  265. /* conn may have gone */
  266. break;
  267. }
  268. if (conn->state == LPCSPROX_DESTROYED || !conn->ss) {
  269. /*
  270. * There's no onward secure stream and our client
  271. * connection is closing. Destroy the conn.
  272. */
  273. lws_dsh_destroy(&conn->dsh);
  274. free(conn);
  275. pss->conn = NULL;
  276. } else
  277. lwsl_debug("%s: CLOSE; ss=%p\n", __func__, conn->ss);
  278. break;
  279. case LWS_CALLBACK_RAW_RX:
  280. /*
  281. * ie, the proxy is receiving something from a client
  282. */
  283. lwsl_info("%s: RX: rx %d\n", __func__, (int)len);
  284. if (!conn || !conn->wsi) {
  285. lwsl_err("%s: rx with bad conn state\n", __func__);
  286. return -1;
  287. }
  288. // lwsl_hexdump_info(in, len);
  289. if (conn->state == LPCSPROX_WAIT_INITIAL_TX) {
  290. memset(&ssi, 0, sizeof(ssi));
  291. ssi.user_alloc = sizeof(ss_proxy_t);
  292. ssi.handle_offset = offsetof(ss_proxy_t, ss);
  293. ssi.opaque_user_data_offset =
  294. offsetof(ss_proxy_t, conn);
  295. ssi.rx = ss_proxy_onward_rx;
  296. ssi.tx = ss_proxy_onward_tx;
  297. }
  298. ssi.state = ss_proxy_onward_state;
  299. ssi.flags = 0;
  300. n = lws_ss_deserialize_parse(&conn->parser,
  301. lws_get_context(wsi), conn->dsh, in, len,
  302. &conn->state, conn, &conn->ss, &ssi, 0);
  303. switch (n) {
  304. case LWSSSSRET_OK:
  305. break;
  306. case LWSSSSRET_DISCONNECT_ME:
  307. return -1;
  308. case LWSSSSRET_DESTROY_ME:
  309. if (conn->ss)
  310. lws_ss_destroy(&conn->ss);
  311. return -1;
  312. }
  313. if (conn->state == LPCSPROX_REPORTING_FAIL ||
  314. conn->state == LPCSPROX_REPORTING_OK)
  315. lws_callback_on_writable(conn->wsi);
  316. break;
  317. case LWS_CALLBACK_RAW_WRITEABLE:
  318. // lwsl_notice("LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE\n");
  319. /*
  320. * We can transmit something back to the client from the dsh
  321. * of stuff we received on its behalf from the ss
  322. */
  323. if (!conn || !conn->wsi)
  324. break;
  325. n = 0;
  326. pay = 0;
  327. s[3] = 0;
  328. cp = (const uint8_t *)s;
  329. switch (conn->state) {
  330. case LPCSPROX_REPORTING_FAIL:
  331. s[3] = 1;
  332. /* fallthru */
  333. case LPCSPROX_REPORTING_OK:
  334. s[0] = LWSSS_SER_RXPRE_CREATE_RESULT;
  335. s[1] = 0;
  336. s[2] = 1;
  337. n = 4;
  338. /*
  339. * If there's rideshare sequencing, it's added after the
  340. * first 4 bytes or the create result, comma-separated
  341. */
  342. if (conn->ss) {
  343. rsp = conn->ss->policy;
  344. while (rsp) {
  345. if (n != 4 && n < (int)sizeof(s) - 2)
  346. s[n++] = ',';
  347. n += lws_snprintf(&s[n], sizeof(s) - n,
  348. "%s", rsp->streamtype);
  349. rsp = lws_ss_policy_lookup(wsi->a.context,
  350. rsp->rideshare_streamtype);
  351. }
  352. }
  353. s[2] = n - 3;
  354. conn->state = LPCSPROX_OPERATIONAL;
  355. lws_set_timeout(wsi, 0, 0);
  356. break;
  357. case LPCSPROX_OPERATIONAL:
  358. if (lws_dsh_get_head(conn->dsh, KIND_SS_TO_P,
  359. (void **)&p, &si))
  360. break;
  361. cp = p;
  362. #if defined(LWS_WITH_DETAILED_LATENCY)
  363. if (cp[0] == LWSSS_SER_RXPRE_RX_PAYLOAD &&
  364. wsi->a.context->detailed_latency_cb) {
  365. /*
  366. * we're fulfilling rx that came in on ss
  367. * by sending it back out to the client on
  368. * the Unix Domain Socket
  369. *
  370. * + 7 u32 write will compute latency here...
  371. * + 11 u32 ust we received from ss
  372. *
  373. * lws_write will report it and fill in
  374. * LAT_DUR_PROXY_CLIENT_REQ_TO_WRITE
  375. */
  376. us = lws_now_usecs();
  377. lws_ser_wu32be(&p[7], us -
  378. lws_ser_ru64be(&p[11]));
  379. lws_ser_wu64be(&p[11], us);
  380. wsi->detlat.acc_size =
  381. wsi->detlat.req_size = si - 19;
  382. /* time proxy held it */
  383. wsi->detlat.latencies[
  384. LAT_DUR_PROXY_RX_TO_ONWARD_TX] =
  385. lws_ser_ru32be(&p[7]);
  386. }
  387. #endif
  388. pay = 1;
  389. n = (int)si;
  390. break;
  391. default:
  392. break;
  393. }
  394. again:
  395. if (!n)
  396. break;
  397. n = lws_write(wsi, (uint8_t *)cp, n, LWS_WRITE_RAW);
  398. if (n < 0) {
  399. lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
  400. goto hangup;
  401. }
  402. switch (conn->state) {
  403. case LPCSPROX_REPORTING_FAIL:
  404. goto hangup;
  405. case LPCSPROX_OPERATIONAL:
  406. if (pay)
  407. lws_dsh_free((void **)&p);
  408. if (!lws_dsh_get_head(conn->dsh, KIND_SS_TO_P,
  409. (void **)&p, &si)) {
  410. if (!lws_send_pipe_choked(wsi)) {
  411. cp = p;
  412. pay = 1;
  413. n = (int)si;
  414. goto again;
  415. }
  416. lws_callback_on_writable(wsi);
  417. }
  418. break;
  419. default:
  420. break;
  421. }
  422. break;
  423. default:
  424. break;
  425. }
  426. return lws_callback_http_dummy(wsi, reason, user, in, len);
  427. hangup:
  428. //lws_ss_destroy(&conn->ss);
  429. //conn->state = LPCSPROX_DESTROYED;
  430. /* hang up on him */
  431. return -1;
  432. }
/*
 * Protocol table for the proxy vhost; lws_ss_proxy_create() passes it as
 * info.protocols and selects "ssproxy-protocol" by name for accepted
 * connections via info.listen_accept_protocol.
 *
 * NOTE(review): the entries use positional initializers, so which
 * struct lws_protocols members the two 2048 values land in depends on the
 * member order declared in the lws headers -- confirm against them.
 */
static const struct lws_protocols protocols[] = {
	{
		"ssproxy-protocol",
		callback_ss_proxy,
		sizeof(struct raw_pss), /* per-connection struct raw_pss */
		2048, 2048, NULL, 0
	},
	{ NULL, NULL, 0, 0, 0, NULL, 0 } /* all-zero entry, presumably the terminator */
};
  442. /*
  443. * called from create_context()
  444. */
  445. int
  446. lws_ss_proxy_create(struct lws_context *context, const char *bind, int port)
  447. {
  448. struct lws_context_creation_info info;
  449. memset(&info, 0, sizeof(info));
  450. info.vhost_name = "ssproxy";
  451. info.options = LWS_SERVER_OPTION_ADOPT_APPLY_LISTEN_ACCEPT_CONFIG;
  452. info.port = port;
  453. if (!port) {
  454. if (!bind)
  455. bind = "@proxy.ss.lws";
  456. info.options |= LWS_SERVER_OPTION_UNIX_SOCK;
  457. }
  458. info.iface = bind;
  459. info.unix_socket_perms = "root:root";
  460. info.listen_accept_role = "raw-skt";
  461. info.listen_accept_protocol = "ssproxy-protocol";
  462. info.protocols = protocols;
  463. if (!lws_create_vhost(context, &info)) {
  464. lwsl_err("%s: Failed to create ss proxy vhost\n", __func__);
  465. return 1;
  466. }
  467. return 0;
  468. }