ThreadServer.hx 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. /*
  2. * Copyright (C)2005-2012 Haxe Foundation
  3. *
  4. * Permission is hereby granted, free of charge, to any person obtaining a
  5. * copy of this software and associated documentation files (the "Software"),
  6. * to deal in the Software without restriction, including without limitation
  7. * the rights to use, copy, modify, merge, publish, distribute, sublicense,
  8. * and/or sell copies of the Software, and to permit persons to whom the
  9. * Software is furnished to do so, subject to the following conditions:
  10. *
  11. * The above copyright notice and this permission notice shall be included in
  12. * all copies or substantial portions of the Software.
  13. *
  14. * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
  20. * DEALINGS IN THE SOFTWARE.
  21. */
  22. package neko.net;
  /**
      Bookkeeping record for one polling thread of `ThreadServer`.

      One record is created per polling thread in `ThreadServer.init` and is
      handed to that thread's main loop.
  **/
  private typedef ThreadInfos = {
      // Index of the thread inside `ThreadServer.threads`.
      id : Int,
      // The polling thread itself; filled in right after the record is created.
      t : neko.vm.Thread,
      // Poll object used by the thread to wait on its sockets.
      p : neko.net.Poll,
      // Sockets currently handled by this thread.
      socks : Array<sys.net.Socket>
  }
  /**
      Per-connection state; stored in the `custom` field of the client socket
      by `ThreadServer.addClient`.
  **/
  private typedef ClientInfos<Client> = {
      // User-level client value returned by `ThreadServer.clientConnected`.
      client : Client,
      // The connected socket this record belongs to.
      sock : sys.net.Socket,
      // Polling thread record the socket has been assigned to.
      thread : ThreadInfos,
      // Receive buffer; replaced by a larger one when it fills up.
      buf : haxe.io.Bytes,
      // Number of bytes at the start of `buf` that are not yet consumed.
      bufpos : Int
  }
  /**
      Multi-threaded socket server skeleton.

      `init` starts three kinds of threads:

      - `nthreads` polling threads (`runThread`/`loopThread`), each reading
        from the sockets assigned to it and splitting the incoming bytes into
        messages with `readClientMessage`;
      - one worker thread (`runWorker`) that executes, one at a time, every
        callback posted through `work`: `clientConnected`, `clientMessage`,
        `clientDisconnected`, `update` and `onError` all run there;
      - one timer thread (`runTimer`) that posts `update` periodically.

      Behavior is customized by reassigning the `dynamic` functions at the
      bottom of the class.
  **/
  class ThreadServer<Client,Message> {

      // One record per polling thread, filled by `init`.
      var threads : Array<ThreadInfos>;
      // Listening socket created by `run`.
      var sock : sys.net.Socket;
      // Thread executing the callbacks posted with `work`.
      var worker : neko.vm.Thread;
      // Thread posting `update` at regular intervals.
      var timer : neko.vm.Thread;

      /** Value passed to `sock.listen` in `run`. Default: 10. **/
      public var listen : Int;
      /** Number of polling threads created by `init`. Default: 150 on Windows, 10 elsewhere. **/
      public var nthreads : Int;
      /** Timeout passed to `Poll.poll` by the polling threads. Default: 0.5. **/
      public var connectLag : Float;
      /** Output the default `onError` writes to. Default: `Sys.stderr()`. **/
      public var errorOutput : haxe.io.Output;
      /** Size of the receive buffer allocated for a new client. Default: 1024. **/
      public var initialBufferSize : Int;
      /** Upper bound for a client's receive buffer; a full buffer of that size is an error. Default: 65536. **/
      public var maxBufferSize : Int;
      /** Minimum number of buffered bytes before `readClientMessage` is called. Default: 1. **/
      public var messageHeaderSize : Int;
      /** Timeout passed to `Lock.wait` between two `update` posts. Default: 1. **/
      public var updateTime : Float;
      /** Maximum number of sockets assigned to one polling thread. Default: 64. **/
      public var maxSockPerThread : Int;

      /**
          Creates a server with the default settings. No thread is started and
          no socket is opened until `run` (or `init`) is called.
      **/
      public function new() {
          threads = new Array();
          nthreads = if( Sys.systemName() == "Windows" ) 150 else 10;
          messageHeaderSize = 1;
          listen = 10;
          connectLag = 0.5;
          errorOutput = Sys.stderr();
          initialBufferSize = (1 << 10);
          maxBufferSize = (1 << 16);
          maxSockPerThread = 64;
          updateTime = 1;
      }

      /**
          Entry point of a polling thread: runs `loopThread` forever, logging
          (instead of dying on) any exception that escapes it.
      **/
      function runThread( t : ThreadInfos ) {
          while( true ) {
              try {
                  loopThread(t);
              } catch( e : Dynamic ) {
                  logError(e);
              }
          }
      }

      /**
          Reads the bytes available on a client socket into its buffer and
          dispatches every complete message found in it.

          The buffer is doubled (capped at `maxBufferSize`) when it is full.
          Bytes that do not form a complete message yet are kept at the start
          of the buffer for the next call.

          Throws "Max buffer size reached" when a full buffer cannot grow any
          more, "Invalid message size ..." when `readClientMessage` reports a
          size that is not within 1...len, and whatever the socket read throws.
      **/
      function readClientData( c : ClientInfos<Client> ) {
          var available = c.buf.length - c.bufpos;
          if( available == 0 ) {
              var newsize = c.buf.length * 2;
              if( newsize > maxBufferSize ) {
                  newsize = maxBufferSize;
                  if( c.buf.length == maxBufferSize )
                      throw "Max buffer size reached";
              }
              var newbuf = haxe.io.Bytes.alloc(newsize);
              newbuf.blit(0,c.buf,0,c.bufpos);
              c.buf = newbuf;
              available = newsize - c.bufpos;
          }
          var bytes = c.sock.input.readBytes(c.buf,c.bufpos,available);
          var pos = 0;
          var len = c.bufpos + bytes;
          while( len >= messageHeaderSize ) {
              var m = readClientMessage(c.client,c.buf,pos,len);
              if( m == null )
                  break;
              // A message that consumes nothing would be parsed again and again
              // forever, and one larger than the buffered data would corrupt
              // the buffer bookkeeping: treat both as a protocol error.
              if( m.bytes <= 0 || m.bytes > len )
                  throw "Invalid message size " + m.bytes;
              pos += m.bytes;
              len -= m.bytes;
              work(clientMessage.bind(c.client,m.msg));
          }
          // move the unconsumed tail back to the start of the buffer
          if( pos > 0 )
              c.buf.blit(0,c.buf,pos,len);
          c.bufpos = len;
      }

      /**
          One iteration of a polling thread: reads from every socket reported
          by the poll, then processes the connect/disconnect requests queued
          for this thread.

          A socket whose read fails is removed and its disconnection is posted
          to the worker; the error is logged unless it is an `Eof` or an
          `haxe.io.Error`.
      **/
      function loopThread( t : ThreadInfos ) {
          if( t.socks.length > 0 )
              for( s in t.p.poll(t.socks,connectLag) ) {
                  var infos : ClientInfos<Client> = s.custom;
                  try {
                      readClientData(infos);
                  } catch( e : Dynamic ) {
                      t.socks.remove(s);
                      if( !Std.is(e,haxe.io.Eof) && !Std.is(e,haxe.io.Error) )
                          logError(e);
                      work(doClientDisconnected.bind(s,infos.client));
                  }
              }
          while( true ) {
              // block for a request only when there is no socket to poll
              var m : { s : sys.net.Socket, cnx : Bool } = neko.vm.Thread.readMessage(t.socks.length == 0);
              if( m == null )
                  break;
              if( m.cnx )
                  t.socks.push(m.s);
              else if( t.socks.remove(m.s) ) {
                  // only notify if the socket was still registered, so a
                  // client is never reported as disconnected twice
                  var infos : ClientInfos<Client> = m.s.custom;
                  work(doClientDisconnected.bind(m.s,infos.client));
              }
          }
      }

      /**
          Closes the socket (ignoring any error) and notifies
          `clientDisconnected`.
      **/
      function doClientDisconnected(s,c) {
          try s.close() catch( e : Dynamic ) {};
          clientDisconnected(c);
      }

      /**
          Entry point of the worker thread: waits for callbacks posted with
          `work` and runs them one by one, calling `afterEvent` after each.
          Exceptions from either are logged and do not stop the worker.
      **/
      function runWorker() {
          while( true ) {
              var f = neko.vm.Thread.readMessage(true);
              try {
                  f();
              } catch( e : Dynamic ) {
                  logError(e);
              }
              try {
                  afterEvent();
              } catch( e : Dynamic ) {
                  logError(e);
              }
          }
      }

      /**
          Queues `f` for execution on the worker thread.
      **/
      public function work( f : Void -> Void ) {
          worker.sendMessage(f);
      }

      /**
          Reports `e` together with the current exception stack to `onError`.
          `onError` always runs on the worker thread: directly when called from
          it, through `work` otherwise.
      **/
      function logError( e : Dynamic ) {
          var stack = haxe.CallStack.exceptionStack();
          if( neko.vm.Thread.current() == worker )
              onError(e,stack);
          else
              work(onError.bind(e,stack));
      }

      /**
          Registers an accepted socket: picks, starting from a random index,
          the first polling thread with fewer than `maxSockPerThread` sockets,
          creates the client with `clientConnected` and hands the socket to
          that thread. Calls `refuseClient` when every thread is full.

          If `clientConnected` throws, the socket is closed and the exception
          is rethrown.
      **/
      function addClient( sock : sys.net.Socket ) {
          // use the number of threads actually created, which may differ from
          // `nthreads` if that field was changed after `init`
          var count = threads.length;
          if( count > 0 ) {
              var start = Std.random(count);
              for( i in 0...count ) {
                  var t = threads[(start + i)%count];
                  if( t.socks.length < maxSockPerThread ) {
                      var client : Client = null;
                      try {
                          client = clientConnected(sock);
                      } catch( e : Dynamic ) {
                          // nobody else knows about this socket yet: close it
                          // here or it would stay open forever
                          try sock.close() catch( e2 : Dynamic ) {};
                          neko.Lib.rethrow(e);
                      }
                      var infos : ClientInfos<Client> = {
                          thread : t,
                          client : client,
                          sock : sock,
                          buf : haxe.io.Bytes.alloc(initialBufferSize),
                          bufpos : 0
                      };
                      sock.custom = infos;
                      infos.thread.t.sendMessage({ s : sock, cnx : true });
                      return;
                  }
              }
          }
          refuseClient(sock);
      }

      /**
          Called when no polling thread can take another socket; closes it.
      **/
      function refuseClient( sock : sys.net.Socket) {
          // we have reached maximum number of active clients
          sock.close();
      }

      /**
          Entry point of the timer thread: posts `update` to the worker after
          each `updateTime` wait.
      **/
      function runTimer() {
          var l = new neko.vm.Lock();
          while( true ) {
              l.wait(updateTime);
              work(update);
          }
      }

      /**
          Starts the worker thread, the timer thread and `nthreads` polling
          threads. Called by `run`.
      **/
      function init() {
          worker = neko.vm.Thread.create(runWorker);
          timer = neko.vm.Thread.create(runTimer);
          for( i in 0...nthreads ) {
              var t : ThreadInfos = {
                  id : i,
                  t : null,
                  socks : new Array(),
                  p : new neko.net.Poll(maxSockPerThread)
              };
              threads.push(t);
              // the record must exist before the thread starts, since the
              // thread receives it as argument
              t.t = neko.vm.Thread.create(runThread.bind(t));
          }
      }

      /**
          Hands an already connected socket to the server: switches it to
          non-blocking mode and queues its registration on the worker thread.
      **/
      public function addSocket( s : sys.net.Socket ) {
          s.setBlocking(false);
          work(addClient.bind(s));
      }

      /**
          Binds the listening socket to `host`:`port`, starts the threads and
          accepts connections forever. Does not return; errors raised while
          accepting are logged.
      **/
      public function run( host, port ) {
          sock = new sys.net.Socket();
          sock.bind(new sys.net.Host(host),port);
          sock.listen(listen);
          init();
          while( true ) {
              try {
                  addSocket(sock.accept());
              } catch( e : Dynamic ) {
                  logError(e);
              }
          }
      }

      /**
          Writes `data` to the socket; if the write throws, the client is
          stopped with `stopClient`.
      **/
      public function sendData( s : sys.net.Socket, data : String ) {
          try {
              s.write(data);
          } catch( e : Dynamic ) {
              stopClient(s);
          }
      }

      /**
          Shuts the socket down in both directions (ignoring any error) and
          asks the polling thread that owns it to drop it. That thread posts
          the disconnection to the worker if the socket was still registered.

          A socket that has not been registered by `addClient` (yet) is only
          shut down.
      **/
      public function stopClient( s : sys.net.Socket ) {
          var infos : ClientInfos<Client> = s.custom;
          try s.shutdown(true,true) catch( e : Dynamic ) { };
          // registration happens asynchronously on the worker thread, so the
          // socket may not be attached to a polling thread at this point
          if( infos == null )
              return;
          infos.thread.t.sendMessage({ s : s, cnx : false });
      }

      // --- CUSTOMIZABLE API ---

      /**
          Called on the worker thread for every logged exception. The default
          writes the error and its stack to `errorOutput`.
      **/
      public dynamic function onError( e : Dynamic, stack ) {
          var estr = try Std.string(e) catch( e2 : Dynamic ) "???" + try "["+Std.string(e2)+"]" catch( e : Dynamic ) "";
          errorOutput.writeString( estr + "\n" + haxe.CallStack.toString(stack) );
          errorOutput.flush();
      }

      /**
          Called on the worker thread when a socket is registered; returns the
          client value later passed to the other callbacks. The default
          returns null.
      **/
      public dynamic function clientConnected( s : sys.net.Socket ) : Client {
          return null;
      }

      /**
          Called on the worker thread after a client's socket has been closed.
          The default does nothing.
      **/
      public dynamic function clientDisconnected( c : Client ) {
      }

      /**
          Called from a polling thread to extract one message from the `len`
          bytes of `buf` starting at `pos`. Return null when the message is
          not complete yet, otherwise the message and the number of bytes it
          uses, which must be between 1 and `len`. The default consumes all
          `len` bytes as one null message.
      **/
      public dynamic function readClientMessage( c : Client, buf : haxe.io.Bytes, pos : Int, len : Int ) : { msg : Message, bytes : Int } {
          return {
              msg : null,
              bytes : len
          };
      }

      /**
          Called on the worker thread for every message returned by
          `readClientMessage`. The default does nothing.
      **/
      public dynamic function clientMessage( c : Client, msg : Message ) {
      }

      /**
          Called on the worker thread each time the timer fires. The default
          does nothing.
      **/
      public dynamic function update() {
      }

      /**
          Called on the worker thread after every callback it has executed.
          The default does nothing.
      **/
      public dynamic function afterEvent() {
      }

  }