Browse Source

fixed test and some perf bugs

Ilya Nemtsev 7 months ago
parent
commit
83abea6d2a

+ 5 - 5
frameworks/Kotlin/ktor/ktor-r2dbc.dockerfile

@@ -1,12 +1,12 @@
 FROM maven:3.9.9-amazoncorretto-21-debian-bookworm as maven
-WORKDIR /ktor
-COPY ktor/pom.xml pom.xml
-COPY ktor/src src
+WORKDIR /ktor-r2dbc
+COPY ktor-r2dbc/pom.xml pom.xml
+COPY ktor-r2dbc/src src
 RUN mvn clean package -q
 
 FROM amazoncorretto:21-al2023-headless
-WORKDIR /ktor
-COPY --from=maven /ktor/target/tech-empower-framework-benchmark-1.0-SNAPSHOT-netty-bundle.jar app.jar
+WORKDIR /ktor-r2dbc
+COPY --from=maven /ktor-r2dbc/target/tech-empower-framework-benchmark-1.0-SNAPSHOT-netty-bundle.jar app.jar
 
 EXPOSE 9090
 

+ 14 - 3
frameworks/Kotlin/ktor/ktor-r2dbc/pom.xml

@@ -18,9 +18,12 @@
         <serialization.version>1.7.3</serialization.version>
         <kotlinx.html.version>0.11.0</kotlinx.html.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <logback.version>1.2.13</logback.version>
+        <logback.version>1.5.12</logback.version>
+        <reactor.version>3.7.1</reactor.version>
         <postgresql.version>42.7.4</postgresql.version>
         <r2dbc.version>1.0.7.RELEASE</r2dbc.version>
+        <r2dbc.pool.version>1.0.2.RELEASE</r2dbc.pool.version>
+
     </properties>
 
     <dependencies>
@@ -64,13 +67,21 @@
             <artifactId>ktor-server-html-builder-jvm</artifactId>
             <version>${ktor.version}</version>
         </dependency>
-
         <dependency>
             <groupId>org.postgresql</groupId>
             <artifactId>r2dbc-postgresql</artifactId>
             <version>${r2dbc.version}</version>
         </dependency>
-
+        <dependency>
+            <groupId>io.r2dbc</groupId>
+            <artifactId>r2dbc-pool</artifactId>
+            <version>${r2dbc.pool.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+            <version>${reactor.version}</version>
+        </dependency>
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>

+ 35 - 10
frameworks/Kotlin/ktor/ktor-r2dbc/src/main/kotlin/org/jetbrains/ktor/benchmarks/Hello.kt

@@ -8,6 +8,14 @@ import io.ktor.server.html.*
 import io.ktor.server.plugins.defaultheaders.*
 import io.ktor.server.response.*
 import io.ktor.server.routing.*
+import io.r2dbc.pool.ConnectionPool
+import io.r2dbc.pool.ConnectionPoolConfiguration
+import io.r2dbc.pool.PoolingConnectionFactoryProvider
+import io.r2dbc.postgresql.PostgresqlConnectionConfiguration
+import io.r2dbc.postgresql.PostgresqlConnectionFactory
+import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
+import io.r2dbc.postgresql.client.SSLMode
+import io.r2dbc.spi.Connection
 import io.r2dbc.spi.ConnectionFactories
 import io.r2dbc.spi.ConnectionFactory
 import io.r2dbc.spi.ConnectionFactoryOptions
@@ -25,6 +33,8 @@ import org.jetbrains.ktor.benchmarks.models.Message
 import org.jetbrains.ktor.benchmarks.models.World
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import reactor.netty.resources.LoopResources
+import java.time.Duration
 import kotlin.random.Random
 
 fun Application.main() {
@@ -125,7 +135,7 @@ fun Application.main() {
                             connection.createStatement(UPDATE_QUERY).bind(0, world.randomNumber).bind(1, world.id)
                                 .execute()
                         ).flatMap { Mono.from(it.rowsUpdated) }
-                    }, { connection -> connection.close() })
+                    }, Connection::close)
                 }
 
                 Flux.merge(updateRequests).collectList().awaitFirstOrNull()
