Browse Source

jooby: upgrade 3.2.4 (#9156)

- get back threadlocal buffer for fortunes
- implements vertx like sql updates
Edgar Espina 1 year ago
parent
commit
527c38d3dc

+ 2 - 2
frameworks/Java/jooby/pom.xml

@@ -11,8 +11,8 @@
   <name>jooby</name>
 
   <properties>
-    <jooby.version>3.1.2</jooby.version>
-    <netty.version>4.1.110.Final</netty.version>
+    <jooby.version>3.2.4</jooby.version>
+    <netty.version>4.1.111.Final</netty.version>
     <dsl-json.version>2.0.2</dsl-json.version>
     <postgresql.version>42.7.3</postgresql.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

+ 2 - 7
frameworks/Java/jooby/src/main/java/com/techempower/App.java

@@ -28,13 +28,8 @@ public class App extends Jooby {
 
   private static final byte[] MESSAGE_BYTES = MESSAGE.getBytes(StandardCharsets.US_ASCII);
 
-  private static final ByteBuffer MESSAGE_BUFFER = (ByteBuffer) ByteBuffer
-      .allocateDirect(MESSAGE_BYTES.length)
-      .put(MESSAGE_BYTES)
-      .flip();
-
   {
-
+    var bufferFactory = getBufferFactory();
     /** Database: */
     install(new HikariModule());
     DataSource ds = require(DataSource.class);
@@ -43,7 +38,7 @@ public class App extends Jooby {
     install(new RockerModule());
 
     get("/plaintext", ctx ->
-        ctx.send(MESSAGE_BUFFER.duplicate())
+        ctx.send(bufferFactory.wrap(MESSAGE_BYTES))
     );
 
     get("/json", ctx -> ctx

+ 61 - 14
frameworks/Java/jooby/src/main/java/com/techempower/PgClient.java

@@ -1,5 +1,7 @@
 package com.techempower;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.function.BiConsumer;
@@ -32,10 +34,13 @@ public class PgClient {
   private static final String SELECT_FORTUNE = "SELECT id, message from FORTUNE";
 
   private static class DbConnection {
+    private SqlClientInternal queries;
     private PreparedQuery<RowSet<Row>> SELECT_WORLD_QUERY;
     private PreparedQuery<RowSet<Row>> SELECT_FORTUNE_QUERY;
     private PreparedQuery<RowSet<Row>> UPDATE_WORLD_QUERY;
-    private SqlClientInternal connection;
+    private SqlClientInternal updates;
+    @SuppressWarnings("unchecked")
+    private PreparedQuery<RowSet<Row>>[] AGGREGATED_UPDATE_WORLD_QUERY = new PreparedQuery[128];
   }
 
   private static class DbConnectionFactory extends ThreadLocal<DbConnection> {
@@ -64,20 +69,32 @@ public class PgClient {
                 .setWorkerPoolSize(1)
                 .setInternalBlockingPoolSize(1)
         );
-        var future = PgConnection.connect(vertx, options)
+        var client1 = PgConnection.connect(vertx, options)
             .flatMap(conn -> {
-              result.connection = (SqlClientInternal) conn;
+              result.queries = (SqlClientInternal) conn;
               Future<PreparedStatement> f1 = conn.prepare(SELECT_WORLD)
                   .andThen(onSuccess(ps -> result.SELECT_WORLD_QUERY = ps.query()));
               Future<PreparedStatement> f2 = conn.prepare(SELECT_FORTUNE)
                   .andThen(onSuccess(ps -> result.SELECT_FORTUNE_QUERY = ps.query()));
-              Future<PreparedStatement> f3 = conn.prepare(UPDATE_WORLD)
-                  .andThen(onSuccess(ps -> result.UPDATE_WORLD_QUERY = ps.query()));
-              return Future.join(f1, f2, f3);
-            })
-            .toCompletionStage()
-            .toCompletableFuture()
-            .get();
+              return Future.join(f1, f2);
+            });
+
+        var client2 = PgConnection.connect(vertx, options)
+                .flatMap(conn -> {
+                  result.updates = (SqlClientInternal) conn;
+                  List<Future<?>> list = new ArrayList<>();
+                  Future<PreparedStatement> f1 = conn.prepare(UPDATE_WORLD)
+                          .andThen(onSuccess(ps -> result.UPDATE_WORLD_QUERY = ps.query()));
+                  list.add(f1);
+                  for (int i = 0; i < result.AGGREGATED_UPDATE_WORLD_QUERY.length; i++) {
+                    int idx = i;
+                    list.add(conn
+                            .prepare(buildAggregatedUpdateQuery(1 + idx))
+                            .andThen(onSuccess(ps -> result.AGGREGATED_UPDATE_WORLD_QUERY[idx] = ps.query())));
+                  }
+                  return Future.join(list);
+                });
+        var future = Future.join(client1, client2).toCompletionStage().toCompletableFuture().get();
 
         Throwable cause = future.cause();
         if (cause != null) {
@@ -91,6 +108,18 @@ public class PgClient {
         throw SneakyThrows.propagate(ex.getCause());
       }
     }
+
+    private static String buildAggregatedUpdateQuery(int len) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("UPDATE world SET randomNumber = update_data.randomNumber FROM (VALUES");
+      char sep = ' ';
+      for (int i = 1;i <= len;i++) {
+        sb.append(sep).append("($").append(2 * i - 1).append("::int,$").append(2 * i).append("::int)");
+        sep = ',';
+      }
+      sb.append(") AS update_data (id, randomNumber) WHERE  world.id = update_data.id");
+      return sb.toString();
+    }
   }
 
   private final ThreadLocal<DbConnection> sqlClient;
@@ -104,7 +133,7 @@ public class PgClient {
   }
 
   public void selectWorlds(int queries, Handler<AsyncResult<RowSet<Row>>> handler) {
-    this.sqlClient.get().connection.group(c -> {
+    this.sqlClient.get().queries.group(c -> {
       for (int i = 0; i < queries; i++) {
         c.preparedQuery(SELECT_WORLD).execute(Tuple.of(Util.randomWorld()), handler);
       }
@@ -117,7 +146,7 @@ public class PgClient {
 
   public void selectWorldForUpdate(int queries,
       BiConsumer<Integer, PreparedQuery<RowSet<Row>>> consumer) {
-    this.sqlClient.get().connection.group(c -> {
+    this.sqlClient.get().queries.group(c -> {
       PreparedQuery<RowSet<Row>> statement = c.preparedQuery(SELECT_WORLD);
       for (int i = 0; i < queries; i++) {
         consumer.accept(i, statement);
@@ -125,8 +154,26 @@ public class PgClient {
     });
   }
 
-  public void updateWorld(List<Tuple> batch, Handler<AsyncResult<RowSet<Row>>> handler) {
-    this.sqlClient.get().UPDATE_WORLD_QUERY.executeBatch(batch, handler);
+  public void updateWorld(World[] worlds, Handler<AsyncResult<RowSet<Row>>> handler) {
+    Arrays.sort(worlds);
+    int len = worlds.length;
+    var connection = this.sqlClient.get();
+    if (0 < len && len <= connection.AGGREGATED_UPDATE_WORLD_QUERY.length) {
+      List<Object> arguments = new ArrayList<>();
+      for (World world : worlds) {
+        arguments.add(world.getId());
+        arguments.add(world.getRandomNumber());
+      }
+      Tuple tuple = Tuple.tuple(arguments);
+      PreparedQuery<RowSet<Row>> query = connection.AGGREGATED_UPDATE_WORLD_QUERY[len - 1];
+      query.execute(tuple, handler);
+    } else {
+      List<Tuple> batch = new ArrayList<>();
+      for (World world : worlds) {
+        batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
+      }
+      connection.UPDATE_WORLD_QUERY.executeBatch(batch, handler);
+    }
   }
 
   private PgConnectOptions pgPoolOptions(Config config) {

+ 4 - 14
frameworks/Java/jooby/src/main/java/com/techempower/ReactivePg.java

@@ -7,10 +7,8 @@ import static io.jooby.MediaType.JSON;
 import java.util.*;
 
 import com.fizzed.rocker.RockerOutputFactory;
-import io.jooby.Context;
-import io.jooby.Jooby;
-import io.jooby.MediaType;
-import io.jooby.ServerOptions;
+import com.techempower.rocker.BufferRockerOutput;
+import io.jooby.*;
 import io.jooby.rocker.DataBufferOutput;
 import io.jooby.rocker.RockerModule;
 import io.vertx.sqlclient.Row;
@@ -18,7 +16,6 @@ import io.vertx.sqlclient.RowIterator;
 import io.vertx.sqlclient.Tuple;
 
 public class ReactivePg extends Jooby {
-
   {
     /** Reduce the number of resources due we do reactive processing. */
     setServerOptions(
@@ -84,14 +81,7 @@ public class ReactivePg extends Jooby {
               selectCallback.result().iterator().next().getInteger(0),
               randomWorld());
           if (index == queries - 1) {
-            // Sort results... avoid dead locks
-            Arrays.sort(result);
-            List<Tuple> batch = new ArrayList<>(queries);
-            for (World world : result) {
-              batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
-            }
-
-            client.updateWorld(batch, updateCallback -> {
+            client.updateWorld(result, updateCallback -> {
               if (updateCallback.failed()) {
                 sendError(ctx, updateCallback.cause());
               } else {
@@ -106,7 +96,7 @@ public class ReactivePg extends Jooby {
     }).setNonBlocking(true);
 
     /** Fortunes: */
-    RockerOutputFactory<DataBufferOutput> factory = require(RockerOutputFactory.class);
+    var factory = BufferRockerOutput.factory();
     get("/fortunes", ctx -> {
       client.fortunes(rsp -> {
         if (rsp.succeeded()) {

+ 65 - 0
frameworks/Java/jooby/src/main/java/com/techempower/rocker/BufferRockerOutput.java

@@ -0,0 +1,65 @@
+package com.techempower.rocker;
+
+import com.fizzed.rocker.ContentType;
+import com.fizzed.rocker.RockerOutput;
+import com.fizzed.rocker.RockerOutputFactory;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+public class BufferRockerOutput implements RockerOutput<BufferRockerOutput> {
+    private final ByteBuffer buffer;
+
+    public BufferRockerOutput(ByteBuffer buffer) {
+        this.buffer = buffer;
+    }
+
+    @Override
+    public ContentType getContentType() {
+        return ContentType.RAW;
+    }
+
+    @Override
+    public Charset getCharset() {
+        return StandardCharsets.UTF_8;
+    }
+
+    @Override
+    public BufferRockerOutput w(String string) throws IOException {
+        buffer.put(string.getBytes(getCharset()));
+        return this;
+    }
+
+    @Override
+    public BufferRockerOutput w(byte[] bytes) throws IOException {
+        buffer.put(bytes);
+        return this;
+    }
+
+    @Override
+    public int getByteLength() {
+        return buffer.remaining();
+    }
+
+    public ByteBuffer toBuffer() {
+        return buffer.flip();
+    }
+
+    public static RockerOutputFactory<BufferRockerOutput> factory() {
+        var cache = new ThreadLocal<BufferRockerOutput>() {
+            @Override
+            protected BufferRockerOutput initialValue() {
+                return new BufferRockerOutput(ByteBuffer.allocateDirect(2048));
+            }
+        };
+        return (contentType, charsetName) -> cache.get().reset();
+    }
+
+    private BufferRockerOutput reset() {
+        buffer.clear();
+        return this;
+    }
+}