Browse Source

Use `PgConnection` instead of `PgPool` in the portions "vert-web" and "vert-web-kotlin-coroutines" to improve performance (#7933)

Shreck Ye 2 years ago
parent
commit
3e94d08a4a

+ 80 - 72
frameworks/Java/vertx-web/src/main/java/io/vertx/benchmark/App.java

@@ -13,7 +13,7 @@ import io.vertx.ext.web.Router;
 import io.vertx.ext.web.RoutingContext;
 import io.vertx.ext.web.RoutingContext;
 import io.vertx.ext.web.templ.rocker.RockerTemplateEngine;
 import io.vertx.ext.web.templ.rocker.RockerTemplateEngine;
 import io.vertx.pgclient.PgConnectOptions;
 import io.vertx.pgclient.PgConnectOptions;
-import io.vertx.pgclient.PgPool;
+import io.vertx.pgclient.PgConnection;
 import io.vertx.sqlclient.*;
 import io.vertx.sqlclient.*;
 
 
 import java.time.ZonedDateTime;
 import java.time.ZonedDateTime;
@@ -29,6 +29,22 @@ public class App extends AbstractVerticle {
     DatabindCodec.prettyMapper().registerModule(new BlackbirdModule());
     DatabindCodec.prettyMapper().registerModule(new BlackbirdModule());
   }
   }
 
 
+  // TODO: this function can be moved into `PgClientBenchmark`, made static, and renamed when static declarations in inner classes are supported (when the JDK is upgraded to 16 or above).
+  private void createPgClientBenchmarkAsync(Vertx vertx, JsonObject config, Handler<PgClientBenchmark> onComplete) {
+    PgConnectOptions options = new PgConnectOptions()
+        .setCachePreparedStatements(true)
+        .setHost(config.getString("host"))
+        .setPort(config.getInteger("port", 5432))
+        .setUser(config.getString("username"))
+        .setPassword(config.getString("password"))
+        .setDatabase(config.getString("database"))
+        .setPipeliningLimit(100_000); // Large pipelining means less flushing and we use a single connection anyway;
+
+    PgConnection.connect(vertx, options).onComplete(ar ->
+      onComplete.handle(new PgClientBenchmark(ar.result(), RockerTemplateEngine.create()))
+    );
+  }
+
   /**
   /**
    * PgClient implementation
    * PgClient implementation
    */
    */
