|
@@ -1,7 +1,5 @@
|
|
package com.techempower.inverno.benchmark.internal;
|
|
package com.techempower.inverno.benchmark.internal;
|
|
|
|
|
|
-import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
|
-import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
import com.techempower.inverno.benchmark.model.Fortune;
|
|
import com.techempower.inverno.benchmark.model.Fortune;
|
|
import com.techempower.inverno.benchmark.model.Message;
|
|
import com.techempower.inverno.benchmark.model.Message;
|
|
import com.techempower.inverno.benchmark.model.World;
|
|
import com.techempower.inverno.benchmark.model.World;
|
|
@@ -14,15 +12,14 @@ import io.inverno.mod.base.Charsets;
|
|
import io.inverno.mod.base.concurrent.Reactor;
|
|
import io.inverno.mod.base.concurrent.Reactor;
|
|
import io.inverno.mod.base.concurrent.ReactorScope;
|
|
import io.inverno.mod.base.concurrent.ReactorScope;
|
|
import io.inverno.mod.base.converter.ConverterException;
|
|
import io.inverno.mod.base.converter.ConverterException;
|
|
|
|
+import io.inverno.mod.base.reflect.Types;
|
|
import io.inverno.mod.http.base.ExchangeContext;
|
|
import io.inverno.mod.http.base.ExchangeContext;
|
|
import io.inverno.mod.http.base.HttpException;
|
|
import io.inverno.mod.http.base.HttpException;
|
|
-import io.inverno.mod.http.base.InternalServerErrorException;
|
|
|
|
import io.inverno.mod.http.base.Parameter;
|
|
import io.inverno.mod.http.base.Parameter;
|
|
import io.inverno.mod.http.base.Status;
|
|
import io.inverno.mod.http.base.Status;
|
|
-import io.inverno.mod.http.server.Exchange;
|
|
|
|
import io.inverno.mod.http.server.ErrorExchange;
|
|
import io.inverno.mod.http.server.ErrorExchange;
|
|
|
|
+import io.inverno.mod.http.server.Exchange;
|
|
import io.inverno.mod.http.server.ServerController;
|
|
import io.inverno.mod.http.server.ServerController;
|
|
-import io.inverno.mod.sql.SqlClient;
|
|
|
|
import io.inverno.mod.sql.UnsafeSqlOperations;
|
|
import io.inverno.mod.sql.UnsafeSqlOperations;
|
|
import io.netty.buffer.ByteBuf;
|
|
import io.netty.buffer.ByteBuf;
|
|
import io.netty.buffer.Unpooled;
|
|
import io.netty.buffer.Unpooled;
|
|
@@ -30,13 +27,14 @@ import io.netty.channel.EventLoopGroup;
|
|
import io.netty.handler.codec.http.HttpHeaderNames;
|
|
import io.netty.handler.codec.http.HttpHeaderNames;
|
|
import io.netty.handler.codec.http.HttpHeaderValues;
|
|
import io.netty.handler.codec.http.HttpHeaderValues;
|
|
import io.netty.util.AsciiString;
|
|
import io.netty.util.AsciiString;
|
|
|
|
+import java.lang.reflect.Type;
|
|
import java.time.ZonedDateTime;
|
|
import java.time.ZonedDateTime;
|
|
import java.time.format.DateTimeFormatter;
|
|
import java.time.format.DateTimeFormatter;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
|
|
+import java.util.List;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.CompletableFuture;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.ThreadLocalRandom;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
-import java.util.function.Supplier;
|
|
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Flux;
|
|
import reactor.core.publisher.Mono;
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
@@ -51,36 +49,32 @@ public class Controller implements ServerController<ExchangeContext, Exchange<Ex
|
|
private static final String PATH_FORTUNES = "/fortunes";
|
|
private static final String PATH_FORTUNES = "/fortunes";
|
|
|
|
|
|
public static final String DB_SELECT_WORLD = "SELECT id, randomnumber from WORLD where id = $1";
|
|
public static final String DB_SELECT_WORLD = "SELECT id, randomnumber from WORLD where id = $1";
|
|
- public static final String DB_UPDATE_WORLD = "UPDATE world SET randomnumber=$1 WHERE id=$2";
|
|
|
|
- public static final String DB_SELECT_FORTUNE = "SELECT id, message from FORTUNE";
|
|
|
|
-
|
|
|
|
|
|
+
|
|
private static final CharSequence STATIC_SERVER = AsciiString.cached("inverno");
|
|
private static final CharSequence STATIC_SERVER = AsciiString.cached("inverno");
|
|
|
|
|
|
|
|
+ private static final Type LIST_WORLD_TYPE = Types.type(List.class).type(World.class).and().build();
|
|
|
|
+
|
|
private final Reactor reactor;
|
|
private final Reactor reactor;
|
|
- private final ObjectMapper mapper;
|
|
|
|
- private final ReactorScope<Mono<SqlClient>> sqlClient;
|
|
|
|
|
|
+ private final ReactorScope<JsonSerializer> jsonSerializer;
|
|
|
|
+ private final ReactorScope<Mono<DbRepository>> dbRepository;
|
|
|
|
|
|
private EventLoopGroup dateEventLoopGroup;
|
|
private EventLoopGroup dateEventLoopGroup;
|
|
|
|
|
|
private CharSequence date;
|
|
private CharSequence date;
|
|
|
|
|
|
public Controller(Reactor reactor,
|
|
public Controller(Reactor reactor,
|
|
- ObjectMapper mapper,
|
|
|
|
- ReactorScope<Mono<SqlClient>> sqlClient
|
|
|
|
|
|
+ ReactorScope<JsonSerializer> jsonSerializer,
|
|
|
|
+ ReactorScope<Mono<DbRepository>> dbRepository
|
|
) {
|
|
) {
|
|
this.reactor = reactor;
|
|
this.reactor = reactor;
|
|
- this.mapper = mapper;
|
|
|
|
- this.sqlClient = sqlClient;
|
|
|
|
|
|
+ this.jsonSerializer = jsonSerializer;
|
|
|
|
+ this.dbRepository = dbRepository;
|
|
}
|
|
}
|
|
|
|
|
|
@Init
|
|
@Init
|
|
public void init() {
|
|
public void init() {
|
|
this.dateEventLoopGroup = this.reactor.createIoEventLoopGroup(1);
|
|
this.dateEventLoopGroup = this.reactor.createIoEventLoopGroup(1);
|
|
- this.dateEventLoopGroup.scheduleAtFixedRate(() -> {
|
|
|
|
- this.date = new AsciiString(DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now()));
|
|
|
|
- }, 0, 1000, TimeUnit.MILLISECONDS);
|
|
|
|
-
|
|
|
|
-
|
|
|
|
|
|
+ this.dateEventLoopGroup.scheduleAtFixedRate(() -> this.date = new AsciiString(DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now())), 0, 1000, TimeUnit.MILLISECONDS);
|
|
}
|
|
}
|
|
|
|
|
|
@Destroy
|
|
@Destroy
|
|
@@ -139,15 +133,8 @@ public class Controller implements ServerController<ExchangeContext, Exchange<Ex
|
|
}
|
|
}
|
|
|
|
|
|
private static final CharSequence STATIC_PLAINTEXT_LEN_VALUE = AsciiString.cached(String.valueOf(STATIC_PLAINTEXT_LEN));
|
|
private static final CharSequence STATIC_PLAINTEXT_LEN_VALUE = AsciiString.cached(String.valueOf(STATIC_PLAINTEXT_LEN));
|
|
-
|
|
|
|
- private static class PlaintextSupplier implements Supplier<ByteBuf> {
|
|
|
|
- @Override
|
|
|
|
- public ByteBuf get() {
|
|
|
|
- return STATIC_PLAINTEXT_BYTEBUF.duplicate();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private static final Mono<ByteBuf> PLAIN_TEXT_MONO = Mono.fromSupplier(new PlaintextSupplier());
|
|
|
|
|
|
+
|
|
|
|
+ private static final Mono<ByteBuf> PLAIN_TEXT_MONO = Mono.fromSupplier(STATIC_PLAINTEXT_BYTEBUF::duplicate);
|
|
|
|
|
|
public void handle_plaintext(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
public void handle_plaintext(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
exchange.response()
|
|
exchange.response()
|
|
@@ -163,20 +150,15 @@ public class Controller implements ServerController<ExchangeContext, Exchange<Ex
|
|
}
|
|
}
|
|
|
|
|
|
public void handle_json(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
public void handle_json(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
- try {
|
|
|
|
- exchange.response()
|
|
|
|
- .headers(h -> h
|
|
|
|
- .add(HttpHeaderNames.SERVER, STATIC_SERVER)
|
|
|
|
- .add(HttpHeaderNames.DATE, this.date)
|
|
|
|
- .add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
|
|
- )
|
|
|
|
- .body()
|
|
|
|
- .raw()
|
|
|
|
- .value(Unpooled.wrappedBuffer(this.mapper.writeValueAsBytes(new Message("Hello, World!"))));
|
|
|
|
- }
|
|
|
|
- catch (JsonProcessingException | IllegalStateException e) {
|
|
|
|
- throw new InternalServerErrorException("Error serializing message as JSON", e);
|
|
|
|
- }
|
|
|
|
|
|
+ exchange.response()
|
|
|
|
+ .headers(h -> h
|
|
|
|
+ .add(HttpHeaderNames.SERVER, STATIC_SERVER)
|
|
|
|
+ .add(HttpHeaderNames.DATE, this.date)
|
|
|
|
+ .add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
|
|
+ )
|
|
|
|
+ .body()
|
|
|
|
+ .raw()
|
|
|
|
+ .value(this.jsonSerializer.get().serialize(new Message("Hello, World!"), Message.class));
|
|
}
|
|
}
|
|
|
|
|
|
private static int randomWorldId() {
|
|
private static int randomWorldId() {
|
|
@@ -191,20 +173,10 @@ public class Controller implements ServerController<ExchangeContext, Exchange<Ex
|
|
.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
)
|
|
)
|
|
.body()
|
|
.body()
|
|
- .raw().stream(this.sqlClient.get().flatMap(client ->
|
|
|
|
- client.queryForObject(
|
|
|
|
- DB_SELECT_WORLD,
|
|
|
|
- row -> {
|
|
|
|
- try {
|
|
|
|
- return Unpooled.wrappedBuffer(this.mapper.writeValueAsBytes(new World(row.getInteger(0), row.getInteger(1))));
|
|
|
|
- }
|
|
|
|
- catch (JsonProcessingException e) {
|
|
|
|
- throw new InternalServerErrorException(e);
|
|
|
|
- }
|
|
|
|
- },
|
|
|
|
- randomWorldId()
|
|
|
|
- )
|
|
|
|
- ));
|
|
|
|
|
|
+ .raw().stream(this.dbRepository.get()
|
|
|
|
+ .flatMap(repository -> repository.getWorld(randomWorldId()))
|
|
|
|
+ .map(world -> this.jsonSerializer.get().serialize(world, World.class))
|
|
|
|
+ );
|
|
}
|
|
}
|
|
|
|
|
|
private static final String PARAMETER_QUERIES = "queries";
|
|
private static final String PARAMETER_QUERIES = "queries";
|
|
@@ -227,8 +199,8 @@ public class Controller implements ServerController<ExchangeContext, Exchange<Ex
|
|
.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
)
|
|
)
|
|
.body()
|
|
.body()
|
|
- .raw().stream(this.sqlClient.get()
|
|
|
|
- .flatMapMany(client -> ((UnsafeSqlOperations)client)
|
|
|
|
|
|
+ .raw().stream(this.dbRepository.get()
|
|
|
|
+ .flatMapMany(repository -> ((UnsafeSqlOperations)repository.getSqlClient())
|
|
.batchQueries(ops ->
|
|
.batchQueries(ops ->
|
|
Flux.range(0, queries)
|
|
Flux.range(0, queries)
|
|
.map(ign -> ops.queryForObject(
|
|
.map(ign -> ops.queryForObject(
|
|
@@ -239,58 +211,39 @@ public class Controller implements ServerController<ExchangeContext, Exchange<Ex
|
|
)
|
|
)
|
|
)
|
|
)
|
|
.collectList()
|
|
.collectList()
|
|
- .map(worlds -> {
|
|
|
|
- try {
|
|
|
|
- return Unpooled.wrappedBuffer(this.mapper.writeValueAsBytes(worlds));
|
|
|
|
- }
|
|
|
|
- catch (JsonProcessingException e) {
|
|
|
|
- throw new InternalServerErrorException(e);
|
|
|
|
- }
|
|
|
|
- })
|
|
|
|
|
|
+ .map(worlds -> this.jsonSerializer.get().serialize(worlds, LIST_WORLD_TYPE))
|
|
);
|
|
);
|
|
}
|
|
}
|
|
|
|
|
|
public void handle_updates(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
public void handle_updates(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
int queries = this.extractQueriesParameter(exchange);
|
|
int queries = this.extractQueriesParameter(exchange);
|
|
-
|
|
|
|
exchange.response()
|
|
exchange.response()
|
|
- .headers(h -> h
|
|
|
|
- .add(HttpHeaderNames.SERVER, STATIC_SERVER)
|
|
|
|
- .add(HttpHeaderNames.DATE, this.date)
|
|
|
|
- .add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
|
|
- )
|
|
|
|
- .body()
|
|
|
|
- .raw().stream(this.sqlClient.get()
|
|
|
|
- .flatMapMany(client -> Flux.from(((UnsafeSqlOperations)client)
|
|
|
|
- .batchQueries(ops ->
|
|
|
|
- Flux.range(0, queries)
|
|
|
|
- .map(ign -> ops.queryForObject(
|
|
|
|
- DB_SELECT_WORLD,
|
|
|
|
- row -> new World(row.getInteger(0), randomWorldId()),
|
|
|
|
- randomWorldId()
|
|
|
|
- ))
|
|
|
|
- ))
|
|
|
|
- .collectSortedList()
|
|
|
|
- .delayUntil(worlds -> client.batchUpdate(
|
|
|
|
- DB_UPDATE_WORLD,
|
|
|
|
- worlds.stream().map(world -> new Object[] { world.getRandomNumber(), world.getId() })
|
|
|
|
- )
|
|
|
|
|
|
+ .headers(h -> h
|
|
|
|
+ .add(HttpHeaderNames.SERVER, STATIC_SERVER)
|
|
|
|
+ .add(HttpHeaderNames.DATE, this.date)
|
|
|
|
+ .add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
|
|
+ )
|
|
|
|
+ .body()
|
|
|
|
+ .raw().stream(this.dbRepository.get()
|
|
|
|
+ .flatMapMany(repository -> Flux.from(((UnsafeSqlOperations)repository.getSqlClient())
|
|
|
|
+ .batchQueries(ops ->
|
|
|
|
+ Flux.range(0, queries)
|
|
|
|
+ .map(ign -> ops.queryForObject(
|
|
|
|
+ DB_SELECT_WORLD,
|
|
|
|
+ row -> new World(row.getInteger(0), randomWorldId()),
|
|
|
|
+ randomWorldId()
|
|
|
|
+ ))
|
|
|
|
+ ))
|
|
|
|
+ .collectSortedList()
|
|
|
|
+ .delayUntil(repository::updateWorlds)
|
|
|
|
+ .map(worlds -> this.jsonSerializer.get().serialize(worlds, LIST_WORLD_TYPE))
|
|
)
|
|
)
|
|
- .map(worlds -> {
|
|
|
|
- try {
|
|
|
|
- return Unpooled.wrappedBuffer(this.mapper.writeValueAsBytes(worlds));
|
|
|
|
- }
|
|
|
|
- catch (JsonProcessingException e) {
|
|
|
|
- throw new InternalServerErrorException(e);
|
|
|
|
- }
|
|
|
|
- })
|
|
|
|
- )
|
|
|
|
- );
|
|
|
|
|
|
+ );
|
|
}
|
|
}
|
|
|
|
|
|
private static final CharSequence MEDIA_TEXT_HTML_UTF8 = AsciiString.cached("text/html; charset=utf-8");
|
|
private static final CharSequence MEDIA_TEXT_HTML_UTF8 = AsciiString.cached("text/html; charset=utf-8");
|
|
|
|
|
|
- private static final FortunesTemplate.Renderer<CompletableFuture<ByteBuf>> FORTUNES_RENDERER = FortunesTemplate.bytebuf(() -> Unpooled.buffer());
|
|
|
|
|
|
+ private static final FortunesTemplate.Renderer<CompletableFuture<ByteBuf>> FORTUNES_RENDERER = FortunesTemplate.bytebuf(Unpooled::buffer);
|
|
|
|
|
|
public void handle_fortunes(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
public void handle_fortunes(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
exchange.response()
|
|
exchange.response()
|
|
@@ -300,12 +253,7 @@ public class Controller implements ServerController<ExchangeContext, Exchange<Ex
|
|
.add(HttpHeaderNames.CONTENT_TYPE, MEDIA_TEXT_HTML_UTF8)
|
|
.add(HttpHeaderNames.CONTENT_TYPE, MEDIA_TEXT_HTML_UTF8)
|
|
)
|
|
)
|
|
.body()
|
|
.body()
|
|
- .raw().stream(this.sqlClient.get().flatMapMany(client ->
|
|
|
|
- client.query(
|
|
|
|
- DB_SELECT_FORTUNE,
|
|
|
|
- row -> new Fortune(row.getInteger(0), row.getString(1))
|
|
|
|
- )
|
|
|
|
- )
|
|
|
|
|
|
+ .raw().stream(this.dbRepository.get().flatMapMany(DbRepository::listFortunes)
|
|
.collectList()
|
|
.collectList()
|
|
.flatMap(fortunes -> {
|
|
.flatMap(fortunes -> {
|
|
fortunes.add(new Fortune(0, "Additional fortune added at request time."));
|
|
fortunes.add(new Fortune(0, "Additional fortune added at request time."));
|