Browse Source

implement dataChannelCount option

Murat Dogan 4 years ago
parent
commit
f1080bfd0f
1 changed files with 93 additions and 65 deletions
  1. 93 65
      examples/client-benchmark/main.cpp

+ 93 - 65
examples/client-benchmark/main.cpp

@@ -31,6 +31,7 @@
 #include <atomic>
 #include <chrono>
 #include <future>
+#include <iomanip>
 #include <iostream>
 #include <memory>
 #include <random>
@@ -61,7 +62,9 @@ string randomId(size_t length);
 // Benchmark
 const size_t messageSize = 65535;
 binary messageData(messageSize);
-atomic<size_t> receivedSize = 0, sentSize = 0;
+unordered_map<string, atomic<size_t>> receivedSizeMap;
+unordered_map<string, atomic<size_t>> sentSizeMap;
+// atomic<size_t> receivedSize = 0, sentSize = 0;
 bool noSend = false;
 
 // Benchmark - enableThroughputSet params
@@ -188,63 +191,67 @@ int main(int argc, char **argv) try {
 	auto pc = createPeerConnection(config, ws, id);
 
 	// We are the offerer, so create a data channel to initiate the process
-	const string label = "benchmark";
-	cout << "Creating DataChannel with label \"" << label << "\"" << endl;
-	auto dc = pc->createDataChannel(label);
+	for (int i = 1; i <= params.dataChannelCount(); i++) {
+		const string label = "DC-" + std::to_string(i);
+		cout << "Creating DataChannel with label \"" << label << "\"" << endl;
+		auto dc = pc->createDataChannel(label);
+		receivedSizeMap.emplace(label, 0);
+		sentSizeMap.emplace(label, 0);
 
-	// Set Buffer Size
-	dc->setBufferedAmountLowThreshold(bufferSize);
+		// Set Buffer Size
+		dc->setBufferedAmountLowThreshold(bufferSize);
 
-	dc->onOpen([id, wdc = make_weak_ptr(dc)]() {
-		cout << "DataChannel from " << id << " open" << endl;
-		if (noSend)
-			return;
+		dc->onOpen([id, wdc = make_weak_ptr(dc), label]() {
+			cout << "DataChannel from " << id << " open" << endl;
+			if (noSend)
+				return;
 
-		if (enableThroughputSet)
-			return;
+			if (enableThroughputSet)
+				return;
 
-		if (auto dcLocked = wdc.lock()) {
-			try {
-				while (dcLocked->bufferedAmount() <= bufferSize) {
-					dcLocked->send(messageData);
-					sentSize += messageData.size();
+			if (auto dcLocked = wdc.lock()) {
+				try {
+					while (dcLocked->bufferedAmount() <= bufferSize) {
+						dcLocked->send(messageData);
+						sentSizeMap[label] += messageData.size();
+					}
+				} catch (const std::exception &e) {
+					std::cout << "Send failed: " << e.what() << std::endl;
 				}
-			} catch (const std::exception &e) {
-				std::cout << "Send failed: " << e.what() << std::endl;
 			}
-		}
-	});
+		});
 
-	dc->onBufferedAmountLow([wdc = make_weak_ptr(dc)]() {
-		if (noSend)
-			return;
+		dc->onBufferedAmountLow([wdc = make_weak_ptr(dc), label]() {
+			if (noSend)
+				return;
 
-		if (enableThroughputSet)
-			return;
+			if (enableThroughputSet)
+				return;
 
-		auto dcLocked = wdc.lock();
-		if (!dcLocked)
-			return;
+			auto dcLocked = wdc.lock();
+			if (!dcLocked)
+				return;
 
-		// Continue sending
-		try {
-			while (dcLocked->isOpen() && dcLocked->bufferedAmount() <= bufferSize) {
-				dcLocked->send(messageData);
-				sentSize += messageData.size();
+			// Continue sending
+			try {
+				while (dcLocked->isOpen() && dcLocked->bufferedAmount() <= bufferSize) {
+					dcLocked->send(messageData);
+					sentSizeMap[label] += messageData.size();
+				}
+			} catch (const std::exception &e) {
+				std::cout << "Send failed: " << e.what() << std::endl;
 			}
-		} catch (const std::exception &e) {
-			std::cout << "Send failed: " << e.what() << std::endl;
-		}
-	});
+		});
 
-	dc->onClosed([id]() { cout << "DataChannel from " << id << " closed" << endl; });
+		dc->onClosed([id]() { cout << "DataChannel from " << id << " closed" << endl; });
 
-	dc->onMessage([id, wdc = make_weak_ptr(dc)](variant<binary, string> data) {
-		if (holds_alternative<binary>(data))
-			receivedSize += get<binary>(data).size();
-	});
+		dc->onMessage([id, wdc = make_weak_ptr(dc), label](variant<binary, string> data) {
+			if (holds_alternative<binary>(data))
+				receivedSizeMap[label] += get<binary>(data).size();
+		});
 
-	dataChannelMap.emplace(id, dc);
+		dataChannelMap.emplace(label, dc);
+	}
 
 	const int duration = params.durationInSec() > 0 ? params.durationInSec() : INT32_MAX;
 	cout << "Benchmark will run for " << duration << " seconds" << endl;
@@ -271,25 +278,39 @@ int main(int argc, char **argv) try {
 			binary tempMessageData(byteToSendThisLoop);
 			fill(tempMessageData.begin(), tempMessageData.end(), std::byte(0xFF));
 
-			if (dc->isOpen() && dc->bufferedAmount() <= bufferSize * byteToSendOnEveryLoop) {
-				dc->send(tempMessageData);
-				sentSize += tempMessageData.size();
+			for (const auto &[label, dc] : dataChannelMap) {
+				if (dc->isOpen() && dc->bufferedAmount() <= bufferSize * byteToSendOnEveryLoop) {
+					dc->send(tempMessageData);
+					sentSizeMap[label] += tempMessageData.size();
+				}
 			}
 		}
 
 		if (printCounter >= STEP_COUNT_FOR_1_SEC) {
-			unsigned long _receivedSize = receivedSize.exchange(0);
-			unsigned long _sentSize = sentSize.exchange(0);
 			const double elapsedTimeInSecs =
 			    std::chrono::duration<double>(steady_clock::now() - printTime).count();
 			printTime = steady_clock::now();
 
-			cout << "#" << i / STEP_COUNT_FOR_1_SEC
-			     << " Received: " << static_cast<int>(_receivedSize / (elapsedTimeInSecs * 1000))
-			     << " KB/s"
-			     << "   Sent: " << static_cast<int>(_sentSize / (elapsedTimeInSecs * 1000))
-			     << " KB/s"
-			     << "   BufferSize: " << dc->bufferedAmount() << endl;
+			unsigned long receiveSpeedTotal = 0;
+			unsigned long sendSpeedTotal = 0;
+			cout << "#" << i / STEP_COUNT_FOR_1_SEC << endl;
+			for (const auto &[label, dc] : dataChannelMap) {
+				unsigned long channelReceiveSpeed = static_cast<int>(
+				    receivedSizeMap[label].exchange(0) / (elapsedTimeInSecs * 1000));
+				unsigned long channelSendSpeed =
+				    static_cast<int>(sentSizeMap[label].exchange(0) / (elapsedTimeInSecs * 1000));
+
+				cout << std::setw(10) << label << " Received: " << channelReceiveSpeed << " KB/s"
+				     << "   Sent: " << channelSendSpeed << " KB/s"
+				     << "   BufferSize: " << dc->bufferedAmount() << endl;
+
+				receiveSpeedTotal += channelReceiveSpeed;
+				sendSpeedTotal += channelSendSpeed;
+			}
+			cout << std::setw(10) << "TOTL"
+			     << " Received: " << receiveSpeedTotal << " KB/s"
+			     << "   Sent: " << sendSpeedTotal << " KB/s" << endl;
+
 			printStatCounter++;
 			printCounter = 0;
 		}
@@ -308,12 +329,16 @@ int main(int argc, char **argv) try {
 
 	dataChannelMap.clear();
 	peerConnectionMap.clear();
+	receivedSizeMap.clear();
+	sentSizeMap.clear();
 	return 0;
 
 } catch (const std::exception &e) {
 	std::cout << "Error: " << e.what() << std::endl;
 	dataChannelMap.clear();
 	peerConnectionMap.clear();
+	receivedSizeMap.clear();
+	sentSizeMap.clear();
 	return -1;
 }
 
@@ -346,13 +371,16 @@ shared_ptr<PeerConnection> createPeerConnection(const Configuration &config,
 	});
 
 	pc->onDataChannel([id](shared_ptr<DataChannel> dc) {
-		cout << "DataChannel from " << id << " received with label \"" << dc->label() << "\""
-		     << endl;
+		const string label = dc->label();
+		cout << "DataChannel from " << id << " received with label \"" << label << "\"" << endl;
 
 		cout << "###########################################" << endl;
 		cout << "### Check other peer's screen for stats ###" << endl;
 		cout << "###########################################" << endl;
 
+		receivedSizeMap.emplace(dc->label(), 0);
+		sentSizeMap.emplace(dc->label(), 0);
+
 		// Set Buffer Size
 		dc->setBufferedAmountLowThreshold(bufferSize);
 
@@ -360,7 +388,7 @@ shared_ptr<PeerConnection> createPeerConnection(const Configuration &config,
 			try {
 				while (dc->bufferedAmount() <= bufferSize) {
 					dc->send(messageData);
-					sentSize += messageData.size();
+					sentSizeMap[label] += messageData.size();
 				}
 			} catch (const std::exception &e) {
 				std::cout << "Send failed: " << e.what() << std::endl;
@@ -370,7 +398,7 @@ shared_ptr<PeerConnection> createPeerConnection(const Configuration &config,
 		if (!noSend && enableThroughputSet) {
 			// Create Send Data Thread
 			// Thread will join when data channel destroyed or closed
-			std::thread([wdc = make_weak_ptr(dc)]() {
+			std::thread([wdc = make_weak_ptr(dc), label]() {
 				steady_clock::time_point stepTime = steady_clock::now();
 				// Byte count to send for every loop
 				int byteToSendOnEveryLoop = throughtputSetAsKB * stepDurationInMs;
@@ -399,7 +427,7 @@ shared_ptr<PeerConnection> createPeerConnection(const Configuration &config,
 
 						if (dcLocked->bufferedAmount() <= bufferSize) {
 							dcLocked->send(tempMessageData);
-							sentSize += tempMessageData.size();
+							sentSizeMap[label] += tempMessageData.size();
 						}
 					} catch (const std::exception &e) {
 						std::cout << "Send failed: " << e.what() << std::endl;
@@ -409,7 +437,7 @@ shared_ptr<PeerConnection> createPeerConnection(const Configuration &config,
 			}).detach();
 		}
 
-		dc->onBufferedAmountLow([wdc = make_weak_ptr(dc)]() {
+		dc->onBufferedAmountLow([wdc = make_weak_ptr(dc), label]() {
 			if (noSend)
 				return;
 
@@ -424,7 +452,7 @@ shared_ptr<PeerConnection> createPeerConnection(const Configuration &config,
 			try {
 				while (dcLocked->isOpen() && dcLocked->bufferedAmount() <= bufferSize) {
 					dcLocked->send(messageData);
-					sentSize += messageData.size();
+					sentSizeMap[label] += messageData.size();
 				}
 			} catch (const std::exception &e) {
 				std::cout << "Send failed: " << e.what() << std::endl;
@@ -433,12 +461,12 @@ shared_ptr<PeerConnection> createPeerConnection(const Configuration &config,
 
 		dc->onClosed([id]() { cout << "DataChannel from " << id << " closed" << endl; });
 
-		dc->onMessage([id, wdc = make_weak_ptr(dc)](variant<binary, string> data) {
+		dc->onMessage([id, wdc = make_weak_ptr(dc), label](variant<binary, string> data) {
 			if (holds_alternative<binary>(data))
-				receivedSize += get<binary>(data).size();
+				receivedSizeMap[label] += get<binary>(data).size();
 		});
 
-		dataChannelMap.emplace(id, dc);
+		dataChannelMap.emplace(label, dc);
 	});
 
 	peerConnectionMap.emplace(id, pc);