2
0
Эх сурвалжийг харах

Outgoing messages are now packed together (#2636)

* outgoing messages are now packed together

* ability to set connection packet buffer limit

* custom messages and broadcasting fixed

* code style updates
Arnis Lielturks 5 жил өмнө
parent
commit
30dce947c4

+ 1 - 1
Source/Samples/16_Chat/Chat.cpp

@@ -51,7 +51,7 @@
 #endif
 
 // Identifier for the chat network messages
-const int MSG_CHAT = 153;
+const int MSG_CHAT = 154;
 // UDP port we will use
 const unsigned short CHAT_SERVER_PORT = 2345;
 

+ 34 - 0
Source/Samples/17_SceneReplication/SceneReplication.cpp

@@ -186,6 +186,20 @@ void SceneReplication::CreateUI()
     // Hide until connected
     instructionsText_->SetVisible(false);
 
+    packetsIn_ = ui->GetRoot()->CreateChild<Text>();
+    packetsIn_->SetText("Packets in : 0");
+    packetsIn_->SetFont(cache->GetResource<Font>("Fonts/Anonymous Pro.ttf"), 15);
+    packetsIn_->SetHorizontalAlignment(HA_LEFT);
+    packetsIn_->SetVerticalAlignment(VA_CENTER);
+    packetsIn_->SetPosition(10, -10);
+
+    packetsOut_ = ui->GetRoot()->CreateChild<Text>();
+    packetsOut_->SetText("Packets out: 0");
+    packetsOut_->SetFont(cache->GetResource<Font>("Fonts/Anonymous Pro.ttf"), 15);
+    packetsOut_->SetHorizontalAlignment(HA_LEFT);
+    packetsOut_->SetVerticalAlignment(VA_CENTER);
+    packetsOut_->SetPosition(10, 10);
+
     buttonContainer_ = root->CreateChild<UIElement>();
     buttonContainer_->SetFixedSize(500, 20);
     buttonContainer_->SetPosition(20, 20);
@@ -343,6 +357,26 @@ void SceneReplication::HandlePostUpdate(StringHash eventType, VariantMap& eventD
 {
     // We only rotate the camera according to mouse movement since last frame, so do not need the time step
     MoveCamera();
+
+    if (packetCounterTimer_.GetMSec(false) > 1000 && GetSubsystem<Network>()->GetServerConnection())
+    {
+        packetsIn_->SetText("Packets  in: " + String(GetSubsystem<Network>()->GetServerConnection()->GetPacketsInPerSec()));
+        packetsOut_->SetText("Packets out: " + String(GetSubsystem<Network>()->GetServerConnection()->GetPacketsOutPerSec()));
+        packetCounterTimer_.Reset();
+    }
+    if (packetCounterTimer_.GetMSec(false) > 1000 && GetSubsystem<Network>()->GetClientConnections().Size())
+    {
+        int packetsIn = 0;
+        int packetsOut = 0;
+        auto connections = GetSubsystem<Network>()->GetClientConnections();
+        for (auto it = connections.Begin(); it != connections.End(); ++it ) {
+            packetsIn += (*it)->GetPacketsInPerSec();
+            packetsOut += (*it)->GetPacketsOutPerSec();
+        }
+        packetsIn_->SetText("Packets  in: " + String(packetsIn));
+        packetsOut_->SetText("Packets out: " + String(packetsOut));
+        packetCounterTimer_.Reset();
+    }
 }
 
 void SceneReplication::HandlePhysicsPreStep(StringHash eventType, VariantMap& eventData)

+ 6 - 0
Source/Samples/17_SceneReplication/SceneReplication.h

@@ -114,4 +114,10 @@ private:
     SharedPtr<Text> instructionsText_;
     /// ID of own controllable object (client only.)
     unsigned clientObjectID_{};
+    /// Packets in per second
+    SharedPtr<Text> packetsIn_;
+    /// Packets out per second
+    SharedPtr<Text> packetsOut_;
+    /// Packet counter UI update timer
+    Timer packetCounterTimer_;
 };

+ 137 - 56
Source/Urho3D/Network/Connection.cpp

@@ -77,7 +77,8 @@ Connection::Connection(Context* context, bool isClient, const SLNet::AddressOrGU
     connectPending_(false),
     sceneLoaded_(false),
     logStatistics_(false),
-    address_(nullptr)
+    address_(nullptr),
+    packedMessageLimit_(1024)
 {
     sceneState_.connection_ = this;
     port_ = address.systemAddress.GetPort();
@@ -93,6 +94,20 @@ Connection::~Connection()
     address_ = nullptr;
 }
 
+PacketType Connection::GetPacketType(bool reliable, bool inOrder)
+{
+    if (reliable && inOrder)
+        return PT_RELIABLE_ORDERED;
+
+    if (reliable && !inOrder)
+        return PT_RELIABLE_UNORDERED;
+
+    if (!reliable && inOrder)
+        return PT_UNRELIABLE_ORDERED;
+
+    return PT_UNRELIABLE_UNORDERED;
+}
+
 void Connection::SendMessage(int msgID, bool reliable, bool inOrder, const VectorBuffer& msg, unsigned contentID)
 {
     SendMessage(msgID, reliable, inOrder, msg.GetData(), msg.GetSize(), contentID);
@@ -106,16 +121,22 @@ void Connection::SendMessage(int msgID, bool reliable, bool inOrder, const unsig
         URHO3D_LOGERROR("Null pointer supplied for network message data");
         return;
     }
-    
-    VectorBuffer buffer;
-    buffer.WriteUByte((unsigned char)DefaultMessageIDTypes::ID_USER_PACKET_ENUM);
-    buffer.WriteUInt((unsigned int)msgID);
-    buffer.Write(data, numBytes);
-    PacketReliability reliability = reliable ? (inOrder ? RELIABLE_ORDERED : RELIABLE) : (inOrder ? UNRELIABLE_SEQUENCED : UNRELIABLE);
-    if (peer_) {
-        peer_->Send((const char *) buffer.GetData(), (int) buffer.GetSize(), HIGH_PRIORITY, reliability, (char) 0, *address_, false);
-        tempPacketCounter_.y_++;
+
+    PacketType type = GetPacketType(reliable, inOrder);
+    VectorBuffer& buffer = outgoingBuffer_[type];
+
+    if (buffer.GetSize() + numBytes >= packedMessageLimit_)
+        SendBuffer(type);
+
+    if (buffer.GetSize() == 0)
+    {
+        buffer.WriteUByte((unsigned char)DefaultMessageIDTypes::ID_USER_PACKET_ENUM);
+        buffer.WriteUInt((unsigned int)MSG_PACKED_MESSAGE);
     }
+
+    buffer.WriteUInt((unsigned int) msgID);
+    buffer.WriteUInt(numBytes);
+    buffer.Write(data, numBytes);
 }
 
 void Connection::SendRemoteEvent(StringHash eventType, bool inOrder, const VariantMap& eventData)
@@ -354,6 +375,39 @@ void Connection::SendPackages()
     }
 }
 
+void Connection::SendBuffer(PacketType type)
+{
+    VectorBuffer& buffer = outgoingBuffer_[type];
+    if (buffer.GetSize() == 0)
+        return;
+
+    PacketReliability reliability = PacketReliability::UNRELIABLE;
+    if (type == PT_UNRELIABLE_ORDERED)
+        reliability = PacketReliability::UNRELIABLE_SEQUENCED;
+
+    if (type == PT_RELIABLE_ORDERED)
+        reliability = PacketReliability::RELIABLE_ORDERED;
+
+    if (type == PT_RELIABLE_UNORDERED)
+        reliability = PacketReliability::RELIABLE;
+
+    if (peer_) {
+        peer_->Send((const char *) buffer.GetData(), (int) buffer.GetSize(), HIGH_PRIORITY, reliability, (char) 0,
+                    *address_, false);
+        tempPacketCounter_.y_++;
+    }
+
+    buffer.Clear();
+}
+
+void Connection::SendAllBuffers()
+{
+    SendBuffer(PT_RELIABLE_ORDERED);
+    SendBuffer(PT_RELIABLE_UNORDERED);
+    SendBuffer(PT_UNRELIABLE_ORDERED);
+    SendBuffer(PT_UNRELIABLE_UNORDERED);
+}
+
 void Connection::ProcessPendingLatestData()
 {
     if (!scene_ || !sceneLoaded_)
@@ -391,66 +445,76 @@ void Connection::ProcessPendingLatestData()
     }
 }
 
