common.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. /*
  2. * ***** BEGIN LICENSE BLOCK *****
  3. * Version: MIT
  4. *
  5. * Portions created by Alan Antonuk are Copyright (c) 2012-2013
  6. * Alan Antonuk. All Rights Reserved.
  7. *
  8. * Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
  9. * All Rights Reserved.
  10. *
  11. * Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
  12. * VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
  13. *
  14. * Permission is hereby granted, free of charge, to any person
  15. * obtaining a copy of this software and associated documentation
  16. * files (the "Software"), to deal in the Software without
  17. * restriction, including without limitation the rights to use, copy,
  18. * modify, merge, publish, distribute, sublicense, and/or sell copies
  19. * of the Software, and to permit persons to whom the Software is
  20. * furnished to do so, subject to the following conditions:
  21. *
  22. * The above copyright notice and this permission notice shall be
  23. * included in all copies or substantial portions of the Software.
  24. *
  25. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  26. * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  27. * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  28. * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  29. * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  30. * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  31. * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  32. * SOFTWARE.
  33. * ***** END LICENSE BLOCK *****
  34. */
  35. #ifdef HAVE_CONFIG_H
  36. #include "config.h"
  37. #endif
  38. #include "common.h"
  39. #ifdef WITH_SSL
  40. #include <amqp_ssl_socket.h>
  41. #endif
  42. #include <amqp_tcp_socket.h>
  43. #include <errno.h>
  44. #include <fcntl.h>
  45. #include <stdarg.h>
  46. #include <stdio.h>
  47. #include <stdlib.h>
  48. #include <string.h>
  49. #include <unistd.h>
  50. #ifdef WINDOWS
  51. #include "compat.h"
  52. #endif
  53. void die(const char *fmt, ...) {
  54. va_list ap;
  55. va_start(ap, fmt);
  56. vfprintf(stderr, fmt, ap);
  57. va_end(ap);
  58. fprintf(stderr, "\n");
  59. exit(1);
  60. }
  61. void die_errno(int err, const char *fmt, ...) {
  62. va_list ap;
  63. if (err == 0) {
  64. return;
  65. }
  66. va_start(ap, fmt);
  67. vfprintf(stderr, fmt, ap);
  68. va_end(ap);
  69. fprintf(stderr, ": %s\n", strerror(err));
  70. exit(1);
  71. }
  72. void die_amqp_error(int err, const char *fmt, ...) {
  73. va_list ap;
  74. if (err >= 0) {
  75. return;
  76. }
  77. va_start(ap, fmt);
  78. vfprintf(stderr, fmt, ap);
  79. va_end(ap);
  80. fprintf(stderr, ": %s\n", amqp_error_string2(err));
  81. exit(1);
  82. }
  83. const char *amqp_server_exception_string(amqp_rpc_reply_t r) {
  84. int res;
  85. static char s[512];
  86. switch (r.reply.id) {
  87. case AMQP_CONNECTION_CLOSE_METHOD: {
  88. amqp_connection_close_t *m = (amqp_connection_close_t *)r.reply.decoded;
  89. res = snprintf(s, sizeof(s), "server connection error %d, message: %.*s",
  90. m->reply_code, (int)m->reply_text.len,
  91. (char *)m->reply_text.bytes);
  92. break;
  93. }
  94. case AMQP_CHANNEL_CLOSE_METHOD: {
  95. amqp_channel_close_t *m = (amqp_channel_close_t *)r.reply.decoded;
  96. res = snprintf(s, sizeof(s), "server channel error %d, message: %.*s",
  97. m->reply_code, (int)m->reply_text.len,
  98. (char *)m->reply_text.bytes);
  99. break;
  100. }
  101. default:
  102. res = snprintf(s, sizeof(s), "unknown server error, method id 0x%08X",
  103. r.reply.id);
  104. break;
  105. }
  106. return res >= 0 ? s : NULL;
  107. }
  108. const char *amqp_rpc_reply_string(amqp_rpc_reply_t r) {
  109. switch (r.reply_type) {
  110. case AMQP_RESPONSE_NORMAL:
  111. return "normal response";
  112. case AMQP_RESPONSE_NONE:
  113. return "missing RPC reply type";
  114. case AMQP_RESPONSE_LIBRARY_EXCEPTION:
  115. return amqp_error_string2(r.library_error);
  116. case AMQP_RESPONSE_SERVER_EXCEPTION:
  117. return amqp_server_exception_string(r);
  118. default:
  119. abort();
  120. }
  121. }
  122. void die_rpc(amqp_rpc_reply_t r, const char *fmt, ...) {
  123. va_list ap;
  124. if (r.reply_type == AMQP_RESPONSE_NORMAL) {
  125. return;
  126. }
  127. va_start(ap, fmt);
  128. vfprintf(stderr, fmt, ap);
  129. va_end(ap);
  130. fprintf(stderr, ": %s\n", amqp_rpc_reply_string(r));
  131. exit(1);
  132. }
  133. static char *amqp_url;
  134. static char *amqp_server;
  135. static int amqp_port = -1;
  136. static char *amqp_vhost;
  137. static char *amqp_username;
  138. static char *amqp_password;
  139. static int amqp_heartbeat = 0;
  140. #ifdef WITH_SSL
  141. static int amqp_ssl = 0;
  142. static char *amqp_cacert = "/etc/ssl/certs/cacert.pem";
  143. static char *amqp_key = NULL;
  144. static char *amqp_cert = NULL;
  145. #endif /* WITH_SSL */
  146. const char *connect_options_title = "Connection options";
  147. struct poptOption connect_options[] = {
  148. {"url", 'u', POPT_ARG_STRING, &amqp_url, 0, "the AMQP URL to connect to",
  149. "amqp://..."},
  150. {"server", 's', POPT_ARG_STRING, &amqp_server, 0,
  151. "the AMQP server to connect to", "hostname"},
  152. {"port", 0, POPT_ARG_INT, &amqp_port, 0, "the port to connect on", "port"},
  153. {"vhost", 0, POPT_ARG_STRING, &amqp_vhost, 0,
  154. "the vhost to use when connecting", "vhost"},
  155. {"username", 0, POPT_ARG_STRING, &amqp_username, 0,
  156. "the username to login with", "username"},
  157. {"password", 0, POPT_ARG_STRING, &amqp_password, 0,
  158. "the password to login with", "password"},
  159. {"heartbeat", 0, POPT_ARG_INT, &amqp_heartbeat, 0,
  160. "heartbeat interval, set to 0 to disable", "heartbeat"},
  161. #ifdef WITH_SSL
  162. {"ssl", 0, POPT_ARG_NONE, &amqp_ssl, 0, "connect over SSL/TLS", NULL},
  163. {"cacert", 0, POPT_ARG_STRING, &amqp_cacert, 0,
  164. "path to the CA certificate file", "cacert.pem"},
  165. {"key", 0, POPT_ARG_STRING, &amqp_key, 0,
  166. "path to the client private key file", "key.pem"},
  167. {"cert", 0, POPT_ARG_STRING, &amqp_cert, 0,
  168. "path to the client certificate file", "cert.pem"},
  169. #endif /* WITH_SSL */
  170. {NULL, '\0', 0, NULL, 0, NULL, NULL}};
  171. static void init_connection_info(struct amqp_connection_info *ci) {
  172. ci->user = NULL;
  173. ci->password = NULL;
  174. ci->host = NULL;
  175. ci->port = -1;
  176. ci->vhost = NULL;
  177. ci->user = NULL;
  178. amqp_default_connection_info(ci);
  179. if (amqp_url)
  180. die_amqp_error(amqp_parse_url(strdup(amqp_url), ci), "Parsing URL '%s'",
  181. amqp_url);
  182. if (amqp_server) {
  183. char *colon;
  184. if (amqp_url) {
  185. die("--server and --url options cannot be used at the same time");
  186. }
  187. /* parse the server string into a hostname and a port */
  188. colon = strchr(amqp_server, ':');
  189. if (colon) {
  190. char *port_end;
  191. size_t host_len;
  192. /* Deprecate specifying the port number with the
  193. --server option, because it is not ipv6 friendly.
  194. --url now allows connection options to be
  195. specified concisely. */
  196. fprintf(stderr,
  197. "Specifying the port number with --server is deprecated\n");
  198. host_len = colon - amqp_server;
  199. ci->host = malloc(host_len + 1);
  200. memcpy(ci->host, amqp_server, host_len);
  201. ci->host[host_len] = 0;
  202. if (amqp_port >= 0) {
  203. die("both --server and --port options specify server port");
  204. }
  205. ci->port = strtol(colon + 1, &port_end, 10);
  206. if (ci->port < 0 || ci->port > 65535 || port_end == colon + 1 ||
  207. *port_end != 0)
  208. die("bad server port number in '%s'", amqp_server);
  209. }
  210. #if WITH_SSL
  211. if (amqp_ssl && !ci->ssl) {
  212. die("the --ssl option specifies an SSL connection"
  213. " but the --url option does not");
  214. }
  215. #endif
  216. }
  217. if (amqp_port >= 0) {
  218. if (amqp_url) {
  219. die("--port and --url options cannot be used at the same time");
  220. }
  221. ci->port = amqp_port;
  222. }
  223. if (amqp_username) {
  224. if (amqp_url) {
  225. die("--username and --url options cannot be used at the same time");
  226. }
  227. ci->user = amqp_username;
  228. }
  229. if (amqp_password) {
  230. if (amqp_url) {
  231. die("--password and --url options cannot be used at the same time");
  232. }
  233. ci->password = amqp_password;
  234. }
  235. if (amqp_vhost) {
  236. if (amqp_url) {
  237. die("--vhost and --url options cannot be used at the same time");
  238. }
  239. ci->vhost = amqp_vhost;
  240. }
  241. if (amqp_heartbeat < 0) {
  242. die("--heartbeat must be a positive value");
  243. }
  244. }
  245. amqp_connection_state_t make_connection(void) {
  246. int status;
  247. amqp_socket_t *socket = NULL;
  248. struct amqp_connection_info ci;
  249. amqp_connection_state_t conn;
  250. init_connection_info(&ci);
  251. conn = amqp_new_connection();
  252. if (ci.ssl) {
  253. #ifdef WITH_SSL
  254. socket = amqp_ssl_socket_new(conn);
  255. if (!socket) {
  256. die("creating SSL/TLS socket");
  257. }
  258. if (amqp_cacert) {
  259. amqp_ssl_socket_set_cacert(socket, amqp_cacert);
  260. }
  261. if (amqp_key) {
  262. amqp_ssl_socket_set_key(socket, amqp_cert, amqp_key);
  263. }
  264. #else
  265. die("librabbitmq was not built with SSL/TLS support");
  266. #endif
  267. } else {
  268. socket = amqp_tcp_socket_new(conn);
  269. if (!socket) {
  270. die("creating TCP socket (out of memory)");
  271. }
  272. }
  273. status = amqp_socket_open(socket, ci.host, ci.port);
  274. if (status) {
  275. die("opening socket to %s:%d", ci.host, ci.port);
  276. }
  277. die_rpc(amqp_login(conn, ci.vhost, 0, 131072, amqp_heartbeat,
  278. AMQP_SASL_METHOD_PLAIN, ci.user, ci.password),
  279. "logging in to AMQP server");
  280. if (!amqp_channel_open(conn, 1)) {
  281. die_rpc(amqp_get_rpc_reply(conn), "opening channel");
  282. }
  283. return conn;
  284. }
  285. void close_connection(amqp_connection_state_t conn) {
  286. int res;
  287. die_rpc(amqp_channel_close(conn, 1, AMQP_REPLY_SUCCESS), "closing channel");
  288. die_rpc(amqp_connection_close(conn, AMQP_REPLY_SUCCESS),
  289. "closing connection");
  290. res = amqp_destroy_connection(conn);
  291. die_amqp_error(res, "closing connection");
  292. }
  293. amqp_bytes_t read_all(int fd) {
  294. size_t space = 4096;
  295. amqp_bytes_t bytes;
  296. bytes.bytes = malloc(space);
  297. bytes.len = 0;
  298. for (;;) {
  299. ssize_t res = read(fd, (char *)bytes.bytes + bytes.len, space - bytes.len);
  300. if (res == 0) {
  301. break;
  302. }
  303. if (res < 0) {
  304. if (errno == EINTR) {
  305. continue;
  306. }
  307. die_errno(errno, "reading");
  308. }
  309. bytes.len += res;
  310. if (bytes.len == space) {
  311. space *= 2;
  312. bytes.bytes = realloc(bytes.bytes, space);
  313. }
  314. }
  315. return bytes;
  316. }
  317. void write_all(int fd, amqp_bytes_t data) {
  318. while (data.len > 0) {
  319. ssize_t res = write(fd, data.bytes, data.len);
  320. if (res < 0) {
  321. die_errno(errno, "write");
  322. }
  323. data.len -= res;
  324. data.bytes = (char *)data.bytes + res;
  325. }
  326. }
  327. void copy_body(amqp_connection_state_t conn, int fd) {
  328. size_t body_remaining;
  329. amqp_frame_t frame;
  330. int res = amqp_simple_wait_frame(conn, &frame);
  331. die_amqp_error(res, "waiting for header frame");
  332. if (frame.frame_type != AMQP_FRAME_HEADER) {
  333. die("expected header, got frame type 0x%X", frame.frame_type);
  334. }
  335. body_remaining = frame.payload.properties.body_size;
  336. while (body_remaining) {
  337. res = amqp_simple_wait_frame(conn, &frame);
  338. die_amqp_error(res, "waiting for body frame");
  339. if (frame.frame_type != AMQP_FRAME_BODY) {
  340. die("expected body, got frame type 0x%X", frame.frame_type);
  341. }
  342. write_all(fd, frame.payload.body_fragment);
  343. body_remaining -= frame.payload.body_fragment.len;
  344. }
  345. }
  346. poptContext process_options(int argc, const char **argv,
  347. struct poptOption *options, const char *help) {
  348. int c;
  349. poptContext opts = poptGetContext(NULL, argc, argv, options, 0);
  350. poptSetOtherOptionHelp(opts, help);
  351. while ((c = poptGetNextOpt(opts)) >= 0) {
  352. /* no options require explicit handling */
  353. }
  354. if (c < -1) {
  355. fprintf(stderr, "%s: %s\n", poptBadOption(opts, POPT_BADOPTION_NOALIAS),
  356. poptStrerror(c));
  357. poptPrintUsage(opts, stderr, 0);
  358. exit(1);
  359. }
  360. return opts;
  361. }
  362. void process_all_options(int argc, const char **argv,
  363. struct poptOption *options) {
  364. poptContext opts = process_options(argc, argv, options, "[OPTIONS]...");
  365. const char *opt = poptPeekArg(opts);
  366. if (opt) {
  367. fprintf(stderr, "unexpected operand: %s\n", opt);
  368. poptPrintUsage(opts, stderr, 0);
  369. exit(1);
  370. }
  371. poptFreeContext(opts);
  372. }
  373. amqp_bytes_t cstring_bytes(const char *str) {
  374. return str ? amqp_cstring_bytes(str) : amqp_empty_bytes;
  375. }