Browse Source

parallelize database access

denka 10 years ago
parent
commit
95e595b669

+ 22 - 8
frameworks/Java/undertow/src/main/java/hello/DbSqlHandler.java

@@ -11,7 +11,11 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 
 
 import static hello.HelloWebServer.JSON_UTF8;
 import static hello.HelloWebServer.JSON_UTF8;
 
 
@@ -44,18 +48,28 @@ final class DbSqlHandler implements HttpHandler {
     
     
     World[] worlds = new World[queries];
     World[] worlds = new World[queries];
     try (Connection connection = database.getConnection();
     try (Connection connection = database.getConnection();
-         PreparedStatement statement = connection.prepareStatement(
+         final PreparedStatement statement = connection.prepareStatement(
              "SELECT * FROM World WHERE id = ?",
              "SELECT * FROM World WHERE id = ?",
              ResultSet.TYPE_FORWARD_ONLY,
              ResultSet.TYPE_FORWARD_ONLY,
              ResultSet.CONCUR_READ_ONLY)) {
              ResultSet.CONCUR_READ_ONLY)) {
+      Map<Integer, Future<World>> futureWorlds = new ConcurrentHashMap<>();
       for (int i = 0; i < queries; i++) {
       for (int i = 0; i < queries; i++) {
-        statement.setInt(1, Helper.randomWorld());
-        try (ResultSet resultSet = statement.executeQuery()) {
-          resultSet.next();
-          worlds[i] = new World(
-              resultSet.getInt("id"),
-              resultSet.getInt("randomNumber"));
-        }
+        futureWorlds.put(i, Helper.EXECUTOR.submit(new Callable<World>(){
+          @Override
+          public World call() throws Exception {
+            statement.setInt(1, Helper.randomWorld());
+            try (ResultSet resultSet = statement.executeQuery()) {
+              resultSet.next();
+              return new World(
+                resultSet.getInt("id"),
+                resultSet.getInt("randomNumber"));
+            }
+          }
+        }));
+      }
+
+      for (int i = 0; i < queries; i++) {
+        worlds[i] = futureWorlds.get(i).get();
       }
       }
     }
     }
     exchange.getResponseHeaders().put(
     exchange.getResponseHeaders().put(

+ 14 - 0
frameworks/Java/undertow/src/main/java/hello/Helper.java

@@ -9,7 +9,11 @@ import org.apache.commons.pool.impl.GenericObjectPool;
 
 
 import javax.sql.DataSource;
 import javax.sql.DataSource;
 import java.util.Deque;
 import java.util.Deque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 
 /**
 /**
  * Provides utility methods for the benchmark tests.
  * Provides utility methods for the benchmark tests.
@@ -79,4 +83,14 @@ final class Helper {
   static int randomWorld() {
   static int randomWorld() {
     return 1 + ThreadLocalRandom.current().nextInt(10000);
     return 1 + ThreadLocalRandom.current().nextInt(10000);
   }
   }
+
+  private static final int cpuCount = Runtime.getRuntime().availableProcessors();
+
+  // todo: parameterize multipliers
+  public static ExecutorService EXECUTOR =
+    new ThreadPoolExecutor(
+      cpuCount * 2, cpuCount * 25, 200, TimeUnit.MILLISECONDS,
+      new LinkedBlockingQueue<Runnable>(cpuCount * 100),
+      new ThreadPoolExecutor.CallerRunsPolicy());
+
 }
 }

+ 28 - 15
frameworks/Java/undertow/src/main/java/hello/UpdatesSqlHandler.java

@@ -9,7 +9,11 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 
 
 import static hello.HelloWebServer.JSON_UTF8;
 import static hello.HelloWebServer.JSON_UTF8;
 
 
@@ -34,26 +38,35 @@ final class UpdatesSqlHandler implements HttpHandler {
     int queries = Helper.getQueries(exchange);
     int queries = Helper.getQueries(exchange);
     World[] worlds = new World[queries];
     World[] worlds = new World[queries];
     try (Connection connection = database.getConnection();
     try (Connection connection = database.getConnection();
-         PreparedStatement query = connection.prepareStatement(
+         final PreparedStatement query = connection.prepareStatement(
              "SELECT * FROM World WHERE id = ?",
              "SELECT * FROM World WHERE id = ?",
              ResultSet.TYPE_FORWARD_ONLY,
              ResultSet.TYPE_FORWARD_ONLY,
              ResultSet.CONCUR_READ_ONLY);
              ResultSet.CONCUR_READ_ONLY);
-         PreparedStatement update = connection.prepareStatement(
+         final PreparedStatement update = connection.prepareStatement(
              "UPDATE World SET randomNumber = ? WHERE id= ?")) {
              "UPDATE World SET randomNumber = ? WHERE id= ?")) {
+      Map<Integer, Future<World>> futureWorlds = new ConcurrentHashMap<>();
       for (int i = 0; i < queries; i++) {
       for (int i = 0; i < queries; i++) {
-        query.setInt(1, Helper.randomWorld());
-        World world;
-        try (ResultSet resultSet = query.executeQuery()) {
-          resultSet.next();
-          world = new World(
-              resultSet.getInt("id"),
-              resultSet.getInt("randomNumber"));
-        }
-        world.randomNumber = Helper.randomWorld();
-        update.setInt(1, world.randomNumber);
-        update.setInt(2, world.id);
-        update.executeUpdate();
-        worlds[i] = world;
+        futureWorlds.put(i, Helper.EXECUTOR.submit(new Callable<World>() {
+          @Override
+          public World call() throws Exception {
+            query.setInt(1, Helper.randomWorld());
+            World world;
+            try (ResultSet resultSet = query.executeQuery()) {
+              resultSet.next();
+              world = new World(
+                resultSet.getInt("id"),
+                resultSet.getInt("randomNumber"));
+            }
+            world.randomNumber = Helper.randomWorld();
+            update.setInt(1, world.randomNumber);
+            update.setInt(2, world.id);
+            update.executeUpdate();
+            return world;
+          }
+        }));
+      }
+      for (int i = 0; i < queries; i++) {
+        worlds[i] = futureWorlds.get(i).get();
       }
       }
     }
     }
     exchange.getResponseHeaders().put(
     exchange.getResponseHeaders().put(