|
@@ -16,8 +16,6 @@
|
|
|
@brief Implements the UDP-specific code of MessageConnection.
|
|
@brief Implements the UDP-specific code of MessageConnection.
|
|
|
\todo Flow control currently disabled since testing out the performance of UDT. */
|
|
\todo Flow control currently disabled since testing out the performance of UDT. */
|
|
|
|
|
|
|
|
-// Modified by Lasse Öörni for Urho3D
|
|
|
|
|
-
|
|
|
|
|
#include <cmath>
|
|
#include <cmath>
|
|
|
#include <cstdio>
|
|
#include <cstdio>
|
|
|
#include <sstream>
|
|
#include <sstream>
|
|
@@ -44,25 +42,26 @@ using namespace std;
|
|
|
namespace kNet
|
|
namespace kNet
|
|
|
{
|
|
{
|
|
|
|
|
|
|
|
|
|
+static const int initialDatagramRatePerSecond = 30;
|
|
|
/// The maximum time to wait before acking a packet. If there are enough packets to ack for a full ack message,
|
|
/// The maximum time to wait before acking a packet. If there are enough packets to ack for a full ack message,
|
|
|
/// acking will be performed earlier. (milliseconds)
|
|
/// acking will be performed earlier. (milliseconds)
|
|
|
static const float maxAckDelay = 33.f; // (1/30th of a second)
|
|
static const float maxAckDelay = 33.f; // (1/30th of a second)
|
|
|
|
|
+/// The time counter after which an unacked reliable message will be resent. (UDP only)
|
|
|
|
|
+static const float timeOutMilliseconds = 2000.f;//750.f;
|
|
|
/// The maximum number of datagrams to read in from the socket at one go - after this reads will be throttled
|
|
/// The maximum number of datagrams to read in from the socket at one go - after this reads will be throttled
|
|
|
/// to give time for data sending as well.
|
|
/// to give time for data sending as well.
|
|
|
static const int cMaxDatagramsToReadInOneFrame = 2048;
|
|
static const int cMaxDatagramsToReadInOneFrame = 2048;
|
|
|
|
|
|
|
|
-/// Minimum retransmission timeout value (milliseconds)
|
|
|
|
|
-static const float minRTOTimeoutValue = 500.f;
|
|
|
|
|
-/// Maximum retransmission timeout value (milliseconds)
|
|
|
|
|
-static const float maxRTOTimeoutValue = 5000.f;
|
|
|
|
|
|
|
+static const u32 cMaxUDPMessageFragmentSize = 470;
|
|
|
|
|
|
|
|
UDPMessageConnection::UDPMessageConnection(Network *owner, NetworkServer *ownerServer, Socket *socket, ConnectionState startingState)
|
|
UDPMessageConnection::UDPMessageConnection(Network *owner, NetworkServer *ownerServer, Socket *socket, ConnectionState startingState)
|
|
|
:MessageConnection(owner, ownerServer, socket, startingState),
|
|
:MessageConnection(owner, ownerServer, socket, startingState),
|
|
|
-retransmissionTimeout(1000.f), numAcksLastFrame(0), numLossesLastFrame(0), smoothedRTT(1000.f), rttVariation(0.f), rttCleared(true), // Set RTT initial values as per RFC 2988.
|
|
|
|
|
|
|
+retransmissionTimeout(3.f), numAcksLastFrame(0), numLossesLastFrame(0), smoothedRTT(3.f), rttVariation(0.f), rttCleared(true), // Set RTT initial values as per RFC 2988.
|
|
|
lastReceivedInOrderPacketID(0),
|
|
lastReceivedInOrderPacketID(0),
|
|
|
lastSentInOrderPacketID(0), datagramPacketIDCounter(1),
|
|
lastSentInOrderPacketID(0), datagramPacketIDCounter(1),
|
|
|
-packetLossRate(0.f), packetLossCount(0.f),
|
|
|
|
|
-datagramSendRate(50.f), lowestDatagramSendRateOnPacketLoss(50.f), slowModeDelay(0),
|
|
|
|
|
|
|
+packetLossRate(0.f), packetLossCount(0.f), datagramOutRatePerSecond(initialDatagramRatePerSecond),
|
|
|
|
|
+datagramInRatePerSecond(initialDatagramRatePerSecond),
|
|
|
|
|
+datagramSendRate(70),
|
|
|
receivedPacketIDs(64 * 1024), outboundPacketAckTrack(1024),
|
|
receivedPacketIDs(64 * 1024), outboundPacketAckTrack(1024),
|
|
|
previousReceivedPacketID(0), queuedInboundDatagrams(128)
|
|
previousReceivedPacketID(0), queuedInboundDatagrams(128)
|
|
|
{
|
|
{
|
|
@@ -140,7 +139,7 @@ UDPMessageConnection::SocketReadResult UDPMessageConnection::ReadSocket(size_t &
|
|
|
{
|
|
{
|
|
|
connectionState = ConnectionOK;
|
|
connectionState = ConnectionOK;
|
|
|
LOG(LogUser, "UDPMessageConnection::ReadSocket: Received data from socket %s. Transitioned from ConnectionPending to ConnectionOK state.",
|
|
LOG(LogUser, "UDPMessageConnection::ReadSocket: Received data from socket %s. Transitioned from ConnectionPending to ConnectionOK state.",
|
|
|
- (socket ? socket->ToString().CString() : "(null)"));
|
|
|
|
|
|
|
+ (socket ? socket->ToString().c_str() : "(null)"));
|
|
|
}
|
|
}
|
|
|
if (readResult == SocketReadError)
|
|
if (readResult == SocketReadError)
|
|
|
return SocketReadError;
|
|
return SocketReadError;
|
|
@@ -156,10 +155,10 @@ void UDPMessageConnection::PerformPacketAckSends()
|
|
|
AssertInWorkerThreadContext();
|
|
AssertInWorkerThreadContext();
|
|
|
|
|
|
|
|
tick_t now = Clock::Tick();
|
|
tick_t now = Clock::Tick();
|
|
|
- while(inboundPacketAckTrack.Size() > 0)
|
|
|
|
|
|
|
+ while(inboundPacketAckTrack.size() > 0)
|
|
|
{
|
|
{
|
|
|
- if (Clock::TimespanToMillisecondsF(inboundPacketAckTrack.Begin()->second_.sentTick, now) < maxAckDelay &&
|
|
|
|
|
- inboundPacketAckTrack.Size() < 33)
|
|
|
|
|
|
|
+ if (Clock::TimespanToMillisecondsF(inboundPacketAckTrack.begin()->second.sentTick, now) < maxAckDelay &&
|
|
|
|
|
+ inboundPacketAckTrack.size() < 33)
|
|
|
break;
|
|
break;
|
|
|
|
|
|
|
|
SendPacketAckMessage();
|
|
SendPacketAckMessage();
|
|
@@ -256,67 +255,42 @@ void UDPMessageConnection::HandleFlowControl()
|
|
|
AssertInWorkerThreadContext();
|
|
AssertInWorkerThreadContext();
|
|
|
|
|
|
|
|
// In packets/second.
|
|
// In packets/second.
|
|
|
- const float minBandwidthOnLoss = 10.f;
|
|
|
|
|
- const float minBandwidth = 50.f;
|
|
|
|
|
- const float maxBandwidth = 10000.f;
|
|
|
|
|
- const int framesPerSec = 10;
|
|
|
|
|
- const int maxSlowModeDelay = 10 * framesPerSec;
|
|
|
|
|
-
|
|
|
|
|
- const tick_t frameLength = Clock::TicksPerSec() / framesPerSec; // in ticks
|
|
|
|
|
- const tick_t now = Clock::Tick();
|
|
|
|
|
|
|
+ const float totalEstimatedBandwidth = 50; ///\todo Make this estimation dynamic as in UDT or similar.
|
|
|
|
|
+ const float additiveIncreaseAggressiveness = 5e-2f;
|
|
|
|
|
|
|
|
- unsigned long numFrames = (unsigned long)(Clock::TicksInBetween(now, lastFrameTime) / frameLength);
|
|
|
|
|
- if (numFrames > 0)
|
|
|
|
|
|
|
+ const tick_t frameLength = Clock::TicksPerSec() / 100; // in ticks
|
|
|
|
|
+ // Additively increase the outbound send rate.
|
|
|
|
|
+ unsigned long numFrames = (unsigned long)(Clock::TicksInBetween(Clock::Tick(), lastFrameTime) / frameLength);
|
|
|
|
|
+ if (/*numAcksLastFrame > 0 &&*/ numFrames > 0)
|
|
|
{
|
|
{
|
|
|
- if (numFrames >= framesPerSec)
|
|
|
|
|
- numFrames = framesPerSec;
|
|
|
|
|
-
|
|
|
|
|
- int numUnacked = NumOutboundUnackedDatagrams();
|
|
|
|
|
|
|
+ if (numFrames >= 100)
|
|
|
|
|
+ numFrames = 100;
|
|
|
|
|
|
|
|
- // Reduce sendrate on significant loss
|
|
|
|
|
- if (numLossesLastFrame > 2)
|
|
|
|
|
|
|
+ if (numLossesLastFrame > 5) // Do not respond to a random single packet losses.
|
|
|
{
|
|
{
|
|
|
float oldRate = datagramSendRate;
|
|
float oldRate = datagramSendRate;
|
|
|
- datagramSendRate = min(datagramSendRate, max(minBandwidthOnLoss, lowestDatagramSendRateOnPacketLoss * 0.9f)); // Multiplicative decreases.
|
|
|
|
|
|
|
+ datagramSendRate = min(datagramSendRate, max(1.f, lowestDatagramSendRateOnPacketLoss * 0.9f)); // Multiplicative decreases.
|
|
|
|
|
+// datagramSendRate = max(1.f, datagramSendRate * 0.9f); // Multiplicative decreases.
|
|
|
LOG(LogVerbose, "Received %d losses. datagramSendRate backed to %.2f from %.2f", (int)numLossesLastFrame, datagramSendRate, oldRate);
|
|
LOG(LogVerbose, "Received %d losses. datagramSendRate backed to %.2f from %.2f", (int)numLossesLastFrame, datagramSendRate, oldRate);
|
|
|
}
|
|
}
|
|
|
- else
|
|
|
|
|
|
|
+ else // Additive increases.
|
|
|
{
|
|
{
|
|
|
- // Check if more or less bandwidth is needed
|
|
|
|
|
- ///\todo Very simple logic for now, can be improved
|
|
|
|
|
- bool needMore = outboundQueue.Size() > 10;
|
|
|
|
|
- bool needLess = outboundQueue.Size() == 0;
|
|
|
|
|
- float maxRTT = max(rtt, smoothedRTT);
|
|
|
|
|
-
|
|
|
|
|
- // Need more: increase sendrate. Factor in RTT and acks
|
|
|
|
|
- if (needMore && numLossesLastFrame == 0)
|
|
|
|
|
- {
|
|
|
|
|
- float delta = (50.f + 2.f * numAcksLastFrame) / maxRTT;
|
|
|
|
|
- if (slowModeDelay > 0)
|
|
|
|
|
- delta *= 0.2f;
|
|
|
|
|
- datagramSendRate = min(datagramSendRate + delta, maxBandwidth);
|
|
|
|
|
- lowestDatagramSendRateOnPacketLoss = datagramSendRate;
|
|
|
|
|
- }
|
|
|
|
|
- // Need less: decrease sendrate if not already at minimum
|
|
|
|
|
- else if (needLess && datagramSendRate > minBandwidth)
|
|
|
|
|
- datagramSendRate = max(datagramSendRate * 0.98f, minBandwidth);
|
|
|
|
|
-
|
|
|
|
|
- // Whenever slow mode or slight loss is occurring and RTT is more than the minimum RTO value, back off slowly
|
|
|
|
|
- // This is to ensure we do not stay "balanced" in a state where slight loss occurs constantly due to sending too much
|
|
|
|
|
- if ((numLossesLastFrame > 0 || slowModeDelay > 0) && maxRTT > minRTOTimeoutValue && datagramSendRate > minBandwidth)
|
|
|
|
|
- datagramSendRate = max(datagramSendRate * 0.999f, minBandwidth);
|
|
|
|
|
|
|
+ float increment = min((float)numFrames * additiveIncreaseAggressiveness * (totalEstimatedBandwidth - datagramSendRate), 1.f);
|
|
|
|
|
+ datagramSendRate += increment;
|
|
|
|
|
+ datagramSendRate = min(datagramSendRate, totalEstimatedBandwidth);
|
|
|
|
|
+ lowestDatagramSendRateOnPacketLoss = datagramSendRate;
|
|
|
|
|
+// LOG(LogVerbose, "Incremented sendRate by %.2f to %.2f", increment, datagramSendRate);
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- // Update the slow mode timer
|
|
|
|
|
- if (numLossesLastFrame > 1)
|
|
|
|
|
- slowModeDelay = min(slowModeDelay + numLossesLastFrame * framesPerSec, maxSlowModeDelay);
|
|
|
|
|
- else if (slowModeDelay > 0)
|
|
|
|
|
- --slowModeDelay;
|
|
|
|
|
-
|
|
|
|
|
numAcksLastFrame = 0;
|
|
numAcksLastFrame = 0;
|
|
|
numLossesLastFrame = 0;
|
|
numLossesLastFrame = 0;
|
|
|
- lastFrameTime = now;
|
|
|
|
|
|
|
+ if (numFrames < 100)
|
|
|
|
|
+ lastFrameTime += numFrames * frameLength;
|
|
|
|
|
+ else
|
|
|
|
|
+ lastFrameTime = Clock::Tick();
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ // Do a fixed flow control for testing.
|
|
|
|
|
+ datagramSendRate = 1000; ///\todo Remove.
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void UDPMessageConnection::SendOutPackets()
|
|
void UDPMessageConnection::SendOutPackets()
|
|
@@ -361,7 +335,7 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
if (!CanSendOutNewDatagram())
|
|
if (!CanSendOutNewDatagram())
|
|
|
return PacketSendThrottled;
|
|
return PacketSendThrottled;
|
|
|
|
|
|
|
|
- OverlappedTransferBuffer *data = socket->BeginSend();
|
|
|
|
|
|
|
+ OverlappedTransferBuffer *data = socket->BeginSend(socket->MaxSendSize());
|
|
|
if (!data)
|
|
if (!data)
|
|
|
return PacketSendThrottled;
|
|
return PacketSendThrottled;
|
|
|
|
|
|
|
@@ -369,7 +343,7 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
const size_t maxSendSize = socket->MaxSendSize();
|
|
const size_t maxSendSize = socket->MaxSendSize();
|
|
|
|
|
|
|
|
// Push out all the pending data to the socket.
|
|
// Push out all the pending data to the socket.
|
|
|
- datagramSerializedMessages.Clear();
|
|
|
|
|
|
|
+ datagramSerializedMessages.clear();
|
|
|
|
|
|
|
|
// If true, the receiver needs to Ack the packet we are now crafting.
|
|
// If true, the receiver needs to Ack the packet we are now crafting.
|
|
|
bool reliable = false;
|
|
bool reliable = false;
|
|
@@ -381,7 +355,7 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
|
|
|
|
|
unsigned long smallestReliableMessageNumber = 0xFFFFFFFF;
|
|
unsigned long smallestReliableMessageNumber = 0xFFFFFFFF;
|
|
|
|
|
|
|
|
- skippedMessages.Clear();
|
|
|
|
|
|
|
+ skippedMessages.clear();
|
|
|
|
|
|
|
|
// Fill up the rest of the packet from messages from the outbound queue.
|
|
// Fill up the rest of the packet from messages from the outbound queue.
|
|
|
while(outboundQueue.Size() > 0)
|
|
while(outboundQueue.Size() > 0)
|
|
@@ -413,7 +387,7 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
{
|
|
{
|
|
|
LOG(LogError, "Throttling fragmented transfer send! No free TransferID to start a new fragmented transfer with!");
|
|
LOG(LogError, "Throttling fragmented transfer send! No free TransferID to start a new fragmented transfer with!");
|
|
|
outboundQueue.PopFront();
|
|
outboundQueue.PopFront();
|
|
|
- skippedMessages.Push(msg);
|
|
|
|
|
|
|
+ skippedMessages.push_back(msg);
|
|
|
continue;
|
|
continue;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -425,13 +399,13 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
int totalMessageSize = msg->GetTotalDatagramPackedSize();// + ((msg->inOrder && !inOrder) ? cBytesForInOrderDeltaCounter : 0);
|
|
int totalMessageSize = msg->GetTotalDatagramPackedSize();// + ((msg->inOrder && !inOrder) ? cBytesForInOrderDeltaCounter : 0);
|
|
|
|
|
|
|
|
// If this message won't fit into the buffer, send out all the previously gathered messages (there must at least be one previously submitted message).
|
|
// If this message won't fit into the buffer, send out all the previously gathered messages (there must at least be one previously submitted message).
|
|
|
- if (datagramSerializedMessages.Size() > 0 && (size_t)packetSizeInBytes + totalMessageSize >= maxSendSize)
|
|
|
|
|
|
|
+ if (datagramSerializedMessages.size() > 0 && (size_t)packetSizeInBytes + totalMessageSize >= maxSendSize)
|
|
|
break;
|
|
break;
|
|
|
|
|
|
|
|
if (totalMessageSize > (int)maxSendSize)
|
|
if (totalMessageSize > (int)maxSendSize)
|
|
|
LOG(LogError, "Warning: Sending out a message of ID %d and size %d bytes, but UDP socket max send size is only %d bytes!", (int)msg->id, totalMessageSize, (int)maxSendSize);
|
|
LOG(LogError, "Warning: Sending out a message of ID %d and size %d bytes, but UDP socket max send size is only %d bytes!", (int)msg->id, totalMessageSize, (int)maxSendSize);
|
|
|
|
|
|
|
|
- datagramSerializedMessages.Push(msg);
|
|
|
|
|
|
|
+ datagramSerializedMessages.push_back(msg);
|
|
|
outboundQueue.PopFront();
|
|
outboundQueue.PopFront();
|
|
|
|
|
|
|
|
packetSizeInBytes += totalMessageSize;
|
|
packetSizeInBytes += totalMessageSize;
|
|
@@ -447,7 +421,7 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Ensure that the range of the message numbers is within the capacity that the protocol can represent in the byte stream.
|
|
// Ensure that the range of the message numbers is within the capacity that the protocol can represent in the byte stream.
|
|
|
- for(size_t i = 0; i < datagramSerializedMessages.Size(); ++i)
|
|
|
|
|
|
|
+ for(size_t i = 0; i < datagramSerializedMessages.size(); ++i)
|
|
|
if (datagramSerializedMessages[i]->reliable)
|
|
if (datagramSerializedMessages[i]->reliable)
|
|
|
{
|
|
{
|
|
|
u32 reliableDelta = (u32)(datagramSerializedMessages[i]->reliableMessageNumber - smallestReliableMessageNumber);
|
|
u32 reliableDelta = (u32)(datagramSerializedMessages[i]->reliableMessageNumber - smallestReliableMessageNumber);
|
|
@@ -455,15 +429,15 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
{ // they will have to be serialized in separate datagrams.
|
|
{ // they will have to be serialized in separate datagrams.
|
|
|
LOG(LogError, "UDPMessageConnection::SendOutPacket: Too large msgnum delta present - skipping serialization of message with ID %d (lowest: %d, delta: %d)",
|
|
LOG(LogError, "UDPMessageConnection::SendOutPacket: Too large msgnum delta present - skipping serialization of message with ID %d (lowest: %d, delta: %d)",
|
|
|
(int)datagramSerializedMessages[i]->reliableMessageNumber, (int)smallestReliableMessageNumber, (int)reliableDelta);
|
|
(int)datagramSerializedMessages[i]->reliableMessageNumber, (int)smallestReliableMessageNumber, (int)reliableDelta);
|
|
|
- skippedMessages.Push(datagramSerializedMessages[i]);
|
|
|
|
|
- datagramSerializedMessages.Erase(datagramSerializedMessages.Begin() + i);
|
|
|
|
|
|
|
+ skippedMessages.push_back(datagramSerializedMessages[i]);
|
|
|
|
|
+ datagramSerializedMessages.erase(datagramSerializedMessages.begin() + i);
|
|
|
--i;
|
|
--i;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// If we had skipped any messages from the outbound queue while looking for good messages to send, put all the messages
|
|
// If we had skipped any messages from the outbound queue while looking for good messages to send, put all the messages
|
|
|
// we skipped back to the outbound queue to wait to be processed during subsequent frames.
|
|
// we skipped back to the outbound queue to wait to be processed during subsequent frames.
|
|
|
- for(size_t i = 0; i < skippedMessages.Size(); ++i)
|
|
|
|
|
|
|
+ for(size_t i = 0; i < skippedMessages.size(); ++i)
|
|
|
#ifdef KNET_NO_MAXHEAP
|
|
#ifdef KNET_NO_MAXHEAP
|
|
|
outboundQueue.InsertWithResize(skippedMessages[i]);
|
|
outboundQueue.InsertWithResize(skippedMessages[i]);
|
|
|
#else
|
|
#else
|
|
@@ -486,7 +460,7 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
bool sentDisconnectAckMessage = false;
|
|
bool sentDisconnectAckMessage = false;
|
|
|
|
|
|
|
|
// Write all the messages in this UDP packet.
|
|
// Write all the messages in this UDP packet.
|
|
|
- for(size_t i = 0; i < datagramSerializedMessages.Size(); ++i)
|
|
|
|
|
|
|
+ for(size_t i = 0; i < datagramSerializedMessages.size(); ++i)
|
|
|
{
|
|
{
|
|
|
NetworkMessage *msg = datagramSerializedMessages[i];
|
|
NetworkMessage *msg = datagramSerializedMessages[i];
|
|
|
assert(!msg->transfer || msg->transfer->id != -1);
|
|
assert(!msg->transfer || msg->transfer->id != -1);
|
|
@@ -522,36 +496,52 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
if (msg->transfer == 0 || msg->fragmentIndex == 0)
|
|
if (msg->transfer == 0 || msg->fragmentIndex == 0)
|
|
|
writer.AddVLE<VLE8_16_32>(msg->id); // Add the message ID number.
|
|
writer.AddVLE<VLE8_16_32>(msg->id); // Add the message ID number.
|
|
|
if (msg->dataSize > 0) // Add the actual message payload data.
|
|
if (msg->dataSize > 0) // Add the actual message payload data.
|
|
|
|
|
+ {
|
|
|
|
|
+ if (networkSendSimulator.enabled &&
|
|
|
|
|
+ (networkSendSimulator.corruptionType == NetworkSimulator::CorruptPayload ||
|
|
|
|
|
+ (networkSendSimulator.corruptionType == NetworkSimulator::CorruptMessageType &&
|
|
|
|
|
+ msg->id == networkSendSimulator.corruptMessageId)))
|
|
|
|
|
+ networkSendSimulator.MaybeCorruptBufferToggleBits(msg->data, msg->dataSize);
|
|
|
writer.AddAlignedByteArray(msg->data, msg->dataSize);
|
|
writer.AddAlignedByteArray(msg->data, msg->dataSize);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Send the crafted packet out to the socket.
|
|
// Send the crafted packet out to the socket.
|
|
|
- data->buffer.len = writer.BytesFilled();
|
|
|
|
|
- bool success = socket->EndSend(data);
|
|
|
|
|
|
|
+ data->bytesContains = writer.BytesFilled();
|
|
|
|
|
+ bool success;
|
|
|
|
|
+
|
|
|
|
|
+ if (!networkSendSimulator.enabled)
|
|
|
|
|
+ success = socket->EndSend(data); // Send the data out.
|
|
|
|
|
+ else
|
|
|
|
|
+ {
|
|
|
|
|
+ // We're running a network simulator. Pass the buffer to networkSendSimulator for delayed sending.
|
|
|
|
|
+ networkSendSimulator.SubmitSendBuffer(data, socket);
|
|
|
|
|
+ success = true; // Act here as if we succeeded.
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
if (!success)
|
|
if (!success)
|
|
|
{
|
|
{
|
|
|
// We failed, so put all messages back to the outbound queue, except for those that are from old in-order packet,
|
|
// We failed, so put all messages back to the outbound queue, except for those that are from old in-order packet,
|
|
|
// since they need to be resent with the old packet ID and not as fresh messages.
|
|
// since they need to be resent with the old packet ID and not as fresh messages.
|
|
|
- for(size_t i = 0; i < datagramSerializedMessages.Size(); ++i)
|
|
|
|
|
|
|
+ for(size_t i = 0; i < datagramSerializedMessages.size(); ++i)
|
|
|
outboundQueue.Insert(datagramSerializedMessages[i]);
|
|
outboundQueue.Insert(datagramSerializedMessages[i]);
|
|
|
|
|
|
|
|
- LOG(LogError, "UDPMessageConnection::SendOutPacket: Socket::EndSend failed to socket %s!", socket->ToString().CString());
|
|
|
|
|
|
|
+ LOG(LogError, "UDPMessageConnection::SendOutPacket: Socket::EndSend failed to socket %s!", socket->ToString().c_str());
|
|
|
return PacketSendSocketFull;
|
|
return PacketSendSocketFull;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Sending the datagram succeeded - increment the send count of each message by one, to remember the retry timeout count.
|
|
// Sending the datagram succeeded - increment the send count of each message by one, to remember the retry timeout count.
|
|
|
- for(size_t i = 0; i < datagramSerializedMessages.Size(); ++i)
|
|
|
|
|
|
|
+ for(size_t i = 0; i < datagramSerializedMessages.size(); ++i)
|
|
|
{
|
|
{
|
|
|
++datagramSerializedMessages[i]->sendCount;
|
|
++datagramSerializedMessages[i]->sendCount;
|
|
|
|
|
|
|
|
#ifdef KNET_NETWORK_PROFILING
|
|
#ifdef KNET_NETWORK_PROFILING
|
|
|
- String str;
|
|
|
|
|
- if (!datagramSerializedMessages[i]->profilerName.Empty())
|
|
|
|
|
- str += "messageOut." + datagramSerializedMessages[i]->profilerName;
|
|
|
|
|
|
|
+ std::stringstream ss;
|
|
|
|
|
+ if (!datagramSerializedMessages[i]->profilerName.empty())
|
|
|
|
|
+ ss << "messageOut." << datagramSerializedMessages[i]->profilerName;
|
|
|
else
|
|
else
|
|
|
- str += "messageOut." + String((unsigned)datagramSerializedMessages[i]->id);
|
|
|
|
|
- ADDEVENT(str.CString(), (float)datagramSerializedMessages[i]->Size(), "bytes");
|
|
|
|
|
|
|
+ ss << "messageOut." << datagramSerializedMessages[i]->id;
|
|
|
|
|
+ ADDEVENT(ss.str().c_str(), (float)datagramSerializedMessages[i]->Size(), "bytes");
|
|
|
if (datagramSerializedMessages[i]->transfer)
|
|
if (datagramSerializedMessages[i]->transfer)
|
|
|
{
|
|
{
|
|
|
if (datagramSerializedMessages[i]->fragmentIndex > 0)
|
|
if (datagramSerializedMessages[i]->fragmentIndex > 0)
|
|
@@ -570,7 +560,7 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
lastSentInOrderPacketID = datagramPacketIDCounter;
|
|
lastSentInOrderPacketID = datagramPacketIDCounter;
|
|
|
datagramPacketIDCounter = AddPacketID(datagramPacketIDCounter, 1);
|
|
datagramPacketIDCounter = AddPacketID(datagramPacketIDCounter, 1);
|
|
|
|
|
|
|
|
- AddOutboundStats(writer.BytesFilled(), 1, datagramSerializedMessages.Size());
|
|
|
|
|
|
|
+ AddOutboundStats(writer.BytesFilled(), 1, datagramSerializedMessages.size());
|
|
|
ADDEVENT("datagramOut", (float)writer.BytesFilled(), "bytes");
|
|
ADDEVENT("datagramOut", (float)writer.BytesFilled(), "bytes");
|
|
|
|
|
|
|
|
if (reliable)
|
|
if (reliable)
|
|
@@ -582,10 +572,11 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
const tick_t now = Clock::Tick();
|
|
const tick_t now = Clock::Tick();
|
|
|
ack.sendCount = 1;
|
|
ack.sendCount = 1;
|
|
|
ack.sentTick = now;
|
|
ack.sentTick = now;
|
|
|
|
|
+ retransmissionTimeout = 5000.f; ///\todo Remove this.
|
|
|
ack.timeoutTick = now + (tick_t)((double)retransmissionTimeout * Clock::TicksPerMillisecond());
|
|
ack.timeoutTick = now + (tick_t)((double)retransmissionTimeout * Clock::TicksPerMillisecond());
|
|
|
ack.datagramSendRate = datagramSendRate;
|
|
ack.datagramSendRate = datagramSendRate;
|
|
|
|
|
|
|
|
- for(size_t i = 0; i < datagramSerializedMessages.Size(); ++i)
|
|
|
|
|
|
|
+ for(size_t i = 0; i < datagramSerializedMessages.size(); ++i)
|
|
|
{
|
|
{
|
|
|
if (datagramSerializedMessages[i]->reliable)
|
|
if (datagramSerializedMessages[i]->reliable)
|
|
|
ack.messages.push_back(datagramSerializedMessages[i]); // The ownership of these messages is transferred into this struct.
|
|
ack.messages.push_back(datagramSerializedMessages[i]); // The ownership of these messages is transferred into this struct.
|
|
@@ -600,7 +591,7 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
else // We sent an unreliable datagram.
|
|
else // We sent an unreliable datagram.
|
|
|
{
|
|
{
|
|
|
// This is send-and-forget, we can free all the message data we just sent.
|
|
// This is send-and-forget, we can free all the message data we just sent.
|
|
|
- for(size_t i = 0; i < datagramSerializedMessages.Size(); ++i)
|
|
|
|
|
|
|
+ for(size_t i = 0; i < datagramSerializedMessages.size(); ++i)
|
|
|
{
|
|
{
|
|
|
ClearOutboundMessageWithContentID(datagramSerializedMessages[i]);
|
|
ClearOutboundMessageWithContentID(datagramSerializedMessages[i]);
|
|
|
FreeMessage(datagramSerializedMessages[i]);
|
|
FreeMessage(datagramSerializedMessages[i]);
|
|
@@ -617,7 +608,7 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
connectionState = ConnectionClosed;
|
|
connectionState = ConnectionClosed;
|
|
|
if (socket)
|
|
if (socket)
|
|
|
socket->MarkWriteClosed();
|
|
socket->MarkWriteClosed();
|
|
|
- LOG(LogInfo, "UDPMessageConnection::SendOutPacket: Send Disconnect from connection %s.", ToString().CString());
|
|
|
|
|
|
|
+ LOG(LogInfo, "UDPMessageConnection::SendOutPacket: Send Disconnect from connection %s.", ToString().c_str());
|
|
|
}
|
|
}
|
|
|
// If we sent out the DisconnectAck message, we can tear down the connection right now - we're finished.
|
|
// If we sent out the DisconnectAck message, we can tear down the connection right now - we're finished.
|
|
|
if (sentDisconnectAckMessage)
|
|
if (sentDisconnectAckMessage)
|
|
@@ -628,7 +619,7 @@ MessageConnection::PacketSendResult UDPMessageConnection::SendOutPacket()
|
|
|
socket->MarkWriteClosed();
|
|
socket->MarkWriteClosed();
|
|
|
}
|
|
}
|
|
|
connectionState = ConnectionClosed;
|
|
connectionState = ConnectionClosed;
|
|
|
- LOG(LogInfo, "UDPMessageConnection::SendOutPacket: Send DisconnectAck from connection %s.", ToString().CString());
|
|
|
|
|
|
|
+ LOG(LogInfo, "UDPMessageConnection::SendOutPacket: Send DisconnectAck from connection %s.", ToString().c_str());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
LOG(LogVerbose, "UDPMessageConnection::SendOutPacket: Socket::EndSend succeeded with %d bytes.", (int)writer.BytesFilled());
|
|
LOG(LogVerbose, "UDPMessageConnection::SendOutPacket: Socket::EndSend succeeded with %d bytes.", (int)writer.BytesFilled());
|
|
@@ -662,17 +653,28 @@ void UDPMessageConnection::DoUpdateConnection()
|
|
|
|
|
|
|
|
udpUpdateTimer.StartMSecs(10.f);
|
|
udpUpdateTimer.StartMSecs(10.f);
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+/*
|
|
|
|
|
+ if (statsUpdateTimer.TriggeredOrNotRunning())
|
|
|
|
|
+ {
|
|
|
|
|
+ ///\todo Put this behind a timer - update only once every 1 sec or so.
|
|
|
|
|
+ ComputePacketLoss();
|
|
|
|
|
+ statsUpdateTimer.StartMSecs(1000.f);
|
|
|
|
|
+ }
|
|
|
|
|
+*/
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
unsigned long UDPMessageConnection::TimeUntilCanSendPacket() const
|
|
unsigned long UDPMessageConnection::TimeUntilCanSendPacket() const
|
|
|
{
|
|
{
|
|
|
- tick_t now = Clock::Tick();
|
|
|
|
|
|
|
+ const tick_t now = Clock::Tick();
|
|
|
|
|
|
|
|
|
|
+ // The interval at which we send out datagrams.
|
|
|
const tick_t datagramSendTickDelay = (tick_t)(Clock::TicksPerSec() / datagramSendRate);
|
|
const tick_t datagramSendTickDelay = (tick_t)(Clock::TicksPerSec() / datagramSendRate);
|
|
|
- tick_t nextDatagramSendTime = lastDatagramSendTime + datagramSendTickDelay;
|
|
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+ const tick_t nextDatagramSendTime = lastDatagramSendTime + datagramSendTickDelay;
|
|
|
|
|
+
|
|
|
if (Clock::IsNewer(now, nextDatagramSendTime))
|
|
if (Clock::IsNewer(now, nextDatagramSendTime))
|
|
|
- return 0;
|
|
|
|
|
|
|
+ return 0; // We are already due to send out the next datagram?
|
|
|
|
|
|
|
|
return (unsigned long)Clock::TimespanToMillisecondsF(now, nextDatagramSendTime);
|
|
return (unsigned long)Clock::TimespanToMillisecondsF(now, nextDatagramSendTime);
|
|
|
}
|
|
}
|
|
@@ -693,7 +695,7 @@ void UDPMessageConnection::AddReceivedPacketIDStats(packet_id_t packetID)
|
|
|
// if (packetID == 0)
|
|
// if (packetID == 0)
|
|
|
// cs.recvPacketIDs.clear();
|
|
// cs.recvPacketIDs.clear();
|
|
|
|
|
|
|
|
- cs.recvPacketIDs.Push(ConnectionStatistics::DatagramIDTrack());
|
|
|
|
|
|
|
+ cs.recvPacketIDs.push_back(ConnectionStatistics::DatagramIDTrack());
|
|
|
ConnectionStatistics::DatagramIDTrack &t = cs.recvPacketIDs.back();
|
|
ConnectionStatistics::DatagramIDTrack &t = cs.recvPacketIDs.back();
|
|
|
t.tick = Clock::Tick();
|
|
t.tick = Clock::Tick();
|
|
|
t.packetID = packetID;
|
|
t.packetID = packetID;
|
|
@@ -754,10 +756,11 @@ void UDPMessageConnection::ExtractMessages(const char *data, size_t numBytes)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Note that this check must be after the ack check (above), since we still need to ack the new packet as well (our
|
|
// Note that this check must be after the ack check (above), since we still need to ack the new packet as well (our
|
|
|
- // previous ack might not have reached the sender or was delayed, which is why he's resending it).
|
|
|
|
|
|
|
+ // previous ack might not have reached the sender or was delayed, which is why the peer is resending it).
|
|
|
if (HaveReceivedPacketID(packetID))
|
|
if (HaveReceivedPacketID(packetID))
|
|
|
{
|
|
{
|
|
|
ADDEVENT("duplicateReceived", (float)numBytes, "bytes");
|
|
ADDEVENT("duplicateReceived", (float)numBytes, "bytes");
|
|
|
|
|
+ LOG(LogVerbose, "Duplicate datagram with packet ID %d received!", (int)packetID);
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
if (packetID != previousReceivedPacketID + 1)
|
|
if (packetID != previousReceivedPacketID + 1)
|
|
@@ -806,10 +809,10 @@ void UDPMessageConnection::ExtractMessages(const char *data, size_t numBytes)
|
|
|
|
|
|
|
|
reliableMessageNumber = reliableMessageIndexBase + reader.ReadVLE<VLE8_16>();
|
|
reliableMessageNumber = reliableMessageIndexBase + reader.ReadVLE<VLE8_16>();
|
|
|
|
|
|
|
|
- if (receivedReliableMessages.Find(reliableMessageNumber) != receivedReliableMessages.End())
|
|
|
|
|
|
|
+ if (receivedReliableMessages.find(reliableMessageNumber) != receivedReliableMessages.end())
|
|
|
duplicateMessage = true;
|
|
duplicateMessage = true;
|
|
|
else
|
|
else
|
|
|
- receivedReliableMessages.Insert(reliableMessageNumber);
|
|
|
|
|
|
|
+ receivedReliableMessages.insert(reliableMessageNumber);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (contentLength == 0)
|
|
if (contentLength == 0)
|
|
@@ -829,53 +832,58 @@ void UDPMessageConnection::ExtractMessages(const char *data, size_t numBytes)
|
|
|
throw NetException("Malformed UDP packet received! Message payload missing.");
|
|
throw NetException("Malformed UDP packet received! Message payload missing.");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // If we received the start of a new fragment, start tracking a new fragmented transfer.
|
|
|
|
|
- if (fragmentStart)
|
|
|
|
|
|
|
+ if (!duplicateMessage)
|
|
|
{
|
|
{
|
|
|
- if (numTotalFragments == DataDeserializer::VLEReadError || numTotalFragments <= 1)
|
|
|
|
|
|
|
+ // If we received the start of a new fragment, start tracking a new fragmented transfer.
|
|
|
|
|
+ if (fragmentStart)
|
|
|
{
|
|
{
|
|
|
- LOG(LogError, "Malformed UDP packet! This packet had fragmentStart bit on, but parsing numTotalFragments VLE failed!");
|
|
|
|
|
- throw NetException("Malformed UDP packet received! This packet had fragmentStart bit on, but parsing numTotalFragments VLE failed!");
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ if (numTotalFragments == DataDeserializer::VLEReadError || numTotalFragments <= 1)
|
|
|
|
|
+ {
|
|
|
|
|
+ LOG(LogError, "Malformed UDP packet! This packet had fragmentStart bit on, but parsing numTotalFragments VLE failed!");
|
|
|
|
|
+ throw NetException("Malformed UDP packet received! This packet had fragmentStart bit on, but parsing numTotalFragments VLE failed!");
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- if (!duplicateMessage)
|
|
|
|
|
- {
|
|
|
|
|
fragmentedReceives.NewFragmentStartReceived(fragmentTransferID, numTotalFragments, &data[reader.BytePos()], contentLength);
|
|
fragmentedReceives.NewFragmentStartReceived(fragmentTransferID, numTotalFragments, &data[reader.BytePos()], contentLength);
|
|
|
ADDEVENT("FragmentStartReceived", 1, "");
|
|
ADDEVENT("FragmentStartReceived", 1, "");
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
- }
|
|
|
|
|
- // If we received a fragment that is a part of an old fragmented transfer, pass it to the fragmented transfer manager
|
|
|
|
|
- // so that it can reconstruct the final stream when the transfer finishes.
|
|
|
|
|
- else if (fragment)
|
|
|
|
|
- {
|
|
|
|
|
- if (fragmentNumber == DataDeserializer::VLEReadError)
|
|
|
|
|
- {
|
|
|
|
|
- LOG(LogError, "Malformed UDP packet! This packet has fragment flag on, but parsing the fragment number failed!");
|
|
|
|
|
- throw NetException("Malformed UDP packet received! This packet has fragment flag on, but parsing the fragment number failed!");
|
|
|
|
|
}
|
|
}
|
|
|
|
|
+ // If we received a fragment that is a part of an old fragmented transfer, pass it to the fragmented transfer manager
|
|
|
|
|
+ // so that it can reconstruct the final stream when the transfer finishes.
|
|
|
|
|
+ else if (fragment)
|
|
|
|
|
+ {
|
|
|
|
|
+ if (fragmentNumber == DataDeserializer::VLEReadError)
|
|
|
|
|
+ {
|
|
|
|
|
+ LOG(LogError, "Malformed UDP packet! This packet has fragment flag on, but parsing the fragment number failed!");
|
|
|
|
|
+ throw NetException("Malformed UDP packet received! This packet has fragment flag on, but parsing the fragment number failed!");
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- ADDEVENT("FragmentReceived", 1, "");
|
|
|
|
|
|
|
+ ADDEVENT("FragmentReceived", 1, "");
|
|
|
|
|
|
|
|
- bool messageReady = fragmentedReceives.NewFragmentReceived(fragmentTransferID, fragmentNumber, &data[reader.BytePos()], contentLength);
|
|
|
|
|
- if (messageReady)
|
|
|
|
|
|
|
+ bool messageReady = fragmentedReceives.NewFragmentReceived(fragmentTransferID, fragmentNumber, &data[reader.BytePos()], contentLength);
|
|
|
|
|
+ if (messageReady)
|
|
|
|
|
+ {
|
|
|
|
|
+ // This was the last fragment of the whole message - reconstruct the message from the fragments and pass it on to
|
|
|
|
|
+ // the client to handle.
|
|
|
|
|
+ assembledData.clear();
|
|
|
|
|
+ fragmentedReceives.AssembleMessage(fragmentTransferID, assembledData);
|
|
|
|
|
+ assert(assembledData.size() > 0);
|
|
|
|
|
+ ///\todo InOrder.
|
|
|
|
|
+ HandleInboundMessage(packetID, &assembledData[0], assembledData.size());
|
|
|
|
|
+ ++numMessagesReceived;
|
|
|
|
|
+ fragmentedReceives.FreeMessage(fragmentTransferID);
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ else
|
|
|
{
|
|
{
|
|
|
- // This was the last fragment of the whole message - reconstruct the message from the fragments and pass it on to
|
|
|
|
|
- // the client to handle.
|
|
|
|
|
- assembledData.Clear();
|
|
|
|
|
- fragmentedReceives.AssembleMessage(fragmentTransferID, assembledData);
|
|
|
|
|
- assert(assembledData.Size() > 0);
|
|
|
|
|
- ///\todo InOrder.
|
|
|
|
|
- HandleInboundMessage(packetID, &assembledData[0], assembledData.Size());
|
|
|
|
|
|
|
+ // Not a fragment, so directly call the handling code.
|
|
|
|
|
+ HandleInboundMessage(packetID, &data[reader.BytePos()], contentLength);
|
|
|
++numMessagesReceived;
|
|
++numMessagesReceived;
|
|
|
- fragmentedReceives.FreeMessage(fragmentTransferID);
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- else if (!duplicateMessage)
|
|
|
|
|
|
|
+ else // this is a duplicate reliable message, ignore it.
|
|
|
{
|
|
{
|
|
|
- // Not a fragment, so directly call the handling code.
|
|
|
|
|
- HandleInboundMessage(packetID, &data[reader.BytePos()], contentLength);
|
|
|
|
|
- ++numMessagesReceived;
|
|
|
|
|
|
|
+ ///\todo Can we remove this duplicate reliable message checking?
|
|
|
|
|
+ LOG(LogVerbose, "Received a duplicate reliable message with message number %d!", (int)reliableMessageNumber);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
reader.SkipBytes(contentLength);
|
|
reader.SkipBytes(contentLength);
|
|
@@ -944,6 +952,31 @@ void UDPMessageConnection::SendDisconnectAckMessage()
|
|
|
LOG(LogInfo, "UDPMessageConnection::SendDisconnectAckMessage: Sent DisconnectAck.");
|
|
LOG(LogInfo, "UDPMessageConnection::SendDisconnectAckMessage: Sent DisconnectAck.");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+void UDPMessageConnection::HandleFlowControlRequestMessage(const char *data, size_t numBytes)
|
|
|
|
|
+{
|
|
|
|
|
+ AssertInWorkerThreadContext();
|
|
|
|
|
+ /*
|
|
|
|
|
+ if (numBytes != 2)
|
|
|
|
|
+ {
|
|
|
|
|
+ LOG(LogError, "Malformed FlowControlRequest message received! Size was %d bytes, expected 2 bytes!", (int)numBytes);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ const u16 minOutboundRate = 5;
|
|
|
|
|
+ const u16 maxOutboundRate = 10 * 1024;
|
|
|
|
|
+ u16 newOutboundRate = *reinterpret_cast<const u16*>(data);
|
|
|
|
|
+ if (newOutboundRate < minOutboundRate || newOutboundRate > maxOutboundRate)
|
|
|
|
|
+ {
|
|
|
|
|
+ LOG(LogError, "Invalid FlowControlRequest rate %d packets/sec received! Ignored. Valid range (%d, %d)", (int)newOutboundRate,
|
|
|
|
|
+ (int)minOutboundRate, (int)maxOutboundRate);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+// LOG(LogVerbose, "Received FlowControl message. Adjusting OutRate from %d to %d msgs/sec.", (int)datagramOutRatePerSecond, (int)newOutboundRate);
|
|
|
|
|
+
|
|
|
|
|
+ datagramOutRatePerSecond = newOutboundRate;*/
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
int UDPMessageConnection::BiasedBinarySearchFindPacketIndex(PacketAckTrackQueue &queue, int packetID)
|
|
int UDPMessageConnection::BiasedBinarySearchFindPacketIndex(PacketAckTrackQueue &queue, int packetID)
|
|
|
{
|
|
{
|
|
|
///\bug Make this all packetID wrap-around -aware.
|
|
///\bug Make this all packetID wrap-around -aware.
|
|
@@ -1009,13 +1042,16 @@ void UDPMessageConnection::FreeOutboundPacketAckTrack(packet_id_t packetID)
|
|
|
|
|
|
|
|
if (track.sendCount <= 1)
|
|
if (track.sendCount <= 1)
|
|
|
{
|
|
{
|
|
|
- UpdateRTOCounterOnPacketAck((float)Clock::TimespanToMillisecondsD(track.sentTick, Clock::Tick()));
|
|
|
|
|
|
|
+ UpdateRTOCounterOnPacketAck((float)Clock::TimespanToSecondsD(track.sentTick, Clock::Tick()));
|
|
|
++numAcksLastFrame;
|
|
++numAcksLastFrame;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
outboundPacketAckTrack.EraseItemAt(itemIndex);
|
|
outboundPacketAckTrack.EraseItemAt(itemIndex);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+static const float minRTOTimeoutValue = 1000.f;
|
|
|
|
|
+static const float maxRTOTimeoutValue = 5000.f;
|
|
|
|
|
+
|
|
|
/// Adjusts the retransmission timer values as per RFC 2988.
|
|
/// Adjusts the retransmission timer values as per RFC 2988.
|
|
|
/// @param rtt The round trip time that was measured on the packet that was just acked.
|
|
/// @param rtt The round trip time that was measured on the packet that was just acked.
|
|
|
void UDPMessageConnection::UpdateRTOCounterOnPacketAck(float rtt)
|
|
void UDPMessageConnection::UpdateRTOCounterOnPacketAck(float rtt)
|
|
@@ -1040,9 +1076,20 @@ void UDPMessageConnection::UpdateRTOCounterOnPacketAck(float rtt)
|
|
|
}
|
|
}
|
|
|
// We add this much constant delay to all RTO timers to avoid too optimistic RTO values
|
|
// We add this much constant delay to all RTO timers to avoid too optimistic RTO values
|
|
|
// in excellent conditions (localhost, LAN).
|
|
// in excellent conditions (localhost, LAN).
|
|
|
|
|
+ const float safetyThresholdAdd = 1.f;
|
|
|
const float safetyThresholdMul = 2.f;
|
|
const float safetyThresholdMul = 2.f;
|
|
|
|
|
|
|
|
- retransmissionTimeout = min(maxRTOTimeoutValue, max(minRTOTimeoutValue, safetyThresholdMul * (smoothedRTT + rttVariation)));
|
|
|
|
|
|
|
+// retransmissionTimeout = min(maxRTOTimeoutValue, max(minRTOTimeoutValue, safetyThresholdAdd + safetyThresholdMul * (smoothedRTT + rttVariation)));
|
|
|
|
|
+ retransmissionTimeout = min(maxRTOTimeoutValue, max(minRTOTimeoutValue, safetyThresholdAdd + safetyThresholdMul * (smoothedRTT + rttVariation)));
|
|
|
|
|
+
|
|
|
|
|
+/// const float maxDatagramSendRate = 3000.f;
|
|
|
|
|
+ // Update data send rate.
|
|
|
|
|
+// ++datagramOutRatePerSecond; // Additive increases.
|
|
|
|
|
+// datagramSendRate = datagramSendRate + 1.f; // Increase by one datagram/successfully sent packet.
|
|
|
|
|
+// datagramSendRate = min(datagramSendRate + 1.f, maxDatagramSendRate); // Increase by one datagram/successfully sent packet.
|
|
|
|
|
+
|
|
|
|
|
+// LOG(LogVerbose, "Packet ack event: RTO: %.3f sec., srtt: %.3f sec., rttvar: %.3f sec. datagramSendRate: %.2f",
|
|
|
|
|
+// retransmissionTimeout, smoothedRTT, rttVariation, datagramSendRate);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
void UDPMessageConnection::UpdateRTOCounterOnPacketLoss()
|
|
void UDPMessageConnection::UpdateRTOCounterOnPacketLoss()
|
|
@@ -1051,10 +1098,14 @@ void UDPMessageConnection::UpdateRTOCounterOnPacketLoss()
|
|
|
|
|
|
|
|
using namespace std;
|
|
using namespace std;
|
|
|
|
|
|
|
|
- // retransmissionTimeout = smoothedRTT = min(maxRTOTimeoutValue, max(minRTOTimeoutValue, smoothedRTT * 2.f));
|
|
|
|
|
|
|
+ retransmissionTimeout = smoothedRTT = min(maxRTOTimeoutValue, max(minRTOTimeoutValue, smoothedRTT * 2.f));
|
|
|
// The variation just gives bogus values, so clear it altogether.
|
|
// The variation just gives bogus values, so clear it altogether.
|
|
|
rttVariation = 0.f;
|
|
rttVariation = 0.f;
|
|
|
|
|
|
|
|
|
|
+ // Multiplicative decreases.
|
|
|
|
|
+// datagramOutRatePerSecond = max(1, datagramOutRatePerSecond / 2);
|
|
|
|
|
+// datagramSendRate = max(1.f, datagramSendRate * 0.9f); // At least send one packet/second.
|
|
|
|
|
+
|
|
|
++numLossesLastFrame;
|
|
++numLossesLastFrame;
|
|
|
|
|
|
|
|
// LOG(LogVerbose, "Packet loss event: RTO: %.3f sec. datagramSendRate: %.2f", retransmissionTimeout, datagramSendRate);
|
|
// LOG(LogVerbose, "Packet loss event: RTO: %.3f sec. datagramSendRate: %.2f", retransmissionTimeout, datagramSendRate);
|
|
@@ -1064,21 +1115,21 @@ void UDPMessageConnection::SendPacketAckMessage()
|
|
|
{
|
|
{
|
|
|
AssertInWorkerThreadContext();
|
|
AssertInWorkerThreadContext();
|
|
|
|
|
|
|
|
- while(inboundPacketAckTrack.Size() > 0)
|
|
|
|
|
|
|
+ while(inboundPacketAckTrack.size() > 0)
|
|
|
{
|
|
{
|
|
|
- packet_id_t packetID = inboundPacketAckTrack.Begin()->first_;
|
|
|
|
|
|
|
+ packet_id_t packetID = inboundPacketAckTrack.begin()->first;
|
|
|
u32 sequence = 0;
|
|
u32 sequence = 0;
|
|
|
|
|
|
|
|
- inboundPacketAckTrack.Erase(packetID);
|
|
|
|
|
|
|
+ inboundPacketAckTrack.erase(packetID);
|
|
|
for(int i = 0; i < 32; ++i)
|
|
for(int i = 0; i < 32; ++i)
|
|
|
{
|
|
{
|
|
|
packet_id_t id = AddPacketID(packetID, i + 1);
|
|
packet_id_t id = AddPacketID(packetID, i + 1);
|
|
|
|
|
|
|
|
- PacketAckTrackMap::Iterator iter = inboundPacketAckTrack.Find(id);
|
|
|
|
|
- if (iter != inboundPacketAckTrack.End())
|
|
|
|
|
|
|
+ PacketAckTrackMap::iterator iter = inboundPacketAckTrack.find(id);
|
|
|
|
|
+ if (iter != inboundPacketAckTrack.end())
|
|
|
{
|
|
{
|
|
|
sequence |= 1 << i;
|
|
sequence |= 1 << i;
|
|
|
- inboundPacketAckTrack.Erase(id);
|
|
|
|
|
|
|
+ inboundPacketAckTrack.erase(id);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1150,18 +1201,44 @@ void UDPMessageConnection::HandleDisconnectAckMessage()
|
|
|
LOG(LogInfo, "Received DisconnectAck message on a MessageConnection not in ConnectionDisconnecting state! (state was %d)",
|
|
LOG(LogInfo, "Received DisconnectAck message on a MessageConnection not in ConnectionDisconnecting state! (state was %d)",
|
|
|
(int)connectionState);
|
|
(int)connectionState);
|
|
|
else
|
|
else
|
|
|
- LOG(LogInfo, "UDPMessageConnection::HandleDisconnectAckMessage: Connection closed to %s.", ToString().CString());
|
|
|
|
|
|
|
+ LOG(LogInfo, "UDPMessageConnection::HandleDisconnectAckMessage: Connection closed to %s.", ToString().c_str());
|
|
|
|
|
|
|
|
connectionState = ConnectionClosed;
|
|
connectionState = ConnectionClosed;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+void UDPMessageConnection::PerformFlowControl()
|
|
|
|
|
+{
|
|
|
|
|
+ AssertInWorkerThreadContext();
|
|
|
|
|
+
|
|
|
|
|
+ /*
|
|
|
|
|
+ // The manual flow control only applies to UDP connections.
|
|
|
|
|
+ if (socket->TransportLayer() == SocketOverTCP)
|
|
|
|
|
+ return;
|
|
|
|
|
+
|
|
|
|
|
+ const float maxAllowedPacketLossRate = 0.f;
|
|
|
|
|
+ if (GetPacketLossRate() > maxAllowedPacketLossRate)
|
|
|
|
|
+ {
|
|
|
|
|
+ float newInboundRate = PacketsInPerSec() * (1.f - GetPacketLossRate());
|
|
|
|
|
+// LOG(LogVerbose, "Packet loss rate: %.2f. Adjusting InRate from %d to %d!", GetPacketLossRate(), datagramInRatePerSecond, (int)newInboundRate);
|
|
|
|
|
+ SetDatagramInFlowRatePerSecond((int)newInboundRate, true);
|
|
|
|
|
+ }
|
|
|
|
|
+ else if (PacketsInPerSec() >= (float)datagramInRatePerSecond / 2)
|
|
|
|
|
+ {
|
|
|
|
|
+ const int flowRateIncr = 50;
|
|
|
|
|
+// LOG(LogVerbose, "Have received %.2f packets in/sec with loss rate of %.2f. Increasing InRate from %d to %d.",
|
|
|
|
|
+// PacketsInPerSec(), GetPacketLossRate(), datagramInRatePerSecond, datagramInRatePerSecond + flowRateIncr);
|
|
|
|
|
+ SetDatagramInFlowRatePerSecond(datagramInRatePerSecond + flowRateIncr, true);
|
|
|
|
|
+ }
|
|
|
|
|
+ */
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
void UDPMessageConnection::ComputePacketLoss()
|
|
void UDPMessageConnection::ComputePacketLoss()
|
|
|
{
|
|
{
|
|
|
AssertInWorkerThreadContext();
|
|
AssertInWorkerThreadContext();
|
|
|
|
|
|
|
|
Lockable<ConnectionStatistics>::LockType cs = statistics.Acquire();
|
|
Lockable<ConnectionStatistics>::LockType cs = statistics.Acquire();
|
|
|
|
|
|
|
|
- if (cs->recvPacketIDs.Size() <= 1)
|
|
|
|
|
|
|
+ if (cs->recvPacketIDs.size() <= 1)
|
|
|
{
|
|
{
|
|
|
packetLossRate = packetLossCount = 0.f;
|
|
packetLossRate = packetLossCount = 0.f;
|
|
|
return;
|
|
return;
|
|
@@ -1172,14 +1249,14 @@ void UDPMessageConnection::ComputePacketLoss()
|
|
|
const tick_t maxTickAge = timeNow - maxEntryAge;
|
|
const tick_t maxTickAge = timeNow - maxEntryAge;
|
|
|
|
|
|
|
|
// Remove old entries.
|
|
// Remove old entries.
|
|
|
- for(size_t i = 0; i < cs->recvPacketIDs.Size(); ++i)
|
|
|
|
|
|
|
+ for(size_t i = 0; i < cs->recvPacketIDs.size(); ++i)
|
|
|
if (Clock::IsNewer(cs->recvPacketIDs[i].tick, maxTickAge))
|
|
if (Clock::IsNewer(cs->recvPacketIDs[i].tick, maxTickAge))
|
|
|
{
|
|
{
|
|
|
- cs->recvPacketIDs.Erase(cs->recvPacketIDs.Begin(), cs->recvPacketIDs.Begin() + i);
|
|
|
|
|
|
|
+ cs->recvPacketIDs.erase(cs->recvPacketIDs.begin(), cs->recvPacketIDs.begin() + i);
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if (cs->recvPacketIDs.Size() <= 1)
|
|
|
|
|
|
|
+ if (cs->recvPacketIDs.size() <= 1)
|
|
|
{
|
|
{
|
|
|
packetLossRate = packetLossCount = 0.f;
|
|
packetLossRate = packetLossCount = 0.f;
|
|
|
return;
|
|
return;
|
|
@@ -1187,34 +1264,56 @@ void UDPMessageConnection::ComputePacketLoss()
|
|
|
|
|
|
|
|
// Find the oldest packet (in terms of messageID)
|
|
// Find the oldest packet (in terms of messageID)
|
|
|
int oldestIndex = 0;
|
|
int oldestIndex = 0;
|
|
|
- for(size_t i = 1; i < cs->recvPacketIDs.Size(); ++i)
|
|
|
|
|
|
|
+ for(size_t i = 1; i < cs->recvPacketIDs.size(); ++i)
|
|
|
if (PacketIDIsNewerThan(cs->recvPacketIDs[oldestIndex].packetID, cs->recvPacketIDs[i].packetID))
|
|
if (PacketIDIsNewerThan(cs->recvPacketIDs[oldestIndex].packetID, cs->recvPacketIDs[i].packetID))
|
|
|
oldestIndex = i;
|
|
oldestIndex = i;
|
|
|
|
|
|
|
|
- Vector<packet_id_t> relIDs;
|
|
|
|
|
- relIDs.Reserve(cs->recvPacketIDs.Size());
|
|
|
|
|
- for(size_t i = 0; i < cs->recvPacketIDs.Size(); ++i)
|
|
|
|
|
- relIDs.Push(SubPacketID(cs->recvPacketIDs[i].packetID, cs->recvPacketIDs[oldestIndex].packetID));
|
|
|
|
|
|
|
+ std::vector<packet_id_t> relIDs;
|
|
|
|
|
+ relIDs.reserve(cs->recvPacketIDs.size());
|
|
|
|
|
+ for(size_t i = 0; i < cs->recvPacketIDs.size(); ++i)
|
|
|
|
|
+ relIDs.push_back(SubPacketID(cs->recvPacketIDs[i].packetID, cs->recvPacketIDs[oldestIndex].packetID));
|
|
|
|
|
|
|
|
- sort::CocktailSort(&relIDs[0], relIDs.Size());
|
|
|
|
|
|
|
+ sort::CocktailSort(&relIDs[0], relIDs.size());
|
|
|
|
|
|
|
|
int numMissedPackets = 0;
|
|
int numMissedPackets = 0;
|
|
|
- for(size_t i = 0; i+1 < cs->recvPacketIDs.Size(); ++i)
|
|
|
|
|
|
|
+ for(size_t i = 0; i+1 < cs->recvPacketIDs.size(); ++i)
|
|
|
{
|
|
{
|
|
|
assert(relIDs[i+1] > relIDs[i]);
|
|
assert(relIDs[i+1] > relIDs[i]);
|
|
|
numMissedPackets += relIDs[i+1] - relIDs[i] - 1;
|
|
numMissedPackets += relIDs[i+1] - relIDs[i] - 1;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- packetLossRate = (float)numMissedPackets / (cs->recvPacketIDs.Size() + numMissedPackets);
|
|
|
|
|
|
|
+ packetLossRate = (float)numMissedPackets / (cs->recvPacketIDs.size() + numMissedPackets);
|
|
|
packetLossCount = (float)numMissedPackets * 1000.f / (float)Clock::TimespanToMillisecondsD(maxTickAge, timeNow);
|
|
packetLossCount = (float)numMissedPackets * 1000.f / (float)Clock::TimespanToMillisecondsD(maxTickAge, timeNow);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-void AppendU16ToVector(Vector<char> &data, unsigned long value)
|
|
|
|
|
|
|
+void AppendU16ToVector(std::vector<char> &data, unsigned long value)
|
|
|
{
|
|
{
|
|
|
- data.Insert(data.End(), (const char *)&value, (const char *)&value + 2);
|
|
|
|
|
|
|
+ data.insert(data.end(), (const char *)&value, (const char *)&value + 2);
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void UDPMessageConnection::SetDatagramInFlowRatePerSecond(int newDatagramReceiveRate, bool internalCall)
|
|
|
|
|
+{/*
|
|
|
|
|
+ if (newDatagramReceiveRate == datagramInRatePerSecond) // No need to set it multiple times.
|
|
|
|
|
+ return;
|
|
|
|
|
+
|
|
|
|
|
+ if (newDatagramReceiveRate < 5 || newDatagramReceiveRate > 10 * 1024)
|
|
|
|
|
+ {
|
|
|
|
|
+ LOG(LogError, "Tried to set invalid UDP receive rate %d packets/sec! Ignored.", newDatagramReceiveRate);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ datagramInRatePerSecond = newDatagramReceiveRate;
|
|
|
|
|
+
|
|
|
|
|
+ NetworkMessage *msg = StartNewMessage(MsgIdFlowControlRequest);
|
|
|
|
|
+ AppendU16ToVector(msg->data, newDatagramReceiveRate);
|
|
|
|
|
+ msg->priority = NetworkMessage::cMaxPriority - 1;
|
|
|
|
|
+#ifdef KNET_NETWORK_PROFILING
|
|
|
|
|
+ msg->profilerName = "FlowControlRequest (3)";
|
|
|
|
|
+#endif
|
|
|
|
|
+ EndAndQueueMessage(msg, 2, internalCall);*/
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-bool UDPMessageConnection::HandleMessage(packet_id_t packetID, u32 messageID, const char *data, size_t numBytes)
|
|
|
|
|
|
|
+bool UDPMessageConnection::HandleMessage(packet_id_t packetID, message_id_t messageID, const char *data, size_t numBytes)
|
|
|
{
|
|
{
|
|
|
AssertInWorkerThreadContext();
|
|
AssertInWorkerThreadContext();
|
|
|
|
|
|
|
@@ -1224,6 +1323,9 @@ bool UDPMessageConnection::HandleMessage(packet_id_t packetID, u32 messageID, co
|
|
|
case MsgIdPingReply:
|
|
case MsgIdPingReply:
|
|
|
return false; // We don't do anything with these messages, the MessageConnection base class handles these.
|
|
return false; // We don't do anything with these messages, the MessageConnection base class handles these.
|
|
|
|
|
|
|
|
|
|
+ case MsgIdFlowControlRequest:
|
|
|
|
|
+ HandleFlowControlRequestMessage(data, numBytes);
|
|
|
|
|
+ return true;
|
|
|
case MsgIdPacketAck:
|
|
case MsgIdPacketAck:
|
|
|
HandlePacketAckMessage(data, numBytes);
|
|
HandlePacketAckMessage(data, numBytes);
|
|
|
return true;
|
|
return true;
|
|
@@ -1270,7 +1372,7 @@ void UDPMessageConnection::DumpConnectionStatus() const
|
|
|
smoothedRTT,
|
|
smoothedRTT,
|
|
|
rttVariation,
|
|
rttVariation,
|
|
|
(int)outboundPacketAckTrack.Size(), ///\todo Accessing this variable is not thread-safe.
|
|
(int)outboundPacketAckTrack.Size(), ///\todo Accessing this variable is not thread-safe.
|
|
|
- (int)inboundPacketAckTrack.Size(), ///\todo Accessing this variable is not thread-safe.
|
|
|
|
|
|
|
+ (int)inboundPacketAckTrack.size(), ///\todo Accessing this variable is not thread-safe.
|
|
|
packetLossCount,
|
|
packetLossCount,
|
|
|
packetLossRate,
|
|
packetLossRate,
|
|
|
PacketsInPerSec(),
|
|
PacketsInPerSec(),
|