@@ -147,23 +157,38 @@ private fun getWorld(
             )
         })
     }
-}, { connection -> connection.close() })
+}, Connection::close)
 
 private fun configurePostgresR2DBC(config: ApplicationConfig): ConnectionFactory {
-    val options = ConnectionFactoryOptions.builder().option(ConnectionFactoryOptions.DRIVER, "database.driver")
-        .option(ConnectionFactoryOptions.DATABASE, config.property("database.url").getString())
-        .option(ConnectionFactoryOptions.USER, config.property("database.user").getString())
-        .option(ConnectionFactoryOptions.PASSWORD, config.property("database.password").getString()).build()
-
-    return ConnectionFactories.get(options)
+    val cfo = PostgresqlConnectionConfiguration.builder()
+        .host(config.property("db.host").getString())
+        .port(config.property("db.port").getString().toInt())
+        .database(config.property("db.database").getString())
+        .username(config.property("db.username").getString())
+        .password(config.property("db.password").getString())
+        .loopResources { NioClientEventLoopResources(Runtime.getRuntime().availableProcessors()).cacheLoops() }
+        .sslMode(SSLMode.DISABLE)
+        .tcpKeepAlive(true)
+        .tcpNoDelay(true)
+        .build()
+
+    val cf = PostgresqlConnectionFactory(cfo)
+
+    val cp = ConnectionPoolConfiguration.builder(cf)
+        .initialSize(config.property("db.initPoolSize").getString().toInt())
+        .maxSize(config.property("db.maxPoolSize").getString().toInt())
+        //.maxLifeTime(Duration.ofMillis(Long.MAX_VALUE))
+        .build()
+
+    return ConnectionPool(cp)
 }
 
 private fun ApplicationCall.queries() = request.queryParameters["queries"]?.toIntOrNull()?.coerceIn(1, 500) ?: 1
 
 
 object Constants {
-    const val WORLD_QUERY = "SELECT id, randomNumber FROM World WHERE id = ?"
+    const val WORLD_QUERY = "SELECT id, randomnumber FROM world WHERE id = $1"
     const val FORTUNES_QUERY = "SELECT id, message FROM fortune"
-    const val UPDATE_QUERY = "UPDATE World SET randomNumber = ? WHERE id = ?"
+    const val UPDATE_QUERY = "UPDATE world SET randomnumber = $1 WHERE id = $2"
     const val DB_ROWS = 10000
 }

+ 123 - 0
frameworks/Kotlin/ktor/ktor-r2dbc/src/main/kotlin/org/jetbrains/ktor/benchmarks/NioClientEventLoopResources.java

