|
@@ -1,17 +1,15 @@
|
|
package http4s.techempower.benchmark
|
|
package http4s.techempower.benchmark
|
|
|
|
|
|
-import java.util.concurrent.ThreadLocalRandom
|
|
|
|
|
|
+import java.util.concurrent.Executors
|
|
|
|
|
|
-import cats.effect._
|
|
|
|
-import cats.instances.list._
|
|
|
|
-import cats.syntax.parallel._
|
|
|
|
-import cats.syntax.traverse._
|
|
|
|
|
|
+import scala.concurrent.ExecutionContext
|
|
|
|
+import cats.effect.{ExitCode, IO, IOApp, Resource}
|
|
|
|
+import com.typesafe.config.ConfigValueFactory
|
|
import io.circe.generic.auto._
|
|
import io.circe.generic.auto._
|
|
import io.circe.syntax._
|
|
import io.circe.syntax._
|
|
-import doobie._
|
|
|
|
-import doobie.implicits._
|
|
|
|
-import doobie.hikari.HikariTransactor
|
|
|
|
-import doobie.util.ExecutionContexts
|
|
|
|
|
|
+import io.getquill.util.LoadConfig
|
|
|
|
+import io.getquill.LowerCase
|
|
|
|
+import io.getquill.PostgresJAsyncContext
|
|
import org.http4s._
|
|
import org.http4s._
|
|
import org.http4s.dsl._
|
|
import org.http4s.dsl._
|
|
import org.http4s.circe._
|
|
import org.http4s.circe._
|
|
@@ -20,9 +18,9 @@ import org.http4s.server.Router
|
|
import org.http4s.server.blaze.BlazeServerBuilder
|
|
import org.http4s.server.blaze.BlazeServerBuilder
|
|
import org.http4s.twirl._
|
|
import org.http4s.twirl._
|
|
|
|
|
|
-case class Message(message: String)
|
|
|
|
-case class World(id: Int, randomNumber: Int)
|
|
|
|
-case class Fortune(id: Int, message: String)
|
|
|
|
|
|
+final case class Message(message: String)
|
|
|
|
+final case class World(id: Int, randomNumber: Int)
|
|
|
|
+final case class Fortune(id: Int, message: String)
|
|
|
|
|
|
// Extract queries parameter (with default and min/maxed)
|
|
// Extract queries parameter (with default and min/maxed)
|
|
object Queries {
|
|
object Queries {
|
|
@@ -35,72 +33,25 @@ object Queries {
|
|
}
|
|
}
|
|
|
|
|
|
object WebServer extends IOApp with Http4sDsl[IO] {
|
|
object WebServer extends IOApp with Http4sDsl[IO] {
|
|
- def openDatabase(host: String,
|
|
|
|
- poolSize: Int): Resource[IO, HikariTransactor[IO]] =
|
|
|
|
|
|
+ def makeDatabaseService(
|
|
|
|
+ host: String,
|
|
|
|
+ poolSize: Int
|
|
|
|
+ ): Resource[IO, DatabaseService] = {
|
|
for {
|
|
for {
|
|
- ce <- ExecutionContexts.fixedThreadPool[IO](32) // our connect EC
|
|
|
|
- be <- Blocker[IO] // our blocking EC
|
|
|
|
- xa <- HikariTransactor.newHikariTransactor[IO](
|
|
|
|
- "org.postgresql.Driver",
|
|
|
|
- s"jdbc:postgresql://$host/hello_world",
|
|
|
|
- "benchmarkdbuser",
|
|
|
|
- "benchmarkdbpass",
|
|
|
|
- ce,
|
|
|
|
- be
|
|
|
|
- )
|
|
|
|
- _ <- Resource.liftF(
|
|
|
|
- xa.configure(
|
|
|
|
- ds =>
|
|
|
|
- IO {
|
|
|
|
- ds.setMaximumPoolSize(poolSize)
|
|
|
|
- ds.setMinimumIdle(poolSize)
|
|
|
|
- }
|
|
|
|
- )
|
|
|
|
- )
|
|
|
|
- } yield xa
|
|
|
|
-
|
|
|
|
- // Provide a random number between 1 and 10000 (inclusive)
|
|
|
|
- val randomWorldId: IO[Int] = IO(ThreadLocalRandom.current.nextInt(1, 10001))
|
|
|
|
-
|
|
|
|
- // Update the randomNumber field with a random number
|
|
|
|
- def updateRandomNumber(world: World): IO[World] =
|
|
|
|
- randomWorldId.map(id => world.copy(randomNumber = id))
|
|
|
|
-
|
|
|
|
- // Select a World object from the database by ID
|
|
|
|
- def selectWorld(xa: Transactor[IO], id: Int): IO[World] =
|
|
|
|
- sql"select id, randomNumber from World where id = $id"
|
|
|
|
- .query[World]
|
|
|
|
- .unique
|
|
|
|
- .transact(xa)
|
|
|
|
-
|
|
|
|
- // Select a random World object from the database
|
|
|
|
- def selectRandomWorld(xa: Transactor[IO]): IO[World] =
|
|
|
|
- randomWorldId flatMap { id =>
|
|
|
|
- selectWorld(xa, id)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Select a specified number of random World objects from the database
|
|
|
|
- def getWorlds(xa: Transactor[IO], numQueries: Int): IO[List[World]] =
|
|
|
|
- (0 until numQueries).toList.parTraverse(_ => selectRandomWorld(xa))
|
|
|
|
-
|
|
|
|
- // Update the randomNumber field with a new random number, for a list of World objects
|
|
|
|
- def getNewWorlds(worlds: List[World]): IO[List[World]] =
|
|
|
|
- worlds.traverse(updateRandomNumber)
|
|
|
|
-
|
|
|
|
- // Update the randomNumber column in the database for a specified set of World objects,
|
|
|
|
- // this uses a batch update SQL call.
|
|
|
|
- def updateWorlds(xa: Transactor[IO], newWorlds: List[World]): IO[Int] = {
|
|
|
|
- val sql = "update World set randomNumber = ? where id = ?"
|
|
|
|
- // Reason for sorting: https://github.com/TechEmpower/FrameworkBenchmarks/pull/4214#issuecomment-489358881
|
|
|
|
- val update = Update[(Int, Int)](sql)
|
|
|
|
- .updateMany(newWorlds.sortBy(_.id).map(w => (w.randomNumber, w.id)))
|
|
|
|
- update.transact(xa)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // Retrieve all fortunes from the database
|
|
|
|
- def getFortunes(xa: Transactor[IO]): IO[List[Fortune]] = {
|
|
|
|
- val query = sql"select id, message from Fortune".query[Fortune]
|
|
|
|
- query.to[List].transact(xa)
|
|
|
|
|
|
+ executor <- Resource(IO {
|
|
|
|
+ val pool = Executors.newFixedThreadPool(poolSize)
|
|
|
|
+ (pool, IO(pool.shutdown()))
|
|
|
|
+ })
|
|
|
|
+ ctx <- Resource.fromAutoCloseable(IO(new PostgresJAsyncContext(
|
|
|
|
+ LowerCase,
|
|
|
|
+ LoadConfig("ctx")
|
|
|
|
+ .withValue("host", ConfigValueFactory.fromAnyRef(host))
|
|
|
|
+ .withValue(
|
|
|
|
+ "maxActiveConnections",
|
|
|
|
+ ConfigValueFactory.fromAnyRef(poolSize)
|
|
|
|
+ )
|
|
|
|
+ )))
|
|
|
|
+ } yield new DatabaseService(ctx, executor)
|
|
}
|
|
}
|
|
|
|
|
|
// Add a new fortune to an existing list, and sort by message.
|
|
// Add a new fortune to an existing list, and sort by message.
|
|
@@ -116,7 +67,7 @@ object WebServer extends IOApp with Http4sDsl[IO] {
|
|
}
|
|
}
|
|
|
|
|
|
// HTTP service definition
|
|
// HTTP service definition
|
|
- def service(xa: Transactor[IO]) =
|
|
|
|
|
|
+ def service(db: DatabaseService) =
|
|
addServerHeader(HttpRoutes.of[IO] {
|
|
addServerHeader(HttpRoutes.of[IO] {
|
|
case GET -> Root / "plaintext" =>
|
|
case GET -> Root / "plaintext" =>
|
|
Ok("Hello, World!")
|
|
Ok("Hello, World!")
|
|
@@ -125,41 +76,43 @@ object WebServer extends IOApp with Http4sDsl[IO] {
|
|
Ok(Message("Hello, World!").asJson)
|
|
Ok(Message("Hello, World!").asJson)
|
|
|
|
|
|
case GET -> Root / "db" =>
|
|
case GET -> Root / "db" =>
|
|
- Ok(selectRandomWorld(xa).map(_.asJson))
|
|
|
|
|
|
+ Ok(db.selectRandomWorld().map(_.asJson))
|
|
|
|
|
|
case GET -> Root / "queries" :? Queries(numQueries) =>
|
|
case GET -> Root / "queries" :? Queries(numQueries) =>
|
|
- Ok(getWorlds(xa, numQueries).map(_.asJson))
|
|
|
|
|
|
+ Ok(db.getWorlds(numQueries).map(_.asJson))
|
|
|
|
|
|
case GET -> Root / "fortunes" =>
|
|
case GET -> Root / "fortunes" =>
|
|
Ok(for {
|
|
Ok(for {
|
|
- oldFortunes <- getFortunes(xa)
|
|
|
|
|
|
+ oldFortunes <- db.getFortunes()
|
|
newFortunes = getSortedFortunes(oldFortunes)
|
|
newFortunes = getSortedFortunes(oldFortunes)
|
|
} yield html.index(newFortunes))
|
|
} yield html.index(newFortunes))
|
|
|
|
|
|
case GET -> Root / "updates" :? Queries(numQueries) =>
|
|
case GET -> Root / "updates" :? Queries(numQueries) =>
|
|
Ok(for {
|
|
Ok(for {
|
|
- worlds <- getWorlds(xa, numQueries)
|
|
|
|
- newWorlds <- getNewWorlds(worlds)
|
|
|
|
- _ <- updateWorlds(xa, newWorlds)
|
|
|
|
|
|
+ worlds <- db.getWorlds(numQueries)
|
|
|
|
+ newWorlds <- db.getNewWorlds(worlds)
|
|
|
|
+ _ <- db.updateWorlds(newWorlds)
|
|
} yield newWorlds.asJson)
|
|
} yield newWorlds.asJson)
|
|
})
|
|
})
|
|
|
|
|
|
|
|
+ val blazeEc = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(32))
|
|
|
|
+
|
|
// Given a fully constructed HttpService, start the server and wait for completion
|
|
// Given a fully constructed HttpService, start the server and wait for completion
|
|
def startServer(service: HttpRoutes[IO]) =
|
|
def startServer(service: HttpRoutes[IO]) =
|
|
- BlazeServerBuilder[IO]
|
|
|
|
|
|
+ BlazeServerBuilder[IO](blazeEc)
|
|
.bindHttp(8080, "0.0.0.0")
|
|
.bindHttp(8080, "0.0.0.0")
|
|
.withHttpApp(Router("/" -> service).orNotFound)
|
|
.withHttpApp(Router("/" -> service).orNotFound)
|
|
|
|
+ .withSocketKeepAlive(true)
|
|
.resource
|
|
.resource
|
|
|
|
|
|
// Entry point when starting service
|
|
// Entry point when starting service
|
|
override def run(args: List[String]): IO[ExitCode] =
|
|
override def run(args: List[String]): IO[ExitCode] =
|
|
(for {
|
|
(for {
|
|
- db <- openDatabase(
|
|
|
|
|
|
+ db <- makeDatabaseService(
|
|
args.headOption.getOrElse("localhost"),
|
|
args.headOption.getOrElse("localhost"),
|
|
- sys.env.get("DB_POOL_SIZE").map(_.toInt).getOrElse(256)
|
|
|
|
|
|
+ sys.env.get("DB_POOL_SIZE").map(_.toInt).getOrElse(64)
|
|
)
|
|
)
|
|
server <- startServer(service(db))
|
|
server <- startServer(service(db))
|
|
} yield server)
|
|
} yield server)
|
|
.use(_ => IO.never)
|
|
.use(_ => IO.never)
|
|
- .map(_ => ExitCode.Success)
|
|
|
|
}
|
|
}
|