Ivan Safrin 12 лет назад
Родитель
Сommit
08598b6e91
2 измененных файлов с 71 добавлено и 41 удалено
  1. 10 8
      Core/Contents/Include/PolyPeer.h
  2. 61 33
      Core/Contents/Source/PolyPeer.cpp

+ 10 - 8
Core/Contents/Include/PolyPeer.h

@@ -28,7 +28,7 @@ THE SOFTWARE.
 #include "PolySocket.h"
 
 #include <vector>
-
+#include <deque>
 
 namespace Polycode {	
 	
@@ -38,7 +38,6 @@ namespace Polycode {
 		unsigned int headerHash;
 		unsigned int sequence;
 		unsigned int ack;
-		unsigned short reliableID;
 		unsigned int ackBitfield;
 		unsigned short size;
 		unsigned short type;
@@ -56,17 +55,16 @@ namespace Polycode {
 	
 	class _PolyExport PeerConnection {
 	public:
-		PeerConnection() { localSequence = 0; remoteSequence = 0; reliableID = 1;}
-		~PeerConnection(){}
-		
-		void ackPackets(unsigned int ack);
+		PeerConnection();
+		~PeerConnection();
 		
+		void ackPacketsWithBitfield(unsigned int ack, unsigned int ackBitfield);		
+		void ackPackets(unsigned int ack);		
 		unsigned int localSequence;
 		unsigned int remoteSequence;
-		unsigned int reliableID;
 		
 		std::vector<SentPacketEntry> reliablePacketQueue;
-		std::vector<unsigned short> recentReliableIDs;
+		std::deque <unsigned int> receivedPacketQueue;
 		Address address;
 	};
 
@@ -151,6 +149,8 @@ namespace Polycode {
 			PeerConnection *addPeerConnection(const Address &address);
 			void removePeerConnection(PeerConnection* connection);
 			
+			void setReliableRetransmissionInterval(int interval);
+					
 			void updateReliableDataQueue();
 		
 			virtual void updatePeer(){}
@@ -158,6 +158,8 @@ namespace Polycode {
 		
 		protected:
 		
+			int reliableRetransmissionInverval;
+		
 			Timer *updateTimer;
 			std::vector<PeerConnection*> peerConnections;
 			Socket *socket;

+ 61 - 33
Core/Contents/Source/PolyPeer.cpp

@@ -27,6 +27,16 @@ THE SOFTWARE.
 
 using namespace Polycode;
 
+PeerConnection::PeerConnection() {
+	localSequence = 0;
+	remoteSequence = 0;
+	receivedPacketQueue.resize(32, 0);
+}
+
+PeerConnection::~PeerConnection() {
+
+}
+
 void PeerConnection::ackPackets(unsigned int ack) {	
 	std::vector<SentPacketEntry>::iterator it;	
 	for(it = reliablePacketQueue.begin(); it != reliablePacketQueue.end();) {
@@ -39,6 +49,19 @@ void PeerConnection::ackPackets(unsigned int ack) {
 	}
 }
 
+void PeerConnection::ackPacketsWithBitfield(unsigned int ack, unsigned int ackBitfield) {
+
+	if(reliablePacketQueue.size() == 0) {
+		return;
+	}
+	
+	for(int i=0; i <32; i++) {
+		if(ackBitfield & (1<<i)) {
+			ackPackets(ack-i);
+		}
+	}
+}
+
 #if USE_THREADED_SOCKETS == 1
 	Peer::Peer(unsigned int port) : Threaded() {
 #else
@@ -55,6 +78,7 @@ void PeerConnection::ackPackets(unsigned int ack) {
 	updateTimer->addEventListener(this, Timer::EVENT_TRIGGER);
 #endif
 
+	reliableRetransmissionInverval = 1000;
 }
 
 Peer::~Peer() {
@@ -86,6 +110,10 @@ void Peer::removePeerConnection(PeerConnection* connection) {
 	}
 }
 
+void Peer::setReliableRetransmissionInterval(int interval) {
+	reliableRetransmissionInverval = interval;
+}
+
 Packet *Peer::createPacket(const Address &target, char *data, unsigned int size, unsigned short type) {	
 	PeerConnection *connection = getPeerConnection(target);
 	if(!connection)
@@ -93,11 +121,26 @@ Packet *Peer::createPacket(const Address &target, char *data, unsigned int size,
 	Packet *packet = new Packet();
 	packet->header.sequence = connection->localSequence;
 	packet->header.headerHash = 20;
-	packet->header.reliableID = 0;	
 	packet->header.ack = connection->remoteSequence;
 	packet->header.ackBitfield = 0;
-	packet->header.size = size;	
+			
+	for(int i=0; i < 32; i++) {
+		if(connection->receivedPacketQueue[31-i] == connection->remoteSequence-i) {
+			packet->header.ackBitfield = (packet->header.ackBitfield & ~(1 << i)) | (1 << i);
+		} else {
+			packet->header.ackBitfield = (packet->header.ackBitfield & ~(1 << i)) | (0 << i);		
+		}
+	}
+
+	int sizeToCopy = size;
+	if(size > MAX_PACKET_SIZE) {
+		size = MAX_PACKET_SIZE; 
+	}
+	
+	packet->header.size = sizeToCopy;	
 	packet->header.type = type;
+	
+	
 	if(size > 0)
 		memcpy(packet->data, data, size);	
 	connection->localSequence++;	
@@ -108,20 +151,14 @@ void Peer::sendReliableData(const Address &target, char *data, unsigned int size
 	PeerConnection *connection = getPeerConnection(target);	
 	if(!connection)
 		connection = addPeerConnection(target);	
-	Packet *packet = createPacket(target, data, size, type);
-	packet->header.reliableID = connection->reliableID;
-	connection->reliableID++;
-	
-	if(connection->reliableID == 0)
-		connection->reliableID = 1;
-
+	Packet *packet = createPacket(target, data, size, type);	
 	sendPacket(target, packet);	
-	/*
+
 	SentPacketEntry entry;
 	entry.packet = packet;
 	entry.timestamp = CoreServices::getInstance()->getCore()->getTicks();
 	connection->reliablePacketQueue.push_back(entry);
-*/
+
 }
 
 void Peer::sendDataToAll(char *data, unsigned int size, unsigned short type) {
@@ -148,31 +185,17 @@ void Peer::sendPacket(const Address &target, Packet *packet) {
 }
 
 bool Peer::checkPacketAcks(PeerConnection *connection, Packet *packet) {
-	bool retVal = true;
-	if(packet->header.sequence > connection->remoteSequence)
+	if(packet->header.sequence > connection->remoteSequence) {
 		connection->remoteSequence = packet->header.sequence;
-	else // ignore old packets
-		retVal = false;
-	
-	// if this is a reliable packet, check if it was recently received	
-	if(packet->header.reliableID != 0) {
-		retVal = true;
-		for(int i=0; i < connection->recentReliableIDs.size(); i++) {
-			if(connection->recentReliableIDs[i] == packet->header.reliableID)
-				retVal = false;
-		}
-		
-		// if still good, push the id into recent reliable acks
-		if(retVal) {
-			connection->recentReliableIDs.push_back(packet->header.reliableID);
-			if(connection->recentReliableIDs.size() > 50)
-				connection->recentReliableIDs.erase(connection->recentReliableIDs.begin());
-		}		
+	} else {
+		return false;
 	}
 	
-	connection->ackPackets(packet->header.ack);
+	connection->receivedPacketQueue.push_back(packet->header.sequence);	
+	connection->receivedPacketQueue.pop_front();
 	
-	return retVal;
+	connection->ackPacketsWithBitfield(packet->header.ack, packet->header.ackBitfield);	
+	return true;
 }
 
 void Peer::handleEvent(Event *event) {
@@ -195,8 +218,13 @@ void Peer::handleEvent(Event *event) {
 void Peer::updateReliableDataQueue() {
 	for(int i=0; i < peerConnections.size(); i++) {
 		for(int j=0; j < peerConnections[i]->reliablePacketQueue.size(); j++) {		
-			if(peerConnections[i]->reliablePacketQueue[j].timestamp < CoreServices::getInstance()->getCore()->getTicks() - 1000) {
+			if(peerConnections[i]->reliablePacketQueue[j].timestamp < CoreServices::getInstance()->getCore()->getTicks() - reliableRetransmissionInverval) {
 				peerConnections[i]->reliablePacketQueue[j].timestamp = CoreServices::getInstance()->getCore()->getTicks(); 
+				
+				Packet *oldPacket = peerConnections[i]->reliablePacketQueue[j].packet;
+				peerConnections[i]->reliablePacketQueue[j].packet = createPacket(peerConnections[i]->address, oldPacket->data, oldPacket->header.size, oldPacket->header.type);
+				delete oldPacket;
+				
 				sendPacket(peerConnections[i]->address, peerConnections[i]->reliablePacketQueue[j].packet);
 			}
 		}