RabbitMQ.cpp 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. /*
  2. * Copyright (c)2019 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2023-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #include "RabbitMQ.hpp"
  14. #ifdef ZT_CONTROLLER_USE_LIBPQ
  15. #include <amqp.h>
  16. #include <amqp_tcp_socket.h>
  17. #include <stdexcept>
  18. #include <cstring>
  19. namespace ZeroTier
  20. {
  21. RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
  22. : _mqc(cfg)
  23. , _qName(queueName)
  24. , _socket(NULL)
  25. , _status(0)
  26. {
  27. }
  28. RabbitMQ::~RabbitMQ()
  29. {
  30. amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
  31. amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
  32. amqp_destroy_connection(_conn);
  33. }
  34. void RabbitMQ::init()
  35. {
  36. struct timeval tval;
  37. memset(&tval, 0, sizeof(struct timeval));
  38. tval.tv_sec = 5;
  39. fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
  40. _conn = amqp_new_connection();
  41. _socket = amqp_tcp_socket_new(_conn);
  42. if (!_socket) {
  43. throw std::runtime_error("Can't create socket for RabbitMQ");
  44. }
  45. fprintf(stderr, "RabbitMQ: amqp://%s:%s@%s:%d\n", _mqc->username.c_str(), _mqc->password.c_str(), _mqc->host.c_str(), _mqc->port);
  46. _status = amqp_socket_open_noblock(_socket, _mqc->host.c_str(), _mqc->port, &tval);
  47. if (_status) {
  48. throw std::runtime_error("Can't connect to RabbitMQ");
  49. }
  50. amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
  51. _mqc->username.c_str(), _mqc->password.c_str());
  52. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  53. throw std::runtime_error("RabbitMQ Login Error");
  54. }
  55. static int chan = 0;
  56. {
  57. Mutex::Lock l(_chan_m);
  58. _channel = ++chan;
  59. }
  60. amqp_channel_open(_conn, _channel);
  61. r = amqp_get_rpc_reply(_conn);
  62. if(r.reply_type != AMQP_RESPONSE_NORMAL) {
  63. throw std::runtime_error("Error opening communication channel");
  64. }
  65. _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
  66. r = amqp_get_rpc_reply(_conn);
  67. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  68. throw std::runtime_error("Error declaring queue " + std::string(_qName));
  69. }
  70. amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
  71. r = amqp_get_rpc_reply(_conn);
  72. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  73. throw std::runtime_error("Error consuming queue " + std::string(_qName));
  74. }
  75. fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
  76. }
  77. std::string RabbitMQ::consume()
  78. {
  79. amqp_rpc_reply_t res;
  80. amqp_envelope_t envelope;
  81. amqp_maybe_release_buffers(_conn);
  82. struct timeval timeout;
  83. timeout.tv_sec = 1;
  84. timeout.tv_usec = 0;
  85. res = amqp_consume_message(_conn, &envelope, &timeout, 0);
  86. if (res.reply_type != AMQP_RESPONSE_NORMAL) {
  87. if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
  88. // timeout waiting for message. Return empty string
  89. return "";
  90. } else {
  91. throw std::runtime_error("Error getting message");
  92. }
  93. }
  94. std::string msg(
  95. (const char*)envelope.message.body.bytes,
  96. envelope.message.body.len
  97. );
  98. amqp_destroy_envelope(&envelope);
  99. return msg;
  100. }
  101. }
  102. #endif // ZT_CONTROLLER_USE_LIBPQ