@@ -38,23 +54,14 @@ public class App extends AbstractVerticle {
     private static final String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1";
     private static final String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1";
     private static final String SELECT_FORTUNE = "SELECT id, message from FORTUNE";
     private static final String SELECT_FORTUNE = "SELECT id, message from FORTUNE";
 
 
-    private final PgPool client;
+    private final PgConnection client;
 
 
     // In order to use a template we first need to create an engine
     // In order to use a template we first need to create an engine
     private final RockerTemplateEngine engine;
     private final RockerTemplateEngine engine;
 
 
-    public PgClientBenchmark(Vertx vertx, JsonObject config) {
-      PgConnectOptions options = new PgConnectOptions()
-        .setCachePreparedStatements(true)
-        .setHost(config.getString("host"))
-        .setPort(config.getInteger("port", 5432))
-        .setUser(config.getString("username"))
-        .setPassword(config.getString("password"))
-        .setDatabase(config.getString("database"))
-        .setPipeliningLimit(100_000); // Large pipelining means less flushing and we use a single connection anyway;
-
-      client = PgPool.pool(vertx, options, new PoolOptions().setMaxSize(4));
-      this.engine = RockerTemplateEngine.create();
+    private PgClientBenchmark(PgConnection client, RockerTemplateEngine engine) {
+      this.client = client;
+      this.engine = engine;
     }
     }
 
 
     public void dbHandler(final RoutingContext ctx) {
     public void dbHandler(final RoutingContext ctx) {
@@ -206,72 +213,73 @@ public class App extends AbstractVerticle {
   private String date;
   private String date;
 
 
   @Override
   @Override
-  public void start() {
+  public void start(Promise<Void> startPromise) {
     final Router app = Router.router(vertx);
     final Router app = Router.router(vertx);
     // initialize the date header
     // initialize the date header
     date = DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now());
     date = DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now());
     // refresh the value as a periodic task
     // refresh the value as a periodic task
     vertx.setPeriodic(1000, handler -> date = DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now()));
     vertx.setPeriodic(1000, handler -> date = DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now()));
 
 
-    final PgClientBenchmark pgClientBenchmark= new PgClientBenchmark(vertx, config());
-
-    /*
-     * This test exercises the framework fundamentals including keep-alive support, request routing, request header
-     * parsing, object instantiation, JSON serialization, response header generation, and request count throughput.
-     */
-    app.get("/json").handler(ctx -> {
-      ctx.response()
-          .putHeader(HttpHeaders.SERVER, SERVER)
-          .putHeader(HttpHeaders.DATE, date)
-          .putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
-          .end(Json.encodeToBuffer(new Message("Hello, World!")));
-    });
+    createPgClientBenchmarkAsync(vertx, config(), pgClientBenchmark -> {
+      /*
+       * This test exercises the framework fundamentals including keep-alive support, request routing, request header
+       * parsing, object instantiation, JSON serialization, response header generation, and request count throughput.
+       */
+      app.get("/json").handler(ctx -> {
+        ctx.response()
+            .putHeader(HttpHeaders.SERVER, SERVER)
+            .putHeader(HttpHeaders.DATE, date)
+            .putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
+            .end(Json.encodeToBuffer(new Message("Hello, World!")));
+      });
 
 
-    /*
-     * This test exercises the framework's object-relational mapper (ORM), random number generator, database driver,
-     * and database connection pool.
-     */
-    app.get("/db").handler(pgClientBenchmark::dbHandler);
-
-    /*
-     * This test is a variation of Test #2 and also uses the World table. Multiple rows are fetched to more dramatically
-     * punish the database driver and connection pool. At the highest queries-per-request tested (20), this test
-     * demonstrates all frameworks' convergence toward zero requests-per-second as database activity increases.
-     */
-    app.get("/queries").handler(pgClientBenchmark::queriesHandler);
-
-    /*
-     * This test exercises the ORM, database connectivity, dynamic-size collections, sorting, server-side templates,
-     * XSS countermeasures, and character encoding.
-     */
-    app.get("/fortunes").handler(pgClientBenchmark::fortunesHandler);
-
-    /*
-     * This test is a variation of Test #3 that exercises the ORM's persistence of objects and the database driver's
-     * performance at running UPDATE statements or similar. The spirit of this test is to exercise a variable number of
-     * read-then-write style database operations.
-     */
-    app.route("/update").handler(pgClientBenchmark::updateHandler);
-
-    /*
-     * This test is an exercise of the request-routing fundamentals only, designed to demonstrate the capacity of
-     * high-performance platforms in particular. Requests will be sent using HTTP pipelining. The response payload is
-     * still small, meaning good performance is still necessary in order to saturate the gigabit Ethernet of the test
-     * environment.
-     */
-    app.get("/plaintext").handler(ctx -> {
-      ctx.response()
-          .putHeader(HttpHeaders.SERVER, SERVER)
-          .putHeader(HttpHeaders.DATE, date)
-          .putHeader(HttpHeaders.CONTENT_TYPE, "text/plain")
-          .end("Hello, World!");
-    });
+      /*
+       * This test exercises the framework's object-relational mapper (ORM), random number generator, database driver,
+       * and database connection pool.
+       */
+      app.get("/db").handler(pgClientBenchmark::dbHandler);
+
+      /*
+       * This test is a variation of Test #2 and also uses the World table. Multiple rows are fetched to more dramatically
+       * punish the database driver and connection pool. At the highest queries-per-request tested (20), this test
+       * demonstrates all frameworks' convergence toward zero requests-per-second as database activity increases.
+       */
+      app.get("/queries").handler(pgClientBenchmark::queriesHandler);
+
+      /*
+       * This test exercises the ORM, database connectivity, dynamic-size collections, sorting, server-side templates,
+       * XSS countermeasures, and character encoding.
+       */
+      app.get("/fortunes").handler(pgClientBenchmark::fortunesHandler);
+
+      /*
+       * This test is a variation of Test #3 that exercises the ORM's persistence of objects and the database driver's
+       * performance at running UPDATE statements or similar. The spirit of this test is to exercise a variable number of
+       * read-then-write style database operations.
+       */
+      app.route("/update").handler(pgClientBenchmark::updateHandler);
+
+      /*
+       * This test is an exercise of the request-routing fundamentals only, designed to demonstrate the capacity of
+       * high-performance platforms in particular. Requests will be sent using HTTP pipelining. The response payload is
+       * still small, meaning good performance is still necessary in order to saturate the gigabit Ethernet of the test
+       * environment.
+       */
+      app.get("/plaintext").handler(ctx -> {
+        ctx.response()
+            .putHeader(HttpHeaders.SERVER, SERVER)
+            .putHeader(HttpHeaders.DATE, date)
+            .putHeader(HttpHeaders.CONTENT_TYPE, "text/plain")
+            .end("Hello, World!");
+      });
 
 
-    vertx.createHttpServer().requestHandler(app).listen(8080, listen -> {
-      if (listen.failed()) {
-        listen.cause().printStackTrace();
-        System.exit(1);
-      }
+      vertx.createHttpServer().requestHandler(app).listen(8080, listen -> {
+        if (listen.failed()) {
+          listen.cause().printStackTrace();
+          System.exit(1);
+        }
+        startPromise.complete();
+      });
     });
     });
   }
   }
 }
 }

