Browse Source

servlet3-cass: small improvements: shutdown ListeningExecutorService, allow callback registration on MessageDAO.update

marko asplund 11 years ago
parent
commit
36cbe80f75

+ 3 - 2
servlet3-cass/src/main/java/fi/markoa/tfb/servlet3/DatabaseBaseServlet.java

@@ -36,7 +36,7 @@ public abstract class DatabaseBaseServlet extends HttpServlet {
   @Override
   public void init(ServletConfig config) throws ServletException {
     dao = new MessageDAOCassImpl();
-    dao.init();
+    dao.init(executorService);
   }
 
   /**
@@ -84,7 +84,7 @@ public abstract class DatabaseBaseServlet extends HttpServlet {
     }
     if(q > 500)
       return 500;
-    if(q < 1)
+    else if(q < 1)
       return 1;
 
     return q;
@@ -100,5 +100,6 @@ public abstract class DatabaseBaseServlet extends HttpServlet {
   @Override
   public void destroy() {
     dao.destroy();
+    executorService.shutdown();
   }
 }

+ 2 - 2
servlet3-cass/src/main/java/fi/markoa/tfb/servlet3/DatabaseUpdatesServlet.java

@@ -36,7 +36,7 @@ public class DatabaseUpdatesServlet extends DatabaseBaseServlet {
     final AsyncContext asyncContext = req.startAsync();
     ListenableFuture<List<World>> readFuture = dao.read(generateRandomNumbers(queries,
       WORLD_LEAST_VALUE, WORLD_BOUND_VALUE+1));
-    final Future<List<Integer>> newRandomsFuture = generateRandomNumbersFuture(queries,
+    final ListenableFuture<List<Integer>> newRandomsFuture = generateRandomNumbersFuture(queries,
       WORLD_LEAST_VALUE, WORLD_BOUND_VALUE+1);
 
     Futures.addCallback(readFuture, new FutureCallback<List<World>>() {
@@ -74,7 +74,7 @@ public class DatabaseUpdatesServlet extends DatabaseBaseServlet {
 
   }
 
-  protected Future<List<Integer>> generateRandomNumbersFuture(final int count, final int least, final int bound) {
+  protected ListenableFuture<List<Integer>> generateRandomNumbersFuture(final int count, final int least, final int bound) {
     return executorService.submit(new Callable<List<Integer>>() {
       @Override
       public List<Integer> call() throws Exception {

+ 3 - 2
servlet3-cass/src/main/java/fi/markoa/tfb/servlet3/MessageDAO.java

@@ -1,13 +1,14 @@
 package fi.markoa.tfb.servlet3;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 
 import java.util.List;
 
 public interface MessageDAO {
-  void init();
+  void init(ListeningExecutorService executorService);
   ListenableFuture<World> read(int id);
   ListenableFuture<List<World>> read(List<Integer> ids);
-  void update(List<World> worlds);
+  ListenableFuture<Void> update(List<World> worlds);
   void destroy();
 }

+ 11 - 5
servlet3-cass/src/main/java/fi/markoa/tfb/servlet3/MessageDAOCassImpl.java

@@ -4,6 +4,7 @@ import com.datastax.driver.core.*;
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -24,7 +25,7 @@ public class MessageDAOCassImpl implements MessageDAO {
   private Map<String, PreparedStatement> statements;
 
   @Override
-  public void init() {
+  public void init(ListeningExecutorService executorService) {
     LOGGER.debug("init()");
 
     Properties conf;
@@ -59,8 +60,7 @@ public class MessageDAOCassImpl implements MessageDAO {
         return new World(id, r.getInt("randomnumber"));
       }
     };
-    ResultSetFuture rsf = session.executeAsync(statements.get("get_by_id").bind(id));
-    return Futures.transform(rsf, transformation);
+    return Futures.transform(session.executeAsync(statements.get("get_by_id").bind(id)), transformation);
   }
 
   public ListenableFuture<List<World>> read(List<Integer> ids) {
@@ -70,11 +70,17 @@ public class MessageDAOCassImpl implements MessageDAO {
     return Futures.allAsList(futures);
   }
 
-  public void update(List<World> worlds) {
+  public ListenableFuture<Void> update(List<World> worlds) {
+    Function<ResultSet, Void> transformation = new Function<ResultSet, Void>() {
+      @Override
+      public Void apply(ResultSet rows) {
+        return null;
+      }
+    };
     BatchStatement bs = new BatchStatement(BatchStatement.Type.UNLOGGED);
     for(World w : worlds)
       bs.add(statements.get("update_by_id").bind(w.getId(), w.getRandomNumber()));
-    session.execute(bs);
+    return Futures.transform(session.executeAsync(bs), transformation);
   }
 
   @Override