From 659887556ba0e58c7ad0827c83d35087ecae7aa1 Mon Sep 17 00:00:00 2001 From: Alan George Date: Tue, 16 Jun 2026 11:13:33 -0600 Subject: [PATCH] Reset FFI logger filter and forward task on init/dispose. Rebuild env_logger from the current RUST_LOG on each setup(), flush and stop the capture forward task on dispose(), and silence stderr logging between cycles so repeated initialize/shutdown (e.g. gtest) observes the latest environment without requiring process-wide env setup. Co-authored-by: Cursor --- livekit-ffi/src/server/logger.rs | 161 +++++++++++++++++++++++++------ livekit-ffi/src/server/mod.rs | 4 +- 2 files changed, 133 insertions(+), 32 deletions(-) diff --git a/livekit-ffi/src/server/logger.rs b/livekit-ffi/src/server/logger.rs index c0cdcb181..a47994747 100644 --- a/livekit-ffi/src/server/logger.rs +++ b/livekit-ffi/src/server/logger.rs @@ -19,6 +19,7 @@ use std::{ use env_logger; use log::{self, Log}; +use parking_lot::{Mutex, RwLock}; use tokio::sync::{mpsc, oneshot}; use crate::{proto, FFI_SERVER}; @@ -26,13 +27,18 @@ use crate::{proto, FFI_SERVER}; pub const FLUSH_INTERVAL: Duration = Duration::from_secs(1); pub const BATCH_SIZE: usize = 32; -/// Logger that forward logs to the FfiClient when capture_logs is enabled -/// Otherwise fallback to the env_logger +/// Logger that forwards logs to the FfiClient when capture_logs is enabled. +/// Otherwise falls back to env_logger. +/// +/// The global `log` logger is installed once for the process, but `setup()` +/// and `dispose()` refresh the env_logger filter from the current `RUST_LOG` +/// value and restart the capture forward task so repeated initialize/shutdown +/// cycles (for example gtest repeats) observe the latest environment. pub struct FfiLogger { async_runtime: tokio::runtime::Handle, - log_tx: mpsc::UnboundedSender, + log_tx: Mutex>>, capture_logs: AtomicBool, - env_logger: env_logger::Logger, + env_logger: RwLock, } enum LogMsg { @@ -42,56 +48,111 @@ enum LogMsg { impl FfiLogger { pub fn new(async_runtime: tokio::runtime::Handle) -> Self { - let (log_tx, log_rx) = mpsc::unbounded_channel(); - async_runtime.spawn(log_forward_task(log_rx)); - - let env_logger = env_logger::Builder::from_default_env().build(); FfiLogger { async_runtime, - log_tx, - capture_logs: AtomicBool::new(false), // Always false by default to ensure the server - // is always initialized when using capture_logs - env_logger, + log_tx: Mutex::new(None), + capture_logs: AtomicBool::new(false), + // Avoid reading RUST_LOG until the client calls setup(). + env_logger: RwLock::new(silent_env_logger()), } } -} -impl FfiLogger { - pub fn capture_logs(&self) -> bool { - self.capture_logs.load(Ordering::Acquire) + /// Prepare logging for a new initialize cycle. + pub fn setup(&self, capture_logs: bool) { + *self.env_logger.write() = env_logger_from_default_env(); + self.capture_logs.store(capture_logs, Ordering::Release); + self.start_log_forward_task(); + } + + /// Tear down logging for the current initialize cycle. + pub fn dispose(&self) { + self.flush_captured_logs(); + self.capture_logs.store(false, Ordering::Release); + self.stop_log_forward_task(); + self.env_logger.read().flush(); + *self.env_logger.write() = silent_env_logger(); } - pub fn set_capture_logs(&self, capture: bool) { - self.capture_logs.store(capture, Ordering::Release); + pub fn capture_logs(&self) -> bool { + self.capture_logs.load(Ordering::Acquire) } } impl Log for FfiLogger { fn enabled(&self, metadata: &log::Metadata) -> bool { - if !self.capture_logs() { - return self.env_logger.enabled(metadata); + if self.capture_logs() { + return true; } - true // The ffi client decides what to log (FfiLogger is just forwarding) + self.env_logger.read().enabled(metadata) } fn log(&self, record: &log::Record) { if !self.capture_logs() { - return self.env_logger.log(record); + self.env_logger.read().log(record); + return; } - self.log_tx.send(LogMsg::Log(record.into())).unwrap(); + if let Some(log_tx) = self.log_tx.lock().as_ref() { + let _ = log_tx.send(LogMsg::Log(record.into())); + } } fn flush(&self) { if !self.capture_logs() { - return self.env_logger.flush(); + self.env_logger.read().flush(); + return; + } + + self.flush_captured_logs(); + } +} + +impl FfiLogger { + fn flush_captured_logs(&self) { + if !self.capture_logs() { + return; } + let log_tx_guard = self.log_tx.lock(); + let Some(log_tx) = log_tx_guard.as_ref() else { + return; + }; + let (tx, rx) = oneshot::channel(); - self.log_tx.send(LogMsg::Flush(tx)).unwrap(); - let _ = self.async_runtime.block_on(rx); // should we block? + if log_tx.send(LogMsg::Flush(tx)).is_err() { + return; + } + let _ = self.async_runtime.block_on(rx); } + + fn start_log_forward_task(&self) { + let mut log_tx = self.log_tx.lock(); + if log_tx.is_some() { + return; + } + + let (sender, log_rx) = mpsc::unbounded_channel(); + *log_tx = Some(sender); + self.async_runtime.spawn(log_forward_task(log_rx)); + } + + fn stop_log_forward_task(&self) { + *self.log_tx.lock() = None; + } + + #[cfg(test)] + fn env_logger_enabled(&self, metadata: &log::Metadata<'_>) -> bool { + self.env_logger.read().enabled(metadata) + } +} + +fn env_logger_from_default_env() -> env_logger::Logger { + env_logger::Builder::from_default_env().build() +} + +fn silent_env_logger() -> env_logger::Logger { + env_logger::Builder::new().filter_level(log::LevelFilter::Off).build() } async fn log_forward_task(mut rx: mpsc::UnboundedReceiver) { @@ -104,8 +165,8 @@ async fn log_forward_task(mut rx: mpsc::UnboundedReceiver) { let _ = FFI_SERVER.send_event( proto::LogBatch { - records: batch.clone(), // Avoid clone here? - } + records: batch.clone(), // Avoid clone here? + } .into(), ); batch.clear(); @@ -138,8 +199,6 @@ async fn log_forward_task(mut rx: mpsc::UnboundedReceiver) { flush(&mut batch).await; } - - println!("log forwarding task stopped"); // Shouldn't happen (logger is leaked) } impl From<&log::Record<'_>> for proto::LogRecord { @@ -166,3 +225,45 @@ impl From for proto::LogLevel { } } } + +#[cfg(test)] +mod tests { + use super::*; + use log::Metadata; + + fn metadata(level: log::Level) -> Metadata<'static> { + Metadata::builder().level(level).target("livekit_ffi").build() + } + + #[test] + fn setup_rebuilds_env_logger_from_rust_log() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let logger = FfiLogger::new(runtime.handle().clone()); + let debug_meta = metadata(log::Level::Debug); + + std::env::set_var("RUST_LOG", "livekit_ffi=error"); + logger.setup(false); + assert!(!logger.env_logger_enabled(&debug_meta)); + + logger.dispose(); + + std::env::set_var("RUST_LOG", "livekit_ffi=debug"); + logger.setup(false); + assert!(logger.env_logger_enabled(&debug_meta)); + + logger.dispose(); + } + + #[test] + fn dispose_stops_capture_forward_task() { + let runtime = tokio::runtime::Runtime::new().unwrap(); + let logger = FfiLogger::new(runtime.handle().clone()); + + logger.setup(true); + assert!(logger.log_tx.lock().is_some()); + + logger.dispose(); + assert!(logger.log_tx.lock().is_none()); + assert!(!logger.capture_logs()); + } +} diff --git a/livekit-ffi/src/server/mod.rs b/livekit-ffi/src/server/mod.rs index 1689bdbbe..235573338 100644 --- a/livekit-ffi/src/server/mod.rs +++ b/livekit-ffi/src/server/mod.rs @@ -148,8 +148,8 @@ impl Default for FfiServer { // It simplifies the code a lot tho. In most cases the server is used until the end of the process impl FfiServer { pub fn setup(&self, config: FfiConfig) { + self.logger.setup(config.capture_logs); *self.config.lock() = Some(config.clone()); - self.logger.set_capture_logs(config.capture_logs); log::debug!("initializing ffi server v{}", env!("CARGO_PKG_VERSION")); // TODO: Move this log } @@ -174,7 +174,7 @@ impl FfiServer { room.close(self, DisconnectReason::ClientInitiated).await; } - self.logger.set_capture_logs(false); + self.logger.dispose(); // Drop all handles *self.config.lock() = None; // Invalidate the config