Răsfoiți Sursa

Integrate changes made by @normanmaurer:
"use proper config for PooledByteBufAllocator"
<https://github.com/normanmaurer/FrameworkBenchmarks/commit/411209c5d6248c981e21fea1479efdd1bbf6dade>

luis.neves 11 ani în urmă
părinte
comite
d1fafe79b4

+ 29 - 36
netty/src/main/java/hello/HelloServerHandler.java

@@ -2,9 +2,8 @@ package hello;
 
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -15,25 +14,32 @@ import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.util.CharsetUtil;
 import io.netty.util.CharsetUtil;
 
 
+import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Date;
-import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
 import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
 
 
-
+@ChannelHandler.Sharable
 public class HelloServerHandler extends SimpleChannelInboundHandler<Object> {
 public class HelloServerHandler extends SimpleChannelInboundHandler<Object> {
-    private final SimpleDateFormat format = new SimpleDateFormat("E, dd MMM yyyy HH:mm:ss z");
-    private CharSequence date;
+	private static final ThreadLocal<DateFormat> FORMAT = new ThreadLocal<DateFormat>() {
+        @Override
+        protected DateFormat initialValue() {
+            return new SimpleDateFormat("E, dd MMM yyyy HH:mm:ss z");
+        }
+    };
 
 
-    private final ByteBuf buffer = Unpooled.directBuffer().writeBytes("Hello, World!".getBytes(CharsetUtil.UTF_8));
-    private final CharSequence contentLength = HttpHeaders.newEntity(String.valueOf(buffer.readableBytes()));
+
+    private static final ByteBuf CONTENT_BUFFER = Unpooled.unreleasableBuffer(
+            Unpooled.directBuffer().writeBytes("Hello, World!".getBytes(CharsetUtil.UTF_8)));
+    private static final CharSequence contentLength = HttpHeaders.newEntity(
+            String.valueOf(CONTENT_BUFFER.readableBytes()));
 
 
     private static final CharSequence TYPE_PLAIN = HttpHeaders.newEntity("text/plain; charset=UTF-8");
     private static final CharSequence TYPE_PLAIN = HttpHeaders.newEntity("text/plain; charset=UTF-8");
     private static final CharSequence TYPE_JSON = HttpHeaders.newEntity("application/json; charset=UTF-8");
     private static final CharSequence TYPE_JSON = HttpHeaders.newEntity("application/json; charset=UTF-8");
-
     private static final CharSequence SERVER_NAME = HttpHeaders.newEntity("Netty");
     private static final CharSequence SERVER_NAME = HttpHeaders.newEntity("Netty");
     private static final CharSequence CONTENT_TYPE_ENTITY = HttpHeaders.newEntity(HttpHeaders.Names.CONTENT_TYPE);
     private static final CharSequence CONTENT_TYPE_ENTITY = HttpHeaders.newEntity(HttpHeaders.Names.CONTENT_TYPE);
     private static final CharSequence DATE_ENTITY = HttpHeaders.newEntity(HttpHeaders.Names.DATE);
     private static final CharSequence DATE_ENTITY = HttpHeaders.newEntity(HttpHeaders.Names.DATE);
@@ -45,6 +51,19 @@ public class HelloServerHandler extends SimpleChannelInboundHandler<Object> {
     	MAPPER = new ObjectMapper();
     	MAPPER = new ObjectMapper();
     	MAPPER.registerModule(new AfterburnerModule());
     	MAPPER.registerModule(new AfterburnerModule());
     }
     }
+    
+    private volatile CharSequence date = HttpHeaders.newEntity(FORMAT.get().format(new Date()));
+
+    HelloServerHandler(ScheduledExecutorService service) {
+        service.scheduleWithFixedDelay(new Runnable() {
+            private final DateFormat format = FORMAT.get();
+            @Override
+            public void run() {
+                date = HttpHeaders.newEntity(format.format(new Date()));
+            }
+        }, 1000, 1000, TimeUnit.MILLISECONDS);
+
+    }
 
 
     @Override
     @Override
     public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
     public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
@@ -53,7 +72,7 @@ public class HelloServerHandler extends SimpleChannelInboundHandler<Object> {
             String uri = request.getUri();
             String uri = request.getUri();
             switch (uri) {
             switch (uri) {
                 case "/plaintext":
                 case "/plaintext":
-                    writeResponse(ctx, request, buffer.duplicate().retain(), TYPE_PLAIN, contentLength);
+                    writeResponse(ctx, request, CONTENT_BUFFER.duplicate(), TYPE_PLAIN, contentLength);
                     return;
                     return;
                 case "/json":
                 case "/json":
                     byte[] json = MAPPER.writeValueAsBytes(new Message("Hello, World!"));
                     byte[] json = MAPPER.writeValueAsBytes(new Message("Hello, World!"));
@@ -100,30 +119,4 @@ public class HelloServerHandler extends SimpleChannelInboundHandler<Object> {
     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
         ctx.flush();
         ctx.flush();
     }
     }
-
-    @Override
-    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
-        super.handlerRemoved(ctx);
-        buffer.release();
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx) throws Exception {
-        date = HttpHeaders.newEntity(format.format(new Date()));
-
-        Channel channel = ctx.channel();
-        final ScheduledFuture<?> future = channel.eventLoop().scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                date = HttpHeaders.newEntity(format.format(new Date()));
-            }
-        }, 1000, 1000, TimeUnit.MILLISECONDS);
-
-        channel.closeFuture().addListener(new ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture channelFuture) throws Exception {
-                future.cancel(false);
-            }
-        });
-    }
 }
 }

