diff --git a/Cargo.lock b/Cargo.lock index f50c4a80..bfeae5f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -255,7 +255,7 @@ dependencies = [ [[package]] name = "aw-server" -version = "0.13.1" +version = "0.14.0" dependencies = [ "android_logger", "aw-client-rust", @@ -266,6 +266,7 @@ dependencies = [ "chrono", "clap", "dirs", + "fancy-regex", "fern", "gethostname", "jemallocator", diff --git a/aw-server/Cargo.toml b/aw-server/Cargo.toml index 5e9d6ffc..2717feee 100644 --- a/aw-server/Cargo.toml +++ b/aw-server/Cargo.toml @@ -30,6 +30,7 @@ clap = { version = "4.1", features = ["derive", "cargo"] } log-panics = { version = "2", features = ["with-backtrace"]} subtle = "2" rust-embed = { version = "8.0.0", features = ["interpolate-folder-path", "debug-embed"] } +fancy-regex = "0.12.0" aw-datastore = { path = "../aw-datastore" } aw-models = { path = "../aw-models" } diff --git a/aw-server/src/android/mod.rs b/aw-server/src/android/mod.rs index 3c32a13e..de65fe2b 100644 --- a/aw-server/src/android/mod.rs +++ b/aw-server/src/android/mod.rs @@ -124,6 +124,7 @@ pub mod android { datastore: Mutex::new(openDatastore()), asset_resolver: endpoints::AssetResolver::new(None), device_id: device_id::get_device_id(), + privacy_filters: Vec::new(), }; info!("Using server_state:: device_id: {}", server_state.device_id); diff --git a/aw-server/src/config.rs b/aw-server/src/config.rs index 37d59bbd..337c694b 100644 --- a/aw-server/src/config.rs +++ b/aw-server/src/config.rs @@ -7,6 +7,7 @@ use rocket::log::LogLevel; use serde::{Deserialize, Serialize}; use crate::dirs; +use crate::privacy_filter::PrivacyFilter; // Far from an optimal way to solve it, but works and is simple static mut TESTING: bool = true; @@ -55,6 +56,11 @@ pub struct AWConfig { // custom visualizations are located. #[serde(default = "default_custom_static")] pub custom_static: std::collections::HashMap, + + /// Server-side privacy filter rules applied at heartbeat / event ingestion. + /// See `privacy_filter` module for shape and semantics. + #[serde(default = "default_privacy_filters")] + pub privacy_filters: Vec, } impl Default for AWConfig { @@ -67,6 +73,7 @@ impl Default for AWConfig { cors: default_cors(), cors_regex: default_cors(), custom_static: default_custom_static(), + privacy_filters: default_privacy_filters(), } } } @@ -119,6 +126,10 @@ fn default_custom_static() -> std::collections::HashMap { std::collections::HashMap::new() } +fn default_privacy_filters() -> Vec { + Vec::new() +} + pub fn create_config(testing: bool) -> AWConfig { set_testing(testing); let mut config_path = dirs::get_config_dir().unwrap(); diff --git a/aw-server/src/endpoints/apikey.rs b/aw-server/src/endpoints/apikey.rs index f7bfcfe7..3b879dc2 100644 --- a/aw-server/src/endpoints/apikey.rs +++ b/aw-server/src/endpoints/apikey.rs @@ -162,6 +162,7 @@ mod tests { datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), asset_resolver: endpoints::AssetResolver::new(None), device_id: "test_id".to_string(), + privacy_filters: Vec::new(), }; let mut aw_config = AWConfig::default(); aw_config.auth.api_key = api_key; diff --git a/aw-server/src/endpoints/bucket.rs b/aw-server/src/endpoints/bucket.rs index b53d12af..a83a2b74 100644 --- a/aw-server/src/endpoints/bucket.rs +++ b/aw-server/src/endpoints/bucket.rs @@ -16,6 +16,7 @@ use rocket::State; use crate::endpoints::util::BucketsExportRocket; use crate::endpoints::{HttpErrorJson, ServerState}; +use crate::privacy_filter; #[get("/")] pub fn buckets_get( @@ -135,8 +136,12 @@ pub fn bucket_events_create( state: &State, ) -> Result>, HttpErrorJson> { let datastore = endpoints_get_lock!(state.datastore); - let res = datastore.insert_events(bucket_id, &events); - match res { + let filtered = + privacy_filter::apply_batch(&state.privacy_filters, bucket_id, events.into_inner()); + if filtered.is_empty() { + return Ok(Json(Vec::new())); + } + match datastore.insert_events(bucket_id, &filtered) { Ok(events) => Ok(Json(events)), Err(err) => Err(err.into()), } @@ -154,6 +159,14 @@ pub fn bucket_events_heartbeat( state: &State, ) -> Result, HttpErrorJson> { let heartbeat = heartbeat_json.into_inner(); + // Apply server-side privacy filter. If a `drop` rule matches, swallow + // the heartbeat with 200 OK and an empty event so older clients don't + // retry-storm; the watcher's view that "the event was accepted" is + // upheld even though nothing was persisted. + let heartbeat = match privacy_filter::apply(&state.privacy_filters, bucket_id, heartbeat) { + Some(e) => e, + None => return Ok(Json(Event::default())), + }; let datastore = endpoints_get_lock!(state.datastore); match datastore.heartbeat(bucket_id, heartbeat, pulsetime) { Ok(e) => Ok(Json(e)), diff --git a/aw-server/src/endpoints/hostcheck.rs b/aw-server/src/endpoints/hostcheck.rs index 6f583cdf..9bf079cf 100644 --- a/aw-server/src/endpoints/hostcheck.rs +++ b/aw-server/src/endpoints/hostcheck.rs @@ -127,6 +127,7 @@ mod tests { datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), asset_resolver: endpoints::AssetResolver::new(None), device_id: "test_id".to_string(), + privacy_filters: Vec::new(), }; let mut aw_config = AWConfig::default(); aw_config.address = address; diff --git a/aw-server/src/endpoints/mod.rs b/aw-server/src/endpoints/mod.rs index f0621196..70a01914 100644 --- a/aw-server/src/endpoints/mod.rs +++ b/aw-server/src/endpoints/mod.rs @@ -42,6 +42,8 @@ pub struct ServerState { pub datastore: Mutex, pub asset_resolver: AssetResolver, pub device_id: String, + /// Compiled privacy-filter rules applied at heartbeat / event ingestion. + pub privacy_filters: Vec, } #[macro_use] diff --git a/aw-server/src/lib.rs b/aw-server/src/lib.rs index 2755d536..f03e423c 100644 --- a/aw-server/src/lib.rs +++ b/aw-server/src/lib.rs @@ -24,6 +24,7 @@ pub mod device_id; pub mod dirs; pub mod endpoints; pub mod logging; +pub mod privacy_filter; #[cfg(target_os = "android")] pub mod android; diff --git a/aw-server/src/main.rs b/aw-server/src/main.rs index 2cbf39e7..e35a7e46 100644 --- a/aw-server/src/main.rs +++ b/aw-server/src/main.rs @@ -141,12 +141,18 @@ async fn main() -> Result<(), rocket::Error> { device_id::get_device_id() }; + let privacy_filters = aw_server::privacy_filter::compile(&config.privacy_filters); + if !privacy_filters.is_empty() { + info!("Loaded {} privacy filter rule(s)", privacy_filters.len()); + } + let server_state = endpoints::ServerState { // Even if legacy_import is set to true it is disabled on Android so // it will not happen there datastore: Mutex::new(aw_datastore::Datastore::new(db_path, legacy_import)), asset_resolver: endpoints::AssetResolver::new(asset_path), device_id, + privacy_filters, }; let _rocket = endpoints::build_rocket(server_state, config) diff --git a/aw-server/src/privacy_filter.rs b/aw-server/src/privacy_filter.rs new file mode 100644 index 00000000..11dca0ee --- /dev/null +++ b/aw-server/src/privacy_filter.rs @@ -0,0 +1,339 @@ +//! Server-side privacy filter for heartbeat / event ingestion. +//! +//! See ActivityWatch/aw-server-rust#482. This is the double-filter that +//! runs at ingestion regardless of whether the watcher pre-filtered. +//! Privacy-aware watchers should still pre-filter before sending; this +//! module is the consistency guarantee for older or less-aware clients. +//! +//! Rules are loaded from server config and applied to events whose +//! bucket id starts with `bucket_prefix`. Each rule has: +//! - `field`: dotted path inside `event.data` (e.g. `title`) +//! - `pattern`: a regex applied to the value as a string +//! - `action`: `drop` (discard event) or `redact` (replace value) +//! - `replacement`: replacement string for redact (default `REDACTED`) + +use fancy_regex::Regex; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use aw_models::Event; + +/// Action to take on a matching event. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum FilterAction { + /// Discard the event entirely (return `None` from apply). + Drop, + /// Replace the matched field with `replacement` (default `REDACTED`). + Redact, +} + +fn default_enabled() -> bool { + true +} + +fn default_replacement() -> String { + "REDACTED".to_string() +} + +/// A single privacy filter rule, as serialized in the server config. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PrivacyFilter { + #[serde(default = "default_enabled")] + pub enabled: bool, + + /// Bucket id prefix the rule applies to (e.g. `aw-watcher-window`). + /// Empty string matches every bucket. + #[serde(default)] + pub bucket_prefix: String, + + /// Dotted path into `event.data`. Only string values are inspected. + pub field: String, + + /// Regex pattern. Invalid patterns disable the rule with a loud log + /// at compile time (see `compile`); they are never silently escaped. + pub pattern: String, + + pub action: FilterAction, + + #[serde(default = "default_replacement")] + pub replacement: String, +} + +/// Compiled rule with its regex pre-built once. +pub struct CompiledRule { + pub bucket_prefix: String, + pub field: String, + pub action: FilterAction, + pub replacement: String, + pub regex: Regex, +} + +/// Compile a list of `PrivacyFilter` rules. Disabled or invalid-regex +/// rules are skipped with a `warn!` log (loud failure, no silent escape). +pub fn compile(filters: &[PrivacyFilter]) -> Vec { + let mut out = Vec::new(); + for f in filters { + if !f.enabled { + continue; + } + match Regex::new(&f.pattern) { + Ok(regex) => out.push(CompiledRule { + bucket_prefix: f.bucket_prefix.clone(), + field: f.field.clone(), + action: f.action, + replacement: f.replacement.clone(), + regex, + }), + Err(err) => warn!( + "Disabling privacy filter rule for bucket_prefix={:?} field={:?}: invalid regex: {}", + f.bucket_prefix, f.field, err + ), + } + } + out +} + +/// Look up a dotted field path in `event.data`. Returns the string value +/// if present and string-typed; `None` otherwise (so non-string fields +/// like numbers are not coerced into matchable strings). +fn get_field_str<'a>(event: &'a Event, field: &str) -> Option<&'a str> { + let mut cur: &Value = event.data.get(field.split('.').next()?)?; + for part in field.split('.').skip(1) { + cur = cur.get(part)?; + } + cur.as_str() +} + +/// Set a dotted field path inside `event.data` to a new string value. +/// Creates intermediate maps if necessary. +fn set_field_str(event: &mut Event, field: &str, value: &str) { + let parts: Vec<&str> = field.split('.').collect(); + if parts.is_empty() { + return; + } + if parts.len() == 1 { + event + .data + .insert(parts[0].to_string(), Value::String(value.to_string())); + return; + } + // The aw-models Event uses serde_json::Map for `data`. Walk into it + // for nested fields, ignoring non-object intermediates. + let first = parts[0]; + if !event.data.contains_key(first) { + return; // don't auto-create paths we couldn't read from + } + let entry = event.data.get_mut(first).unwrap(); + let mut cur = entry; + for part in &parts[1..parts.len() - 1] { + match cur.get_mut(*part) { + Some(next) => cur = next, + None => return, + } + } + if let Some(obj) = cur.as_object_mut() { + obj.insert( + parts[parts.len() - 1].to_string(), + Value::String(value.to_string()), + ); + } +} + +/// Apply the compiled rules to an event for the given bucket id. +/// +/// Returns `None` if any matching rule says `drop`. Returns `Some(event)` +/// with redacted fields applied otherwise (including the no-rules case). +pub fn apply(rules: &[CompiledRule], bucket_id: &str, mut event: Event) -> Option { + for rule in rules { + if !rule.bucket_prefix.is_empty() && !bucket_id.starts_with(&rule.bucket_prefix) { + continue; + } + let Some(value) = get_field_str(&event, &rule.field) else { + continue; + }; + let matched = match rule.regex.is_match(value) { + Ok(m) => m, + Err(err) => { + warn!("privacy_filter regex error on {:?}: {}", rule.field, err); + false + } + }; + if !matched { + continue; + } + match rule.action { + FilterAction::Drop => return None, + FilterAction::Redact => { + set_field_str(&mut event, &rule.field, &rule.replacement); + } + } + } + Some(event) +} + +/// Apply rules to a batch of events, removing those that should be dropped. +pub fn apply_batch(rules: &[CompiledRule], bucket_id: &str, events: Vec) -> Vec { + if rules.is_empty() { + return events; + } + events + .into_iter() + .filter_map(|e| apply(rules, bucket_id, e)) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use serde_json::{json, Map}; + + fn evt(data: serde_json::Value) -> Event { + let map = data.as_object().unwrap().clone(); + Event { + id: None, + timestamp: Utc::now(), + duration: chrono::Duration::seconds(0), + data: map, + } + } + + fn rule(bucket_prefix: &str, field: &str, pattern: &str, action: FilterAction) -> PrivacyFilter { + PrivacyFilter { + enabled: true, + bucket_prefix: bucket_prefix.to_string(), + field: field.to_string(), + pattern: pattern.to_string(), + action, + replacement: default_replacement(), + } + } + + #[test] + fn drop_matching_event() { + let rules = compile(&[rule( + "aw-watcher-window", + "title", + "(?i)private browsing|incognito", + FilterAction::Drop, + )]); + let dropped = apply( + &rules, + "aw-watcher-window_test", + evt(json!({"app": "firefox", "title": "Banking - Private Browsing"})), + ); + assert!(dropped.is_none()); + + let kept = apply( + &rules, + "aw-watcher-window_test", + evt(json!({"app": "firefox", "title": "GitHub"})), + ); + assert!(kept.is_some()); + } + + #[test] + fn redact_matching_field() { + let rules = compile(&[rule( + "aw-watcher-window", + "title", + "(?i)password", + FilterAction::Redact, + )]); + let result = apply( + &rules, + "aw-watcher-window_x11", + evt(json!({"app": "1Password", "title": "1Password - Master password"})), + ) + .expect("event should be kept after redact"); + assert_eq!(result.data.get("title").unwrap(), "REDACTED"); + assert_eq!(result.data.get("app").unwrap(), "1Password"); + } + + #[test] + fn bucket_prefix_scoping() { + let rules = compile(&[rule( + "aw-watcher-window", + "title", + ".*", + FilterAction::Drop, + )]); + // Different bucket prefix, rule should not apply. + let kept = apply( + &rules, + "aw-watcher-afk_test", + evt(json!({"status": "afk"})), + ); + assert!(kept.is_some()); + } + + #[test] + fn invalid_regex_disables_rule() { + let rules = compile(&[rule("any", "title", "(unbalanced", FilterAction::Drop)]); + assert!(rules.is_empty(), "invalid regex must not produce a rule"); + } + + #[test] + fn disabled_rule_is_skipped() { + let mut r = rule("any", "title", ".*", FilterAction::Drop); + r.enabled = false; + let rules = compile(&[r]); + assert!(rules.is_empty()); + } + + #[test] + fn empty_prefix_matches_any_bucket() { + let rules = compile(&[rule("", "title", "secret", FilterAction::Drop)]); + let result = apply( + &rules, + "aw-watcher-anything", + evt(json!({"title": "a secret"})), + ); + assert!(result.is_none()); + } + + #[test] + fn non_string_field_is_skipped() { + let rules = compile(&[rule("", "count", "42", FilterAction::Drop)]); + // Numeric value, regex would match the digits but field-as-string lookup yields None. + let kept = apply(&rules, "any", evt(json!({"count": 42}))); + assert!(kept.is_some()); + } + + #[test] + fn dotted_field_path() { + let mut event_data = Map::new(); + event_data.insert("url".to_string(), json!({"host": "bank.example.com"})); + let event = Event { + id: None, + timestamp: Utc::now(), + duration: chrono::Duration::seconds(0), + data: event_data, + }; + let rules = compile(&[rule("", "url.host", "(?i)bank", FilterAction::Drop)]); + assert!(apply(&rules, "any", event).is_none()); + } + + #[test] + fn redact_uses_custom_replacement() { + let mut r = rule("", "title", ".*", FilterAction::Redact); + r.replacement = "***".to_string(); + let rules = compile(&[r]); + let result = apply(&rules, "any", evt(json!({"title": "anything"}))).unwrap(); + assert_eq!(result.data.get("title").unwrap(), "***"); + } + + #[test] + fn apply_batch_drops_and_keeps() { + let rules = compile(&[rule("", "title", "drop_me", FilterAction::Drop)]); + let events = vec![ + evt(json!({"title": "drop_me first"})), + evt(json!({"title": "keep this"})), + evt(json!({"title": "drop_me again"})), + ]; + let out = apply_batch(&rules, "any", events); + assert_eq!(out.len(), 1); + assert_eq!(out[0].data.get("title").unwrap(), "keep this"); + } +} diff --git a/aw-server/tests/api.rs b/aw-server/tests/api.rs index d45b4872..2125147b 100644 --- a/aw-server/tests/api.rs +++ b/aw-server/tests/api.rs @@ -24,6 +24,7 @@ mod api_tests { datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), asset_resolver: endpoints::AssetResolver::new(None), device_id: "test_id".to_string(), + privacy_filters: Vec::new(), }; let aw_config = config::AWConfig::default(); endpoints::build_rocket(state, aw_config) @@ -763,4 +764,118 @@ mod api_tests { .dispatch(); assert_eq!(res.status(), rocket::http::Status::Ok); } + + fn setup_testserver_with_privacy_filter( + filters: Vec, + ) -> rocket::Rocket { + let compiled = aw_server::privacy_filter::compile(&filters); + let state = endpoints::ServerState { + datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)), + asset_resolver: endpoints::AssetResolver::new(None), + device_id: "test_id".to_string(), + privacy_filters: compiled, + }; + let aw_config = config::AWConfig::default(); + endpoints::build_rocket(state, aw_config) + } + + /// Heartbeats matching a `drop` rule should return 200 OK (so clients + /// don't retry-storm) but the event must not be stored. + #[test] + fn test_privacy_filter_drop_heartbeat() { + use aw_server::privacy_filter::{FilterAction, PrivacyFilter}; + + let server = setup_testserver_with_privacy_filter(vec![PrivacyFilter { + enabled: true, + bucket_prefix: "aw-watcher-window".to_string(), + field: "title".to_string(), + pattern: "(?i)private browsing|incognito".to_string(), + action: FilterAction::Drop, + replacement: "REDACTED".to_string(), + }]); + let client = Client::untracked(server).expect("valid instance"); + + // Create bucket + client + .post("/api/0/buckets/aw-watcher-window_test") + .header(ContentType::JSON) + .header(Header::new("Host", "127.0.0.1:5600")) + .body(r#"{"client":"test","type":"currentwindow","hostname":"test"}"#) + .dispatch(); + + // Send a heartbeat whose title matches the drop rule. + let res = client + .post("/api/0/buckets/aw-watcher-window_test/heartbeat?pulsetime=30") + .header(ContentType::JSON) + .header(Header::new("Host", "127.0.0.1:5600")) + .body(r#"{"timestamp":"2024-01-01T00:00:00Z","duration":0.0,"data":{"app":"Firefox","title":"Banking - Private Browsing"}}"#) + .dispatch(); + assert_eq!(res.status(), Status::Ok, "drop rule must return 200 OK"); + + // The event store must be empty — the event was silently discarded. + let res = client + .get("/api/0/buckets/aw-watcher-window_test/events") + .header(ContentType::JSON) + .header(Header::new("Host", "127.0.0.1:5600")) + .dispatch(); + assert_eq!(res.status(), Status::Ok); + let events: Vec = + serde_json::from_str(&res.into_string().unwrap()).unwrap(); + assert!(events.is_empty(), "dropped heartbeat must not be stored"); + } + + /// Heartbeats matching a `redact` rule must be stored with the field + /// value replaced, not the original sensitive text. + #[test] + fn test_privacy_filter_redact_heartbeat() { + use aw_server::privacy_filter::{FilterAction, PrivacyFilter}; + + let server = setup_testserver_with_privacy_filter(vec![PrivacyFilter { + enabled: true, + bucket_prefix: String::new(), + field: "title".to_string(), + pattern: "(?i)password".to_string(), + action: FilterAction::Redact, + replacement: "REDACTED".to_string(), + }]); + let client = Client::untracked(server).expect("valid instance"); + + // Create bucket + client + .post("/api/0/buckets/aw-watcher-window_test2") + .header(ContentType::JSON) + .header(Header::new("Host", "127.0.0.1:5600")) + .body(r#"{"client":"test","type":"currentwindow","hostname":"test"}"#) + .dispatch(); + + // Send heartbeat with a sensitive title. + let res = client + .post("/api/0/buckets/aw-watcher-window_test2/heartbeat?pulsetime=30") + .header(ContentType::JSON) + .header(Header::new("Host", "127.0.0.1:5600")) + .body(r#"{"timestamp":"2024-01-01T00:00:00Z","duration":0.0,"data":{"app":"1Password","title":"1Password - Master password"}}"#) + .dispatch(); + assert_eq!(res.status(), Status::Ok); + + // Event must be stored but with title replaced. + let res = client + .get("/api/0/buckets/aw-watcher-window_test2/events") + .header(ContentType::JSON) + .header(Header::new("Host", "127.0.0.1:5600")) + .dispatch(); + assert_eq!(res.status(), Status::Ok); + let events: Vec = + serde_json::from_str(&res.into_string().unwrap()).unwrap(); + assert_eq!(events.len(), 1, "redacted event must be stored"); + assert_eq!( + events[0]["data"]["title"], + "REDACTED", + "sensitive title must be replaced" + ); + assert_eq!( + events[0]["data"]["app"], + "1Password", + "non-matching fields must be preserved" + ); + } }