Browse Source

Improve Vertx code. Bump Gradle, Micronaut platform (#8478)

* Improve Vertx code. Bump Gradle, Micronaut platform

* Correct

* Switch to a record

* Update frameworks/Java/micronaut/gradle.properties

Co-authored-by: Sergio del Amo <[email protected]>

* Trigger build

---------

Co-authored-by: Sergio del Amo <[email protected]>
Denis Stepanov 1 year ago
parent
commit
c4a1f769dc
17 changed files with 205 additions and 161 deletions
  1. 0 1
      frameworks/Java/micronaut/common/src/main/java/benchmark/controller/AbstractBenchmarkController.java
  2. 81 0
      frameworks/Java/micronaut/common/src/main/java/benchmark/controller/AsyncBenchmarkController.java
  3. 2 7
      frameworks/Java/micronaut/common/src/main/java/benchmark/controller/BenchmarkController.java
  4. 2 7
      frameworks/Java/micronaut/common/src/main/java/benchmark/controller/ReactiveBenchmarkController.java
  5. 1 28
      frameworks/Java/micronaut/common/src/main/java/benchmark/model/Fortune.java
  6. 17 0
      frameworks/Java/micronaut/common/src/main/java/benchmark/repository/AsyncFortuneRepository.java
  7. 22 0
      frameworks/Java/micronaut/common/src/main/java/benchmark/repository/AsyncWorldRepository.java
  8. 1 1
      frameworks/Java/micronaut/common/src/main/resources/views/fortunes.rocker.html
  9. 1 1
      frameworks/Java/micronaut/gradle.properties
  10. BIN
      frameworks/Java/micronaut/gradle/wrapper/gradle-wrapper.jar
  11. 1 1
      frameworks/Java/micronaut/gradle/wrapper/gradle-wrapper.properties
  12. 9 8
      frameworks/Java/micronaut/gradlew
  13. 2 2
      frameworks/Java/micronaut/micronaut-jdbc/src/main/java/benchmark/JdbcFortuneRepository.java
  14. 2 2
      frameworks/Java/micronaut/micronaut-r2dbc/src/main/java/benchmark/R2dbcFortuneRepository.java
  15. 26 62
      frameworks/Java/micronaut/micronaut-vertx-pg-client/src/main/java/benchmark/AbstractVertxSqlClientRepository.java
  16. 10 10
      frameworks/Java/micronaut/micronaut-vertx-pg-client/src/main/java/benchmark/VertxPgFortuneRepository.java
  17. 28 31
      frameworks/Java/micronaut/micronaut-vertx-pg-client/src/main/java/benchmark/VertxPgWorldRepository.java

+ 0 - 1
frameworks/Java/micronaut/common/src/main/java/benchmark/controller/AbstractBenchmarkController.java

@@ -7,7 +7,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 public class AbstractBenchmarkController {

+ 81 - 0
frameworks/Java/micronaut/common/src/main/java/benchmark/controller/AsyncBenchmarkController.java

@@ -0,0 +1,81 @@
+package benchmark.controller;
+
+import benchmark.model.Fortune;
+import benchmark.model.World;
+import benchmark.repository.AsyncFortuneRepository;
+import benchmark.repository.AsyncWorldRepository;
+import io.micronaut.context.annotation.Requires;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.annotation.Controller;
+import io.micronaut.http.annotation.Get;
+import io.micronaut.http.annotation.QueryValue;
+import views.fortunes;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+
+import static java.util.Comparator.comparing;
+
+@Requires(beans = {AsyncWorldRepository.class, AsyncFortuneRepository.class})
+@Controller
+public class AsyncBenchmarkController extends AbstractBenchmarkController {
+
+    private final AsyncWorldRepository worldRepository;
+    private final AsyncFortuneRepository fortuneRepository;
+
+    public AsyncBenchmarkController(AsyncWorldRepository worldRepository,
+                                    AsyncFortuneRepository fortuneRepository) {
+        this.worldRepository = worldRepository;
+        this.fortuneRepository = fortuneRepository;
+    }
+
+    @Get("/prepare-data-for-test")
+    public CompletionStage<?> prepareDataForTest() {
+        return worldRepository.initDb(createWords()).thenCompose(ignore -> fortuneRepository.initDb(createFortunes()));
+    }
+
+    // https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview#single-database-query
+    @Get("/db")
+    public CompletionStage<World> db() {
+        return worldRepository.findById(randomId());
+    }
+
+    // https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview#multiple-database-queries
+    @Get("/queries")
+    public CompletionStage<List<World>> queries(@QueryValue String queries) {
+        int count = parseQueryCount(queries);
+        List<Integer> ids = new ArrayList<>(count);
+        for (int i = 0; i < count; i++) {
+            ids.add(randomId());
+        }
+        return worldRepository.findByIds(ids);
+    }
+
+    // https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview#fortunes
+    @Get(value = "/fortunes", produces = "text/html;charset=utf-8")
+    public CompletionStage<HttpResponse<String>> fortune() {
+        return fortuneRepository.findAll().thenApply(fortuneList -> {
+            List<Fortune> all = new ArrayList<>(fortuneList.size() + 1);
+            all.add(new Fortune(0, "Additional fortune added at request time."));
+            all.addAll(fortuneList);
+            all.sort(comparing(Fortune::message));
+            String body = fortunes.template(all).render().toString();
+            return HttpResponse.ok(body).contentType("text/html;charset=utf-8");
+        });
+    }
+
+    // https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview#database-updates
+    @Get("/updates")
+    public CompletionStage<List<World>> updates(@QueryValue String queries) {
+        return queries(queries).thenCompose(worlds -> {
+            for (World world : worlds) {
+                world.setRandomNumber(randomWorldNumber());
+            }
+            worlds.sort(Comparator.comparingInt(World::getId)); // Avoid deadlock
+            return worldRepository.updateAll(worlds).thenApply(ignore -> worlds);
+        });
+    }
+
+}

+ 2 - 7
frameworks/Java/micronaut/common/src/main/java/benchmark/controller/BenchmarkController.java

@@ -14,19 +14,14 @@ import io.micronaut.scheduling.annotation.ExecuteOn;
 import views.fortunes;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static java.util.Comparator.comparing;
 
 @ExecuteOn(TaskExecutors.IO)
-@Requires(missingBeans = ReactiveBenchmarkController.class)
+@Requires(beans = {WorldRepository.class, FortuneRepository.class})
 @Controller
 public class BenchmarkController extends AbstractBenchmarkController {
 
@@ -69,7 +64,7 @@ public class BenchmarkController extends AbstractBenchmarkController {
         List<Fortune> fortunesList = new ArrayList<>(all.size() + 1);
         fortunesList.add(new Fortune(0, "Additional fortune added at request time."));
         fortunesList.addAll(all);
-        fortunesList.sort(comparing(Fortune::getMessage));
+        fortunesList.sort(comparing(Fortune::message));
         String body = fortunes.template(fortunesList).render().toString();
         return HttpResponse.ok(body).contentType("text/html;charset=utf-8");
     }

+ 2 - 7
frameworks/Java/micronaut/common/src/main/java/benchmark/controller/ReactiveBenchmarkController.java

@@ -7,7 +7,6 @@ import benchmark.repository.ReactiveWorldRepository;
 import io.micronaut.context.annotation.Requires;
 import io.micronaut.core.async.annotation.SingleResult;
 import io.micronaut.http.HttpResponse;
-import io.micronaut.http.MutableHttpResponse;
 import io.micronaut.http.annotation.Controller;
 import io.micronaut.http.annotation.Get;
 import io.micronaut.http.annotation.QueryValue;
@@ -17,16 +16,12 @@ import reactor.core.publisher.Mono;
 import views.fortunes;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static java.util.Comparator.comparing;
 
-@Requires(beans = {ReactiveFortuneRepository.class, ReactiveWorldRepository.class})
+@Requires(beans = {ReactiveWorldRepository.class, ReactiveFortuneRepository.class})
 @Controller
 public class ReactiveBenchmarkController extends AbstractBenchmarkController {
 
@@ -71,7 +66,7 @@ public class ReactiveBenchmarkController extends AbstractBenchmarkController {
             List<Fortune> all = new ArrayList<>(fortuneList.size() + 1);
             all.add(new Fortune(0, "Additional fortune added at request time."));
             all.addAll(fortuneList);
-            all.sort(comparing(Fortune::getMessage));
+            all.sort(comparing(Fortune::message));
             String body = fortunes.template(all).render().toString();
             return HttpResponse.ok(body).contentType("text/html;charset=utf-8");
         });

+ 1 - 28
frameworks/Java/micronaut/common/src/main/java/benchmark/model/Fortune.java

@@ -7,34 +7,7 @@ import java.util.Objects;
 
 // Disable escape to have case-insensitive Postgres columns and table name and set lowercase name for MongoDB
 @MappedEntity(value = "fortune", escape = false)
-public class Fortune {
-    @Id
-    private int id;
-    private String message;
-
-    public Fortune() {
-    }
-
-    public Fortune(int id, String message) {
-        this.id = id;
-        this.message = message;
-    }
-
-    public int getId() {
-        return id;
-    }
-
-    public void setId(int id) {
-        this.id = id;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-
-    public void setMessage(String message) {
-        this.message = message;
-    }
+public record Fortune(@Id int id, String message) {
 
     @Override
     public boolean equals(Object o) {

+ 17 - 0
frameworks/Java/micronaut/common/src/main/java/benchmark/repository/AsyncFortuneRepository.java

@@ -0,0 +1,17 @@
+package benchmark.repository;
+
+import benchmark.model.Fortune;
+import org.reactivestreams.Publisher;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+public interface AsyncFortuneRepository {
+
+    CompletionStage<?> initDb(Collection<Fortune> fortunes);
+
+    CompletionStage<List<Fortune>> findAll();
+
+}

+ 22 - 0
frameworks/Java/micronaut/common/src/main/java/benchmark/repository/AsyncWorldRepository.java

@@ -0,0 +1,22 @@
+package benchmark.repository;
+
+import benchmark.model.World;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+public interface AsyncWorldRepository {
+
+    CompletionStage<?> initDb(Collection<World> worlds);
+
+    CompletionStage<World> findById(Integer id);
+
+    CompletionStage<List<World>> findByIds(List<Integer> ids);
+
+    CompletionStage<?> updateAll(Collection<World> worlds);
+
+}

+ 1 - 1
frameworks/Java/micronaut/common/src/main/resources/views/fortunes.rocker.html

@@ -3,6 +3,6 @@
 @args(List fortunes)
 <!DOCTYPE html><html><head><title>Fortunes</title></head><body><table><tr><th>id</th><th>message</th></tr>
     @for ((ForIterator i, Fortune fortune) : fortunes) {
-    <tr><td>@fortune.getId()</td><td>@fortune.getMessage()</td></tr>
+    <tr><td>@fortune.id()</td><td>@fortune.message()</td></tr>
     }
 </table></body></html>

+ 1 - 1
frameworks/Java/micronaut/gradle.properties

@@ -1 +1 @@
-micronautVersion = 4.1.1
+micronautVersion = 4.1.4

BIN
frameworks/Java/micronaut/gradle/wrapper/gradle-wrapper.jar


+ 1 - 1
frameworks/Java/micronaut/gradle/wrapper/gradle-wrapper.properties

@@ -1,6 +1,6 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-rc-1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-bin.zip
 networkTimeout=10000
 validateDistributionUrl=true
 zipStoreBase=GRADLE_USER_HOME

+ 9 - 8
frameworks/Java/micronaut/gradlew

@@ -83,7 +83,8 @@ done
 # This is normally unused
 # shellcheck disable=SC2034
 APP_BASE_NAME=${0##*/}
-APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
+# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036)
+APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit
 
 # Use the maximum available, or set MAX_FD != -1 to use that value.
 MAX_FD=maximum
@@ -144,7 +145,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
     case $MAX_FD in #(
       max*)
         # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked.
-        # shellcheck disable=SC3045
+        # shellcheck disable=SC2039,SC3045
         MAX_FD=$( ulimit -H -n ) ||
             warn "Could not query maximum file descriptor limit"
     esac
@@ -152,7 +153,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
       '' | soft) :;; #(
       *)
         # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked.
-        # shellcheck disable=SC3045
+        # shellcheck disable=SC2039,SC3045
         ulimit -n "$MAX_FD" ||
             warn "Could not set maximum file descriptor limit to $MAX_FD"
     esac
@@ -201,11 +202,11 @@ fi
 # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
 DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
 
-# Collect all arguments for the java command;
-#   * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
-#     shell script including quotes and variable substitutions, so put them in
-#     double quotes to make sure that they get re-expanded; and
-#   * put everything else in single quotes, so that it's not re-expanded.
+# Collect all arguments for the java command:
+#   * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments,
+#     and any embedded shellness will be escaped.
+#   * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be
+#     treated as '${Hostname}' itself on the command line.
 
 set -- \
         "-Dorg.gradle.appname=$APP_BASE_NAME" \

+ 2 - 2
frameworks/Java/micronaut/micronaut-jdbc/src/main/java/benchmark/JdbcFortuneRepository.java

@@ -30,8 +30,8 @@ public class JdbcFortuneRepository implements FortuneRepository {
             connection.createStatement().execute("CREATE TABLE Fortune (id INTEGER NOT NULL,message VARCHAR(255) NOT NULL);");
             try (PreparedStatement statement = connection.prepareStatement("INSERT INTO fortune VALUES (?, ?);")) {
                 for (Fortune fortune : fortunes) {
-                    statement.setInt(1, fortune.getId());
-                    statement.setString(2, fortune.getMessage());
+                    statement.setInt(1, fortune.id());
+                    statement.setString(2, fortune.message());
                     statement.addBatch();
                 }
                 statement.executeBatch();

+ 2 - 2
frameworks/Java/micronaut/micronaut-r2dbc/src/main/java/benchmark/R2dbcFortuneRepository.java

@@ -48,8 +48,8 @@ public class R2dbcFortuneRepository implements ReactiveFortuneRepository {
             } else {
                 first = false;
             }
-            statement.bind(0, fortune.getId());
-            statement.bind(1, fortune.getMessage());
+            statement.bind(0, fortune.id());
+            statement.bind(1, fortune.message());
         }
         return Flux.from(statement.execute()).flatMap(Result::getRowsUpdated).then();
     }

+ 26 - 62
frameworks/Java/micronaut/micronaut-vertx-pg-client/src/main/java/benchmark/AbstractVertxSqlClientRepository.java

@@ -1,18 +1,16 @@
 package benchmark;
 
-import io.vertx.core.AsyncResult;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
 import io.vertx.sqlclient.Pool;
 import io.vertx.sqlclient.Row;
-import io.vertx.sqlclient.RowIterator;
 import io.vertx.sqlclient.RowSet;
 import io.vertx.sqlclient.SqlClient;
 import io.vertx.sqlclient.Tuple;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletionStage;
 import java.util.function.Function;
 
 public class AbstractVertxSqlClientRepository {
@@ -23,72 +21,38 @@ public class AbstractVertxSqlClientRepository {
         this.client = client;
     }
 
-    protected Flux<Row> execute(String sql) {
-        return Flux.defer(() -> {
-            Sinks.Many<Row> sink = Sinks.many().multicast().onBackpressureBuffer();
-            client.preparedQuery(sql).execute(event -> mapResult(sink, event));
-            return sink.asFlux();
-        });
+    protected CompletionStage<?> execute(String sql) {
+        return client.preparedQuery(sql).execute().toCompletionStage();
     }
 
-    protected <T> Mono<T> executeAndCollectOne(String sql, Tuple tuple, Function<Row, T> mapper) {
-        Sinks.One<T> sink = Sinks.one();
-        client.preparedQuery(sql).execute(tuple, event -> {
-            if (event.failed()) {
-                sink.emitError(event.cause(), Sinks.EmitFailureHandler.FAIL_FAST);
-            } else  {
-                RowIterator<Row> iterator = event.result().iterator();
-                if (iterator.hasNext()) {
-                    sink.emitValue(mapper.apply(iterator.next()), Sinks.EmitFailureHandler.FAIL_FAST);
-                } else {
-                    sink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST);
-                }
-            }
-        });
-        return sink.asMono();
+    protected <T> CompletionStage<T> executeAndCollectOne(String sql, Tuple tuple, Function<Row, T> mapper) {
+        return client.preparedQuery(sql).execute(tuple).map(rows -> mapper.apply(rows.iterator().next()))
+                .toCompletionStage();
     }
 
-    protected <T> Mono<List<T>> executeAndCollectList(String sql, Function<Row, T> mapper) {
-        Sinks.One<List<T>> sink = Sinks.one();
-        client.preparedQuery(sql).execute(event -> {
-            if (event.failed()) {
-                sink.emitError(event.cause(), Sinks.EmitFailureHandler.FAIL_FAST);
-            } else {
-                List<T> list = new ArrayList<>();
-                for (Row row : event.result()) {
-                    list.add(mapper.apply(row));
-                }
-                sink.emitValue(list, Sinks.EmitFailureHandler.FAIL_FAST);
+    protected <T> CompletionStage<List<T>> executeAndCollectList(String sql, Function<Row, T> mapper) {
+        return client.preparedQuery(sql).execute().map(rows -> {
+            List<T> result = new ArrayList<>(rows.size());
+            for (Row row : rows) {
+                result.add(mapper.apply(row));
             }
-        });
-        return sink.asMono();
-    }
-
-    protected Flux<Row> execute(SqlClient sqlClient, String sql, Tuple data) {
-        return Flux.defer(() -> {
-            Sinks.Many<Row> sink = Sinks.many().multicast().onBackpressureBuffer();
-            sqlClient.preparedQuery(sql).execute(data, event -> mapResult(sink, event));
-            return sink.asFlux();
-        });
+            return result;
+        }).toCompletionStage();
     }
 
-    protected Flux<Row> executeBatch(String sql, List<Tuple> data) {
-        return Flux.defer(() -> {
-            Sinks.Many<Row> sink = Sinks.many().multicast().onBackpressureBuffer();
-            client.preparedQuery(sql).executeBatch(data, event -> mapResult(sink, event));
-            return sink.asFlux();
-        });
+    protected <T> Future<List<T>> executeMany(SqlClient sqlClient, String sql, List<Tuple> data, Function<Row, T> mapper) {
+        List<Future<T>> futures = new ArrayList<>(data.size());
+        Function<RowSet<Row>, T> rowsMapper = rows -> mapper.apply(rows.iterator().next());
+        for (Tuple d : data) {
+            futures.add(
+                    sqlClient.preparedQuery(sql).execute(d).map(rowsMapper)
+            );
+        }
+        return Future.all(futures).map(CompositeFuture::list);
     }
 
-    private void mapResult(Sinks.Many<Row> sink, AsyncResult<RowSet<Row>> event) {
-        if (event.failed()) {
-            sink.emitError(event.cause(), Sinks.EmitFailureHandler.FAIL_FAST);
-        } else {
-            for (Row row : event.result()) {
-                sink.emitNext(row, Sinks.EmitFailureHandler.FAIL_FAST);
-            }
-            sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
-        }
+    protected CompletionStage<?> executeBatch(String sql, List<Tuple> data) {
+        return client.preparedQuery(sql).executeBatch(data).toCompletionStage();
     }
 
 }

+ 10 - 10
frameworks/Java/micronaut/micronaut-vertx-pg-client/src/main/java/benchmark/VertxPgFortuneRepository.java

@@ -1,36 +1,36 @@
 package benchmark;
 
 import benchmark.model.Fortune;
-import benchmark.repository.ReactiveFortuneRepository;
+import benchmark.repository.AsyncFortuneRepository;
 import io.vertx.sqlclient.Pool;
 import io.vertx.sqlclient.Tuple;
 import jakarta.inject.Singleton;
-import org.reactivestreams.Publisher;
-import reactor.core.publisher.Mono;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletionStage;
 import java.util.stream.Collectors;
 
 @Singleton
-public class VertxPgFortuneRepository extends AbstractVertxSqlClientRepository implements ReactiveFortuneRepository {
+public class VertxPgFortuneRepository extends AbstractVertxSqlClientRepository implements AsyncFortuneRepository {
 
     public VertxPgFortuneRepository(Pool client) {
         super(client);
     }
 
-    private Mono<Void> createTable() {
-        return execute("DROP TABLE IF EXISTS Fortune;").then(execute("CREATE TABLE Fortune (id INTEGER NOT NULL,message VARCHAR(255) NOT NULL);").then());
+    private CompletionStage<?> createTable() {
+        return execute("DROP TABLE IF EXISTS Fortune;")
+                .thenCompose(ignore -> execute("CREATE TABLE Fortune (id INTEGER NOT NULL,message VARCHAR(255) NOT NULL);"));
     }
 
     @Override
-    public Publisher<Void> initDb(Collection<Fortune> fortunes) {
-        List<Tuple> data = fortunes.stream().map(fortune -> Tuple.of(fortune.getId(), fortune.getMessage())).collect(Collectors.toList());
-        return createTable().then(executeBatch("INSERT INTO Fortune VALUES ($1, $2);", data).then());
+    public CompletionStage<?> initDb(Collection<Fortune> fortunes) {
+        List<Tuple> data = fortunes.stream().map(fortune -> Tuple.of(fortune.id(), fortune.message())).collect(Collectors.toList());
+        return createTable().thenCompose(ignore -> executeBatch("INSERT INTO Fortune VALUES ($1, $2);", data));
     }
 
     @Override
-    public Publisher<List<Fortune>> findAll() {
+    public CompletionStage<List<Fortune>> findAll() {
         return executeAndCollectList("SELECT * FROM Fortune", row -> new Fortune(row.getInteger(0), row.getString(1)));
     }
 

+ 28 - 31
frameworks/Java/micronaut/micronaut-vertx-pg-client/src/main/java/benchmark/VertxPgWorldRepository.java

@@ -1,64 +1,61 @@
 package benchmark;
 
 import benchmark.model.World;
-import benchmark.repository.ReactiveWorldRepository;
-import io.vertx.core.Future;
+import benchmark.repository.AsyncWorldRepository;
 import io.vertx.sqlclient.Pool;
+import io.vertx.sqlclient.Row;
 import io.vertx.sqlclient.Tuple;
 import jakarta.inject.Singleton;
-import org.reactivestreams.Publisher;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 @Singleton
-public class VertxPgWorldRepository extends AbstractVertxSqlClientRepository implements ReactiveWorldRepository {
+public class VertxPgWorldRepository extends AbstractVertxSqlClientRepository implements AsyncWorldRepository {
+
+    private static final Function<Row, World> ROW_TO_WORLD_MAPPER = row -> new World(row.getInteger(0), row.getInteger(1));
 
     public VertxPgWorldRepository(Pool client) {
         super(client);
     }
 
-    private Mono<Void> createTable() {
-        return execute("DROP TABLE IF EXISTS World;").then(execute("CREATE TABLE World (id INTEGER NOT NULL,randomNumber INTEGER NOT NULL);").then());
+    private CompletionStage<?> createTable() {
+        return execute("DROP TABLE IF EXISTS World;").thenCompose(ignore -> execute("CREATE TABLE World (id INTEGER NOT NULL,randomNumber INTEGER NOT NULL);"));
     }
 
     @Override
-    public Mono<Void> initDb(Collection<World> worlds) {
+    public CompletionStage<?> initDb(Collection<World> worlds) {
         List<Tuple> data = worlds.stream().map(world -> Tuple.of(world.getId(), world.getRandomNumber())).collect(Collectors.toList());
-        return createTable().then(executeBatch("INSERT INTO world VALUES ($1, $2);", data).then());
+        return createTable().thenCompose(ignore -> executeBatch("INSERT INTO world VALUES ($1, $2);", data));
     }
 
     @Override
-    public Publisher<World> findById(Integer id) {
-        return executeAndCollectOne("SELECT * FROM world WHERE id = $1", Tuple.of(id), row -> new World(row.getInteger(0), row.getInteger(1)));
+    public CompletionStage<World> findById(Integer id) {
+        return executeAndCollectOne("SELECT * FROM world WHERE id = $1", Tuple.of(id), ROW_TO_WORLD_MAPPER);
     }
 
     @Override
-    public Publisher<List<World>> findByIds(List<Integer> ids) {
-        return asMono(
-                client.withConnection(sqlConnection -> asFuture(
-                        Flux.fromIterable(ids)
-                                .flatMap(id -> execute(sqlConnection, "SELECT * FROM world WHERE id = $1", Tuple.of(id))
-                                        .map(row -> new World(row.getInteger(0), row.getInteger(1))))
-                                .collectList()))
-        );
-    }
-
-    private <T> Mono<T> asMono(Future<T> future) {
-        return Mono.fromFuture(future.toCompletionStage().toCompletableFuture());
-    }
-
-    private <T> Future<T> asFuture(Mono<T> mono) {
-        return Future.fromCompletionStage(mono.toFuture());
+    public CompletionStage<List<World>> findByIds(List<Integer> ids) {
+        List<Tuple> data = new ArrayList<>(ids.size());
+        for (Integer id : ids) {
+            data.add(Tuple.of(id));
+        }
+        return client.withConnection(sqlConnection ->
+                executeMany(sqlConnection, "SELECT * FROM world WHERE id = $1", data, ROW_TO_WORLD_MAPPER))
+                .toCompletionStage();
     }
 
     @Override
-    public Publisher<Void> updateAll(Collection<World> worlds) {
-        List<Tuple> data = worlds.stream().map(world -> Tuple.of(world.getId(), world.getRandomNumber())).collect(Collectors.toList());
-        return executeBatch("UPDATE world SET randomnumber = $2 WHERE id = $1", data).then();
+    public CompletionStage<?> updateAll(Collection<World> worlds) {
+        List<Tuple> data = new ArrayList<>(worlds.size());
+        for (World world : worlds) {
+            data.add(Tuple.of(world.getId(), world.getRandomNumber()));
+        }
+        return executeBatch("UPDATE world SET randomnumber = $2 WHERE id = $1", data);
     }
 
 }