|
@@ -40,7 +40,45 @@ public class PgClient {
|
|
|
private PreparedQuery<RowSet<Row>> UPDATE_WORLD_QUERY;
|
|
|
private SqlClientInternal updates;
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- private PreparedQuery<RowSet<Row>>[] AGGREGATED_UPDATE_WORLD_QUERY = new PreparedQuery[128];
|
|
|
+ private PreparedQuery<RowSet<Row>>[] AGGREGATED_UPDATE_WORLD_QUERY = new PreparedQuery[500];
|
|
|
+ }
|
|
|
+
|
|
|
+ public static class PgUpdate {
|
|
|
+ private DbConnection connection;
|
|
|
+
|
|
|
+ public PgUpdate(DbConnection connection) {
|
|
|
+ this.connection = connection;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void selectWorldForUpdate(int queries, BiConsumer<Integer, PreparedQuery<RowSet<Row>>> consumer) {
|
|
|
+ connection.queries.group(c -> {
|
|
|
+ PreparedQuery<RowSet<Row>> statement = c.preparedQuery(SELECT_WORLD);
|
|
|
+ for (int i = 0; i < queries; i++) {
|
|
|
+ consumer.accept(i, statement);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public void updateWorld(World[] worlds, Handler<AsyncResult<RowSet<Row>>> handler) {
|
|
|
+ Arrays.sort(worlds);
|
|
|
+ int len = worlds.length;
|
|
|
+ if (0 < len && len <= connection.AGGREGATED_UPDATE_WORLD_QUERY.length) {
|
|
|
+ List<Object> arguments = new ArrayList<>();
|
|
|
+ for (World world : worlds) {
|
|
|
+ arguments.add(world.getId());
|
|
|
+ arguments.add(world.getRandomNumber());
|
|
|
+ }
|
|
|
+ Tuple tuple = Tuple.tuple(arguments);
|
|
|
+ PreparedQuery<RowSet<Row>> query = connection.AGGREGATED_UPDATE_WORLD_QUERY[len - 1];
|
|
|
+ query.execute(tuple).onComplete(handler);
|
|
|
+ } else {
|
|
|
+ List<Tuple> batch = new ArrayList<>();
|
|
|
+ for (World world : worlds) {
|
|
|
+ batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
|
|
|
+ }
|
|
|
+ connection.UPDATE_WORLD_QUERY.executeBatch(batch).onComplete(handler);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private static class DbConnectionFactory extends ThreadLocal<DbConnection> {
|
|
@@ -129,51 +167,23 @@ public class PgClient {
|
|
|
}
|
|
|
|
|
|
public void selectWorld(Tuple row, Handler<AsyncResult<RowSet<Row>>> handler) {
|
|
|
- this.sqlClient.get().SELECT_WORLD_QUERY.execute(row, handler);
|
|
|
+ this.sqlClient.get().SELECT_WORLD_QUERY.execute(row).onComplete(handler);
|
|
|
}
|
|
|
|
|
|
public void selectWorlds(int queries, Handler<AsyncResult<RowSet<Row>>> handler) {
|
|
|
this.sqlClient.get().queries.group(c -> {
|
|
|
for (int i = 0; i < queries; i++) {
|
|
|
- c.preparedQuery(SELECT_WORLD).execute(Tuple.of(Util.randomWorld()), handler);
|
|
|
+ c.preparedQuery(SELECT_WORLD).execute(Tuple.of(Util.boxedRandomWorld())).onComplete(handler);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
public void fortunes(Handler<AsyncResult<RowSet<Row>>> handler) {
|
|
|
- this.sqlClient.get().SELECT_FORTUNE_QUERY.execute(handler);
|
|
|
- }
|
|
|
-
|
|
|
- public void selectWorldForUpdate(int queries,
|
|
|
- BiConsumer<Integer, PreparedQuery<RowSet<Row>>> consumer) {
|
|
|
- this.sqlClient.get().queries.group(c -> {
|
|
|
- PreparedQuery<RowSet<Row>> statement = c.preparedQuery(SELECT_WORLD);
|
|
|
- for (int i = 0; i < queries; i++) {
|
|
|
- consumer.accept(i, statement);
|
|
|
- }
|
|
|
- });
|
|
|
+ this.sqlClient.get().SELECT_FORTUNE_QUERY.execute().onComplete(handler);
|
|
|
}
|
|
|
|
|
|
- public void updateWorld(World[] worlds, Handler<AsyncResult<RowSet<Row>>> handler) {
|
|
|
- Arrays.sort(worlds);
|
|
|
- int len = worlds.length;
|
|
|
- var connection = this.sqlClient.get();
|
|
|
- if (0 < len && len <= connection.AGGREGATED_UPDATE_WORLD_QUERY.length) {
|
|
|
- List<Object> arguments = new ArrayList<>();
|
|
|
- for (World world : worlds) {
|
|
|
- arguments.add(world.getId());
|
|
|
- arguments.add(world.getRandomNumber());
|
|
|
- }
|
|
|
- Tuple tuple = Tuple.tuple(arguments);
|
|
|
- PreparedQuery<RowSet<Row>> query = connection.AGGREGATED_UPDATE_WORLD_QUERY[len - 1];
|
|
|
- query.execute(tuple, handler);
|
|
|
- } else {
|
|
|
- List<Tuple> batch = new ArrayList<>();
|
|
|
- for (World world : worlds) {
|
|
|
- batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
|
|
|
- }
|
|
|
- connection.UPDATE_WORLD_QUERY.executeBatch(batch, handler);
|
|
|
- }
|
|
|
+ public PgUpdate updater() {
|
|
|
+ return new PgUpdate(sqlClient.get());
|
|
|
}
|
|
|
|
|
|
private PgConnectOptions pgPoolOptions(Config config) {
|