/*
* libdatachannel client example
* Copyright (c) 2019-2020 Paul-Louis Ageneau
* Copyright (c) 2019 Murat Dogan
* Copyright (c) 2020 Will Munn
* Copyright (c) 2020 Nico Chatzi
* Copyright (c) 2020 Lara Mackey
* Copyright (c) 2020 Erik Cota-Robles
* Copyright (c) 2020 Filip Klembara (in2core)
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; If not, see <https://www.gnu.org/licenses/>.
*/
#include "nlohmann/json.hpp"

#include "h264fileparser.hpp"
#include "helpers.hpp"
#include "opusfileparser.hpp"
#include "rtc/rtc.hpp"

#include <cctype>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <unordered_map>
#include <variant>
#include <vector>

#include <unistd.h>
using namespace rtc;
using namespace std;
using namespace std::chrono_literals;
using json = nlohmann::json;
/// Helper to obtain a non-owning weak_ptr from a shared_ptr with template deduction.
/// @param ptr Shared pointer to observe
/// @returns weak_ptr referring to the same control block
template <class T> std::weak_ptr<T> make_weak_ptr(std::shared_ptr<T> ptr) { return ptr; }
/// all connected clients
unordered_map> clients{};
/// Creates peer connection and client representation
/// @param config Configuration
/// @param wws Websocket for signaling
/// @param id Client ID
/// @returns Client
shared_ptr createPeerConnection(const Configuration &config,
weak_ptr wws,
string id);
/// Creates stream
/// @param h264Samples Directory with H264 samples
/// @param fps Video FPS
/// @param opusSamples Directory with opus samples
/// @returns Stream object
shared_ptr createStream(const string h264Samples, const unsigned fps, const string opusSamples);
/// Add client to stream
/// @param client Client
/// @param adding_video True if adding video
void addToStream(shared_ptr client, bool isAddingVideo);
/// Start stream
void startStream();
/// Main dispatch queue
DispatchQueue MainThread("Main");
/// Audio and video stream
optional> avStream = nullopt;
const string defaultRootDirectory = "../../../../examples/streamer/samples/";
const string defaultH264SamplesDirectory = defaultRootDirectory + "h264/";
string h264SamplesDirectory = defaultH264SamplesDirectory;
const string defaultOpusSamplesDirectory = defaultRootDirectory + "opus/";
string opusSamplesDirectory = defaultOpusSamplesDirectory;
const string defaultIPAddress = "127.0.0.1";
const uint16_t defaultPort = 8000;
string ip_address = defaultIPAddress;
uint16_t port = defaultPort;
/// Incomming message handler for websocket
/// @param message Incommint message
/// @param config Configuration
/// @param ws Websocket
void wsOnMessage(json message, Configuration config, shared_ptr ws) {
auto it = message.find("id");
if (it == message.end())
return;
string id = it->get();
it = message.find("type");
if (it == message.end())
return;
string type = it->get();
if (type == "streamRequest") {
shared_ptr c = createPeerConnection(config, make_weak_ptr(ws), id);
clients.emplace(id, c);
} else if (type == "answer") {
shared_ptr c;
if (auto jt = clients.find(id); jt != clients.end()) {
auto pc = clients.at(id)->peerConnection;
auto sdp = message["sdp"].get();
auto description = Description(sdp, type);
pc->setRemoteDescription(description);
}
}
}
/// Entry point: parse CLI options, connect the signaling websocket and wait.
/// Options: -a opus dir, -b h264 dir, -d signaling IP, -p signaling port,
/// -v debug logs, -h help.
int main(int argc, char **argv) try {
    bool enableDebugLogs = false;
    bool printHelp = false;
    int c = 0;

    // parse arguments
    while ((c = getopt(argc, argv, "a:b:d:p:vh")) != -1) {
        switch (c) {
        case 'a':
            opusSamplesDirectory = string(optarg) + "/";
            break;
        case 'b':
            h264SamplesDirectory = string(optarg) + "/";
            break;
        case 'd':
            ip_address = string(optarg);
            break;
        case 'p':
            port = atoi(optarg);
            break;
        case 'v':
            enableDebugLogs = true;
            break;
        case 'h':
            printHelp = true;
            break;
        case '?':
            if (optopt == 'a' || optopt == 'b' || optopt == 'd' || optopt == 'p') {
                string s(1, optopt);
                cerr << "Option -" << s << " requires an argument." << endl;
            } else if (isprint(optopt)) {
                string s(1, optopt);
                cerr << "Unknown option `-" << s << "'." << endl;
            } else {
                fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
            }
            return 1;
        default:
            abort();
        }
    }

    if (printHelp) {
        cout << "usage: stream-h264 [-a opus_samples_folder] [-b h264_samples_folder] [-d ip_address] [-p port] [-v] [-h]" << endl
             << "Arguments:" << endl
             << "\t -a " << "Directory with opus samples (default: " << defaultOpusSamplesDirectory << ")." << endl
             << "\t -b " << "Directory with H264 samples (default: " << defaultH264SamplesDirectory << ")." << endl
             << "\t -d " << "Signaling server IP address (default: " << defaultIPAddress << ")." << endl
             << "\t -p " << "Signaling server port (default: " << defaultPort << ")." << endl
             << "\t -v " << "Enable debug logs." << endl
             << "\t -h " << "Print this help and exit." << endl;
        return 0;
    }

    if (enableDebugLogs) {
        InitLogger(LogLevel::Debug);
    }

    Configuration config;
    string stunServer = "stun:stun.l.google.com:19302";
    cout << "Stun server is " << stunServer << endl;
    config.iceServers.emplace_back(stunServer);

    string localId = "server";
    cout << "The local ID is: " << localId << endl;

    auto ws = make_shared<WebSocket>();

    ws->onOpen([]() { cout << "WebSocket connected, signaling ready" << endl; });
    ws->onClosed([]() { cout << "WebSocket closed" << endl; });
    ws->onError([](const string &error) { cout << "WebSocket failed: " << error << endl; });
    ws->onMessage([&](variant<binary, string> data) {
        if (!holds_alternative<string>(data))
            return;

        json message = json::parse(get<string>(data));
        // hand over to the main dispatch queue which owns all shared state
        MainThread.dispatch([message, config, ws]() {
            wsOnMessage(message, config, ws);
        });
    });

    const string url = "ws://" + ip_address + ":" + to_string(port) + "/" + localId;
    cout << "Url is " << url << endl;
    ws->open(url);

    cout << "Waiting for signaling to be connected..." << endl;
    while (!ws->isOpen()) {
        if (ws->isClosed())
            return 1;
        this_thread::sleep_for(100ms);
    }

    // block until the user presses enter
    while (true) {
        string id;
        cout << "Enter to exit" << endl;
        cin >> id;
        cin.ignore();
        cout << "exiting" << endl;
        break;
    }

    cout << "Cleaning up..." << endl;
    return 0;

} catch (const std::exception &e) {
    std::cout << "Error: " << e.what() << std::endl;
    return -1;
}
/// Add a sendonly H264 video track to the peer connection.
/// @param pc Peer connection
/// @param payloadType RTP payload type for H264
/// @param ssrc RTP SSRC of the track
/// @param cname RTCP CNAME (also used as the media description id)
/// @param msid Media stream id grouping audio and video
/// @param onOpen Callback invoked once the track is open
/// @returns Track plus its RTCP sender reportable
shared_ptr<ClientTrackData> addVideo(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
    auto video = Description::Video(cname);
    video.addH264Codec(payloadType);
    video.addSSRC(ssrc, cname, msid);
    auto track = pc->addTrack(video);
    // create RTP configuration
    auto rtpConfig = make_shared<RTPPacketizationConfig>(ssrc, cname, payloadType, H264RTPPacketizer::defaultClockRate);
    // create packetizer
    auto packetizer = make_shared<H264RTPPacketizer>(rtpConfig);
    // create H264 and RTCP SR handler (samples carry length-prefixed NAL units)
    auto h264Handler = make_shared<H264PacketizationHandler>(H264PacketizationHandler::Separator::Length, packetizer);
    // set handler
    track->setRtcpHandler(h264Handler);
    track->onOpen(onOpen);
    auto trackData = make_shared<ClientTrackData>(track, h264Handler);
    return trackData;
}
/// Add a sendonly Opus audio track to the peer connection.
/// @param pc Peer connection
/// @param payloadType RTP payload type for Opus
/// @param ssrc RTP SSRC of the track
/// @param cname RTCP CNAME (also used as the media description id)
/// @param msid Media stream id grouping audio and video
/// @param onOpen Callback invoked once the track is open
/// @returns Track plus its RTCP sender reportable
shared_ptr<ClientTrackData> addAudio(const shared_ptr<PeerConnection> pc, const uint8_t payloadType, const uint32_t ssrc, const string cname, const string msid, const function<void (void)> onOpen) {
    auto audio = Description::Audio(cname);
    audio.addOpusCodec(payloadType);
    audio.addSSRC(ssrc, cname, msid);
    auto track = pc->addTrack(audio);
    // create RTP configuration
    auto rtpConfig = make_shared<RTPPacketizationConfig>(ssrc, cname, payloadType, OpusRTPPacketizer::defaultClockRate);
    // create packetizer
    auto packetizer = make_shared<OpusRTPPacketizer>(rtpConfig);
    // create opus and RTCP SR handler
    auto opusHandler = make_shared<OpusPacketizationHandler>(packetizer);
    // set handler
    track->setRtcpHandler(opusHandler);
    track->onOpen(onOpen);
    auto trackData = make_shared<ClientTrackData>(track, opusHandler);
    return trackData;
}
/// Create and set up a PeerConnection with video, audio and a ping-pong data channel.
/// @param config Configuration
/// @param wws Websocket for signaling (weak: may close before gathering completes)
/// @param id Client ID
/// @returns Client wrapping the new peer connection
shared_ptr<Client> createPeerConnection(const Configuration &config,
                                        weak_ptr<WebSocket> wws,
                                        string id) {
    auto pc = make_shared<PeerConnection>(config);
    auto client = make_shared<Client>(pc);

    pc->onStateChange([id](PeerConnection::State state) {
        cout << "State: " << state << endl;
        if (state == PeerConnection::State::Disconnected ||
            state == PeerConnection::State::Failed ||
            state == PeerConnection::State::Closed) {
            // remove disconnected client on the main queue which owns `clients`
            MainThread.dispatch([id]() {
                clients.erase(id);
            });
        }
    });

    pc->onGatheringStateChange(
        [wpc = make_weak_ptr(pc), id, wws](PeerConnection::GatheringState state) {
        cout << "Gathering State: " << state << endl;
        if (state == PeerConnection::GatheringState::Complete) {
            if (auto pc = wpc.lock()) {
                auto description = pc->localDescription();
                json message = {
                    {"id", id},
                    {"type", description->typeString()},
                    {"sdp", string(description.value())}
                };
                // Gathering complete, send answer
                if (auto ws = wws.lock()) {
                    ws->send(message.dump());
                }
            }
        }
    });

    // payload type 102 / SSRC 1 for video; track open hands the client to the stream
    client->video = addVideo(pc, 102, 1, "video-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
        MainThread.dispatch([wc]() {
            if (auto c = wc.lock()) {
                addToStream(c, true);
            }
        });
        cout << "Video from " << id << " opened" << endl;
    });

    // payload type 111 / SSRC 2 for audio
    client->audio = addAudio(pc, 111, 2, "audio-stream", "stream1", [id, wc = make_weak_ptr(client)]() {
        MainThread.dispatch([wc]() {
            if (auto c = wc.lock()) {
                addToStream(c, false);
            }
        });
        cout << "Audio from " << id << " opened" << endl;
    });

    auto dc = pc->addDataChannel("ping-pong");
    dc->onOpen([id, wdc = make_weak_ptr(dc)]() {
        if (auto dc = wdc.lock()) {
            dc->send("Ping");
        }
    });
    dc->onMessage(nullptr, [id, wdc = make_weak_ptr(dc)](string msg) {
        cout << "Message from " << id << " received: " << msg << endl;
        if (auto dc = wdc.lock()) {
            dc->send("Ping");
        }
    });
    client->dataChannel = dc;

    // triggers ICE gathering; the offer is sent from onGatheringStateChange
    pc->setLocalDescription();

    return client;
}
/// Create the audio/video stream and wire its sample callback.
/// @param h264Samples Directory with H264 samples
/// @param fps Video FPS
/// @param opusSamples Directory with opus samples
/// @returns Stream object
shared_ptr<Stream> createStream(const string h264Samples, const unsigned fps, const string opusSamples) {
    // video source
    auto video = make_shared<H264FileParser>(h264Samples, fps, true);
    // audio source
    auto audio = make_shared<OpusFileParser>(opusSamples, true);
    auto stream = make_shared<Stream>(video, audio);
    // set callback responsible for sample sending
    stream->onSample([ws = make_weak_ptr(stream)](Stream::StreamSourceType type, uint64_t sampleTime, rtc::binary sample) {
        vector<ClientTrack> tracks{};
        string streamType = type == Stream::StreamSourceType::Video ? "video" : "audio";
        // get track for given type
        function<optional<shared_ptr<ClientTrackData>> (shared_ptr<Client>)> getTrackData = [type](shared_ptr<Client> client) {
            return type == Stream::StreamSourceType::Video ? client->video : client->audio;
        };
        // get all clients with Ready state (iterate by reference to avoid shared_ptr copies)
        for (const auto &[id, client] : clients) {
            auto optTrackData = getTrackData(client);
            if (client->getState() == Client::State::Ready && optTrackData.has_value()) {
                tracks.emplace_back(id, optTrackData.value());
            }
        }
        if (!tracks.empty()) {
            // one shared message for all recipients
            auto message = make_message(move(sample));
            for (auto &clientTrack : tracks) {
                auto clientId = clientTrack.id;
                auto trackData = clientTrack.trackData;
                // sample time is in us, we need to convert it to seconds
                auto elapsedSeconds = double(sampleTime) / (1000 * 1000);
                auto rtpConfig = trackData->sender->rtpConfig;
                // get elapsed time in clock rate
                uint32_t elapsedTimestamp = rtpConfig->secondsToTimestamp(elapsedSeconds);
                // set new timestamp
                rtpConfig->timestamp = rtpConfig->startTimestamp + elapsedTimestamp;
                // get elapsed time in clock rate from last RTCP sender report
                auto reportElapsedTimestamp = rtpConfig->timestamp - trackData->sender->previousReportedTimestamp;
                // check if last report was at least 1 second ago
                if (rtpConfig->timestampToSeconds(reportElapsedTimestamp) > 1) {
                    trackData->sender->setNeedsToReport();
                }

                cout << "Sending " << streamType << " sample with size: " << to_string(message->size()) << " to " << clientId << endl;
                bool send = false;
                try {
                    // send sample
                    send = trackData->track->send(*message);
                } catch (...) {
                    send = false;
                }
                if (!send) {
                    cerr << "Unable to send " << streamType << " packet" << endl;
                    break;
                }
            }
        }
        // stop the stream when the last client is gone
        MainThread.dispatch([ws]() {
            if (clients.empty()) {
                // we have no clients, stop the stream
                if (auto stream = ws.lock()) {
                    stream->stop();
                }
            }
        });
    });
    return stream;
}
/// Start stream
void startStream() {
shared_ptr stream;
if (avStream.has_value()) {
stream = avStream.value();
if (stream->isRunning) {
// stream is already running
return;
}
} else {
stream = createStream(h264SamplesDirectory, 30, opusSamplesDirectory);
avStream = stream;
}
stream->start();
}
/// Send previous key frame so browser can show something to user
/// @param stream Stream
/// @param video Video track data
void sendInitialNalus(shared_ptr stream, shared_ptr video) {
auto h264 = dynamic_cast(stream->video.get());
auto initialNalus = h264->initialNALUS();
// send previous NALU key frame so users don't have to wait to see stream works
if (!initialNalus.empty()) {
const double frameDuration_s = double(h264->sampleDuration_us) / (1000 * 1000);
const uint32_t frameTimestampDuration = video->sender->rtpConfig->secondsToTimestamp(frameDuration_s);
video->sender->rtpConfig->timestamp = video->sender->rtpConfig->startTimestamp - frameTimestampDuration * 2;
video->track->send(initialNalus);
video->sender->rtpConfig->timestamp += frameTimestampDuration;
// Send initial NAL units again to start stream in firefox browser
video->track->send(initialNalus);
}
}
/// Add client to stream
/// @param client Client
/// @param adding_video True if adding video
void addToStream(shared_ptr client, bool isAddingVideo) {
if (client->getState() == Client::State::Waiting) {
client->setState(isAddingVideo ? Client::State::WaitingForAudio : Client::State::WaitingForVideo);
} else if ((client->getState() == Client::State::WaitingForAudio && !isAddingVideo)
|| (client->getState() == Client::State::WaitingForVideo && isAddingVideo)) {
// Audio and video tracks are collected now
assert(client->video.has_value() && client->audio.has_value());
auto video = client->video.value();
auto audio = client->audio.value();
auto currentTime_us = double(currentTimeInMicroSeconds());
auto currentTime_s = currentTime_us / (1000 * 1000);
// set start time of stream
video->sender->rtpConfig->setStartTime(currentTime_s, RTPPacketizationConfig::EpochStart::T1970);
audio->sender->rtpConfig->setStartTime(currentTime_s, RTPPacketizationConfig::EpochStart::T1970);
// start stat recording of RTCP SR
video->sender->startRecording();
audio->sender->startRecording();
if (avStream.has_value()) {
sendInitialNalus(avStream.value(), video);
}
client->setState(Client::State::Ready);
}
if (client->getState() == Client::State::Ready) {
startStream();
}
}