Browse Source

Merge pull request #124 from huntc/backpressure-checking

Backpressure checking
Patrick Falls 12 years ago
parent
commit
6ab9862f9d

+ 32 - 6
play-java/app/controllers/Application.java

@@ -1,8 +1,10 @@
 package controllers;
 package controllers;
 
 
+import akka.dispatch.ExecutionContexts;
 import akka.dispatch.Futures;
 import akka.dispatch.Futures;
 import models.World;
 import models.World;
-import play.api.libs.concurrent.Promise;
+import play.Play;
+import play.core.NamedThreadFactory;
 import play.libs.Akka;
 import play.libs.Akka;
 import play.libs.F;
 import play.libs.F;
 import play.libs.Json;
 import play.libs.Json;
@@ -14,13 +16,13 @@ import org.codehaus.jackson.map.ObjectMapper;
 import play.mvc.Controller;
 import play.mvc.Controller;
 import play.mvc.Result;
 import play.mvc.Result;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext;
+import utils.Predicate;
+import utils.Predicated;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Random;
 import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 
 public class Application extends Controller {
 public class Application extends Controller {
 
 
@@ -28,7 +30,30 @@ public class Application extends Controller {
     //http://stackoverflow.com/questions/3907929/should-i-make-jacksons-objectmapper-as-static-final
     //http://stackoverflow.com/questions/3907929/should-i-make-jacksons-objectmapper-as-static-final
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
 
-    private static final ExecutionContext DB_EC = Akka.system().dispatchers().lookup("akka.actor.db");
+    private static final int partitionCount = Play.application().configuration().getInt("db.default.partitionCount");
+    private static final int maxConnections =
+            partitionCount * Play.application().configuration().getInt("db.default.maxConnectionsPerPartition");
+    private static final int minConnections =
+            partitionCount * Play.application().configuration().getInt("db.default.minConnectionsPerPartition");
+
+    private static final ThreadPoolExecutor tpe = new ThreadPoolExecutor(minConnections, maxConnections,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new NamedThreadFactory("dbEc"));
+    private static final ExecutionContext dbEc = ExecutionContexts.fromExecutorService(tpe);
+
+    // A predicate for checking our ability to service database requests is determined by ensuring that the request
+    // queue doesn't fill up beyond a certain threshold. For convenience we use the max number of connections
+    // to determine this threshold. It is a rough check as we don't know how many queries we're going
+    // to make or what other threads are running in parallel etc. Nevertheless, the check is adequate in order to
+    // throttle the acceptance of requests to the size of the pool.
+    public static class IsDbAvailable implements Predicate {
+        @Override
+        public boolean condition() {
+            return (tpe.getQueue().size() < maxConnections);
+        }
+    }
+
 
 
     public static Result json() {
     public static Result json() {
         final ObjectNode result = OBJECT_MAPPER.createObjectNode();
         final ObjectNode result = OBJECT_MAPPER.createObjectNode();
@@ -36,6 +61,7 @@ public class Application extends Controller {
         return ok(result);
         return ok(result);
     }
     }
 
 
+    @Predicated(predicate = IsDbAvailable.class, failed = SERVICE_UNAVAILABLE)
     public static Result db(final Integer queries) {
     public static Result db(final Integer queries) {
         return async(
         return async(
                 future(new Callable<Result>() {
                 future(new Callable<Result>() {
@@ -48,7 +74,7 @@ public class Application extends Controller {
                             // an issue that will address this though: https://github.com/playframework/Play20/issues/972
                             // an issue that will address this though: https://github.com/playframework/Play20/issues/972
                             // Meanwhile we call the Akka future directly and wrap its result in a promise.
                             // Meanwhile we call the Akka future directly and wrap its result in a promise.
                             final F.Promise p = Akka.asPromise(Futures.future(
                             final F.Promise p = Akka.asPromise(Futures.future(
-                                    findWorld(Long.valueOf(random.nextInt(TEST_DATABASE_ROWS) + 1)), DB_EC));
+                                    findWorld(Long.valueOf(random.nextInt(TEST_DATABASE_ROWS) + 1)), dbEc));
                             promises.add(p);
                             promises.add(p);
                         }
                         }
                         final List<World> worlds = F.Promise.sequence(promises).get(5L * queries, TimeUnit.SECONDS);
                         final List<World> worlds = F.Promise.sequence(promises).get(5L * queries, TimeUnit.SECONDS);

+ 8 - 0
play-java/app/utils/Predicate.java

@@ -0,0 +1,8 @@
+package utils;
+
+/**
+ * Predicates for PredicatedActions.
+ */
+public interface Predicate {
+    boolean condition();
+}

+ 26 - 0
play-java/app/utils/Predicated.java

@@ -0,0 +1,26 @@
+package utils;
+
+import play.mvc.With;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Declares a composing action that will check for a condition before deciding on whether to proceed with the request.
+ */
+@With(PredicatedAction.class)
+@Target({ElementType.TYPE, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Predicated {
+    /**
+     * The condition.
+     */
+    Class<? extends Predicate> predicate();
+
+    /**
+     * The http status code to return if the condition fails.
+     */
+    int failed();
+}

+ 22 - 0
play-java/app/utils/PredicatedAction.java

@@ -0,0 +1,22 @@
+package utils;
+
+/**
+ * A predicated action is one where a condition must be satisfied in order to proceed with the request. If the
+ * condition is not satisfied then a supplied status result is yielded.
+ */
+
+import play.mvc.Action;
+import play.mvc.Http;
+import play.mvc.Result;
+
+public class PredicatedAction extends Action<Predicated> {
+    @Override
+    public Result call(final Http.Context ctx) throws Throwable {
+        final Predicate p = configuration.predicate().newInstance();
+        if (p.condition()) {
+            return delegate.call(ctx);
+        } else {
+            return status(configuration.failed());
+        }
+    }
+}

+ 0 - 11
play-java/conf/application.conf

@@ -87,17 +87,6 @@ play {
           parallelism-max = 300
           parallelism-max = 300
         }
         }
       }	
       }	
-      db = {
-        executor = "thread-pool-executor"
-        thread-pool-executor {
-          core-pool-size-factor = 1.0
-          core-pool-size-min = 256
-          core-pool-size-max = 256
-          max-pool-size-factor = 1.0
-          max-pool-size-min = 256
-          max-pool-size-max = 256
-        }
-      }
     }
     }
   }
   }
 }
 }

+ 42 - 19
play-scala/app/controllers/Application.scala

@@ -3,35 +3,58 @@ package controllers
 import play.api.Play.current
 import play.api.Play.current
 import play.api.mvc._
 import play.api.mvc._
 import play.api.libs.json.Json
 import play.api.libs.json.Json
-import play.api.libs.concurrent._
-import java.util.concurrent.ThreadLocalRandom
+import java.util.concurrent._
 import scala.concurrent._
 import scala.concurrent._
 import models._
 import models._
+import utils._
+import scala.concurrent.Future
+
+import play.api.libs.concurrent.Execution.Implicits._
+import play.core.NamedThreadFactory
 
 
 object Application extends Controller {
 object Application extends Controller {
-  
-  private val TEST_DATABASE_ROWS = 10000
 
 
-  private val dbEc = Akka.system.dispatchers.lookup("akka.actor.db")
+  private val TestDatabaseRows = 10000
+
+  private val partitionCount = current.configuration.getInt("db.default.partitionCount").getOrElse(2)
+  private val maxConnections =
+    partitionCount * current.configuration.getInt("db.default.maxConnectionsPerPartition").getOrElse(5)
+  private val minConnections =
+    partitionCount * current.configuration.getInt("db.default.minConnectionsPerPartition").getOrElse(5)
+
+  private val tpe = new ThreadPoolExecutor(minConnections, maxConnections,
+    0L, TimeUnit.MILLISECONDS,
+    new LinkedBlockingQueue[Runnable](),
+    new NamedThreadFactory("dbEc"))
+  private val dbEc = ExecutionContext.fromExecutorService(tpe)
+
+  // A predicate for checking our ability to service database requests is determined by ensuring that the request
+  // queue doesn't fill up beyond a certain threshold. For convenience we use the max number of connections
+  // to determine this threshold. It is a rough check as we don't know how many queries we're going
+  // to make or what other threads are running in parallel etc. Nevertheless, the check is adequate in order to
+  // throttle the acceptance of requests to the size of the pool.
+  def isDbAvailable: Boolean = (tpe.getQueue.size() < maxConnections)
+
 
 
   def json() = Action {
   def json() = Action {
-    Ok(Json.obj("message" -> "Hello World!"))   
+    Ok(Json.obj("message" -> "Hello World!"))
   }
   }
 
 
-  def db(queries: Int) = Action {
-    import play.api.libs.concurrent.Execution.Implicits._
-
-    Async {
-      val random = ThreadLocalRandom.current()
+  def db(queries: Int) = PredicatedAction(isDbAvailable, ServiceUnavailable) {
+    Action {
+      Async {
+        val random = ThreadLocalRandom.current()
 
 
-      val worlds = Future.sequence( (for {
-            _ <- 1 to queries
-          } yield Future(World.findById(random.nextInt(TEST_DATABASE_ROWS) + 1))(dbEc)
-        ).toList)
+        val worlds = Future.sequence((for {
+          _ <- 1 to queries
+        } yield Future(World.findById(random.nextInt(TestDatabaseRows) + 1))(dbEc)
+          ).toList)
 
 
-      worlds map {
-        w => Ok(Json.toJson(w))  
-      } 
+        worlds map {
+          w => Ok(Json.toJson(w))
+        }
+      }
     }
     }
-  }     
+  }
+
 }
 }

+ 19 - 0
play-scala/app/utils/PredicatedAction.scala

@@ -0,0 +1,19 @@
+package utils
+
+import play.api.mvc._
+
+/**
+ * A predicated action is one where a condition must be satisfied in order to proceed with the request. If the
+ * condition is not satisfied then a supplied status result is yielded.
+ */
+class PredicatedActionBuilder {
+  def apply[A](p: => Boolean, failed: => Result)(action: Action[A]): Action[A] = new Action[A] {
+    def apply(request: Request[A]): Result = {
+      if (p) action(request) else failed
+    }
+
+    lazy val parser = action.parser
+  }
+}
+
+object PredicatedAction extends PredicatedActionBuilder

+ 0 - 11
play-scala/conf/application.conf

@@ -82,17 +82,6 @@ play {
           parallelism-max = 300
           parallelism-max = 300
         }
         }
       }
       }
-      db = {
-        executor = "thread-pool-executor"
-        thread-pool-executor {
-          core-pool-size-factor = 1.0
-          core-pool-size-min = 256
-          core-pool-size-max = 256
-          max-pool-size-factor = 1.0
-          max-pool-size-min = 256
-          max-pool-size-max = 256
-        }
-      }
     }
     }
   }
   }
 }
 }