App.scala 10 KB


  1. package vertx
  2. import java.io.{ByteArrayOutputStream, File, IOException}
  3. import java.nio.file.Files
  4. import java.time.ZonedDateTime
  5. import java.time.format.DateTimeFormatter
  6. import java.util.concurrent.ThreadLocalRandom
  7. import com.typesafe.scalalogging.Logger
  8. import io.vertx.core.buffer.Buffer
  9. import io.vertx.core.http.HttpHeaders
  10. import io.vertx.core.json.{JsonArray, JsonObject}
  11. import io.vertx.core.{AsyncResult, Handler, VertxOptions => JVertxOptions}
  12. import io.vertx.lang.scala.{ScalaVerticle, VertxExecutionContext}
  13. import io.vertx.pgclient._
  14. import io.vertx.scala.core.http.{HttpServer, HttpServerRequest, HttpServerResponse}
  15. import io.vertx.scala.core.{VertxOptions, _}
  16. import io.vertx.scala.ext.web.Router
  17. import io.vertx.sqlclient._
  18. import vertx.model.{Fortune, Message, World}
  19. import scala.collection.JavaConverters._
  20. import scala.concurrent.{Future, Promise}
  21. import scala.util.{Failure, Sorting, Success, Try}
  /**
    * An HTTP response header as a name/value pair.
    *
    * @param name  header name; kept as a `CharSequence` so the constants from
    *              `HttpHeaders` can be stored without conversion
    * @param value header value
    */
  final case class Header(name: CharSequence, value: String)
  /**
    * Verticle that serves the HTTP endpoints `/plaintext`, `/json`, `/db`, `/queries`,
    * `/updates` and `/fortunes` on port 8080, backed by a single PostgreSQL connection.
    *
    * The connection settings are read from the verticle config keys `database`, `host`,
    * `port` (default 5432), `username` and `password`.
    */
  class App extends ScalaVerticle {

    import App._

    private val HELLO_WORLD = "Hello, world!"
    private val HELLO_WORLD_BUFFER = Buffer.buffer(HELLO_WORLD, "UTF-8")
    private val SERVER = "vert.x"

    private val SELECT_WORLD = "SELECT id, randomnumber from WORLD where id=$1"
    private val UPDATE_WORLD = "UPDATE world SET randomnumber=$1 WHERE id=$2"
    private val SELECT_FORTUNES = "SELECT id, message from FORTUNE"

    private val contentTypeJson = Header(HttpHeaders.CONTENT_TYPE, "application/json")
    private val contentTypeHtml = Header(HttpHeaders.CONTENT_TYPE, "text/html; charset=UTF-8")
    private val contentTypePlainText = Header(HttpHeaders.CONTENT_TYPE, "text/plain")

    // Cached value of the `Date` response header; refreshed once per second by a timer.
    private var dateString: String = ""
    private var server: HttpServer = _
    private var client: PgConnection = _

    private def refreshDateHeader(): Unit = dateString = App.createDateHeader()

    /**
      * Opens the PostgreSQL connection, registers the routes and starts the HTTP server.
      *
      * @return a future that completes once both the database connection and the HTTP
      *         listener are ready, and fails with the underlying cause if either fails
      */
    override def startFuture(): Future[_] = {
      refreshDateHeader()
      vertx.setPeriodic(1000, (_: Long) => refreshDateHeader())

      val pgConnectOptions = new PgConnectOptions()
        .setDatabase(config.getString("database"))
        .setHost(config.getString("host"))
        .setPort(config.getInteger("port", 5432))
        .setUser(config.getString("username"))
        .setPassword(config.getString("password"))
        .setCachePreparedStatements(true)
        .setPipeliningLimit(100000)

      val jVertx = vertx.asJava.asInstanceOf[io.vertx.core.Vertx]
      val pgConnectionPromise = Promise[Unit]
      PgConnection.connect(
        jVertx,
        pgConnectOptions,
        (ar => {
          // Propagate a connection failure instead of reporting success with a null client.
          if (ar.succeeded) {
            client = ar.result()
            pgConnectionPromise.success(())
          } else {
            pgConnectionPromise.failure(ar.cause)
          }
        }): Handler[AsyncResult[PgConnection]]
      )

      val router = Router.router(vertx)
      router.get("/plaintext").handler(context => handlePlainText(context.request()))
      router.get("/json").handler(context => handleJson(context.request()))
      router.get("/db").handler(context => handleDb(context.request()))
      router.get("/queries").handler(context => handleQueries(context.request()))
      router.get("/updates").handler(context => handleUpdates(context.request()))
      router.get("/fortunes").handler(context => handleFortunes(context.request()))

      val port = 8080
      server = vertx.createHttpServer()
      val httpServerPromise = Promise[Unit]
      server
        .requestHandler(router.accept)
        .listen(
          port,
          (ar => {
            // A failed bind (e.g. port already in use) must fail the deployment.
            if (ar.succeeded) httpServerPromise.success(())
            else httpServerPromise.failure(ar.cause)
          }): Handler[AsyncResult[HttpServer]]
        )

      pgConnectionPromise.future.flatMap(_ => httpServerPromise.future)
    }

    /** Closes the HTTP server and the PostgreSQL connection, if they were created. */
    override def stop(): Unit = {
      Option(server).foreach(_.close())
      Option(client).foreach(_.close())
    }

    /** Sets the given content type plus the `Server` and cached `Date` headers on `response`. */
    private def responseWithHeaders(response: HttpServerResponse, contentType: Header) =
      response
        .putHeader(contentType.name.toString, contentType.value)
        .putHeader(HttpHeaders.SERVER.toString, SERVER)
        .putHeader(HttpHeaders.DATE.toString, dateString)

    /** `/plaintext`: replies with the pre-encoded greeting buffer. */
    private def handlePlainText(request: HttpServerRequest): Unit =
      responseWithHeaders(request.response, contentTypePlainText).end(HELLO_WORLD_BUFFER)

    /** `/json`: replies with a freshly built `Message` serialized to a buffer. */
    private def handleJson(request: HttpServerRequest): Unit =
      responseWithHeaders(request.response, contentTypeJson)
        .end(Message("Hello, World!").toBuffer)

    /**
      * `/db`: selects one world by random id and replies with it as JSON.
      * Replies 404 when no row is found and 500 when the query fails.
      */
    private def handleDb(request: HttpServerRequest): Unit =
      client
        .preparedQuery(SELECT_WORLD)
        .execute(
          Tuple.of(App.randomWorld(), Nil: _*),
          (ar: AsyncResult[RowSet[Row]]) => {
            if (ar.succeeded) {
              val resultSet = ar.result.iterator
              if (!resultSet.hasNext) {
                request.response.setStatusCode(404).end()
              } else {
                val row = resultSet.next
                responseWithHeaders(request.response, contentTypeJson)
                  .end(World(row.getInteger(0), row.getInteger(1)).encode())
              }
            } else {
              sendError(request, ar.cause, "Failed to handle Db request")
            }
          }
        )

    /**
      * `/queries`: issues `queries` single-row selects (count taken from `App.getQueries`)
      * and replies with all resulting worlds as a JSON array once the last one arrives.
      *
      * The first failed query answers 500 and the first empty result answers 404; in both
      * cases the remaining callbacks are ignored.
      */
    private def handleQueries(request: HttpServerRequest): Unit = {
      val queries = App.getQueries(request)
      val worlds = new JsonArray
      // Set once an early (error) response has been sent, so later callbacks become no-ops.
      var failed = false
      var i = 0
      while (i < queries) {
        client
          .preparedQuery(SELECT_WORLD)
          .execute(
            Tuple.of(App.randomWorld(), Nil: _*),
            (ar: AsyncResult[RowSet[Row]]) => {
              // No `return` here: inside a callback it would be a nonlocal return that
              // throws after handleQueries itself has already finished.
              if (!failed) {
                if (ar.failed) {
                  failed = true
                  sendError(request, ar.cause, "Failed to handle Queries request")
                } else {
                  val rows = ar.result.iterator
                  if (!rows.hasNext) {
                    failed = true
                    request.response.setStatusCode(404).end()
                  } else {
                    val row = rows.next()
                    worlds.add(World(row.getInteger(0), row.getInteger(1)))
                    if (worlds.size == queries)
                      responseWithHeaders(request.response, contentTypeJson)
                        .end(worlds.encode)
                  }
                }
              }
            }
          )
        i += 1
      }
    }

    /**
      * `/updates`: reads `queries` worlds by random id, assigns each a new random number,
      * writes them back in one batch and replies with the updated worlds as a JSON array.
      *
      * The first failed read answers 500 and the first empty result answers 404; in both
      * cases the remaining callbacks are ignored. A failed batch update answers 500.
      */
    private def handleUpdates(request: HttpServerRequest): Unit = {

      // Sorts the worlds, persists them with a single batched UPDATE and sends the response.
      def writeUpdates(conn: SqlConnection, worlds: Array[World]): Unit = {
        Sorting.quickSort(worlds)
        val batch = worlds.map(world => Tuple.of(world.randomNumber, world.id)).toList.asJava
        conn
          .preparedQuery(UPDATE_WORLD)
          .executeBatch(
            batch,
            (ar: AsyncResult[RowSet[Row]]) => {
              if (ar.failed) {
                sendError(request, ar.cause, "handleUpdates: failed to update DB")
              } else {
                responseWithHeaders(request.response, contentTypeJson)
                  .end(new JsonArray(worlds.toList.asJava).toBuffer)
              }
            }
          )
      }

      val queries = App.getQueries(request)
      val worlds = new Array[World](queries)
      // Set once an early (error) response has been sent, so later callbacks become no-ops.
      var failed = false
      var queryCount = 0
      var i = 0
      while (i < worlds.length) {
        val id = App.randomWorld()
        // Capture the slot for this query; `i` keeps changing while callbacks are pending.
        val index = i
        client
          .preparedQuery(SELECT_WORLD)
          .execute(
            Tuple.of(id, Nil: _*),
            (ar2: AsyncResult[RowSet[Row]]) => {
              if (!failed) {
                if (ar2.failed) {
                  failed = true
                  sendError(request, ar2.cause, "handleUpdates: failed to read DB")
                } else {
                  val rows = ar2.result.iterator
                  if (!rows.hasNext) {
                    failed = true
                    request.response.setStatusCode(404).end()
                  } else {
                    worlds(index) = World(rows.next().getInteger(0), App.randomWorld())
                    queryCount += 1
                    if (queryCount == worlds.length) writeUpdates(client, worlds)
                  }
                }
              }
            }
          )
        i += 1
      }
    }

    /**
      * `/fortunes`: loads all fortunes, appends one extra fortune created at request time,
      * sorts them and renders the HTML template.
      * Replies 404 when the table is empty and 500 when the query fails.
      */
    private def handleFortunes(request: HttpServerRequest): Unit =
      client
        .preparedQuery(SELECT_FORTUNES)
        .execute(
          (ar: AsyncResult[RowSet[Row]]) => {
            val response = request.response
            if (ar.succeeded) {
              val resultSet = ar.result.iterator
              if (!resultSet.hasNext) {
                response.setStatusCode(404).end("No results")
              } else {
                val fortunes = (resultSet.asScala
                  .map(row => Fortune(row.getInteger(0), row.getString(1))) ++
                  Seq(Fortune(0, "Additional fortune added at request time."))).toArray
                Sorting.quickSort(fortunes)
                responseWithHeaders(request.response, contentTypeHtml)
                  .end(html.fortune(fortunes).body)
              }
            } else {
              sendError(request, ar.cause, "handleFortunes failed to update DB")
            }
          }
        )
  }
  /**
    * Entry point and shared helpers for the [[App]] verticle: configuration loading,
    * deployment, `Date` header formatting, request-parameter parsing and error replies.
    */
  object App {

    val logger: Logger = Logger[App]
    val defaultConfigPath = "src/main/conf/config.json"

    /**
      * Loads the JSON config and deploys one [[App]] instance per default event loop.
      *
      * @param args optional first element: path to the config file; falls back to
      *             [[defaultConfigPath]] when absent
      */
    def main(args: Array[String]): Unit = {
      val configPath = args.headOption.getOrElse(defaultConfigPath)
      val config = new JsonObject(Files.readString(new File(configPath).toPath))
      val vertx = Vertx.vertx(VertxOptions().setPreferNativeTransport(true))

      printConfig(vertx)

      vertx.exceptionHandler(_.printStackTrace())

      implicit val executionContext: VertxExecutionContext = VertxExecutionContext(vertx.getOrCreateContext())

      vertx
        .deployVerticleFuture(
          ScalaVerticle.nameForVerticle[App],
          DeploymentOptions().setInstances(JVertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE).setConfig(config)
        )
        .onComplete {
          // Extractor patterns instead of `_: Success[String]`, whose type argument is erased.
          case Success(_)     => logger.info("Server listening on port 8080")
          case Failure(cause) => logger.error("Unable to start application", cause)
        }
    }

    /**
      * Formats the current instant as an RFC 1123 date for the HTTP `Date` header.
      *
      * The time is taken in UTC so the formatter prints the `GMT` zone designator that
      * HTTP requires, rather than the numeric offset of the server's local zone.
      */
    def createDateHeader(): String =
      DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now(java.time.ZoneOffset.UTC))

    /** Returns a random integer in the range 1 to 10000 inclusive. */
    def randomWorld(): Int = 1 + ThreadLocalRandom.current.nextInt(10000)

    /**
      * Reads the `queries` request parameter.
      *
      * @return the parsed value clamped to the range 1 to 500, or 1 when the parameter is
      *         missing or not an integer
      */
    def getQueries(request: HttpServerRequest): Int =
      request
        .getParam("queries")
        .flatMap(param => Try(param.toInt).toOption)
        .map(number => Math.min(500, Math.max(1, number)))
        .getOrElse(1)

    /** Logs the Vert.x version, the default event-loop pool size and native-transport status. */
    private def printConfig(vertx: Vertx): Unit = {
      def resourceAsStream(resource: String) = classOf[Vertx].getClassLoader.getResourceAsStream(resource)

      val version = Try {
        // getResourceAsStream returns null for a missing resource; report that as an
        // IOException so it is handled by the recover below instead of surfacing as an NPE.
        val in = Option(resourceAsStream("META-INF/vertx/vertx-version.txt"))
          .orElse(Option(resourceAsStream("vertx-version.txt")))
          .getOrElse(throw new IOException("Vert.x version resource not found"))
        try {
          val out = new ByteArrayOutputStream
          val buffer = new Array[Byte](256)
          Iterator
            .continually(in.read(buffer))
            .takeWhile(_ != -1)
            .foreach(read => out.write(buffer, 0, read))
          out.toString
        } finally {
          in.close()
        }
      }.recover {
        case e: IOException =>
          logger.error("Could not read Vert.x version", e)
          "unknown"
      }.get

      logger.info("Vert.x: {}", version)
      logger.info("Event Loop Size: {}", JVertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE)
      logger.info("Native transport: {}", vertx.isNativeTransportEnabled)
    }

    /**
      * Logs the error and replies with status 500.
      *
      * @param request request whose response is completed
      * @param err     failure cause; its message becomes the response body, or its
      *                `toString` when the message is null
      * @param msg     log message describing the failed operation
      */
    def sendError(request: HttpServerRequest, err: Throwable, msg: String = ""): Unit = {
      App.logger.error(msg, err)
      // Throwable.getMessage may be null; never hand a null body to end(String).
      val body = Option(err.getMessage).getOrElse(err.toString)
      request.response.setStatusCode(500).end(body)
    }
  }