Application.java 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package controllers;
  2. import akka.dispatch.ExecutionContexts;
  3. import models.World;
  4. import play.Play;
  5. import play.core.NamedThreadFactory;
  6. import play.libs.F;
  7. import play.libs.Json;
  8. import play.mvc.Controller;
  9. import play.mvc.Result;
  10. import scala.concurrent.ExecutionContext;
  11. import utils.Predicate;
  12. import utils.Predicated;
  13. import java.util.ArrayList;
  14. import java.util.List;
  15. import java.util.Random;
  16. import java.util.concurrent.*;
  17. public class Application extends Controller {
  18. private static final int MAX_QUERIES_PER_REQUEST = 20;
  19. private static final int TEST_DATABASE_ROWS = 10000;
  20. private static final int partitionCount = Play.application().configuration().getInt("db.default.partitionCount");
  21. private static final int maxConnections =
  22. partitionCount * Play.application().configuration().getInt("db.default.maxConnectionsPerPartition");
  23. private static final int minConnections =
  24. partitionCount * Play.application().configuration().getInt("db.default.minConnectionsPerPartition");
  25. private static final ThreadPoolExecutor tpe = new ThreadPoolExecutor(minConnections, maxConnections,
  26. 0L, TimeUnit.MILLISECONDS,
  27. new LinkedBlockingQueue<Runnable>(),
  28. new NamedThreadFactory("dbEc"));
  29. private static final ExecutionContext dbEc = ExecutionContexts.fromExecutorService(tpe);
  30. // A predicate for checking our ability to service database requests is determined by ensuring that the request
  31. // queue doesn't fill up beyond a certain threshold. For convenience we use the max number of connections * the max
  32. // # of db requests per web request to determine this threshold. It is a rough check as we don't know how many
  33. // queries we're going to make or what other threads are running in parallel etc. Nevertheless, the check is
  34. // adequate in order to throttle the acceptance of requests to the size of the pool.
  35. public static class IsDbAvailable implements Predicate {
  36. @Override
  37. public boolean condition() {
  38. return tpe.getQueue().size() < maxConnections * MAX_QUERIES_PER_REQUEST;
  39. }
  40. }
  41. @Predicated(predicate = IsDbAvailable.class, failed = SERVICE_UNAVAILABLE)
  42. public static F.Promise<Result> db(final Integer queries) {
  43. final Random random = ThreadLocalRandom.current();
  44. final List<F.Promise<? extends World>> promises = new ArrayList<F.Promise<? extends World>>(queries);
  45. for (int i = 0; i < queries; ++i) {
  46. final F.Promise<World> p = F.Promise.promise(new F.Function0<World>() {
  47. @Override
  48. public World apply() throws Throwable {
  49. return World.find.byId(Long.valueOf(random.nextInt(TEST_DATABASE_ROWS) + 1));
  50. }
  51. }, dbEc);
  52. promises.add(p);
  53. }
  54. return F.Promise.sequence(promises).map(new F.Function<List<World>, Result>() {
  55. @Override
  56. public Result apply(List<World> worlds) {
  57. return ok(Json.toJson(worlds));
  58. }
  59. });
  60. }
  61. }