|
@@ -6,10 +6,13 @@ import io.jooby.Jooby;
|
|
|
import io.jooby.json.JacksonModule;
|
|
|
import io.jooby.rocker.RockerModule;
|
|
|
import io.reactiverse.pgclient.PgClient;
|
|
|
+import io.reactiverse.pgclient.PgConnection;
|
|
|
import io.reactiverse.pgclient.PgIterator;
|
|
|
import io.reactiverse.pgclient.PgPoolOptions;
|
|
|
import io.reactiverse.pgclient.Row;
|
|
|
import io.reactiverse.pgclient.Tuple;
|
|
|
+import io.vertx.core.Vertx;
|
|
|
+import io.vertx.core.VertxOptions;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
@@ -30,7 +33,9 @@ public class ReactivePg extends Jooby {
|
|
|
|
|
|
{
|
|
|
/** PG client: */
|
|
|
- PgClients pool = database(getConfig().getConfig("db"));
|
|
|
+ Vertx vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
|
|
|
+ PgPoolOptions options = pgPoolOptions(getConfig().getConfig("db"));
|
|
|
+ PgClients clients = PgClients.create(vertx, new PgPoolOptions(options).setMaxSize(1));
|
|
|
|
|
|
/** Template engine: */
|
|
|
install(new RockerModule());
|
|
@@ -41,7 +46,7 @@ public class ReactivePg extends Jooby {
|
|
|
|
|
|
/** Single query: */
|
|
|
get("/db", ctx -> {
|
|
|
- pool.next().preparedQuery(SELECT_WORLD, Tuple.of(randomWorld()), rsp -> {
|
|
|
+ clients.next().preparedQuery(SELECT_WORLD, Tuple.of(randomWorld()), rsp -> {
|
|
|
try {
|
|
|
if (rsp.succeeded()) {
|
|
|
PgIterator rs = rsp.result().iterator();
|
|
@@ -64,7 +69,7 @@ public class ReactivePg extends Jooby {
|
|
|
AtomicInteger counter = new AtomicInteger();
|
|
|
AtomicBoolean failed = new AtomicBoolean(false);
|
|
|
World[] result = new World[queries];
|
|
|
- PgClient client = pool.next();
|
|
|
+ PgClient client = clients.next();
|
|
|
for (int i = 0; i < result.length; i++) {
|
|
|
client.preparedQuery(SELECT_WORLD, Tuple.of(randomWorld()), rsp -> {
|
|
|
if (rsp.succeeded()) {
|
|
@@ -96,48 +101,58 @@ public class ReactivePg extends Jooby {
|
|
|
World[] result = new World[queries];
|
|
|
AtomicInteger counter = new AtomicInteger(0);
|
|
|
AtomicBoolean failed = new AtomicBoolean(false);
|
|
|
- PgClient client = pool.next();
|
|
|
- for (int i = 0; i < queries; i++) {
|
|
|
- client.preparedQuery(SELECT_WORLD, Tuple.of(randomWorld()), rsp -> {
|
|
|
- if (rsp.succeeded()) {
|
|
|
- PgIterator rs = rsp.result().iterator();
|
|
|
- Tuple row = rs.next();
|
|
|
- World world = new World(row.getInteger(0), row.getInteger(1));
|
|
|
- world.randomNumber = randomWorld();
|
|
|
- result[counter.get()] = world;
|
|
|
- } else {
|
|
|
- if (failed.compareAndSet(false, true)) {
|
|
|
- ctx.sendError(rsp.cause());
|
|
|
- }
|
|
|
+ clients.next().getConnection(ar -> {
|
|
|
+ if (ar.failed()) {
|
|
|
+ if (failed.compareAndSet(false, true)) {
|
|
|
+ ctx.sendError(ar.cause());
|
|
|
}
|
|
|
-
|
|
|
- if (counter.incrementAndGet() == queries && !failed.get()) {
|
|
|
- List<Tuple> batch = new ArrayList<>(queries);
|
|
|
- for (World world : result) {
|
|
|
- batch.add(Tuple.of(world.randomNumber, world.id));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ PgConnection conn = ar.result();
|
|
|
+ for (int i = 0; i < queries; i++) {
|
|
|
+ conn.preparedQuery(SELECT_WORLD, Tuple.of(randomWorld()), query -> {
|
|
|
+ if (query.succeeded()) {
|
|
|
+ PgIterator rs = query.result().iterator();
|
|
|
+ Tuple row = rs.next();
|
|
|
+ World world = new World(row.getInteger(0), randomWorld());
|
|
|
+ result[counter.get()] = world;
|
|
|
+ } else {
|
|
|
+ conn.close();
|
|
|
+ if (failed.compareAndSet(false, true)) {
|
|
|
+ ctx.sendError(query.cause());
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- client.preparedBatch(UPDATE_WORLD, batch, ar -> {
|
|
|
- if (ar.failed()) {
|
|
|
- ctx.sendError(ar.cause());
|
|
|
- } else {
|
|
|
- try {
|
|
|
- ctx.setResponseType(JSON)
|
|
|
- .send(mapper.writeValueAsBytes(result));
|
|
|
- } catch (IOException x) {
|
|
|
- ctx.sendError(x);
|
|
|
- }
|
|
|
+ if (counter.incrementAndGet() == queries && !failed.get()) {
|
|
|
+ List<Tuple> batch = new ArrayList<>(queries);
|
|
|
+ for (World world : result) {
|
|
|
+ batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
|
|
|
}
|
|
|
- });
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
+
|
|
|
+ conn.preparedBatch(UPDATE_WORLD, batch, update -> {
|
|
|
+ conn.close();
|
|
|
+ if (update.failed()) {
|
|
|
+ ctx.sendError(update.cause());
|
|
|
+ } else {
|
|
|
+ try {
|
|
|
+ ctx.setResponseType(JSON)
|
|
|
+ .send(mapper.writeValueAsBytes(result));
|
|
|
+ } catch (IOException x) {
|
|
|
+ ctx.sendError(x);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ });
|
|
|
return ctx;
|
|
|
});
|
|
|
|
|
|
/** Fortunes: */
|
|
|
get("/fortunes", ctx -> {
|
|
|
- pool.next().preparedQuery(SELECT_FORTUNE, rsp -> {
|
|
|
+ clients.next().preparedQuery(SELECT_FORTUNE, rsp -> {
|
|
|
if (rsp.succeeded()) {
|
|
|
PgIterator rs = rsp.result().iterator();
|
|
|
List<Fortune> fortunes = new ArrayList<>();
|
|
@@ -161,7 +176,7 @@ public class ReactivePg extends Jooby {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private PgClients database(Config config) {
|
|
|
+ private PgPoolOptions pgPoolOptions(Config config) {
|
|
|
PgPoolOptions options = new PgPoolOptions();
|
|
|
options.setDatabase(config.getString("databaseName"));
|
|
|
options.setHost(config.getString("serverName"));
|
|
@@ -169,8 +184,7 @@ public class ReactivePg extends Jooby {
|
|
|
options.setUser(config.getString("user"));
|
|
|
options.setPassword(config.getString("password"));
|
|
|
options.setCachePreparedStatements(true);
|
|
|
- options.setMaxSize(1);
|
|
|
- return PgClients.create(options);
|
|
|
+ return options;
|
|
|
}
|
|
|
|
|
|
public static void main(String[] args) {
|