|
@@ -18,18 +18,32 @@
|
|
|
package net.officefloor.benchmark;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.io.Writer;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.time.ZoneOffset;
|
|
|
import java.time.ZonedDateTime;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.List;
|
|
|
import java.util.Timer;
|
|
|
import java.util.TimerTask;
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
|
+import java.util.concurrent.ThreadLocalRandom;
|
|
|
import java.util.logging.Logger;
|
|
|
|
|
|
+import org.apache.commons.text.StringEscapeUtils;
|
|
|
+
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
|
|
|
-
|
|
|
+import com.github.mustachejava.DefaultMustacheFactory;
|
|
|
+import com.github.mustachejava.Mustache;
|
|
|
+import com.github.mustachejava.MustacheFactory;
|
|
|
+
|
|
|
+import io.r2dbc.spi.Connection;
|
|
|
+import io.r2dbc.spi.ConnectionFactories;
|
|
|
+import io.r2dbc.spi.ConnectionFactory;
|
|
|
+import io.r2dbc.spi.ConnectionFactoryOptions;
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
import lombok.Data;
|
|
|
import net.officefloor.frame.api.manage.OfficeFloor;
|
|
|
import net.officefloor.frame.api.manage.ProcessManager;
|
|
@@ -51,6 +65,8 @@ import net.officefloor.server.http.impl.ProcessAwareServerHttpConnectionManagedO
|
|
|
import net.officefloor.server.http.parse.HttpRequestParser.HttpRequestParserMetaData;
|
|
|
import net.officefloor.server.stream.StreamBufferPool;
|
|
|
import net.officefloor.server.stream.impl.ThreadLocalStreamBufferPool;
|
|
|
+import reactor.core.publisher.Flux;
|
|
|
+import reactor.core.publisher.Mono;
|
|
|
|
|
|
/**
|
|
|
* <p>
|
|
@@ -83,6 +99,23 @@ public class RawOfficeFloorMain {
|
|
|
socketManager.shutdown();
|
|
|
}
|
|
|
|
|
|
+ // Indicate details
|
|
|
+ String server = System.getProperty("OFFICE.net_officefloor_jdbc_DataSourceManagedObjectSource.server",
|
|
|
+ "tfb-database");
|
|
|
+ System.out.println("Starting server on port " + port + " talking to database " + server);
|
|
|
+
|
|
|
+ // Increase the buffer size
|
|
|
+ System.setProperty("reactor.bufferSize.small", String.valueOf(10000));
|
|
|
+
|
|
|
+ // Build the connection pool
|
|
|
+ ConnectionFactoryOptions factoryOptions = ConnectionFactoryOptions.builder()
|
|
|
+ .option(ConnectionFactoryOptions.DRIVER, "pool").option(ConnectionFactoryOptions.PROTOCOL, "postgresql")
|
|
|
+ .option(ConnectionFactoryOptions.HOST, server).option(ConnectionFactoryOptions.PORT, 5432)
|
|
|
+ .option(ConnectionFactoryOptions.DATABASE, "hello_world")
|
|
|
+ .option(ConnectionFactoryOptions.USER, "benchmarkdbuser")
|
|
|
+ .option(ConnectionFactoryOptions.PASSWORD, "benchmarkdbpass").build();
|
|
|
+ ConnectionFactory connectionFactory = ConnectionFactories.get(factoryOptions);
|
|
|
+
|
|
|
// Create the server location
|
|
|
HttpServerLocation serverLocation = new HttpServerLocationImpl("localhost", port, -1);
|
|
|
|
|
@@ -91,6 +124,7 @@ public class RawOfficeFloorMain {
|
|
|
for (int i = 0; i < executionStrategy.length; i++) {
|
|
|
executionStrategy[i] = (runnable) -> new Thread(runnable);
|
|
|
}
|
|
|
+ System.out.println("Using " + executionStrategy.length + " executors");
|
|
|
|
|
|
// Create the socket manager
|
|
|
socketManager = HttpServerSocketManagedObjectSource.createSocketManager(executionStrategy);
|
|
@@ -98,7 +132,8 @@ public class RawOfficeFloorMain {
|
|
|
// Create raw HTTP servicing
|
|
|
StreamBufferPool<ByteBuffer> serviceBufferPool = new ThreadLocalStreamBufferPool(
|
|
|
() -> ByteBuffer.allocateDirect(8046), Integer.MAX_VALUE, Integer.MAX_VALUE);
|
|
|
- RawHttpServicerFactory serviceFactory = new RawHttpServicerFactory(serverLocation, serviceBufferPool);
|
|
|
+ RawHttpServicerFactory serviceFactory = new RawHttpServicerFactory(serverLocation, serviceBufferPool,
|
|
|
+ connectionFactory);
|
|
|
socketManager.bindServerSocket(serverLocation.getClusterHttpPort(), null, null, serviceFactory, serviceFactory);
|
|
|
|
|
|
// Setup Date
|
|
@@ -110,9 +145,10 @@ public class RawOfficeFloorMain {
|
|
|
for (int i = 0; i < runnables.length; i++) {
|
|
|
executionStrategy[i].newThread(runnables[i]).start();
|
|
|
}
|
|
|
+ Thread.sleep(1000); // allow threads to start up
|
|
|
|
|
|
// Indicate running
|
|
|
- System.out.println("OfficeFloor raw running");
|
|
|
+ System.out.println("OfficeFloor raw running on port " + serverLocation.getClusterHttpPort());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -128,10 +164,16 @@ public class RawOfficeFloorMain {
|
|
|
|
|
|
private static byte[] HELLO_WORLD = "Hello, World!".getBytes(ServerHttpConnection.DEFAULT_HTTP_ENTITY_CHARSET);
|
|
|
|
|
|
- private static HttpHeaderValue APPLICATION_JSON = new HttpHeaderValue("application/json");
|
|
|
+ private static final HttpHeaderValue APPLICATION_JSON = new HttpHeaderValue("application/json");
|
|
|
|
|
|
private static final HttpHeaderValue TEXT_PLAIN = new HttpHeaderValue("text/plain");
|
|
|
|
|
|
+ private static final HttpHeaderValue TEXT_HTML = new HttpHeaderValue("text/html;charset=utf-8");
|
|
|
+
|
|
|
+ private static final String QUERIES_PATH_PREFIX = "/queries?queries=";
|
|
|
+
|
|
|
+ private static final String UPDATE_PATH_PREFIX = "/update?queries=";
|
|
|
+
|
|
|
/**
|
|
|
* <code>Date</code> {@link HttpHeaderValue}.
|
|
|
*/
|
|
@@ -171,17 +213,85 @@ public class RawOfficeFloorMain {
|
|
|
}
|
|
|
};
|
|
|
|
|
|
+ /**
|
|
|
+ * {@link ConnectionFactory}.
|
|
|
+ */
|
|
|
+ private final ConnectionFactory connectionFactory;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link ThreadLocal} {@link Connection}.
|
|
|
+ */
|
|
|
+ private final ThreadLocal<Connection> threadLocalConnection;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link Mono} to service /db.
|
|
|
+ */
|
|
|
+ private final Mono<World> db;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link Mustache} for /fortunes.
|
|
|
+ */
|
|
|
+ private final Mustache fortuneMustache;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link Mono} to service /fortunes.
|
|
|
+ */
|
|
|
+ private final Mono<List<Fortune>> fortunes;
|
|
|
+
|
|
|
/**
|
|
|
* Instantiate.
|
|
|
*
|
|
|
* @param serverLocation {@link HttpServerLocation}.
|
|
|
* @param serviceBufferPool {@link StreamBufferPool}.
|
|
|
+ * @param connectionFactory {@link ConnectionFactory}.
|
|
|
*/
|
|
|
- public RawHttpServicerFactory(HttpServerLocation serverLocation,
|
|
|
- StreamBufferPool<ByteBuffer> serviceBufferPool) {
|
|
|
+ public RawHttpServicerFactory(HttpServerLocation serverLocation, StreamBufferPool<ByteBuffer> serviceBufferPool,
|
|
|
+ ConnectionFactory connectionFactory) {
|
|
|
super(serverLocation, false, new HttpRequestParserMetaData(100, 1000, 1000000), serviceBufferPool, null,
|
|
|
null, true);
|
|
|
this.objectMapper.registerModule(new AfterburnerModule());
|
|
|
+ this.connectionFactory = connectionFactory;
|
|
|
+
|
|
|
+ // Create thread local connection
|
|
|
+ this.threadLocalConnection = new ThreadLocal<Connection>() {
|
|
|
+ @Override
|
|
|
+ protected Connection initialValue() {
|
|
|
+ return Mono.from(RawHttpServicerFactory.this.connectionFactory.create()).block();
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ // Create the db logic
|
|
|
+ this.db = Mono
|
|
|
+ .from(this.threadLocalConnection.get()
|
|
|
+ .createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
|
+ .bind(0, ThreadLocalRandom.current().nextInt(1, 10001)).execute())
|
|
|
+ .flatMap(result -> Mono.from(result.map((row, metadata) -> {
|
|
|
+ Integer id = row.get(0, Integer.class);
|
|
|
+ Integer number = row.get(1, Integer.class);
|
|
|
+ return new World(id, number);
|
|
|
+ })));
|
|
|
+
|
|
|
+ // Load the mustache fortunes template
|
|
|
+ MustacheFactory mustacheFactory = new DefaultMustacheFactory() {
|
|
|
+ @Override
|
|
|
+ public void encode(String value, Writer writer) {
|
|
|
+ try {
|
|
|
+ StringEscapeUtils.ESCAPE_HTML4.translate(value, writer);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ ex.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+ this.fortuneMustache = mustacheFactory.compile("fortunes.mustache");
|
|
|
+
|
|
|
+ // Create the fortunes logic
|
|
|
+ this.fortunes = Flux
|
|
|
+ .from(this.threadLocalConnection.get().createStatement("SELECT ID, MESSAGE FROM FORTUNE").execute())
|
|
|
+ .flatMap(result -> Flux.from(result.map((row, metadata) -> {
|
|
|
+ Integer id = row.get(0, Integer.class);
|
|
|
+ String message = row.get(1, String.class);
|
|
|
+ return new Fortune(id, message);
|
|
|
+ }))).collectList();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -223,27 +333,167 @@ public class RawOfficeFloorMain {
|
|
|
switch (requestUri) {
|
|
|
|
|
|
case "/plaintext":
|
|
|
- response.setContentType(TEXT_PLAIN, null);
|
|
|
- response.getEntity().write(HELLO_WORLD);
|
|
|
- this.send(connection);
|
|
|
+ this.plaintext(response, connection);
|
|
|
break;
|
|
|
|
|
|
case "/json":
|
|
|
- response.setContentType(APPLICATION_JSON, null);
|
|
|
- this.objectMapper.writeValue(response.getEntityWriter(), new Message("Hello, World!"));
|
|
|
- this.send(connection);
|
|
|
+ this.json(response, connection);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/db":
|
|
|
+ this.db(response, connection);
|
|
|
+ break;
|
|
|
+
|
|
|
+ case "/fortunes":
|
|
|
+ this.fortunes(response, connection);
|
|
|
break;
|
|
|
|
|
|
default:
|
|
|
- // Unknown request
|
|
|
- response.setStatus(HttpStatus.NOT_FOUND);
|
|
|
- this.send(connection);
|
|
|
+ // Provide redirect
|
|
|
+ if (requestUri.startsWith(QUERIES_PATH_PREFIX)) {
|
|
|
+ this.queries(requestUri, response, connection);
|
|
|
+
|
|
|
+ } else if (requestUri.startsWith(UPDATE_PATH_PREFIX)) {
|
|
|
+ this.update(requestUri, response, connection);
|
|
|
+
|
|
|
+ } else {
|
|
|
+ // Unknown request
|
|
|
+ response.setStatus(HttpStatus.NOT_FOUND);
|
|
|
+ this.send(connection);
|
|
|
+ }
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
// No process management
|
|
|
return null;
|
|
|
}
|
|
|
+
|
|
|
+ private void plaintext(HttpResponse response,
|
|
|
+ ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) throws IOException {
|
|
|
+ response.setContentType(TEXT_PLAIN, null);
|
|
|
+ response.getEntity().write(HELLO_WORLD);
|
|
|
+ this.send(connection);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void json(HttpResponse response, ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection)
|
|
|
+ throws IOException {
|
|
|
+ response.setContentType(APPLICATION_JSON, null);
|
|
|
+ this.objectMapper.writeValue(response.getEntityWriter(), new Message("Hello, World!"));
|
|
|
+ this.send(connection);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void db(HttpResponse response, ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
|
+ this.db.subscribe(world -> {
|
|
|
+ try {
|
|
|
+ response.setContentType(APPLICATION_JSON, null);
|
|
|
+ this.objectMapper.writeValue(response.getEntityWriter(), world);
|
|
|
+ this.send(connection);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ ex.printStackTrace();
|
|
|
+ }
|
|
|
+ }, error -> {
|
|
|
+ this.sendError(connection, error);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private void queries(String requestUri, HttpResponse response,
|
|
|
+ ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
|
+ String queriesCountText = requestUri.substring(QUERIES_PATH_PREFIX.length());
|
|
|
+ int queryCount = getQueryCount(queriesCountText);
|
|
|
+ Flux.range(1, queryCount)
|
|
|
+ .flatMap(index -> this.threadLocalConnection.get()
|
|
|
+ .createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
|
+ .bind(0, ThreadLocalRandom.current().nextInt(1, 10001)).execute())
|
|
|
+ .flatMap(result -> Flux.from(result.map((row, metadata) -> {
|
|
|
+ Integer id = row.get(0, Integer.class);
|
|
|
+ Integer number = row.get(1, Integer.class);
|
|
|
+ return new World(id, number);
|
|
|
+ }))).collectList().subscribe(worlds -> {
|
|
|
+ try {
|
|
|
+ response.setContentType(APPLICATION_JSON, null);
|
|
|
+ this.objectMapper.writeValue(response.getEntityWriter(), worlds);
|
|
|
+ this.send(connection);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ ex.printStackTrace();
|
|
|
+ }
|
|
|
+ }, error -> {
|
|
|
+ this.sendError(connection, error);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private void fortunes(HttpResponse response,
|
|
|
+ ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
|
+ this.fortunes.subscribe(fortunes -> {
|
|
|
+ try {
|
|
|
+ // Additional fortunes
|
|
|
+ fortunes.add(new Fortune(0, "Additional fortune added at request time."));
|
|
|
+ Collections.sort(fortunes, (a, b) -> a.message.compareTo(b.message));
|
|
|
+
|
|
|
+ // Send response
|
|
|
+ response.setContentType(TEXT_HTML, null);
|
|
|
+ this.fortuneMustache.execute(response.getEntityWriter(), fortunes);
|
|
|
+ this.send(connection);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ ex.printStackTrace();
|
|
|
+ }
|
|
|
+ }, error -> {
|
|
|
+ this.sendError(connection, error);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private void update(String requestUri, HttpResponse response,
|
|
|
+ ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection) {
|
|
|
+ String queriesCountText = requestUri.substring(UPDATE_PATH_PREFIX.length());
|
|
|
+ int queryCount = getQueryCount(queriesCountText);
|
|
|
+ Connection db = this.threadLocalConnection.get();
|
|
|
+ Flux.range(1, queryCount)
|
|
|
+ .flatMap(index -> db.createStatement("SELECT ID, RANDOMNUMBER FROM WORLD WHERE ID = $1")
|
|
|
+ .bind(0, ThreadLocalRandom.current().nextInt(1, 10001)).execute())
|
|
|
+ .flatMap(result -> Flux.from(result.map((row, metadata) -> {
|
|
|
+ Integer id = row.get(0, Integer.class);
|
|
|
+ Integer number = row.get(1, Integer.class);
|
|
|
+ return new World(id, number);
|
|
|
+ }))).flatMap(world -> {
|
|
|
+ world.randomNumber = ThreadLocalRandom.current().nextInt(1, 10001);
|
|
|
+ return Flux
|
|
|
+ .from(db.createStatement("UPDATE WORLD SET RANDOMNUMBER = $1 WHERE ID = $2")
|
|
|
+ .bind(0, world.randomNumber).bind(1, world.id).execute())
|
|
|
+ .flatMap(result -> Flux.from(result.getRowsUpdated()).map(updated -> world));
|
|
|
+ }).collectList().subscribe(worlds -> {
|
|
|
+ try {
|
|
|
+ response.setContentType(APPLICATION_JSON, null);
|
|
|
+ this.objectMapper.writeValue(response.getEntityWriter(), worlds);
|
|
|
+ this.send(connection);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ ex.printStackTrace();
|
|
|
+ }
|
|
|
+ }, error -> {
|
|
|
+ this.sendError(connection, error);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendError(ProcessAwareServerHttpConnectionManagedObject<ByteBuffer> connection,
|
|
|
+ Throwable failure) {
|
|
|
+ try {
|
|
|
+ failure.printStackTrace();
|
|
|
+
|
|
|
+ HttpResponse response = connection.getResponse();
|
|
|
+ response.reset();
|
|
|
+ response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR);
|
|
|
+ this.send(connection);
|
|
|
+ } catch (IOException ex) {
|
|
|
+ ex.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static int getQueryCount(String queries) {
|
|
|
+ try {
|
|
|
+ int count = Integer.parseInt(queries);
|
|
|
+ return (count < 1) ? 1 : (count > 500) ? 500 : count;
|
|
|
+ } catch (NumberFormatException ex) {
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Data
|
|
@@ -251,4 +501,20 @@ public class RawOfficeFloorMain {
|
|
|
private final String message;
|
|
|
}
|
|
|
|
|
|
+ @Data
|
|
|
+ @AllArgsConstructor
|
|
|
+ public static class World {
|
|
|
+
|
|
|
+ private final int id;
|
|
|
+
|
|
|
+ private int randomNumber;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Data
|
|
|
+ public static class Fortune {
|
|
|
+
|
|
|
+ private final int id;
|
|
|
+
|
|
|
+ private final String message;
|
|
|
+ }
|
|
|
}
|