circular_buffer_test.cc 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. #include <gtest/gtest.h>
  4. #include <stddef.h>
  5. #include <algorithm>
  6. #include <atomic>
  7. #include <cassert>
  8. #include <cstdint>
  9. #include <functional>
  10. #include <initializer_list>
  11. #include <memory>
  12. #include <random>
  13. #include <string>
  14. #include <thread>
  15. #include <vector>
  16. #include "opentelemetry/sdk/common/atomic_unique_ptr.h"
  17. #include "opentelemetry/sdk/common/circular_buffer.h"
  18. #include "opentelemetry/sdk/common/circular_buffer_range.h"
  19. using opentelemetry::sdk::common::AtomicUniquePtr;
  20. using opentelemetry::sdk::common::CircularBuffer;
  21. using opentelemetry::sdk::common::CircularBufferRange;
  22. static thread_local std::mt19937 RandomNumberGenerator{std::random_device{}()};
  23. static void GenerateRandomNumbers(CircularBuffer<uint32_t> &buffer,
  24. std::vector<uint32_t> &numbers,
  25. int n)
  26. {
  27. for (int i = 0; i < n; ++i)
  28. {
  29. auto value = static_cast<uint32_t>(RandomNumberGenerator());
  30. std::unique_ptr<uint32_t> x{new uint32_t{value}};
  31. if (buffer.Add(x))
  32. {
  33. numbers.push_back(value);
  34. }
  35. }
  36. }
  37. static void RunNumberProducers(CircularBuffer<uint32_t> &buffer,
  38. std::vector<uint32_t> &numbers,
  39. int num_threads,
  40. int n)
  41. {
  42. std::vector<std::vector<uint32_t>> thread_numbers(num_threads);
  43. std::vector<std::thread> threads(num_threads);
  44. for (int thread_index = 0; thread_index < num_threads; ++thread_index)
  45. {
  46. threads[thread_index] = std::thread{GenerateRandomNumbers, std::ref(buffer),
  47. std::ref(thread_numbers[thread_index]), n};
  48. }
  49. for (auto &thread : threads)
  50. {
  51. thread.join();
  52. }
  53. for (int thread_index = 0; thread_index < num_threads; ++thread_index)
  54. {
  55. numbers.insert(numbers.end(), thread_numbers[thread_index].begin(),
  56. thread_numbers[thread_index].end());
  57. }
  58. }
  59. void RunNumberConsumer(CircularBuffer<uint32_t> &buffer,
  60. std::atomic<bool> &exit,
  61. std::vector<uint32_t> &numbers)
  62. {
  63. while (true)
  64. {
  65. if (exit && buffer.Peek().empty())
  66. {
  67. return;
  68. }
  69. auto n = std::uniform_int_distribution<size_t>{0, buffer.Peek().size()}(RandomNumberGenerator);
  70. buffer.Consume(n, [&](CircularBufferRange<AtomicUniquePtr<uint32_t>> range) noexcept {
  71. assert(range.size() == n);
  72. range.ForEach([&](AtomicUniquePtr<uint32_t> &ptr) noexcept {
  73. assert(!ptr.IsNull());
  74. numbers.push_back(*ptr);
  75. ptr.Reset();
  76. return true;
  77. });
  78. });
  79. }
  80. }
  81. TEST(CircularBufferTest, Add)
  82. {
  83. CircularBuffer<int> buffer{10};
  84. std::unique_ptr<int> x{new int{11}};
  85. EXPECT_TRUE(buffer.Add(x));
  86. EXPECT_EQ(x, nullptr);
  87. auto range = buffer.Peek();
  88. EXPECT_EQ(range.size(), 1);
  89. range.ForEach([](const AtomicUniquePtr<int> &y) {
  90. EXPECT_EQ(*y, 11);
  91. return true;
  92. });
  93. }
  94. TEST(CircularBufferTest, Clear)
  95. {
  96. CircularBuffer<int> buffer{10};
  97. std::unique_ptr<int> x{new int{11}};
  98. EXPECT_TRUE(buffer.Add(x));
  99. EXPECT_EQ(x, nullptr);
  100. buffer.Clear();
  101. EXPECT_TRUE(buffer.empty());
  102. }
  103. TEST(CircularBufferTest, AddOnFull)
  104. {
  105. CircularBuffer<int> buffer{10};
  106. for (int i = 0; i < static_cast<int>(buffer.max_size()); ++i)
  107. {
  108. std::unique_ptr<int> x{new int{i}};
  109. EXPECT_TRUE(buffer.Add(x));
  110. }
  111. std::unique_ptr<int> x{new int{33}};
  112. EXPECT_FALSE(buffer.Add(x));
  113. EXPECT_NE(x, nullptr);
  114. EXPECT_EQ(*x, 33);
  115. }
  116. TEST(CircularBufferTest, Consume)
  117. {
  118. CircularBuffer<int> buffer{10};
  119. for (int i = 0; i < static_cast<int>(buffer.max_size()); ++i)
  120. {
  121. std::unique_ptr<int> x{new int{i}};
  122. EXPECT_TRUE(buffer.Add(x));
  123. }
  124. int count = 0;
  125. buffer.Consume(5, [&](CircularBufferRange<AtomicUniquePtr<int>> range) noexcept {
  126. range.ForEach([&](AtomicUniquePtr<int> &ptr) {
  127. EXPECT_EQ(*ptr, count++);
  128. ptr.Reset();
  129. return true;
  130. });
  131. });
  132. EXPECT_EQ(count, 5);
  133. }
  134. TEST(CircularBufferTest, Simulation)
  135. {
  136. const int num_producer_threads = 4;
  137. const int n = 25000;
  138. for (size_t max_size : {1, 2, 10, 50, 100, 1000})
  139. {
  140. CircularBuffer<uint32_t> buffer{max_size};
  141. std::vector<uint32_t> producer_numbers;
  142. std::vector<uint32_t> consumer_numbers;
  143. auto producers = std::thread{RunNumberProducers, std::ref(buffer), std::ref(producer_numbers),
  144. num_producer_threads, n};
  145. std::atomic<bool> exit{false};
  146. auto consumer = std::thread{RunNumberConsumer, std::ref(buffer), std::ref(exit),
  147. std::ref(consumer_numbers)};
  148. producers.join();
  149. exit = true;
  150. consumer.join();
  151. std::sort(producer_numbers.begin(), producer_numbers.end());
  152. std::sort(consumer_numbers.begin(), consumer_numbers.end());
  153. EXPECT_EQ(producer_numbers.size(), consumer_numbers.size());
  154. EXPECT_EQ(producer_numbers, consumer_numbers);
  155. }
  156. }