Browse Source

Applied a back-pressure check to ensure that the database does not get overloaded

Christopher Hunt 12 years ago
parent
commit
86165eedb1

+ 42 - 18
play-scala/app/controllers/Application.scala

@@ -1,34 +1,58 @@
 package controllers
 
-import play.api._
+import play.api.Play.current
 import play.api.mvc._
 import play.api.libs.json.Json
-import play.api.libs.concurrent._
-import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent._
 import scala.concurrent._
 import models._
+import utils._
+import scala.concurrent.Future
+
+import play.api.libs.concurrent.Execution.Implicits._
 
 object Application extends Controller {
-  
-  private val TEST_DATABASE_ROWS = 10000
+
+  private val TestDatabaseRows = 10000
+
+  private val partitionCount = current.configuration.getInt("db.default.partitionCount").getOrElse(2)
+  private val maxConnections =
+    partitionCount * current.configuration.getInt("db.default.maxConnectionsPerPartition").getOrElse(5)
+  private val minConnections =
+    partitionCount * current.configuration.getInt("db.default.minConnectionsPerPartition").getOrElse(5)
+
+  private val tpe = new ThreadPoolExecutor(minConnections, maxConnections,
+    0L, TimeUnit.MILLISECONDS,
+    new LinkedBlockingQueue[Runnable]())
+  private val dbEc = ExecutionContext.fromExecutorService(tpe)
+
+  // A predicate for checking our ability to service database requests is determined by ensuring that the request
+  // queue doesn't fill up beyond a certain threshold. For convenience we use the max number of connections
+  // to determine this threshold. It is a rough check as we don't know how many queries we're going
+  // to make or what other threads are running in parallel etc. Nevertheless, the check is adequate in order to
+  // throttle the acceptance of requests to the size of the pool.
+  def isDbAvailable: Boolean = (tpe.getQueue.size() < maxConnections)
+
 
   def json() = Action {
-    Ok(Json.obj("message" -> "Hello World!"))   
+    Ok(Json.obj("message" -> "Hello World!"))
   }
 
-  def db(queries: Int) = Action {
-    import play.api.libs.concurrent.Execution.Implicits._
-
-    Async {
-      val random = ThreadLocalRandom.current()
+  def db(queries: Int) = PredicatedAction(isDbAvailable, ServiceUnavailable) {
+    Action {
+      Async {
+        val random = ThreadLocalRandom.current()
 
-      val worlds = Future.sequence( (for {
-        _ <- (1 to queries).par
-      } yield Future(World.findById(random.nextInt(TEST_DATABASE_ROWS) + 1))).toList)
+        val worlds = Future.sequence((for {
+          _ <- 1 to queries
+        } yield Future(World.findById(random.nextInt(TestDatabaseRows) + 1))(dbEc)
+          ).toList)
 
-      worlds map {
-        w => Ok(Json.toJson(w))  
-      } 
+        worlds map {
+          w => Ok(Json.toJson(w))
+        }
+      }
     }
-  }     
+  }
+
 }

+ 19 - 0
play-scala/app/utils/PredicatedAction.scala

@@ -0,0 +1,19 @@
+package utils
+
+import play.api.mvc._
+
+/**
+ * A predicated action is one where a condition must be satisfied in order to proceed with the request. If the
+ * condition is not satisfied then a supplied status result is yielded.
+ */
+class PredicatedActionBuilder {
+  def apply[A](p: => Boolean, failed: Result)(action: Action[A]): Action[A] = new Action[A] {
+    def apply(request: Request[A]): Result = {
+      if (p) action(request) else failed
+    }
+
+    lazy val parser = action.parser
+  }
+}
+
+object PredicatedAction extends PredicatedActionBuilder