Browse Source

akka-http and akka-http-slick-postgres improvements (#4450)

* akka-http and akka-http-slick-postgres improvements

* remove withExecutionContext

* Revert "remove withExecutionContext"

This reverts commit e4c827ce

* remove experiment with a second dispatcher

* more log silence

* showstopper in fortunes fixed

* Akka HTTP tuning

* some adjustments and port 9000
Sven Ludwig 6 years ago
parent
commit
d8406cc647
17 changed files with 84 additions and 133 deletions
  1. 1 1
      frameworks/Scala/akka-http/akka-http-slick-postgres.dockerfile
  2. 7 7
      frameworks/Scala/akka-http/akka-http-slick-postgres/README.md
  3. 3 3
      frameworks/Scala/akka-http/akka-http-slick-postgres/build.sbt
  4. 1 1
      frameworks/Scala/akka-http/akka-http-slick-postgres/project/build.properties
  5. 17 33
      frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/resources/application.conf
  6. 2 2
      frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/resources/logback.xml
  7. 3 7
      frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/ApiRoutes.scala
  8. 1 5
      frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/ApiSupervisor.scala
  9. 0 30
      frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/AppConfig.scala
  10. 20 13
      frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/fortune/FortuneRoute.scala
  11. 3 7
      frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/world/DbRoute.scala
  12. 7 8
      frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/world/QueriesRoute.scala
  13. 7 7
      frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/world/UpdateRoute.scala
  14. 4 7
      frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/world/WorldRoutes.scala
  15. 6 0
      frameworks/Scala/akka-http/akka-http/README.md
  16. 1 1
      frameworks/Scala/akka-http/akka-http/build.sbt
  17. 1 1
      frameworks/Scala/akka-http/benchmark_config.json

+ 1 - 1
frameworks/Scala/akka-http/akka-http-slick-postgres.dockerfile

@@ -1,4 +1,4 @@
-FROM hseeberger/scala-sbt:8u151-2.12.4-1.1.1
+FROM hseeberger/scala-sbt:8u181_2.12.6_1.2.3
 
 WORKDIR /akka-http-slick-postgres
 

+ 7 - 7
frameworks/Scala/akka-http/akka-http-slick-postgres/README.md

@@ -25,28 +25,28 @@ The tests were run with:
 
 ### JSON
 
-http://localhost:9339/json
+http://localhost:9000/json
 
 ### PLAINTEXT
 
-http://localhost:9339/plaintext
+http://localhost:9000/plaintext
 
 ### DB
 
-http://localhost:9339/db
+http://localhost:9000/db
 
 ### QUERY
 
-http://localhost:9339/query?queries=
+http://localhost:9000/query?queries=
 
 ### CACHED QUERY
 
-http://localhost:9339/cached_query?queries=
+http://localhost:9000/cached_query?queries=
 
 ### UPDATE
 
-http://localhost:9339/update?queries=
+http://localhost:9000/update?queries=
 
 ### FORTUNES
 
-http://localhost:9339/fortunes
+http://localhost:9000/fortunes

+ 3 - 3
frameworks/Scala/akka-http/akka-http-slick-postgres/build.sbt

@@ -43,8 +43,8 @@ lazy val library =
       val scalaCheck        = "1.14.0"
       val scalaTest         = "3.0.5"
       val scalate           = "1.8.0"
-      val slick             = "3.2.3"
-      val slickPg           = "0.17.0"
+      val slick             = "3.3.0"
+      val slickPg           = "0.17.2"
     }
 
     val akkaSlf4j           = "com.typesafe.akka"            %% "akka-slf4j"                  % Version.akka
