ConnectionPool.hpp 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. /*
  2. * Copyright (c)2021 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2025-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #ifndef ZT_CONNECTION_POOL_H_
  14. #define ZT_CONNECTION_POOL_H_
  15. #ifndef _DEBUG
  16. #define _DEBUG(x)
  17. #endif
  18. #include "../node/Metrics.hpp"
  19. #include <deque>
  20. #include <set>
  21. #include <memory>
  22. #include <mutex>
  23. #include <exception>
  24. #include <string>
  25. namespace ZeroTier {
  26. struct ConnectionUnavailable : std::exception {
  27. char const* what() const throw() {
  28. return "Unable to allocate connection";
  29. };
  30. };
  31. class Connection {
  32. public:
  33. virtual ~Connection() {};
  34. };
  35. class ConnectionFactory {
  36. public:
  37. virtual ~ConnectionFactory() {};
  38. virtual std::shared_ptr<Connection> create()=0;
  39. };
  40. struct ConnectionPoolStats {
  41. size_t pool_size;
  42. size_t borrowed_size;
  43. };
  44. template<class T>
  45. class ConnectionPool {
  46. public:
  47. ConnectionPool(size_t max_pool_size, size_t min_pool_size, std::shared_ptr<ConnectionFactory> factory)
  48. : m_maxPoolSize(max_pool_size)
  49. , m_minPoolSize(min_pool_size)
  50. , m_factory(factory)
  51. {
  52. while(m_pool.size() < m_minPoolSize){
  53. m_pool.push_back(m_factory->create());
  54. Metrics::pool_avail++;
  55. }
  56. };
  57. ConnectionPoolStats get_stats() {
  58. std::unique_lock<std::mutex> lock(m_poolMutex);
  59. ConnectionPoolStats stats;
  60. stats.pool_size = m_pool.size();
  61. stats.borrowed_size = m_borrowed.size();
  62. return stats;
  63. };
  64. ~ConnectionPool() {
  65. };
  66. /**
  67. * Borrow
  68. *
  69. * Borrow a connection for temporary use
  70. *
  71. * When done, either (a) call unborrow() to return it, or (b) (if it's bad) just let it go out of scope. This will cause it to automatically be replaced.
  72. * @retval a shared_ptr to the connection object
  73. */
  74. std::shared_ptr<T> borrow() {
  75. std::unique_lock<std::mutex> l(m_poolMutex);
  76. while((m_pool.size() + m_borrowed.size()) < m_minPoolSize) {
  77. std::shared_ptr<Connection> conn = m_factory->create();
  78. m_pool.push_back(conn);
  79. Metrics::pool_avail++;
  80. }
  81. if(m_pool.size()==0){
  82. if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
  83. try {
  84. std::shared_ptr<Connection> conn = m_factory->create();
  85. m_borrowed.insert(conn);
  86. Metrics::pool_in_use++;
  87. return std::static_pointer_cast<T>(conn);
  88. } catch (std::exception &e) {
  89. Metrics::pool_errors++;
  90. throw ConnectionUnavailable();
  91. }
  92. } else {
  93. for(auto it = m_borrowed.begin(); it != m_borrowed.end(); ++it){
  94. if((*it).unique()) {
  95. // This connection has been abandoned! Destroy it and create a new connection
  96. try {
  97. // If we are able to create a new connection, return it
  98. _DEBUG("Creating new connection to replace discarded connection");
  99. std::shared_ptr<Connection> conn = m_factory->create();
  100. m_borrowed.erase(it);
  101. m_borrowed.insert(conn);
  102. return std::static_pointer_cast<T>(conn);
  103. } catch(std::exception& e) {
  104. // Error creating a replacement connection
  105. Metrics::pool_errors++;
  106. throw ConnectionUnavailable();
  107. }
  108. }
  109. }
  110. // Nothing available
  111. Metrics::pool_errors++;
  112. throw ConnectionUnavailable();
  113. }
  114. }
  115. // Take one off the front
  116. std::shared_ptr<Connection> conn = m_pool.front();
  117. m_pool.pop_front();
  118. Metrics::pool_avail--;
  119. // Add it to the borrowed list
  120. m_borrowed.insert(conn);
  121. Metrics::pool_in_use++;
  122. return std::static_pointer_cast<T>(conn);
  123. };
  124. /**
  125. * Unborrow a connection
  126. *
  127. * Only call this if you are returning a working connection. If the connection was bad, just let it go out of scope (so the connection manager can replace it).
  128. * @param the connection
  129. */
  130. void unborrow(std::shared_ptr<T> conn) {
  131. // Lock
  132. std::unique_lock<std::mutex> lock(m_poolMutex);
  133. m_borrowed.erase(conn);
  134. Metrics::pool_in_use--;
  135. if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
  136. Metrics::pool_avail++;
  137. m_pool.push_back(conn);
  138. }
  139. };
  140. protected:
  141. size_t m_maxPoolSize;
  142. size_t m_minPoolSize;
  143. std::shared_ptr<ConnectionFactory> m_factory;
  144. std::deque<std::shared_ptr<Connection> > m_pool;
  145. std::set<std::shared_ptr<Connection> > m_borrowed;
  146. std::mutex m_poolMutex;
  147. };
  148. }
  149. #endif