+ 33 - 28
frameworks/Kotlin/vertx-web-kotlin-coroutines/src/main/kotlin/io/vertx/benchmark/App.kt

@@ -16,13 +16,13 @@ import io.vertx.ext.web.templ.rocker.RockerTemplateEngine
 import io.vertx.kotlin.coroutines.CoroutineVerticle
 import io.vertx.kotlin.coroutines.CoroutineVerticle
 import io.vertx.kotlin.coroutines.await
 import io.vertx.kotlin.coroutines.await
 import io.vertx.kotlin.pgclient.pgConnectOptionsOf
 import io.vertx.kotlin.pgclient.pgConnectOptionsOf
-import io.vertx.kotlin.sqlclient.poolOptionsOf
-import io.vertx.pgclient.PgPool
+import io.vertx.pgclient.PgConnection
 import io.vertx.sqlclient.Tuple
 import io.vertx.sqlclient.Tuple
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.async
 import kotlinx.coroutines.async
 import kotlinx.coroutines.awaitAll
 import kotlinx.coroutines.awaitAll
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.launch
+import java.net.UnknownHostException
 import java.time.ZonedDateTime
 import java.time.ZonedDateTime
 import java.time.format.DateTimeFormatter
 import java.time.format.DateTimeFormatter
 import kotlin.system.exitProcess
 import kotlin.system.exitProcess
