소스 검색

amqp_consume_message now has a timeout

RabbitMQ::consume() will return an empty string if the call to amqp_consume_message times out
Grant Limberg 6 년 전
부모
커밋
6a027c9c0a
2개의 변경된 파일17개의 추가작업 그리고 2개의 파일을 삭제
  1. 6 0
      controller/PostgreSQL.cpp
  2. 11 2
      controller/RabbitMQ.cpp

+ 6 - 0
controller/PostgreSQL.cpp

@@ -666,6 +666,9 @@ void PostgreSQL::_membersWatcher_RabbitMQ() {
 		try {
 			std::string msg = rmq.consume();
 			// fprintf(stderr, "Got Member Update: %s\n", msg.c_str());
+			if (msg.empty()) {
+				continue;
+			}
 			json tmp(json::parse(msg));
 			json &ov = tmp["old_val"];
 			json &nv = tmp["new_val"];
@@ -766,6 +769,9 @@ void PostgreSQL::_networksWatcher_RabbitMQ() {
 	while (_run == 1) {
 		try {
 			std::string msg = rmq.consume();
+			if (msg.empty()) {
+				continue;
+			}
 			// fprintf(stderr, "Got network update: %s\n", msg.c_str());
 			json tmp(json::parse(msg));
 			json &ov = tmp["old_val"];

+ 11 - 2
controller/RabbitMQ.cpp

@@ -80,9 +80,18 @@ std::string RabbitMQ::consume()
     amqp_envelope_t envelope;
     amqp_maybe_release_buffers(_conn);
 
-    res = amqp_consume_message(_conn, &envelope, NULL, 0);
+    struct timeval timeout;
+    timeout.tv_sec = 1;
+    timeout.tv_usec = 0;
+
+    res = amqp_consume_message(_conn, &envelope, &timeout, 0);
     if (res.reply_type != AMQP_RESPONSE_NORMAL) {
-        throw std::runtime_error("Error getting message");
+        if (res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
+            // timeout waiting for message.  Return empty string
+            return "";
+        } else {
+            throw std::runtime_error("Error getting message");
+        }
     }
 
     std::string msg(