ThreadServer.hx 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. /*
  2. * Copyright (c) 2005, The haXe Project Contributors
  3. * All rights reserved.
  4. * Redistribution and use in source and binary forms, with or without
  5. * modification, are permitted provided that the following conditions are met:
  6. *
  7. * - Redistributions of source code must retain the above copyright
  8. * notice, this list of conditions and the following disclaimer.
  9. * - Redistributions in binary form must reproduce the above copyright
  10. * notice, this list of conditions and the following disclaimer in the
  11. * documentation and/or other materials provided with the distribution.
  12. *
  13. * THIS SOFTWARE IS PROVIDED BY THE HAXE PROJECT CONTRIBUTORS "AS IS" AND ANY
  14. * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  15. * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  16. * DISCLAIMED. IN NO EVENT SHALL THE HAXE PROJECT CONTRIBUTORS BE LIABLE FOR
  17. * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  18. * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
  19. * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
  20. * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  21. * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  22. * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
  23. * DAMAGE.
  24. */
  25. package neko.net;
  /**
      Book-keeping record for one polling thread of a ThreadServer.
  **/
  private typedef ThreadInfos = {
      // index of the thread (ThreadServer.init assigns the creation-loop counter)
      id : Int,
      // the Neko thread that services the sockets listed in `socks`
      t : neko.vm.Thread,
      // poll object handed to `poll()` together with `socks`
      p : neko.net.Poll,
      // sockets currently assigned to this thread
      socks : Array<neko.net.Socket>
  }
  /**
      Per-connection state; ThreadServer stores it in the socket's `custom` slot.
  **/
  private typedef ClientInfos<Client> = {
      // user-level client object, as returned by ThreadServer.clientConnected
      client : Client,
      // the socket of this connection
      sock : neko.net.Socket,
      // polling thread the connection has been assigned to
      thread : ThreadInfos,
      // receive buffer holding bytes that have not been turned into messages yet
      buf : haxe.io.Bytes,
      // number of valid bytes currently stored at the start of `buf`
      bufpos : Int
  }
  /**
      Multi-threaded socket server skeleton.

      Threads involved:
      - the thread calling `run` accepts incoming connections;
      - `nthreads` polling threads read client data and split it into messages;
      - a single worker thread executes every user callback queued through `work`
        (`clientConnected`, `clientMessage`, `clientDisconnected`, `update`, `onError`,
        `afterEvent`), so those callbacks never run concurrently with each other;
      - a timer thread queues `update` periodically.

      Behaviour is customised by reassigning the `dynamic` functions at the bottom
      of the class.
  **/
  class ThreadServer<Client,Message> {

      /** Polling threads, filled by `init`. **/
      var threads : Array<ThreadInfos>;
      /** Listening socket, created by `run`. **/
      var sock : neko.net.Socket;
      /** Thread running `runWorker`; target of every `work` call. **/
      var worker : neko.vm.Thread;
      /** Thread running `runTimer`. **/
      var timer : neko.vm.Thread;

      /** Backlog value passed to `Socket.listen` (default 10). **/
      public var listen : Int;
      /** Number of polling threads created by `init` (default 150 on Windows, 10 elsewhere). **/
      public var nthreads : Int;
      /** Timeout handed to `Poll.poll` in each polling thread (default 0.5). **/
      public var connectLag : Float;
      /** Output used by the default `onError` (default: process stderr). **/
      public var errorOutput : haxe.io.Output;
      /** Size in bytes of a freshly connected client's read buffer (default 1 << 10). **/
      public var initialBufferSize : Int;
      /** Upper bound in bytes for a client's read buffer (default 1 << 16). **/
      public var maxBufferSize : Int;
      /** Minimum number of buffered bytes required before `readClientMessage` is called (default 1). **/
      public var messageHeaderSize : Int;
      /** Timeout handed to `Lock.wait` between two `update` calls (default 1). **/
      public var updateTime : Float;
      /**
          Capacity given to each polling thread's `Poll` object and maximum number of
          sockets `addClient` assigns to one thread (default 64).
      **/
      public var maxSockPerThread : Int;

      /**
          Creates a server with default settings. No thread or socket is created
          until `run` (or `init`) is called, so the public fields can be tuned first.
      **/
      public function new() {
          threads = new Array();
          nthreads = if( neko.Sys.systemName() == "Windows" ) 150 else 10;
          messageHeaderSize = 1;
          listen = 10;
          connectLag = 0.5;
          errorOutput = neko.io.File.stderr();
          initialBufferSize = (1 << 10);
          maxBufferSize = (1 << 16);
          maxSockPerThread = 64;
          updateTime = 1;
      }

      /**
          Entry point of a polling thread: runs `loopThread` forever and reports
          anything it throws through `logError` instead of letting the thread die.
      **/
      function runThread(t) {
          while( true ) {
              try {
                  loopThread(t);
              } catch( e : Dynamic ) {
                  logError(e);
              }
          }
      }

      /**
          Reads whatever is available on the client's socket into its buffer, then
          extracts as many complete messages as `readClientMessage` recognises and
          queues one `clientMessage` call per message on the worker thread.

          The buffer doubles in size when full, capped at `maxBufferSize`.

          Throws "Max buffer size reached" when the buffer is full and cannot grow;
          exceptions raised by the socket read propagate to the caller as well.
      **/
      function readClientData( c : ClientInfos<Client> ) {
          var available = c.buf.length - c.bufpos;
          if( available == 0 ) {
              var newsize = c.buf.length * 2;
              if( newsize > maxBufferSize ) {
                  newsize = maxBufferSize;
                  // `>=` (not `==`): a buffer already larger than the limit
                  // (initialBufferSize > maxBufferSize) must not be "grown" into
                  // a smaller one, which would make the blit below fail.
                  if( c.buf.length >= maxBufferSize )
                      throw "Max buffer size reached";
              }
              var newbuf = haxe.io.Bytes.alloc(newsize);
              newbuf.blit(0,c.buf,0,c.bufpos);
              c.buf = newbuf;
              available = newsize - c.bufpos;
          }
          var bytes = c.sock.input.readBytes(c.buf,c.bufpos,available);
          var pos = 0;
          var len = c.bufpos + bytes;
          while( len >= messageHeaderSize ) {
              var m = readClientMessage(c.client,c.buf,pos,len);
              // null means "message incomplete": wait for more data
              if( m == null )
                  break;
              pos += m.bytes;
              len -= m.bytes;
              work(callback(clientMessage,c.client,m.msg));
          }
          // move the unconsumed tail back to the start of the buffer
          if( pos > 0 )
              c.buf.blit(0,c.buf,pos,len);
          c.bufpos = len;
      }

      /**
          One iteration of a polling thread.

          First polls the thread's sockets and reads data from every socket the
          poll returned; a socket whose read throws is dropped and its
          disconnection is queued on the worker (the error is logged unless it is
          an `haxe.io.Eof` or `haxe.io.Error`).

          Then drains the thread's mailbox: `cnx == true` messages add a socket,
          other messages remove one and queue its disconnection. The mailbox read
          is blocking only while the thread has no socket at all.
      **/
      function loopThread( t : ThreadInfos ) {
          if( t.socks.length > 0 )
              for( s in t.p.poll(t.socks,connectLag) ) {
                  var infos : ClientInfos<Client> = s.custom;
                  try {
                      readClientData(infos);
                  } catch( e : Dynamic ) {
                      t.socks.remove(s);
                      if( !Std.is(e,haxe.io.Eof) && !Std.is(e,haxe.io.Error) )
                          logError(e);
                      work(callback(doClientDisconnected,s,infos.client));
                  }
              }
          while( true ) {
              var m : { s : neko.net.Socket, cnx : Bool } = neko.vm.Thread.readMessage(t.socks.length == 0);
              if( m == null )
                  break;
              if( m.cnx )
                  t.socks.push(m.s);
              else if( t.socks.remove(m.s) ) {
                  // only notify when the socket was still registered, so a client
                  // already dropped after a read error is not disconnected twice
                  var infos : ClientInfos<Client> = m.s.custom;
                  work(callback(doClientDisconnected,m.s,infos.client));
              }
          }
      }

      /**
          Closes the socket (ignoring any error raised by the close) and then
          notifies user code through `clientDisconnected`.
      **/
      function doClientDisconnected(s,c) {
          try s.close() catch( e : Dynamic ) {};
          clientDisconnected(c);
      }

      /**
          Entry point of the worker thread: waits for a queued function, runs it,
          then runs `afterEvent`. Exceptions from either call are reported through
          `logError` and do not stop the loop.
      **/
      function runWorker() {
          while( true ) {
              var f = neko.vm.Thread.readMessage(true);
              try {
                  f();
              } catch( e : Dynamic ) {
                  logError(e);
              }
              try {
                  afterEvent();
              } catch( e : Dynamic ) {
                  logError(e);
              }
          }
      }

      /**
          Queues `f` for execution on the worker thread.
          The worker thread must exist, i.e. `init` must have been called.
      **/
      public function work( f : Void -> Void ) {
          worker.sendMessage(f);
      }

      /**
          Reports `e` together with the current exception stack through `onError`.
          `onError` is called directly when already on the worker thread,
          otherwise the call is queued on the worker.
      **/
      function logError( e : Dynamic ) {
          var stack = haxe.Stack.exceptionStack();
          if( neko.vm.Thread.current() == worker )
              onError(e,stack);
          else
              work(callback(onError,e,stack));
      }

      /**
          Registers a newly accepted socket: picks a polling thread, creates the
          user-level client through `clientConnected`, attaches the connection
          state to `sock.custom` and tells the chosen thread to start polling it.

          Threads are probed starting from a random index and the first one holding
          fewer than `maxSockPerThread` sockets is used, because each thread's
          `Poll` object is created with exactly that capacity. When every thread is
          full the socket is passed to `refuseClient` and `clientConnected` is not
          called.

          The per-thread count is read without synchronisation and does not include
          connections still waiting in a thread's mailbox, so the limit is
          best-effort under connection bursts.
      **/
      function addClient( sock : neko.net.Socket ) {
          // use the real number of threads: `nthreads` is public and may have
          // been modified after `init` created the threads
          var count = threads.length;
          var start = if( count > 0 ) Std.random(count) else 0;
          for( i in 0...count ) {
              var t = threads[(start + i) % count];
              if( t.socks.length < maxSockPerThread ) {
                  var infos : ClientInfos<Client> = {
                      thread : t,
                      client : clientConnected(sock),
                      sock : sock,
                      buf : haxe.io.Bytes.alloc(initialBufferSize),
                      bufpos : 0,
                  };
                  sock.custom = infos;
                  t.t.sendMessage({ s : sock, cnx : true });
                  return;
              }
          }
          refuseClient(sock);
      }

      /**
          Called by `addClient` when no polling thread has room for another
          socket. The default behaviour closes the socket, ignoring close errors.
          Subclasses may override it.
      **/
      function refuseClient( sock : neko.net.Socket ) {
          try sock.close() catch( e : Dynamic ) {};
      }

      /**
          Entry point of the timer thread: waits `updateTime` on a private lock
          that nothing releases, then queues `update` on the worker; repeats forever.
      **/
      function runTimer() {
          var l = new neko.vm.Lock();
          while( true ) {
              l.wait(updateTime);
              work(update);
          }
      }

      /**
          Starts the worker thread, the timer thread and `nthreads` polling
          threads, each polling thread getting its own `Poll` of capacity
          `maxSockPerThread`.
      **/
      function init() {
          worker = neko.vm.Thread.create(runWorker);
          timer = neko.vm.Thread.create(runTimer);
          for( i in 0...nthreads ) {
              var t = {
                  id : i,
                  t : null,
                  socks : new Array(),
                  p : new neko.net.Poll(maxSockPerThread),
              };
              // register the record before starting the thread that uses it
              threads.push(t);
              t.t = neko.vm.Thread.create(callback(runThread,t));
          }
      }

      /**
          Hands an already connected socket to the server: switches it to
          non-blocking mode and queues its registration (`addClient`) on the
          worker thread.
      **/
      public function addSocket( s : neko.net.Socket ) {
          s.setBlocking(false);
          work(callback(addClient,s));
      }

      /**
          Binds the listening socket to `host`:`port`, starts all threads and then
          accepts connections forever on the calling thread. Errors raised while
          accepting or registering a connection are reported through `logError`.
          This function does not return.
      **/
      public function run( host, port ) {
          sock = new neko.net.Socket();
          sock.bind(new neko.net.Host(host),port);
          sock.listen(listen);
          init();
          while( true ) {
              try {
                  addSocket(sock.accept());
              } catch( e : Dynamic ) {
                  logError(e);
              }
          }
      }

      /**
          Writes `data` to the socket. Any error raised by the write causes the
          client to be stopped through `stopClient`; the error itself is not reported.
      **/
      public function sendData( s : neko.net.Socket, data : String ) {
          try {
              s.write(data);
          } catch( e : Dynamic ) {
              stopClient(s);
          }
      }

      /**
          Shuts the socket down in both directions (ignoring shutdown errors) and
          asks its polling thread to unregister it; that thread then queues the
          client's disconnection. Sockets that were never registered by
          `addClient` (no state in `custom`) are only shut down.
      **/
      public function stopClient( s : neko.net.Socket ) {
          var infos : ClientInfos<Client> = s.custom;
          try s.shutdown(true,true) catch( e : Dynamic ) { };
          // a refused or foreign socket has no owning thread to notify
          if( infos == null )
              return;
          infos.thread.t.sendMessage({ s : s, cnx : false });
      }

      // --- CUSTOMIZABLE API ---

      /**
          Error handler. The default writes the stringified error followed by the
          formatted stack to `errorOutput` and flushes it. If stringifying `e`
          itself throws, "???" plus the stringified secondary error is written.
      **/
      public dynamic function onError( e : Dynamic, stack ) {
          var estr = try Std.string(e) catch( e2 : Dynamic ) "???" + try "["+Std.string(e2)+"]" catch( e : Dynamic ) "";
          errorOutput.writeString( estr + "\n" + haxe.Stack.toString(stack) );
          errorOutput.flush();
      }

      /**
          Called on the worker thread for each registered connection; the returned
          value is the `Client` passed to the other callbacks. Default returns null.
      **/
      public dynamic function clientConnected( s : neko.net.Socket ) : Client {
          return null;
      }

      /**
          Called on the worker thread after a client's socket has been closed.
          Default does nothing.
      **/
      public dynamic function clientDisconnected( c : Client ) {
      }

      /**
          Message framing hook, called from a polling thread with the `len` bytes
          buffered for the client starting at `pos` in `buf`. Return null when no
          complete message is available yet, otherwise the decoded message and the
          number of bytes it consumed. The default consumes everything and yields
          a null message.
      **/
      public dynamic function readClientMessage( c : Client, buf : haxe.io.Bytes, pos : Int, len : Int ) : { msg : Message, bytes : Int } {
          return {
              msg : null,
              bytes : len,
          };
      }

      /**
          Called on the worker thread for each message produced by
          `readClientMessage`. Default does nothing.
      **/
      public dynamic function clientMessage( c : Client, msg : Message ) {
      }

      /**
          Called on the worker thread each time the timer thread's wait of
          `updateTime` elapses. Default does nothing.
      **/
      public dynamic function update() {
      }

      /**
          Called on the worker thread after every queued function has run.
          Default does nothing.
      **/
      public dynamic function afterEvent() {
      }

  }