Browse Source

review steam service implementation, allows several hosts/clients, ping service on top of the protocol (no auth required)

ncannasse 8 năm trước cách đây
mục cha
commit
a3f6e6320f
1 tập tin đã thay đổi với 206 bổ sung77 xóa
  1. 206 77
      hxd/net/SteamHost.hx

+ 206 - 77
hxd/net/SteamHost.hx

@@ -32,10 +32,8 @@ import hxbit.NetworkHost;
 @:allow(hxd.net.SteamHost)
 class SteamClient extends NetworkClient {
 
-	static var MAX_PACKET_SIZE = 512 * 1024;
-	static var MAX_PAYLOAD_SIZE = MAX_PACKET_SIZE - 32;
-
 	public var user : steam.User;
+	public var sessionId : Int = 0;
 
 	var shost : SteamHost;
 	var bigPacket : haxe.io.Bytes;
@@ -49,51 +47,211 @@ class SteamClient extends NetworkClient {
 	}
 
 	override function send( data : haxe.io.Bytes ) {
+		@:privateAccess shost.service.sendData(user, sessionId, data);
+	}
+
+	override function stop() {
+		super.stop();
+		//Sys.println("CLOSE " + user);
+		steam.Networking.closeSession(user);
+	}
+
+}
+
+@:access(hxd.net.SteamHost)
+class SteamService {
+
+	static inline var SPing = 1;
+	static inline var SPong = 2;
+	static inline var SRequestSession = 3;
+	static inline var SSessionAnswer = 4;
+	static inline var SSessionData = 5;
+	static inline var SBigPacket = 6;
+	static inline var SBigPacketData = 7;
+
+	static var previousIds = new Map();
+
+	var tmpBuf : haxe.io.Bytes;
+	var pingRequests : Array<{ user : steam.User, id : Int, time : Float, onResult : Null<Float> -> Void } > ;
+	var pingID : Int;
+
+	function new() {
+		tmpBuf = haxe.io.Bytes.alloc(5);
+		pingRequests = [];
+		pingID = randomID();
+	}
+
+	function onConnectionRequest( u : steam.User ) : Bool {
+		return true;
+	}
+
+	function onConnectionError( u : steam.User, error : steam.Networking.NetworkStatus ) : Void {
+		for( h in hosts )
+			h.onConnectionError(u);
+	}
+
+	function sendMessage( u : steam.User, msgId : Int, ident : Int, ?payload : haxe.io.Bytes, payloadSize = -1 ) {
+		if( payload != null ) {
+			if( payloadSize < 0 ) payloadSize = payload.length;
+			if( tmpBuf.length < payloadSize + 5 )
+				tmpBuf = haxe.io.Bytes.alloc(payloadSize + 5);
+			tmpBuf.blit(5, payload, 0, payloadSize);
+		} else
+			payloadSize = 0;
+		tmpBuf.set(0, msgId);
+		tmpBuf.setInt32(1, ident);
+		steam.Networking.sendP2P(u, tmpBuf, Reliable, 0, 5 + payloadSize);
+	}
+
+	static var MAX_PACKET_SIZE = 512 * 1024;
+	static var MAX_PAYLOAD_SIZE = MAX_PACKET_SIZE - 32;
+
+	public function sendData( u : steam.User, sessionId : Int, data : haxe.io.Bytes ) {
 		if( data.length > MAX_PACKET_SIZE ) {
 			// split
 			var bsize = haxe.io.Bytes.alloc(4);
 			bsize.setInt32(0, data.length);
-			host.sendCustom(SteamHost.CBIG_PACKET, bsize, this);
+			sendMessage(u, SBigPacket, sessionId, bsize);
 			var split = Std.int(data.length / MAX_PAYLOAD_SIZE);
 			for( i in 0...split )
-				host.sendCustom(SteamHost.CBIG_PACKET_DATA, data.sub(i * MAX_PAYLOAD_SIZE, MAX_PAYLOAD_SIZE), this);
-			host.sendCustom(SteamHost.CBIG_PACKET_DATA, data.sub(split * MAX_PAYLOAD_SIZE, data.length - split * MAX_PAYLOAD_SIZE), this);
+				sendMessage(u, SBigPacketData, sessionId, data.sub(i * MAX_PAYLOAD_SIZE, MAX_PAYLOAD_SIZE));
+			sendMessage(u, SBigPacketData, sessionId, data.sub(split * MAX_PAYLOAD_SIZE, data.length - split * MAX_PAYLOAD_SIZE));
 			return;
 		}
-		if( cache == null || cache.length < data.length + 4 )
-			cache = haxe.io.Bytes.alloc(data.length * 2 + 4);
-		cache.blit(4, data, 0, data.length);
-		cache.setInt32(0, @:privateAccess shost.sessionID);
-		//Sys.println(user + " > [" + data.length+"] " + (data.length > 100 ? data.sub(0,60).toHex()+"..."+data.sub(data.length-8,8).toHex() : data.toHex()));
-		steam.Networking.sendP2P(user, cache, Reliable, 0, data.length + 4);
+		sendMessage(u, SSessionData, sessionId, data);
 	}
 
-	override function stop() {
-		super.stop();
-		//Sys.println("CLOSE " + user);
-		steam.Networking.closeSession(user);
+	function getClient( u : steam.User, sid : Int ) {
+		for( h in hosts ) {
+			var c = h.getClient(u);
+			if( c == null ) continue;
+			var cs = Std.instance(c, SteamClient);
+			if( cs.sessionId == sid ) return cs;
+		}
+		return null;
+	}
+
+	function onData( u : steam.User, data : haxe.io.Bytes ) : Void {
+		switch( data.get(0) ) {
+		case SPing:
+			var pid = data.getInt32(1);
+			sendMessage(u, SPong, pid);
+		case SPong:
+			var pid = data.getInt32(1);
+			for( p in pingRequests )
+				if( p.user == u && p.id == pid ) {
+					pingRequests.remove(p);
+					p.onResult( haxe.Timer.stamp() - p.time );
+					break;
+				}
+		case SRequestSession:
+			var gid = data.getInt32(1);
+			var sid = 0;
+			for( h in hosts )
+				if( h.isAuth && h.gameId == gid ) {
+					sid = h.onUserConnect(u);
+					break;
+				}
+			var tmp = haxe.io.Bytes.alloc(4);
+			tmp.setInt32(0, gid);
+			sendMessage(u, SSessionAnswer, sid, tmp);
+		case SSessionAnswer:
+			var sid = data.getInt32(1);
+			var gid = data.getInt32(5);
+			for( h in hosts )
+				if( !h.isAuth && h.gameId == gid && h.server == u ) {
+					var serv = Std.instance(h.clients[0], SteamClient);
+					if( serv.sessionId != 0 ) break;
+					serv.sessionId = sid;
+					h.onConnected(sid != 0);
+					break;
+				}
+		case SSessionData:
+			var sid = data.getInt32(1);
+			var from = getClient(u, sid);
+			if( from != null )
+				Std.instance(@:privateAccess from.host, SteamHost).onData(from, data, 5, data.length - 5);
+		case SBigPacket:
+			var sid = data.getInt32(1);
+			var size = data.getInt32(5);
+			var from = getClient(u, sid);
+			if( from != null ) @:privateAccess {
+				from.bigPacket = haxe.io.Bytes.alloc(size);
+				from.bigPacketPosition = 0;
+			}
+		case SBigPacketData:
+			var sid = data.getInt32(1);
+			var from = getClient(u, sid);
+			if( from != null ) @:privateAccess {
+				from.bigPacket.blit(from.bigPacketPosition, data, 5, data.length - 5);
+				from.bigPacketPosition += data.length - 5;
+				if( from.bigPacketPosition == from.bigPacket.length ) {
+					var data = from.bigPacket;
+					from.bigPacket = null;
+					Std.instance(from.host, SteamHost).onData(from, data, 0, data.length);
+				}
+			}
+		default:
+			// ignore
+		}
+	}
+
+	public function randomID() {
+		while( true ) {
+			var id = Std.random(0x7FFFFFFF) + 1; // positive integer
+			if( !previousIds.exists(id) ) {
+				previousIds.set(id, true);
+				return id;
+			}
+		}
+	}
+
+	public function requestSession( server : steam.User, gid : Int ) {
+		sendMessage(server, SRequestSession, gid);
+	}
+
+	public function ping( user : steam.User, onResult : Null<Float> -> Void ) {
+		var pid = pingID++;
+		pingRequests.push({ user : user, id : pid, time : haxe.Timer.stamp(), onResult : onResult });
+		sendMessage(user, SPing, pid);
+	}
+
+	public function stop( host : SteamHost ) {
+		hosts.remove(host);
+		if( hosts.length == 0 ) {
+			inst = null;
+			steam.Networking.closeP2P();
+		}
+	}
+
+	static var hosts = new Array<SteamHost>();
+	static var inst : SteamService;
+	public static function start( ?host : SteamHost ) {
+		if( inst == null ) {
+			inst = new SteamService();
+			steam.Networking.startP2P(inst);
+		}
+		if( host != null )
+			hosts.push(host);
+		return inst;
 	}
 
 }
 
 class SteamHost extends NetworkHost {
 
-	public static inline var CHELLO_CLIENT = 0;
-	public static inline var CHELLO_SERVER = 1;
-	public static inline var CBIG_PACKET = 2;
-	public static inline var CBIG_PACKET_DATA = 3;
-
 	public var enableRecording(default,set) : Bool = false;
 	var recordedData : haxe.io.BytesBuffer;
 	var recordedStart : Float;
 	var server : steam.User;
 	var onConnected : Bool -> Void;
 	var input : haxe.io.BytesInput;
-	var sessionID : Int;
+	var gameId : Int;
+	var service : SteamService;
 
-	public function new( sessionID : Int ) {
+	public function new( gameId ) {
 		super();
-		this.sessionID = sessionID;
+		this.gameId = gameId;
 		isAuth = false;
 		self = new SteamClient(this, steam.Api.getUser());
 		input = new haxe.io.BytesInput(haxe.io.Bytes.alloc(0));
@@ -114,56 +272,39 @@ class SteamHost extends NetworkHost {
 	}
 
 	function close() {
-		steam.Networking.closeP2P();
+		if( service != null ) {
+			service.stop(this);
+			service = null;
+		}
 	}
 
 	public function startClient( server : steam.User, onConnected : Bool -> Void ) {
 		isAuth = false;
 		this.server = server;
 		clients = [new SteamClient(this, server)];
-		steam.Networking.startP2P(this);
+		service = SteamService.start(this);
 		this.onConnected = onConnected;
-		sendCustom(CHELLO_CLIENT);
+		service.requestSession(server, gameId);
 	}
 
-	override function onCustom(from:NetworkClient, id:Int, ?data:haxe.io.Bytes) {
-		switch( id ) {
-		case CHELLO_CLIENT if( isAuth ):
-			// was disconnected !
-			if( clients.indexOf(from) >= 0 ) {
-				clients.remove(from);
-				pendingClients.push(from);
-			}
-			sendCustom(CHELLO_SERVER, from);
-		case CHELLO_SERVER if( !isAuth && from == clients[0] ):
-			onConnected(true);
-		case CBIG_PACKET:
-			var from = Std.instance(from, SteamClient);
-			from.bigPacket = haxe.io.Bytes.alloc(data.getInt32(0));
-			from.bigPacketPosition = 0;
-		case CBIG_PACKET_DATA:
-			var from = Std.instance(from, SteamClient);
-			from.bigPacket.blit(from.bigPacketPosition, data, 0, data.length);
-			from.bigPacketPosition += data.length;
-			if( from.bigPacketPosition == from.bigPacket.length ) {
-				var data = from.bigPacket;
-				from.bigPacket = null;
-				@:privateAccess {
-					var oldIn = ctx.input;
-					var oldPos = ctx.inPos;
-					from.processMessagesData(data, 0, data.length);
-					ctx.input = oldIn;
-					ctx.inPos = oldPos;
-				}
-			}
-		default:
-			throw "Unknown custom packet " + id;
+	function onUserConnect( user : steam.User ) {
+		var c = getClient(user);
+		if( c == null )
+			c = new SteamClient(this, user);
+		else {
+			// was disconnected, let's reassign a SID
+			clients.remove(c);
+			pendingClients.remove(c);
 		}
+		pendingClients.push(c);
+		var sid = service.randomID();
+		@:privateAccess Std.instance(c,SteamClient).sessionId = sid;
+		return sid;
 	}
 
 	public function startServer() {
 		this.server = steam.Api.getUser();
-		steam.Networking.startP2P(this);
+		service = SteamService.start(this);
 		isAuth = true;
 	}
 
@@ -182,7 +323,7 @@ class SteamHost extends NetworkHost {
 		return null;
 	}
 
-	function onConnectionError( user : steam.User, error : steam.Networking.NetworkStatus ) {
+	function onConnectionError( user : steam.User ) {
 		if( !isAuth && user == server ) {
 			onConnected(false);
 			return;
@@ -193,29 +334,17 @@ class SteamHost extends NetworkHost {
 		}
 	}
 
-	function onConnectionRequest( user : steam.User ) {
-		return true;
-	}
-
 	public function getRecordedData() {
 		return recordedData == null ? null : recordedData.getBytes();
 	}
 
-	function onData( from : steam.User, data : haxe.io.Bytes ) {
-		if( data.getInt32(0) != sessionID )
-			return;
+	function onData( from : SteamClient, data : haxe.io.Bytes, startPos : Int, dataLen : Int ) {
 		if( recordedData != null ) {
 			recordedData.addFloat(haxe.Timer.stamp() - recordedStart);
-			recordedData.addInt32(data.length);
-			recordedData.addBytes(data, 0, data.length);
-		}
-		//Sys.println(from + " < [" + data.length+"] " + (data.length > 100 ? data.sub(0,60).toHex()+"..."+data.sub(data.length-8,8).toHex() : data.toHex()));
-		var c = Std.instance(getClient(from), SteamClient);
-		if( c == null ) {
-			c = new SteamClient(this, from);
-			pendingClients.push(c);
+			recordedData.addInt32(dataLen);
+			recordedData.addBytes(data, startPos, dataLen);
 		}
-		@:privateAccess c.processMessagesData(data, 4, data.length);
+		@:privateAccess from.processMessagesData(data, startPos, dataLen);
 	}