Forráskód Böngészése

New connection pool.

Signed-off-by: Santiago Pericas-Geertsen <[email protected]>
Santiago Pericas-Geertsen 10 hónapja
szülő
commit
dab117a50b

+ 125 - 0
frameworks/Java/helidon/nima/src/main/java/io/helidon/benchmark/nima/models/PgClientConnectionPool.java

@@ -0,0 +1,125 @@
+
+package io.helidon.benchmark.nima.models;
+
+import io.vertx.core.Vertx;
+import io.vertx.pgclient.PgConnectOptions;
+import io.vertx.pgclient.PgConnection;
+import io.vertx.sqlclient.PreparedQuery;
+import io.vertx.sqlclient.Row;
+import io.vertx.sqlclient.RowSet;
+
+class PgClientConnectionPool implements AutoCloseable {
+
+    private final int size;
+    private final Vertx vertx;
+    private final PgConnectOptions options;
+    private final PgClientConnection[] connections;
+
+    public PgClientConnectionPool(Vertx vertx, int size, PgConnectOptions options) {
+        this.size = size;
+        this.vertx = vertx;
+        this.options = options;
+        this.connections = new PgClientConnection[size];
+    }
+
+    public PgClientConnection clientConnection() {
+        int bucket = Thread.currentThread().hashCode() % size;
+        return connections[bucket];
+    }
+
+    public void connect() {
+        try {
+            for (int i = 0; i < size; i++) {
+                PgConnection conn = PgConnection.connect(vertx, options)
+                        .toCompletionStage().toCompletableFuture().get();
+                PgClientConnection clientConn = new PgClientConnection(conn);
+                clientConn.prepare();
+                connections[i] = clientConn;
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            for (PgClientConnection connection : connections) {
+                connection.close();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static class PgClientConnection implements AutoCloseable {
+        private static final int UPDATE_QUERIES = 500;
+        private static String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1";
+        private static String SELECT_FORTUNE = "SELECT id, message from FORTUNE";
+
+        private PreparedQuery<RowSet<Row>> worldQuery;
+        private PreparedQuery<RowSet<Row>> fortuneQuery;
+        private PreparedQuery<RowSet<Row>>[] updateQuery;
+
+        private final PgConnection conn;
+
+        PgClientConnection(PgConnection conn) {
+            this.conn = conn;
+        }
+
+        public PgConnection pgConnection() {
+            return conn;
+        }
+
+        @Override
+        public void close() {
+            conn.close();
+        }
+
+        public PreparedQuery<RowSet<Row>> worldQuery() {
+            return worldQuery;
+        }
+
+        public PreparedQuery<RowSet<Row>> fortuneQuery() {
+            return fortuneQuery;
+        }
+
+        public PreparedQuery<RowSet<Row>> updateQuery(int queryCount) {
+            return updateQuery[queryCount - 1];
+        }
+
+        @SuppressWarnings("unchecked")
+        void prepare() {
+            try {
+                worldQuery = conn.prepare(SELECT_WORLD)
+                        .toCompletionStage().toCompletableFuture().get().query();
+                fortuneQuery = conn.prepare(SELECT_FORTUNE)
+                        .toCompletionStage().toCompletableFuture().get().query();
+                updateQuery = (PreparedQuery<RowSet<Row>>[]) new PreparedQuery<?>[UPDATE_QUERIES];
+                for (int i = 0; i < UPDATE_QUERIES; i++) {
+                    updateQuery[i] = conn.prepare(singleUpdate(i + 1))
+                            .toCompletionStage().toCompletableFuture().get().query();
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private static String singleUpdate(int count) {
+            StringBuilder sql = new StringBuilder();
+            sql.append("UPDATE WORLD SET RANDOMNUMBER = CASE ID");
+            for (int i = 0; i < count; i++) {
+                int k = i * 2 + 1;
+                sql.append(" WHEN $").append(k).append(" THEN $").append(k + 1);
+            }
+            sql.append(" ELSE RANDOMNUMBER");
+            sql.append(" END WHERE ID IN ($1");
+            for (int i = 1; i < count; i++) {
+                int k = i * 2 + 1;
+                sql.append(",$").append(k);
+            }
+            sql.append(")");
+            return sql.toString();
+        }
+    }
+}

+ 26 - 66
frameworks/Java/helidon/nima/src/main/java/io/helidon/benchmark/nima/models/PgClientRepository.java

@@ -2,7 +2,6 @@ package io.helidon.benchmark.nima.models;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.logging.Logger;
 
 import io.helidon.config.Config;
@@ -10,25 +9,17 @@ import io.vertx.core.Future;
 import io.vertx.core.Vertx;
 import io.vertx.core.VertxOptions;
 import io.vertx.pgclient.PgConnectOptions;
-import io.vertx.pgclient.PgPool;
-import io.vertx.sqlclient.PoolOptions;
 import io.vertx.sqlclient.PreparedQuery;
 import io.vertx.sqlclient.Row;
 import io.vertx.sqlclient.RowSet;
-import io.vertx.sqlclient.SqlClient;
 import io.vertx.sqlclient.Tuple;
 
 import static io.helidon.benchmark.nima.models.DbRepository.randomWorldNumber;
 
 public class PgClientRepository implements DbRepository {
     private static final Logger LOGGER = Logger.getLogger(PgClientRepository.class.getName());
-    private static final int UPDATE_QUERIES = 500;
 
-    private final SqlClient updatePool;
-
-    private final PreparedQuery<RowSet<Row>> getFortuneQuery;
-    private final PreparedQuery<RowSet<Row>> getWorldQuery;
-    private final PreparedQuery<RowSet<Row>>[] updateWorldSingleQuery;
+    private final PgClientConnectionPool connectionPool;
 
     @SuppressWarnings("unchecked")
     public PgClientRepository(Config config) {
@@ -41,27 +32,16 @@ public class PgClientRepository implements DbRepository {
                 .setUser(config.get("username").asString().orElse("benchmarkdbuser"))
                 .setPassword(config.get("password").asString().orElse("benchmarkdbpass"))
                 .setPipeliningLimit(100000);
-
-        int sqlPoolSize = config.get("sql-pool-size").asInt().orElse(64);
-        PoolOptions clientOptions = new PoolOptions().setMaxSize(sqlPoolSize);
-        LOGGER.info("sql-pool-size is " + sqlPoolSize);
-
-        SqlClient queryPool = PgPool.client(vertx, connectOptions, clientOptions);
-        updatePool = PgPool.client(vertx, connectOptions, clientOptions);
-
-        getWorldQuery = queryPool.preparedQuery("SELECT id, randomnumber FROM world WHERE id = $1");
-        getFortuneQuery = queryPool.preparedQuery("SELECT id, message FROM fortune");
-
-        updateWorldSingleQuery = new PreparedQuery[UPDATE_QUERIES];
-        for (int i = 0; i < UPDATE_QUERIES; i++) {
-            updateWorldSingleQuery[i] = queryPool.preparedQuery(singleUpdate(i + 1));
-        }
+        int sqlPoolSize = config.get("sql-pool-size").asInt().orElse(Runtime.getRuntime().availableProcessors());
+        connectionPool = new PgClientConnectionPool(vertx, sqlPoolSize, connectOptions);
+        connectionPool.connect();
     }
 
     @Override
     public World getWorld(int id) {
         try {
-            return getWorldQuery.execute(Tuple.of(id))
+            PreparedQuery<RowSet<Row>> worldQuery = connectionPool.clientConnection().worldQuery();
+            return worldQuery.execute(Tuple.of(id))
                     .map(rows -> {
                         Row r = rows.iterator().next();
                         return new World(r.getInteger(0), r.getInteger(1));
@@ -74,13 +54,14 @@ public class PgClientRepository implements DbRepository {
     @Override
     public List<World> getWorlds(int count) {
         try {
+            PreparedQuery<RowSet<Row>> worldQuery = connectionPool.clientConnection().worldQuery();
             List<Future<?>> futures = new ArrayList<>();
             for (int i = 0; i < count; i++) {
-                futures.add(getWorldQuery.execute(Tuple.of(randomWorldNumber()))
-                                    .map(rows -> {
-                                        Row r = rows.iterator().next();
-                                        return new World(r.getInteger(0), r.getInteger(1));
-                                    }));
+                futures.add(worldQuery.execute(Tuple.of(randomWorldNumber()))
+                        .map(rows -> {
+                            Row r = rows.iterator().next();
+                            return new World(r.getInteger(0), r.getInteger(1));
+                        }));
             }
             return Future.all(futures).toCompletionStage().toCompletableFuture().get().list();
         } catch (Exception e) {
@@ -92,7 +73,18 @@ public class PgClientRepository implements DbRepository {
     public List<World> updateWorlds(int count) {
         List<World> worlds = getWorlds(count);
         try {
-            return updateWorlds(worlds, count, updatePool);
+            PreparedQuery<RowSet<Row>> updateQuery = connectionPool.clientConnection().updateQuery(count);
+            List<Integer> updateParams = new ArrayList<>(count * 2);
+            for (World world : worlds) {
+                updateParams.add(world.id);
+                world.randomNumber = randomWorldNumber();
+                updateParams.add(world.randomNumber);
+            }
+            return updateQuery.execute(Tuple.wrap(updateParams))
+                    .toCompletionStage()
+                    .thenApply(rows -> worlds)
+                    .toCompletableFuture()
+                    .get();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -101,7 +93,8 @@ public class PgClientRepository implements DbRepository {
     @Override
     public List<Fortune> getFortunes() {
         try {
-            return getFortuneQuery.execute()
+            PreparedQuery<RowSet<Row>> fortuneQuery = connectionPool.clientConnection().fortuneQuery();
+            return fortuneQuery.execute()
                     .map(rows -> {
                         List<Fortune> fortunes = new ArrayList<>(rows.size() + 1);
                         for (Row r : rows) {
@@ -113,37 +106,4 @@ public class PgClientRepository implements DbRepository {
             throw new RuntimeException(e);
         }
     }
-
-    private List<World> updateWorlds(List<World> worlds, int count, SqlClient pool)
-            throws ExecutionException, InterruptedException {
-        int size = worlds.size();
-        List<Integer> updateParams = new ArrayList<>(size * 2);
-        for (World world : worlds) {
-            updateParams.add(world.id);
-            world.randomNumber = randomWorldNumber();
-            updateParams.add(world.randomNumber);
-        }
-        return updateWorldSingleQuery[count - 1].execute(Tuple.wrap(updateParams))
-                .toCompletionStage()
-                .thenApply(rows -> worlds)
-                .toCompletableFuture()
-                .get();
-    }
-
-    private static String singleUpdate(int count) {
-        StringBuilder sql = new StringBuilder();
-        sql.append("UPDATE WORLD SET RANDOMNUMBER = CASE ID");
-        for (int i = 0; i < count; i++) {
-            int k = i * 2 + 1;
-            sql.append(" WHEN $").append(k).append(" THEN $").append(k + 1);
-        }
-        sql.append(" ELSE RANDOMNUMBER");
-        sql.append(" END WHERE ID IN ($1");
-        for (int i = 1; i < count; i++) {
-            int k = i * 2 + 1;
-            sql.append(",$").append(k);
-        }
-        sql.append(")");
-        return sql.toString();
-    }
 }

+ 3 - 0
frameworks/Java/helidon/nima/src/main/java/io/helidon/benchmark/nima/services/FortuneHandler.java

@@ -33,18 +33,21 @@ public class FortuneHandler implements Handler {
         res.header(SERVER);
         res.header(CONTENT_TYPE_HTML);
 
+        // render using template and get list of buffers
         List<Fortune> fortuneList = repository.getFortunes();
         fortuneList.add(ADDITIONAL_FORTUNE);
         Collections.sort(fortuneList);
         ArrayOfByteArraysOutput output = fortunes.template(fortuneList).render(ArrayOfByteArraysOutput.FACTORY);
         List<byte[]> entity = output.getArrays();
 
+        // compute entity length and set header
         int length = 0;
         for (byte[] bytes : entity) {
             length += bytes.length;
         }
         res.header(CONTENT_LENGTH, String.valueOf(length));
 
+        // write entity to output
         try (var out = res.outputStream()) {
             for (byte[] bytes : entity) {
                 out.write(bytes);

+ 0 - 1
frameworks/Java/helidon/nima/src/main/resources/application.yaml

@@ -36,6 +36,5 @@ host: "tfb-database"
 db: "hello_world"
 username: benchmarkdbuser
 password: benchmarkdbpass
-sql-pool-size: 300
 db-repository: "pgclient"     # "pgclient" (default) or "hikari"