|
@@ -8,27 +8,25 @@ import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
import java.util.logging.Logger;
|
|
|
|
|
|
-import io.helidon.common.reactive.Multi;
|
|
|
-import io.helidon.common.reactive.Single;
|
|
|
import io.helidon.config.Config;
|
|
|
+
|
|
|
import io.vertx.core.Vertx;
|
|
|
import io.vertx.core.VertxOptions;
|
|
|
+import io.vertx.core.Future;
|
|
|
import io.vertx.pgclient.PgConnectOptions;
|
|
|
import io.vertx.pgclient.PgPool;
|
|
|
import io.vertx.sqlclient.PoolOptions;
|
|
|
+import io.vertx.sqlclient.PreparedQuery;
|
|
|
import io.vertx.sqlclient.Row;
|
|
|
+import io.vertx.sqlclient.RowSet;
|
|
|
import io.vertx.sqlclient.SqlClient;
|
|
|
import io.vertx.sqlclient.Tuple;
|
|
|
-import jakarta.json.JsonArray;
|
|
|
-import jakarta.json.JsonArrayBuilder;
|
|
|
-import jakarta.json.JsonObject;
|
|
|
|
|
|
import static io.helidon.benchmark.nima.models.DbRepository.randomWorldNumber;
|
|
|
|
|
|
public class PgClientRepository implements DbRepository {
|
|
|
private static final Logger LOGGER = Logger.getLogger(PgClientRepository.class.getName());
|
|
|
|
|
|
-
|
|
|
private final SqlClient queryPool;
|
|
|
private final SqlClient updatePool;
|
|
|
|
|
@@ -36,9 +34,13 @@ public class PgClientRepository implements DbRepository {
|
|
|
private final long updateTimeout;
|
|
|
private final int maxRetries;
|
|
|
|
|
|
+ private final PreparedQuery<RowSet<Row>> getFortuneQuery;
|
|
|
+ private final PreparedQuery<RowSet<Row>> getWorldQuery;
|
|
|
+ private final PreparedQuery<RowSet<Row>> updateWorldQuery;
|
|
|
+
|
|
|
public PgClientRepository(Config config) {
|
|
|
Vertx vertx = Vertx.vertx(new VertxOptions()
|
|
|
- .setPreferNativeTransport(true));
|
|
|
+ .setPreferNativeTransport(true));
|
|
|
PgConnectOptions connectOptions = new PgConnectOptions()
|
|
|
.setPort(config.get("port").asInt().orElse(5432))
|
|
|
.setCachePreparedStatements(config.get("cache-prepared-statements").asBoolean().orElse(true))
|
|
@@ -59,31 +61,20 @@ public class PgClientRepository implements DbRepository {
|
|
|
|
|
|
queryPool = PgPool.client(vertx, connectOptions, clientOptions);
|
|
|
updatePool = PgPool.client(vertx, connectOptions, clientOptions);
|
|
|
- }
|
|
|
|
|
|
- @Override
|
|
|
- public JsonObject getWorldAsJson(int id) {
|
|
|
- return getWorld(id, queryPool).map(World::toJson).await();
|
|
|
+ getWorldQuery = queryPool.preparedQuery("SELECT id, randomnumber FROM world WHERE id = $1");
|
|
|
+ updateWorldQuery = queryPool.preparedQuery("UPDATE world SET randomnumber = $1 WHERE id = $2");
|
|
|
+ getFortuneQuery = queryPool.preparedQuery("SELECT id, message FROM fortune");
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public World getWorld(int id) {
|
|
|
try {
|
|
|
- return getWorld(id, queryPool).toCompletableFuture().get();
|
|
|
- } catch (Exception e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public JsonArray getWorldsAsJson(int count) {
|
|
|
- try {
|
|
|
- return Multi.range(0, count)
|
|
|
- .flatMap(i -> getWorld(randomWorldNumber(), queryPool))
|
|
|
- .map(World::toJson)
|
|
|
- .reduce(JSON::createArrayBuilder, JsonArrayBuilder::add)
|
|
|
- .map(JsonArrayBuilder::build)
|
|
|
- .await();
|
|
|
+ return getWorldQuery.execute(Tuple.of(id))
|
|
|
+ .map(rows -> {
|
|
|
+ Row r = rows.iterator().next();
|
|
|
+ return new World(r.getInteger(0), r.getInteger(1));
|
|
|
+ }).toCompletionStage().toCompletableFuture().get();
|
|
|
} catch (Exception e) {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
@@ -92,17 +83,15 @@ public class PgClientRepository implements DbRepository {
|
|
|
@Override
|
|
|
public List<World> getWorlds(int count) {
|
|
|
try {
|
|
|
- List<World> result = new ArrayList<>(count);
|
|
|
+ List<Future<?>> futures = new ArrayList<>();
|
|
|
for (int i = 0; i < count; i++) {
|
|
|
- World world = queryPool.preparedQuery("SELECT id, randomnumber FROM world WHERE id = $1")
|
|
|
- .execute(Tuple.of(randomWorldNumber()))
|
|
|
- .map(rows -> {
|
|
|
- Row r = rows.iterator().next();
|
|
|
- return new World(r.getInteger(0), r.getInteger(1));
|
|
|
- }).toCompletionStage().toCompletableFuture().get();
|
|
|
- result.add(world);
|
|
|
+ futures.add(getWorldQuery.execute(Tuple.of(randomWorldNumber()))
|
|
|
+ .map(rows -> {
|
|
|
+ Row r = rows.iterator().next();
|
|
|
+ return new World(r.getInteger(0), r.getInteger(1));
|
|
|
+ }));
|
|
|
}
|
|
|
- return result;
|
|
|
+ return Future.all(futures).toCompletionStage().toCompletableFuture().get().list();
|
|
|
} catch (Exception e) {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
@@ -110,10 +99,14 @@ public class PgClientRepository implements DbRepository {
|
|
|
|
|
|
@Override
|
|
|
public World updateWorld(World world) {
|
|
|
- return Single.create(queryPool.preparedQuery("UPDATE world SET randomnumber = $1 WHERE id = $2")
|
|
|
- .execute(Tuple.of(world.id, world.id))
|
|
|
- .toCompletionStage()
|
|
|
- .thenApply(rows -> world)).await();
|
|
|
+ try {
|
|
|
+ return updateWorldQuery.execute(Tuple.of(world.id, world.id))
|
|
|
+ .toCompletionStage()
|
|
|
+ .thenApply(rows -> world)
|
|
|
+ .toCompletableFuture().get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -165,25 +158,18 @@ public class PgClientRepository implements DbRepository {
|
|
|
|
|
|
@Override
|
|
|
public List<Fortune> getFortunes() {
|
|
|
- return Single.create(queryPool.preparedQuery("SELECT id, message FROM fortune")
|
|
|
- .execute()
|
|
|
- .map(rows -> {
|
|
|
- List<Fortune> fortunes = new ArrayList<>(rows.size() + 1);
|
|
|
- for (Row r : rows) {
|
|
|
- fortunes.add(new Fortune(r.getInteger(0), r.getString(1)));
|
|
|
- }
|
|
|
- return fortunes;
|
|
|
- }).toCompletionStage()).await();
|
|
|
- }
|
|
|
-
|
|
|
- private static Single<World> getWorld(int id, SqlClient pool) {
|
|
|
- return Single.create(pool.preparedQuery("SELECT id, randomnumber FROM world WHERE id = $1")
|
|
|
- .execute(Tuple.of(id))
|
|
|
- .map(rows -> {
|
|
|
- Row r = rows.iterator().next();
|
|
|
- return new World(r.getInteger(0), r.getInteger(1));
|
|
|
- }).toCompletionStage());
|
|
|
-
|
|
|
+ try {
|
|
|
+ return getFortuneQuery.execute()
|
|
|
+ .map(rows -> {
|
|
|
+ List<Fortune> fortunes = new ArrayList<>(rows.size() + 1);
|
|
|
+ for (Row r : rows) {
|
|
|
+ fortunes.add(new Fortune(r.getInteger(0), r.getString(1)));
|
|
|
+ }
|
|
|
+ return fortunes;
|
|
|
+ }).toCompletionStage().toCompletableFuture().get();
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private CompletableFuture<List<World>> updateWorlds(List<World> worlds, int from, SqlClient pool) {
|
|
@@ -193,8 +179,7 @@ public class PgClientRepository implements DbRepository {
|
|
|
World w = worlds.get(i);
|
|
|
tuples.add(Tuple.of(w.randomNumber, w.id));
|
|
|
}
|
|
|
- return pool.preparedQuery("UPDATE world SET randomnumber = $1 WHERE id = $2")
|
|
|
- .executeBatch(tuples)
|
|
|
+ return updateWorldQuery.executeBatch(tuples)
|
|
|
.toCompletionStage()
|
|
|
.thenApply(rows -> worlds)
|
|
|
.toCompletableFuture();
|