// ThreadServer.hx (10 KB)
  1. /*
  2. * Copyright (C)2005-2013 Haxe Foundation
  3. *
  4. * Permission is hereby granted, free of charge, to any person obtaining a
  5. * copy of this software and associated documentation files (the "Software"),
  6. * to deal in the Software without restriction, including without limitation
  7. * the rights to use, copy, modify, merge, publish, distribute, sublicense,
  8. * and/or sell copies of the Software, and to permit persons to whom the
  9. * Software is furnished to do so, subject to the following conditions:
  10. *
  11. * The above copyright notice and this permission notice shall be included in
  12. * all copies or substantial portions of the Software.
  13. *
  14. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
  20. * DEALINGS IN THE SOFTWARE.
  21. */
  22. package cpp.net;
  23. import cpp.vm.Thread;
  24. import cpp.net.Poll;
  25. import cpp.vm.Lock;
  /**
      Bookkeeping record for one socket-polling thread of `ThreadServer`.

      - `id`    : index of the thread, assigned by `ThreadServer.init`.
      - `t`     : the thread that runs the polling loop for this record.
      - `p`     : the `Poll` object used by that thread.
      - `socks` : the sockets currently watched by that thread.
  **/
  private typedef ThreadInfos = {
      id : Int,
      t : Thread,
      p : Poll,
      socks : Array<sys.net.Socket>
  }
  /**
      Per-connection state that `ThreadServer` stores in `Socket.custom`.

      - `client` : the user-level object returned by `clientConnected`.
      - `sock`   : the connection's socket.
      - `thread` : the polling thread record the socket was assigned to.
      - `buf`    : receive buffer holding bytes not yet turned into messages.
      - `bufpos` : number of valid bytes currently stored at the start of `buf`.
  **/
  private typedef ClientInfos<Client> = {
      client : Client,
      sock : sys.net.Socket,
      thread : ThreadInfos,
      buf : haxe.io.Bytes,
      bufpos : Int
  }
  /**
      Threaded socket server skeleton.

      `run` accepts connections and hands each socket to one of several
      polling threads. Those threads read incoming bytes, split them into
      messages with `readClientMessage`, and forward every user callback
      (`clientConnected`, `clientMessage`, `clientDisconnected`, `update`,
      `afterEvent`, `onError`) to a single worker thread through `work`.

      Behaviour is customised by assigning the `dynamic` functions at the
      bottom of the class.
  **/
  class ThreadServer<Client,Message> {

      /** One record per polling thread, filled by `init`. **/
      var threads : Array<ThreadInfos>;
      /** Listening socket created by `run`. **/
      var sock : sys.net.Socket;
      /** Thread executing every closure passed to `work`. **/
      var worker : Thread;
      /** Thread that periodically schedules `update`. **/
      var timer : Thread;

      /** Value passed to `Socket.listen` by `run` (default 10). **/
      public var listen : Int;
      /** Number of polling threads created by `init` (150 on Windows, 10 elsewhere). **/
      public var nthreads : Int;
      /** Timeout passed to `Poll.poll` by each polling thread (default 0.5). **/
      public var connectLag : Float;
      /** Output used by the default `onError` (default `Sys.stderr()`). **/
      public var errorOutput : haxe.io.Output;
      /** Size of the receive buffer allocated for a new client (default 1 << 10). **/
      public var initialBufferSize : Int;
      /** Upper bound for a client's receive buffer (default 1 << 16). **/
      public var maxBufferSize : Int;
      /** Minimum number of buffered bytes before `readClientMessage` is called (default 1). **/
      public var messageHeaderSize : Int;
      /** Timeout passed to `Lock.wait` between two `update` calls (default 1). **/
      public var updateTime : Float;
      /** Maximum number of sockets assigned to one polling thread (default 64). **/
      public var maxSockPerThread : Int;

      /**
          Creates a server with default settings. No thread or socket is
          created until `run` (or `init`) is called.
      **/
      public function new() {
          threads = new Array();
          nthreads = if( Sys.systemName() == "Windows" ) 150 else 10;
          messageHeaderSize = 1;
          listen = 10;
          connectLag = 0.5;
          errorOutput = Sys.stderr();
          initialBufferSize = (1 << 10);
          maxBufferSize = (1 << 16);
          maxSockPerThread = 64;
          updateTime = 1;
      }

      /**
          Entry point of a polling thread: runs `loopThread` forever and
          reports any exception that escapes it through `logError`.
      **/
      function runThread(t) {
          while( true ) {
              try {
                  loopThread(t);
              } catch( e : Dynamic ) {
                  logError(e);
              }
          }
      }

      /**
          Reads the bytes available on a client's socket into its buffer,
          extracts as many complete messages as possible and schedules
          `clientMessage` for each of them on the worker thread.

          Throws "Max buffer size reached" when the buffer is full and cannot
          grow any further, and "Invalid message size" when
          `readClientMessage` reports a message that consumes no bytes.
          Exceptions raised by the socket read propagate to the caller.
      **/
      function readClientData( c : ClientInfos<Client> ) {
          var available = c.buf.length - c.bufpos;
          if( available == 0 ) {
              // Buffer is full: double it, capped at maxBufferSize.
              // `>=` also covers a buffer that is already larger than the
              // cap, which would otherwise be "grown" into a smaller buffer
              // and fail inside blit().
              if( c.buf.length >= maxBufferSize )
                  throw "Max buffer size reached";
              var newsize = c.buf.length * 2;
              if( newsize > maxBufferSize )
                  newsize = maxBufferSize;
              var newbuf = haxe.io.Bytes.alloc(newsize);
              newbuf.blit(0,c.buf,0,c.bufpos);
              c.buf = newbuf;
              available = newsize - c.bufpos;
          }
          var bytes = c.sock.input.readBytes(c.buf,c.bufpos,available);
          var pos = 0;
          var len = c.bufpos + bytes;
          while( len >= messageHeaderSize ) {
              var m = readClientMessage(c.client,c.buf,pos,len);
              // null means "message incomplete": wait for more data
              if( m == null )
                  break;
              // A message that consumes nothing would never advance `pos`
              // and would make this loop spin forever while flooding the
              // worker queue.
              if( m.bytes <= 0 )
                  throw "Invalid message size";
              pos += m.bytes;
              len -= m.bytes;
              work(clientMessage.bind(c.client,m.msg));
          }
          // Move the unconsumed tail back to the start of the buffer.
          if( pos > 0 )
              c.buf.blit(0,c.buf,pos,len);
          c.bufpos = len;
      }

      /**
          One iteration of a polling thread.

          First polls the thread's sockets and reads data from the ready
          ones; a socket whose read fails is dropped and its client is
          reported as disconnected. Then drains the thread's message queue,
          where `{ cnx : true }` registers a socket and `{ cnx : false }`
          unregisters it. When the thread has no socket at all, the queue
          read blocks until a message arrives.
      **/
      function loopThread( t : ThreadInfos ) {
          if( t.socks.length > 0 )
              for( s in t.p.poll(t.socks,connectLag) ) {
                  var infos : ClientInfos<Client> = s.custom;
                  try {
                      readClientData(infos);
                  } catch( e : Dynamic ) {
                      t.socks.remove(s);
                      // Eof / io errors are ordinary disconnections, not worth logging
                      if( !Std.is(e,haxe.io.Eof) && !Std.is(e,haxe.io.Error) )
                          logError(e);
                      work(doClientDisconnected.bind(s,infos.client));
                  }
              }
          while( true ) {
              var m : { s : sys.net.Socket, cnx : Bool } = Thread.readMessage(t.socks.length == 0);
              if( m == null )
                  break;
              if( m.cnx )
                  t.socks.push(m.s);
              else if( t.socks.remove(m.s) ) {
                  // remove() returning false means the socket was already
                  // dropped after a read error, so the client is not
                  // reported as disconnected twice.
                  var infos : ClientInfos<Client> = m.s.custom;
                  work(doClientDisconnected.bind(m.s,infos.client));
              }
          }
      }

      /**
          Closes the socket (ignoring close errors) and notifies
          `clientDisconnected`.
      **/
      function doClientDisconnected(s,c) {
          try s.close() catch( e : Dynamic ) {};
          clientDisconnected(c);
      }

      /**
          Entry point of the worker thread: executes every closure received
          as a thread message, then calls `afterEvent`. Exceptions from
          either step are reported through `logError`.
      **/
      function runWorker() {
          while( true ) {
              var f = Thread.readMessage(true);
              try {
                  f();
              } catch( e : Dynamic ) {
                  logError(e);
              }
              try {
                  afterEvent();
              } catch( e : Dynamic ) {
                  logError(e);
              }
          }
      }

      /**
          Schedules `f` for execution on the worker thread.
      **/
      public function work( f : Void -> Void ) {
          worker.sendMessage(f);
      }

      /**
          Reports `e` together with the current exception stack to `onError`.
          The call is made directly when already on the worker thread and is
          forwarded through `work` otherwise.
      **/
      function logError( e : Dynamic ) {
          var stack = haxe.CallStack.exceptionStack();
          if( Thread.current() == worker )
              onError(e,stack);
          else
              work(onError.bind(e,stack));
      }

      /**
          Assigns a freshly accepted socket to a polling thread that still
          has room, starting the search at a random thread. If every thread
          is full (or none exists yet) the socket is passed to
          `refuseClient`.

          If `clientConnected` throws, the socket is closed and the
          exception is propagated.
      **/
      function addClient( sock : sys.net.Socket ) {
          // Use the real number of thread records rather than `nthreads`:
          // that field is public and may no longer match `threads`.
          var count = threads.length;
          var start = Std.random(count);
          for( i in 0...count ) {
              var t = threads[(start + i) % count];
              if( t.socks.length >= maxSockPerThread )
                  continue;
              var client : Client = null;
              try {
                  client = clientConnected(sock);
              } catch( e : Dynamic ) {
                  // The socket is not owned by any thread yet: close it
                  // here, nobody else would.
                  try sock.close() catch( e2 : Dynamic ) {};
                  cpp.Lib.rethrow(e);
              }
              var infos : ClientInfos<Client> = {
                  thread : t,
                  client : client,
                  sock : sock,
                  buf : haxe.io.Bytes.alloc(initialBufferSize),
                  bufpos : 0,
              };
              sock.custom = infos;
              t.t.sendMessage({ s : sock, cnx : true });
              return;
          }
          refuseClient(sock);
      }

      /**
          Called when no polling thread can take another socket: closes it.
      **/
      function refuseClient( sock : sys.net.Socket) {
          // we have reached maximum number of active clients
          sock.close();
      }

      /**
          Entry point of the timer thread: schedules `update` on the worker
          thread after each `Lock.wait(updateTime)`.
      **/
      function runTimer() {
          // The lock is never released, so wait() only returns on timeout.
          var l = new Lock();
          while( true ) {
              l.wait(updateTime);
              work(update);
          }
      }

      /**
          Starts the worker thread, the timer thread and `nthreads` polling
          threads.
      **/
      function init() {
          worker = Thread.create(runWorker);
          timer = Thread.create(runTimer);
          for( i in 0...nthreads ) {
              var t : ThreadInfos = {
                  id : i,
                  t : null,
                  socks : new Array(),
                  p : new Poll(maxSockPerThread),
              };
              threads.push(t);
              t.t = Thread.create(runThread.bind(t));
          }
      }

      /**
          Switches `s` to non-blocking mode and schedules its registration
          as a new client on the worker thread.
      **/
      public function addSocket( s : sys.net.Socket ) {
          s.setBlocking(false);
          work(addClient.bind(s));
      }

      /**
          Binds the listening socket to `host`:`port`, starts all threads
          and accepts connections forever. Errors raised while accepting are
          reported through `logError`. This function does not return.
      **/
      public function run( host, port ) {
          sock = new sys.net.Socket();
          sock.bind(new sys.net.Host(host),port);
          sock.listen(listen);
          init();
          while( true ) {
              try {
                  addSocket(sock.accept());
              } catch( e : Dynamic ) {
                  logError(e);
              }
          }
      }

      /**
          Writes `data` to `s`; if the write fails the client is stopped
          with `stopClient`.
      **/
      public function sendData( s : sys.net.Socket, data : String ) {
          try {
              s.write(data);
          } catch( e : Dynamic ) {
              stopClient(s);
          }
      }

      /**
          Shuts down both directions of `s` and asks the polling thread that
          owns it to unregister it, which eventually triggers
          `clientDisconnected`. A socket that was never registered with this
          server is only shut down.
      **/
      public function stopClient( s : sys.net.Socket ) {
          var infos : ClientInfos<Client> = s.custom;
          try s.shutdown(true,true) catch( e : Dynamic ) { };
          // No infos: the socket has no owning thread to notify.
          if( infos == null )
              return;
          infos.thread.t.sendMessage({ s : s, cnx : false });
      }

      // --- CUSTOMIZABLE API ---

      /**
          Error handler. The default implementation writes the error text
          and the call stack to `errorOutput`.
      **/
      public dynamic function onError( e : Dynamic, stack ) {
          var estr;
          try {
              estr = Std.string(e);
          } catch( e2 : Dynamic ) {
              // Converting the error itself failed: fall back to a
              // placeholder, followed by the conversion error when that
              // one can be printed.
              var detail = try "["+Std.string(e2)+"]" catch( e : Dynamic ) "";
              estr = "???" + detail;
          }
          errorOutput.writeString( estr + "\n" + haxe.CallStack.toString(stack) );
          errorOutput.flush();
      }

      /**
          Called on the worker thread for each new connection; the returned
          value is handed back to the other callbacks. Default: `null`.
      **/
      public dynamic function clientConnected( s : sys.net.Socket ) : Client {
          return null;
      }

      /**
          Called on the worker thread after a client's socket was closed.
          Default: does nothing.
      **/
      public dynamic function clientDisconnected( c : Client ) {
      }

      /**
          Called on a polling thread to extract one message from
          `buf[pos ... pos+len]`. Return `null` when the message is not
          complete yet, otherwise the message and the (strictly positive)
          number of bytes it consumed. The default consumes all `len` bytes
          and yields a `null` message.
      **/
      public dynamic function readClientMessage( c : Client, buf : haxe.io.Bytes, pos : Int, len : Int ) : { msg : Message, bytes : Int } {
          return {
              msg : null,
              bytes : len,
          };
      }

      /**
          Called on the worker thread for every message extracted by
          `readClientMessage`. Default: does nothing.
      **/
      public dynamic function clientMessage( c : Client, msg : Message ) {
      }

      /**
          Called on the worker thread each time the timer thread's wait
          elapses. Default: does nothing.
      **/
      public dynamic function update() {
      }

      /**
          Called on the worker thread after every executed event.
          Default: does nothing.
      **/
      public dynamic function afterEvent() {
      }
  }