Browse Source

jooby: fix postgres pg-client tests (#8409)

- there were some blocked threads (regression while upgrading to jooby 3.x)
- upgrade to latest jooby
Edgar Espina 1 year ago
parent
commit
8c27c79b67

+ 1 - 1
frameworks/Java/jooby/config.toml

@@ -48,7 +48,7 @@ os = "Linux"
 orm = "Raw"
 platform = "Netty"
 webserver = "None"
-versus = "undertow"
+versus = "netty"
 
 [jetty]
 urls.plaintext = "/plaintext"

+ 2 - 2
frameworks/Java/jooby/jooby-mvc.dockerfile

@@ -4,8 +4,8 @@ COPY pom.xml pom.xml
 COPY src src
 COPY public public
 COPY conf conf
-RUN mvn package -q -P undertow
+RUN mvn package -q -P netty
 
 EXPOSE 8080
 
-CMD ["java", "-server", "-Xms2g", "-Xmx2g", "-XX:+UseNUMA", "-XX:+UseParallelGC", "-cp", "target/jooby.jar", "com.techempower.MvcApp"]
+CMD ["java", "-server", "-Xms2g", "-Xmx2g", "-XX:+UseNUMA", "-XX:+UseParallelGC", "-Dio.netty.disableHttpHeadersValidation=true", "-Dio.netty.buffer.checkBounds=false", "-Dio.netty.buffer.checkAccessible=false", "-cp", "target/jooby.jar", "com.techempower.MvcApp"]

+ 1 - 1
frameworks/Java/jooby/pom.xml

@@ -11,7 +11,7 @@
   <name>jooby</name>
 
   <properties>
-    <jooby.version>3.0.0</jooby.version>
+    <jooby.version>3.0.5</jooby.version>
     <dsl-json.version>1.10.0</dsl-json.version>
     <postgresql.version>42.6.0</postgresql.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

+ 13 - 3
frameworks/Java/jooby/src/main/java/com/techempower/Json.java

@@ -1,6 +1,7 @@
 package com.techempower;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 
 import com.dslplatform.json.DslJson;
 import com.dslplatform.json.JsonWriter;
@@ -15,7 +16,7 @@ public class Json {
     }
   };
 
