|
@@ -15,15 +15,15 @@ import io.r2dbc.postgresql.PostgresqlConnectionFactory
|
|
|
import io.r2dbc.postgresql.client.SSLMode
|
|
import io.r2dbc.postgresql.client.SSLMode
|
|
|
import io.r2dbc.spi.Connection
|
|
import io.r2dbc.spi.Connection
|
|
|
import io.r2dbc.spi.ConnectionFactory
|
|
import io.r2dbc.spi.ConnectionFactory
|
|
|
-import kotlinx.coroutines.flow.Flow
|
|
|
|
|
-import kotlinx.coroutines.flow.flow
|
|
|
|
|
import kotlinx.coroutines.reactive.awaitFirst
|
|
import kotlinx.coroutines.reactive.awaitFirst
|
|
|
import kotlinx.coroutines.reactive.awaitFirstOrNull
|
|
import kotlinx.coroutines.reactive.awaitFirstOrNull
|
|
|
|
|
+import kotlinx.coroutines.reactor.awaitSingle
|
|
|
import kotlinx.html.*
|
|
import kotlinx.html.*
|
|
|
import reactor.core.publisher.Flux
|
|
import reactor.core.publisher.Flux
|
|
|
import reactor.core.publisher.Mono
|
|
import reactor.core.publisher.Mono
|
|
|
import java.time.Duration
|
|
import java.time.Duration
|
|
|
-import kotlin.random.Random
|
|
|
|
|
|
|
+import java.util.concurrent.ThreadLocalRandom
|
|
|
|
|
+import kotlin.math.min
|
|
|
|
|
|
|
|
const val HELLO_WORLD = "Hello, World!"
|
|
const val HELLO_WORLD = "Hello, World!"
|
|
|
const val WORLD_QUERY = "SELECT id, randomnumber FROM world WHERE id = $1"
|
|
const val WORLD_QUERY = "SELECT id, randomnumber FROM world WHERE id = $1"
|
|
@@ -36,7 +36,6 @@ fun Application.main() {
|
|
|
val dbConnFactory = configurePostgresR2DBC(config)
|
|
val dbConnFactory = configurePostgresR2DBC(config)
|
|
|
|
|
|
|
|
val helloWorldContent = TextContent("Hello, World!", ContentType.Text.Plain)
|
|
val helloWorldContent = TextContent("Hello, World!", ContentType.Text.Plain)
|
|
|
- val random = Random.Default
|
|
|
|
|
|
|
|
|
|
install(DefaultHeaders)
|
|
install(DefaultHeaders)
|
|
|
|
|
|
|
@@ -50,44 +49,18 @@ fun Application.main() {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
get("/db") {
|
|
get("/db") {
|
|
|
- val request = getWorld(dbConnFactory, random)
|
|
|
|
|
- val result = request.awaitFirstOrNull()
|
|
|
|
|
-
|
|
|
|
|
- call.respondJson(result)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- fun selectWorlds(queries: Int, random: Random): Flow<World> = flow {
|
|
|
|
|
- repeat(queries) {
|
|
|
|
|
- emit(getWorld(dbConnFactory, random).awaitFirst())
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ val world = dbConnFactory.fetchWorld()
|
|
|
|
|
+ call.respondJson(world)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
get("/queries") {
|
|
get("/queries") {
|
|
|
val queries = call.queries()
|
|
val queries = call.queries()
|
|
|
-
|
|
|
|
|
- val result = buildList {
|
|
|
|
|
- selectWorlds(queries, random).collect {
|
|
|
|
|
- add(it)
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- call.respondJson(result)
|
|
|
|
|
|
|
+ val worlds = dbConnFactory.fetchWorlds(queries)
|
|
|
|
|
+ call.respondJson(worlds)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
get("/fortunes") {
|
|
get("/fortunes") {
|
|
|
- val result = mutableListOf<Fortune>()
|
|
|
|
|
-
|
|
|
|
|
- val request = Flux.usingWhen(dbConnFactory.create(), { connection ->
|
|
|
|
|
- Flux.from(connection.createStatement(FORTUNES_QUERY).execute()).flatMap { r ->
|
|
|
|
|
- Flux.from(r.map { row, _ ->
|
|
|
|
|
- Fortune(
|
|
|
|
|
- row.get(0, Int::class.java)!!, row.get(1, String::class.java)!!
|
|
|
|
|
- )
|
|
|
|
|
- })
|
|
|
|
|
- }
|
|
|
|
|
- }, { connection -> connection.close() })
|
|
|
|
|
-
|
|
|
|
|
- request.collectList().awaitFirstOrNull()?.let { result.addAll(it) }
|
|
|
|
|
|
|
+ val result = dbConnFactory.fetchFortunes().toMutableList()
|
|
|
|
|
|
|
|
result.add(Fortune(0, "Additional fortune added at request time."))
|
|
result.add(Fortune(0, "Additional fortune added at request time."))
|
|
|
result.sortBy { it.message }
|
|
result.sortBy { it.message }
|
|
@@ -113,47 +86,85 @@ fun Application.main() {
|
|
|
get("/updates") {
|
|
get("/updates") {
|
|
|
val queries = call.queries()
|
|
val queries = call.queries()
|
|
|
|
|
|
|
|
- val worlds = selectWorlds(queries, random)
|
|
|
|
|
-
|
|
|
|
|
- val worldsUpdated = buildList {
|
|
|
|
|
- worlds.collect { world ->
|
|
|
|
|
- world.randomNumber = random.nextInt(DB_ROWS) + 1
|
|
|
|
|
- add(world)
|
|
|
|
|
-
|
|
|
|
|
- Mono.usingWhen(dbConnFactory.create(), { connection ->
|
|
|
|
|
- Mono.from(
|
|
|
|
|
- connection.createStatement(UPDATE_QUERY)
|
|
|
|
|
- .bind(0, world.randomNumber)
|
|
|
|
|
- .bind(1, world.id)
|
|
|
|
|
- .execute()
|
|
|
|
|
- ).flatMap { Mono.from(it.rowsUpdated) }
|
|
|
|
|
- }, Connection::close).awaitFirstOrNull()
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- call.respondJson(worldsUpdated)
|
|
|
|
|
|
|
+ val worlds = dbConnFactory.fetchWorlds(queries)
|
|
|
|
|
+ val updatedWorlds = worlds.map {
|
|
|
|
|
+ it.copy(randomNumber = ThreadLocalRandom.current().nextInt(1, DB_ROWS + 1))
|
|
|
|
|
+ }.sortedBy { it.id }
|
|
|
|
|
+
|
|
|
|
|
+ Mono.usingWhen(dbConnFactory.create(), { connection ->
|
|
|
|
|
+ Mono.from(connection.beginTransaction())
|
|
|
|
|
+ .thenMany(
|
|
|
|
|
+ Flux.fromIterable(updatedWorlds)
|
|
|
|
|
+ .concatMap { world ->
|
|
|
|
|
+ Mono.from(
|
|
|
|
|
+ connection.createStatement(UPDATE_QUERY)
|
|
|
|
|
+ .bind("$1", world.randomNumber)
|
|
|
|
|
+ .bind("$2", world.id)
|
|
|
|
|
+ .execute()
|
|
|
|
|
+ ).flatMap { Mono.from(it.rowsUpdated) }
|
|
|
|
|
+ }
|
|
|
|
|
+ )
|
|
|
|
|
+ .then(Mono.from(connection.commitTransaction()))
|
|
|
|
|
+ },
|
|
|
|
|
+ Connection::close,
|
|
|
|
|
+ { connection, _ -> connection.rollbackTransaction() },
|
|
|
|
|
+ { connection -> connection.rollbackTransaction() }
|
|
|
|
|
+ ).awaitFirstOrNull()
|
|
|
|
|
+
|
|
|
|
|
+ call.respondJson(updatedWorlds)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-private fun getWorld(
|
|
|
|
|
- dbConnFactory: ConnectionFactory, random: Random
|
|
|
|
|
-): Mono<World> = Mono.usingWhen(dbConnFactory.create(), { connection ->
|
|
|
|
|
- Mono.from(connection.createStatement(WORLD_QUERY)
|
|
|
|
|
- .bind("$1", random.nextInt(DB_ROWS) + 1)
|
|
|
|
|
- .execute())
|
|
|
|
|
- .flatMap { r ->
|
|
|
|
|
- Mono.from(r.map { row, _ ->
|
|
|
|
|
- val id = row.get(0, Int::class.java)
|
|
|
|
|
- val randomNumber = row.get(1, Int::class.java)
|
|
|
|
|
- if (id != null && randomNumber != null) {
|
|
|
|
|
- World(id, randomNumber)
|
|
|
|
|
- } else {
|
|
|
|
|
- throw IllegalStateException("Database returned null values for required fields")
|
|
|
|
|
- }
|
|
|
|
|
- })
|
|
|
|
|
- }
|
|
|
|
|
-}, Connection::close)
|
|
|
|
|
|
|
+private suspend fun ConnectionFactory.fetchWorld(): World =
|
|
|
|
|
+ Mono.usingWhen(create(), { connection ->
|
|
|
|
|
+ selectWorld(connection)
|
|
|
|
|
+ }, Connection::close).awaitSingle()
|
|
|
|
|
+
|
|
|
|
|
+private suspend fun ConnectionFactory.fetchWorlds(
|
|
|
|
|
+ count: Int
|
|
|
|
|
+): List<World> {
|
|
|
|
|
+ if (count <= 0) return emptyList()
|
|
|
|
|
+ val concurrency = min(count, 32)
|
|
|
|
|
+ return Mono.usingWhen(create(), { connection ->
|
|
|
|
|
+ Flux.range(0, count)
|
|
|
|
|
+ .flatMap({ selectWorldPublisher(connection) }, concurrency)
|
|
|
|
|
+ .collectList()
|
|
|
|
|
+ }, Connection::close).awaitSingle()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+private fun selectWorld(connection: Connection): Mono<World> =
|
|
|
|
|
+ selectWorldPublisher(connection)
|
|
|
|
|
+
|
|
|
|
|
+private fun selectWorldPublisher(connection: Connection): Mono<World> {
|
|
|
|
|
+ val worldId = ThreadLocalRandom.current().nextInt(1, DB_ROWS + 1)
|
|
|
|
|
+ return Mono.from(
|
|
|
|
|
+ connection.createStatement(WORLD_QUERY)
|
|
|
|
|
+ .bind("$1", worldId)
|
|
|
|
|
+ .execute()
|
|
|
|
|
+ ).flatMap { result ->
|
|
|
|
|
+ Mono.from(result.map { row, _ ->
|
|
|
|
|
+ World(
|
|
|
|
|
+ row.get(0, Int::class.java) ?: error("id is null"),
|
|
|
|
|
+ row.get(1, Int::class.java) ?: error("randomNumber is null")
|
|
|
|
|
+ )
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+private suspend fun ConnectionFactory.fetchFortunes(): List<Fortune> =
|
|
|
|
|
+ Mono.usingWhen(create(), { connection ->
|
|
|
|
|
+ Flux.from(connection.createStatement(FORTUNES_QUERY).execute())
|
|
|
|
|
+ .flatMap { result ->
|
|
|
|
|
+ Flux.from(result.map { row, _ ->
|
|
|
|
|
+ Fortune(
|
|
|
|
|
+ row.get(0, Int::class.java) ?: error("id is null"),
|
|
|
|
|
+ row.get(1, String::class.java) ?: error("message is null")
|
|
|
|
|
+ )
|
|
|
|
|
+ })
|
|
|
|
|
+ }
|
|
|
|
|
+ .collectList()
|
|
|
|
|
+ }, Connection::close).awaitSingle()
|
|
|
|
|
|
|
|
private fun configurePostgresR2DBC(config: ApplicationConfig): ConnectionFactory {
|
|
private fun configurePostgresR2DBC(config: ApplicationConfig): ConnectionFactory {
|
|
|
val cfo = PostgresqlConnectionConfiguration.builder()
|
|
val cfo = PostgresqlConnectionConfiguration.builder()
|