From 4fea4d63ca04ab75a42e20c8008e06098acef18c Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Thu, 20 Mar 2025 21:16:03 -0600 Subject: [PATCH] allow passing in TLS connector We've been through a number of different possible rustls connectors, and we're currently using one (rustls-platform-verifier) that doesn't have feature flags in livekit or tungstenite yet. Instead of requiring that every transitive dependency supports the rustls feature flags we need, it's much easier to pass only the __rustls-tls feature flag and pass the Connector that we create with our configuration down the stack. --- livekit-api/src/signal_client/mod.rs | 42 ++++++++++++++----- .../src/signal_client/signal_stream.rs | 17 +++++--- livekit/src/room/mod.rs | 27 ++++++++++-- 3 files changed, 65 insertions(+), 21 deletions(-) diff --git a/livekit-api/src/signal_client/mod.rs b/livekit-api/src/signal_client/mod.rs index e29a9d389..7b9deec17 100644 --- a/livekit-api/src/signal_client/mod.rs +++ b/livekit-api/src/signal_client/mod.rs @@ -42,6 +42,11 @@ mod signal_stream; pub use region::RegionUrlProvider; +#[cfg(feature = "signal-client-tokio")] +pub use tokio_tungstenite::Connector; +#[cfg(not(feature = "signal-client-tokio"))] +pub enum Connector {} + pub type SignalEmitter = mpsc::UnboundedSender; pub type SignalEvents = mpsc::UnboundedReceiver; pub type SignalResult = Result; @@ -84,11 +89,12 @@ impl Default for SignalSdkOptions { } } -#[derive(Debug, Clone)] +#[derive(Clone)] #[non_exhaustive] pub struct SignalOptions { pub auto_subscribe: bool, pub adaptive_stream: bool, + pub connector: Option, pub sdk_options: SignalSdkOptions, } @@ -97,11 +103,23 @@ impl Default for SignalOptions { Self { auto_subscribe: true, adaptive_stream: false, + connector: None, sdk_options: SignalSdkOptions::default(), } } } +impl Debug for SignalOptions { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SignalClient") + .field("auto_subscribe", &self.auto_subscribe) + .field("adaptive_stream", &self.adaptive_stream) + .field("connector", &self.connector.is_some()) + .field("sdk_options", &self.sdk_options) + .finish() + } +} + pub enum SignalEvent { /// Received a message from the server Message(Box), @@ -250,17 +268,18 @@ impl SignalInner { let lk_url = get_livekit_url(url, &options)?; // Try to connect to the SignalClient - let (stream, mut events) = match SignalStream::connect(lk_url.clone(), token).await { - Ok(stream) => stream, - Err(err) => { - if let SignalError::TokenFormat = err { + let (stream, mut events) = + match SignalStream::connect(lk_url.clone(), token, options.connector.clone()).await { + Ok(stream) => stream, + Err(err) => { + if let SignalError::TokenFormat = err { + return Err(err); + } + // Connection failed, try to retrieve more informations + Self::validate(lk_url).await?; return Err(err); } - // Connection failed, try to retrieve more informations - Self::validate(lk_url).await?; - return Err(err); - } - }; + }; let join_response = get_join_response(&mut events).await?; @@ -322,7 +341,8 @@ impl SignalInner { let mut lk_url = get_livekit_url(&self.url, &self.options).unwrap(); lk_url.query_pairs_mut().append_pair("reconnect", "1").append_pair("sid", sid); - let (new_stream, mut events) = SignalStream::connect(lk_url, &token).await?; + let (new_stream, mut events) = + SignalStream::connect(lk_url, &token, self.options.connector.clone()).await?; let reconnect_response = get_reconnect_response(&mut events).await?; *stream = Some(new_stream); diff --git a/livekit-api/src/signal_client/signal_stream.rs b/livekit-api/src/signal_client/signal_stream.rs index 37742b7a9..a01714f6a 100644 --- a/livekit-api/src/signal_client/signal_stream.rs +++ b/livekit-api/src/signal_client/signal_stream.rs @@ -34,12 +34,12 @@ use tokio::{ #[cfg(feature = "signal-client-tokio")] use tokio_tungstenite::{ - connect_async, + client_async_with_config, connect_async_tls_with_config, tungstenite::client::IntoClientRequest, tungstenite::error::ProtocolError, tungstenite::http::{header::AUTHORIZATION, HeaderValue}, tungstenite::{Error as WsError, Message}, - MaybeTlsStream, WebSocketStream, + Connector, MaybeTlsStream, WebSocketStream, }; #[cfg(feature = "__signal-client-async-compatible")] @@ -88,6 +88,7 @@ impl SignalStream { pub async fn connect( url: url::Url, token: &str, + tls_connector: Option, ) -> SignalResult<(Self, mpsc::UnboundedReceiver>)> { log::info!("connecting to {}", url); let mut request = url.clone().into_client_request()?; @@ -288,17 +289,18 @@ impl SignalStream { }; // Now perform WebSocket handshake over the established connection - let (ws_stream, _) = - tokio_tungstenite::client_async_with_config(request, stream, None).await?; + let (ws_stream, _) = client_async_with_config(request, stream, None).await?; ws_stream } else { // No proxy specified, connect directly - let (ws_stream, _) = connect_async(request).await?; + let (ws_stream, _) = + connect_async_tls_with_config(request, None, false, tls_connector).await?; ws_stream } } else { // Non-tokio build or no proxy - connect directly - let (ws_stream, _) = connect_async(request).await?; + let (ws_stream, _) = + connect_async_tls_with_config(request, None, false, tls_connector).await?; ws_stream }; @@ -307,6 +309,9 @@ impl SignalStream { #[cfg(not(feature = "signal-client-tokio"))] let (ws_stream, _) = connect_async(request).await?; + #[cfg(not(feature = "signal-client-tokio"))] + let _ = tls_connector; + let (ws_writer, ws_reader) = ws_stream.split(); let (emitter, events) = mpsc::unbounded_channel(); diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index 3ee94088d..090c16833 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -348,7 +348,7 @@ impl From for SignalSdkOptions { } } -#[derive(Debug, Clone)] +#[derive(Clone)] #[non_exhaustive] pub struct RoomOptions { pub auto_subscribe: bool, @@ -361,8 +361,24 @@ pub struct RoomOptions { pub rtc_config: RtcConfiguration, pub join_retries: u32, pub sdk_options: RoomSdkOptions, + pub signal_options: SignalOptions, + pub connector: Option, +} +impl Debug for RoomOptions { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RoomOptions") + .field("auto_subscribe", &self.auto_subscribe) + .field("adaptive_stream", &self.adaptive_stream) + .field("dynacast", &self.dynacast) + .field("e2ee", &self.e2ee) + .field("rtc_config", &self.rtc_config) + .field("join_retries", &self.join_retries) + .field("sdk_options", &self.sdk_options) + .field("signal_options", &self.signal_options) + // Exclude connector field as it's not Debug + .finish() + } } - impl Default for RoomOptions { fn default() -> Self { Self { @@ -381,6 +397,8 @@ impl Default for RoomOptions { }, join_retries: 3, sdk_options: RoomSdkOptions::default(), + signal_options: SignalOptions::default(), + connector: None, } } } @@ -474,6 +492,7 @@ impl Room { signal_options.sdk_options = options.sdk_options.clone().into(); signal_options.auto_subscribe = options.auto_subscribe; signal_options.adaptive_stream = options.adaptive_stream; + signal_options.connector = options.connector.clone(); let (rtc_engine, join_response, engine_events) = RtcEngine::connect( url, token, @@ -1120,7 +1139,7 @@ impl RoomSession { } async fn send_sync_state(self: &Arc) { - let auto_subscribe = self.options.auto_subscribe; + let auto_subscribe = self.options.signal_options.auto_subscribe; let session = self.rtc_engine.session(); if session.subscriber().peer_connection().current_local_description().is_none() { @@ -1596,7 +1615,7 @@ impl RoomSession { name, metadata, attributes, - self.options.auto_subscribe, + self.options.signal_options.auto_subscribe, ); participant.on_track_published({