Browse Source

Ktor: update benchmark (#2834)

* ktor: enable logback logging

* ktor: upgrade Maven

* ktor: sampling call logger

* ktor: run database tasks on a dedicated thread pool
Sergey Mashkov 8 years ago
parent
commit
77fcaa8968

+ 1 - 1
frameworks/Kotlin/ktor/.mvn/wrapper/maven-wrapper.properties

@@ -1 +1 @@
-distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.3.9/apache-maven-3.3.9-bin.zip
+distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.0/apache-maven-3.5.0-bin.zip

+ 5 - 0
frameworks/Kotlin/ktor/pom.xml

@@ -58,6 +58,11 @@
             <artifactId>mysql-connector-java</artifactId>
             <artifactId>mysql-connector-java</artifactId>
             <version>5.1.41</version>
             <version>5.1.41</version>
         </dependency>
         </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.2.1</version>
+        </dependency>
     </dependencies>
     </dependencies>
 
 
     <build>
     <build>

+ 65 - 43
frameworks/Kotlin/ktor/src/main/kotlin/org/jetbrains/ktor/benchmarks/Hello.kt

@@ -3,6 +3,7 @@ package org.jetbrains.ktor.benchmarks
 import com.google.gson.*
 import com.google.gson.*
 import com.mysql.jdbc.*
 import com.mysql.jdbc.*
 import com.zaxxer.hikari.*
 import com.zaxxer.hikari.*
+import kotlinx.coroutines.experimental.*
 import org.jetbrains.ktor.application.*
 import org.jetbrains.ktor.application.*
 import org.jetbrains.ktor.content.*
 import org.jetbrains.ktor.content.*
 import org.jetbrains.ktor.features.*
 import org.jetbrains.ktor.features.*
@@ -11,8 +12,10 @@ import org.jetbrains.ktor.http.*
 import org.jetbrains.ktor.netty.*
 import org.jetbrains.ktor.netty.*
 import org.jetbrains.ktor.routing.*
 import org.jetbrains.ktor.routing.*
 import org.jetbrains.ktor.util.*
 import org.jetbrains.ktor.util.*
-import java.sql.ResultSet.*
+import java.sql.ResultSet.CONCUR_READ_ONLY
+import java.sql.ResultSet.TYPE_FORWARD_ONLY
 import java.util.concurrent.*
 import java.util.concurrent.*
+import java.util.concurrent.atomic.*
 import javax.sql.*
 import javax.sql.*
 
 
 
 
@@ -27,7 +30,17 @@ fun main(args: Array<String>) {
     Driver::class.java.newInstance()
     Driver::class.java.newInstance()
     val pool = hikari(dbHost, a, b)
     val pool = hikari(dbHost, a, b)
 
 
+    val counter = AtomicInteger()
+    val databaseExecutor = Executors.newFixedThreadPool(256) { r ->
+        Thread(r, "db-${counter.incrementAndGet()}-thread")
+    }
+    val databaseDispatcher = databaseExecutor.asCoroutineDispatcher()
+
     embeddedServer(Netty, 9090) {
     embeddedServer(Netty, 9090) {
+        install(SamplingCallLogging) {
+            samplingFactor = 50000L
+        }
+
         install(DefaultHeaders)
         install(DefaultHeaders)
         routing {
         routing {
             get("/plaintext") { call ->
             get("/plaintext") { call ->
@@ -37,64 +50,72 @@ fun main(args: Array<String>) {
                 call.respond(TextContent(gson.toJson(Message()), ContentType.Application.Json, HttpStatusCode.OK))
                 call.respond(TextContent(gson.toJson(Message()), ContentType.Application.Json, HttpStatusCode.OK))
             }
             }
             get("/db") { call ->
             get("/db") { call ->
-                pool.connection.use { connection ->
-                    val random = ThreadLocalRandom.current()
-                    val queries = call.queries()
-                    val result = mutableListOf<World>()
-
-                    connection.prepareStatement("SELECT * FROM World WHERE id = ?", TYPE_FORWARD_ONLY, CONCUR_READ_ONLY).use { statement ->
-                        for (i in 1..(queries ?: 1)) {
-                            statement.setInt(1, random.nextInt(DbRows) + 1)
-
-                            statement.executeQuery().use { rs ->
-                                while (rs.next()) {
-                                    result += World(rs.getInt("id"), rs.getInt("randomNumber"))
+                val response = run(databaseDispatcher) {
+                    pool.connection.use { connection ->
+                        val random = ThreadLocalRandom.current()
+                        val queries = call.queries()
+                        val result = mutableListOf<World>()
+
+                        connection.prepareStatement("SELECT * FROM World WHERE id = ?", TYPE_FORWARD_ONLY, CONCUR_READ_ONLY).use { statement ->
+                            for (i in 1..(queries ?: 1)) {
+                                statement.setInt(1, random.nextInt(DbRows) + 1)
+
+                                statement.executeQuery().use { rs ->
+                                    while (rs.next()) {
+                                        result += World(rs.getInt("id"), rs.getInt("randomNumber"))
+                                    }
                                 }
                                 }
                             }
                             }
-                        }
 
 
-                        call.respond(TextContent(gson.toJson(when (queries) {
-                            null -> result.single()
-                            else -> result
-                        }), ContentType.Application.Json, HttpStatusCode.OK))
+                            TextContent(gson.toJson(when (queries) {
+                                null -> result.single()
+                                else -> result
+                            }), ContentType.Application.Json, HttpStatusCode.OK)
+                        }
                     }
                     }
                 }
                 }
+
+                call.respond(response)
             }
             }
             get("/updates") {
             get("/updates") {
-                pool.connection.use { connection ->
-                    val queries = call.queries()
-                    val random = ThreadLocalRandom.current()
-                    val result = mutableListOf<World>()
-
-                    connection.prepareStatement("SELECT * FROM World WHERE id = ?", TYPE_FORWARD_ONLY, CONCUR_READ_ONLY).use { statement ->
-                        for (i in 1..(queries ?: 1)) {
-                            statement.setInt(1, random.nextInt(DbRows) + 1)
-
-                            statement.executeQuery().use { rs ->
-                                while (rs.next()) {
-                                    result += World(rs.getInt("id"), rs.getInt("randomNumber"))
+                val t = run(databaseDispatcher) {
+                    pool.connection.use { connection ->
+                        val queries = call.queries()
+                        val random = ThreadLocalRandom.current()
+                        val result = mutableListOf<World>()
+
+                        connection.prepareStatement("SELECT * FROM World WHERE id = ?", TYPE_FORWARD_ONLY, CONCUR_READ_ONLY).use { statement ->
+                            for (i in 1..(queries ?: 1)) {
+                                statement.setInt(1, random.nextInt(DbRows) + 1)
+
+                                statement.executeQuery().use { rs ->
+                                    while (rs.next()) {
+                                        result += World(rs.getInt("id"), rs.getInt("randomNumber"))
+                                    }
                                 }
                                 }
                             }
                             }
-                        }
 
 
-                    }
+                        }
 
 
-                    result.forEach { it.randomNumber = random.nextInt(DbRows) + 1 }
+                        result.forEach { it.randomNumber = random.nextInt(DbRows) + 1 }
 
 
-                    connection.prepareStatement("UPDATE World SET randomNumber = ? WHERE id = ?").use { updateStatement ->
-                        for ((id, randomNumber) in result) {
-                            updateStatement.setInt(1, randomNumber)
-                            updateStatement.setInt(2, id)
+                        connection.prepareStatement("UPDATE World SET randomNumber = ? WHERE id = ?").use { updateStatement ->
+                            for ((id, randomNumber) in result) {
+                                updateStatement.setInt(1, randomNumber)
+                                updateStatement.setInt(2, id)
 
 
-                            updateStatement.executeUpdate()
+                                updateStatement.executeUpdate()
+                            }
                         }
                         }
-                    }
 
 
-                    call.respond(TextContent(gson.toJson(when (queries) {
-                        null -> result.single()
-                        else -> result
-                    }), ContentType.Application.Json, HttpStatusCode.OK))
+                        TextContent(gson.toJson(when (queries) {
+                            null -> result.single()
+                            else -> result
+                        }), ContentType.Application.Json, HttpStatusCode.OK)
+                    }
                 }
                 }
+
+                call.respond(t)
             }
             }
         }
         }
     }.start(true)
     }.start(true)
@@ -115,6 +136,7 @@ private fun hikari(dbHost: String, a: String, b: String): DataSource {
     config.addDataSourceProperty("prepStmtCacheSize", "250")
     config.addDataSourceProperty("prepStmtCacheSize", "250")
     config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048")
     config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048")
     config.driverClassName = Driver::class.java.name
     config.driverClassName = Driver::class.java.name
+    config.connectionTimeout = 10000
 
 
     return HikariDataSource(config)
     return HikariDataSource(config)
 }
 }

+ 63 - 0
frameworks/Kotlin/ktor/src/main/kotlin/org/jetbrains/ktor/benchmarks/SamplingCallLogging.kt

@@ -0,0 +1,63 @@
+package org.jetbrains.ktor.benchmarks
+
+import org.jetbrains.ktor.application.*
+import org.jetbrains.ktor.http.*
+import org.jetbrains.ktor.logging.*
+import org.jetbrains.ktor.pipeline.*
+import org.jetbrains.ktor.request.*
+import org.jetbrains.ktor.util.*
+import java.util.concurrent.atomic.*
+
+// utility logging feature useful for debugging benchmark
+internal class SamplingCallLogging(private val log: ApplicationLog) {
+    private val counter = AtomicLong()
+    var samplingFactor = 10000L
+
+    fun log(call: ApplicationCall) {
+        val v = counter.incrementAndGet()
+        if (v < 0) counter.set(0)
+
+        if (v % samplingFactor == 0L) {
+            logSuccess(call)
+        }
+    }
+
+    private fun logSuccess(call: ApplicationCall) {
+        val status = call.response.status() ?: "Unhandled"
+
+        when (status) {
+            HttpStatusCode.Found -> log.trace("$status: ${call.request.logInfo()} -> ${call.response.headers[HttpHeaders.Location]}")
+            else -> log.trace("$status: ${call.request.logInfo()}")
+        }
+    }
+
+    private fun ApplicationRequest.logInfo() = "${httpMethod.value} - ${path()}"
+
+    companion object : ApplicationFeature<Application, SamplingCallLogging, SamplingCallLogging> {
+        override val key = AttributeKey<SamplingCallLogging>("SamplingCallLogging")
+
+        override fun install(pipeline: Application, configure: SamplingCallLogging.() -> Unit): SamplingCallLogging {
+            pipeline.environment.monitor.logEvents()
+
+            val feature = SamplingCallLogging(pipeline.log)
+            configure(feature)
+
+            val loggingPhase = PipelinePhase("SLogging")
+
+            pipeline.phases.insertBefore(ApplicationCallPipeline.Infrastructure, loggingPhase)
+            pipeline.intercept(loggingPhase) { call ->
+                proceed()
+                feature.log(call)
+            }
+
+            return feature
+        }
+
+        private fun ApplicationMonitor.logEvents() {
+            applicationStarted += { it.log.trace("Application started: $it") }
+            applicationStopped += { it.log.trace("Application stopped: $it") }
+            applicationStarting += { it.log.trace("Application starting: $it") }
+            applicationStopping += { it.log.trace("Application stopping: $it") }
+        }
+    }
+}

+ 15 - 0
frameworks/Kotlin/ktor/src/main/resources/logback.xml

@@ -0,0 +1,15 @@
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{YYYY-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="trace">
+        <appender-ref ref="STDOUT"/>
+    </root>
+
+    <logger name="org.eclipse.jetty" level="INFO"/>
+    <logger name="io.netty" level="INFO"/>
+
+</configuration>