-bool Connection::ProcessMessage(int msgID, MemoryBuffer& msg)
+bool Connection::ProcessMessage(int msgID, MemoryBuffer& buffer)
 {
-    // New incomming message, reset last heard timer
-    lastHeardTimer_.Reset();
     tempPacketCounter_.x_++;
-    bool processed = true;
+    if (buffer.GetSize() == 0)
+        return false;
 
-    switch (msgID)
+    if (msgID != MSG_PACKED_MESSAGE)
     {
-    case MSG_IDENTITY:
-        ProcessIdentity(msgID, msg);
-        break;
+        ProcessUnknownMessage(msgID, buffer);
+        return true;
+    }
 
-    case MSG_CONTROLS:
-        ProcessControls(msgID, msg);
-        break;
+    while (!buffer.IsEof()) {
+        msgID = buffer.ReadUInt();
+        unsigned int packetSize = buffer.ReadUInt();
+        MemoryBuffer msg(buffer.GetData() + buffer.GetPosition(), packetSize);
+        buffer.Seek(buffer.GetPosition() + packetSize);
 
-    case MSG_SCENELOADED:
-        ProcessSceneLoaded(msgID, msg);
-        break;
+        switch (msgID)
+        {
+            case MSG_IDENTITY:
+                ProcessIdentity(msgID, msg);
+                break;
 
-    case MSG_REQUESTPACKAGE:
-    case MSG_PACKAGEDATA:
-        ProcessPackageDownload(msgID, msg);
-        break;
+            case MSG_CONTROLS:
+                ProcessControls(msgID, msg);
+                break;
 
-    case MSG_LOADSCENE:
-        ProcessLoadScene(msgID, msg);
-        break;
+            case MSG_SCENELOADED:
+                ProcessSceneLoaded(msgID, msg);
+                break;
 
-    case MSG_SCENECHECKSUMERROR:
-        ProcessSceneChecksumError(msgID, msg);
-        break;
+            case MSG_REQUESTPACKAGE:
+            case MSG_PACKAGEDATA:
+                ProcessPackageDownload(msgID, msg);
+                break;
 
-    case MSG_CREATENODE:
-    case MSG_NODEDELTAUPDATE:
-    case MSG_NODELATESTDATA:
-    case MSG_REMOVENODE:
-    case MSG_CREATECOMPONENT:
-    case MSG_COMPONENTDELTAUPDATE:
-    case MSG_COMPONENTLATESTDATA:
-    case MSG_REMOVECOMPONENT:
-        ProcessSceneUpdate(msgID, msg);
-        break;
+            case MSG_LOADSCENE:
+                ProcessLoadScene(msgID, msg);
+                break;
 
-    case MSG_REMOTEEVENT:
-    case MSG_REMOTENODEEVENT:
-        ProcessRemoteEvent(msgID, msg);
-        break;
+            case MSG_SCENECHECKSUMERROR:
+                ProcessSceneChecksumError(msgID, msg);
+                break;
 
-    case MSG_PACKAGEINFO:
-        ProcessPackageInfo(msgID, msg);
-        break;
+            case MSG_CREATENODE:
+            case MSG_NODEDELTAUPDATE:
+            case MSG_NODELATESTDATA:
+            case MSG_REMOVENODE:
+            case MSG_CREATECOMPONENT:
+            case MSG_COMPONENTDELTAUPDATE:
+            case MSG_COMPONENTLATESTDATA:
+            case MSG_REMOVECOMPONENT:
+                ProcessSceneUpdate(msgID, msg);
+                break;
 
-    default:
-        processed = false;
-        break;
-    }
+            case MSG_REMOTEEVENT:
+            case MSG_REMOTENODEEVENT:
+                ProcessRemoteEvent(msgID, msg);
+                break;
 
-    return processed;
+            case MSG_PACKAGEINFO:
+                ProcessPackageInfo(msgID, msg);
+                break;
+            default:
+                ProcessUnknownMessage(msgID, msg);
+                break;
+        }
+    }
+    return true;
 }
 
 void Connection::Ban()
