Browse Source

Upgrade to JDK 24, Netty 4.2 and Virtual Thread Experimental support (#9800)

* Upgrade to JDK 24, Netty 4.2 and Virtual Thread Experimental support

* Improving Virtual Thread performance
Francesco Nigro 4 months ago
parent
commit
6443b71f75

+ 3 - 0
frameworks/Java/netty/.sdkmanrc

@@ -0,0 +1,3 @@
+# Enable auto-env through the sdkman_auto_env config
+# Add key=value pairs of SDKs to use below
+java=24-oracle

+ 19 - 0
frameworks/Java/netty/benchmark_config.json

@@ -19,6 +19,25 @@
       "display_name": "netty",
       "notes": "",
       "versus": "netty"
+    },
+    "loom": {
+      "json_url": "/json",
+      "plaintext_url": "/plaintext",
+      "port": 8080,
+      "approach": "Realistic",
+      "classification": "Platform",
+      "database": "None",
+      "framework": "netty",
+      "language": "Java",
+      "flavor": "None",
+      "orm": "Raw",
+      "platform": "Netty",
+      "webserver": "None",
+      "os": "Linux",
+      "database_os": "Linux",
+      "display_name": "netty",
+      "notes": "",
+      "versus": "netty"
     }
   }]
 }

+ 13 - 0
frameworks/Java/netty/config.toml

@@ -13,3 +13,16 @@ orm = "Raw"
 platform = "Netty"
 webserver = "None"
 versus = "netty"
+
+[loom]
+urls.plaintext = "/plaintext"
+urls.json = "/json"
+approach = "Realistic"
+classification = "Platform"
+database = "None"
+database_os = "Linux"
+os = "Linux"
+orm = "Raw"
+platform = "Netty"
+webserver = "None"
+versus = "netty"

+ 15 - 0
frameworks/Java/netty/netty-loom.dockerfile

@@ -0,0 +1,15 @@
+FROM maven:3.9.9-eclipse-temurin-24-noble as maven
+WORKDIR /netty
+COPY pom.xml pom.xml
+COPY src src
+RUN mvn compile assembly:single -q
+
+FROM maven:3.9.9-eclipse-temurin-24-noble
+WORKDIR /netty
+COPY --from=maven /netty/target/app.jar app.jar
+COPY run_netty_loom.sh run_netty_loom.sh
+
+EXPOSE 8080
+# see https://github.com/netty/netty/issues/14942
+# remember to run this with --privileged since https://github.com/TechEmpower/FrameworkBenchmarks/blob/c94f7f95bd751f86a57dea8b63fb8f336bdbbde3/toolset/utils/docker_helper.py#L239 does it
+ENTRYPOINT "./run_netty_loom.sh"

+ 7 - 5
frameworks/Java/netty/netty.dockerfile

@@ -1,13 +1,15 @@
-FROM maven:3.6.1-jdk-11-slim as maven
+FROM maven:3.9.9-eclipse-temurin-24-noble as maven
 WORKDIR /netty
 COPY pom.xml pom.xml
 COPY src src
 RUN mvn compile assembly:single -q
 
-FROM openjdk:11.0.3-jdk-slim
+FROM maven:3.9.9-eclipse-temurin-24-noble
 WORKDIR /netty
-COPY --from=maven /netty/target/netty-example-0.1-jar-with-dependencies.jar app.jar
+COPY --from=maven /netty/target/app.jar app.jar
+COPY run_netty.sh run_netty.sh
 
 EXPOSE 8080
-
-CMD ["java", "-server", "-XX:+UseNUMA", "-XX:+UseParallelGC", "-XX:+AggressiveOpts", "-Dio.netty.buffer.checkBounds=false", "-Dio.netty.buffer.checkAccessible=false", "-Dio.netty.iouring.iosqeAsyncThreshold=32000", "-jar", "app.jar"]
+# see https://github.com/netty/netty/issues/14942
+# remember to run this with --privileged since https://github.com/TechEmpower/FrameworkBenchmarks/blob/c94f7f95bd751f86a57dea8b63fb8f336bdbbde3/toolset/utils/docker_helper.py#L239 does it
+ENTRYPOINT "./run_netty.sh"

+ 9 - 8
frameworks/Java/netty/pom.xml

@@ -9,10 +9,9 @@
 	<version>0.1</version>
 
 	<properties>
-		<maven.compiler.source>11</maven.compiler.source>
-		<maven.compiler.target>11</maven.compiler.target>
-		<netty.version>4.1.108.Final</netty.version>
-		<io_uring.version>0.0.21.Final</io_uring.version>
+		<maven.compiler.source>24</maven.compiler.source>
+		<maven.compiler.target>24</maven.compiler.target>
+		<netty.version>4.2.0.Final</netty.version>
 	</properties>
 
 	<packaging>jar</packaging>
