Prechádzať zdrojové kódy

[java/vertx] Vert.x 4.5.8 bump + bunch of improvements (#9142)

* Other contenders have clearly demonstrated that update benchmark performs better using a prepared statement with a aggregation of tuples instead of a postgres batch with a list of tuple. This change shamelessly borrows the SQL statement uwebsockets (thank you uwebsockets) with a small tweaks for the datatype.

In other words

UPDATE world SET randomNumber = update_data.randomNumber
  FROM (VALUES ($1::int,$2::int),...)
  AS update_data (id, randomNumber)
  WHERE world.id = update_data.id

is superior (in this context) to a batch of UPDATE world SET randomnumber=$1 WHERE id=$2

* Use a single event-loop per CPU instead of the double.

* Trim whitespace in rocker templates, shamelessly borrowed from Quarkus/Vertx benchmark.

* Use a collecting query for fortunes that maps a row to a fortune instance which allocates a single row instance. This avoids allocating intermedary rows object when we map them to Fortune instances.

Xerox tip: TFB Contenders using vertx-pg-client should copy this change to save some cycles.

* Allocate a JsonArray of the right size for the query benchmark, avoid un-necessary string conversion in the generated JSON.

* Avoid using Jackson databind that is actually not needed

* Split reading/writing to the world table in two separate connections for the update benchmark, the hypothesis is that for the update benchmark first read then write to the World table, even though the write depends on the read, each change has a queue (the pipelined statements) and using two connections adds the operation to a queue with a smaller latency.

This seems to give better results in my benchmark, let's see how this behaves out there.

Xerox tip: TFB Contenders using vertx-pg-client should copy this change to save some cycles.

* Bump Vert.x to 4.5.8 and its deps.
Julien Viet 1 rok pred
rodič
commit
9dffd6a776

+ 5 - 10
frameworks/Java/vertx/pom.xml

@@ -10,10 +10,10 @@
 		<maven.compiler.target>17</maven.compiler.target>
 		<!-- the main class -->
 		<main.class>vertx.App</main.class>
-		<stack.version>4.5.3</stack.version>
-		<jackson.version>2.15.0</jackson.version>
-		<netty.version>4.1.92.Final</netty.version>
-		<netty.io_uring.version>0.0.21.Final</netty.io_uring.version>
+		<stack.version>4.5.8</stack.version>
+		<jackson.version>2.16.1</jackson.version>
+		<netty.version>4.1.110.Final</netty.version>
+		<netty.io_uring.version>0.0.25.Final</netty.io_uring.version>
 	</properties>
 
 	<dependencies>
@@ -32,11 +32,6 @@
 			<artifactId>jackson-core</artifactId>
 			<version>${jackson.version}</version>
 		</dependency>
-		<dependency>
-			<groupId>com.fasterxml.jackson.core</groupId>
-			<artifactId>jackson-databind</artifactId>
-			<version>${jackson.version}</version>
-		</dependency>
 		<dependency>
 			<groupId>io.netty</groupId>
 			<artifactId>netty-transport-native-kqueue</artifactId>
@@ -91,7 +86,7 @@
 							<javaVersion>17</javaVersion>
 							<templateDirectory>${basedir}/src/main/templates</templateDirectory>
 							<outputDirectory>${basedir}/target/generated-sources/rocker</outputDirectory>
-							<discardLogicWhitespace>false</discardLogicWhitespace>
+							<discardLogicWhitespace>true</discardLogicWhitespace>
 							<addAsSources>true</addAsSources>
 							<optimize>true</optimize>
 							<failOnError>true</failOnError>

+ 106 - 61
frameworks/Java/vertx/src/main/java/vertx/App.java

@@ -12,17 +12,11 @@ import io.vertx.core.http.HttpServer;
 import io.vertx.core.http.HttpServerOptions;
 import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.http.HttpServerResponse;
-import io.vertx.core.json.Json;
 import io.vertx.core.json.JsonArray;
 import io.vertx.core.json.JsonObject;
 import io.vertx.core.logging.Logger;
 import io.vertx.core.logging.LoggerFactory;
-import io.vertx.sqlclient.PreparedQuery;
-import io.vertx.sqlclient.PreparedStatement;
-import io.vertx.sqlclient.Row;
-import io.vertx.sqlclient.RowIterator;
-import io.vertx.sqlclient.RowSet;
-import io.vertx.sqlclient.Tuple;
+import io.vertx.sqlclient.*;
 import io.vertx.sqlclient.impl.SqlClientInternal;
 import vertx.model.CachedWorld;
 import vertx.model.Fortune;
@@ -41,7 +35,6 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
@@ -102,7 +95,8 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
   private static final String SELECT_WORLDS = "SELECT id, randomnumber from WORLD";
 
   private HttpServer server;
-  private SqlClientInternal client;
+  private SqlClientInternal client1;
+  private SqlClientInternal client2;
   private CharSequence dateString;
   private CharSequence[] plaintextHeaders;
 
@@ -110,8 +104,10 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
 
   private Throwable databaseErr;
   private PreparedQuery<RowSet<Row>> SELECT_WORLD_QUERY;
-  private PreparedQuery<RowSet<Row>> SELECT_FORTUNE_QUERY;
+  private PreparedQuery<SqlResult<List<Fortune>>> SELECT_FORTUNE_QUERY;
   private PreparedQuery<RowSet<Row>> UPDATE_WORLD_QUERY;
+  @SuppressWarnings("unchecked")
+  private PreparedQuery<RowSet<Row>>[] AGGREGATED_UPDATE_WORLD_QUERY = new PreparedQuery[128];
   private WorldCache WORLD_CACHE;
 
   public static CharSequence createDateHeader() {
@@ -139,28 +135,67 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
     options.setPassword(config.getString("password", "benchmarkdbpass"));
     options.setCachePreparedStatements(true);
     options.setPipeliningLimit(100_000); // Large pipelining means less flushing and we use a single connection anyway
-    PgConnection.connect(vertx, options)
+    Future<?> clientsInit = initClients(options);
+    clientsInit
+            .transform(ar -> {
+              databaseErr = ar.cause();
+              return server.listen(port);
+            })
+            .<Void>mapEmpty()
+            .onComplete(startPromise);
+  }
+
+  private Future<?> initClients(PgConnectOptions options) {
+    Future<?> cf1 = PgConnection.connect(vertx, options)
             .flatMap(conn -> {
-              client = (SqlClientInternal) conn;
+              client1 = (SqlClientInternal) conn;
+              List<Future<?>> list = new ArrayList<>();
               Future<PreparedStatement> f1 = conn.prepare(SELECT_WORLD)
                       .andThen(onSuccess(ps -> SELECT_WORLD_QUERY = ps.query()));
+              list.add(f1);
               Future<PreparedStatement> f2 = conn.prepare(SELECT_FORTUNE)
-                      .andThen(onSuccess(ps -> SELECT_FORTUNE_QUERY = ps.query()));
-              Future<PreparedStatement> f3 = conn.prepare(UPDATE_WORLD)
-                      .andThen(onSuccess(ps -> UPDATE_WORLD_QUERY = ps.query()));
-              Future<WorldCache> f4 = conn.preparedQuery(SELECT_WORLDS)
+                      .andThen(onSuccess(ps -> {
+                        SELECT_FORTUNE_QUERY = ps.query().
+                                collecting(Collectors.mapping(row -> new Fortune(row.getInteger(0), row.getString(1)), Collectors.toList()));
+                      }));
+              list.add(f2);
+              Future<WorldCache> f3 = conn.preparedQuery(SELECT_WORLDS)
                       .collecting(Collectors.mapping(row -> new CachedWorld(row.getInteger(0), row.getInteger(1)), Collectors.toList()))
                       .execute()
                       .map(worlds -> new WorldCache(worlds.value()))
                       .andThen(onSuccess(wc -> WORLD_CACHE = wc));
-              return CompositeFuture.join(f1, f2, f3, f4);
-            })
-            .transform(ar -> {
-              databaseErr = ar.cause();
-              return server.listen(port);
-            })
-            .<Void>mapEmpty()
-            .onComplete(startPromise);
+              list.add(f3);
+              return Future.join(list);
+            });
+    Future<?> cf2 = PgConnection.connect(vertx, options)
+            .flatMap(conn -> {
+              client2 = (SqlClientInternal) conn;
+              List<Future<?>> list = new ArrayList<>();
+              Future<PreparedStatement> f1 = conn.prepare(UPDATE_WORLD)
+                      .andThen(onSuccess(ps -> UPDATE_WORLD_QUERY = ps.query()));
+              list.add(f1);
+              for (int i = 0; i < AGGREGATED_UPDATE_WORLD_QUERY.length; i++) {
+                int idx = i;
+                Future<PreparedStatement> fut = conn
+                        .prepare(buildAggregatedUpdateQuery(1 + idx))
+                        .andThen(onSuccess(ps -> AGGREGATED_UPDATE_WORLD_QUERY[idx] = ps.query()));
+                list.add(fut);
+              }
+              return Future.join(list);
+            });
+    return Future.join(cf1, cf2);
+  }
+
+  private static String buildAggregatedUpdateQuery(int len) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("UPDATE world SET randomNumber = update_data.randomNumber FROM (VALUES");
+    char sep = ' ';
+    for (int i = 1;i <= len;i++) {
+      sb.append(sep).append("($").append(2 * i - 1).append("::int,$").append(2 * i).append("::int)");
+      sep = ',';
+    }
+    sb.append(") AS update_data (id, randomNumber) WHERE  world.id = update_data.id");
+    return sb.toString();
   }
 
   private static <T> Handler<AsyncResult<T>> onSuccess(Handler<T> handler) {
@@ -259,7 +294,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
             .putHeader(HttpHeaders.SERVER, SERVER)
             .putHeader(HttpHeaders.DATE, dateString)
             .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
-            .end(Json.encode(new World(row.getInteger(0), row.getInteger(1))), NULL_HANDLER);
+            .end(new World(row.getInteger(0), row.getInteger(1)).toBuffer(), NULL_HANDLER);
       } else {
         sendError(req, res.cause());
       }
