main.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491
  1. /*
  2. * libdatachannel client-benchmark example
  3. * Copyright (c) 2019-2020 Paul-Louis Ageneau
  4. * Copyright (c) 2019-2021 Murat Dogan
  5. * Copyright (c) 2020 Will Munn
  6. * Copyright (c) 2020 Nico Chatzi
  7. * Copyright (c) 2020 Lara Mackey
  8. * Copyright (c) 2020 Erik Cota-Robles
  9. *
  10. * This program is free software; you can redistribute it and/or
  11. * modify it under the terms of the GNU General Public License
  12. * as published by the Free Software Foundation; either version 2
  13. * of the License, or (at your option) any later version.
  14. *
  15. * This program is distributed in the hope that it will be useful,
  16. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  17. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  18. * GNU General Public License for more details.
  19. *
  20. * You should have received a copy of the GNU General Public License
  21. * along with this program; If not, see <http://www.gnu.org/licenses/>.
  22. */
  23. #include "rtc/rtc.hpp"
  24. #include "parse_cl.h"
  25. #include <nlohmann/json.hpp>
  26. #include <algorithm>
  27. #include <atomic>
  28. #include <chrono>
  29. #include <future>
  30. #include <iomanip>
  31. #include <iostream>
  32. #include <memory>
  33. #include <random>
  34. #include <stdexcept>
  35. #include <thread>
  36. #include <unordered_map>
  37. using namespace std::chrono_literals;
  38. using std::shared_ptr;
  39. using std::weak_ptr;
  40. using std::chrono::milliseconds;
  41. using std::chrono::steady_clock;
  // Returns a non-owning weak_ptr observing the object managed by `ptr`.
  // Handy in lambda init-captures (`wdc = make_weak_ptr(dc)`) so that a callback
  // stored on an object does not keep that same object alive.
  // The argument is taken by const reference: building a weak_ptr does not need
  // an extra owning copy (and its atomic refcount round-trip).
  template <class T> std::weak_ptr<T> make_weak_ptr(const std::shared_ptr<T> &ptr) noexcept {
      return ptr;
  }
  43. using nlohmann::json;
  44. std::string localId;
  45. std::unordered_map<std::string, shared_ptr<rtc::PeerConnection>> peerConnectionMap;
  46. std::unordered_map<std::string, shared_ptr<rtc::DataChannel>> dataChannelMap;
  47. shared_ptr<rtc::PeerConnection> createPeerConnection(const rtc::Configuration &config,
  48. weak_ptr<rtc::WebSocket> wws, std::string id);
  49. std::string randomId(size_t length);
  50. // Benchmark
  51. const size_t messageSize = 65535;
  52. rtc::binary messageData(messageSize);
  53. std::unordered_map<std::string, std::atomic<size_t>> receivedSizeMap;
  54. std::unordered_map<std::string, std::atomic<size_t>> sentSizeMap;
  55. bool noSend = false;
  56. // Benchmark - enableThroughputSet params
  57. bool enableThroughputSet;
  58. int throughtputSetAsKB;
  59. int bufferSize;
  60. const float STEP_COUNT_FOR_1_SEC = 100.0;
  61. const int stepDurationInMs = int(1000 / STEP_COUNT_FOR_1_SEC);
  62. int main(int argc, char **argv) try {
  63. Cmdline params(argc, argv);
  64. rtc::InitLogger(rtc::LogLevel::Info);
  65. // Benchmark - construct message to send
  66. fill(messageData.begin(), messageData.end(), std::byte(0xFF));
  67. // Benchmark - enableThroughputSet params
  68. enableThroughputSet = params.enableThroughputSet();
  69. throughtputSetAsKB = params.throughtputSetAsKB();
  70. bufferSize = params.bufferSize();
  71. // No Send option
  72. noSend = params.noSend();
  73. if (noSend)
  74. std::cout << "Not sending data (one way benchmark)." << std::endl;
  75. rtc::Configuration config;
  76. std::string stunServer = "";
  77. if (params.noStun()) {
  78. std::cout
  79. << "No STUN server is configured. Only local hosts and public IP addresses supported."
  80. << std::endl;
  81. } else {
  82. if (params.stunServer().substr(0, 5).compare("stun:") != 0) {
  83. stunServer = "stun:";
  84. }
  85. stunServer += params.stunServer() + ":" + std::to_string(params.stunPort());
  86. std::cout << "STUN server is " << stunServer << std::endl;
  87. config.iceServers.emplace_back(stunServer);
  88. }
  89. localId = randomId(4);
  90. std::cout << "The local ID is " << localId << std::endl;
  91. auto ws = std::make_shared<rtc::WebSocket>();
  92. std::promise<void> wsPromise;
  93. auto wsFuture = wsPromise.get_future();
  94. ws->onOpen([&wsPromise]() {
  95. std::cout << "WebSocket connected, signaling ready" << std::endl;
  96. wsPromise.set_value();
  97. });
  98. ws->onError([&wsPromise](std::string s) {
  99. std::cout << "WebSocket error" << std::endl;
  100. wsPromise.set_exception(std::make_exception_ptr(std::runtime_error(s)));
  101. });
  102. ws->onClosed([]() { std::cout << "WebSocket closed" << std::endl; });
  103. ws->onMessage([&config, wws = make_weak_ptr(ws)](auto data) {
  104. if (!std::holds_alternative<std::string>(data))
  105. return;
  106. json message = json::parse(std::get<std::string>(data));
  107. auto it = message.find("id");
  108. if (it == message.end())
  109. return;
  110. auto id = it->get<std::string>();
  111. it = message.find("type");
  112. if (it == message.end())
  113. return;
  114. auto type = it->get<std::string>();
  115. shared_ptr<rtc::PeerConnection> pc;
  116. if (auto jt = peerConnectionMap.find(id); jt != peerConnectionMap.end()) {
  117. pc = jt->second;
  118. } else if (type == "offer") {
  119. std::cout << "Answering to " + id << std::endl;
  120. pc = createPeerConnection(config, wws, id);
  121. } else {
  122. return;
  123. }
  124. if (type == "offer" || type == "answer") {
  125. auto sdp = message["description"].get<std::string>();
  126. pc->setRemoteDescription(rtc::Description(sdp, type));
  127. } else if (type == "candidate") {
  128. auto sdp = message["candidate"].get<std::string>();
  129. auto mid = message["mid"].get<std::string>();
  130. pc->addRemoteCandidate(rtc::Candidate(sdp, mid));
  131. }
  132. });
  133. const std::string wsPrefix =
  134. params.webSocketServer().find("://") == std::string::npos ? "ws://" : "";
  135. const std::string url = wsPrefix + params.webSocketServer() + ":" +
  136. std::to_string(params.webSocketPort()) + "/" + localId;
  137. std::cout << "WebSocket URL is " << url << std::endl;
  138. ws->open(url);
  139. std::cout << "Waiting for signaling to be connected..." << std::endl;
  140. wsFuture.get();
  141. std::string id;
  142. std::cout << "Enter a remote ID to send an offer:" << std::endl;
  143. std::cin >> id;
  144. std::cin.ignore();
  145. if (id.empty()) {
  146. // Nothing to do
  147. return 0;
  148. }
  149. if (id == localId) {
  150. std::cout << "Invalid remote ID (This is the local ID). Exiting..." << std::endl;
  151. return 0;
  152. }
  153. std::cout << "Offering to " + id << std::endl;
  154. auto pc = createPeerConnection(config, ws, id);
  155. // We are the offerer, so create a data channel to initiate the process
  156. for (int i = 1; i <= params.dataChannelCount(); i++) {
  157. const std::string label = "DC-" + std::to_string(i);
  158. std::cout << "Creating DataChannel with label \"" << label << "\"" << std::endl;
  159. auto dc = pc->createDataChannel(label);
  160. receivedSizeMap.emplace(label, 0);
  161. sentSizeMap.emplace(label, 0);
  162. // Set Buffer Size
  163. dc->setBufferedAmountLowThreshold(bufferSize);
  164. dc->onOpen([id, wdc = make_weak_ptr(dc), label]() {
  165. std::cout << "DataChannel from " << id << " open" << std::endl;
  166. if (noSend)
  167. return;
  168. if (enableThroughputSet)
  169. return;
  170. if (auto dcLocked = wdc.lock()) {
  171. try {
  172. while (dcLocked->bufferedAmount() <= bufferSize) {
  173. dcLocked->send(messageData);
  174. sentSizeMap.at(label) += messageData.size();
  175. }
  176. } catch (const std::exception &e) {
  177. std::cout << "Send failed: " << e.what() << std::endl;
  178. }
  179. }
  180. });
  181. dc->onBufferedAmountLow([wdc = make_weak_ptr(dc), label]() {
  182. if (noSend)
  183. return;
  184. if (enableThroughputSet)
  185. return;
  186. auto dcLocked = wdc.lock();
  187. if (!dcLocked)
  188. return;
  189. // Continue sending
  190. try {
  191. while (dcLocked->isOpen() && dcLocked->bufferedAmount() <= bufferSize) {
  192. dcLocked->send(messageData);
  193. sentSizeMap.at(label) += messageData.size();
  194. }
  195. } catch (const std::exception &e) {
  196. std::cout << "Send failed: " << e.what() << std::endl;
  197. }
  198. });
  199. dc->onClosed([id]() { std::cout << "DataChannel from " << id << " closed" << std::endl; });
  200. dc->onMessage([id, wdc = make_weak_ptr(dc), label](auto data) {
  201. if (std::holds_alternative<rtc::binary>(data))
  202. receivedSizeMap.at(label) += std::get<rtc::binary>(data).size();
  203. });
  204. dataChannelMap.emplace(label, dc);
  205. }
  206. const int duration = params.durationInSec() > 0 ? params.durationInSec() : INT32_MAX;
  207. std::cout << "Benchmark will run for " << duration << " seconds" << std::endl;
  208. int printCounter = 0;
  209. int printStatCounter = 0;
  210. steady_clock::time_point printTime = steady_clock::now();
  211. steady_clock::time_point stepTime = steady_clock::now();
  212. // Byte count to send for every loop
  213. int byteToSendOnEveryLoop = throughtputSetAsKB * stepDurationInMs;
  214. for (int i = 1; i <= duration * STEP_COUNT_FOR_1_SEC; ++i) {
  215. std::this_thread::sleep_for(milliseconds(stepDurationInMs));
  216. printCounter++;
  217. if (enableThroughputSet) {
  218. const double elapsedTimeInSecs =
  219. std::chrono::duration<double>(steady_clock::now() - stepTime).count();
  220. stepTime = steady_clock::now();
  221. int byteToSendThisLoop = static_cast<int>(
  222. byteToSendOnEveryLoop * ((elapsedTimeInSecs * 1000.0) / stepDurationInMs));
  223. rtc::binary tempMessageData(byteToSendThisLoop);
  224. fill(tempMessageData.begin(), tempMessageData.end(), std::byte(0xFF));
  225. for (const auto &[label, dc] : dataChannelMap) {
  226. if (dc->isOpen() && dc->bufferedAmount() <= bufferSize * byteToSendOnEveryLoop) {
  227. dc->send(tempMessageData);
  228. sentSizeMap.at(label) += tempMessageData.size();
  229. }
  230. }
  231. }
  232. if (printCounter >= STEP_COUNT_FOR_1_SEC) {
  233. const double elapsedTimeInSecs =
  234. std::chrono::duration<double>(steady_clock::now() - printTime).count();
  235. printTime = steady_clock::now();
  236. unsigned long receiveSpeedTotal = 0;
  237. unsigned long sendSpeedTotal = 0;
  238. std::cout << "#" << i / STEP_COUNT_FOR_1_SEC << std::endl;
  239. for (const auto &[label, dc] : dataChannelMap) {
  240. unsigned long channelReceiveSpeed = static_cast<int>(
  241. receivedSizeMap[label].exchange(0) / (elapsedTimeInSecs * 1000));
  242. unsigned long channelSendSpeed =
  243. static_cast<int>(sentSizeMap[label].exchange(0) / (elapsedTimeInSecs * 1000));
  244. std::cout << std::setw(10) << label << " Received: " << channelReceiveSpeed
  245. << " KB/s"
  246. << " Sent: " << channelSendSpeed << " KB/s"
  247. << " BufferSize: " << dc->bufferedAmount() << std::endl;
  248. receiveSpeedTotal += channelReceiveSpeed;
  249. sendSpeedTotal += channelSendSpeed;
  250. }
  251. std::cout << std::setw(10) << "TOTAL"
  252. << " Received: " << receiveSpeedTotal << " KB/s"
  253. << " Sent: " << sendSpeedTotal << " KB/s" << std::endl;
  254. printStatCounter++;
  255. printCounter = 0;
  256. }
  257. if (printStatCounter >= 5) {
  258. std::cout << "Stats# "
  259. << "Received Total: " << pc->bytesReceived() / (1000 * 1000) << " MB"
  260. << " Sent Total: " << pc->bytesSent() / (1000 * 1000) << " MB"
  261. << " RTT: " << pc->rtt().value_or(0ms).count() << " ms" << std::endl;
  262. std::cout << std::endl;
  263. printStatCounter = 0;
  264. }
  265. }
  266. std::cout << "Cleaning up..." << std::endl;
  267. dataChannelMap.clear();
  268. peerConnectionMap.clear();
  269. receivedSizeMap.clear();
  270. sentSizeMap.clear();
  271. return 0;
  272. } catch (const std::exception &e) {
  273. std::cout << "Error: " << e.what() << std::endl;
  274. dataChannelMap.clear();
  275. peerConnectionMap.clear();
  276. receivedSizeMap.clear();
  277. sentSizeMap.clear();
  278. return -1;
  279. }
  280. // Create and setup a PeerConnection
  281. shared_ptr<rtc::PeerConnection> createPeerConnection(const rtc::Configuration &config,
  282. weak_ptr<rtc::WebSocket> wws, std::string id) {
  283. auto pc = std::make_shared<rtc::PeerConnection>(config);
  284. pc->onStateChange(
  285. [](rtc::PeerConnection::State state) { std::cout << "State: " << state << std::endl; });
  286. pc->onGatheringStateChange([](rtc::PeerConnection::GatheringState state) {
  287. std::cout << "Gathering State: " << state << std::endl;
  288. });
  289. pc->onLocalDescription([wws, id](rtc::Description description) {
  290. json message = {{"id", id},
  291. {"type", description.typeString()},
  292. {"description", std::string(description)}};
  293. if (auto ws = wws.lock())
  294. ws->send(message.dump());
  295. });
  296. pc->onLocalCandidate([wws, id](rtc::Candidate candidate) {
  297. json message = {{"id", id},
  298. {"type", "candidate"},
  299. {"candidate", std::string(candidate)},
  300. {"mid", candidate.mid()}};
  301. if (auto ws = wws.lock())
  302. ws->send(message.dump());
  303. });
  304. pc->onDataChannel([id](shared_ptr<rtc::DataChannel> dc) {
  305. const std::string label = dc->label();
  306. std::cout << "DataChannel from " << id << " received with label \"" << label << "\""
  307. << std::endl;
  308. std::cout << "###########################################" << std::endl;
  309. std::cout << "### Check other peer's screen for stats ###" << std::endl;
  310. std::cout << "###########################################" << std::endl;
  311. receivedSizeMap.emplace(dc->label(), 0);
  312. sentSizeMap.emplace(dc->label(), 0);
  313. // Set Buffer Size
  314. dc->setBufferedAmountLowThreshold(bufferSize);
  315. if (!noSend && !enableThroughputSet) {
  316. try {
  317. while (dc->bufferedAmount() <= bufferSize) {
  318. dc->send(messageData);
  319. sentSizeMap.at(label) += messageData.size();
  320. }
  321. } catch (const std::exception &e) {
  322. std::cout << "Send failed: " << e.what() << std::endl;
  323. }
  324. }
  325. if (!noSend && enableThroughputSet) {
  326. // Create Send Data Thread
  327. // Thread will join when data channel destroyed or closed
  328. std::thread([wdc = make_weak_ptr(dc), label]() {
  329. steady_clock::time_point stepTime = steady_clock::now();
  330. // Byte count to send for every loop
  331. int byteToSendOnEveryLoop = throughtputSetAsKB * stepDurationInMs;
  332. while (true) {
  333. std::this_thread::sleep_for(milliseconds(stepDurationInMs));
  334. auto dcLocked = wdc.lock();
  335. if (!dcLocked)
  336. break;
  337. if (!dcLocked->isOpen())
  338. break;
  339. try {
  340. const double elapsedTimeInSecs =
  341. std::chrono::duration<double>(steady_clock::now() - stepTime).count();
  342. stepTime = steady_clock::now();
  343. int byteToSendThisLoop =
  344. static_cast<int>(byteToSendOnEveryLoop *
  345. ((elapsedTimeInSecs * 1000.0) / stepDurationInMs));
  346. rtc::binary tempMessageData(byteToSendThisLoop);
  347. fill(tempMessageData.begin(), tempMessageData.end(), std::byte(0xFF));
  348. if (dcLocked->bufferedAmount() <= bufferSize) {
  349. dcLocked->send(tempMessageData);
  350. sentSizeMap.at(label) += tempMessageData.size();
  351. }
  352. } catch (const std::exception &e) {
  353. std::cout << "Send failed: " << e.what() << std::endl;
  354. }
  355. }
  356. std::cout << "Send Data Thread exiting..." << std::endl;
  357. }).detach();
  358. }
  359. dc->onBufferedAmountLow([wdc = make_weak_ptr(dc), label]() {
  360. if (noSend)
  361. return;
  362. if (enableThroughputSet)
  363. return;
  364. auto dcLocked = wdc.lock();
  365. if (!dcLocked)
  366. return;
  367. // Continue sending
  368. try {
  369. while (dcLocked->isOpen() && dcLocked->bufferedAmount() <= bufferSize) {
  370. dcLocked->send(messageData);
  371. sentSizeMap.at(label) += messageData.size();
  372. }
  373. } catch (const std::exception &e) {
  374. std::cout << "Send failed: " << e.what() << std::endl;
  375. }
  376. });
  377. dc->onClosed([id]() { std::cout << "DataChannel from " << id << " closed" << std::endl; });
  378. dc->onMessage([id, wdc = make_weak_ptr(dc), label](auto data) {
  379. if (std::holds_alternative<rtc::binary>(data))
  380. receivedSizeMap.at(label) += std::get<rtc::binary>(data).size();
  381. });
  382. dataChannelMap.emplace(label, dc);
  383. });
  384. peerConnectionMap.emplace(id, pc);
  385. return pc;
  386. };
  // Helper function to generate a random ID.
  //
  // Returns a string of exactly `length` characters drawn uniformly from
  // [0-9A-Za-z]; an empty string when `length` is 0.
  //
  // Each thread owns one generator, seeded once from std::random_device mixed
  // with the wall clock. The clock alone is not enough: two clients started in
  // the same clock tick would otherwise produce the same signaling ID.
  std::string randomId(size_t length) {
      static constexpr char kAlphabet[] =
          "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
      static constexpr size_t kAlphabetSize = sizeof(kAlphabet) - 1; // without the '\0'

      static thread_local std::mt19937 rng = [] {
          std::random_device entropy;
          const auto ticks = static_cast<unsigned long long>(
              std::chrono::system_clock::now().time_since_epoch().count());
          std::seed_seq seed{static_cast<unsigned int>(entropy()),
                             static_cast<unsigned int>(ticks),
                             static_cast<unsigned int>(ticks >> 32)};
          return std::mt19937(seed);
      }();

      std::uniform_int_distribution<size_t> pick(0, kAlphabetSize - 1);

      std::string id(length, '0');
      std::generate(id.begin(), id.end(), [&]() { return kAlphabet[pick(rng)]; });
      return id;
  }