Просмотр исходного кода

Update to Vert.x 4.4.2 with IO uring available on linux + improvements (#8208)

* Update to Vert.x 4.4.2 with IO uring  available on linux

* Cached prepared statement update can be racy with server execution, we should make sure that prepared statement are set before starting the server

* When a database is not present it should not prevent the HTTP server to run
Julien Viet 2 лет назад
Родитель
Сommit
057c25b234
2 измененных файлов с 105 добавлено и 62 удалено
  1. 29 3
      frameworks/Java/vertx/pom.xml
  2. 76 59
      frameworks/Java/vertx/src/main/java/vertx/App.java

+ 29 - 3
frameworks/Java/vertx/pom.xml

@@ -10,9 +10,10 @@
 		<maven.compiler.target>17</maven.compiler.target>
 		<!-- the main class -->
 		<main.class>vertx.App</main.class>
-		<stack.version>4.3.8</stack.version>
-		<jackson.version>2.14.2</jackson.version>
-		<netty.version>4.1.89.Final</netty.version>
+		<stack.version>4.4.2</stack.version>
+		<jackson.version>2.15.0</jackson.version>
+		<netty.version>4.1.92.Final</netty.version>
+		<netty.io_uring.version>0.0.21.Final</netty.io_uring.version>
 	</properties>
 
 	<dependencies>
@@ -132,4 +133,29 @@
 
 	</build>
 
+	<profiles>
+		<profile>
+			<id>Linux</id>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+				<os>
+					<family>unix</family>
+				</os>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>io.vertx</groupId>
+					<artifactId>vertx-io_uring-incubator</artifactId>
+					<version>${stack.version}</version>
+				</dependency>
+				<dependency>
+					<groupId>io.netty.incubator</groupId>
+					<artifactId>netty-incubator-transport-native-io_uring</artifactId>
+					<version>${netty.io_uring.version}</version>
+					<classifier>linux-x86_64</classifier>
+				</dependency>
+			</dependencies>
+		</profile>
+	</profiles>
+
 </project>

+ 76 - 59
frameworks/Java/vertx/src/main/java/vertx/App.java

@@ -3,6 +3,7 @@ package vertx;
 import com.fizzed.rocker.ContentType;
 import com.fizzed.rocker.RockerOutputFactory;
 import io.netty.util.concurrent.MultithreadEventExecutorGroup;
+import io.vertx.core.impl.VertxInternal;
 import io.vertx.pgclient.*;
 import io.vertx.core.*;
 import io.vertx.core.buffer.Buffer;
@@ -101,15 +102,13 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
   private static final String SELECT_WORLDS = "SELECT id, randomnumber from WORLD";
 
   private HttpServer server;
-
   private SqlClientInternal client;
-
   private CharSequence dateString;
-
   private CharSequence[] plaintextHeaders;
 
   private final RockerOutputFactory<BufferRockerOutput> factory = BufferRockerOutput.factory(ContentType.RAW);
 
+  private Throwable databaseErr;
   private PreparedQuery<RowSet<Row>> SELECT_WORLD_QUERY;
   private PreparedQuery<RowSet<Row>> SELECT_FORTUNE_QUERY;
   private PreparedQuery<RowSet<Row>> UPDATE_WORLD_QUERY;
@@ -122,8 +121,8 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
   @Override
   public void start(Promise<Void> startPromise) throws Exception {
     int port = 8080;
-    server = vertx.createHttpServer(new HttpServerOptions());
-    server.requestHandler(App.this).listen(port);
+    server = vertx.createHttpServer(new HttpServerOptions())
+            .requestHandler(App.this);
     dateString = createDateHeader();
     plaintextHeaders = new CharSequence[] {
         HEADER_CONTENT_TYPE, RESPONSE_TYPE_PLAIN,
@@ -140,50 +139,70 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
     options.setPassword(config.getString("password", "benchmarkdbpass"));
     options.setCachePreparedStatements(true);
     options.setPipeliningLimit(100_000); // Large pipelining means less flushing and we use a single connection anyway
-    PgConnection.connect(vertx, options).flatMap(conn -> {
-      client = (SqlClientInternal)conn;
-      Future<PreparedStatement> f1 = conn.prepare(SELECT_WORLD);
-      Future<PreparedStatement> f2 = conn.prepare(SELECT_FORTUNE);
-      Future<PreparedStatement> f3 = conn.prepare(UPDATE_WORLD);
-      Future<WorldCache> f4 = conn.preparedQuery(SELECT_WORLDS)
-          .collecting(Collectors.mapping(row -> new CachedWorld(row.getInteger(0), row.getInteger(1)), Collectors.toList()))
-          .execute().map(worlds -> new WorldCache(worlds.value()));
-      f1.onSuccess(ps -> SELECT_WORLD_QUERY = ps.query());
-      f2.onSuccess(ps -> SELECT_FORTUNE_QUERY = ps.query());
-      f3.onSuccess(ps -> UPDATE_WORLD_QUERY = ps.query());
-      f4.onSuccess(wc -> WORLD_CACHE = wc);
-      return CompositeFuture.all(f1, f2, f3, f4);
-    }).onComplete(ar -> startPromise.complete());
+    PgConnection.connect(vertx, options)
+            .flatMap(conn -> {
+              client = (SqlClientInternal) conn;
+              Future<PreparedStatement> f1 = conn.prepare(SELECT_WORLD)
+                      .andThen(onSuccess(ps -> SELECT_WORLD_QUERY = ps.query()));
+              Future<PreparedStatement> f2 = conn.prepare(SELECT_FORTUNE)
+                      .andThen(onSuccess(ps -> SELECT_FORTUNE_QUERY = ps.query()));
+              Future<PreparedStatement> f3 = conn.prepare(UPDATE_WORLD)
+                      .andThen(onSuccess(ps -> UPDATE_WORLD_QUERY = ps.query()));
+              Future<WorldCache> f4 = conn.preparedQuery(SELECT_WORLDS)
+                      .collecting(Collectors.mapping(row -> new CachedWorld(row.getInteger(0), row.getInteger(1)), Collectors.toList()))
+                      .execute()
+                      .map(worlds -> new WorldCache(worlds.value()))
+                      .andThen(onSuccess(wc -> WORLD_CACHE = wc));
+              return CompositeFuture.join(f1, f2, f3, f4);
+            })
+            .transform(ar -> {
+              databaseErr = ar.cause();
+              return server.listen(port);
+            })
+            .<Void>mapEmpty()
+            .onComplete(startPromise);
+  }
+
+  private static <T> Handler<AsyncResult<T>> onSuccess(Handler<T> handler) {
+    return ar -> {
+      if (ar.succeeded()) {
+        handler.handle(ar.result());
+      }
+    };
   }
 
   @Override
   public void handle(HttpServerRequest request) {
-    switch (request.path()) {
-      case PATH_PLAINTEXT:
-        handlePlainText(request);
-        break;
-      case PATH_JSON:
-        handleJson(request);
-        break;
-      case PATH_DB:
-        handleDb(request);
-        break;
-      case PATH_QUERIES:
-        new Queries(request).handle();
-        break;
-      case PATH_UPDATES:
-        new Update(request).handle();
-        break;
-      case PATH_FORTUNES:
-        handleFortunes(request);
-        break;
-      case PATH_CACHING:
-        handleCaching(request);
-        break;
-      default:
-        request.response().setStatusCode(404);
-        request.response().end();
-        break;
+    try {
+      switch (request.path()) {
+        case PATH_PLAINTEXT:
+          handlePlainText(request);
+          break;
+        case PATH_JSON:
+          handleJson(request);
+          break;
+        case PATH_DB:
+          handleDb(request);
+          break;
+        case PATH_QUERIES:
+          new Queries(request).handle();
+          break;
+        case PATH_UPDATES:
+          new Update(request).handle();
+          break;
+        case PATH_FORTUNES:
+          handleFortunes(request);
+          break;
+        case PATH_CACHING:
+          handleCaching(request);
+          break;
+        default:
+          request.response().setStatusCode(404);
+          request.response().end();
+          break;
+      }
+    } catch (Exception e) {
+      sendError(request, e);
     }
   }
 
@@ -192,6 +211,11 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
     if (server != null) server.close();
   }
 
+  private void sendError(HttpServerRequest req, Throwable cause) {
+    logger.error(cause.getMessage(), cause);
+    req.response().setStatusCode(500).end();
+  }
+
   private void handlePlainText(HttpServerRequest request) {
     HttpServerResponse response = request.response();
     MultiMap headers = response.headers();
@@ -237,13 +261,11 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
             .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_JSON)
             .end(Json.encode(new World(row.getInteger(0), row.getInteger(1))), NULL_HANDLER);
       } else {
-        logger.error(res.cause());
-        resp.setStatusCode(500).end(res.cause().getMessage());
+        sendError(req, res.cause());
       }
     });
   }
 
