Browse Source

TODO: fix receive message

mikymod 13 years ago
parent
commit
74dca36d45

+ 183 - 35
src/network/AsyncConnection.cpp

@@ -7,6 +7,26 @@ namespace network
 {
 
 AsyncConnection::AsyncConnection(Allocator& allocator) :
+	m_remote_address(0, 0, 0, 0, 0),
+	m_mode(NONE),
+	m_state(DISCONNECTED),
+	m_max_rate(MAX_RATE),
+	m_outgoing_rate_time(0),
+	m_outgoing_rate_bytes(0),
+	m_incoming_rate_time(0),
+	m_incoming_rate_bytes(0),
+	m_incoming_recv_packets(0.0f),
+	m_incoming_dropped_packets(0.0f),
+	m_incoming_packet_loss_time(0),
+	m_outgoing_sent_packet(0),
+	m_local_sequence(0),
+	m_remote_sequence(0),
+	m_max_sequence(MAX_SEQUENCE),
+	m_max_rtt(MAX_RTT),	//in milliseconds
+	m_rtt(0),		
+	m_last_send_time(0),
+	m_last_data_bytes(0),
+	m_running(false),
  	m_sent_msg(allocator),
  	m_received_msg(allocator),
  	m_pending_ack(allocator),
@@ -22,30 +42,67 @@ AsyncConnection::~AsyncConnection()
 }
 
 //-----------------------------------------------------------------------------
-void AsyncConnection::init(const os::NetAddress addr, const int id)
+void AsyncConnection::init(const int32_t id, const real timeout)
 {
-	m_remote_address = addr;
+	// set connection's id
 	m_id = id;
-	m_max_rate = 64000;
-	m_outgoing_rate_time = 0;
-	m_outgoing_rate_bytes = 0;
-	m_incoming_rate_time = 0;
-	m_incoming_rate_bytes = 0;
-	m_incoming_recv_packets = 0.0f;
-	m_incoming_dropped_packets = 0.0f;
-	m_incoming_packet_loss_time = 0;
-	m_outgoing_sent_packet = 0;
-	m_remote_sequence = 0;
-	m_local_sequence = 0;
-	m_max_sequence = 0xFFFFFFFF;
-	m_max_rtt = 1000;	//in milliseconds
-	m_rtt = 0;		
-	m_last_send_time = 0;
-	m_last_data_bytes = 0;
-	
-	// open port
-	m_socket.open(addr.get_port());
-	assert(m_socket.is_open());
+	// set connection's timeout
+	m_timeout = timeout;
+}
+
+//-----------------------------------------------------------------------------
+bool AsyncConnection::start(uint16_t port)
+{
+	// if connection is not running
+	assert(!m_running);
+	os::printf("Start connection on port %d\n", port);
+	// open socket
+	if (!m_socket.open(port))
+	{
+		return false;
+	}
+	m_running = true;
+	return true;  
+}
+
+//-----------------------------------------------------------------------------
+void AsyncConnection::stop()
+{
+	// if connection is running
+	assert(m_running);
+	os::printf("stop connection\n");;
+//  	bool connected = is_connected();
+	_clear_data();
+	// close socket
+	m_socket.close();
+	m_running = false;
+}
+
+//-----------------------------------------------------------------------------
+void AsyncConnection::listen()
+{
+	os::printf("server listening for connection...\n");;
+	// Set connection mode and state
+	_clear_data();
+	m_mode = SERVER;
+	m_state = LISTENING;  
+}
+
+//-----------------------------------------------------------------------------
+void AsyncConnection::connect(const os::NetAddress& addr)
+{
+  	_clear_data();
+
+	os::printf("client connecting to ");
+	os::printf("%i.", (uint8_t)addr.address[0]);
+	os::printf("%i.", (uint8_t)addr.address[1]);
+	os::printf("%i.", (uint8_t)addr.address[2]);
+	os::printf("%i:", (uint8_t)addr.address[3]);
+	os::printf("%i\n", (uint16_t)addr.port);
+
+	m_mode = CLIENT;
+	m_state = CONNECTING;
+	m_remote_address = addr;
 }
 
 //-----------------------------------------------------------------------------
@@ -100,30 +157,63 @@ float AsyncConnection::get_incoming_packet_loss() const
 	return m_incoming_dropped_packets * 100 / (m_incoming_recv_packets + m_incoming_dropped_packets);
 }
 
