ConnectionPool.hpp 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. /*
  2. * Copyright (c)2021 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2026-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #ifndef ZT_CONNECTION_POOL_H_
  14. #define ZT_CONNECTION_POOL_H_
  15. #ifndef _DEBUG
  16. #define _DEBUG(x)
  17. #endif
  18. #include "../node/Metrics.hpp"
  19. #include <deque>
  20. #include <exception>
  21. #include <memory>
  22. #include <mutex>
  23. #include <set>
  24. #include <string>
  25. namespace ZeroTier {
  /**
   * Exception signalling that no connection could be handed out.
   *
   * Thrown by ConnectionPool::borrow() when the pool is exhausted or when the
   * factory fails while creating a connection on demand.
   */
  struct ConnectionUnavailable : std::exception {
      /**
       * @return a static, human-readable description of the failure
       */
      char const* what() const noexcept override
      {
          return "Unable to allocate connection";
      }
  };
  /**
   * Polymorphic base for every connection type managed by a pool.
   *
   * It carries no state of its own; the virtual destructor lets concrete
   * connections be destroyed correctly through a pointer to this base.
   */
  class Connection {
    public:
      virtual ~Connection() = default;
  };
  36. class ConnectionFactory {
  37. public:
  38. virtual ~ConnectionFactory() {};
  39. virtual std::shared_ptr<Connection> create() = 0;
  40. };
  /**
   * Snapshot of a pool's occupancy, as returned by ConnectionPool::get_stats().
   *
   * Both counters default to zero so a default-constructed snapshot never
   * exposes indeterminate values.
   */
  struct ConnectionPoolStats {
      /** Number of idle connections waiting in the pool. */
      size_t pool_size = 0;
      /** Number of connections currently tracked as borrowed. */
      size_t borrowed_size = 0;
  };
  45. template <class T> class ConnectionPool {
  46. public:
  47. ConnectionPool(size_t max_pool_size, size_t min_pool_size, std::shared_ptr<ConnectionFactory> factory) : m_maxPoolSize(max_pool_size), m_minPoolSize(min_pool_size), m_factory(factory)
  48. {
  49. Metrics::max_pool_size += max_pool_size;
  50. Metrics::min_pool_size += min_pool_size;
  51. while (m_pool.size() < m_minPoolSize) {
  52. m_pool.push_back(m_factory->create());
  53. Metrics::pool_avail++;
  54. }
  55. };
  56. ConnectionPoolStats get_stats()
  57. {
  58. std::unique_lock<std::mutex> lock(m_poolMutex);
  59. ConnectionPoolStats stats;
  60. stats.pool_size = m_pool.size();
  61. stats.borrowed_size = m_borrowed.size();
  62. return stats;
  63. };
  64. ~ConnectionPool() {};
  65. /**
  66. * Borrow
  67. *
  68. * Borrow a connection for temporary use
  69. *
  70. * When done, either (a) call unborrow() to return it, or (b) (if it's bad) just let it go out of scope. This will cause it to automatically be replaced.
  71. * @retval a shared_ptr to the connection object
  72. */
  73. std::shared_ptr<T> borrow()
  74. {
  75. std::unique_lock<std::mutex> l(m_poolMutex);
  76. while ((m_pool.size() + m_borrowed.size()) < m_minPoolSize) {
  77. std::shared_ptr<Connection> conn = m_factory->create();
  78. m_pool.push_back(conn);
  79. Metrics::pool_avail++;
  80. }
  81. if (m_pool.size() == 0) {
  82. if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
  83. try {
  84. std::shared_ptr<Connection> conn = m_factory->create();
  85. m_borrowed.insert(conn);
  86. Metrics::pool_in_use++;
  87. return std::static_pointer_cast<T>(conn);
  88. }
  89. catch (std::exception& e) {
  90. Metrics::pool_errors++;
  91. throw ConnectionUnavailable();
  92. }
  93. }
  94. else {
  95. for (auto it = m_borrowed.begin(); it != m_borrowed.end(); ++it) {
  96. if ((*it).unique()) {
  97. // This connection has been abandoned! Destroy it and create a new connection
  98. try {
  99. // If we are able to create a new connection, return it
  100. _DEBUG("Creating new connection to replace discarded connection");
  101. std::shared_ptr<Connection> conn = m_factory->create();
  102. m_borrowed.erase(it);
  103. m_borrowed.insert(conn);
  104. return std::static_pointer_cast<T>(conn);
  105. }
  106. catch (std::exception& e) {
  107. // Error creating a replacement connection
  108. Metrics::pool_errors++;
  109. throw ConnectionUnavailable();
  110. }
  111. }
  112. }
  113. // Nothing available
  114. Metrics::pool_errors++;
  115. throw ConnectionUnavailable();
  116. }
  117. }
  118. // Take one off the front
  119. std::shared_ptr<Connection> conn = m_pool.front();
  120. m_pool.pop_front();
  121. Metrics::pool_avail--;
  122. // Add it to the borrowed list
  123. m_borrowed.insert(conn);
  124. Metrics::pool_in_use++;
  125. return std::static_pointer_cast<T>(conn);
  126. };
  127. /**
  128. * Unborrow a connection
  129. *
  130. * Only call this if you are returning a working connection. If the connection was bad, just let it go out of scope (so the connection manager can replace it).
  131. * @param the connection
  132. */
  133. void unborrow(std::shared_ptr<T> conn)
  134. {
  135. // Lock
  136. std::unique_lock<std::mutex> lock(m_poolMutex);
  137. m_borrowed.erase(conn);
  138. Metrics::pool_in_use--;
  139. if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
  140. Metrics::pool_avail++;
  141. m_pool.push_back(conn);
  142. }
  143. };
  144. protected:
  145. size_t m_maxPoolSize;
  146. size_t m_minPoolSize;
  147. std::shared_ptr<ConnectionFactory> m_factory;
  148. std::deque<std::shared_ptr<Connection> > m_pool;
  149. std::set<std::shared_ptr<Connection> > m_borrowed;
  150. std::mutex m_poolMutex;
  151. };
  152. } // namespace ZeroTier
  153. #endif