Browse Source

Minor vertx improvements / tuning after 3.6.0.CR1 update (#4202)

* Added custom rocker output that avoid an intermediary string

* Use iterator instead of foreach

* Log general info in test

* For updates get a pool of 4 connections as database processing has actually a cost that needs to be scaled to 4 connections
Julien Viet 6 years ago
parent
commit
b00458ce16

+ 43 - 27
frameworks/Java/vertx/src/main/java/vertx/App.java

@@ -1,5 +1,7 @@
 package vertx;
 
+import com.fizzed.rocker.ContentType;
+import com.fizzed.rocker.RockerOutputFactory;
 import io.reactiverse.pgclient.*;
 import io.vertx.core.*;
 import io.vertx.core.buffer.Buffer;
@@ -16,6 +18,7 @@ import io.vertx.core.logging.LoggerFactory;
 import vertx.model.Fortune;
 import vertx.model.Message;
 import vertx.model.World;
+import vertx.rocker.BufferRockerOutput;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -84,11 +87,14 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
   private HttpServer server;
 
   private PgClient client;
+  private PgPool pool;
 
   private CharSequence dateString;
 
   private CharSequence[] plaintextHeaders;
 
+  private final RockerOutputFactory<BufferRockerOutput> factory = BufferRockerOutput.factory(ContentType.RAW);
+
   public static CharSequence createDateHeader() {
     return HttpHeaders.createOptimized(DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now()));
   }
@@ -113,8 +119,8 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
     options.setUser(config.getString("username"));
     options.setPassword(config.getString("password"));
     options.setCachePreparedStatements(true);
-    options.setMaxSize(1);
-    client = PgClient.pool(vertx, options);
+    client = PgClient.pool(vertx, new PgPoolOptions(options).setMaxSize(1));
+    pool = PgClient.pool(vertx, new PgPoolOptions(options).setMaxSize(4));
   }
 
   @Override
