Browse Source

servlet3-cass: implement test type 5 + refactor

marko asplund 11 years ago
parent
commit
9f643151e9

+ 100 - 0
servlet3-cass/src/main/java/fi/markoa/tfb/servlet3/DatabaseBaseServlet.java

@@ -0,0 +1,100 @@
+package fi.markoa.tfb.servlet3;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Base class for Web Framework Benchmarks database test type implementations.
+ *
+ * @author marko asplund
+ */
+public abstract class DatabaseBaseServlet extends HttpServlet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseBaseServlet.class);
+  protected static final ObjectMapper mapper = new ObjectMapper();
+  protected static final String MEDIATYPE_APPLICATION_JSON = "application/json";
+  protected static final int WORLD_LEAST_VALUE = 1;
+  protected static final int WORLD_BOUND_VALUE = 10000;
+
+  protected static final ListeningExecutorService executorService =
+    MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+  protected MessageDAOCassImpl dao;
+
+  @Override
+  public void init(ServletConfig config) throws ServletException {
+    dao = new MessageDAOCassImpl();
+    dao.init();
+  }
+
+  /**
+   * callback for sending the response back to the client
+   *
+   * @param asyncContext Servlet asynchronous context
+   * @param future ListenableFuture holding the backend response
+   * @param executor ExecutorService instance for executing the ListenableFuture
+   */
+  protected void addResponseCallback(final AsyncContext asyncContext, ListenableFuture<?> future, Executor executor) {
+    Futures.addCallback(future, new FutureCallback<Object>() {
+      @Override
+      public void onSuccess(Object world) {
+        try {
+          mapper.writeValue(asyncContext.getResponse().getOutputStream(), world);
+        } catch (IOException ex) {
+          LOGGER.error("failed to get output stream", ex);
+          throw new RuntimeException("failed to get output stream", ex);
+        }
+        asyncContext.complete();
+      }
+
+      @Override
+      public void onFailure(Throwable th) {
+        // TODO
+        LOGGER.error("failed to get data, "+th);
+        asyncContext.complete();
+        throw new RuntimeException(th);
+      }
+    }, executor);
+  }
+
+  protected int getQueries(String queries) {
+    int q;
+    if(queries == null) {
+      return 1;
+    }
+    try {
+      q = Integer.parseInt(queries);
+    } catch (NumberFormatException ex) {
+      return 1;
+    }
+    if(q > 500)
+      return 500;
+    if(q < 1)
+      return 1;
+
+    return q;
+  }
+
+  protected List<Integer> generateRandomNumbers(int count, int least, int bound) {
+    List<Integer> ids = new ArrayList<>();
+    for(int cnt = 0; cnt < count; cnt++)
+      ids.add(ThreadLocalRandom.current().nextInt(least, bound));
+    return ids;
+  }
+
+  @Override
+  public void destroy() {
+    dao.destroy();
+  }
+}

+ 33 - 0
servlet3-cass/src/main/java/fi/markoa/tfb/servlet3/DatabaseQueriesServlet.java

@@ -0,0 +1,33 @@
+package fi.markoa.tfb.servlet3;
+
+import com.google.common.util.concurrent.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletException;
+import javax.servlet.annotation.WebServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+/**
+ * Web Framework Benchmarks
+ * Test type 3: Multiple database queries
+ *
+ * @author marko asplund
+ */
+@WebServlet(urlPatterns={"/queries"}, asyncSupported=true)
+public class DatabaseQueriesServlet extends DatabaseBaseServlet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseQueriesServlet.class);
+
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+    resp.setContentType(MEDIATYPE_APPLICATION_JSON);
+    final AsyncContext asyncContext = req.startAsync();
+    ListenableFuture<?> future = dao.read(generateRandomNumbers(getQueries(req.getParameter("queries")),
+      WORLD_LEAST_VALUE, WORLD_BOUND_VALUE+1));
+    addResponseCallback(asyncContext, future, executorService);
+  }
+
+}

+ 6 - 77
servlet3-cass/src/main/java/fi/markoa/tfb/servlet3/DatabaseQueryServlet.java

@@ -1,105 +1,34 @@
 package fi.markoa.tfb.servlet3;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.servlet.AsyncContext;
-import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
 import javax.servlet.annotation.WebServlet;
