Browse Source

Merge pull request #1439 from denkab/master

Increase throughput for Grizzly and undertow by parallelizing DB access
Brittany Mazza 10 years ago
parent
commit
ce09e8481c

+ 1 - 0
frameworks/Java/grizzly-jersey/source_code

@@ -1,6 +1,7 @@
 ./grizzly-jersey/src/main/
 ./grizzly-jersey/src/main/java
 ./grizzly-jersey/src/main/java/hello
+./grizzly-jersey/src/main/java/hello/Common.java
 ./grizzly-jersey/src/main/java/hello/DbResource.java
 ./grizzly-jersey/src/main/java/hello/FortunesResource.java
 ./grizzly-jersey/src/main/java/hello/SessionFactoryProvider.java

+ 20 - 0
frameworks/Java/grizzly-jersey/src/main/java/hello/Common.java

@@ -0,0 +1,20 @@
+package hello;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author denkab
+ */
+public class Common {
+
+    private static final int cpuCount = Runtime.getRuntime().availableProcessors();
+
+    public static ExecutorService EXECUTOR = new ThreadPoolExecutor(
+        cpuCount * 2, cpuCount * 25, 200, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>(cpuCount * 100),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+
+}

+ 38 - 6
frameworks/Java/grizzly-jersey/src/main/java/hello/DbResource.java

@@ -3,7 +3,12 @@ package hello;
 import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
 import hello.domain.World;
 
+import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 
 import javax.ws.rs.DefaultValue;
@@ -13,6 +18,7 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 
+import org.hibernate.IdentifierLoadAccess;
 import org.hibernate.Session;
 import org.hibernate.SessionFactory;
 
@@ -29,17 +35,43 @@ public class DbResource {
   
   @GET
   @Produces(APPLICATION_JSON + "; charset=utf-8")
-  public Object db(@QueryParam("queries") @DefaultValue("1") final int queries) {
+  public Object db(@QueryParam("queries") String queriesParam)
+      throws ExecutionException, InterruptedException {
+
+    final int queries = getQueries(queriesParam);
     final World[] worlds = new World[queries];
     final Random random = ThreadLocalRandom.current();
     final Session session = sessionFactory.openSession();
-    
+    session.setDefaultReadOnly(true);
+    final IdentifierLoadAccess accessor = session.byId(World.class);
+
+    Map<Integer, Future<World>> futureWorlds = new ConcurrentHashMap<>();
+    for (int i = 0; i < queries; i++) {
+      futureWorlds.put(i, Common.EXECUTOR.submit(
+        new Callable<World>() {
+          @Override
+          public World call() throws Exception {
+            return (World) accessor.load(random.nextInt(DB_ROWS) + 1);
+          }
+        }
+      ));
+    }
+
     for (int i = 0; i < queries; i++) {
-        worlds[i] = (World) session.byId(World.class).load(random.nextInt(DB_ROWS) + 1);
+      worlds[i] = futureWorlds.get(i).get();
     }
-    
-    session.close();
+
     return queries == 1 ? worlds[0] : worlds;
   }
-  
+
+  private int getQueries(String proto) {
+    int result = 1;
+    try {
+      result = Integer.parseInt(proto);
+    } catch (NumberFormatException e) {
+      e.printStackTrace();
+    }
+
+    return Math.min(500, Math.max(1, result));
+  }
 }

+ 3 - 3
frameworks/Java/grizzly-jersey/src/main/resources/hibernate.cfg.xml

@@ -8,9 +8,9 @@
     <property name="hibernate.dialect">org.hibernate.dialect.MySQLDialect</property>
     <property name="hibernate.cache.use_query_cache">false</property>
     <property name="hibernate.show_sql">false</property>
-    <property name="hibernate.c3p0.min_size">5</property>
-    <property name="hibernate.c3p0.max_size">40</property>
+    <property name="hibernate.c3p0.min_size">256</property>
+    <property name="hibernate.c3p0.max_size">256</property>
     <property name="hibernate.c3p0.timeout">1800</property>
-    <property name="hibernate.c3p0.max_statements">50</property>
+    <property name="hibernate.c3p0.max_statements">2048</property>
   </session-factory>
 </hibernate-configuration>

+ 26 - 12
frameworks/Java/undertow/src/main/java/hello/DbSqlHandler.java

@@ -11,7 +11,11 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 
 import static hello.HelloWebServer.JSON_UTF8;
 
@@ -43,19 +47,29 @@ final class DbSqlHandler implements HttpHandler {
     }
     
     World[] worlds = new World[queries];
-    try (Connection connection = database.getConnection();
-         PreparedStatement statement = connection.prepareStatement(
-             "SELECT * FROM World WHERE id = ?",
-             ResultSet.TYPE_FORWARD_ONLY,
-             ResultSet.CONCUR_READ_ONLY)) {
+    try (final Connection connection = database.getConnection()) {
+      Map<Integer, Future<World>> futureWorlds = new ConcurrentHashMap<>();
       for (int i = 0; i < queries; i++) {
-        statement.setInt(1, Helper.randomWorld());
-        try (ResultSet resultSet = statement.executeQuery()) {
-          resultSet.next();
-          worlds[i] = new World(
-              resultSet.getInt("id"),
-              resultSet.getInt("randomNumber"));
-        }
+        futureWorlds.put(i, Helper.EXECUTOR.submit(new Callable<World>(){
+          @Override
+          public World call() throws Exception {
+            try (PreparedStatement statement = connection.prepareStatement(
+                "SELECT * FROM World WHERE id = ?",
+                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+
+              statement.setInt(1, Helper.randomWorld());
+              ResultSet resultSet = statement.executeQuery();
+              resultSet.next();
+              return new World(
+                resultSet.getInt("id"),
+                resultSet.getInt("randomNumber"));
+            }
+          }
+        }));
+      }
+
+      for (int i = 0; i < queries; i++) {
+        worlds[i] = futureWorlds.get(i).get();
       }
     }
     exchange.getResponseHeaders().put(

+ 14 - 0
frameworks/Java/undertow/src/main/java/hello/Helper.java

@@ -9,7 +9,11 @@ import org.apache.commons.pool.impl.GenericObjectPool;
 
 import javax.sql.DataSource;
 import java.util.Deque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Provides utility methods for the benchmark tests.
@@ -79,4 +83,14 @@ final class Helper {
   static int randomWorld() {
     return 1 + ThreadLocalRandom.current().nextInt(10000);
   }
+
+  private static final int cpuCount = Runtime.getRuntime().availableProcessors();
+
+  // todo: parameterize multipliers
+  public static ExecutorService EXECUTOR =
+    new ThreadPoolExecutor(
+      cpuCount * 2, cpuCount * 25, 200, TimeUnit.MILLISECONDS,
+      new LinkedBlockingQueue<Runnable>(cpuCount * 100),
+      new ThreadPoolExecutor.CallerRunsPolicy());
+
 }

+ 34 - 20
frameworks/Java/undertow/src/main/java/hello/UpdatesSqlHandler.java

@@ -9,7 +9,11 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 
 import static hello.HelloWebServer.JSON_UTF8;
 
@@ -33,27 +37,37 @@ final class UpdatesSqlHandler implements HttpHandler {
     }
     int queries = Helper.getQueries(exchange);
     World[] worlds = new World[queries];
-    try (Connection connection = database.getConnection();
-         PreparedStatement query = connection.prepareStatement(
-             "SELECT * FROM World WHERE id = ?",
-             ResultSet.TYPE_FORWARD_ONLY,
-             ResultSet.CONCUR_READ_ONLY);
-         PreparedStatement update = connection.prepareStatement(
-             "UPDATE World SET randomNumber = ? WHERE id= ?")) {
+    try (final Connection connection = database.getConnection()) {
+      Map<Integer, Future<World>> futureWorlds = new ConcurrentHashMap<>();
       for (int i = 0; i < queries; i++) {
-        query.setInt(1, Helper.randomWorld());
-        World world;
-        try (ResultSet resultSet = query.executeQuery()) {
-          resultSet.next();
-          world = new World(
-              resultSet.getInt("id"),
-              resultSet.getInt("randomNumber"));
-        }
-        world.randomNumber = Helper.randomWorld();
-        update.setInt(1, world.randomNumber);
-        update.setInt(2, world.id);
-        update.executeUpdate();
-        worlds[i] = world;
+        futureWorlds.put(i, Helper.EXECUTOR.submit(new Callable<World>() {
+          @Override
+          public World call() throws Exception {
+            World world;
+            try (PreparedStatement update = connection.prepareStatement(
+                "UPDATE World SET randomNumber = ? WHERE id= ?")) {
+              try (PreparedStatement query = connection.prepareStatement(
+                  "SELECT * FROM World WHERE id = ?",
+                  ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+
+                query.setInt(1, Helper.randomWorld());
+                ResultSet resultSet = query.executeQuery();
+                resultSet.next();
+                world = new World(
+                  resultSet.getInt("id"),
+                  resultSet.getInt("randomNumber"));
+              }
+              world.randomNumber = Helper.randomWorld();
+              update.setInt(1, world.randomNumber);
+              update.setInt(2, world.id);
+              update.executeUpdate();
+              return world;
+            }
+          }
+        }));
+      }
+      for (int i = 0; i < queries; i++) {
+        worlds[i] = futureWorlds.get(i).get();
       }
     }
     exchange.getResponseHeaders().put(