+//-----------------------------------------------------------------------------
+uint16_t AsyncConnection::get_local_sequence() const
+{
+	return m_local_sequence;
+}
 //-----------------------------------------------------------------------------
 void AsyncConnection::send_message(BitMessage& msg, const uint32_t time)
 {
+	assert(m_running);
+	
+	msg.init(MAX_MESSAGE_SIZE);
 	m_socket.send(m_remote_address, msg.get_data(), msg.get_size());
-	_packet_sent(msg.get_size());
+
 }
 
 //-----------------------------------------------------------------------------
-bool AsyncConnection::receive_message(BitMessage& msg, const uint32_t time)
+int32_t AsyncConnection::receive_message(BitMessage& msg, const uint32_t time)
 {
-	size_t size;
-	
-	m_socket.receive(m_remote_address, msg.get_data(), size);
+	assert(m_running);
+	// init BitMessage handler
+	msg.init(175);
+	msg.begin_writing();
+	size_t size = 175;
+	// NetAddress handler
+	os::NetAddress sender(0, 0, 0, 0, 0);
+	// receive message
+	int32_t bytes = m_socket.receive(sender, msg.get_data(), size);
+	//TODO: why received bytes is zero
+	os::printf("%d bytes received\n", bytes);
 	msg.set_size(size);
+	// sets BitMessage in only-read
 	msg.begin_reading();
+	
+	if (m_mode == SERVER && !is_connected() && bytes > 0)
+	{
+		os::printf("server accepts connection from client ");
+		os::printf("%i.", (uint8_t)sender.address[0]);
+		os::printf("%i.", (uint8_t)sender.address[1]);
+		os::printf("%i.", (uint8_t)sender.address[2]);
+		os::printf("%i:", (uint8_t)sender.address[3]);
+		os::printf("%i\n", (uint16_t)sender.port);
+		
+		m_state = CONNECTED;
+		m_remote_address = sender;
+	}
 
-	//TODO: check return value of receive
-	BitMessage::Header header;
-
-	msg.read_int32();	// read protocol id
-	header.sequence = msg.read_int32();	// read sequence
-	msg.read_int32();// read ack
-	msg.read_int32();// read ack_bits
-	header.size = msg.read_uint16();// read size
+	if (sender == m_remote_address)
+	{
+		if (m_mode == CLIENT && m_state == CONNECTING)
+		{
+			os::printf("client completes connection with server");
+			m_state = CONNECTED;
+		}
+		m_timeout_acc = 0.0f;
+		return msg.get_size();
+	}
+	return 0;
 }
 
 //-----------------------------------------------------------------------------
@@ -133,6 +223,33 @@ void AsyncConnection::clear_reliable_messages()
 	m_received_msg.clear();
 }
 
+//-----------------------------------------------------------------------------
+void AsyncConnection::update(real delta)
+{
+	assert(m_running);
+	
+	m_timeout_acc += delta;
+	
+	if (m_timeout_acc > m_timeout)
+	{
+		if (m_state == CONNECTING)
+		{
+			os::printf("Connect timed out\n");
+			_clear_data();
+			m_state = CONNECT_FAIL;
+		}
+		else if (m_state == CONNECTED)
+		{
+			os::printf("Connection timed out\n");
+ 			_clear_data();
+			if (m_state == CONNECTING)
+			{
+				m_state = CONNECT_FAIL;
+			}
+		}
+	}	
+}	
+
 //-----------------------------------------------------------------------------
 bool AsyncConnection::ready_to_send(const int time) const
 {
@@ -159,6 +276,30 @@ bool AsyncConnection::process(const os::NetAddress from, int time, BitMessage &m
 
 }
 
+//-----------------------------------------------------------------------------
+bool AsyncConnection::is_connecting() const
+{ 
+	return m_state == CONNECTING; 
+}
+
+//-----------------------------------------------------------------------------
+bool AsyncConnection::is_listening() const
+{ 
+	return m_state == LISTENING; 
+}
+
+//-----------------------------------------------------------------------------
+bool AsyncConnection::is_connected() const 
+{ 
+	return m_state == CONNECTED; 
+}
+
+//-----------------------------------------------------------------------------
+bool AsyncConnection::is_connect_fail() const
+{ 
+	return m_state == CONNECT_FAIL; 
+}
+
 //-----------------------------------------------------------------------------
 void AsyncConnection::_packet_sent(size_t size)
 {
@@ -400,5 +541,12 @@ void AsyncConnection::_update_packet_loss(const uint32_t time, const uint32_t nu
 	m_incoming_dropped_packets += num_dropped;
 }
 
+void AsyncConnection::_clear_data()
+{
+	m_state = DISCONNECTED;
+	m_timeout_acc = 0.0f;
+	m_remote_address = os::NetAddress();
+}
+
 } // namespace network
 } // namespace crown

