Browse Source

Integrated libjuice for ICE transport

Paul-Louis Ageneau 5 years ago
parent
commit
0ded19992c
5 changed files with 279 additions and 34 deletions
  1. 1 1
      deps/libjuice
  2. 14 9
      include/rtc/include.hpp
  3. 1 1
      src/dtlstransport.cpp
  4. 230 13
      src/icetransport.cpp
  5. 33 10
      src/icetransport.hpp

+ 1 - 1
deps/libjuice

@@ -1 +1 @@
-Subproject commit b2c4e9f6825af02be419a9673c9b5aa1ad2066df
+Subproject commit 57fa7515e7fb3c1725c638b4697fa747cd1ae315

+ 14 - 9
include/rtc/include.hpp

@@ -44,6 +44,8 @@ using std::uint32_t;
 using std::uint64_t;
 using std::uint8_t;
 
+// Constants
+
 const size_t MAX_NUMERICNODE_LEN = 48; // Max IPv6 string representation length
 const size_t MAX_NUMERICSERV_LEN = 6;  // Max port string representation length
 
@@ -51,16 +53,9 @@ const uint16_t DEFAULT_SCTP_PORT = 5000; // SCTP port to use by default
 const size_t DEFAULT_MAX_MESSAGE_SIZE = 65536;    // Remote max message size if not specified in SDP
 const size_t LOCAL_MAX_MESSAGE_SIZE = 256 * 1024; // Local max message size
 
-inline void InitLogger(plog::Severity severity, plog::IAppender *appender = nullptr) {
-	static plog::ColorConsoleAppender<plog::TxtFormatter> consoleAppender;
-	if (!appender)
-		appender = &consoleAppender;
-	plog::init(severity, appender);
-	PLOG_DEBUG << "Logger initialized";
-}
+// Log
 
-// Don't change, it must match plog severity
-enum class LogLevel {
+enum class LogLevel { // Don't change, it must match plog severity
 	None = 0,
 	Fatal = 1,
 	Error = 2,
@@ -70,8 +65,18 @@ enum class LogLevel {
 	Verbose = 6
 };
 
+inline void InitLogger(plog::Severity severity, plog::IAppender *appender = nullptr) {
+	static plog::ColorConsoleAppender<plog::TxtFormatter> consoleAppender;
+	if (!appender)
+		appender = &consoleAppender;
+	plog::init(severity, appender);
+	PLOG_DEBUG << "Logger initialized";
+}
+
 inline void InitLogger(LogLevel level) { InitLogger(static_cast<plog::Severity>(level)); }
 
+// Utils
+
 template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
 template <class... Ts> overloaded(Ts...)->overloaded<Ts...>;
 

+ 1 - 1
src/dtlstransport.cpp

@@ -270,7 +270,7 @@ int DtlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms)
 
 } // namespace rtc
 
-#else
+#else // USE_GNUTLS==0
 
 #include <openssl/bio.h>
 #include <openssl/ec.h>

+ 230 - 13
src/icetransport.cpp

@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2019 Paul-Louis Ageneau
+ * Copyright (c) 2019-2020 Paul-Louis Ageneau
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -28,23 +28,238 @@
 #include <random>
 #include <sstream>
 
-namespace rtc {
-
 using namespace std::chrono_literals;
 
 using std::shared_ptr;
 using std::weak_ptr;
 
+#if USE_JUICE
+
+namespace rtc {
+
 IceTransport::IceTransport(const Configuration &config, Description::Role role,
                            candidate_callback candidateCallback, state_callback stateChangeCallback,
                            gathering_state_callback gatheringStateChangeCallback)
     : mRole(role), mMid("0"), mState(State::Disconnected), mGatheringState(GatheringState::New),
-      mNiceAgent(nullptr, nullptr), mMainLoop(nullptr, nullptr),
       mCandidateCallback(std::move(candidateCallback)),
       mStateChangeCallback(std::move(stateChangeCallback)),
-      mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)) {
+      mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)),
+      mAgent(nullptr, nullptr) {
+
+	PLOG_DEBUG << "Initializing ICE transport (libjuice)";
+	if (config.enableIceTcp) {
+		PLOG_WARNING << "ICE-TCP is not supported with libjuice";
+	}
+	juice_set_log_handler(IceTransport::LogCallback);
+
+	juice_config_t jconfig = {};
+	jconfig.cb_state_changed = IceTransport::StateChangeCallback;
+	jconfig.cb_candidate = IceTransport::CandidateCallback;
+	jconfig.cb_gathering_done = IceTransport::GatheringDoneCallback;
+	jconfig.cb_recv = IceTransport::RecvCallback;
+	jconfig.user_ptr = this;
+
+	// Randomize servers order
+	std::vector<IceServer> servers = config.iceServers;
+	unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
+	std::shuffle(servers.begin(), servers.end(), std::default_random_engine(seed));
+
+	// Pick a STUN server
+	for (auto &server : servers) {
+		if (!server.hostname.empty() && server.type == IceServer::Type::Stun) {
+			if (server.service.empty())
+				server.service = "3478"; // STUN UDP port
+			jconfig.stun_server_host = server.hostname.c_str();
+			jconfig.stun_server_port = std::stoul(server.service);
+		}
+	}
+
+	// TURN support is not implemented yet
+
+	// Create agent
+	mAgent = decltype(mAgent)(juice_create(&jconfig), juice_destroy);
+	if (!mAgent)
+		throw std::runtime_error("Failed to create the ICE agent");
+}
 
