RabbitMQ.cpp 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. #include "RabbitMQ.hpp"
  2. #ifdef ZT_CONTROLLER_USE_LIBPQ
  3. #include <amqp.h>
  4. #include <amqp_tcp_socket.h>
  5. #include <stdexcept>
  6. #include <cstring>
  7. namespace ZeroTier
  8. {
  9. RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
  10. : _mqc(cfg)
  11. , _qName(queueName)
  12. , _socket(NULL)
  13. , _status(0)
  14. {
  15. }
  16. RabbitMQ::~RabbitMQ()
  17. {
  18. amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
  19. amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
  20. amqp_destroy_connection(_conn);
  21. }
  22. void RabbitMQ::init()
  23. {
  24. struct timeval tval;
  25. memset(&tval, 0, sizeof(struct timeval));
  26. tval.tv_sec = 5;
  27. fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
  28. _conn = amqp_new_connection();
  29. _socket = amqp_tcp_socket_new(_conn);
  30. if (!_socket) {
  31. throw std::runtime_error("Can't create socket for RabbitMQ");
  32. }
  33. _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
  34. if (_status) {
  35. throw std::runtime_error("Can't connect to RabbitMQ");
  36. }
  37. amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
  38. _mqc->username, _mqc->password);
  39. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  40. throw std::runtime_error("RabbitMQ Login Error");
  41. }
  42. static int chan = 0;
  43. {
  44. Mutex::Lock l(_chan_m);
  45. _channel = ++chan;
  46. }
  47. amqp_channel_open(_conn, _channel);
  48. r = amqp_get_rpc_reply(_conn);
  49. if(r.reply_type != AMQP_RESPONSE_NORMAL) {
  50. throw std::runtime_error("Error opening communication channel");
  51. }
  52. _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
  53. r = amqp_get_rpc_reply(_conn);
  54. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  55. throw std::runtime_error("Error declaring queue " + std::string(_qName));
  56. }
  57. amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
  58. r = amqp_get_rpc_reply(_conn);
  59. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  60. throw std::runtime_error("Error consuming queue " + std::string(_qName));
  61. }
  62. fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
  63. }
  64. std::string RabbitMQ::consume()
  65. {
  66. amqp_rpc_reply_t res;
  67. amqp_envelope_t envelope;
  68. amqp_maybe_release_buffers(_conn);
  69. res = amqp_consume_message(_conn, &envelope, NULL, 0);
  70. if (res.reply_type != AMQP_RESPONSE_NORMAL) {
  71. throw std::runtime_error("Error getting message");
  72. }
  73. std::string msg(
  74. (const char*)envelope.message.body.bytes,
  75. envelope.message.body.len
  76. );
  77. amqp_destroy_envelope(&envelope);
  78. return msg;
  79. }
  80. }
  81. #endif // ZT_CONTROLLER_USE_LIBPQ