Browse Source

jooby: fix db.updates for pg-client (#5957)

- There were some dead locks while running batch updates. Solution: sort data set before sending batch
- move errored response to his own method
- fix netty classpath dependencies while using vertx.pg-client
Edgar Espina 5 years ago
parent
commit
a723d335af

+ 12 - 0
frameworks/Java/jooby/pom.xml

@@ -170,6 +170,12 @@
     <profile>
       <id>netty</id>
       <dependencies>
+        <dependency>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-common</artifactId>
+          <version>${netty.version}</version>
+        </dependency>
+
         <dependency>
           <groupId>io.netty</groupId>
           <artifactId>netty-buffer</artifactId>
@@ -188,6 +194,12 @@
           <version>${netty.version}</version>
         </dependency>
 
+        <dependency>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-transport</artifactId>
+          <version>${netty.version}</version>
+        </dependency>
+
         <dependency>
           <groupId>io.netty</groupId>
           <artifactId>netty-transport-native-epoll</artifactId>

+ 52 - 42
frameworks/Java/jooby/src/main/java/com/techempower/ReactivePg.java

@@ -1,18 +1,20 @@
 package com.techempower;
 
+import io.jooby.Context;
 import io.jooby.Jooby;
 import io.jooby.ServerOptions;
 import io.jooby.rocker.RockerModule;
 import io.vertx.pgclient.PgPool;
 import io.vertx.sqlclient.Row;
 import io.vertx.sqlclient.RowIterator;
+import io.vertx.sqlclient.SqlConnection;
 import io.vertx.sqlclient.Tuple;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.techempower.Util.randomWorld;
@@ -52,7 +54,7 @@ public class ReactivePg extends Jooby {
             ctx.sendError(rsp.cause());
           }
         } catch (IOException x) {
-          ctx.sendError(x);
+          sendError(ctx, x);
         }
       });
       return ctx;
@@ -62,7 +64,6 @@ public class ReactivePg extends Jooby {
     get("/queries", ctx -> {
       int queries = Util.queries(ctx);
       AtomicInteger counter = new AtomicInteger();
-      AtomicBoolean failed = new AtomicBoolean(false);
       World[] result = new World[queries];
       PgPool client = clients.next();
       for (int i = 0; i < result.length; i++) {
@@ -72,17 +73,15 @@ public class ReactivePg extends Jooby {
             Row row = rs.next();
             result[counter.get()] = new World(row.getInteger(0), row.getInteger(1));
           } else {
-            if (failed.compareAndSet(false, true)) {
-              ctx.sendError(rsp.cause());
-            }
+            sendError(ctx, rsp.cause());
           }
           // ready?
-          if (counter.incrementAndGet() == queries && !failed.get()) {
+          if (counter.incrementAndGet() == queries) {
             try {
               ctx.setResponseType(JSON)
                   .send(Json.encode(result));
             } catch (IOException x) {
-              ctx.sendError(x);
+              sendError(ctx, x);
             }
           }
         });
@@ -95,44 +94,49 @@ public class ReactivePg extends Jooby {
       int queries = Util.queries(ctx);
       World[] result = new World[queries];
       AtomicInteger counter = new AtomicInteger(0);
-      AtomicBoolean failed = new AtomicBoolean(false);
       PgPool pool = clients.next();
-      for (int i = 0; i < queries; i++) {
-        pool.preparedQuery(SELECT_WORLD).execute(Tuple.of(randomWorld()), query -> {
-          if (query.succeeded()) {
-            RowIterator<Row> rs = query.result().iterator();
-            Tuple row = rs.next();
-            World world = new World(row.getInteger(0), randomWorld());
-            result[counter.get()] = world;
-          } else {
-            if (failed.compareAndSet(false, true)) {
-              ctx.sendError(query.cause());
+      pool.getConnection(connectCallback -> {
+        if (connectCallback.failed()) {
+          sendError(ctx, connectCallback.cause());
+          return;
+        }
+        SqlConnection conn = connectCallback.result();
+        for (int i = 0; i < queries; i++) {
+          int id = randomWorld();
+          conn.preparedQuery(SELECT_WORLD).execute(Tuple.of(id), selectCallback -> {
+            if (selectCallback.failed()) {
+              conn.close();
+              sendError(ctx, selectCallback.cause());
               return;
             }
-          }
-
-          if (counter.incrementAndGet() == queries && !failed.get()) {
-            List<Tuple> batch = new ArrayList<>(queries);
-            for (World world : result) {
-              batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
-            }
-
-            pool.preparedQuery(UPDATE_WORLD)
-                .executeBatch(batch, update -> {
-                  if (update.failed()) {
-                    ctx.sendError(update.cause());
-                  } else {
-                    try {
-                      ctx.setResponseType(JSON)
-                          .send(Json.encode(result));
-                    } catch (IOException x) {
-                      ctx.sendError(x);
-                    }
+            result[counter.get()] = new World(
+                selectCallback.result().iterator().next().getInteger(0),
+                randomWorld());
+            if (counter.incrementAndGet() == queries) {
+              // Sort results... avoid dead locks
+              Arrays.sort(result);
+              List<Tuple> batch = new ArrayList<>(queries);
+              for (World world : result) {
+                batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
+              }
+
+              conn.preparedQuery(UPDATE_WORLD).executeBatch(batch, updateCallback -> {
+                conn.close();
+                if (updateCallback.failed()) {
+                  sendError(ctx, updateCallback.cause());
+                } else {
+                  try {
+                    ctx.setResponseType(JSON)
+                        .send(Json.encode(result));
+                  } catch (IOException x) {
+                    sendError(ctx, x);
                   }
-                });
-          }
-        });
-      }
+                }
+              });
+            }
+          });
+        }
+      });
       return ctx;
     });
 
@@ -162,6 +166,12 @@ public class ReactivePg extends Jooby {
     });
   }
 
+  private void sendError(Context ctx, Throwable cause) {
+    if (!ctx.isResponseStarted()) {
+      ctx.sendError(cause);
+    }
+  }
+
   public static void main(String[] args) {
     runApp(args, EVENT_LOOP, ReactivePg::new);
   }

+ 5 - 1
frameworks/Java/jooby/src/main/java/com/techempower/World.java

@@ -3,7 +3,7 @@ package com.techempower;
 import com.dslplatform.json.CompiledJson;
 
 @CompiledJson
-public class World {
+public class World implements Comparable<World> {
 
   private int id;
 
@@ -21,4 +21,8 @@ public class World {
   public int getRandomNumber() {
     return randomNumber;
   }
+
+  @Override public int compareTo(World o) {
+    return id - o.id;
+  }
 }