|
@@ -18,6 +18,7 @@
|
|
|
package net.officefloor.benchmark;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.io.PrintWriter;
|
|
|
import java.io.Writer;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.time.ZoneOffset;
|
|
@@ -29,6 +30,7 @@ import java.util.Timer;
|
|
|
import java.util.TimerTask;
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.logging.Logger;
|
|
|
|
|
|
import org.apache.commons.text.StringEscapeUtils;
|
|
@@ -39,10 +41,12 @@ import com.github.mustachejava.DefaultMustacheFactory;
|
|
|
import com.github.mustachejava.Mustache;
|
|
|
import com.github.mustachejava.MustacheFactory;
|
|
|
|
|
|
+import io.r2dbc.spi.Batch;
|
|
|
import io.r2dbc.spi.Connection;
|
|
|
import io.r2dbc.spi.ConnectionFactories;
|
|
|
import io.r2dbc.spi.ConnectionFactory;
|
|
|
import io.r2dbc.spi.ConnectionFactoryOptions;
|
|
|
+import io.r2dbc.spi.R2dbcTransientResourceException;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.Data;
|
|
|
import net.officefloor.frame.api.manage.OfficeFloor;
|
|
@@ -76,6 +80,11 @@ import reactor.core.publisher.Mono;
|
|
|
*/
|
|
|
public class RawOfficeFloorMain {
|
|
|
|
|
|
+ /**
|
|
|
+ * Buffer size of queries.
|
|
|
+ */
|
|
|
+ private static final int QUERY_BUFFER_SIZE = 512;
|
|
|
+
|
|
|
/**
|
|
|
* {@link SocketManager}.
|
|
|
*/
|
|
@@ -104,8 +113,8 @@ public class RawOfficeFloorMain {
|
|
|
"tfb-database");
|
|
|
System.out.println("Starting server on port " + port + " talking to database " + server);
|
|
|
|
|
|
- // Increase the buffer size
|
|
|
- System.setProperty("reactor.bufferSize.small", String.valueOf(10000));
|
|
|
+ // Increase the buffer size (note: too high and cause OOM issues)
|
|
|
+ System.setProperty("reactor.bufferSize.small", String.valueOf(QUERY_BUFFER_SIZE));
|
|
|
|
|
|
// Build the connection pool
|
|
|
ConnectionFactoryOptions factoryOptions = ConnectionFactoryOptions.builder()
|
|
@@ -174,6 +183,10 @@ public class RawOfficeFloorMain {
|
|
|
|
|
|
private static final String UPDATE_PATH_PREFIX = "/update?queries=";
|
|
|
|
|
|
+ private static final R2dbcTransientResourceException THROTTLED = new R2dbcTransientResourceException();
|
|
|
+
|
|
|
+ private static final int LARGE_QUERY = 100;
|
|
|
+
|
|
|
/**
|
|
|
* <code>Date</code> {@link HttpHeaderValue}.
|
|
|
*/
|
|
@@ -223,6 +236,26 @@ public class RawOfficeFloorMain {
|
|
|
*/
|
|
|
private final ThreadLocal<Connection> threadLocalConnection;
|
|
|
|
|
|
+ /**
|
|
|
+ * db {@link ThreadLocalRateLimit}.
|
|
|
+ */
|
|
|
+ private final ThreadLocalRateLimit dbRateLimit = new ThreadLocalRateLimit();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * queries {@link ThreadLocalRateLimit}.
|
|
|
+ */
|
|
|
+ private final ThreadLocalRateLimit queriesRateLimit = new ThreadLocalRateLimit();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * fortunes {@link ThreadLocalRateLimit}.
|
|
|
+ */
|
|
|
+ private final ThreadLocalRateLimit fortunesRateLimit = new ThreadLocalRateLimit();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * updates {@link ThreadLocalRateLimit}.
|
|
|
+ */
|
|
|
+ private final ThreadLocalRateLimit updatesRateLimit = new ThreadLocalRateLimit();
|
|
|
+
|
|
|
/**
|
|
|
* {@link Mono} to service /db.
|
|
|
*/
|
|
@@ -383,6 +416,15 @@ public class RawOfficeFloorMain {
|
|
|
}
|
|
|
|
|
|
private void db(HttpResponse response, ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
|
+
|
|
|
+ // Determine if will overload queries
|
|
|
+ RateLimit rateLimit = this.dbRateLimit.get();
|
|
|
+ if (rateLimit.isLimit(1)) {
|
|
|
+ this.sendError(connection, THROTTLED);
|
|
|
+ return; // rate limited
|
|
|
+ }
|
|
|
+
|
|
|
+ // Service
|
|
|
this.db.subscribe(world -> {
|
|
|
try {
|
|
|
response.setContentType(APPLICATION_JSON, null);
|
|
@@ -393,16 +435,31 @@ public class RawOfficeFloorMain {
|
|
|
}
|
|
|
}, error -> {
|
|
|
this.sendError(connection, error);
|
|
|
+ }, () -> {
|
|
|
+ rateLimit.processed(1);
|
|
|
});
|
|
|
}
|
|
|
|
|
|
private void queries(String requestUri, HttpResponse response,
|
|
|
ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
|
+
|
|
|
+ // Obtain the number of queries
|
|
|
String queriesCountText = requestUri.substring(QUERIES_PATH_PREFIX.length());
|
|
|
int queryCount = getQueryCount(queriesCountText);
|
|
|
+
|
|
|
+ // Determine if will overload queries
|
|
|
+ RateLimit rateLimit = this.queriesRateLimit.get();
|
|
|
+ if (queryCount < LARGE_QUERY) {
|
|
|
+ if (rateLimit.isLimit(queryCount)) {
|
|
|
+ this.sendError(connection, THROTTLED);
|
|
|
+ return; // rate limited
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Service
|
|
|
+ Connection dbConn = this.threadLocalConnection.get();
|
|
|
Flux.range(1, queryCount)
|
|
|
- .flatMap(index -> this.threadLocalConnection.get()
|
|
|
- .createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
|
+ .flatMap(index -> dbConn.createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
|
.bind(0, ThreadLocalRandom.current().nextInt(1, 10001)).execute())
|
|
|
.flatMap(result -> Flux.from(result.map((row, metadata) -> {
|
|
|
Integer id = row.get(0, Integer.class);
|
|
@@ -418,11 +475,24 @@ public class RawOfficeFloorMain {
|
|
|
}
|
|
|
}, error -> {
|
|
|
this.sendError(connection, error);
|
|
|
+ }, () -> {
|
|
|
+ if (queryCount < LARGE_QUERY) {
|
|
|
+ rateLimit.processed(queryCount);
|
|
|
+ }
|
|
|
});
|
|
|
}
|
|
|
|
|
|
private void fortunes(HttpResponse response,
|
|
|
ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
|
+
|
|
|
+ // Determine if will overload queries
|
|
|
+ RateLimit rateLimit = this.fortunesRateLimit.get();
|
|
|
+ if (rateLimit.isLimit(1)) {
|
|
|
+ this.sendError(connection, THROTTLED);
|
|
|
+ return; // rate limited
|
|
|
+ }
|
|
|
+
|
|
|
+ // Service
|
|
|
this.fortunes.subscribe(fortunes -> {
|
|
|
try {
|
|
|
// Additional fortunes
|
|
@@ -438,13 +508,29 @@ public class RawOfficeFloorMain {
|
|
|
}
|
|
|
}, error -> {
|
|
|
this.sendError(connection, error);
|
|
|
+ }, () -> {
|
|
|
+ rateLimit.processed(1);
|
|
|
});
|
|
|
}
|
|
|
|
|
|
private void update(String requestUri, HttpResponse response,
|
|
|
ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
|
+
|
|
|
+ // Obtain the number of queries
|
|
|
String queriesCountText = requestUri.substring(UPDATE_PATH_PREFIX.length());
|
|
|
int queryCount = getQueryCount(queriesCountText);
|
|
|
+
|
|
|
+ // Determine if will overload queries
|
|
|
+ int executedQueryCount = queryCount * 2; // select and update
|
|
|
+ RateLimit rateLimit = this.updatesRateLimit.get();
|
|
|
+ if (queryCount < LARGE_QUERY) {
|
|
|
+ if (rateLimit.isLimit(executedQueryCount)) {
|
|
|
+ this.sendError(connection, THROTTLED);
|
|
|
+ return; // rate limited
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Service
|
|
|
Connection db = this.threadLocalConnection.get();
|
|
|
Flux.range(1, queryCount)
|
|
|
.flatMap(index -> db.createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
@@ -453,13 +539,16 @@ public class RawOfficeFloorMain {
|
|
|
Integer id = row.get(0, Integer.class);
|
|
|
Integer number = row.get(1, Integer.class);
|
|
|
return new World(id, number);
|
|
|
- }))).flatMap(world -> {
|
|
|
- world.randomNumber = ThreadLocalRandom.current().nextInt(1, 10001);
|
|
|
- return Flux
|
|
|
- .from(db.createStatement("UPDATE WORLD SET RANDOMNUMBER = $1 WHERE ID = $2")
|
|
|
- .bind(0, world.randomNumber).bind(1, world.id).execute())
|
|
|
- .flatMap(result -> Flux.from(result.getRowsUpdated()).map(updated -> world));
|
|
|
- }).collectList().subscribe(worlds -> {
|
|
|
+ }))).collectList().flatMap(worlds -> {
|
|
|
+ Collections.sort(worlds, (a, b) -> a.id - b.id);
|
|
|
+ Batch batch = db.createBatch();
|
|
|
+ for (World world : worlds) {
|
|
|
+ world.randomNumber = ThreadLocalRandom.current().nextInt(1, 10001);
|
|
|
+ batch.add("UPDATE WORLD SET RANDOMNUMBER = " + world.randomNumber + " WHERE ID = "
|
|
|
+ + world.id);
|
|
|
+ }
|
|
|
+ return Mono.from(batch.execute()).map((result) -> worlds);
|
|
|
+ }).subscribe(worlds -> {
|
|
|
try {
|
|
|
response.setContentType(APPLICATION_JSON, null);
|
|
|
this.objectMapper.writeValue(response.getEntityWriter(), worlds);
|
|
@@ -469,18 +558,37 @@ public class RawOfficeFloorMain {
|
|
|
}
|
|
|
}, error -> {
|
|
|
this.sendError(connection, error);
|
|
|
+ }, () -> {
|
|
|
+ if (queryCount < LARGE_QUERY) {
|
|
|
+ rateLimit.processed(executedQueryCount);
|
|
|
+ }
|
|
|
});
|
|
|
}
|
|
|
|
|
|
private void sendError(ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection,
|
|
|
Throwable failure) {
|
|
|
try {
|
|
|
- failure.printStackTrace();
|
|
|
|
|
|
+ // Setup to send response
|
|
|
HttpResponse response = connection.getResponse();
|
|
|
response.reset();
|
|
|
- response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR);
|
|
|
+
|
|
|
+ // Determine type of error
|
|
|
+ if (failure instanceof R2dbcTransientResourceException) {
|
|
|
+
|
|
|
+ // Indicate overloaded
|
|
|
+ response.setStatus(HttpStatus.SERVICE_UNAVAILABLE);
|
|
|
+
|
|
|
+ } else {
|
|
|
+ // Provide details of failure
|
|
|
+ response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR);
|
|
|
+ response.setContentType(TEXT_PLAIN, null);
|
|
|
+ failure.printStackTrace(new PrintWriter(response.getEntityWriter()));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Send error response
|
|
|
this.send(connection);
|
|
|
+
|
|
|
} catch (IOException ex) {
|
|
|
ex.printStackTrace();
|
|
|
}
|
|
@@ -496,6 +604,67 @@ public class RawOfficeFloorMain {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static class ThreadLocalRateLimit extends ThreadLocal<RateLimit> {
|
|
|
+ @Override
|
|
|
+ protected RateLimit initialValue() {
|
|
|
+ return new RateLimit();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class RateLimit {
|
|
|
+
|
|
|
+ private final int INITIAL_REQUEST_COUNT = 738 / Runtime.getRuntime().availableProcessors();
|
|
|
+
|
|
|
+ private int requestCount = 0;
|
|
|
+
|
|
|
+ private final AtomicInteger activeQueries = new AtomicInteger(0);
|
|
|
+
|
|
|
+ private boolean isActiveLimit = false;
|
|
|
+
|
|
|
+ public boolean isLimit(int queryCount) {
|
|
|
+
|
|
|
+ // Increment the request count
|
|
|
+ this.requestCount = this.requestCount + ((this.requestCount > INITIAL_REQUEST_COUNT) ? 0 : 1);
|
|
|
+
|
|
|
+ // Ensure initial requests are processed
|
|
|
+ do {
|
|
|
+
|
|
|
+ // Determine if query count reached
|
|
|
+ this.activeQueries.updateAndGet((count) -> {
|
|
|
+ int newCount = count + queryCount;
|
|
|
+ if (newCount > QUERY_BUFFER_SIZE) {
|
|
|
+ // Max limit reached
|
|
|
+ this.isActiveLimit = true;
|
|
|
+ return count;
|
|
|
+ } else {
|
|
|
+ // Allow input
|
|
|
+ this.isActiveLimit = false;
|
|
|
+ return newCount;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ if (this.requestCount > INITIAL_REQUEST_COUNT) {
|
|
|
+ // Initial requests processed, so start possible throttling
|
|
|
+ return this.isActiveLimit;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Allow some processing time if limit hit
|
|
|
+ if (this.isActiveLimit) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(1);
|
|
|
+ } catch (InterruptedException ex) {
|
|
|
+ // continue processing
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ } while (this.isActiveLimit);
|
|
|
+ return false; // below limit
|
|
|
+ }
|
|
|
+
|
|
|
+ public void processed(int queryCount) {
|
|
|
+ this.activeQueries.updateAndGet((count) -> count - queryCount);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Data
|
|
|
public static class Message {
|
|
|
private final String message;
|