|
@@ -5,7 +5,6 @@ import com.github.plokhotnyuk.jsoniter_scala.macros._
|
|
|
|
|
|
import cats.effect._
|
|
|
import cats.implicits._
|
|
|
-import fs2.{StreamApp, Stream}
|
|
|
|
|
|
import org.http4s._
|
|
|
import org.http4s.headers.`Content-Type`
|
|
@@ -14,13 +13,12 @@ import org.http4s.server.blaze.BlazeBuilder
|
|
|
import org.http4s.twirl._
|
|
|
|
|
|
import doobie.hikari.HikariTransactor
|
|
|
+import doobie.util.ExecutionContexts
|
|
|
import doobie._
|
|
|
import doobie.implicits._
|
|
|
|
|
|
import java.util.concurrent.ThreadLocalRandom
|
|
|
|
|
|
-import scala.concurrent.ExecutionContext.Implicits.global
|
|
|
-
|
|
|
case class Message(message: String)
|
|
|
case class World(id: Int, randomNumber: Int)
|
|
|
case class Fortune(id: Int, message: String)
|
|
@@ -34,72 +32,71 @@ object Queries {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-object WebServer extends StreamApp[IO] with Http4sDsl[IO] {
|
|
|
-
|
|
|
+object JsonSupport {
|
|
|
implicit val messageCodec: JsonValueCodec[Message] = JsonCodecMaker.make[Message](CodecMakerConfig())
|
|
|
implicit val worldCodec: JsonValueCodec[World] = JsonCodecMaker.make[World](CodecMakerConfig())
|
|
|
implicit val worldListCodec: JsonValueCodec[List[World]] = JsonCodecMaker.make[List[World]](CodecMakerConfig())
|
|
|
implicit val fortuneCodec: JsonValueCodec[Fortune] = JsonCodecMaker.make[Fortune](CodecMakerConfig())
|
|
|
|
|
|
-
|
|
|
implicit def jsonEncoder[T: JsonValueCodec]: EntityEncoder[IO, T] =
|
|
|
EntityEncoder
|
|
|
.byteArrayEncoder[IO]
|
|
|
.contramap((data: T) => writeToArray(data))
|
|
|
- .withContentType(`Content-Type`(MediaType.`application/json`, Some(Charset.`UTF-8`)))
|
|
|
+ .withContentType(`Content-Type`(MediaType.application.json, Some(Charset.`UTF-8`)))
|
|
|
+}
|
|
|
|
|
|
- def addHeaders(service: HttpService[IO]): HttpService[IO] =
|
|
|
- cats.data.Kleisli { req: Request[IO] =>
|
|
|
- service.run(req).map(_.putHeaders(Header("Server", req.serverAddr)))
|
|
|
- }
|
|
|
+object WebServer extends IOApp with Http4sDsl[IO] {
|
|
|
+ import JsonSupport._
|
|
|
|
|
|
- def connectDatabase(host: String, poolSize: Int): IO[HikariTransactor[IO]] = {
|
|
|
- val driver = "org.postgresql.Driver"
|
|
|
- val url = s"jdbc:postgresql://$host/hello_world"
|
|
|
- val user = "benchmarkdbuser"
|
|
|
- val pass = "benchmarkdbpass"
|
|
|
- val maxPoolSize = poolSize
|
|
|
- val minIdle = poolSize
|
|
|
+ def openDatabase(host: String, poolSize: Int): Resource[IO, HikariTransactor[IO]] =
|
|
|
for {
|
|
|
- xa <- HikariTransactor.newHikariTransactor[IO](driver, url, user, pass)
|
|
|
- _ <- xa.configure(ds => IO {
|
|
|
- ds.setMaximumPoolSize(maxPoolSize)
|
|
|
- ds.setMinimumIdle(minIdle)
|
|
|
- })
|
|
|
+ ce <- ExecutionContexts.fixedThreadPool[IO](32) // our connect EC
|
|
|
+ te <- ExecutionContexts.cachedThreadPool[IO] // our transaction TE
|
|
|
+ xa <- HikariTransactor.newHikariTransactor[IO](
|
|
|
+ "org.postgresql.Driver",
|
|
|
+ s"jdbc:postgresql://$host/hello_world",
|
|
|
+ "benchmarkdbuser",
|
|
|
+ "benchmarkdbpass",
|
|
|
+ ce,
|
|
|
+ te
|
|
|
+ )
|
|
|
+ _ <- Resource.liftF(
|
|
|
+ xa.configure(ds => IO {
|
|
|
+ ds.setMaximumPoolSize(poolSize)
|
|
|
+ ds.setMinimumIdle(poolSize)
|
|
|
+ })
|
|
|
+ )
|
|
|
} yield xa
|
|
|
- }
|
|
|
|
|
|
// Provide a random number between 1 and 10000 (inclusive)
|
|
|
val randomWorldId: IO[Int] = IO(ThreadLocalRandom.current.nextInt(1, 10001))
|
|
|
|
|
|
// Update the randomNumber field with a random number
|
|
|
- def updateRandomNumber(world: World): IO[World] = {
|
|
|
- randomWorldId map { id =>
|
|
|
+ def updateRandomNumber(world: World): IO[World] =
|
|
|
+ randomWorldId.map(id =>
|
|
|
world.copy(randomNumber = id)
|
|
|
- }
|
|
|
- }
|
|
|
+ )
|
|
|
|
|
|
// Select a World object from the database by ID
|
|
|
- def selectWorld(xa: Transactor[IO], id: Int): IO[World] = {
|
|
|
- val query = sql"select id, randomNumber from World where id = $id".query[World]
|
|
|
- query.unique.transact(xa)
|
|
|
- }
|
|
|
+ def selectWorld(xa: Transactor[IO], id: Int): IO[World] =
|
|
|
+ sql"select id, randomNumber from World where id = $id"
|
|
|
+ .query[World]
|
|
|
+ .unique
|
|
|
+ .transact(xa)
|
|
|
|
|
|
// Select a random World object from the database
|
|
|
- def selectRandomWorld(xa: Transactor[IO]): IO[World] = {
|
|
|
+ def selectRandomWorld(xa: Transactor[IO]): IO[World] =
|
|
|
randomWorldId flatMap { id =>
|
|
|
selectWorld(xa, id)
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
// Select a specified number of random World objects from the database
|
|
|
def getWorlds(xa: Transactor[IO], numQueries: Int): IO[List[World]] =
|
|
|
(0 until numQueries).toList.traverse(_ => selectRandomWorld(xa))
|
|
|
|
|
|
// Update the randomNumber field with a new random number, for a list of World objects
|
|
|
- def getNewWorlds(worlds: List[World]): IO[List[World]] = {
|
|
|
+ def getNewWorlds(worlds: List[World]): IO[List[World]] =
|
|
|
worlds.traverse(updateRandomNumber)
|
|
|
- }
|
|
|
|
|
|
// Update the randomNumber column in the database for a specified set of World objects,
|
|
|
// this uses a batch update SQL call.
|
|
@@ -121,52 +118,63 @@ object WebServer extends StreamApp[IO] with Http4sDsl[IO] {
|
|
|
(newFortune :: old).sortBy(_.message)
|
|
|
}
|
|
|
|
|
|
+ // Add Server header container server address
|
|
|
+ def addServerHeader(service: HttpRoutes[IO]): HttpRoutes[IO] =
|
|
|
+ cats.data.Kleisli { req: Request[IO] =>
|
|
|
+ service.run(req).map(_.putHeaders(Header("Server", req.serverAddr)))
|
|
|
+ }
|
|
|
+
|
|
|
// HTTP service definition
|
|
|
- def service(xa: Transactor[IO]) = HttpService[IO] {
|
|
|
- case GET -> Root / "json" =>
|
|
|
- Ok(Message("Hello, World!"))
|
|
|
-
|
|
|
- case GET -> Root / "db" =>
|
|
|
- Ok(selectRandomWorld(xa))
|
|
|
-
|
|
|
- case GET -> Root / "queries" :? Queries(numQueries) =>
|
|
|
- Ok(getWorlds(xa, numQueries))
|
|
|
-
|
|
|
- case GET -> Root / "fortunes" =>
|
|
|
- val page = for {
|
|
|
- oldFortunes <- getFortunes(xa)
|
|
|
- newFortunes = getSortedFortunes(oldFortunes)
|
|
|
- } yield html.index(newFortunes)
|
|
|
- Ok(page)
|
|
|
-
|
|
|
- case GET -> Root / "updates" :? Queries(numQueries) =>
|
|
|
- val updated = for {
|
|
|
- worlds <- getWorlds(xa, numQueries)
|
|
|
- newWorlds <- getNewWorlds(worlds)
|
|
|
- _ <- updateWorlds(xa, newWorlds)
|
|
|
- } yield newWorlds
|
|
|
- Ok(updated)
|
|
|
-
|
|
|
- case GET -> Root / "plaintext" =>
|
|
|
- Ok("Hello, World!")
|
|
|
- }
|
|
|
+ def service(xa: Transactor[IO]) =
|
|
|
+ addServerHeader(
|
|
|
+ HttpRoutes.of[IO] {
|
|
|
+ case GET -> Root / "plaintext" =>
|
|
|
+ Ok("Hello, World!")
|
|
|
+
|
|
|
+ case GET -> Root / "json" =>
|
|
|
+ Ok(Message("Hello, World!"))
|
|
|
+
|
|
|
+ case GET -> Root / "db" =>
|
|
|
+ Ok(selectRandomWorld(xa))
|
|
|
+
|
|
|
+ case GET -> Root / "queries" :? Queries(numQueries) =>
|
|
|
+ Ok(getWorlds(xa, numQueries))
|
|
|
+
|
|
|
+ case GET -> Root / "fortunes" =>
|
|
|
+ Ok(
|
|
|
+ for {
|
|
|
+ oldFortunes <- getFortunes(xa)
|
|
|
+ newFortunes = getSortedFortunes(oldFortunes)
|
|
|
+ } yield html.index(newFortunes)
|
|
|
+ )
|
|
|
+
|
|
|
+ case GET -> Root / "updates" :? Queries(numQueries) =>
|
|
|
+ Ok(
|
|
|
+ for {
|
|
|
+ worlds <- getWorlds(xa, numQueries)
|
|
|
+ newWorlds <- getNewWorlds(worlds)
|
|
|
+ _ <- updateWorlds(xa, newWorlds)
|
|
|
+ } yield newWorlds
|
|
|
+ )
|
|
|
+ }
|
|
|
+ )
|
|
|
|
|
|
// Given a fully constructed HttpService, start the server and wait for completion
|
|
|
- def startServer(service: HttpService[IO]) = {
|
|
|
+ def startServer(service: HttpRoutes[IO]) =
|
|
|
BlazeBuilder[IO]
|
|
|
.bindHttp(8080, "0.0.0.0")
|
|
|
.mountService(service, "/")
|
|
|
- .serve
|
|
|
- }
|
|
|
+ .resource
|
|
|
|
|
|
// Entry point when starting service
|
|
|
- override def stream(args: List[String], requestShutdown: IO[Unit]) = {
|
|
|
- for {
|
|
|
- xa <- Stream.eval(connectDatabase(
|
|
|
+ override def run(args: List[String]): IO[ExitCode] =
|
|
|
+ (for {
|
|
|
+ db <- openDatabase(
|
|
|
args.headOption.getOrElse("localhost"),
|
|
|
sys.env.get("DB_POOL_SIZE").map(_.toInt).getOrElse(256)
|
|
|
- ))
|
|
|
- exitCode <- startServer(addHeaders(service(xa)))
|
|
|
- } yield exitCode
|
|
|
- }
|
|
|
+ )
|
|
|
+ server <- startServer(service(db))
|
|
|
+ } yield server)
|
|
|
+ .use(_ => IO.never)
|
|
|
+ .map(_ => ExitCode.Success)
|
|
|
}
|