ConnectionPool.hpp 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. /*
  2. * Copyright (c)2021 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2025-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #ifndef ZT_CONNECTION_POOL_H_
  14. #define ZT_CONNECTION_POOL_H_
  15. #ifndef _DEBUG
  16. #define _DEBUG(x)
  17. #endif
  18. #include <deque>
  19. #include <set>
  20. #include <memory>
  21. #include <mutex>
  22. #include <exception>
  23. #include <string>
  24. namespace ZeroTier {
  25. struct ConnectionUnavailable : std::exception {
  26. char const* what() const throw() {
  27. return "Unable to allocate connection";
  28. };
  29. };
  30. class Connection {
  31. public:
  32. virtual ~Connection() {};
  33. };
  34. class ConnectionFactory {
  35. public:
  36. virtual ~ConnectionFactory() {};
  37. virtual std::shared_ptr<Connection> create()=0;
  38. };
  39. struct ConnectionPoolStats {
  40. size_t pool_size;
  41. size_t borrowed_size;
  42. };
  43. template<class T>
  44. class ConnectionPool {
  45. public:
  46. ConnectionPool(size_t max_pool_size, size_t min_pool_size, std::shared_ptr<ConnectionFactory> factory)
  47. : m_maxPoolSize(max_pool_size)
  48. , m_minPoolSize(min_pool_size)
  49. , m_factory(factory)
  50. {
  51. while(m_pool.size() < m_minPoolSize){
  52. m_pool.push_back(m_factory->create());
  53. }
  54. };
  55. ConnectionPoolStats get_stats() {
  56. std::unique_lock<std::mutex> lock(m_poolMutex);
  57. ConnectionPoolStats stats;
  58. stats.pool_size = m_pool.size();
  59. stats.borrowed_size = m_borrowed.size();
  60. return stats;
  61. };
  62. ~ConnectionPool() {
  63. };
  64. /**
  65. * Borrow
  66. *
  67. * Borrow a connection for temporary use
  68. *
  69. * When done, either (a) call unborrow() to return it, or (b) (if it's bad) just let it go out of scope. This will cause it to automatically be replaced.
  70. * @retval a shared_ptr to the connection object
  71. */
  72. std::shared_ptr<T> borrow() {
  73. std::unique_lock<std::mutex> l(m_poolMutex);
  74. while((m_pool.size() + m_borrowed.size()) < m_minPoolSize) {
  75. std::shared_ptr<Connection> conn = m_factory->create();
  76. m_pool.push_back(conn);
  77. }
  78. if(m_pool.size()==0){
  79. if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
  80. try {
  81. std::shared_ptr<Connection> conn = m_factory->create();
  82. m_borrowed.insert(conn);
  83. return std::static_pointer_cast<T>(conn);
  84. } catch (std::exception &e) {
  85. throw ConnectionUnavailable();
  86. }
  87. } else {
  88. for(auto it = m_borrowed.begin(); it != m_borrowed.end(); ++it){
  89. if((*it).unique()) {
  90. // This connection has been abandoned! Destroy it and create a new connection
  91. try {
  92. // If we are able to create a new connection, return it
  93. _DEBUG("Creating new connection to replace discarded connection");
  94. std::shared_ptr<Connection> conn = m_factory->create();
  95. m_borrowed.erase(it);
  96. m_borrowed.insert(conn);
  97. return std::static_pointer_cast<T>(conn);
  98. } catch(std::exception& e) {
  99. // Error creating a replacement connection
  100. throw ConnectionUnavailable();
  101. }
  102. }
  103. }
  104. // Nothing available
  105. throw ConnectionUnavailable();
  106. }
  107. }
  108. // Take one off the front
  109. std::shared_ptr<Connection> conn = m_pool.front();
  110. m_pool.pop_front();
  111. // Add it to the borrowed list
  112. m_borrowed.insert(conn);
  113. return std::static_pointer_cast<T>(conn);
  114. };
  115. /**
  116. * Unborrow a connection
  117. *
  118. * Only call this if you are returning a working connection. If the connection was bad, just let it go out of scope (so the connection manager can replace it).
  119. * @param the connection
  120. */
  121. void unborrow(std::shared_ptr<T> conn) {
  122. // Lock
  123. std::unique_lock<std::mutex> lock(m_poolMutex);
  124. m_borrowed.erase(conn);
  125. if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
  126. m_pool.push_back(conn);
  127. }
  128. };
  129. protected:
  130. size_t m_maxPoolSize;
  131. size_t m_minPoolSize;
  132. std::shared_ptr<ConnectionFactory> m_factory;
  133. std::deque<std::shared_ptr<Connection> > m_pool;
  134. std::set<std::shared_ptr<Connection> > m_borrowed;
  135. std::mutex m_poolMutex;
  136. };
  137. }
  138. #endif