Skip to content

Commit e11f43c

Browse files
authored
Merge pull request #392 from superfly/gorbak/experiment-log-all-queries
How much time do the queries really take?
2 parents 54cccea + 3b4727b commit e11f43c

File tree

6 files changed

+136
-16
lines changed

6 files changed

+136
-16
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ tracing-filter = { version = "0.1.0-alpha.2", features = ["smallvec"] }
8585
tracing-opentelemetry = { version = "0.31.0", default-features = false, features = ["tracing-log"]}
8686
tracing-subscriber = { version = "0.3.16", features = ["json", "env-filter"] }
8787
tracing-test = "0.2"
88+
thread_local = "1.1.9"
8889
uhlc = { version = "0.7", features = ["defmt"] }
8990
uuid = { version = "1.3.1", features = ["v4", "serde"] }
9091
webpki = { version = "0.22.0", default-features = false, features = ["std"] }

crates/corro-agent/src/agent/run_root.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ async fn run(
139139
transport.clone(),
140140
tripwire.clone(),
141141
));
142+
143+
spawn_counted(corro_types::sqlite::query_metrics_loop(tripwire.clone()));
144+
142145
spawn_counted(handlers::handle_gossip_to_send(
143146
transport.clone(),
144147
to_send_rx,

crates/corro-types/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ tokio = { workspace = true }
5151
tokio-util = { workspace = true }
5252
tracing = { workspace = true }
5353
tripwire = { version = "0.1.0-alpha.0", path = "../tripwire" }
54+
thread_local = { workspace = true }
5455
uhlc = { workspace = true }
5556
uuid = { workspace = true }
5657
strum = { workspace = true }

crates/corro-types/src/agent.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use rangemap::RangeInclusiveSet;
2525
use rusqlite::{named_params, Connection, OpenFlags, OptionalExtension, Transaction};
2626
use serde::{Deserialize, Serialize};
2727
use serde_json::json;
28+
use sqlite_pool::SqliteConn;
2829
use tokio::{
2930
runtime::Handle,
3031
sync::{oneshot, Semaphore},
@@ -50,8 +51,8 @@ use crate::{
5051
pubsub::SubsManager,
5152
schema::Schema,
5253
sqlite::{
53-
rusqlite_to_crsqlite, rusqlite_to_crsqlite_write, setup_conn, unnest_param, CrConn,
54-
Migration, SqlitePool, SqlitePoolError,
54+
rusqlite_to_crsqlite, rusqlite_to_crsqlite_write, setup_conn, trace_heavy_queries,
55+
unnest_param, CrConn, Migration, SqlitePool, SqlitePoolError,
5556
},
5657
updates::UpdatesManager,
5758
};
@@ -570,13 +571,16 @@ impl SplitPool {
570571
pub fn dedicated(&self) -> rusqlite::Result<Connection> {
571572
let conn = rusqlite::Connection::open(&self.0.path)?;
572573
setup_conn(&conn)?;
574+
trace_heavy_queries(&conn)?;
573575
Ok(conn)
574576
}
575577

576578
#[tracing::instrument(skip(self), level = "debug")]
577579
pub fn client_dedicated(&self) -> rusqlite::Result<CrConn> {
578580
let conn = rusqlite::Connection::open(&self.0.path)?;
579-
rusqlite_to_crsqlite_write(conn)
581+
let cr_conn = rusqlite_to_crsqlite_write(conn)?;
582+
trace_heavy_queries(cr_conn.conn())?;
583+
Ok(cr_conn)
580584
}
581585

582586
#[tracing::instrument(skip(self), level = "debug")]
@@ -585,7 +589,9 @@ impl SplitPool {
585589
&self.0.path,
586590
OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
587591
)?;
588-
rusqlite_to_crsqlite(conn)
592+
let cr_conn = rusqlite_to_crsqlite(conn)?;
593+
trace_heavy_queries(cr_conn.conn())?;
594+
Ok(cr_conn)
589595
}
590596

591597
// get a high priority write connection (e.g. client input)

crates/corro-types/src/sqlite.rs

Lines changed: 120 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,141 @@
11
use std::{
2+
collections::HashMap,
23
ops::{Deref, DerefMut},
34
time::{Duration, Instant},
45
};
56

7+
use metrics::counter;
68
use once_cell::sync::Lazy;
7-
use rusqlite::types::{ToSql, ToSqlOutput, Value};
9+
use parking_lot::Mutex;
810
use rusqlite::{
911
params, trace::TraceEventCodes, vtab::eponymous_only_module, Connection, Transaction,
1012
};
13+
use rusqlite::{
14+
types::{ToSql, ToSqlOutput, Value},
15+
DatabaseName,
16+
};
1117
use sqlite_pool::{Committable, SqliteConn};
1218
use std::rc::Rc;
1319
use tempfile::TempDir;
20+
use thread_local::ThreadLocal;
1421
use tracing::{error, info, trace, warn};
22+
use tripwire::Tripwire;
1523

1624
use crate::vtab::unnest::UnnestTab;
1725

1826
pub type SqlitePool = sqlite_pool::Pool<CrConn>;
1927
pub type SqlitePoolError = sqlite_pool::PoolError;
2028

29+
// Global registry for query stats
30+
// (sql, readonly) => (total_count, total_nanos)
31+
type QueryStats = HashMap<(String, bool), (u64, u128)>;
32+
static QUERY_STATS: ThreadLocal<Mutex<QueryStats>> = ThreadLocal::new();
33+
pub async fn query_metrics_loop(mut tripwire: Tripwire) {
34+
let mut interval = tokio::time::interval(Duration::from_secs(10));
35+
let mut prev_tick = interval.tick().await;
36+
loop {
37+
tokio::select! {
38+
t = interval.tick() => {
39+
let elapsed = t.duration_since(prev_tick);
40+
prev_tick = t;
41+
handle_query_metrics(elapsed);
42+
},
43+
_ = &mut tripwire => break,
44+
}
45+
}
46+
}
47+
48+
// Log to stdout queries taking more than 1 second
49+
const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(1);
50+
// Send utilization metrics for queries taking more than 10ms per second on average
51+
const IMPACTFUL_QUERY_THRESHOLD_MS_PER_SECOND: f64 = 10.0;
52+
// The default length in prometheus is 4kb but 1kb is more than enough
53+
const MAX_QUERY_LABEL_LENGTH: usize = 1024;
54+
fn handle_query_metrics(elapsed: Duration) {
55+
// Aggregate and drain stats from all threads into a single map
56+
let mut aggregated: QueryStats = Default::default();
57+
58+
for stats_mutex in QUERY_STATS.iter() {
59+
let mut stats = stats_mutex.lock();
60+
for (key, (count, nanos)) in stats.drain() {
61+
let entry = aggregated.entry(key).or_insert((0u64, 0u128));
62+
entry.0 += count;
63+
entry.1 += nanos;
64+
}
65+
}
66+
67+
for ((query_raw, readonly), (total_count, total_nanos)) in aggregated.into_iter() {
68+
let total_ms = (total_nanos / 1_000_000) as u64;
69+
let ms_per_second = total_ms as f64 / elapsed.as_secs_f64();
70+
if ms_per_second > IMPACTFUL_QUERY_THRESHOLD_MS_PER_SECOND {
71+
// For too long queries, truncate them to cap the label length
72+
// and append a hash to avoid collisions
73+
let query = if query_raw.len() > MAX_QUERY_LABEL_LENGTH {
74+
use std::collections::hash_map::DefaultHasher;
75+
use std::hash::Hash;
76+
use std::hash::Hasher;
77+
let mut h = DefaultHasher::new();
78+
query_raw.hash(&mut h);
79+
format!(
80+
"{}_{:x}",
81+
query_raw.chars().take(1024 - 16 - 1).collect::<String>(),
82+
h.finish()
83+
)
84+
} else {
85+
query_raw.clone()
86+
};
87+
counter!("corro.db.query.ms", "query" => query.clone() , "readonly" => readonly.to_string()).increment(total_ms);
88+
counter!("corro.db.query.count", "query" => query.clone(), "readonly" => readonly.to_string()).increment(total_count);
89+
}
90+
}
91+
}
92+
93+
fn tracing_callback_ro(ev: rusqlite::trace::TraceEvent) {
94+
handle_sql_tracing_event(ev, true);
95+
}
96+
97+
fn tracing_callback_rw(ev: rusqlite::trace::TraceEvent) {
98+
handle_sql_tracing_event(ev, false);
99+
}
100+
101+
fn handle_sql_tracing_event(ev: rusqlite::trace::TraceEvent, readonly: bool) {
102+
if let rusqlite::trace::TraceEvent::Profile(stmt_ref, duration) = ev {
103+
let dur = duration.as_nanos();
104+
let sql = stmt_ref.sql().to_string();
105+
106+
// Update per-thread stats to avoid contention on hot path
107+
let stats_mutex = QUERY_STATS.get_or_default();
108+
let mut stats = stats_mutex.lock();
109+
let entry = stats
110+
.entry((sql.clone(), readonly))
111+
.or_insert((0u64, 0u128));
112+
entry.0 += 1;
113+
entry.1 += dur;
114+
drop(stats); // Release lock quickly
115+
116+
if duration >= SLOW_QUERY_THRESHOLD {
117+
warn!(
118+
"SLOW {} query {duration:?} => {}",
119+
if readonly { "RO" } else { "RW" },
120+
sql
121+
);
122+
}
123+
}
124+
}
125+
126+
pub fn trace_heavy_queries(conn: &Connection) -> rusqlite::Result<()> {
127+
let readonly = conn.is_readonly(DatabaseName::Main)?;
128+
conn.trace_v2(
129+
TraceEventCodes::SQLITE_TRACE_PROFILE,
130+
Some(if readonly {
131+
tracing_callback_ro
132+
} else {
133+
tracing_callback_rw
134+
}),
135+
);
136+
Ok(())
137+
}
138+
21139
const CRSQL_EXT_GENERIC_NAME: &str = "crsqlite";
22140

23141
#[cfg(target_os = "macos")]
@@ -71,17 +189,7 @@ pub fn rusqlite_to_crsqlite(mut conn: rusqlite::Connection) -> rusqlite::Result<
71189
// I spent too much time debugging, it looks like a real bug in sqlite .-.
72190
let _ = conn.prepare_cached(INSERT_CRSQL_CHANGES_QUERY)?;
73191

74-
const SLOW_THRESHOLD: Duration = Duration::from_secs(1);
75-
conn.trace_v2(
76-
TraceEventCodes::SQLITE_TRACE_PROFILE,
77-
Some(|event| {
78-
if let rusqlite::trace::TraceEvent::Profile(stmt_ref, duration) = event {
79-
if duration >= SLOW_THRESHOLD {
80-
warn!("SLOW query {duration:?} => {}", stmt_ref.sql());
81-
}
82-
}
83-
}),
84-
);
192+
trace_heavy_queries(&conn)?;
85193

86194
Ok(CrConn(conn))
87195
}

0 commit comments

Comments
 (0)