@@ -269,19 +304,22 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
   class Queries implements Handler<AsyncResult<RowSet<Row>>> {
 
     boolean failed;
-    JsonArray worlds = new JsonArray();
+    final JsonArray worlds;
     final HttpServerRequest req;
     final HttpServerResponse resp;
     final int queries;
 
     public Queries(HttpServerRequest req) {
+      int queries = getQueries(req);
+
       this.req = req;
       this.resp = req.response();
-      this.queries = getQueries(req);
+      this.queries = queries;
+      this.worlds = new JsonArray(new ArrayList<>(queries));
     }
 
     private void handle() {
-      client.group(c -> {
+      client1.group(c -> {
         for (int i = 0; i < queries; i++) {
           c.preparedQuery(SELECT_WORLD).execute(Tuple.of(randomWorld()), this);
         }
@@ -299,7 +337,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
 
         // we need a final reference
         final Tuple row = ar.result().iterator().next();
-        worlds.add(new JsonObject().put("id", "" + row.getInteger(0)).put("randomNumber", "" + row.getInteger(1)));
+        worlds.add(new JsonObject().put("id", row.getInteger(0)).put("randomNumber", row.getInteger(1)));
 
         // stop condition
         if (worlds.size() == queries) {
@@ -307,7 +345,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
               .putHeader(HttpHeaders.SERVER, SERVER)
               .putHeader(HttpHeaders.DATE, dateString)
               .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
-              .end(worlds.encode(), NULL_HANDLER);
+              .end(worlds.toBuffer(), NULL_HANDLER);
         }
       }
     }
@@ -328,7 +366,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
 
     private void handle() {
 
-      client.group(c -> {
+      client1.group(c -> {
         PreparedQuery<RowSet<Row>> preparedQuery = c.preparedQuery(SELECT_WORLD);
         for (int i = 0; i < worlds.length; i++) {
           int id = randomWorld();
@@ -352,42 +390,53 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
 
     void handleUpdates() {
       Arrays.sort(worlds);
-      List<Tuple> batch = new ArrayList<>();
-      for (World world : worlds) {
-        batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
-      }
-      UPDATE_WORLD_QUERY.executeBatch(batch, ar2 -> {
-        if (ar2.failed()) {
-          sendError(req, ar2.cause());
-          return;
+      int len = worlds.length;
+      if (0 < len && len <= AGGREGATED_UPDATE_WORLD_QUERY.length) {
+        List<Object> arguments = new ArrayList<>();
+        for (World world : worlds) {
+            arguments.add(world.getId());
+            arguments.add(world.getRandomNumber());
         }
-        JsonArray json = new JsonArray();
+        Tuple tuple = Tuple.tuple(arguments);
+        PreparedQuery<RowSet<Row>> query = AGGREGATED_UPDATE_WORLD_QUERY[len - 1];
+        query.execute(tuple, this::sendResponse);
+      } else {
+        List<Tuple> batch = new ArrayList<>();
         for (World world : worlds) {
-          json.add(new JsonObject().put("id", "" + world.getId()).put("randomNumber", "" + world.getRandomNumber()));
+          batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
         }
-        req.response()
-            .putHeader(HttpHeaders.SERVER, SERVER)
-            .putHeader(HttpHeaders.DATE, dateString)
-            .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
-            .end(json.toBuffer(), NULL_HANDLER);
-      });
+        UPDATE_WORLD_QUERY.executeBatch(batch, this::sendResponse);
+      }
     }
+
+    private void sendResponse(AsyncResult<?> res) {
+      if (res.failed()) {
+        sendError(req, res.cause());
+        return;
+      }
+      JsonArray json = new JsonArray();
+      for (World world : worlds) {
+        json.add(world);
+      }
+      req.response()
+              .putHeader(HttpHeaders.SERVER, SERVER)
+              .putHeader(HttpHeaders.DATE, dateString)
+              .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
+              .end(json.toBuffer(), NULL_HANDLER);
+    }
+
   }
 
   private void handleFortunes(HttpServerRequest req) {
     SELECT_FORTUNE_QUERY.execute(ar -> {
       HttpServerResponse response = req.response();
       if (ar.succeeded()) {
-        List<Fortune> fortunes = new ArrayList<>();
-        RowIterator<Row> resultSet = ar.result().iterator();
-        if (!resultSet.hasNext()) {
+        SqlResult<List<Fortune>> result = ar.result();
+        if (result.size() == 0) {
           response.setStatusCode(404).end("No results");
           return;
         }
-        while (resultSet.hasNext()) {
-          Row row = resultSet.next();
-          fortunes.add(new Fortune(row.getInteger(0), row.getString(1)));
-        }
+        List<Fortune> fortunes = result.value();
         fortunes.add(new Fortune(0, "Additional fortune added at request time."));
         Collections.sort(fortunes);
         response
@@ -412,12 +461,8 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
     }
     count = Math.max(1, count);
     count = Math.min(500, count);
-    CachedWorld[] worlds = WORLD_CACHE.getCachedWorld(count);
-    JsonArray json = new JsonArray(new ArrayList<>(count));
-    for (int i = 0;i < count;i++) {
-      CachedWorld world = worlds[i];
-      json.add(JsonObject.of("id", world.getId(), "randomNumber", world.getRandomNumber()));
-    }
+    List<CachedWorld> worlds = WORLD_CACHE.getCachedWorld(count);
+    JsonArray json = new JsonArray(worlds);
     HttpServerResponse response = req.response();
     MultiMap headers = response.headers();
     headers
@@ -429,7 +474,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
 
   public static void main(String[] args) throws Exception {
 
-    int eventLoopPoolSize = VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE;
+    int eventLoopPoolSize = Runtime.getRuntime().availableProcessors();
     String sizeProp = System.getProperty("vertx.eventLoopPoolSize");
     if (sizeProp != null) {
       try {

+ 6 - 1
frameworks/Java/vertx/src/main/java/vertx/model/CachedWorld.java

@@ -1,9 +1,13 @@
 package vertx.model;
 
+import io.vertx.core.json.JsonObject;
+
+import java.util.Map;
+
 /**
  * The model for the "world" database table.
  */
-public final class CachedWorld implements Comparable<CachedWorld> {
+public final class CachedWorld extends JsonObject implements Comparable<CachedWorld> {
 
   private final int id;
   private final int randomNumber;
@@ -15,6 +19,7 @@ public final class CachedWorld implements Comparable<CachedWorld> {
    * @param randomNumber the random number of the world
    */
   public CachedWorld(int id, int randomNumber) {
+    super(Map.of("id", id, "randomNumber", randomNumber));
     this.id = id;
     this.randomNumber = randomNumber;
   }

+ 6 - 1
frameworks/Java/vertx/src/main/java/vertx/model/World.java

@@ -1,9 +1,13 @@
 package vertx.model;
 
+import io.vertx.core.json.JsonObject;
+
+import java.util.Map;
+
 /**
  * The model for the "world" database table.
  */
-public final class World implements Comparable<World> {
+public final class World extends JsonObject implements Comparable<World> {
 
   private final int id;
   private final int randomNumber;
@@ -15,6 +19,7 @@ public final class World implements Comparable<World> {
    * @param randomNumber the random number of the world
    */
   public World(int id, int randomNumber) {
+    super(Map.of("id", id, "randomNumber", randomNumber));
     this.id = id;
     this.randomNumber = randomNumber;
   }

+ 4 - 3
frameworks/Java/vertx/src/main/java/vertx/model/WorldCache.java

@@ -3,6 +3,7 @@ package vertx.model;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -19,12 +20,12 @@ public class WorldCache {
         this.cache = cache;
     }
 
-    public CachedWorld[] getCachedWorld(int count) {
-        CachedWorld[] ret = new CachedWorld[count];
+    public List<CachedWorld> getCachedWorld(int count) {
+        List<CachedWorld> ret = new ArrayList<>(count);
         ThreadLocalRandom current = ThreadLocalRandom.current();
         for (int i = 0;i < count;i++) {
             Integer key = Integer.valueOf(current.nextInt(1000));
-            ret[i] = cache.getIfPresent(key);
+            ret.add(cache.getIfPresent(key));
         }
         return ret;
     }

+ 5 - 15
frameworks/Java/vertx/src/main/templates/vertx/FortunesTemplate.rocker.html

@@ -1,18 +1,8 @@
 @import vertx.model.Fortune
 @import java.util.List
 @args(List<Fortune> fortunes)
-<!DOCTYPE html>
-<html>
-<head><title>Fortunes</title></head>
-<body>
-<table>
-  <tr>
-    <th>id</th>
-    <th>message</th>
-  </tr> @for ((ForIterator i, Fortune fortune) : fortunes) {
-  <tr>
-    <td>@fortune.getId()</td>
-    <td>@fortune.getMessage()</td>
-  </tr> } </table>
-</body>
-</html>
+<!DOCTYPE html><html><head><title>Fortunes</title></head><body><table><tr><th>id</th><th>message</th></tr>
+@for ((ForIterator i, Fortune fortune) : fortunes) {
+<tr><td>@fortune.getId()</td><td>@fortune.getMessage()</td></tr>
+}
+</table></body></html>