Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions engine/artifacts/errors/actor.shutdown_timeout.json

This file was deleted.

22 changes: 1 addition & 21 deletions rivetkit-rust/packages/rivetkit-core/src/actor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@ impl Default for CanHibernateWebSocket {
}
}

#[derive(Clone, Debug, Default)]
pub struct ActorConfigOverrides {
pub sleep_grace_period: Option<Duration>,
}

#[derive(Clone, Debug)]
pub struct ActionDefinition {
pub name: String,
Expand Down Expand Up @@ -84,7 +79,6 @@ pub struct ActorConfig {
pub max_outgoing_message_size: u32,
pub preload_max_workflow_bytes: Option<u64>,
pub preload_max_connections_bytes: Option<u64>,
pub overrides: Option<ActorConfigOverrides>,
pub actions: Vec<ActionDefinition>,
/// Author-declared inspector tab entries (custom tabs + built-in
/// hides). Validated upstream (Zod / builder).
Expand Down Expand Up @@ -201,12 +195,7 @@ impl ActorConfig {
}

pub fn effective_sleep_grace_period(&self) -> Duration {
cap_duration(
self.sleep_grace_period,
self.overrides
.as_ref()
.and_then(|overrides| overrides.sleep_grace_period),
)
self.sleep_grace_period
}

/// Runtime authority for rejecting malformed config that bypassed the
Expand Down Expand Up @@ -247,21 +236,12 @@ impl Default for ActorConfig {
max_outgoing_message_size: DEFAULT_MAX_OUTGOING_MESSAGE_SIZE,
preload_max_workflow_bytes: None,
preload_max_connections_bytes: None,
overrides: None,
actions: Vec::new(),
inspector_tabs: Vec::new(),
}
}
}

fn cap_duration(duration: Duration, override_duration: Option<Duration>) -> Duration {
if let Some(override_duration) = override_duration {
duration.min(override_duration)
} else {
duration
}
}

fn duration_ms(value: u32) -> Duration {
Duration::from_millis(u64::from(value))
}
Expand Down
18 changes: 0 additions & 18 deletions rivetkit-rust/packages/rivetkit-core/src/actor/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ pub const PERSIST_DATA_KEY: &[u8] = &[1];
pub const CONN_PREFIX: [u8; 1] = [2];
// The inspector auth token lives at [3].
pub const INSPECTOR_TOKEN_KEY: [u8; 1] = [3];
// User KV entries live under [4, ...user_key].
pub const KV_PREFIX: [u8; 1] = [4];
// Queue storage lives under [5, ...].
pub const QUEUE_PREFIX: [u8; 1] = [5];
// Workflow storage lives under [6, ...].
Expand Down Expand Up @@ -72,22 +70,6 @@ struct QueueInvalidMessageKey {
reason: String,
}

pub fn make_prefixed_key(key: &[u8]) -> Vec<u8> {
concat_prefix(&KV_PREFIX, key)
}

pub fn remove_prefix_from_key(prefixed_key: &[u8]) -> &[u8] {
&prefixed_key[KV_PREFIX.len()..]
}

pub fn make_workflow_key(key: &[u8]) -> Vec<u8> {
concat_prefix(&WORKFLOW_STORAGE_PREFIX, key)
}

pub fn make_traces_key(key: &[u8]) -> Vec<u8> {
concat_prefix(&TRACES_STORAGE_PREFIX, key)
}

pub fn make_connection_key(conn_id: &str) -> Vec<u8> {
concat_prefix(&CONN_PREFIX, conn_id.as_bytes())
}
Expand Down
8 changes: 0 additions & 8 deletions rivetkit-rust/packages/rivetkit-core/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ impl Request {
)
}

pub fn into_inner(self) -> http::Request<Vec<u8>> {
self.0
}

pub fn into_body(self) -> Vec<u8> {
self.0.into_body()
}
Expand Down Expand Up @@ -154,10 +150,6 @@ impl Response {
)
}

pub fn into_inner(self) -> http::Response<Vec<u8>> {
self.0
}

pub fn into_body(self) -> Vec<u8> {
self.0.into_body()
}
Expand Down
4 changes: 2 additions & 2 deletions rivetkit-rust/packages/rivetkit-core/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod task_types;
pub(crate) mod work_registry;

