2
0

ConnectionPool.hpp 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. /*
  2. * Copyright (c)2021 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2026-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #ifndef ZT_CONNECTION_POOL_H_
  14. #define ZT_CONNECTION_POOL_H_
  15. #ifndef _DEBUG
  16. #define _DEBUG(x)
  17. #endif
  18. #include "../node/Metrics.hpp"
  19. #include <deque>
  20. #include <set>
  21. #include <memory>
  22. #include <mutex>
  23. #include <exception>
  24. #include <string>
  25. namespace ZeroTier {
  26. struct ConnectionUnavailable : std::exception {
  27. char const* what() const throw() {
  28. return "Unable to allocate connection";
  29. };
  30. };
  31. class Connection {
  32. public:
  33. virtual ~Connection() {};
  34. };
  35. class ConnectionFactory {
  36. public:
  37. virtual ~ConnectionFactory() {};
  38. virtual std::shared_ptr<Connection> create()=0;
  39. };
  40. struct ConnectionPoolStats {
  41. size_t pool_size;
  42. size_t borrowed_size;
  43. };
  44. template<class T>
  45. class ConnectionPool {
  46. public:
  47. ConnectionPool(size_t max_pool_size, size_t min_pool_size, std::shared_ptr<ConnectionFactory> factory)
  48. : m_maxPoolSize(max_pool_size)
  49. , m_minPoolSize(min_pool_size)
  50. , m_factory(factory)
  51. {
  52. Metrics::max_pool_size += max_pool_size;
  53. Metrics::min_pool_size += min_pool_size;
  54. while(m_pool.size() < m_minPoolSize){
  55. m_pool.push_back(m_factory->create());
  56. Metrics::pool_avail++;
  57. }
  58. };
  59. ConnectionPoolStats get_stats() {
  60. std::unique_lock<std::mutex> lock(m_poolMutex);
  61. ConnectionPoolStats stats;
  62. stats.pool_size = m_pool.size();
  63. stats.borrowed_size = m_borrowed.size();
  64. return stats;
  65. };
  66. ~ConnectionPool() {
  67. };
  68. /**
  69. * Borrow
  70. *
  71. * Borrow a connection for temporary use
  72. *
  73. * When done, either (a) call unborrow() to return it, or (b) (if it's bad) just let it go out of scope. This will cause it to automatically be replaced.
  74. * @retval a shared_ptr to the connection object
  75. */
  76. std::shared_ptr<T> borrow() {
  77. std::unique_lock<std::mutex> l(m_poolMutex);
  78. while((m_pool.size() + m_borrowed.size()) < m_minPoolSize) {
  79. std::shared_ptr<Connection> conn = m_factory->create();
  80. m_pool.push_back(conn);
  81. Metrics::pool_avail++;
  82. }
  83. if(m_pool.size()==0){
  84. if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
  85. try {
  86. std::shared_ptr<Connection> conn = m_factory->create();
  87. m_borrowed.insert(conn);
  88. Metrics::pool_in_use++;
  89. return std::static_pointer_cast<T>(conn);
  90. } catch (std::exception &e) {
  91. Metrics::pool_errors++;
  92. throw ConnectionUnavailable();
  93. }
  94. } else {
  95. for(auto it = m_borrowed.begin(); it != m_borrowed.end(); ++it){
  96. if((*it).unique()) {
  97. // This connection has been abandoned! Destroy it and create a new connection
  98. try {
  99. // If we are able to create a new connection, return it
  100. _DEBUG("Creating new connection to replace discarded connection");
  101. std::shared_ptr<Connection> conn = m_factory->create();
  102. m_borrowed.erase(it);
  103. m_borrowed.insert(conn);
  104. return std::static_pointer_cast<T>(conn);
  105. } catch(std::exception& e) {
  106. // Error creating a replacement connection
  107. Metrics::pool_errors++;
  108. throw ConnectionUnavailable();
  109. }
  110. }
  111. }
  112. // Nothing available
  113. Metrics::pool_errors++;
  114. throw ConnectionUnavailable();
  115. }
  116. }
  117. // Take one off the front
  118. std::shared_ptr<Connection> conn = m_pool.front();
  119. m_pool.pop_front();
  120. Metrics::pool_avail--;
  121. // Add it to the borrowed list
  122. m_borrowed.insert(conn);
  123. Metrics::pool_in_use++;
  124. return std::static_pointer_cast<T>(conn);
  125. };
  126. /**
  127. * Unborrow a connection
  128. *
  129. * Only call this if you are returning a working connection. If the connection was bad, just let it go out of scope (so the connection manager can replace it).
  130. * @param the connection
  131. */
  132. void unborrow(std::shared_ptr<T> conn) {
  133. // Lock
  134. std::unique_lock<std::mutex> lock(m_poolMutex);
  135. m_borrowed.erase(conn);
  136. Metrics::pool_in_use--;
  137. if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
  138. Metrics::pool_avail++;
  139. m_pool.push_back(conn);
  140. }
  141. };
  142. protected:
  143. size_t m_maxPoolSize;
  144. size_t m_minPoolSize;
  145. std::shared_ptr<ConnectionFactory> m_factory;
  146. std::deque<std::shared_ptr<Connection> > m_pool;
  147. std::set<std::shared_ptr<Connection> > m_borrowed;
  148. std::mutex m_poolMutex;
  149. };
  150. }
  151. #endif