Browse Source

optimiz protocol and mem pool (#6792)

longzl 3 years ago
parent
commit
d0c79e804f

+ 2 - 2
frameworks/Java/isocket-nio/pom.xml

@@ -97,8 +97,8 @@
                     <verbose>true</verbose>
                     <fork>true</fork>
                     <compilerArgument>-nowarn</compilerArgument>
-                    <source>1.7</source>
-                    <target>1.7</target>
+                    <source>11</source>
+                    <target>11</target>
                     <encoding>UTF-8</encoding>
                     <debug>false</debug>
                 </configuration>

+ 8 - 0
frameworks/Java/isocket-nio/src/main/java/cn/ibaijia/tfb/Consts.java

@@ -0,0 +1,8 @@
+package cn.ibaijia.tfb;
+
+public class Consts {
+
+    public static final byte[] TEXT_TYPE = "text/plain".getBytes();
+    public static final byte[] JSON_TYPE = "application/json".getBytes();
+
+}

+ 33 - 0
frameworks/Java/isocket-nio/src/main/java/cn/ibaijia/tfb/DateUtil.java

@@ -0,0 +1,33 @@
+package cn.ibaijia.tfb;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Locale;
+
+public class DateUtil {
+
+    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("E, dd MMM yyyy HH:mm:ss z", Locale.ENGLISH);//Fri, 09 Jul 2021 09:10:42 UTC
+
+    private static byte[] date = ("\r\nDate:" + dateFormat.format(new Date())).getBytes();
+
+    public static byte[] getDate() {
+        return date;
+    }
+
+    public static void start() {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (true) {
+                    date = ("\r\nDate:" + dateFormat.format(new Date())).getBytes();
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+
+                    }
+                }
+            }
+        }).start();
+    }
+
+}

+ 7 - 4
frameworks/Java/isocket-nio/src/main/java/cn/ibaijia/tfb/HttpBootstrap.java

@@ -12,6 +12,7 @@ public class HttpBootstrap {
     private static final Logger logger = LoggerFactory.getLogger(HttpBootstrap.class);
 
     public static void main(String[] args) {
+        DateUtil.start();
         Server server = new Server("0.0.0.0", 8080);
         server.addProtocol(new SimpleHttpProtocol());
         server.setProcessor(new PlanTextProcessor());
@@ -23,10 +24,12 @@ public class HttpBootstrap {
         });
         int processorNumber = Runtime.getRuntime().availableProcessors();
         server.setThreadNumber(processorNumber);
-//        server.setUseDirectBuffer(true);
-        server.setPoolSize(16 * 1024);
-        server.setReadBuffSize(4 * 1024);
-        server.setBacklog(1024 * 8);
+        server.setUseDirectBuffer(true);
+        server.setReadFirst(true);
+        server.setUsePool(true);
+        server.setPoolPageSize(16 * 1024);
+        server.setBuffSize(1 * 1024);
+        server.setBacklog(1024 * 4);
         server.start();
     }
 

+ 9 - 2
frameworks/Java/isocket-nio/src/main/java/cn/ibaijia/tfb/http/HttpEntity.java

@@ -4,7 +4,14 @@ public abstract class HttpEntity {
 
     public String charset = "UTF-8";
 
-    public abstract String getHeader(String name);
-    public abstract void setHeader(String name,String value);
+    public abstract byte[] getHeader(byte[] name);
+
+    public abstract byte[] getHeader(String name);
+
+    public abstract void setHeader(byte[] name, byte[] value);
+
+    public abstract void setContentType(String contentType);
+
+    public abstract void setContentType(byte[] contentType);
 
 }

+ 30 - 6
frameworks/Java/isocket-nio/src/main/java/cn/ibaijia/tfb/http/HttpRequestEntity.java

@@ -1,19 +1,21 @@
 package cn.ibaijia.tfb.http;
 