@@ -1115,6 +1179,11 @@ void Connection::ConfigureNetworkSimulator(int latencyMs, float packetLoss)
         peer_->ApplyNetworkSimulator(packetLoss, latencyMs, 0);
 }
 
+void Connection::SetPacketSizeLimit(int limit)
+{
+    packedMessageLimit_ = limit;
+}
+
 void Connection::HandleAsyncLoadFinished(StringHash eventType, VariantMap& eventData)
 {
     sceneLoaded_ = true;
@@ -1572,6 +1641,18 @@ void Connection::ProcessPackageInfo(int msgID, MemoryBuffer& msg)
     RequestNeededPackages(1, msg);
 }
 
+void Connection::ProcessUnknownMessage(int msgID, MemoryBuffer& msg)
+{
+    // If message was not handled internally, forward as an event
+    using namespace NetworkMessage;
+
+    VariantMap& eventData = GetEventDataMap();
+    eventData[P_CONNECTION] = this;
+    eventData[P_MESSAGEID] = (int)msgID;
+    eventData[P_DATA].SetBuffer(msg.GetData(), msg.GetSize());
+    SendEvent(E_NETWORKMESSAGE, eventData);
+}
+
 String Connection::GetAddress() const {
     return String(address_->ToString(false /*write port*/)); 
 }

