Server.d 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. module http.Server;
  2. import hunt.event;
  3. import hunt.io;
  4. import hunt.logging.ConsoleLogger;
  5. import hunt.system.Memory : totalCPUs;
  6. import hunt.util.DateTime;
  7. import std.array;
  8. import std.conv;
  9. import std.json;
  10. import std.socket;
  11. import std.string;
  12. import std.stdio;
  13. import http.Parser;
  14. import http.Processor;
  /// Module constructor: calls DateTimeHelper.startClock() when the module is initialised.
  shared static this()
  {
      DateTimeHelper.startClock();
  }
  18. import hunt.io.channel;
  /**
   * Base class for a TCP server that accepts connections with a blocking
   * std.socket listener and hands each accepted socket to an event loop taken
   * from an EventLoopGroup.
   *
   * Subclasses implement onConnectionAccepted() to decide what happens with
   * every new TcpStream.
   */
  abstract class AbstractTcpServer {
      protected EventLoopGroup _group = null;
      protected bool _isStarted = false;
      protected Address _address;
      protected int _workersCount;
      TcpStreamOption _tcpStreamoption;

      /**
       * Params:
       *   address      = local address that start() binds the listening socket to
       *   thread       = number of event loops requested from EventLoopGroup;
       *                  values below 1 are raised to 1
       *   workersCount = when non-zero, start() assigns it to defaultPoolThreads
       *                  and calls workerPool()
       */
      this(Address address, int thread = (totalCPUs - 1), int workersCount = 0) {
          this._address = address;
          _tcpStreamoption = TcpStreamOption.createOption();
          _tcpStreamoption.bufferSize = 1024 * 2;
          _tcpStreamoption.isKeepalive = false;

          // The default (totalCPUs - 1) is 0 on a single-core machine, and a
          // negative value would wrap around in the cast to uint, so always
          // request at least one loop.
          immutable int loopCount = thread > 0 ? thread : 1;
          _group = new EventLoopGroup(cast(uint) loopCount);
          this._workersCount = workersCount;
      }

      /// The address passed to the constructor.
      @property Address bindingAddress() {
          return _address;
      }

      /**
       * Binds the listening socket, starts the event loop group and then blocks
       * the calling thread in an accept loop.
       *
       * Does nothing when the server is already started. The loop ends when
       * accepting or dispatching a connection throws an Exception; the failure
       * is logged with warningf. However the method exits, the listening socket
       * is closed and the started flag is cleared.
       */
      void start() {
          if (_isStarted)
              return;
          _isStarted = true;
          // Clear the flag even when bind()/listen() throw, so start() can be retried.
          scope (exit) _isStarted = false;

          // Bind to the configured address instead of a fixed endpoint. A null
          // address keeps the endpoint that used to be hard-coded here.
          Address bindTo = _address is null
              ? new InternetAddress("0.0.0.0", 8080) : _address;

          // Create the socket for the address family of the bind target so that
          // non-IPv4 addresses can be bound as well.
          Socket server = new TcpSocket(bindTo.addressFamily);
          // Release the listening descriptor once the accept loop is left.
          scope (exit) server.close();

          server.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
          server.bind(bindTo);
          server.listen(8192);
          trace("Launching server");

          debug {
              _group.start();
          } else {
              _group.start(100);
          }

          if (_workersCount) {
              defaultPoolThreads = _workersCount;
              workerPool(); // Initialize the worker pool
          }

          writefln("worker count: %d", _workersCount);
          writefln("IO thread: %d", _group.size);

          while (true) {
              try {
                  version (HUNT_DEBUG)
                      trace("Waiting for server.accept()");

                  Socket socket = server.accept();

                  version (HUNT_DEBUG) {
                      infof("new connection from %s, fd=%d",
                              socket.remoteAddress.toString(), socket.handle());
                  }

                  // The socket handle is passed to nextLoop() to select the loop
                  // that will own this connection.
                  EventLoop loop = _group.nextLoop(socket.handle);
                  TcpStream stream = new TcpStream(loop, socket, _tcpStreamoption);
                  onConnectionAccepted(stream);
              } catch (Exception e) {
                  warningf("Failure on accepting %s", e);
                  break;
              }
          }
      }

      /// Called from the accept loop for every accepted connection.
      protected void onConnectionAccepted(TcpStream client);

      /**
       * Clears the started flag and stops the event loop group. Does nothing
       * when the server is not started.
       *
       * NOTE(review): the accept loop in start() does not check the started
       * flag and its listening socket is local to start(), so this call does
       * not by itself end a start() that is blocked in accept().
       */
      void stop() {
          if (!_isStarted)
              return;
          _isStarted = false;
          _group.stop();
      }
  }
  /// Delegate type that receives a TcpStream and returns an HttpProcessor.
  alias ProcessorCreater = HttpProcessor delegate(TcpStream client);
  /**
   * TCP server that wraps every accepted connection in a processor of type T
   * and runs it.
   *
   * T must be implicitly convertible to HttpProcessor and constructible from a
   * single TcpStream.
   */
  class HttpServer(T) : AbstractTcpServer if (is(T : HttpProcessor))
  {
      /// Builds an InternetAddress from ip and port and passes it, with thread, to the base class.
      this(string ip, ushort port, int thread = (totalCPUs - 1))
      {
          Address endpoint = new InternetAddress(ip, port);
          super(endpoint, thread);
      }

      /// Passes address and thread to the base class unchanged.
      this(Address address, int thread = (totalCPUs - 1))
      {
          super(address, thread);
      }

      /// Creates a T for the accepted stream and calls run() on it through the HttpProcessor type.
      protected override void onConnectionAccepted(TcpStream client)
      {
          HttpProcessor handler = new T(client);
          handler.run();
      }
  }