|
@@ -2,13 +2,18 @@ package net.officefloor.benchmark;
|
|
|
|
|
|
import java.net.Socket;
|
|
import java.net.Socket;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
|
|
+import java.util.BitSet;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
import io.vertx.core.Vertx;
|
|
import io.vertx.core.Vertx;
|
|
import io.vertx.core.VertxOptions;
|
|
import io.vertx.core.VertxOptions;
|
|
|
|
+import io.vertx.core.impl.VertxBuilder;
|
|
|
|
+import io.vertx.core.impl.VertxThread;
|
|
|
|
+import io.vertx.core.spi.VertxThreadFactory;
|
|
import io.vertx.pgclient.PgConnectOptions;
|
|
import io.vertx.pgclient.PgConnectOptions;
|
|
import io.vertx.pgclient.PgConnection;
|
|
import io.vertx.pgclient.PgConnection;
|
|
import io.vertx.sqlclient.Row;
|
|
import io.vertx.sqlclient.Row;
|
|
@@ -18,6 +23,7 @@ import io.vertx.sqlclient.Tuple;
|
|
import net.officefloor.server.RequestHandler;
|
|
import net.officefloor.server.RequestHandler;
|
|
import net.officefloor.server.http.parse.HttpRequestParser;
|
|
import net.officefloor.server.http.parse.HttpRequestParser;
|
|
import net.officefloor.vertx.OfficeFloorVertx;
|
|
import net.officefloor.vertx.OfficeFloorVertx;
|
|
|
|
+import net.openhft.affinity.Affinity;
|
|
|
|
|
|
/**
|
|
/**
|
|
* R2DBC server.
|
|
* R2DBC server.
|
|
@@ -52,20 +58,38 @@ public class SqlClientOfficeFloorMain implements DatabaseOperations {
|
|
public SqlClientOfficeFloorMain(int socketCount, String server, int port, String database, String username,
|
|
public SqlClientOfficeFloorMain(int socketCount, String server, int port, String database, String username,
|
|
String password) {
|
|
String password) {
|
|
|
|
|
|
- // Obtain the vertx
|
|
|
|
- int workerThreadCount = Math.max(1, socketCount / 4);
|
|
|
|
- Vertx vertx = Vertx
|
|
|
|
- .vertx(new VertxOptions().setPreferNativeTransport(true).setWorkerPoolSize(workerThreadCount));
|
|
|
|
|
|
+ // Should be all I/O processing for SQL responses
|
|
|
|
+ System.setProperty("vertx.nettyIORatio", "100");
|
|
|
|
|
|
// Create connection
|
|
// Create connection
|
|
PgConnectOptions connectOptions = new PgConnectOptions().setHost(server).setPort(port).setDatabase(database)
|
|
PgConnectOptions connectOptions = new PgConnectOptions().setHost(server).setPort(port).setDatabase(database)
|
|
.setUser(username).setPassword(password).setCachePreparedStatements(true);
|
|
.setUser(username).setPassword(password).setCachePreparedStatements(true);
|
|
|
|
|
|
- // Create thread local connection
|
|
|
|
|
|
+ // Provide connection
|
|
this.threadLocalConnection = new ThreadLocal<PgConnection>() {
|
|
this.threadLocalConnection = new ThreadLocal<PgConnection>() {
|
|
@Override
|
|
@Override
|
|
protected PgConnection initialValue() {
|
|
protected PgConnection initialValue() {
|
|
try {
|
|
try {
|
|
|
|
+ // Obtain thread affinity
|
|
|
|
+ BitSet affinity = Affinity.getAffinity();
|
|
|
|
+ System.out.println(Thread.currentThread().getName() + " has affinity " + affinity);
|
|
|
|
+
|
|
|
|
+ // Setup Vertx for connection
|
|
|
|
+ VertxOptions options = new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1)
|
|
|
|
+ .setWorkerPoolSize(1).setInternalBlockingPoolSize(1);
|
|
|
|
+ VertxBuilder builder = new VertxBuilder(options).threadFactory(new VertxThreadFactory() {
|
|
|
|
+ @Override
|
|
|
|
+ public VertxThread newVertxThread(Runnable target, String name, boolean worker,
|
|
|
|
+ long maxExecTime, TimeUnit maxExecTimeUnit) {
|
|
|
|
+ return VertxThreadFactory.INSTANCE.newVertxThread(() -> {
|
|
|
|
+ Affinity.setAffinity(affinity);
|
|
|
|
+ target.run();
|
|
|
|
+ }, name, worker, maxExecTime, maxExecTimeUnit);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ Vertx vertx = builder.init().vertx();
|
|
|
|
+
|
|
|
|
+ // Obtain the connection
|
|
return OfficeFloorVertx.block(PgConnection.connect(vertx, connectOptions));
|
|
return OfficeFloorVertx.block(PgConnection.connect(vertx, connectOptions));
|
|
} catch (Exception ex) {
|
|
} catch (Exception ex) {
|
|
throw new IllegalStateException("Failed to setup connection", ex);
|
|
throw new IllegalStateException("Failed to setup connection", ex);
|