@@ -21,7 +20,7 @@
 
 		<dependency>
 			<groupId>io.netty</groupId>
-			<artifactId>netty-codec-http</artifactId>
+			<artifactId>netty-all</artifactId>
 			<version>${netty.version}</version>
 		</dependency>
 
@@ -40,9 +39,9 @@
 		</dependency>
 
 		<dependency>
-			<groupId>io.netty.incubator</groupId>
-			<artifactId>netty-incubator-transport-native-io_uring</artifactId>
-			<version>${io_uring.version}</version>
+			<groupId>io.netty</groupId>
+			<artifactId>netty-transport-native-io_uring</artifactId>
+			<version>${netty.version}</version>
 			<classifier>linux-x86_64</classifier>
 		</dependency>
 
@@ -74,6 +73,7 @@
 			<plugin>
 				<artifactId>maven-assembly-plugin</artifactId>
 				<configuration>
+					<finalName>app</finalName>
 					<archive>
 						<manifest>
 							<mainClass>hello.HelloWebServer</mainClass>
@@ -82,6 +82,7 @@
 					<descriptorRefs>
 						<descriptorRef>jar-with-dependencies</descriptorRef>
 					</descriptorRefs>
+					<appendAssemblyId>false</appendAssemblyId>
 				</configuration>
 				<executions>
 					<execution>

+ 14 - 0
frameworks/Java/netty/run_netty.sh

@@ -0,0 +1,14 @@
+#!/bin/bash
+
+# PROFILING: -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints
+JAVA_OPTIONS="--enable-native-access=ALL-UNNAMED \
+  -Dio.netty.noUnsafe=false \
+  --sun-misc-unsafe-memory-access=allow \
+  --add-opens=java.base/java.lang=ALL-UNNAMED \
+  -XX:+UseNUMA \
+  -XX:+UseParallelGC \
+  -Dio.netty.buffer.checkBounds=false \
+  -Dio.netty.buffer.checkAccessible=false \
+  $@"
+
+java $JAVA_OPTIONS -jar app.jar

+ 18 - 0
frameworks/Java/netty/run_netty_loom.sh

@@ -0,0 +1,18 @@
+#!/bin/bash
+
+# PROFILING: -XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints
+JAVA_OPTIONS="--enable-native-access=ALL-UNNAMED \
+  -Dio.netty.noUnsafe=false \
+  --sun-misc-unsafe-memory-access=allow \
+  --add-opens=java.base/java.lang=ALL-UNNAMED \
+  -XX:+UseNUMA \
+  -XX:+UseParallelGC \
+  -Dio.netty.buffer.checkBounds=false \
+  -Dio.netty.buffer.checkAccessible=false \
+  -Dhello.eventloop.carrier=true \
+  -XX:+UnlockExperimentalVMOptions \
+  -XX:-DoJVMTIVirtualThreadTransitions \
+  -Djdk.trackAllThreads=false \
+  $@"
+
+java $JAVA_OPTIONS -jar app.jar

+ 24 - 0
frameworks/Java/netty/src/main/java/hello/Constants.java

@@ -0,0 +1,24 @@
+package hello;
+
+import io.netty.util.AsciiString;
+import io.netty.util.CharsetUtil;
+
+public class Constants {
+
+   public static final byte[] STATIC_PLAINTEXT = "Hello, World!".getBytes(CharsetUtil.UTF_8);
+   public static final int STATIC_PLAINTEXT_LEN = STATIC_PLAINTEXT.length;
+
+   public static final CharSequence PLAINTEXT_CLHEADER_VALUE = AsciiString.cached(String.valueOf(STATIC_PLAINTEXT_LEN));
+   public static final int JSON_LEN = jsonLen();
+   public static final CharSequence JSON_CLHEADER_VALUE = AsciiString.cached(String.valueOf(JSON_LEN));
+   public static final CharSequence SERVER_NAME = AsciiString.cached("Netty");
+
+   private static int jsonLen() {
+      return JsonUtils.serializeMsg(newMsg()).length;
+   }
+
+   public static Message newMsg() {
+      return new Message("Hello, World!");
+   }
+
+}

+ 29 - 70
frameworks/Java/netty/src/main/java/hello/HelloServerHandler.java

@@ -1,28 +1,20 @@
 package hello;
 
-import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
-import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
-import static io.netty.handler.codec.http.HttpHeaderNames.DATE;
-import static io.netty.handler.codec.http.HttpHeaderNames.SERVER;
-import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON;
-import static io.netty.handler.codec.http.HttpHeaderValues.TEXT_PLAIN;
+import static hello.HttpResponses.makeJsonResponse;
+import static hello.HttpResponses.makePlaintextResponse;
+import static hello.JsonUtils.acquireJsonStreamFromEventLoop;
+import static hello.JsonUtils.releaseJsonStreamFromEventLoop;
 import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
 import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
 
