| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 | /** Copyright (C)2005-2013 Haxe Foundation** Permission is hereby granted, free of charge, to any person obtaining a* copy of this software and associated documentation files (the "Software"),* to deal in the Software without restriction, including without limitation* the rights to use, copy, modify, merge, publish, distribute, sublicense,* and/or sell copies of the Software, and to permit persons to whom the* Software is furnished to do so, subject to the following conditions:** The above copyright notice and this permission notice shall be included in* all copies or substantial portions of the Software.** THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER* DEALINGS IN THE SOFTWARE.*/package cpp.net;import cpp.vm.Thread;import cpp.net.Poll;import cpp.vm.Lock;private typedef ThreadInfos = {        var id : Int;        var t : Thread;        var p : Poll;        var socks : Array<sys.net.Socket>;}private typedef ClientInfos<Client> = {        var client : Client;        var sock : sys.net.Socket;        var thread : ThreadInfos;        var buf : haxe.io.Bytes;        var bufpos : Int;}class ThreadServer<Client,Message> {        var threads : Array<ThreadInfos>;        var sock : sys.net.Socket;        var worker : Thread;        var timer : Thread;        public var listen : Int;        public var nthreads : Int;        public var connectLag : Float;        public var errorOutput : haxe.io.Output;        public var initialBufferSize : Int;        public var maxBufferSize : Int;        public var messageHeaderSize : Int;        public var updateTime : Float;        public var maxSockPerThread : Int;        public function new() {                threads = new Array();                nthreads = if( Sys.systemName() == "Windows" ) 150 else 10;                messageHeaderSize = 1;                listen = 10;                connectLag = 0.5;                errorOutput = Sys.stderr();                initialBufferSize = (1 << 10);                maxBufferSize = (1 << 16);                maxSockPerThread = 64;                updateTime = 1;        }        function runThread(t) {                while( true ) {                        try {                                loopThread(t);                        } catch( e : Dynamic ) {                                logError(e);                        }                }        }        function readClientData( c : ClientInfos<Client> ) {                var available = c.buf.length - c.bufpos;                if( available == 0 ) {                        var newsize = c.buf.length * 2;                        if( newsize > maxBufferSize ) {                                newsize = maxBufferSize;                                if( c.buf.length == maxBufferSize )                                        throw "Max buffer size reached";                        }                        var newbuf = haxe.io.Bytes.alloc(newsize);                        newbuf.blit(0,c.buf,0,c.bufpos);                        c.buf = newbuf;                        available = newsize - c.bufpos;                }                var bytes = c.sock.input.readBytes(c.buf,c.bufpos,available);                var pos = 0;                var len = c.bufpos + bytes;                while( len >= messageHeaderSize ) {                        var m = readClientMessage(c.client,c.buf,pos,len);                        if( m == null )                                break;                        pos += m.bytes;                        len -= m.bytes;                        work(clientMessage.bind(c.client,m.msg));                }                if( pos > 0 )                        c.buf.blit(0,c.buf,pos,len);                c.bufpos = len;        }        function loopThread( t : ThreadInfos ) {                if( t.socks.length > 0 )                        for( s in t.p.poll(t.socks,connectLag) ) {                                var infos : ClientInfos<Client> = s.custom;                                try {                                        readClientData(infos);                                } catch( e : Dynamic ) {                                        t.socks.remove(s);                                        if( !Std.is(e,haxe.io.Eof) && !Std.is(e,haxe.io.Error) )                                                logError(e);                                        work(doClientDisconnected.bind(s,infos.client));                                }                        }                while( true ) {                        var m : { s : sys.net.Socket, cnx : Bool } = Thread.readMessage(t.socks.length == 0);                        if( m == null )                                break;                        if( m.cnx )                                t.socks.push(m.s);                        else if( t.socks.remove(m.s) ) {                                var infos : ClientInfos<Client> = m.s.custom;                                work(doClientDisconnected.bind(m.s,infos.client));                        }                }        }        function doClientDisconnected(s,c) {                try s.close() catch( e : Dynamic ) {};                clientDisconnected(c);        }        function runWorker() {                while( true ) {                        var f = Thread.readMessage(true);                        try {                                f();                        } catch( e : Dynamic ) {                                logError(e);                        }                        try {                                afterEvent();                        } catch( e : Dynamic ) {                                logError(e);                        }                }        }        public function work( f : Void -> Void ) {                worker.sendMessage(f);        }        function logError( e : Dynamic ) {                var stack = haxe.CallStack.exceptionStack();                if( Thread.current() == worker )                        onError(e,stack);                else                        work(onError.bind(e,stack));        }        function addClient( sock : sys.net.Socket ) {                var start = Std.random(nthreads);                for( i in 0...nthreads ) {                        var t = threads[(start + i)%nthreads];                        if( t.socks.length < maxSockPerThread ) {                                var infos : ClientInfos<Client> = {                                        thread : t,                                        client : clientConnected(sock),                                        sock : sock,                                        buf : haxe.io.Bytes.alloc(initialBufferSize),                                        bufpos : 0,                                };                                sock.custom = infos;                                infos.thread.t.sendMessage({ s : sock, cnx : true });                                return;                        }                }                refuseClient(sock);        }        function refuseClient( sock : sys.net.Socket) {                // we have reached maximum number of active clients                sock.close();        }        function runTimer() {                var l = new Lock();                while( true ) {                        l.wait(updateTime);                        work(update);                }        }        function init() {                worker = Thread.create(runWorker);                timer = Thread.create(runTimer);                for( i in 0...nthreads ) {                        var t = {                                id : i,                                t : null,                                socks : new Array(),                                p : new Poll(maxSockPerThread),                        };                        threads.push(t);                        t.t = Thread.create(runThread.bind(t));                }        }        public function addSocket( s : sys.net.Socket ) {                s.setBlocking(false);                work(addClient.bind(s));        }        public function run( host, port ) {                sock = new sys.net.Socket();                sock.bind(new sys.net.Host(host),port);                sock.listen(listen);                init();                while( true ) {                        try {                                addSocket(sock.accept());                        } catch( e : Dynamic ) {                                logError(e);                        }                }        }        public function sendData( s : sys.net.Socket, data : String ) {                try {                        s.write(data);                } catch( e : Dynamic ) {                        stopClient(s);                }        }        public function stopClient( s : sys.net.Socket ) {                var infos : ClientInfos<Client> = s.custom;                try s.shutdown(true,true) catch( e : Dynamic ) { };                infos.thread.t.sendMessage({ s : s, cnx : false });        }        // --- CUSTOMIZABLE API ---        public dynamic function onError( e : Dynamic, stack ) {                var estr = try Std.string(e) catch( e2 : Dynamic ) "???" + try "["+Std.string(e2)+"]" catch( e : Dynamic ) "";                errorOutput.writeString( estr + "\n" + haxe.CallStack.toString(stack) );                errorOutput.flush();        }        public dynamic function clientConnected( s : sys.net.Socket ) : Client {                return null;        }        public dynamic function clientDisconnected( c : Client ) {        }        public dynamic function readClientMessage( c : Client, buf : haxe.io.Bytes, pos : Int, len : Int ) : { msg : Message, bytes : Int } {                return {                        msg : null,                        bytes : len,                };        }        public dynamic function clientMessage( c : Client, msg : Message ) {        }        public dynamic function update() {        }        public dynamic function afterEvent() {        }}
 |