Browse Source

improved updated/queries via Flows

Ilya Nemtsev 7 months ago
parent
commit
15a1faac30

+ 19 - 14
frameworks/Kotlin/ktor/ktor-r2dbc/src/main/kotlin/org/jetbrains/ktor/benchmarks/Hello.kt

@@ -20,6 +20,9 @@ import io.r2dbc.spi.ConnectionFactories
 import io.r2dbc.spi.ConnectionFactory
 import io.r2dbc.spi.ConnectionFactoryOptions
 import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.reactive.asFlow
+import kotlinx.coroutines.reactive.awaitFirst
 import kotlinx.coroutines.reactive.awaitFirstOrNull
 import kotlinx.html.*
 import kotlinx.serialization.encodeToString
@@ -62,24 +65,21 @@ fun Application.main() {
             call.respondText(Json.encodeToString(result), ContentType.Application.Json)
         }
 
-        suspend fun selectWorlds(queries: Int, random: Random): List<World> = coroutineScope {
-            val result = ArrayList<Deferred<World?>>(queries)
-
+        fun selectWorlds(queries: Int, random: Random): Flow<World> = flow {
             repeat(queries) {
-                val deferred = async {
-                    getWorld(dbConnFactory, random).awaitFirstOrNull()
-                }
-                result.add(deferred)
+                emit(getWorld(dbConnFactory, random).awaitFirst())
             }
-
-            result.awaitAll().filterNotNull()
         }
 
         get("/queries") {
             val queries = call.queries()
             val random = Random.Default
 
-            val result = selectWorlds(queries, random)
+            val result = buildList {
+                selectWorlds(queries, random).collect {
+                    add(it)
+                }
+            }
 
             call.respondText(Json.encodeToString(result), ContentType.Application.Json)
         }
@@ -127,9 +127,14 @@ fun Application.main() {
             val result = coroutineScope {
                 val worlds = selectWorlds(queries, random)
 
-                worlds.forEach { it.randomNumber = random.nextInt(DB_ROWS) + 1 }
+                val worldsUpdated = buildList {
+                    worlds.collect {
+                        it.randomNumber = random.nextInt(DB_ROWS) + 1
+                        add(it)
+                    }
+                }
 
-                val updateRequests = worlds.map { world ->
+                val updateRequests = worldsUpdated.map { world ->
                     Mono.usingWhen(dbConnFactory.create(), { connection ->
                         Mono.from(
                             connection.createStatement(UPDATE_QUERY).bind(0, world.randomNumber).bind(1, world.id)
@@ -138,8 +143,8 @@ fun Application.main() {
                     }, Connection::close)
                 }
 
-                Flux.merge(updateRequests).collectList().awaitFirstOrNull()
-                worlds
+                Flux.merge(updateRequests).collectList().awaitFirst()
+                worldsUpdated
             }
 
             call.respondText(Json.encodeToString(result), ContentType.Application.Json)