pub use action::ActionDispatchError;
pub use config::{ActionDefinition, ActorConfig, ActorConfigOverrides, CanHibernateWebSocket};
pub use config::{ActionDefinition, ActorConfig, CanHibernateWebSocket};
pub use connection::ConnHandle;
pub use context::{ActorContext, ActorWorkRegion, KeepAwakeRegion, WebSocketCallbackRegion};
pub use factory::{ActorEntryFn, ActorFactory};
Expand All @@ -40,5 +40,5 @@ pub use task::{
ActionDispatchResult, ActorTask, DispatchCommand, HttpDispatchResult, LifecycleCommand,
LifecycleEvent, LifecycleState,
};
pub use task_types::{ActorChildOutcome, ShutdownKind, StateMutationReason, UserTaskKind};
pub use task_types::{ShutdownKind, StateMutationReason, UserTaskKind};
pub use work_registry::{ActorWorkKind, ActorWorkPolicy};
14 changes: 0 additions & 14 deletions rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,20 +141,6 @@ impl SqliteDb {
self.backend
}

pub async fn get_pages(
&self,
request: protocol::SqliteGetPagesRequest,
) -> Result<protocol::SqliteGetPagesResponse> {
self.handle()?.sqlite_get_pages(request).await
}

pub async fn commit(
&self,
request: protocol::SqliteCommitRequest,
) -> Result<protocol::SqliteCommitResponse> {
self.handle()?.sqlite_commit(request).await
}

