|
@@ -2,7 +2,8 @@ package vertx;
|
|
|
|
|
|
import com.fizzed.rocker.ContentType;
|
|
|
import com.fizzed.rocker.RockerOutputFactory;
|
|
|
-import io.reactiverse.pgclient.*;
|
|
|
+import io.netty.util.concurrent.MultithreadEventExecutorGroup;
|
|
|
+import io.vertx.pgclient.*;
|
|
|
import io.vertx.core.*;
|
|
|
import io.vertx.core.buffer.Buffer;
|
|
|
import io.vertx.core.http.HttpHeaders;
|
|
@@ -15,6 +16,15 @@ import io.vertx.core.json.JsonArray;
|
|
|
import io.vertx.core.json.JsonObject;
|
|
|
import io.vertx.core.logging.Logger;
|
|
|
import io.vertx.core.logging.LoggerFactory;
|
|
|
+import io.vertx.sqlclient.PoolOptions;
|
|
|
+import io.vertx.sqlclient.PreparedQuery;
|
|
|
+import io.vertx.sqlclient.PreparedStatement;
|
|
|
+import io.vertx.sqlclient.Row;
|
|
|
+import io.vertx.sqlclient.RowIterator;
|
|
|
+import io.vertx.sqlclient.RowSet;
|
|
|
+import io.vertx.sqlclient.Tuple;
|
|
|
+import io.vertx.sqlclient.impl.SqlClientInternal;
|
|
|
+import io.vertx.sqlclient.impl.command.CompositeCommand;
|
|
|
import vertx.model.Fortune;
|
|
|
import vertx.model.Message;
|
|
|
import vertx.model.World;
|
|
@@ -65,12 +75,14 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
private static final String PATH_UPDATES = "/updates";
|
|
|
private static final String PATH_FORTUNES = "/fortunes";
|
|
|
|
|
|
+ private static final Handler<AsyncResult<Void>> NULL_HANDLER = null;
|
|
|
+
|
|
|
private static final CharSequence RESPONSE_TYPE_PLAIN = HttpHeaders.createOptimized("text/plain");
|
|
|
private static final CharSequence RESPONSE_TYPE_HTML = HttpHeaders.createOptimized("text/html; charset=UTF-8");
|
|
|
private static final CharSequence RESPONSE_TYPE_JSON = HttpHeaders.createOptimized("application/json");
|
|
|
|
|
|
private static final String HELLO_WORLD = "Hello, world!";
|
|
|
- private static final Buffer HELLO_WORLD_BUFFER = Buffer.factory.directBuffer(HELLO_WORLD, "UTF-8");
|
|
|
+ private static final Buffer HELLO_WORLD_BUFFER = Buffer.buffer(HELLO_WORLD, "UTF-8");
|
|
|
|
|
|
private static final CharSequence HEADER_SERVER = HttpHeaders.createOptimized("server");
|
|
|
private static final CharSequence HEADER_DATE = HttpHeaders.createOptimized("date");
|
|
@@ -86,8 +98,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
|
|
|
private HttpServer server;
|
|
|
|
|
|
- private PgClient client;
|
|
|
- private PgPool pool;
|
|
|
+ private SqlClientInternal client;
|
|
|
|
|
|
private CharSequence dateString;
|
|
|
|
|
@@ -95,12 +106,16 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
|
|
|
private final RockerOutputFactory<BufferRockerOutput> factory = BufferRockerOutput.factory(ContentType.RAW);
|
|
|
|
|
|
+ private PreparedQuery<RowSet<Row>> SELECT_WORLD_QUERY;
|
|
|
+ private PreparedQuery<RowSet<Row>> SELECT_FORTUNE_QUERY;
|
|
|
+ private PreparedQuery<RowSet<Row>> UPDATE_WORLD_QUERY;
|
|
|
+
|
|
|
public static CharSequence createDateHeader() {
|
|
|
return HttpHeaders.createOptimized(DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now()));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void start() throws Exception {
|
|
|
+ public void start(Promise<Void> startPromise) throws Exception {
|
|
|
int port = 8080;
|
|
|
server = vertx.createHttpServer(new HttpServerOptions());
|
|
|
server.requestHandler(App.this).listen(port);
|
|
@@ -112,15 +127,24 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
HEADER_CONTENT_LENGTH, HELLO_WORLD_LENGTH };
|
|
|
JsonObject config = config();
|
|
|
vertx.setPeriodic(1000, id -> plaintextHeaders[5] = dateString = createDateHeader());
|
|
|
- PgPoolOptions options = new PgPoolOptions();
|
|
|
- options.setDatabase(config.getString("database"));
|
|
|
- options.setHost(config.getString("host"));
|
|
|
+ PgConnectOptions options = new PgConnectOptions();
|
|
|
+ options.setDatabase(config.getString("database", "hello_world"));
|
|
|
+ options.setHost(config.getString("host", "tfb-database"));
|
|
|
options.setPort(config.getInteger("port", 5432));
|
|
|
- options.setUser(config.getString("username"));
|
|
|
- options.setPassword(config.getString("password"));
|
|
|
+ options.setUser(config.getString("username", "benchmarkdbuser"));
|
|
|
+ options.setPassword(config.getString("password", "benchmarkdbpass"));
|
|
|
options.setCachePreparedStatements(true);
|
|
|
- client = PgClient.pool(vertx, new PgPoolOptions(options).setMaxSize(1));
|
|
|
- pool = PgClient.pool(vertx, new PgPoolOptions(options).setMaxSize(4));
|
|
|
+ options.setPipeliningLimit(100_000); // Large pipelining means less flushing and we use a single connection anyway
|
|
|
+ PgConnection.connect(vertx, options).flatMap(conn -> {
|
|
|
+ client = (SqlClientInternal)conn;
|
|
|
+ Future<PreparedStatement> f1 = conn.prepare(SELECT_WORLD);
|
|
|
+ Future<PreparedStatement> f2 = conn.prepare(SELECT_FORTUNE);
|
|
|
+ Future<PreparedStatement> f3 = conn.prepare(UPDATE_WORLD);
|
|
|
+ f1.onSuccess(ps -> SELECT_WORLD_QUERY = ps.query());
|
|
|
+ f2.onSuccess(ps -> SELECT_FORTUNE_QUERY = ps.query());
|
|
|
+ f3.onSuccess(ps -> UPDATE_WORLD_QUERY = ps.query());
|
|
|
+ return CompositeFuture.all(f1, f2, f3);
|
|
|
+ }).onComplete(ar -> startPromise.complete());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -136,7 +160,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
handleDb(request);
|
|
|
break;
|
|
|
case PATH_QUERIES:
|
|
|
- new Queries().handle(request);
|
|
|
+ new Queries(request).handle();
|
|
|
break;
|
|
|
case PATH_UPDATES:
|
|
|
new Update(request).handle();
|
|
@@ -162,7 +186,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
for (int i = 0;i < plaintextHeaders.length; i+= 2) {
|
|
|
headers.add(plaintextHeaders[i], plaintextHeaders[i + 1]);
|
|
|
}
|
|
|
- response.end(HELLO_WORLD_BUFFER);
|
|
|
+ response.end(HELLO_WORLD_BUFFER, NULL_HANDLER);
|
|
|
}
|
|
|
|
|
|
private void handleJson(HttpServerRequest request) {
|
|
@@ -172,7 +196,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
.add(HEADER_CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
|
.add(HEADER_SERVER, SERVER)
|
|
|
.add(HEADER_DATE, dateString);
|
|
|
- response.end(new Message("Hello, World!").toBuffer());
|
|
|
+ response.end(new Message("Hello, World!").toBuffer(), NULL_HANDLER);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -187,19 +211,19 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
|
|
|
private void handleDb(HttpServerRequest req) {
|
|
|
HttpServerResponse resp = req.response();
|
|
|
- client.preparedQuery(SELECT_WORLD, Tuple.of(randomWorld()), res -> {
|
|
|
+ SELECT_WORLD_QUERY.execute(Tuple.of(randomWorld()), res -> {
|
|
|
if (res.succeeded()) {
|
|
|
- PgIterator resultSet = res.result().iterator();
|
|
|
+ RowIterator<Row> resultSet = res.result().iterator();
|
|
|
if (!resultSet.hasNext()) {
|
|
|
resp.setStatusCode(404).end();
|
|
|
return;
|
|
|
}
|
|
|
- Tuple row = resultSet.next();
|
|
|
+ Row row = resultSet.next();
|
|
|
resp
|
|
|
.putHeader(HttpHeaders.SERVER, SERVER)
|
|
|
.putHeader(HttpHeaders.DATE, dateString)
|
|
|
.putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
|
- .end(Json.encode(new World(row.getInteger(0), row.getInteger(1))));
|
|
|
+ .end(Json.encode(new World(row.getInteger(0), row.getInteger(1))), NULL_HANDLER);
|
|
|
} else {
|
|
|
logger.error(res.cause());
|
|
|
resp.setStatusCode(500).end(res.cause().getMessage());
|
|
@@ -207,37 +231,50 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- class Queries {
|
|
|
+
|
|
|
+ class Queries implements Handler<AsyncResult<RowSet<Row>>> {
|
|
|
|
|
|
boolean failed;
|
|
|
JsonArray worlds = new JsonArray();
|
|
|
+ final HttpServerRequest req;
|
|
|
+ final HttpServerResponse resp;
|
|
|
+ final int queries;
|
|
|
|
|
|
- private void handle(HttpServerRequest req) {
|
|
|
- HttpServerResponse resp = req.response();
|
|
|
- final int queries = getQueries(req);
|
|
|
- for (int i = 0; i < queries; i++) {
|
|
|
- client.preparedQuery(SELECT_WORLD, Tuple.of(randomWorld()), ar -> {
|
|
|
- if (!failed) {
|
|
|
- if (ar.failed()) {
|
|
|
- failed = true;
|
|
|
- resp.setStatusCode(500).end(ar.cause().getMessage());
|
|
|
- return;
|
|
|
- }
|
|
|
+ public Queries(HttpServerRequest req) {
|
|
|
+ this.req = req;
|
|
|
+ this.resp = req.response();
|
|
|
+ this.queries = getQueries(req);
|
|
|
+ }
|
|
|
|
|
|
- // we need a final reference
|
|
|
- final Tuple row = ar.result().iterator().next();
|
|
|
- worlds.add(new JsonObject().put("id", "" + row.getInteger(0)).put("randomNumber", "" + row.getInteger(1)));
|
|
|
-
|
|
|
- // stop condition
|
|
|
- if (worlds.size() == queries) {
|
|
|
- resp
|
|
|
- .putHeader(HttpHeaders.SERVER, SERVER)
|
|
|
- .putHeader(HttpHeaders.DATE, dateString)
|
|
|
- .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
|
- .end(worlds.encode());
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
+ private void handle() {
|
|
|
+ client.group(c -> {
|
|
|
+ for (int i = 0; i < queries; i++) {
|
|
|
+ c.preparedQuery(SELECT_WORLD).execute(Tuple.of(randomWorld()), this);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handle(AsyncResult<RowSet<Row>> ar) {
|
|
|
+ if (!failed) {
|
|
|
+ if (ar.failed()) {
|
|
|
+ failed = true;
|
|
|
+ resp.setStatusCode(500).end(ar.cause().getMessage());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // we need a final reference
|
|
|
+ final Tuple row = ar.result().iterator().next();
|
|
|
+ worlds.add(new JsonObject().put("id", "" + row.getInteger(0)).put("randomNumber", "" + row.getInteger(1)));
|
|
|
+
|
|
|
+ // stop condition
|
|
|
+ if (worlds.size() == queries) {
|
|
|
+ resp
|
|
|
+ .putHeader(HttpHeaders.SERVER, SERVER)
|
|
|
+ .putHeader(HttpHeaders.DATE, dateString)
|
|
|
+ .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
|
+ .end(worlds.encode(), NULL_HANDLER);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -257,27 +294,21 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
|
|
|
private void handle() {
|
|
|
|
|
|
- pool.getConnection(ar1 -> {
|
|
|
- if (ar1.failed()) {
|
|
|
- failed = true;
|
|
|
- sendError(ar1.cause());
|
|
|
- return;
|
|
|
- }
|
|
|
- PgConnection conn = ar1.result();
|
|
|
+ client.group(c -> {
|
|
|
+ PreparedQuery<RowSet<Row>> preparedQuery = c.preparedQuery(SELECT_WORLD);
|
|
|
for (int i = 0; i < worlds.length; i++) {
|
|
|
int id = randomWorld();
|
|
|
int index = i;
|
|
|
- conn.preparedQuery(SELECT_WORLD, Tuple.of(id), ar2 -> {
|
|
|
+ preparedQuery.execute(Tuple.of(id), ar2 -> {
|
|
|
if (!failed) {
|
|
|
if (ar2.failed()) {
|
|
|
- conn.close();
|
|
|
failed = true;
|
|
|
sendError(ar2.cause());
|
|
|
return;
|
|
|
}
|
|
|
worlds[index] = new World(ar2.result().iterator().next().getInteger(0), randomWorld());
|
|
|
if (++queryCount == worlds.length) {
|
|
|
- handleUpdates(conn);
|
|
|
+ handleUpdates();
|
|
|
}
|
|
|
}
|
|
|
});
|
|
@@ -285,16 +316,15 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- void handleUpdates(PgConnection conn) {
|
|
|
+ void handleUpdates() {
|
|
|
Arrays.sort(worlds);
|
|
|
List<Tuple> batch = new ArrayList<>();
|
|
|
for (World world : worlds) {
|
|
|
batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
|
|
|
}
|
|
|
- conn.preparedBatch(UPDATE_WORLD, batch, ar -> {
|
|
|
- conn.close();
|
|
|
- if (ar.failed()) {
|
|
|
- sendError(ar.cause());
|
|
|
+ UPDATE_WORLD_QUERY.executeBatch(batch, ar2 -> {
|
|
|
+ if (ar2.failed()) {
|
|
|
+ sendError(ar2.cause());
|
|
|
return;
|
|
|
}
|
|
|
JsonArray json = new JsonArray();
|
|
@@ -305,7 +335,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
.putHeader(HttpHeaders.SERVER, SERVER)
|
|
|
.putHeader(HttpHeaders.DATE, dateString)
|
|
|
.putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
|
- .end(json.toBuffer());
|
|
|
+ .end(json.toBuffer(), NULL_HANDLER);
|
|
|
});
|
|
|
}
|
|
|
|
|
@@ -316,17 +346,17 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
}
|
|
|
|
|
|
private void handleFortunes(HttpServerRequest req) {
|
|
|
- client.preparedQuery(SELECT_FORTUNE, ar -> {
|
|
|
+ SELECT_FORTUNE_QUERY.execute(ar -> {
|
|
|
HttpServerResponse response = req.response();
|
|
|
if (ar.succeeded()) {
|
|
|
List<Fortune> fortunes = new ArrayList<>();
|
|
|
- PgIterator resultSet = ar.result().iterator();
|
|
|
+ RowIterator<Row> resultSet = ar.result().iterator();
|
|
|
if (!resultSet.hasNext()) {
|
|
|
response.setStatusCode(404).end("No results");
|
|
|
return;
|
|
|
}
|
|
|
while (resultSet.hasNext()) {
|
|
|
- Tuple row = resultSet.next();
|
|
|
+ Row row = resultSet.next();
|
|
|
fortunes.add(new Fortune(row.getInteger(0), row.getString(1)));
|
|
|
}
|
|
|
fortunes.add(new Fortune(0, "Additional fortune added at request time."));
|
|
@@ -335,7 +365,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
.putHeader(HttpHeaders.SERVER, SERVER)
|
|
|
.putHeader(HttpHeaders.DATE, dateString)
|
|
|
.putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_HTML)
|
|
|
- .end(FortunesTemplate.template(fortunes).render(factory).buffer());
|
|
|
+ .end(FortunesTemplate.template(fortunes).render(factory).buffer(), NULL_HANDLER);
|
|
|
} else {
|
|
|
Throwable err = ar.cause();
|
|
|
logger.error("", err);
|
|
@@ -345,14 +375,24 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
}
|
|
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
+
|
|
|
+ int eventLoopPoolSize = VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE;
|
|
|
+ String sizeProp = System.getProperty("vertx.eventLoopPoolSize");
|
|
|
+ if (sizeProp != null) {
|
|
|
+ try {
|
|
|
+ eventLoopPoolSize = Integer.parseInt(sizeProp);
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
JsonObject config = new JsonObject(new String(Files.readAllBytes(new File(args[0]).toPath())));
|
|
|
- Vertx vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
|
|
|
+ Vertx vertx = Vertx.vertx(new VertxOptions().setEventLoopPoolSize(eventLoopPoolSize).setPreferNativeTransport(true));
|
|
|
vertx.exceptionHandler(err -> {
|
|
|
err.printStackTrace();
|
|
|
});
|
|
|
printConfig(vertx);
|
|
|
vertx.deployVerticle(App.class.getName(),
|
|
|
- new DeploymentOptions().setInstances(VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE).setConfig(config), event -> {
|
|
|
+ new DeploymentOptions().setInstances(eventLoopPoolSize).setConfig(config), event -> {
|
|
|
if (event.succeeded()) {
|
|
|
logger.info("Server listening on port " + 8080);
|
|
|
} else {
|
|
@@ -383,7 +423,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
logger.error("Could not read Vertx version", e);;
|
|
|
}
|
|
|
logger.info("Vertx: " + version);
|
|
|
- logger.info("Event Loop Size: " + VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE);
|
|
|
+ logger.info("Event Loop Size: " + ((MultithreadEventExecutorGroup)vertx.nettyEventLoopGroup()).executorCount());
|
|
|
logger.info("Native transport : " + nativeTransport);
|
|
|
}
|
|
|
}
|