Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions rivetkit-rust/packages/rivetkit-core/src/actor/persist.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use anyhow::{Context, Result};
use vbare::OwnedVersionedData;

use crate::serde_metrics;

pub(crate) fn encode_latest_with_embedded_version<T>(
latest: T::Latest,
version: u16,
Expand All @@ -9,9 +11,11 @@ pub(crate) fn encode_latest_with_embedded_version<T>(
where
T: OwnedVersionedData,
{
T::wrap_latest(latest)
.serialize_with_embedded_version(version)
.with_context(|| format!("encode {label} versioned bare payload"))
serde_metrics::measure_serialize("bare", label, || {
T::wrap_latest(latest)
.serialize_with_embedded_version(version)
.with_context(|| format!("encode {label} versioned bare payload"))
})
}

pub(crate) fn decode_latest_with_embedded_version<T>(
Expand All @@ -21,6 +25,8 @@ pub(crate) fn decode_latest_with_embedded_version<T>(
where
T: OwnedVersionedData,
{
<T as OwnedVersionedData>::deserialize_with_embedded_version(payload)
.with_context(|| format!("decode {label} versioned bare payload"))
serde_metrics::measure_deserialize("bare", label, payload.len(), || {
<T as OwnedVersionedData>::deserialize_with_embedded_version(payload)
.with_context(|| format!("decode {label} versioned bare payload"))
})
}
1 change: 1 addition & 0 deletions rivetkit-rust/packages/rivetkit-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod inspector_bundle;
pub mod metrics_endpoint;
pub mod registry;
pub mod runtime;
pub(crate) mod serde_metrics;
pub mod serverless;
#[cfg(feature = "native-runtime")]
pub mod serverless_http;
Expand Down
100 changes: 62 additions & 38 deletions rivetkit-rust/packages/rivetkit-core/src/registry/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::dispatch::*;
use super::inspector::*;
use super::*;
use crate::error::{ProtocolError, client_error_message, client_error_metadata};
use crate::serde_metrics;
use ::http;

const HEADER_RIVET_ACTOR: &str = "x-rivet-actor";
Expand Down Expand Up @@ -773,6 +774,15 @@ pub(super) fn content_type_for_encoding(encoding: HttpResponseEncoding) -> &'sta
}
}

/// Bounded serde metric `format` label for the request/response encoding.
fn encoding_format_label(encoding: HttpResponseEncoding) -> &'static str {
match encoding {
HttpResponseEncoding::Json => "json",
HttpResponseEncoding::Cbor => "cbor",
HttpResponseEncoding::Bare => "bare",
}
}

pub(super) fn serialize_http_response_error(
encoding: HttpResponseEncoding,
group: &str,
Expand Down Expand Up @@ -830,26 +840,31 @@ pub(super) fn decode_http_action_args(
encoding: HttpResponseEncoding,
body: &[u8],
) -> Result<Vec<u8>> {
match encoding {
HttpResponseEncoding::Json => {
let request: HttpActionRequestJson =
serde_json::from_slice(body).context("decode json HTTP action request")?;
let args = normalize_json_args(request.args);
encode_json_as_cbor(&args)
}
HttpResponseEncoding::Cbor => {
let request: HttpActionRequestJson = ciborium::from_reader(Cursor::new(body))
.context("decode cbor HTTP action request")?;
let args = normalize_json_args(request.args);
encode_json_as_cbor(&args)
}
HttpResponseEncoding::Bare => {
let request =
<client_protocol::versioned::HttpActionRequest as OwnedVersionedData>::deserialize_with_embedded_version(body)
.context("decode bare HTTP action request")?;
Ok(request.args)
}
}
serde_metrics::measure_deserialize(
encoding_format_label(encoding),
"http_action_request",
body.len(),
|| match encoding {
HttpResponseEncoding::Json => {
let request: HttpActionRequestJson =
serde_json::from_slice(body).context("decode json HTTP action request")?;
let args = normalize_json_args(request.args);
encode_json_as_cbor(&args)
}
HttpResponseEncoding::Cbor => {
let request: HttpActionRequestJson = ciborium::from_reader(Cursor::new(body))
.context("decode cbor HTTP action request")?;
let args = normalize_json_args(request.args);
encode_json_as_cbor(&args)
}
HttpResponseEncoding::Bare => {
let request =
<client_protocol::versioned::HttpActionRequest as OwnedVersionedData>::deserialize_with_embedded_version(body)
.context("decode bare HTTP action request")?;
Ok(request.args)
}
},
)
}

fn normalize_json_args(args: JsonValue) -> Vec<JsonValue> {
Expand Down Expand Up @@ -900,25 +915,34 @@ pub(super) fn encode_http_action_response(
encoding: HttpResponseEncoding,
output: Vec<u8>,
) -> Result<HttpResponse> {
let body = match encoding {
HttpResponseEncoding::Json => serde_json::to_vec(&json!({
"output": decode_cbor_json_or_null(&output),
}))?,
HttpResponseEncoding::Cbor => {
let mut out = Vec::new();
ciborium::into_writer(
&json!({
let body = serde_metrics::measure_serialize(
encoding_format_label(encoding),
"http_action_response",
|| {
let body = match encoding {
HttpResponseEncoding::Json => serde_json::to_vec(&json!({
"output": decode_cbor_json_or_null(&output),
}),
&mut out,
)?;
out
}
HttpResponseEncoding::Bare => client_protocol::versioned::HttpActionResponse::wrap_latest(
client_protocol::HttpActionResponse { output },
)
.serialize_with_embedded_version(client_protocol::PROTOCOL_VERSION)?,
};
}))?,
HttpResponseEncoding::Cbor => {
let mut out = Vec::new();
ciborium::into_writer(
&json!({
"output": decode_cbor_json_or_null(&output),
}),
&mut out,
)?;
out
}
HttpResponseEncoding::Bare => {
client_protocol::versioned::HttpActionResponse::wrap_latest(
client_protocol::HttpActionResponse { output },
)
.serialize_with_embedded_version(client_protocol::PROTOCOL_VERSION)?
}
};
Ok(body)
},
)?;
Ok(HttpResponse {
status: StatusCode::OK.as_u16(),
headers: HashMap::from([(
Expand Down
163 changes: 163 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/src/serde_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
//! Duration and size metrics for serialization and deserialization hot paths.
//!
//! These mirror the engine-side serde observability but follow rivetkit's
//! metric conventions: `rivetkit_`-prefixed names registered through a
//! `LazyLock` collector struct, and `crate::time::Instant` so the same code
//! compiles for the wasm runtime.
//!
//! The `format` label is the wire format (`bare`, `json`, `cbor`). The
//! `location` label identifies the call site and must be a bounded, code-defined
//! string, never user input.

use std::sync::LazyLock;
use std::time::Duration;

use rivet_metrics::{
MICRO_BUCKETS,
prometheus::{HistogramOpts, HistogramVec, Registry},
};

use crate::time::Instant;

const SERDE_LABELS: &[&str] = &["format", "location"];

/// Byte-size buckets shared by serialize and deserialize size histograms.
fn serde_size_buckets() -> Vec<f64> {
vec![
16.0, 32.0, 64.0, 128.0, 256.0, 1024.0, 4096.0, 16384.0, 65536.0, 262144.0, 1048576.0,
4194304.0, 16777216.0,
]
}

struct SerdeMetricCollectors {
serialize_size: HistogramVec,
deserialize_size: HistogramVec,
serialize_duration_seconds: HistogramVec,
deserialize_duration_seconds: HistogramVec,
}

static METRICS: LazyLock<SerdeMetricCollectors> = LazyLock::new(SerdeMetricCollectors::new);

impl SerdeMetricCollectors {
fn new() -> Self {
let serialize_size = HistogramVec::new(
HistogramOpts::new(
"rivetkit_serialize_size",
"size in bytes for any serialization",
)
.buckets(serde_size_buckets()),
SERDE_LABELS,
)
.expect("create rivetkit_serialize_size histogram");
let deserialize_size = HistogramVec::new(
HistogramOpts::new(
"rivetkit_deserialize_size",
"size in bytes for any deserialization",
)
.buckets(serde_size_buckets()),
SERDE_LABELS,
)
.expect("create rivetkit_deserialize_size histogram");
let serialize_duration_seconds = HistogramVec::new(
HistogramOpts::new(
"rivetkit_serialize_duration_seconds",
"duration in seconds for any serialization",
)
.buckets(MICRO_BUCKETS.to_vec()),
SERDE_LABELS,
)
.expect("create rivetkit_serialize_duration_seconds histogram");
let deserialize_duration_seconds = HistogramVec::new(
HistogramOpts::new(
"rivetkit_deserialize_duration_seconds",
"duration in seconds for any deserialization",
)
.buckets(MICRO_BUCKETS.to_vec()),
SERDE_LABELS,
)
.expect("create rivetkit_deserialize_duration_seconds histogram");

register_metric(&rivet_metrics::REGISTRY, serialize_size.clone());
register_metric(&rivet_metrics::REGISTRY, deserialize_size.clone());
register_metric(&rivet_metrics::REGISTRY, serialize_duration_seconds.clone());
register_metric(
&rivet_metrics::REGISTRY,
deserialize_duration_seconds.clone(),
);

Self {
serialize_size,
deserialize_size,
serialize_duration_seconds,
deserialize_duration_seconds,
}
}
}

/// Records the duration and output size of a serialization producing `Vec<u8>`.
///
/// The size is only recorded when the closure succeeds.
pub(crate) fn measure_serialize(
format: &str,
location: &str,
f: impl FnOnce() -> anyhow::Result<Vec<u8>>,
) -> anyhow::Result<Vec<u8>> {
let started = Instant::now();
let result = f();
observe(
&METRICS.serialize_duration_seconds,
format,
location,
started.elapsed(),
);
if let Ok(bytes) = &result {
observe_size(&METRICS.serialize_size, format, location, bytes.len());
}
result
}

/// Records the duration and input size of a deserialization.
///
/// The input size is recorded unconditionally because the bytes are available
/// regardless of whether decoding succeeds.
pub(crate) fn measure_deserialize<T>(
format: &str,
location: &str,
input_len: usize,
f: impl FnOnce() -> anyhow::Result<T>,
) -> anyhow::Result<T> {
observe_size(&METRICS.deserialize_size, format, location, input_len);
let started = Instant::now();
let result = f();
observe(
&METRICS.deserialize_duration_seconds,
format,
location,
started.elapsed(),
);
result
}

fn observe(metric: &HistogramVec, format: &str, location: &str, elapsed: Duration) {
metric
.with_label_values(&[format, location])
.observe(elapsed.as_secs_f64());
}

fn observe_size(metric: &HistogramVec, format: &str, location: &str, size: usize) {
metric
.with_label_values(&[format, location])
.observe(size as f64);
}

fn register_metric<M>(registry: &Registry, metric: M)
where
M: rivet_metrics::prometheus::core::Collector + Clone + Send + Sync + 'static,
{
if let Err(error) = registry.register(Box::new(metric)) {
tracing::warn!(
?error,
"serde metric registration failed, using existing collector"
);
}
}
Loading