瀏覽代碼

transiton to libpqxx & connection pool for central controllers

Grant Limberg 4 年之前
父節點
當前提交
47154fa623
共有 4 個文件被更改,包括 450 次插入562 次删除
  1. 161 0
      controller/ConnectionPool.hpp
  2. 220 554
      controller/PostgreSQL.cpp
  3. 67 6
      controller/PostgreSQL.hpp
  4. 2 2
      make-mac.mk

+ 161 - 0
controller/ConnectionPool.hpp

@@ -0,0 +1,161 @@
+/*
+ * Copyright (c)2021 ZeroTier, Inc.
+ *
+ * Use of this software is governed by the Business Source License included
+ * in the LICENSE.TXT file in the project's root directory.
+ *
+ * Change Date: 2025-01-01
+ *
+ * On the date above, in accordance with the Business Source License, use
+ * of this software will be governed by version 2.0 of the Apache License.
+ */
+/****/
+
+#ifndef ZT_CONNECTION_POOL_H_
+#define ZT_CONNECTION_POOL_H_
+
+
+#ifndef _DEBUG
+	#define _DEBUG(x)
+#endif
+
+#include <deque>
+#include <set>
+#include <memory>
+#include <mutex>
+#include <exception>
+#include <string>
+
+namespace ZeroTier {
+
+struct ConnectionUnavailable : std::exception { 
+    char const* what() const throw() {
+        return "Unable to allocate connection";
+    }; 
+};
+
+
+class Connection {
+public:
+    virtual ~Connection() {};
+};
+
+class ConnectionFactory {
+public:
+    virtual ~ConnectionFactory() {};
+    virtual std::shared_ptr<Connection> create()=0;
+};
+
+struct ConnectionPoolStats {
+    size_t pool_size;
+    size_t borrowed_size;
+};
+
+template<class T>
+class ConnectionPool {
+public:
+    ConnectionPool(size_t max_pool_size, size_t min_pool_size, std::shared_ptr<ConnectionFactory> factory)
+        : m_maxPoolSize(max_pool_size)
+        , m_minPoolSize(min_pool_size)
+        , m_factory(factory)
+    {
+        while(m_pool.size() < m_minPoolSize){
+            m_pool.push_back(m_factory->create());
+        }
+    };
+
+    ConnectionPoolStats get_stats() {
+        std::unique_lock<std::mutex> lock(m_poolMutex);
+
+        ConnectionPoolStats stats;
+        stats.pool_size = m_pool.size();
+        stats.borrowed_size = m_borrowed.size();			
+
+        return stats;
+    };
+
+    ~ConnectionPool() {
+    };
+
+    /**
+     * Borrow
+     *
+     * Borrow a connection for temporary use
+     *
+     * When done, either (a) call unborrow() to return it, or (b) (if it's bad) just let it go out of scope.  This will cause it to automatically be replaced.
+     * @retval a shared_ptr to the connection object
+     */
+    std::shared_ptr<T> borrow() {
+        std::unique_lock<std::mutex> l(m_poolMutex);
+        
+        while((m_pool.size() + m_borrowed.size()) < m_minPoolSize) {
+            std::shared_ptr<Connection> conn = m_factory->create();
+            m_pool.push_back(conn);
+        }
+
+        if(m_pool.size()==0){
+            
+            if ((m_pool.size() + m_borrowed.size()) <= m_maxPoolSize) {
+                try {
+                    std::shared_ptr<Connection> conn = m_factory->create();
+                    m_borrowed.insert(conn);
+                    return std::static_pointer_cast<T>(conn);
+                } catch (std::exception &e) {
+                    throw ConnectionUnavailable();
+                }
+            } else {
+                for(auto it = m_borrowed.begin(); it != m_borrowed.end(); ++it){
+                    if((*it).unique()) {
+                        // This connection has been abandoned! Destroy it and create a new connection
+                        try {
+                            // If we are able to create a new connection, return it
+                            _DEBUG("Creating new connection to replace discarded connection");
+                            std::shared_ptr<Connection> conn = m_factory->create();
+                            m_borrowed.erase(it);
+                            m_borrowed.insert(conn);
+                            return std::static_pointer_cast<T>(conn);
+                        } catch(std::exception& e) {
+                            // Error creating a replacement connection
+                            throw ConnectionUnavailable();
+                        }
+                    }
+                }
+                // Nothing available
+                throw ConnectionUnavailable();
+            }
+        }
+
+        // Take one off the front
+        std::shared_ptr<Connection> conn = m_pool.front();
+        m_pool.pop_front();
+        // Add it to the borrowed list
+        m_borrowed.insert(conn);
+        return std::static_pointer_cast<T>(conn);
+    };
+
+    /**
+     * Unborrow a connection
+     *
+     * Only call this if you are returning a working connection.  If the connection was bad, just let it go out of scope (so the connection manager can replace it).
+     * @param the connection
+     */
+    void unborrow(std::shared_ptr<T> conn) {
+        // Lock
+        std::unique_lock<std::mutex> lock(m_poolMutex);
+        m_borrowed.erase(conn);
+        if ((m_pool.size() + m_borrowed.size()) < m_maxPoolSize) {
+            m_pool.push_back(conn);
+        }
+    };
+protected:
+    size_t m_maxPoolSize;
+    size_t m_minPoolSize;
+    std::shared_ptr<ConnectionFactory> m_factory;
+    std::deque<std::shared_ptr<Connection> > m_pool;
+    std::set<std::shared_ptr<Connection> > m_borrowed;
+    std::mutex m_poolMutex;
+};
+
+}
+
+#endif