-import java.io.IOException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import com.jsoniter.output.JsonStream;
-import com.jsoniter.output.JsonStreamPool;
-import com.jsoniter.spi.JsonException;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
@@ -33,51 +25,21 @@ import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpRequest;
 import io.netty.handler.codec.http.LastHttpContent;
 import io.netty.util.AsciiString;
-import io.netty.util.CharsetUtil;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.FastThreadLocal;
 
 public class HelloServerHandler extends ChannelInboundHandlerAdapter {
 
-	private static final FastThreadLocal<DateFormat> FORMAT = new FastThreadLocal<DateFormat>() {
-		@Override
-		protected DateFormat initialValue() {
-			return new SimpleDateFormat("E, dd MMM yyyy HH:mm:ss z");
-		}
-	};
-
-	private static Message newMsg() {
-		return new Message("Hello, World!");
-	}
-
-	private static byte[] serializeMsg(Message obj) {
-		JsonStream stream = JsonStreamPool.borrowJsonStream();
-		try {
-			stream.reset(null);
-			stream.writeVal(Message.class, obj);
-			return Arrays.copyOfRange(stream.buffer().data(), 0, stream.buffer().tail());
-		} catch (IOException e) {
-			throw new JsonException(e);
-		} finally {
-			JsonStreamPool.returnJsonStream(stream);
-		}
-	}
-
-	private static int jsonLen() {
-		return serializeMsg(newMsg()).length;
-	}
+	private static final FastThreadLocal<DateFormat> FORMAT = new FastThreadLocal<>() {
+      @Override
+      protected DateFormat initialValue() {
+         return new SimpleDateFormat("E, dd MMM yyyy HH:mm:ss z");
+      }
+   };
 
-	private static final byte[] STATIC_PLAINTEXT = "Hello, World!".getBytes(CharsetUtil.UTF_8);
-	private static final int STATIC_PLAINTEXT_LEN = STATIC_PLAINTEXT.length;
+	protected volatile AsciiString date = new AsciiString(FORMAT.get().format(new Date()));
 
-	private static final CharSequence PLAINTEXT_CLHEADER_VALUE = AsciiString.cached(String.valueOf(STATIC_PLAINTEXT_LEN));
-	private static final int JSON_LEN = jsonLen();
-	private static final CharSequence JSON_CLHEADER_VALUE = AsciiString.cached(String.valueOf(JSON_LEN));
-	private static final CharSequence SERVER_NAME = AsciiString.cached("Netty");
-
-	private volatile CharSequence date = new AsciiString(FORMAT.get().format(new Date()));
-
-	HelloServerHandler(ScheduledExecutorService service) {
+	public HelloServerHandler(ScheduledExecutorService service) {
 		service.scheduleWithFixedDelay(new Runnable() {
 			private final DateFormat format = FORMAT.get();
 
@@ -118,42 +80,39 @@ public class HelloServerHandler extends ChannelInboundHandlerAdapter {
 		String uri = request.uri();
 		switch (uri) {
 		case "/plaintext":
-			writePlainResponse(ctx, Unpooled.wrappedBuffer(STATIC_PLAINTEXT));
+			writePlainResponse(ctx, date);
 			return;
 		case "/json":
-			byte[] json = serializeMsg(newMsg());
-			writeJsonResponse(ctx, Unpooled.wrappedBuffer(json));
+			// even for the virtual thread case we expect virtual threads to be executed inlined!
+			var stream = acquireJsonStreamFromEventLoop();
+			try {
+				writeJsonResponse(ctx, stream, date);
+			} finally {
+				releaseJsonStreamFromEventLoop(stream);
+			}
 			return;
 		}
+		// we drain in-flight responses before closing the connection
+		channelReadComplete(ctx);
 		FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_FOUND, Unpooled.EMPTY_BUFFER, false);
-		ctx.write(response).addListener(ChannelFutureListener.CLOSE);
-	}
-
-	private void writePlainResponse(ChannelHandlerContext ctx, ByteBuf buf) {
-		ctx.write(makeResponse(buf, TEXT_PLAIN, PLAINTEXT_CLHEADER_VALUE), ctx.voidPromise());
+		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
 	}
 
-	private void writeJsonResponse(ChannelHandlerContext ctx, ByteBuf buf) {
-		ctx.write(makeResponse(buf, APPLICATION_JSON, JSON_CLHEADER_VALUE), ctx.voidPromise());
+	protected void writePlainResponse(ChannelHandlerContext ctx, AsciiString date) {
+		ctx.write(makePlaintextResponse(date), ctx.voidPromise());
 	}
 
-	private FullHttpResponse makeResponse(ByteBuf buf, CharSequence contentType, CharSequence contentLength) {
-		final FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buf, false);
-		response.headers()
-				.set(CONTENT_TYPE, contentType)
-				.set(SERVER, SERVER_NAME)
-				.set(DATE, date)
-				.set(CONTENT_LENGTH, contentLength);
-		return response;
+	protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, AsciiString date) {
+		ctx.write(makeJsonResponse(stream, date), ctx.voidPromise());
 	}
 
 	@Override
-	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
 		ctx.close();
 	}
 
 	@Override
-	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+	public void channelReadComplete(ChannelHandlerContext ctx) {
 		ctx.flush();
 	}
 }

+ 6 - 2
frameworks/Java/netty/src/main/java/hello/HelloServerInitializer.java

@@ -14,7 +14,7 @@ import io.netty.handler.codec.http.HttpVersion;
 
 public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {
 
-	private final ScheduledExecutorService service;
+	protected final ScheduledExecutorService service;
 
 	public HelloServerInitializer(ScheduledExecutorService service) {
 		this.service = service;
@@ -46,6 +46,10 @@ public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {
 						return false;
 					}
 				})
-                .addLast("handler", new HelloServerHandler(service));
+                .addLast("handler", newHelloServerHandler(service));
+	}
+
+	protected HelloServerHandler newHelloServerHandler(ScheduledExecutorService service) {
+		return new HelloServerHandler(service);
 	}
 }