+ 64 - 17
src/network/AsyncConnection.h

@@ -6,16 +6,20 @@
 #include "BitMessage.h"
 #include "Queue.h"
 
+#define DEFAULT_PROTOCOL_ID 0xFFFFFFFF
+#define DEFAULT_TIMEOUT		10.0f
+#define MAX_RATE			64000
+#define MAX_SEQUENCE		0xFFFFFFFF
+#define MAX_RTT				1000
+#define MAX_PACKET_LEN		1400
+#define MAX_MESSAGE_SIZE	16384
+#define MAX_QUEUE_SIZE		16384
+
 namespace crown
 {
 namespace network
 {
-	#define DEFAULT_PROTOCOL_ID 0xFFFFFFFF
-	
-	#define MAX_SEQUENCE		0xFFFFFFFF
-	#define MAX_PACKET_LEN		1400
-	#define MAX_MESSAGE_SIZE	16384
-	#define MAX_QUEUE_SIZE		16384
+
 	
 /**
  * Reliable connection over UDP
@@ -23,11 +27,15 @@ namespace network
 class AsyncConnection
 {
 public:
-  
+
 									AsyncConnection(Allocator& allocator);
 									~AsyncConnection();
 
-	void							init(const os::NetAddress addr, const int32_t id = DEFAULT_PROTOCOL_ID);
+	void							init(const int32_t id = DEFAULT_PROTOCOL_ID, const real timeout = DEFAULT_TIMEOUT);
+	bool 							start(uint16_t port);
+	void 							stop();
+	void 							listen();
+	void 							connect(const os::NetAddress& addr);
 	void							reset_rate();
 
 								  // Sets the maximum outgoing rate.
@@ -41,11 +49,15 @@ public:
 									// Returns the average incoming rate over the last second.
 	int32_t							get_incoming_rate() const;
 									// Returns the average incoming packet loss over the last 5 seconds.
-	real							get_incoming_packet_loss() const;		
+	real							get_incoming_packet_loss() const;
+									// return the current local sequence number
+	uint16_t						get_local_sequence() const;
+									// return the current remote sequence number
+	uint16_t						get_remote_sequence() const;
 									// Sends message
 	void							send_message(BitMessage& msg, const uint32_t time);
 									// Receive message
-	bool							receive_message(BitMessage& msg, const uint32_t time);
+	int32_t							receive_message(BitMessage& msg, const uint32_t time);
 									// Removes any pending outgoing or incoming reliable messages.
 	void							clear_reliable_messages();
 									// Update AsyncConnection
@@ -54,7 +66,32 @@ public:
 	bool							ready_to_send(const int32_t time) const;
 									// Processes the incoming message.
 	bool							process(const os::NetAddress from, int32_t time, BitMessage &msg, int32_t &sequence);
-
+	
+	bool 							is_connecting() const;
+	bool 							is_listening() const; 
+	bool 							is_connected() const;
+	bool 							is_connect_fail() const;
+	
+private:
+	
+	// connection mode
+	enum Mode
+	{
+		NONE,
+		CLIENT,
+		SERVER
+	};
+	
+	// connection states
+	enum State
+	{
+		DISCONNECTED,
+		LISTENING,
+		CONNECTING,
+		CONNECT_FAIL,
+		CONNECTED
+	};  
+  
 private:
 
 									 // methods which provides a reliability system
@@ -64,9 +101,12 @@ private:
 	bool							_sequence_more_recent(uint16_t s1, uint16_t s2);
 	int32_t							_bit_index_for_sequence(uint16_t seq, uint16_t ack);
 	int32_t							_generate_ack_bits(uint16_t ack);
+									// methods which provides a flow control system
 	void							_update_outgoing_rate(const uint32_t time, const size_t size);
 	void							_update_incoming_rate(const uint32_t time, const size_t size);
 	void							_update_packet_loss(const uint32_t time, const uint32_t num_recv, const uint32_t num_dropped);
+	
+	void							_clear_data();
 
 
 private:
@@ -74,6 +114,8 @@ private:
 	os::NetAddress					m_remote_address;			// address of remote host
 	os::UDPSocket					m_socket;					// socket
 	int32_t							m_id;						// our identification used instead of port number
+	Mode							m_mode;						// connection mode
+	State							m_state;					// connection state
 	int32_t							m_max_rate;					// maximum number of bytes that may go out per second
 
 									// variables to keep track of the rate
@@ -86,27 +128,32 @@ private:
 	real							m_incoming_recv_packets;
 	real							m_incoming_dropped_packets;
 	int32_t							m_incoming_packet_loss_time;
-	
-						
+
 	int32_t							m_outgoing_sent_packet;		// keep track of sent packet
 
 									// sequencing variables
-	int32_t							m_local_sequence;
-	int32_t							m_remote_sequence;
-	int32_t							m_max_sequence;
+	uint16_t						m_local_sequence;
+	uint16_t						m_remote_sequence;
+	uint16_t						m_max_sequence;
 	
 	real							m_max_rtt;					// Maximum round trip time
 	real							m_rtt;						// Round trip time
+	real							m_timeout;					// connection timeout value
+	real							m_timeout_acc;				// timeout accumulator
 	
 									// variables to control the outgoing rate
 	int32_t							m_last_send_time;			// last time data was sent out
 	int32_t							m_last_data_bytes;			// bytes left to send at last send time
-
+									
+									// flag variables
+	bool							m_running;
 									// reliable messages
 	Queue<BitMessage::Header>		m_sent_msg;					// Sent messages queue
 	Queue<BitMessage::Header>		m_received_msg;				// Received messages queue
 	Queue<BitMessage::Header>		m_pending_ack;				// Pending acknokledges queue
 	Queue<BitMessage::Header>		m_acked;					// Acknowledges queue
+					
+
 
 };
 

+ 7 - 3
src/network/BitMessage.cpp

@@ -729,8 +729,12 @@ size_t BitMessage::read_header(Header& header)
 	header.size = (size_t)read_uint16();
 }
 
-
-
-
+//---------------------------------------------------------------------------------------------
+void BitMessage::print() const
+{
+	os::printf("MAX_SIZE: %d\n", m_max_size);
+	os::printf("CUR_SIZE: %d\n", m_cur_size);	
+}
+  
 }	//namespace network
 }	//namespace crown

+ 2 - 0
src/network/BitMessage.h

@@ -104,6 +104,8 @@ public:
 	int32_t				read_data(void* data, int32_t length) const;
 	void				read_netaddr(os::NetAddress* addr) const;
 	size_t				read_header(Header& header);
+	
+	void				print() const;
 
 private:
   

+ 22 - 5
src/os/OS.h

@@ -156,7 +156,7 @@ struct NetAddress
 	
 	uint32_t get_address()
 	{
-		uint32_t addr = address[0] << 24 | address[1] << 16 | address[2] << 8 | address[3];
+		uint32_t addr = (address[0] << 24) | (address[1] << 16) | (address[2] << 8) | (address[3]);
 		
 		return addr;
 	}
@@ -168,10 +168,10 @@ struct NetAddress
 	
 	void set(uint32_t a, uint16_t p)
 	{
-		address[0] = (uint8_t) a >> 24;
-		address[1] = (uint8_t) a >> 16;
-		address[2] = (uint8_t) a >> 8;
-		address[3] = (uint8_t) a;
+		address[0] = a >> 24;
+		address[1] = a >> 16;
+		address[2] = a >> 8;
+		address[3] = a;
 
 		port = p;
 	}
@@ -194,6 +194,23 @@ struct NetAddress
 			   address[3] == addr.address[3] &&
 			   port == addr.port;
 	}
+	
+	NetAddress& operator=(const NetAddress& addr)
+	{
+		address[0] = addr.address[0];
+		address[1] = addr.address[1];
+		address[2] = addr.address[2];
+		address[3] = addr.address[3];
+		
+		port = addr.port;
+		
+		return *this;
+	}
+	
+	void print()
+	{
+		printf("NetAddress: %i.%i.%i.%i:%i\n", address[0], address[1], address[2], address[3], port);
+	}
 };
 
 //-----------------------------------------------------------------------------

+ 3 - 3
src/os/linux/LinuxUDPSocket.cpp

@@ -81,7 +81,7 @@ bool UDPSocket::send(NetAddress &receiver, const void* data, size_t size)
 	return sent_bytes == size;
 }
 
-int32_t UDPSocket::receive(NetAddress &sender, void* data, size_t size)
+int32_t UDPSocket::receive(NetAddress& sender, void* data, size_t size)
 {
 	assert(data);
 	assert(size > 0);
@@ -103,9 +103,9 @@ int32_t UDPSocket::receive(NetAddress &sender, void* data, size_t size)
 
 	uint32_t address = ntohl(from.sin_addr.s_addr);
 	uint16_t port = ntohs(from.sin_port);
-
+	
 	sender.set(address, port);
-
+	
 	return received_bytes;	
 }
 

+ 127 - 2
tests/connections.cpp

@@ -1,10 +1,135 @@
 #include <cstdio>
+#include <cstring>
 
+#include "OS.h"
+#include "MallocAllocator.h"
+#include "BitMessage.h"
 #include "AsyncConnection.h"
 
 using namespace crown;
 
-int main()
+const unsigned int server_port = 30000;
+const unsigned int client_port = 30001;
+const unsigned int protocol_id = 0x99887766;
+const float delta_time = 1.0f / 45.0f;
+const real send_rate = 1.0f / 45.0f;
+
+const float time_out = 10.0f;
+
+enum Mode
 {
-	return 0;
+    SERVER,
+    CLIENT
+};
+
+int main(int argc, char** argv)
+{
+	if (argc != 2)
+	{
+		os::printf("Usage: ./connections server/client.\n");
+		return -1;
+	}
+	
+	bool connected = false;
+	real send_acc = 0.0f;
+	Mode mode = SERVER;
+	os::NetAddress addr(127, 0, 0, 1, server_port);
+	
+	// creates connection
+	MallocAllocator alloc;
+	network::AsyncConnection connection(alloc);
+	
+	if (strcmp(argv[1], "server") == 0)
+	{
+		mode = SERVER;
+	}
+	else if (strcmp(argv[1], "client") == 0)
+	{
+		mode = CLIENT;
+	}
+	else
+	{
+		os::printf("Usage: ./connections server/client.\n");
+		return -1;
+	}
+	
+	uint16_t port = mode == SERVER ? server_port : client_port;
+
+	
+	// connection init
+	connection.init(protocol_id, 10.0f);
+	//connection start
+	if (!connection.start(port))
+	{
+        os::printf("Couldn't start connection on port %d\n", port);
+		return -1;
+	}
+	
+	if (mode == CLIENT)
+	{
+		connection.connect(addr);
+	}
+	else if (mode == SERVER)
+	{
+		connection.listen();
+	}	
+	
+	// main loop
+	while (true)
+	{
+       
+        if (!connected && connection.is_connected())
+        {
+            os::printf("Client connected to server\n");
+            connected = true;
+        }
+        
+        if (!connected && connection.is_connect_fail())
+        {
+            os::printf("Connection failed\n");
+            break;
+        }
+                
+        send_acc += delta_time;
+		
+		if (mode == CLIENT)
+		{
+			while (send_acc > 1 / send_rate)
+			{
+				network::BitMessage message(alloc);
+			
+				message.init(6);
+				message.write_string("prova", 6);
+				
+				connection.send_message(message, delta_time);
+				
+				send_acc -= 1.0f / send_rate;
+			}
+		}
+		if (mode == SERVER)
+		{
+			while (true)
+			{
+				network::BitMessage received(alloc);
+				
+				received.init(6);
+				
+				int32_t bytes = connection.receive_message(received, delta_time);
+				if (bytes > 0)
+				{
+					char string[6];
+					received.read_string(string, 6);
+// 					os::printf("%s\n", string);
+				} 
+				
+				if (bytes <= 0)
+				{
+					break;
+				}
+			}
+		}
+	}
+	
+	connection.stop();
+  
 }

+ 1 - 0
tests/messages.cpp

@@ -270,6 +270,7 @@ void test_string()
 	printf("-----------------------------\n");
 	printf("start write and read for STRING\n");
  	printf("string = %s\n", res);
+	printf("sizeof string= %d\n", sizeof(s));
 	printf("bits written = %d\n", bits_written);
 	printf("remaining write bits = %d\n", rem_write_bits);
 	printf("bits read = %d\n", bits_read);