|
@@ -1,137 +1,77 @@
|
|
|
package com.falmarri.finagle
|
|
|
|
|
|
-import com.twitter.finagle.http.{ Request, Response, RichHttp, Http }
|
|
|
-import com.twitter.finagle.builder.{ Server, ServerBuilder }
|
|
|
-import java.net.InetSocketAddress
|
|
|
-import com.twitter.finagle.http.service.RoutingService
|
|
|
-import com.twitter.finagle.Service
|
|
|
-import com.twitter.util.Future
|
|
|
-import org.jboss.netty.buffer.ChannelBuffers.copiedBuffer
|
|
|
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse
|
|
|
-import org.jboss.netty.handler.codec.http.HttpResponseStatus
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper
|
|
|
import com.fasterxml.jackson.module.scala.DefaultScalaModule
|
|
|
-import scala.language.experimental.macros
|
|
|
-//import scala.reflect.macros.Context
|
|
|
-import java.io.StringWriter
|
|
|
-import org.jboss.netty.util.CharsetUtil.UTF_8
|
|
|
-import scala.slick.driver.MySQLDriver.simple._
|
|
|
-import scala.util.Random
|
|
|
-import org.apache.commons.dbcp.BasicDataSource
|
|
|
-import javax.sql.DataSource
|
|
|
-import com.twitter.util.FuturePool
|
|
|
+import com.twitter.finagle.builder.ClientBuilder
|
|
|
+import com.twitter.finagle.exp.mysql.{Client, IntValue, MySQL, Row}
|
|
|
+import com.twitter.finagle.http.{HttpMuxer, Request, Response}
|
|
|
+import com.twitter.finagle.{Http, Service}
|
|
|
+import com.twitter.util.{Future, FuturePool}
|
|
|
+import java.net.InetSocketAddress
|
|
|
import java.util.concurrent.Executors
|
|
|
+import org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer
|
|
|
+import scala.util.Random
|
|
|
|
|
|
-case class World(id: Int, randomNumber: Int)
|
|
|
+object FinagleBenchmark extends App {
|
|
|
+ val maxConnections = 256
|
|
|
|
|
|
-object Worlds extends Table[World]("World") {
|
|
|
- def id = column[Int]("id", O.PrimaryKey)
|
|
|
- def randomNumber = column[Int]("randomNumber")
|
|
|
- def * = id ~ randomNumber <> (World, World.unapply _)
|
|
|
-
|
|
|
-}
|
|
|
+ val mysql = new Client(ClientBuilder()
|
|
|
+ .codec(new MySQL("benchmarkdbuser", "benchmarkdbpass", Some("hello_world")))
|
|
|
+ .hosts(new InetSocketAddress(System.getProperty("db.host", "localhost"), 3306))
|
|
|
+ .hostConnectionLimit(maxConnections)
|
|
|
+ .buildFactory())
|
|
|
|
|
|
+ val pool = FuturePool(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2))
|
|
|
|
|
|
-object FinagleBenchmark extends App {
|
|
|
+ val mapper = new ObjectMapper()
|
|
|
+ mapper.registerModule(DefaultScalaModule)
|
|
|
|
|
|
-// def serialize[T](value: T): String = macro serializeImpl[T]
|
|
|
-//
|
|
|
-// def serializeImpl[T](c: Context)(value: c.Expr[T]): c.Expr[String] = {
|
|
|
-// import java.io.StringWriter
|
|
|
-// import c.universe._
|
|
|
-// c.Expr[String](reify {
|
|
|
-// val writer = new StringWriter()
|
|
|
-// mapper.writeValue(writer, value.splice)
|
|
|
-// writer.toString()
|
|
|
-// }.tree)
|
|
|
-// }
|
|
|
- val dataSource: DataSource = {
|
|
|
- val ds = new BasicDataSource
|
|
|
- ds.setDriverClassName("com.mysql.jdbc.Driver")
|
|
|
- ds.setUsername("benchmarkdbuser")
|
|
|
- ds.setPassword("benchmarkdbpass")
|
|
|
- ds.setMaxActive(256);
|
|
|
- ds.setMaxIdle(10);
|
|
|
- ds.setInitialSize(20);
|
|
|
- //ds.setValidationQuery("SELECT 1 FROM INFORMATION_SCHEMA.SYSTEM_USERS")
|
|
|
- //new java.io.File("target").mkdirs // ensure that folder for database exists
|
|
|
- ds.setUrl("jdbc:mysql://" + System.getProperty("db.host", "localhost") + ":3306/hello_world?jdbcCompliantTruncation=false&elideSetAutoCommits=true&useLocalSessionState=true&cachePrepStmts=true&cacheCallableStmts=true&alwaysSendSetIsolation=false&prepStmtCacheSize=4096&cacheServerConfiguration=true&prepStmtCacheSqlLimit=2048&zeroDateTimeBehavior=convertToNull&traceProtocol=false&useUnbufferedInput=false&useReadAheadInput=false&maintainTimeStats=false&useServerPrepStmts&cacheRSMetadata=true")
|
|
|
- ds
|
|
|
- }
|
|
|
-
|
|
|
- val database = Database.forDataSource(dataSource)
|
|
|
-
|
|
|
- def serialize(value: Any) = {
|
|
|
- val writer = new StringWriter()
|
|
|
- mapper.writeValue(writer, value)
|
|
|
- writer.toString()
|
|
|
+ def getValue(row: Row, name: String): Any = row(name) match {
|
|
|
+ case Some(IntValue(v)) => v
|
|
|
+ case _ => throw new Exception("couldn't get value for %s".format(name))
|
|
|
}
|
|
|
|
|
|
- val mapper = new ObjectMapper()
|
|
|
- mapper.registerModule(DefaultScalaModule)
|
|
|
+ def rowToMap(row: Row) = {
|
|
|
+ Map(
|
|
|
+ "id" -> getValue(row, "id"),
|
|
|
+ "randomNumber" -> getValue(row, "randomNumber")
|
|
|
+ )
|
|
|
+ }
|
|
|
|
|
|
- val json = new Service[Request, Response] {
|
|
|
- def apply(req: Request): Future[Response] ={
|
|
|
- val resp = Response()
|
|
|
- resp.setContent(copiedBuffer(serialize(Map("message" -> "Hello, World!")), UTF_8))
|
|
|
- resp.setContentTypeJson
|
|
|
- Future.value(resp)
|
|
|
- }
|
|
|
+ def serialize(result: Any): Array[Byte] =
|
|
|
+ mapper.writeValueAsBytes(result)
|
|
|
|
|
|
+ def createResponse(req: Request, bytes: Array[Byte]) = {
|
|
|
+ val body = wrappedBuffer(bytes)
|
|
|
+ val resp = req.response
|
|
|
+ resp.setContentTypeJson
|
|
|
+ resp.setContent(body)
|
|
|
+ resp.contentLength = body.readableBytes
|
|
|
+ resp
|
|
|
}
|
|
|
-
|
|
|
- val diskIoFuturePool = FuturePool(Executors.newFixedThreadPool(8))
|
|
|
-
|
|
|
|
|
|
- val db = new Service[Request, Response] {
|
|
|
- def apply(req: Request): Future[Response] = {
|
|
|
- val n = req.params.getIntOrElse("queries", 1)
|
|
|
- val resp = Response()
|
|
|
- database withSession {implicit session: Session =>
|
|
|
- val rand = new Random()
|
|
|
- val q = (0 until n).map { _ =>
|
|
|
- Query(Worlds).where(_.id > rand.nextInt(10000)).first
|
|
|
- }
|
|
|
- resp.setContent(copiedBuffer(serialize(q), UTF_8))
|
|
|
- resp.setContentTypeJson
|
|
|
- Future.value(resp)
|
|
|
- }
|
|
|
+ HttpMuxer.addRichHandler("/json", new Service[Request, Response] {
|
|
|
+ def apply(req: Request): Future[Response] = pool {
|
|
|
+ createResponse(req, serialize(Map("message" -> "Hello, World!")))
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- val poolingdb = new Service[Request, Response] {
|
|
|
+ })
|
|
|
+
|
|
|
+ HttpMuxer.addRichHandler("/db", new Service[Request, Response] {
|
|
|
+ val rand = new Random()
|
|
|
+ val sql = "SELECT * FROM world WHERE id = "
|
|
|
+
|
|
|
def apply(req: Request): Future[Response] = {
|
|
|
val n = req.params.getIntOrElse("queries", 1)
|
|
|
- val query = {
|
|
|
- val resp = Response()
|
|
|
- database withSession {implicit session: Session =>
|
|
|
- val rand = new Random()
|
|
|
- val q = (0 until n).map { _ =>
|
|
|
- Query(Worlds).where(_.id > rand.nextInt(10000)).first
|
|
|
- }
|
|
|
- resp.setContent(copiedBuffer(serialize(q), UTF_8))
|
|
|
- resp.setContentTypeJson
|
|
|
- resp
|
|
|
- }
|
|
|
- }
|
|
|
- diskIoFuturePool(query)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- val service =
|
|
|
- RoutingService byPath {
|
|
|
|
|
|
- case "/json" => json
|
|
|
- case "/db" => db
|
|
|
- case "/pooling" => poolingdb
|
|
|
+ val qs = (0 until n) map { i =>
|
|
|
+ mysql.select(sql + rand.nextInt(10000))(rowToMap)
|
|
|
+ }
|
|
|
|
|
|
+ Future.collect(qs) map { results =>
|
|
|
+ createResponse(req, serialize(results.flatten))
|
|
|
+ }
|
|
|
}
|
|
|
+ })
|
|
|
|
|
|
- val server: Server = ServerBuilder()
|
|
|
- .codec(RichHttp[Request](Http()))
|
|
|
- .bindTo(new InetSocketAddress(8080))
|
|
|
- .name("finagle")
|
|
|
- .build(service)
|
|
|
-
|
|
|
+ Http.serve(new InetSocketAddress(8080), HttpMuxer)
|
|
|
}
|