-
   class Queries implements Handler<AsyncResult<RowSet<Row>>> {
 
     boolean failed;
@@ -271,7 +293,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
       if (!failed) {
         if (ar.failed()) {
           failed = true;
-          resp.setStatusCode(500).end(ar.cause().getMessage());
+          sendError(req, ar.cause());
           return;
         }
 
@@ -315,7 +337,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
             if (!failed) {
               if (ar2.failed()) {
                 failed = true;
-                sendError(ar2.cause());
+                sendError(req, ar2.cause());
                 return;
               }
               worlds[index] = new World(ar2.result().iterator().next().getInteger(0), randomWorld());
@@ -336,7 +358,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
       }
       UPDATE_WORLD_QUERY.executeBatch(batch, ar2 -> {
         if (ar2.failed()) {
-          sendError(ar2.cause());
+          sendError(req, ar2.cause());
           return;
         }
         JsonArray json = new JsonArray();
@@ -350,11 +372,6 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
             .end(json.toBuffer(), NULL_HANDLER);
       });
     }
-
-    void sendError(Throwable err) {
-      logger.error("", err);
-      req.response().setStatusCode(500).end(err.getMessage());
-    }
   }
 
   private void handleFortunes(HttpServerRequest req) {
@@ -379,9 +396,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
             .putHeader(HttpHeaders.CONTENT_TYPE, RESPONSE_TYPE_HTML)
             .end(FortunesTemplate.template(fortunes).render(factory).buffer(), NULL_HANDLER);
       } else {
-        Throwable err = ar.cause();
-        logger.error("", err);
-        response.setStatusCode(500).end(err.getMessage());
+        sendError(req, ar.cause());
       }
     });
   }
@@ -441,6 +456,7 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
 
   private static void printConfig(Vertx vertx) {
     boolean nativeTransport = vertx.isNativeTransportEnabled();
+    String transport = ((VertxInternal) vertx).transport().getClass().getSimpleName();
     String version = "unknown";
     try {
       InputStream in = Vertx.class.getClassLoader().getResourceAsStream("META-INF/vertx/vertx-version.txt");
@@ -463,5 +479,6 @@ public class App extends AbstractVerticle implements Handler<HttpServerRequest>
     logger.info("Vertx: " + version);
     logger.info("Event Loop Size: " + ((MultithreadEventExecutorGroup)vertx.nettyEventLoopGroup()).executorCount());
     logger.info("Native transport : " + nativeTransport);
+    logger.info("Transport : " + transport);
   }
 }