Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions engine/artifacts/config-schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 0 additions & 25 deletions engine/packages/config/src/config/api_public.rs

This file was deleted.

11 changes: 0 additions & 11 deletions engine/packages/config/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use serde::{Deserialize, Serialize};
use std::sync::LazyLock;

pub mod api_peer;
pub mod api_public;
pub mod auth;
pub mod cache;
pub mod clickhouse;
Expand All @@ -21,7 +20,6 @@ pub mod telemetry;
pub mod topology;

pub use api_peer::*;
pub use api_public::*;
pub use auth::*;
pub use cache::*;
pub use clickhouse::*;
Expand Down Expand Up @@ -74,9 +72,6 @@ pub struct Root {
#[serde(default)]
pub guard: Option<Guard>,

#[serde(default)]
pub api_public: Option<ApiPublic>,

#[serde(default)]
pub api_peer: Option<ApiPeer>,

Expand Down Expand Up @@ -122,7 +117,6 @@ impl Default for Root {
Root {
auth: None,
guard: None,
api_public: None,
api_peer: None,
pegboard: None,
logs: None,
Expand All @@ -146,11 +140,6 @@ impl Root {
self.guard.as_ref().unwrap_or(&DEFAULT)
}

pub fn api_public(&self) -> &ApiPublic {
static DEFAULT: LazyLock<ApiPublic> = LazyLock::new(ApiPublic::default);
self.api_public.as_ref().unwrap_or(&DEFAULT)
}

pub fn api_peer(&self) -> &ApiPeer {
static DEFAULT: LazyLock<ApiPeer> = LazyLock::new(ApiPeer::default);
self.api_peer.as_ref().unwrap_or(&DEFAULT)
Expand Down
2 changes: 2 additions & 0 deletions engine/packages/engine/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ fn main() -> Result<()> {
}

async fn main_inner() -> Result<()> {
tracing::info!(version=%build_meta::VERSION, git_sha=%build_meta::GIT_SHA, built_at=%build_meta::BUILD_TIMESTAMP, "starting rivet");

let cli = Cli::parse();

// Load config
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/metrics-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn run_standalone(config: rivet_config::Config) -> Result<()> {
Ok::<_, hyper::Error>(service_fn(serve_req))
}));

tracing::info!(?host, ?port, "started metrics server");
tracing::debug!(?host, ?port, "started metrics server");
server.await?;

Ok(())
Expand Down
9 changes: 7 additions & 2 deletions engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,14 @@ pub async fn task_inner(
// backpressure to the runner rather than dropping protocol messages.
let mut rate_limit = rivet_util::throttle::RateLimiter::new(
rivet_util::throttle::RateLimitMethod::LeakyBucket {
requests: ctx.config().pegboard().envoy_websocket_rate_limit_requests(),
requests: ctx
.config()
.pegboard()
.envoy_websocket_rate_limit_requests(),
drip_rate: Duration::from_micros(
ctx.config().pegboard().envoy_websocket_rate_limit_drip_rate_us(),
ctx.config()
.pegboard()
.envoy_websocket_rate_limit_drip_rate_us(),
),
},
);
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub struct SharedState(Arc<SharedStateInner>);
impl SharedState {
pub fn new(config: &rivet_config::Config, ups: PubSub) -> Self {
let gateway_id = protocol::util::generate_gateway_id();
tracing::info!(gateway_id = %protocol::util::id_to_string(&gateway_id), "setting up shared state for gateway");
tracing::debug!(gateway_id = %protocol::util::id_to_string(&gateway_id), "setting up shared state for gateway");
let receiver_subject = GatewayReceiverSubject::new(gateway_id);

let pegboard_config = config.pegboard();
Expand Down
6 changes: 0 additions & 6 deletions engine/packages/pegboard-gateway2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,6 @@ lazy_static::lazy_static! {
&["namespace_id", "pool_name", "protocol", "reason"],
*REGISTRY
).unwrap();
pub static ref SHUTDOWN_IN_FLIGHT_ABORTED_TOTAL: IntCounter =
register_int_counter_with_registry!(
"gateway2_shutdown_in_flight_aborted_total",
"In-flight gateway requests abandoned on pod shutdown without sending close.",
*REGISTRY
).unwrap();
pub static ref MSG_SENT_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
"gateway2_msg_sent_total",
"Count of total of tunnel messages sent.",
Expand Down
20 changes: 1 addition & 19 deletions engine/packages/pegboard-gateway2/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl SharedState {
init_slow_ping_threshold_from_env();

let gateway_id = protocol::util::generate_gateway_id();
tracing::info!(gateway_id = %display_id(&gateway_id), "setting up shared state for gateway");
tracing::debug!(gateway_id = %display_id(&gateway_id), "setting up shared state for gateway");
let receiver_subject = GatewayReceiverSubject::new(gateway_id);

let pegboard_config = config.pegboard();
Expand Down Expand Up @@ -194,27 +194,9 @@ impl SharedState {
let self_clone = self.clone();
tokio::spawn(async move { self_clone.gc().await });

let self_clone = self.clone();
tokio::spawn(async move { self_clone.shutdown_watcher().await });

Ok(())
}

#[tracing::instrument(skip_all)]
async fn shutdown_watcher(&self) {
let mut term_signal = __rivet_runtime::TermSignal::get();
term_signal.recv().await;

let in_flight_aborted = self.in_flight_requests.len();
if in_flight_aborted > 0 {
metrics::SHUTDOWN_IN_FLIGHT_ABORTED_TOTAL.inc_by(in_flight_aborted as u64);
}
tracing::info!(
in_flight_aborted,
"gateway shutdown in-flight requests abandoned without close"
);
}

#[tracing::instrument(skip_all)]
async fn receiver(&self) {
// Automatically resubscribe if unsubscribed
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/perf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Drop for PerfMeasure {

let elapsed = self.start.elapsed();
let _guard = self.span.enter();
tracing::warn!(
tracing::debug!(
name = self.name,
elapsed_ms = PerfMeasure::__elapsed_ms(elapsed),
"PerfMeasure dropped without finish() - measurement discarded",
Expand Down
2 changes: 0 additions & 2 deletions engine/packages/service-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ pub async fn start(
let shutting_down = Arc::new(AtomicBool::new(false));

for service in services {
tracing::debug!(name=%service.name, kind=?service.kind, "server starting service");

match service.kind.behavior() {
ServiceBehavior::Service => {
let config = config.clone();
Expand Down
1 change: 0 additions & 1 deletion engine/packages/test-deps/src/datacenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ pub async fn setup_single_datacenter(
let mut root = rivet_config::config::Root::default();
root.database = Some(db_config);
root.pubsub = Some(pubsub_config);
root.api_public = Some(Default::default());
root.api_peer = Some(rivet_config::config::ApiPeer {
port: Some(api_peer_port),
..Default::default()
Expand Down
1 change: 1 addition & 0 deletions engine/packages/universaldb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ rivet-config.workspace = true
rivet-env.workspace = true
rivet-pools.workspace = true
rivet-test-deps-docker.workspace = true
tokio-postgres.workspace = true
tracing-subscriber.workspace = true
7 changes: 6 additions & 1 deletion engine/packages/universaldb/src/driver/postgres/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ impl PostgresConfig {
pub struct PostgresDatabaseDriver {
shared: Arc<PostgresShared>,
max_retries: AtomicI32,
resolver_handle: JoinHandle<()>,
gc_handle: JoinHandle<()>,
}

Expand Down Expand Up @@ -112,13 +113,14 @@ impl PostgresDatabaseDriver {
let shared = PostgresShared::new(pool, node_id, listener);

// Every node runs the resolver; only the elected leader drains the commit queue.
resolver::spawn(shared.clone());
let resolver_handle = resolver::spawn(shared.clone());

let gc_handle = Self::spawn_gc(shared.clone());

Ok(PostgresDatabaseDriver {
shared,
max_retries: AtomicI32::new(100),
resolver_handle,
gc_handle,
})
}
Expand Down Expand Up @@ -292,6 +294,9 @@ impl DatabaseDriver for PostgresDatabaseDriver {

impl Drop for PostgresDatabaseDriver {
fn drop(&mut self) {
// Abort the resolver so a dropped node stops renewing its lease; the lease then expires and
// another node can take over. Without this a dropped leader would renew its lease forever.
self.resolver_handle.abort();
self.gc_handle.abort();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ enum DrainOutcome {
}

/// Spawn the per-process resolver task. Every node runs this; only the elected leader drains the
/// commit queue.
pub fn spawn(shared: Arc<PostgresShared>) {
tokio::spawn(run(shared));
/// commit queue. The returned handle is aborted when the owning driver drops, which stops lease
/// renewal so the lease expires and another node can take over (node-death / failover path).
pub fn spawn(shared: Arc<PostgresShared>) -> tokio::task::JoinHandle<()> {
tokio::spawn(run(shared))
}

async fn run(shared: Arc<PostgresShared>) {
Expand Down
Loading
Loading