// ckms_quantiles.h

  #pragma once

  #include <algorithm>
  #include <array>
  #include <cmath>
  #include <cstddef>
  #include <functional>
  #include <limits>
  #include <vector>

  namespace prometheus {
  namespace detail {

  /// Streaming quantile estimator over a sequence of `double` observations.
  ///
  /// Observations are staged in a fixed-size buffer. When the buffer fills up
  /// (or when a quantile is requested) the staged values are sorted, merged
  /// into a value-ordered list of summary items, and that list is compressed
  /// by folding neighbouring items together where `allowableError()` permits.
  ///
  /// NOTE(review): going by the class name this is presumably the
  /// Cormode/Korn/Muthukrishnan/Srivastava targeted-quantiles summary --
  /// confirm against the project documentation.
  ///
  /// Lifetime: the estimator stores a *reference* to the quantile list handed
  /// to the constructor, not a copy. That vector must outlive the estimator.
  ///
  /// The class contains no synchronisation of its own.
  class CKMSQuantiles {
   public:
    /// One target quantile together with its tolerated error.
    struct Quantile {
      double quantile;  ///< Target quantile, e.g. 0.5 for the median.
      double error;     ///< Tolerated error for this target.
      double u;         ///< 2 * error / (1 - quantile); slope used at or below the target.
      double v;         ///< 2 * error / quantile; slope used above the target.

      /// Stores the target and pre-computes the two slopes `u` and `v`.
      /// With `quantile` equal to 0 or 1 one of the divisions is by zero and
      /// the corresponding slope becomes infinite.
      Quantile(double quantile, double error)
          : quantile(quantile),
            error(error),
            u(2.0 * error / (1.0 - quantile)),
            v(2.0 * error / quantile) {}
    };

   private:
    /// One entry of the compressed summary.
    struct Item {
      double value;  ///< Observed value this entry stands for.
      int g;         ///< Weight; 1 on insertion, grows when compress() folds a neighbour in.
      int delta;     ///< Slack assigned at insertion time (see insertBatch()).

      Item(double value, int lower_delta, int delta)
          : value(value), g(lower_delta), delta(delta) {}
    };

   public:
    /// Creates an empty estimator for the given targets.
    ///
    /// @param quantiles Target list; only a reference is kept, so the vector
    ///                  must stay alive for as long as this object is used.
    explicit CKMSQuantiles(const std::vector<Quantile>& quantiles)
        : quantiles_(quantiles), count_(0), buffer_{}, buffer_count_(0) {}

    /// Records one observation.
    ///
    /// The value is only staged; the summary is updated and compressed once
    /// the staging buffer is full.
    void insert(double value) {
      buffer_[buffer_count_] = value;
      ++buffer_count_;

      // Flushing exactly when the buffer is full keeps the write above in
      // bounds on the next call.
      if (buffer_count_ == buffer_.size()) {
        insertBatch();
        compress();
      }
    }

    /// Returns the estimate for quantile `q`.
    ///
    /// Flushes any staged observations and compresses the summary first, so
    /// this call mutates the estimator.
    ///
    /// @param q Requested quantile (multiplied by the observation count to
    ///          obtain the desired rank).
    /// @return The estimated value, or quiet NaN when nothing has been
    ///         observed since construction / the last reset().
    double get(double q) {
      insertBatch();
      compress();

      if (sample_.empty()) {
        return std::numeric_limits<double>::quiet_NaN();
      }

      int rankMin = 0;
      const auto desired = static_cast<int>(q * static_cast<double>(count_));
      const auto bound = desired + (allowableError(desired) / 2);

      // Walk adjacent pairs (prev, cur); `rankMin` is the summed weight of
      // everything up to and including `prev`.
      auto it = sample_.begin();
      decltype(it) prev;
      auto cur = it++;

      while (it != sample_.end()) {
        prev = cur;
        cur = it++;

        rankMin += prev->g;

        // As soon as `cur` could lie beyond the tolerated rank, `prev` is
        // the answer.
        if (rankMin + cur->g + cur->delta > bound) {
          return prev->value;
        }
      }

      return sample_.back().value;
    }

    /// Discards all observations (staged and summarised).
    void reset() {
      count_ = 0;
      sample_.clear();
      buffer_count_ = 0;
    }

   private:
    /// Smallest error allowance over all targets for position `rank`.
    ///
    /// Note that the size used here is the current number of summary items
    /// (`sample_.size()`), not the number of observations. The result never
    /// exceeds `sample_.size() + 1`.
    double allowableError(int rank) const {
      auto size = sample_.size();
      double minError = static_cast<double>(size + 1);

      for (const auto& q : quantiles_.get()) {
        double error;
        if (static_cast<double>(rank) <= q.quantile * static_cast<double>(size)) {
          // `size - rank` is evaluated in std::size_t arithmetic.
          error = q.u * static_cast<double>(size - rank);
        } else {
          error = q.v * rank;
        }
        if (error < minError) {
          minError = error;
        }
      }

      return minError;
    }

    /// Merges the staged observations into the summary.
    ///
    /// @return false when nothing was staged, true otherwise.
    bool insertBatch() {
      if (buffer_count_ == 0) {
        return false;
      }

      std::sort(buffer_.begin(), buffer_.begin() + buffer_count_);

      // Seed an empty summary with the smallest staged value so the merge
      // loop below always has an item to compare against.
      std::size_t start = 0;
      if (sample_.empty()) {
        sample_.emplace_back(buffer_[0], 1, 0);
        ++start;
        ++count_;
      }

      // Invariant while scanning: item == idx - 1.
      std::size_t idx = 0;
      std::size_t item = idx++;

      for (std::size_t i = start; i < buffer_count_; ++i) {
        double v = buffer_[i];

        // Staged values are sorted, so the scan position only moves forward.
        while (idx < sample_.size() && sample_[item].value < v) {
          item = idx++;
        }

        // Step back so that `v` is inserted in front of a larger item.
        if (sample_[item].value > v) {
          --idx;
        }

        int delta;
        // `idx` is unsigned: for idx == 0 the expression `idx - 1` wraps
        // around and does not compare equal to 0.
        if (idx - 1 == 0 || idx + 1 == sample_.size()) {
          delta = 0;
        } else {
          delta = static_cast<int>(
                      std::floor(allowableError(static_cast<int>(idx + 1)))) +
                  1;
        }

        sample_.emplace(sample_.begin() + idx, v, 1, delta);
        ++count_;
        item = idx++;
      }

      buffer_count_ = 0;
      return true;
    }

    /// Folds neighbouring summary items together where the error allowance
    /// permits, shrinking the summary.
    void compress() {
      if (sample_.size() < 2) {
        return;
      }

      std::size_t idx = 0;
      std::size_t prev;
      std::size_t next = idx++;

      while (idx < sample_.size()) {
        prev = next;
        next = idx++;

        if (sample_[prev].g + sample_[next].g + sample_[next].delta <=
            allowableError(static_cast<int>(idx - 1))) {
          // Fold `prev` into `next`, then drop `prev`. The erase shifts all
          // later items down by one while `next`/`idx` are left untouched,
          // so the merged item is not examined again during this pass.
          sample_[next].g += sample_[prev].g;
          sample_.erase(sample_.begin() + prev);
        }
      }
    }

   private:
    /// Target quantiles; referenced, not owned.
    const std::reference_wrapper<const std::vector<Quantile>> quantiles_;

    std::size_t count_;               ///< Observations merged into the summary so far.
    std::vector<Item> sample_;        ///< Summary items, ordered by value.
    std::array<double, 500> buffer_;  ///< Staging area for not-yet-merged observations.
    std::size_t buffer_count_;        ///< Number of valid entries in `buffer_`.
  };

  }  // namespace detail
  }  // namespace prometheus