+ 48 - 42
frameworks/Java/netty/src/main/java/hello/HelloWebServer.java

@@ -2,30 +2,35 @@ package hello;
 
 import java.net.InetSocketAddress;
 
+import hello.loom.HelloLoomServerInitializer;
+import hello.loom.LoomSupport;
+import hello.loom.MultithreadVirtualEventExecutorGroup;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.ServerChannel;
-import io.netty.channel.epoll.Epoll;
 import io.netty.channel.epoll.EpollChannelOption;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollServerSocketChannel;
-import io.netty.channel.kqueue.KQueue;
-import io.netty.channel.kqueue.KQueueServerSocketChannel;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.incubator.channel.uring.IOUring;
-import io.netty.incubator.channel.uring.IOUringChannelOption;
-import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
-import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
+import io.netty.channel.uring.IoUringChannelOption;
 import io.netty.util.ResourceLeakDetector;
 import io.netty.util.ResourceLeakDetector.Level;
 
 public class HelloWebServer {
 
+	private static final boolean EVENT_LOOP_CARRIER = Boolean.getBoolean("hello.eventloop.carrier");
+	private static final IoMultiplexer PREFERRED_TRANSPORT;
+
 	static {
 		ResourceLeakDetector.setLevel(Level.DISABLED);
+		String transportName = System.getProperty("hello.transport");
+		if (transportName != null) {
+			try {
+				PREFERRED_TRANSPORT = IoMultiplexer.valueOf(transportName);
+			} catch (IllegalArgumentException e) {
+				System.err.println("Invalid transport name: " + transportName);
+				throw e;
+			}
+		} else {
+			PREFERRED_TRANSPORT = IoMultiplexer.type();
+		}
 	}
 
 	private final int port;
@@ -35,47 +40,46 @@ public class HelloWebServer {
 	}
 
 	public void run() throws Exception {
-		// Configure the server.
-		if (IOUring.isAvailable()) {
-			doRun(new IOUringEventLoopGroup(Runtime.getRuntime().availableProcessors()), IOUringServerSocketChannel.class, IoMultiplexer.IO_URING);
-		} else
-			if (Epoll.isAvailable()) {
-			doRun(new EpollEventLoopGroup(), EpollServerSocketChannel.class, IoMultiplexer.EPOLL);
-		} else if (KQueue.isAvailable()) {
-			doRun(new EpollEventLoopGroup(), KQueueServerSocketChannel.class, IoMultiplexer.KQUEUE);
-		} else {
-			doRun(new NioEventLoopGroup(), NioServerSocketChannel.class, IoMultiplexer.JDK);
+		final var preferredTransport = PREFERRED_TRANSPORT;
+		System.out.printf("Using %s IoMultiplexer%n", preferredTransport);
+		final int coreCount = Runtime.getRuntime().availableProcessors();
+		final var group = EVENT_LOOP_CARRIER?
+				preferredTransport.newVirtualEventExecutorGroup(coreCount) :
+				preferredTransport.newEventLoopGroup(coreCount);
+		if (EVENT_LOOP_CARRIER) {
+			LoomSupport.checkSupported();
+			System.out.println("Using EventLoop optimized for Loom");
 		}
-	}
-
-	private void doRun(EventLoopGroup loupGroup, Class<? extends ServerChannel> serverChannelClass, IoMultiplexer multiplexer) throws InterruptedException {
 		try {
-			InetSocketAddress inet = new InetSocketAddress(port);
-			
-			System.out.printf("Using %s IoMultiplexer%n", multiplexer);
+			final var serverChannelClass = preferredTransport.serverChannelClass();
+			var inet = new InetSocketAddress(port);
+			var b = new ServerBootstrap();
 
-			ServerBootstrap b = new ServerBootstrap();
-
-			if (multiplexer == IoMultiplexer.EPOLL) {
-				b.option(EpollChannelOption.SO_REUSEPORT, true);
-			}
-			
-			if (multiplexer == IoMultiplexer.IO_URING) {
-				b.option(IOUringChannelOption.SO_REUSEPORT, true);
-			}
-			
 			b.option(ChannelOption.SO_BACKLOG, 8192);
 			b.option(ChannelOption.SO_REUSEADDR, true);
-			b.group(loupGroup).channel(serverChannelClass).childHandler(new HelloServerInitializer(loupGroup.next()));
+			switch (preferredTransport) {
+				case EPOLL:
+					b.option(EpollChannelOption.SO_REUSEPORT, true);
+					break;
+				case IO_URING:
+					b.option(IoUringChannelOption.SO_REUSEPORT, true);
+					break;
+			}
+			var channelB = b.group(group).channel(serverChannelClass);
+			if (EVENT_LOOP_CARRIER) {
+				channelB.childHandler(new HelloLoomServerInitializer((MultithreadVirtualEventExecutorGroup) group, group.next()));
+			} else {
+				channelB.childHandler(new HelloServerInitializer(group.next()));
+			}
 			b.childOption(ChannelOption.SO_REUSEADDR, true);
 
 			Channel ch = b.bind(inet).sync().channel();
 
-			System.out.printf("Httpd started. Listening on: %s%n", inet.toString());
+			System.out.printf("Httpd started. Listening on: %s%n", inet);
 
 			ch.closeFuture().sync();
 		} finally {
-			loupGroup.shutdownGracefully().sync();
+			group.shutdownGracefully().sync();
 		}
 	}
 
@@ -87,5 +91,7 @@ public class HelloWebServer {
 			port = 8080;
 		}
 		new HelloWebServer(port).run();
+
+
 	}
 }