+ 23 - 2
Source/Urho3D/Network/Connection.h

@@ -106,6 +106,14 @@ enum ObserverPositionSendMode
     OPSM_POSITION_ROTATION
 };
 
+/// Packet types for outgoing buffers. Outgoing messages are grouped by their type
+enum PacketType {
+    PT_UNRELIABLE_UNORDERED,
+    PT_UNRELIABLE_ORDERED,
+    PT_RELIABLE_UNORDERED,
+    PT_RELIABLE_ORDERED
+};
+
 /// %Connection to a remote network host.
 class URHO3D_API Connection : public Object
 {
@@ -117,6 +125,8 @@ public:
     /// Destruct.
     ~Connection() override;
 
+    /// Get packet type based on the message parameters
+    PacketType GetPacketType(bool reliable, bool inOrder);
     /// Send a message.
     void SendMessage(int msgID, bool reliable, bool inOrder, const VectorBuffer& msg, unsigned contentID = 0);
     /// Send a message.
@@ -149,17 +159,20 @@ public:
     void SendRemoteEvents();
     /// Send package files to client. Called by network.
     void SendPackages();
+    /// Send out buffered messages by their type
+    void SendBuffer(PacketType type);
+    /// Send out all buffered messages
+    void SendAllBuffers();
     /// Process pending latest data for nodes and components.
     void ProcessPendingLatestData();
     /// Process a message from the server or client. Called by Network.
-    bool ProcessMessage(int msgID, MemoryBuffer& msg);
+    bool ProcessMessage(int msgID, MemoryBuffer& buffer);
     /// Ban this connections IP address.
     void Ban();
     /// Return the RakNet address/guid.
     const SLNet::AddressOrGUID& GetAddressOrGUID() const { return *address_; }
     /// Set the the RakNet address/guid.
     void SetAddressOrGUID(const SLNet::AddressOrGUID& addr);
-
     /// Return client identity.
     VariantMap& GetIdentity() { return identity_; }
 
@@ -230,6 +243,8 @@ public:
 
     /// Set network simulation parameters. Called by Network.
     void ConfigureNetworkSimulator(int latencyMs, float packetLoss);
+    /// Buffered packet size limit, when reached, packet is sent out immediately
+    void SetPacketSizeLimit(int limit);
 
     /// Current controls.
     Controls controls_;
@@ -265,6 +280,8 @@ private:
     void ProcessExistingNode(Node* node, NodeReplicationState& nodeState);
     /// Process a SyncPackagesInfo message from server.
     void ProcessPackageInfo(int msgID, MemoryBuffer& msg);
+    /// Process unknown message. All unknown messages are forwarded as an events
+    void ProcessUnknownMessage(int msgID, MemoryBuffer& msg);
     /// Check a package list received from server and initiate package downloads as necessary. Return true on success, or false if failed to initialze downloads (cache dir not set).
     bool RequestNeededPackages(unsigned numPackages, MemoryBuffer& msg);
     /// Initiate a package download.
@@ -328,6 +345,10 @@ private:
     Timer packetCounterTimer_;
     /// Last heard timer, resets when new packet is incoming.
     Timer lastHeardTimer_;
+    /// Outgoing packet buffer which can contain multiple messages
+    HashMap<int, VectorBuffer> outgoingBuffer_;
+    /// Outgoing packet size limit
+    int packedMessageLimit_;
 };
 
 }