@@ -55,35 +55,40 @@ class App : CoroutineVerticle() {
     inline fun Route.checkedCoroutineHandlerUnconfined(crossinline requestHandler: suspend (RoutingContext) -> Unit): Route =
     inline fun Route.checkedCoroutineHandlerUnconfined(crossinline requestHandler: suspend (RoutingContext) -> Unit): Route =
         coroutineHandlerUnconfined { ctx -> ctx.checkedRun { requestHandler(ctx) } }
         coroutineHandlerUnconfined { ctx -> ctx.checkedRun { requestHandler(ctx) } }
 
 
+    suspend fun PgClientBenchmark(vertx: Vertx, config: JsonObject): PgClientBenchmark {
+        val options = with(config) {
+            pgConnectOptionsOf(
+                cachePreparedStatements = true,
+                host = getString("host"),
+                port = getInteger("port", 5432),
+                user = getString("username"),
+                password = getString("password"),
+                database = config.getString("database"),
+                pipeliningLimit = 100000 // Large pipelining means less flushing and we use a single connection anyway;
+            )
+        }
+
+        return PgClientBenchmark(
+            try {
+                PgConnection.connect(vertx, options).await()
+            } catch (e: UnknownHostException) {
+                null
+            },
+            RockerTemplateEngine.create()
+        )
+    }
+
     /**
     /**
      * PgClient implementation
      * PgClient implementation
      */
      */
-    private inner class PgClientBenchmark(vertx: Vertx, config: JsonObject) {
-        private val client: PgPool
-
+    inner class PgClientBenchmark(
+        private val client: PgConnection?,
         // In order to use a template we first need to create an engine
         // In order to use a template we first need to create an engine
         private val engine: RockerTemplateEngine
         private val engine: RockerTemplateEngine
-
-        init {
-            val options = with(config) {
-                pgConnectOptionsOf(
-                    cachePreparedStatements = true,
-                    host = getString("host"),
-                    port = getInteger("port", 5432),
-                    user = getString("username"),
-                    password = getString("password"),
-                    database = config.getString("database"),
-                    pipeliningLimit = 100000 // Large pipelining means less flushing and we use a single connection anyway;
-                )
-            }
-
-            client = PgPool.pool(vertx, options, poolOptionsOf(maxSize = 4))
-            engine = RockerTemplateEngine.create()
-        }
-
+    ) {
         suspend fun dbHandler(ctx: RoutingContext) {
         suspend fun dbHandler(ctx: RoutingContext) {
             val result = try {
             val result = try {
-                client
+                client!!
                     .preparedQuery(SELECT_WORLD)
                     .preparedQuery(SELECT_WORLD)
                     .execute(Tuple.of(randomWorld()))
                     .execute(Tuple.of(randomWorld()))
                     .await()
                     .await()
@@ -117,7 +122,7 @@ class App : CoroutineVerticle() {
             val cnt = intArrayOf(0)
             val cnt = intArrayOf(0)
             List(queries) {
             List(queries) {
                 async {
                 async {
-                    val result = `try` { client.preparedQuery(SELECT_WORLD).execute(Tuple.of(randomWorld())).await() }
+                    val result = `try` { client!!.preparedQuery(SELECT_WORLD).execute(Tuple.of(randomWorld())).await() }
 
 
                     if (!failed[0]) {
                     if (!failed[0]) {
                         if (result is Try.Failure) {
                         if (result is Try.Failure) {
@@ -146,7 +151,7 @@ class App : CoroutineVerticle() {
         }
         }
 
 
         suspend fun fortunesHandler(ctx: RoutingContext) {
         suspend fun fortunesHandler(ctx: RoutingContext) {
-            val result = client.preparedQuery(SELECT_FORTUNE).execute().await()
+            val result = client!!.preparedQuery(SELECT_FORTUNE).execute().await()
 
 
             val resultSet = result.iterator()
             val resultSet = result.iterator()
             if (!resultSet.hasNext()) {
             if (!resultSet.hasNext()) {
@@ -180,7 +185,7 @@ class App : CoroutineVerticle() {
             List(worlds.size) {
             List(worlds.size) {
                 val id = randomWorld()
                 val id = randomWorld()
                 async {
                 async {
-                    val r2 = `try` { client.preparedQuery(SELECT_WORLD).execute(Tuple.of(id)).await() }
+                    val r2 = `try` { client!!.preparedQuery(SELECT_WORLD).execute(Tuple.of(id)).await() }
 
 
                     if (!failed[0]) {
                     if (!failed[0]) {
                         if (r2 is Try.Failure) {
                         if (r2 is Try.Failure) {
@@ -198,7 +203,7 @@ class App : CoroutineVerticle() {
                                 batch.add(Tuple.of(world.randomNumber, world.id))
                                 batch.add(Tuple.of(world.randomNumber, world.id))
                             }
                             }
                             ctx.checkedRun {
                             ctx.checkedRun {
-                                client.preparedQuery(UPDATE_WORLD)
+                                client!!.preparedQuery(UPDATE_WORLD)
                                     .executeBatch(batch)
                                     .executeBatch(batch)
                                     .await()
                                     .await()
                                 ctx.response()
                                 ctx.response()