@@ -0,0 +1,123 @@
+package org.jetbrains.ktor.benchmarks;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.DatagramChannel;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.ThreadPerTaskExecutor;
+import reactor.core.publisher.Mono;
+import reactor.netty.FutureMono;
+import reactor.netty.resources.LoopResources;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Copied from GitHub issue comment: https://github.com/r2dbc/r2dbc-pool/issues/190#issuecomment-1566845190
+ */
+public class NioClientEventLoopResources implements LoopResources {
+    public static final String THREAD_PREFIX = "prefix-";
+    final int threads;
+    final AtomicReference<EventLoopGroup> loops = new AtomicReference<>();
+    final AtomicBoolean running;
+
+    NioClientEventLoopResources(int threads) {
+        this.running = new AtomicBoolean(true);
+        this.threads = threads;
+    }
+
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
+        return Mono.defer(() -> {
+        long quietPeriodMillis = quietPeriod.toMillis();
+        long timeoutMillis = timeout.toMillis();
+        EventLoopGroup serverLoopsGroup = loops.get();
+        Mono<?> slMono = Mono.empty();
+        if (running.compareAndSet(true, false)) {
+            if (serverLoopsGroup != null) {
+                slMono = FutureMono.from((Future) serverLoopsGroup.shutdownGracefully(
+                        quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
+            }
+        }
+        return Mono.when(slMono);
+    });
+    }
+
+    @Override
+    public boolean isDisposed() {
+        return !running.get();
+    }
+
+    @Override
+    public EventLoopGroup onClient(boolean useNative) {
+        return cacheLoops();
+    }
+
+    @Override
+    public EventLoopGroup onServer(boolean useNative) {
+        throw new UnsupportedOperationException("This event loop is designed only for client DB calls.");
+    }
+
+    @Override
+    public EventLoopGroup onServerSelect(boolean useNative) {
+        throw new UnsupportedOperationException("This event loop is designed only for client DB calls.");
+    }
+
+    @Override
+    public <CHANNEL extends Channel> CHANNEL onChannel(Class<CHANNEL> channelType, EventLoopGroup group) {
+        if (channelType.equals(SocketChannel.class)) {
+                return (CHANNEL) new NioSocketChannel();
+            }
+            if (channelType.equals(ServerSocketChannel.class)) {
+                    return (CHANNEL) new NioServerSocketChannel();
+                }
+                if (channelType.equals(DatagramChannel.class)) {
+                        return (CHANNEL) new NioDatagramChannel();
+                    }
+                    throw new IllegalArgumentException("Unsupported channel type: " + channelType.getSimpleName());
+    }
+
+    @Override
+    public <CHANNEL extends Channel> Class<? extends CHANNEL> onChannelClass(Class<CHANNEL> channelType,
+        EventLoopGroup group) {
+        if (channelType.equals(SocketChannel.class)) {
+                return (Class<? extends CHANNEL>) NioSocketChannel.class;
+            }
+            if (channelType.equals(ServerSocketChannel.class)) {
+                    return (Class<? extends CHANNEL>) NioServerSocketChannel.class;
+                }
+                if (channelType.equals(DatagramChannel.class)) {
+                        return (Class<? extends CHANNEL>) NioDatagramChannel.class;
+                    }
+                    throw new IllegalArgumentException("Unsupported channel type: " + channelType.getSimpleName());
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    EventLoopGroup cacheLoops() {
+        EventLoopGroup eventLoopGroup = loops.get();
+        if (null == eventLoopGroup) {
+            EventLoopGroup newEventLoopGroup = createNewEventLoopGroup();
+            if (!loops.compareAndSet(null, newEventLoopGroup)) {
+                //"FutureReturnValueIgnored" this is deliberate
+                newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
+            }
+            eventLoopGroup = cacheLoops();
+        }
+        return eventLoopGroup;
+    }
+
+    private NioEventLoopGroup createNewEventLoopGroup() {
+        return new NioEventLoopGroup(threads, new ThreadPerTaskExecutor(new DefaultThreadFactory(THREAD_PREFIX)));
+    }
+}

+ 10 - 4
frameworks/Kotlin/ktor/ktor-r2dbc/src/main/resources/application.conf

@@ -11,10 +11,16 @@ ktor {
     }
 }
 
-database {
-    driver = "org.postgresql.Driver"
-    url = "url: r2dbc:pool://tfb-database:5432/hello_world?loggerLevel=OFF&sslmode=disable"
-    poolsize = 512
+db {
+    driver = "pool"
+    protocol = "postgresql"
+    ssl = "false"
+    host = "tfb-database"
+    port = 5432
+    database = "hello_world"
+    initPoolSize = 512
+    maxPoolSize = 512
     username = "benchmarkdbuser"
     password = "benchmarkdbpass"
+    //url = "r2dbc:postgresql://"${db.host}":"${db.port}"/"${db.database}"?loggerLevel=OFF&disableColumnSanitiser=true&assumeMinServerVersion=16&sslmode=disable&maxSize="${db.poolSize}
 }