Browse Source

New connection pool using carrier threads.

Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
Santiago Pericas-Geertsen 9 months ago
parent
commit
6521f36082

+ 21 - 14
frameworks/Java/helidon/nima/src/main/java/io/helidon/benchmark/nima/models/PgClientConnectionPool.java

@@ -1,6 +1,8 @@
 
 package io.helidon.benchmark.nima.models;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
 
 import io.vertx.core.Vertx;
@@ -12,38 +14,38 @@ import io.vertx.sqlclient.RowSet;
 
 class PgClientConnectionPool implements AutoCloseable {
 
-    private final int size;
     private final Vertx vertx;
     private final PgConnectOptions options;
-    private final PgClientConnection[] connections;
     private final ReentrantLock lock = new ReentrantLock();
+    private final Map<String, PgClientConnection> connectionMap = new HashMap<>();
 
-    public PgClientConnectionPool(Vertx vertx, int size, PgConnectOptions options) {
-        this.size = size;
+    public PgClientConnectionPool(Vertx vertx, PgConnectOptions options) {
         this.vertx = vertx;
         this.options = options;
-        this.connections = new PgClientConnection[size];
     }
 
     public PgClientConnection clientConnection() {
-        int bucket = Thread.currentThread().hashCode() % size;
-        if (connections[bucket] == null) {
+        String carrierThread = carrierThread();
+        PgClientConnection connection = connectionMap.get(carrierThread);
+        if (connection == null) {
             try {
                 lock.lock();
-                if (connections[bucket] == null) {
-                    connect(bucket);
+                connection = connectionMap.get(carrierThread);
+                if (connection == null) {
+                    connection = newConnection();
+                    connectionMap.put(carrierThread, connection);
                 }
             } finally {
                 lock.unlock();
             }
         }
-        return connections[bucket];
+        return connection;
     }
 
     @Override
     public void close() {
         try {
-            for (PgClientConnection connection : connections) {
+            for (PgClientConnection connection : connectionMap.values()) {
                 connection.close();
             }
         } catch (Exception e) {
@@ -51,20 +53,25 @@ class PgClientConnectionPool implements AutoCloseable {
         }
     }
 
-    private void connect(int bucket) {
+    private PgClientConnection newConnection() {
         try {
             PgConnection conn = PgConnection.connect(vertx, options)
                     .toCompletionStage().toCompletableFuture().get();
             PgClientConnection clientConn = new PgClientConnection(conn);
             clientConn.prepare();
-            connections[bucket] = clientConn;
+            return clientConn;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
+    static String carrierThread() {
+        String threadName = Thread.currentThread().toString();
+        return threadName.substring(threadName.indexOf('@') + 1);
+    }
+
     public static class PgClientConnection implements AutoCloseable {
-        private static final int UPDATE_QUERIES = 500;
+        static final int UPDATE_QUERIES = 500;
         private static String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1";
         private static String SELECT_FORTUNE = "SELECT id, message from FORTUNE";
 

+ 12 - 4
frameworks/Java/helidon/nima/src/main/java/io/helidon/benchmark/nima/models/PgClientRepository.java

@@ -15,6 +15,7 @@ import io.vertx.sqlclient.RowSet;
 import io.vertx.sqlclient.Tuple;
 
 import static io.helidon.benchmark.nima.models.DbRepository.randomWorldNumber;
+import static io.helidon.benchmark.nima.models.PgClientConnectionPool.PgClientConnection.UPDATE_QUERIES;
 
 public class PgClientRepository implements DbRepository {
     private static final Logger LOGGER = Logger.getLogger(PgClientRepository.class.getName());
@@ -23,17 +24,24 @@ public class PgClientRepository implements DbRepository {
 
     @SuppressWarnings("unchecked")
     public PgClientRepository(Config config) {
-        Vertx vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
+        VertxOptions vertxOptions = new VertxOptions()
+                .setPreferNativeTransport(true)
+                .setBlockedThreadCheckInterval(100000);
+        Vertx vertx = Vertx.vertx(vertxOptions);
         PgConnectOptions connectOptions = new PgConnectOptions()
                 .setPort(config.get("port").asInt().orElse(5432))
-                .setCachePreparedStatements(config.get("cache-prepared-statements").asBoolean().orElse(true))
                 .setHost(config.get("host").asString().orElse("tfb-database"))
                 .setDatabase(config.get("db").asString().orElse("hello_world"))
                 .setUser(config.get("username").asString().orElse("benchmarkdbuser"))
                 .setPassword(config.get("password").asString().orElse("benchmarkdbpass"))
+                .setCachePreparedStatements(true)
+                .setPreparedStatementCacheMaxSize(UPDATE_QUERIES + 2)
+                .setPreparedStatementCacheSqlFilter(s -> true)          // cache all
+                .setTcpNoDelay(true)
+                .setTcpQuickAck(true)
+                .setTcpKeepAlive(true)
                 .setPipeliningLimit(100000);
-        int sqlPoolSize = config.get("sql-pool-size").asInt().orElse(Runtime.getRuntime().availableProcessors());
-        connectionPool = new PgClientConnectionPool(vertx, sqlPoolSize, connectOptions);
+        connectionPool = new PgClientConnectionPool(vertx, connectOptions);
     }
 
     @Override