@@ -74,7 +74,7 @@ lazy val commonSettings =
   Seq(
     organization := "net.benchmark.akka.http",
     organizationName := "Akka",
-    scalaVersion := "2.12.4",
+    scalaVersion := "2.12.6",
     scalacOptions ++= Seq(
       "-deprecation",
       "-encoding",

+ 1 - 1
frameworks/Scala/akka-http/akka-http-slick-postgres/project/build.properties

@@ -1 +1 @@
-sbt.version = 1.1.1
+sbt.version = 1.2.3

+ 17 - 33
frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/resources/application.conf

@@ -19,11 +19,12 @@ akka {
       type = "Dispatcher"
       executor = "fork-join-executor"
       fork-join-executor {
-        parallelism-min = 3
-        parallelism-factor = 1.4
-        parallelism-max = 20
+        parallelism-min = 8
+        parallelism-factor = 1 # one thread per core is enough
+        parallelism-max = 64
         task-peeking-mode = "FIFO"
       }
+      throughput = 64
     }
 
     debug {
@@ -32,6 +33,16 @@ akka {
       unhandled = on
     }
   }
+
+  http {
+    server {
+      backlog = 1256
+      linger-timeout = 25 s
+      max-connections = 1256
+      pipelining-limit = 32
+      response-header-size-hint = 1024
+    }
+  }
 }
 
 akka-http-slick-postgres {
@@ -40,45 +51,18 @@ akka-http-slick-postgres {
     # Listening address
     address = 0.0.0.0
     # Listening port
-    port = 9339
+    port = 9000
     # The dispatcher that will be used for the routing operations.
     # See the official documentation at http://doc.akka.io/docs/akka/current/scala/dispatchers.html for details.
   }
 
-  queries {
-    route-dispatcher-config-path = "akka-http-slick-postgres.custom-routes-dispatcher"
-  }
-
-  updates {
-    route-dispatcher-config-path = "akka-http-slick-postgres.custom-routes-dispatcher"
-  }
-
-  db {
-    route-dispatcher-config-path = "akka-http-slick-postgres.custom-routes-dispatcher"
-  }
-
-  fortunes {
-    route-dispatcher-config-path = "akka-http-slick-postgres.custom-routes-dispatcher"
-  }
-
-  custom-routes-dispatcher {
-    type = "Dispatcher"
-    executor = "fork-join-executor"
-    fork-join-executor {
-      parallelism-min = 3
-      parallelism-factor = 1.4
-      parallelism-max = 20
-      task-peeking-mode = "FIFO"
-    }
-  }
-
   database {
     profile = "net.benchmark.akka.http.db.CustomPostgresProfile$"
     db {
       url = "jdbc:postgresql://tfb-database:5432/hello_world?user=benchmarkdbuser&password=benchmarkdbpass"
     }
-    numThreads = 10
+    numThreads = 40
     queueSize = 20000
-    maxConnections = 10
+    maxConnections = 96
   }
 }

+ 2 - 2
frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/resources/logback.xml

@@ -21,7 +21,7 @@
     <appender-ref ref="STDOUT" />
   </appender>
 
-  <logger name="akka" level="WARN" additivity="false">
+  <logger name="akka" level="ERROR" additivity="false">
     <appender-ref ref="ASYNC_AKKA"/>
   </logger>
 
@@ -29,7 +29,7 @@
 
   <logger name="slick" level="ERROR"/>
 
-  <logger name="net.benchmark" level="INFO"/>
+  <logger name="net.benchmark" level="ERROR"/>
 
   <root level="ERROR">
     <appender-ref ref="ASYNC"/>

+ 3 - 7
frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/ApiRoutes.scala

@@ -16,11 +16,7 @@ object ApiRoutes {
   private val log: Logger = LoggerFactory.getLogger(getClass)
 
   def routes(dbLoader: DatabaseRepositoryLoader,
-             sd: ExecutionContextExecutor,
-             qd: ExecutionContextExecutor,
-             ud: ExecutionContextExecutor,
-             dd: ExecutionContextExecutor,
-             fd: ExecutionContextExecutor)(implicit system: ActorSystem): Route =
+             sd: ExecutionContextExecutor)(implicit system: ActorSystem): Route =
     handleRejections(RejectionHandler.default) {
       val eh: ExceptionHandler = ExceptionHandler {
         case ex @ (_: Exception) =>
@@ -28,8 +24,8 @@ object ApiRoutes {
           complete(StatusCodes.InternalServerError)
       }
 
-      val worldRoutes = new WorldRoutes(dbLoader.loadWorldRepository(), sd, qd, ud, dd)
-      val fortuneRoutes = new FortuneRoute(dbLoader.loadFortuneRepository(), sd, fd)
+      val worldRoutes = new WorldRoutes(dbLoader.loadWorldRepository(), sd)
+      val fortuneRoutes = new FortuneRoute(dbLoader.loadFortuneRepository(), sd)
 
       handleExceptions(eh) {
         get {

+ 1 - 5
frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/ApiSupervisor.scala

@@ -23,10 +23,6 @@ class ApiSupervisor(dbLoader: DatabaseRepositoryLoader, materializer: ActorMater
   implicit val system: ActorSystem = context.system
 
   private val sd = SameThreadDirectExecutor.executionContext()
-  private val qd = context.system.dispatchers.lookup(AppConfig.Queries.routeDispatcherConfigPath)
-  private val ud = context.system.dispatchers.lookup(AppConfig.Updates.routeDispatcherConfigPath)
-  private val dd = context.system.dispatchers.lookup(AppConfig.Db.routeDispatcherConfigPath)
-  private val fd = context.system.dispatchers.lookup(AppConfig.Fortunes.routeDispatcherConfigPath)
 
   @SuppressWarnings(Array("org.wartremover.warts.Any"))
   override def receive: Receive = {
@@ -40,7 +36,7 @@ class ApiSupervisor(dbLoader: DatabaseRepositoryLoader, materializer: ActorMater
 
       import context.dispatcher
 
-      val _ = Http(system).bindAndHandle(ApiRoutes.routes(dbLoader, sd, qd, ud, dd, fd), address, port).pipeTo(self)
+      val _ = Http(system).bindAndHandle(ApiRoutes.routes(dbLoader, sd), address, port).pipeTo(self)
 
       context.become(running(sender()))
 

+ 0 - 30
frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/AppConfig.scala

@@ -1,30 +0,0 @@
-package net.benchmark.akka.http
-
-import com.typesafe.config.{Config, ConfigFactory}
-
-object AppConfig {
-
-  val config: Config = ConfigFactory.load()
-
-  object Queries {
-    val routeDispatcherConfigPath: String =
-      config.getString("akka-http-slick-postgres.queries.route-dispatcher-config-path")
-
-  }
-
-  object Updates {
-    val routeDispatcherConfigPath: String =
-      config.getString("akka-http-slick-postgres.updates.route-dispatcher-config-path")
-
-  }
-
-  object Db {
-    val routeDispatcherConfigPath: String = config.getString("akka-http-slick-postgres.db.route-dispatcher-config-path")
-  }
-
-  object Fortunes {
-    val routeDispatcherConfigPath: String =
-      config.getString("akka-http-slick-postgres.fortunes.route-dispatcher-config-path")
-  }
-
-}

+ 20 - 13
frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/fortune/FortuneRoute.scala

@@ -14,24 +14,33 @@ import net.benchmark.akka.http.util.Deciders
 import org.fusesource.scalate.TemplateEngine
 import slick.basic.DatabasePublisher
 
-import scala.concurrent.{ExecutionContextExecutor, Future}
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
 
-class FortuneRoute(fr: FortuneRepository, sd: ExecutionContextExecutor, fd: ExecutionContextExecutor)(
-    implicit val system: ActorSystem) {
+object FortuneRoute {
+
+  private val ec1 = ExecutionContext.fromExecutorService(java.util.concurrent.Executors.newFixedThreadPool(7))
 
   private val te = new TemplateEngine()
-  private val fmat: ActorMaterializer = ActorMaterializer(Deciders.resumingMat("fmat"))
 
-  private implicit lazy val fm: ToEntityMarshaller[Seq[Fortune]] = {
-    val fortunesTemplate = te.load("/templates/fortunes.mustache")
+  private val fortunesTemplate = FortuneRoute.te.load("/templates/fortunes.mustache")
+
+  private val fm: ToEntityMarshaller[Seq[Fortune]] = {
     Marshaller.opaque { fortunes =>
       HttpEntity(
         contentType = `text/html`.withCharset(`UTF-8`),
-        string = te.layout("", fortunesTemplate, Map("fortunes" -> fortunes))
+        string = FortuneRoute.te.layout("", fortunesTemplate, Map("fortunes" -> fortunes))
       )
     }
   }
 
+}
+
+class FortuneRoute(fr: FortuneRepository, sd: ExecutionContextExecutor)(implicit val system: ActorSystem) {
+
+  private val fmat: ActorMaterializer = ActorMaterializer(Deciders.resumingMat("fmat"))
+
+  private implicit val fmar = FortuneRoute.fm
+
   private def source(p: DatabasePublisher[Fortune]): Source[Fortune, NotUsed] = {
     Source
       .fromPublisher(p)
@@ -41,12 +50,10 @@ class FortuneRoute(fr: FortuneRepository, sd: ExecutionContextExecutor, fd: Exec
   @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
   def route(): Route = {
     path("fortunes") {
-      withExecutionContext(fd) {
-        complete(
-          source(fr.all())
-            .runWith(Sink.seq[Fortune])(fmat)
-            .flatMap(s => Future.successful(s.sortBy(_.message)))(sd))
-      }
+      complete(
+        source(fr.all())
+          .runWith(Sink.seq[Fortune])(fmat)
+          .flatMap(s => Future.successful(s.sortBy(_.message)))(FortuneRoute.ec1))
     }
   }
 

+ 3 - 7
frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/world/DbRoute.scala

@@ -1,10 +1,8 @@
 package net.benchmark.akka.http.world
-import akka.http.scaladsl.server.Directives.{complete, path, withExecutionContext}
+import akka.http.scaladsl.server.Directives.{complete, path}
 import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._
 
-import scala.concurrent.ExecutionContextExecutor
-
-class DbRoute(wr: WorldRepository, dd: ExecutionContextExecutor) {
+class DbRoute(wr: WorldRepository) {
 
   private def rand(): Int = {
     java.util.concurrent.ThreadLocalRandom.current().nextInt(10000) + 1
@@ -12,9 +10,7 @@ class DbRoute(wr: WorldRepository, dd: ExecutionContextExecutor) {
 
   def route() = {
     path("db") {
-      withExecutionContext(dd) {
-        complete(wr.require(rand()))
-      }
+      complete(wr.require(rand()))
     }
   }
 

+ 7 - 8
frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/world/QueriesRoute.scala

@@ -6,10 +6,9 @@ import akka.http.scaladsl.server.Route
 import akka.stream.scaladsl.Source
 import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._
 
-import scala.concurrent.ExecutionContextExecutor
 import scala.util.Try
 
-class QueriesRoute(wr: WorldRepository, qd: ExecutionContextExecutor) {
+class QueriesRoute(wr: WorldRepository) {
 
   implicit private val jss: JsonEntityStreamingSupport =
     EntityStreamingSupport.json().withParallelMarshalling(5, unordered = true)
@@ -24,19 +23,19 @@ class QueriesRoute(wr: WorldRepository, qd: ExecutionContextExecutor) {
   }
 
   private def source(n: Int): Source[World, NotUsed] = {
+    val t = if (1 <= n && n < 5) n else 5
+
     Source(1 to n)
       .map(rand)
-      .mapAsync(n)(i => wr.require(i))
+      .mapAsync(t)(i => wr.require(i))
   }
 
   @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
   def route(): Route = {
     path("queries") {
-      withExecutionContext(qd) {
-        parameter('queries.?) { pn: Option[String] =>
-          complete {
-            source(parse(pn))
-          }
+      parameter('queries.?) { pn: Option[String] =>
+        complete {
+          source(parse(pn))
         }
       }
     }

+ 7 - 7
frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/world/UpdateRoute.scala

@@ -8,7 +8,7 @@ import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._
 import scala.concurrent.ExecutionContextExecutor
 import scala.util.Try
 
-class UpdateRoute(wr: WorldRepository, ud: ExecutionContextExecutor, sd: ExecutionContextExecutor) {
+class UpdateRoute(wr: WorldRepository, sd: ExecutionContextExecutor) {
 
   implicit private val jss: JsonEntityStreamingSupport =
     EntityStreamingSupport.json().withParallelMarshalling(5, unordered = true)
@@ -27,10 +27,12 @@ class UpdateRoute(wr: WorldRepository, ud: ExecutionContextExecutor, sd: Executi
   }
 
   private def source(n: Int): Source[World, NotUsed] = {
+    val t = if (1 <= n && n < 5) n else 5
+
     Source(1 to n)
       .map(rand)
-      .mapAsync(n)(wr.require)
-      .mapAsync(n) { w =>
+      .mapAsync(t)(wr.require)
+      .mapAsync(t) { w =>
         val wn = w.copy(randomNumber = rand())
         wr.update(wn).map(_ => wn)(sd)
       }
@@ -38,10 +40,8 @@ class UpdateRoute(wr: WorldRepository, ud: ExecutionContextExecutor, sd: Executi
 
   def route() = {
     path("updates") {
-      withExecutionContext(ud) {
-        parameter('queries.?) { pn =>
-          complete(source(parse(pn)))
-        }
+      parameter('queries.?) { pn =>
+        complete(source(parse(pn)))
       }
     }
   }

+ 4 - 7
frameworks/Scala/akka-http/akka-http-slick-postgres/src/main/scala/net/benchmark/akka/http/world/WorldRoutes.scala

@@ -5,14 +5,11 @@ import akka.http.scaladsl.server.Route
 import scala.concurrent.ExecutionContextExecutor
 
 class WorldRoutes(wr: WorldRepository,
-                  sd: ExecutionContextExecutor,
-                  qd: ExecutionContextExecutor,
-                  ud: ExecutionContextExecutor,
-                  dd: ExecutionContextExecutor) {
+                  sd: ExecutionContextExecutor) {
 
-  private val qr = new QueriesRoute(wr, qd).route()
-  private val ur = new UpdateRoute(wr, ud, sd).route()
-  private val dr = new DbRoute(wr, dd).route()
+  private val qr = new QueriesRoute(wr).route()
+  private val ur = new UpdateRoute(wr, sd).route()
+  private val dr = new DbRoute(wr).route()
 
   @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
   def routes(): Route = {

+ 6 - 0
frameworks/Scala/akka-http/akka-http/README.md

@@ -0,0 +1,6 @@
+# akka-http Benchmarking Tests
+
+See the README.md files of the individual permutations:
+
+akka-http
+akka-http-slick-postgres

+ 1 - 1
frameworks/Scala/akka-http/akka-http/build.sbt

@@ -6,7 +6,7 @@ name := "akka-http-benchmark"
 
 version := "0.1.0-SNAPSHOT"
 
-scalaVersion := "2.12.6"
+scalaVersion := "2.12.5"
 
 resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
 

+ 1 - 1
frameworks/Scala/akka-http/benchmark_config.json

@@ -30,7 +30,7 @@
         "query_url": "/queries?queries=",
         "fortune_url": "/fortunes",
         "update_url": "/updates?queries=",
-        "port": 9339,
+        "port": 9000,
         "approach": "Realistic",
         "classification": "Micro",
         "database": "Postgres",