ConnectionPool.hpp 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. /*
  2. * Copyright (c)2021 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2026-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #ifndef ZT_CONNECTION_POOL_H_
  14. #define ZT_CONNECTION_POOL_H_
  15. #ifndef _DEBUG
  16. #define _DEBUG(x)
  17. #endif
  18. #include "../node/Metrics.hpp"
  19. #include "opentelemetry/trace/provider.h"
  20. #include <deque>
  21. #include <exception>
  22. #include <memory>
  23. #include <mutex>
  24. #include <set>
  25. #include <string>
  26. namespace ZeroTier {
  27. struct ConnectionUnavailable : std::exception {
  28. char const* what() const throw()
  29. {
  30. return "Unable to allocate connection";
  31. };
  32. };
  33. class Connection {
  34. public:
  35. virtual ~Connection() {};
  36. };
  37. class ConnectionFactory {
  38. public:
  39. virtual ~ConnectionFactory() {};
  40. virtual std::shared_ptr<Connection> create() = 0;
  41. };
  42. struct ConnectionPoolStats {
  43. size_t pool_size;
  44. size_t borrowed_size;
  45. };
  46. template <class T> class ConnectionPool {
  47. public:
  48. ConnectionPool(size_t max_pool_size, size_t min_pool_size, std::shared_ptr<ConnectionFactory> factory) : m_maxPoolSize(max_pool_size), m_minPoolSize(min_pool_size), m_factory(factory)
  49. {
  50. Metrics::max_pool_size += max_pool_size;
  51. Metrics::min_pool_size += min_pool_size;
  52. while (m_pool.size() < m_minPoolSize) {
  53. m_pool.push_back(m_factory->create());
  54. Metrics::pool_avail++;
  55. }
  56. };
  57. ConnectionPoolStats get_stats()
  58. {
  59. std::unique_lock<std::mutex> lock(m_poolMutex);
  60. ConnectionPoolStats stats;
  61. stats.pool_size = m_pool.size();
  62. stats.borrowed_size = m_borrowed.size();
  63. return stats;
  64. };
  65. ~ConnectionPool() {};
  66. /**
  67. * Borrow
  68. *
  69. * Borrow a connection for temporary use
  70. *
  71. * When done, either (a) call unborrow() to return it, or (b) (if it's bad) just let it go out of scope. This will cause it to automatically be replaced.
  72. * @retval a shared_ptr to the connection object
  73. */
  74. std::shared_ptr<T> borrow()
  75. {
  76. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  77. auto tracer = provider->GetTracer("connection_pool");
  78. auto span = tracer->StartSpan("connection_pool::borrow");
  79. auto scope = tracer->WithActiveSpan(span);
  80. std::unique_lock<std::mutex> l(m_poolMutex);
  81. while ((m_pool.size() + m_borrowed.size()) < m_minPoolSize) {
  82. std::shared_ptr<Connection> conn = m_factory->create();
  83. m_pool.push_back(conn);
  84. Metrics::pool_avail++;
  85. }
  86. if (m_pool.size() == 0) {
  87. if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
  88. try {
  89. std::shared_ptr<Connection> conn = m_factory->create();
  90. m_borrowed.insert(conn);
  91. Metrics::pool_in_use++;
  92. return std::static_pointer_cast<T>(conn);
  93. }
  94. catch (std::exception& e) {
  95. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  96. Metrics::pool_errors++;
  97. throw ConnectionUnavailable();
  98. }
  99. }
  100. else {
  101. for (auto it = m_borrowed.begin(); it != m_borrowed.end(); ++it) {
  102. if ((*it).unique()) {
  103. // This connection has been abandoned! Destroy it and create a new connection
  104. try {
  105. // If we are able to create a new connection, return it
  106. _DEBUG("Creating new connection to replace discarded connection");
  107. std::shared_ptr<Connection> conn = m_factory->create();
  108. m_borrowed.erase(it);
  109. m_borrowed.insert(conn);
  110. return std::static_pointer_cast<T>(conn);
  111. }
  112. catch (std::exception& e) {
  113. span->SetStatus(opentelemetry::trace::StatusCode::kError, e.what());
  114. // Error creating a replacement connection
  115. Metrics::pool_errors++;
  116. throw ConnectionUnavailable();
  117. }
  118. }
  119. }
  120. span->SetStatus(opentelemetry::trace::StatusCode::kError, "No available connections in pool");
  121. // Nothing available
  122. Metrics::pool_errors++;
  123. throw ConnectionUnavailable();
  124. }
  125. }
  126. // Take one off the front
  127. std::shared_ptr<Connection> conn = m_pool.front();
  128. m_pool.pop_front();
  129. Metrics::pool_avail--;
  130. // Add it to the borrowed list
  131. m_borrowed.insert(conn);
  132. Metrics::pool_in_use++;
  133. return std::static_pointer_cast<T>(conn);
  134. };
  135. /**
  136. * Unborrow a connection
  137. *
  138. * Only call this if you are returning a working connection. If the connection was bad, just let it go out of scope (so the connection manager can replace it).
  139. * @param the connection
  140. */
  141. void unborrow(std::shared_ptr<T> conn)
  142. {
  143. auto provider = opentelemetry::trace::Provider::GetTracerProvider();
  144. auto tracer = provider->GetTracer("connection_pool");
  145. auto span = tracer->StartSpan("connection_pool::unborrow");
  146. auto scope = tracer->WithActiveSpan(span);
  147. // Lock
  148. std::unique_lock<std::mutex> lock(m_poolMutex);
  149. m_borrowed.erase(conn);
  150. Metrics::pool_in_use--;
  151. if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
  152. Metrics::pool_avail++;
  153. m_pool.push_back(conn);
  154. }
  155. };
  156. protected:
  157. size_t m_maxPoolSize;
  158. size_t m_minPoolSize;
  159. std::shared_ptr<ConnectionFactory> m_factory;
  160. std::deque<std::shared_ptr<Connection> > m_pool;
  161. std::set<std::shared_ptr<Connection> > m_borrowed;
  162. std::mutex m_poolMutex;
  163. };
  164. } // namespace ZeroTier
  165. #endif