浏览代码

added @:rpc for network

ncannasse 9 年之前
父节点
当前提交
1c770ffc69
共有 6 个文件被更改,包括 159 次插入16 次删除
  1. 4 0
      hxd/net/LocalHost.hx
  2. 81 7
      hxd/net/Macros.hx
  3. 35 7
      hxd/net/NetworkHost.hx
  4. 1 0
      hxd/net/NetworkSerializable.hx
  5. 16 2
      hxd/net/Socket.hx
  6. 22 0
      samples/network/Main.hx

+ 4 - 0
hxd/net/LocalHost.hx

@@ -22,6 +22,8 @@ class LocalClient extends NetworkClient {
 
 	function readData() {
 		if( messageLength < 0 ) {
+			if( socket.input.available < 4 )
+				return;
 			messageLength = socket.input.readInt32();
 			if( pendingBuffer == null || pendingBuffer.length < messageLength )
 				pendingBuffer = haxe.io.Bytes.alloc(messageLength);
@@ -34,10 +36,12 @@ class LocalClient extends NetworkClient {
 			while( pendingPos < messageLength )
 				pendingPos = processMessage(pendingBuffer, pendingPos);
 			messageLength = -1;
+			readData();
 		}
 	}
 
 	override function send( bytes : haxe.io.Bytes ) {
+		socket.out.wait();
 		socket.out.writeInt32(bytes.length);
 		socket.out.write(bytes);
 		socket.out.flush();

+ 81 - 7
hxd/net/Macros.hx

@@ -576,26 +576,37 @@ class Macros {
 			return null;
 		var fields = Context.getBuildFields();
 		var toSerialize = [];
+		var rpc = [];
 
 		if( !Context.defined("display") )
 		for( f in fields ) {
 			if( f.meta == null ) continue;
 			var m = null;
-			for( meta in f.meta )
+			for( meta in f.meta ) {
 				if( meta.name == ":s" ) {
 					toSerialize.push({ f : f, m : m });
 					break;
 				}
+				if( meta.name == ":rpc" ) {
+					rpc.push({ f : f, m:m });
+					break;
+				}
+			}
 		}
 
 		var sup = cl.superClass;
 		var isSubSer = sup != null && isSerializable(sup.t);
-		var startFID = 0;
-		if( isSubSer )
+		var startFID = 0, rpcID = 0;
+		if( isSubSer ) {
 			startFID = switch( sup.t.get().meta.extract(":fieldID")[0].params[0].expr ) {
 			case EConst(CInt(i)): Std.parseInt(i);
 			default: throw "assert";
 			}
+			rpcID = switch( sup.t.get().meta.extract(":rpcID")[0].params[0].expr ) {
+			case EConst(CInt(i)): Std.parseInt(i);
+			default: throw "assert";
+			}
+		}
 
 		var pos = Context.currentPos();
 		if( !isSubSer ) {
@@ -700,13 +711,54 @@ class Macros {
 			flushExpr.push(macro if( b & (1 << $v{ bitID } ) != 0 ) hxd.net.Macros.serializeValue(ctx, this.$fname));
 			syncExpr.push(macro if( __bits & (1 << $v{ bitID } ) != 0 ) hxd.net.Macros.unserializeValue(ctx, this.$fname));
 		}
-		if( startFID > 32 ) Context.error("Too many serializable fields", pos);
-		cl.meta.add(":fieldID", [macro $v { startFID } ], pos);
+
+		// BUILD RPC
+		var firstRPCID = rpcID;
+		var rpcCases = [];
+		for( r in rpc ) {
+			switch( r.f.kind ) {
+			case FFun(f):
+				var id = rpcID++;
+				if( f.ret != null && haxe.macro.ComplexTypeTools.toString(f.ret) != "Void" )
+					Context.error("RPC function cannot return a value", r.f.pos);
+				if( f.ret == null )
+					f.ret = macro : Void;
+				for( a in f.args )
+					if( a.type == null )
+						Context.error("Type required for rpc function argument " + r.f.name+"(" + a.name+")", r.f.pos);
+				var expr = f.expr;
+				f.expr = macro {
+					inline function __dispatch() {
+						var __ctx = __host.beginRPC(this,$v{id});
+						$b{[
+							for( a in f.args )
+								macro hxd.net.Macros.serializeValue(__ctx,$i{a.name})
+						] };
+					}
+					if( __host != null && !__host.isAuth ) {
+						__dispatch();
+						return;
+					}
+					$expr;
+					if( __host != null ) __dispatch();
+				};
+				var p = r.f.pos;
+				var exprs = [{ expr : EVars([for( a in f.args ) { name : a.name, type : a.type, expr : null }]), pos : p }];
+				for( a in f.args )
+					exprs.push(macro hxd.net.Macros.unserializeValue(__ctx,$i{a.name}));
+				exprs.push( { expr : ECall( { expr : EField({ expr : EConst(CIdent("this")), pos:p }, r.f.name), pos : p }, [for( a in f.args ) { expr : EConst(CIdent(a.name)), pos : p } ]), pos : p } );
+				rpcCases.push({ values : [{ expr : EConst(CInt(""+id)), pos : p }], guard : null, expr : { expr : EBlock(exprs), pos : p } });
+			default:
+				Context.error("Cannot use @:rpc on non function", r.f.pos);
+			}
+		}
+
+		// Add network methods
+		var access = [APublic];
+		if( isSubSer ) access.push(AOverride);
 
 		if( fields.length != 0 || !isSubSer ) {
-			var access = [APublic];
 			if( isSubSer ) {
-				access.push(AOverride);
 				flushExpr.unshift(macro super.networkFlush(ctx));
 				syncExpr.unshift(macro super.networkSync(ctx));
 			} else {
@@ -738,9 +790,31 @@ class Macros {
 					expr : macro @:privateAccess $b{syncExpr},
 				}),
 			});
+		}
 
+		if( rpc.length != 0 || !isSubSer ) {
+			var swExpr = { expr : ESwitch( { expr : EConst(CIdent("__id")), pos : pos }, rpcCases, macro throw "Unknown RPC identifier " + __id), pos : pos };
+			fields.push({
+				name : "networkRPC",
+				pos : pos,
+				access : access,
+				meta : noComplete,
+				kind : FFun({
+					args : [ { name : "__ctx", type : macro : hxd.net.Serializer }, { name : "__id", type : macro : Int } ],
+					ret : null,
+					expr : if( isSubSer && firstRPCID > 0 ) macro { if( __id < $v { firstRPCID } ) { super.networkRPC(__ctx, __id); return; } $swExpr; } else swExpr,
+				}),
+			});
 		}
 
+
+		// add metadata
+
+		if( startFID > 32 ) Context.error("Too many serializable fields", pos);
+		if( rpcID > 255 ) Context.error("Too many rpc calls", pos);
+		cl.meta.add(":fieldID", [macro $v { startFID } ], pos);
+		cl.meta.add(":rpcID", [macro $v { rpcID } ], pos);
+
 		return fields;
 	}
 

+ 35 - 7
hxd/net/NetworkHost.hx

@@ -56,6 +56,16 @@ class NetworkClient {
 				if( o == null ) break;
 			}
 			host.onSync(obj);
+		case NetworkHost.RPC:
+			var o : hxd.net.NetworkSerializable = cast ctx.refs[ctx.getInt()];
+			var fid = ctx.getByte();
+			if( !o.__host.isAuth ) {
+				var old = o.__host;
+				o.__host = null;
+				o.networkRPC(ctx, fid);
+				o.__host = old;
+			} else
+				o.networkRPC(ctx, fid);
 		case x:
 			error("Unknown message code " + x);
 		}
@@ -67,11 +77,11 @@ class NetworkClient {
 @:allow(hxd.net.NetworkClient)
 class NetworkHost {
 
-	static inline var SYNC = 0;
-	static inline var REG = 1;
-	static inline var UNREG = 2;
-	static inline var FULLSYNC = 3;
-	static inline var EOF = 0xFF;
+	static inline var SYNC 		= 1;
+	static inline var REG 		= 2;
+	static inline var UNREG 	= 3;
+	static inline var FULLSYNC 	= 4;
+	static inline var RPC 		= 5;
 
 	public static var current : NetworkHost = null;
 
@@ -81,6 +91,7 @@ class NetworkHost {
 	var clients : Array<NetworkClient>;
 	var isAlive = false;
 	var logger : String -> Void;
+	var hasData = false;
 
 	public function new() {
 		current = this;
@@ -95,6 +106,19 @@ class NetworkHost {
 		markHead = o;
 	}
 
+	public function beginRPC(o:NetworkSerializable, id:Int) {
+		flushProps();
+		hasData = true;
+		if( ctx.refs[o.__uid] == null )
+			throw "Can't call RPC on an object not previously transferred";
+		ctx.addByte(RPC);
+		ctx.addInt(o.__uid);
+		ctx.addByte(id);
+		if( logger != null )
+			logger("RPC " + o +"#"+id);
+		return ctx;
+	}
+
 	public function makeAlive() {
 		isAlive = true;
 		for( o in ctx.refs ) {
@@ -137,9 +161,8 @@ class NetworkHost {
 		trace("SYNC " + obj);
 	}
 
-	public function flush() {
+	function flushProps() {
 		var o = markHead;
-		var hasData = false;
 		while( o != null ) {
 			if( o.__bits != 0 ) {
 //				if( logger != null )
@@ -152,6 +175,10 @@ class NetworkHost {
 			o = o.__next;
 		}
 		markHead = null;
+	}
+
+	public function flush() {
+		flushProps();
 		if( !hasData )
 			return;
 		@:privateAccess {
@@ -159,6 +186,7 @@ class NetworkHost {
 			ctx.out = new haxe.io.BytesBuffer();
 			send(bytes);
 		}
+		hasData = false;
 	}
 
 	public static function enableReplication( o : NetworkSerializable, b : Bool ) {

+ 1 - 0
hxd/net/NetworkSerializable.hx

@@ -17,6 +17,7 @@ interface NetworkSerializable extends Serializable extends ProxyHost {
 	public var enableReplication(get, set) : Bool;
 	public function networkFlush( ctx : Serializer ) : Void;
 	public function networkSync( ctx : Serializer ) : Void;
+	public function networkRPC( ctx : Serializer, rpcID : Int ) : Void;
 	public function alive() : Void;
 }
 

+ 16 - 2
hxd/net/Socket.hx

@@ -20,6 +20,16 @@ private class SocketOutput extends haxe.io.Output {
 
 }
 
+private class SocketInput extends haxe.io.Input {
+
+	public var available(get, never) : Int;
+
+	function get_available() {
+		return 0;
+	}
+
+}
+
 #if flash
 private class FlashSocketOutput extends SocketOutput {
 	var s : flash.net.Socket;
@@ -67,7 +77,7 @@ private class FlashSocketOutput extends SocketOutput {
 
 }
 
-private class FlashSocketInput extends haxe.io.Input {
+private class FlashSocketInput extends SocketInput {
 
 	var sock : flash.net.Socket;
 
@@ -75,6 +85,10 @@ private class FlashSocketInput extends haxe.io.Input {
 		sock = s;
 	}
 
+	override function get_available() {
+		return sock.bytesAvailable;
+	}
+
 	override function readBytes( bytes : haxe.io.Bytes, pos : Int, len : Int ) {
 		if( len > (sock.bytesAvailable : Int) ) {
 			len = sock.bytesAvailable;
@@ -104,7 +118,7 @@ class Socket {
 	var serv : flash.net.ServerSocket;
 	#end
 	public var out(default, null) : SocketOutput;
-	public var input(default, null) : haxe.io.Input;
+	public var input(default, null) : SocketInput;
 	public var timeout(default, set) : Null<Float>;
 
 	public function new() {

+ 22 - 0
samples/network/Main.hx

@@ -16,6 +16,7 @@ class Cursor implements hxd.net.NetworkSerializable {
 	}
 
 	function set_x( v : Float ) {
+		if( v == x ) return v;
 		if( bmp != null ) bmp.x = v;
 		return this.x = v;
 	}
@@ -39,6 +40,24 @@ class Cursor implements hxd.net.NetworkSerializable {
 		bmp.drawCircle(0, 0, 5);
 
 		enableReplication = true;
+
+		var i = new h2d.Interactive(10, 10, bmp);
+		i.x = i.y = -5;
+		i.isEllipse = true;
+		i.onClick = function(_) blink( 2 + Math.random() * 2 );
+	}
+
+	@:rpc function blink( s : Float ) {
+		bmp.scale(s);
+		main.event.waitUntil(function(dt) {
+			bmp.scaleX *= Math.pow(0.9, dt);
+			bmp.scaleY *= Math.pow(0.9, dt);
+			if( bmp.scaleX < 1 ) {
+				bmp.scaleX = bmp.scaleY = 1;
+				return true;
+			}
+			return false;
+		});
 	}
 
 	public function alive() {
@@ -56,9 +75,11 @@ class Main extends hxd.App {
 	static var PORT = 6676;
 
 	public var host : hxd.net.LocalHost;
+	public var event : hxd.WaitEvent;
 	var cursor : Cursor;
 
 	override function init() {
+		event = new hxd.WaitEvent();
 		host = new hxd.net.LocalHost();
 		host.setLogger(function(msg) log(msg));
 		try {
@@ -108,6 +129,7 @@ class Main extends hxd.App {
 	}
 
 	override function update(dt:Float) {
+		event.update(dt);
 		if( cursor != null ) {
 			cursor.x = s2d.mouseX;
 			cursor.y = s2d.mouseY;