|
@@ -15,12 +15,10 @@ import io.ktor.routing.get
|
|
|
import io.ktor.routing.routing
|
|
|
import io.ktor.server.engine.embeddedServer
|
|
|
import io.ktor.server.netty.Netty
|
|
|
-import io.reactiverse.pgclient.PgPoolOptions
|
|
|
-import io.reactiverse.reactivex.pgclient.PgClient
|
|
|
-import io.reactiverse.reactivex.pgclient.PgRowSet
|
|
|
-import io.reactiverse.reactivex.pgclient.Row
|
|
|
-import io.reactiverse.reactivex.pgclient.Tuple
|
|
|
-import kotlinx.coroutines.rx2.await
|
|
|
+import io.reactiverse.kotlin.pgclient.getConnectionAwait
|
|
|
+import io.reactiverse.kotlin.pgclient.preparedBatchAwait
|
|
|
+import io.reactiverse.kotlin.pgclient.preparedQueryAwait
|
|
|
+import io.reactiverse.pgclient.*
|
|
|
import kotlinx.html.*
|
|
|
import kotlinx.serialization.Serializable
|
|
|
import kotlinx.serialization.json.JSON
|
|
@@ -82,20 +80,11 @@ class JasyncRepository() : Repository {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-fun PgRowSet.rows(): List<Row> {
|
|
|
- val rows = mutableListOf<Row>()
|
|
|
- val iterator = iterator()
|
|
|
- while (iterator.hasNext()) {
|
|
|
- rows.add(iterator.next())
|
|
|
- }
|
|
|
- return rows
|
|
|
-}
|
|
|
-
|
|
|
class ReactivePGRepository : Repository {
|
|
|
- private val poolOptions = PgPoolOptions()
|
|
|
- private val db: PgClient
|
|
|
+ private val db: PgPool
|
|
|
|
|
|
init {
|
|
|
+ val poolOptions = PgPoolOptions()
|
|
|
poolOptions.apply {
|
|
|
host = "tfb-database"
|
|
|
database = "hello_world"
|
|
@@ -108,20 +97,20 @@ class ReactivePGRepository : Repository {
|
|
|
}
|
|
|
|
|
|
override suspend fun getFortunes(): List<Fortune> {
|
|
|
- val results = db.rxPreparedQuery("select id, message from fortune").await()
|
|
|
- return results.rows().map { Fortune(it.getInteger(0), it.getString(1)) }
|
|
|
+ val results = db.preparedQueryAwait("select id, message from fortune")
|
|
|
+ return results.map { Fortune(it.getInteger(0), it.getString(1)) }
|
|
|
}
|
|
|
|
|
|
override suspend fun getWorld(): World {
|
|
|
val worldId = rand.nextInt(1, 10000)
|
|
|
- val result = db.rxPreparedQuery("select id, randomNumber from world where id = $1", Tuple.of(worldId)).await()
|
|
|
- val row = result.rows().first()
|
|
|
+ val result = db.preparedQueryAwait("select id, randomNumber from world where id = $1", Tuple.of(worldId))
|
|
|
+ val row = result.first()
|
|
|
return World(row.getInteger(0), row.getInteger(1)!!)
|
|
|
}
|
|
|
|
|
|
override suspend fun updateWorlds(worlds: List<World>) {
|
|
|
val batch = worlds.map { Tuple.of(it.id, it.randomNumber) }
|
|
|
- db.rxPreparedBatch("update world set randomNumber = $1 where id = $2", batch).await()
|
|
|
+ db.preparedBatchAwait("update world set randomNumber = $1 where id = $2", batch)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -165,7 +154,7 @@ class FortuneTemplate(val fortunes: List<Fortune>, val main: MainTemplate = Main
|
|
|
}
|
|
|
|
|
|
fun main(args: Array<String>) {
|
|
|
- val db = when(args.first()) {
|
|
|
+ val db = when(args.firstOrNull()) {
|
|
|
"jasync-sql" -> JasyncRepository()
|
|
|
"reactive-pg" -> ReactivePGRepository()
|
|
|
else -> throw IllegalArgumentException("Must specify a postgres client")
|
|
@@ -221,4 +210,4 @@ fun main(args: Array<String>) {
|
|
|
}
|
|
|
|
|
|
server.start(wait = true)
|
|
|
-}
|
|
|
+}
|