metric_test_stress.cc 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. #include <gtest/gtest.h>
  4. #include <stdint.h>
  5. #include <atomic>
  6. #include <chrono>
  7. #include <random>
  8. #include <thread>
  9. #include <utility>
  10. #include <vector>
  11. #include "common.h"
  12. #include "opentelemetry/context/context.h"
  13. #include "opentelemetry/metrics/meter.h"
  14. #include "opentelemetry/metrics/sync_instruments.h"
  15. #include "opentelemetry/nostd/function_ref.h"
  16. #include "opentelemetry/nostd/shared_ptr.h"
  17. #include "opentelemetry/nostd/unique_ptr.h"
  18. #include "opentelemetry/nostd/variant.h"
  19. #include "opentelemetry/sdk/common/exporter_utils.h"
  20. #include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h"
  21. #include "opentelemetry/sdk/metrics/data/metric_data.h"
  22. #include "opentelemetry/sdk/metrics/data/point_data.h"
  23. #include "opentelemetry/sdk/metrics/export/metric_producer.h"
  24. #include "opentelemetry/sdk/metrics/instruments.h"
  25. #include "opentelemetry/sdk/metrics/meter_provider.h"
  26. #include "opentelemetry/sdk/metrics/metric_reader.h"
  27. #include "opentelemetry/sdk/metrics/push_metric_exporter.h"
  28. using namespace opentelemetry;
  29. using namespace opentelemetry::sdk::instrumentationscope;
  30. using namespace opentelemetry::sdk::metrics;
  31. class MockMetricExporterForStress : public opentelemetry::sdk::metrics::PushMetricExporter
  32. {
  33. public:
  34. MockMetricExporterForStress() = default;
  35. opentelemetry::sdk::metrics::AggregationTemporality GetAggregationTemporality(
  36. opentelemetry::sdk::metrics::InstrumentType) const noexcept override
  37. {
  38. return AggregationTemporality::kDelta;
  39. }
  40. opentelemetry::sdk::common::ExportResult Export(
  41. const opentelemetry::sdk::metrics::ResourceMetrics &) noexcept override
  42. {
  43. return opentelemetry::sdk::common::ExportResult::kSuccess;
  44. }
  45. bool ForceFlush(std::chrono::microseconds) noexcept override { return true; }
  46. bool Shutdown(std::chrono::microseconds) noexcept override { return true; }
  47. };
  48. TEST(HistogramStress, UnsignedInt64)
  49. {
  50. MeterProvider mp;
  51. auto m = mp.GetMeter("meter1", "version1", "schema1");
  52. std::unique_ptr<MockMetricExporterForStress> exporter(new MockMetricExporterForStress());
  53. std::shared_ptr<MetricReader> reader{new MockMetricReader(std::move(exporter))};
  54. mp.AddMetricReader(reader);
  55. auto h = m->CreateUInt64Histogram("histogram1", "histogram1_description", "histogram1_unit");
  56. //
  57. // Start a dedicated thread to collect the metrics
  58. //
  59. std::vector<HistogramPointData> actuals;
  60. auto stop_collecting = std::make_shared<std::atomic<bool>>(false);
  61. auto collect_thread = std::thread([&reader, &actuals, stop_collecting]() {
  62. while (!*stop_collecting)
  63. {
  64. std::this_thread::sleep_for(std::chrono::milliseconds(1000));
  65. reader->Collect([&](ResourceMetrics &rm) {
  66. for (const ScopeMetrics &smd : rm.scope_metric_data_)
  67. {
  68. for (const MetricData &md : smd.metric_data_)
  69. {
  70. for (const PointDataAttributes &dp : md.point_data_attr_)
  71. {
  72. actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));
  73. }
  74. }
  75. }
  76. return true;
  77. });
  78. }
  79. });
  80. //
  81. // Start logging threads
  82. //
  83. int record_thread_count = std::thread::hardware_concurrency() - 1;
  84. if (record_thread_count <= 0)
  85. {
  86. record_thread_count = 1;
  87. }
  88. std::vector<std::thread> threads(record_thread_count);
  89. constexpr int iterations_per_thread = 2000000;
  90. auto expected_sum = std::make_shared<std::atomic<uint64_t>>(0);
  91. for (int i = 0; i < record_thread_count; ++i)
  92. {
  93. threads[i] = std::thread([&] {
  94. std::random_device rd;
  95. std::mt19937 random_engine(rd());
  96. std::uniform_int_distribution<> gen_random(1, 20000);
  97. for (int j = 0; j < iterations_per_thread; ++j)
  98. {
  99. int64_t val = gen_random(random_engine);
  100. expected_sum->fetch_add(val, std::memory_order_relaxed);
  101. h->Record(val, {});
  102. }
  103. });
  104. }
  105. for (int i = 0; i < record_thread_count; ++i)
  106. {
  107. threads[i].join();
  108. }
  109. //
  110. // Stop the dedicated collection thread
  111. //
  112. *stop_collecting = true;
  113. collect_thread.join();
  114. //
  115. // run the the final collection
  116. //
  117. reader->Collect([&](ResourceMetrics &rm) {
  118. for (const ScopeMetrics &smd : rm.scope_metric_data_)
  119. {
  120. for (const MetricData &md : smd.metric_data_)
  121. {
  122. for (const PointDataAttributes &dp : md.point_data_attr_)
  123. {
  124. actuals.push_back(opentelemetry::nostd::get<HistogramPointData>(dp.point_data));
  125. }
  126. }
  127. }
  128. return true;
  129. });
  130. //
  131. // Aggregate the results
  132. //
  133. int64_t expected_count = record_thread_count * iterations_per_thread;
  134. int64_t collected_count = 0;
  135. int64_t collected_sum = 0;
  136. for (const auto &actual : actuals)
  137. {
  138. int64_t collected_bucket_sum = 0;
  139. for (const auto &count : actual.counts_)
  140. {
  141. collected_bucket_sum += count;
  142. }
  143. ASSERT_EQ(collected_bucket_sum, actual.count_);
  144. collected_sum += opentelemetry::nostd::get<int64_t>(actual.sum_);
  145. collected_count += actual.count_;
  146. }
  147. ASSERT_EQ(expected_count, collected_count);
  148. ASSERT_EQ(*expected_sum, collected_sum);
  149. }