// gateway.h (5.3 KB)
#pragma once

#include "prometheus/collectable.h"
#include "prometheus/metric_family.h"
#include "prometheus/text_serializer.h"

#include <jdl/httpclientlite.h>

#include <algorithm>
#include <cstdint>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
  15. namespace prometheus {
  16. class Gateway {
  17. using CollectableEntry = std::pair<std::weak_ptr<Collectable>, std::string>;
  18. std::string job_uri_;
  19. std::string labels_;
  20. std::mutex mutex_;
  21. std::vector<CollectableEntry> collectables_;
  22. enum class HttpMethod : uint8_t{
  23. Post,
  24. Put,
  25. Delete,
  26. };
  27. public:
  28. using Labels = std::map<std::string, std::string>;
  29. Gateway(const std::string host, const std::string port,
  30. const std::string jobname, const Labels& labels = {})
  31. : job_uri_(host + ':' + port + std::string("/metrics/job/") + jobname)
  32. , labels_{}
  33. {
  34. std::stringstream label_strm;
  35. for (const auto& label : labels) {
  36. label_strm << "/" << label.first << "/" << label.second;
  37. }
  38. labels_ = label_strm.str();
  39. }
  40. void RegisterCollectable(const std::weak_ptr<Collectable>& collectable,
  41. const Labels* labels = nullptr) {
  42. std::stringstream label_strm;
  43. if (labels != nullptr) {
  44. for (const auto& label : *labels) {
  45. label_strm << "/" << label.first << "/" << label.second;
  46. }
  47. }
  48. CleanupStalePointers(collectables_);
  49. collectables_.emplace_back(std::make_pair(collectable, label_strm.str()));
  50. }
  51. static const Labels GetInstanceLabel(const std::string& hostname) {
  52. if (hostname.empty()) {
  53. return Gateway::Labels{};
  54. }
  55. return Gateway::Labels{{"instance", hostname}};
  56. }
  57. // Push metrics to the given pushgateway.
  58. int Push() {
  59. return push(HttpMethod::Post);
  60. }
  61. std::future<int> AsyncPush() {
  62. return async_push(HttpMethod::Post);
  63. }
  64. // PushAdd metrics to the given pushgateway.
  65. int PushAdd() {
  66. return push(HttpMethod::Put);
  67. }
  68. std::future<int> AsyncPushAdd() {
  69. return async_push(HttpMethod::Put);
  70. }
  71. // Delete metrics from the given pushgateway.
  72. int Delete() {
  73. return performHttpRequest(HttpMethod::Delete, job_uri_, {});
  74. }
  75. // Delete metrics from the given pushgateway.
  76. std::future<int> AsyncDelete() {
  77. return std::async(std::launch::async, [&] { return Delete(); });
  78. }
  79. private:
  80. std::string getUri(const CollectableEntry& collectable) const {
  81. return (job_uri_ + labels_ + collectable.second);
  82. }
  83. int performHttpRequest(HttpMethod /*method*/, const std::string& uri_str, const std::string& body) {
  84. std::lock_guard<std::mutex> l(mutex_);
  85. /* Stub function. The implementation will be later, after connecting the
  86. * additional library of HTTP requests. */
  87. jdl::URI uri(uri_str);
  88. jdl::HTTPResponse response = jdl::HTTPClient::request(jdl::HTTPClient::m_post, uri, body);
  89. return std::stoi(response.response);
  90. }
  91. int push(HttpMethod method) {
  92. const auto serializer = TextSerializer{};
  93. for (const auto& wcollectable : collectables_) {
  94. auto collectable = wcollectable.first.lock();
  95. if (!collectable) {
  96. continue;
  97. }
  98. auto metrics = collectable->Collect();
  99. auto uri = getUri(wcollectable);
  100. std::stringstream body;
  101. serializer.Serialize(body, metrics);
  102. std::string body_str = body.str();
  103. auto status_code = performHttpRequest(method, uri, body_str);
  104. if (status_code < 100 || status_code >= 400) {
  105. return status_code;
  106. }
  107. }
  108. return 200;
  109. }
  110. std::future<int> async_push(HttpMethod method) {
  111. const auto serializer = TextSerializer{};
  112. std::vector<std::future<int>> futures;
  113. for (const auto& wcollectable : collectables_) {
  114. auto collectable = wcollectable.first.lock();
  115. if (!collectable) {
  116. continue;
  117. }
  118. auto metrics = collectable->Collect();
  119. auto uri = getUri(wcollectable);
  120. std::stringstream body;
  121. serializer.Serialize(body, metrics);
  122. auto body_ptr = std::make_shared<std::string>(body.str());
  123. futures.emplace_back(std::async(std::launch::async, [method, &uri, &body_ptr, this] {
  124. return performHttpRequest(method, uri, *body_ptr);
  125. }));
  126. }
  127. const auto reduceFutures = [](std::vector<std::future<int>> lfutures) {
  128. auto final_status_code = 200;
  129. for (auto& future : lfutures) {
  130. auto status_code = future.get();
  131. if (status_code < 100 || status_code >= 400) {
  132. final_status_code = status_code;
  133. }
  134. }
  135. return final_status_code;
  136. };
  137. return std::async(std::launch::async, reduceFutures, std::move(futures));
  138. }
  139. static void CleanupStalePointers(std::vector<CollectableEntry>& collectables) {
  140. collectables.erase(std::remove_if(std::begin(collectables), std::end(collectables),
  141. [](const CollectableEntry& candidate) {
  142. return candidate.first.expired();
  143. }),
  144. std::end(collectables));
  145. }
  146. };
  147. } // namespace prometheus