+ 8 - 1
netty/src/main/java/hello/HelloServerInitializer.java

@@ -1,17 +1,24 @@
 package hello;
 package hello;
 
 
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpRequestDecoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;
 import io.netty.handler.codec.http.HttpResponseEncoder;
 
 
+import java.util.concurrent.ScheduledExecutorService;
+
 public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {
 public class HelloServerInitializer extends ChannelInitializer<SocketChannel> {
+    private final ChannelHandler httpHandler;
+    public HelloServerInitializer(ScheduledExecutorService service) {
+        this.httpHandler = new HelloServerHandler(service);
+    }
     @Override
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         ChannelPipeline p = ch.pipeline();
         p.addLast("encoder", new HttpResponseEncoder());
         p.addLast("encoder", new HttpResponseEncoder());
         p.addLast("decoder", new HttpRequestDecoder(4096, 8192, 8192, false));
         p.addLast("decoder", new HttpRequestDecoder(4096, 8192, 8192, false));
-        p.addLast("handler", new HelloServerHandler());
+        p.addLast("handler", httpHandler);
     }
     }
 }
 }

+ 6 - 2
netty/src/main/java/hello/HelloWebServer.java

@@ -15,6 +15,7 @@ import io.netty.util.ResourceLeakDetector.Level;
 
 
 
 
 public class HelloWebServer {
 public class HelloWebServer {
+	private static int IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;
     static {
     static {
         ResourceLeakDetector.setLevel(Level.DISABLED);
         ResourceLeakDetector.setLevel(Level.DISABLED);
     }
     }
@@ -45,8 +46,11 @@ public class HelloWebServer {
 			ServerBootstrap b = new ServerBootstrap();
 			ServerBootstrap b = new ServerBootstrap();
 			b.option(ChannelOption.SO_BACKLOG, 1024);
 			b.option(ChannelOption.SO_BACKLOG, 1024);
 			b.option(ChannelOption.SO_REUSEADDR, true);
 			b.option(ChannelOption.SO_REUSEADDR, true);
-			b.group(loupGroup).channel(serverChannelClass).childHandler(new HelloServerInitializer());			
-			b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+			b.group(loupGroup).channel(serverChannelClass).childHandler(new HelloServerInitializer(loupGroup.next()));			
+            b.option(ChannelOption.MAX_MESSAGES_PER_READ, Integer.MAX_VALUE);
+            b.childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator(true, IO_THREADS, IO_THREADS, 8192, 11));
+            b.childOption(ChannelOption.SO_REUSEADDR, true);
+            b.childOption(ChannelOption.MAX_MESSAGES_PER_READ, Integer.MAX_VALUE);
 
 
 			Channel ch = b.bind(port).sync().channel();
 			Channel ch = b.bind(port).sync().channel();
 			ch.closeFuture().sync();
 			ch.closeFuture().sync();