Browse Source

[D/Hunt] Compatible with Hunt 1.2.0-RC (#4500)

* Compatible with Hunt 1.2.0-RC

* Upgrade dmd version to 2.085.0

* Update hunt.dockerfile

* Impove database operations

* Upgrade docker files

* Minor updates
Heromyth 6 years ago
parent
commit
6c11a13181

+ 1 - 1
frameworks/D/hunt/dub.json

@@ -11,7 +11,7 @@
 	"libs-posix": [ "http_parser" ],
 	"libs-posix": [ "http_parser" ],
 	"lflags-posix": ["-Lhttp-parser/"],
 	"lflags-posix": ["-Lhttp-parser/"],
 	"dependencies": {
 	"dependencies": {
-		"hunt": "~>1.2.0-beta.1",
+		"hunt": "~>1.2.0-rc.1",
 		"hunt-database": "~>1.2.0-beta.1",
 		"hunt-database": "~>1.2.0-beta.1",
 		"std_data_json": "~>0.18.2"
 		"std_data_json": "~>0.18.2"
 	},
 	},

+ 1 - 0
frameworks/D/hunt/http-parser

@@ -0,0 +1 @@
+Subproject commit 0d0a24e19eb5ba232d2ea8859aba2a7cc6c42bc4

+ 3 - 3
frameworks/D/hunt/hunt-dmd-postgresql.dockerfile

@@ -1,15 +1,15 @@
-FROM dlanguage/ldc:1.7.0
+FROM dlangchina/dlang-dmd:latest
 
 
 ADD ./ /hunt
 ADD ./ /hunt
 WORKDIR /hunt
 WORKDIR /hunt
 
 
-RUN apt update -yqq && apt install -yqq git make libpq-dev
+RUN apt update -y && apt install -y --no-install-recommends git && rm -rf /var/lib/apt/lists/* && rm -rf /var/cache/apt/*
 
 
 RUN git clone https://github.com/nodejs/http-parser.git && \
 RUN git clone https://github.com/nodejs/http-parser.git && \
     cd http-parser && \
     cd http-parser && \
     make package
     make package
     
     
 RUN dub upgrade --verbose
 RUN dub upgrade --verbose
-RUN dub build --build=release --arch=x86_64 --config=postgresql -f
+RUN dub build --build=release --arch=x86_64 --config=postgresql --compiler=dmd -f
 
 
 CMD ["./hunt-minihttp"]
 CMD ["./hunt-minihttp"]

+ 2 - 2
frameworks/D/hunt/hunt-ldc-postgresql.dockerfile

@@ -1,9 +1,9 @@
-FROM dlanguage/ldc:1.7.0
+FROM dlangchina/dlang-ldc:latest
 
 
 ADD ./ /hunt
 ADD ./ /hunt
 WORKDIR /hunt
 WORKDIR /hunt
 
 
-RUN apt update -yqq && apt install -yqq git make libpq-dev
+RUN apt update -y && apt install -y --no-install-recommends git && rm -rf /var/lib/apt/lists/* && rm -rf /var/cache/apt/*
 
 
 RUN git clone https://github.com/nodejs/http-parser.git && \
 RUN git clone https://github.com/nodejs/http-parser.git && \
     cd http-parser && \
     cd http-parser && \

+ 2 - 2
frameworks/D/hunt/hunt-ldc.dockerfile

@@ -1,6 +1,6 @@
-FROM dlanguage/ldc:1.7.0
+FROM dlangchina/dlang-ldc:latest
 
 
-RUN apt update -yqq && apt install -yqq git make libpq-dev
+RUN apt update -y && apt install -y --no-install-recommends git && rm -rf /var/lib/apt/lists/* && rm -rf /var/cache/apt/*
 
 
 ADD ./ /hunt
 ADD ./ /hunt
 WORKDIR /hunt
 WORKDIR /hunt

+ 3 - 3
frameworks/D/hunt/hunt.dockerfile

@@ -1,6 +1,6 @@
-FROM dlanguage/ldc:1.7.0
+FROM dlangchina/dlang-dmd:latest
 
 
-RUN apt update -yqq && apt install -yqq git make libpq-dev
+RUN apt-get update && apt-get install -y --no-install-recommends git && rm -rf /var/lib/apt/lists/* && rm -rf /var/cache/apt/*
 
 
 ADD ./ /hunt
 ADD ./ /hunt
 WORKDIR /hunt
 WORKDIR /hunt
@@ -10,6 +10,6 @@ RUN git clone https://github.com/nodejs/http-parser.git && \
     make package
     make package
     
     
 RUN dub upgrade --verbose
 RUN dub upgrade --verbose
-RUN dub build -f --arch=x86_64 --build=release -c=lite
+RUN dub build -f --arch=x86_64 --build=release --compiler=dmd -c=lite
 
 
 CMD ["./hunt-minihttp"]
 CMD ["./hunt-minihttp"]

+ 1 - 2
frameworks/D/hunt/source/DemoProcessor.d

@@ -149,8 +149,7 @@ class DemoProcessor : HttpProcessor {
         private void respondSingleQuery() {
         private void respondSingleQuery() {
             int id = uniform(1, 10000);
             int id = uniform(1, 10000);
             string query = "SELECT randomNumber FROM world WHERE id = " ~ id.to!string;
             string query = "SELECT randomNumber FROM world WHERE id = " ~ id.to!string;
-            Statement statement = dbConnection.prepare(query);
-            ResultSet rs = statement.query();
+            ResultSet rs = dbConnection.query(query);
 
 
             JSONValue js = JSONValue(["id" : JSONValue(id), "randomNumber"
             JSONValue js = JSONValue(["id" : JSONValue(id), "randomNumber"
                     : JSONValue(to!int(rs.front()[0]))]);
                     : JSONValue(to!int(rs.front()[0]))]);

+ 13 - 9
frameworks/D/hunt/source/app.d

@@ -26,16 +26,20 @@ void main(string[] args) {
 		return;
 		return;
 	}
 	}
 
 
-version(POSTGRESQL) {
-	debug {
-		dbConnection = new Database("postgresql://benchmarkdbuser:[email protected]:5432/hello_world?charset=utf-8");
-		dbConnection.getOption().setMinimumConnection(128);
-	} else {
-		dbConnection = new Database("postgresql://benchmarkdbuser:benchmarkdbpass@tfb-database:5432/hello_world?charset=utf-8");
-		dbConnection.getOption().setMinimumConnection(128);
+	version (POSTGRESQL) {
+		DatabaseOption options;
+		debug {
+			options = new DatabaseOption(
+					"postgresql://benchmarkdbuser:[email protected]:5432/hello_world?charset=utf-8");
+		} else {
+			options = new DatabaseOption(
+					"postgresql://benchmarkdbuser:benchmarkdbpass@tfb-database:5432/hello_world?charset=utf-8");
+		}
+		options.setMinimumConnection(totalCPUs*2);
+		options.setMaximumConnection(totalCPUs*2);
+		dbConnection = new Database(options);
 	}
 	}
-		dbConnection.getOption().setMaximumConnection(128);
-}
+	
 	HttpServer httpServer = new HttpServer("0.0.0.0", port, totalCPUs);
 	HttpServer httpServer = new HttpServer("0.0.0.0", port, totalCPUs);
 	httpServer.onProcessorCreate(delegate HttpProcessor (TcpStream client) {
 	httpServer.onProcessorCreate(delegate HttpProcessor (TcpStream client) {
 		return new DemoProcessor(client);
 		return new DemoProcessor(client);

+ 15 - 5
frameworks/D/hunt/source/http/Processor.d

@@ -7,9 +7,11 @@ import core.stdc.stdlib;
 import core.thread, core.atomic;
 import core.thread, core.atomic;
 import http.Parser;
 import http.Parser;
 
 
-import hunt.util.DateTime;
+import hunt.collection.ByteBuffer;
 import hunt.logging;
 import hunt.logging;
 import hunt.io;
 import hunt.io;
+import hunt.util.DateTime;
+
 
 
 struct HttpHeader {
 struct HttpHeader {
 	string name, value;
 	string name, value;
@@ -21,6 +23,10 @@ struct HttpRequest {
 	string uri;
 	string uri;
 }
 }
 
 
+version(NO_HTTPPARSER) {
+enum string ResponseData = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nConnection: Keep-Alive\r\nContent-Type: text/plain\r\nServer: Hunt/1.0\r\nDate: Wed, 17 Apr 2013 12:00:00 GMT\r\n\r\nHello, World!";
+}
+
 abstract class HttpProcessor {
 abstract class HttpProcessor {
 private:
 private:
 	enum State {
 	enum State {
@@ -30,7 +36,6 @@ private:
 		done
 		done
 	}
 	}
 
 
-	ubyte[] buffer;
 	Appender!(char[]) outBuf;
 	Appender!(char[]) outBuf;
 	HttpHeader[] headers; // buffer for headers
 	HttpHeader[] headers; // buffer for headers
 	size_t header; // current header
 	size_t header; // current header
@@ -48,15 +53,19 @@ public:
 	this(TcpStream sock) {
 	this(TcpStream sock) {
 		serving = true;
 		serving = true;
 		client = sock;
 		client = sock;
-		buffer = new ubyte[2048];
 		headers = new HttpHeader[1];
 		headers = new HttpHeader[1];
 		pad = ScratchPad(16 * 1024);
 		pad = ScratchPad(16 * 1024);
 		parser = httpParser(this, HttpParserType.request);
 		parser = httpParser(this, HttpParserType.request);
 	}
 	}
 
 
 	void run() {
 	void run() {
-		client.onDataReceived((const ubyte[] data) { 
-			parser.execute(data);
+		client.onDataReceived((ByteBuffer buffer) { 
+			version(NO_HTTPPARSER) {
+				client.write(cast(ubyte[])ResponseData);
+			} else {
+				parser.execute(cast(ubyte[]) buffer.getRemaining());
+			}
+
 		})
 		})
 		.onClosed(() {
 		.onClosed(() {
 			// notifyClientClosed();
 			// notifyClientClosed();
@@ -76,6 +85,7 @@ public:
 	}
 	}
 
 
 	void respondWith(const(ubyte)[] _body, uint status, HttpHeader[] headers...) {
 	void respondWith(const(ubyte)[] _body, uint status, HttpHeader[] headers...) {
+		outBuf.clear();
 		formattedWrite(outBuf, "HTTP/1.1 %s OK\r\n", status);
 		formattedWrite(outBuf, "HTTP/1.1 %s OK\r\n", status);
 		outBuf.put("Server: Hunt/1.0\r\n");
 		outBuf.put("Server: Hunt/1.0\r\n");
 
 

+ 27 - 8
frameworks/D/hunt/source/http/Server.d

@@ -11,6 +11,7 @@ import std.conv;
 import std.json;
 import std.json;
 import std.socket;
 import std.socket;
 import std.string;
 import std.string;
+import std.stdio;
 
 
 import http.Parser;
 import http.Parser;
 import http.Processor;
 import http.Processor;
@@ -19,19 +20,23 @@ shared static this() {
 	DateTimeHelper.startClock();
 	DateTimeHelper.startClock();
 }
 }
 
 
+
 /**
 /**
 */
 */
 abstract class AbstractTcpServer {
 abstract class AbstractTcpServer {
 	protected EventLoopGroup _group = null;
 	protected EventLoopGroup _group = null;
 	protected bool _isStarted = false;
 	protected bool _isStarted = false;
 	protected Address _address;
 	protected Address _address;
+	protected int _workersCount;
 	TcpStreamOption _tcpStreamoption;
 	TcpStreamOption _tcpStreamoption;
 
 
-	this(Address address, int thread = (totalCPUs - 1)) {
+	this(Address address, int thread = (totalCPUs - 1), int workersCount = 0) {
 		this._address = address;
 		this._address = address;
 		_tcpStreamoption = TcpStreamOption.createOption();
 		_tcpStreamoption = TcpStreamOption.createOption();
-		_tcpStreamoption.bufferSize = 1024*2;
+		_tcpStreamoption.bufferSize = 1024 * 2;
+		_tcpStreamoption.isKeepalive = false;
 		_group = new EventLoopGroup(cast(uint) thread);
 		_group = new EventLoopGroup(cast(uint) thread);
+		this._workersCount = workersCount;
 	}
 	}
 
 
 	@property Address bindingAddress() {
 	@property Address bindingAddress() {
@@ -48,23 +53,37 @@ abstract class AbstractTcpServer {
 		server.bind(new InternetAddress("0.0.0.0", 8080));
 		server.bind(new InternetAddress("0.0.0.0", 8080));
 		server.listen(8192);
 		server.listen(8192);
 
 
-		debug trace("Launching server");
+		trace("Launching server");
+		debug {
+			_group.start();
+		} else {
+			_group.start(100);
+		}
 
 
-		_group.start();
+		if (_workersCount) {
+			defaultPoolThreads = _workersCount;
+			workerPool(); // Initilize worker poll
+		}
+		writefln("worker count: %d", _workersCount);
+		writefln("IO thread: %d", _group.size);
+		
 
 
 		while (true) {
 		while (true) {
 			try {
 			try {
 				version (HUNT_DEBUG)
 				version (HUNT_DEBUG)
 					trace("Waiting for server.accept()");
 					trace("Waiting for server.accept()");
+
 				Socket socket = server.accept();
 				Socket socket = server.accept();
 				version (HUNT_DEBUG) {
 				version (HUNT_DEBUG) {
-					infof("new client from %s, fd=%d", socket.remoteAddress.toString(), socket.handle());
+					infof("new connection from %s, fd=%d",
+							socket.remoteAddress.toString(), socket.handle());
 				}
 				}
-				EventLoop loop = _group.nextLoop();
+				// EventLoop loop = _group.nextLoop();
+				EventLoop loop = _group.nextLoop(socket.handle);
 				TcpStream stream = new TcpStream(loop, socket, _tcpStreamoption);
 				TcpStream stream = new TcpStream(loop, socket, _tcpStreamoption);
 				onConnectionAccepted(stream);
 				onConnectionAccepted(stream);
 			} catch (Exception e) {
 			} catch (Exception e) {
-				warningf("Failure on accept %s", e);
+				warningf("Failure on accepting %s", e);
 				break;
 				break;
 			}
 			}
 		}
 		}
@@ -98,7 +117,7 @@ class HttpServer : AbstractTcpServer {
 	}
 	}
 
 
 	override protected void onConnectionAccepted(TcpStream client) {
 	override protected void onConnectionAccepted(TcpStream client) {
-		if(processorCreater !is null) {
+		if (processorCreater !is null) {
 			HttpProcessor httpProcessor = processorCreater(client);
 			HttpProcessor httpProcessor = processorCreater(client);
 			httpProcessor.run();
 			httpProcessor.run();
 		}
 		}