|
@@ -28,8 +28,10 @@ import io.inverno.mod.http.base.InternalServerErrorException;
|
|
|
import io.inverno.mod.http.base.Parameter;
|
|
|
import io.inverno.mod.http.base.Status;
|
|
|
import io.inverno.mod.http.server.Exchange;
|
|
|
-import io.inverno.mod.http.server.ExchangeHandler;
|
|
|
+import io.inverno.mod.http.server.ExchangeContext;
|
|
|
+import io.inverno.mod.http.server.RootExchangeHandler;
|
|
|
import io.inverno.mod.sql.SqlClient;
|
|
|
+import io.inverno.mod.sql.UnsafeSqlOperations;
|
|
|
import io.netty.buffer.ByteBuf;
|
|
|
import io.netty.buffer.Unpooled;
|
|
|
import io.netty.channel.EventLoopGroup;
|
|
@@ -40,7 +42,7 @@ import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
@Bean( visibility = Visibility.PRIVATE )
|
|
|
-public class Handler implements ExchangeHandler<Exchange> {
|
|
|
+public class Handler implements RootExchangeHandler<ExchangeContext, Exchange<ExchangeContext>> {
|
|
|
|
|
|
private static final String PATH_PLAINTEXT = "/plaintext";
|
|
|
private static final String PATH_JSON = "/json";
|
|
@@ -49,12 +51,15 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
private static final String PATH_UPDATES = "/updates";
|
|
|
private static final String PATH_FORTUNES = "/fortunes";
|
|
|
|
|
|
+ public static final String DB_SELECT_WORLD = "SELECT id, randomnumber from WORLD where id = $1";
|
|
|
+ public static final String DB_UPDATE_WORLD = "UPDATE world SET randomnumber=$1 WHERE id=$2";
|
|
|
+ public static final String DB_SELECT_FORTUNE = "SELECT id, message from FORTUNE";
|
|
|
+
|
|
|
private static final CharSequence STATIC_SERVER = AsciiString.cached("inverno");
|
|
|
|
|
|
private final Reactor reactor;
|
|
|
private final ObjectMapper mapper;
|
|
|
- private final ReactorScope<SqlClient> pooledClientSqlClient;
|
|
|
- private final ReactorScope<SqlClient> poolSqlClient;
|
|
|
+ private final ReactorScope<Mono<SqlClient>> sqlClient;
|
|
|
|
|
|
private EventLoopGroup dateEventLoopGroup;
|
|
|
|
|
@@ -62,13 +67,11 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
|
|
|
public Handler(Reactor reactor,
|
|
|
ObjectMapper mapper,
|
|
|
- ReactorScope<SqlClient> pooledClientSqlClient,
|
|
|
- ReactorScope<SqlClient> poolSqlClient
|
|
|
+ ReactorScope<Mono<SqlClient>> sqlClient
|
|
|
) {
|
|
|
this.reactor = reactor;
|
|
|
this.mapper = mapper;
|
|
|
- this.pooledClientSqlClient = pooledClientSqlClient;
|
|
|
- this.poolSqlClient = poolSqlClient;
|
|
|
+ this.sqlClient = sqlClient;
|
|
|
}
|
|
|
|
|
|
@Init
|
|
@@ -85,7 +88,7 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void handle(Exchange exchange) throws HttpException {
|
|
|
+ public void handle(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
|
switch(exchange.request().getPath()) {
|
|
|
case PATH_PLAINTEXT: {
|
|
|
this.handle_plaintext(exchange);
|
|
@@ -145,7 +148,7 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
|
|
|
private static final Mono<ByteBuf> PLAIN_TEXT_MONO = Mono.fromSupplier(new PlaintextSupplier());
|
|
|
|
|
|
- public void handle_plaintext(Exchange exchange) throws HttpException {
|
|
|
+ public void handle_plaintext(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
|
exchange.response()
|
|
|
.headers(h -> h
|
|
|
.add(HttpHeaderNames.SERVER, STATIC_SERVER)
|
|
@@ -158,7 +161,7 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
.stream(PLAIN_TEXT_MONO);
|
|
|
}
|
|
|
|
|
|
- public void handle_json(Exchange exchange) throws HttpException {
|
|
|
+ public void handle_json(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
|
try {
|
|
|
exchange.response()
|
|
|
.headers(h -> h
|
|
@@ -175,13 +178,11 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static final String DB_SELECT_WORLD = "SELECT id, randomnumber from WORLD where id = $1";
|
|
|
-
|
|
|
private static int randomWorldId() {
|
|
|
return 1 + ThreadLocalRandom.current().nextInt(10000);
|
|
|
}
|
|
|
|
|
|
- public void handle_db(Exchange exchange) throws HttpException {
|
|
|
+ public void handle_db(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
|
exchange.response()
|
|
|
.headers(h -> h
|
|
|
.add(HttpHeaderNames.SERVER, STATIC_SERVER)
|
|
@@ -189,7 +190,8 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
|
)
|
|
|
.body()
|
|
|
- .raw().stream(this.pooledClientSqlClient.get().queryForObject(
|
|
|
+ .raw().stream(this.sqlClient.get().flatMap(client ->
|
|
|
+ client.queryForObject(
|
|
|
DB_SELECT_WORLD,
|
|
|
row -> {
|
|
|
try {
|
|
@@ -201,21 +203,21 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
},
|
|
|
randomWorldId()
|
|
|
)
|
|
|
- );
|
|
|
+ ));
|
|
|
}
|
|
|
|
|
|
private static final String PARAMETER_QUERIES = "queries";
|
|
|
|
|
|
- private int extractQueriesParameter(Exchange exchange) {
|
|
|
+ private int extractQueriesParameter(Exchange<ExchangeContext> exchange) {
|
|
|
try {
|
|
|
return Math.min(500, Math.max(1, exchange.request().queryParameters().get(PARAMETER_QUERIES).map(Parameter::asInteger).orElse(1)));
|
|
|
}
|
|
|
- catch (ConverterException e) { // TODO
|
|
|
+ catch (ConverterException e) {
|
|
|
return 1;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void handle_queries(Exchange exchange) throws HttpException {
|
|
|
+ public void handle_queries(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
|
int queries = this.extractQueriesParameter(exchange);
|
|
|
exchange.response()
|
|
|
.headers(h -> h
|
|
@@ -224,12 +226,17 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
|
)
|
|
|
.body()
|
|
|
- .raw().stream(Flux.range(0, queries)
|
|
|
- .flatMap(ign -> this.pooledClientSqlClient.get().queryForObject(
|
|
|
- DB_SELECT_WORLD,
|
|
|
- row -> new World(row.getInteger(0), row.getInteger(1)),
|
|
|
- randomWorldId()
|
|
|
- ))
|
|
|
+ .raw().stream(this.sqlClient.get()
|
|
|
+ .flatMapMany(client -> ((UnsafeSqlOperations)client)
|
|
|
+ .batchQueries(ops ->
|
|
|
+ Flux.range(0, queries)
|
|
|
+ .map(ign -> ops.queryForObject(
|
|
|
+ DB_SELECT_WORLD,
|
|
|
+ row -> new World(row.getInteger(0), row.getInteger(1)),
|
|
|
+ randomWorldId()
|
|
|
+ ))
|
|
|
+ )
|
|
|
+ )
|
|
|
.collectList()
|
|
|
.map(worlds -> {
|
|
|
try {
|
|
@@ -242,27 +249,28 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
- private static final String DB_UPDATE_WORLD = "UPDATE world SET randomnumber=$1 WHERE id=$2";
|
|
|
-
|
|
|
- public void handle_updates(Exchange exchange) throws HttpException {
|
|
|
+ public void handle_updates(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
|
int queries = this.extractQueriesParameter(exchange);
|
|
|
|
|
|
exchange.response()
|
|
|
- .headers(h -> h
|
|
|
- .add(HttpHeaderNames.SERVER, STATIC_SERVER)
|
|
|
- .add(HttpHeaderNames.DATE, this.date)
|
|
|
- .add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
|
- )
|
|
|
- .body()
|
|
|
- .raw().stream(this.poolSqlClient.get().connection(ops -> Flux.range(0, queries)
|
|
|
- .flatMap(ign -> ops.queryForObject(
|
|
|
- DB_SELECT_WORLD,
|
|
|
- row -> new World(row.getInteger(0), randomWorldId()),
|
|
|
- randomWorldId()
|
|
|
- )
|
|
|
- )
|
|
|
+ .headers(h -> h
|
|
|
+ .add(HttpHeaderNames.SERVER, STATIC_SERVER)
|
|
|
+ .add(HttpHeaderNames.DATE, this.date)
|
|
|
+ .add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
|
|
+ )
|
|
|
+ .body()
|
|
|
+ .raw().stream(this.sqlClient.get()
|
|
|
+ .flatMapMany(client -> Flux.from(((UnsafeSqlOperations)client)
|
|
|
+ .batchQueries(ops ->
|
|
|
+ Flux.range(0, queries)
|
|
|
+ .map(ign -> ops.queryForObject(
|
|
|
+ DB_SELECT_WORLD,
|
|
|
+ row -> new World(row.getInteger(0), randomWorldId()),
|
|
|
+ randomWorldId()
|
|
|
+ ))
|
|
|
+ ))
|
|
|
.collectSortedList()
|
|
|
- .delayUntil(worlds -> ops.batchUpdate(
|
|
|
+ .delayUntil(worlds -> client.batchUpdate(
|
|
|
DB_UPDATE_WORLD,
|
|
|
worlds.stream().map(world -> new Object[] { world.getRandomNumber(), world.getId() })
|
|
|
)
|
|
@@ -274,16 +282,16 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
catch (JsonProcessingException e) {
|
|
|
throw new InternalServerErrorException(e);
|
|
|
}
|
|
|
- }))
|
|
|
- );
|
|
|
+ })
|
|
|
+ )
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
- private static final String DB_SELECT_FORTUNE = "SELECT id, message from FORTUNE";
|
|
|
private static final CharSequence MEDIA_TEXT_HTML_UTF8 = AsciiString.cached("text/html; charset=utf-8");
|
|
|
|
|
|
private static final FortunesTemplate.Renderer<CompletableFuture<ByteBuf>> FORTUNES_RENDERER = FortunesTemplate.bytebuf(() -> Unpooled.unreleasableBuffer(Unpooled.buffer()));
|
|
|
|
|
|
- public void handle_fortunes(Exchange exchange) throws HttpException {
|
|
|
+ public void handle_fortunes(Exchange<ExchangeContext> exchange) throws HttpException {
|
|
|
exchange.response()
|
|
|
.headers(h -> h
|
|
|
.add(HttpHeaderNames.SERVER, STATIC_SERVER)
|
|
@@ -291,10 +299,12 @@ public class Handler implements ExchangeHandler<Exchange> {
|
|
|
.add(HttpHeaderNames.CONTENT_TYPE, MEDIA_TEXT_HTML_UTF8)
|
|
|
)
|
|
|
.body()
|
|
|
- .raw().stream(Flux.from(this.pooledClientSqlClient.get().query(
|
|
|
- DB_SELECT_FORTUNE,
|
|
|
- row -> new Fortune(row.getInteger(0), row.getString(1))
|
|
|
- ))
|
|
|
+ .raw().stream(this.sqlClient.get().flatMapMany(client ->
|
|
|
+ client.query(
|
|
|
+ DB_SELECT_FORTUNE,
|
|
|
+ row -> new Fortune(row.getInteger(0), row.getString(1))
|
|
|
+ )
|
|
|
+ )
|
|
|
.collectList()
|
|
|
.flatMap(fortunes -> {
|
|
|
fortunes.add(new Fortune(0, "Additional fortune added at request time."));
|