|
@@ -7,18 +7,20 @@ import java.time.format.DateTimeFormatter
|
|
import java.util.concurrent.ThreadLocalRandom
|
|
import java.util.concurrent.ThreadLocalRandom
|
|
|
|
|
|
import com.typesafe.scalalogging.Logger
|
|
import com.typesafe.scalalogging.Logger
|
|
-import io.reactiverse.pgclient._
|
|
|
|
import io.vertx.core.buffer.Buffer
|
|
import io.vertx.core.buffer.Buffer
|
|
import io.vertx.core.http.HttpHeaders
|
|
import io.vertx.core.http.HttpHeaders
|
|
import io.vertx.core.json.{JsonArray, JsonObject}
|
|
import io.vertx.core.json.{JsonArray, JsonObject}
|
|
-import io.vertx.core.{AsyncResult, VertxOptions => JVertxOptions}
|
|
|
|
|
|
+import io.vertx.core.{AsyncResult, Handler, VertxOptions => JVertxOptions}
|
|
import io.vertx.lang.scala.{ScalaVerticle, VertxExecutionContext}
|
|
import io.vertx.lang.scala.{ScalaVerticle, VertxExecutionContext}
|
|
|
|
+import io.vertx.pgclient._
|
|
import io.vertx.scala.core.http.{HttpServer, HttpServerRequest, HttpServerResponse}
|
|
import io.vertx.scala.core.http.{HttpServer, HttpServerRequest, HttpServerResponse}
|
|
import io.vertx.scala.core.{VertxOptions, _}
|
|
import io.vertx.scala.core.{VertxOptions, _}
|
|
import io.vertx.scala.ext.web.Router
|
|
import io.vertx.scala.ext.web.Router
|
|
|
|
+import io.vertx.sqlclient._
|
|
import vertx.model.{Fortune, Message, World}
|
|
import vertx.model.{Fortune, Message, World}
|
|
|
|
|
|
import scala.collection.JavaConverters._
|
|
import scala.collection.JavaConverters._
|
|
|
|
+import scala.concurrent.{Future, Promise}
|
|
import scala.util.{Failure, Sorting, Success, Try}
|
|
import scala.util.{Failure, Sorting, Success, Try}
|
|
|
|
|
|
case class Header(name: CharSequence, value: String)
|
|
case class Header(name: CharSequence, value: String)
|
|
@@ -26,7 +28,7 @@ case class Header(name: CharSequence, value: String)
|
|
class App extends ScalaVerticle {
|
|
class App extends ScalaVerticle {
|
|
|
|
|
|
private val HELLO_WORLD = "Hello, world!"
|
|
private val HELLO_WORLD = "Hello, world!"
|
|
- private val HELLO_WORLD_BUFFER = Buffer.factory.directBuffer(HELLO_WORLD, "UTF-8")
|
|
|
|
|
|
+ private val HELLO_WORLD_BUFFER = Buffer.buffer(HELLO_WORLD, "UTF-8")
|
|
private val SERVER = "vert.x"
|
|
private val SERVER = "vert.x"
|
|
|
|
|
|
private val contentTypeJson = Header(HttpHeaders.CONTENT_TYPE, "application/json")
|
|
private val contentTypeJson = Header(HttpHeaders.CONTENT_TYPE, "application/json")
|
|
@@ -36,26 +38,33 @@ class App extends ScalaVerticle {
|
|
private var dateString: String = ""
|
|
private var dateString: String = ""
|
|
|
|
|
|
private var server: HttpServer = _
|
|
private var server: HttpServer = _
|
|
- private var client: PgClient = _
|
|
|
|
- private var pool: PgPool = _
|
|
|
|
|
|
+ private var client: PgConnection = _
|
|
|
|
|
|
private def refreshDateHeader(): Unit = dateString = App.createDateHeader()
|
|
private def refreshDateHeader(): Unit = dateString = App.createDateHeader()
|
|
|
|
|
|
- override def start(): Unit = {
|
|
|
|
|
|
+ override def startFuture(): Future[_] = {
|
|
refreshDateHeader()
|
|
refreshDateHeader()
|
|
vertx.setPeriodic(1000, (_: Long) => refreshDateHeader())
|
|
vertx.setPeriodic(1000, (_: Long) => refreshDateHeader())
|
|
|
|
|
|
- val options = new PgPoolOptions()
|
|
|
|
|
|
+ val pgConnectOptions = new PgConnectOptions()
|
|
.setDatabase(config.getString("database"))
|
|
.setDatabase(config.getString("database"))
|
|
.setHost(config.getString("host"))
|
|
.setHost(config.getString("host"))
|
|
.setPort(config.getInteger("port", 5432))
|
|
.setPort(config.getInteger("port", 5432))
|
|
.setUser(config.getString("username"))
|
|
.setUser(config.getString("username"))
|
|
.setPassword(config.getString("password"))
|
|
.setPassword(config.getString("password"))
|
|
.setCachePreparedStatements(true)
|
|
.setCachePreparedStatements(true)
|
|
|
|
+ .setPipeliningLimit(100000)
|
|
|
|
|
|
val jVertx = vertx.asJava.asInstanceOf[io.vertx.core.Vertx]
|
|
val jVertx = vertx.asJava.asInstanceOf[io.vertx.core.Vertx]
|
|
- client = PgClient.pool(jVertx, new PgPoolOptions(options).setMaxSize(1))
|
|
|
|
- pool = PgClient.pool(jVertx, new PgPoolOptions(options).setMaxSize(4))
|
|
|
|
|
|
+ val pgConnectionPromise = Promise[Unit]
|
|
|
|
+ PgConnection.connect(
|
|
|
|
+ jVertx,
|
|
|
|
+ pgConnectOptions,
|
|
|
|
+ (ar => {
|
|
|
|
+ client = ar.result()
|
|
|
|
+ pgConnectionPromise.success()
|
|
|
|
+ }): Handler[AsyncResult[PgConnection]]
|
|
|
|
+ )
|
|
|
|
|
|
val router = Router.router(vertx)
|
|
val router = Router.router(vertx)
|
|
router.get("/plaintext").handler(context => handlePlainText(context.request()))
|
|
router.get("/plaintext").handler(context => handlePlainText(context.request()))
|
|
@@ -67,7 +76,12 @@ class App extends ScalaVerticle {
|
|
|
|
|
|
val port = 8080
|
|
val port = 8080
|
|
server = vertx.createHttpServer()
|
|
server = vertx.createHttpServer()
|
|
- server.requestHandler(router.accept).listen(port)
|
|
|
|
|
|
+ val httpServerPromise = Promise[Unit]
|
|
|
|
+ server
|
|
|
|
+ .requestHandler(router.accept)
|
|
|
|
+ .listen(port, (_ => httpServerPromise.success()): Handler[AsyncResult[HttpServer]])
|
|
|
|
+
|
|
|
|
+ pgConnectionPromise.future.flatMap(_ => httpServerPromise.future)
|
|
}
|
|
}
|
|
|
|
|
|
override def stop(): Unit = Option(server).foreach(_.close())
|
|
override def stop(): Unit = Option(server).foreach(_.close())
|
|
@@ -87,25 +101,26 @@ class App extends ScalaVerticle {
|
|
.end(Message("Hello, World!").toBuffer)
|
|
.end(Message("Hello, World!").toBuffer)
|
|
|
|
|
|
private def handleDb(request: HttpServerRequest): Unit =
|
|
private def handleDb(request: HttpServerRequest): Unit =
|
|
- client.preparedQuery(
|
|
|
|
- "SELECT id, randomnumber from WORLD where id=$1",
|
|
|
|
- Tuple.of(App.randomWorld(), Nil: _*),
|
|
|
|
- (ar: AsyncResult[PgRowSet]) => {
|
|
|
|
- if (ar.succeeded) {
|
|
|
|
- val resultSet = ar.result.iterator
|
|
|
|
- if (!resultSet.hasNext) {
|
|
|
|
- request.response.setStatusCode(404).end()
|
|
|
|
|
|
+ client
|
|
|
|
+ .preparedQuery("SELECT id, randomnumber from WORLD where id=$1")
|
|
|
|
+ .execute(
|
|
|
|
+ Tuple.of(App.randomWorld(), Nil: _*),
|
|
|
|
+ (ar: AsyncResult[RowSet[Row]]) => {
|
|
|
|
+ if (ar.succeeded) {
|
|
|
|
+ val resultSet = ar.result.iterator
|
|
|
|
+ if (!resultSet.hasNext) {
|
|
|
|
+ request.response.setStatusCode(404).end()
|
|
|
|
+ } else {
|
|
|
|
+ val row = resultSet.next
|
|
|
|
+ responseWithHeaders(request.response, contentTypeJson)
|
|
|
|
+ .end(World(row.getInteger(0), row.getInteger(1)).encode())
|
|
|
|
+ }
|
|
} else {
|
|
} else {
|
|
- val row = resultSet.next
|
|
|
|
- responseWithHeaders(request.response, contentTypeJson)
|
|
|
|
- .end(World(row.getInteger(0), row.getInteger(1)).encode())
|
|
|
|
|
|
+ App.logger.error("Failed to handle request", ar.cause())
|
|
|
|
+ request.response.setStatusCode(500).end(ar.cause.getMessage)
|
|
}
|
|
}
|
|
- } else {
|
|
|
|
- App.logger.error("Failed to handle request", ar.cause())
|
|
|
|
- request.response.setStatusCode(500).end(ar.cause.getMessage)
|
|
|
|
}
|
|
}
|
|
- }
|
|
|
|
- )
|
|
|
|
|
|
+ )
|
|
|
|
|
|
private def handleQueries(request: HttpServerRequest): Unit = {
|
|
private def handleQueries(request: HttpServerRequest): Unit = {
|
|
val queries = App.getQueries(request)
|
|
val queries = App.getQueries(request)
|
|
@@ -113,26 +128,27 @@ class App extends ScalaVerticle {
|
|
var i = 0
|
|
var i = 0
|
|
var failed = false
|
|
var failed = false
|
|
while (i < queries) {
|
|
while (i < queries) {
|
|
- client.preparedQuery(
|
|
|
|
- "SELECT id, randomnumber from WORLD where id=$1",
|
|
|
|
- Tuple.of(App.randomWorld(), Nil: _*),
|
|
|
|
- (ar: AsyncResult[PgRowSet]) => {
|
|
|
|
- if (!failed) {
|
|
|
|
- if (ar.failed) {
|
|
|
|
- failed = true
|
|
|
|
- request.response.setStatusCode(500).end(ar.cause.getMessage)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- // we need a final reference
|
|
|
|
- val row = ar.result.iterator.next
|
|
|
|
|
|
+ client
|
|
|
|
+ .preparedQuery("SELECT id, randomnumber from WORLD where id=$1")
|
|
|
|
+ .execute(
|
|
|
|
+ Tuple.of(App.randomWorld(), Nil: _*),
|
|
|
|
+ (ar: AsyncResult[RowSet[Row]]) => {
|
|
|
|
+ if (!failed) {
|
|
|
|
+ if (ar.failed) {
|
|
|
|
+ failed = true
|
|
|
|
+ request.response.setStatusCode(500).end(ar.cause.getMessage)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ // we need a final reference
|
|
|
|
+ val row = ar.result.iterator.next
|
|
|
|
|
|
- worlds.add(World(row.getInteger(0), row.getInteger(1)))
|
|
|
|
- if (worlds.size == queries)
|
|
|
|
- responseWithHeaders(request.response, contentTypeJson)
|
|
|
|
- .end(worlds.encode)
|
|
|
|
|
|
+ worlds.add(World(row.getInteger(0), row.getInteger(1)))
|
|
|
|
+ if (worlds.size == queries)
|
|
|
|
+ responseWithHeaders(request.response, contentTypeJson)
|
|
|
|
+ .end(worlds.encode)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
- )
|
|
|
|
|
|
+ )
|
|
|
|
|
|
i += 1
|
|
i += 1
|
|
}
|
|
}
|
|
@@ -144,24 +160,24 @@ class App extends ScalaVerticle {
|
|
request.response.setStatusCode(500).end(err.getMessage)
|
|
request.response.setStatusCode(500).end(err.getMessage)
|
|
}
|
|
}
|
|
|
|
|
|
- def handleUpdates(conn: PgConnection, worlds: Array[World]): Unit = {
|
|
|
|
|
|
+ def handleUpdates(conn: SqlConnection, worlds: Array[World]): Unit = {
|
|
Sorting.quickSort(worlds)
|
|
Sorting.quickSort(worlds)
|
|
|
|
|
|
val batch = worlds.map(world => Tuple.of(world.randomNumber, world.id)).toList.asJava
|
|
val batch = worlds.map(world => Tuple.of(world.randomNumber, world.id)).toList.asJava
|
|
- conn.preparedBatch(
|
|
|
|
- "UPDATE world SET randomnumber=$1 WHERE id=$2",
|
|
|
|
- batch,
|
|
|
|
- (ar: AsyncResult[PgRowSet]) => {
|
|
|
|
- conn.close()
|
|
|
|
- if (ar.failed) {
|
|
|
|
- sendError(ar.cause)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
|
|
+ conn
|
|
|
|
+ .preparedQuery("UPDATE world SET randomnumber=$1 WHERE id=$2")
|
|
|
|
+ .executeBatch(
|
|
|
|
+ batch,
|
|
|
|
+ (ar: AsyncResult[RowSet[Row]]) => {
|
|
|
|
+ if (ar.failed) {
|
|
|
|
+ sendError(ar.cause)
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
|
|
- responseWithHeaders(request.response, contentTypeJson)
|
|
|
|
- .end(new JsonArray(worlds.toList.asJava).toBuffer)
|
|
|
|
- }
|
|
|
|
- )
|
|
|
|
|
|
+ responseWithHeaders(request.response, contentTypeJson)
|
|
|
|
+ .end(new JsonArray(worlds.toList.asJava).toBuffer)
|
|
|
|
+ }
|
|
|
|
+ )
|
|
}
|
|
}
|
|
|
|
|
|
val queries = App.getQueries(request)
|
|
val queries = App.getQueries(request)
|
|
@@ -169,65 +185,57 @@ class App extends ScalaVerticle {
|
|
var failed = false
|
|
var failed = false
|
|
var queryCount = 0
|
|
var queryCount = 0
|
|
|
|
|
|
- pool.getConnection((ar1: AsyncResult[PgConnection]) => {
|
|
|
|
- if (ar1.failed) {
|
|
|
|
- failed = true
|
|
|
|
- sendError(ar1.cause)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- val conn = ar1.result
|
|
|
|
- var i = 0
|
|
|
|
- while (i < worlds.length) {
|
|
|
|
- val id = App.randomWorld()
|
|
|
|
- val index = i
|
|
|
|
- conn.preparedQuery(
|
|
|
|
- "SELECT id, randomnumber from WORLD where id=$1",
|
|
|
|
|
|
+ var i = 0
|
|
|
|
+ while (i < worlds.length) {
|
|
|
|
+ val id = App.randomWorld()
|
|
|
|
+ val index = i
|
|
|
|
+ client
|
|
|
|
+ .preparedQuery("SELECT id, randomnumber from WORLD where id=$1")
|
|
|
|
+ .execute(
|
|
Tuple.of(id, Nil: _*),
|
|
Tuple.of(id, Nil: _*),
|
|
- (ar2: AsyncResult[PgRowSet]) => {
|
|
|
|
|
|
+ (ar2: AsyncResult[RowSet[Row]]) => {
|
|
if (!failed) {
|
|
if (!failed) {
|
|
if (ar2.failed) {
|
|
if (ar2.failed) {
|
|
- conn.close()
|
|
|
|
failed = true
|
|
failed = true
|
|
sendError(ar2.cause)
|
|
sendError(ar2.cause)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
worlds(index) = World(ar2.result.iterator.next.getInteger(0), App.randomWorld())
|
|
worlds(index) = World(ar2.result.iterator.next.getInteger(0), App.randomWorld())
|
|
queryCount += 1
|
|
queryCount += 1
|
|
- if (queryCount == worlds.length) handleUpdates(conn, worlds)
|
|
|
|
|
|
+ if (queryCount == worlds.length) handleUpdates(client, worlds)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
)
|
|
)
|
|
|
|
|
|
- i += 1
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- })
|
|
|
|
|
|
+ i += 1
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
private def handleFortunes(request: HttpServerRequest): Unit =
|
|
private def handleFortunes(request: HttpServerRequest): Unit =
|
|
- client.preparedQuery(
|
|
|
|
- "SELECT id, message from FORTUNE",
|
|
|
|
- (ar: AsyncResult[PgRowSet]) => {
|
|
|
|
- val response = request.response
|
|
|
|
- if (ar.succeeded) {
|
|
|
|
- val resultSet = ar.result.iterator
|
|
|
|
- if (!resultSet.hasNext) {
|
|
|
|
- response.setStatusCode(404).end("No results")
|
|
|
|
- return
|
|
|
|
|
|
+ client
|
|
|
|
+ .preparedQuery("SELECT id, message from FORTUNE")
|
|
|
|
+ .execute(
|
|
|
|
+ (ar: AsyncResult[RowSet[Row]]) => {
|
|
|
|
+ val response = request.response
|
|
|
|
+ if (ar.succeeded) {
|
|
|
|
+ val resultSet = ar.result.iterator
|
|
|
|
+ if (!resultSet.hasNext) {
|
|
|
|
+ response.setStatusCode(404).end("No results")
|
|
|
|
+ return
|
|
|
|
+ }
|
|
|
|
+ val fortunes = (resultSet.asScala
|
|
|
|
+ .map(row => Fortune(row.getInteger(0), row.getString(1))) ++
|
|
|
|
+ Seq(Fortune(0, "Additional fortune added at request time."))).toArray
|
|
|
|
+ Sorting.quickSort(fortunes)
|
|
|
|
+ responseWithHeaders(request.response, contentTypeHtml)
|
|
|
|
+ .end(html.fortune(fortunes).body)
|
|
|
|
+ } else {
|
|
|
|
+ val err = ar.cause
|
|
|
|
+ App.logger.error("", err)
|
|
|
|
+ response.setStatusCode(500).end(err.getMessage)
|
|
}
|
|
}
|
|
- val fortunes = (resultSet.asScala
|
|
|
|
- .map(row => Fortune(row.getInteger(0), row.getString(1))) ++
|
|
|
|
- Seq(Fortune(0, "Additional fortune added at request time."))).toArray
|
|
|
|
- Sorting.quickSort(fortunes)
|
|
|
|
- responseWithHeaders(request.response, contentTypeHtml)
|
|
|
|
- .end(html.fortune(fortunes).body)
|
|
|
|
- } else {
|
|
|
|
- val err = ar.cause
|
|
|
|
- App.logger.error("", err)
|
|
|
|
- response.setStatusCode(500).end(err.getMessage)
|
|
|
|
}
|
|
}
|
|
- }
|
|
|
|
- )
|
|
|
|
|
|
+ )
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|