@@ -250,32 +256,43 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
     }
 
     private void handle() {
-      for (int i = 0; i < worlds.length; i++) {
-        int id = randomWorld();
-        int index = i;
-        client.preparedQuery(SELECT_WORLD, Tuple.of(id), ar -> {
-          if (!failed) {
-            if (ar.failed()) {
-              failed = true;
-              sendError(ar.cause());
-              return;
-            }
-            worlds[index] = new World(ar.result().iterator().next().getInteger(0), randomWorld());
-            if (++queryCount == worlds.length) {
-              handleUpdates();
+
+      pool.getConnection(ar1 -> {
+        if (ar1.failed()) {
+          failed = true;
+          sendError(ar1.cause());
+          return;
+        }
+        PgConnection conn = ar1.result();
+        for (int i = 0; i < worlds.length; i++) {
+          int id = randomWorld();
+          int index = i;
+          conn.preparedQuery(SELECT_WORLD, Tuple.of(id), ar2 -> {
+            if (!failed) {
+              if (ar2.failed()) {
+                conn.close();
+                failed = true;
+                sendError(ar2.cause());
+                return;
+              }
+              worlds[index] = new World(ar2.result().iterator().next().getInteger(0), randomWorld());
+              if (++queryCount == worlds.length) {
+                handleUpdates(conn);
+              }
             }
-          }
-        });
-      }
+          });
+        }
+      });
     }
 
-    void handleUpdates() {
+    void handleUpdates(PgConnection conn) {
       Arrays.sort(worlds);
       List<Tuple> batch = new ArrayList<>();
       for (World world : worlds) {
         batch.add(Tuple.of(world.getRandomNumber(), world.getId()));
       }
-      client.preparedBatch(UPDATE_WORLD, batch, ar -> {
+      conn.preparedBatch(UPDATE_WORLD, batch, ar -> {
+        conn.close();
         if (ar.failed()) {
           sendError(ar.cause());
           return;
@@ -318,7 +335,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
             .putHeader(HttpHeaders.SERVER, SERVER)
             .putHeader(HttpHeaders.DATE, dateString)
             .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_HTML)
-            .end(FortunesTemplate.template(fortunes).render().toString());
+            .end(FortunesTemplate.template(fortunes).render(factory).buffer());
       } else {
         Throwable err = ar.cause();
         logger.error("", err);
@@ -329,16 +346,15 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
 
   public static void main(String[] args) throws Exception {
     JsonObject config = new JsonObject(new String(Files.readAllBytes(new File(args[0]).toPath())));
-    int procs = Runtime.getRuntime().availableProcessors();
     Vertx vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true));
     vertx.exceptionHandler(err -> {
       err.printStackTrace();
     });
     printConfig(vertx);
     vertx.deployVerticle(App.class.getName(),
-        new DeploymentOptions().setInstances(procs * 2).setConfig(config), event -> {
+        new DeploymentOptions().setInstances(VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE).setConfig(config), event -> {
           if (event.succeeded()) {
-            logger.debug("Your Vert.x application is started!");
+            logger.info("Server listening on port " + 8080);
           } else {
             logger.error("Unable to start your application", event.cause());
           }
@@ -366,8 +382,8 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
     } catch (IOException e) {
       logger.error("Could not read Vertx version", e);;
     }
-    logger.debug("Vertx: " + version);
-    logger.debug("Default Event Loop Size: " + VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE);
-    logger.debug("Native transport : " + nativeTransport);
+    logger.info("Vertx: " + version);
+    logger.info("Event Loop Size: " + VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE);
+    logger.info("Native transport : " + nativeTransport);
   }
 }

+ 68 - 0
frameworks/Java/vertx/src/main/java/vertx/rocker/BufferRockerOutput.java

@@ -0,0 +1,68 @@
+package vertx.rocker;
+
+import com.fizzed.rocker.ContentType;
+import com.fizzed.rocker.RockerOutput;
+import com.fizzed.rocker.RockerOutputFactory;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.net.impl.PartialPooledByteBufAllocator;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+public class BufferRockerOutput implements RockerOutput<BufferRockerOutput> {
+
+  public static RockerOutputFactory<BufferRockerOutput> factory(ContentType contentType) {
+    BufferRockerOutput output = new BufferRockerOutput(contentType);
+    return (_contentType, charsetName) -> {
+      output.reset();
+      return output;
+    };
+  }
+
+  private final ByteBuf buff = PartialPooledByteBufAllocator.UNPOOLED.directBuffer();
+  private final Buffer buffer = Buffer.buffer(buff);
+  private final ContentType contentType;
+
+  BufferRockerOutput(ContentType contentType) {
+    this.contentType = contentType;
+  }
+
+  private void reset() {
+    buff.resetReaderIndex();
+    buff.resetWriterIndex();
+  }
+
+  @Override
+  public BufferRockerOutput w(byte[] bytes) throws IOException {
+    buffer.appendBytes(bytes);
+    return this;
+  }
+
+  @Override
+  public BufferRockerOutput w(String s) throws IOException {
+    buffer.appendString(s);
+    return this;
+  }
+
+  @Override
+  public ContentType getContentType() {
+    return contentType;
+  }
+
+  @Override
+  public Charset getCharset() {
+    return StandardCharsets.UTF_8;
+  }
+
+  @Override
+  public int getByteLength() {
+    return buffer.length();
+  }
+
+  public Buffer buffer() {
+    return buffer;
+  }
+}

+ 1 - 1
frameworks/Java/vertx/src/main/templates/vertx/FortunesTemplate.rocker.html

@@ -9,7 +9,7 @@
   <tr>
     <th>id</th>
     <th>message</th>
-  </tr> @for (fortune : fortunes) {
+  </tr> @for ((ForIterator i, Fortune fortune) : fortunes) {
   <tr>
     <td>@fortune.getId()</td>
     <td>@fortune.getMessage()</td>