RabbitMQ.cpp 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. /*
  2. * Copyright (c)2013-2020 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2024-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #include "RabbitMQ.hpp"
  14. #ifdef ZT_CONTROLLER_USE_LIBPQ
  15. #include <amqp.h>
  16. #include <amqp_tcp_socket.h>
  17. #include <stdexcept>
  18. #include <cstring>
  19. namespace ZeroTier
  20. {
  21. RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
  22. : _mqc(cfg)
  23. , _qName(queueName)
  24. , _socket(NULL)
  25. , _status(0)
  26. {
  27. }
  28. RabbitMQ::~RabbitMQ()
  29. {
  30. amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
  31. amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
  32. amqp_destroy_connection(_conn);
  33. }
  34. void RabbitMQ::init()
  35. {
  36. struct timeval tval;
  37. memset(&tval, 0, sizeof(struct timeval));
  38. tval.tv_sec = 5;
  39. fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
  40. _conn = amqp_new_connection();
  41. _socket = amqp_tcp_socket_new(_conn);
  42. if (!_socket) {
  43. throw std::runtime_error("Can't create socket for RabbitMQ");
  44. }
  45. _status = amqp_socket_open_noblock(_socket, _mqc->host.c_str(), _mqc->port, &tval);
  46. if (_status) {
  47. throw std::runtime_error("Can't connect to RabbitMQ");
  48. }
  49. amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
  50. _mqc->username.c_str(), _mqc->password.c_str());
  51. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  52. throw std::runtime_error("RabbitMQ Login Error");
  53. }
  54. static int chan = 0;
  55. {
  56. Mutex::Lock l(_chan_m);
  57. _channel = ++chan;
  58. }
  59. amqp_channel_open(_conn, _channel);
  60. r = amqp_get_rpc_reply(_conn);
  61. if(r.reply_type != AMQP_RESPONSE_NORMAL) {
  62. throw std::runtime_error("Error opening communication channel");
  63. }
  64. _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
  65. r = amqp_get_rpc_reply(_conn);
  66. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  67. throw std::runtime_error("Error declaring queue " + std::string(_qName));
  68. }
  69. amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
  70. r = amqp_get_rpc_reply(_conn);
  71. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  72. throw std::runtime_error("Error consuming queue " + std::string(_qName));
  73. }
  74. fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
  75. }
  76. std::string RabbitMQ::consume()
  77. {
  78. amqp_rpc_reply_t res;
  79. amqp_envelope_t envelope;
  80. amqp_maybe_release_buffers(_conn);
  81. struct timeval timeout;
  82. timeout.tv_sec = 1;
  83. timeout.tv_usec = 0;
  84. res = amqp_consume_message(_conn, &envelope, &timeout, 0);
  85. if (res.reply_type != AMQP_RESPONSE_NORMAL) {
  86. if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
  87. // timeout waiting for message. Return empty string
  88. return "";
  89. } else {
  90. throw std::runtime_error("Error getting message");
  91. }
  92. }
  93. std::string msg(
  94. (const char*)envelope.message.body.bytes,
  95. envelope.message.body.len
  96. );
  97. amqp_destroy_envelope(&envelope);
  98. return msg;
  99. }
  100. }
  101. #endif // ZT_CONTROLLER_USE_LIBPQ