123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- module http.Server;
- import hunt.event;
- import hunt.io;
- import hunt.logging.ConsoleLogger;
- import hunt.system.Memory : totalCPUs;
- import hunt.util.DateTime;
- import std.array;
- import std.conv;
- import std.json;
- import std.socket;
- import std.string;
- import std.stdio;
- import http.Parser;
- import http.Processor;
- shared static this() {
- DateTimeHelper.startClock();
- }
- import hunt.io.channel;
- /**
- */
- abstract class AbstractTcpServer {
- protected EventLoopGroup _group = null;
- protected bool _isStarted = false;
- protected Address _address;
- protected int _workersCount;
- TcpStreamOption _tcpStreamoption;
- this(Address address, int thread = (totalCPUs - 1), int workersCount = 0) {
- this._address = address;
- _tcpStreamoption = TcpStreamOption.createOption();
- _tcpStreamoption.bufferSize = 1024 * 2;
- _tcpStreamoption.isKeepalive = false;
- _group = new EventLoopGroup(cast(uint) thread);
- this._workersCount = workersCount;
- }
- @property Address bindingAddress() {
- return _address;
- }
- void start() {
- if (_isStarted)
- return;
- _isStarted = true;
- Socket server = new TcpSocket();
- server.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
- server.bind(new InternetAddress("0.0.0.0", 8080));
- server.listen(8192);
- trace("Launching server");
- debug {
- _group.start();
- } else {
- _group.start(100);
- }
- if (_workersCount) {
- defaultPoolThreads = _workersCount;
- workerPool(); // Initilize worker poll
- }
- writefln("worker count: %d", _workersCount);
- writefln("IO thread: %d", _group.size);
- while (true) {
- try {
- version (HUNT_DEBUG)
- trace("Waiting for server.accept()");
- Socket socket = server.accept();
- version (HUNT_DEBUG) {
- infof("new connection from %s, fd=%d",
- socket.remoteAddress.toString(), socket.handle());
- }
- // EventLoop loop = _group.nextLoop();
- EventLoop loop = _group.nextLoop(socket.handle);
- TcpStream stream = new TcpStream(loop, socket, _tcpStreamoption);
- onConnectionAccepted(stream);
- } catch (Exception e) {
- warningf("Failure on accepting %s", e);
- break;
- }
- }
- _isStarted = false;
- }
- protected void onConnectionAccepted(TcpStream client);
- void stop() {
- if (!_isStarted)
- return;
- _isStarted = false;
- _group.stop();
- }
- }
- alias ProcessorCreater = HttpProcessor delegate(TcpStream client);
- /**
- */
- class HttpServer(T) : AbstractTcpServer if (is(T : HttpProcessor)) {
- this(string ip, ushort port, int thread = (totalCPUs - 1)) {
- super(new InternetAddress(ip, port), thread);
- }
- this(Address address, int thread = (totalCPUs - 1)) {
- super(address, thread);
- }
- override protected void onConnectionAccepted(TcpStream client) {
- HttpProcessor httpProcessor = new T(client);
- httpProcessor.run();
- }
- }
|