Quellcode durchsuchen

Improve ConsoleServer

Daniele Bartolini vor 12 Jahren
Ursprung
Commit
babfd26fdb
2 geänderte Dateien mit 95 neuen und 130 gelöschten Zeilen
  1. 70 103
      engine/ConsoleServer.cpp
  2. 25 27
      engine/ConsoleServer.h

+ 70 - 103
engine/ConsoleServer.cpp

@@ -42,9 +42,6 @@ namespace crown
 //-----------------------------------------------------------------------------
 ConsoleServer::ConsoleServer(uint16_t port)
 	: m_port(port)
-	, m_num_clients(0)
-	, m_receive_buffer(default_allocator())
-	, m_receive_callbacks(default_allocator())
 {
 }
 
@@ -56,8 +53,14 @@ void ConsoleServer::init(bool wait)
 
 	if (wait)
 	{
+		AcceptResult result;
 		TCPSocket client;
-		m_server.accept(client);
+		do
+		{
+			result = m_server.accept(client);
+		}
+		while (result.error != AcceptResult::NO_ERROR);
+
 		add_client(client);
 	}
 }
@@ -65,7 +68,7 @@ void ConsoleServer::init(bool wait)
 //-----------------------------------------------------------------------------
 void ConsoleServer::shutdown()
 {
-	for (uint32_t i = 0; i < MAX_CONSOLE_CLIENTS; i++)
+	for (uint32_t i = 0; i < m_clients.size(); i++)
 	{
 		m_clients[i].close();
 	}
@@ -94,151 +97,115 @@ void ConsoleServer::log_to_all(const char* message, LogSeverity::Enum severity)
 
 	json << "\"message\":\"" << buf << "\"}";
 
-	send_message_to_all(json.c_str());
+	send_to_all(json.c_str());
 }
 
 //-----------------------------------------------------------------------------
-void ConsoleServer::send_message_to(ClientId client, const char* message)
+void ConsoleServer::send(TCPSocket client, const char* message)
 {
 	uint32_t msg_len = string::strlen(message);
-	m_clients[client.index].write((const char*) &msg_len, 4);
-	m_clients[client.index].write(message, msg_len);
+	client.write((const char*) &msg_len, 4);
+	client.write(message, msg_len);
 }
 
 //-----------------------------------------------------------------------------
-void ConsoleServer::send_message_to_all(const char* message)
+void ConsoleServer::send_to_all(const char* message)
 {
-	// Update all clients
-	for (uint32_t i = 0; i < MAX_CONSOLE_CLIENTS; i++)
+	for (uint32_t i = 0; i < m_clients.size(); i++)
 	{
-		ClientId id = *(m_clients_table.begin() + i);
-		if (id.id != INVALID_ID)
-		{
-			send_message_to(id, message);
-		}
+		send(m_clients[i].socket, message);
 	}
 }
 
 //-----------------------------------------------------------------------------
 void ConsoleServer::update()
 {
-	// Check for new clients
-	TCPSocket client;
-	if (m_server.accept_nonblock(client))
+	// Check for new clients only if we have room for them
+	if (m_clients.size() < MAX_CONSOLE_CLIENTS - 1)
 	{
-		add_client(client);
+		TCPSocket client;
+		AcceptResult result = m_server.accept_nonblock(client);
+		if (result.error == AcceptResult::NO_ERROR)
+		{
+			add_client(client);
+		}
 	}
 
+	TempAllocator256 alloc;
+	List<Id> to_remove(alloc);
+
 	// Update all clients
-	for (uint32_t i = 0; i < MAX_CONSOLE_CLIENTS; i++)
+	for (uint32_t i = 0; i < m_clients.size(); i++)
 	{
-		ClientId id = *(m_clients_table.begin() + i);
-		if (id.id != INVALID_ID)
-		{
-			update_client(id);
-		}
+		ReadResult rr = update_client(m_clients[i].socket);
+		if (rr.error != ReadResult::NO_ERROR) to_remove.push_back(m_clients[i].id);
 	}
 
-	// Process all requests
-	process_requests();
+	// Remove clients
+	for (uint32_t i = 0; i < to_remove.size(); i++)
+	{
+		m_clients.lookup(to_remove[i]).socket.close();
+		m_clients.destroy(to_remove[i]);
+	}
 }
 
 //-----------------------------------------------------------------------------