+ 45 - 0
frameworks/Java/netty/src/main/java/hello/HttpResponses.java

@@ -0,0 +1,45 @@
+package hello;
+
+import static hello.Constants.JSON_CLHEADER_VALUE;
+import static hello.Constants.PLAINTEXT_CLHEADER_VALUE;
+import static hello.Constants.SERVER_NAME;
+import static hello.Constants.STATIC_PLAINTEXT;
+import static hello.Constants.newMsg;
+import static hello.JsonUtils.serializeMsg;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpHeaderNames.DATE;
+import static io.netty.handler.codec.http.HttpHeaderNames.SERVER;
+import static io.netty.handler.codec.http.HttpHeaderValues.APPLICATION_JSON;
+import static io.netty.handler.codec.http.HttpHeaderValues.TEXT_PLAIN;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+import com.jsoniter.output.JsonStream;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.util.AsciiString;
+
+public class HttpResponses {
+
+   public static FullHttpResponse makePlaintextResponse(AsciiString date) {
+      return makeResponse(Unpooled.wrappedBuffer(STATIC_PLAINTEXT), TEXT_PLAIN, PLAINTEXT_CLHEADER_VALUE, date);
+   }
+
+   public static FullHttpResponse makeJsonResponse(JsonStream stream, AsciiString date) {
+      return makeResponse(Unpooled.wrappedBuffer(serializeMsg(newMsg(), stream)), APPLICATION_JSON, JSON_CLHEADER_VALUE, date);
+   }
+
+   private static FullHttpResponse makeResponse(ByteBuf buf, CharSequence contentType, CharSequence contentLength, AsciiString date) {
+      final FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, buf, false);
+      response.headers()
+            .set(CONTENT_TYPE, contentType)
+            .set(SERVER, SERVER_NAME)
+            .set(DATE, date)
+            .set(CONTENT_LENGTH, contentLength);
+      return response;
+   }
+}

+ 55 - 1
frameworks/Java/netty/src/main/java/hello/IoMultiplexer.java

@@ -1,5 +1,59 @@
 package hello;
 
