/**
 * libdatachannel client example
 * Copyright (c) 2019-2020 Paul-Louis Ageneau
 * Copyright (c) 2019 Murat Dogan
 * Copyright (c) 2020 Will Munn
 * Copyright (c) 2020 Nico Chatzi
 * Copyright (c) 2020 Lara Mackey
 * Copyright (c) 2020 Erik Cota-Robles
 * Copyright (c) 2020 Filip Klembara (in2core)
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/.
 */

#include "nlohmann/json.hpp"

#include "h264fileparser.hpp"
#include "opusfileparser.hpp"
#include "helpers.hpp"
#include "ArgParser.hpp"

#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <unordered_map>
#include <variant>
#include <vector>

using namespace rtc;
using namespace std;
using namespace std::chrono_literals;

using json = nlohmann::json;

template <class T> weak_ptr<T> make_weak_ptr(shared_ptr<T> ptr) { return ptr; }

/// all connected clients
unordered_map<string, shared_ptr<Client>> clients{};

/// Creates peer connection and client representation
/// @param config Configuration
/// @param wws Websocket for signaling
/// @param id Client ID
/// @returns Client
shared_ptr<Client> createPeerConnection(const Configuration &config,
                                        weak_ptr<WebSocket> wws,
                                        string id);

/// Creates stream
/// @param h264Samples Directory with H264 samples
/// @param fps Video FPS
/// @param opusSamples Directory with opus samples
/// @returns Stream object
shared_ptr<Stream> createStream(const string h264Samples, const unsigned fps, const string opusSamples);
/// Add client to stream
/// @param client Client
/// @param isAddingVideo True if adding video
void addToStream(shared_ptr<Client> client, bool isAddingVideo);
/// Start stream
void startStream();

/// Main dispatch queue
DispatchQueue MainThread("Main");
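// Note: the callbacks registered below (WebSocket messages, state changes, track
// opens) do their mutating work by dispatching onto MainThread, so the global
// `clients` map and `avStream` are only ever modified from this one queue rather
// than from the library's own threads.
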
/// Audio and video stream
optional<shared_ptr<Stream>> avStream = nullopt;

const string defaultRootDirectory = "../../../examples/streamer/samples/";
const string defaultH264SamplesDirectory = defaultRootDirectory + "h264/";
string h264SamplesDirectory = defaultH264SamplesDirectory;
const string defaultOpusSamplesDirectory = defaultRootDirectory + "opus/";
string opusSamplesDirectory = defaultOpusSamplesDirectory;
const string defaultIPAddress = "127.0.0.1";
const uint16_t defaultPort = 8000;
string ip_address = defaultIPAddress;
uint16_t port = defaultPort;
/// Incoming message handler for websocket
/// @param message Incoming message
/// @param config Configuration
/// @param ws Websocket
void wsOnMessage(json message, Configuration config, shared_ptr<WebSocket> ws) {
    auto it = message.find("id");
    if (it == message.end())
        return;
    string id = it->get<string>();
    it = message.find("type");
    if (it == message.end())
        return;
    string type = it->get<string>();

    if (type == "request") {
        clients.emplace(id, createPeerConnection(config, make_weak_ptr(ws), id));
    } else if (type == "answer") {
        if (auto jt = clients.find(id); jt != clients.end()) {
            auto pc = jt->second->peerConnection;
            auto sdp = message["sdp"].get<string>();
            auto description = Description(sdp, type);
            pc->setRemoteDescription(description);
        }
    }
}
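
// For reference, the two signaling messages handled above look like this (field
// names taken from the code; the exact framing is defined by whichever signaling
// server this example is paired with):
//
//   {"id": "<client id>", "type": "request"}
//   {"id": "<client id>", "type": "answer", "sdp": "v=0\r\n..."}
//
// A "request" creates a PeerConnection and triggers an offer back to that client;
// an "answer" carries the client's SDP answer to that offer.
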
int main(int argc, char **argv) try {
    bool enableDebugLogs = false;
    bool printHelp = false;
    auto parser = ArgParser({{"a", "audio"}, {"b", "video"}, {"d", "ip"}, {"p", "port"}},
                            {{"h", "help"}, {"v", "verbose"}});
    auto parsingResult = parser.parse(argc, argv, [](string key, string value) {
        if (key == "audio") {
            opusSamplesDirectory = value + "/";
        } else if (key == "video") {
            h264SamplesDirectory = value + "/";
        } else if (key == "ip") {
            ip_address = value;
        } else if (key == "port") {
            port = atoi(value.data());
        } else {
            cerr << "Invalid option --" << key << " with value " << value << endl;
            return false;
        }
        return true;
    }, [&enableDebugLogs, &printHelp](string flag) {
        if (flag == "verbose") {
            enableDebugLogs = true;
        } else if (flag == "help") {
            printHelp = true;
        } else {
            cerr << "Invalid flag --" << flag << endl;
            return false;
        }
        return true;
    });
    if (!parsingResult) {
        return 1;
    }
    if (printHelp) {
        cout << "usage: stream-h264 [-a opus_samples_folder] [-b h264_samples_folder] [-d ip_address] [-p port] [-v] [-h]" << endl
             << "Arguments:" << endl
             << "\t -a " << "Directory with opus samples (default: " << defaultOpusSamplesDirectory << ")." << endl
             << "\t -b " << "Directory with H264 samples (default: " << defaultH264SamplesDirectory << ")." << endl
             << "\t -d " << "Signaling server IP address (default: " << defaultIPAddress << ")." << endl
             << "\t -p " << "Signaling server port (default: " << defaultPort << ")." << endl
             << "\t -v " << "Enable debug logs." << endl
             << "\t -h " << "Print this help and exit." << endl;
        return 0;
    }
    if (enableDebugLogs) {
        InitLogger(LogLevel::Debug);
    }

    Configuration config;
    string stunServer = "stun:stun.l.google.com:19302";
    cout << "STUN server is " << stunServer << endl;
    config.iceServers.emplace_back(stunServer);
    config.disableAutoNegotiation = true;
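    // With auto-negotiation disabled, the library does not create an offer on its own
    // when tracks are added; createPeerConnection() calls setLocalDescription()
    // explicitly, and the resulting SDP is sent only once ICE gathering completes
    // (see onGatheringStateChange), i.e. candidates are not trickled.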
    string localId = "server";
    cout << "The local ID is: " << localId << endl;

    auto ws = make_shared<WebSocket>();

    ws->onOpen([]() { cout << "WebSocket connected, signaling ready" << endl; });

    ws->onClosed([]() { cout << "WebSocket closed" << endl; });

    ws->onError([](const string &error) { cout << "WebSocket failed: " << error << endl; });

    ws->onMessage([&](variant<binary, string> data) {
        if (!holds_alternative<string>(data))
            return;

        json message = json::parse(get<string>(data));
        MainThread.dispatch([message, config, ws]() {
            wsOnMessage(message, config, ws);
        });
    });

    const string url = "ws://" + ip_address + ":" + to_string(port) + "/" + localId;
    cout << "URL is " << url << endl;
    ws->open(url);

    cout << "Waiting for signaling to be connected..." << endl;
    while (!ws->isOpen()) {
        if (ws->isClosed())
            return 1;
        this_thread::sleep_for(100ms);
    }
    while (true) {
        string id;
        cout << "Press Enter to exit" << endl;
        getline(cin, id);
        cout << "exiting" << endl;
        break;
    }
    cout << "Cleaning up..." << endl;
    return 0;

} catch (const std::exception &e) {
    std::cout << "Error: " << e.what() << std::endl;
    return -1;
}
shared_ptr<ClientTrackData> addVideo(const shared_ptr<PeerConnection> pc, const uint8_t payloadType,
                                     const uint32_t ssrc, const string cname, const string msid,
                                     const function<void (void)> onOpen) {
    auto video = Description::Video(cname);
    video.addH264Codec(payloadType);
    video.addSSRC(ssrc, cname, msid, cname);
    auto track = pc->addTrack(video);
    // create RTP configuration
    auto rtpConfig = make_shared<RtpPacketizationConfig>(ssrc, cname, payloadType,
                                                         H264RtpPacketizer::defaultClockRate);
    // create packetizer
    auto packetizer = make_shared<H264RtpPacketizer>(NalUnit::Separator::Length, rtpConfig);
    // add RTCP SR handler
    auto srReporter = make_shared<RtcpSrReporter>(rtpConfig);
    packetizer->addToChain(srReporter);
    // add RTCP NACK handler
    auto nackResponder = make_shared<RtcpNackResponder>();
    packetizer->addToChain(nackResponder);
    // set handler
    track->setMediaHandler(packetizer);
    track->onOpen(onOpen);
    auto trackData = make_shared<ClientTrackData>(track, srReporter);
    return trackData;
}
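
// The handler chain attached above is packetizer -> RTCP SR reporter -> NACK responder:
// the H264RtpPacketizer splits each length-prefixed NAL unit sample into RTP packets,
// the RtcpSrReporter periodically emits RTCP Sender Reports, and the RtcpNackResponder
// retransmits packets the receiver reports as lost. addAudio() below builds the same
// chain around an OpusRtpPacketizer.
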
shared_ptr<ClientTrackData> addAudio(const shared_ptr<PeerConnection> pc, const uint8_t payloadType,
                                     const uint32_t ssrc, const string cname, const string msid,
                                     const function<void (void)> onOpen) {
    auto audio = Description::Audio(cname);
    audio.addOpusCodec(payloadType);
    audio.addSSRC(ssrc, cname, msid, cname);
    auto track = pc->addTrack(audio);
    // create RTP configuration
    auto rtpConfig = make_shared<RtpPacketizationConfig>(ssrc, cname, payloadType,
                                                         OpusRtpPacketizer::DefaultClockRate);
    // create packetizer
    auto packetizer = make_shared<OpusRtpPacketizer>(rtpConfig);
    // add RTCP SR handler
    auto srReporter = make_shared<RtcpSrReporter>(rtpConfig);
    packetizer->addToChain(srReporter);
    // add RTCP NACK handler
    auto nackResponder = make_shared<RtcpNackResponder>();
    packetizer->addToChain(nackResponder);
    // set handler
    track->setMediaHandler(packetizer);
    track->onOpen(onOpen);
    auto trackData = make_shared<ClientTrackData>(track, srReporter);
    return trackData;
}
// Create and setup a PeerConnection
shared_ptr<Client> createPeerConnection(const Configuration &config,
                                        weak_ptr<WebSocket> wws,
                                        string id) {
    auto pc = make_shared<PeerConnection>(config);
    auto client = make_shared<Client>(pc);

    pc->onStateChange([id](PeerConnection::State state) {
        cout << "State: " << state << endl;
        if (state == PeerConnection::State::Disconnected ||
            state == PeerConnection::State::Failed ||
            state == PeerConnection::State::Closed) {
            // remove disconnected client
            MainThread.dispatch([id]() {
                clients.erase(id);
            });
        }
    });

    pc->onGatheringStateChange(
        [wpc = make_weak_ptr(pc), id, wws](PeerConnection::GatheringState state) {
            cout << "Gathering State: " << state << endl;
            if (state == PeerConnection::GatheringState::Complete) {
                if (auto pc = wpc.lock()) {
                    auto description = pc->localDescription();
                    json message = {
                        {"id", id},
                        {"type", description->typeString()},
                        {"sdp", string(description.value())}
                    };
                    // Gathering complete, send the offer
                    if (auto ws = wws.lock()) {
                        ws->send(message.dump());
                    }
                }
            }
    });
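
    // Media for this client: H264 video as payload type 102 / SSRC 1 and Opus audio as
    // payload type 111 / SSRC 2, both under the msid "stream1" so the browser groups
    // them into a single MediaStream. When a track opens, the client is handed to
    // addToStream() on MainThread.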
    client->video = addVideo(pc, 102, 1, "video-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
        MainThread.dispatch([wc]() {
            if (auto c = wc.lock()) {
                addToStream(c, true);
            }
        });
        cout << "Video from " << id << " opened" << endl;
    });

    client->audio = addAudio(pc, 111, 2, "audio-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
        MainThread.dispatch([wc]() {
            if (auto c = wc.lock()) {
                addToStream(c, false);
            }
        });
        cout << "Audio from " << id << " opened" << endl;
    });

    auto dc = pc->createDataChannel("ping-pong");
    dc->onOpen([id, wdc = make_weak_ptr(dc)]() {
        if (auto dc = wdc.lock()) {
            dc->send("Ping");
        }
    });
    dc->onMessage(nullptr, [id, wdc = make_weak_ptr(dc)](string msg) {
        cout << "Message from " << id << " received: " << msg << endl;
        if (auto dc = wdc.lock()) {
            dc->send("Ping");
        }
    });
    client->dataChannel = dc;

    pc->setLocalDescription();

    return client;
}
/// Create stream
shared_ptr<Stream> createStream(const string h264Samples, const unsigned fps, const string opusSamples) {
    // video source
    auto video = make_shared<H264FileParser>(h264Samples, fps, true);
    // audio source
    auto audio = make_shared<OPUSFileParser>(opusSamples, true);
    auto stream = make_shared<Stream>(video, audio);
    // set callback responsible for sample sending
    stream->onSample([ws = make_weak_ptr(stream)](Stream::StreamSourceType type, uint64_t sampleTime, rtc::binary sample) {
        vector<ClientTrack> tracks{};
        string streamType = type == Stream::StreamSourceType::Video ? "video" : "audio";
        // get track for given type
        function<optional<shared_ptr<ClientTrackData>> (shared_ptr<Client>)> getTrackData = [type](shared_ptr<Client> client) {
            return type == Stream::StreamSourceType::Video ? client->video : client->audio;
        };
        // get all clients with Ready state
        for (auto id_client: clients) {
            auto id = id_client.first;
            auto client = id_client.second;
            auto optTrackData = getTrackData(client);
            if (client->getState() == Client::State::Ready && optTrackData.has_value()) {
                auto trackData = optTrackData.value();
                tracks.push_back(ClientTrack(id, trackData));
            }
        }
        if (!tracks.empty()) {
            for (auto clientTrack: tracks) {
                auto client = clientTrack.id;
                auto trackData = clientTrack.trackData;
                auto rtpConfig = trackData->sender->rtpConfig;

                // sample time is in us, we need to convert it to seconds
                auto elapsedSeconds = double(sampleTime) / (1000 * 1000);
                // get elapsed time in clock rate
                uint32_t elapsedTimestamp = rtpConfig->secondsToTimestamp(elapsedSeconds);
                // set new timestamp
                rtpConfig->timestamp = rtpConfig->startTimestamp + elapsedTimestamp;
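
                // Example: with the 90 kHz H264 RTP clock, a sample captured 500000 us
                // (0.5 s) into the stream lands 0.5 * 90000 = 45000 timestamp units after
                // startTimestamp; with the 48 kHz Opus clock it would be 24000 units.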
                // get elapsed time in clock rate from last RTCP sender report
                auto reportElapsedTimestamp = rtpConfig->timestamp - trackData->sender->lastReportedTimestamp();
                // check if last report was at least 1 second ago
                if (rtpConfig->timestampToSeconds(reportElapsedTimestamp) > 1) {
                    trackData->sender->setNeedsToReport();
                }

                cout << "Sending " << streamType << " sample with size: " << to_string(sample.size()) << " to " << client << endl;
                try {
                    // send sample
                    trackData->track->send(sample);
                } catch (const std::exception &e) {
                    cerr << "Unable to send " << streamType << " packet: " << e.what() << endl;
                }
            }
        }
        MainThread.dispatch([ws]() {
            if (clients.empty()) {
                // we have no clients, stop the stream
                if (auto stream = ws.lock()) {
                    stream->stop();
                }
            }
        });
    });
    return stream;
}
/// Start stream
void startStream() {
    shared_ptr<Stream> stream;
    if (avStream.has_value()) {
        stream = avStream.value();
        if (stream->isRunning) {
            // stream is already running
            return;
        }
    } else {
        stream = createStream(h264SamplesDirectory, 30, opusSamplesDirectory);
        avStream = stream;
    }
    stream->start();
}
/// Send previous key frame so the browser can show something to the user
/// @param stream Stream
/// @param video Video track data
void sendInitialNalus(shared_ptr<Stream> stream, shared_ptr<ClientTrackData> video) {
    auto h264 = dynamic_cast<H264FileParser *>(stream->video.get());
    auto initialNalus = h264->initialNALUS();

    // send previous NALU key frame so users don't have to wait to see that the stream works
    if (!initialNalus.empty()) {
        const double frameDuration_s = double(h264->getSampleDuration_us()) / (1000 * 1000);
        const uint32_t frameTimestampDuration = video->sender->rtpConfig->secondsToTimestamp(frameDuration_s);
        video->sender->rtpConfig->timestamp = video->sender->rtpConfig->startTimestamp - frameTimestampDuration * 2;
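        // Backdating by two frame durations keeps both pre-roll sends (the second one
        // below advances the timestamp by one frame) behind startTimestamp, so they do
        // not collide with the timestamps of the live samples.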
        video->track->send(initialNalus);
        video->sender->rtpConfig->timestamp += frameTimestampDuration;
        // send initial NAL units again to start the stream in the Firefox browser
        video->track->send(initialNalus);
    }
}
/// Add client to stream
/// @param client Client
/// @param isAddingVideo True if adding video
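/// The client walks a small state machine: Waiting -> WaitingForAudio or WaitingForVideo
/// (after the first track opens) -> Ready (after the second track opens); once Ready,
/// the shared stream is started if it is not already running.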
void addToStream(shared_ptr<Client> client, bool isAddingVideo) {
    if (client->getState() == Client::State::Waiting) {
        client->setState(isAddingVideo ? Client::State::WaitingForAudio : Client::State::WaitingForVideo);
    } else if ((client->getState() == Client::State::WaitingForAudio && !isAddingVideo)
               || (client->getState() == Client::State::WaitingForVideo && isAddingVideo)) {
        // Audio and video tracks are collected now
        assert(client->video.has_value() && client->audio.has_value());
        auto video = client->video.value();

        if (avStream.has_value()) {
            sendInitialNalus(avStream.value(), video);
        }

        client->setState(Client::State::Ready);
    }
    if (client->getState() == Client::State::Ready) {
        startStream();
    }
}