-void ConsoleServer::process_requests()
+void ConsoleServer::add_client(TCPSocket socket)
 {
-	for (uint32_t i = 0; i < m_receive_callbacks.size(); i++)
-	{
-		RPCCallback cb = m_receive_callbacks.front();
-		m_receive_callbacks.pop_front();
-
-		const char* request = &m_receive_buffer[cb.message_index];
-		JSONParser parser(request);
-		JSONElement request_type = parser.root().key("type");
-		DynamicString type; 
-		request_type.string_value(type);
-
-		// Determine request type
-		if (type == "ping") process_ping(cb.client, request);
-		else if (type == "script") process_script(cb.client, request);
-		else if (type == "stats") process_stats(cb.client, request);
-		else if (type == "command") process_command(cb.client, request);
-		else if (type == "filesystem") processs_filesystem(cb.client, request);
-		else continue;
-	}
-
-	m_receive_callbacks.clear();
-	m_receive_buffer.clear();
+	Client client;
+	client.socket = socket;
+	Id id = m_clients.create(client);
+	m_clients.lookup(id).id = id;
 }
 
 //-----------------------------------------------------------------------------
-void ConsoleServer::update_client(ClientId id)
+ReadResult ConsoleServer::update_client(TCPSocket client)
 {
-	TCPSocket& client = m_clients[id.index];
-
 	uint32_t msg_len = 0;
 	ReadResult rr = client.read_nonblock(&msg_len, 4);
 
 	// If no data received, return
-	if (rr.error == ReadResult::NO_ERROR && rr.bytes_read == 0) return;
-	if (rr.error == ReadResult::REMOTE_CLOSED)
-	{
-		remove_client(id);
-		return;
-	}
+	if (rr.error == ReadResult::NO_ERROR && rr.bytes_read == 0) return rr;
+	if (rr.error == ReadResult::REMOTE_CLOSED) return rr;
+	if (rr.error != ReadResult::NO_ERROR) return rr;
 
 	// Else read the message
 	List<char> msg_buf(default_allocator());
 	msg_buf.resize(msg_len);
 	ReadResult msg_result = client.read(msg_buf.begin(), msg_len);
+	msg_buf.push_back('\0');
 
-	uint32_t message_index = m_receive_buffer.size();
-	m_receive_buffer.push(msg_buf.begin(), msg_result.bytes_read);
-	m_receive_buffer.push_back('\0');
-	add_request(id, message_index);
-}
-
-//-----------------------------------------------------------------------------
-void ConsoleServer::add_client(TCPSocket& client)
-{
-	if (m_num_clients < MAX_CONSOLE_CLIENTS)
-	{
-		ClientId id = m_clients_table.create();
-		m_clients[id.index] = client;
-		m_num_clients++;
-	}
-	else
-	{
-		Log::e("Too many clients, connection denied");
-	}
-}
-
-//-----------------------------------------------------------------------------
-void ConsoleServer::remove_client(ClientId id)
-{
-	CE_ASSERT(m_num_clients > 0, "No client connected");
+	if (msg_result.error == ReadResult::REMOTE_CLOSED) return msg_result;
+	if (msg_result.error != ReadResult::NO_ERROR) return msg_result;
 
-	m_clients[id.index].close();
-	m_clients_table.destroy(id);
-	m_num_clients--;
+	process(client, msg_buf.begin());
+	return msg_result;
 }
 
 //-----------------------------------------------------------------------------
-void ConsoleServer::add_request(ClientId client, uint32_t message_index)
+void ConsoleServer::process(TCPSocket client, const char* request)
 {
-	RPCCallback cb;
-	cb.client = client;
-	cb.message_index = message_index;
-	m_receive_callbacks.push_back(cb);
+	JSONParser parser(request);
+	DynamicString type; 
+	parser.root().key("type").string_value(type);
+
+	// Determine request type
+	if (type == "ping") process_ping(client, request);
+	else if (type == "script") process_script(client, request);
+	else if (type == "stats") process_stats(client, request);
+	else if (type == "command") process_command(client, request);
+	else if (type == "filesystem") processs_filesystem(client, request);
+	else CE_FATAL("Request unknown.");
 }
 
 //-----------------------------------------------------------------------------
-void ConsoleServer::process_ping(ClientId client, const char* /*msg*/)
+void ConsoleServer::process_ping(TCPSocket client, const char* /*msg*/)
 {
-	send_message_to(client, "{\"type\":\"pong\"}");
+	send(client, "{\"type\":\"pong\"}");
 }
 
 //-----------------------------------------------------------------------------
-void ConsoleServer::process_script(ClientId /*client*/, const char* msg)
+void ConsoleServer::process_script(TCPSocket /*client*/, const char* msg)
 {
 	JSONParser parser(msg);
 	JSONElement root = parser.root();
@@ -249,7 +216,7 @@ void ConsoleServer::process_script(ClientId /*client*/, const char* msg)
 }
 
 //-----------------------------------------------------------------------------
