Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion benchmarks-website/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ path = "src/main.rs"
anyhow = { workspace = true }
axum = "0.8"
base64 = "0.22"
dashmap = { workspace = true }
# track vortex-duckdb's bundled engine version (build.rs)
duckdb = { version = "1.10502", features = ["bundled"] }
maud = { version = "0.27", features = ["axum"] }
serde = { workspace = true, features = ["derive"] }
parking_lot = { workspace = true }
serde = { workspace = true, features = ["derive", "rc"] }
serde_json = { workspace = true }
subtle = "2.6"
thiserror = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion benchmarks-website/server/src/api/charts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
//! returns a [`ChartResponse`].

use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::Context as _;
use anyhow::Result;
Expand Down Expand Up @@ -100,7 +101,7 @@ pub(crate) fn collect_group_charts(
charts.push(NamedChartResponse {
name: link.name,
slug: link.slug,
chart,
chart: Arc::new(chart),
});
}
if charts.is_empty() {
Expand Down
23 changes: 16 additions & 7 deletions benchmarks-website/server/src/api/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//! ingest side, see [`crate::records`]).

use std::collections::BTreeMap;
use std::sync::Arc;

use serde::Serialize;
use serde_json::Value as JsonValue;
Expand Down Expand Up @@ -54,17 +55,21 @@ pub fn group_sort_key(name: &str) -> (usize, &str) {
}

/// Body of `GET /api/groups`: every group with its chart links and summary.
///
/// The inner [`Vec`] is held in an [`Arc`] so [`crate::query_cache::QueryCache`]
/// can serve the same allocation to every concurrent reader without cloning.
/// `Arc<T>` serialises through to `T`, so the wire shape is unchanged.
#[derive(Debug, Serialize)]
pub struct GroupsResponse {
/// Every group surfaced by the discovery passes, in canonical order.
pub groups: Vec<Group>,
pub groups: Arc<Vec<Group>>,
}

/// One group: a display name, a slug for the group permalink, and the chart
/// links inside it. Optionally carries a v2-compatible rollup summary and a
/// short editorial description (rendered as a hover tooltip on the
/// disclosure title).
#[derive(Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub struct Group {
/// Human-readable group label rendered in the disclosure header.
pub name: String,
Expand Down Expand Up @@ -98,7 +103,7 @@ pub struct GroupChartsResponse {
}

/// Server-computed group summary, matching the v2 metadata contract.
#[derive(Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")]
pub enum Summary {
/// Random-access format ranking for the latest populated random-access chart.
Expand Down Expand Up @@ -161,7 +166,7 @@ pub enum Summary {
}

/// One random-access summary row.
#[derive(Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub struct RandomAccessRanking {
/// Series name, normally the physical format.
pub name: String,
Expand All @@ -172,7 +177,7 @@ pub struct RandomAccessRanking {
}

/// One query benchmark summary row.
#[derive(Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub struct QueryRanking {
/// Series name, normally `engine:format`.
pub name: String,
Expand All @@ -186,6 +191,10 @@ pub struct QueryRanking {
/// A single chart inside a [`GroupChartsResponse`]. `name` is the chart's
/// short label inside the group (e.g. `Q1`); `slug` round-trips through
/// `/api/chart/{slug}`.
///
/// `chart` is held in an [`Arc`] so the cache and the landing-page builder
/// share the same allocation; `Arc<T>` serialises as `T`, so the wire shape
/// is identical to a plain `ChartResponse`.
#[derive(Debug, Serialize)]
pub struct NamedChartResponse {
/// Chart label rendered in the chart-card title (e.g. `Q1`).
Expand All @@ -194,12 +203,12 @@ pub struct NamedChartResponse {
pub slug: String,
/// Inlined chart payload — same shape as `/api/chart/{slug}`.
#[serde(flatten)]
pub chart: ChartResponse,
pub chart: Arc<ChartResponse>,
}

/// One chart's short label inside a group (e.g. `Q1`) plus the slug that
/// resolves to its `/api/chart/{slug}` payload.
#[derive(Debug, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub struct ChartLink {
/// Chart label rendered in the chart-card title (e.g. `Q1`).
pub name: String,
Expand Down
72 changes: 65 additions & 7 deletions benchmarks-website/server/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use crate::slug::GroupKey;

/// Handler for `GET /api/groups`.
pub async fn groups(State(state): State<AppState>) -> Result<impl IntoResponse, ApiError> {
let groups = db::run_blocking(&state.db, |conn| collect_groups(conn)).await?;
let groups = cached_groups(&state).await?;
Ok(Json(GroupsResponse { groups }))
}

Expand All @@ -80,8 +80,7 @@ pub async fn chart(
let key = ChartKey::from_slug(&slug)
.map_err(|e| ApiError::BadRequest(format!("invalid slug: {e}")))?;
let window = q.window();
let response =
db::run_blocking(&state.db, move |conn| chart_payload(conn, &key, &window)).await?;
let response = cached_chart_payload(&state, &slug, &key, &window).await?;
let response =
response.ok_or_else(|| ApiError::NotFound(format!("no data for slug {slug:?}")))?;
Ok(Json(response))
Expand All @@ -96,15 +95,74 @@ pub async fn group(
let key = GroupKey::from_slug(&slug)
.map_err(|e| ApiError::BadRequest(format!("invalid group slug: {e}")))?;
let window = q.window();
let response = db::run_blocking(&state.db, move |conn| {
collect_group_charts(conn, &key, &window)
})
.await?;
let response = cached_group_charts(&state, &slug, &key, &window).await?;
let response =
response.ok_or_else(|| ApiError::NotFound(format!("no data for group slug {slug:?}")))?;
Ok(Json(response))
}

/// Cache-aware wrapper around [`collect_groups`].
pub async fn cached_groups(state: &AppState) -> Result<std::sync::Arc<Vec<Group>>> {
let db = state.db.clone();
state
.cache
.groups(move || async move { db::run_blocking(&db, |conn| collect_groups(conn)).await })
.await
}

/// Cache-aware wrapper around [`collect_filter_universe`].
pub async fn cached_filter_universe(state: &AppState) -> Result<std::sync::Arc<FilterUniverse>> {
let db = state.db.clone();
state
.cache
.filter_universe(move || async move {
db::run_blocking(&db, |conn| collect_filter_universe(conn)).await
})
.await
}

/// Cache-aware wrapper around [`chart_payload`].
pub async fn cached_chart_payload(
state: &AppState,
slug: &str,
key: &ChartKey,
window: &CommitWindow,
) -> Result<Option<std::sync::Arc<ChartResponse>>> {
let db = state.db.clone();
let key_for_compute = key.clone();
let window_for_compute = *window;
state
.cache
.chart_payload(slug, window, move || async move {
db::run_blocking(&db, move |conn| {
chart_payload(conn, &key_for_compute, &window_for_compute)
})
.await
})
.await
}

/// Cache-aware wrapper around [`collect_group_charts`].
pub async fn cached_group_charts(
state: &AppState,
slug: &str,
key: &GroupKey,
window: &CommitWindow,
) -> Result<Option<std::sync::Arc<GroupChartsResponse>>> {
let db = state.db.clone();
let key_for_compute = key.clone();
let window_for_compute = *window;
state
.cache
.group_charts(slug, window, move || async move {
db::run_blocking(&db, move |conn| {
collect_group_charts(conn, &key_for_compute, &window_for_compute)
})
.await
})
.await
}

/// Handler for `GET /health`.
pub async fn health(State(state): State<AppState>) -> Result<impl IntoResponse, ApiError> {
let path = state.db_path.display().to_string();
Expand Down
2 changes: 1 addition & 1 deletion benchmarks-website/server/src/api/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use super::dto::DEFAULT_COMMIT_WINDOW;
///
/// `Last(n)` keeps the most recent `n` commits by `commits.timestamp`; `All`
/// returns every commit ever ingested.
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CommitWindow {
/// Keep the most recent `n` commits.
Last(NonZeroU32),
Expand Down
7 changes: 6 additions & 1 deletion benchmarks-website/server/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ use crate::db::DbHandle;
use crate::db::{self};
use crate::html;
use crate::ingest;
use crate::query_cache::QueryCache;

/// Shared state for all handlers. Cheap to clone (everything is `Arc`-shaped
/// or a small `String`).
#[derive(Clone)]
pub struct AppState {
/// Mutex-guarded DuckDB connection. See [`crate::db`].
/// Shared DuckDB handle. See [`crate::db`].
pub db: DbHandle,
/// Bearer token expected on `/api/ingest`. Compared via constant-time eq.
pub bearer_token: Arc<String>,
/// On-disk path of the DuckDB file. Surfaced on `/health`.
pub db_path: Arc<PathBuf>,
/// In-memory cache of every read-side query result. Cleared by
/// [`crate::ingest`] after a successful commit. See [`crate::query_cache`].
pub cache: Arc<QueryCache>,
}

impl AppState {
Expand All @@ -52,6 +56,7 @@ impl AppState {
db,
bearer_token: Arc::new(bearer_token),
db_path: Arc::new(path),
cache: Arc::new(QueryCache::new()),
})
}
}
Expand Down
40 changes: 29 additions & 11 deletions benchmarks-website/server/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

//! DuckDB connection management plus the deterministic `measurement_id` hash.
//!
//! The server holds a single [`duckdb::Connection`] inside an async
//! [`tokio::sync::Mutex`]. All DB work runs inside `spawn_blocking` so the
//! Tokio runtime is never blocked on synchronous DuckDB calls.
//! The server keeps one root [`duckdb::Connection`] and clones a fresh
//! connection from it for each blocking DB task. All DB work runs inside
//! `spawn_blocking` so the Tokio runtime is never blocked on synchronous
//! DuckDB calls.
//!
//! `measurement_id` is a server-internal xxhash64 over `commit_sha` plus
//! each table's dimensional tuple. Including `commit_sha` makes every
Expand All @@ -21,7 +22,7 @@ use std::sync::Arc;
use anyhow::Context as _;
use anyhow::Result;
use duckdb::Connection;
use tokio::sync::Mutex;
use parking_lot::Mutex;
use twox_hash::XxHash64;

use crate::records::CompressionSize;
Expand All @@ -31,8 +32,25 @@ use crate::records::RandomAccessTime;
use crate::records::VectorSearchRun;
use crate::schema::SCHEMA_DDL;

/// A connection guard the rest of the crate hands around.
pub type DbHandle = Arc<Mutex<Connection>>;
/// Shared DuckDB handle. Cloning the handle is cheap; each DB task clones a
/// task-local [`Connection`] before doing work.
#[derive(Clone)]
pub struct DbHandle {
root: Arc<Mutex<Connection>>,
}

impl DbHandle {
fn new(root: Connection) -> Self {
Self {
root: Arc::new(Mutex::new(root)),
}
}

fn connection(&self) -> Result<Connection> {
let root = self.root.lock();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't love that this still requires locking, even though the criticial section is pretty short lived...guess it's fine 🤷

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we cant really get around this? Also it is unlikely that this would have contention anyways

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is called on every request right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at this point, why do we even need the lock? can't we just clone here?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the type isn't send so it needs to be wrapped in something with interior mut

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I guess it might have some contention, but duckdb is going to have a lot of overhead in creating txns for all of these requests (which is further dominated by the actual queries) so I don't think it's a problem

root.try_clone().context("cloning DuckDB connection")
}
}

/// Open the DuckDB file at `path` (creating it if absent) and apply the
/// schema DDL. Returns a handle ready to be cloned into the Axum state.
Expand All @@ -41,20 +59,20 @@ pub fn open<P: AsRef<Path>>(path: P) -> Result<DbHandle> {
.with_context(|| format!("opening DuckDB at {}", path.as_ref().display()))?;
conn.execute_batch(SCHEMA_DDL)
.context("applying schema DDL")?;
Ok(Arc::new(Mutex::new(conn)))
Ok(DbHandle::new(conn))
}

/// Run a synchronous DB operation on the blocking pool, holding the connection
/// mutex for the duration of the call.
/// Run a synchronous DB operation on the blocking pool using a task-local
/// DuckDB connection cloned from the shared database handle.
pub async fn run_blocking<F, T>(handle: &DbHandle, f: F) -> Result<T>
where
F: FnOnce(&mut Connection) -> Result<T> + Send + 'static,
T: Send + 'static,
{
let handle = handle.clone();
tokio::task::spawn_blocking(move || {
let mut guard = handle.blocking_lock();
f(&mut guard)
let mut conn = handle.connection()?;
f(&mut conn)
})
.await
.context("DB task panicked")?
Expand Down
Loading
Loading