test_basic.c 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. /*
  2. * Copyright 2017 Simon Giesecke
  3. *
  4. * Permission is hereby granted, free of charge, to any person
  5. * obtaining a copy of this software and associated documentation
  6. * files (the "Software"), to deal in the Software without
  7. * restriction, including without limitation the rights to use, copy,
  8. * modify, merge, publish, distribute, sublicense, and/or sell copies
  9. * of the Software, and to permit persons to whom the Software is
  10. * furnished to do so, subject to the following conditions:
  11. *
  12. * The above copyright notice and this permission notice shall be
  13. * included in all copies or substantial portions of the Software.
  14. *
  15. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  16. * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  17. * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  18. * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  19. * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  20. * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  21. * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  22. * SOFTWARE.
  23. */
  24. #include "amqp.h"
  25. #include "amqp_tcp_socket.h"
  26. #include "amqp_time.h"
  27. #include <stdio.h>
  28. #include <stdlib.h>
  29. #include <string.h>
  30. #ifdef _WIN32
  31. #include <WinSock2.h>
  32. #else
  33. #include <sys/time.h>
  34. #endif
  35. #ifdef NDEBUG
  36. #undef NDEBUG
  37. #endif
  38. #include <assert.h>
  39. static const int fixed_channel_id = 1;
  40. static const char test_queue_name[] = "test_queue";
  41. amqp_connection_state_t setup_connection_and_channel(void) {
  42. amqp_connection_state_t connection_state_ = amqp_new_connection();
  43. amqp_socket_t *socket = amqp_tcp_socket_new(connection_state_);
  44. assert(socket);
  45. int rc = amqp_socket_open(socket, "localhost", AMQP_PROTOCOL_PORT);
  46. assert(rc == AMQP_STATUS_OK);
  47. amqp_rpc_reply_t rpc_reply = amqp_login(
  48. connection_state_, "/", 1, AMQP_DEFAULT_FRAME_SIZE,
  49. AMQP_DEFAULT_HEARTBEAT, AMQP_SASL_METHOD_PLAIN, "guest", "guest");
  50. assert(rpc_reply.reply_type == AMQP_RESPONSE_NORMAL);
  51. amqp_channel_open_ok_t *res =
  52. amqp_channel_open(connection_state_, fixed_channel_id);
  53. assert(res != NULL);
  54. return connection_state_;
  55. }
  56. void close_and_destroy_connection(amqp_connection_state_t connection_state_) {
  57. amqp_rpc_reply_t rpc_reply =
  58. amqp_connection_close(connection_state_, AMQP_REPLY_SUCCESS);
  59. assert(rpc_reply.reply_type == AMQP_RESPONSE_NORMAL);
  60. int rc = amqp_destroy_connection(connection_state_);
  61. assert(rc == AMQP_STATUS_OK);
  62. }
  63. void basic_publish(amqp_connection_state_t connectionState_,
  64. const char *message_) {
  65. amqp_bytes_t message_bytes = amqp_cstring_bytes(message_);
  66. amqp_basic_properties_t properties;
  67. properties._flags = 0;
  68. properties._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
  69. properties.delivery_mode = AMQP_DELIVERY_NONPERSISTENT;
  70. int retval = amqp_basic_publish(
  71. connectionState_, fixed_channel_id, amqp_cstring_bytes(""),
  72. amqp_cstring_bytes(test_queue_name),
  73. /* mandatory=*/1,
  74. /* immediate=*/0, /* RabbitMQ 3.x does not support the "immediate" flag
  75. according to
  76. https://www.rabbitmq.com/specification.html */
  77. &properties, message_bytes);
  78. assert(retval == 0);
  79. }
  80. void queue_declare(amqp_connection_state_t connection_state_,
  81. const char *queue_name_) {
  82. amqp_queue_declare_ok_t *res = amqp_queue_declare(
  83. connection_state_, fixed_channel_id, amqp_cstring_bytes(queue_name_),
  84. /*passive*/ 0,
  85. /*durable*/ 0,
  86. /*exclusive*/ 0,
  87. /*auto_delete*/ 1, amqp_empty_table);
  88. assert(res != NULL);
  89. }
  90. char *basic_get(amqp_connection_state_t connection_state_,
  91. const char *queue_name_, uint64_t *out_body_size_) {
  92. amqp_rpc_reply_t rpc_reply;
  93. amqp_time_t deadline;
  94. struct timeval timeout = {5, 0};
  95. int time_rc = amqp_time_from_now(&deadline, &timeout);
  96. assert(time_rc == AMQP_STATUS_OK);
  97. do {
  98. rpc_reply = amqp_basic_get(connection_state_, fixed_channel_id,
  99. amqp_cstring_bytes(queue_name_), /*no_ack*/ 1);
  100. } while (rpc_reply.reply_type == AMQP_RESPONSE_NORMAL &&
  101. rpc_reply.reply.id == AMQP_BASIC_GET_EMPTY_METHOD &&
  102. amqp_time_has_past(deadline) == AMQP_STATUS_OK);
  103. assert(rpc_reply.reply_type == AMQP_RESPONSE_NORMAL);
  104. assert(rpc_reply.reply.id == AMQP_BASIC_GET_OK_METHOD);
  105. amqp_message_t message;
  106. rpc_reply =
  107. amqp_read_message(connection_state_, fixed_channel_id, &message, 0);
  108. assert(rpc_reply.reply_type == AMQP_RESPONSE_NORMAL);
  109. char *body = malloc(message.body.len);
  110. memcpy(body, message.body.bytes, message.body.len);
  111. *out_body_size_ = message.body.len;
  112. amqp_destroy_message(&message);
  113. return body;
  114. }
  115. void publish_and_basic_get_message(const char *msg_to_publish) {
  116. amqp_connection_state_t connection_state = setup_connection_and_channel();
  117. queue_declare(connection_state, test_queue_name);
  118. basic_publish(connection_state, msg_to_publish);
  119. uint64_t body_size;
  120. char *msg = basic_get(connection_state, test_queue_name, &body_size);
  121. assert(body_size == strlen(msg_to_publish));
  122. assert(strncmp(msg_to_publish, msg, body_size) == 0);
  123. free(msg);
  124. close_and_destroy_connection(connection_state);
  125. }
  126. char *consume_message(amqp_connection_state_t connection_state_,
  127. const char *queue_name_, uint64_t *out_body_size_) {
  128. amqp_basic_consume_ok_t *result =
  129. amqp_basic_consume(connection_state_, fixed_channel_id,
  130. amqp_cstring_bytes(queue_name_), amqp_empty_bytes,
  131. /*no_local*/ 0,
  132. /*no_ack*/ 1,
  133. /*exclusive*/ 0, amqp_empty_table);
  134. assert(result != NULL);
  135. amqp_envelope_t envelope;
  136. struct timeval timeout = {5, 0};
  137. amqp_rpc_reply_t rpc_reply =
  138. amqp_consume_message(connection_state_, &envelope, &timeout, 0);
  139. assert(rpc_reply.reply_type == AMQP_RESPONSE_NORMAL);
  140. *out_body_size_ = envelope.message.body.len;
  141. char *body = malloc(*out_body_size_);
  142. if (*out_body_size_) {
  143. memcpy(body, envelope.message.body.bytes, *out_body_size_);
  144. }
  145. amqp_destroy_envelope(&envelope);
  146. return body;
  147. }
  148. void publish_and_consume_message(const char *msg_to_publish) {
  149. amqp_connection_state_t connection_state = setup_connection_and_channel();
  150. queue_declare(connection_state, test_queue_name);
  151. basic_publish(connection_state, msg_to_publish);
  152. uint64_t body_size;
  153. char *msg = consume_message(connection_state, test_queue_name, &body_size);
  154. assert(body_size == strlen(msg_to_publish));
  155. assert(strncmp(msg_to_publish, msg, body_size) == 0);
  156. free(msg);
  157. close_and_destroy_connection(connection_state);
  158. }
  159. int main(void) {
  160. publish_and_basic_get_message("");
  161. publish_and_basic_get_message("TEST");
  162. publish_and_consume_message("");
  163. publish_and_consume_message("TEST");
  164. return 0;
  165. }