+import hello.loom.MultithreadVirtualEventExecutorGroup;
+import io.netty.channel.IoHandlerFactory;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollIoHandler;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.kqueue.KQueue;
+import io.netty.channel.kqueue.KQueueIoHandler;
+import io.netty.channel.kqueue.KQueueServerSocketChannel;
+import io.netty.channel.nio.NioIoHandler;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.uring.IoUring;
+import io.netty.channel.uring.IoUringIoHandler;
+import io.netty.channel.uring.IoUringServerSocketChannel;
+
 public enum IoMultiplexer {
-	EPOLL, KQUEUE, JDK, IO_URING
+   EPOLL, KQUEUE, JDK, IO_URING;
+
+   public Class<? extends ServerChannel> serverChannelClass() {
+      return switch (this) {
+         case EPOLL -> EpollServerSocketChannel.class;
+         case KQUEUE -> KQueueServerSocketChannel.class;
+         case JDK -> NioServerSocketChannel.class;
+         case IO_URING -> IoUringServerSocketChannel.class;
+      };
+   }
+
+   public IoHandlerFactory newIoHandlerFactory() {
+      return switch (this) {
+         case EPOLL -> EpollIoHandler.newFactory();
+         case KQUEUE -> KQueueIoHandler.newFactory();
+         case JDK -> NioIoHandler.newFactory();
+         case IO_URING -> IoUringIoHandler.newFactory();
+      };
+   }
+
+   public MultiThreadIoEventLoopGroup newEventLoopGroup(int nThreads) {
+      return new MultiThreadIoEventLoopGroup(nThreads, newIoHandlerFactory());
+   }
+
+   public MultithreadVirtualEventExecutorGroup newVirtualEventExecutorGroup(int nThreads) {
+      return new MultithreadVirtualEventExecutorGroup(nThreads, newIoHandlerFactory());
+   }
+
+   public static IoMultiplexer type() {
+      if (IoUring.isAvailable()) {
+         return IO_URING;
+      } else if (Epoll.isAvailable()) {
+         return EPOLL;
+      } else if (KQueue.isAvailable()) {
+         return KQUEUE;
+      } else {
+         return JDK;
+      }
+   }
 }

+ 78 - 0
frameworks/Java/netty/src/main/java/hello/JsonUtils.java

@@ -0,0 +1,78 @@
+package hello;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.VarHandle;
+import java.util.Arrays;
+
+import com.jsoniter.output.JsonStream;
+import com.jsoniter.output.JsonStreamPool;
+import com.jsoniter.spi.Config;
+import com.jsoniter.spi.JsonException;
+
+import io.netty.util.concurrent.FastThreadLocal;
+
+public class JsonUtils {
+
+   private static final VarHandle INDENTATION;
+
+   static {
+      try {
+         var lookup = MethodHandles.privateLookupIn(JsonStream.class, MethodHandles.lookup());
+         INDENTATION = lookup.findVarHandle(JsonStream.class, "indention", int.class);
+         var dummy = new JsonStream(null, 32);
+         INDENTATION.set(dummy, 4);
+      } catch (Exception e) {
+         throw new RuntimeException(e);
+      }
+   }
+
+   private static void setIndentation(JsonStream stream, int value) {
+      INDENTATION.set(stream, value); // Plain store
+   }
+
+   private static final FastThreadLocal<JsonStream> JSON_STREAM = new FastThreadLocal<>();
+
+   public static JsonStream acquireJsonStreamFromEventLoop() {
+      var stream = JSON_STREAM.get();
+      if (stream == null) {
+         stream = new JsonStream(null, 512) {
+            // this is to save virtual threads to use thread locals
+            @Override
+            public Config currentConfig() {
+               return Config.INSTANCE;
+            }
+         };
+      } else {
+         stream.reset(null);
+         JSON_STREAM.set(null);
+      }
+      return stream;
+   }
+
+   public static JsonStream releaseJsonStreamFromEventLoop(JsonStream jsonStream) {
+      jsonStream.configCache = null;
+      setIndentation(jsonStream, 0);
+      JSON_STREAM.set(jsonStream);
+      return jsonStream;
+   }
+
+   public static byte[] serializeMsg(Message message) {
+      var stream = JsonStreamPool.borrowJsonStream();
+      try {
+         return serializeMsg(message, stream);
+      } finally {
+         // Reset the stream to avoid memory leaks
+         JsonStreamPool.returnJsonStream(stream);
+      }
+   }
+
+   public static byte[] serializeMsg(Message obj, JsonStream stream) {
+      try {
+         stream.writeVal(Message.class, obj);
+         return Arrays.copyOfRange(stream.buffer().data(), 0, stream.buffer().tail());
+      } catch (IOException e) {
+         throw new JsonException(e);
+      }
+   }
+}

+ 21 - 0
frameworks/Java/netty/src/main/java/hello/loom/HelloLoomServerInitializer.java

@@ -0,0 +1,21 @@
+package hello.loom;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import hello.HelloServerHandler;
+import hello.HelloServerInitializer;
+
+public class HelloLoomServerInitializer extends HelloServerInitializer {
+
+   private final MultithreadVirtualEventExecutorGroup group;
+
+   public HelloLoomServerInitializer(MultithreadVirtualEventExecutorGroup group, ScheduledExecutorService service) {
+      super(service);
+      this.group = group;
+   }
+
+   @Override
+   protected HelloServerHandler newHelloServerHandler(ScheduledExecutorService service) {
+      return new VirtualThreadHelloServerHandler(service, group);
+   }
+}

+ 71 - 0
frameworks/Java/netty/src/main/java/hello/loom/LoomSupport.java

