|
@@ -21,16 +21,20 @@ import java.io.IOException;
|
|
import java.io.PrintWriter;
|
|
import java.io.PrintWriter;
|
|
import java.io.Writer;
|
|
import java.io.Writer;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
|
|
+import java.nio.channels.CancelledKeyException;
|
|
|
|
+import java.nio.channels.ClosedChannelException;
|
|
import java.time.ZoneOffset;
|
|
import java.time.ZoneOffset;
|
|
import java.time.ZonedDateTime;
|
|
import java.time.ZonedDateTime;
|
|
import java.time.format.DateTimeFormatter;
|
|
import java.time.format.DateTimeFormatter;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
|
|
+import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
-import java.util.Timer;
|
|
|
|
-import java.util.TimerTask;
|
|
|
|
|
|
+import java.util.concurrent.Executor;
|
|
|
|
+import java.util.concurrent.Executors;
|
|
|
|
+import java.util.concurrent.ScheduledExecutorService;
|
|
import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
-import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.logging.Logger;
|
|
import java.util.logging.Logger;
|
|
|
|
|
|
import org.apache.commons.text.StringEscapeUtils;
|
|
import org.apache.commons.text.StringEscapeUtils;
|
|
@@ -41,6 +45,7 @@ import com.github.mustachejava.DefaultMustacheFactory;
|
|
import com.github.mustachejava.Mustache;
|
|
import com.github.mustachejava.Mustache;
|
|
import com.github.mustachejava.MustacheFactory;
|
|
import com.github.mustachejava.MustacheFactory;
|
|
|
|
|
|
|
|
+import io.r2dbc.pool.PoolingConnectionFactoryProvider;
|
|
import io.r2dbc.spi.Batch;
|
|
import io.r2dbc.spi.Batch;
|
|
import io.r2dbc.spi.Connection;
|
|
import io.r2dbc.spi.Connection;
|
|
import io.r2dbc.spi.ConnectionFactories;
|
|
import io.r2dbc.spi.ConnectionFactories;
|
|
@@ -49,11 +54,17 @@ import io.r2dbc.spi.ConnectionFactoryOptions;
|
|
import io.r2dbc.spi.R2dbcTransientResourceException;
|
|
import io.r2dbc.spi.R2dbcTransientResourceException;
|
|
import lombok.AllArgsConstructor;
|
|
import lombok.AllArgsConstructor;
|
|
import lombok.Data;
|
|
import lombok.Data;
|
|
|
|
+import net.officefloor.benchmark.RawOfficeFloorMain.Fortune;
|
|
|
|
+import net.officefloor.benchmark.RawOfficeFloorMain.Message;
|
|
|
|
+import net.officefloor.benchmark.RawOfficeFloorMain.World;
|
|
import net.officefloor.frame.api.manage.OfficeFloor;
|
|
import net.officefloor.frame.api.manage.OfficeFloor;
|
|
import net.officefloor.frame.api.manage.ProcessManager;
|
|
import net.officefloor.frame.api.manage.ProcessManager;
|
|
import net.officefloor.frame.api.managedobject.ManagedObjectContext;
|
|
import net.officefloor.frame.api.managedobject.ManagedObjectContext;
|
|
import net.officefloor.frame.api.managedobject.ProcessSafeOperation;
|
|
import net.officefloor.frame.api.managedobject.ProcessSafeOperation;
|
|
|
|
+import net.officefloor.frame.api.managedobject.pool.ThreadCompletionListener;
|
|
|
|
+import net.officefloor.server.RequestHandler;
|
|
import net.officefloor.server.SocketManager;
|
|
import net.officefloor.server.SocketManager;
|
|
|
|
+import net.officefloor.server.SocketServicer;
|
|
import net.officefloor.server.http.AbstractHttpServicerFactory;
|
|
import net.officefloor.server.http.AbstractHttpServicerFactory;
|
|
import net.officefloor.server.http.HttpHeaderName;
|
|
import net.officefloor.server.http.HttpHeaderName;
|
|
import net.officefloor.server.http.HttpHeaderValue;
|
|
import net.officefloor.server.http.HttpHeaderValue;
|
|
@@ -66,11 +77,18 @@ import net.officefloor.server.http.HttpStatus;
|
|
import net.officefloor.server.http.ServerHttpConnection;
|
|
import net.officefloor.server.http.ServerHttpConnection;
|
|
import net.officefloor.server.http.impl.HttpServerLocationImpl;
|
|
import net.officefloor.server.http.impl.HttpServerLocationImpl;
|
|
import net.officefloor.server.http.impl.ProcessAwareServerHttpConnectionManagedObject;
|
|
import net.officefloor.server.http.impl.ProcessAwareServerHttpConnectionManagedObject;
|
|
|
|
+import net.officefloor.server.http.parse.HttpRequestParser;
|
|
import net.officefloor.server.http.parse.HttpRequestParser.HttpRequestParserMetaData;
|
|
import net.officefloor.server.http.parse.HttpRequestParser.HttpRequestParserMetaData;
|
|
-import net.officefloor.server.stream.StreamBufferPool;
|
|
|
|
-import net.officefloor.server.stream.impl.ThreadLocalStreamBufferPool;
|
|
|
|
|
|
+import net.officefloor.web.executive.CpuCore;
|
|
|
|
+import net.officefloor.web.executive.CpuCore.LogicalCpu;
|
|
|
|
+import net.openhft.affinity.Affinity;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
+import reactor.core.scheduler.Scheduler;
|
|
|
|
+import reactor.core.scheduler.Schedulers;
|
|
|
|
+
|
|
|
|
+import net.officefloor.web.executive.CpuCore;
|
|
|
|
+import net.openhft.affinity.Affinity;
|
|
|
|
|
|
/**
|
|
/**
|
|
* <p>
|
|
* <p>
|
|
@@ -80,6 +98,11 @@ import reactor.core.publisher.Mono;
|
|
*/
|
|
*/
|
|
public class RawOfficeFloorMain {
|
|
public class RawOfficeFloorMain {
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Database query load capacity to handle validation load.
|
|
|
|
+ */
|
|
|
|
+ private static final int QUERY_LOAD_CAPACITY = 512 * (20 + 1); // update 20 selects then batch
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Buffer size of queries.
|
|
* Buffer size of queries.
|
|
*/
|
|
*/
|
|
@@ -116,38 +139,65 @@ public class RawOfficeFloorMain {
|
|
// Increase the buffer size (note: too high and cause OOM issues)
|
|
// Increase the buffer size (note: too high and cause OOM issues)
|
|
System.setProperty("reactor.bufferSize.small", String.valueOf(QUERY_BUFFER_SIZE));
|
|
System.setProperty("reactor.bufferSize.small", String.valueOf(QUERY_BUFFER_SIZE));
|
|
|
|
|
|
- // Build the connection pool
|
|
|
|
- ConnectionFactoryOptions factoryOptions = ConnectionFactoryOptions.builder()
|
|
|
|
- .option(ConnectionFactoryOptions.DRIVER, "pool").option(ConnectionFactoryOptions.PROTOCOL, "postgresql")
|
|
|
|
- .option(ConnectionFactoryOptions.HOST, server).option(ConnectionFactoryOptions.PORT, 5432)
|
|
|
|
- .option(ConnectionFactoryOptions.DATABASE, "hello_world")
|
|
|
|
- .option(ConnectionFactoryOptions.USER, "benchmarkdbuser")
|
|
|
|
- .option(ConnectionFactoryOptions.PASSWORD, "benchmarkdbpass").build();
|
|
|
|
- ConnectionFactory connectionFactory = ConnectionFactories.get(factoryOptions);
|
|
|
|
-
|
|
|
|
// Create the server location
|
|
// Create the server location
|
|
HttpServerLocation serverLocation = new HttpServerLocationImpl("localhost", port, -1);
|
|
HttpServerLocation serverLocation = new HttpServerLocationImpl("localhost", port, -1);
|
|
|
|
|
|
- // Create the execution strategy
|
|
|
|
- ThreadFactory[] executionStrategy = new ThreadFactory[Runtime.getRuntime().availableProcessors()];
|
|
|
|
- for (int i = 0; i < executionStrategy.length; i++) {
|
|
|
|
- executionStrategy[i] = (runnable) -> new Thread(runnable);
|
|
|
|
|
|
+ // Create a thread factory per logical CPU
|
|
|
|
+ ThreadCompletionListener[] threadCompletionListenerCapture = new ThreadCompletionListener[] { null };
|
|
|
|
+ List<ThreadFactory> threadFactories = new LinkedList<>();
|
|
|
|
+ for (CpuCore cpuCore : CpuCore.getCores()) {
|
|
|
|
+ for (CpuCore.LogicalCpu logicalCpu : cpuCore.getCpus()) {
|
|
|
|
+
|
|
|
|
+ // Create thread factory for logical CPU
|
|
|
|
+ ThreadFactory boundThreadFactory = (runnable) -> new Thread(() -> {
|
|
|
|
+ try {
|
|
|
|
+ // Bind thread to logical CPU
|
|
|
|
+ Affinity.setAffinity(logicalCpu.getCpuAffinity());
|
|
|
|
+
|
|
|
|
+ // Run logic for thread
|
|
|
|
+ runnable.run();
|
|
|
|
+ } finally {
|
|
|
|
+ threadCompletionListenerCapture[0].threadComplete();
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+
|
|
|
|
+ // Add the thread factory
|
|
|
|
+ threadFactories.add(boundThreadFactory);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ ThreadFactory[] executionStrategy = threadFactories.toArray(new ThreadFactory[0]);
|
|
System.out.println("Using " + executionStrategy.length + " executors");
|
|
System.out.println("Using " + executionStrategy.length + " executors");
|
|
|
|
|
|
// Create the socket manager
|
|
// Create the socket manager
|
|
- socketManager = HttpServerSocketManagedObjectSource.createSocketManager(executionStrategy);
|
|
|
|
|
|
+ socketManager = HttpServerSocketManagedObjectSource.createSocketManager(executionStrategy,
|
|
|
|
+ (threadCompletionListener) -> threadCompletionListenerCapture[0] = threadCompletionListener);
|
|
|
|
+
|
|
|
|
+ // Must have enough connection capacity for initial load (+1 for rounding)
|
|
|
|
+ int requiredConnectionsPerSocket = (QUERY_LOAD_CAPACITY / (executionStrategy.length * QUERY_BUFFER_SIZE)) + 1;
|
|
|
|
+ int connectionsPerSocket = Math.max(4, requiredConnectionsPerSocket);
|
|
|
|
+ System.out.println("Using " + connectionsPerSocket + " connections per socket");
|
|
|
|
+
|
|
|
|
+ // Determine the pool size for connections
|
|
|
|
+ int connectionPoolSize = executionStrategy.length * connectionsPerSocket;
|
|
|
|
+
|
|
|
|
+ // Build the connection pool
|
|
|
|
+ ConnectionFactoryOptions factoryOptions = ConnectionFactoryOptions.builder()
|
|
|
|
+ .option(ConnectionFactoryOptions.DRIVER, "pool").option(ConnectionFactoryOptions.PROTOCOL, "postgresql")
|
|
|
|
+ .option(ConnectionFactoryOptions.HOST, server).option(ConnectionFactoryOptions.PORT, 5432)
|
|
|
|
+ .option(ConnectionFactoryOptions.DATABASE, "hello_world")
|
|
|
|
+ .option(ConnectionFactoryOptions.USER, "benchmarkdbuser")
|
|
|
|
+ .option(ConnectionFactoryOptions.PASSWORD, "benchmarkdbpass")
|
|
|
|
+ .option(PoolingConnectionFactoryProvider.MAX_SIZE, connectionPoolSize).build();
|
|
|
|
+ ConnectionFactory connectionFactory = ConnectionFactories.get(factoryOptions);
|
|
|
|
|
|
// Create raw HTTP servicing
|
|
// Create raw HTTP servicing
|
|
- StreamBufferPool<ByteBuffer> serviceBufferPool = new ThreadLocalStreamBufferPool(
|
|
|
|
- () -> ByteBuffer.allocateDirect(8046), Integer.MAX_VALUE, Integer.MAX_VALUE);
|
|
|
|
- RawHttpServicerFactory serviceFactory = new RawHttpServicerFactory(serverLocation, serviceBufferPool,
|
|
|
|
- connectionFactory);
|
|
|
|
|
|
+ RawHttpServicerFactory serviceFactory = new RawHttpServicerFactory(serverLocation, connectionFactory,
|
|
|
|
+ connectionsPerSocket);
|
|
socketManager.bindServerSocket(serverLocation.getClusterHttpPort(), null, null, serviceFactory, serviceFactory);
|
|
socketManager.bindServerSocket(serverLocation.getClusterHttpPort(), null, null, serviceFactory, serviceFactory);
|
|
|
|
|
|
// Setup Date
|
|
// Setup Date
|
|
- Timer dateTimer = new Timer(true);
|
|
|
|
- dateTimer.schedule(serviceFactory.updateDate, 0, 1000);
|
|
|
|
|
|
+ ScheduledExecutorService dateTimer = Executors.newScheduledThreadPool(1);
|
|
|
|
+ dateTimer.scheduleAtFixedRate(serviceFactory.updateDate, 0, 1, TimeUnit.SECONDS);
|
|
|
|
|
|
// Start servicing
|
|
// Start servicing
|
|
Runnable[] runnables = socketManager.getRunnables();
|
|
Runnable[] runnables = socketManager.getRunnables();
|
|
@@ -185,19 +235,14 @@ public class RawOfficeFloorMain {
|
|
|
|
|
|
private static final R2dbcTransientResourceException THROTTLED = new R2dbcTransientResourceException();
|
|
private static final R2dbcTransientResourceException THROTTLED = new R2dbcTransientResourceException();
|
|
|
|
|
|
- private static final int LARGE_QUERY = 100;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* <code>Date</code> {@link HttpHeaderValue}.
|
|
* <code>Date</code> {@link HttpHeaderValue}.
|
|
*/
|
|
*/
|
|
private volatile HttpHeaderValue dateHttpHeader;
|
|
private volatile HttpHeaderValue dateHttpHeader;
|
|
|
|
|
|
- private final TimerTask updateDate = new TimerTask() {
|
|
|
|
- @Override
|
|
|
|
- public void run() {
|
|
|
|
- String now = DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
|
|
|
|
- RawHttpServicerFactory.this.dateHttpHeader = new HttpHeaderValue(now);
|
|
|
|
- }
|
|
|
|
|
|
+ private final Runnable updateDate = () -> {
|
|
|
|
+ String now = DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC));
|
|
|
|
+ RawHttpServicerFactory.this.dateHttpHeader = new HttpHeaderValue(now);
|
|
};
|
|
};
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -232,78 +277,45 @@ public class RawOfficeFloorMain {
|
|
private final ConnectionFactory connectionFactory;
|
|
private final ConnectionFactory connectionFactory;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * {@link ThreadLocal} {@link Connection}.
|
|
|
|
|
|
+ * {@link ThreadLocal} {@link Connection} instances.
|
|
*/
|
|
*/
|
|
- private final ThreadLocal<Connection> threadLocalConnection;
|
|
|
|
|
|
+ private final ThreadLocal<Connection[]> threadLocalConnections;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * db {@link ThreadLocalRateLimit}.
|
|
|
|
|
|
+ * {@link ThreadLocal} {@link RateLimit}.
|
|
*/
|
|
*/
|
|
- private final ThreadLocalRateLimit dbRateLimit = new ThreadLocalRateLimit();
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * queries {@link ThreadLocalRateLimit}.
|
|
|
|
- */
|
|
|
|
- private final ThreadLocalRateLimit queriesRateLimit = new ThreadLocalRateLimit();
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * fortunes {@link ThreadLocalRateLimit}.
|
|
|
|
- */
|
|
|
|
- private final ThreadLocalRateLimit fortunesRateLimit = new ThreadLocalRateLimit();
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * updates {@link ThreadLocalRateLimit}.
|
|
|
|
- */
|
|
|
|
- private final ThreadLocalRateLimit updatesRateLimit = new ThreadLocalRateLimit();
|
|
|
|
-
|
|
|
|
- /**
|
|
|
|
- * {@link Mono} to service /db.
|
|
|
|
- */
|
|
|
|
- private final Mono<World> db;
|
|
|
|
|
|
+ private final ThreadLocal<RateLimit> threadLocalRateLimit = new ThreadLocal<RateLimit>();
|
|
|
|
|
|
/**
|
|
/**
|
|
* {@link Mustache} for /fortunes.
|
|
* {@link Mustache} for /fortunes.
|
|
*/
|
|
*/
|
|
private final Mustache fortuneMustache;
|
|
private final Mustache fortuneMustache;
|
|
|
|
|
|
- /**
|
|
|
|
- * {@link Mono} to service /fortunes.
|
|
|
|
- */
|
|
|
|
- private final Mono<List<Fortune>> fortunes;
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Instantiate.
|
|
* Instantiate.
|
|
*
|
|
*
|
|
- * @param serverLocation {@link HttpServerLocation}.
|
|
|
|
- * @param serviceBufferPool {@link StreamBufferPool}.
|
|
|
|
- * @param connectionFactory {@link ConnectionFactory}.
|
|
|
|
|
|
+ * @param serverLocation {@link HttpServerLocation}.
|
|
|
|
+ * @param connectionFactory {@link ConnectionFactory}.
|
|
|
|
+ * @param connectionsPerSocket Number of DB connections per socket.
|
|
*/
|
|
*/
|
|
- public RawHttpServicerFactory(HttpServerLocation serverLocation, StreamBufferPool<ByteBuffer> serviceBufferPool,
|
|
|
|
- ConnectionFactory connectionFactory) {
|
|
|
|
- super(serverLocation, false, new HttpRequestParserMetaData(100, 1000, 1000000), serviceBufferPool, null,
|
|
|
|
- null, true);
|
|
|
|
|
|
+ public RawHttpServicerFactory(HttpServerLocation serverLocation, ConnectionFactory connectionFactory,
|
|
|
|
+ int connectionsPerSocket) {
|
|
|
|
+ super(serverLocation, false, new HttpRequestParserMetaData(100, 1000, 1000000), null, null, true);
|
|
this.objectMapper.registerModule(new AfterburnerModule());
|
|
this.objectMapper.registerModule(new AfterburnerModule());
|
|
this.connectionFactory = connectionFactory;
|
|
this.connectionFactory = connectionFactory;
|
|
|
|
|
|
// Create thread local connection
|
|
// Create thread local connection
|
|
- this.threadLocalConnection = new ThreadLocal<Connection>() {
|
|
|
|
|
|
+ this.threadLocalConnections = new ThreadLocal<Connection[]>() {
|
|
@Override
|
|
@Override
|
|
- protected Connection initialValue() {
|
|
|
|
- return Mono.from(RawHttpServicerFactory.this.connectionFactory.create()).block();
|
|
|
|
|
|
+ protected Connection[] initialValue() {
|
|
|
|
+ Connection[] connections = new Connection[connectionsPerSocket];
|
|
|
|
+ for (int i = 0; i < connections.length; i++) {
|
|
|
|
+ connections[i] = Mono.from(RawHttpServicerFactory.this.connectionFactory.create()).block();
|
|
|
|
+ }
|
|
|
|
+ return connections;
|
|
}
|
|
}
|
|
};
|
|
};
|
|
|
|
|
|
- // Create the db logic
|
|
|
|
- this.db = Mono
|
|
|
|
- .from(this.threadLocalConnection.get()
|
|
|
|
- .createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
|
|
- .bind(0, ThreadLocalRandom.current().nextInt(1, 10001)).execute())
|
|
|
|
- .flatMap(result -> Mono.from(result.map((row, metadata) -> {
|
|
|
|
- Integer id = row.get(0, Integer.class);
|
|
|
|
- Integer number = row.get(1, Integer.class);
|
|
|
|
- return new World(id, number);
|
|
|
|
- })));
|
|
|
|
-
|
|
|
|
// Load the mustache fortunes template
|
|
// Load the mustache fortunes template
|
|
MustacheFactory mustacheFactory = new DefaultMustacheFactory() {
|
|
MustacheFactory mustacheFactory = new DefaultMustacheFactory() {
|
|
@Override
|
|
@Override
|
|
@@ -316,15 +328,6 @@ public class RawOfficeFloorMain {
|
|
}
|
|
}
|
|
};
|
|
};
|
|
this.fortuneMustache = mustacheFactory.compile("fortunes.mustache");
|
|
this.fortuneMustache = mustacheFactory.compile("fortunes.mustache");
|
|
-
|
|
|
|
- // Create the fortunes logic
|
|
|
|
- this.fortunes = Flux
|
|
|
|
- .from(this.threadLocalConnection.get().createStatement("SELECT ID, MESSAGE FROM FORTUNE").execute())
|
|
|
|
- .flatMap(result -> Flux.from(result.map((row, metadata) -> {
|
|
|
|
- Integer id = row.get(0, Integer.class);
|
|
|
|
- String message = row.get(1, String.class);
|
|
|
|
- return new Fortune(id, message);
|
|
|
|
- }))).collectList();
|
|
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -336,11 +339,33 @@ public class RawOfficeFloorMain {
|
|
protected void send(ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) throws IOException {
|
|
protected void send(ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) throws IOException {
|
|
try {
|
|
try {
|
|
connection.getServiceFlowCallback().run(null);
|
|
connection.getServiceFlowCallback().run(null);
|
|
|
|
+ } catch (IOException ex) {
|
|
|
|
+ throw ex;
|
|
} catch (Throwable ex) {
|
|
} catch (Throwable ex) {
|
|
throw new IOException(ex);
|
|
throw new IOException(ex);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /*
|
|
|
|
+ * =============== SocketServicerFactory =================
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public SocketServicer<HttpRequestParser> createSocketServicer(
|
|
|
|
+ RequestHandler<HttpRequestParser> requestHandler) {
|
|
|
|
+
|
|
|
|
+ // Ensure rate limits for socket servicing thread
|
|
|
|
+ // Note: will always create before servicing any requests
|
|
|
|
+ if (this.threadLocalRateLimit.get() == null) {
|
|
|
|
+ Connection[] connections = this.threadLocalConnections.get();
|
|
|
|
+ RateLimit rateLimit = new RateLimit(requestHandler, connections);
|
|
|
|
+ this.threadLocalRateLimit.set(rateLimit);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Continue on to create socket servicer
|
|
|
|
+ return super.createSocketServicer(requestHandler);
|
|
|
|
+ }
|
|
|
|
+
|
|
/*
|
|
/*
|
|
* ===================== HttpServicer ====================
|
|
* ===================== HttpServicer ====================
|
|
*/
|
|
*/
|
|
@@ -418,26 +443,34 @@ public class RawOfficeFloorMain {
|
|
private void db(HttpResponse response, ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
private void db(HttpResponse response, ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
|
|
|
|
// Determine if will overload queries
|
|
// Determine if will overload queries
|
|
- RateLimit rateLimit = this.dbRateLimit.get();
|
|
|
|
- if (rateLimit.isLimit(1)) {
|
|
|
|
|
|
+ RateLimitedConnection conn = this.threadLocalRateLimit.get().getAvailableConnection(1);
|
|
|
|
+ if (conn == null) {
|
|
this.sendError(connection, THROTTLED);
|
|
this.sendError(connection, THROTTLED);
|
|
return; // rate limited
|
|
return; // rate limited
|
|
}
|
|
}
|
|
|
|
|
|
// Service
|
|
// Service
|
|
- this.db.subscribe(world -> {
|
|
|
|
- try {
|
|
|
|
- response.setContentType(APPLICATION_JSON, null);
|
|
|
|
- this.objectMapper.writeValue(response.getEntityWriter(), world);
|
|
|
|
- this.send(connection);
|
|
|
|
- } catch (IOException ex) {
|
|
|
|
- ex.printStackTrace();
|
|
|
|
- }
|
|
|
|
- }, error -> {
|
|
|
|
- this.sendError(connection, error);
|
|
|
|
- }, () -> {
|
|
|
|
- rateLimit.processed(1);
|
|
|
|
- });
|
|
|
|
|
|
+ Mono.from(conn.connection.createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
|
|
+ .bind(0, ThreadLocalRandom.current().nextInt(1, 10001)).execute())
|
|
|
|
+ .flatMap(result -> Mono.from(result.map((row, metadata) -> {
|
|
|
|
+ Integer id = row.get(0, Integer.class);
|
|
|
|
+ Integer number = row.get(1, Integer.class);
|
|
|
|
+ return new World(id, number);
|
|
|
|
+ }))).publishOn(conn.writeScheduler).subscribe(world -> {
|
|
|
|
+ try {
|
|
|
|
+ response.setContentType(APPLICATION_JSON, null);
|
|
|
|
+ this.objectMapper.writeValue(response.getEntityWriter(), world);
|
|
|
|
+ this.send(connection);
|
|
|
|
+ } catch (CancelledKeyException | ClosedChannelException ex) {
|
|
|
|
+ // Ignore as disconnecting client
|
|
|
|
+ } catch (IOException ex) {
|
|
|
|
+ ex.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ }, error -> {
|
|
|
|
+ this.sendError(connection, error);
|
|
|
|
+ }, () -> {
|
|
|
|
+ conn.processed(1);
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
private void queries(String requestUri, HttpResponse response,
|
|
private void queries(String requestUri, HttpResponse response,
|
|
@@ -448,37 +481,35 @@ public class RawOfficeFloorMain {
|
|
int queryCount = getQueryCount(queriesCountText);
|
|
int queryCount = getQueryCount(queriesCountText);
|
|
|
|
|
|
// Determine if will overload queries
|
|
// Determine if will overload queries
|
|
- RateLimit rateLimit = this.queriesRateLimit.get();
|
|
|
|
- if (queryCount < LARGE_QUERY) {
|
|
|
|
- if (rateLimit.isLimit(queryCount)) {
|
|
|
|
- this.sendError(connection, THROTTLED);
|
|
|
|
- return; // rate limited
|
|
|
|
- }
|
|
|
|
|
|
+ RateLimitedConnection conn = this.threadLocalRateLimit.get().getAvailableConnection(queryCount);
|
|
|
|
+ if (conn == null) {
|
|
|
|
+ this.sendError(connection, THROTTLED);
|
|
|
|
+ return; // rate limited
|
|
}
|
|
}
|
|
|
|
|
|
// Service
|
|
// Service
|
|
- Connection dbConn = this.threadLocalConnection.get();
|
|
|
|
Flux.range(1, queryCount)
|
|
Flux.range(1, queryCount)
|
|
- .flatMap(index -> dbConn.createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
|
|
- .bind(0, ThreadLocalRandom.current().nextInt(1, 10001)).execute())
|
|
|
|
|
|
+ .flatMap(
|
|
|
|
+ index -> conn.connection.createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
|
|
+ .bind(0, ThreadLocalRandom.current().nextInt(1, 10001)).execute())
|
|
.flatMap(result -> Flux.from(result.map((row, metadata) -> {
|
|
.flatMap(result -> Flux.from(result.map((row, metadata) -> {
|
|
Integer id = row.get(0, Integer.class);
|
|
Integer id = row.get(0, Integer.class);
|
|
Integer number = row.get(1, Integer.class);
|
|
Integer number = row.get(1, Integer.class);
|
|
return new World(id, number);
|
|
return new World(id, number);
|
|
- }))).collectList().subscribe(worlds -> {
|
|
|
|
|
|
+ }))).collectList().publishOn(conn.writeScheduler).subscribe(worlds -> {
|
|
try {
|
|
try {
|
|
response.setContentType(APPLICATION_JSON, null);
|
|
response.setContentType(APPLICATION_JSON, null);
|
|
this.objectMapper.writeValue(response.getEntityWriter(), worlds);
|
|
this.objectMapper.writeValue(response.getEntityWriter(), worlds);
|
|
this.send(connection);
|
|
this.send(connection);
|
|
|
|
+ } catch (CancelledKeyException | ClosedChannelException ex) {
|
|
|
|
+ // Ignore as disconnecting client
|
|
} catch (IOException ex) {
|
|
} catch (IOException ex) {
|
|
ex.printStackTrace();
|
|
ex.printStackTrace();
|
|
}
|
|
}
|
|
}, error -> {
|
|
}, error -> {
|
|
this.sendError(connection, error);
|
|
this.sendError(connection, error);
|
|
}, () -> {
|
|
}, () -> {
|
|
- if (queryCount < LARGE_QUERY) {
|
|
|
|
- rateLimit.processed(queryCount);
|
|
|
|
- }
|
|
|
|
|
|
+ conn.processed(queryCount);
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
@@ -486,31 +517,38 @@ public class RawOfficeFloorMain {
|
|
ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
|
|
|
|
// Determine if will overload queries
|
|
// Determine if will overload queries
|
|
- RateLimit rateLimit = this.fortunesRateLimit.get();
|
|
|
|
- if (rateLimit.isLimit(1)) {
|
|
|
|
|
|
+ RateLimitedConnection conn = this.threadLocalRateLimit.get().getAvailableConnection(1);
|
|
|
|
+ if (conn == null) {
|
|
this.sendError(connection, THROTTLED);
|
|
this.sendError(connection, THROTTLED);
|
|
return; // rate limited
|
|
return; // rate limited
|
|
}
|
|
}
|
|
|
|
|
|
// Service
|
|
// Service
|
|
- this.fortunes.subscribe(fortunes -> {
|
|
|
|
- try {
|
|
|
|
- // Additional fortunes
|
|
|
|
- fortunes.add(new Fortune(0, "Additional fortune added at request time."));
|
|
|
|
- Collections.sort(fortunes, (a, b) -> a.message.compareTo(b.message));
|
|
|
|
-
|
|
|
|
- // Send response
|
|
|
|
- response.setContentType(TEXT_HTML, null);
|
|
|
|
- this.fortuneMustache.execute(response.getEntityWriter(), fortunes);
|
|
|
|
- this.send(connection);
|
|
|
|
- } catch (IOException ex) {
|
|
|
|
- ex.printStackTrace();
|
|
|
|
- }
|
|
|
|
- }, error -> {
|
|
|
|
- this.sendError(connection, error);
|
|
|
|
- }, () -> {
|
|
|
|
- rateLimit.processed(1);
|
|
|
|
- });
|
|
|
|
|
|
+ Flux.from(conn.connection.createStatement("SELECT ID, MESSAGE FROM FORTUNE").execute())
|
|
|
|
+ .flatMap(result -> Flux.from(result.map((row, metadata) -> {
|
|
|
|
+ Integer id = row.get(0, Integer.class);
|
|
|
|
+ String message = row.get(1, String.class);
|
|
|
|
+ return new Fortune(id, message);
|
|
|
|
+ }))).collectList().publishOn(conn.writeScheduler).subscribe(fortunes -> {
|
|
|
|
+ try {
|
|
|
|
+ // Additional fortunes
|
|
|
|
+ fortunes.add(new Fortune(0, "Additional fortune added at request time."));
|
|
|
|
+ Collections.sort(fortunes, (a, b) -> a.message.compareTo(b.message));
|
|
|
|
+
|
|
|
|
+ // Send response
|
|
|
|
+ response.setContentType(TEXT_HTML, null);
|
|
|
|
+ this.fortuneMustache.execute(response.getEntityWriter(), fortunes);
|
|
|
|
+ this.send(connection);
|
|
|
|
+ } catch (CancelledKeyException | ClosedChannelException ex) {
|
|
|
|
+ // Ignore as disconnecting client
|
|
|
|
+ } catch (IOException ex) {
|
|
|
|
+ ex.printStackTrace();
|
|
|
|
+ }
|
|
|
|
+ }, error -> {
|
|
|
|
+ this.sendError(connection, error);
|
|
|
|
+ }, () -> {
|
|
|
|
+ conn.processed(1);
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
private void update(String requestUri, HttpResponse response,
|
|
private void update(String requestUri, HttpResponse response,
|
|
@@ -519,49 +557,47 @@ public class RawOfficeFloorMain {
|
|
// Obtain the number of queries
|
|
// Obtain the number of queries
|
|
String queriesCountText = requestUri.substring(UPDATE_PATH_PREFIX.length());
|
|
String queriesCountText = requestUri.substring(UPDATE_PATH_PREFIX.length());
|
|
int queryCount = getQueryCount(queriesCountText);
|
|
int queryCount = getQueryCount(queriesCountText);
|
|
|
|
+ int executeQueryCount = queryCount + 1; // select all and update
|
|
|
|
|
|
// Determine if will overload queries
|
|
// Determine if will overload queries
|
|
- int executedQueryCount = queryCount * 2; // select and update
|
|
|
|
- RateLimit rateLimit = this.updatesRateLimit.get();
|
|
|
|
- if (queryCount < LARGE_QUERY) {
|
|
|
|
- if (rateLimit.isLimit(executedQueryCount)) {
|
|
|
|
- this.sendError(connection, THROTTLED);
|
|
|
|
- return; // rate limited
|
|
|
|
- }
|
|
|
|
|
|
+ RateLimitedConnection conn = this.threadLocalRateLimit.get().getAvailableConnection(executeQueryCount);
|
|
|
|
+ if (conn == null) {
|
|
|
|
+ this.sendError(connection, THROTTLED);
|
|
|
|
+ return; // rate limited
|
|
}
|
|
}
|
|
|
|
|
|
// Service
|
|
// Service
|
|
- Connection db = this.threadLocalConnection.get();
|
|
|
|
Flux.range(1, queryCount)
|
|
Flux.range(1, queryCount)
|
|
- .flatMap(index -> db.createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
|
|
- .bind(0, ThreadLocalRandom.current().nextInt(1, 10001)).execute())
|
|
|
|
|
|
+ .flatMap(
|
|
|
|
+ index -> conn.connection.createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
|
|
+ .bind(0, ThreadLocalRandom.current().nextInt(1, 10001)).execute())
|
|
.flatMap(result -> Flux.from(result.map((row, metadata) -> {
|
|
.flatMap(result -> Flux.from(result.map((row, metadata) -> {
|
|
Integer id = row.get(0, Integer.class);
|
|
Integer id = row.get(0, Integer.class);
|
|
Integer number = row.get(1, Integer.class);
|
|
Integer number = row.get(1, Integer.class);
|
|
return new World(id, number);
|
|
return new World(id, number);
|
|
}))).collectList().flatMap(worlds -> {
|
|
}))).collectList().flatMap(worlds -> {
|
|
Collections.sort(worlds, (a, b) -> a.id - b.id);
|
|
Collections.sort(worlds, (a, b) -> a.id - b.id);
|
|
- Batch batch = db.createBatch();
|
|
|
|
|
|
+ Batch batch = conn.connection.createBatch();
|
|
for (World world : worlds) {
|
|
for (World world : worlds) {
|
|
world.randomNumber = ThreadLocalRandom.current().nextInt(1, 10001);
|
|
world.randomNumber = ThreadLocalRandom.current().nextInt(1, 10001);
|
|
batch.add("UPDATE WORLD SET RANDOMNUMBER = " + world.randomNumber + " WHERE ID = "
|
|
batch.add("UPDATE WORLD SET RANDOMNUMBER = " + world.randomNumber + " WHERE ID = "
|
|
+ world.id);
|
|
+ world.id);
|
|
}
|
|
}
|
|
return Mono.from(batch.execute()).map((result) -> worlds);
|
|
return Mono.from(batch.execute()).map((result) -> worlds);
|
|
- }).subscribe(worlds -> {
|
|
|
|
|
|
+ }).publishOn(conn.writeScheduler).subscribe(worlds -> {
|
|
try {
|
|
try {
|
|
response.setContentType(APPLICATION_JSON, null);
|
|
response.setContentType(APPLICATION_JSON, null);
|
|
this.objectMapper.writeValue(response.getEntityWriter(), worlds);
|
|
this.objectMapper.writeValue(response.getEntityWriter(), worlds);
|
|
this.send(connection);
|
|
this.send(connection);
|
|
|
|
+ } catch (CancelledKeyException | ClosedChannelException ex) {
|
|
|
|
+ // Ignore as disconnecting client
|
|
} catch (IOException ex) {
|
|
} catch (IOException ex) {
|
|
ex.printStackTrace();
|
|
ex.printStackTrace();
|
|
}
|
|
}
|
|
}, error -> {
|
|
}, error -> {
|
|
this.sendError(connection, error);
|
|
this.sendError(connection, error);
|
|
}, () -> {
|
|
}, () -> {
|
|
- if (queryCount < LARGE_QUERY) {
|
|
|
|
- rateLimit.processed(executedQueryCount);
|
|
|
|
- }
|
|
|
|
|
|
+ conn.processed(executeQueryCount);
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
@@ -589,6 +625,8 @@ public class RawOfficeFloorMain {
|
|
// Send error response
|
|
// Send error response
|
|
this.send(connection);
|
|
this.send(connection);
|
|
|
|
|
|
|
|
+ } catch (CancelledKeyException | ClosedChannelException ex) {
|
|
|
|
+ // Ignore as disconnecting client
|
|
} catch (IOException ex) {
|
|
} catch (IOException ex) {
|
|
ex.printStackTrace();
|
|
ex.printStackTrace();
|
|
}
|
|
}
|
|
@@ -604,64 +642,64 @@ public class RawOfficeFloorMain {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private static class ThreadLocalRateLimit extends ThreadLocal<RateLimit> {
|
|
|
|
- @Override
|
|
|
|
- protected RateLimit initialValue() {
|
|
|
|
- return new RateLimit();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
private static class RateLimit {
|
|
private static class RateLimit {
|
|
|
|
|
|
- private final int INITIAL_REQUEST_COUNT = 738 / Runtime.getRuntime().availableProcessors();
|
|
|
|
|
|
+ private final RateLimitedConnection[] rateLimitedConnections;
|
|
|
|
|
|
- private int requestCount = 0;
|
|
|
|
|
|
+ private final Executor socketExecutor;
|
|
|
|
|
|
- private final AtomicInteger activeQueries = new AtomicInteger(0);
|
|
|
|
|
|
+ private RateLimit(RequestHandler<HttpRequestParser> requestHandler, Connection[] connections) {
|
|
|
|
|
|
- private boolean isActiveLimit = false;
|
|
|
|
|
|
+ // Create the write scheduler
|
|
|
|
+ this.socketExecutor = (runnable) -> requestHandler.execute(() -> {
|
|
|
|
+ runnable.run();
|
|
|
|
+ });
|
|
|
|
+ Scheduler writeScheduler = Schedulers.fromExecutor(this.socketExecutor);
|
|
|
|
|
|
- public boolean isLimit(int queryCount) {
|
|
|
|
|
|
+ // Create the rate limited connections
|
|
|
|
+ this.rateLimitedConnections = new RateLimitedConnection[connections.length];
|
|
|
|
+ for (int i = 0; i < this.rateLimitedConnections.length; i++) {
|
|
|
|
+ this.rateLimitedConnections[i] = new RateLimitedConnection(connections[i], writeScheduler);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- // Increment the request count
|
|
|
|
- this.requestCount = this.requestCount + ((this.requestCount > INITIAL_REQUEST_COUNT) ? 0 : 1);
|
|
|
|
|
|
+ private RateLimitedConnection getAvailableConnection(int queryCount) {
|
|
|
|
|
|
- // Ensure initial requests are processed
|
|
|
|
- do {
|
|
|
|
|
|
+ // Determine available connection for limit
|
|
|
|
+ for (int i = 0; i < this.rateLimitedConnections.length; i++) {
|
|
|
|
+ RateLimitedConnection connection = this.rateLimitedConnections[i];
|
|
|
|
|
|
// Determine if query count reached
|
|
// Determine if query count reached
|
|
- this.activeQueries.updateAndGet((count) -> {
|
|
|
|
- int newCount = count + queryCount;
|
|
|
|
- if (newCount > QUERY_BUFFER_SIZE) {
|
|
|
|
- // Max limit reached
|
|
|
|
- this.isActiveLimit = true;
|
|
|
|
- return count;
|
|
|
|
- } else {
|
|
|
|
- // Allow input
|
|
|
|
- this.isActiveLimit = false;
|
|
|
|
- return newCount;
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
- if (this.requestCount > INITIAL_REQUEST_COUNT) {
|
|
|
|
- // Initial requests processed, so start possible throttling
|
|
|
|
- return this.isActiveLimit;
|
|
|
|
|
|
+ int newCount = connection.activeQueries + queryCount;
|
|
|
|
+ if (newCount <= QUERY_BUFFER_SIZE) {
|
|
|
|
+ // Connection available for load
|
|
|
|
+ connection.activeQueries = newCount;
|
|
|
|
+ return connection;
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- // Allow some processing time if limit hit
|
|
|
|
- if (this.isActiveLimit) {
|
|
|
|
- try {
|
|
|
|
- Thread.sleep(1);
|
|
|
|
- } catch (InterruptedException ex) {
|
|
|
|
- // continue processing
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
+ // As here, no available connection
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static class RateLimitedConnection {
|
|
|
|
|
|
- } while (this.isActiveLimit);
|
|
|
|
- return false; // below limit
|
|
|
|
|
|
+ private final Scheduler writeScheduler;
|
|
|
|
+
|
|
|
|
+ private final Connection connection;
|
|
|
|
+
|
|
|
|
+ private int activeQueries;
|
|
|
|
+
|
|
|
|
+ private RateLimitedConnection(Connection connection, Scheduler writeScheduler) {
|
|
|
|
+ this.connection = connection;
|
|
|
|
+ this.writeScheduler = writeScheduler;
|
|
}
|
|
}
|
|
|
|
|
|
- public void processed(int queryCount) {
|
|
|
|
- this.activeQueries.updateAndGet((count) -> count - queryCount);
|
|
|
|
|
|
+ private void processed(int queryCount) {
|
|
|
|
+
|
|
|
|
+ // Update the active queries
|
|
|
|
+ this.activeQueries -= queryCount;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|