|
@@ -12,17 +12,12 @@ import io.vertx.core.http.HttpServer;
|
|
import io.vertx.core.http.HttpServerOptions;
|
|
import io.vertx.core.http.HttpServerOptions;
|
|
import io.vertx.core.http.HttpServerRequest;
|
|
import io.vertx.core.http.HttpServerRequest;
|
|
import io.vertx.core.http.HttpServerResponse;
|
|
import io.vertx.core.http.HttpServerResponse;
|
|
-import io.vertx.core.json.JsonArray;
|
|
|
|
import io.vertx.core.json.JsonObject;
|
|
import io.vertx.core.json.JsonObject;
|
|
import io.vertx.core.logging.Logger;
|
|
import io.vertx.core.logging.Logger;
|
|
import io.vertx.core.logging.LoggerFactory;
|
|
import io.vertx.core.logging.LoggerFactory;
|
|
import io.vertx.sqlclient.*;
|
|
import io.vertx.sqlclient.*;
|
|
import io.vertx.sqlclient.impl.SqlClientInternal;
|
|
import io.vertx.sqlclient.impl.SqlClientInternal;
|
|
-import vertx.model.CachedWorld;
|
|
|
|
-import vertx.model.Fortune;
|
|
|
|
-import vertx.model.Message;
|
|
|
|
-import vertx.model.World;
|
|
|
|
-import vertx.model.WorldCache;
|
|
|
|
|
|
+import vertx.model.*;
|
|
import vertx.rocker.BufferRockerOutput;
|
|
import vertx.rocker.BufferRockerOutput;
|
|
|
|
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.ByteArrayOutputStream;
|
|
@@ -38,9 +33,13 @@ import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
+import java.util.stream.IntStream;
|
|
|
|
|
|
public class App extends AbstractVerticle implements Handler<HttpServerRequest> {
|
|
public class App extends AbstractVerticle implements Handler<HttpServerRequest> {
|
|
|
|
|
|
|
|
+ private static final int NUM_PROCESSORS = Runtime.getRuntime().availableProcessors();
|
|
|
|
+ private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(App.class);
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Returns the value of the "queries" getRequest parameter, which is an integer
|
|
* Returns the value of the "queries" getRequest parameter, which is an integer
|
|
* bound between 1 and 500 with a default value of 1.
|
|
* bound between 1 and 500 with a default value of 1.
|
|
@@ -62,7 +61,9 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- static Logger logger = LoggerFactory.getLogger(App.class.getName());
|
|
|
|
|
|
+ private static Logger logger = LoggerFactory.getLogger(App.class.getName());
|
|
|
|
+
|
|
|
|
+ private static final Integer[] BOXED_RND = IntStream.range(1, 10001).boxed().toArray(Integer[]::new);
|
|
|
|
|
|
private static final String PATH_PLAINTEXT = "/plaintext";
|
|
private static final String PATH_PLAINTEXT = "/plaintext";
|
|
private static final String PATH_JSON = "/json";
|
|
private static final String PATH_JSON = "/json";
|
|
@@ -76,7 +77,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
|
|
|
private static final CharSequence RESPONSE_TYPE_PLAIN = HttpHeaders.createOptimized("text/plain");
|
|
private static final CharSequence RESPONSE_TYPE_PLAIN = HttpHeaders.createOptimized("text/plain");
|
|
private static final CharSequence RESPONSE_TYPE_HTML = HttpHeaders.createOptimized("text/html; charset=UTF-8");
|
|
private static final CharSequence RESPONSE_TYPE_HTML = HttpHeaders.createOptimized("text/html; charset=UTF-8");
|
|
- private static final CharSequence RESPONSE_TYPE_JSON = HttpHeaders.createOptimized("application/json");
|
|
|
|
|
|
+ static final CharSequence RESPONSE_TYPE_JSON = HttpHeaders.createOptimized("application/json");
|
|
|
|
|
|
private static final String HELLO_WORLD = "Hello, world!";
|
|
private static final String HELLO_WORLD = "Hello, world!";
|
|
private static final Buffer HELLO_WORLD_BUFFER = Buffer.buffer(HELLO_WORLD, "UTF-8");
|
|
private static final Buffer HELLO_WORLD_BUFFER = Buffer.buffer(HELLO_WORLD, "UTF-8");
|
|
@@ -89,14 +90,29 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
private static final CharSequence HELLO_WORLD_LENGTH = HttpHeaders.createOptimized("" + HELLO_WORLD.length());
|
|
private static final CharSequence HELLO_WORLD_LENGTH = HttpHeaders.createOptimized("" + HELLO_WORLD.length());
|
|
private static final CharSequence SERVER = HttpHeaders.createOptimized("vert.x");
|
|
private static final CharSequence SERVER = HttpHeaders.createOptimized("vert.x");
|
|
|
|
|
|
- private static final String UPDATE_WORLD = "UPDATE world SET randomnumber=$1 WHERE id=$2";
|
|
|
|
private static final String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1";
|
|
private static final String SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1";
|
|
private static final String SELECT_FORTUNE = "SELECT id, message from FORTUNE";
|
|
private static final String SELECT_FORTUNE = "SELECT id, message from FORTUNE";
|
|
private static final String SELECT_WORLDS = "SELECT id, randomnumber from WORLD";
|
|
private static final String SELECT_WORLDS = "SELECT id, randomnumber from WORLD";
|
|
|
|
|
|
|
|
+ public static CharSequence createDateHeader() {
|
|
|
|
+ return HttpHeaders.createOptimized(DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now()));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Returns a random integer that is a suitable value for both the {@code id}
|
|
|
|
+ * and {@code randomNumber} properties of a world object.
|
|
|
|
+ *
|
|
|
|
+ * @return a random world number
|
|
|
|
+ */
|
|
|
|
+ static Integer boxedRandomWorldNumber() {
|
|
|
|
+ final int rndValue = ThreadLocalRandom.current().nextInt(1, 10001);
|
|
|
|
+ final var boxedRnd = BOXED_RND[rndValue - 1];
|
|
|
|
+ assert boxedRnd.intValue() == rndValue;
|
|
|
|
+ return boxedRnd;
|
|
|
|
+ }
|
|
|
|
+
|
|
private HttpServer server;
|
|
private HttpServer server;
|
|
- private SqlClientInternal client1;
|
|
|
|
- private SqlClientInternal client2;
|
|
|
|
|
|
+ private SqlClientInternal client;
|
|
private CharSequence dateString;
|
|
private CharSequence dateString;
|
|
private CharSequence[] plaintextHeaders;
|
|
private CharSequence[] plaintextHeaders;
|
|
|
|
|
|
@@ -105,15 +121,10 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
private Throwable databaseErr;
|
|
private Throwable databaseErr;
|
|
private PreparedQuery<RowSet<Row>> SELECT_WORLD_QUERY;
|
|
private PreparedQuery<RowSet<Row>> SELECT_WORLD_QUERY;
|
|
private PreparedQuery<SqlResult<List<Fortune>>> SELECT_FORTUNE_QUERY;
|
|
private PreparedQuery<SqlResult<List<Fortune>>> SELECT_FORTUNE_QUERY;
|
|
- private PreparedQuery<RowSet<Row>> UPDATE_WORLD_QUERY;
|
|
|
|
@SuppressWarnings("unchecked")
|
|
@SuppressWarnings("unchecked")
|
|
- private PreparedQuery<RowSet<Row>>[] AGGREGATED_UPDATE_WORLD_QUERY = new PreparedQuery[128];
|
|
|
|
|
|
+ private PreparedQuery<RowSet<Row>>[] AGGREGATED_UPDATE_WORLD_QUERY = new PreparedQuery[500];
|
|
private WorldCache WORLD_CACHE;
|
|
private WorldCache WORLD_CACHE;
|
|
|
|
|
|
- public static CharSequence createDateHeader() {
|
|
|
|
- return HttpHeaders.createOptimized(DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now()));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Override
|
|
@Override
|
|
public void start(Promise<Void> startPromise) throws Exception {
|
|
public void start(Promise<Void> startPromise) throws Exception {
|
|
int port = 8080;
|
|
int port = 8080;
|
|
@@ -134,6 +145,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
options.setUser(config.getString("username", "benchmarkdbuser"));
|
|
options.setUser(config.getString("username", "benchmarkdbuser"));
|
|
options.setPassword(config.getString("password", "benchmarkdbpass"));
|
|
options.setPassword(config.getString("password", "benchmarkdbpass"));
|
|
options.setCachePreparedStatements(true);
|
|
options.setCachePreparedStatements(true);
|
|
|
|
+ options.setPreparedStatementCacheMaxSize(1024);
|
|
options.setPipeliningLimit(100_000); // Large pipelining means less flushing and we use a single connection anyway
|
|
options.setPipeliningLimit(100_000); // Large pipelining means less flushing and we use a single connection anyway
|
|
Future<?> clientsInit = initClients(options);
|
|
Future<?> clientsInit = initClients(options);
|
|
clientsInit
|
|
clientsInit
|
|
@@ -146,9 +158,9 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
}
|
|
}
|
|
|
|
|
|
private Future<?> initClients(PgConnectOptions options) {
|
|
private Future<?> initClients(PgConnectOptions options) {
|
|
- Future<?> cf1 = PgConnection.connect(vertx, options)
|
|
|
|
|
|
+ return PgConnection.connect(vertx, options)
|
|
.flatMap(conn -> {
|
|
.flatMap(conn -> {
|
|
- client1 = (SqlClientInternal) conn;
|
|
|
|
|
|
+ client = (SqlClientInternal) conn;
|
|
List<Future<?>> list = new ArrayList<>();
|
|
List<Future<?>> list = new ArrayList<>();
|
|
Future<PreparedStatement> f1 = conn.prepare(SELECT_WORLD)
|
|
Future<PreparedStatement> f1 = conn.prepare(SELECT_WORLD)
|
|
.andThen(onSuccess(ps -> SELECT_WORLD_QUERY = ps.query()));
|
|
.andThen(onSuccess(ps -> SELECT_WORLD_QUERY = ps.query()));
|
|
@@ -165,15 +177,6 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
.map(worlds -> new WorldCache(worlds.value()))
|
|
.map(worlds -> new WorldCache(worlds.value()))
|
|
.andThen(onSuccess(wc -> WORLD_CACHE = wc));
|
|
.andThen(onSuccess(wc -> WORLD_CACHE = wc));
|
|
list.add(f3);
|
|
list.add(f3);
|
|
- return Future.join(list);
|
|
|
|
- });
|
|
|
|
- Future<?> cf2 = PgConnection.connect(vertx, options)
|
|
|
|
- .flatMap(conn -> {
|
|
|
|
- client2 = (SqlClientInternal) conn;
|
|
|
|
- List<Future<?>> list = new ArrayList<>();
|
|
|
|
- Future<PreparedStatement> f1 = conn.prepare(UPDATE_WORLD)
|
|
|
|
- .andThen(onSuccess(ps -> UPDATE_WORLD_QUERY = ps.query()));
|
|
|
|
- list.add(f1);
|
|
|
|
for (int i = 0; i < AGGREGATED_UPDATE_WORLD_QUERY.length; i++) {
|
|
for (int i = 0; i < AGGREGATED_UPDATE_WORLD_QUERY.length; i++) {
|
|
int idx = i;
|
|
int idx = i;
|
|
Future<PreparedStatement> fut = conn
|
|
Future<PreparedStatement> fut = conn
|
|
@@ -183,22 +186,26 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
}
|
|
}
|
|
return Future.join(list);
|
|
return Future.join(list);
|
|
});
|
|
});
|
|
- return Future.join(cf1, cf2);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private static String buildAggregatedUpdateQuery(int len) {
|
|
private static String buildAggregatedUpdateQuery(int len) {
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
|
- sb.append("UPDATE world SET randomNumber = update_data.randomNumber FROM (VALUES");
|
|
|
|
- char sep = ' ';
|
|
|
|
- for (int i = 1;i <= len;i++) {
|
|
|
|
- sb.append(sep).append("($").append(2 * i - 1).append("::int,$").append(2 * i).append("::int)");
|
|
|
|
- sep = ',';
|
|
|
|
|
|
+ StringBuilder sql = new StringBuilder();
|
|
|
|
+ sql.append("UPDATE WORLD SET RANDOMNUMBER = CASE ID");
|
|
|
|
+ for (int i = 0; i < len; i++) {
|
|
|
|
+ int offset = (i * 2) + 1;
|
|
|
|
+ sql.append(" WHEN $").append(offset).append(" THEN $").append(offset + 1);
|
|
}
|
|
}
|
|
- sb.append(") AS update_data (id, randomNumber) WHERE world.id = update_data.id");
|
|
|
|
- return sb.toString();
|
|
|
|
|
|
+ sql.append(" ELSE RANDOMNUMBER");
|
|
|
|
+ sql.append(" END WHERE ID IN ($1");
|
|
|
|
+ for (int i = 1; i < len; i++) {
|
|
|
|
+ int offset = (i * 2) + 1;
|
|
|
|
+ sql.append(",$").append(offset);
|
|
|
|
+ }
|
|
|
|
+ sql.append(")");
|
|
|
|
+ return sql.toString();
|
|
}
|
|
}
|
|
|
|
|
|
- private static <T> Handler<AsyncResult<T>> onSuccess(Handler<T> handler) {
|
|
|
|
|
|
+ public static <T> Handler<AsyncResult<T>> onSuccess(Handler<T> handler) {
|
|
return ar -> {
|
|
return ar -> {
|
|
if (ar.succeeded()) {
|
|
if (ar.succeeded()) {
|
|
handler.handle(ar.result());
|
|
handler.handle(ar.result());
|
|
@@ -232,8 +239,9 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
handleCaching(request);
|
|
handleCaching(request);
|
|
break;
|
|
break;
|
|
default:
|
|
default:
|
|
- request.response().setStatusCode(404);
|
|
|
|
- request.response().end();
|
|
|
|
|
|
+ request.response()
|
|
|
|
+ .setStatusCode(404)
|
|
|
|
+ .end();
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
@@ -267,22 +275,12 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
.add(HEADER_CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
.add(HEADER_CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
.add(HEADER_SERVER, SERVER)
|
|
.add(HEADER_SERVER, SERVER)
|
|
.add(HEADER_DATE, dateString);
|
|
.add(HEADER_DATE, dateString);
|
|
- response.end(new Message("Hello, World!").toBuffer(), NULL_HANDLER);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Returns a random integer that is a suitable value for both the {@code id}
|
|
|
|
- * and {@code randomNumber} properties of a world object.
|
|
|
|
- *
|
|
|
|
- * @return a random world number
|
|
|
|
- */
|
|
|
|
- private static int randomWorld() {
|
|
|
|
- return 1 + ThreadLocalRandom.current().nextInt(10000);
|
|
|
|
|
|
+ response.end(new Message("Hello, World!").toJson(), NULL_HANDLER);
|
|
}
|
|
}
|
|
|
|
|
|
private void handleDb(HttpServerRequest req) {
|
|
private void handleDb(HttpServerRequest req) {
|
|
HttpServerResponse resp = req.response();
|
|
HttpServerResponse resp = req.response();
|
|
- SELECT_WORLD_QUERY.execute(Tuple.of(randomWorld()), res -> {
|
|
|
|
|
|
+ SELECT_WORLD_QUERY.execute(Tuple.of(boxedRandomWorldNumber()), res -> {
|
|
if (res.succeeded()) {
|
|
if (res.succeeded()) {
|
|
RowIterator<Row> resultSet = res.result().iterator();
|
|
RowIterator<Row> resultSet = res.result().iterator();
|
|
if (!resultSet.hasNext()) {
|
|
if (!resultSet.hasNext()) {
|
|
@@ -290,11 +288,12 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
Row row = resultSet.next();
|
|
Row row = resultSet.next();
|
|
- resp
|
|
|
|
- .putHeader(HttpHeaders.SERVER, SERVER)
|
|
|
|
- .putHeader(HttpHeaders.DATE, dateString)
|
|
|
|
- .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
|
|
- .end(new World(row.getInteger(0), row.getInteger(1)).toBuffer(), NULL_HANDLER);
|
|
|
|
|
|
+ World word = new World(row.getInteger(0), row.getInteger(1));
|
|
|
|
+ MultiMap headers = resp.headers();
|
|
|
|
+ headers.add(HttpHeaders.SERVER, SERVER);
|
|
|
|
+ headers.add(HttpHeaders.DATE, dateString);
|
|
|
|
+ headers.add(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON);
|
|
|
|
+ resp.end(word.toJson(), NULL_HANDLER);
|
|
} else {
|
|
} else {
|
|
sendError(req, res.cause());
|
|
sendError(req, res.cause());
|
|
}
|
|
}
|
|
@@ -304,10 +303,11 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
class Queries implements Handler<AsyncResult<RowSet<Row>>> {
|
|
class Queries implements Handler<AsyncResult<RowSet<Row>>> {
|
|
|
|
|
|
boolean failed;
|
|
boolean failed;
|
|
- final JsonArray worlds;
|
|
|
|
|
|
+ final World[] worlds;
|
|
final HttpServerRequest req;
|
|
final HttpServerRequest req;
|
|
final HttpServerResponse resp;
|
|
final HttpServerResponse resp;
|
|
final int queries;
|
|
final int queries;
|
|
|
|
+ int worldsIndex;
|
|
|
|
|
|
public Queries(HttpServerRequest req) {
|
|
public Queries(HttpServerRequest req) {
|
|
int queries = getQueries(req);
|
|
int queries = getQueries(req);
|
|
@@ -315,13 +315,14 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
this.req = req;
|
|
this.req = req;
|
|
this.resp = req.response();
|
|
this.resp = req.response();
|
|
this.queries = queries;
|
|
this.queries = queries;
|
|
- this.worlds = new JsonArray(new ArrayList<>(queries));
|
|
|
|
|
|
+ this.worlds = new World[queries];
|
|
|
|
+ this.worldsIndex = 0;
|
|
}
|
|
}
|
|
|
|
|
|
private void handle() {
|
|
private void handle() {
|
|
- client1.group(c -> {
|
|
|
|
|
|
+ client.group(c -> {
|
|
for (int i = 0; i < queries; i++) {
|
|
for (int i = 0; i < queries; i++) {
|
|
- c.preparedQuery(SELECT_WORLD).execute(Tuple.of(randomWorld()), this);
|
|
|
|
|
|
+ c.preparedQuery(SELECT_WORLD).execute(Tuple.of(boxedRandomWorldNumber()), this);
|
|
}
|
|
}
|
|
});
|
|
});
|
|
}
|
|
}
|
|
@@ -337,50 +338,48 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
|
|
|
|
// we need a final reference
|
|
// we need a final reference
|
|
final Tuple row = ar.result().iterator().next();
|
|
final Tuple row = ar.result().iterator().next();
|
|
- worlds.add(new JsonObject().put("id", row.getInteger(0)).put("randomNumber", row.getInteger(1)));
|
|
|
|
|
|
+ worlds[worldsIndex++] = new World(row.getInteger(0), row.getInteger(1));
|
|
|
|
|
|
// stop condition
|
|
// stop condition
|
|
- if (worlds.size() == queries) {
|
|
|
|
- resp
|
|
|
|
- .putHeader(HttpHeaders.SERVER, SERVER)
|
|
|
|
- .putHeader(HttpHeaders.DATE, dateString)
|
|
|
|
- .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
|
|
- .end(worlds.toBuffer(), NULL_HANDLER);
|
|
|
|
|
|
+ if (worldsIndex == queries) {
|
|
|
|
+ MultiMap headers = resp.headers();
|
|
|
|
+ headers.add(HttpHeaders.SERVER, SERVER);
|
|
|
|
+ headers.add(HttpHeaders.DATE, dateString);
|
|
|
|
+ headers.add(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON);
|
|
|
|
+ resp.end(World.toJson(worlds), NULL_HANDLER);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- class Update {
|
|
|
|
|
|
+ private class Update {
|
|
|
|
|
|
- final HttpServerRequest req;
|
|
|
|
- boolean failed;
|
|
|
|
- int queryCount;
|
|
|
|
- final World[] worlds;
|
|
|
|
|
|
+ private final HttpServerRequest request;
|
|
|
|
+ private final World[] worldsToUpdate;
|
|
|
|
+ private boolean failed;
|
|
|
|
+ private int selectWorldCompletedCount;
|
|
|
|
|
|
- public Update(HttpServerRequest req) {
|
|
|
|
- final int queries = getQueries(req);
|
|
|
|
- this.req = req;
|
|
|
|
- this.worlds = new World[queries];
|
|
|
|
|
|
+ public Update(HttpServerRequest request) {
|
|
|
|
+ this.request = request;
|
|
|
|
+ this.worldsToUpdate = new World[getQueries(request)];
|
|
}
|
|
}
|
|
|
|
|
|
- private void handle() {
|
|
|
|
-
|
|
|
|
- client1.group(c -> {
|
|
|
|
- PreparedQuery<RowSet<Row>> preparedQuery = c.preparedQuery(SELECT_WORLD);
|
|
|
|
- for (int i = 0; i < worlds.length; i++) {
|
|
|
|
- int id = randomWorld();
|
|
|
|
- int index = i;
|
|
|
|
- preparedQuery.execute(Tuple.of(id), ar2 -> {
|
|
|
|
|
|
+ public void handle() {
|
|
|
|
+ client.group(c -> {
|
|
|
|
+ final PreparedQuery<RowSet<Row>> preparedQuery = c.preparedQuery(App.SELECT_WORLD);
|
|
|
|
+ for (int i = 0; i < worldsToUpdate.length; i++) {
|
|
|
|
+ final Integer id = boxedRandomWorldNumber();
|
|
|
|
+ final int index = i;
|
|
|
|
+ preparedQuery.execute(Tuple.of(id), res -> {
|
|
if (!failed) {
|
|
if (!failed) {
|
|
- if (ar2.failed()) {
|
|
|
|
|
|
+ if (res.failed()) {
|
|
failed = true;
|
|
failed = true;
|
|
- sendError(req, ar2.cause());
|
|
|
|
|
|
+ sendError(request, res.cause());
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
- worlds[index] = new World(ar2.result().iterator().next().getInteger(0), randomWorld());
|
|
|
|
- if (++queryCount == worlds.length) {
|
|
|
|
- handleUpdates();
|
|
|
|
|
|
+ worldsToUpdate[index] = new World(res.result().iterator().next().getInteger(0), boxedRandomWorldNumber());
|
|
|
|
+ if (++selectWorldCompletedCount == worldsToUpdate.length) {
|
|
|
|
+ randomWorldsQueryCompleted();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
});
|
|
});
|
|
@@ -388,43 +387,32 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
- void handleUpdates() {
|
|
|
|
- Arrays.sort(worlds);
|
|
|
|
- int len = worlds.length;
|
|
|
|
- if (0 < len && len <= AGGREGATED_UPDATE_WORLD_QUERY.length) {
|
|
|
|
- List<Object> arguments = new ArrayList<>();
|
|
|
|
- for (World world : worlds) {
|
|
|
|
- arguments.add(world.getId());
|
|
|
|
- arguments.add(world.getRandomNumber());
|
|
|
|
- }
|
|
|
|
- Tuple tuple = Tuple.tuple(arguments);
|
|
|
|
- PreparedQuery<RowSet<Row>> query = AGGREGATED_UPDATE_WORLD_QUERY[len - 1];
|
|
|
|
- query.execute(tuple, this::sendResponse);
|
|
|
|
- } else {
|
|
|
|
- List<Tuple> batch = new ArrayList<>();
|
|
|
|
- for (World world : worlds) {
|
|
|
|
- batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
|
|
|
|
- }
|
|
|
|
- UPDATE_WORLD_QUERY.executeBatch(batch, this::sendResponse);
|
|
|
|
|
|
+ private void randomWorldsQueryCompleted() {
|
|
|
|
+ Arrays.sort(worldsToUpdate);
|
|
|
|
+ final List<Integer> params = new ArrayList<>(worldsToUpdate.length * 2);
|
|
|
|
+ for (int i = 0, count = worldsToUpdate.length;i < count;i++) {
|
|
|
|
+ var world = worldsToUpdate[i];
|
|
|
|
+ params.add(world.getId());
|
|
|
|
+ params.add(world.getRandomNumber());
|
|
}
|
|
}
|
|
|
|
+ AGGREGATED_UPDATE_WORLD_QUERY[worldsToUpdate.length - 1].execute(Tuple.wrap(params), updateResult -> {
|
|
|
|
+ if (updateResult.failed()) {
|
|
|
|
+ sendError(request, updateResult.cause());
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ sendResponse();
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
- private void sendResponse(AsyncResult<?> res) {
|
|
|
|
- if (res.failed()) {
|
|
|
|
- sendError(req, res.cause());
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- JsonArray json = new JsonArray();
|
|
|
|
- for (World world : worlds) {
|
|
|
|
- json.add(world);
|
|
|
|
- }
|
|
|
|
- req.response()
|
|
|
|
- .putHeader(HttpHeaders.SERVER, SERVER)
|
|
|
|
- .putHeader(HttpHeaders.DATE, dateString)
|
|
|
|
- .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
|
|
- .end(json.toBuffer(), NULL_HANDLER);
|
|
|
|
|
|
+ private void sendResponse() {
|
|
|
|
+ var res = request.response();
|
|
|
|
+ MultiMap headers = res.headers();
|
|
|
|
+ headers.add(HttpHeaders.SERVER, App.SERVER);
|
|
|
|
+ headers.add(HttpHeaders.DATE, dateString);
|
|
|
|
+ headers.add(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON);
|
|
|
|
+ Buffer buff = WorldJsonSerializer.toJsonBuffer(worldsToUpdate);
|
|
|
|
+ res.end(buff, null);
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|
|
|
|
|
|
private void handleFortunes(HttpServerRequest req) {
|
|
private void handleFortunes(HttpServerRequest req) {
|
|
@@ -439,11 +427,12 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
List<Fortune> fortunes = result.value();
|
|
List<Fortune> fortunes = result.value();
|
|
fortunes.add(new Fortune(0, "Additional fortune added at request time."));
|
|
fortunes.add(new Fortune(0, "Additional fortune added at request time."));
|
|
Collections.sort(fortunes);
|
|
Collections.sort(fortunes);
|
|
- response
|
|
|
|
- .putHeader(HttpHeaders.SERVER, SERVER)
|
|
|
|
- .putHeader(HttpHeaders.DATE, dateString)
|
|
|
|
- .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_HTML)
|
|
|
|
- .end(FortunesTemplate.template(fortunes).render(factory).buffer(), NULL_HANDLER);
|
|
|
|
|
|
+ MultiMap headers = response.headers();
|
|
|
|
+ headers.add(HttpHeaders.SERVER, SERVER);
|
|
|
|
+ headers.add(HttpHeaders.DATE, dateString);
|
|
|
|
+ headers.add(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_HTML);
|
|
|
|
+ FortunesTemplate template = FortunesTemplate.template(fortunes);
|
|
|
|
+ response.end(template.render(factory).buffer(), NULL_HANDLER);
|
|
} else {
|
|
} else {
|
|
sendError(req, ar.cause());
|
|
sendError(req, ar.cause());
|
|
}
|
|
}
|
|
@@ -462,19 +451,17 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
count = Math.max(1, count);
|
|
count = Math.max(1, count);
|
|
count = Math.min(500, count);
|
|
count = Math.min(500, count);
|
|
List<CachedWorld> worlds = WORLD_CACHE.getCachedWorld(count);
|
|
List<CachedWorld> worlds = WORLD_CACHE.getCachedWorld(count);
|
|
- JsonArray json = new JsonArray(worlds);
|
|
|
|
HttpServerResponse response = req.response();
|
|
HttpServerResponse response = req.response();
|
|
MultiMap headers = response.headers();
|
|
MultiMap headers = response.headers();
|
|
headers
|
|
headers
|
|
.add(HEADER_CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
.add(HEADER_CONTENT_TYPE, RESPONSE_TYPE_JSON)
|
|
.add(HEADER_SERVER, SERVER)
|
|
.add(HEADER_SERVER, SERVER)
|
|
.add(HEADER_DATE, dateString);
|
|
.add(HEADER_DATE, dateString);
|
|
- response.end(json.toBuffer(), NULL_HANDLER);
|
|
|
|
|
|
+ response.end(CachedWorld.toJson(worlds), NULL_HANDLER);
|
|
}
|
|
}
|
|
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
public static void main(String[] args) throws Exception {
|
|
-
|
|
|
|
- int eventLoopPoolSize = Runtime.getRuntime().availableProcessors();
|
|
|
|
|
|
+ int eventLoopPoolSize = NUM_PROCESSORS;
|
|
String sizeProp = System.getProperty("vertx.eventLoopPoolSize");
|
|
String sizeProp = System.getProperty("vertx.eventLoopPoolSize");
|
|
if (sizeProp != null) {
|
|
if (sizeProp != null) {
|
|
try {
|
|
try {
|
|
@@ -522,6 +509,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
|
|
logger.error("Could not read Vertx version", e);;
|
|
logger.error("Could not read Vertx version", e);;
|
|
}
|
|
}
|
|
logger.info("Vertx: " + version);
|
|
logger.info("Vertx: " + version);
|
|
|
|
+ logger.info("Processors: " + NUM_PROCESSORS);
|
|
logger.info("Event Loop Size: " + ((MultithreadEventExecutorGroup)vertx.nettyEventLoopGroup()).executorCount());
|
|
logger.info("Event Loop Size: " + ((MultithreadEventExecutorGroup)vertx.nettyEventLoopGroup()).executorCount());
|
|
logger.info("Native transport : " + nativeTransport);
|
|
logger.info("Native transport : " + nativeTransport);
|
|
logger.info("Transport : " + transport);
|
|
logger.info("Transport : " + transport);
|