|
@@ -40,10 +40,27 @@ public class SqlClientOfficeFloorMain implements DatabaseOperations {
|
|
|
password) -> new SqlClientOfficeFloorMain(socketCount, server, port, database, username, password));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Connections.
|
|
|
+ */
|
|
|
+ private static class Connections {
|
|
|
+ private int index = -1;
|
|
|
+ private final PgConnection[] connections;
|
|
|
+
|
|
|
+ private Connections(PgConnection[] connections) {
|
|
|
+ this.connections = connections;
|
|
|
+ }
|
|
|
+
|
|
|
+ private PgConnection getConnection(int max) {
|
|
|
+ this.index = (this.index + 1) % max;
|
|
|
+ return this.connections[this.index];
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* {@link ThreadLocal} {@link PgConnection} instances.
|
|
|
*/
|
|
|
- private final ThreadLocal<PgConnection> threadLocalConnection;
|
|
|
+ private final ThreadLocal<Connections> threadLocalConnections;
|
|
|
|
|
|
/**
|
|
|
* Instantiate.
|
|
@@ -67,9 +84,9 @@ public class SqlClientOfficeFloorMain implements DatabaseOperations {
|
|
|
.setTcpQuickAck(true);
|
|
|
|
|
|
// Provide connection
|
|
|
- this.threadLocalConnection = new ThreadLocal<PgConnection>() {
|
|
|
+ this.threadLocalConnections = new ThreadLocal<Connections>() {
|
|
|
@Override
|
|
|
- protected PgConnection initialValue() {
|
|
|
+ protected Connections initialValue() {
|
|
|
try {
|
|
|
// Obtain thread affinity
|
|
|
BitSet affinity = Affinity.getAffinity();
|
|
@@ -91,7 +108,12 @@ public class SqlClientOfficeFloorMain implements DatabaseOperations {
|
|
|
Vertx vertx = builder.init().vertx();
|
|
|
|
|
|
// Obtain the connection
|
|
|
- return OfficeFloorVertx.block(PgConnection.connect(vertx, connectOptions));
|
|
|
+ PgConnection[] connections = new PgConnection[Math
|
|
|
+ .max(Math.max(Math.max(DB_COUNT, QUERIES_COUNT), FORTUNES_COUNT), UPDATE_COUNT)];
|
|
|
+ for (int i = 0; i < connections.length; i++) {
|
|
|
+ connections[i] = OfficeFloorVertx.block(PgConnection.connect(vertx, connectOptions));
|
|
|
+ }
|
|
|
+ return new Connections(connections);
|
|
|
} catch (Exception ex) {
|
|
|
throw new IllegalStateException("Failed to setup connection", ex);
|
|
|
}
|
|
@@ -108,9 +130,12 @@ public class SqlClientOfficeFloorMain implements DatabaseOperations {
|
|
|
// Nothing thread specific to set up
|
|
|
}
|
|
|
|
|
|
+ private static final int DB_COUNT = 5;
|
|
|
+
|
|
|
@Override
|
|
|
public void db(DbSendResponse sender) {
|
|
|
- this.threadLocalConnection.get().preparedQuery("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID=$1")
|
|
|
+ this.threadLocalConnections.get().getConnection(DB_COUNT)
|
|
|
+ .preparedQuery("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID=$1")
|
|
|
.execute(Tuple.of(ThreadLocalRandom.current().nextInt(1, 10001)), result -> {
|
|
|
if (result.failed()) {
|
|
|
sender.sendError(result.cause());
|
|
@@ -127,11 +152,13 @@ public class SqlClientOfficeFloorMain implements DatabaseOperations {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ private static final int QUERIES_COUNT = 3;
|
|
|
+
|
|
|
@Override
|
|
|
public void queries(int queryCount, QueriesSendResponse sender) {
|
|
|
World[] worlds = new World[queryCount];
|
|
|
AtomicInteger count = new AtomicInteger(0);
|
|
|
- SqlConnection sqlConnection = this.threadLocalConnection.get();
|
|
|
+ SqlConnection sqlConnection = this.threadLocalConnections.get().getConnection(QUERIES_COUNT);
|
|
|
for (int i = 0; i < queryCount; i++) {
|
|
|
sqlConnection.preparedQuery("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID=$1")
|
|
|
.execute(Tuple.of(ThreadLocalRandom.current().nextInt(1, 10001)), result -> {
|
|
@@ -155,28 +182,33 @@ public class SqlClientOfficeFloorMain implements DatabaseOperations {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static final int FORTUNES_COUNT = 7;
|
|
|
+
|
|
|
@Override
|
|
|
public void fortunes(FortunesSendResponse sender) {
|
|
|
- this.threadLocalConnection.get().preparedQuery("SELECT ID, MESSAGE FROM FORTUNE").execute(result -> {
|
|
|
- if (result.failed()) {
|
|
|
- sender.sendError(result.cause());
|
|
|
- } else {
|
|
|
- List<Fortune> fortunes = new ArrayList<>(16);
|
|
|
- RowIterator<Row> rows = result.result().iterator();
|
|
|
- while (rows.hasNext()) {
|
|
|
- Row row = rows.next();
|
|
|
- fortunes.add(new Fortune(row.getInteger(0), row.getString(1)));
|
|
|
- }
|
|
|
- sender.sendFortunes(fortunes);
|
|
|
- }
|
|
|
- });
|
|
|
+ this.threadLocalConnections.get().getConnection(FORTUNES_COUNT).preparedQuery("SELECT ID, MESSAGE FROM FORTUNE")
|
|
|
+ .execute(result -> {
|
|
|
+ if (result.failed()) {
|
|
|
+ sender.sendError(result.cause());
|
|
|
+ } else {
|
|
|
+ List<Fortune> fortunes = new ArrayList<>(16);
|
|
|
+ RowIterator<Row> rows = result.result().iterator();
|
|
|
+ while (rows.hasNext()) {
|
|
|
+ Row row = rows.next();
|
|
|
+ fortunes.add(new Fortune(row.getInteger(0), row.getString(1)));
|
|
|
+ }
|
|
|
+ sender.sendFortunes(fortunes);
|
|
|
+ }
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
+ private static final int UPDATE_COUNT = 3;
|
|
|
+
|
|
|
@Override
|
|
|
public void update(int queryCount, UpdateSendResponse sender) {
|
|
|
World[] worlds = new World[queryCount];
|
|
|
AtomicInteger count = new AtomicInteger(0);
|
|
|
- SqlConnection sqlConnection = this.threadLocalConnection.get();
|
|
|
+ SqlConnection sqlConnection = this.threadLocalConnections.get().getConnection(UPDATE_COUNT);
|
|
|
for (int i = 0; i < queryCount; i++) {
|
|
|
sqlConnection.preparedQuery("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID=$1")
|
|
|
.execute(Tuple.of(ThreadLocalRandom.current().nextInt(1, 10001)), result -> {
|