+import cn.ibaijia.tfb.Consts;
+
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
 public class HttpRequestEntity extends HttpEntity {
 
 
-
     public ByteBuffer bodyBuffer = null;
     public boolean chunked = false;
     public int contentLength = -1;
     public int crNum = 0;
     public int lfNum = 0;
-    public String tmp;
+    public byte[] tmp;
 
 
     //请求行
@@ -24,17 +26,39 @@ public class HttpRequestEntity extends HttpEntity {
     //请求体
     public String body;
     //第一次 请求header时解析 第一行不要
-    private Map<String, String> headers = new HashMap<>();
+    private Map<byte[], byte[]> headers = new HashMap<>();
+    private byte[] contentType = Consts.TEXT_TYPE;
 
-    public String getHeader(String name) {
-        return this.headers.get(name);
+    @Override
+    public byte[] getHeader(byte[] name) {
+        for (Map.Entry<byte[], byte[]> entry : headers.entrySet()) {
+            if (Arrays.equals(entry.getKey(), name)) {
+                return entry.getValue();
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public byte[] getHeader(String name) {
+        return getHeader(name.getBytes());
     }
 
     @Override
-    public void setHeader(String name, String value) {
+    public void setHeader(byte[] name, byte[] value) {
         this.headers.put(name, value);
     }
 
+    @Override
+    public void setContentType(String contentType) {
+        this.contentType = contentType.getBytes();
+    }
+
+    @Override
+    public void setContentType(byte[] contentType) {
+        this.contentType = contentType;
+    }
+
     public void processBody() {
         bodyBuffer.flip();
         byte[] bytes = new byte[bodyBuffer.remaining()];

+ 64 - 25
frameworks/Java/isocket-nio/src/main/java/cn/ibaijia/tfb/http/HttpResponseEntity.java

@@ -1,54 +1,93 @@
 package cn.ibaijia.tfb.http;
 
+import cn.ibaijia.tfb.Consts;
+import cn.ibaijia.tfb.DateUtil;
+
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 public class HttpResponseEntity extends HttpEntity {
-    public static final String protocol = "HTTP/1.1";
-    public int statusCode = 200;
-    public String status = "OK";
+    private static final byte[] PROTOCOL = "HTTP/1.1 ".getBytes();
+    private static final byte[] STATUS_200 = "200 OK".getBytes();
+    private static final byte[] CRLF = "\r\n".getBytes();
+    private static final byte[] COLON = ":".getBytes();
+    private static final byte[] SERVER_NAME = "\r\nServer:tfb\r\n".getBytes();
+    private static final byte[] CONTENT_LENGTH_HEAD = "\r\nContent-Length:".getBytes();
+    private static final byte[] CONTENT_TYPE_HEAD = "\r\nContent-Type:".getBytes();
 
+    private byte[] contentType = Consts.TEXT_TYPE;
     //响应体
-    public String body = "";
+    public String body;
 
     //请求头 或者 响应头
-    public Map<String, String> headers = new HashMap<>();
+    public Map<byte[], byte[]> headers = new HashMap<>();
+
+    @Override
+    public byte[] getHeader(byte[] name) {
+        for (Map.Entry<byte[], byte[]> entry : headers.entrySet()) {
+            if (Arrays.equals(entry.getKey(), name)) {
+                return entry.getValue();
+            }
+        }
+        return null;
+    }
 
     @Override
-    public String getHeader(String name) {
-        return headers.get(name);
+    public byte[] getHeader(String name) {
+        return getHeader(name.getBytes());
     }
 
     @Override
-    public void setHeader(String name, String value) {
+    public void setHeader(byte[] name, byte[] value) {
         headers.put(name, value);
     }
 
-    public int getStatusCode() {
-        return statusCode;
+    @Override
+    public void setContentType(String contentType) {
+        this.contentType = contentType.getBytes();
     }
 
-    public void setStatusCode(int statusCode) {
-        this.statusCode = statusCode;
+    @Override
+    public void setContentType(byte[] contentType) {
+        this.contentType = contentType;
     }
 
-    public String getStatus() {
-        return status;
+    public ByteBuffer toBuffer(ByteBuffer byteBuffer) {
+        byteBuffer.put(PROTOCOL);
+        byteBuffer.put(STATUS_200);
+        byteBuffer.put(DateUtil.getDate());
+        byteBuffer.put(CONTENT_LENGTH_HEAD);
+        byteBuffer.put(String.valueOf(body.length()).getBytes());
+        byteBuffer.put(CONTENT_TYPE_HEAD);
+        byteBuffer.put(contentType);
+        byteBuffer.put(SERVER_NAME);
+        for (Map.Entry<byte[], byte[]> header : headers.entrySet()) {
+            byteBuffer.put(header.getKey());
+            byteBuffer.put(COLON);
+            byteBuffer.put(header.getValue());
+            byteBuffer.put(CRLF);
+        }
+        byteBuffer.put(CRLF);
+        byteBuffer.put(body.getBytes());
+        byteBuffer.flip();
+        return byteBuffer;
     }
 
-    public void setStatus(String status) {
-        this.status = status;
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        HttpResponseEntity that = (HttpResponseEntity) o;
+        return Objects.equals(body, that.body) &&
+                Objects.equals(headers, that.headers);
     }
 
-    public ByteBuffer toBuffer() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(protocol).append(" ").append(statusCode).append(" ").append(status).append("\r\n");
-        sb.append("Content-Length:").append(body.length()).append("\r\n");
-        for (Map.Entry<String, String> header : headers.entrySet()) {
-            sb.append(header.getKey()).append(":").append(header.getValue()).append("\r\n");
-        }
-        sb.append("\r\n").append(body);
-        return ByteBuffer.wrap(sb.toString().getBytes());
+    @Override
+    public int hashCode() {
+
+        return Objects.hash(body, headers);
     }
 }

+ 9 - 44
frameworks/Java/isocket-nio/src/main/java/cn/ibaijia/tfb/processor/PlanTextProcessor.java

@@ -2,9 +2,7 @@ package cn.ibaijia.tfb.processor;
 
 import cn.ibaijia.isocket.processor.Processor;
 import cn.ibaijia.isocket.session.Session;
-import cn.ibaijia.isocket.session.SessionManager;
-import cn.ibaijia.isocket.util.BufferPool;
-import cn.ibaijia.isocket.util.BufferState;
+import cn.ibaijia.tfb.Consts;
 import cn.ibaijia.tfb.http.HttpEntity;
 import cn.ibaijia.tfb.http.HttpRequestEntity;
 import cn.ibaijia.tfb.http.HttpResponseEntity;
@@ -12,62 +10,33 @@ import com.alibaba.fastjson.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Locale;
-import java.util.concurrent.locks.ReentrantLock;
-
 public class PlanTextProcessor implements Processor<HttpEntity> {
     private static final Logger logger = LoggerFactory.getLogger(PlanTextProcessor.class);
-    private static final SimpleDateFormat dateFormat = new SimpleDateFormat("E, dd MMM yyyy HH:mm:ss z", Locale.ENGLISH);//Fri, 09 Jul 2021 09:10:42 UTC
-    private static final String SERVER_NAME = "isocket-nio-tfb";
-    private static final String TEXT_TYPE = "text/plain; charset=UTF-8";
-    private static final String JSON_TYPE = "application/json; charset=UTF-8";
-    private static ReentrantLock lock = new ReentrantLock();
-    private static String dateHeader = dateFormat.format(new Date());
-
-    private String getDateHeader() {
-        //TODO
-        if (lock.tryLock()) {
-            dateHeader = dateFormat.format(new Date());
-        }
-        return dateHeader;
-    }
 
     @Override
     public boolean process(final Session session, final HttpEntity httpEntity) {
         HttpRequestEntity httpRequestEntity = (HttpRequestEntity) httpEntity;
         String url = httpRequestEntity.url;
         logger.trace("url:{}", url);
-        if (url.equals("/plaintext")) {
+        if ("/plaintext".equals(url)) {
             HttpResponseEntity httpResponseEntity = new HttpResponseEntity();
-            httpResponseEntity.setHeader("Content-Type", TEXT_TYPE);
-            httpResponseEntity.setHeader("Server", SERVER_NAME);
-            httpResponseEntity.setHeader("Date", getDateHeader());
+            httpResponseEntity.setContentType(Consts.TEXT_TYPE);
             httpResponseEntity.body = "Hello, World!";
             session.write(httpResponseEntity);
-        } else if (url.equals("/json")) {
+        } else if ("/json".equals(url)) {
             HttpResponseEntity httpResponseEntity = new HttpResponseEntity();
-            httpResponseEntity.setHeader("Content-Type", JSON_TYPE);
-            httpResponseEntity.setHeader("Server", SERVER_NAME);
-            httpResponseEntity.setHeader("Date", getDateHeader());
+            httpResponseEntity.setContentType(Consts.JSON_TYPE);
             httpResponseEntity.body = JSON.toJSONString(new Message("Hello, World!"));
             session.write(httpResponseEntity);
-        } else if (url.equals("/state")) {
+        } else if ("/state".equals(url)) {
             HttpResponseEntity httpResponseEntity = new HttpResponseEntity();
-            httpResponseEntity.setHeader("Content-Type", JSON_TYPE);
-            httpResponseEntity.setHeader("Server", SERVER_NAME);
-            httpResponseEntity.setHeader("Date", getDateHeader());
+            httpResponseEntity.setContentType(Consts.JSON_TYPE);
             State state = new State();
-            state.sessionCount = SessionManager.getSessionCount();
-            state.bufferState = new BufferState();
             httpResponseEntity.body = JSON.toJSONString(state);
             session.write(httpResponseEntity);
         } else {
             HttpResponseEntity httpResponseEntity = new HttpResponseEntity();
-            httpResponseEntity.setHeader("Content-Type", TEXT_TYPE);
-            httpResponseEntity.setHeader("Server", SERVER_NAME);
-            httpResponseEntity.setHeader("Date", getDateHeader());
+            httpResponseEntity.setContentType(Consts.TEXT_TYPE);
             httpResponseEntity.body = "hi";
             session.write(httpResponseEntity);
         }
@@ -78,12 +47,8 @@ public class PlanTextProcessor implements Processor<HttpEntity> {
     public void processError(Session session, HttpEntity httpEntity, Throwable throwable) {
         logger.error("processError:", throwable);
         HttpResponseEntity httpResponseEntity = new HttpResponseEntity();
-        httpResponseEntity.setHeader("Content-Type", TEXT_TYPE);
-        httpResponseEntity.setHeader("Server", SERVER_NAME);
-        httpResponseEntity.setHeader("Date", getDateHeader());
+        httpResponseEntity.setContentType(Consts.TEXT_TYPE);
         httpResponseEntity.body = "hi";
-        httpResponseEntity.statusCode = 500;
-        httpResponseEntity.status = "Internal Server Error";
         session.write(httpResponseEntity);
     }
 }

+ 2 - 4
frameworks/Java/isocket-nio/src/main/java/cn/ibaijia/tfb/processor/State.java

@@ -1,10 +1,8 @@
 package cn.ibaijia.tfb.processor;
 
-import cn.ibaijia.isocket.util.BufferState;
-
 public class State {
 
-    public int sessionCount;
-    public BufferState bufferState;
+//    public int sessionCount;
+//    public BufferState bufferState;
 
 }

+ 31 - 36
frameworks/Java/isocket-nio/src/main/java/cn/ibaijia/tfb/protocol/SimpleHttpProtocol.java

@@ -9,14 +9,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 public class SimpleHttpProtocol implements Protocol<ByteBuffer, HttpEntity> {
 
     private static final Logger logger = LoggerFactory.getLogger(SimpleHttpProtocol.class);
 
-    private static final String CONTENT_LENGTH = "CONTENT-LENGTH";
-    private static final String TRANSFER_ENCODING = "TRANSFER-ENCODING";
-    private static final String CHUNKED = "CHUNKED";
     private static final byte CR13 = (byte) 13; // \CR \r
     private static final byte LF10 = (byte) 10; // \LF \n
     private static final byte SPACE0 = (byte) 32; // \SP
@@ -40,22 +38,22 @@ public class SimpleHttpProtocol implements Protocol<ByteBuffer, HttpEntity> {
         }
 
         if (!httpEntity.headerComplete() && byteBuffer.hasRemaining()) { //解析header
-            readHeader(byteBuffer, session, httpEntity);
+            readHeader(byteBuffer, httpEntity);
         }
 
-        if (httpEntity.headerComplete() && httpEntity.bodyBuffer != null && byteBuffer.hasRemaining()) {// 解析body
-            readBody(byteBuffer, session, httpEntity);
-        }
-
-        if (httpEntity.complete()) {
-            session.setAttribute(httpEntityKey, null);
-            return httpEntity;
+        if (httpEntity.headerComplete()) {
+            if (httpEntity.complete()) {
+                session.setAttribute(httpEntityKey, null);
+                return httpEntity;
+            }
+            if (httpEntity.bodyBuffer != null && byteBuffer.hasRemaining()) { // 解析request body
+                readBody(byteBuffer, httpEntity);
+            }
         }
-
         return null;
     }
 
-    private void readHeader(ByteBuffer byteBuffer, Session session, HttpRequestEntity httpEntity) {
+    private void readHeader(ByteBuffer byteBuffer, HttpRequestEntity httpEntity) {
         try {
             ByteBuffer buf = byteBuffer.duplicate();
             int startPos = 0;
@@ -71,6 +69,11 @@ public class SimpleHttpProtocol implements Protocol<ByteBuffer, HttpEntity> {
                     httpEntity.crNum = 0;
                     httpEntity.lfNum = 0;
                 }
+
+                if (httpEntity.headerComplete()) {
+                    return;
+                }
+
                 if (httpEntity.isReadHeadLine()) {
                     if (b == SPACE0) {
                         int len = endPos - startPos - 1;
@@ -80,21 +83,16 @@ public class SimpleHttpProtocol implements Protocol<ByteBuffer, HttpEntity> {
                         buf.position(startPos);
                         if (httpEntity.method == null) {
                             httpEntity.method = new String(bytes);
-                            continue;
-                        }
-                        if (httpEntity.url == null) {
+                        } else if (httpEntity.url == null) {
                             httpEntity.url = new String(bytes);
-                            continue;
                         }
-                    }
-                    if (httpEntity.crNum == 1 && httpEntity.lfNum == 1) {
+                    } else if (httpEntity.crNum == 1 && httpEntity.lfNum == 1) {
                         int len = endPos - startPos - 2;
                         byte[] bytes = new byte[len];
                         buf.get(bytes, 0, len);
                         startPos = endPos;
                         buf.position(startPos);
                         httpEntity.protocol = new String(bytes);
-                        continue;
                     }
                 } else {
                     if (b == COLON && httpEntity.tmp == null) {
@@ -103,26 +101,23 @@ public class SimpleHttpProtocol implements Protocol<ByteBuffer, HttpEntity> {
                         buf.get(bytes, 0, len);
                         startPos = endPos;
                         buf.position(startPos);
-                        httpEntity.tmp = new String(bytes);
-                        continue;
-                    }
-                    if (httpEntity.crNum == 1 && httpEntity.lfNum == 1) {
+                        httpEntity.tmp = bytes;
+                    } else if (httpEntity.crNum == 1 && httpEntity.lfNum == 1) {
                         int len = endPos - startPos - 2;
                         byte[] bytes = new byte[len];
                         buf.get(bytes, 0, len);
                         startPos = endPos;
                         buf.position(startPos);
-                        String value = new String(bytes);
-                        httpEntity.setHeader(httpEntity.tmp, value);
+                        httpEntity.setHeader(httpEntity.tmp, bytes);
                         httpEntity.tmp = null;
-                        if (CONTENT_LENGTH.equals(httpEntity.tmp)) {
-                            httpEntity.contentLength = (value == null ? 0 : Integer.valueOf(value));
-                            httpEntity.bodyBuffer = ByteBuffer.allocate(httpEntity.contentLength);//TODO can pooling
-                        }
-                        if (CHUNKED.equals(httpEntity.tmp)) {
-                            httpEntity.chunked = true;
+//                        if (Arrays.equals(CONTENT_LENGTH, httpEntity.tmp)) {
+//                            httpEntity.contentLength = (value == null ? 0 : Integer.valueOf(value));
+//                            httpEntity.bodyBuffer = ByteBuffer.allocate(httpEntity.contentLength);//TODO can pooling
+//                        }
+//                        if (Arrays.equals(CHUNKED, httpEntity.tmp)) {
+//                            httpEntity.chunked = true;
 //                            throw new RuntimeException("not support chunked");
-                        }
+//                        }
                     }
                 }
             }
@@ -131,7 +126,7 @@ public class SimpleHttpProtocol implements Protocol<ByteBuffer, HttpEntity> {
         }
     }
 
-    private void readBody(ByteBuffer byteBuffer, Session session, HttpRequestEntity httpEntity) {
+    private void readBody(ByteBuffer byteBuffer, HttpRequestEntity httpEntity) {
         try {
             if (httpEntity.bodyBuffer.hasRemaining()) {
                 if (byteBuffer.remaining() <= httpEntity.bodyBuffer.remaining()) {
@@ -152,8 +147,8 @@ public class SimpleHttpProtocol implements Protocol<ByteBuffer, HttpEntity> {
 
     @Override
     public ByteBuffer encode(HttpEntity httpEntity, Session session) {
+        ByteBuffer byteBuffer = session.getHandler().getPooledByteBuff().get();
         HttpResponseEntity httpResponseEntity = (HttpResponseEntity) httpEntity;
-
-        return httpResponseEntity.toBuffer();
+        return httpResponseEntity.toBuffer(byteBuffer);
     }
 }