RabbitMQ.cpp 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. #include "RabbitMQ.hpp"
  2. #ifdef ZT_CONTROLLER_USE_LIBPQ
  3. #include <amqp.h>
  4. #include <amqp_tcp_socket.h>
  5. #include <stdexcept>
  6. #include <cstring>
  7. namespace ZeroTier
  8. {
  9. RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
  10. : _mqc(cfg)
  11. , _qName(queueName)
  12. , _socket(NULL)
  13. , _status(0)
  14. {
  15. }
  16. RabbitMQ::~RabbitMQ()
  17. {
  18. amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
  19. amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
  20. amqp_destroy_connection(_conn);
  21. }
  22. void RabbitMQ::init()
  23. {
  24. struct timeval tval;
  25. memset(&tval, 0, sizeof(struct timeval));
  26. tval.tv_sec = 5;
  27. fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
  28. _conn = amqp_new_connection();
  29. _socket = amqp_tcp_socket_new(_conn);
  30. if (!_socket) {
  31. throw std::runtime_error("Can't create socket for RabbitMQ");
  32. }
  33. _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
  34. if (_status) {
  35. throw std::runtime_error("Can't connect to RabbitMQ");
  36. }
  37. amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
  38. _mqc->username, _mqc->password);
  39. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  40. throw std::runtime_error("RabbitMQ Login Error");
  41. }
  42. static int chan = 0;
  43. {
  44. Mutex::Lock l(_chan_m);
  45. _channel = ++chan;
  46. }
  47. amqp_channel_open(_conn, _channel);
  48. r = amqp_get_rpc_reply(_conn);
  49. if(r.reply_type != AMQP_RESPONSE_NORMAL) {
  50. throw std::runtime_error("Error opening communication channel");
  51. }
  52. _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
  53. r = amqp_get_rpc_reply(_conn);
  54. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  55. throw std::runtime_error("Error declaring queue " + std::string(_qName));
  56. }
  57. amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
  58. r = amqp_get_rpc_reply(_conn);
  59. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  60. throw std::runtime_error("Error consuming queue " + std::string(_qName));
  61. }
  62. fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
  63. }
  64. std::string RabbitMQ::consume()
  65. {
  66. amqp_rpc_reply_t res;
  67. amqp_envelope_t envelope;
  68. amqp_maybe_release_buffers(_conn);
  69. struct timeval timeout;
  70. timeout.tv_sec = 1;
  71. timeout.tv_usec = 0;
  72. res = amqp_consume_message(_conn, &envelope, &timeout, 0);
  73. if (res.reply_type != AMQP_RESPONSE_NORMAL) {
  74. if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
  75. // timeout waiting for message. Return empty string
  76. return "";
  77. } else {
  78. throw std::runtime_error("Error getting message");
  79. }
  80. }
  81. std::string msg(
  82. (const char*)envelope.message.body.bytes,
  83. envelope.message.body.len
  84. );
  85. amqp_destroy_envelope(&envelope);
  86. return msg;
  87. }
  88. }
  89. #endif // ZT_CONTROLLER_USE_LIBPQ