-import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 
 /**
  * Web Framework Benchmarks
  * Test type 2: Single database query
- * Test type 3: Multiple database queries
  *
  * @author marko asplund
  */
-@WebServlet(urlPatterns={"/db", "/queries"}, asyncSupported=true)
-public class DatabaseQueryServlet extends HttpServlet {
+@WebServlet(urlPatterns={"/db"}, asyncSupported=true)
+public class DatabaseQueryServlet extends DatabaseBaseServlet {
   private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseQueryServlet.class);
-  private static final ObjectMapper mapper = new ObjectMapper();
-  private static final String MEDIATYPE_APPLICATION_JSON = "application/json";
-
-  private static final ListeningExecutorService executorService =
-    MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
-  private MessageDAOCassImpl dao;
-
-  @Override
-  public void init(ServletConfig config) throws ServletException {
-    dao = new MessageDAOCassImpl();
-    dao.init();
-  }
 
   @Override
   protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
     resp.setContentType(MEDIATYPE_APPLICATION_JSON);
-    final AsyncContext asyncContext = req.startAsync();
-    ListenableFuture<?> future;
-    if("/queries".equals(req.getServletPath())) {
-      List<Integer> ids = new ArrayList<>();
-      int queries = getQueries(req.getParameter("queries"));
-      for(int cnt = 0; cnt < queries; cnt++)
-        ids.add(ThreadLocalRandom.current().nextInt(1, 10001));
-      future = dao.read(ids);
-    } else {
-      int randId = ThreadLocalRandom.current().nextInt(1, 10001);
-      future = dao.read(randId);
-    }
-
-    Futures.addCallback(future, new FutureCallback<Object>() {
-      @Override
-      public void onSuccess(Object world) {
-        try {
-          mapper.writeValue(asyncContext.getResponse().getOutputStream(), world);
-        } catch (IOException ex) {
-          LOGGER.error("failed to get output stream", ex);
-          throw new RuntimeException("failed to get output stream", ex);
-        }
-        asyncContext.complete();
-      }
-
-      @Override
-      public void onFailure(Throwable th) {
-        // TODO
-        LOGGER.error("failed to get data, "+th);
-        asyncContext.complete();
-        throw new RuntimeException(th);
-      }
-    }, executorService);
-
-  }
-
-  private int getQueries(String queries) {
-    int q;
-    if(queries == null) {
-      return 1;
-    }
-    try {
-      q = Integer.parseInt(queries);
-    } catch (NumberFormatException ex) {
-      return 1;
-    }
-    if(q > 500)
-      return 500;
-    if(q < 1)
-      return 1;
-
-    return q;
+    AsyncContext asyncContext = req.startAsync();
+    int randId = ThreadLocalRandom.current().nextInt(WORLD_LEAST_VALUE, WORLD_BOUND_VALUE+1);
+    ListenableFuture<?> future = dao.read(randId);
+    addResponseCallback(asyncContext, future, executorService);
   }
 
-  @Override
-  public void destroy() {
-    dao.destroy();
-  }
 }

+ 85 - 0
servlet3-cass/src/main/java/fi/markoa/tfb/servlet3/DatabaseUpdatesServlet.java