pub async fn open(&self) -> Result<()> {
match self.backend {
SqliteBackend::LocalNative => {
Expand Down
22 changes: 0 additions & 22 deletions rivetkit-rust/packages/rivetkit-core/src/actor/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,15 +511,6 @@ impl ActorContext {
self.0.persisted.read().scheduled_events.clone()
}

pub fn set_scheduled_events(&self, scheduled_events: Vec<PersistedScheduleEvent>) {
self.0.persisted.write().scheduled_events = scheduled_events;
self.0
.metrics
.inc_state_mutation(StateMutationReason::ScheduledEventsUpdate);
self.mark_dirty();
self.schedule_save(None);
}

pub(crate) fn update_scheduled_events<R>(
&self,
update: impl FnOnce(&mut Vec<PersistedScheduleEvent>) -> R,
Expand All @@ -537,19 +528,6 @@ impl ActorContext {
result
}

pub fn set_input(&self, input: Option<Vec<u8>>) {
self.0.persisted.write().input = input;
self.0
.metrics
.inc_state_mutation(StateMutationReason::InputSet);
self.mark_dirty();
self.schedule_save(None);
}

pub fn input(&self) -> Option<Vec<u8>> {
self.0.persisted.read().input.clone()
}

pub fn set_has_initialized(&self, has_initialized: bool) {
{
let mut persisted = self.0.persisted.write();
Expand Down
34 changes: 0 additions & 34 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task_types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
use std::{any::Any, fmt};

use anyhow::Result;

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum LifecycleState {
#[default]
Expand Down Expand Up @@ -66,7 +62,6 @@ impl UserTaskKind {
pub enum StateMutationReason {
InternalReplace,
ScheduledEventsUpdate,
InputSet,
HasInitialized,
}

Expand All @@ -75,36 +70,7 @@ impl StateMutationReason {
match self {
Self::InternalReplace => "internal_replace",
Self::ScheduledEventsUpdate => "scheduled_events_update",
Self::InputSet => "input_set",
Self::HasInitialized => "has_initialized",
}
}
}

pub enum ActorChildOutcome {
UserTaskFinished {
kind: UserTaskKind,
result: Result<()>,
},
UserTaskPanicked {
kind: UserTaskKind,
payload: Box<dyn Any + Send>,
},
}

impl fmt::Debug for ActorChildOutcome {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ActorChildOutcome::UserTaskFinished { kind, result } => f
.debug_struct("UserTaskFinished")
.field("kind", kind)
.field("result", result)
.finish(),
ActorChildOutcome::UserTaskPanicked { kind, .. } => f
.debug_struct("UserTaskPanicked")
.field("kind", kind)
.field("payload", &"<panic payload>")
.finish(),
}
}
}
23 changes: 0 additions & 23 deletions rivetkit-rust/packages/rivetkit-core/src/actor/vars.rs

This file was deleted.

3 changes: 0 additions & 3 deletions rivetkit-rust/packages/rivetkit-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ pub enum ActorLifecycle {
#[error("destroying", "Actor is destroying.")]
Destroying,

#[error("shutdown_timeout", "Actor shutdown timed out.")]
ShutdownTimeout,

#[error("dropped_reply", "Actor reply channel was dropped without a response.")]
DroppedReply,

Expand Down
6 changes: 2 additions & 4 deletions rivetkit-rust/packages/rivetkit-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,7 @@ pub mod websocket;
pub use actor::{kv, sqlite};

pub use actor::action::ActionDispatchError;
pub use actor::config::{
ActionDefinition, ActorConfig, ActorConfigInput, ActorConfigOverrides, CanHibernateWebSocket,
};
pub use actor::config::{ActionDefinition, ActorConfig, ActorConfigInput, CanHibernateWebSocket};
pub use actor::connection::ConnHandle;
pub use actor::context::{ActorContext, ActorWorkRegion, KeepAwakeRegion, WebSocketCallbackRegion};
pub use actor::factory::{ActorEntryFn, ActorFactory};
Expand All @@ -141,7 +139,7 @@ pub use actor::work_registry::{ActorWorkKind, ActorWorkPolicy};
pub use error::ActorLifecycle;
pub use inspector::{Inspector, InspectorSnapshot};
pub use registry::{CoreRegistry, EngineSpawnMode, ServeConfig};
pub use runtime::{RuntimeBoxFuture, RuntimeSpawner, boxed_runtime_future};
pub use runtime::{RuntimeBoxFuture, RuntimeSpawner};
pub use serverless::{CoreServerlessRuntime, ServerlessRequest, ServerlessResponse};
pub use types::{
ActorKey, ActorKeySegment, ConnId, ListOpts, SaveStateOpts, WsMessage, format_actor_key,
Expand Down
7 changes: 0 additions & 7 deletions rivetkit-rust/packages/rivetkit-core/src/metrics_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,6 @@ pub fn render_prometheus_metrics() -> Result<RenderedMetrics> {
})
}

pub fn authorization_bearer_token(headers: &http::HeaderMap) -> Option<&str> {
headers
.get(http::header::AUTHORIZATION)
.and_then(|value| value.to_str().ok())
.and_then(bearer_token_from_authorization)
}

pub fn authorization_bearer_token_map(headers: &HashMap<String, String>) -> Option<&str> {
headers
.iter()
Expand Down
54 changes: 0 additions & 54 deletions rivetkit-rust/packages/rivetkit-core/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,57 +79,3 @@ impl RuntimeSpawner {
tokio::task::spawn_local(future)
}
}

#[cfg(feature = "native-runtime")]
pub fn boxed_runtime_future<F, T>(future: F) -> RuntimeBoxFuture<T>
where
F: Future<Output = T> + Send + 'static,
{
Box::pin(future)
}

#[cfg(not(any(feature = "native-runtime", feature = "wasm-runtime")))]
pub fn boxed_runtime_future<F, T>(future: F) -> RuntimeBoxFuture<T>
where
F: Future<Output = T> + Send + 'static,
{
Box::pin(future)
}

#[cfg(feature = "wasm-runtime")]
pub fn boxed_runtime_future<F, T>(future: F) -> RuntimeBoxFuture<T>
where
F: Future<Output = T> + 'static,
{
Box::pin(future)
}

#[cfg(all(test, feature = "wasm-runtime"))]
mod tests {
use std::cell::RefCell;
use std::rc::Rc;

use super::{RuntimeBoxFuture, boxed_runtime_future};

fn accepts_wasm_local_callback(
callback: impl Fn() -> RuntimeBoxFuture<()> + 'static,
) -> impl Fn() -> RuntimeBoxFuture<()> {
callback
}

#[test]
fn wasm_runtime_box_future_accepts_local_callbacks() {
let state = Rc::new(RefCell::new(0));
let callback = accepts_wasm_local_callback({
let state = state.clone();
move || {
let state = state.clone();
boxed_runtime_future(async move {
*state.borrow_mut() += 1;
})
}
});

let _future = callback();
}
}
6 changes: 0 additions & 6 deletions rivetkit-rust/packages/rivetkit-core/src/serverless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,6 @@ impl CoreServerlessRuntime {
}
}

pub async fn active_envoy_actor_count(&self) -> Option<usize> {
self.active_envoy_status()
.await
.map(|status| status.active_actor_count)
}

pub async fn active_envoy_status(&self) -> Option<CoreEnvoyStatus> {
self.envoy
.lock()
Expand Down
6 changes: 0 additions & 6 deletions rivetkit-rust/packages/rivetkit-core/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,6 @@ impl WebSocket {
}))
}

pub fn from_sender(sender: WebSocketSender) -> Self {
let websocket = Self::new();
websocket.configure_sender(sender);
websocket
}

pub fn send(&self, msg: WsMessage) {
if let Err(error) = self.try_send(msg) {
tracing::error!(?error, "failed to send websocket message");
Expand Down
Loading
Loading