|
@@ -1,106 +1,72 @@
|
|
package com.typesafe.akka.http.benchmark.datastore
|
|
package com.typesafe.akka.http.benchmark.datastore
|
|
|
|
|
|
|
|
+import java.sql.PreparedStatement
|
|
import java.sql.ResultSet
|
|
import java.sql.ResultSet
|
|
-import java.util.Properties
|
|
|
|
import java.util.concurrent.Executors
|
|
import java.util.concurrent.Executors
|
|
|
|
|
|
-import akka.actor.ActorSystem
|
|
|
|
-import com.typesafe.akka.http.benchmark.entity.{Fortune, World}
|
|
|
|
-import com.typesafe.config.Config
|
|
|
|
|
|
+import com.typesafe.akka.http.benchmark.Infrastructure
|
|
|
|
+import com.typesafe.akka.http.benchmark.entity.Fortune
|
|
|
|
+import com.typesafe.akka.http.benchmark.entity.World
|
|
import com.zaxxer.hikari._
|
|
import com.zaxxer.hikari._
|
|
-import scala.concurrent.{ExecutionContext, Future, Promise}
|
|
|
|
|
|
|
|
-class MySqlDataStore(components: {
|
|
|
|
- val system: ActorSystem
|
|
|
|
- val config: Config
|
|
|
|
-}) extends DataStore {
|
|
|
|
- val config = components.config.getConfig("akka.http.benchmark.mysql")
|
|
|
|
|
|
+import scala.collection.immutable
|
|
|
|
+import scala.concurrent.ExecutionContext
|
|
|
|
+import scala.concurrent.Future
|
|
|
|
|
|
- private val dataSource = new HikariDataSource {
|
|
|
|
|
|
+// TODO: use slick or similar here instead for more idiomatic usage
|
|
|
|
+trait MySqlDataStore extends DataStore { _: Infrastructure =>
|
|
|
|
+ lazy val config = appConfig.getConfig("akka.http.benchmark.mysql")
|
|
|
|
+
|
|
|
|
+ private lazy val dataSource = new HikariDataSource {
|
|
setJdbcUrl(config.getString("jdbc-url"))
|
|
setJdbcUrl(config.getString("jdbc-url"))
|
|
setUsername(config.getString("dbuser"))
|
|
setUsername(config.getString("dbuser"))
|
|
setPassword(config.getString("dbpass"))
|
|
setPassword(config.getString("dbpass"))
|
|
setMaximumPoolSize(config.getInt("connection-pool-size"))
|
|
setMaximumPoolSize(config.getInt("connection-pool-size"))
|
|
}
|
|
}
|
|
|
|
|
|
- private implicit val executionContext: ExecutionContext = {
|
|
|
|
|
|
+ private implicit lazy val dbExecutionContext: ExecutionContext = {
|
|
val size = config.getInt("thread-pool-size")
|
|
val size = config.getInt("thread-pool-size")
|
|
val threadPool = Executors.newFixedThreadPool(size)
|
|
val threadPool = Executors.newFixedThreadPool(size)
|
|
- new ExecutionContext {
|
|
|
|
- override def reportFailure(cause: Throwable): Unit = {
|
|
|
|
- components.system.log.error(cause, "exception in mysql thread pool")
|
|
|
|
- }
|
|
|
|
|
|
+ ExecutionContext.fromExecutor(threadPool)
|
|
|
|
+ }
|
|
|
|
|
|
- override def execute(runnable: Runnable): Unit = {
|
|
|
|
- threadPool.execute(runnable)
|
|
|
|
- }
|
|
|
|
|
|
+ def requireWorldById(id: Int): Future[World] = findWorldById(id).map(_.getOrElse(throw new RuntimeException(s"Element with id $id was not found.")))(executionContext)
|
|
|
|
+ override def findWorldById(id: Int): Future[Option[World]] =
|
|
|
|
+ withStatement("select id, randomNumber from World where id = ?") { stmt =>
|
|
|
|
+ stmt.setInt(1, id)
|
|
|
|
+ val rs = stmt.executeQuery()
|
|
|
|
+
|
|
|
|
+ if (rs.next()) Some(World(rs.getInt("id"), rs.getInt("randomNumber")))
|
|
|
|
+ else None
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
- override def findOne(id: Int): Future[World] = {
|
|
|
|
- val select = "select id, randomNumber from World where id = ?"
|
|
|
|
- val promise = Promise[World]()
|
|
|
|
- executionContext.execute(new Runnable {
|
|
|
|
- override def run(): Unit = {
|
|
|
|
- val conn = dataSource.getConnection
|
|
|
|
- val stmt = conn.prepareStatement(select)
|
|
|
|
- stmt.setInt(1, id)
|
|
|
|
- val rs = stmt.executeQuery()
|
|
|
|
- val world = rs.next() match {
|
|
|
|
- case true =>
|
|
|
|
- World(rs.getInt("id"), rs.getInt("randomNumber"))
|
|
|
|
- }
|
|
|
|
- rs.close()
|
|
|
|
- stmt.close()
|
|
|
|
- conn.close()
|
|
|
|
- promise.success(world)
|
|
|
|
- }
|
|
|
|
- })
|
|
|
|
- promise.future
|
|
|
|
- }
|
|
|
|
|
|
+ override def updateWorld(world: World): Future[Boolean] =
|
|
|
|
+ withStatement("update World set randomNumber = ? where id = ?") { stmt =>
|
|
|
|
+ stmt.setInt(1, world.randomNumber)
|
|
|
|
+ stmt.setInt(2, world.id)
|
|
|
|
+ stmt.executeUpdate() > 0
|
|
|
|
+ }
|
|
|
|
|
|
- override def updateOne(id: Int, randomNumber: Int): Future[Boolean] = {
|
|
|
|
- val update = "update World set randomNumber = ? where id = ?"
|
|
|
|
- val promise = Promise[Boolean]()
|
|
|
|
- executionContext.execute(new Runnable {
|
|
|
|
- override def run(): Unit = {
|
|
|
|
- val conn = dataSource.getConnection
|
|
|
|
- val stmt = conn.prepareStatement(update)
|
|
|
|
- stmt.setInt(1, randomNumber)
|
|
|
|
- stmt.setInt(2, id)
|
|
|
|
- val n = stmt.executeUpdate()
|
|
|
|
- stmt.close()
|
|
|
|
- conn.close()
|
|
|
|
- promise.success(n > 0)
|
|
|
|
- }
|
|
|
|
- })
|
|
|
|
- promise.future
|
|
|
|
- }
|
|
|
|
|
|
+ override def getFortunes: Future[immutable.Seq[Fortune]] =
|
|
|
|
+ withStatement("select id, message from Fortune") { stmt =>
|
|
|
|
+ val rs = stmt.executeQuery()
|
|
|
|
+ (Fortune(0, "Additional fortune added at request time.") +: rs.map(r => Fortune(r.getInt("id"), r.getString("message"))).toVector)
|
|
|
|
+ .sortBy(_.message)
|
|
|
|
+ }
|
|
|
|
|
|
- override def getFortunes: Future[List[Fortune]] = {
|
|
|
|
- val select = "select id, message from Fortune"
|
|
|
|
- val promise = Promise[List[Fortune]]()
|
|
|
|
- executionContext.execute(new Runnable {
|
|
|
|
- override def run(): Unit = {
|
|
|
|
- val conn = dataSource.getConnection
|
|
|
|
- val stmt = conn.prepareStatement(select)
|
|
|
|
- val rs = stmt.executeQuery()
|
|
|
|
- val fortunes = {
|
|
|
|
- rs.map(r => Fortune(r.getInt("id"), r.getString("message"))).toList :+ Fortune(0, "Additional fortune added at request time.")
|
|
|
|
- }.sortBy(_.message)
|
|
|
|
- rs.close()
|
|
|
|
|
|
+ private def withStatement[T](statement: String)(f: PreparedStatement => T): Future[T] =
|
|
|
|
+ Future {
|
|
|
|
+ val conn = dataSource.getConnection
|
|
|
|
+ val stmt = conn.prepareStatement(statement)
|
|
|
|
+ try f(stmt)
|
|
|
|
+ finally {
|
|
stmt.close()
|
|
stmt.close()
|
|
conn.close()
|
|
conn.close()
|
|
- promise.success(fortunes)
|
|
|
|
}
|
|
}
|
|
- })
|
|
|
|
- promise.future
|
|
|
|
- }
|
|
|
|
|
|
+ }(dbExecutionContext)
|
|
|
|
|
|
implicit class RsIterator(rs: ResultSet) extends Iterator[ResultSet] {
|
|
implicit class RsIterator(rs: ResultSet) extends Iterator[ResultSet] {
|
|
def hasNext: Boolean = rs.next()
|
|
def hasNext: Boolean = rs.next()
|
|
-
|
|
|
|
def next(): ResultSet = rs
|
|
def next(): ResultSet = rs
|
|
}
|
|
}
|
|
-
|
|
|
|
}
|
|
}
|