From 588c847e50dd3ba1ae8c54179e371cfd43fdf4a9 Mon Sep 17 00:00:00 2001 From: David Chen Date: Tue, 2 Dec 2025 18:06:06 -0800 Subject: [PATCH 1/7] simulcast ui controls --- examples/local_video/src/subscriber.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index 9d56b67b6..db719ee02 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -60,7 +60,7 @@ struct SharedYuv { #[derive(Clone)] struct SimulcastState { available: bool, - publication: Option, + publication: Option, requested_quality: Option, active_quality: Option, full_dims: Option<(u32, u32)>, @@ -127,7 +127,7 @@ impl eframe::App for VideoApp { }); // Simulcast layer controls: bottom-left overlay - egui::Area::new("simulcast_controls") + egui::Area::new("simulcast_controls".into()) .anchor(egui::Align2::LEFT_BOTTOM, egui::vec2(10.0, -10.0)) .interactable(true) .show(ctx, |ui| { @@ -217,9 +217,10 @@ async fn main() -> Result<()> { let active_sid = Arc::new(Mutex::new(None::)); // Shared simulcast UI/control state let simulcast = Arc::new(Mutex::new(SimulcastState::default())); + let simulcast_events = simulcast.clone(); tokio::spawn(async move { let active_sid = active_sid.clone(); - let simulcast = simulcast.clone(); + let simulcast = simulcast_events; let mut events = room.subscribe(); info!("Subscribed to room events"); while let Some(evt) = events.recv().await { @@ -309,13 +310,11 @@ async fn main() -> Result<()> { { let mut sc = simulcast.lock(); sc.available = publication.simulcasted(); - sc.full_dims = Some(publication.dimension()); + let dim = publication.dimension(); + sc.full_dims = Some((dim.0, dim.1)); sc.requested_quality = None; sc.active_quality = None; - sc.publication = match publication.clone() { - livekit::room::publication::TrackPublication::Remote(rp) => Some(rp), - _ => None, - }; + sc.publication = Some(publication.clone()); } let simulcast2 = simulcast.clone(); std::thread::spawn(move || { From 93c075cedc92ccb91fe52152ae16460e45de9dae Mon Sep 17 00:00:00 2001 From: David Chen Date: Wed, 3 Dec 2025 18:55:41 -0800 Subject: [PATCH 2/7] wip --- examples/local_video/src/publisher.rs | 20 +- examples/local_video/src/subscriber.rs | 8 + examples/wgpu_room/src/logo_track.rs | 3 +- libwebrtc/src/lib.rs | 2 +- libwebrtc/src/native/mod.rs | 1 + libwebrtc/src/native/sensor_timestamp.rs | 186 ++++++++++ libwebrtc/src/native/video_stream.rs | 1 + libwebrtc/src/video_frame.rs | 4 + livekit-ffi/src/server/video_source.rs | 3 +- livekit/src/room/e2ee/manager.rs | 46 ++- livekit/src/room/track/local_video_track.rs | 17 +- livekit/src/room/track/remote_video_track.rs | 20 +- webrtc-sys/build.rs | 2 + webrtc-sys/include/livekit/sensor_timestamp.h | 186 ++++++++++ webrtc-sys/src/lib.rs | 1 + webrtc-sys/src/sensor_timestamp.cpp | 335 ++++++++++++++++++ webrtc-sys/src/sensor_timestamp.rs | 100 ++++++ 17 files changed, 917 insertions(+), 18 deletions(-) create mode 100644 libwebrtc/src/native/sensor_timestamp.rs create mode 100644 webrtc-sys/include/livekit/sensor_timestamp.h create mode 100644 webrtc-sys/src/sensor_timestamp.cpp create mode 100644 webrtc-sys/src/sensor_timestamp.rs diff --git a/examples/local_video/src/publisher.rs b/examples/local_video/src/publisher.rs index d311129b9..fbfffe45f 100644 --- a/examples/local_video/src/publisher.rs +++ b/examples/local_video/src/publisher.rs @@ -208,7 +208,12 @@ async fn main() -> Result<()> { } // Reusable I420 buffer and 
frame - let mut frame = VideoFrame { rotation: VideoRotation::VideoRotation0, timestamp_us: 0, buffer: I420Buffer::new(width, height) }; + let mut frame = VideoFrame { + rotation: VideoRotation::VideoRotation0, + timestamp_us: 0, + sensor_timestamp_us: None, + buffer: I420Buffer::new(width, height), + }; let is_yuyv = fmt.format() == FrameFormat::YUYV; info!( "Selected conversion path: {}", @@ -362,6 +367,19 @@ async fn main() -> Result<()> { // Update RTP timestamp (monotonic, microseconds since start) frame.timestamp_us = start_ts.elapsed().as_micros() as i64; + + // Attach a static sensor timestamp for testing and push it into the + // shared queue used by the sensor timestamp transformer. + if let Some(store) = track.sensor_timestamp_store() { + let sensor_ts = frame.timestamp_us + 123_456; // simple fixed offset for visibility + frame.sensor_timestamp_us = Some(sensor_ts); + store.store(frame.timestamp_us, sensor_ts); + info!( + "Publisher: attached sensor_timestamp_us={} for capture_ts={}", + sensor_ts, frame.timestamp_us + ); + } + rtc_source.capture_frame(&frame); let t4 = Instant::now(); diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index db719ee02..250d89f59 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -369,6 +369,14 @@ async fn main() -> Result<()> { std::mem::swap(&mut s.v, &mut v_buf); s.dirty = true; + // Log any parsed sensor timestamp for this frame if available. + if let Some(ts) = video_track.last_sensor_timestamp() { + info!( + "Subscriber: received frame {}x{} with sensor_timestamp_us={}", + w, h, ts + ); + } + frames += 1; let elapsed = last_log.elapsed(); if elapsed >= Duration::from_secs(2) { diff --git a/examples/wgpu_room/src/logo_track.rs b/examples/wgpu_room/src/logo_track.rs index 03b1a0ec4..f284ac4b9 100644 --- a/examples/wgpu_room/src/logo_track.rs +++ b/examples/wgpu_room/src/logo_track.rs @@ -116,8 +116,9 @@ impl LogoTrack { framebuffer: Arc::new(Mutex::new(vec![0u8; FB_WIDTH * FB_HEIGHT * 4])), video_frame: Arc::new(Mutex::new(VideoFrame { rotation: VideoRotation::VideoRotation0, - buffer: I420Buffer::new(FB_WIDTH as u32, FB_HEIGHT as u32), timestamp_us: 0, + sensor_timestamp_us: None, + buffer: I420Buffer::new(FB_WIDTH as u32, FB_HEIGHT as u32), })), pos: (0, 0), direction: (1, 1), diff --git a/libwebrtc/src/lib.rs b/libwebrtc/src/lib.rs index 8dc2e426c..0f372f58c 100644 --- a/libwebrtc/src/lib.rs +++ b/libwebrtc/src/lib.rs @@ -66,7 +66,7 @@ pub mod video_track; pub mod native { pub use webrtc_sys::webrtc::ffi::create_random_uuid; - pub use crate::imp::{apm, audio_resampler, frame_cryptor, yuv_helper}; + pub use crate::imp::{apm, audio_resampler, frame_cryptor, sensor_timestamp, yuv_helper}; } #[cfg(target_os = "android")] diff --git a/libwebrtc/src/native/mod.rs b/libwebrtc/src/native/mod.rs index b91005f20..679937971 100644 --- a/libwebrtc/src/native/mod.rs +++ b/libwebrtc/src/native/mod.rs @@ -36,6 +36,7 @@ pub mod video_source; pub mod video_stream; pub mod video_track; pub mod yuv_helper; +pub mod sensor_timestamp; use webrtc_sys::{rtc_error as sys_err, webrtc as sys_rtc}; diff --git a/libwebrtc/src/native/sensor_timestamp.rs b/libwebrtc/src/native/sensor_timestamp.rs new file mode 100644 index 000000000..c6d9e3652 --- /dev/null +++ b/libwebrtc/src/native/sensor_timestamp.rs @@ -0,0 +1,186 @@ +// Copyright 2025 LiveKit, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Sensor timestamp support for end-to-end timestamp propagation. +//! +//! This module provides functionality to embed sensor/hardware timestamps +//! in encoded video frames as trailers. The timestamps are preserved +//! through the WebRTC pipeline and can be extracted on the receiver side. +//! +//! This works independently of e2ee encryption - timestamps can be +//! embedded even when encryption is disabled. + +use cxx::SharedPtr; +use webrtc_sys::sensor_timestamp::ffi as sys_st; + +use crate::{ + peer_connection_factory::PeerConnectionFactory, + rtp_receiver::RtpReceiver, + rtp_sender::RtpSender, +}; + +/// Thread-safe store for mapping capture timestamps to sensor timestamps. +/// +/// Used on the sender side to correlate video frame capture time with +/// the sensor timestamp that should be embedded in the encoded frame. +#[derive(Clone)] +pub struct SensorTimestampStore { + sys_handle: SharedPtr, +} + +impl SensorTimestampStore { + /// Create a new sensor timestamp store. + pub fn new() -> Self { + Self { + sys_handle: sys_st::new_sensor_timestamp_store(), + } + } + + /// Store a sensor timestamp associated with a capture timestamp. + /// + /// Call this when capturing a video frame with a sensor timestamp. + /// The `capture_timestamp_us` should match the `timestamp_us` field + /// of the VideoFrame. + pub fn store(&self, capture_timestamp_us: i64, sensor_timestamp_us: i64) { + self.sys_handle.store(capture_timestamp_us, sensor_timestamp_us); + } + + /// Lookup a sensor timestamp by capture timestamp (for debugging). + /// Returns None if not found. + pub fn lookup(&self, capture_timestamp_us: i64) -> Option { + let result = self.sys_handle.lookup(capture_timestamp_us); + if result < 0 { + None + } else { + Some(result) + } + } + + /// Pop the oldest sensor timestamp from the queue. + /// Returns None if the queue is empty. + pub fn pop(&self) -> Option { + let result = self.sys_handle.pop(); + if result < 0 { + None + } else { + Some(result) + } + } + + /// Peek at the oldest sensor timestamp without removing it. + /// Returns None if the queue is empty. + pub fn peek(&self) -> Option { + let result = self.sys_handle.peek(); + if result < 0 { + None + } else { + Some(result) + } + } + + /// Clear old entries (older than the given threshold in microseconds). + pub fn prune(&self, max_age_us: i64) { + self.sys_handle.prune(max_age_us); + } + + pub(crate) fn sys_handle(&self) -> SharedPtr { + self.sys_handle.clone() + } +} + +impl Default for SensorTimestampStore { + fn default() -> Self { + Self::new() + } +} + +/// Handler for sensor timestamp embedding/extraction on RTP streams. +/// +/// For sender side: Embeds sensor timestamps as 12-byte trailers on +/// encoded frames before they are sent. +/// +/// For receiver side: Extracts sensor timestamps from received frames +/// and makes them available for retrieval. 
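+///
+/// A minimal receiver-side sketch, assuming `handler` was created with
+/// `create_receiver_handler` below (names as defined in this module):
+///
+/// ```ignore
+/// if handler.enabled() {
+///     if let Some(sensor_ts_us) = handler.last_sensor_timestamp() {
+///         log::info!("last embedded sensor timestamp: {} us", sensor_ts_us);
+///     }
+/// }
+/// ```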
+#[derive(Clone)] +pub struct SensorTimestampHandler { + sys_handle: SharedPtr, +} + +impl SensorTimestampHandler { + /// Enable or disable timestamp embedding/extraction. + pub fn set_enabled(&self, enabled: bool) { + self.sys_handle.set_enabled(enabled); + } + + /// Check if timestamp embedding/extraction is enabled. + pub fn enabled(&self) -> bool { + self.sys_handle.enabled() + } + + /// Get the last received sensor timestamp (receiver side only). + /// Returns None if no timestamp has been received yet. + pub fn last_sensor_timestamp(&self) -> Option { + if self.sys_handle.has_sensor_timestamp() { + let ts = self.sys_handle.last_sensor_timestamp(); + if ts >= 0 { + Some(ts) + } else { + None + } + } else { + None + } + } + + pub(crate) fn sys_handle(&self) -> SharedPtr { + self.sys_handle.clone() + } +} + +/// Create a sender-side sensor timestamp handler. +/// +/// This handler will embed sensor timestamps from the provided store +/// into encoded frames before they are packetized and sent. +pub fn create_sender_handler( + peer_factory: &PeerConnectionFactory, + store: &SensorTimestampStore, + sender: &RtpSender, +) -> SensorTimestampHandler { + SensorTimestampHandler { + sys_handle: sys_st::new_sensor_timestamp_sender( + peer_factory.handle.sys_handle.clone(), + store.sys_handle(), + sender.handle.sys_handle.clone(), + ), + } +} + +/// Create a receiver-side sensor timestamp handler. +/// +/// This handler will extract sensor timestamps from received frames +/// and make them available via `last_sensor_timestamp()`. +pub fn create_receiver_handler( + peer_factory: &PeerConnectionFactory, + store: &SensorTimestampStore, + receiver: &RtpReceiver, +) -> SensorTimestampHandler { + SensorTimestampHandler { + sys_handle: sys_st::new_sensor_timestamp_receiver( + peer_factory.handle.sys_handle.clone(), + store.sys_handle(), + receiver.handle.sys_handle.clone(), + ), + } +} + diff --git a/libwebrtc/src/native/video_stream.rs b/libwebrtc/src/native/video_stream.rs index 07774f87b..a2ab70f4e 100644 --- a/libwebrtc/src/native/video_stream.rs +++ b/libwebrtc/src/native/video_stream.rs @@ -84,6 +84,7 @@ impl sys_vt::VideoSink for VideoTrackObserver { let _ = self.frame_tx.send(VideoFrame { rotation: frame.rotation().into(), timestamp_us: frame.timestamp_us(), + sensor_timestamp_us: None, buffer: new_video_frame_buffer(unsafe { frame.video_frame_buffer() }), }); } diff --git a/libwebrtc/src/video_frame.rs b/libwebrtc/src/video_frame.rs index 926b45572..5d60659d8 100644 --- a/libwebrtc/src/video_frame.rs +++ b/libwebrtc/src/video_frame.rs @@ -59,6 +59,10 @@ where { pub rotation: VideoRotation, pub timestamp_us: i64, // When the frame was captured in microseconds + /// Optional sensor timestamp in microseconds, if available. + /// This is typically a hardware or device timestamp that can be + /// propagated end-to-end through the media pipeline. 
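+    /// On the publish side this is typically set alongside a matching entry
+    /// in the track's `SensorTimestampStore`; frames delivered by a remote
+    /// video stream currently carry `None` here, and the extracted value is
+    /// exposed via `RemoteVideoTrack::last_sensor_timestamp()` instead.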
+ pub sensor_timestamp_us: Option, pub buffer: T, } diff --git a/livekit-ffi/src/server/video_source.rs b/livekit-ffi/src/server/video_source.rs index 5af7d9a38..0eca541ce 100644 --- a/livekit-ffi/src/server/video_source.rs +++ b/livekit-ffi/src/server/video_source.rs @@ -64,8 +64,9 @@ impl FfiVideoSource { let buffer = colorcvt::to_libwebrtc_buffer(capture.buffer.clone()); let frame = VideoFrame { rotation: capture.rotation().into(), - timestamp_us: capture.timestamp_us, buffer, + timestamp_us: capture.timestamp_us, + sensor_timestamp_us: None, }; source.capture_frame(&frame); diff --git a/livekit/src/room/e2ee/manager.rs b/livekit/src/room/e2ee/manager.rs index 1e583b9c4..38bcb804f 100644 --- a/livekit/src/room/e2ee/manager.rs +++ b/livekit/src/room/e2ee/manager.rs @@ -15,8 +15,11 @@ use std::{collections::HashMap, sync::Arc}; use libwebrtc::{ - native::frame_cryptor::{ - DataPacketCryptor, EncryptedPacket, EncryptionAlgorithm, EncryptionState, FrameCryptor, + native::{ + frame_cryptor::{ + DataPacketCryptor, EncryptedPacket, EncryptionAlgorithm, EncryptionState, FrameCryptor, + }, + sensor_timestamp::{self, SensorTimestampStore}, }, rtp_receiver::RtpReceiver, rtp_sender::RtpSender, @@ -98,16 +101,24 @@ impl E2eeManager { publication: RemoteTrackPublication, participant: RemoteParticipant, ) { - if !self.initialized() { - return; + let identity = participant.identity(); + let receiver = track.transceiver().unwrap().receiver(); + // Always set up sensor timestamp extraction for remote video tracks. + if let RemoteTrack::Video(video_track) = &track { + let store = SensorTimestampStore::new(); + let handler = sensor_timestamp::create_receiver_handler( + LkRuntime::instance().pc_factory(), + &store, + &receiver, + ); + video_track.set_sensor_timestamp_handler(handler); } - if publication.encryption_type() == EncryptionType::None { + // E2EE frame cryptor is only created when encryption is configured. + if !self.initialized() || publication.encryption_type() == EncryptionType::None { return; } - let identity = participant.identity(); - let receiver = track.transceiver().unwrap().receiver(); let frame_cryptor = self.setup_rtp_receiver(&identity, receiver); self.setup_cryptor(&frame_cryptor); @@ -122,16 +133,27 @@ impl E2eeManager { publication: LocalTrackPublication, participant: LocalParticipant, ) { - if !self.initialized() { - return; + let identity = participant.identity(); + let sender = track.transceiver().unwrap().sender(); + // Always set up sensor timestamp embedding for local video tracks. + if let LocalTrack::Video(video_track) = &track { + let store = SensorTimestampStore::new(); + video_track.set_sensor_timestamp_store(store.clone()); + let _handler = sensor_timestamp::create_sender_handler( + LkRuntime::instance().pc_factory(), + &store, + &sender, + ); + // We rely on the underlying WebRTC objects to hold references to the + // transformer; the Rust-side handler object is kept only for lifetime + // management when needed. } - if publication.encryption_type() == EncryptionType::None { + // E2EE frame cryptor is only created when encryption is configured. 
+ if !self.initialized() || publication.encryption_type() == EncryptionType::None { return; } - let identity = participant.identity(); - let sender = track.transceiver().unwrap().sender(); let frame_cryptor = self.setup_rtp_sender(&identity, sender); self.setup_cryptor(&frame_cryptor); diff --git a/livekit/src/room/track/local_video_track.rs b/livekit/src/room/track/local_video_track.rs index c7c26649b..e99158fc5 100644 --- a/livekit/src/room/track/local_video_track.rs +++ b/livekit/src/room/track/local_video_track.rs @@ -14,8 +14,9 @@ use std::{fmt::Debug, sync::Arc}; -use libwebrtc::{prelude::*, stats::RtcStats}; +use libwebrtc::{native::sensor_timestamp::SensorTimestampStore, prelude::*, stats::RtcStats}; use livekit_protocol as proto; +use parking_lot::Mutex; use super::TrackInner; use crate::{prelude::*, rtc_engine::lk_runtime::LkRuntime}; @@ -24,6 +25,7 @@ use crate::{prelude::*, rtc_engine::lk_runtime::LkRuntime}; pub struct LocalVideoTrack { inner: Arc, source: RtcVideoSource, + sensor_timestamp_store: Arc>>, } impl Debug for LocalVideoTrack { @@ -46,6 +48,7 @@ impl LocalVideoTrack { MediaStreamTrack::Video(rtc_track), )), source, + sensor_timestamp_store: Arc::new(Mutex::new(None)), } } @@ -123,6 +126,18 @@ impl LocalVideoTrack { self.source.clone() } + /// Returns the sensor timestamp store associated with this track, if any. + /// When present, callers can push per-frame sensor timestamps into the + /// outgoing queue which will then be embedded into encoded frames. + pub fn sensor_timestamp_store(&self) -> Option { + self.sensor_timestamp_store.lock().clone() + } + + /// Internal: set the sensor timestamp store used for this track. + pub(crate) fn set_sensor_timestamp_store(&self, store: SensorTimestampStore) { + *self.sensor_timestamp_store.lock() = Some(store); + } + pub async fn get_stats(&self) -> RoomResult> { super::local_track::get_stats(&self.inner).await } diff --git a/livekit/src/room/track/remote_video_track.rs b/livekit/src/room/track/remote_video_track.rs index 2076a3b1c..5688ce6b9 100644 --- a/livekit/src/room/track/remote_video_track.rs +++ b/livekit/src/room/track/remote_video_track.rs @@ -14,8 +14,9 @@ use std::{fmt::Debug, sync::Arc}; -use libwebrtc::{prelude::*, stats::RtcStats}; +use libwebrtc::{native::sensor_timestamp::SensorTimestampHandler, prelude::*, stats::RtcStats}; use livekit_protocol as proto; +use parking_lot::Mutex; use super::{remote_track, TrackInner}; use crate::prelude::*; @@ -23,6 +24,7 @@ use crate::prelude::*; #[derive(Clone)] pub struct RemoteVideoTrack { inner: Arc, + sensor_timestamp_handler: Arc>>, } impl Debug for RemoteVideoTrack { @@ -44,6 +46,7 @@ impl RemoteVideoTrack { TrackKind::Video, MediaStreamTrack::Video(rtc_track), )), + sensor_timestamp_handler: Arc::new(Mutex::new(None)), } } @@ -94,6 +97,21 @@ impl RemoteVideoTrack { true } + /// Returns the last parsed sensor timestamp (in microseconds) for this + /// remote video track, if the sensor timestamp transformer is enabled and + /// a timestamp has been received. + pub fn last_sensor_timestamp(&self) -> Option { + self.sensor_timestamp_handler + .lock() + .as_ref() + .and_then(|h| h.last_sensor_timestamp()) + } + + /// Internal: set the handler that extracts sensor timestamps for this track. 
+ pub(crate) fn set_sensor_timestamp_handler(&self, handler: SensorTimestampHandler) { + self.sensor_timestamp_handler.lock().replace(handler); + } + pub async fn get_stats(&self) -> RoomResult> { super::remote_track::get_stats(&self.inner).await } diff --git a/webrtc-sys/build.rs b/webrtc-sys/build.rs index 87213e82a..6ca57602a 100644 --- a/webrtc-sys/build.rs +++ b/webrtc-sys/build.rs @@ -49,6 +49,7 @@ fn main() { "src/android.rs", "src/prohibit_libsrtp_initialization.rs", "src/apm.rs", + "src/sensor_timestamp.rs", ]); builder.files(&[ @@ -77,6 +78,7 @@ fn main() { "src/global_task_queue.cpp", "src/prohibit_libsrtp_initialization.cpp", "src/apm.cpp", + "src/sensor_timestamp.cpp", ]); let webrtc_dir = webrtc_sys_build::webrtc_dir(); diff --git a/webrtc-sys/include/livekit/sensor_timestamp.h b/webrtc-sys/include/livekit/sensor_timestamp.h new file mode 100644 index 000000000..ef2262d32 --- /dev/null +++ b/webrtc-sys/include/livekit/sensor_timestamp.h @@ -0,0 +1,186 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "absl/types/optional.h" +#include "api/frame_transformer_interface.h" +#include "api/scoped_refptr.h" +#include "livekit/peer_connection.h" +#include "livekit/peer_connection_factory.h" +#include "livekit/rtp_receiver.h" +#include "livekit/rtp_sender.h" +#include "livekit/webrtc.h" +#include "rtc_base/synchronization/mutex.h" +#include "rust/cxx.h" + +namespace livekit { + +// Magic bytes to identify sensor timestamp trailers: "LKTS" (LiveKit TimeStamp) +constexpr uint8_t kSensorTimestampMagic[4] = {'L', 'K', 'T', 'S'}; +constexpr size_t kSensorTimestampTrailerSize = 12; // 8 bytes timestamp + 4 bytes magic + +/// Thread-safe FIFO queue for sensor timestamps. +/// Used on the sender side to pass sensor timestamps to the transformer. +/// Works on the assumption that frames are captured and encoded in order. +class SensorTimestampStore { + public: + SensorTimestampStore() = default; + ~SensorTimestampStore() = default; + + /// Push a sensor timestamp to the queue. + /// Call this when capturing a video frame with a sensor timestamp. + void store(int64_t capture_timestamp_us, int64_t sensor_timestamp_us) const; + + /// Pop and return the next sensor timestamp from the queue. + /// Returns -1 if the queue is empty. + int64_t lookup(int64_t capture_timestamp_us) const; + + /// Pop the oldest entry if the queue has entries. + /// Returns the sensor timestamp, or -1 if empty. + int64_t pop() const; + + /// Peek at the oldest entry without removing it. + /// Returns the sensor timestamp, or -1 if empty. + int64_t peek() const; + + /// Clear old entries (older than the given threshold in microseconds). 
+ void prune(int64_t max_age_us) const; + + private: + mutable webrtc::Mutex mutex_; + struct Entry { + int64_t capture_timestamp_us; + int64_t sensor_timestamp_us; + }; + mutable std::deque entries_; + static constexpr size_t kMaxEntries = 300; // ~10 seconds at 30fps +}; + +/// Frame transformer that appends/extracts sensor timestamp trailers. +/// This transformer can be used standalone or in conjunction with e2ee. +class SensorTimestampTransformer + : public webrtc::FrameTransformerInterface { + public: + enum class Direction { kSend, kReceive }; + + SensorTimestampTransformer(Direction direction, + std::shared_ptr store); + ~SensorTimestampTransformer() override = default; + + // FrameTransformerInterface implementation + void Transform( + std::unique_ptr frame) override; + void RegisterTransformedFrameCallback( + rtc::scoped_refptr callback) override; + void RegisterTransformedFrameSinkCallback( + rtc::scoped_refptr callback, + uint32_t ssrc) override; + void UnregisterTransformedFrameCallback() override; + void UnregisterTransformedFrameSinkCallback(uint32_t ssrc) override; + + /// Enable/disable timestamp embedding + void set_enabled(bool enabled); + bool enabled() const; + + /// Get the last received sensor timestamp (receiver side only) + std::optional last_sensor_timestamp() const; + + private: + void TransformSend( + std::unique_ptr frame); + void TransformReceive( + std::unique_ptr frame); + + /// Append sensor timestamp trailer to frame data + std::vector AppendTimestampTrailer( + rtc::ArrayView data, + int64_t sensor_timestamp_us); + + /// Extract and remove sensor timestamp trailer from frame data + /// Returns the sensor timestamp if found, nullopt otherwise + std::optional ExtractTimestampTrailer( + rtc::ArrayView data, + std::vector& out_data); + + const Direction direction_; + std::shared_ptr store_; + std::atomic enabled_{true}; + mutable webrtc::Mutex mutex_; + rtc::scoped_refptr callback_; + std::unordered_map> + sink_callbacks_; + mutable std::atomic last_sensor_timestamp_{0}; + mutable std::atomic has_last_sensor_timestamp_{false}; +}; + +/// Wrapper class for Rust FFI that manages sensor timestamp transformers. 
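+/// On construction it installs a SensorTimestampTransformer on either the
+/// sender's encoder-to-packetizer path or the receiver's
+/// depacketizer-to-decoder path (see sensor_timestamp.cpp).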
+class SensorTimestampHandler { + public: + SensorTimestampHandler(std::shared_ptr rtc_runtime, + std::shared_ptr store, + rtc::scoped_refptr sender); + + SensorTimestampHandler(std::shared_ptr rtc_runtime, + std::shared_ptr store, + rtc::scoped_refptr receiver); + + ~SensorTimestampHandler() = default; + + /// Enable/disable timestamp embedding + void set_enabled(bool enabled) const; + bool enabled() const; + + /// Get the last received sensor timestamp (receiver side only) + /// Returns -1 if no timestamp has been received yet + int64_t last_sensor_timestamp() const; + + /// Check if a sensor timestamp has been received + bool has_sensor_timestamp() const; + + private: + std::shared_ptr rtc_runtime_; + rtc::scoped_refptr transformer_; + rtc::scoped_refptr sender_; + rtc::scoped_refptr receiver_; +}; + +// Factory functions for Rust FFI +std::shared_ptr new_sensor_timestamp_store(); + +std::shared_ptr new_sensor_timestamp_sender( + std::shared_ptr peer_factory, + std::shared_ptr store, + std::shared_ptr sender); + +std::shared_ptr new_sensor_timestamp_receiver( + std::shared_ptr peer_factory, + std::shared_ptr store, + std::shared_ptr receiver); + +} // namespace livekit + diff --git a/webrtc-sys/src/lib.rs b/webrtc-sys/src/lib.rs index 2c3c94faf..a21a8726f 100644 --- a/webrtc-sys/src/lib.rs +++ b/webrtc-sys/src/lib.rs @@ -32,6 +32,7 @@ pub mod rtp_parameters; pub mod rtp_receiver; pub mod rtp_sender; pub mod rtp_transceiver; +pub mod sensor_timestamp; pub mod video_frame; pub mod video_frame_buffer; pub mod video_track; diff --git a/webrtc-sys/src/sensor_timestamp.cpp b/webrtc-sys/src/sensor_timestamp.cpp new file mode 100644 index 000000000..512a4ce50 --- /dev/null +++ b/webrtc-sys/src/sensor_timestamp.cpp @@ -0,0 +1,335 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "livekit/sensor_timestamp.h" + +#include +#include +#include + +#include "api/make_ref_counted.h" +#include "livekit/peer_connection_factory.h" +#include "rtc_base/logging.h" +#include "webrtc-sys/src/sensor_timestamp.rs.h" + +namespace livekit { + +// SensorTimestampStore implementation + +void SensorTimestampStore::store(int64_t capture_timestamp_us, + int64_t sensor_timestamp_us) const { + webrtc::MutexLock lock(&mutex_); + + // Remove old entries if we're at capacity + while (entries_.size() >= kMaxEntries) { + entries_.pop_front(); + } + + entries_.push_back({capture_timestamp_us, sensor_timestamp_us}); +} + +int64_t SensorTimestampStore::lookup(int64_t capture_timestamp_us) const { + webrtc::MutexLock lock(&mutex_); + + // Search from the end (most recent) for better performance + for (auto it = entries_.rbegin(); it != entries_.rend(); ++it) { + if (it->capture_timestamp_us == capture_timestamp_us) { + return it->sensor_timestamp_us; + } + } + + return -1; +} + +int64_t SensorTimestampStore::pop() const { + webrtc::MutexLock lock(&mutex_); + + if (entries_.empty()) { + return -1; + } + + int64_t sensor_ts = entries_.front().sensor_timestamp_us; + entries_.pop_front(); + return sensor_ts; +} + +int64_t SensorTimestampStore::peek() const { + webrtc::MutexLock lock(&mutex_); + + if (entries_.empty()) { + return -1; + } + + return entries_.front().sensor_timestamp_us; +} + +void SensorTimestampStore::prune(int64_t max_age_us) const { + webrtc::MutexLock lock(&mutex_); + + if (entries_.empty()) { + return; + } + + int64_t newest_timestamp = entries_.back().capture_timestamp_us; + int64_t threshold = newest_timestamp - max_age_us; + + while (!entries_.empty() && + entries_.front().capture_timestamp_us < threshold) { + entries_.pop_front(); + } +} + +// SensorTimestampTransformer implementation + +SensorTimestampTransformer::SensorTimestampTransformer( + Direction direction, + std::shared_ptr store) + : direction_(direction), store_(store) {} + +void SensorTimestampTransformer::Transform( + std::unique_ptr frame) { + if (!enabled_.load()) { + // Pass through without modification + webrtc::MutexLock lock(&mutex_); + if (callback_) { + callback_->OnTransformedFrame(std::move(frame)); + } + return; + } + + if (direction_ == Direction::kSend) { + TransformSend(std::move(frame)); + } else { + TransformReceive(std::move(frame)); + } +} + +void SensorTimestampTransformer::TransformSend( + std::unique_ptr frame) { + // Get the RTP timestamp from the frame for logging + uint32_t rtp_timestamp = frame->GetTimestamp(); + + auto data = frame->GetData(); + + // Pop the next sensor timestamp from the queue. + // This assumes frames are captured and encoded in order (FIFO). 
+ int64_t ts_to_embed = 0; + + if (store_) { + int64_t popped_ts = store_->pop(); + if (popped_ts >= 0) { + ts_to_embed = popped_ts; + } + } + + // Always append trailer when enabled (even if timestamp is 0, + // which indicates no sensor timestamp was set for this frame) + std::vector new_data; + if (enabled_.load()) { + new_data = AppendTimestampTrailer(data, ts_to_embed); + frame->SetData(rtc::ArrayView(new_data)); + + RTC_LOG(LS_VERBOSE) << "SensorTimestampTransformer: Appended timestamp trailer" + << " ts=" << ts_to_embed + << " rtp_ts=" << rtp_timestamp + << " data_size=" << new_data.size(); + } + + webrtc::MutexLock lock(&mutex_); + if (callback_) { + callback_->OnTransformedFrame(std::move(frame)); + } +} + +void SensorTimestampTransformer::TransformReceive( + std::unique_ptr frame) { + auto data = frame->GetData(); + std::vector stripped_data; + + auto sensor_ts = ExtractTimestampTrailer(data, stripped_data); + + if (sensor_ts.has_value()) { + // Store the extracted timestamp for later retrieval + last_sensor_timestamp_.store(sensor_ts.value()); + has_last_sensor_timestamp_.store(true); + + // Update frame with stripped data + frame->SetData(rtc::ArrayView(stripped_data)); + + RTC_LOG(LS_VERBOSE) << "SensorTimestampTransformer: Extracted timestamp trailer" + << " ts=" << sensor_ts.value() + << " rtp_ts=" << frame->GetTimestamp() + << " stripped_size=" << stripped_data.size(); + } + + webrtc::MutexLock lock(&mutex_); + if (callback_) { + callback_->OnTransformedFrame(std::move(frame)); + } +} + +std::vector SensorTimestampTransformer::AppendTimestampTrailer( + rtc::ArrayView data, + int64_t sensor_timestamp_us) { + std::vector result; + result.reserve(data.size() + kSensorTimestampTrailerSize); + + // Copy original data + result.insert(result.end(), data.begin(), data.end()); + + // Append timestamp (big-endian) + for (int i = 7; i >= 0; --i) { + result.push_back(static_cast((sensor_timestamp_us >> (i * 8)) & 0xFF)); + } + + // Append magic bytes + result.insert(result.end(), std::begin(kSensorTimestampMagic), + std::end(kSensorTimestampMagic)); + + return result; +} + +std::optional SensorTimestampTransformer::ExtractTimestampTrailer( + rtc::ArrayView data, + std::vector& out_data) { + if (data.size() < kSensorTimestampTrailerSize) { + out_data.assign(data.begin(), data.end()); + return std::nullopt; + } + + // Check for magic bytes at the end + const uint8_t* magic_start = data.data() + data.size() - 4; + if (std::memcmp(magic_start, kSensorTimestampMagic, 4) != 0) { + out_data.assign(data.begin(), data.end()); + return std::nullopt; + } + + // Extract timestamp (big-endian) + const uint8_t* ts_start = data.data() + data.size() - kSensorTimestampTrailerSize; + int64_t timestamp = 0; + for (int i = 0; i < 8; ++i) { + timestamp = (timestamp << 8) | ts_start[i]; + } + + // Copy data without trailer + out_data.assign(data.begin(), data.end() - kSensorTimestampTrailerSize); + + return timestamp; +} + +void SensorTimestampTransformer::RegisterTransformedFrameCallback( + rtc::scoped_refptr callback) { + webrtc::MutexLock lock(&mutex_); + callback_ = callback; +} + +void SensorTimestampTransformer::RegisterTransformedFrameSinkCallback( + rtc::scoped_refptr callback, + uint32_t ssrc) { + webrtc::MutexLock lock(&mutex_); + sink_callbacks_[ssrc] = callback; +} + +void SensorTimestampTransformer::UnregisterTransformedFrameCallback() { + webrtc::MutexLock lock(&mutex_); + callback_ = nullptr; +} + +void SensorTimestampTransformer::UnregisterTransformedFrameSinkCallback( + uint32_t ssrc) { + 
webrtc::MutexLock lock(&mutex_); + sink_callbacks_.erase(ssrc); +} + +void SensorTimestampTransformer::set_enabled(bool enabled) { + enabled_.store(enabled); +} + +bool SensorTimestampTransformer::enabled() const { + return enabled_.load(); +} + +std::optional SensorTimestampTransformer::last_sensor_timestamp() + const { + if (!has_last_sensor_timestamp_.load()) { + return std::nullopt; + } + return last_sensor_timestamp_.load(); +} + +// SensorTimestampHandler implementation + +SensorTimestampHandler::SensorTimestampHandler( + std::shared_ptr rtc_runtime, + std::shared_ptr store, + rtc::scoped_refptr sender) + : rtc_runtime_(rtc_runtime), sender_(sender) { + transformer_ = rtc::make_ref_counted( + SensorTimestampTransformer::Direction::kSend, store); + sender->SetEncoderToPacketizerFrameTransformer(transformer_); +} + +SensorTimestampHandler::SensorTimestampHandler( + std::shared_ptr rtc_runtime, + std::shared_ptr store, + rtc::scoped_refptr receiver) + : rtc_runtime_(rtc_runtime), receiver_(receiver) { + transformer_ = rtc::make_ref_counted( + SensorTimestampTransformer::Direction::kReceive, store); + receiver->SetDepacketizerToDecoderFrameTransformer(transformer_); +} + +void SensorTimestampHandler::set_enabled(bool enabled) const { + transformer_->set_enabled(enabled); +} + +bool SensorTimestampHandler::enabled() const { + return transformer_->enabled(); +} + +int64_t SensorTimestampHandler::last_sensor_timestamp() const { + auto ts = transformer_->last_sensor_timestamp(); + return ts.value_or(-1); +} + +bool SensorTimestampHandler::has_sensor_timestamp() const { + return transformer_->last_sensor_timestamp().has_value(); +} + +// Factory functions + +std::shared_ptr new_sensor_timestamp_store() { + return std::make_shared(); +} + +std::shared_ptr new_sensor_timestamp_sender( + std::shared_ptr peer_factory, + std::shared_ptr store, + std::shared_ptr sender) { + return std::make_shared( + peer_factory->rtc_runtime(), store, sender->rtc_sender()); +} + +std::shared_ptr new_sensor_timestamp_receiver( + std::shared_ptr peer_factory, + std::shared_ptr store, + std::shared_ptr receiver) { + return std::make_shared( + peer_factory->rtc_runtime(), store, receiver->rtc_receiver()); +} + +} // namespace livekit + diff --git a/webrtc-sys/src/sensor_timestamp.rs b/webrtc-sys/src/sensor_timestamp.rs new file mode 100644 index 000000000..5a1a71ac7 --- /dev/null +++ b/webrtc-sys/src/sensor_timestamp.rs @@ -0,0 +1,100 @@ +// Copyright 2025 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
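+
+// cxx bridge for the sensor timestamp support implemented in
+// webrtc-sys/src/sensor_timestamp.cpp. The embedded trailer is 12 bytes:
+// an 8-byte big-endian sensor timestamp followed by the 4-byte magic "LKTS".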
+ +use crate::impl_thread_safety; + +#[cxx::bridge(namespace = "livekit")] +pub mod ffi { + unsafe extern "C++" { + include!("livekit/sensor_timestamp.h"); + include!("livekit/rtp_sender.h"); + include!("livekit/rtp_receiver.h"); + include!("livekit/peer_connection_factory.h"); + + type RtpSender = crate::rtp_sender::ffi::RtpSender; + type RtpReceiver = crate::rtp_receiver::ffi::RtpReceiver; + type PeerConnectionFactory = crate::peer_connection_factory::ffi::PeerConnectionFactory; + + /// Thread-safe store for mapping capture timestamps to sensor timestamps. + pub type SensorTimestampStore; + + /// Push a sensor timestamp to the queue. + fn store(self: &SensorTimestampStore, capture_timestamp_us: i64, sensor_timestamp_us: i64); + + /// Lookup a sensor timestamp by capture timestamp (for debugging). + /// Returns -1 if not found. + fn lookup(self: &SensorTimestampStore, capture_timestamp_us: i64) -> i64; + + /// Pop the oldest sensor timestamp from the queue. + /// Returns -1 if empty. + fn pop(self: &SensorTimestampStore) -> i64; + + /// Peek at the oldest sensor timestamp without removing it. + /// Returns -1 if empty. + fn peek(self: &SensorTimestampStore) -> i64; + + /// Clear old entries. + fn prune(self: &SensorTimestampStore, max_age_us: i64); + + /// Create a new sensor timestamp store. + fn new_sensor_timestamp_store() -> SharedPtr; + } + + unsafe extern "C++" { + include!("livekit/sensor_timestamp.h"); + + /// Handler for sensor timestamp embedding/extraction on RTP streams. + pub type SensorTimestampHandler; + + /// Enable/disable timestamp embedding. + fn set_enabled(self: &SensorTimestampHandler, enabled: bool); + + /// Check if timestamp embedding is enabled. + fn enabled(self: &SensorTimestampHandler) -> bool; + + /// Get the last received sensor timestamp (receiver side only). + /// Returns -1 if no timestamp has been received yet. + fn last_sensor_timestamp(self: &SensorTimestampHandler) -> i64; + + /// Check if a sensor timestamp has been received. + fn has_sensor_timestamp(self: &SensorTimestampHandler) -> bool; + + /// Create a new sensor timestamp handler for a sender. + fn new_sensor_timestamp_sender( + peer_factory: SharedPtr, + store: SharedPtr, + sender: SharedPtr, + ) -> SharedPtr; + + /// Create a new sensor timestamp handler for a receiver. + fn new_sensor_timestamp_receiver( + peer_factory: SharedPtr, + store: SharedPtr, + receiver: SharedPtr, + ) -> SharedPtr; + } +} + +impl_thread_safety!(ffi::SensorTimestampStore, Send + Sync); +impl_thread_safety!(ffi::SensorTimestampHandler, Send + Sync); + +#[cfg(test)] +mod tests { + #[test] + fn test_sensor_timestamp_store_creation() { + // Basic test to ensure the store can be created + // Full testing requires a running WebRTC context + } +} + From b11cf986640c3599e306817d31b357f621f02aad Mon Sep 17 00:00:00 2001 From: David Chen Date: Wed, 3 Dec 2025 19:33:39 -0800 Subject: [PATCH 3/7] working ex --- libwebrtc/src/native/sensor_timestamp.rs | 11 ++ webrtc-sys/src/sensor_timestamp.cpp | 176 ++++++++++++++++++----- 2 files changed, 153 insertions(+), 34 deletions(-) diff --git a/libwebrtc/src/native/sensor_timestamp.rs b/libwebrtc/src/native/sensor_timestamp.rs index c6d9e3652..519cb6b96 100644 --- a/libwebrtc/src/native/sensor_timestamp.rs +++ b/libwebrtc/src/native/sensor_timestamp.rs @@ -53,6 +53,12 @@ impl SensorTimestampStore { /// The `capture_timestamp_us` should match the `timestamp_us` field /// of the VideoFrame. 
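    ///
    /// A minimal publish-side sketch, assuming `frame` is the outgoing
    /// `VideoFrame`, `store` came from
    /// `LocalVideoTrack::sensor_timestamp_store()`, and `rtc_source` is the
    /// track's native video source (as in the publisher example):
    ///
    /// ```ignore
    /// frame.sensor_timestamp_us = Some(sensor_ts_us);
    /// store.store(frame.timestamp_us, sensor_ts_us);
    /// rtc_source.capture_frame(&frame);
    /// ```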
pub fn store(&self, capture_timestamp_us: i64, sensor_timestamp_us: i64) { + log::info!( + target: "sensor_timestamp", + "store: capture_ts_us={}, sensor_ts_us={}", + capture_timestamp_us, + sensor_timestamp_us + ); self.sys_handle.store(capture_timestamp_us, sensor_timestamp_us); } @@ -134,6 +140,11 @@ impl SensorTimestampHandler { if self.sys_handle.has_sensor_timestamp() { let ts = self.sys_handle.last_sensor_timestamp(); if ts >= 0 { + log::info!( + target: "sensor_timestamp", + "last_sensor_timestamp: {}", + ts + ); Some(ts) } else { None diff --git a/webrtc-sys/src/sensor_timestamp.cpp b/webrtc-sys/src/sensor_timestamp.cpp index 512a4ce50..36c333b86 100644 --- a/webrtc-sys/src/sensor_timestamp.cpp +++ b/webrtc-sys/src/sensor_timestamp.cpp @@ -32,13 +32,16 @@ namespace livekit { void SensorTimestampStore::store(int64_t capture_timestamp_us, int64_t sensor_timestamp_us) const { webrtc::MutexLock lock(&mutex_); - + // Remove old entries if we're at capacity while (entries_.size() >= kMaxEntries) { entries_.pop_front(); } - + entries_.push_back({capture_timestamp_us, sensor_timestamp_us}); + RTC_LOG(LS_INFO) << "SensorTimestampStore::store capture_ts_us=" << capture_timestamp_us + << " sensor_ts_us=" << sensor_timestamp_us + << " size=" << entries_.size(); } int64_t SensorTimestampStore::lookup(int64_t capture_timestamp_us) const { @@ -56,13 +59,16 @@ int64_t SensorTimestampStore::lookup(int64_t capture_timestamp_us) const { int64_t SensorTimestampStore::pop() const { webrtc::MutexLock lock(&mutex_); - + if (entries_.empty()) { + RTC_LOG(LS_INFO) << "SensorTimestampStore::pop empty"; return -1; } - + int64_t sensor_ts = entries_.front().sensor_timestamp_us; entries_.pop_front(); + RTC_LOG(LS_INFO) << "SensorTimestampStore::pop sensor_ts_us=" << sensor_ts + << " remaining=" << entries_.size(); return sensor_ts; } @@ -97,15 +103,43 @@ void SensorTimestampStore::prune(int64_t max_age_us) const { SensorTimestampTransformer::SensorTimestampTransformer( Direction direction, std::shared_ptr store) - : direction_(direction), store_(store) {} + : direction_(direction), store_(store) { + RTC_LOG(LS_INFO) << "SensorTimestampTransformer created direction=" + << (direction_ == Direction::kSend ? "send" : "recv"); +} void SensorTimestampTransformer::Transform( std::unique_ptr frame) { + uint32_t ssrc = frame->GetSsrc(); + uint32_t rtp_timestamp = frame->GetTimestamp(); + if (!enabled_.load()) { - // Pass through without modification - webrtc::MutexLock lock(&mutex_); - if (callback_) { - callback_->OnTransformedFrame(std::move(frame)); + // Pass through without modification, but still log basic info so we know + // frames are flowing through the transformer. + RTC_LOG(LS_INFO) << "SensorTimestampTransformer::Transform (disabled)" + << " direction=" + << (direction_ == Direction::kSend ? "send" : "recv") + << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp; + + rtc::scoped_refptr cb; + { + webrtc::MutexLock lock(&mutex_); + auto it = sink_callbacks_.find(ssrc); + if (it != sink_callbacks_.end()) { + cb = it->second; + } else { + cb = callback_; + } + } + + if (cb) { + cb->OnTransformedFrame(std::move(frame)); + } else { + RTC_LOG(LS_WARNING) + << "SensorTimestampTransformer::Transform (disabled) has no callback" + << " direction=" + << (direction_ == Direction::kSend ? 
"send" : "recv") + << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp; } return; } @@ -121,63 +155,125 @@ void SensorTimestampTransformer::TransformSend( std::unique_ptr frame) { // Get the RTP timestamp from the frame for logging uint32_t rtp_timestamp = frame->GetTimestamp(); - + uint32_t ssrc = frame->GetSsrc(); + auto data = frame->GetData(); - + // Pop the next sensor timestamp from the queue. // This assumes frames are captured and encoded in order (FIFO). int64_t ts_to_embed = 0; - + if (store_) { int64_t popped_ts = store_->pop(); if (popped_ts >= 0) { ts_to_embed = popped_ts; + } else { + RTC_LOG(LS_INFO) << "SensorTimestampTransformer::TransformSend no sensor timestamp available" + << " rtp_ts=" << rtp_timestamp << " orig_size=" << data.size(); } } - + // Always append trailer when enabled (even if timestamp is 0, // which indicates no sensor timestamp was set for this frame) std::vector new_data; if (enabled_.load()) { new_data = AppendTimestampTrailer(data, ts_to_embed); frame->SetData(rtc::ArrayView(new_data)); - - RTC_LOG(LS_VERBOSE) << "SensorTimestampTransformer: Appended timestamp trailer" - << " ts=" << ts_to_embed - << " rtp_ts=" << rtp_timestamp - << " data_size=" << new_data.size(); + + RTC_LOG(LS_INFO) << "SensorTimestampTransformer::TransformSend appended trailer" + << " ts_us=" << ts_to_embed << " rtp_ts=" << rtp_timestamp + << " ssrc=" << ssrc + << " orig_size=" << data.size() + << " new_size=" << new_data.size(); } - - webrtc::MutexLock lock(&mutex_); - if (callback_) { - callback_->OnTransformedFrame(std::move(frame)); + + // Forward to the appropriate callback (either global or per-SSRC sink). + rtc::scoped_refptr cb; + { + webrtc::MutexLock lock(&mutex_); + auto it = sink_callbacks_.find(ssrc); + if (it != sink_callbacks_.end()) { + cb = it->second; + } else { + cb = callback_; + } + } + + if (cb) { + cb->OnTransformedFrame(std::move(frame)); + } else { + RTC_LOG(LS_WARNING) + << "SensorTimestampTransformer::TransformSend has no callback" + << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp; } } void SensorTimestampTransformer::TransformReceive( std::unique_ptr frame) { + uint32_t ssrc = frame->GetSsrc(); + uint32_t rtp_timestamp = frame->GetTimestamp(); auto data = frame->GetData(); std::vector stripped_data; - + + RTC_LOG(LS_INFO) << "SensorTimestampTransformer::TransformReceive begin" + << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp + << " size=" << data.size(); + auto sensor_ts = ExtractTimestampTrailer(data, stripped_data); - + if (sensor_ts.has_value()) { // Store the extracted timestamp for later retrieval last_sensor_timestamp_.store(sensor_ts.value()); has_last_sensor_timestamp_.store(true); - + // Update frame with stripped data frame->SetData(rtc::ArrayView(stripped_data)); - - RTC_LOG(LS_VERBOSE) << "SensorTimestampTransformer: Extracted timestamp trailer" - << " ts=" << sensor_ts.value() - << " rtp_ts=" << frame->GetTimestamp() - << " stripped_size=" << stripped_data.size(); + + RTC_LOG(LS_INFO) << "SensorTimestampTransformer::TransformReceive extracted trailer" + << " ts_us=" << sensor_ts.value() + << " rtp_ts=" << frame->GetTimestamp() + << " ssrc=" << ssrc + << " stripped_size=" << stripped_data.size() + << " orig_size=" << data.size(); + } else { + // Log the last few bytes so we can see whether the magic marker is present. 
+ size_t log_len = std::min(data.size(), 16); + std::string tail_bytes; + tail_bytes.reserve(log_len * 4); + for (size_t i = data.size() - log_len; i < data.size(); ++i) { + char buf[8]; + std::snprintf(buf, sizeof(buf), "%u", static_cast(data[i])); + if (!tail_bytes.empty()) { + tail_bytes.append(","); + } + tail_bytes.append(buf); + } + + RTC_LOG(LS_INFO) + << "SensorTimestampTransformer::TransformReceive no trailer found" + << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp + << " size=" << data.size() + << " tail_bytes_dec=[" << tail_bytes << "]"; } - - webrtc::MutexLock lock(&mutex_); - if (callback_) { - callback_->OnTransformedFrame(std::move(frame)); + + // Forward to the appropriate callback (either global or per-SSRC sink). + rtc::scoped_refptr cb; + { + webrtc::MutexLock lock(&mutex_); + auto it = sink_callbacks_.find(ssrc); + if (it != sink_callbacks_.end()) { + cb = it->second; + } else { + cb = callback_; + } + } + + if (cb) { + cb->OnTransformedFrame(std::move(frame)); + } else { + RTC_LOG(LS_WARNING) + << "SensorTimestampTransformer::TransformReceive has no callback" + << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp; } } @@ -206,6 +302,10 @@ std::optional SensorTimestampTransformer::ExtractTimestampTrailer( rtc::ArrayView data, std::vector& out_data) { if (data.size() < kSensorTimestampTrailerSize) { + RTC_LOG(LS_INFO) + << "SensorTimestampTransformer::ExtractTimestampTrailer data too small" + << " size=" << data.size() + << " required=" << kSensorTimestampTrailerSize; out_data.assign(data.begin(), data.end()); return std::nullopt; } @@ -213,6 +313,14 @@ std::optional SensorTimestampTransformer::ExtractTimestampTrailer( // Check for magic bytes at the end const uint8_t* magic_start = data.data() + data.size() - 4; if (std::memcmp(magic_start, kSensorTimestampMagic, 4) != 0) { + RTC_LOG(LS_INFO) + << "SensorTimestampTransformer::ExtractTimestampTrailer magic mismatch" + << " size=" << data.size() + << " magic_bytes_dec=[" + << static_cast(magic_start[0]) << "," + << static_cast(magic_start[1]) << "," + << static_cast(magic_start[2]) << "," + << static_cast(magic_start[3]) << "]"; out_data.assign(data.begin(), data.end()); return std::nullopt; } From 951ef5913fe8c374b49d0b17eb83a48bc6879ec3 Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 4 Dec 2025 09:37:14 -0800 Subject: [PATCH 4/7] add e2ee key config --- examples/local_video/src/publisher.rs | 14 +++++++++++++- examples/local_video/src/subscriber.rs | 12 ++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/examples/local_video/src/publisher.rs b/examples/local_video/src/publisher.rs index fbfffe45f..d3c3d74c1 100644 --- a/examples/local_video/src/publisher.rs +++ b/examples/local_video/src/publisher.rs @@ -1,5 +1,6 @@ use anyhow::Result; use clap::Parser; +use livekit::e2ee::{key_provider::*, E2eeOptions, EncryptionType}; use livekit::options::{TrackPublishOptions, VideoCodec, VideoEncoding}; use livekit::prelude::*; use livekit::webrtc::video_frame::{I420Buffer, VideoFrame, VideoRotation}; @@ -61,6 +62,10 @@ struct Args { #[arg(long)] api_secret: Option, + /// Shared E2EE key (enables end-to-end encryption when set) + #[arg(long)] + e2ee_key: Option, + /// Use H.265/HEVC encoding if supported (falls back to H.264 on failure) #[arg(long, default_value_t = false)] h265: bool, @@ -111,6 +116,13 @@ async fn main() -> Result<()> { info!("Connecting to LiveKit room '{}' as '{}'...", args.room_name, args.identity); let mut room_options = RoomOptions::default(); room_options.auto_subscribe = true; + 
if let Some(ref key) = args.e2ee_key { + let key_provider = + KeyProvider::with_shared_key(KeyProviderOptions::default(), key.clone().into_bytes()); + room_options.encryption = + Some(E2eeOptions { encryption_type: EncryptionType::Gcm, key_provider }); + info!("E2EE enabled with provided shared key"); + } let (room, _) = Room::connect(&url, &token, room_options).await?; let room = std::sync::Arc::new(room); info!("Connected: {} - {}", room.name(), room.sid().await); @@ -371,7 +383,7 @@ async fn main() -> Result<()> { // Attach a static sensor timestamp for testing and push it into the // shared queue used by the sensor timestamp transformer. if let Some(store) = track.sensor_timestamp_store() { - let sensor_ts = frame.timestamp_us + 123_456; // simple fixed offset for visibility + let sensor_ts = 123_456; //frame.timestamp_us + 123_456; // simple fixed offset for visibility frame.sensor_timestamp_us = Some(sensor_ts); store.store(frame.timestamp_us, sensor_ts); info!( diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index 250d89f59..608e33234 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -5,6 +5,7 @@ use egui_wgpu as egui_wgpu_backend; use egui_wgpu_backend::CallbackTrait; use eframe::wgpu::{self, util::DeviceExt}; use futures::StreamExt; +use livekit::e2ee::{key_provider::*, E2eeOptions, EncryptionType}; use livekit::prelude::*; use livekit::webrtc::video_stream::native::NativeVideoStream; use livekit_api::access_token; @@ -40,6 +41,10 @@ struct Args { #[arg(long)] api_secret: Option, + /// Shared E2EE key (enables end-to-end encryption when set) + #[arg(long)] + e2ee_key: Option, + /// Only subscribe to video from this participant identity #[arg(long)] participant: Option, @@ -192,6 +197,13 @@ async fn main() -> Result<()> { info!("Connecting to LiveKit room '{}' as '{}'...", args.room_name, args.identity); let mut room_options = RoomOptions::default(); room_options.auto_subscribe = true; + if let Some(ref key) = args.e2ee_key { + let key_provider = + KeyProvider::with_shared_key(KeyProviderOptions::default(), key.clone().into_bytes()); + room_options.encryption = + Some(E2eeOptions { encryption_type: EncryptionType::Gcm, key_provider }); + info!("E2EE enabled with provided shared key"); + } let (room, _) = Room::connect(&url, &token, room_options).await?; let room = Arc::new(room); info!("Connected: {} - {}", room.name(), room.sid().await); From ab3a055b21ddf6de21ac51beb7564849922fa7b0 Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 4 Dec 2025 15:18:33 -0800 Subject: [PATCH 5/7] measure latency along pipeline --- examples/local_video/src/publisher.rs | 5 ++++- examples/local_video/src/subscriber.rs | 15 +++++++++++-- .../src/native/peer_connection_factory.rs | 17 ++++++++++---- libwebrtc/src/native/sensor_timestamp.rs | 5 ----- webrtc-sys/src/sensor_timestamp.cpp | 22 +++++++++++-------- 5 files changed, 43 insertions(+), 21 deletions(-) diff --git a/examples/local_video/src/publisher.rs b/examples/local_video/src/publisher.rs index d3c3d74c1..7f4081eb5 100644 --- a/examples/local_video/src/publisher.rs +++ b/examples/local_video/src/publisher.rs @@ -383,7 +383,10 @@ async fn main() -> Result<()> { // Attach a static sensor timestamp for testing and push it into the // shared queue used by the sensor timestamp transformer. 
if let Some(store) = track.sensor_timestamp_store() { - let sensor_ts = 123_456; //frame.timestamp_us + 123_456; // simple fixed offset for visibility + let sensor_ts = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH") + .as_micros() as i64; frame.sensor_timestamp_us = Some(sensor_ts); store.store(frame.timestamp_us, sensor_ts); info!( diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index 608e33234..cb97ec3da 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -383,9 +383,20 @@ async fn main() -> Result<()> { // Log any parsed sensor timestamp for this frame if available. if let Some(ts) = video_track.last_sensor_timestamp() { + // Get the current system timestamp in microseconds + use std::time::{SystemTime, UNIX_EPOCH}; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_micros() as i64; + + // Calculate the latency in microseconds, then convert to milliseconds + let latency_us = now - ts; + let latency_ms = latency_us as f64 / 1000.0; + info!( - "Subscriber: received frame {}x{} with sensor_timestamp_us={}", - w, h, ts + "Subscriber: decoded frame {}x{} sensor_timestamp={} latency={:.2} ms", + w, h, ts, latency_ms ); } diff --git a/libwebrtc/src/native/peer_connection_factory.rs b/libwebrtc/src/native/peer_connection_factory.rs index 4edc63047..0e4f23348 100644 --- a/libwebrtc/src/native/peer_connection_factory.rs +++ b/libwebrtc/src/native/peer_connection_factory.rs @@ -44,10 +44,19 @@ impl Default for PeerConnectionFactory { fn default() -> Self { let mut log_sink = LOG_SINK.lock(); if log_sink.is_none() { - *log_sink = Some(sys_rtc::ffi::new_log_sink(|msg, _| { - let msg = msg.strip_suffix("\r\n").or(msg.strip_suffix('\n')).unwrap_or(&msg); - - log::debug!(target: "libwebrtc", "{}", msg); + *log_sink = Some(sys_rtc::ffi::new_log_sink(|msg, _severity| { + let msg = msg + .strip_suffix("\r\n") + .or(msg.strip_suffix('\n')) + .unwrap_or(&msg); + + // Route sensor timestamp transformer logs to a dedicated target so they can + // be enabled independently from the very noisy general libwebrtc logs. 
+ if msg.contains("SensorTimestampTransformer") { + log::info!(target: "sensor_timestamp_rtp", "{}", msg); + } else { + log::debug!(target: "libwebrtc", "{}", msg); + } })); } diff --git a/libwebrtc/src/native/sensor_timestamp.rs b/libwebrtc/src/native/sensor_timestamp.rs index 519cb6b96..c0149ee49 100644 --- a/libwebrtc/src/native/sensor_timestamp.rs +++ b/libwebrtc/src/native/sensor_timestamp.rs @@ -140,11 +140,6 @@ impl SensorTimestampHandler { if self.sys_handle.has_sensor_timestamp() { let ts = self.sys_handle.last_sensor_timestamp(); if ts >= 0 { - log::info!( - target: "sensor_timestamp", - "last_sensor_timestamp: {}", - ts - ); Some(ts) } else { None diff --git a/webrtc-sys/src/sensor_timestamp.cpp b/webrtc-sys/src/sensor_timestamp.cpp index 36c333b86..14e014a6f 100644 --- a/webrtc-sys/src/sensor_timestamp.cpp +++ b/webrtc-sys/src/sensor_timestamp.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include "api/make_ref_counted.h" #include "livekit/peer_connection_factory.h" @@ -215,13 +216,18 @@ void SensorTimestampTransformer::TransformReceive( auto data = frame->GetData(); std::vector stripped_data; - RTC_LOG(LS_INFO) << "SensorTimestampTransformer::TransformReceive begin" - << " ssrc=" << ssrc << " rtp_ts=" << rtp_timestamp - << " size=" << data.size(); - auto sensor_ts = ExtractTimestampTrailer(data, stripped_data); if (sensor_ts.has_value()) { + // Compute latency from embedded sensor timestamp to RTP receive + // time (both in microseconds since Unix epoch), so we can compare + // this with the latency logged after decode on the subscriber side. + int64_t now_us = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + double recv_latency_ms = + static_cast(now_us - sensor_ts.value()) / 1000.0; + // Store the extracted timestamp for later retrieval last_sensor_timestamp_.store(sensor_ts.value()); has_last_sensor_timestamp_.store(true); @@ -229,12 +235,10 @@ void SensorTimestampTransformer::TransformReceive( // Update frame with stripped data frame->SetData(rtc::ArrayView(stripped_data)); - RTC_LOG(LS_INFO) << "SensorTimestampTransformer::TransformReceive extracted trailer" - << " ts_us=" << sensor_ts.value() + RTC_LOG(LS_INFO) << "SensorTimestampTransformer" + << " sensor_ts=" << sensor_ts.value() << " rtp_ts=" << frame->GetTimestamp() - << " ssrc=" << ssrc - << " stripped_size=" << stripped_data.size() - << " orig_size=" << data.size(); + << " recv_latency=" << recv_latency_ms << " ms"; } else { // Log the last few bytes so we can see whether the magic marker is present. 
size_t log_len = std::min(data.size(), 16); From 69cb8dcc5c6e9bb93e0339ddc5f1dca053e51ba0 Mon Sep 17 00:00:00 2001 From: David Chen Date: Wed, 10 Dec 2025 11:07:49 -0800 Subject: [PATCH 6/7] add video render to publisher --- Cargo.lock | 13 + examples/local_video/Cargo.toml | 1 + examples/local_video/src/publisher.rs | 568 ++++++++++++++++--------- examples/local_video/src/subscriber.rs | 414 +++--------------- examples/local_video/src/yuv_viewer.rs | 479 +++++++++++++++++++++ 5 files changed, 927 insertions(+), 548 deletions(-) create mode 100644 examples/local_video/src/yuv_viewer.rs diff --git a/Cargo.lock b/Cargo.lock index 172b738b2..ad4ca9ccf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3253,6 +3253,7 @@ dependencies = [ "nokhwa", "objc2 0.6.3", "parking_lot", + "time", "tokio", "webrtc-sys", "wgpu 25.0.2", @@ -5632,10 +5633,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" dependencies = [ "deranged", + "itoa", "num-conv", "powerfmt", "serde", "time-core", + "time-macros", ] [[package]] @@ -5644,6 +5647,16 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" +[[package]] +name = "time-macros" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tiny-skia" version = "0.11.4" diff --git a/examples/local_video/Cargo.toml b/examples/local_video/Cargo.toml index 8d01c1086..37dd6b8ee 100644 --- a/examples/local_video/Cargo.toml +++ b/examples/local_video/Cargo.toml @@ -33,6 +33,7 @@ winit = { version = "0.30.11", features = ["android-native-activity"] } parking_lot = { version = "0.12.1", features = ["deadlock_detection"] } anyhow = "1" bytemuck = { version = "1.16", features = ["derive"] } +time = { version = "0.3", features = ["macros", "formatting"] } [target.'cfg(target_os = "macos")'.dependencies] objc2 = { version = "0.6.0", features = ["relax-sign-encoding"] } diff --git a/examples/local_video/src/publisher.rs b/examples/local_video/src/publisher.rs index 7f4081eb5..1b6b6c050 100644 --- a/examples/local_video/src/publisher.rs +++ b/examples/local_video/src/publisher.rs @@ -1,5 +1,7 @@ use anyhow::Result; use clap::Parser; +use eframe::egui; +use egui_wgpu as egui_wgpu_backend; use livekit::e2ee::{key_provider::*, E2eeOptions, EncryptionType}; use livekit::options::{TrackPublishOptions, VideoCodec, VideoEncoding}; use livekit::prelude::*; @@ -8,12 +10,30 @@ use livekit::webrtc::video_source::native::NativeVideoSource; use livekit::webrtc::video_source::{RtcVideoSource, VideoResolution}; use livekit_api::access_token; use log::{debug, info}; -use yuv_sys as yuv_sys; use nokhwa::pixel_format::RgbFormat; use nokhwa::utils::{ApiBackend, CameraFormat, CameraIndex, FrameFormat, RequestedFormat, RequestedFormatType, Resolution}; use nokhwa::Camera; +use parking_lot::Mutex; use std::env; +use std::sync::Arc; use std::time::{Duration, Instant}; +use yuv_sys as yuv_sys; + +mod yuv_viewer; +use yuv_viewer::{SharedYuv, YuvPaintCallback}; + +fn format_sensor_timestamp(ts_micros: i64) -> Option { + if ts_micros == 0 { + // Treat 0 as "not set" + return None; + } + let nanos = i128::from(ts_micros).checked_mul(1_000)?; + let dt = time::OffsetDateTime::from_unix_timestamp_nanos(nanos).ok()?; + let format = 
time::macros::format_description!( + "[year]-[month]-[day] [hour]:[minute]:[second]:[subsecond digits:3]" + ); + dt.format(&format).ok() +} #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -66,9 +86,17 @@ struct Args { #[arg(long)] e2ee_key: Option, + /// Attach sensor timestamps to published frames (for testing) + #[arg(long, default_value_t = false)] + sensor_timestamp: bool, + /// Use H.265/HEVC encoding if supported (falls back to H.264 on failure) #[arg(long, default_value_t = false)] h265: bool, + + /// Show a local preview window for the captured video + #[arg(long, default_value_t = false)] + show_video: bool, } fn list_cameras() -> Result<()> { @@ -219,83 +247,93 @@ async fn main() -> Result<()> { info!("Published camera track"); } - // Reusable I420 buffer and frame - let mut frame = VideoFrame { - rotation: VideoRotation::VideoRotation0, - timestamp_us: 0, - sensor_timestamp_us: None, - buffer: I420Buffer::new(width, height), + // Optional shared YUV buffer for local preview UI + let shared_preview = if args.show_video { + Some(Arc::new(Mutex::new(SharedYuv { + width: 0, + height: 0, + stride_y: 0, + stride_u: 0, + stride_v: 0, + y: Vec::new(), + u: Vec::new(), + v: Vec::new(), + dirty: false, + sensor_timestamp: None, + }))) + } else { + None }; - let is_yuyv = fmt.format() == FrameFormat::YUYV; - info!( - "Selected conversion path: {}", - if is_yuyv { "YUYV->I420 (libyuv)" } else { "Auto (RGB24 or MJPEG)" } - ); - // Accurate pacing using absolute schedule (no drift) - let mut ticker = tokio::time::interval(Duration::from_secs_f64(1.0 / pace_fps)); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - // Align the first tick to now - ticker.tick().await; - let start_ts = Instant::now(); - - // Capture loop - let mut frames: u64 = 0; - let mut last_fps_log = Instant::now(); - let target = Duration::from_secs_f64(1.0 / pace_fps); - info!("Target frame interval: {:.2} ms", target.as_secs_f64() * 1000.0); - - // Timing accumulators (ms) for rolling stats - let mut sum_get_ms = 0.0; - let mut sum_decode_ms = 0.0; - let mut sum_convert_ms = 0.0; - let mut sum_capture_ms = 0.0; - let mut sum_sleep_ms = 0.0; - let mut sum_iter_ms = 0.0; - let mut logged_mjpeg_fallback = false; - loop { - // Wait until the scheduled next frame time - let wait_start = Instant::now(); + // Spawn the capture loop on the Tokio runtime so we can optionally run an egui + // preview window on the main thread. 
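+    // (The capture loop is spawned as a Tokio task because eframe/winit event loops
+    // generally must own the main thread, notably on macOS, so the optional preview
+    // window below takes the main thread while capture keeps running in the background.)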
+ let capture_shared = shared_preview.clone(); + let show_sensor_ts = args.sensor_timestamp; + let capture_handle = tokio::spawn(async move { + // Reusable I420 buffer and frame + let mut frame = VideoFrame { + rotation: VideoRotation::VideoRotation0, + timestamp_us: 0, + sensor_timestamp_us: None, + buffer: I420Buffer::new(width, height), + }; + let is_yuyv = fmt.format() == FrameFormat::YUYV; + info!( + "Selected conversion path: {}", + if is_yuyv { "YUYV->I420 (libyuv)" } else { "Auto (RGB24 or MJPEG)" } + ); + + // Accurate pacing using absolute schedule (no drift) + let mut ticker = tokio::time::interval(Duration::from_secs_f64(1.0 / pace_fps)); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + // Align the first tick to now ticker.tick().await; - let iter_start = Instant::now(); - - // Get frame as RGB24 (decoded by nokhwa if needed) - let t0 = Instant::now(); - let frame_buf = camera.frame()?; - let t1 = Instant::now(); - let (stride_y, stride_u, stride_v) = frame.buffer.strides(); - let (data_y, data_u, data_v) = frame.buffer.data_mut(); - // Fast path for YUYV: convert directly to I420 via libyuv - let t2 = if is_yuyv { - let src = frame_buf.buffer(); - let src_bytes = src.as_ref(); - let src_stride = (width * 2) as i32; // YUYV packed 4:2:2 - let t2_local = t1; // no decode step in YUYV path - unsafe { - // returns 0 on success - let _ = yuv_sys::rs_YUY2ToI420( - src_bytes.as_ptr(), - src_stride, - data_y.as_mut_ptr(), - stride_y as i32, - data_u.as_mut_ptr(), - stride_u as i32, - data_v.as_mut_ptr(), - stride_v as i32, - width as i32, - height as i32, - ); - } - t2_local - } else { - // Auto path (either RGB24 already or compressed MJPEG) - let src = frame_buf.buffer(); - let t2_local = if src.len() == (width as usize * height as usize * 3) { - // Already RGB24 from backend; convert directly + let start_ts = Instant::now(); + + // Capture loop + let mut frames: u64 = 0; + let mut last_fps_log = Instant::now(); + let target = Duration::from_secs_f64(1.0 / pace_fps); + info!("Target frame interval: {:.2} ms", target.as_secs_f64() * 1000.0); + + // Timing accumulators (ms) for rolling stats + let mut sum_get_ms = 0.0; + let mut sum_decode_ms = 0.0; + let mut sum_convert_ms = 0.0; + let mut sum_capture_ms = 0.0; + let mut sum_sleep_ms = 0.0; + let mut sum_iter_ms = 0.0; + let mut logged_mjpeg_fallback = false; + + // Local YUV buffers reused for preview upload (if enabled) + let mut y_buf: Vec = Vec::new(); + let mut u_buf: Vec = Vec::new(); + let mut v_buf: Vec = Vec::new(); + let mut last_sensor_ts: Option = None; + + loop { + // Wait until the scheduled next frame time + let wait_start = Instant::now(); + ticker.tick().await; + let iter_start = Instant::now(); + + // Get frame as RGB24 (decoded by nokhwa if needed) + let t0 = Instant::now(); + let frame_buf = camera.frame()?; + let t1 = Instant::now(); + let (stride_y, stride_u, stride_v) = frame.buffer.strides(); + let (data_y, data_u, data_v) = frame.buffer.data_mut(); + // Fast path for YUYV: convert directly to I420 via libyuv + let t2 = if is_yuyv { + let src = frame_buf.buffer(); + let src_bytes = src.as_ref(); + let src_stride = (width * 2) as i32; // YUYV packed 4:2:2 + let t2_local = t1; // no decode step in YUYV path unsafe { - let _ = yuv_sys::rs_RGB24ToI420( - src.as_ref().as_ptr(), - (width * 3) as i32, + // returns 0 on success + let _ = yuv_sys::rs_YUY2ToI420( + src_bytes.as_ptr(), + src_stride, data_y.as_mut_ptr(), stride_y as i32, data_u.as_mut_ptr(), @@ -306,144 +344,272 @@ async fn 
main() -> Result<()> { height as i32, ); } - Instant::now() + t2_local } else { - // Try fast MJPEG->I420 via libyuv if available; fallback to image crate - let mut used_fast_mjpeg = false; - let t2_try = unsafe { - // rs_MJPGToI420 returns 0 on success - let ret = yuv_sys::rs_MJPGToI420( - src.as_ref().as_ptr(), - src.len(), - data_y.as_mut_ptr(), - stride_y as i32, - data_u.as_mut_ptr(), - stride_u as i32, - data_v.as_mut_ptr(), - stride_v as i32, - width as i32, - height as i32, - width as i32, - height as i32, - ); - if ret == 0 { used_fast_mjpeg = true; Instant::now() } else { t1 } - }; - if used_fast_mjpeg { - t2_try + // Auto path (either RGB24 already or compressed MJPEG) + let src = frame_buf.buffer(); + let t2_local = if src.len() == (width as usize * height as usize * 3) { + // Already RGB24 from backend; convert directly + unsafe { + let _ = yuv_sys::rs_RGB24ToI420( + src.as_ref().as_ptr(), + (width * 3) as i32, + data_y.as_mut_ptr(), + stride_y as i32, + data_u.as_mut_ptr(), + stride_u as i32, + data_v.as_mut_ptr(), + stride_v as i32, + width as i32, + height as i32, + ); + } + Instant::now() } else { - // Fallback: decode MJPEG using image crate then RGB24->I420 - match image::load_from_memory(src.as_ref()) { - Ok(img_dyn) => { - let rgb8 = img_dyn.to_rgb8(); - let dec_w = rgb8.width() as u32; - let dec_h = rgb8.height() as u32; - if dec_w != width || dec_h != height { - log::warn!( - "Decoded MJPEG size {}x{} differs from requested {}x{}; dropping frame", - dec_w, dec_h, width, height - ); - continue; - } - unsafe { - let _ = yuv_sys::rs_RGB24ToI420( - rgb8.as_raw().as_ptr(), - (dec_w * 3) as i32, - data_y.as_mut_ptr(), - stride_y as i32, - data_u.as_mut_ptr(), - stride_u as i32, - data_v.as_mut_ptr(), - stride_v as i32, - width as i32, - height as i32, - ); - } + // Try fast MJPEG->I420 via libyuv if available; fallback to image crate + let mut used_fast_mjpeg = false; + let t2_try = unsafe { + // rs_MJPGToI420 returns 0 on success + let ret = yuv_sys::rs_MJPGToI420( + src.as_ref().as_ptr(), + src.len(), + data_y.as_mut_ptr(), + stride_y as i32, + data_u.as_mut_ptr(), + stride_u as i32, + data_v.as_mut_ptr(), + stride_v as i32, + width as i32, + height as i32, + width as i32, + height as i32, + ); + if ret == 0 { + used_fast_mjpeg = true; Instant::now() + } else { + t1 } - Err(e2) => { - if !logged_mjpeg_fallback { - log::error!( - "MJPEG decode failed; buffer not RGB24 and image decode failed: {}", - e2 - ); - logged_mjpeg_fallback = true; + }; + if used_fast_mjpeg { + t2_try + } else { + // Fallback: decode MJPEG using image crate then RGB24->I420 + match image::load_from_memory(src.as_ref()) { + Ok(img_dyn) => { + let rgb8 = img_dyn.to_rgb8(); + let dec_w = rgb8.width() as u32; + let dec_h = rgb8.height() as u32; + if dec_w != width || dec_h != height { + log::warn!( + "Decoded MJPEG size {}x{} differs from requested {}x{}; dropping frame", + dec_w, dec_h, width, height + ); + continue; + } + unsafe { + let _ = yuv_sys::rs_RGB24ToI420( + rgb8.as_raw().as_ptr(), + (dec_w * 3) as i32, + data_y.as_mut_ptr(), + stride_y as i32, + data_u.as_mut_ptr(), + stride_u as i32, + data_v.as_mut_ptr(), + stride_v as i32, + width as i32, + height as i32, + ); + } + Instant::now() + } + Err(e2) => { + if !logged_mjpeg_fallback { + log::error!( + "MJPEG decode failed; buffer not RGB24 and image decode failed: {}", + e2 + ); + logged_mjpeg_fallback = true; + } + continue; } - continue; } } - } + }; + t2_local }; - t2_local - }; - let t3 = Instant::now(); - - // Update RTP timestamp 
(monotonic, microseconds since start) - frame.timestamp_us = start_ts.elapsed().as_micros() as i64; - - // Attach a static sensor timestamp for testing and push it into the - // shared queue used by the sensor timestamp transformer. - if let Some(store) = track.sensor_timestamp_store() { - let sensor_ts = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .expect("SystemTime before UNIX EPOCH") - .as_micros() as i64; - frame.sensor_timestamp_us = Some(sensor_ts); - store.store(frame.timestamp_us, sensor_ts); - info!( - "Publisher: attached sensor_timestamp_us={} for capture_ts={}", - sensor_ts, frame.timestamp_us - ); + let t3 = Instant::now(); + + // Update RTP timestamp (monotonic, microseconds since start) + frame.timestamp_us = start_ts.elapsed().as_micros() as i64; + + // Optionally attach a sensor timestamp and push it into the shared queue + // used by the sensor timestamp transformer. + if show_sensor_ts { + if let Some(store) = track.sensor_timestamp_store() { + let sensor_ts = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH") + .as_micros() as i64; + frame.sensor_timestamp_us = Some(sensor_ts); + store.store(frame.timestamp_us, sensor_ts); + last_sensor_ts = Some(sensor_ts); + info!( + "Publisher: attached sensor_timestamp_us={} for capture_ts={}", + sensor_ts, frame.timestamp_us + ); + } + } + + // If preview is enabled, copy I420 planes into the shared buffer. + if let Some(shared) = &capture_shared { + let (sy, su, sv) = (stride_y as u32, stride_u as u32, stride_v as u32); + let (dy, du, dv) = frame.buffer.data(); + let ch = (height + 1) / 2; + let y_size = (sy * height) as usize; + let u_size = (su * ch) as usize; + let v_size = (sv * ch) as usize; + if y_buf.len() != y_size { + y_buf.resize(y_size, 0); + } + if u_buf.len() != u_size { + u_buf.resize(u_size, 0); + } + if v_buf.len() != v_size { + v_buf.resize(v_size, 0); + } + y_buf.copy_from_slice(dy); + u_buf.copy_from_slice(du); + v_buf.copy_from_slice(dv); + + let mut s = shared.lock(); + s.width = width; + s.height = height; + s.stride_y = sy; + s.stride_u = su; + s.stride_v = sv; + std::mem::swap(&mut s.y, &mut y_buf); + std::mem::swap(&mut s.u, &mut u_buf); + std::mem::swap(&mut s.v, &mut v_buf); + s.dirty = true; + s.sensor_timestamp = last_sensor_ts; + } + + rtc_source.capture_frame(&frame); + let t4 = Instant::now(); + + frames += 1; + // We already paced via interval; measure actual sleep time for logging only + let sleep_dur = iter_start - wait_start; + + // Per-iteration timing bookkeeping + let t_end = Instant::now(); + let get_ms = (t1 - t0).as_secs_f64() * 1000.0; + let decode_ms = (t2 - t1).as_secs_f64() * 1000.0; + let convert_ms = (t3 - t2).as_secs_f64() * 1000.0; + let capture_ms = (t4 - t3).as_secs_f64() * 1000.0; + let sleep_ms = sleep_dur.as_secs_f64() * 1000.0; + let iter_ms = (t_end - iter_start).as_secs_f64() * 1000.0; + sum_get_ms += get_ms; + sum_decode_ms += decode_ms; + sum_convert_ms += convert_ms; + sum_capture_ms += capture_ms; + sum_sleep_ms += sleep_ms; + sum_iter_ms += iter_ms; + + if last_fps_log.elapsed() >= std::time::Duration::from_secs(2) { + let secs = last_fps_log.elapsed().as_secs_f64(); + let fps_est = frames as f64 / secs; + let n = frames.max(1) as f64; + info!( + "Publishing video: {}x{}, ~{:.1} fps | avg ms: get {:.2}, decode {:.2}, convert {:.2}, capture {:.2}, sleep {:.2}, iter {:.2} | target {:.2}", + width, + height, + fps_est, + sum_get_ms / n, + sum_decode_ms / n, + 
sum_convert_ms / n, + sum_capture_ms / n, + sum_sleep_ms / n, + sum_iter_ms / n, + target.as_secs_f64() * 1000.0, + ); + frames = 0; + sum_get_ms = 0.0; + sum_decode_ms = 0.0; + sum_convert_ms = 0.0; + sum_capture_ms = 0.0; + sum_sleep_ms = 0.0; + sum_iter_ms = 0.0; + last_fps_log = Instant::now(); + } + } + #[allow(unreachable_code)] + Ok::<(), anyhow::Error>(()) + }); + + // If preview is requested, run an egui window on the main thread rendering from + // the shared YUV buffer. Otherwise, just wait for the capture loop. + if let Some(shared) = shared_preview { + struct PreviewApp { + shared: Arc>, } - rtc_source.capture_frame(&frame); - let t4 = Instant::now(); - - frames += 1; - // We already paced via interval; measure actual sleep time for logging only - let sleep_dur = iter_start - wait_start; - - // Per-iteration timing bookkeeping - let t_end = Instant::now(); - let get_ms = (t1 - t0).as_secs_f64() * 1000.0; - let decode_ms = (t2 - t1).as_secs_f64() * 1000.0; - let convert_ms = (t3 - t2).as_secs_f64() * 1000.0; - let capture_ms = (t4 - t3).as_secs_f64() * 1000.0; - let sleep_ms = sleep_dur.as_secs_f64() * 1000.0; - let iter_ms = (t_end - iter_start).as_secs_f64() * 1000.0; - sum_get_ms += get_ms; - sum_decode_ms += decode_ms; - sum_convert_ms += convert_ms; - sum_capture_ms += capture_ms; - sum_sleep_ms += sleep_ms; - sum_iter_ms += iter_ms; - - if last_fps_log.elapsed() >= std::time::Duration::from_secs(2) { - let secs = last_fps_log.elapsed().as_secs_f64(); - let fps_est = frames as f64 / secs; - let n = frames.max(1) as f64; - info!( - "Publishing video: {}x{}, ~{:.1} fps | avg ms: get {:.2}, decode {:.2}, convert {:.2}, capture {:.2}, sleep {:.2}, iter {:.2} | target {:.2}", - width, - height, - fps_est, - sum_get_ms / n, - sum_decode_ms / n, - sum_convert_ms / n, - sum_capture_ms / n, - sum_sleep_ms / n, - sum_iter_ms / n, - target.as_secs_f64() * 1000.0, - ); - frames = 0; - sum_get_ms = 0.0; - sum_decode_ms = 0.0; - sum_convert_ms = 0.0; - sum_capture_ms = 0.0; - sum_sleep_ms = 0.0; - sum_iter_ms = 0.0; - last_fps_log = Instant::now(); + impl eframe::App for PreviewApp { + fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) { + egui::CentralPanel::default().show(ctx, |ui| { + let available = ui.available_size(); + let rect = egui::Rect::from_min_size(ui.min_rect().min, available); + + ui.ctx().request_repaint(); + + let cb = egui_wgpu_backend::Callback::new_paint_callback( + rect, + YuvPaintCallback { + shared: self.shared.clone(), + }, + ); + ui.painter().add(cb); + }); + + // Sensor timestamp overlay: top-left, same style as subscriber. + let sensor_timestamp_text = { + let shared = self.shared.lock(); + shared + .sensor_timestamp + .and_then(format_sensor_timestamp) + }; + if let Some(ts_text) = sensor_timestamp_text { + egui::Area::new("publisher_sensor_timestamp_overlay".into()) + .anchor(egui::Align2::LEFT_TOP, egui::vec2(20.0, 20.0)) + .interactable(false) + .show(ctx, |ui| { + ui.label( + egui::RichText::new(ts_text) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + }); + } + + ctx.request_repaint_after(Duration::from_millis(16)); + } } + + let app = PreviewApp { shared }; + let native_options = eframe::NativeOptions::default(); + eframe::run_native( + "LiveKit Camera Publisher Preview", + native_options, + Box::new(|_| Ok::, _>(Box::new(app))), + )?; + // When the window closes, main will exit, dropping the runtime and capture task. + Ok(()) + } else { + // No preview window; just run the capture loop until process exit or error. 
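+        // `await??` unwraps two layers here: the outer `?` propagates a `JoinError`
+        // if the capture task panicked or was cancelled, and the inner `?` propagates
+        // the capture loop's own `anyhow::Error`.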
+ capture_handle.await??; + Ok(()) } } - diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index cb97ec3da..15b3c227f 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -2,8 +2,6 @@ use anyhow::Result; use clap::Parser; use eframe::egui; use egui_wgpu as egui_wgpu_backend; -use egui_wgpu_backend::CallbackTrait; -use eframe::wgpu::{self, util::DeviceExt}; use futures::StreamExt; use livekit::e2ee::{key_provider::*, E2eeOptions, EncryptionType}; use livekit::prelude::*; @@ -18,6 +16,9 @@ use std::{ time::{Duration, Instant}, }; +mod yuv_viewer; +use yuv_viewer::{SharedYuv, YuvPaintCallback}; + #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { @@ -50,18 +51,6 @@ struct Args { participant: Option, } -struct SharedYuv { - width: u32, - height: u32, - stride_y: u32, - stride_u: u32, - stride_v: u32, - y: Vec, - u: Vec, - v: Vec, - dirty: bool, -} - #[derive(Clone)] struct SimulcastState { available: bool, @@ -109,6 +98,20 @@ fn simulcast_state_full_dims( sc.full_dims } +fn format_sensor_timestamp(ts_micros: i64) -> Option { + if ts_micros == 0 { + // Treat 0 as "not set" + return None; + } + // Convert microseconds since UNIX epoch to `OffsetDateTime` in UTC, then format. + let nanos = i128::from(ts_micros).checked_mul(1_000)?; + let dt = time::OffsetDateTime::from_unix_timestamp_nanos(nanos).ok()?; + let format = time::macros::format_description!( + "[year]-[month]-[day] [hour]:[minute]:[second]:[subsecond digits:3]" + ); + dt.format(&format).ok() +} + struct VideoApp { shared: Arc>, simulcast: Arc>, @@ -131,6 +134,27 @@ impl eframe::App for VideoApp { ui.painter().add(cb); }); + // Sensor timestamp overlay: top-left. Show nothing if no sensor timestamp parsed. + let sensor_timestamp_text = { + let shared = self.shared.lock(); + shared + .sensor_timestamp + .and_then(format_sensor_timestamp) + }; + if let Some(ts_text) = sensor_timestamp_text { + egui::Area::new("sensor_timestamp_overlay".into()) + .anchor(egui::Align2::LEFT_TOP, egui::vec2(20.0, 20.0)) + .interactable(false) + .show(ctx, |ui| { + ui.label( + egui::RichText::new(ts_text) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + }); + } + // Simulcast layer controls: bottom-left overlay egui::Area::new("simulcast_controls".into()) .anchor(egui::Align2::LEFT_BOTTOM, egui::vec2(10.0, -10.0)) @@ -219,6 +243,7 @@ async fn main() -> Result<()> { u: Vec::new(), v: Vec::new(), dirty: false, + sensor_timestamp: None, })); // Subscribe to room events: on first video track, start sink task @@ -369,28 +394,40 @@ async fn main() -> Result<()> { u_buf.copy_from_slice(du); v_buf.copy_from_slice(dv); - // Swap buffers into shared state - let mut s = shared2.lock(); - s.width = w as u32; - s.height = h as u32; - s.stride_y = sy as u32; - s.stride_u = su as u32; - s.stride_v = sv as u32; - std::mem::swap(&mut s.y, &mut y_buf); - std::mem::swap(&mut s.u, &mut u_buf); - std::mem::swap(&mut s.v, &mut v_buf); - s.dirty = true; - - // Log any parsed sensor timestamp for this frame if available. - if let Some(ts) = video_track.last_sensor_timestamp() { - // Get the current system timestamp in microseconds + // Fetch any parsed sensor timestamp for this frame, if available. + // Treat 0 as "not set". 
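+                        // Note: the latency derived from this timestamp a few lines
+                        // below compares the publisher's wall clock (embedded in the
+                        // frame) against this machine's wall clock, so the number is
+                        // only meaningful when the two hosts are clock-synchronized
+                        // (e.g. via NTP).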
+ let ts_opt = video_track + .last_sensor_timestamp() + .and_then(|ts| if ts == 0 { None } else { Some(ts) }); + + // Swap buffers into shared state, and only update the + // sensor timestamp when we actually have one. This + // prevents the overlay from flickering on frames that + // don't carry a parsed timestamp. + { + let mut s = shared2.lock(); + s.width = w as u32; + s.height = h as u32; + s.stride_y = sy as u32; + s.stride_u = su as u32; + s.stride_v = sv as u32; + std::mem::swap(&mut s.y, &mut y_buf); + std::mem::swap(&mut s.u, &mut u_buf); + std::mem::swap(&mut s.v, &mut v_buf); + s.dirty = true; + if let Some(ts) = ts_opt { + s.sensor_timestamp = Some(ts); + } + } + + // Log sensor timestamp + derived latency if available. + if let Some(ts) = ts_opt { use std::time::{SystemTime, UNIX_EPOCH}; let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_micros() as i64; - // Calculate the latency in microseconds, then convert to milliseconds let latency_us = now - ts; let latency_ms = latency_us as f64 / 1000.0; @@ -474,320 +511,3 @@ async fn main() -> Result<()> { Ok(()) } - -// ===== WGPU I420 renderer ===== - -struct YuvPaintCallback { - shared: Arc>, -} - -struct YuvGpuState { - pipeline: wgpu::RenderPipeline, - sampler: wgpu::Sampler, - bind_layout: wgpu::BindGroupLayout, - y_tex: wgpu::Texture, - u_tex: wgpu::Texture, - v_tex: wgpu::Texture, - y_view: wgpu::TextureView, - u_view: wgpu::TextureView, - v_view: wgpu::TextureView, - bind_group: wgpu::BindGroup, - params_buf: wgpu::Buffer, - y_pad_w: u32, - uv_pad_w: u32, - dims: (u32, u32), -} - -impl YuvGpuState { - fn create_textures(device: &wgpu::Device, _width: u32, height: u32, y_pad_w: u32, uv_pad_w: u32) -> (wgpu::Texture, wgpu::Texture, wgpu::Texture, wgpu::TextureView, wgpu::TextureView, wgpu::TextureView) { - let y_size = wgpu::Extent3d { width: y_pad_w, height, depth_or_array_layers: 1 }; - let uv_size = wgpu::Extent3d { width: uv_pad_w, height: (height + 1) / 2, depth_or_array_layers: 1 }; - let usage = wgpu::TextureUsages::COPY_DST | wgpu::TextureUsages::TEXTURE_BINDING; - let desc = |size: wgpu::Extent3d| wgpu::TextureDescriptor { - label: Some("yuv_plane"), - size, - mip_level_count: 1, - sample_count: 1, - dimension: wgpu::TextureDimension::D2, - format: wgpu::TextureFormat::R8Unorm, - usage, - view_formats: &[], - }; - let y_tex = device.create_texture(&desc(y_size)); - let u_tex = device.create_texture(&desc(uv_size)); - let v_tex = device.create_texture(&desc(uv_size)); - let y_view = y_tex.create_view(&wgpu::TextureViewDescriptor::default()); - let u_view = u_tex.create_view(&wgpu::TextureViewDescriptor::default()); - let v_view = v_tex.create_view(&wgpu::TextureViewDescriptor::default()); - (y_tex, u_tex, v_tex, y_view, u_view, v_view) - } -} - -fn align_up(value: u32, alignment: u32) -> u32 { - ((value + alignment - 1) / alignment) * alignment -} - -#[repr(C)] -#[derive(Clone, Copy, bytemuck::Pod, bytemuck::Zeroable)] -struct ParamsUniform { - src_w: u32, - src_h: u32, - y_tex_w: u32, - uv_tex_w: u32, -} - -impl CallbackTrait for YuvPaintCallback { - fn prepare(&self, device: &wgpu::Device, queue: &wgpu::Queue, _screen_desc: &egui_wgpu_backend::ScreenDescriptor, _encoder: &mut wgpu::CommandEncoder, resources: &mut egui_wgpu_backend::CallbackResources) -> Vec { - // Initialize or update GPU state lazily based on current frame - let mut shared = self.shared.lock(); - - // Nothing to draw yet - if shared.width == 0 || shared.height == 0 { - return Vec::new(); - } - - // Fetch or create our 
GPU state - if resources.get::().is_none() { - // Build pipeline and initial small textures; will be recreated on first upload - let shader_src = include_str!("yuv_shader.wgsl"); - let shader = device.create_shader_module(wgpu::ShaderModuleDescriptor { - label: Some("yuv_shader"), - source: wgpu::ShaderSource::Wgsl(shader_src.into()), - }); - - let bind_layout = device.create_bind_group_layout(&wgpu::BindGroupLayoutDescriptor { - label: Some("yuv_bind_layout"), - entries: &[ - wgpu::BindGroupLayoutEntry { - binding: 0, - visibility: wgpu::ShaderStages::FRAGMENT, - ty: wgpu::BindingType::Sampler(wgpu::SamplerBindingType::Filtering), - count: None, - }, - wgpu::BindGroupLayoutEntry { - binding: 1, - visibility: wgpu::ShaderStages::FRAGMENT, - ty: wgpu::BindingType::Texture { - sample_type: wgpu::TextureSampleType::Float { filterable: true }, - view_dimension: wgpu::TextureViewDimension::D2, - multisampled: false, - }, - count: None, - }, - wgpu::BindGroupLayoutEntry { binding: 2, visibility: wgpu::ShaderStages::FRAGMENT, ty: wgpu::BindingType::Texture { sample_type: wgpu::TextureSampleType::Float { filterable: true }, view_dimension: wgpu::TextureViewDimension::D2, multisampled: false }, count: None }, - wgpu::BindGroupLayoutEntry { binding: 3, visibility: wgpu::ShaderStages::FRAGMENT, ty: wgpu::BindingType::Texture { sample_type: wgpu::TextureSampleType::Float { filterable: true }, view_dimension: wgpu::TextureViewDimension::D2, multisampled: false }, count: None }, - wgpu::BindGroupLayoutEntry { - binding: 4, - visibility: wgpu::ShaderStages::FRAGMENT, - ty: wgpu::BindingType::Buffer { - ty: wgpu::BufferBindingType::Uniform, - has_dynamic_offset: false, - min_binding_size: Some(std::num::NonZeroU64::new(std::mem::size_of::() as u64).unwrap()), - }, - count: None, - }, - ], - }); - - let pipeline_layout = device.create_pipeline_layout(&wgpu::PipelineLayoutDescriptor { - label: Some("yuv_pipeline_layout"), - bind_group_layouts: &[&bind_layout], - push_constant_ranges: &[], - }); - - let render_pipeline = device.create_render_pipeline(&wgpu::RenderPipelineDescriptor { - label: Some("yuv_pipeline"), - layout: Some(&pipeline_layout), - vertex: wgpu::VertexState { module: &shader, entry_point: Some("vs_main"), buffers: &[], compilation_options: wgpu::PipelineCompilationOptions::default() }, - fragment: Some(wgpu::FragmentState { - module: &shader, - entry_point: Some("fs_main"), - targets: &[Some(wgpu::ColorTargetState { - format: wgpu::TextureFormat::Bgra8Unorm, - blend: Some(wgpu::BlendState::ALPHA_BLENDING), - write_mask: wgpu::ColorWrites::ALL, - })], - compilation_options: wgpu::PipelineCompilationOptions::default(), - }), - primitive: wgpu::PrimitiveState { topology: wgpu::PrimitiveTopology::TriangleList, strip_index_format: None, front_face: wgpu::FrontFace::Ccw, cull_mode: None, unclipped_depth: false, polygon_mode: wgpu::PolygonMode::Fill, conservative: false }, - depth_stencil: None, - multisample: wgpu::MultisampleState { count: 1, mask: !0, alpha_to_coverage_enabled: false }, - multiview: None, - cache: None, - }); - - let sampler = device.create_sampler(&wgpu::SamplerDescriptor { - label: Some("yuv_sampler"), - address_mode_u: wgpu::AddressMode::ClampToEdge, - address_mode_v: wgpu::AddressMode::ClampToEdge, - address_mode_w: wgpu::AddressMode::ClampToEdge, - mag_filter: wgpu::FilterMode::Linear, - min_filter: wgpu::FilterMode::Linear, - mipmap_filter: wgpu::FilterMode::Nearest, - ..Default::default() - }); - - let params_buf = 
device.create_buffer_init(&wgpu::util::BufferInitDescriptor { - label: Some("yuv_params"), - contents: bytemuck::bytes_of(&ParamsUniform { src_w: 1, src_h: 1, y_tex_w: 1, uv_tex_w: 1 }), - usage: wgpu::BufferUsages::UNIFORM | wgpu::BufferUsages::COPY_DST, - }); - - // Initial tiny textures - let (y_tex, u_tex, v_tex, y_view, u_view, v_view) = YuvGpuState::create_textures(device, 1, 1, 256, 256); - let bind_group = device.create_bind_group(&wgpu::BindGroupDescriptor { - label: Some("yuv_bind_group"), - layout: &bind_layout, - entries: &[ - wgpu::BindGroupEntry { binding: 0, resource: wgpu::BindingResource::Sampler(&sampler) }, - wgpu::BindGroupEntry { binding: 1, resource: wgpu::BindingResource::TextureView(&y_view) }, - wgpu::BindGroupEntry { binding: 2, resource: wgpu::BindingResource::TextureView(&u_view) }, - wgpu::BindGroupEntry { binding: 3, resource: wgpu::BindingResource::TextureView(&v_view) }, - wgpu::BindGroupEntry { binding: 4, resource: params_buf.as_entire_binding() }, - ], - }); - - let new_state = YuvGpuState { - pipeline: render_pipeline, - sampler, - bind_layout, - y_tex, - u_tex, - v_tex, - y_view, - u_view, - v_view, - bind_group, - params_buf, - y_pad_w: 256, - uv_pad_w: 256, - dims: (0, 0), - }; - resources.insert(new_state); - } - let state = resources.get_mut::().unwrap(); - - // Upload planes when marked dirty - // Recreate textures/bind group on size change - if state.dims != (shared.width, shared.height) { - let y_pad_w = align_up(shared.width, 256); - let uv_w = (shared.width + 1) / 2; - let uv_pad_w = align_up(uv_w, 256); - let (y_tex, u_tex, v_tex, y_view, u_view, v_view) = YuvGpuState::create_textures(device, shared.width, shared.height, y_pad_w, uv_pad_w); - let bind_group = device.create_bind_group(&wgpu::BindGroupDescriptor { - label: Some("yuv_bind_group"), - layout: &state.bind_layout, - entries: &[ - wgpu::BindGroupEntry { binding: 0, resource: wgpu::BindingResource::Sampler(&state.sampler) }, - wgpu::BindGroupEntry { binding: 1, resource: wgpu::BindingResource::TextureView(&y_view) }, - wgpu::BindGroupEntry { binding: 2, resource: wgpu::BindingResource::TextureView(&u_view) }, - wgpu::BindGroupEntry { binding: 3, resource: wgpu::BindingResource::TextureView(&v_view) }, - wgpu::BindGroupEntry { binding: 4, resource: state.params_buf.as_entire_binding() }, - ], - }); - state.y_tex = y_tex; - state.u_tex = u_tex; - state.v_tex = v_tex; - state.y_view = y_view; - state.u_view = u_view; - state.v_view = v_view; - state.bind_group = bind_group; - state.y_pad_w = y_pad_w; - state.uv_pad_w = uv_pad_w; - state.dims = (shared.width, shared.height); - } - - if shared.dirty { - let y_bytes_per_row = align_up(shared.width, 256); - let uv_w = (shared.width + 1) / 2; - let uv_h = (shared.height + 1) / 2; - let uv_bytes_per_row = align_up(uv_w, 256); - - // Pack and upload Y - if shared.stride_y >= shared.width { - let mut packed = vec![0u8; (y_bytes_per_row * shared.height) as usize]; - for row in 0..shared.height { - let src = &shared.y[(row * shared.stride_y) as usize..][..shared.width as usize]; - let dst_off = (row * y_bytes_per_row) as usize; - packed[dst_off..dst_off + shared.width as usize].copy_from_slice(src); - } - queue.write_texture( - wgpu::ImageCopyTexture { - texture: &state.y_tex, - mip_level: 0, - origin: wgpu::Origin3d::ZERO, - aspect: wgpu::TextureAspect::All, - }, - &packed, - wgpu::ImageDataLayout { - offset: 0, - bytes_per_row: Some(y_bytes_per_row), - rows_per_image: Some(shared.height), - }, - wgpu::Extent3d { width: state.y_pad_w, height: 
shared.height, depth_or_array_layers: 1 }, - ); - } - - // Pack and upload U,V - if shared.stride_u >= uv_w && shared.stride_v >= uv_w { - let mut packed_u = vec![0u8; (uv_bytes_per_row * uv_h) as usize]; - let mut packed_v = vec![0u8; (uv_bytes_per_row * uv_h) as usize]; - for row in 0..uv_h { - let src_u = &shared.u[(row * shared.stride_u) as usize..][..uv_w as usize]; - let src_v = &shared.v[(row * shared.stride_v) as usize..][..uv_w as usize]; - let dst_off = (row * uv_bytes_per_row) as usize; - packed_u[dst_off..dst_off + uv_w as usize].copy_from_slice(src_u); - packed_v[dst_off..dst_off + uv_w as usize].copy_from_slice(src_v); - } - queue.write_texture( - wgpu::ImageCopyTexture { texture: &state.u_tex, mip_level: 0, origin: wgpu::Origin3d::ZERO, aspect: wgpu::TextureAspect::All }, - &packed_u, - wgpu::ImageDataLayout { offset: 0, bytes_per_row: Some(uv_bytes_per_row), rows_per_image: Some(uv_h) }, - wgpu::Extent3d { width: state.uv_pad_w, height: uv_h, depth_or_array_layers: 1 }, - ); - queue.write_texture( - wgpu::ImageCopyTexture { texture: &state.v_tex, mip_level: 0, origin: wgpu::Origin3d::ZERO, aspect: wgpu::TextureAspect::All }, - &packed_v, - wgpu::ImageDataLayout { offset: 0, bytes_per_row: Some(uv_bytes_per_row), rows_per_image: Some(uv_h) }, - wgpu::Extent3d { width: state.uv_pad_w, height: uv_h, depth_or_array_layers: 1 }, - ); - } - - // Update params uniform - let params = ParamsUniform { src_w: shared.width, src_h: shared.height, y_tex_w: state.y_pad_w, uv_tex_w: state.uv_pad_w }; - queue.write_buffer(&state.params_buf, 0, bytemuck::bytes_of(¶ms)); - - shared.dirty = false; - } - - Vec::new() - } - - fn paint(&self, _info: egui::PaintCallbackInfo, render_pass: &mut wgpu::RenderPass<'static>, resources: &egui_wgpu_backend::CallbackResources) { - // Acquire device/queue via screen_descriptor? Not available; use resources to fetch our state - let shared = self.shared.lock(); - if shared.width == 0 || shared.height == 0 { - return; - } - - // Build pipeline and textures on first paint or on resize - let Some(state) = resources.get::() else { - // prepare may not have created the state yet (race with first frame); skip this paint - return; - }; - - if state.dims != (shared.width, shared.height) { - // We cannot rebuild here (no device access); skip drawing until next frame where prepare will rebuild - return; - } - - render_pass.set_pipeline(&state.pipeline); - render_pass.set_bind_group(0, &state.bind_group, &[]); - // Fullscreen triangle without vertex buffer - render_pass.draw(0..3, 0..1); - } -} - -// Build or rebuild GPU state. This helper is intended to be called from prepare, but we lack device there in current API constraints. -// Note: eframe/egui-wgpu provides device in paint via RenderPass context; however, to keep this example concise, we set up the state once externally. - diff --git a/examples/local_video/src/yuv_viewer.rs b/examples/local_video/src/yuv_viewer.rs new file mode 100644 index 000000000..75323e16f --- /dev/null +++ b/examples/local_video/src/yuv_viewer.rs @@ -0,0 +1,479 @@ +use eframe::egui; +use egui_wgpu as egui_wgpu_backend; +use egui_wgpu_backend::CallbackTrait; +use eframe::wgpu::{self, util::DeviceExt}; +use parking_lot::Mutex; +use std::sync::Arc; + +/// Shared I420 YUV frame storage for GPU rendering. 
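+/// Planes keep their original strides; the renderer repacks each row to the 256-byte
+/// `bytes_per_row` alignment wgpu requires before uploading, and clears `dirty` once
+/// the upload has been issued.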
+pub struct SharedYuv { + pub width: u32, + pub height: u32, + pub stride_y: u32, + pub stride_u: u32, + pub stride_v: u32, + pub y: Vec, + pub u: Vec, + pub v: Vec, + pub dirty: bool, + /// Optional sensor timestamp in microseconds since UNIX epoch. + pub sensor_timestamp: Option, +} + +/// egui-wgpu callback that renders a fullscreen quad from a `SharedYuv` buffer. +pub struct YuvPaintCallback { + pub shared: Arc>, +} + +struct YuvGpuState { + pipeline: wgpu::RenderPipeline, + sampler: wgpu::Sampler, + bind_layout: wgpu::BindGroupLayout, + y_tex: wgpu::Texture, + u_tex: wgpu::Texture, + v_tex: wgpu::Texture, + y_view: wgpu::TextureView, + u_view: wgpu::TextureView, + v_view: wgpu::TextureView, + bind_group: wgpu::BindGroup, + params_buf: wgpu::Buffer, + y_pad_w: u32, + uv_pad_w: u32, + dims: (u32, u32), +} + +impl YuvGpuState { + fn create_textures( + device: &wgpu::Device, + _width: u32, + height: u32, + y_pad_w: u32, + uv_pad_w: u32, + ) -> ( + wgpu::Texture, + wgpu::Texture, + wgpu::Texture, + wgpu::TextureView, + wgpu::TextureView, + wgpu::TextureView, + ) { + let y_size = wgpu::Extent3d { width: y_pad_w, height, depth_or_array_layers: 1 }; + let uv_size = wgpu::Extent3d { + width: uv_pad_w, + height: (height + 1) / 2, + depth_or_array_layers: 1, + }; + let usage = wgpu::TextureUsages::COPY_DST | wgpu::TextureUsages::TEXTURE_BINDING; + let desc = |size: wgpu::Extent3d| wgpu::TextureDescriptor { + label: Some("yuv_plane"), + size, + mip_level_count: 1, + sample_count: 1, + dimension: wgpu::TextureDimension::D2, + format: wgpu::TextureFormat::R8Unorm, + usage, + view_formats: &[], + }; + let y_tex = device.create_texture(&desc(y_size)); + let u_tex = device.create_texture(&desc(uv_size)); + let v_tex = device.create_texture(&desc(uv_size)); + let y_view = y_tex.create_view(&wgpu::TextureViewDescriptor::default()); + let u_view = u_tex.create_view(&wgpu::TextureViewDescriptor::default()); + let v_view = v_tex.create_view(&wgpu::TextureViewDescriptor::default()); + (y_tex, u_tex, v_tex, y_view, u_view, v_view) + } +} + +fn align_up(value: u32, alignment: u32) -> u32 { + ((value + alignment - 1) / alignment) * alignment +} + +#[repr(C)] +#[derive(Clone, Copy, bytemuck::Pod, bytemuck::Zeroable)] +struct ParamsUniform { + src_w: u32, + src_h: u32, + y_tex_w: u32, + uv_tex_w: u32, +} + +impl CallbackTrait for YuvPaintCallback { + fn prepare( + &self, + device: &wgpu::Device, + queue: &wgpu::Queue, + _screen_desc: &egui_wgpu_backend::ScreenDescriptor, + _encoder: &mut wgpu::CommandEncoder, + resources: &mut egui_wgpu_backend::CallbackResources, + ) -> Vec { + // Initialize or update GPU state lazily based on current frame + let mut shared = self.shared.lock(); + + // Nothing to draw yet + if shared.width == 0 || shared.height == 0 { + return Vec::new(); + } + + // Fetch or create our GPU state + if resources.get::().is_none() { + // Build pipeline and initial small textures; will be recreated on first upload + let shader_src = include_str!("yuv_shader.wgsl"); + let shader = device.create_shader_module(wgpu::ShaderModuleDescriptor { + label: Some("yuv_shader"), + source: wgpu::ShaderSource::Wgsl(shader_src.into()), + }); + + let bind_layout = device.create_bind_group_layout(&wgpu::BindGroupLayoutDescriptor { + label: Some("yuv_bind_layout"), + entries: &[ + wgpu::BindGroupLayoutEntry { + binding: 0, + visibility: wgpu::ShaderStages::FRAGMENT, + ty: wgpu::BindingType::Sampler(wgpu::SamplerBindingType::Filtering), + count: None, + }, + wgpu::BindGroupLayoutEntry { + binding: 1, + visibility: 
wgpu::ShaderStages::FRAGMENT, + ty: wgpu::BindingType::Texture { + sample_type: wgpu::TextureSampleType::Float { filterable: true }, + view_dimension: wgpu::TextureViewDimension::D2, + multisampled: false, + }, + count: None, + }, + wgpu::BindGroupLayoutEntry { + binding: 2, + visibility: wgpu::ShaderStages::FRAGMENT, + ty: wgpu::BindingType::Texture { + sample_type: wgpu::TextureSampleType::Float { filterable: true }, + view_dimension: wgpu::TextureViewDimension::D2, + multisampled: false, + }, + count: None, + }, + wgpu::BindGroupLayoutEntry { + binding: 3, + visibility: wgpu::ShaderStages::FRAGMENT, + ty: wgpu::BindingType::Texture { + sample_type: wgpu::TextureSampleType::Float { filterable: true }, + view_dimension: wgpu::TextureViewDimension::D2, + multisampled: false, + }, + count: None, + }, + wgpu::BindGroupLayoutEntry { + binding: 4, + visibility: wgpu::ShaderStages::FRAGMENT, + ty: wgpu::BindingType::Buffer { + ty: wgpu::BufferBindingType::Uniform, + has_dynamic_offset: false, + min_binding_size: Some( + std::num::NonZeroU64::new( + std::mem::size_of::() as u64, + ) + .unwrap(), + ), + }, + count: None, + }, + ], + }); + + let pipeline_layout = device.create_pipeline_layout(&wgpu::PipelineLayoutDescriptor { + label: Some("yuv_pipeline_layout"), + bind_group_layouts: &[&bind_layout], + push_constant_ranges: &[], + }); + + let render_pipeline = device.create_render_pipeline(&wgpu::RenderPipelineDescriptor { + label: Some("yuv_pipeline"), + layout: Some(&pipeline_layout), + vertex: wgpu::VertexState { + module: &shader, + entry_point: Some("vs_main"), + buffers: &[], + compilation_options: wgpu::PipelineCompilationOptions::default(), + }, + fragment: Some(wgpu::FragmentState { + module: &shader, + entry_point: Some("fs_main"), + targets: &[Some(wgpu::ColorTargetState { + format: wgpu::TextureFormat::Bgra8Unorm, + blend: Some(wgpu::BlendState::ALPHA_BLENDING), + write_mask: wgpu::ColorWrites::ALL, + })], + compilation_options: wgpu::PipelineCompilationOptions::default(), + }), + primitive: wgpu::PrimitiveState { + topology: wgpu::PrimitiveTopology::TriangleList, + strip_index_format: None, + front_face: wgpu::FrontFace::Ccw, + cull_mode: None, + unclipped_depth: false, + polygon_mode: wgpu::PolygonMode::Fill, + conservative: false, + }, + depth_stencil: None, + multisample: wgpu::MultisampleState { + count: 1, + mask: !0, + alpha_to_coverage_enabled: false, + }, + multiview: None, + cache: None, + }); + + let sampler = device.create_sampler(&wgpu::SamplerDescriptor { + label: Some("yuv_sampler"), + address_mode_u: wgpu::AddressMode::ClampToEdge, + address_mode_v: wgpu::AddressMode::ClampToEdge, + address_mode_w: wgpu::AddressMode::ClampToEdge, + mag_filter: wgpu::FilterMode::Linear, + min_filter: wgpu::FilterMode::Linear, + mipmap_filter: wgpu::FilterMode::Nearest, + ..Default::default() + }); + + let params_buf = device.create_buffer_init(&wgpu::util::BufferInitDescriptor { + label: Some("yuv_params"), + contents: bytemuck::bytes_of(&ParamsUniform { + src_w: 1, + src_h: 1, + y_tex_w: 1, + uv_tex_w: 1, + }), + usage: wgpu::BufferUsages::UNIFORM | wgpu::BufferUsages::COPY_DST, + }); + + // Initial tiny textures + let (y_tex, u_tex, v_tex, y_view, u_view, v_view) = + YuvGpuState::create_textures(device, 1, 1, 256, 256); + let bind_group = device.create_bind_group(&wgpu::BindGroupDescriptor { + label: Some("yuv_bind_group"), + layout: &bind_layout, + entries: &[ + wgpu::BindGroupEntry { + binding: 0, + resource: wgpu::BindingResource::Sampler(&sampler), + }, + wgpu::BindGroupEntry { + 
binding: 1, + resource: wgpu::BindingResource::TextureView(&y_view), + }, + wgpu::BindGroupEntry { + binding: 2, + resource: wgpu::BindingResource::TextureView(&u_view), + }, + wgpu::BindGroupEntry { + binding: 3, + resource: wgpu::BindingResource::TextureView(&v_view), + }, + wgpu::BindGroupEntry { + binding: 4, + resource: params_buf.as_entire_binding(), + }, + ], + }); + + let new_state = YuvGpuState { + pipeline: render_pipeline, + sampler, + bind_layout, + y_tex, + u_tex, + v_tex, + y_view, + u_view, + v_view, + bind_group, + params_buf, + y_pad_w: 256, + uv_pad_w: 256, + dims: (0, 0), + }; + resources.insert(new_state); + } + let state = resources.get_mut::().unwrap(); + + // Upload planes when marked dirty + // Recreate textures/bind group on size change + if state.dims != (shared.width, shared.height) { + let y_pad_w = align_up(shared.width, 256); + let uv_w = (shared.width + 1) / 2; + let uv_pad_w = align_up(uv_w, 256); + let (y_tex, u_tex, v_tex, y_view, u_view, v_view) = + YuvGpuState::create_textures(device, shared.width, shared.height, y_pad_w, uv_pad_w); + let bind_group = device.create_bind_group(&wgpu::BindGroupDescriptor { + label: Some("yuv_bind_group"), + layout: &state.bind_layout, + entries: &[ + wgpu::BindGroupEntry { + binding: 0, + resource: wgpu::BindingResource::Sampler(&state.sampler), + }, + wgpu::BindGroupEntry { + binding: 1, + resource: wgpu::BindingResource::TextureView(&y_view), + }, + wgpu::BindGroupEntry { + binding: 2, + resource: wgpu::BindingResource::TextureView(&u_view), + }, + wgpu::BindGroupEntry { + binding: 3, + resource: wgpu::BindingResource::TextureView(&v_view), + }, + wgpu::BindGroupEntry { + binding: 4, + resource: state.params_buf.as_entire_binding(), + }, + ], + }); + state.y_tex = y_tex; + state.u_tex = u_tex; + state.v_tex = v_tex; + state.y_view = y_view; + state.u_view = u_view; + state.v_view = v_view; + state.bind_group = bind_group; + state.y_pad_w = y_pad_w; + state.uv_pad_w = uv_pad_w; + state.dims = (shared.width, shared.height); + } + + if shared.dirty { + let y_bytes_per_row = align_up(shared.width, 256); + let uv_w = (shared.width + 1) / 2; + let uv_h = (shared.height + 1) / 2; + let uv_bytes_per_row = align_up(uv_w, 256); + + // Pack and upload Y + if shared.stride_y >= shared.width { + let mut packed = vec![0u8; (y_bytes_per_row * shared.height) as usize]; + for row in 0..shared.height { + let src = + &shared.y[(row * shared.stride_y) as usize..][..shared.width as usize]; + let dst_off = (row * y_bytes_per_row) as usize; + packed[dst_off..dst_off + shared.width as usize].copy_from_slice(src); + } + queue.write_texture( + wgpu::ImageCopyTexture { + texture: &state.y_tex, + mip_level: 0, + origin: wgpu::Origin3d::ZERO, + aspect: wgpu::TextureAspect::All, + }, + &packed, + wgpu::ImageDataLayout { + offset: 0, + bytes_per_row: Some(y_bytes_per_row), + rows_per_image: Some(shared.height), + }, + wgpu::Extent3d { + width: state.y_pad_w, + height: shared.height, + depth_or_array_layers: 1, + }, + ); + } + + // Pack and upload U,V + if shared.stride_u >= uv_w && shared.stride_v >= uv_w { + let mut packed_u = vec![0u8; (uv_bytes_per_row * uv_h) as usize]; + let mut packed_v = vec![0u8; (uv_bytes_per_row * uv_h) as usize]; + for row in 0..uv_h { + let src_u = + &shared.u[(row * shared.stride_u) as usize..][..uv_w as usize]; + let src_v = + &shared.v[(row * shared.stride_v) as usize..][..uv_w as usize]; + let dst_off = (row * uv_bytes_per_row) as usize; + packed_u[dst_off..dst_off + uv_w as usize].copy_from_slice(src_u); + 
packed_v[dst_off..dst_off + uv_w as usize].copy_from_slice(src_v); + } + queue.write_texture( + wgpu::ImageCopyTexture { + texture: &state.u_tex, + mip_level: 0, + origin: wgpu::Origin3d::ZERO, + aspect: wgpu::TextureAspect::All, + }, + &packed_u, + wgpu::ImageDataLayout { + offset: 0, + bytes_per_row: Some(uv_bytes_per_row), + rows_per_image: Some(uv_h), + }, + wgpu::Extent3d { + width: state.uv_pad_w, + height: uv_h, + depth_or_array_layers: 1, + }, + ); + queue.write_texture( + wgpu::ImageCopyTexture { + texture: &state.v_tex, + mip_level: 0, + origin: wgpu::Origin3d::ZERO, + aspect: wgpu::TextureAspect::All, + }, + &packed_v, + wgpu::ImageDataLayout { + offset: 0, + bytes_per_row: Some(uv_bytes_per_row), + rows_per_image: Some(uv_h), + }, + wgpu::Extent3d { + width: state.uv_pad_w, + height: uv_h, + depth_or_array_layers: 1, + }, + ); + } + + // Update params uniform + let params = ParamsUniform { + src_w: shared.width, + src_h: shared.height, + y_tex_w: state.y_pad_w, + uv_tex_w: state.uv_pad_w, + }; + queue.write_buffer(&state.params_buf, 0, bytemuck::bytes_of(¶ms)); + + shared.dirty = false; + } + + Vec::new() + } + + fn paint( + &self, + _info: egui::PaintCallbackInfo, + render_pass: &mut wgpu::RenderPass<'static>, + resources: &egui_wgpu_backend::CallbackResources, + ) { + // Acquire current frame + let shared = self.shared.lock(); + if shared.width == 0 || shared.height == 0 { + return; + } + + // Build pipeline and textures on first paint or on resize + let Some(state) = resources.get::() else { + // prepare may not have created the state yet (race with first frame); skip this paint + return; + }; + + if state.dims != (shared.width, shared.height) { + // We cannot rebuild here (no device access); skip drawing until next frame where prepare will rebuild + return; + } + + render_pass.set_pipeline(&state.pipeline); + render_pass.set_bind_group(0, &state.bind_group, &[]); + // Fullscreen triangle without vertex buffer + render_pass.draw(0..3, 0..1); + } +} + + From c00496140d6bf488c2c8714a752d18a8498dc813 Mon Sep 17 00:00:00 2001 From: David Chen Date: Wed, 10 Dec 2025 16:01:37 -0800 Subject: [PATCH 7/7] add timestamp render on the video --- examples/local_video/src/publisher.rs | 177 +++++++++++++++++++++---- examples/local_video/src/subscriber.rs | 155 +++++++++++++++++++--- examples/local_video/src/yuv_viewer.rs | 2 +- 3 files changed, 286 insertions(+), 48 deletions(-) diff --git a/examples/local_video/src/publisher.rs b/examples/local_video/src/publisher.rs index 1b6b6c050..8fe05490e 100644 --- a/examples/local_video/src/publisher.rs +++ b/examples/local_video/src/publisher.rs @@ -35,6 +35,13 @@ fn format_sensor_timestamp(ts_micros: i64) -> Option { dt.format(&format).ok() } +fn now_unix_timestamp_micros() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH") + .as_micros() as i64 +} + #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { @@ -90,6 +97,10 @@ struct Args { #[arg(long, default_value_t = false)] sensor_timestamp: bool, + /// Show system time and delta vs sensor timestamp in the preview overlay + #[arg(long, default_value_t = false)] + show_sys_time: bool, + /// Use H.265/HEVC encoding if supported (falls back to H.264 on failure) #[arg(long, default_value_t = false)] h265: bool, @@ -317,6 +328,19 @@ async fn main() -> Result<()> { ticker.tick().await; let iter_start = Instant::now(); + // Capture a sensor timestamp at the beginning of the loop so 
it reflects + // the scheduled capture time rather than the later capture_frame call. + let loop_sensor_ts = if show_sensor_ts { + Some( + std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH") + .as_micros() as i64, + ) + } else { + None + }; + // Get frame as RGB24 (decoded by nokhwa if needed) let t0 = Instant::now(); let frame_buf = camera.frame()?; @@ -443,14 +467,10 @@ async fn main() -> Result<()> { // Update RTP timestamp (monotonic, microseconds since start) frame.timestamp_us = start_ts.elapsed().as_micros() as i64; - // Optionally attach a sensor timestamp and push it into the shared queue - // used by the sensor timestamp transformer. + // Optionally attach a sensor timestamp captured at the top of the loop and + // push it into the shared queue used by the sensor timestamp transformer. if show_sensor_ts { - if let Some(store) = track.sensor_timestamp_store() { - let sensor_ts = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .expect("SystemTime before UNIX EPOCH") - .as_micros() as i64; + if let (Some(store), Some(sensor_ts)) = (track.sensor_timestamp_store(), loop_sensor_ts) { frame.sensor_timestamp_us = Some(sensor_ts); store.store(frame.timestamp_us, sensor_ts); last_sensor_ts = Some(sensor_ts); @@ -553,6 +573,9 @@ async fn main() -> Result<()> { if let Some(shared) = shared_preview { struct PreviewApp { shared: Arc>, + show_sys_time: bool, + last_latency_ms: Option, + last_latency_update: Option, } impl eframe::App for PreviewApp { @@ -572,32 +595,132 @@ async fn main() -> Result<()> { ui.painter().add(cb); }); - // Sensor timestamp overlay: top-left, same style as subscriber. - let sensor_timestamp_text = { - let shared = self.shared.lock(); - shared - .sensor_timestamp - .and_then(format_sensor_timestamp) - }; - if let Some(ts_text) = sensor_timestamp_text { - egui::Area::new("publisher_sensor_timestamp_overlay".into()) - .anchor(egui::Align2::LEFT_TOP, egui::vec2(20.0, 20.0)) - .interactable(false) - .show(ctx, |ui| { - ui.label( - egui::RichText::new(ts_text) - .monospace() - .size(22.0) - .color(egui::Color32::WHITE), - ); - }); + // Sensor timestamp / system time overlay for the local preview. + // + // When `show_sys_time` is false, we only render the user (sensor) timestamp, if present. + // + // When `show_sys_time` is true: + // - If there is a sensor timestamp, we render up to three rows: + // 1) "usr ts: yyyy-mm-dd hh:mm:ss:nnn" (sensor timestamp) + // 2) "sys ts: yyyy-mm-dd hh:mm:ss:nnn" (system timestamp) + // 3) "latency: xxxxms" (delta in ms, 4 digits, updated at 2 Hz) + // - If there is no sensor timestamp, we render a single row: + // "sys ts: yyyy-mm-dd hh:mm:ss:nnn" + if self.show_sys_time { + let (sensor_raw, sensor_text, sys_raw, sys_text_opt) = { + let shared = self.shared.lock(); + let sensor_raw = shared.sensor_timestamp; + let sensor_text = sensor_raw.and_then(format_sensor_timestamp); + let sys_raw = now_unix_timestamp_micros(); + let sys_text = format_sensor_timestamp(sys_raw); + (sensor_raw, sensor_text, sys_raw, sys_text) + }; + + if let Some(sys_text) = sys_text_opt { + // Latency: throttle updates to 2 Hz to reduce jitter in the display. 
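+                            // In this local preview both timestamps come from the same
+                            // host clock, so the delta below measures capture-to-render
+                            // delay of the preview path rather than network latency.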
+ let latency_to_show = if let Some(sensor) = sensor_raw { + let now = Instant::now(); + let needs_update = self + .last_latency_update + .map(|prev| now.duration_since(prev) >= Duration::from_millis(500)) + .unwrap_or(true); + if needs_update { + let delta_micros = sys_raw - sensor; + let delta_ms = delta_micros as f64 / 1000.0; + // Clamp to [0, 9999] ms to keep formatting consistent. + let clamped = delta_ms.round().clamp(0.0, 9_999.0) as i32; + self.last_latency_ms = Some(clamped); + self.last_latency_update = Some(now); + } + self.last_latency_ms + } else { + self.last_latency_ms = None; + self.last_latency_update = None; + None + }; + + egui::Area::new("publisher_sensor_sys_timestamp_overlay".into()) + .anchor(egui::Align2::LEFT_TOP, egui::vec2(20.0, 20.0)) + .interactable(false) + .show(ctx, |ui| { + ui.vertical(|ui| { + if let Some(ts_text) = sensor_text { + // First row: user (sensor) timestamp + let usr_line = format!("usr ts: {ts_text}"); + ui.label( + egui::RichText::new(usr_line) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + + // Second row: system timestamp. + let sys_line = format!("sys ts: {sys_text}"); + ui.label( + egui::RichText::new(sys_line) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + + // Third row: latency in milliseconds (if available). + if let Some(latency_ms) = latency_to_show { + let latency_line = + format!("latency: {:04}ms", latency_ms.max(0)); + ui.label( + egui::RichText::new(latency_line) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + } + } else { + // No sensor timestamp: only show system timestamp. + let sys_line = format!("sys ts: {sys_text}"); + ui.label( + egui::RichText::new(sys_line) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + } + }); + }); + } + } else { + // Original behavior: render only the user (sensor) timestamp, if present. 
+ let sensor_timestamp_text = { + let shared = self.shared.lock(); + shared + .sensor_timestamp + .and_then(format_sensor_timestamp) + }; + if let Some(ts_text) = sensor_timestamp_text { + let usr_line = format!("usr ts: {ts_text}"); + egui::Area::new("publisher_sensor_timestamp_overlay".into()) + .anchor(egui::Align2::LEFT_TOP, egui::vec2(20.0, 20.0)) + .interactable(false) + .show(ctx, |ui| { + ui.label( + egui::RichText::new(usr_line) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + }); + } } ctx.request_repaint_after(Duration::from_millis(16)); } } - let app = PreviewApp { shared }; + let app = PreviewApp { + shared, + show_sys_time: args.show_sys_time, + last_latency_ms: None, + last_latency_update: None, + }; let native_options = eframe::NativeOptions::default(); eframe::run_native( "LiveKit Camera Publisher Preview", diff --git a/examples/local_video/src/subscriber.rs b/examples/local_video/src/subscriber.rs index 15b3c227f..b78ee30de 100644 --- a/examples/local_video/src/subscriber.rs +++ b/examples/local_video/src/subscriber.rs @@ -49,6 +49,10 @@ struct Args { /// Only subscribe to video from this participant identity #[arg(long)] participant: Option, + + /// Show system time and delta vs sensor timestamp in the YUV viewer overlay + #[arg(long, default_value_t = false)] + show_sys_time: bool, } #[derive(Clone)] @@ -112,9 +116,19 @@ fn format_sensor_timestamp(ts_micros: i64) -> Option { dt.format(&format).ok() } +fn now_unix_timestamp_micros() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .expect("SystemTime before UNIX EPOCH") + .as_micros() as i64 +} + struct VideoApp { shared: Arc>, simulcast: Arc>, + show_sys_time: bool, + last_latency_ms: Option, + last_latency_update: Option, } impl eframe::App for VideoApp { @@ -134,25 +148,120 @@ impl eframe::App for VideoApp { ui.painter().add(cb); }); - // Sensor timestamp overlay: top-left. Show nothing if no sensor timestamp parsed. - let sensor_timestamp_text = { - let shared = self.shared.lock(); - shared - .sensor_timestamp - .and_then(format_sensor_timestamp) - }; - if let Some(ts_text) = sensor_timestamp_text { - egui::Area::new("sensor_timestamp_overlay".into()) - .anchor(egui::Align2::LEFT_TOP, egui::vec2(20.0, 20.0)) - .interactable(false) - .show(ctx, |ui| { - ui.label( - egui::RichText::new(ts_text) - .monospace() - .size(22.0) - .color(egui::Color32::WHITE), - ); - }); + // Sensor timestamp / system time overlay: top-left. + // + // When `show_sys_time` is false, we only render the user (sensor) timestamp, if present. + // + // When `show_sys_time` is true: + // - If there is a sensor timestamp, we render up to three rows: + // 1) "usr ts: yyyy-mm-dd hh:mm:ss:nnn" (sensor timestamp) + // 2) "sys ts: yyyy-mm-dd hh:mm:ss:nnn" (system timestamp) + // 3) "latency: xxxxms" (delta in ms, 4 digits, updated at 2 Hz) + // - If there is no sensor timestamp, we render a single row: + // "sys ts: yyyy-mm-dd hh:mm:ss:nnn" + if self.show_sys_time { + let (sensor_raw, sensor_text, sys_raw, sys_text_opt) = { + let shared = self.shared.lock(); + let sensor_raw = shared.sensor_timestamp; + let sensor_text = sensor_raw.and_then(format_sensor_timestamp); + let sys_raw = now_unix_timestamp_micros(); + let sys_text = format_sensor_timestamp(sys_raw); + (sensor_raw, sensor_text, sys_raw, sys_text) + }; + + if let Some(sys_text) = sys_text_opt { + // Latency: throttle updates to 2 Hz to reduce jitter in the display. 
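+                            // Unlike the publisher preview, the sensor timestamp shown
+                            // here was stamped on the publishing machine, so the delta
+                            // below spans capture, encode, network, decode and render,
+                            // plus any clock offset between the two hosts.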
+ let latency_to_show = if let Some(sensor) = sensor_raw { + let now = Instant::now(); + let needs_update = self + .last_latency_update + .map(|prev| now.duration_since(prev) >= Duration::from_millis(500)) + .unwrap_or(true); + if needs_update { + let delta_micros = sys_raw - sensor; + let delta_ms = delta_micros as f64 / 1000.0; + // Clamp to [0, 9999] ms to keep formatting consistent. + let clamped = delta_ms.round().clamp(0.0, 9_999.0) as i32; + self.last_latency_ms = Some(clamped); + self.last_latency_update = Some(now); + } + self.last_latency_ms + } else { + self.last_latency_ms = None; + self.last_latency_update = None; + None + }; + + egui::Area::new("sensor_sys_timestamp_overlay".into()) + .anchor(egui::Align2::LEFT_TOP, egui::vec2(20.0, 20.0)) + .interactable(false) + .show(ctx, |ui| { + ui.vertical(|ui| { + if let Some(ts_text) = sensor_text { + // First row: user (sensor) timestamp + let usr_line = format!("usr ts: {ts_text}"); + ui.label( + egui::RichText::new(usr_line) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + + // Second row: system timestamp. + let sys_line = format!("sys ts: {sys_text}"); + ui.label( + egui::RichText::new(sys_line) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + + // Third row: latency in milliseconds (if available). + if let Some(latency_ms) = latency_to_show { + let latency_line = + format!("latency: {:04}ms", latency_ms.max(0)); + ui.label( + egui::RichText::new(latency_line) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + } + } else { + // No sensor timestamp: only show system timestamp. + let sys_line = format!("sys ts: {sys_text}"); + ui.label( + egui::RichText::new(sys_line) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + } + }); + }); + } + } else { + // Original behavior: render only the user (sensor) timestamp, if present. + let sensor_timestamp_text = { + let shared = self.shared.lock(); + shared + .sensor_timestamp + .and_then(format_sensor_timestamp) + }; + if let Some(ts_text) = sensor_timestamp_text { + let usr_line = format!("usr ts: {ts_text}"); + egui::Area::new("sensor_timestamp_overlay".into()) + .anchor(egui::Align2::LEFT_TOP, egui::vec2(20.0, 20.0)) + .interactable(false) + .show(ctx, |ui| { + ui.label( + egui::RichText::new(usr_line) + .monospace() + .size(22.0) + .color(egui::Color32::WHITE), + ); + }); + } } // Simulcast layer controls: bottom-left overlay @@ -504,7 +613,13 @@ async fn main() -> Result<()> { }); // Start UI - let app = VideoApp { shared, simulcast }; + let app = VideoApp { + shared, + simulcast, + show_sys_time: args.show_sys_time, + last_latency_ms: None, + last_latency_update: None, + }; let native_options = eframe::NativeOptions::default(); eframe::run_native("LiveKit Video Subscriber", native_options, Box::new(|_| Ok::, _>(Box::new(app))))?; diff --git a/examples/local_video/src/yuv_viewer.rs b/examples/local_video/src/yuv_viewer.rs index 75323e16f..dc4c9c3c9 100644 --- a/examples/local_video/src/yuv_viewer.rs +++ b/examples/local_video/src/yuv_viewer.rs @@ -463,7 +463,7 @@ impl CallbackTrait for YuvPaintCallback { // prepare may not have created the state yet (race with first frame); skip this paint return; }; - + if state.dims != (shared.width, shared.height) { // We cannot rebuild here (no device access); skip drawing until next frame where prepare will rebuild return;