protocol_lws_raw_proxy.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
  1. /*
  2. * libwebsockets - small server side websockets and web server implementation
  3. *
  4. * Copyright (C) 2010 - 2019 Andy Green <[email protected]>
  5. *
  6. * Permission is hereby granted, free of charge, to any person obtaining a copy
  7. * of this software and associated documentation files (the "Software"), to
  8. * deal in the Software without restriction, including without limitation the
  9. * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  10. * sell copies of the Software, and to permit persons to whom the Software is
  11. * furnished to do so, subject to the following conditions:
  12. *
  13. * The above copyright notice and this permission notice shall be included in
  14. * all copies or substantial portions of the Software.
  15. *
  16. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  21. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  22. * IN THE SOFTWARE.
  23. */
  24. #if !defined (LWS_PLUGIN_STATIC)
  25. #define LWS_DLL
  26. #define LWS_INTERNAL
  27. #include <libwebsockets.h>
  28. #endif
  29. #include <string.h>
  30. #include <sys/types.h>
  31. #include <fcntl.h>
  32. #define RING_DEPTH 8
  /*
   * One chunk of received data waiting in a ring to be forwarded to the
   * other side of the proxied connection.
   */
  struct packet {
      void     *payload; /* heap copy of the rx data, freed by __destroy_packet() */
      uint32_t  len;     /* number of bytes at payload */
      uint32_t  ticket;  /* sequence number, drawn from conn->ticket_next */
  };
  /* Index of each side of a proxied connection in the per-conn arrays. */
  enum {
      ACC = 0, /* the accepted (inbound) side */
      ONW = 1  /* the onward (outbound client) side */
  };
  /*
   * State shared by the two halves of one proxied connection.
   *
   * It is allocated when the inbound connection is adopted; the accepted
   * side and the onward side each have their own pss, and each pss is only
   * a wrapper holding a pointer to this object.
   *
   * Whichever of the two sides closes last frees it, so neither side can be
   * left holding a pointer to storage the other one already released.
   */
  struct conn {
      struct lws      *wsi[2];         /* the wsi of each side, by ACC / ONW */

      /* rings containing unsent rx from accepted and onward sides */
      struct lws_ring *r[2];
      uint32_t         t[2];           /* ring tail */

      uint32_t         ticket_next;    /* ticket given to the next queued packet */
      uint32_t         ticket_retired; /* ticket of the last packet written out */

      char             rx_enabled[2];  /* nonzero: rx is not flow-controlled */
      char             closed[2];      /* nonzero: that side saw its close callback */
      char             established[2]; /* nonzero: that side saw its adopt callback */
  };
  /*
   * Per-session data: nothing but a pointer to the struct conn that the
   * accepted and onward sides share.
   */
  struct raw_pss {
      struct conn *conn; /* NULL until a conn has been attached */
  };
  /*
   * Per-vhost data: one of these is created for each vhost our protocol is
   * used with.  It holds the onward destination parsed from the "onward" pvo.
   */
  struct raw_vhd {
      char     addr[128]; /* onward address, NUL-terminated */
      uint16_t port;      /* onward port */
      char     ipv6;      /* nonzero when the pvo selected "ipv6" */
  };
  71. static void
  72. __destroy_packet(void *_pkt)
  73. {
  74. struct packet *pkt = _pkt;
  75. free(pkt->payload);
  76. pkt->payload = NULL;
  77. pkt->len = 0;
  78. }
  79. static void
  80. destroy_conn(struct raw_vhd *vhd, struct raw_pss *pss)
  81. {
  82. struct conn *conn = pss->conn;
  83. if (conn->r[ACC])
  84. lws_ring_destroy(conn->r[ACC]);
  85. if (conn->r[ONW])
  86. lws_ring_destroy(conn->r[ONW]);
  87. pss->conn = NULL;
  88. free(conn);
  89. }
  90. static int
  91. connect_client(struct raw_vhd *vhd, struct raw_pss *pss)
  92. {
  93. struct lws_client_connect_info i;
  94. char host[128];
  95. struct lws *cwsi;
  96. lws_snprintf(host, sizeof(host), "%s:%u", vhd->addr, vhd->port);
  97. memset(&i, 0, sizeof(i));
  98. i.method = "RAW";
  99. i.context = lws_get_context(pss->conn->wsi[ACC]);
  100. i.port = vhd->port;
  101. i.address = vhd->addr;
  102. i.host = host;
  103. i.origin = host;
  104. i.ssl_connection = 0;
  105. i.vhost = lws_get_vhost(pss->conn->wsi[ACC]);
  106. i.local_protocol_name = "raw-proxy";
  107. i.protocol = "raw-proxy";
  108. i.path = "/";
  109. /*
  110. * The "onward" client wsi has its own pss but shares the "conn"
  111. * created when the inbound connection was accepted. We need to stash
  112. * the address of the shared conn and apply it to the client psss
  113. * when the client connection completes.
  114. */
  115. i.opaque_user_data = pss->conn;
  116. i.pwsi = &pss->conn->wsi[ONW];
  117. lwsl_info("%s: onward: %s:%d%s\n", __func__, i.address, i.port, i.path);
  118. cwsi = lws_client_connect_via_info(&i);
  119. if (!cwsi)
  120. lwsl_err("%s: client connect failed early\n", __func__);
  121. return !cwsi;
  122. }
  123. static int
  124. flow_control(struct conn *conn, int side, int enable)
  125. {
  126. if (conn->closed[side] ||
  127. enable == conn->rx_enabled[side] ||
  128. !conn->established[side])
  129. return 0;
  130. if (lws_rx_flow_control(conn->wsi[side], enable))
  131. return 1;
  132. conn->rx_enabled[side] = enable;
  133. lwsl_info("%s: %s side: %s\n", __func__, side ? "ONW" : "ACC",
  134. enable ? "rx enabled" : "rx flow controlled");
  135. return 0;
  136. }
  137. static int
  138. callback_raw_proxy(struct lws *wsi, enum lws_callback_reasons reason,
  139. void *user, void *in, size_t len)
  140. {
  141. struct raw_pss *pss = (struct raw_pss *)user;
  142. struct raw_vhd *vhd = (struct raw_vhd *)lws_protocol_vh_priv_get(
  143. lws_get_vhost(wsi), lws_get_protocol(wsi));
  144. const struct packet *ppkt;
  145. struct conn *conn = NULL;
  146. struct lws_tokenize ts;
  147. lws_tokenize_elem e;
  148. struct packet pkt;
  149. const char *cp;
  150. int n;
  151. if (pss)
  152. conn = pss->conn;
  153. switch (reason) {
  154. case LWS_CALLBACK_PROTOCOL_INIT:
  155. vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),
  156. lws_get_protocol(wsi), sizeof(struct raw_vhd));
  157. if (lws_pvo_get_str(in, "onward", &cp)) {
  158. lwsl_err("%s: vh %s: pvo 'onward' required\n", __func__,
  159. lws_get_vhost_name(lws_get_vhost(wsi)));
  160. return -1;
  161. }
  162. lws_tokenize_init(&ts, cp, LWS_TOKENIZE_F_DOT_NONTERM |
  163. LWS_TOKENIZE_F_MINUS_NONTERM |
  164. LWS_TOKENIZE_F_NO_FLOATS);
  165. ts.len = strlen(cp);
  166. if (lws_tokenize(&ts) != LWS_TOKZE_TOKEN)
  167. goto bad_onward;
  168. if (!strncmp(ts.token, "ipv6", ts.token_len))
  169. vhd->ipv6 = 1;
  170. else
  171. if (strncmp(ts.token, "ipv4", ts.token_len))
  172. goto bad_onward;
  173. /* then the colon */
  174. if (lws_tokenize(&ts) != LWS_TOKZE_DELIMITER)
  175. goto bad_onward;
  176. e = lws_tokenize(&ts);
  177. if (!vhd->ipv6) {
  178. if (e != LWS_TOKZE_TOKEN ||
  179. ts.token_len + 1 >= (int)sizeof(vhd->addr))
  180. goto bad_onward;
  181. lws_strncpy(vhd->addr, ts.token, ts.token_len + 1);
  182. e = lws_tokenize(&ts);
  183. if (e == LWS_TOKZE_DELIMITER) {
  184. /* there should be a port then */
  185. e = lws_tokenize(&ts);
  186. if (e != LWS_TOKZE_INTEGER)
  187. goto bad_onward;
  188. vhd->port = atoi(ts.token);
  189. e = lws_tokenize(&ts);
  190. }
  191. if (e != LWS_TOKZE_ENDED)
  192. goto bad_onward;
  193. } else
  194. lws_strncpy(vhd->addr, ts.token, sizeof(vhd->addr));
  195. lwsl_notice("%s: vh %s: onward %s:%s:%d\n", __func__,
  196. lws_get_vhost_name(lws_get_vhost(wsi)),
  197. vhd->ipv6 ? "ipv6": "ipv4", vhd->addr, vhd->port);
  198. break;
  199. bad_onward:
  200. lwsl_err("%s: onward pvo format must be ipv4:addr[:port] "
  201. " or ipv6:addr, not '%s'\n", __func__, cp);
  202. return -1;
  203. case LWS_CALLBACK_PROTOCOL_DESTROY:
  204. break;
  205. /* callbacks related to client "onward side" */
  206. case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
  207. lwsl_err("CLIENT_CONNECTION_ERROR: %s\n",
  208. in ? (char *)in : "(null)");
  209. break;
  210. case LWS_CALLBACK_RAW_PROXY_CLI_ADOPT:
  211. lwsl_debug("%s: %p: LWS_CALLBACK_RAW_CLI_ADOPT: pss %p\n", __func__, wsi, pss);
  212. if (conn || !pss)
  213. break;
  214. conn = pss->conn = lws_get_opaque_user_data(wsi);
  215. if (!conn)
  216. break;
  217. conn->established[ONW] = 1;
  218. /* they start enabled */
  219. conn->rx_enabled[ACC] = 1;
  220. conn->rx_enabled[ONW] = 1;
  221. /* he disabled his rx while waiting for use to be established */
  222. flow_control(conn, ACC, 1);
  223. lws_callback_on_writable(wsi);
  224. lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
  225. break;
  226. case LWS_CALLBACK_RAW_PROXY_CLI_CLOSE:
  227. lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_CLOSE\n");
  228. if (!conn)
  229. break;
  230. conn->closed[ONW] = 1;
  231. if (conn->closed[ACC])
  232. destroy_conn(vhd, pss);
  233. break;
  234. case LWS_CALLBACK_RAW_PROXY_CLI_RX:
  235. lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_RX: %d\n", (int)len);
  236. if (!conn)
  237. return 0;
  238. if (!pss || !conn->wsi[ACC] || conn->closed[ACC]) {
  239. lwsl_info(" pss %p, wsi[ACC] %p, closed[ACC] %d\n",
  240. pss, conn->wsi[ACC], conn->closed[ACC]);
  241. return -1;
  242. }
  243. pkt.payload = malloc(len);
  244. if (!pkt.payload) {
  245. lwsl_notice("OOM: dropping\n");
  246. return -1;
  247. }
  248. pkt.len = len;
  249. pkt.ticket = conn->ticket_next++;
  250. memcpy(pkt.payload, in, len);
  251. if (!lws_ring_insert(conn->r[ONW], &pkt, 1)) {
  252. __destroy_packet(&pkt);
  253. lwsl_notice("dropping!\n");
  254. return -1;
  255. }
  256. lwsl_debug("After onward RX: acc free: %d...\n",
  257. (int)lws_ring_get_count_free_elements(conn->r[ONW]));
  258. if (conn->rx_enabled[ONW] &&
  259. lws_ring_get_count_free_elements(conn->r[ONW]) < 2)
  260. flow_control(conn, ONW, 0);
  261. if (!conn->closed[ACC])
  262. lws_callback_on_writable(conn->wsi[ACC]);
  263. break;
  264. case LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE:
  265. lwsl_debug("LWS_CALLBACK_RAW_PROXY_CLI_WRITEABLE\n");
  266. if (!conn)
  267. break;
  268. ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
  269. if (!ppkt) {
  270. lwsl_info("%s: CLI_WRITABLE had nothing in acc ring\n",
  271. __func__);
  272. break;
  273. }
  274. if (ppkt->ticket != conn->ticket_retired + 1) {
  275. lwsl_info("%s: acc ring has %d but next %d\n", __func__,
  276. ppkt->ticket, conn->ticket_retired + 1);
  277. lws_callback_on_writable(conn->wsi[ACC]);
  278. break;
  279. }
  280. n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
  281. if (n < 0) {
  282. lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
  283. return -1;
  284. }
  285. conn->ticket_retired = ppkt->ticket;
  286. lws_ring_consume(conn->r[ACC], &conn->t[ACC], NULL, 1);
  287. lws_ring_update_oldest_tail(conn->r[ACC], conn->t[ACC]);
  288. lwsl_debug("acc free: %d...\n",
  289. (int)lws_ring_get_count_free_elements(conn->r[ACC]));
  290. if (!conn->rx_enabled[ACC] &&
  291. lws_ring_get_count_free_elements(conn->r[ACC]) > 2)
  292. flow_control(conn, ACC, 1);
  293. ppkt = lws_ring_get_element(conn->r[ACC], &conn->t[ACC]);
  294. lwsl_debug("%s: CLI_WRITABLE: next acc pkt %p idx %d vs %d\n",
  295. __func__, ppkt, ppkt ? ppkt->ticket : 0,
  296. conn->ticket_retired + 1);
  297. if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
  298. lws_callback_on_writable(wsi);
  299. else {
  300. /*
  301. * defer checking for accepted side closing until we
  302. * sent everything in the ring to onward
  303. */
  304. if (conn->closed[ACC])
  305. /*
  306. * there is never going to be any more... but
  307. * we may have some tx still in tx buflist /
  308. * partial
  309. */
  310. return lws_raw_transaction_completed(wsi);
  311. if (lws_ring_get_element(conn->r[ONW], &conn->t[ONW]))
  312. lws_callback_on_writable(conn->wsi[ACC]);
  313. }
  314. break;
  315. /* callbacks related to raw socket descriptor "accepted side" */
  316. case LWS_CALLBACK_RAW_PROXY_SRV_ADOPT:
  317. lwsl_debug("LWS_CALLBACK_RAW_SRV_ADOPT\n");
  318. if (!pss)
  319. return -1;
  320. conn = pss->conn = malloc(sizeof(struct conn));
  321. if (!pss->conn)
  322. return -1;
  323. memset(conn, 0, sizeof(*conn));
  324. conn->wsi[ACC] = wsi;
  325. conn->ticket_next = 1;
  326. conn->r[ACC] = lws_ring_create(sizeof(struct packet),
  327. RING_DEPTH, __destroy_packet);
  328. if (!conn->r[ACC]) {
  329. lwsl_err("%s: OOM\n", __func__);
  330. return -1;
  331. }
  332. conn->r[ONW] = lws_ring_create(sizeof(struct packet),
  333. RING_DEPTH, __destroy_packet);
  334. if (!conn->r[ONW]) {
  335. lws_ring_destroy(conn->r[ACC]);
  336. conn->r[ACC] = NULL;
  337. lwsl_err("%s: OOM\n", __func__);
  338. return -1;
  339. }
  340. conn->established[ACC] = 1;
  341. /* disable any rx until the client side is up */
  342. flow_control(conn, ACC, 0);
  343. lws_set_timeout(wsi, NO_PENDING_TIMEOUT, 0);
  344. /* try to create the onward client connection */
  345. connect_client(vhd, pss);
  346. break;
  347. case LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:
  348. lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_CLOSE:\n");
  349. if (!conn)
  350. break;
  351. conn->closed[ACC] = 1;
  352. if (conn->closed[ONW])
  353. destroy_conn(vhd, pss);
  354. break;
  355. case LWS_CALLBACK_RAW_PROXY_SRV_RX:
  356. lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_RX: rx %d\n", (int)len);
  357. if (!conn || !conn->wsi[ONW]) {
  358. lwsl_err("%s: LWS_CALLBACK_RAW_PROXY_SRV_RX: "
  359. "conn->wsi[ONW] NULL\n", __func__);
  360. return -1;
  361. }
  362. if (conn->closed[ONW]) {
  363. lwsl_info(" closed[ONW] %d\n", conn->closed[ONW]);
  364. return -1;
  365. }
  366. if (!len)
  367. return 0;
  368. pkt.payload = malloc(len);
  369. if (!pkt.payload) {
  370. lwsl_notice("OOM: dropping\n");
  371. return -1;
  372. }
  373. pkt.len = len;
  374. pkt.ticket = conn->ticket_next++;
  375. memcpy(pkt.payload, in, len);
  376. if (!lws_ring_insert(conn->r[ACC], &pkt, 1)) {
  377. __destroy_packet(&pkt);
  378. lwsl_notice("dropping!\n");
  379. return -1;
  380. }
  381. lwsl_debug("After acc RX: acc free: %d...\n",
  382. (int)lws_ring_get_count_free_elements(conn->r[ACC]));
  383. if (conn->rx_enabled[ACC] &&
  384. lws_ring_get_count_free_elements(conn->r[ACC]) <= 2)
  385. flow_control(conn, ACC, 0);
  386. if (conn->established[ONW] && !conn->closed[ONW])
  387. lws_callback_on_writable(conn->wsi[ONW]);
  388. break;
  389. case LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE:
  390. lwsl_debug("LWS_CALLBACK_RAW_PROXY_SRV_WRITEABLE\n");
  391. if (!conn || !conn->established[ONW] || conn->closed[ONW])
  392. break;
  393. ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
  394. if (!ppkt) {
  395. lwsl_info("%s: SRV_WRITABLE nothing in onw ring\n",
  396. __func__);
  397. break;
  398. }
  399. if (ppkt->ticket != conn->ticket_retired + 1) {
  400. lwsl_info("%s: onw ring has %d but next %d\n", __func__,
  401. ppkt->ticket, conn->ticket_retired + 1);
  402. lws_callback_on_writable(conn->wsi[ONW]);
  403. break;
  404. }
  405. n = lws_write(wsi, ppkt->payload, ppkt->len, LWS_WRITE_RAW);
  406. if (n < 0) {
  407. lwsl_info("%s: WRITEABLE: %d\n", __func__, n);
  408. return -1;
  409. }
  410. conn->ticket_retired = ppkt->ticket;
  411. lws_ring_consume(conn->r[ONW], &conn->t[ONW], NULL, 1);
  412. lws_ring_update_oldest_tail(conn->r[ONW], conn->t[ONW]);
  413. lwsl_debug("onward free: %d... waiting %d\n",
  414. (int)lws_ring_get_count_free_elements(conn->r[ONW]),
  415. (int)lws_ring_get_count_waiting_elements(conn->r[ONW],
  416. &conn->t[ONW]));
  417. if (!conn->rx_enabled[ONW] &&
  418. lws_ring_get_count_free_elements(conn->r[ONW]) > 2)
  419. flow_control(conn, ONW, 1);
  420. ppkt = lws_ring_get_element(conn->r[ONW], &conn->t[ONW]);
  421. lwsl_debug("%s: SRV_WRITABLE: next onw pkt %p idx %d vs %d\n",
  422. __func__, ppkt, ppkt ? ppkt->ticket : 0,
  423. conn->ticket_retired + 1);
  424. if (ppkt && ppkt->ticket == conn->ticket_retired + 1)
  425. lws_callback_on_writable(wsi);
  426. else {
  427. /*
  428. * defer checking for onward side closing until we
  429. * sent everything in the ring to accepted side
  430. */
  431. if (conn->closed[ONW])
  432. /*
  433. * there is never going to be any more... but
  434. * we may have some tx still in tx buflist /
  435. * partial
  436. */
  437. return lws_raw_transaction_completed(wsi);
  438. if (lws_ring_get_element(conn->r[ACC], &conn->t[ACC]))
  439. lws_callback_on_writable(conn->wsi[ONW]);
  440. }
  441. break;
  442. default:
  443. break;
  444. }
  445. return lws_callback_http_dummy(wsi, reason, user, in, len);
  446. }
  /*
   * Positional initializer for this plugin's entry in a struct lws_protocols
   * table: protocol name, callback, per-session data size, then four further
   * fields.
   *
   * NOTE(review): the trailing "8192, 8192, NULL, 0" presumably map to
   * rx_buffer_size, id, user and tx_packet_size -- confirm against the
   * struct lws_protocols definition in use.
   */
  #define LWS_PLUGIN_PROTOCOL_RAW_PROXY \
      { \
          "raw-proxy", \
          callback_raw_proxy, \
          sizeof(struct raw_pss), \
          8192, \
          8192, \
          NULL, \
          0 \
      }
  454. #if !defined (LWS_PLUGIN_STATIC)
  455. static const struct lws_protocols protocols[] = {
  456. LWS_PLUGIN_PROTOCOL_RAW_PROXY
  457. };
  458. LWS_VISIBLE const lws_plugin_protocol_t lws_raw_proxy = {
  459. .hdr = {
  460. "raw proxy",
  461. "lws_protocol_plugin",
  462. LWS_PLUGIN_API_MAGIC
  463. },
  464. .protocols = protocols,
  465. .count_protocols = LWS_ARRAY_SIZE(protocols),
  466. .extensions = NULL,
  467. .count_extensions = 0,
  468. };
  469. #endif