-	PLOG_DEBUG << "Initializing ICE transport";
+IceTransport::~IceTransport() { stop(); }
+
+void IceTransport::stop() {
+	// Nothing to do
+}
+
+Description::Role IceTransport::role() const { return mRole; }
+
+IceTransport::State IceTransport::state() const { return mState; }
+
+Description IceTransport::getLocalDescription(Description::Type type) const {
+	char sdp[JUICE_MAX_SDP_STRING_LEN];
+	if (juice_get_local_description(mAgent.get(), sdp, JUICE_MAX_SDP_STRING_LEN) < 0)
+		throw std::runtime_error("Failed to generate local SDP");
+
+	return Description(string(sdp), type, mRole);
+}
+
+void IceTransport::setRemoteDescription(const Description &description) {
+	mRole = description.role() == Description::Role::Active ? Description::Role::Passive
+	                                                        : Description::Role::Active;
+	mMid = description.mid();
+	// TODO
+	// mTrickleTimeout = description.trickleEnabled() ? 30s : 0s;
+
+	if (juice_set_remote_description(mAgent.get(), string(description).c_str()) < 0)
+		throw std::runtime_error("Failed to parse remote SDP");
+}
+
+bool IceTransport::addRemoteCandidate(const Candidate &candidate) {
+	// Don't try to pass unresolved candidates for more safety
+	if (!candidate.isResolved())
+		return false;
+
+	return juice_add_remote_candidate(mAgent.get(), string(candidate).c_str()) >= 0;
+}
+
+void IceTransport::gatherLocalCandidates() {
+	// Change state now as candidates calls can be synchronous
+	changeGatheringState(GatheringState::InProgress);
+
+	if (juice_gather_candidates(mAgent.get()) < 0) {
+		throw std::runtime_error("Failed to gather local ICE candidates");
+	}
+}
+
+std::optional<string> IceTransport::getLocalAddress() const {
+	char str[JUICE_MAX_ADDRESS_STRING_LEN];
+	if (juice_get_selected_addresses(mAgent.get(), str, JUICE_MAX_ADDRESS_STRING_LEN, NULL, 0)) {
+		return std::make_optional(string(str));
+	}
+	return nullopt;
+}
+std::optional<string> IceTransport::getRemoteAddress() const {
+	char str[JUICE_MAX_ADDRESS_STRING_LEN];
+	if (juice_get_selected_addresses(mAgent.get(), NULL, 0, str, JUICE_MAX_ADDRESS_STRING_LEN)) {
+		return std::make_optional(string(str));
+	}
+	return nullopt;
+}
+
+bool IceTransport::send(message_ptr message) {
+	if (!message || (mState != State::Connected && mState != State::Completed))
+		return false;
+
+	PLOG_VERBOSE << "Send size=" << message->size();
+	return outgoing(message);
+}
+
+void IceTransport::incoming(message_ptr message) { recv(message); }
+
+void IceTransport::incoming(const byte *data, int size) {
+	incoming(make_message(data, data + size));
+}
+
+bool IceTransport::outgoing(message_ptr message) {
+	return juice_send(mAgent.get(), reinterpret_cast<const char *>(message->data()),
+	                  message->size()) >= 0;
+}
+
+void IceTransport::changeState(State state) {
+	if (mState.exchange(state) != state)
+		mStateChangeCallback(mState);
+}
+
+void IceTransport::changeGatheringState(GatheringState state) {
+	if (mGatheringState.exchange(state) != state)
+		mGatheringStateChangeCallback(mGatheringState);
+}
+
+void IceTransport::processStateChange(unsigned int state) {
+	changeState(static_cast<State>(state));
+}
+
+void IceTransport::processCandidate(const string &candidate) {
+	mCandidateCallback(Candidate(candidate, mMid));
+}
+
+void IceTransport::processGatheringDone() { changeGatheringState(GatheringState::Complete); }
+
+void IceTransport::StateChangeCallback(juice_agent_t *agent, juice_state_t state, void *user_ptr) {
+	auto iceTransport = static_cast<rtc::IceTransport *>(user_ptr);
+	try {
+		iceTransport->processStateChange(static_cast<unsigned int>(state));
+	} catch (const std::exception &e) {
+		PLOG_WARNING << e.what();
+	}
+}
+
+void IceTransport::CandidateCallback(juice_agent_t *agent, const char *sdp, void *user_ptr) {
+	auto iceTransport = static_cast<rtc::IceTransport *>(user_ptr);
+	try {
+		iceTransport->processCandidate(sdp);
+	} catch (const std::exception &e) {
+		PLOG_WARNING << e.what();
+	}
+}
+
+void IceTransport::GatheringDoneCallback(juice_agent_t *agent, void *user_ptr) {
+	auto iceTransport = static_cast<rtc::IceTransport *>(user_ptr);
+	try {
+		iceTransport->processGatheringDone();
+	} catch (const std::exception &e) {
+		PLOG_WARNING << e.what();
+	}
+}
+
+void IceTransport::RecvCallback(juice_agent_t *agent, const char *data, size_t size,
+                                void *user_ptr) {
+	auto iceTransport = static_cast<rtc::IceTransport *>(user_ptr);
+	try {
+		iceTransport->incoming(reinterpret_cast<const byte *>(data), size);
+	} catch (const std::exception &e) {
+		PLOG_WARNING << e.what();
+	}
+}
+
+void IceTransport::LogCallback(juice_log_level_t level, const char *message) {
+	plog::Severity severity;
+	switch (level) {
+	case JUICE_LOG_LEVEL_FATAL:
+		severity = plog::fatal;
+		break;
+	case JUICE_LOG_LEVEL_ERROR:
+		severity = plog::error;
+		break;
+	case JUICE_LOG_LEVEL_WARN:
+		severity = plog::warning;
+		break;
+	case JUICE_LOG_LEVEL_INFO:
+		severity = plog::info;
+		break;
+	case JUICE_LOG_LEVEL_DEBUG:
+		severity = plog::debug;
+		break;
+	default:
+		severity = plog::verbose;
+		break;
+	}
+	PLOG(severity) << "juice: " << message;
+}
+
+} // namespace rtc
+
+#else // USE_JUICE == 0
+
+namespace rtc {
+
+IceTransport::IceTransport(const Configuration &config, Description::Role role,
+                           candidate_callback candidateCallback, state_callback stateChangeCallback,
+                           gathering_state_callback gatheringStateChangeCallback)
+    : mRole(role), mMid("0"), mState(State::Disconnected), mGatheringState(GatheringState::New),
+      mCandidateCallback(std::move(candidateCallback)),
+      mStateChangeCallback(std::move(stateChangeCallback)),
+      mGatheringStateChangeCallback(std::move(gatheringStateChangeCallback)),
+      mNiceAgent(nullptr, nullptr), mMainLoop(nullptr, nullptr) {
+
+	PLOG_DEBUG << "Initializing ICE transport (libnice)";
 
 	g_log_set_handler("libnice", G_LOG_LEVEL_MASK, LogCallback, this);
 
@@ -305,24 +520,24 @@ void IceTransport::changeState(State state) {
 		mStateChangeCallback(mState);
 }
 
+void IceTransport::changeGatheringState(GatheringState state) {
+	if (mGatheringState.exchange(state) != state)
+		mGatheringStateChangeCallback(mGatheringState);
+}
+
 void IceTransport::processTimeout() {
 	PLOG_WARNING << "ICE timeout";
 	mTimeoutId = 0;
 	changeState(State::Failed);
 }
 
-void IceTransport::changeGatheringState(GatheringState state) {
-	mGatheringState = state;
-	mGatheringStateChangeCallback(mGatheringState);
-}
-
 void IceTransport::processCandidate(const string &candidate) {
 	mCandidateCallback(Candidate(candidate, mMid));
 }
 
 void IceTransport::processGatheringDone() { changeGatheringState(GatheringState::Complete); }
 
-void IceTransport::processStateChange(uint32_t state) {
+void IceTransport::processStateChange(unsigned int state) {
 	if (state == NICE_COMPONENT_STATE_FAILED && mTrickleTimeout.count() > 0) {
 		if (mTimeoutId)
 			g_source_remove(mTimeoutId);
@@ -415,7 +630,9 @@ void IceTransport::LogCallback(const gchar *logDomain, GLogLevelFlags logLevel,
 	else
 		severity = plog::verbose; // libnice debug as verbose
 
-	PLOG(severity) << message;
+	PLOG(severity) << "nice: " << message;
 }
 
 } // namespace rtc
+
+#endif

+ 33 - 10
src/icetransport.hpp

@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2019 Paul-Louis Ageneau
+ * Copyright (c) 2019-2020 Paul-Louis Ageneau
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -27,7 +27,11 @@
 #include "transport.hpp"
 
 extern "C" {
+#if USE_JUICE
+#include <juice/juice.h>
+#else
 #include <nice/agent.h>
+#endif
 }
 
 #include <atomic>
@@ -38,14 +42,23 @@ namespace rtc {
 
 class IceTransport : public Transport {
 public:
-	enum class State : uint32_t {
+#if USE_JUICE
+	enum class State : unsigned int{
+	    Disconnected = JUICE_STATE_DISCONNECTED,
+	    Connecting = JUICE_STATE_CONNECTING,
+	    Connected = JUICE_STATE_CONNECTED,
+	    Completed = JUICE_STATE_COMPLETED,
+	    Failed = JUICE_STATE_FAILED,
+	};
+#else
+	enum class State : unsigned int {
 		Disconnected = NICE_COMPONENT_STATE_DISCONNECTED,
 		Connecting = NICE_COMPONENT_STATE_CONNECTING,
 		Connected = NICE_COMPONENT_STATE_CONNECTED,
 		Completed = NICE_COMPONENT_STATE_READY,
-		Failed = NICE_COMPONENT_STATE_FAILED
+		Failed = NICE_COMPONENT_STATE_FAILED,
 	};
-
+#endif
 	enum class GatheringState { New = 0, InProgress = 1, Complete = 2 };
 
 	using candidate_callback = std::function<void(const Candidate &candidate)>;
@@ -79,9 +92,9 @@ private:
 	void changeState(State state);
 	void changeGatheringState(GatheringState state);
 
+	void processStateChange(unsigned int state);
 	void processCandidate(const string &candidate);
 	void processGatheringDone();
-	void processStateChange(uint32_t state);
 	void processTimeout();
 
 	Description::Role mRole;
@@ -90,27 +103,37 @@ private:
 	std::atomic<State> mState;
 	std::atomic<GatheringState> mGatheringState;
 
+	candidate_callback mCandidateCallback;
+	state_callback mStateChangeCallback;
+	gathering_state_callback mGatheringStateChangeCallback;
+
+#if USE_JUICE
+	std::unique_ptr<juice_agent_t, void (*)(juice_agent_t *)> mAgent;
+
+	static void StateChangeCallback(juice_agent_t *agent, juice_state_t state, void *user_ptr);
+	static void CandidateCallback(juice_agent_t *agent, const char *sdp, void *user_ptr);
+	static void GatheringDoneCallback(juice_agent_t *agent, void *user_ptr);
+	static void RecvCallback(juice_agent_t *agent, const char *data, size_t size, void *user_ptr);
+	static void LogCallback(juice_log_level_t level, const char *message);
+#else
 	uint32_t mStreamId = 0;
 	std::unique_ptr<NiceAgent, void (*)(gpointer)> mNiceAgent;
 	std::unique_ptr<GMainLoop, void (*)(GMainLoop *)> mMainLoop;
 	std::thread mMainLoopThread;
 	guint mTimeoutId = 0;
 
-	candidate_callback mCandidateCallback;
-	state_callback mStateChangeCallback;
-	gathering_state_callback mGatheringStateChangeCallback;
-
 	static string AddressToString(const NiceAddress &addr);
 
 	static void CandidateCallback(NiceAgent *agent, NiceCandidate *candidate, gpointer userData);
 	static void GatheringDoneCallback(NiceAgent *agent, guint streamId, gpointer userData);
 	static void StateChangeCallback(NiceAgent *agent, guint streamId, guint componentId,
-	                                 guint state, gpointer userData);
+	                                guint state, gpointer userData);
 	static void RecvCallback(NiceAgent *agent, guint stream_id, guint component_id, guint len,
 	                         gchar *buf, gpointer userData);
 	static gboolean TimeoutCallback(gpointer userData);
 	static void LogCallback(const gchar *log_domain, GLogLevelFlags log_level, const gchar *message,
 	                        gpointer user_data);
+#endif
 };
 
 } // namespace rtc