@@ -0,0 +1,71 @@
+package hello.loom;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.reflect.Field;
+import java.util.concurrent.Executor;
+
+public final class LoomSupport {
+   private static final boolean SUPPORTED;
+   private static Throwable FAILURE;
+
+   private static final MethodHandle SCHEDULER;
+
+   static {
+      boolean sup;
+      MethodHandle scheduler;
+      try {
+         // this is required to override the default scheduler
+         MethodHandles.Lookup lookup = MethodHandles.lookup();
+         Field schedulerField = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder")
+               .getDeclaredField("scheduler");
+         schedulerField.setAccessible(true);
+         scheduler = lookup.unreflectSetter(schedulerField);
+
+         // this is to make sure we fail earlier!
+         var builder = Thread.ofVirtual();
+         scheduler.invoke(builder, new Executor() {
+            @Override
+            public void execute(Runnable command) {
+
+            }
+         });
+
+         FAILURE = null;
+
+         sup = true;
+      } catch (Throwable e) {
+         scheduler = null;
+         sup = false;
+         FAILURE = e;
+      }
+
+      SCHEDULER = scheduler;
+      SUPPORTED = sup;
+   }
+
+   private LoomSupport() {
+   }
+
+   public static boolean isSupported() {
+      return SUPPORTED;
+   }
+
+   public static void checkSupported() {
+      if (!isSupported()) {
+         throw new UnsupportedOperationException(FAILURE);
+      }
+   }
+
+
+   public static Thread.Builder.OfVirtual setVirtualThreadFactoryScheduler(Thread.Builder.OfVirtual builder,
+                                                                           Executor vthreadScheduler) {
+      checkSupported();
+      try {
+         SCHEDULER.invoke(builder, vthreadScheduler);
+         return builder;
+      } catch (Throwable e) {
+         throw new RuntimeException(e);
+      }
+   }
+}

+ 56 - 0
frameworks/Java/netty/src/main/java/hello/loom/MultithreadVirtualEventExecutorGroup.java

@@ -0,0 +1,56 @@
+package hello.loom;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadFactory;
+
+import io.netty.channel.IoEventLoop;
+import io.netty.channel.IoHandlerFactory;
+import io.netty.channel.MultiThreadIoEventLoopGroup;
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.concurrent.FastThreadLocalThread;
+
+public class MultithreadVirtualEventExecutorGroup extends MultiThreadIoEventLoopGroup {
+
+   public static final int RESUMED_CONTINUATIONS_EXPECTED_COUNT = Integer.getInteger("io.netty.loom.resumed.continuations", 1024);
+   private ThreadFactory threadFactory;
+   private IdentityHashMap<Thread, VirtualThreadNettyScheduler> schedulers;
+   private final FastThreadLocal<ThreadFactory> v_thread_factory = new FastThreadLocal<>() {
+      @Override
+      protected ThreadFactory initialValue() {
+         var scheduler = schedulers.get(Thread.currentThread());
+         if (scheduler == null) {
+            return null;
+         }
+         return LoomSupport.setVirtualThreadFactoryScheduler(Thread.ofVirtual(), scheduler).factory();
+      }
+   };
+
+   public MultithreadVirtualEventExecutorGroup(int nThreads, IoHandlerFactory ioHandlerFactory) {
+      super(nThreads, (Executor) command -> {
+         throw new UnsupportedOperationException("this executor is not supposed to be used");
+      }, ioHandlerFactory);
+   }
+
+   public ThreadFactory eventLoopVirtualThreadFactory() {
+      if (!(Thread.currentThread() instanceof FastThreadLocalThread)) {
+         throw new IllegalStateException("this method should be called from event loop fast thread local threads");
+      }
+      return v_thread_factory.get();
+   }
+
+   @Override
+   protected IoEventLoop newChild(Executor executor, IoHandlerFactory ioHandlerFactory,
+                                  @SuppressWarnings("unused") Object... args) {
+      if (threadFactory == null) {
+         threadFactory = newDefaultThreadFactory();
+      }
+      var scheduler = new VirtualThreadNettyScheduler(this, threadFactory, ioHandlerFactory, RESUMED_CONTINUATIONS_EXPECTED_COUNT);
+      if (schedulers == null) {
+         schedulers = new IdentityHashMap<>();
+      }
+      schedulers.put(scheduler.getCarrierThread(), scheduler);
+      return scheduler.ioEventLoop();
+   }
+
+}

+ 46 - 0
frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadHelloServerHandler.java

