RabbitMQ.cpp 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. #include "RabbitMQ.hpp"
  2. #include <amqp.h>
  3. #include <amqp_tcp_socket.h>
  4. #include <stdexcept>
  5. #include <cstring>
  6. namespace ZeroTier
  7. {
  8. RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
  9. : _mqc(cfg)
  10. , _qName(queueName)
  11. , _socket(NULL)
  12. , _status(0)
  13. {
  14. }
  15. RabbitMQ::~RabbitMQ()
  16. {
  17. amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
  18. amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
  19. amqp_destroy_connection(_conn);
  20. }
  21. void RabbitMQ::init()
  22. {
  23. struct timeval tval;
  24. memset(&tval, 0, sizeof(struct timeval));
  25. tval.tv_sec = 5;
  26. fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
  27. _conn = amqp_new_connection();
  28. _socket = amqp_tcp_socket_new(_conn);
  29. if (!_socket) {
  30. throw std::runtime_error("Can't create socket for RabbitMQ");
  31. }
  32. _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
  33. if (_status) {
  34. throw std::runtime_error("Can't connect to RabbitMQ");
  35. }
  36. amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
  37. _mqc->username, _mqc->password);
  38. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  39. throw std::runtime_error("RabbitMQ Login Error");
  40. }
  41. static int chan = 0;
  42. {
  43. Mutex::Lock l(_chan_m);
  44. _channel = ++chan;
  45. }
  46. amqp_channel_open(_conn, _channel);
  47. r = amqp_get_rpc_reply(_conn);
  48. if(r.reply_type != AMQP_RESPONSE_NORMAL) {
  49. throw std::runtime_error("Error opening communication channel");
  50. }
  51. _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
  52. r = amqp_get_rpc_reply(_conn);
  53. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  54. throw std::runtime_error("Error declaring queue " + std::string(_qName));
  55. }
  56. amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
  57. r = amqp_get_rpc_reply(_conn);
  58. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  59. throw std::runtime_error("Error consuming queue " + std::string(_qName));
  60. }
  61. fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
  62. }
  63. std::string RabbitMQ::consume()
  64. {
  65. amqp_rpc_reply_t res;
  66. amqp_envelope_t envelope;
  67. amqp_maybe_release_buffers(_conn);
  68. res = amqp_consume_message(_conn, &envelope, NULL, 0);
  69. if (res.reply_type != AMQP_RESPONSE_NORMAL) {
  70. throw std::runtime_error("Error getting message");
  71. }
  72. std::string msg(
  73. (const char*)envelope.message.body.bytes,
  74. envelope.message.body.len
  75. );
  76. amqp_destroy_envelope(&envelope);
  77. return msg;
  78. }
  79. }