main.cpp

/**
 * libdatachannel client example
 * Copyright (c) 2019-2020 Paul-Louis Ageneau
 * Copyright (c) 2019 Murat Dogan
 * Copyright (c) 2020 Will Munn
 * Copyright (c) 2020 Nico Chatzi
 * Copyright (c) 2020 Lara Mackey
 * Copyright (c) 2020 Erik Cota-Robles
 * Copyright (c) 2020 Filip Klembara (in2core)
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
 */

#include "nlohmann/json.hpp"

#include "h264fileparser.hpp"
#include "opusfileparser.hpp"
#include "helpers.hpp"
#include "ArgParser.hpp"

using namespace rtc;
using namespace std;
using namespace std::chrono_literals;

using json = nlohmann::json;

template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
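
// Note: make_weak_ptr above is a small helper used throughout this file to capture
// weak references in callbacks, so that a lambda stored on an object does not keep
// that same object alive (avoids shared_ptr reference cycles).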

/// all connected clients
unordered_map<string, shared_ptr<Client>> clients{};

/// Creates peer connection and client representation
/// @param config Configuration
/// @param wws Websocket for signaling
/// @param id Client ID
/// @returns Client
shared_ptr<Client> createPeerConnection(const Configuration &config,
                                        weak_ptr<WebSocket> wws,
                                        string id);

/// Creates stream
/// @param h264Samples Directory with H264 samples
/// @param fps Video FPS
/// @param opusSamples Directory with opus samples
/// @returns Stream object
shared_ptr<Stream> createStream(const string h264Samples, const unsigned fps, const string opusSamples);

/// Add client to stream
/// @param client Client
/// @param isAddingVideo True if adding video, false if adding audio
void addToStream(shared_ptr<Client> client, bool isAddingVideo);

/// Start stream
void startStream();

/// Main dispatch queue
DispatchQueue MainThread("Main");

/// Audio and video stream
optional<shared_ptr<Stream>> avStream = nullopt;

const string defaultRootDirectory = "../../../examples/streamer/samples/";
const string defaultH264SamplesDirectory = defaultRootDirectory + "h264/";
string h264SamplesDirectory = defaultH264SamplesDirectory;
const string defaultOpusSamplesDirectory = defaultRootDirectory + "opus/";
string opusSamplesDirectory = defaultOpusSamplesDirectory;
const string defaultIPAddress = "127.0.0.1";
const uint16_t defaultPort = 8000;
string ip_address = defaultIPAddress;
uint16_t port = defaultPort;

/// Incoming message handler for websocket
/// @param message Incoming message
/// @param config Configuration
/// @param ws Websocket
void wsOnMessage(json message, Configuration config, shared_ptr<WebSocket> ws) {
    auto it = message.find("id");
    if (it == message.end())
        return;
    string id = it->get<string>();

    it = message.find("type");
    if (it == message.end())
        return;
    string type = it->get<string>();

    if (type == "request") {
        clients.emplace(id, createPeerConnection(config, make_weak_ptr(ws), id));
    } else if (type == "answer") {
        if (auto jt = clients.find(id); jt != clients.end()) {
            auto pc = jt->second->peerConnection;
            auto sdp = message["sdp"].get<string>();
            auto description = Description(sdp, type);
            pc->setRemoteDescription(description);
        }
    }
}
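
// The signaling payloads handled above are plain JSON; the exact shape is an
// assumption based on the fields read here, e.g.:
//   {"id": "<client id>", "type": "request"}                       -> create a peer connection and send an offer
//   {"id": "<client id>", "type": "answer", "sdp": "<SDP string>"} -> apply the client's answer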

int main(int argc, char **argv) try {
    bool enableDebugLogs = false;
    bool printHelp = false;
    int c = 0;
    auto parser = ArgParser({{"a", "audio"}, {"b", "video"}, {"d", "ip"}, {"p", "port"}},
                            {{"h", "help"}, {"v", "verbose"}});
    auto parsingResult = parser.parse(argc, argv, [](string key, string value) {
        if (key == "audio") {
            opusSamplesDirectory = value + "/";
        } else if (key == "video") {
            h264SamplesDirectory = value + "/";
        } else if (key == "ip") {
            ip_address = value;
        } else if (key == "port") {
            port = atoi(value.data());
        } else {
            cerr << "Invalid option --" << key << " with value " << value << endl;
            return false;
        }
        return true;
    }, [&enableDebugLogs, &printHelp](string flag) {
        if (flag == "verbose") {
            enableDebugLogs = true;
        } else if (flag == "help") {
            printHelp = true;
        } else {
            cerr << "Invalid flag --" << flag << endl;
            return false;
        }
        return true;
    });
    if (!parsingResult) {
        return 1;
    }

    if (printHelp) {
        cout << "usage: stream-h264 [-a opus_samples_folder] [-b h264_samples_folder] [-d ip_address] [-p port] [-v] [-h]" << endl
             << "Arguments:" << endl
             << "\t -a " << "Directory with opus samples (default: " << defaultOpusSamplesDirectory << ")." << endl
             << "\t -b " << "Directory with H264 samples (default: " << defaultH264SamplesDirectory << ")." << endl
             << "\t -d " << "Signaling server IP address (default: " << defaultIPAddress << ")." << endl
             << "\t -p " << "Signaling server port (default: " << defaultPort << ")." << endl
             << "\t -v " << "Enable debug logs." << endl
             << "\t -h " << "Print this help and exit." << endl;
        return 0;
    }

    if (enableDebugLogs) {
        InitLogger(LogLevel::Debug);
    }

    Configuration config;
    string stunServer = "stun:stun.l.google.com:19302";
    cout << "STUN server is " << stunServer << endl;
    config.iceServers.emplace_back(stunServer);
    config.disableAutoNegotiation = true;

    string localId = "server";
    cout << "The local ID is: " << localId << endl;

    auto ws = make_shared<WebSocket>();

    ws->onOpen([]() { cout << "WebSocket connected, signaling ready" << endl; });

    ws->onClosed([]() { cout << "WebSocket closed" << endl; });

    ws->onError([](const string &error) { cout << "WebSocket failed: " << error << endl; });

    ws->onMessage([&](variant<binary, string> data) {
        if (!holds_alternative<string>(data))
            return;

        json message = json::parse(get<string>(data));
        MainThread.dispatch([message, config, ws]() {
            wsOnMessage(message, config, ws);
        });
    });

    const string url = "ws://" + ip_address + ":" + to_string(port) + "/" + localId;
    cout << "URL is " << url << endl;
    ws->open(url);

    cout << "Waiting for signaling to be connected..." << endl;
    while (!ws->isOpen()) {
        if (ws->isClosed())
            return 1;
        this_thread::sleep_for(100ms);
    }

    while (true) {
        string id;
        cout << "Enter to exit" << endl;
        cin >> id;
        cin.ignore();
        cout << "exiting" << endl;
        break;
    }

    cout << "Cleaning up..." << endl;
    return 0;

} catch (const std::exception &e) {
    std::cout << "Error: " << e.what() << std::endl;
    return -1;
}
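
// The two helpers below attach a media track to a peer connection and build its
// outgoing pipeline: an RTP packetizer wrapped in a packetization handler, with an
// RTCP Sender Report reporter and an RTCP NACK responder chained after it.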

shared_ptr<ClientTrackData> addVideo(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
    auto video = Description::Video(cname);
    video.addH264Codec(payloadType);
    video.addSSRC(ssrc, cname, msid, cname);
    auto track = pc->addTrack(video);
    // create RTP configuration
    auto rtpConfig = make_shared<RtpPacketizationConfig>(ssrc, cname, payloadType, H264RtpPacketizer::defaultClockRate);
    // create packetizer
    auto packetizer = make_shared<H264RtpPacketizer>(NalUnit::Separator::Length, rtpConfig);
    // create H264 handler
    auto h264Handler = make_shared<H264PacketizationHandler>(packetizer);
    // add RTCP SR handler
    auto srReporter = make_shared<RtcpSrReporter>(rtpConfig);
    h264Handler->addToChain(srReporter);
    // add RTCP NACK handler
    auto nackResponder = make_shared<RtcpNackResponder>();
    h264Handler->addToChain(nackResponder);
    // set handler
    track->setMediaHandler(h264Handler);
    track->onOpen(onOpen);
    auto trackData = make_shared<ClientTrackData>(track, srReporter);
    return trackData;
}

shared_ptr<ClientTrackData> addAudio(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
    auto audio = Description::Audio(cname);
    audio.addOpusCodec(payloadType);
    audio.addSSRC(ssrc, cname, msid, cname);
    auto track = pc->addTrack(audio);
    // create RTP configuration
    auto rtpConfig = make_shared<RtpPacketizationConfig>(ssrc, cname, payloadType, OpusRtpPacketizer::defaultClockRate);
    // create packetizer
    auto packetizer = make_shared<OpusRtpPacketizer>(rtpConfig);
    // create opus handler
    auto opusHandler = make_shared<OpusPacketizationHandler>(packetizer);
    // add RTCP SR handler
    auto srReporter = make_shared<RtcpSrReporter>(rtpConfig);
    opusHandler->addToChain(srReporter);
    // add RTCP NACK handler
    auto nackResponder = make_shared<RtcpNackResponder>();
    opusHandler->addToChain(nackResponder);
    // set handler
    track->setMediaHandler(opusHandler);
    track->onOpen(onOpen);
    auto trackData = make_shared<ClientTrackData>(track, srReporter);
    return trackData;
}
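
// Auto-negotiation is disabled in main(), so the offer is created explicitly with
// setLocalDescription() at the end of createPeerConnection() and only sent to the
// signaling server once ICE gathering completes (non-trickle: the SDP already
// contains all gathered candidates).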

// Create and setup a PeerConnection
shared_ptr<Client> createPeerConnection(const Configuration &config,
                                        weak_ptr<WebSocket> wws,
                                        string id) {
    auto pc = make_shared<PeerConnection>(config);
    auto client = make_shared<Client>(pc);

    pc->onStateChange([id](PeerConnection::State state) {
        cout << "State: " << state << endl;
        if (state == PeerConnection::State::Disconnected ||
            state == PeerConnection::State::Failed ||
            state == PeerConnection::State::Closed) {
            // remove disconnected client
            MainThread.dispatch([id]() {
                clients.erase(id);
            });
        }
    });

    pc->onGatheringStateChange(
        [wpc = make_weak_ptr(pc), id, wws](PeerConnection::GatheringState state) {
            cout << "Gathering State: " << state << endl;
            if (state == PeerConnection::GatheringState::Complete) {
                if (auto pc = wpc.lock()) {
                    auto description = pc->localDescription();
                    json message = {
                        {"id", id},
                        {"type", description->typeString()},
                        {"sdp", string(description.value())}
                    };
                    // Gathering complete, send the local description (offer)
                    if (auto ws = wws.lock()) {
                        ws->send(message.dump());
                    }
                }
            }
        });

    client->video = addVideo(pc, 102, 1, "video-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
        MainThread.dispatch([wc]() {
            if (auto c = wc.lock()) {
                addToStream(c, true);
            }
        });
        cout << "Video from " << id << " opened" << endl;
    });

    client->audio = addAudio(pc, 111, 2, "audio-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
        MainThread.dispatch([wc]() {
            if (auto c = wc.lock()) {
                addToStream(c, false);
            }
        });
        cout << "Audio from " << id << " opened" << endl;
    });

    auto dc = pc->createDataChannel("ping-pong");
    dc->onOpen([id, wdc = make_weak_ptr(dc)]() {
        if (auto dc = wdc.lock()) {
            dc->send("Ping");
        }
    });
    dc->onMessage(nullptr, [id, wdc = make_weak_ptr(dc)](string msg) {
        cout << "Message from " << id << " received: " << msg << endl;
        if (auto dc = wdc.lock()) {
            dc->send("Ping");
        }
    });
    client->dataChannel = dc;

    pc->setLocalDescription();
    return client;
}
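
// The stream below drives both file parsers on a timer; for every sample the
// callback translates the sample time into the track's RTP timestamp, schedules
// an RTCP Sender Report roughly once per second, and sends the sample to every
// client whose track is ready.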

/// Create stream
shared_ptr<Stream> createStream(const string h264Samples, const unsigned fps, const string opusSamples) {
    // video source
    auto video = make_shared<H264FileParser>(h264Samples, fps, true);
    // audio source
    auto audio = make_shared<OPUSFileParser>(opusSamples, true);

    auto stream = make_shared<Stream>(video, audio);
    // set callback responsible for sample sending
    stream->onSample([ws = make_weak_ptr(stream)](Stream::StreamSourceType type, uint64_t sampleTime, rtc::binary sample) {
        vector<ClientTrack> tracks{};
        string streamType = type == Stream::StreamSourceType::Video ? "video" : "audio";
        // get track for given type
        function<optional<shared_ptr<ClientTrackData>> (shared_ptr<Client>)> getTrackData = [type](shared_ptr<Client> client) {
            return type == Stream::StreamSourceType::Video ? client->video : client->audio;
        };
        // get all clients with Ready state
        for (auto id_client : clients) {
            auto id = id_client.first;
            auto client = id_client.second;
            auto optTrackData = getTrackData(client);
            if (client->getState() == Client::State::Ready && optTrackData.has_value()) {
                auto trackData = optTrackData.value();
                tracks.push_back(ClientTrack(id, trackData));
            }
        }
        if (!tracks.empty()) {
            for (auto clientTrack : tracks) {
                auto client = clientTrack.id;
                auto trackData = clientTrack.trackData;
                auto rtpConfig = trackData->sender->rtpConfig;

                // sample time is in us, we need to convert it to seconds
                auto elapsedSeconds = double(sampleTime) / (1000 * 1000);
                // get elapsed time in clock rate
                uint32_t elapsedTimestamp = rtpConfig->secondsToTimestamp(elapsedSeconds);
                // set new timestamp
                rtpConfig->timestamp = rtpConfig->startTimestamp + elapsedTimestamp;

                // get elapsed time in clock rate from last RTCP sender report
                auto reportElapsedTimestamp = rtpConfig->timestamp - trackData->sender->lastReportedTimestamp();
                // check if last report was at least 1 second ago
                if (rtpConfig->timestampToSeconds(reportElapsedTimestamp) > 1) {
                    trackData->sender->setNeedsToReport();
                }

                cout << "Sending " << streamType << " sample with size: " << to_string(sample.size()) << " to " << client << endl;
                try {
                    // send sample
                    trackData->track->send(sample);
                } catch (const std::exception &e) {
                    cerr << "Unable to send " << streamType << " packet: " << e.what() << endl;
                }
            }
        }
        MainThread.dispatch([ws]() {
            if (clients.empty()) {
                // we have no clients, stop the stream
                if (auto stream = ws.lock()) {
                    stream->stop();
                }
            }
        });
    });
    return stream;
}
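
// For illustration only (values assumed, not taken from the sample files): with
// the 90 kHz H.264 RTP clock used above, a sampleTime of 1,000,000 us is 1.0 s of
// elapsed media time, so secondsToTimestamp(1.0) yields 90000 timestamp units
// added on top of startTimestamp.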

/// Start stream
void startStream() {
    shared_ptr<Stream> stream;
    if (avStream.has_value()) {
        stream = avStream.value();
        if (stream->isRunning) {
            // stream is already running
            return;
        }
    } else {
        stream = createStream(h264SamplesDirectory, 30, opusSamplesDirectory);
        avStream = stream;
    }
    stream->start();
}

/// Send previous key frame so browser can show something to user
/// @param stream Stream
/// @param video Video track data
void sendInitialNalus(shared_ptr<Stream> stream, shared_ptr<ClientTrackData> video) {
    auto h264 = dynamic_cast<H264FileParser *>(stream->video.get());
    auto initialNalus = h264->initialNALUS();

    // send the previous key frame NAL units so users don't have to wait for the
    // next key frame to see that the stream works
    if (!initialNalus.empty()) {
        const double frameDuration_s = double(h264->getSampleDuration_us()) / (1000 * 1000);
        const uint32_t frameTimestampDuration = video->sender->rtpConfig->secondsToTimestamp(frameDuration_s);
        video->sender->rtpConfig->timestamp = video->sender->rtpConfig->startTimestamp - frameTimestampDuration * 2;
        video->track->send(initialNalus);
        video->sender->rtpConfig->timestamp += frameTimestampDuration;
        // Send initial NAL units again to start the stream in Firefox
        video->track->send(initialNalus);
    }
}
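
// Client state flow driven by addToStream() below: a client in the Waiting state
// moves to WaitingForAudio or WaitingForVideo when its first track opens
// (depending on which one arrived); once the second track opens it becomes Ready,
// the initial key frame NAL units are sent, and the shared stream is started if
// it is not already running.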

/// Add client to stream
/// @param client Client
/// @param isAddingVideo True if adding video, false if adding audio
void addToStream(shared_ptr<Client> client, bool isAddingVideo) {
    if (client->getState() == Client::State::Waiting) {
        client->setState(isAddingVideo ? Client::State::WaitingForAudio : Client::State::WaitingForVideo);
    } else if ((client->getState() == Client::State::WaitingForAudio && !isAddingVideo)
               || (client->getState() == Client::State::WaitingForVideo && isAddingVideo)) {
        // Audio and video tracks are collected now
        assert(client->video.has_value() && client->audio.has_value());
        auto video = client->video.value();

        if (avStream.has_value()) {
            sendInitialNalus(avStream.value(), video);
        }

        client->setState(Client::State::Ready);
    }
    if (client->getState() == Client::State::Ready) {
        startStream();
    }
}