-  public static final ByteBuffer encode(Message data) {
+  public static ByteBuffer encode(Message data) {
     JsonWriter writer = pool.get();
     writer.reset();
     _Message_DslJsonConverter.ObjectFormatConverter converter = new _Message_DslJsonConverter.ObjectFormatConverter(
@@ -24,7 +25,7 @@ public class Json {
     return ByteBuffer.wrap(writer.getByteBuffer(), 0, writer.size());
   }
 
-  public static final ByteBuffer encode(World data) {
+  public static ByteBuffer encode(World data) {
     JsonWriter writer = pool.get();
     writer.reset();
     _World_DslJsonConverter.ObjectFormatConverter converter = new _World_DslJsonConverter.ObjectFormatConverter(
@@ -33,7 +34,16 @@ public class Json {
     return ByteBuffer.wrap(writer.getByteBuffer(), 0, writer.size());
   }
 
-  public static final ByteBuffer encode(World[] data) {
+  public static ByteBuffer encode(World[] data) {
+    JsonWriter writer = pool.get();
+    writer.reset();
+    _World_DslJsonConverter.ObjectFormatConverter converter = new _World_DslJsonConverter.ObjectFormatConverter(
+        dslJson);
+    writer.serialize(data, converter);
+    return ByteBuffer.wrap(writer.getByteBuffer(), 0, writer.size());
+  }
+
+  public static ByteBuffer encode(List<World> data) {
     JsonWriter writer = pool.get();
     writer.reset();
     _World_DslJsonConverter.ObjectFormatConverter converter = new _World_DslJsonConverter.ObjectFormatConverter(

+ 78 - 40
frameworks/Java/jooby/src/main/java/com/techempower/PgClient.java

@@ -1,14 +1,12 @@
 package com.techempower;
 
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.function.BiConsumer;
 
 import com.typesafe.config.Config;
 import io.jooby.SneakyThrows;
 import io.vertx.core.AsyncResult;
-import io.vertx.core.CompositeFuture;
 import io.vertx.core.Future;
 import io.vertx.core.Handler;
 import io.vertx.core.Vertx;
@@ -23,63 +21,103 @@ import io.vertx.sqlclient.Tuple;
 import io.vertx.sqlclient.impl.SqlClientInternal;
 
 public class PgClient {
+
+  static {
+    // Should be all I/O processing for SQL responses
+    System.setProperty("vertx.nettyIORatio", "100");
+  }
+
   private static final String UPDATE_WORLD = "UPDATE world SET randomnumber=$1 WHERE id=$2";
   private static final String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1";
   private static final String SELECT_FORTUNE = "SELECT id, message from FORTUNE";
-  private SqlClientInternal client;
-  private PreparedQuery<RowSet<Row>> SELECT_WORLD_QUERY;
-  private PreparedQuery<RowSet<Row>> SELECT_FORTUNE_QUERY;
-  private PreparedQuery<RowSet<Row>> UPDATE_WORLD_QUERY;
 
-  public PgClient(Config config) {
-    try {
-      Vertx vertx = Vertx.vertx(
-          new VertxOptions().setPreferNativeTransport(true).setWorkerPoolSize(4));
-      PgConnectOptions connectOptions = pgPoolOptions(config);
-
-      Future future = PgConnection.connect(vertx, connectOptions)
-          .flatMap(conn -> {
-            client = (SqlClientInternal) conn;
-            Future<PreparedStatement> f1 = conn.prepare(SELECT_WORLD);
-            Future<PreparedStatement> f2 = conn.prepare(SELECT_FORTUNE);
-            Future<PreparedStatement> f3 = conn.prepare(UPDATE_WORLD);
-            f1.onSuccess(ps -> SELECT_WORLD_QUERY = ps.query());
-            f2.onSuccess(ps -> SELECT_FORTUNE_QUERY = ps.query());
-            f3.onSuccess(ps -> UPDATE_WORLD_QUERY = ps.query());
-            return Future.all(f1, f2, f3);
-          })
-          .toCompletionStage()
-          .toCompletableFuture()
-          .get();
-      Throwable cause = future.cause();
-      if (cause != null) {
-        throw SneakyThrows.propagate(cause);
+  private static class DbConnection {
+    private PreparedQuery<RowSet<Row>> SELECT_WORLD_QUERY;
+    private PreparedQuery<RowSet<Row>> SELECT_FORTUNE_QUERY;
+    private PreparedQuery<RowSet<Row>> UPDATE_WORLD_QUERY;
+    private SqlClientInternal connection;
+  }
+
+  private static class DbConnectionFactory extends ThreadLocal<DbConnection> {
+
+    private final PgConnectOptions options;
+
+    public DbConnectionFactory(PgConnectOptions options) {
+      this.options = options;
+    }
+
+    private <T> Handler<AsyncResult<T>> onSuccess(Handler<T> handler) {
+      return ar -> {
+        if (ar.succeeded()) {
+          handler.handle(ar.result());
+        }
+      };
+    }
+
+    @Override protected DbConnection initialValue() {
+      try {
+        DbConnection result = new DbConnection();
+        Vertx vertx = Vertx.vertx(
+            new VertxOptions()
+                .setPreferNativeTransport(true)
+                .setEventLoopPoolSize(1)
+                .setWorkerPoolSize(1)
+                .setInternalBlockingPoolSize(1)
+        );
+        var future = PgConnection.connect(vertx, options)
+            .flatMap(conn -> {
+              result.connection = (SqlClientInternal) conn;
+              Future<PreparedStatement> f1 = conn.prepare(SELECT_WORLD)
+                  .andThen(onSuccess(ps -> result.SELECT_WORLD_QUERY = ps.query()));
+              Future<PreparedStatement> f2 = conn.prepare(SELECT_FORTUNE)
+                  .andThen(onSuccess(ps -> result.SELECT_FORTUNE_QUERY = ps.query()));
+              Future<PreparedStatement> f3 = conn.prepare(UPDATE_WORLD)
+                  .andThen(onSuccess(ps -> result.UPDATE_WORLD_QUERY = ps.query()));
+              return Future.join(f1, f2, f3);
+            })
+            .toCompletionStage()
+            .toCompletableFuture()
+            .get();
+
+        Throwable cause = future.cause();
+        if (cause != null) {
+          throw SneakyThrows.propagate(cause);
+        }
+        return result;
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw SneakyThrows.propagate(ex);
+      } catch (ExecutionException ex) {
+        throw SneakyThrows.propagate(ex.getCause());
       }
-    } catch (InterruptedException | ExecutionException cause) {
-      throw SneakyThrows.propagate(cause);
     }
   }
 
+  private final ThreadLocal<DbConnection> sqlClient;
+
+  public PgClient(Config config) {
+    this.sqlClient = new DbConnectionFactory(pgPoolOptions(config));
+  }
+
   public void selectWorld(Tuple row, Handler<AsyncResult<RowSet<Row>>> handler) {
-    SELECT_WORLD_QUERY.execute(row, handler);
+    this.sqlClient.get().SELECT_WORLD_QUERY.execute(row, handler);
   }
 
-  public void selectWorldQuery(int queries,
-      BiConsumer<Integer, PreparedQuery<RowSet<Row>>> consumer) {
-    client.group(c -> {
+  public void selectWorlds(int queries, Handler<AsyncResult<RowSet<Row>>> handler) {
+    this.sqlClient.get().connection.group(c -> {
       for (int i = 0; i < queries; i++) {
-        consumer.accept(i, c.preparedQuery(SELECT_WORLD));
+        c.preparedQuery(SELECT_WORLD).execute(Tuple.of(Util.randomWorld()), handler);
       }
     });
   }
 
-  public PreparedQuery<RowSet<Row>> fortuneQuery() {
-    return SELECT_FORTUNE_QUERY;
+  public void fortunes(Handler<AsyncResult<RowSet<Row>>> handler) {
+    this.sqlClient.get().SELECT_FORTUNE_QUERY.execute(handler);
   }
 
   public void selectWorldForUpdate(int queries,
       BiConsumer<Integer, PreparedQuery<RowSet<Row>>> consumer) {
-    client.group(c -> {
+    this.sqlClient.get().connection.group(c -> {
       PreparedQuery<RowSet<Row>> statement = c.preparedQuery(SELECT_WORLD);
       for (int i = 0; i < queries; i++) {
         consumer.accept(i, statement);
@@ -88,7 +126,7 @@ public class PgClient {
   }
 
   public void updateWorld(List<Tuple> batch, Handler<AsyncResult<RowSet<Row>>> handler) {
-    UPDATE_WORLD_QUERY.executeBatch(batch, handler);
+    this.sqlClient.get().UPDATE_WORLD_QUERY.executeBatch(batch, handler);
   }
 
   private PgConnectOptions pgPoolOptions(Config config) {

+ 15 - 18
frameworks/Java/jooby/src/main/java/com/techempower/ReactivePg.java

@@ -8,7 +8,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.fizzed.rocker.RockerOutputFactory;
 import io.jooby.Context;
@@ -55,22 +54,20 @@ public class ReactivePg extends Jooby {
     /** Multiple queries: */
     get("/queries", ctx -> {
       int queries = Util.queries(ctx);
-      World[] result = new World[queries];
-      client.selectWorldQuery(queries, (index, statement) -> {
-        statement.execute(Tuple.of(randomWorld()), rsp -> {
-          if (rsp.succeeded()) {
-            RowIterator<Row> rs = rsp.result().iterator();
-            Row row = rs.next();
-            result[index] = new World(row.getInteger(0), row.getInteger(1));
-          } else {
-            sendError(ctx, rsp.cause());
-          }
-          // ready?
-          if (index == queries - 1) {
-            ctx.setResponseType(JSON)
-                .send(Json.encode(result));
-          }
-        });
+      var result = new ArrayList<World>(queries);
+      client.selectWorlds(queries, rsp -> {
+        if (rsp.succeeded()) {
+          RowIterator<Row> rs = rsp.result().iterator();
+          Row row = rs.next();
+          result.add(new World(row.getInteger(0), row.getInteger(1)));
+        } else {
+          sendError(ctx, rsp.cause());
+        }
+        // ready?
+        if (result.size() == queries) {
+          ctx.setResponseType(JSON)
+              .send(Json.encode(result));
+        }
       });
       return ctx;
     }).setNonBlocking(true);
@@ -114,7 +111,7 @@ public class ReactivePg extends Jooby {
     /** Fortunes: */
     RockerOutputFactory<ByteBufferOutput> factory = require(RockerOutputFactory.class);
     get("/fortunes", ctx -> {
-      client.fortuneQuery().execute(rsp -> {
+      client.fortunes(rsp -> {
         if (rsp.succeeded()) {
           RowIterator<Row> rs = rsp.result().iterator();
           List<Fortune> fortunes = new ArrayList<>();

+ 1 - 1
frameworks/Kotlin/kooby/kooby.dockerfile

@@ -8,4 +8,4 @@ RUN mvn package -q
 
 EXPOSE 8080
 
-CMD ["java", "-server", "-Xms2g", "-Xmx2g", "-XX:+UseNUMA", "-XX:+UseParallelGC", "-jar", "target/kooby.jar"]
+CMD ["java", "-server", "-Xms2g", "-Xmx2g", "-XX:+UseNUMA", "-XX:+UseParallelGC", "-Dio.netty.disableHttpHeadersValidation=true", "-Dio.netty.buffer.checkBounds=false", "-Dio.netty.buffer.checkAccessible=false", "-jar", "target/kooby.jar"]

+ 2 - 2
frameworks/Kotlin/kooby/pom.xml

@@ -12,7 +12,7 @@
   <name>kooby: jooby+kotlin</name>
 
   <properties>
-    <jooby.version>3.0.0</jooby.version>
+    <jooby.version>3.0.5</jooby.version>
     <postgresql.version>42.6.0</postgresql.version>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <maven.compiler.source>17</maven.compiler.source>
@@ -64,7 +64,7 @@
 
     <dependency>
       <groupId>io.jooby</groupId>
-      <artifactId>jooby-undertow</artifactId>
+      <artifactId>jooby-netty</artifactId>
     </dependency>
 
   </dependencies>