文件差異過大導致無法顯示
+ 220 - 554
controller/PostgreSQL.cpp


+ 67 - 6
controller/PostgreSQL.hpp

@@ -20,6 +20,9 @@
 
 #define ZT_CENTRAL_CONTROLLER_COMMIT_THREADS 4
 
+#include "ConnectionPool.hpp"
+#include <pqxx/pqxx>
+
 #include <memory>
 #include <redis++/redis++.h>
 
@@ -31,14 +34,65 @@ namespace ZeroTier {
 
 struct RedisConfig;
 
+
+class PostgresConnection : public Connection {
+public:
+	virtual ~PostgresConnection() {
+	}
+
+	std::shared_ptr<pqxx::connection> c;
+	int a;
+};
+
+
+class PostgresConnFactory : public ConnectionFactory {
+public:
+	PostgresConnFactory(std::string &connString) 
+		: m_connString(connString)
+	{
+	}
+
+	virtual std::shared_ptr<Connection> create() {
+		auto c = std::shared_ptr<PostgresConnection>(new PostgresConnection());
+		c->c = std::make_shared<pqxx::connection>(m_connString);
+		return std::static_pointer_cast<Connection>(c);
+	}
+private:
+	std::string m_connString;
+};
+
+class PostgreSQL;
+
+class MemberNotificationReceiver : public pqxx::notification_receiver {
+public: 
+	MemberNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel);
+	virtual ~MemberNotificationReceiver() {}
+
+	virtual void operator() (const std::string &payload, int backendPid);
+private:
+	PostgreSQL *_psql;
+};
+
+class NetworkNotificationReceiver : public pqxx::notification_receiver {
+public:
+	NetworkNotificationReceiver(PostgreSQL *p, pqxx::connection &c, const std::string &channel);
+	virtual ~NetworkNotificationReceiver() {};
+
+	virtual void operator() (const std::string &payload, int packend_pid);
+private:
+	PostgreSQL *_psql;
+};
+
 /**
  * A controller database driver that talks to PostgreSQL
  *
  * This is for use with ZeroTier Central.  Others are free to build and use it
- * but be aware taht we might change it at any time.
+ * but be aware that we might change it at any time.
  */
 class PostgreSQL : public DB
 {
+	friend class MemberNotificationReceiver;
+	friend class NetworkNotificationReceiver;
 public:
 	PostgreSQL(const Identity &myId, const char *path, int listenPort, RedisConfig *rc);
 	virtual ~PostgreSQL();
@@ -56,15 +110,22 @@ protected:
 	{
 		inline std::size_t operator()(const std::pair<uint64_t,uint64_t> &p) const { return (std::size_t)(p.first ^ p.second); }
 	};
+	void _memberChanged(nlohmann::json &old,nlohmann::json &memberConfig,bool notifyListeners) {
+		DB::_memberChanged(old, memberConfig, notifyListeners);
+	}
+
+	void _networkChanged(nlohmann::json &old,nlohmann::json &networkConfig,bool notifyListeners) {
+		DB::_memberChanged(old, networkConfig, notifyListeners);
+	}
 
 private:
-	void initializeNetworks(PGconn *conn);
-	void initializeMembers(PGconn *conn);
+	void initializeNetworks();
+	void initializeMembers();
 	void heartbeat();
 	void membersDbWatcher();
-	void _membersWatcher_Postgres(PGconn *conn);
+	void _membersWatcher_Postgres();
 	void networksDbWatcher();
-	void _networksWatcher_Postgres(PGconn *conn);
+	void _networksWatcher_Postgres();
 
 	void _membersWatcher_Redis();
 	void _networksWatcher_Redis();
@@ -81,7 +142,7 @@ private:
 		NO_OVERRIDE = 1
 	};
 
-	PGconn * getPgConn( OverrideMode m = ALLOW_PGBOUNCER_OVERRIDE );
+	std::shared_ptr<ConnectionPool<PostgresConnection> > _pool;
 
 	const Identity _myId;
 	const Address _myAddress;

+ 2 - 2
make-mac.mk

@@ -28,9 +28,9 @@ include objects.mk
 ONE_OBJS+=osdep/MacEthernetTap.o osdep/MacKextEthernetTap.o osdep/MacDNSHelper.o ext/http-parser/http_parser.o
 
 ifeq ($(ZT_CONTROLLER),1)
-	LIBS+=-L/usr/local/opt/libpq/lib -lpq ext/redis-plus-plus-1.1.1/install/macos/lib/libredis++.a ext/hiredis-0.14.1/lib/macos/libhiredis.a
+	LIBS+=-L/usr/local/opt/libpqxx@6/lib -L/usr/local/opt/libpq/lib -L/usr/local/opt/openssl/lib/ -lpqxx -lpq -lssl -lcrypto -lgssapi_krb5 ext/redis-plus-plus-1.1.1/install/macos/lib/libredis++.a ext/hiredis-0.14.1/lib/macos/libhiredis.a
 	DEFS+=-DZT_CONTROLLER_USE_LIBPQ -DZT_CONTROLLER_USE_REDIS -DZT_CONTROLLER 
-	INCLUDES+=-I/usr/local/opt/libpq/include -Iext/hiredis-0.14.1/include/ -Iext/redis-plus-plus-1.1.1/install/macos/include/sw/
+	INCLUDES+=-I/usr/local/opt/libpq/include -I/usr/local/opt/libpqxx@6/include -Iext/hiredis-0.14.1/include/ -Iext/redis-plus-plus-1.1.1/install/macos/include/sw/
 endif
 
 LIBS+=-framework CoreServices -framework SystemConfiguration -framework CoreFoundation

部分文件因文件數量過多而無法顯示