async_metric_storage_test.cc 14 KB


  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. #include <gtest/gtest.h>
  4. #include <algorithm>
  5. #include <chrono>
  6. #include <cstdint>
  7. #include <map>
  8. #include <memory>
  9. #include <string>
  10. #include <unordered_map>
  11. #include <utility>
  12. #include <vector>
  13. #include "common.h"
  14. #include "opentelemetry/common/timestamp.h"
  15. #include "opentelemetry/nostd/function_ref.h"
  16. #include "opentelemetry/nostd/span.h"
  17. #include "opentelemetry/nostd/variant.h"
  18. #include "opentelemetry/sdk/instrumentationscope/instrumentation_scope.h"
  19. #include "opentelemetry/sdk/metrics/data/metric_data.h"
  20. #include "opentelemetry/sdk/metrics/data/point_data.h"
  21. #include "opentelemetry/sdk/metrics/export/metric_producer.h"
  22. #include "opentelemetry/sdk/metrics/instruments.h"
  23. #include "opentelemetry/sdk/metrics/state/async_metric_storage.h"
  24. #include "opentelemetry/sdk/metrics/state/attributes_hashmap.h"
  25. #include "opentelemetry/sdk/metrics/state/filtered_ordered_attribute_map.h"
  26. #include "opentelemetry/sdk/metrics/state/metric_collector.h"
  27. #include "opentelemetry/sdk/metrics/view/attributes_processor.h"
  28. #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
  29. # include "opentelemetry/sdk/metrics/exemplar/filter_type.h"
  30. # include "opentelemetry/sdk/metrics/exemplar/reservoir.h"
  31. #endif
  32. using namespace opentelemetry::sdk::metrics;
  33. using namespace opentelemetry::sdk::instrumentationscope;
  34. using namespace opentelemetry::sdk::resource;
  35. using namespace opentelemetry::common;
  36. namespace nostd = opentelemetry::nostd;
  37. using M = std::map<std::string, std::string>;
  38. class WritableMetricStorageTestFixture : public ::testing::TestWithParam<AggregationTemporality>
  39. {};
  40. class WritableMetricStorageTestUpDownFixture
  41. : public ::testing::TestWithParam<AggregationTemporality>
  42. {};
  43. class WritableMetricStorageTestObservableGaugeFixture
  44. : public ::testing::TestWithParam<AggregationTemporality>
  45. {};
  46. TEST_P(WritableMetricStorageTestFixture, TestAggregation)
  47. {
  48. AggregationTemporality temporality = GetParam();
  49. InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kObservableCounter,
  50. InstrumentValueType::kLong};
  51. auto sdk_start_ts = std::chrono::system_clock::now();
  52. // Some computation here
  53. auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5);
  54. std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
  55. std::vector<std::shared_ptr<CollectorHandle>> collectors;
  56. collectors.push_back(collector);
  57. opentelemetry::sdk::metrics::AsyncMetricStorage storage(
  58. instr_desc, AggregationType::kSum,
  59. #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
  60. ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(),
  61. #endif
  62. nullptr);
  63. int64_t get_count1 = 20;
  64. int64_t put_count1 = 10;
  65. std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
  66. {{{"RequestType", "GET"}}, get_count1}, {{{"RequestType", "PUT"}}, put_count1}};
  67. storage.RecordLong(measurements1,
  68. opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
  69. storage.Collect(
  70. collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
  71. for (const auto &data_attr : metric_data.point_data_attr_)
  72. {
  73. const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
  74. if (opentelemetry::nostd::get<std::string>(
  75. data_attr.attributes.find("RequestType")->second) == "GET")
  76. {
  77. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count1);
  78. }
  79. else if (opentelemetry::nostd::get<std::string>(
  80. data_attr.attributes.find("RequestType")->second) == "PUT")
  81. {
  82. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count1);
  83. }
  84. }
  85. return true;
  86. });
  87. // subsequent recording after collection shouldn't fail
  88. // monotonic increasing values;
  89. int64_t get_count2 = 50;
  90. int64_t put_count2 = 70;
  91. std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements2 = {
  92. {{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}};
  93. storage.RecordLong(measurements2,
  94. opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
  95. storage.Collect(
  96. collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
  97. for (const auto &data_attr : metric_data.point_data_attr_)
  98. {
  99. const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
  100. if (opentelemetry::nostd::get<std::string>(
  101. data_attr.attributes.find("RequestType")->second) == "GET")
  102. {
  103. if (temporality == AggregationTemporality::kCumulative)
  104. {
  105. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count2);
  106. }
  107. else
  108. {
  109. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count2 - get_count1);
  110. }
  111. }
  112. else if (opentelemetry::nostd::get<std::string>(
  113. data_attr.attributes.find("RequestType")->second) == "PUT")
  114. {
  115. if (temporality == AggregationTemporality::kCumulative)
  116. {
  117. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count2);
  118. }
  119. else
  120. {
  121. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count2 - put_count1);
  122. }
  123. }
  124. }
  125. return true;
  126. });
  127. }
  128. INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestLong,
  129. WritableMetricStorageTestFixture,
  130. ::testing::Values(AggregationTemporality::kCumulative,
  131. AggregationTemporality::kDelta));
  132. TEST_P(WritableMetricStorageTestUpDownFixture, TestAggregation)
  133. {
  134. AggregationTemporality temporality = GetParam();
  135. InstrumentDescriptor instr_desc = {"name", "desc", "1unit",
  136. InstrumentType::kObservableUpDownCounter,
  137. InstrumentValueType::kLong};
  138. auto sdk_start_ts = std::chrono::system_clock::now();
  139. // Some computation here
  140. auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5);
  141. std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
  142. std::vector<std::shared_ptr<CollectorHandle>> collectors;
  143. collectors.push_back(collector);
  144. opentelemetry::sdk::metrics::AsyncMetricStorage storage(
  145. instr_desc, AggregationType::kDefault,
  146. #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
  147. ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(),
  148. #endif
  149. nullptr);
  150. int64_t get_count1 = 20;
  151. int64_t put_count1 = 10;
  152. std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
  153. {{{"RequestType", "GET"}}, get_count1}, {{{"RequestType", "PUT"}}, put_count1}};
  154. storage.RecordLong(measurements1,
  155. opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
  156. storage.Collect(
  157. collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
  158. for (const auto &data_attr : metric_data.point_data_attr_)
  159. {
  160. const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
  161. if (opentelemetry::nostd::get<std::string>(
  162. data_attr.attributes.find("RequestType")->second) == "GET")
  163. {
  164. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count1);
  165. }
  166. else if (opentelemetry::nostd::get<std::string>(
  167. data_attr.attributes.find("RequestType")->second) == "PUT")
  168. {
  169. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count1);
  170. }
  171. }
  172. return true;
  173. });
  174. // subsequent recording after collection shouldn't fail
  175. // monotonic increasing values;
  176. int64_t get_count2 = -50;
  177. int64_t put_count2 = -70;
  178. std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements2 = {
  179. {{{"RequestType", "GET"}}, get_count2}, {{{"RequestType", "PUT"}}, put_count2}};
  180. storage.RecordLong(measurements2,
  181. opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
  182. storage.Collect(
  183. collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
  184. for (const auto &data_attr : metric_data.point_data_attr_)
  185. {
  186. const auto &data = opentelemetry::nostd::get<SumPointData>(data_attr.point_data);
  187. if (opentelemetry::nostd::get<std::string>(
  188. data_attr.attributes.find("RequestType")->second) == "GET")
  189. {
  190. if (temporality == AggregationTemporality::kCumulative)
  191. {
  192. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count2);
  193. }
  194. else
  195. {
  196. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), get_count2 - get_count1);
  197. }
  198. }
  199. else if (opentelemetry::nostd::get<std::string>(
  200. data_attr.attributes.find("RequestType")->second) == "PUT")
  201. {
  202. if (temporality == AggregationTemporality::kCumulative)
  203. {
  204. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count2);
  205. }
  206. else
  207. {
  208. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), put_count2 - put_count1);
  209. }
  210. }
  211. }
  212. return true;
  213. });
  214. }
  215. INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestUpDownLong,
  216. WritableMetricStorageTestUpDownFixture,
  217. ::testing::Values(AggregationTemporality::kCumulative,
  218. AggregationTemporality::kDelta));
  219. TEST_P(WritableMetricStorageTestObservableGaugeFixture, TestAggregation)
  220. {
  221. AggregationTemporality temporality = GetParam();
  222. InstrumentDescriptor instr_desc = {"name", "desc", "1unit", InstrumentType::kObservableGauge,
  223. InstrumentValueType::kLong};
  224. auto sdk_start_ts = std::chrono::system_clock::now();
  225. // Some computation here
  226. auto collection_ts = std::chrono::system_clock::now() + std::chrono::seconds(5);
  227. std::shared_ptr<CollectorHandle> collector(new MockCollectorHandle(temporality));
  228. std::vector<std::shared_ptr<CollectorHandle>> collectors;
  229. collectors.push_back(collector);
  230. opentelemetry::sdk::metrics::AsyncMetricStorage storage(
  231. instr_desc, AggregationType::kLastValue,
  232. #ifdef ENABLE_METRICS_EXEMPLAR_PREVIEW
  233. ExemplarFilterType::kAlwaysOff, ExemplarReservoir::GetNoExemplarReservoir(),
  234. #endif
  235. nullptr);
  236. int64_t freq_cpu0 = 3;
  237. int64_t freq_cpu1 = 5;
  238. std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements1 = {
  239. {{{"CPU", "0"}}, freq_cpu0}, {{{"CPU", "1"}}, freq_cpu1}};
  240. storage.RecordLong(measurements1,
  241. opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
  242. storage.Collect(
  243. collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
  244. for (auto data_attr : metric_data.point_data_attr_)
  245. {
  246. auto data = opentelemetry::nostd::get<LastValuePointData>(data_attr.point_data);
  247. if (opentelemetry::nostd::get<std::string>(data_attr.attributes.find("CPU")->second) ==
  248. "0")
  249. {
  250. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), freq_cpu0);
  251. }
  252. else if (opentelemetry::nostd::get<std::string>(
  253. data_attr.attributes.find("CPU")->second) == "1")
  254. {
  255. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), freq_cpu1);
  256. }
  257. }
  258. return true;
  259. });
  260. freq_cpu0 = 6;
  261. freq_cpu1 = 8;
  262. std::unordered_map<MetricAttributes, int64_t, AttributeHashGenerator> measurements2 = {
  263. {{{"CPU", "0"}}, freq_cpu0}, {{{"CPU", "1"}}, freq_cpu1}};
  264. storage.RecordLong(measurements2,
  265. opentelemetry::common::SystemTimestamp(std::chrono::system_clock::now()));
  266. storage.Collect(
  267. collector.get(), collectors, sdk_start_ts, collection_ts, [&](const MetricData &metric_data) {
  268. for (auto data_attr : metric_data.point_data_attr_)
  269. {
  270. auto data = opentelemetry::nostd::get<LastValuePointData>(data_attr.point_data);
  271. if (opentelemetry::nostd::get<std::string>(data_attr.attributes.find("CPU")->second) ==
  272. "0")
  273. {
  274. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), freq_cpu0);
  275. }
  276. else if (opentelemetry::nostd::get<std::string>(
  277. data_attr.attributes.find("CPU")->second) == "1")
  278. {
  279. EXPECT_EQ(opentelemetry::nostd::get<int64_t>(data.value_), freq_cpu1);
  280. }
  281. }
  282. return true;
  283. });
  284. }
  285. INSTANTIATE_TEST_SUITE_P(WritableMetricStorageTestObservableGaugeFixtureLong,
  286. WritableMetricStorageTestObservableGaugeFixture,
  287. ::testing::Values(AggregationTemporality::kCumulative,
  288. AggregationTemporality::kDelta));