Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aw-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ clap = { version = "4.1", features = ["derive", "cargo"] }
log-panics = { version = "2", features = ["with-backtrace"]}
subtle = "2"
rust-embed = { version = "8.0.0", features = ["interpolate-folder-path", "debug-embed"] }
fancy-regex = "0.12.0"

aw-datastore = { path = "../aw-datastore" }
aw-models = { path = "../aw-models" }
Expand Down
1 change: 1 addition & 0 deletions aw-server/src/android/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ pub mod android {
datastore: Mutex::new(openDatastore()),
asset_resolver: endpoints::AssetResolver::new(None),
device_id: device_id::get_device_id(),
privacy_filters: Vec::new(),
};
info!("Using server_state:: device_id: {}", server_state.device_id);

Expand Down
11 changes: 11 additions & 0 deletions aw-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use rocket::log::LogLevel;
use serde::{Deserialize, Serialize};

use crate::dirs;
use crate::privacy_filter::PrivacyFilter;

// Far from an optimal way to solve it, but works and is simple
static mut TESTING: bool = true;
Expand Down Expand Up @@ -55,6 +56,11 @@ pub struct AWConfig {
// custom visualizations are located.
#[serde(default = "default_custom_static")]
pub custom_static: std::collections::HashMap<String, String>,

/// Server-side privacy filter rules applied at heartbeat / event ingestion.
/// See `privacy_filter` module for shape and semantics.
#[serde(default = "default_privacy_filters")]
pub privacy_filters: Vec<PrivacyFilter>,
}

impl Default for AWConfig {
Expand All @@ -67,6 +73,7 @@ impl Default for AWConfig {
cors: default_cors(),
cors_regex: default_cors(),
custom_static: default_custom_static(),
privacy_filters: default_privacy_filters(),
}
}
}
Expand Down Expand Up @@ -119,6 +126,10 @@ fn default_custom_static() -> std::collections::HashMap<String, String> {
std::collections::HashMap::new()
}

fn default_privacy_filters() -> Vec<PrivacyFilter> {
Vec::new()
}

pub fn create_config(testing: bool) -> AWConfig {
set_testing(testing);
let mut config_path = dirs::get_config_dir().unwrap();
Expand Down
1 change: 1 addition & 0 deletions aw-server/src/endpoints/apikey.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ mod tests {
datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)),
asset_resolver: endpoints::AssetResolver::new(None),
device_id: "test_id".to_string(),
privacy_filters: Vec::new(),
};
let mut aw_config = AWConfig::default();
aw_config.auth.api_key = api_key;
Expand Down
17 changes: 15 additions & 2 deletions aw-server/src/endpoints/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use rocket::State;

use crate::endpoints::util::BucketsExportRocket;
use crate::endpoints::{HttpErrorJson, ServerState};
use crate::privacy_filter;

#[get("/")]
pub fn buckets_get(
Expand Down Expand Up @@ -135,8 +136,12 @@ pub fn bucket_events_create(
state: &State<ServerState>,
) -> Result<Json<Vec<Event>>, HttpErrorJson> {
let datastore = endpoints_get_lock!(state.datastore);
let res = datastore.insert_events(bucket_id, &events);
match res {
let filtered =
privacy_filter::apply_batch(&state.privacy_filters, bucket_id, events.into_inner());
if filtered.is_empty() {
return Ok(Json(Vec::new()));
}
match datastore.insert_events(bucket_id, &filtered) {
Comment on lines 138 to +144
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Lock acquired before filtering in bucket_events_create. The datastore mutex is grabbed on line 138 and then held for the duration of the regex-matching pass over the entire event batch. The heartbeat handler gets this right — it filters first and only acquires the lock after. Under a large batch or a complex pattern, every concurrent request to the datastore stalls until filtering completes.

Suggested change
let datastore = endpoints_get_lock!(state.datastore);
let res = datastore.insert_events(bucket_id, &events);
match res {
let filtered =
privacy_filter::apply_batch(&state.privacy_filters, bucket_id, events.into_inner());
if filtered.is_empty() {
return Ok(Json(Vec::new()));
}
match datastore.insert_events(bucket_id, &filtered) {
let filtered =
privacy_filter::apply_batch(&state.privacy_filters, bucket_id, events.into_inner());
if filtered.is_empty() {
return Ok(Json(Vec::new()));
}
let datastore = endpoints_get_lock!(state.datastore);
match datastore.insert_events(bucket_id, &filtered) {

Ok(events) => Ok(Json(events)),
Err(err) => Err(err.into()),
}
Expand All @@ -154,6 +159,14 @@ pub fn bucket_events_heartbeat(
state: &State<ServerState>,
) -> Result<Json<Event>, HttpErrorJson> {
let heartbeat = heartbeat_json.into_inner();
// Apply server-side privacy filter. If a `drop` rule matches, swallow
// the heartbeat with 200 OK and an empty event so older clients don't
// retry-storm; the watcher's view that "the event was accepted" is
// upheld even though nothing was persisted.
let heartbeat = match privacy_filter::apply(&state.privacy_filters, bucket_id, heartbeat) {
Some(e) => e,
None => return Ok(Json(Event::default())),
};
let datastore = endpoints_get_lock!(state.datastore);
match datastore.heartbeat(bucket_id, heartbeat, pulsetime) {
Ok(e) => Ok(Json(e)),
Expand Down
1 change: 1 addition & 0 deletions aw-server/src/endpoints/hostcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ mod tests {
datastore: Mutex::new(aw_datastore::Datastore::new_in_memory(false)),
asset_resolver: endpoints::AssetResolver::new(None),
device_id: "test_id".to_string(),
privacy_filters: Vec::new(),
};
let mut aw_config = AWConfig::default();
aw_config.address = address;
Expand Down
2 changes: 2 additions & 0 deletions aw-server/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ pub struct ServerState {
pub datastore: Mutex<Datastore>,
pub asset_resolver: AssetResolver,
pub device_id: String,
/// Compiled privacy-filter rules applied at heartbeat / event ingestion.
pub privacy_filters: Vec<crate::privacy_filter::CompiledRule>,
}

#[macro_use]
Expand Down
1 change: 1 addition & 0 deletions aw-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod device_id;
pub mod dirs;
pub mod endpoints;
pub mod logging;
pub mod privacy_filter;

#[cfg(target_os = "android")]
pub mod android;
Expand Down
6 changes: 6 additions & 0 deletions aw-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,18 @@ async fn main() -> Result<(), rocket::Error> {
device_id::get_device_id()
};

let privacy_filters = aw_server::privacy_filter::compile(&config.privacy_filters);
if !privacy_filters.is_empty() {
info!("Loaded {} privacy filter rule(s)", privacy_filters.len());
}

let server_state = endpoints::ServerState {
// Even if legacy_import is set to true it is disabled on Android so
// it will not happen there
datastore: Mutex::new(aw_datastore::Datastore::new(db_path, legacy_import)),
asset_resolver: endpoints::AssetResolver::new(asset_path),
device_id,
privacy_filters,
};

let _rocket = endpoints::build_rocket(server_state, config)
Expand Down
Loading
Loading