From c1222cc8661bae2106f2c8805ffdfd6ef97f58bb Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Tue, 23 Jun 2026 14:24:25 -0700 Subject: [PATCH] [SLOP(claude-opus-4-8)] feat(rivetkit-core): add serde duration and size metrics --- .../rivetkit-core/src/actor/persist.rs | 16 +- .../packages/rivetkit-core/src/lib.rs | 1 + .../rivetkit-core/src/registry/http.rs | 100 +++++++---- .../rivetkit-core/src/serde_metrics.rs | 163 ++++++++++++++++++ 4 files changed, 237 insertions(+), 43 deletions(-) create mode 100644 rivetkit-rust/packages/rivetkit-core/src/serde_metrics.rs diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/persist.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/persist.rs index 52612c9dbf..ec29e45f8e 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/persist.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/persist.rs @@ -1,6 +1,8 @@ use anyhow::{Context, Result}; use vbare::OwnedVersionedData; +use crate::serde_metrics; + pub(crate) fn encode_latest_with_embedded_version( latest: T::Latest, version: u16, @@ -9,9 +11,11 @@ pub(crate) fn encode_latest_with_embedded_version( where T: OwnedVersionedData, { - T::wrap_latest(latest) - .serialize_with_embedded_version(version) - .with_context(|| format!("encode {label} versioned bare payload")) + serde_metrics::measure_serialize("bare", label, || { + T::wrap_latest(latest) + .serialize_with_embedded_version(version) + .with_context(|| format!("encode {label} versioned bare payload")) + }) } pub(crate) fn decode_latest_with_embedded_version( @@ -21,6 +25,8 @@ pub(crate) fn decode_latest_with_embedded_version( where T: OwnedVersionedData, { - ::deserialize_with_embedded_version(payload) - .with_context(|| format!("decode {label} versioned bare payload")) + serde_metrics::measure_deserialize("bare", label, payload.len(), || { + ::deserialize_with_embedded_version(payload) + .with_context(|| format!("decode {label} versioned bare payload")) + }) } diff --git a/rivetkit-rust/packages/rivetkit-core/src/lib.rs b/rivetkit-rust/packages/rivetkit-core/src/lib.rs index 63e5be3cde..e398405fe7 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/lib.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/lib.rs @@ -12,6 +12,7 @@ pub mod inspector_bundle; pub mod metrics_endpoint; pub mod registry; pub mod runtime; +pub(crate) mod serde_metrics; pub mod serverless; #[cfg(feature = "native-runtime")] pub mod serverless_http; diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs index 9839d79247..088debd34c 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/http.rs @@ -2,6 +2,7 @@ use super::dispatch::*; use super::inspector::*; use super::*; use crate::error::{ProtocolError, client_error_message, client_error_metadata}; +use crate::serde_metrics; use ::http; const HEADER_RIVET_ACTOR: &str = "x-rivet-actor"; @@ -773,6 +774,15 @@ pub(super) fn content_type_for_encoding(encoding: HttpResponseEncoding) -> &'sta } } +/// Bounded serde metric `format` label for the request/response encoding. +fn encoding_format_label(encoding: HttpResponseEncoding) -> &'static str { + match encoding { + HttpResponseEncoding::Json => "json", + HttpResponseEncoding::Cbor => "cbor", + HttpResponseEncoding::Bare => "bare", + } +} + pub(super) fn serialize_http_response_error( encoding: HttpResponseEncoding, group: &str, @@ -830,26 +840,31 @@ pub(super) fn decode_http_action_args( encoding: HttpResponseEncoding, body: &[u8], ) -> Result> { - match encoding { - HttpResponseEncoding::Json => { - let request: HttpActionRequestJson = - serde_json::from_slice(body).context("decode json HTTP action request")?; - let args = normalize_json_args(request.args); - encode_json_as_cbor(&args) - } - HttpResponseEncoding::Cbor => { - let request: HttpActionRequestJson = ciborium::from_reader(Cursor::new(body)) - .context("decode cbor HTTP action request")?; - let args = normalize_json_args(request.args); - encode_json_as_cbor(&args) - } - HttpResponseEncoding::Bare => { - let request = - ::deserialize_with_embedded_version(body) - .context("decode bare HTTP action request")?; - Ok(request.args) - } - } + serde_metrics::measure_deserialize( + encoding_format_label(encoding), + "http_action_request", + body.len(), + || match encoding { + HttpResponseEncoding::Json => { + let request: HttpActionRequestJson = + serde_json::from_slice(body).context("decode json HTTP action request")?; + let args = normalize_json_args(request.args); + encode_json_as_cbor(&args) + } + HttpResponseEncoding::Cbor => { + let request: HttpActionRequestJson = ciborium::from_reader(Cursor::new(body)) + .context("decode cbor HTTP action request")?; + let args = normalize_json_args(request.args); + encode_json_as_cbor(&args) + } + HttpResponseEncoding::Bare => { + let request = + ::deserialize_with_embedded_version(body) + .context("decode bare HTTP action request")?; + Ok(request.args) + } + }, + ) } fn normalize_json_args(args: JsonValue) -> Vec { @@ -900,25 +915,34 @@ pub(super) fn encode_http_action_response( encoding: HttpResponseEncoding, output: Vec, ) -> Result { - let body = match encoding { - HttpResponseEncoding::Json => serde_json::to_vec(&json!({ - "output": decode_cbor_json_or_null(&output), - }))?, - HttpResponseEncoding::Cbor => { - let mut out = Vec::new(); - ciborium::into_writer( - &json!({ + let body = serde_metrics::measure_serialize( + encoding_format_label(encoding), + "http_action_response", + || { + let body = match encoding { + HttpResponseEncoding::Json => serde_json::to_vec(&json!({ "output": decode_cbor_json_or_null(&output), - }), - &mut out, - )?; - out - } - HttpResponseEncoding::Bare => client_protocol::versioned::HttpActionResponse::wrap_latest( - client_protocol::HttpActionResponse { output }, - ) - .serialize_with_embedded_version(client_protocol::PROTOCOL_VERSION)?, - }; + }))?, + HttpResponseEncoding::Cbor => { + let mut out = Vec::new(); + ciborium::into_writer( + &json!({ + "output": decode_cbor_json_or_null(&output), + }), + &mut out, + )?; + out + } + HttpResponseEncoding::Bare => { + client_protocol::versioned::HttpActionResponse::wrap_latest( + client_protocol::HttpActionResponse { output }, + ) + .serialize_with_embedded_version(client_protocol::PROTOCOL_VERSION)? + } + }; + Ok(body) + }, + )?; Ok(HttpResponse { status: StatusCode::OK.as_u16(), headers: HashMap::from([( diff --git a/rivetkit-rust/packages/rivetkit-core/src/serde_metrics.rs b/rivetkit-rust/packages/rivetkit-core/src/serde_metrics.rs new file mode 100644 index 0000000000..b105c19f44 --- /dev/null +++ b/rivetkit-rust/packages/rivetkit-core/src/serde_metrics.rs @@ -0,0 +1,163 @@ +//! Duration and size metrics for serialization and deserialization hot paths. +//! +//! These mirror the engine-side serde observability but follow rivetkit's +//! metric conventions: `rivetkit_`-prefixed names registered through a +//! `LazyLock` collector struct, and `crate::time::Instant` so the same code +//! compiles for the wasm runtime. +//! +//! The `format` label is the wire format (`bare`, `json`, `cbor`). The +//! `location` label identifies the call site and must be a bounded, code-defined +//! string, never user input. + +use std::sync::LazyLock; +use std::time::Duration; + +use rivet_metrics::{ + MICRO_BUCKETS, + prometheus::{HistogramOpts, HistogramVec, Registry}, +}; + +use crate::time::Instant; + +const SERDE_LABELS: &[&str] = &["format", "location"]; + +/// Byte-size buckets shared by serialize and deserialize size histograms. +fn serde_size_buckets() -> Vec { + vec![ + 16.0, 32.0, 64.0, 128.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0, + 4194304.0, 16777216.0, + ] +} + +struct SerdeMetricCollectors { + serialize_size: HistogramVec, + deserialize_size: HistogramVec, + serialize_duration_seconds: HistogramVec, + deserialize_duration_seconds: HistogramVec, +} + +static METRICS: LazyLock = LazyLock::new(SerdeMetricCollectors::new); + +impl SerdeMetricCollectors { + fn new() -> Self { + let serialize_size = HistogramVec::new( + HistogramOpts::new( + "rivetkit_serialize_size", + "size in bytes for any serialization", + ) + .buckets(serde_size_buckets()), + SERDE_LABELS, + ) + .expect("create rivetkit_serialize_size histogram"); + let deserialize_size = HistogramVec::new( + HistogramOpts::new( + "rivetkit_deserialize_size", + "size in bytes for any deserialization", + ) + .buckets(serde_size_buckets()), + SERDE_LABELS, + ) + .expect("create rivetkit_deserialize_size histogram"); + let serialize_duration_seconds = HistogramVec::new( + HistogramOpts::new( + "rivetkit_serialize_duration_seconds", + "duration in seconds for any serialization", + ) + .buckets(MICRO_BUCKETS.to_vec()), + SERDE_LABELS, + ) + .expect("create rivetkit_serialize_duration_seconds histogram"); + let deserialize_duration_seconds = HistogramVec::new( + HistogramOpts::new( + "rivetkit_deserialize_duration_seconds", + "duration in seconds for any deserialization", + ) + .buckets(MICRO_BUCKETS.to_vec()), + SERDE_LABELS, + ) + .expect("create rivetkit_deserialize_duration_seconds histogram"); + + register_metric(&rivet_metrics::REGISTRY, serialize_size.clone()); + register_metric(&rivet_metrics::REGISTRY, deserialize_size.clone()); + register_metric(&rivet_metrics::REGISTRY, serialize_duration_seconds.clone()); + register_metric( + &rivet_metrics::REGISTRY, + deserialize_duration_seconds.clone(), + ); + + Self { + serialize_size, + deserialize_size, + serialize_duration_seconds, + deserialize_duration_seconds, + } + } +} + +/// Records the duration and output size of a serialization producing `Vec`. +/// +/// The size is only recorded when the closure succeeds. +pub(crate) fn measure_serialize( + format: &str, + location: &str, + f: impl FnOnce() -> anyhow::Result>, +) -> anyhow::Result> { + let started = Instant::now(); + let result = f(); + observe( + &METRICS.serialize_duration_seconds, + format, + location, + started.elapsed(), + ); + if let Ok(bytes) = &result { + observe_size(&METRICS.serialize_size, format, location, bytes.len()); + } + result +} + +/// Records the duration and input size of a deserialization. +/// +/// The input size is recorded unconditionally because the bytes are available +/// regardless of whether decoding succeeds. +pub(crate) fn measure_deserialize( + format: &str, + location: &str, + input_len: usize, + f: impl FnOnce() -> anyhow::Result, +) -> anyhow::Result { + observe_size(&METRICS.deserialize_size, format, location, input_len); + let started = Instant::now(); + let result = f(); + observe( + &METRICS.deserialize_duration_seconds, + format, + location, + started.elapsed(), + ); + result +} + +fn observe(metric: &HistogramVec, format: &str, location: &str, elapsed: Duration) { + metric + .with_label_values(&[format, location]) + .observe(elapsed.as_secs_f64()); +} + +fn observe_size(metric: &HistogramVec, format: &str, location: &str, size: usize) { + metric + .with_label_values(&[format, location]) + .observe(size as f64); +} + +fn register_metric(registry: &Registry, metric: M) +where + M: rivet_metrics::prometheus::core::Collector + Clone + Send + Sync + 'static, +{ + if let Err(error) = registry.register(Box::new(metric)) { + tracing::warn!( + ?error, + "serde metric registration failed, using existing collector" + ); + } +}