123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- /*
- * ZeroTier One - Network Virtualization Everywhere
- * Copyright (C) 2011-2019 ZeroTier, Inc. https://www.zerotier.com/
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * --
- *
- * You can be released from the requirements of the license by purchasing
- * a commercial license. Buying such a license is mandatory as soon as you
- * develop commercial closed-source software that incorporates or links
- * directly against ZeroTier software without disclosing the source code
- * of your own application.
- */
- #include "RabbitMQ.hpp"
- #ifdef ZT_CONTROLLER_USE_LIBPQ
- #include <amqp.h>
- #include <amqp_tcp_socket.h>
- #include <stdexcept>
- #include <cstring>
- namespace ZeroTier
- {
- RabbitMQ::RabbitMQ(MQConfig *cfg, const char *queueName)
- : _mqc(cfg)
- , _qName(queueName)
- , _socket(NULL)
- , _status(0)
- {
- }
- RabbitMQ::~RabbitMQ()
- {
- amqp_channel_close(_conn, _channel, AMQP_REPLY_SUCCESS);
- amqp_connection_close(_conn, AMQP_REPLY_SUCCESS);
- amqp_destroy_connection(_conn);
- }
- void RabbitMQ::init()
- {
- struct timeval tval;
- memset(&tval, 0, sizeof(struct timeval));
- tval.tv_sec = 5;
- fprintf(stderr, "Initializing RabbitMQ %s\n", _qName);
- _conn = amqp_new_connection();
- _socket = amqp_tcp_socket_new(_conn);
- if (!_socket) {
- throw std::runtime_error("Can't create socket for RabbitMQ");
- }
-
- _status = amqp_socket_open_noblock(_socket, _mqc->host, _mqc->port, &tval);
- if (_status) {
- throw std::runtime_error("Can't connect to RabbitMQ");
- }
-
- amqp_rpc_reply_t r = amqp_login(_conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN,
- _mqc->username, _mqc->password);
- if (r.reply_type != AMQP_RESPONSE_NORMAL) {
- throw std::runtime_error("RabbitMQ Login Error");
- }
- static int chan = 0;
- {
- Mutex::Lock l(_chan_m);
- _channel = ++chan;
- }
- amqp_channel_open(_conn, _channel);
- r = amqp_get_rpc_reply(_conn);
- if(r.reply_type != AMQP_RESPONSE_NORMAL) {
- throw std::runtime_error("Error opening communication channel");
- }
-
- _q = amqp_queue_declare(_conn, _channel, amqp_cstring_bytes(_qName), 0, 0, 0, 0, amqp_empty_table);
- r = amqp_get_rpc_reply(_conn);
- if (r.reply_type != AMQP_RESPONSE_NORMAL) {
- throw std::runtime_error("Error declaring queue " + std::string(_qName));
- }
- amqp_basic_consume(_conn, _channel, amqp_cstring_bytes(_qName), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
- r = amqp_get_rpc_reply(_conn);
- if (r.reply_type != AMQP_RESPONSE_NORMAL) {
- throw std::runtime_error("Error consuming queue " + std::string(_qName));
- }
- fprintf(stderr, "RabbitMQ Init OK %s\n", _qName);
- }
- std::string RabbitMQ::consume()
- {
- amqp_rpc_reply_t res;
- amqp_envelope_t envelope;
- amqp_maybe_release_buffers(_conn);
- struct timeval timeout;
- timeout.tv_sec = 1;
- timeout.tv_usec = 0;
- res = amqp_consume_message(_conn, &envelope, &timeout, 0);
- if (res.reply_type != AMQP_RESPONSE_NORMAL) {
- if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
- // timeout waiting for message. Return empty string
- return "";
- } else {
- throw std::runtime_error("Error getting message");
- }
- }
- std::string msg(
- (const char*)envelope.message.body.bytes,
- envelope.message.body.len
- );
- amqp_destroy_envelope(&envelope);
- return msg;
- }
- }
- #endif // ZT_CONTROLLER_USE_LIBPQ
|