RabbitMQ.cpp 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. #include "RabbitMQ.hpp"
  2. #include <amqp.h>
  3. #include <amqp_tcp_socket.h>
  4. #include <stdexcept>
  5. #include <cstring>
  6. namespace ZeroTier
  7. {
  8. RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
  9. : _mqc(cfg)
  10. , _qName(queueName)
  11. , _socket(NULL)
  12. , _status(0)
  13. {
  14. }
  15. RabbitMQ::~RabbitMQ()
  16. {
  17. amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
  18. amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
  19. amqp_destroy_connection(_conn);
  20. }
  21. void RabbitMQ::init()
  22. {
  23. struct timeval tval;
  24. memset(&tval, 0, sizeof(struct timeval));
  25. tval.tv_sec = 5;
  26. fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
  27. _conn = amqp_new_connection();
  28. _socket = amqp_tcp_socket_new(_conn);
  29. if (!_socket) {
  30. throw std::runtime_error("Can't create socket for RabbitMQ");
  31. }
  32. _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
  33. if (_status) {
  34. throw std::runtime_error("Can't connect to RabbitMQ");
  35. }
  36. amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
  37. _mqc->username, _mqc->password);
  38. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  39. throw std::runtime_error("RabbitMQ Login Error");
  40. }
  41. static int chan = 0;
  42. _channel = ++chan;
  43. amqp_channel_open(_conn, _channel);
  44. r = amqp_get_rpc_reply(_conn);
  45. if(r.reply_type != AMQP_RESPONSE_NORMAL) {
  46. throw std::runtime_error("Error opening communication channel");
  47. }
  48. _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
  49. r = amqp_get_rpc_reply(_conn);
  50. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  51. throw std::runtime_error("Error declaring queue " + std::string(_qName));
  52. }
  53. amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
  54. r = amqp_get_rpc_reply(_conn);
  55. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  56. throw std::runtime_error("Error consuming queue " + std::string(_qName));
  57. }
  58. fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
  59. }
  60. std::string RabbitMQ::consume()
  61. {
  62. amqp_rpc_reply_t res;
  63. amqp_envelope_t envelope;
  64. amqp_maybe_release_buffers(_conn);
  65. res = amqp_consume_message(_conn, &envelope, NULL, 0);
  66. if (res.reply_type != AMQP_RESPONSE_NORMAL) {
  67. throw std::runtime_error("Error getting message");
  68. }
  69. std::string msg(
  70. (const char*)envelope.message.body.bytes,
  71. envelope.message.body.len
  72. );
  73. amqp_destroy_envelope(&envelope);
  74. return msg;
  75. }
  76. }