main.cpp

/*
 * libdatachannel client example
 * Copyright (c) 2019-2020 Paul-Louis Ageneau
 * Copyright (c) 2019 Murat Dogan
 * Copyright (c) 2020 Will Munn
 * Copyright (c) 2020 Nico Chatzi
 * Copyright (c) 2020 Lara Mackey
 * Copyright (c) 2020 Erik Cota-Robles
 * Copyright (c) 2020 Filip Klembara (in2core)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; If not, see <http://www.gnu.org/licenses/>.
 */
#include "nlohmann/json.hpp"

#include "h264fileparser.hpp"
#include "opusfileparser.hpp"
#include "helpers.hpp"

// Headers for identifiers used directly in this file
// (several may already be pulled in via helpers.hpp)
#include "rtc/rtc.hpp"

#include <cassert>
#include <cctype>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <unordered_map>
#include <utility>
#include <variant>
#include <vector>

#include <getopt.h>

using namespace rtc;
using namespace std;
using namespace std::chrono_literals;

using json = nlohmann::json;

template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }
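
// Overview: this program is the streaming side of the example. It connects to a WebSocket
// signaling server as "server", and for every incoming "streamRequest" it creates a
// PeerConnection carrying one H264 video track, one Opus audio track and a "ping-pong"
// DataChannel. Once both tracks are open, pre-encoded samples are read from disk and pushed
// to every connected client from the main dispatch queue.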
/// all connected clients
unordered_map<string, shared_ptr<Client>> clients{};

/// Creates peer connection and client representation
/// @param config Configuration
/// @param wws Websocket for signaling
/// @param id Client ID
/// @returns Client
shared_ptr<Client> createPeerConnection(const Configuration &config,
                                        weak_ptr<WebSocket> wws,
                                        string id);

/// Creates stream
/// @param h264Samples Directory with H264 samples
/// @param fps Video FPS
/// @param opusSamples Directory with opus samples
/// @returns Stream object
shared_ptr<Stream> createStream(const string h264Samples, const unsigned fps, const string opusSamples);
/// Add client to stream
/// @param client Client
/// @param isAddingVideo True if adding video
void addToStream(shared_ptr<Client> client, bool isAddingVideo);

/// Start stream
void startStream();
/// Main dispatch queue
DispatchQueue MainThread("Main");

/// Audio and video stream
optional<shared_ptr<Stream>> avStream = nullopt;

const string defaultRootDirectory = "../../../../examples/streamer/samples/";
const string defaultH264SamplesDirectory = defaultRootDirectory + "h264/";
string h264SamplesDirectory = defaultH264SamplesDirectory;
const string defaultOpusSamplesDirectory = defaultRootDirectory + "opus/";
string opusSamplesDirectory = defaultOpusSamplesDirectory;
const string defaultIPAddress = "127.0.0.1";
const uint16_t defaultPort = 8000;
string ip_address = defaultIPAddress;
uint16_t port = defaultPort;
/// Incoming message handler for websocket
/// @param message Incoming message
/// @param config Configuration
/// @param ws Websocket
void wsOnMessage(json message, Configuration config, shared_ptr<WebSocket> ws) {
    auto it = message.find("id");
    if (it == message.end())
        return;
    string id = it->get<string>();

    it = message.find("type");
    if (it == message.end())
        return;
    string type = it->get<string>();

    if (type == "streamRequest") {
        shared_ptr<Client> c = createPeerConnection(config, make_weak_ptr(ws), id);
        clients.emplace(id, c);
    } else if (type == "answer") {
        if (auto jt = clients.find(id); jt != clients.end()) {
            auto pc = jt->second->peerConnection;
            auto sdp = message["sdp"].get<string>();
            auto description = Description(sdp, type);
            pc->setRemoteDescription(description);
        }
    }
}
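
// The signaling messages handled above are JSON objects. Illustrative shapes (exact values
// depend on the connecting client):
//   { "id": "<client id>", "type": "streamRequest" }
//   { "id": "<client id>", "type": "answer", "sdp": "<SDP string>" }
// Messages without an "id" or "type" field are ignored.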
int main(int argc, char **argv) try {
    bool enableDebugLogs = false;
    bool printHelp = false;
    int c = 0;

    // parse arguments
    while ((c = getopt(argc, argv, "a:b:d:p:vh")) != -1) {
        switch (c) {
        case 'a':
            opusSamplesDirectory = string(optarg) + "/";
            break;
        case 'b':
            h264SamplesDirectory = string(optarg) + "/";
            break;
        case 'd':
            ip_address = string(optarg);
            break;
        case 'p':
            port = atoi(optarg);
            break;
        case 'v':
            enableDebugLogs = true;
            break;
        case 'h':
            printHelp = true;
            break;
        case '?':
            if (optopt == 'a' || optopt == 'b' || optopt == 'd' || optopt == 'p') {
                string s(1, optopt);
                cerr << "Option -" << s << " requires an argument." << endl;
            } else if (isprint(optopt)) {
                string s(1, optopt);
                cerr << "Unknown option `-" << s << "'." << endl;
            } else {
                fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
            }
            return 1;
        default:
            abort();
        }
    }

    if (printHelp) {
        cout << "usage: stream-h264 [-a opus_samples_folder] [-b h264_samples_folder] [-d ip_address] [-p port] [-v] [-h]" << endl
             << "Arguments:" << endl
             << "\t -a " << "Directory with opus samples (default: " << defaultOpusSamplesDirectory << ")." << endl
             << "\t -b " << "Directory with H264 samples (default: " << defaultH264SamplesDirectory << ")." << endl
             << "\t -d " << "Signaling server IP address (default: " << defaultIPAddress << ")." << endl
             << "\t -p " << "Signaling server port (default: " << defaultPort << ")." << endl
             << "\t -v " << "Enable debug logs." << endl
             << "\t -h " << "Print this help and exit." << endl;
        return 0;
    }

    if (enableDebugLogs) {
        InitLogger(LogLevel::Debug);
    }

    Configuration config;
    string stunServer = "stun:stun.l.google.com:19302";
    cout << "Stun server is " << stunServer << endl;
    config.iceServers.emplace_back(stunServer);

    string localId = "server";
    cout << "The local ID is: " << localId << endl;

    auto ws = make_shared<WebSocket>();

    ws->onOpen([]() { cout << "WebSocket connected, signaling ready" << endl; });

    ws->onClosed([]() { cout << "WebSocket closed" << endl; });

    ws->onError([](const string &error) { cout << "WebSocket failed: " << error << endl; });

    ws->onMessage([&](variant<binary, string> data) {
        if (!holds_alternative<string>(data))
            return;

        json message = json::parse(get<string>(data));
        MainThread.dispatch([message, config, ws]() {
            wsOnMessage(message, config, ws);
        });
    });

    const string url = "ws://" + ip_address + ":" + to_string(port) + "/" + localId;
    cout << "Url is " << url << endl;
    ws->open(url);

    cout << "Waiting for signaling to be connected..." << endl;
    while (!ws->isOpen()) {
        if (ws->isClosed())
            return 1;
        this_thread::sleep_for(100ms);
    }

    while (true) {
        string id;
        cout << "Enter to exit" << endl;
        cin >> id;
        cin.ignore();
        cout << "exiting" << endl;
        break;
    }

    cout << "Cleaning up..." << endl;
    return 0;

} catch (const std::exception &e) {
    std::cout << "Error: " << e.what() << std::endl;
    return -1;
}
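
// Example invocation, assuming a compatible WebSocket signaling server is listening on the
// default address and the sample directories exist (the built binary name and paths may
// differ in your setup):
//   ./stream-h264 -d 127.0.0.1 -p 8000 -v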
shared_ptr<ClientTrackData> addVideo(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
    auto video = Description::Video(cname);
    video.addH264Codec(payloadType);
    video.addSSRC(ssrc, cname, msid);
    auto track = pc->addTrack(video);
    // create RTP configuration
    auto rtpConfig = shared_ptr<RTPPacketizationConfig>(new RTPPacketizationConfig(ssrc, cname, payloadType, H264RTPPacketizer::defaultClockRate));
    // create packetizer
    auto packetizer = shared_ptr<H264RTPPacketizer>(new H264RTPPacketizer(rtpConfig));
    // create H264 and RTCP SR handler
    shared_ptr<H264PacketizationHandler> h264Handler(new H264PacketizationHandler(H264PacketizationHandler::Separator::Length, packetizer));
    // set handler
    track->setRtcpHandler(h264Handler);
    track->onOpen(onOpen);
    auto trackData = make_shared<ClientTrackData>(track, h264Handler);
    return trackData;
}
shared_ptr<ClientTrackData> addAudio(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
    auto audio = Description::Audio(cname);
    audio.addOpusCodec(payloadType);
    audio.addSSRC(ssrc, cname, msid);
    auto track = pc->addTrack(audio);
    // create RTP configuration
    auto rtpConfig = shared_ptr<RTPPacketizationConfig>(new RTPPacketizationConfig(ssrc, cname, payloadType, OpusRTPPacketizer::defaultClockRate));
    // create packetizer
    auto packetizer = make_shared<OpusRTPPacketizer>(rtpConfig);
    // create opus and RTCP SR handler
    auto opusHandler = make_shared<OpusPacketizationHandler>(packetizer);
    // set handler
    track->setRtcpHandler(opusHandler);
    track->onOpen(onOpen);
    auto trackData = make_shared<ClientTrackData>(track, opusHandler);
    return trackData;
}
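
// Note: ClientTrackData (declared in helpers.hpp) keeps the track together with the
// packetization handler passed as its second argument; the streaming loop below accesses it
// as `sender` to update RTP timestamps and to request RTCP sender reports
// (previousReportedTimestamp, setNeedsToReport, startRecording).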
// Create and set up a PeerConnection
shared_ptr<Client> createPeerConnection(const Configuration &config,
                                        weak_ptr<WebSocket> wws,
                                        string id) {
    auto pc = make_shared<PeerConnection>(config);
    shared_ptr<Client> client(new Client(pc));

    pc->onStateChange([id](PeerConnection::State state) {
        cout << "State: " << state << endl;
        if (state == PeerConnection::State::Disconnected ||
            state == PeerConnection::State::Failed ||
            state == PeerConnection::State::Closed) {
            // remove disconnected client
            MainThread.dispatch([id]() {
                clients.erase(id);
            });
        }
    });

    pc->onGatheringStateChange(
        [wpc = make_weak_ptr(pc), id, wws](PeerConnection::GatheringState state) {
        cout << "Gathering State: " << state << endl;
        if (state == PeerConnection::GatheringState::Complete) {
            if (auto pc = wpc.lock()) {
                auto description = pc->localDescription();
                json message = {
                    {"id", id},
                    {"type", description->typeString()},
                    {"sdp", string(description.value())}
                };
                // Gathering complete, send the local description to the client
                if (auto ws = wws.lock()) {
                    ws->send(message.dump());
                }
            }
        }
    });

    client->video = addVideo(pc, 102, 1, "video-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
        MainThread.dispatch([wc]() {
            if (auto c = wc.lock()) {
                addToStream(c, true);
            }
        });
        cout << "Video from " << id << " opened" << endl;
    });

    client->audio = addAudio(pc, 111, 2, "audio-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
        MainThread.dispatch([wc]() {
            if (auto c = wc.lock()) {
                addToStream(c, false);
            }
        });
        cout << "Audio from " << id << " opened" << endl;
    });

    auto dc = pc->addDataChannel("ping-pong");
    dc->onOpen([id, wdc = make_weak_ptr(dc)]() {
        if (auto dc = wdc.lock()) {
            dc->send("Ping");
        }
    });

    dc->onMessage(nullptr, [id, wdc = make_weak_ptr(dc)](string msg) {
        cout << "Message from " << id << " received: " << msg << endl;
        if (auto dc = wdc.lock()) {
            dc->send("Ping");
        }
    });

    client->dataChannel = dc;

    pc->setLocalDescription();
    return client;
}
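
// Signaling flow for a client: setLocalDescription() above makes this side the offerer. The
// complete local description (with gathered candidates) is sent over the WebSocket only once
// ICE gathering finishes, and the client's "answer" is applied later in wsOnMessage().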
/// Create stream
shared_ptr<Stream> createStream(const string h264Samples, const unsigned fps, const string opusSamples) {
    // video source
    auto video = make_shared<H264FileParser>(h264Samples, fps, true);
    // audio source
    auto audio = make_shared<OPUSFileParser>(opusSamples, true);
    auto stream = make_shared<Stream>(video, audio);

    // set callback responsible for sample sending
    stream->onSample([wstream = make_weak_ptr(stream)](Stream::StreamSourceType type, uint64_t sampleTime, rtc::binary sample) {
        vector<ClientTrack> tracks{};
        string streamType = type == Stream::StreamSourceType::Video ? "video" : "audio";
        // get track for given type
        function<optional<shared_ptr<ClientTrackData>> (shared_ptr<Client>)> getTrackData = [type](shared_ptr<Client> client) {
            return type == Stream::StreamSourceType::Video ? client->video : client->audio;
        };

        // get all clients with Ready state
        for (const auto &[id, client] : clients) {
            auto optTrackData = getTrackData(client);
            if (client->getState() == Client::State::Ready && optTrackData.has_value()) {
                auto trackData = optTrackData.value();
                tracks.push_back(ClientTrack(id, trackData));
            }
        }

        if (!tracks.empty()) {
            auto message = make_message(move(sample));
            for (const auto &clientTrack : tracks) {
                auto clientId = clientTrack.id;
                auto trackData = clientTrack.trackData;

                // sample time is in us, we need to convert it to seconds
                auto elapsedSeconds = double(sampleTime) / (1000 * 1000);
                auto rtpConfig = trackData->sender->rtpConfig;
                // get elapsed time in clock rate
                uint32_t elapsedTimestamp = rtpConfig->secondsToTimestamp(elapsedSeconds);
                // set new timestamp
                rtpConfig->timestamp = rtpConfig->startTimestamp + elapsedTimestamp;

                // get elapsed time in clock rate from last RTCP sender report
                auto reportElapsedTimestamp = rtpConfig->timestamp - trackData->sender->previousReportedTimestamp;
                // check if last report was at least 1 second ago
                if (rtpConfig->timestampToSeconds(reportElapsedTimestamp) > 1) {
                    trackData->sender->setNeedsToReport();
                }

                cout << "Sending " << streamType << " sample with size: " << to_string(message->size()) << " to " << clientId << endl;
                bool send = false;
                try {
                    // send sample
                    send = trackData->track->send(*message);
                } catch (...) {
                    send = false;
                }
                if (!send) {
                    cerr << "Unable to send " << streamType << " packet" << endl;
                    break;
                }
            }
        }

        MainThread.dispatch([wstream]() {
            if (clients.empty()) {
                // we have no clients, stop the stream
                if (auto stream = wstream.lock()) {
                    stream->stop();
                }
            }
        });
    });
    return stream;
}
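
// Timestamp bookkeeping in the sample callback above: sampleTime is in microseconds, so it
// is first converted to seconds and then to RTP clock-rate units with secondsToTimestamp().
// For example, with H264's 90 kHz RTP clock a sample at t = 2.5 s maps to
// startTimestamp + 225000 (Opus uses a 48 kHz clock). An RTCP sender report is requested
// whenever more than one second of clock time has passed since the previous report.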
/// Start stream
void startStream() {
    shared_ptr<Stream> stream;
    if (avStream.has_value()) {
        stream = avStream.value();
        if (stream->isRunning) {
            // stream is already running
            return;
        }
    } else {
        stream = createStream(h264SamplesDirectory, 30, opusSamplesDirectory);
        avStream = stream;
    }
    stream->start();
}
/// Send previous key frame so the browser can show something to the user
/// @param stream Stream
/// @param video Video track data
void sendInitialNalus(shared_ptr<Stream> stream, shared_ptr<ClientTrackData> video) {
    auto h264 = dynamic_cast<H264FileParser *>(stream->video.get());
    auto initialNalus = h264->initialNALUS();

    // send the previous key-frame NALUs so users don't have to wait for the next key frame
    if (!initialNalus.empty()) {
        const double frameDuration_s = double(h264->sampleDuration_us) / (1000 * 1000);
        const uint32_t frameTimestampDuration = video->sender->rtpConfig->secondsToTimestamp(frameDuration_s);
        video->sender->rtpConfig->timestamp = video->sender->rtpConfig->startTimestamp - frameTimestampDuration * 2;
        video->track->send(initialNalus);
        video->sender->rtpConfig->timestamp += frameTimestampDuration;
        // Send initial NAL units again to start the stream in the Firefox browser
        video->track->send(initialNalus);
    }
}
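
// Client state machine used below: a client starts in Waiting; when its first track opens it
// moves to WaitingForAudio or WaitingForVideo, and when the second track opens both RTP
// configurations get their start time, RTCP SR recording begins, the buffered key-frame NALUs
// are sent, and the client becomes Ready so the stream can start.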
/// Add client to stream
/// @param client Client
/// @param isAddingVideo True if adding video
void addToStream(shared_ptr<Client> client, bool isAddingVideo) {
    if (client->getState() == Client::State::Waiting) {
        client->setState(isAddingVideo ? Client::State::WaitingForAudio : Client::State::WaitingForVideo);
    } else if ((client->getState() == Client::State::WaitingForAudio && !isAddingVideo)
               || (client->getState() == Client::State::WaitingForVideo && isAddingVideo)) {
        // Audio and video tracks are collected now
        assert(client->video.has_value() && client->audio.has_value());
        auto video = client->video.value();
        auto audio = client->audio.value();

        auto currentTime_us = double(currentTimeInMicroSeconds());
        auto currentTime_s = currentTime_us / (1000 * 1000);

        // set start time of stream
        video->sender->rtpConfig->setStartTime(currentTime_s, RTPPacketizationConfig::EpochStart::T1970);
        audio->sender->rtpConfig->setStartTime(currentTime_s, RTPPacketizationConfig::EpochStart::T1970);

        // start stat recording of RTCP SR
        video->sender->startRecording();
        audio->sender->startRecording();

        if (avStream.has_value()) {
            sendInitialNalus(avStream.value(), video);
        }

        client->setState(Client::State::Ready);
    }
    if (client->getState() == Client::State::Ready) {
        startStream();
    }
}