123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363 |
- /*
- * Copyright (C)2005-2019 Haxe Foundation
- *
- * Permission is hereby granted, free of charge, to any person obtaining a
- * copy of this software and associated documentation files (the "Software"),
- * to deal in the Software without restriction, including without limitation
- * the rights to use, copy, modify, merge, publish, distribute, sublicense,
- * and/or sell copies of the Software, and to permit persons to whom the
- * Software is furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
- * DEALINGS IN THE SOFTWARE.
- */
- package cpp.net;
- import cpp.vm.Thread;
- import cpp.net.Poll;
- import cpp.vm.Lock;
- private typedef ThreadInfos = {
- var id:Int;
- var t:Thread;
- var p:Poll;
- var socks:Array<sys.net.Socket>;
- }
- private typedef ClientInfos<Client> = {
- var client:Client;
- var sock:sys.net.Socket;
- var thread:ThreadInfos;
- var buf:haxe.io.Bytes;
- var bufpos:Int;
- }
- /**
- The ThreadServer can be used to easily create a multithreaded server where each thread polls multiple connections.
- To use it, at a minimum you must override or rebind clientConnected, readClientMessage, and clientMessage and you must define your Client and Message.
- **/
- class ThreadServer<Client, Message> {
- var threads:Array<ThreadInfos>;
- var sock:sys.net.Socket;
- var worker:Thread;
- var timer:Thread;
- /**
- Number of total connections the server will accept.
- **/
- public var listen:Int;
- /**
- Number of server threads.
- **/
- public var nthreads:Int;
- /**
- Polling timeout.
- **/
- public var connectLag:Float;
- /**
- Stream to send error messages.
- **/
- public var errorOutput:haxe.io.Output;
- /**
- Space allocated to buffers when they are created.
- **/
- public var initialBufferSize:Int;
- /**
- Maximum size of buffered data read from a socket. An exception is thrown if the buffer exceeds this value.
- **/
- public var maxBufferSize:Int;
- /**
- Minimum message size.
- **/
- public var messageHeaderSize:Int;
- /**
- Time between calls to update.
- **/
- public var updateTime:Float;
- /**
- The most sockets a thread will handle.
- **/
- public var maxSockPerThread:Int;
- /**
- Creates a ThreadServer.
- **/
- public function new() {
- threads = new Array();
- nthreads = if (Sys.systemName() == "Windows") 150 else 10;
- messageHeaderSize = 1;
- listen = 10;
- connectLag = 0.5;
- errorOutput = Sys.stderr();
- initialBufferSize = (1 << 10);
- maxBufferSize = (1 << 16);
- maxSockPerThread = 64;
- updateTime = 1;
- }
- function runThread(t) {
- while (true) {
- try {
- loopThread(t);
- } catch (e:Dynamic) {
- logError(e);
- }
- }
- }
- function readClientData(c:ClientInfos<Client>) {
- var available = c.buf.length - c.bufpos;
- if (available == 0) {
- var newsize = c.buf.length * 2;
- if (newsize > maxBufferSize) {
- newsize = maxBufferSize;
- if (c.buf.length == maxBufferSize)
- throw "Max buffer size reached";
- }
- var newbuf = haxe.io.Bytes.alloc(newsize);
- newbuf.blit(0, c.buf, 0, c.bufpos);
- c.buf = newbuf;
- available = newsize - c.bufpos;
- }
- var bytes = c.sock.input.readBytes(c.buf, c.bufpos, available);
- var pos = 0;
- var len = c.bufpos + bytes;
- while (len >= messageHeaderSize) {
- var m = readClientMessage(c.client, c.buf, pos, len);
- if (m == null)
- break;
- pos += m.bytes;
- len -= m.bytes;
- work(clientMessage.bind(c.client, m.msg));
- }
- if (pos > 0)
- c.buf.blit(0, c.buf, pos, len);
- c.bufpos = len;
- }
- function loopThread(t:ThreadInfos) {
- if (t.socks.length > 0)
- for (s in t.p.poll(t.socks, connectLag)) {
- var infos:ClientInfos<Client> = s.custom;
- try {
- readClientData(infos);
- } catch (e:Dynamic) {
- t.socks.remove(s);
- if (!Std.is(e, haxe.io.Eof) && !Std.is(e, haxe.io.Error))
- logError(e);
- work(doClientDisconnected.bind(s, infos.client));
- }
- }
- while (true) {
- var m:{s:sys.net.Socket, cnx:Bool} = Thread.readMessage(t.socks.length == 0);
- if (m == null)
- break;
- if (m.cnx)
- t.socks.push(m.s);
- else if (t.socks.remove(m.s)) {
- var infos:ClientInfos<Client> = m.s.custom;
- work(doClientDisconnected.bind(m.s, infos.client));
- }
- }
- }
- function doClientDisconnected(s:sys.net.Socket, c) {
- try
- s.close()
- catch (e:Dynamic) {};
- clientDisconnected(c);
- }
- function runWorker() {
- while (true) {
- var f = Thread.readMessage(true);
- try {
- f();
- } catch (e:Dynamic) {
- logError(e);
- }
- try {
- afterEvent();
- } catch (e:Dynamic) {
- logError(e);
- }
- }
- }
- /**
- Internally used to delegate something to the worker thread.
- **/
- public function work(f:Void->Void) {
- worker.sendMessage(f);
- }
- function logError(e:Dynamic) {
- var stack = haxe.CallStack.exceptionStack();
- if (Thread.current() == worker)
- onError(e, stack);
- else
- work(onError.bind(e, stack));
- }
- function addClient(sock:sys.net.Socket) {
- var start = Std.random(nthreads);
- for (i in 0...nthreads) {
- var t = threads[(start + i) % nthreads];
- if (t.socks.length < maxSockPerThread) {
- var infos:ClientInfos<Client> = {
- thread: t,
- client: clientConnected(sock),
- sock: sock,
- buf: haxe.io.Bytes.alloc(initialBufferSize),
- bufpos: 0,
- };
- sock.custom = infos;
- infos.thread.t.sendMessage({s: sock, cnx: true});
- return;
- }
- }
- refuseClient(sock);
- }
- function refuseClient(sock:sys.net.Socket) {
- // we have reached maximum number of active clients
- sock.close();
- }
- function runTimer() {
- var l = new Lock();
- while (true) {
- l.wait(updateTime);
- work(update);
- }
- }
- function init() {
- worker = Thread.create(runWorker);
- timer = Thread.create(runTimer);
- for (i in 0...nthreads) {
- var t = {
- id: i,
- t: null,
- socks: new Array(),
- p: new Poll(maxSockPerThread),
- };
- threads.push(t);
- t.t = Thread.create(runThread.bind(t));
- }
- }
- /**
- Called when the server gets a new connection.
- **/
- public function addSocket(s:sys.net.Socket) {
- s.setBlocking(false);
- work(addClient.bind(s));
- }
- /**
- Start the server at the specified host and port.
- **/
- public function run(host, port) {
- sock = new sys.net.Socket();
- sock.bind(new sys.net.Host(host), port);
- sock.listen(listen);
- init();
- while (true) {
- try {
- addSocket(sock.accept());
- } catch (e:Dynamic) {
- logError(e);
- }
- }
- }
- /**
- Send data to a client.
- **/
- public function sendData(s:sys.net.Socket, data:String) {
- try {
- s.write(data);
- } catch (e:Dynamic) {
- stopClient(s);
- }
- }
- /**
- Shutdown a client's connection and remove them from the server.
- **/
- public function stopClient(s:sys.net.Socket) {
- var infos:ClientInfos<Client> = s.custom;
- try
- s.shutdown(true, true)
- catch (e:Dynamic) {};
- infos.thread.t.sendMessage({s: s, cnx: false});
- }
- // --- CUSTOMIZABLE API ---
- /**
- Called when an error has ocurred.
- **/
- public dynamic function onError(e:Dynamic, stack) {
- var estr = try Std.string(e) catch (e2:Dynamic) "???" + try "[" + Std.string(e2) + "]" catch (e:Dynamic) "";
- errorOutput.writeString(estr + "\n" + haxe.CallStack.toString(stack));
- errorOutput.flush();
- }
- /**
- Called when a client connects. Returns a client object.
- **/
- public dynamic function clientConnected(s:sys.net.Socket):Client {
- return null;
- }
- /**
- Called when a client disconnects or an error forces the connection to close.
- **/
- public dynamic function clientDisconnected(c:Client) {}
- /**
- Called when data has been read from a socket. This method should try to extract a message from the buffer.
- The available data resides in buf, starts at pos, and is len bytes wide. Return the new message and the number of bytes read from the buffer.
- If no message could be read, return null.
- **/
- public dynamic function readClientMessage(c:Client, buf:haxe.io.Bytes, pos:Int, len:Int):{msg:Message, bytes:Int} {
- return {
- msg: null,
- bytes: len,
- };
- }
- /**
- Called when a message has been recieved. Message handling code should go here.
- **/
- public dynamic function clientMessage(c:Client, msg:Message) {}
- /**
- This method is called periodically. It can be used to do server maintenance.
- **/
- public dynamic function update() {}
- /**
- Called after a client connects, disconnects, a message is received, or an update is performed.
- **/
- public dynamic function afterEvent() {}
- }
|