-void ConsoleServer::process_stats(ClientId client, const char* /*msg*/)
+void ConsoleServer::process_stats(TCPSocket client, const char* /*msg*/)
 {
 	TempAllocator2048 alloc;
 	StringStream response(alloc);
@@ -271,11 +238,11 @@ void ConsoleServer::process_stats(ClientId client, const char* /*msg*/)
 
 	response << "]" << "}";
 
-	send_message_to(client, response.c_str());
+	send(client, response.c_str());
 }
 
 //-----------------------------------------------------------------------------
-void ConsoleServer::process_command(ClientId /*client*/, const char* msg)
+void ConsoleServer::process_command(TCPSocket /*client*/, const char* msg)
 {
 	JSONParser parser(msg);
 	JSONElement root = parser.root();
@@ -311,7 +278,7 @@ void ConsoleServer::process_command(ClientId /*client*/, const char* msg)
 }
 
 //-----------------------------------------------------------------------------
-void ConsoleServer::processs_filesystem(ClientId client, const char* msg)
+void ConsoleServer::processs_filesystem(TCPSocket client, const char* msg)
 {
 	JSONParser parser(msg);
 	JSONElement root = parser.root();
@@ -333,7 +300,7 @@ void ConsoleServer::processs_filesystem(ClientId client, const char* msg)
 		StringStream response(alloc);
 		response << "{\"type\":\"file\",\"size\":" << file_size << "}";
 
-		send_message_to(client, response.c_str());
+		send(client, response.c_str());
 	}
 	else if (cmd == "read")
 	{
@@ -361,7 +328,7 @@ void ConsoleServer::processs_filesystem(ClientId client, const char* msg)
 		response << "{\"type\":\"file\",";
 		response << "\"data\":\"" << bytes_encoded << "\"}";
 
-		send_message_to(client, response.c_str());
+		send(client, response.c_str());
 
 		// Cleanup
 		default_allocator().deallocate(bytes_encoded);

+ 25 - 27
engine/ConsoleServer.h

@@ -29,21 +29,27 @@ OTHER DEALINGS IN THE SOFTWARE.
 #include "OsSocket.h"
 #include "List.h"
 #include "Queue.h"
-#include "IdTable.h"
+#include "IdArray.h"
 #include "Log.h"
 
+#define MAX_CONSOLE_CLIENTS 32
+
 namespace crown
 {
 
-typedef Id ClientId;
-#define MAX_CONSOLE_CLIENTS 100
-
-struct RPCCallback
+struct Client
 {
-	ClientId client;
-	uint32_t message_index;
+	Id id;
+	TCPSocket socket;
+
+	void close()
+	{
+		socket.close();
+	}
 };
 
+typedef IdArray<MAX_CONSOLE_CLIENTS, Client> ClientArray;
+
 class ConsoleServer
 {
 public:
@@ -58,37 +64,29 @@ public:
 
 	void						log_to_all(const char* message, LogSeverity::Enum severity);
 
-	void						send_message_to(ClientId client, const char* message);
-	void						send_message_to_all(const char* message);
-
 	/// Collects requests from clients and processes them all.
 	void						update();
 
 private:
 
-	void						process_requests();
-	void						update_client(ClientId id);
-	void						add_client(TCPSocket& client);
-	void						remove_client(ClientId id);
+	void						send(TCPSocket client, const char* message);
+	void						send_to_all(const char* message);
+
+	void						add_client(TCPSocket socket);
+	ReadResult					update_client(TCPSocket client);
+	void						process(TCPSocket client, const char* request);
 
-	void						add_request(ClientId client, uint32_t message_index);
-	void						process_ping(ClientId client, const char* msg);
-	void						process_script(ClientId client, const char* msg);
-	void						process_stats(ClientId client, const char* msg);
-	void						process_command(ClientId client, const char* msg);
-	void						processs_filesystem(ClientId client, const char* msg);
+	void						process_ping(TCPSocket client, const char* msg);
+	void						process_script(TCPSocket client, const char* msg);
+	void						process_stats(TCPSocket client, const char* msg);
+	void						process_command(TCPSocket client, const char* msg);
+	void						processs_filesystem(TCPSocket client, const char* msg);
 
 private:
 
 	uint16_t					m_port;
 	TCPServer					m_server;
-
-	uint8_t						m_num_clients;
-	IdTable<MAX_CONSOLE_CLIENTS>	m_clients_table;
-	TCPSocket					m_clients[MAX_CONSOLE_CLIENTS];
-
-	List<char>					m_receive_buffer;
-	Queue<RPCCallback>			m_receive_callbacks;
+	ClientArray					m_clients;
 };
 
 } // namespace crown