@@ -0,0 +1,85 @@
+package fi.markoa.tfb.servlet3;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletException;
+import javax.servlet.annotation.WebServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Web Framework Benchmarks
+ * Test type 5: Database updates
+ *
+ * @author marko asplund
+ */
+
+@WebServlet(urlPatterns={"/updates"}, asyncSupported=true)
+public class DatabaseUpdatesServlet extends DatabaseBaseServlet {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseUpdatesServlet.class);
+
+  @Override
+  protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+    resp.setContentType(MEDIATYPE_APPLICATION_JSON);
+    final int queries = getQueries(req.getParameter("queries"));
+    final AsyncContext asyncContext = req.startAsync();
+    ListenableFuture<List<World>> readFuture = dao.read(generateRandomNumbers(queries,
+      WORLD_LEAST_VALUE, WORLD_BOUND_VALUE+1));
+    final Future<List<Integer>> newRandomsFuture = generateRandomNumbersFuture(queries,
+      WORLD_LEAST_VALUE, WORLD_BOUND_VALUE+1);
+
+    Futures.addCallback(readFuture, new FutureCallback<List<World>>() {
+      @Override
+      public void onSuccess(List<World> worlds) {
+        List<Integer> newRandoms;
+        try {
+          newRandoms = newRandomsFuture.get();
+        } catch (InterruptedException | ExecutionException ex) {
+          LOGGER.error("failed to generate random numbers", ex);
+          throw new RuntimeException("failed to generate random numbers", ex);
+        }
+        List<World> newWorlds = new ArrayList<>();
+        for(int i = 0; i < worlds.size(); i++)
+          newWorlds.add(new World(worlds.get(i).getId(), newRandoms.get(i)));
+        dao.update(newWorlds);
+
+        try {
+          mapper.writeValue(asyncContext.getResponse().getOutputStream(), newWorlds);
+        } catch (IOException ex) {
+          LOGGER.error("failed to get output stream", ex);
+          throw new RuntimeException("failed to get output stream", ex);
+        }
+        asyncContext.complete();
+
+        LOGGER.debug("update done");
+      }
+
+      @Override
+      public void onFailure(Throwable th) {
+        LOGGER.error("update failed", th);
+      }
+    }, executorService);
+
+  }
+
+  protected Future<List<Integer>> generateRandomNumbersFuture(final int count, final int least, final int bound) {
+    return executorService.submit(new Callable<List<Integer>>() {
+      @Override
+      public List<Integer> call() throws Exception {
+        return generateRandomNumbers(count, least, bound);
+      }
+    });
+  }
+
+}

+ 1 - 1
servlet3-cass/src/main/java/fi/markoa/tfb/servlet3/MessageDAO.java

@@ -3,11 +3,11 @@ package fi.markoa.tfb.servlet3;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.List;
-import java.util.Properties;
 
 public interface MessageDAO {
   void init();
   ListenableFuture<World> read(int id);
   ListenableFuture<List<World>> read(List<Integer> ids);
+  void update(List<World> worlds);
   void destroy();
 }

+ 11 - 3
servlet3-cass/src/main/java/fi/markoa/tfb/servlet3/MessageDAOCassImpl.java

@@ -45,17 +45,18 @@ public class MessageDAOCassImpl implements MessageDAO {
     session = cluster.connect(conf.getProperty("cassandra.keyspace"));
 
     Map<String, PreparedStatement> stmts = new HashMap<>();
-    stmts.put("get_by_id", session.prepare("SELECT id, randomnumber FROM world WHERE id = ?"));
+    stmts.put("get_by_id", session.prepare("SELECT randomnumber FROM world WHERE id=?"));
+    stmts.put("update_by_id", session.prepare("UPDATE world SET randomnumber=? WHERE id=?"));
     statements = Collections.unmodifiableMap(stmts);
   }
 
   @Override
-  public ListenableFuture<World> read(int id) {
+  public ListenableFuture<World> read(final int id) {
     Function<ResultSet, World> transformation = new Function<ResultSet, World>() {
       @Override
       public World apply(ResultSet results) {
         Row r = results.one();
-        return new World(r.getInt("id"), r.getInt("randomnumber"));
+        return new World(id, r.getInt("randomnumber"));
       }
     };
     ResultSetFuture rsf = session.executeAsync(statements.get("get_by_id").bind(id));
@@ -69,6 +70,13 @@ public class MessageDAOCassImpl implements MessageDAO {
     return Futures.allAsList(futures);
   }
 
+  public void update(List<World> worlds) {
+    BatchStatement bs = new BatchStatement(BatchStatement.Type.UNLOGGED);
+    for(World w : worlds)
+      bs.add(statements.get("update_by_id").bind(w.getId(), w.getRandomNumber()));
+    session.execute(bs);
+  }
+
   @Override
   public void destroy() {
     LOGGER.debug("destroy()");

+ 8 - 0
servlet3-cass/src/main/java/fi/markoa/tfb/servlet3/World.java

@@ -13,6 +13,14 @@ public class World {
     return id;
   }
 
+  @Override
+  public String toString() {
+    return "World{" +
+      "id=" + id +
+      ", randomNumber=" + randomNumber +
+      '}';
+  }
+
   public int getRandomNumber() {
     return randomNumber;
   }