Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 131 additions & 30 deletions livekit-ffi/src/server/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,26 @@ use std::{

use env_logger;
use log::{self, Log};
use parking_lot::{Mutex, RwLock};
use tokio::sync::{mpsc, oneshot};

use crate::{proto, FFI_SERVER};

pub const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
pub const BATCH_SIZE: usize = 32;

/// Logger that forward logs to the FfiClient when capture_logs is enabled
/// Otherwise fallback to the env_logger
/// Logger that forwards logs to the FfiClient when capture_logs is enabled.
/// Otherwise falls back to env_logger.
///
/// The global `log` logger is installed once for the process, but `setup()`
/// and `dispose()` refresh the env_logger filter from the current `RUST_LOG`
/// value and restart the capture forward task so repeated initialize/shutdown
/// cycles (for example gtest repeats) observe the latest environment.
pub struct FfiLogger {
async_runtime: tokio::runtime::Handle,
log_tx: mpsc::UnboundedSender<LogMsg>,
log_tx: Mutex<Option<mpsc::UnboundedSender<LogMsg>>>,
capture_logs: AtomicBool,
env_logger: env_logger::Logger,
env_logger: RwLock<env_logger::Logger>,
}

enum LogMsg {
Expand All @@ -42,56 +48,111 @@ enum LogMsg {

impl FfiLogger {
pub fn new(async_runtime: tokio::runtime::Handle) -> Self {
let (log_tx, log_rx) = mpsc::unbounded_channel();
async_runtime.spawn(log_forward_task(log_rx));

let env_logger = env_logger::Builder::from_default_env().build();
FfiLogger {
async_runtime,
log_tx,
capture_logs: AtomicBool::new(false), // Always false by default to ensure the server
// is always initialized when using capture_logs
env_logger,
log_tx: Mutex::new(None),
capture_logs: AtomicBool::new(false),
// Avoid reading RUST_LOG until the client calls setup().
env_logger: RwLock::new(silent_env_logger()),
}
}
}

impl FfiLogger {
pub fn capture_logs(&self) -> bool {
self.capture_logs.load(Ordering::Acquire)
/// Prepare logging for a new initialize cycle.
pub fn setup(&self, capture_logs: bool) {
*self.env_logger.write() = env_logger_from_default_env();
self.capture_logs.store(capture_logs, Ordering::Release);
self.start_log_forward_task();
}

/// Tear down logging for the current initialize cycle.
pub fn dispose(&self) {
self.flush_captured_logs();
self.capture_logs.store(false, Ordering::Release);
self.stop_log_forward_task();
self.env_logger.read().flush();
*self.env_logger.write() = silent_env_logger();
}

pub fn set_capture_logs(&self, capture: bool) {
self.capture_logs.store(capture, Ordering::Release);
pub fn capture_logs(&self) -> bool {
self.capture_logs.load(Ordering::Acquire)
}
}

impl Log for FfiLogger {
fn enabled(&self, metadata: &log::Metadata) -> bool {
if !self.capture_logs() {
return self.env_logger.enabled(metadata);
if self.capture_logs() {
return true;
}

true // The ffi client decides what to log (FfiLogger is just forwarding)
self.env_logger.read().enabled(metadata)
}

fn log(&self, record: &log::Record) {
if !self.capture_logs() {
return self.env_logger.log(record);
self.env_logger.read().log(record);
return;
}

self.log_tx.send(LogMsg::Log(record.into())).unwrap();
if let Some(log_tx) = self.log_tx.lock().as_ref() {
let _ = log_tx.send(LogMsg::Log(record.into()));
}
}

fn flush(&self) {
if !self.capture_logs() {
return self.env_logger.flush();
self.env_logger.read().flush();
return;
}

self.flush_captured_logs();
}
}

impl FfiLogger {
fn flush_captured_logs(&self) {
if !self.capture_logs() {
return;
}

let log_tx_guard = self.log_tx.lock();
let Some(log_tx) = log_tx_guard.as_ref() else {
return;
};

let (tx, rx) = oneshot::channel();
self.log_tx.send(LogMsg::Flush(tx)).unwrap();
let _ = self.async_runtime.block_on(rx); // should we block?
if log_tx.send(LogMsg::Flush(tx)).is_err() {
return;
}
let _ = self.async_runtime.block_on(rx);
}

fn start_log_forward_task(&self) {
let mut log_tx = self.log_tx.lock();
if log_tx.is_some() {
return;
}

let (sender, log_rx) = mpsc::unbounded_channel();
*log_tx = Some(sender);
self.async_runtime.spawn(log_forward_task(log_rx));
}

fn stop_log_forward_task(&self) {
*self.log_tx.lock() = None;
}

#[cfg(test)]
fn env_logger_enabled(&self, metadata: &log::Metadata<'_>) -> bool {
self.env_logger.read().enabled(metadata)
}
}

fn env_logger_from_default_env() -> env_logger::Logger {
env_logger::Builder::from_default_env().build()
}

fn silent_env_logger() -> env_logger::Logger {
env_logger::Builder::new().filter_level(log::LevelFilter::Off).build()
}

async fn log_forward_task(mut rx: mpsc::UnboundedReceiver<LogMsg>) {
Expand All @@ -104,8 +165,8 @@ async fn log_forward_task(mut rx: mpsc::UnboundedReceiver<LogMsg>) {

let _ = FFI_SERVER.send_event(
proto::LogBatch {
records: batch.clone(), // Avoid clone here?
}
records: batch.clone(), // Avoid clone here?
}
.into(),
);
batch.clear();
Expand Down Expand Up @@ -138,8 +199,6 @@ async fn log_forward_task(mut rx: mpsc::UnboundedReceiver<LogMsg>) {

flush(&mut batch).await;
}

println!("log forwarding task stopped"); // Shouldn't happen (logger is leaked)
}

impl From<&log::Record<'_>> for proto::LogRecord {
Expand All @@ -166,3 +225,45 @@ impl From<log::Level> for proto::LogLevel {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use log::Metadata;

fn metadata(level: log::Level) -> Metadata<'static> {
Metadata::builder().level(level).target("livekit_ffi").build()
}

#[test]
fn setup_rebuilds_env_logger_from_rust_log() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let logger = FfiLogger::new(runtime.handle().clone());
let debug_meta = metadata(log::Level::Debug);

std::env::set_var("RUST_LOG", "livekit_ffi=error");
logger.setup(false);
assert!(!logger.env_logger_enabled(&debug_meta));

logger.dispose();

std::env::set_var("RUST_LOG", "livekit_ffi=debug");
logger.setup(false);
assert!(logger.env_logger_enabled(&debug_meta));

logger.dispose();
}

#[test]
fn dispose_stops_capture_forward_task() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let logger = FfiLogger::new(runtime.handle().clone());

logger.setup(true);
assert!(logger.log_tx.lock().is_some());

logger.dispose();
assert!(logger.log_tx.lock().is_none());
assert!(!logger.capture_logs());
}
}
4 changes: 2 additions & 2 deletions livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ impl Default for FfiServer {
// It simplifies the code a lot tho. In most cases the server is used until the end of the process
impl FfiServer {
pub fn setup(&self, config: FfiConfig) {
self.logger.setup(config.capture_logs);
*self.config.lock() = Some(config.clone());
self.logger.set_capture_logs(config.capture_logs);

log::debug!("initializing ffi server v{}", env!("CARGO_PKG_VERSION")); // TODO: Move this log
}
Expand All @@ -174,7 +174,7 @@ impl FfiServer {
room.close(self, DisconnectReason::ClientInitiated).await;
}

self.logger.set_capture_logs(false);
self.logger.dispose();

// Drop all handles
*self.config.lock() = None; // Invalidate the config
Expand Down
Loading