Browse Source

Added back-pressure capabilities to play-java also.

Christopher Hunt 12 years ago
parent
commit
3df6617881

+ 32 - 6
play-java/app/controllers/Application.java

@@ -1,8 +1,10 @@
 package controllers;
 package controllers;
 
 
+import akka.dispatch.ExecutionContexts;
 import akka.dispatch.Futures;
 import akka.dispatch.Futures;
 import models.World;
 import models.World;
-import play.api.libs.concurrent.Promise;
+import play.Play;
+import play.core.NamedThreadFactory;
 import play.libs.Akka;
 import play.libs.Akka;
 import play.libs.F;
 import play.libs.F;
 import play.libs.Json;
 import play.libs.Json;
@@ -14,13 +16,13 @@ import org.codehaus.jackson.map.ObjectMapper;
 import play.mvc.Controller;
 import play.mvc.Controller;
 import play.mvc.Result;
 import play.mvc.Result;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext;
+import utils.Predicate;
+import utils.Predicated;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
 import java.util.Random;
 import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 
 public class Application extends Controller {
 public class Application extends Controller {
 
 
@@ -28,7 +30,30 @@ public class Application extends Controller {
     //http://stackoverflow.com/questions/3907929/should-i-make-jacksons-objectmapper-as-static-final
     //http://stackoverflow.com/questions/3907929/should-i-make-jacksons-objectmapper-as-static-final
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
 
-    private static final ExecutionContext DB_EC = Akka.system().dispatchers().lookup("akka.actor.db");
+    private static final int partitionCount = Play.application().configuration().getInt("db.default.partitionCount");
+    private static final int maxConnections =
+            partitionCount * Play.application().configuration().getInt("db.default.maxConnectionsPerPartition");
+    private static final int minConnections =
+            partitionCount * Play.application().configuration().getInt("db.default.minConnectionsPerPartition");
+
+    private static final ThreadPoolExecutor tpe = new ThreadPoolExecutor(minConnections, maxConnections,
+            0L, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new NamedThreadFactory("dbEc"));
+    private static final ExecutionContext dbEc = ExecutionContexts.fromExecutorService(tpe);
+
+    // A predicate for checking our ability to service database requests is determined by ensuring that the request
+    // queue doesn't fill up beyond a certain threshold. For convenience we use the max number of connections
+    // to determine this threshold. It is a rough check as we don't know how many queries we're going
+    // to make or what other threads are running in parallel etc. Nevertheless, the check is adequate in order to
+    // throttle the acceptance of requests to the size of the pool.
+    public static class IsDbAvailable implements Predicate {
+        @Override
+        public boolean condition() {
+            return (tpe.getQueue().size() < maxConnections);
+        }
+    }
+
 
 
     public static Result json() {
     public static Result json() {
         final ObjectNode result = OBJECT_MAPPER.createObjectNode();
         final ObjectNode result = OBJECT_MAPPER.createObjectNode();
@@ -36,6 +61,7 @@ public class Application extends Controller {
         return ok(result);
         return ok(result);
     }
     }
 
 
+    @Predicated(predicate = IsDbAvailable.class, failed = SERVICE_UNAVAILABLE)
     public static Result db(final Integer queries) {
     public static Result db(final Integer queries) {
         return async(
         return async(
                 future(new Callable<Result>() {
                 future(new Callable<Result>() {
@@ -48,7 +74,7 @@ public class Application extends Controller {
                             // an issue that will address this though: https://github.com/playframework/Play20/issues/972
                             // an issue that will address this though: https://github.com/playframework/Play20/issues/972
                             // Meanwhile we call the Akka future directly and wrap its result in a promise.
                             // Meanwhile we call the Akka future directly and wrap its result in a promise.
                             final F.Promise p = Akka.asPromise(Futures.future(
                             final F.Promise p = Akka.asPromise(Futures.future(
-                                    findWorld(Long.valueOf(random.nextInt(TEST_DATABASE_ROWS) + 1)), DB_EC));
+                                    findWorld(Long.valueOf(random.nextInt(TEST_DATABASE_ROWS) + 1)), dbEc));
                             promises.add(p);
                             promises.add(p);
                         }
                         }
                         final List<World> worlds = F.Promise.sequence(promises).get(5L * queries, TimeUnit.SECONDS);
                         final List<World> worlds = F.Promise.sequence(promises).get(5L * queries, TimeUnit.SECONDS);

+ 8 - 0
play-java/app/utils/Predicate.java

@@ -0,0 +1,8 @@
+package utils;
+
+/**
+ * Predicates for PredicatedActions.
+ */
+public interface Predicate {
+    boolean condition();
+}

+ 26 - 0
play-java/app/utils/Predicated.java

@@ -0,0 +1,26 @@
+package utils;
+
+import play.mvc.With;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Declares a composing action that will check for a condition before deciding on whether to proceed with the request.
+ */
+@With(PredicatedAction.class)
+@Target({ElementType.TYPE, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface Predicated {
+    /**
+     * The condition.
+     */
+    Class<? extends Predicate> predicate();
+
+    /**
+     * The http status code to return if the condition fails.
+     */
+    int failed();
+}

+ 22 - 0
play-java/app/utils/PredicatedAction.java

@@ -0,0 +1,22 @@
+package utils;
+
+/**
+ * A predicated action is one where a condition must be satisfied in order to proceed with the request. If the
+ * condition is not satisfied then a supplied status result is yielded.
+ */
+
+import play.mvc.Action;
+import play.mvc.Http;
+import play.mvc.Result;
+
+public class PredicatedAction extends Action<Predicated> {
+    @Override
+    public Result call(final Http.Context ctx) throws Throwable {
+        final Predicate p = configuration.predicate().newInstance();
+        if (p.condition()) {
+            return delegate.call(ctx);
+        } else {
+            return status(configuration.failed());
+        }
+    }
+}

+ 0 - 11
play-java/conf/application.conf

@@ -87,17 +87,6 @@ play {
           parallelism-max = 300
           parallelism-max = 300
         }
         }
       }	
       }	
-      db = {
-        executor = "thread-pool-executor"
-        thread-pool-executor {
-          core-pool-size-factor = 1.0
-          core-pool-size-min = 256
-          core-pool-size-max = 256
-          max-pool-size-factor = 1.0
-          max-pool-size-min = 256
-          max-pool-size-max = 256
-        }
-      }
     }
     }
   }
   }
 }
 }

+ 3 - 1
play-scala/app/controllers/Application.scala

@@ -10,6 +10,7 @@ import utils._
 import scala.concurrent.Future
 import scala.concurrent.Future
 
 
 import play.api.libs.concurrent.Execution.Implicits._
 import play.api.libs.concurrent.Execution.Implicits._
+import play.core.NamedThreadFactory
 
 
 object Application extends Controller {
 object Application extends Controller {
 
 
@@ -23,7 +24,8 @@ object Application extends Controller {
 
 
   private val tpe = new ThreadPoolExecutor(minConnections, maxConnections,
   private val tpe = new ThreadPoolExecutor(minConnections, maxConnections,
     0L, TimeUnit.MILLISECONDS,
     0L, TimeUnit.MILLISECONDS,
-    new LinkedBlockingQueue[Runnable]())
+    new LinkedBlockingQueue[Runnable](),
+    new NamedThreadFactory("dbEc"))
   private val dbEc = ExecutionContext.fromExecutorService(tpe)
   private val dbEc = ExecutionContext.fromExecutorService(tpe)
 
 
   // A predicate for checking our ability to service database requests is determined by ensuring that the request
   // A predicate for checking our ability to service database requests is determined by ensuring that the request