|
@@ -4,7 +4,6 @@ import static com.techempower.Util.randomWorld;
|
|
import static io.jooby.ExecutionMode.EVENT_LOOP;
|
|
import static io.jooby.ExecutionMode.EVENT_LOOP;
|
|
import static io.jooby.MediaType.JSON;
|
|
import static io.jooby.MediaType.JSON;
|
|
|
|
|
|
-import java.nio.ByteBuffer;
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
@@ -18,18 +17,12 @@ import io.jooby.MediaType;
|
|
import io.jooby.ServerOptions;
|
|
import io.jooby.ServerOptions;
|
|
import io.jooby.rocker.ByteBufferOutput;
|
|
import io.jooby.rocker.ByteBufferOutput;
|
|
import io.jooby.rocker.RockerModule;
|
|
import io.jooby.rocker.RockerModule;
|
|
-import io.vertx.pgclient.PgPool;
|
|
|
|
import io.vertx.sqlclient.Row;
|
|
import io.vertx.sqlclient.Row;
|
|
import io.vertx.sqlclient.RowIterator;
|
|
import io.vertx.sqlclient.RowIterator;
|
|
-import io.vertx.sqlclient.SqlConnection;
|
|
|
|
import io.vertx.sqlclient.Tuple;
|
|
import io.vertx.sqlclient.Tuple;
|
|
|
|
|
|
public class ReactivePg extends Jooby {
|
|
public class ReactivePg extends Jooby {
|
|
|
|
|
|
- private static final String UPDATE_WORLD = "UPDATE world SET randomnumber=$1 WHERE id=$2";
|
|
|
|
- private static final String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1";
|
|
|
|
- private static final String SELECT_FORTUNE = "SELECT id, message from FORTUNE";
|
|
|
|
-
|
|
|
|
{
|
|
{
|
|
/** Reduce the number of resources due we do reactive processing. */
|
|
/** Reduce the number of resources due we do reactive processing. */
|
|
setServerOptions(
|
|
setServerOptions(
|
|
@@ -39,14 +32,14 @@ public class ReactivePg extends Jooby {
|
|
);
|
|
);
|
|
|
|
|
|
/** PG client: */
|
|
/** PG client: */
|
|
- PgClients clients = new PgClients(getConfig().getConfig("db"));
|
|
|
|
|
|
+ PgClient client = new PgClient(getConfig().getConfig("db"));
|
|
|
|
|
|
/** Template engine: */
|
|
/** Template engine: */
|
|
install(new RockerModule().reuseBuffer(true));
|
|
install(new RockerModule().reuseBuffer(true));
|
|
|
|
|
|
/** Single query: */
|
|
/** Single query: */
|
|
get("/db", ctx -> {
|
|
get("/db", ctx -> {
|
|
- clients.next().preparedQuery(SELECT_WORLD).execute(Tuple.of(randomWorld()), rsp -> {
|
|
|
|
|
|
+ client.selectWorld(Tuple.of(randomWorld()), rsp -> {
|
|
if (rsp.succeeded()) {
|
|
if (rsp.succeeded()) {
|
|
RowIterator<Row> rs = rsp.result().iterator();
|
|
RowIterator<Row> rs = rsp.result().iterator();
|
|
Row row = rs.next();
|
|
Row row = rs.next();
|
|
@@ -57,84 +50,71 @@ public class ReactivePg extends Jooby {
|
|
}
|
|
}
|
|
});
|
|
});
|
|
return ctx;
|
|
return ctx;
|
|
- });
|
|
|
|
|
|
+ }).setNonBlocking(true);
|
|
|
|
|
|
/** Multiple queries: */
|
|
/** Multiple queries: */
|
|
get("/queries", ctx -> {
|
|
get("/queries", ctx -> {
|
|
int queries = Util.queries(ctx);
|
|
int queries = Util.queries(ctx);
|
|
- AtomicInteger counter = new AtomicInteger();
|
|
|
|
World[] result = new World[queries];
|
|
World[] result = new World[queries];
|
|
- PgPool client = clients.next();
|
|
|
|
- for (int i = 0; i < result.length; i++) {
|
|
|
|
- client.preparedQuery(SELECT_WORLD).execute(Tuple.of(randomWorld()), rsp -> {
|
|
|
|
|
|
+ client.selectWorldQuery(queries, (index, statement) -> {
|
|
|
|
+ statement.execute(Tuple.of(randomWorld()), rsp -> {
|
|
if (rsp.succeeded()) {
|
|
if (rsp.succeeded()) {
|
|
RowIterator<Row> rs = rsp.result().iterator();
|
|
RowIterator<Row> rs = rsp.result().iterator();
|
|
Row row = rs.next();
|
|
Row row = rs.next();
|
|
- result[counter.get()] = new World(row.getInteger(0), row.getInteger(1));
|
|
|
|
|
|
+ result[index] = new World(row.getInteger(0), row.getInteger(1));
|
|
} else {
|
|
} else {
|
|
sendError(ctx, rsp.cause());
|
|
sendError(ctx, rsp.cause());
|
|
}
|
|
}
|
|
// ready?
|
|
// ready?
|
|
- if (counter.incrementAndGet() == queries) {
|
|
|
|
|
|
+ if (index == queries - 1) {
|
|
ctx.setResponseType(JSON)
|
|
ctx.setResponseType(JSON)
|
|
.send(Json.encode(result));
|
|
.send(Json.encode(result));
|
|
}
|
|
}
|
|
});
|
|
});
|
|
- }
|
|
|
|
|
|
+ });
|
|
return ctx;
|
|
return ctx;
|
|
- });
|
|
|
|
|
|
+ }).setNonBlocking(true);
|
|
|
|
|
|
/** Update queries: */
|
|
/** Update queries: */
|
|
get("/updates", ctx -> {
|
|
get("/updates", ctx -> {
|
|
int queries = Util.queries(ctx);
|
|
int queries = Util.queries(ctx);
|
|
World[] result = new World[queries];
|
|
World[] result = new World[queries];
|
|
- AtomicInteger counter = new AtomicInteger(0);
|
|
|
|
- PgPool pool = clients.next();
|
|
|
|
- pool.getConnection(connectCallback -> {
|
|
|
|
- if (connectCallback.failed()) {
|
|
|
|
- sendError(ctx, connectCallback.cause());
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- SqlConnection conn = connectCallback.result();
|
|
|
|
- for (int i = 0; i < queries; i++) {
|
|
|
|
- int id = randomWorld();
|
|
|
|
- conn.preparedQuery(SELECT_WORLD).execute(Tuple.of(id), selectCallback -> {
|
|
|
|
- if (selectCallback.failed()) {
|
|
|
|
- conn.close();
|
|
|
|
- sendError(ctx, selectCallback.cause());
|
|
|
|
- return;
|
|
|
|
|
|
+ client.selectWorldForUpdate(queries, (index, statement) -> {
|
|
|
|
+ int id = randomWorld();
|
|
|
|
+ statement.execute(Tuple.of(id), selectCallback -> {
|
|
|
|
+ if (selectCallback.failed()) {
|
|
|
|
+ sendError(ctx, selectCallback.cause());
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ result[index] = new World(
|
|
|
|
+ selectCallback.result().iterator().next().getInteger(0),
|
|
|
|
+ randomWorld());
|
|
|
|
+ if (index == queries - 1) {
|
|
|
|
+ // Sort results... avoid dead locks
|
|
|
|
+ Arrays.sort(result);
|
|
|
|
+ List<Tuple> batch = new ArrayList<>(queries);
|
|
|
|
+ for (World world : result) {
|
|
|
|
+ batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
|
|
}
|
|
}
|
|
- result[counter.get()] = new World(
|
|
|
|
- selectCallback.result().iterator().next().getInteger(0),
|
|
|
|
- randomWorld());
|
|
|
|
- if (counter.incrementAndGet() == queries) {
|
|
|
|
- // Sort results... avoid dead locks
|
|
|
|
- Arrays.sort(result);
|
|
|
|
- List<Tuple> batch = new ArrayList<>(queries);
|
|
|
|
- for (World world : result) {
|
|
|
|
- batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
|
|
|
|
- }
|
|
|
|
|
|
|
|
- conn.preparedQuery(UPDATE_WORLD).executeBatch(batch, updateCallback -> {
|
|
|
|
- if (updateCallback.failed()) {
|
|
|
|
- sendError(ctx, updateCallback.cause());
|
|
|
|
- } else {
|
|
|
|
- ctx.setResponseType(JSON)
|
|
|
|
- .send(Json.encode(result));
|
|
|
|
- }
|
|
|
|
- conn.close();
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- }
|
|
|
|
|
|
+ client.updateWorld(batch, updateCallback -> {
|
|
|
|
+ if (updateCallback.failed()) {
|
|
|
|
+ sendError(ctx, updateCallback.cause());
|
|
|
|
+ } else {
|
|
|
|
+ ctx.setResponseType(JSON)
|
|
|
|
+ .send(Json.encode(result));
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+ });
|
|
});
|
|
});
|
|
return ctx;
|
|
return ctx;
|
|
- });
|
|
|
|
|
|
+ }).setNonBlocking(true);
|
|
|
|
|
|
/** Fortunes: */
|
|
/** Fortunes: */
|
|
RockerOutputFactory<ByteBufferOutput> factory = require(RockerOutputFactory.class);
|
|
RockerOutputFactory<ByteBufferOutput> factory = require(RockerOutputFactory.class);
|
|
get("/fortunes", ctx -> {
|
|
get("/fortunes", ctx -> {
|
|
- clients.next().preparedQuery(SELECT_FORTUNE).execute(rsp -> {
|
|
|
|
|
|
+ client.fortuneQuery().execute(rsp -> {
|
|
if (rsp.succeeded()) {
|
|
if (rsp.succeeded()) {
|
|
RowIterator<Row> rs = rsp.result().iterator();
|
|
RowIterator<Row> rs = rsp.result().iterator();
|
|
List<Fortune> fortunes = new ArrayList<>();
|
|
List<Fortune> fortunes = new ArrayList<>();
|
|
@@ -152,11 +132,11 @@ public class ReactivePg extends Jooby {
|
|
ctx.setResponseType(MediaType.html)
|
|
ctx.setResponseType(MediaType.html)
|
|
.send(template.render(factory).toBuffer());
|
|
.send(template.render(factory).toBuffer());
|
|
} else {
|
|
} else {
|
|
- ctx.sendError(rsp.cause());
|
|
|
|
|
|
+ sendError(ctx, rsp.cause());
|
|
}
|
|
}
|
|
});
|
|
});
|
|
return ctx;
|
|
return ctx;
|
|
- });
|
|
|
|
|
|
+ }).setNonBlocking(true);
|
|
}
|
|
}
|
|
|
|
|
|
private void sendError(Context ctx, Throwable cause) {
|
|
private void sendError(Context ctx, Throwable cause) {
|