batch_span_processor_test.cc 13 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. #include <gtest/gtest.h>
  4. #include <algorithm>
  5. #include <atomic>
  6. #include <chrono>
  7. #include <cstddef>
  8. #include <string>
  9. #include <thread>
  10. #include <utility>
  11. #include <vector>
  12. #include "opentelemetry/nostd/shared_ptr.h"
  13. #include "opentelemetry/nostd/span.h"
  14. #include "opentelemetry/sdk/common/attribute_utils.h"
  15. #include "opentelemetry/sdk/common/exporter_utils.h"
  16. #include "opentelemetry/sdk/common/global_log_handler.h"
  17. #include "opentelemetry/sdk/trace/batch_span_processor.h"
  18. #include "opentelemetry/sdk/trace/batch_span_processor_options.h"
  19. #include "opentelemetry/sdk/trace/exporter.h"
  20. #include "opentelemetry/sdk/trace/processor.h"
  21. #include "opentelemetry/sdk/trace/recordable.h"
  22. #include "opentelemetry/sdk/trace/span_data.h"
  23. #include "opentelemetry/version.h"
  24. OPENTELEMETRY_BEGIN_NAMESPACE
  25. /**
  26. * Returns a mock span exporter meant exclusively for testing only
  27. */
  28. class MockSpanExporter final : public sdk::trace::SpanExporter
  29. {
  30. public:
  31. MockSpanExporter(
  32. std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received,
  33. std::shared_ptr<std::atomic<std::size_t>> shut_down_counter,
  34. std::shared_ptr<std::atomic<bool>> is_shutdown,
  35. std::shared_ptr<std::atomic<bool>> is_export_completed =
  36. std::shared_ptr<std::atomic<bool>>(new std::atomic<bool>(false)),
  37. const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) noexcept
  38. : spans_received_(std::move(spans_received)),
  39. shut_down_counter_(std::move(shut_down_counter)),
  40. is_shutdown_(std::move(is_shutdown)),
  41. is_export_completed_(std::move(is_export_completed)),
  42. export_delay_(export_delay)
  43. {}
  44. std::unique_ptr<sdk::trace::Recordable> MakeRecordable() noexcept override
  45. {
  46. return std::unique_ptr<sdk::trace::Recordable>(new sdk::trace::SpanData);
  47. }
  48. sdk::common::ExportResult Export(
  49. const nostd::span<std::unique_ptr<sdk::trace::Recordable>> &recordables) noexcept override
  50. {
  51. *is_export_completed_ = false;
  52. std::this_thread::sleep_for(export_delay_);
  53. for (auto &recordable : recordables)
  54. {
  55. auto span = std::unique_ptr<sdk::trace::SpanData>(
  56. static_cast<sdk::trace::SpanData *>(recordable.release()));
  57. if (span != nullptr)
  58. {
  59. spans_received_->push_back(std::move(span));
  60. }
  61. }
  62. *is_export_completed_ = true;
  63. return sdk::common::ExportResult::kSuccess;
  64. }
  65. bool ForceFlush(std::chrono::microseconds /*timeout*/) noexcept override
  66. {
  67. ++(*shut_down_counter_);
  68. return true;
  69. }
  70. bool Shutdown(std::chrono::microseconds /* timeout */) noexcept override
  71. {
  72. *is_shutdown_ = true;
  73. return true;
  74. }
  75. bool IsExportCompleted() { return is_export_completed_->load(); }
  76. private:
  77. std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received_;
  78. std::shared_ptr<std::atomic<std::size_t>> shut_down_counter_;
  79. std::shared_ptr<std::atomic<bool>> is_shutdown_;
  80. std::shared_ptr<std::atomic<bool>> is_export_completed_;
  81. // Meant exclusively to test force flush timeout
  82. const std::chrono::milliseconds export_delay_;
  83. };
  84. /**
  85. * Fixture Class
  86. */
  87. class BatchSpanProcessorTestPeer : public testing::Test
  88. {
  89. public:
  90. std::unique_ptr<std::vector<std::unique_ptr<sdk::trace::Recordable>>> GetTestSpans(
  91. const std::shared_ptr<sdk::trace::SpanProcessor> &processor,
  92. const int num_spans)
  93. {
  94. std::unique_ptr<std::vector<std::unique_ptr<sdk::trace::Recordable>>> test_spans(
  95. new std::vector<std::unique_ptr<sdk::trace::Recordable>>);
  96. for (int i = 0; i < num_spans; ++i)
  97. {
  98. test_spans->push_back(processor->MakeRecordable());
  99. static_cast<sdk::trace::SpanData *>(test_spans->at(i).get())
  100. ->SetName("Span " + std::to_string(i));
  101. }
  102. return test_spans;
  103. }
  104. };
  105. /* ################################## TESTS ############################################ */
  106. TEST_F(BatchSpanProcessorTestPeer, TestShutdown)
  107. {
  108. std::shared_ptr<std::atomic<std::size_t>> shut_down_counter(new std::atomic<std::size_t>(0));
  109. std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  110. std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
  111. new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
  112. auto batch_processor =
  113. std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
  114. std::unique_ptr<sdk::trace::SpanExporter>(
  115. new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
  116. sdk::trace::BatchSpanProcessorOptions()));
  117. const int num_spans = 3;
  118. auto test_spans = GetTestSpans(batch_processor, num_spans);
  119. for (int i = 0; i < num_spans; ++i)
  120. {
  121. batch_processor->OnEnd(std::move(test_spans->at(i)));
  122. }
  123. EXPECT_TRUE(batch_processor->Shutdown());
  124. // It's safe to shutdown again
  125. EXPECT_TRUE(batch_processor->Shutdown());
  126. EXPECT_EQ(num_spans, spans_received->size());
  127. for (int i = 0; i < num_spans; ++i)
  128. {
  129. EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
  130. }
  131. EXPECT_TRUE(is_shutdown->load());
  132. }
  133. TEST_F(BatchSpanProcessorTestPeer, TestForceFlush)
  134. {
  135. std::shared_ptr<std::atomic<std::size_t>> shut_down_counter(new std::atomic<std::size_t>(0));
  136. std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  137. std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
  138. new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
  139. auto batch_processor =
  140. std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
  141. std::unique_ptr<sdk::trace::SpanExporter>(
  142. new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
  143. sdk::trace::BatchSpanProcessorOptions()));
  144. const int num_spans = 2048;
  145. auto test_spans = GetTestSpans(batch_processor, num_spans);
  146. for (int i = 0; i < num_spans; ++i)
  147. {
  148. batch_processor->OnEnd(std::move(test_spans->at(i)));
  149. }
  150. // Give some time to export
  151. std::this_thread::sleep_for(std::chrono::milliseconds(50));
  152. EXPECT_TRUE(batch_processor->ForceFlush());
  153. EXPECT_GE(shut_down_counter->load(), 1);
  154. EXPECT_EQ(num_spans, spans_received->size());
  155. for (int i = 0; i < num_spans; ++i)
  156. {
  157. EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
  158. }
  159. // Create some more spans to make sure that the processor still works
  160. auto more_test_spans = GetTestSpans(batch_processor, num_spans);
  161. for (int i = 0; i < num_spans; ++i)
  162. {
  163. batch_processor->OnEnd(std::move(more_test_spans->at(i)));
  164. }
  165. // Give some time to export the spans
  166. std::this_thread::sleep_for(std::chrono::milliseconds(50));
  167. auto shut_down_counter_before = shut_down_counter->load();
  168. EXPECT_TRUE(batch_processor->ForceFlush());
  169. EXPECT_GT(shut_down_counter->load(), shut_down_counter_before);
  170. EXPECT_EQ(num_spans * 2, spans_received->size());
  171. for (int i = 0; i < num_spans; ++i)
  172. {
  173. EXPECT_EQ("Span " + std::to_string(i % num_spans),
  174. spans_received->at(num_spans + i)->GetName());
  175. }
  176. }
  177. // A mock log handler to check whether log messages with a specific level were emitted.
  178. struct MockLogHandler : public sdk::common::internal_log::LogHandler
  179. {
  180. using Message = std::pair<sdk::common::internal_log::LogLevel, std::string>;
  181. void Handle(sdk::common::internal_log::LogLevel level,
  182. const char * /*file*/,
  183. int /*line*/,
  184. const char *msg,
  185. const sdk::common::AttributeMap & /*attributes*/) noexcept override
  186. {
  187. messages.emplace_back(level, msg);
  188. }
  189. std::vector<Message> messages;
  190. };
  191. TEST_F(BatchSpanProcessorTestPeer, TestManySpansLoss)
  192. {
  193. /* Test that when exporting more than max_queue_size spans, some are most likely lost*/
  194. // Set up a log handler to verify a warning is generated.
  195. auto log_handler = nostd::shared_ptr<sdk::common::internal_log::LogHandler>(new MockLogHandler());
  196. sdk::common::internal_log::GlobalLogHandler::SetLogHandler(log_handler);
  197. std::shared_ptr<std::atomic<std::size_t>> shut_down_counter(new std::atomic<std::size_t>(0));
  198. std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  199. std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
  200. new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
  201. const int max_queue_size = 4096;
  202. auto batch_processor =
  203. std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
  204. std::unique_ptr<sdk::trace::SpanExporter>(
  205. new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
  206. sdk::trace::BatchSpanProcessorOptions()));
  207. auto test_spans = GetTestSpans(batch_processor, max_queue_size);
  208. for (int i = 0; i < max_queue_size; ++i)
  209. {
  210. batch_processor->OnEnd(std::move(test_spans->at(i)));
  211. }
  212. // Give some time to export the spans
  213. std::this_thread::sleep_for(std::chrono::milliseconds(700));
  214. EXPECT_TRUE(batch_processor->ForceFlush());
  215. // Span should be exported by now
  216. EXPECT_GE(max_queue_size, spans_received->size());
  217. // If we haven't received all spans, some must have dropped, verify a warning was logged.
  218. // Only do this when the log level is warning or above.
  219. #if OTEL_INTERNAL_LOG_LEVEL >= OTEL_INTERNAL_LOG_LEVEL_WARN
  220. if (max_queue_size > spans_received->size())
  221. {
  222. auto &messages = static_cast<MockLogHandler *>(log_handler.get())->messages;
  223. EXPECT_TRUE(
  224. std::find(messages.begin(), messages.end(),
  225. MockLogHandler::Message(sdk::common::internal_log::LogLevel::Warning,
  226. "BatchSpanProcessor queue is full - dropping span.")) !=
  227. messages.end());
  228. }
  229. #endif
  230. // Reinstate the default log handler.
  231. sdk::common::internal_log::GlobalLogHandler::SetLogHandler(
  232. nostd::shared_ptr<sdk::common::internal_log::LogHandler>(
  233. new sdk::common::internal_log::DefaultLogHandler()));
  234. }
  235. TEST_F(BatchSpanProcessorTestPeer, TestManySpansLossLess)
  236. {
  237. /* Test that no spans are lost when sending max_queue_size spans */
  238. std::shared_ptr<std::atomic<std::size_t>> shut_down_counter(new std::atomic<std::size_t>(0));
  239. std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  240. std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
  241. new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
  242. const int num_spans = 2048;
  243. auto batch_processor =
  244. std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
  245. std::unique_ptr<sdk::trace::SpanExporter>(
  246. new MockSpanExporter(spans_received, shut_down_counter, is_shutdown)),
  247. sdk::trace::BatchSpanProcessorOptions()));
  248. auto test_spans = GetTestSpans(batch_processor, num_spans);
  249. for (int i = 0; i < num_spans; ++i)
  250. {
  251. batch_processor->OnEnd(std::move(test_spans->at(i)));
  252. }
  253. // Give some time to export the spans
  254. std::this_thread::sleep_for(std::chrono::milliseconds(50));
  255. EXPECT_TRUE(batch_processor->ForceFlush());
  256. EXPECT_EQ(num_spans, spans_received->size());
  257. for (int i = 0; i < num_spans; ++i)
  258. {
  259. EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
  260. }
  261. }
  262. TEST_F(BatchSpanProcessorTestPeer, TestScheduleDelayMillis)
  263. {
  264. /* Test that max_export_batch_size spans are exported every schedule_delay_millis
  265. seconds */
  266. std::shared_ptr<std::atomic<std::size_t>> shut_down_counter(new std::atomic<std::size_t>(0));
  267. std::shared_ptr<std::atomic<bool>> is_shutdown(new std::atomic<bool>(false));
  268. std::shared_ptr<std::atomic<bool>> is_export_completed(new std::atomic<bool>(false));
  269. std::shared_ptr<std::vector<std::unique_ptr<sdk::trace::SpanData>>> spans_received(
  270. new std::vector<std::unique_ptr<sdk::trace::SpanData>>);
  271. const std::chrono::milliseconds export_delay(0);
  272. const size_t max_export_batch_size = 512;
  273. sdk::trace::BatchSpanProcessorOptions options{};
  274. options.schedule_delay_millis = std::chrono::milliseconds(2000);
  275. auto batch_processor =
  276. std::shared_ptr<sdk::trace::BatchSpanProcessor>(new sdk::trace::BatchSpanProcessor(
  277. std::unique_ptr<sdk::trace::SpanExporter>(new MockSpanExporter(
  278. spans_received, shut_down_counter, is_shutdown, is_export_completed, export_delay)),
  279. options));
  280. auto test_spans = GetTestSpans(batch_processor, max_export_batch_size);
  281. for (size_t i = 0; i < max_export_batch_size; ++i)
  282. {
  283. batch_processor->OnEnd(std::move(test_spans->at(i)));
  284. }
  285. // Sleep for schedule_delay_millis milliseconds
  286. std::this_thread::sleep_for(options.schedule_delay_millis);
  287. // small delay to give time to export
  288. std::this_thread::sleep_for(std::chrono::milliseconds(50));
  289. // Spans should be exported by now
  290. EXPECT_TRUE(is_export_completed->load());
  291. EXPECT_EQ(max_export_batch_size, spans_received->size());
  292. for (size_t i = 0; i < max_export_batch_size; ++i)
  293. {
  294. EXPECT_EQ("Span " + std::to_string(i), spans_received->at(i)->GetName());
  295. }
  296. }
  297. OPENTELEMETRY_END_NAMESPACE