@@ -0,0 +1,46 @@
+package hello.loom;
+
+import java.util.ArrayDeque;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.jsoniter.output.JsonStream;
+
+import hello.HelloServerHandler;
+import hello.HttpResponses;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.util.AsciiString;
+
+public class VirtualThreadHelloServerHandler extends HelloServerHandler {
+
+   private final ArrayDeque<FullHttpResponse> responses = new ArrayDeque<>();
+   private final MultithreadVirtualEventExecutorGroup group;
+
+   public VirtualThreadHelloServerHandler(ScheduledExecutorService service, MultithreadVirtualEventExecutorGroup group) {
+      super(service);
+      this.group = group;
+   }
+
+   @Override
+   protected void writePlainResponse(ChannelHandlerContext ctx, AsciiString date) {
+      group.eventLoopVirtualThreadFactory().newThread(() -> {
+         responses.add(HttpResponses.makePlaintextResponse(date));
+      }).start();
+   }
+
+   @Override
+   protected void writeJsonResponse(ChannelHandlerContext ctx, JsonStream stream, AsciiString date) {
+      group.eventLoopVirtualThreadFactory().newThread(() -> {
+         responses.add(HttpResponses.makeJsonResponse(stream, date));
+      }).start();
+   }
+
+   @Override
+   public void channelReadComplete(ChannelHandlerContext ctx) {
+      var responses = this.responses;
+      for (int i = 0, count = responses.size(); i < count; i++) {
+         ctx.write(responses.poll());
+      }
+      ctx.flush();
+   }
+}

+ 95 - 0
frameworks/Java/netty/src/main/java/hello/loom/VirtualThreadNettyScheduler.java

@@ -0,0 +1,95 @@
+package hello.loom;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.channel.IoEventLoopGroup;
+import io.netty.channel.IoHandlerFactory;
+import io.netty.channel.ManualIoEventLoop;
+import io.netty.util.internal.shaded.org.jctools.queues.MpscUnboundedArrayQueue;
+
+public class VirtualThreadNettyScheduler implements Executor {
+
+   private static final long MAX_WAIT_TASKS_NS = TimeUnit.SECONDS.toNanos(1);
+   private static final long MAX_RUN_CONTINUATIONS_NS = TimeUnit.SECONDS.toNanos(1);
+
+   private final MpscUnboundedArrayQueue<Runnable> externalContinuations;
+   private final ManualIoEventLoop ioEventLoop;
+   private final Thread carrierThread;
+
+
+   public VirtualThreadNettyScheduler(IoEventLoopGroup parent, ThreadFactory threadFactory, IoHandlerFactory ioHandlerFactory, int resumedContinuationsExpectedCount) {
+      this.externalContinuations = new MpscUnboundedArrayQueue<>(resumedContinuationsExpectedCount);
+      this.carrierThread = threadFactory.newThread(this::internalRun);
+      this.ioEventLoop = new ManualIoEventLoop(parent, carrierThread, ioHandlerFactory);
+      // we can start the carrier only after all the fields are initialized
+      carrierThread.start();
+   }
+
+   public Thread getCarrierThread() {
+      return carrierThread;
+   }
+
+   public ManualIoEventLoop ioEventLoop() {
+      return ioEventLoop;
+   }
+
+   private void internalRun() {
+      var ioEventLoop = this.ioEventLoop;
+      while (!ioEventLoop.isShuttingDown()) {
+         // runnning I/O and async tasks within Netty without blocking
+         int workDone = ioEventLoop.runNow();
+         workDone += runExternalContinuations(MAX_RUN_CONTINUATIONS_NS);
+         if (workDone == 0 && externalContinuations.isEmpty()) {
+            ioEventLoop.run(MAX_WAIT_TASKS_NS);
+         }
+      }
+      while (!ioEventLoop.isTerminated()) {
+         ioEventLoop.runNow();
+         runExternalContinuations(MAX_RUN_CONTINUATIONS_NS);
+      }
+      while (!externalContinuations.isEmpty()) {
+         runExternalContinuations(MAX_RUN_CONTINUATIONS_NS);
+      }
+   }
+
+   private int runExternalContinuations(long deadlineNs) {
+      final long startDrainingNs = System.nanoTime();
+      int executed = 0;
+      for (; ; ) {
+         var continuation = this.externalContinuations.poll();
+         if (continuation == null) {
+            break;
+         }
+         try {
+            continuation.run();
+         } catch (Throwable t) {
+            // this shouldn't really happen
+         }
+         executed++;
+         long elapsedNs = System.nanoTime() - startDrainingNs;
+         if (elapsedNs >= deadlineNs) {
+            return executed;
+         }
+      }
+      return executed;
+   }
+
+   @Override
+   public void execute(Runnable command) {
+      // TODO improve it using a reject handler? It's not too strict!?
+      if (ioEventLoop.isShuttingDown()) {
+         throw new RejectedExecutionException("event loop is shutting down");
+      }
+      if (ioEventLoop.inEventLoop(Thread.currentThread())) {
+         command.run();
+      } else {
+         externalContinuations.offer(command);
+         // wakeup won't happen if we're shutting down!
+         ioEventLoop.wakeup();
+      }
+   }
+
+}