+ 3 - 10
Source/Urho3D/Network/Network.cpp

@@ -300,15 +300,6 @@ void Network::HandleMessage(const SLNet::AddressOrGUID& source, int packetID, in
         MemoryBuffer msg(data, (unsigned)numBytes);
         if (connection->ProcessMessage((int)msgID, msg))
             return;
-
-        // If message was not handled internally, forward as an event
-        using namespace NetworkMessage;
-
-        VariantMap& eventData = GetEventDataMap();
-        eventData[P_CONNECTION] = connection;
-        eventData[P_MESSAGEID] = (int)msgID;
-        eventData[P_DATA].SetBuffer(msg.GetData(), msg.GetSize());
-        connection->SendEvent(E_NETWORKMESSAGE, eventData);
     }
     else
         URHO3D_LOGWARNING("Discarding message from unknown MessageConnection " + String(source.ToString()));
@@ -879,7 +870,7 @@ void Network::HandleIncomingPacket(SLNet::Packet* packet, bool isServer)
         else
         {
             MemoryBuffer buffer(packet->data + dataStart, packet->length - dataStart);
-            bool processed = serverConnection_->ProcessMessage(messageID, buffer);
+            bool processed = serverConnection_ && serverConnection_->ProcessMessage(messageID, buffer);
             if (!processed)
             {
                 HandleMessage(packet->systemAddress, 0, messageID, (const char*)(packet->data + dataStart), packet->length - dataStart);
@@ -962,6 +953,7 @@ void Network::PostUpdate(float timeStep)
                     i->second_->SendServerUpdate();
                     i->second_->SendRemoteEvents();
                     i->second_->SendPackages();
+                    i->second_->SendAllBuffers();
                 }
             }
         }
@@ -971,6 +963,7 @@ void Network::PostUpdate(float timeStep)
             // Send the client update
             serverConnection_->SendClientUpdate();
             serverConnection_->SendRemoteEvents();
+            serverConnection_->SendAllBuffers();
         }
 
         // Notify that the update was sent

+ 3 - 0
Source/Urho3D/Network/Protocol.h

@@ -64,6 +64,9 @@ static const int MSG_REMOTENODEEVENT = 0x97;
 /// Server->client: info about package.
 static const int MSG_PACKAGEINFO = 0x98;
 
+/// Packet that includes all the above messages
+static const int MSG_PACKED_MESSAGE = 0x99;
+
 /// Fixed content ID for client controls update.
 static const unsigned CONTROLS_CONTENT_ID = 1;
 /// Package file fragment size.