RabbitMQ.cpp 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. /*
  2. * ZeroTier One - Network Virtualization Everywhere
  3. * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  17. *
  18. * --
  19. *
  20. * You can be released from the requirements of the license by purchasing
  21. * a commercial license. Buying such a license is mandatory as soon as you
  22. * develop commercial closed-source software that incorporates or links
  23. * directly against ZeroTier software without disclosing the source code
  24. * of your own application.
  25. */
  26. #include "RabbitMQ.hpp"
  27. #ifdef ZT_CONTROLLER_USE_LIBPQ
  28. #include <amqp.h>
  29. #include <amqp_tcp_socket.h>
  30. #include <stdexcept>
  31. #include <cstring>
  32. namespace ZeroTier
  33. {
  34. RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
  35. : _mqc(cfg)
  36. , _qName(queueName)
  37. , _socket(NULL)
  38. , _status(0)
  39. {
  40. }
  41. RabbitMQ::~RabbitMQ()
  42. {
  43. amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
  44. amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
  45. amqp_destroy_connection(_conn);
  46. }
  47. void RabbitMQ::init()
  48. {
  49. struct timeval tval;
  50. memset(&tval, 0, sizeof(struct timeval));
  51. tval.tv_sec = 5;
  52. fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
  53. _conn = amqp_new_connection();
  54. _socket = amqp_tcp_socket_new(_conn);
  55. if (!_socket) {
  56. throw std::runtime_error("Can't create socket for RabbitMQ");
  57. }
  58. _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
  59. if (_status) {
  60. throw std::runtime_error("Can't connect to RabbitMQ");
  61. }
  62. amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
  63. _mqc->username, _mqc->password);
  64. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  65. throw std::runtime_error("RabbitMQ Login Error");
  66. }
  67. static int chan = 0;
  68. {
  69. Mutex::Lock l(_chan_m);
  70. _channel = ++chan;
  71. }
  72. amqp_channel_open(_conn, _channel);
  73. r = amqp_get_rpc_reply(_conn);
  74. if(r.reply_type != AMQP_RESPONSE_NORMAL) {
  75. throw std::runtime_error("Error opening communication channel");
  76. }
  77. _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
  78. r = amqp_get_rpc_reply(_conn);
  79. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  80. throw std::runtime_error("Error declaring queue " + std::string(_qName));
  81. }
  82. amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
  83. r = amqp_get_rpc_reply(_conn);
  84. if (r.reply_type != AMQP_RESPONSE_NORMAL) {
  85. throw std::runtime_error("Error consuming queue " + std::string(_qName));
  86. }
  87. fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
  88. }
  89. std::string RabbitMQ::consume()
  90. {
  91. amqp_rpc_reply_t res;
  92. amqp_envelope_t envelope;
  93. amqp_maybe_release_buffers(_conn);
  94. struct timeval timeout;
  95. timeout.tv_sec = 1;
  96. timeout.tv_usec = 0;
  97. res = amqp_consume_message(_conn, &envelope, &timeout, 0);
  98. if (res.reply_type != AMQP_RESPONSE_NORMAL) {
  99. if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
  100. // timeout waiting for message. Return empty string
  101. return "";
  102. } else {
  103. throw std::runtime_error("Error getting message");
  104. }
  105. }
  106. std::string msg(
  107. (const char*)envelope.message.body.bytes,
  108. envelope.message.body.len
  109. );
  110. amqp_destroy_envelope(&envelope);
  111. return msg;
  112. }
  113. }
  114. #endif // ZT_CONTROLLER_USE_LIBPQ