|
@@ -1,4 +1,5 @@
|
|
import io.netty.channel.unix.Errors.NativeIoException
|
|
import io.netty.channel.unix.Errors.NativeIoException
|
|
|
|
+import io.vertx.core.buffer.Buffer
|
|
import io.vertx.core.http.HttpHeaders
|
|
import io.vertx.core.http.HttpHeaders
|
|
import io.vertx.core.http.HttpServer
|
|
import io.vertx.core.http.HttpServer
|
|
import io.vertx.core.http.HttpServerRequest
|
|
import io.vertx.core.http.HttpServerRequest
|
|
@@ -7,39 +8,28 @@ import io.vertx.ext.web.Route
|
|
import io.vertx.ext.web.Router
|
|
import io.vertx.ext.web.Router
|
|
import io.vertx.ext.web.RoutingContext
|
|
import io.vertx.ext.web.RoutingContext
|
|
import io.vertx.kotlin.core.http.httpServerOptionsOf
|
|
import io.vertx.kotlin.core.http.httpServerOptionsOf
|
|
|
|
+import io.vertx.kotlin.coroutines.CoroutineRouterSupport
|
|
import io.vertx.kotlin.coroutines.CoroutineVerticle
|
|
import io.vertx.kotlin.coroutines.CoroutineVerticle
|
|
-import io.vertx.kotlin.coroutines.await
|
|
|
|
|
|
+import io.vertx.kotlin.coroutines.coAwait
|
|
import io.vertx.kotlin.pgclient.pgConnectOptionsOf
|
|
import io.vertx.kotlin.pgclient.pgConnectOptionsOf
|
|
import io.vertx.pgclient.PgConnection
|
|
import io.vertx.pgclient.PgConnection
|
|
import io.vertx.sqlclient.PreparedQuery
|
|
import io.vertx.sqlclient.PreparedQuery
|
|
import io.vertx.sqlclient.Row
|
|
import io.vertx.sqlclient.Row
|
|
import io.vertx.sqlclient.RowSet
|
|
import io.vertx.sqlclient.RowSet
|
|
import io.vertx.sqlclient.Tuple
|
|
import io.vertx.sqlclient.Tuple
|
|
-import kotlinx.coroutines.*
|
|
|
|
|
|
+import kotlinx.coroutines.Dispatchers
|
|
import kotlinx.html.*
|
|
import kotlinx.html.*
|
|
import kotlinx.html.stream.appendHTML
|
|
import kotlinx.html.stream.appendHTML
|
|
|
|
+import kotlinx.io.buffered
|
|
|
|
+import kotlinx.serialization.ExperimentalSerializationApi
|
|
import kotlinx.serialization.Serializable
|
|
import kotlinx.serialization.Serializable
|
|
-import kotlinx.serialization.encodeToString
|
|
|
|
|
|
+import kotlinx.serialization.SerializationStrategy
|
|
import kotlinx.serialization.json.Json
|
|
import kotlinx.serialization.json.Json
|
|
-import java.net.SocketException
|
|
|
|
|
|
+import kotlinx.serialization.json.io.encodeToSink
|
|
import java.time.ZonedDateTime
|
|
import java.time.ZonedDateTime
|
|
import java.time.format.DateTimeFormatter
|
|
import java.time.format.DateTimeFormatter
|
|
|
|
|
|
-class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
|
|
|
|
- inline fun Route.checkedCoroutineHandlerUnconfined(crossinline requestHandler: suspend (RoutingContext) -> Unit): Route =
|
|
|
|
- handler { ctx ->
|
|
|
|
- /* Some conclusions from the Plaintext test results with trailing `await()`s:
|
|
|
|
- 1. `launch { /*...*/ }` < `launch(start = CoroutineStart.UNDISPATCHED) { /*...*/ }` < `launch(Dispatchers.Unconfined) { /*...*/ }`.
|
|
|
|
- 1. `launch { /*...*/ }` without `context` or `start` lead to `io.netty.channel.StacklessClosedChannelException` and `io.netty.channel.unix.Errors$NativeIoException: sendAddress(..) failed: Connection reset by peer`. */
|
|
|
|
- launch(Dispatchers.Unconfined) {
|
|
|
|
- try {
|
|
|
|
- requestHandler(ctx)
|
|
|
|
- } catch (t: Throwable) {
|
|
|
|
- ctx.fail(t)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
|
|
+class MainVerticle(val hasDb: Boolean) : CoroutineVerticle(), CoroutineRouterSupport {
|
|
// `PgConnection`s as used in the "vertx" portion offers better performance than `PgPool`s.
|
|
// `PgConnection`s as used in the "vertx" portion offers better performance than `PgPool`s.
|
|
lateinit var pgConnection: PgConnection
|
|
lateinit var pgConnection: PgConnection
|
|
lateinit var date: String
|
|
lateinit var date: String
|
|
@@ -68,7 +58,7 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
|
|
cachePreparedStatements = true,
|
|
cachePreparedStatements = true,
|
|
pipeliningLimit = 100000
|
|
pipeliningLimit = 100000
|
|
)
|
|
)
|
|
- ).await()
|
|
|
|
|
|
+ ).coAwait()
|
|
|
|
|
|
selectWorldQuery = pgConnection.preparedQuery(SELECT_WORLD_SQL)
|
|
selectWorldQuery = pgConnection.preparedQuery(SELECT_WORLD_SQL)
|
|
selectFortuneQuery = pgConnection.preparedQuery(SELECT_FORTUNE_SQL)
|
|
selectFortuneQuery = pgConnection.preparedQuery(SELECT_FORTUNE_SQL)
|
|
@@ -81,15 +71,19 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
|
|
.requestHandler(Router.router(vertx).apply { routes() })
|
|
.requestHandler(Router.router(vertx).apply { routes() })
|
|
.exceptionHandler {
|
|
.exceptionHandler {
|
|
// wrk resets the connections when benchmarking is finished.
|
|
// wrk resets the connections when benchmarking is finished.
|
|
- if ((it is NativeIoException && it.message == "recvAddress(..) failed: Connection reset by peer")
|
|
|
|
- || (it is SocketException && it.message == "Connection reset")
|
|
|
|
|
|
+ if (
|
|
|
|
+ // for epoll
|
|
|
|
+ /*(it is NativeIoException && it.message == "recvAddress(..) failed: Connection reset by peer")
|
|
|
|
+ || (it is SocketException && it.message == "Connection reset")*/
|
|
|
|
+ // for io_uring
|
|
|
|
+ it is NativeIoException && it.message == "io_uring read(..) failed: Connection reset by peer"
|
|
)
|
|
)
|
|
return@exceptionHandler
|
|
return@exceptionHandler
|
|
|
|
|
|
logger.info("Exception in HttpServer: $it")
|
|
logger.info("Exception in HttpServer: $it")
|
|
it.printStackTrace()
|
|
it.printStackTrace()
|
|
}
|
|
}
|
|
- .listen().await()
|
|
|
|
|
|
+ .listen().coAwait()
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -110,14 +104,46 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
|
|
putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
|
|
putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
|
|
}
|
|
}
|
|
|
|
|
|
- inline fun <reified T : Any> Route.jsonResponseHandler(crossinline requestHandler: suspend (RoutingContext) -> @Serializable T) =
|
|
|
|
- checkedCoroutineHandlerUnconfined {
|
|
|
|
|
|
+
|
|
|
|
+ fun Route.coHandlerUnconfined(requestHandler: suspend (RoutingContext) -> Unit): Route =
|
|
|
|
+ /* Some conclusions from the Plaintext test results with trailing `await()`s:
|
|
|
|
+ 1. `launch { /*...*/ }` < `launch(start = CoroutineStart.UNDISPATCHED) { /*...*/ }` < `launch(Dispatchers.Unconfined) { /*...*/ }`.
|
|
|
|
+ 1. `launch { /*...*/ }` without `context` or `start` lead to `io.netty.channel.StacklessClosedChannelException` and `io.netty.channel.unix.Errors$NativeIoException: sendAddress(..) failed: Connection reset by peer`. */
|
|
|
|
+ coHandler(Dispatchers.Unconfined, requestHandler)
|
|
|
|
+
|
|
|
|
+ inline fun <reified T : Any> Route.jsonResponseCoHandler(
|
|
|
|
+ serializer: SerializationStrategy<T>,
|
|
|
|
+ crossinline requestHandler: suspend (RoutingContext) -> @Serializable T
|
|
|
|
+ ) =
|
|
|
|
+ coHandlerUnconfined {
|
|
it.response().run {
|
|
it.response().run {
|
|
putJsonResponseHeader()
|
|
putJsonResponseHeader()
|
|
- end(Json.encodeToString(requestHandler(it)))/*.await()*/
|
|
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ // approach 1
|
|
|
|
+ end(Json.encodeToString(serializer, requestHandler(it)))/*.coAwait()*/
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ // approach 2
|
|
|
|
+ // java.lang.IllegalStateException: You must set the Content-Length header to be the total size of the message body BEFORE sending any data if you are not using HTTP chunked encoding.
|
|
|
|
+ toRawSink().buffered().use { bufferedSink ->
|
|
|
|
+ @OptIn(ExperimentalSerializationApi::class)
|
|
|
|
+ Json.encodeToSink(serializer, requestHandler(it), bufferedSink)
|
|
|
|
+ }
|
|
|
|
+ */
|
|
|
|
+
|
|
|
|
+ // approach 3
|
|
|
|
+ end(Buffer.buffer().apply {
|
|
|
|
+ toRawSink().buffered().use { bufferedSink ->
|
|
|
|
+ @OptIn(ExperimentalSerializationApi::class)
|
|
|
|
+ Json.encodeToSink(serializer, requestHandler(it), bufferedSink)
|
|
|
|
+ }
|
|
|
|
+ })
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
suspend fun selectRandomWorlds(queries: Int): List<World> {
|
|
suspend fun selectRandomWorlds(queries: Int): List<World> {
|
|
val rowSets = List(queries) {
|
|
val rowSets = List(queries) {
|
|
selectWorldQuery.execute(Tuple.of(randomIntBetween1And10000()))
|
|
selectWorldQuery.execute(Tuple.of(randomIntBetween1And10000()))
|
|
@@ -126,23 +152,23 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
|
|
}
|
|
}
|
|
|
|
|
|
fun Router.routes() {
|
|
fun Router.routes() {
|
|
- get("/json").jsonResponseHandler {
|
|
|
|
|
|
+ get("/json").jsonResponseCoHandler(Serializers.message) {
|
|
jsonSerializationMessage
|
|
jsonSerializationMessage
|
|
}
|
|
}
|
|
|
|
|
|
- get("/db").jsonResponseHandler {
|
|
|
|
- val rowSet = selectWorldQuery.execute(Tuple.of(randomIntBetween1And10000())).await()
|
|
|
|
|
|
+ get("/db").jsonResponseCoHandler(Serializers.world) {
|
|
|
|
+ val rowSet = selectWorldQuery.execute(Tuple.of(randomIntBetween1And10000())).coAwait()
|
|
rowSet.single().toWorld()
|
|
rowSet.single().toWorld()
|
|
}
|
|
}
|
|
|
|
|
|
- get("/queries").jsonResponseHandler {
|
|
|
|
|
|
+ get("/queries").jsonResponseCoHandler(Serializers.worlds) {
|
|
val queries = it.request().getQueries()
|
|
val queries = it.request().getQueries()
|
|
selectRandomWorlds(queries)
|
|
selectRandomWorlds(queries)
|
|
}
|
|
}
|
|
|
|
|
|
- get("/fortunes").checkedCoroutineHandlerUnconfined {
|
|
|
|
|
|
+ get("/fortunes").coHandlerUnconfined {
|
|
val fortunes = mutableListOf<Fortune>()
|
|
val fortunes = mutableListOf<Fortune>()
|
|
- selectFortuneQuery.execute().await()
|
|
|
|
|
|
+ selectFortuneQuery.execute().coAwait()
|
|
.mapTo(fortunes) { it.toFortune() }
|
|
.mapTo(fortunes) { it.toFortune() }
|
|
|
|
|
|
fortunes.add(Fortune(0, "Additional fortune added at request time."))
|
|
fortunes.add(Fortune(0, "Additional fortune added at request time."))
|
|
@@ -173,11 +199,11 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
|
|
it.response().run {
|
|
it.response().run {
|
|
putCommonHeaders()
|
|
putCommonHeaders()
|
|
putHeader(HttpHeaders.CONTENT_TYPE, "text/html; charset=utf-8")
|
|
putHeader(HttpHeaders.CONTENT_TYPE, "text/html; charset=utf-8")
|
|
- end(htmlString)/*.await()*/
|
|
|
|
|
|
+ end(htmlString)/*.coAwait()*/
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- get("/updates").jsonResponseHandler {
|
|
|
|
|
|
+ get("/updates").jsonResponseCoHandler(Serializers.worlds) {
|
|
val queries = it.request().getQueries()
|
|
val queries = it.request().getQueries()
|
|
val worlds = selectRandomWorlds(queries)
|
|
val worlds = selectRandomWorlds(queries)
|
|
val updatedWorlds = worlds.map { it.copy(randomNumber = randomIntBetween1And10000()) }
|
|
val updatedWorlds = worlds.map { it.copy(randomNumber = randomIntBetween1And10000()) }
|
|
@@ -185,7 +211,7 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
|
|
// Approach 1
|
|
// Approach 1
|
|
// The updated worlds need to be sorted first to avoid deadlocks.
|
|
// The updated worlds need to be sorted first to avoid deadlocks.
|
|
updateWordQuery
|
|
updateWordQuery
|
|
- .executeBatch(updatedWorlds.sortedBy { it.id }.map { Tuple.of(it.randomNumber, it.id) }).await()
|
|
|
|
|
|
+ .executeBatch(updatedWorlds.sortedBy { it.id }.map { Tuple.of(it.randomNumber, it.id) }).coAwait()
|
|
|
|
|
|
/*
|
|
/*
|
|
// Approach 2, worse performance
|
|
// Approach 2, worse performance
|
|
@@ -197,11 +223,11 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
|
|
updatedWorlds
|
|
updatedWorlds
|
|
}
|
|
}
|
|
|
|
|
|
- get("/plaintext").checkedCoroutineHandlerUnconfined {
|
|
|
|
|
|
+ get("/plaintext").coHandlerUnconfined {
|
|
it.response().run {
|
|
it.response().run {
|
|
putCommonHeaders()
|
|
putCommonHeaders()
|
|
putHeader(HttpHeaders.CONTENT_TYPE, "text/plain")
|
|
putHeader(HttpHeaders.CONTENT_TYPE, "text/plain")
|
|
- end("Hello, World!")/*.await()*/
|
|
|
|
|
|
+ end("Hello, World!")/*.coAwait()*/
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|