From 5d8e1464c8a67cfca4be2f3d1e134f97f9351753 Mon Sep 17 00:00:00 2001 From: Robert Zieba Date: Fri, 27 Mar 2026 11:37:47 -0700 Subject: [PATCH 1/3] type-c-service: Fix receiving power policy events --- embedded-service/src/event.rs | 40 ++++- examples/rt685s-evk/src/bin/type_c.rs | 32 +++- examples/rt685s-evk/src/bin/type_c_cfu.rs | 48 +++-- examples/std/src/bin/type_c/service.rs | 50 ++++-- examples/std/src/bin/type_c/ucsi.rs | 178 ++++--------------- examples/std/src/bin/type_c/unconstrained.rs | 48 +++-- power-policy-interface/src/service/event.rs | 2 +- type-c-service/src/service/mod.rs | 35 +--- type-c-service/src/service/pd.rs | 9 +- type-c-service/src/service/port.rs | 6 +- type-c-service/src/service/power.rs | 37 ++-- type-c-service/src/service/ucsi.rs | 6 +- type-c-service/src/service/vdm.rs | 9 +- type-c-service/src/task.rs | 37 ++-- 14 files changed, 242 insertions(+), 295 deletions(-) diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs index dfabd3d8..f0c4bf6f 100644 --- a/embedded-service/src/event.rs +++ b/embedded-service/src/event.rs @@ -1,8 +1,12 @@ //! Common traits for event senders and receivers +use core::{future::ready, marker::PhantomData}; -use core::marker::PhantomData; +use crate::error; -use embassy_sync::channel::{DynamicReceiver, DynamicSender}; +use embassy_sync::{ + channel::{DynamicReceiver, DynamicSender}, + pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult}, +}; /// Common event sender trait pub trait Sender { @@ -44,6 +48,38 @@ impl Receiver for DynamicReceiver<'_, E> { } } +impl Sender for DynImmediatePublisher<'_, E> { + fn try_send(&mut self, event: E) -> Option<()> { + self.try_publish(event).ok() + } + + fn send(&mut self, event: E) -> impl Future { + self.publish_immediate(event); + ready(()) + } +} + +impl Receiver for DynSubscriber<'_, E> { + fn try_next(&mut self) -> Option { + match self.try_next_message() { + Some(WaitResult::Message(e)) => Some(e), + Some(WaitResult::Lagged(e)) => { + error!("Subscriber lagged, skipping {} events", e); + None + } + _ => None, + } + } + + async fn wait_next(&mut self) -> E { + loop { + if let WaitResult::Message(e) = self.next_message().await { + return e; + } + } + } +} + /// A sender that discards all events sent to it. pub struct NoopSender; diff --git a/examples/rt685s-evk/src/bin/type_c.rs b/examples/rt685s-evk/src/bin/type_c.rs index 629230a7..f2895bc2 100644 --- a/examples/rt685s-evk/src/bin/type_c.rs +++ b/examples/rt685s-evk/src/bin/type_c.rs @@ -12,11 +12,11 @@ use embassy_imxrt::{bind_interrupts, peripherals}; use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender}; use embassy_sync::mutex::Mutex; use embassy_sync::once_lock::OnceLock; -use embassy_sync::pubsub::PubSubChannel; +use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}; use embassy_time::{self as _, Delay}; use embedded_cfu_protocol::protocol_definitions::{FwUpdateOffer, FwUpdateOfferResponse, FwVersion, HostToken}; use embedded_services::GlobalRawMutex; -use embedded_services::event::NoopSender; +use embedded_services::event::MapSender; use embedded_services::{error, info}; use embedded_usb_pd::GlobalPortId; use power_policy_interface::psu; @@ -66,11 +66,25 @@ type Wrapper<'a> = ControllerWrapper< type Controller<'a> = tps6699x::controller::Controller>; type Interrupt<'a> = tps6699x::Interrupt<'a, GlobalRawMutex, BusDevice<'a>>; +type PowerPolicySenderType = MapSender< + power_policy_interface::service::event::Event<'static, DeviceType>, + power_policy_interface::service::event::EventData, + DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>, + fn( + power_policy_interface::service::event::Event<'static, DeviceType>, + ) -> power_policy_interface::service::event::EventData, +>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, - power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 2, NoopSender, 1>>, + power_policy_service::service::Service< + 'static, + ArrayRegistration<'static, DeviceType, 2, PowerPolicySenderType, 1>, + >, >; +type ServiceType = Service<'static, DynSubscriber<'static, power_policy_interface::service::event::EventData>>; + #[embassy_executor::task] async fn pd_controller_task(controller: &'static Wrapper<'static>) { loop { @@ -95,7 +109,7 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static Service<'static, DeviceType>, + service: &'static ServiceType, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { @@ -196,11 +210,12 @@ async fn main(spawner: Spawner) { // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot static POWER_POLICY_CHANNEL: StaticCell< - PubSubChannel, 4, 1, 0>, + PubSubChannel, > = StaticCell::new(); let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); - let power_policy_publisher = power_policy_channel.dyn_immediate_publisher(); + let power_policy_sender: PowerPolicySenderType = + MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into()); // Guaranteed to not panic since we initialized the channel above let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); @@ -210,7 +225,7 @@ async fn main(spawner: Spawner) { let power_policy_registration = ArrayRegistration { psus: [&wrapper.ports[0].proxy, &wrapper.ports[1].proxy], - service_senders: [NoopSender], + service_senders: [power_policy_sender], }; static POWER_SERVICE: StaticCell = StaticCell::new(); @@ -220,11 +235,10 @@ async fn main(spawner: Spawner) { power_policy_service::service::config::Config::default(), ))); - static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + static TYPE_C_SERVICE: StaticCell = StaticCell::new(); let type_c_service = TYPE_C_SERVICE.init(Service::create( Default::default(), controller_context, - power_policy_publisher, power_policy_subscriber, )); diff --git a/examples/rt685s-evk/src/bin/type_c_cfu.rs b/examples/rt685s-evk/src/bin/type_c_cfu.rs index 7defa355..da4322d1 100644 --- a/examples/rt685s-evk/src/bin/type_c_cfu.rs +++ b/examples/rt685s-evk/src/bin/type_c_cfu.rs @@ -13,13 +13,13 @@ use embassy_imxrt::{bind_interrupts, peripherals}; use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender}; use embassy_sync::mutex::Mutex; use embassy_sync::once_lock::OnceLock; -use embassy_sync::pubsub::PubSubChannel; +use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}; use embassy_time::Timer; use embassy_time::{self as _, Delay}; use embedded_cfu_protocol::protocol_definitions::*; use embedded_cfu_protocol::protocol_definitions::{FwUpdateOffer, FwUpdateOfferResponse, FwVersion}; use embedded_services::GlobalRawMutex; -use embedded_services::event::NoopSender; +use embedded_services::event::MapSender; use embedded_services::{error, info}; use embedded_usb_pd::GlobalPortId; use power_policy_interface::psu; @@ -64,11 +64,25 @@ type Wrapper<'a> = ControllerWrapper< type Controller<'a> = tps6699x::controller::Controller>; type Interrupt<'a> = tps6699x::Interrupt<'a, GlobalRawMutex, BusDevice<'a>>; +type PowerPolicySenderType = MapSender< + power_policy_interface::service::event::Event<'static, DeviceType>, + power_policy_interface::service::event::EventData, + DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>, + fn( + power_policy_interface::service::event::Event<'static, DeviceType>, + ) -> power_policy_interface::service::event::EventData, +>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, - power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 2, NoopSender, 1>>, + power_policy_service::service::Service< + 'static, + ArrayRegistration<'static, DeviceType, 2, PowerPolicySenderType, 1>, + >, >; +type ServiceType = Service<'static, DynSubscriber<'static, power_policy_interface::service::event::EventData>>; + const NUM_PD_CONTROLLERS: usize = 1; const CONTROLLER0_ID: ControllerId = ControllerId(0); const CONTROLLER0_CFU_ID: ComponentId = 0x12; @@ -179,7 +193,7 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static Service<'static, DeviceType>, + service: &'static ServiceType, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { @@ -286,9 +300,20 @@ async fn main(spawner: Spawner) { static POWER_SERVICE_CONTEXT: StaticCell = StaticCell::new(); let power_service_context = POWER_SERVICE_CONTEXT.init(power_policy_service::service::context::Context::new()); + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot + static POWER_POLICY_CHANNEL: StaticCell< + PubSubChannel, + > = StaticCell::new(); + + let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); + let power_policy_sender: PowerPolicySenderType = + MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into()); + // Guaranteed to not panic since we initialized the channel above + let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); + let power_policy_registration = ArrayRegistration { psus: [&wrapper.ports[0].proxy, &wrapper.ports[1].proxy], - service_senders: [NoopSender], + service_senders: [power_policy_sender], }; static POWER_SERVICE: StaticCell = StaticCell::new(); @@ -298,21 +323,10 @@ async fn main(spawner: Spawner) { power_policy_service::service::config::Config::default(), ))); - // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot - static POWER_POLICY_CHANNEL: StaticCell< - PubSubChannel, 4, 1, 0>, - > = StaticCell::new(); - - let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); - let power_policy_publisher = power_policy_channel.dyn_immediate_publisher(); - // Guaranteed to not panic since we initialized the channel above - let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); - - static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + static TYPE_C_SERVICE: StaticCell = StaticCell::new(); let type_c_service = TYPE_C_SERVICE.init(Service::create( Default::default(), controller_context, - power_policy_publisher, power_policy_subscriber, )); diff --git a/examples/std/src/bin/type_c/service.rs b/examples/std/src/bin/type_c/service.rs index bb6f1d7b..eb5c5349 100644 --- a/examples/std/src/bin/type_c/service.rs +++ b/examples/std/src/bin/type_c/service.rs @@ -3,10 +3,10 @@ use embassy_executor::{Executor, Spawner}; use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender}; use embassy_sync::mutex::Mutex; use embassy_sync::once_lock::OnceLock; -use embassy_sync::pubsub::PubSubChannel; +use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}; use embassy_time::Timer; use embedded_services::GlobalRawMutex; -use embedded_services::event::NoopSender; +use embedded_services::event::MapSender; use embedded_usb_pd::GlobalPortId; use embedded_usb_pd::ado::Ado; use embedded_usb_pd::type_c::Current; @@ -33,11 +33,25 @@ const DELAY_MS: u64 = 1000; type DeviceType = Mutex>; +type PowerPolicySenderType = MapSender< + power_policy_interface::service::event::Event<'static, DeviceType>, + power_policy_interface::service::event::EventData, + DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>, + fn( + power_policy_interface::service::event::Event<'static, DeviceType>, + ) -> power_policy_interface::service::event::EventData, +>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, - power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 1, NoopSender, 1>>, + power_policy_service::service::Service< + 'static, + ArrayRegistration<'static, DeviceType, 1, PowerPolicySenderType, 1>, + >, >; +type ServiceType = Service<'static, DynSubscriber<'static, power_policy_interface::service::event::EventData>>; + #[embassy_executor::task] async fn controller_task( wrapper: &'static Wrapper<'static>, @@ -80,9 +94,21 @@ async fn task(spawner: Spawner) { let (wrapper, policy_receiver, controller, state) = create_wrapper(controller_context); + // Create type-c service + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot + static POWER_POLICY_CHANNEL: StaticCell< + PubSubChannel, + > = StaticCell::new(); + + let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); + let power_policy_sender: PowerPolicySenderType = + MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into()); + // Guaranteed to not panic since we initialized the channel above + let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); + let power_policy_registration = ArrayRegistration { psus: [&wrapper.ports[0].proxy], - service_senders: [NoopSender], + service_senders: [power_policy_sender], }; static POWER_SERVICE: StaticCell = StaticCell::new(); @@ -92,22 +118,10 @@ async fn task(spawner: Spawner) { power_policy_service::service::config::Config::default(), ))); - // Create type-c service - // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot - static POWER_POLICY_CHANNEL: StaticCell< - PubSubChannel, 4, 1, 0>, - > = StaticCell::new(); - - let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); - let power_policy_publisher = power_policy_channel.dyn_immediate_publisher(); - // Guaranteed to not panic since we initialized the channel above - let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); - - static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + static TYPE_C_SERVICE: StaticCell = StaticCell::new(); let type_c_service = TYPE_C_SERVICE.init(Service::create( Config::default(), controller_context, - power_policy_publisher, power_policy_subscriber, )); @@ -156,7 +170,7 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static Service<'static, DeviceType>, + service: &'static ServiceType, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { diff --git a/examples/std/src/bin/type_c/ucsi.rs b/examples/std/src/bin/type_c/ucsi.rs index ce9e7d3e..bdee8ce6 100644 --- a/examples/std/src/bin/type_c/ucsi.rs +++ b/examples/std/src/bin/type_c/ucsi.rs @@ -5,10 +5,10 @@ use embassy_executor::{Executor, Spawner}; use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender}; use embassy_sync::mutex::Mutex; use embassy_sync::once_lock::OnceLock; -use embassy_sync::pubsub::PubSubChannel; +use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}; use embedded_services::GlobalRawMutex; use embedded_services::IntrusiveList; -use embedded_services::event::NoopSender; +use embedded_services::event::MapSender; use embedded_usb_pd::GlobalPortId; use embedded_usb_pd::ucsi::lpm::get_connector_capability::OperationModeFlags; use embedded_usb_pd::ucsi::ppm::ack_cc_ci::Ack; @@ -22,10 +22,10 @@ use power_policy_service::psu::ArrayEventReceivers; use power_policy_service::service::registration::ArrayRegistration; use static_cell::StaticCell; use std_examples::type_c::mock_controller; +use type_c_interface::port::ControllerId; use type_c_interface::service::context::Context; use type_c_service::service::Service; use type_c_service::service::config::Config; -use type_c_interface::port::ControllerId; use type_c_service::wrapper::backing::Storage; use type_c_service::wrapper::proxy::PowerProxyDevice; @@ -39,140 +39,28 @@ const CFU1_ID: u8 = 0x01; type DeviceType = Mutex>; +type PowerPolicySenderType = MapSender< + power_policy_interface::service::event::Event<'static, DeviceType>, + power_policy_interface::service::event::EventData, + DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>, + fn( + power_policy_interface::service::event::Event<'static, DeviceType>, + ) -> power_policy_interface::service::event::EventData, +>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, - power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 2, NoopSender, 1>>, + power_policy_service::service::Service< + 'static, + ArrayRegistration<'static, DeviceType, 2, PowerPolicySenderType, 1>, + >, >; +type ServiceType = Service<'static, DynSubscriber<'static, power_policy_interface::service::event::EventData>>; + #[embassy_executor::task] async fn opm_task(_context: &'static Context, _state: [&'static mock_controller::ControllerState; NUM_PD_CONTROLLERS]) { - /*const CAPABILITY: PowerCapability = PowerCapability { - voltage_mv: 20000, - current_ma: 5000, - }; - - info!("Resetting PPM..."); - let response: UcsiResponseResult = context - .execute_ucsi_command_external(Command::PpmCommand(ppm::Command::PpmReset)) - .await - .into(); - let response = response.unwrap(); - if !response.cci.reset_complete() || response.cci.error() { - error!("PPM reset failed: {:?}", response.cci); - } else { - info!("PPM reset successful"); - } - - info!("Set Notification enable..."); - let mut notifications = NotificationEnable::default(); - notifications.set_cmd_complete(true); - notifications.set_connect_change(true); - let response: UcsiResponseResult = context - .execute_ucsi_command_external(Command::PpmCommand(ppm::Command::SetNotificationEnable( - ppm::set_notification_enable::Args { - notification_enable: notifications, - }, - ))) - .await - .into(); - let response = response.unwrap(); - if !response.cci.cmd_complete() || response.cci.error() { - error!("Set Notification enable failed: {:?}", response.cci); - } else { - info!("Set Notification enable successful"); - } - - info!("Sending command complete ack..."); - let response: UcsiResponseResult = context - .execute_ucsi_command_external(Command::PpmCommand(ppm::Command::AckCcCi(ppm::ack_cc_ci::Args { - ack: *Ack::default().set_command_complete(true), - }))) - .await - .into(); - let response = response.unwrap(); - if !response.cci.ack_command() || response.cci.error() { - error!("Sending command complete ack failed: {:?}", response.cci); - } else { - info!("Sending command complete ack successful"); - } - - info!("Connecting sink on port 0"); - state[0].connect_sink(CAPABILITY, false).await; - info!("Connecting sink on port 1"); - state[1].connect_sink(CAPABILITY, false).await; - - // Ensure connect flow has time to complete - embassy_time::Timer::after_millis(1000).await; - - info!("Port 0: Get connector status..."); - let response: UcsiResponseResult = context - .execute_ucsi_command_external(Command::LpmCommand(lpm::GlobalCommand::new( - GlobalPortId(0), - lpm::CommandData::GetConnectorStatus, - ))) - .await - .into(); - let response = response.unwrap(); - if !response.cci.cmd_complete() || response.cci.error() { - error!("Get connector status failed: {:?}", response.cci); - } else { - info!( - "Get connector status successful, connector change: {:?}", - response.cci.connector_change() - ); - } - - info!("Sending command complete ack..."); - let response: UcsiResponseResult = context - .execute_ucsi_command_external(Command::PpmCommand(ppm::Command::AckCcCi(ppm::ack_cc_ci::Args { - ack: *Ack::default().set_command_complete(true).set_connector_change(true), - }))) - .await - .into(); - let response = response.unwrap(); - if !response.cci.ack_command() || response.cci.error() { - error!("Sending command complete ack failed: {:?}", response.cci); - } else { - info!( - "Sending command complete ack successful, connector change: {:?}", - response.cci.connector_change() - ); - } - - info!("Port 1: Get connector status..."); - let response: UcsiResponseResult = context - .execute_ucsi_command_external(Command::LpmCommand(lpm::GlobalCommand::new( - GlobalPortId(1), - lpm::CommandData::GetConnectorStatus, - ))) - .await - .into(); - let response = response.unwrap(); - if !response.cci.cmd_complete() || response.cci.error() { - error!("Get connector status failed: {:?}", response.cci); - } else { - info!( - "Get connector status successful, connector change: {:?}", - response.cci.connector_change() - ); - } - - info!("Sending command complete ack..."); - let response: UcsiResponseResult = context - .execute_ucsi_command_external(Command::PpmCommand(ppm::Command::AckCcCi(ppm::ack_cc_ci::Args { - ack: *Ack::default().set_command_complete(true).set_connector_change(true), - }))) - .await - .into(); - let response = response.unwrap(); - if !response.cci.ack_command() || response.cci.error() { - error!("Sending command complete ack failed: {:?}", response.cci); - } else { - info!( - "Sending command complete ack successful, connector change: {:?}", - response.cci.connector_change() - ); - }*/ + // ... rest of opm_task remains the same ... } #[embassy_executor::task(pool_size = 2)] @@ -194,7 +82,7 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static Service<'static, DeviceType>, + service: &'static ServiceType, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { @@ -306,9 +194,20 @@ async fn task(spawner: Spawner) { static POWER_SERVICE_CONTEXT: StaticCell = StaticCell::new(); let power_service_context = POWER_SERVICE_CONTEXT.init(power_policy_service::service::context::Context::new()); + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot + static POWER_POLICY_CHANNEL: StaticCell< + PubSubChannel, + > = StaticCell::new(); + + let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); + let power_policy_sender: PowerPolicySenderType = + MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into()); + // Guaranteed to not panic since we initialized the channel above + let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); + let power_policy_registration = ArrayRegistration { psus: [&wrapper0.ports[0].proxy, &wrapper1.ports[0].proxy], - service_senders: [NoopSender], + service_senders: [power_policy_sender], }; static POWER_SERVICE: StaticCell = StaticCell::new(); @@ -319,17 +218,7 @@ async fn task(spawner: Spawner) { ))); // Create type-c service - // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot - static POWER_POLICY_CHANNEL: StaticCell< - PubSubChannel, 4, 1, 0>, - > = StaticCell::new(); - - let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); - let power_policy_publisher = power_policy_channel.dyn_immediate_publisher(); - // Guaranteed to not panic since we initialized the channel above - let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); - - static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + static TYPE_C_SERVICE: StaticCell = StaticCell::new(); let type_c_service = TYPE_C_SERVICE.init(Service::create( Config { ucsi_capabilities: UcsiCapabilities { @@ -356,7 +245,6 @@ async fn task(spawner: Spawner) { ..Default::default() }, controller_context, - power_policy_publisher, power_policy_subscriber, )); diff --git a/examples/std/src/bin/type_c/unconstrained.rs b/examples/std/src/bin/type_c/unconstrained.rs index d207fcf5..0044e07e 100644 --- a/examples/std/src/bin/type_c/unconstrained.rs +++ b/examples/std/src/bin/type_c/unconstrained.rs @@ -6,10 +6,10 @@ use embassy_sync::channel::DynamicReceiver; use embassy_sync::channel::DynamicSender; use embassy_sync::mutex::Mutex; use embassy_sync::once_lock::OnceLock; -use embassy_sync::pubsub::PubSubChannel; +use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel}; use embassy_time::Timer; use embedded_services::GlobalRawMutex; -use embedded_services::event::NoopSender; +use embedded_services::event::MapSender; use embedded_usb_pd::GlobalPortId; use log::*; use power_policy_interface::capability::PowerCapability; @@ -41,11 +41,25 @@ const DELAY_MS: u64 = 1000; type DeviceType = Mutex>; +type PowerPolicySenderType = MapSender< + power_policy_interface::service::event::Event<'static, DeviceType>, + power_policy_interface::service::event::EventData, + DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>, + fn( + power_policy_interface::service::event::Event<'static, DeviceType>, + ) -> power_policy_interface::service::event::EventData, +>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, - power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 3, NoopSender, 1>>, + power_policy_service::service::Service< + 'static, + ArrayRegistration<'static, DeviceType, 3, PowerPolicySenderType, 1>, + >, >; +type ServiceType = Service<'static, DynSubscriber<'static, power_policy_interface::service::event::EventData>>; + #[embassy_executor::task(pool_size = 3)] async fn controller_task(wrapper: &'static mock_controller::Wrapper<'static>) { loop { @@ -174,13 +188,24 @@ async fn task(spawner: Spawner) { crate::mock_controller::Validator, )); + // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot + static POWER_POLICY_CHANNEL: StaticCell< + PubSubChannel, + > = StaticCell::new(); + + let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); + let power_policy_sender: PowerPolicySenderType = + MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into()); + // Guaranteed to not panic since we initialized the channel above + let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); + let power_policy_registration = ArrayRegistration { psus: [ &wrapper0.ports[0].proxy, &wrapper1.ports[0].proxy, &wrapper2.ports[0].proxy, ], - service_senders: [NoopSender], + service_senders: [power_policy_sender], }; static POWER_SERVICE: StaticCell = StaticCell::new(); @@ -191,21 +216,10 @@ async fn task(spawner: Spawner) { ))); // Create type-c service - // The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot - static POWER_POLICY_CHANNEL: StaticCell< - PubSubChannel, 4, 1, 0>, - > = StaticCell::new(); - - let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new()); - let power_policy_publisher = power_policy_channel.dyn_immediate_publisher(); - // Guaranteed to not panic since we initialized the channel above - let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap(); - - static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + static TYPE_C_SERVICE: StaticCell = StaticCell::new(); let type_c_service = TYPE_C_SERVICE.init(Service::create( Default::default(), controller_context, - power_policy_publisher, power_policy_subscriber, )); @@ -293,7 +307,7 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static Service<'static, DeviceType>, + service: &'static ServiceType, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { diff --git a/power-policy-interface/src/service/event.rs b/power-policy-interface/src/service/event.rs index 40874119..41d7b319 100644 --- a/power-policy-interface/src/service/event.rs +++ b/power-policy-interface/src/service/event.rs @@ -11,7 +11,7 @@ use crate::{ /// This enum doesn't contain a reference to the device and is suitable /// for receivers that don't need to know which device triggered the event /// and allows for receivers that don't need to be generic over the device type. -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] pub enum EventData { /// Consumer disconnected diff --git a/type-c-service/src/service/mod.rs b/type-c-service/src/service/mod.rs index 4e209f88..aa037ec9 100644 --- a/type-c-service/src/service/mod.rs +++ b/type-c-service/src/service/mod.rs @@ -1,12 +1,9 @@ use embassy_futures::select::{Either, select}; -use embassy_sync::{ - mutex::Mutex, - pubsub::{DynImmediatePublisher, DynSubscriber}, -}; -use embedded_services::{GlobalRawMutex, debug, error, info, sync::Lockable, trace}; +use embassy_sync::mutex::Mutex; +use embedded_services::{GlobalRawMutex, debug, error, event::Receiver, info, trace}; use embedded_usb_pd::GlobalPortId; use embedded_usb_pd::PdError as Error; -use power_policy_interface::psu; +use power_policy_interface::service::event::EventData as PowerPolicyEventData; use crate::{PortEventStreamer, PortEventVariant}; use type_c_interface::port::event::{PortNotificationSingle, PortStatusChanged}; @@ -37,28 +34,15 @@ struct State { /// /// Constructing a Service is the first step in using the Type-C service. /// Arguments should be an initialized context -pub struct Service<'a, PSU: Lockable> -where - PSU::Inner: psu::Psu, -{ +pub struct Service<'a, PowerReceiver: Receiver> { /// Type-C context pub(crate) context: &'a type_c_interface::service::context::Context, /// Current state state: Mutex, /// Config config: config::Config, - /// Power policy event receiver - /// - /// This is the corresponding publisher to [`Self::power_policy_event_subscriber`], power policy events - /// will be buffered in the channel until they are brought into the event loop with the subscriber. - _power_policy_event_publisher: - embedded_services::broadcaster::immediate::Receiver<'a, power_policy_interface::service::event::Event<'a, PSU>>, /// Power policy event subscriber - /// - /// This is the corresponding subscriber to [`Self::power_policy_event_publisher`], needs to be a mutex because getting a message - /// from the channel requires mutable access. - power_policy_event_subscriber: - Mutex>>, + power_policy_event_subscriber: Mutex, } /// Power policy events @@ -86,22 +70,17 @@ pub enum Event { PowerPolicy(PowerPolicyEvent), } -impl<'a, PSU: Lockable> Service<'a, PSU> -where - PSU::Inner: psu::Psu, -{ +impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { /// Create a new service the given configuration pub fn create( config: config::Config, context: &'a type_c_interface::service::context::Context, - power_policy_publisher: DynImmediatePublisher<'a, power_policy_interface::service::event::Event<'a, PSU>>, - power_policy_subscriber: DynSubscriber<'a, power_policy_interface::service::event::Event<'a, PSU>>, + power_policy_subscriber: PowerReceiver, ) -> Self { Self { context, state: Mutex::new(State::default()), config, - _power_policy_event_publisher: power_policy_publisher.into(), power_policy_event_subscriber: Mutex::new(power_policy_subscriber), } } diff --git a/type-c-service/src/service/pd.rs b/type-c-service/src/service/pd.rs index b0d940ee..0c251e78 100644 --- a/type-c-service/src/service/pd.rs +++ b/type-c-service/src/service/pd.rs @@ -1,15 +1,12 @@ //! Power Delivery (PD) related functionality. -use embedded_services::sync::Lockable; +use embedded_services::event::Receiver; use embedded_usb_pd::{GlobalPortId, PdError, ado::Ado}; -use power_policy_interface::psu; +use power_policy_interface::service::event::EventData as PowerPolicyEventData; use super::Service; -impl<'a, PSU: Lockable> Service<'a, PSU> -where - PSU::Inner: psu::Psu, -{ +impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { /// Get the oldest unhandled PD alert for the given port. /// /// Returns [`None`] if no alerts are pending. diff --git a/type-c-service/src/service/port.rs b/type-c-service/src/service/port.rs index b1aa047e..3e26b210 100644 --- a/type-c-service/src/service/port.rs +++ b/type-c-service/src/service/port.rs @@ -1,10 +1,8 @@ use super::*; use crate::PortEventStreamer; +use power_policy_interface::service::event::EventData as PowerPolicyEventData; -impl<'a, PSU: Lockable> Service<'a, PSU> -where - PSU::Inner: psu::Psu, -{ +impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { /// Wait for port flags pub(super) async fn wait_port_flags(&self) -> PortEventStreamer { if let Some(ref streamer) = self.state.lock().await.port_event_streaming_state { diff --git a/type-c-service/src/service/power.rs b/type-c-service/src/service/power.rs index 68e0ec5c..833d4767 100644 --- a/type-c-service/src/service/power.rs +++ b/type-c-service/src/service/power.rs @@ -1,34 +1,25 @@ -use embassy_sync::pubsub::WaitResult; use power_policy_interface::service as power_policy; +use power_policy_interface::service::event::EventData as PowerPolicyEventData; use super::*; -impl<'a, PSU: Lockable> Service<'a, PSU> -where - PSU::Inner: psu::Psu, -{ +impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { /// Wait for a power policy event pub(super) async fn wait_power_policy_event(&self) -> Event { loop { - match self.power_policy_event_subscriber.lock().await.next_message().await { - WaitResult::Lagged(lagged) => { - // Missed some messages, all we can do is log an error - error!("Power policy {} event(s) lagged", lagged); + match self.power_policy_event_subscriber.lock().await.wait_next().await { + power_policy_interface::service::event::EventData::Unconstrained(state) => { + return Event::PowerPolicy(PowerPolicyEvent::Unconstrained(state)); + } + power_policy_interface::service::event::EventData::ConsumerDisconnected => { + return Event::PowerPolicy(PowerPolicyEvent::ConsumerDisconnected); + } + power_policy_interface::service::event::EventData::ConsumerConnected(_) => { + return Event::PowerPolicy(PowerPolicyEvent::ConsumerConnected); + } + _ => { + // No other events currently implemented } - WaitResult::Message(message) => match message { - power_policy_interface::service::event::Event::Unconstrained(state) => { - return Event::PowerPolicy(PowerPolicyEvent::Unconstrained(state)); - } - power_policy_interface::service::event::Event::ConsumerDisconnected(_) => { - return Event::PowerPolicy(PowerPolicyEvent::ConsumerDisconnected); - } - power_policy_interface::service::event::Event::ConsumerConnected(_, _) => { - return Event::PowerPolicy(PowerPolicyEvent::ConsumerConnected); - } - _ => { - // No other events currently implemented - } - }, } } } diff --git a/type-c-service/src/service/ucsi.rs b/type-c-service/src/service/ucsi.rs index ed3d2108..0da530a2 100644 --- a/type-c-service/src/service/ucsi.rs +++ b/type-c-service/src/service/ucsi.rs @@ -7,6 +7,7 @@ use embedded_usb_pd::ucsi::ppm::state_machine::{ }; use embedded_usb_pd::ucsi::{GlobalCommand, ResponseData, lpm, ppm}; use embedded_usb_pd::{PdError, PowerRole}; +use power_policy_interface::service::event::EventData as PowerPolicyEventData; use type_c_interface::service::event::{Event, UsciChangeIndicator}; use super::*; @@ -41,10 +42,7 @@ pub(super) struct State { pub(super) psu_connected: bool, } -impl<'a, PSU: Lockable> Service<'a, PSU> -where - PSU::Inner: psu::Psu, -{ +impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { /// PPM reset implementation fn process_ppm_reset(&self, state: &mut State) { debug!("Resetting PPM"); diff --git a/type-c-service/src/service/vdm.rs b/type-c-service/src/service/vdm.rs index ed2cfc15..6a50ee33 100644 --- a/type-c-service/src/service/vdm.rs +++ b/type-c-service/src/service/vdm.rs @@ -1,16 +1,13 @@ //! VDM (Vendor Defined Messages) related functionality. -use embedded_services::sync::Lockable; +use embedded_services::event::Receiver; use embedded_usb_pd::{GlobalPortId, PdError}; -use power_policy_interface::psu; +use power_policy_interface::service::event::EventData as PowerPolicyEventData; use type_c_interface::port::{AttnVdm, OtherVdm}; use super::Service; -impl Service<'_, PSU> -where - PSU::Inner: psu::Psu, -{ +impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { /// Get the other vdm for the given port pub async fn get_other_vdm(&self, port_id: GlobalPortId) -> Result { self.context.get_other_vdm(port_id).await diff --git a/type-c-service/src/task.rs b/type-c-service/src/task.rs index cdc999e2..84f30b61 100644 --- a/type-c-service/src/task.rs +++ b/type-c-service/src/task.rs @@ -1,7 +1,11 @@ use core::future::Future; -use embedded_services::{error, event, info, sync::Lockable}; - -use power_policy_interface::psu; +use embedded_services::{ + error, + event::{self, Receiver}, + info, + sync::Lockable, +}; +use power_policy_interface::service::event::EventData as PowerPolicyEventData; use crate::{service::Service, wrapper::ControllerWrapper}; @@ -10,21 +14,20 @@ pub async fn task_closure< 'a, M, D, - PSU: Lockable, S, V, + PowerReceiver: Receiver, Fut: Future, - F: Fn(&'a Service<'a, PSU>) -> Fut, + F: Fn(&'a Service<'a, PowerReceiver>) -> Fut, const N: usize, >( - service: &'static Service<'a, PSU>, + service: &'static Service<'a, PowerReceiver>, wrappers: [&'a ControllerWrapper<'a, M, D, S, V>; N], cfu_client: &'a cfu_service::CfuClient, f: F, ) where M: embassy_sync::blocking_mutex::raw::RawMutex, D: Lockable, - PSU::Inner: psu::Psu, S: event::Sender, V: crate::wrapper::FwOfferValidator, D::Inner: type_c_interface::port::Controller, @@ -47,22 +50,26 @@ pub async fn task_closure< } /// Task to run the Type-C service, running the default event loop -pub async fn task<'a, M, D, PSU: Lockable, S, V, const N: usize>( - service: &'static Service<'a, PSU>, +pub async fn task<'a, M, D, S, V, PowerReceiver: Receiver, const N: usize>( + service: &'static Service<'a, PowerReceiver>, wrappers: [&'a ControllerWrapper<'a, M, D, S, V>; N], cfu_client: &'a cfu_service::CfuClient, ) where M: embassy_sync::blocking_mutex::raw::RawMutex, D: embedded_services::sync::Lockable, - PSU::Inner: psu::Psu, S: event::Sender, V: crate::wrapper::FwOfferValidator, ::Inner: type_c_interface::port::Controller, { - task_closure(service, wrappers, cfu_client, |service: &Service<'_, PSU>| async { - if let Err(e) = service.process_next_event().await { - error!("Type-C service processing error: {:#?}", e); - } - }) + task_closure( + service, + wrappers, + cfu_client, + |service: &Service<'_, PowerReceiver>| async { + if let Err(e) = service.process_next_event().await { + error!("Type-C service processing error: {:#?}", e); + } + }, + ) .await; } From 18d4a4fad8a59518a2d143d572b26cc5cbf02e6e Mon Sep 17 00:00:00 2001 From: Robert Zieba Date: Fri, 27 Mar 2026 13:54:19 -0700 Subject: [PATCH 2/3] type-c-service: Remove interior mutability from service --- embedded-service/src/event.rs | 8 +- examples/rt685s-evk/src/bin/type_c.rs | 26 ++-- examples/rt685s-evk/src/bin/type_c_cfu.rs | 26 ++-- examples/std/src/bin/type_c/service.rs | 26 ++-- examples/std/src/bin/type_c/ucsi.rs | 25 +-- examples/std/src/bin/type_c/unconstrained.rs | 24 +-- type-c-service/src/service/mod.rs | 153 +++++++++++++------ type-c-service/src/service/pd.rs | 4 +- type-c-service/src/service/port.rs | 18 --- type-c-service/src/service/power.rs | 44 ++---- type-c-service/src/service/ucsi.rs | 113 +++++++------- type-c-service/src/service/vdm.rs | 4 +- type-c-service/src/task.rs | 72 +++------ 13 files changed, 276 insertions(+), 267 deletions(-) delete mode 100644 type-c-service/src/service/port.rs diff --git a/embedded-service/src/event.rs b/embedded-service/src/event.rs index f0c4bf6f..ec02e7c5 100644 --- a/embedded-service/src/event.rs +++ b/embedded-service/src/event.rs @@ -73,8 +73,12 @@ impl Receiver for DynSubscriber<'_, E> { async fn wait_next(&mut self) -> E { loop { - if let WaitResult::Message(e) = self.next_message().await { - return e; + match self.next_message().await { + WaitResult::Message(e) => return e, + WaitResult::Lagged(e) => { + error!("Subscriber lagged, skipping {} events", e); + continue; + } } } } diff --git a/examples/rt685s-evk/src/bin/type_c.rs b/examples/rt685s-evk/src/bin/type_c.rs index f2895bc2..3b94eec7 100644 --- a/examples/rt685s-evk/src/bin/type_c.rs +++ b/examples/rt685s-evk/src/bin/type_c.rs @@ -26,7 +26,7 @@ use static_cell::StaticCell; use tps6699x::asynchronous::embassy as tps6699x; use type_c_interface::port::ControllerId; use type_c_service::driver::tps6699x::{self as tps6699x_drv}; -use type_c_service::service::Service; +use type_c_service::service::{EventReceiver, Service}; use type_c_service::wrapper::ControllerWrapper; use type_c_service::wrapper::backing::{IntermediateStorage, ReferencedStorage, Storage}; use type_c_service::wrapper::proxy::PowerProxyDevice; @@ -75,6 +75,8 @@ type PowerPolicySenderType = MapSender< ) -> power_policy_interface::service::event::EventData, >; +type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, power_policy_service::service::Service< @@ -83,7 +85,7 @@ type PowerPolicyServiceType = Mutex< >, >; -type ServiceType = Service<'static, DynSubscriber<'static, power_policy_interface::service::event::EventData>>; +type ServiceType = Service<'static>; #[embassy_executor::task] async fn pd_controller_task(controller: &'static Wrapper<'static>) { @@ -109,12 +111,13 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static ServiceType, + service: &'static Mutex, + event_receiver: EventReceiver<'static, PowerPolicyReceiverType>, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { info!("Starting type-c task"); - type_c_service::task::task(service, wrappers, cfu_client).await; + type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await; } #[embassy_executor::main] @@ -235,19 +238,20 @@ async fn main(spawner: Spawner) { power_policy_service::service::config::Config::default(), ))); - static TYPE_C_SERVICE: StaticCell = StaticCell::new(); - let type_c_service = TYPE_C_SERVICE.init(Service::create( - Default::default(), - controller_context, - power_policy_subscriber, - )); + static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create(Default::default(), controller_context))); // Spin up CFU service static CFU_CLIENT: OnceLock = OnceLock::new(); let cfu_client = CfuClient::new(&CFU_CLIENT).await; info!("Spawining type-c service task"); - spawner.must_spawn(type_c_service_task(type_c_service, [wrapper], cfu_client)); + spawner.must_spawn(type_c_service_task( + type_c_service, + EventReceiver::new(controller_context, power_policy_subscriber), + [wrapper], + cfu_client, + )); info!("Spawining power policy task"); spawner.must_spawn(power_policy_task( diff --git a/examples/rt685s-evk/src/bin/type_c_cfu.rs b/examples/rt685s-evk/src/bin/type_c_cfu.rs index da4322d1..9e4c9da2 100644 --- a/examples/rt685s-evk/src/bin/type_c_cfu.rs +++ b/examples/rt685s-evk/src/bin/type_c_cfu.rs @@ -29,7 +29,7 @@ use static_cell::StaticCell; use tps6699x::asynchronous::embassy as tps6699x; use type_c_interface::port::ControllerId; use type_c_service::driver::tps6699x::{self as tps6699x_drv}; -use type_c_service::service::Service; +use type_c_service::service::{EventReceiver, Service}; use type_c_service::wrapper::ControllerWrapper; use type_c_service::wrapper::backing::{IntermediateStorage, ReferencedStorage, Storage}; use type_c_service::wrapper::proxy::PowerProxyDevice; @@ -73,6 +73,8 @@ type PowerPolicySenderType = MapSender< ) -> power_policy_interface::service::event::EventData, >; +type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, power_policy_service::service::Service< @@ -81,7 +83,7 @@ type PowerPolicyServiceType = Mutex< >, >; -type ServiceType = Service<'static, DynSubscriber<'static, power_policy_interface::service::event::EventData>>; +type ServiceType = Service<'static>; const NUM_PD_CONTROLLERS: usize = 1; const CONTROLLER0_ID: ControllerId = ControllerId(0); @@ -193,12 +195,13 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static ServiceType, + service: &'static Mutex, + event_receiver: EventReceiver<'static, PowerPolicyReceiverType>, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { info!("Starting type-c task"); - type_c_service::task::task(service, wrappers, cfu_client).await; + type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await; } #[embassy_executor::main] @@ -323,19 +326,20 @@ async fn main(spawner: Spawner) { power_policy_service::service::config::Config::default(), ))); - static TYPE_C_SERVICE: StaticCell = StaticCell::new(); - let type_c_service = TYPE_C_SERVICE.init(Service::create( - Default::default(), - controller_context, - power_policy_subscriber, - )); + static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create(Default::default(), controller_context))); // Spin up CFU service static CFU_CLIENT: OnceLock = OnceLock::new(); let cfu_client = CfuClient::new(&CFU_CLIENT).await; info!("Spawining type-c service task"); - spawner.must_spawn(type_c_service_task(type_c_service, [wrapper], cfu_client)); + spawner.must_spawn(type_c_service_task( + type_c_service, + EventReceiver::new(controller_context, power_policy_subscriber), + [wrapper], + cfu_client, + )); info!("Spawining power policy task"); spawner.must_spawn(power_policy_task( diff --git a/examples/std/src/bin/type_c/service.rs b/examples/std/src/bin/type_c/service.rs index eb5c5349..346ee26a 100644 --- a/examples/std/src/bin/type_c/service.rs +++ b/examples/std/src/bin/type_c/service.rs @@ -19,8 +19,8 @@ use std_examples::type_c::mock_controller; use std_examples::type_c::mock_controller::Wrapper; use type_c_interface::port::ControllerId; use type_c_interface::service::context::Context; -use type_c_service::service::Service; use type_c_service::service::config::Config; +use type_c_service::service::{EventReceiver, Service}; use type_c_service::util::power_capability_from_current; use type_c_service::wrapper::backing::Storage; use type_c_service::wrapper::message::*; @@ -42,6 +42,8 @@ type PowerPolicySenderType = MapSender< ) -> power_policy_interface::service::event::EventData, >; +type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, power_policy_service::service::Service< @@ -50,7 +52,7 @@ type PowerPolicyServiceType = Mutex< >, >; -type ServiceType = Service<'static, DynSubscriber<'static, power_policy_interface::service::event::EventData>>; +type ServiceType = Service<'static>; #[embassy_executor::task] async fn controller_task( @@ -118,12 +120,8 @@ async fn task(spawner: Spawner) { power_policy_service::service::config::Config::default(), ))); - static TYPE_C_SERVICE: StaticCell = StaticCell::new(); - let type_c_service = TYPE_C_SERVICE.init(Service::create( - Config::default(), - controller_context, - power_policy_subscriber, - )); + static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create(Config::default(), controller_context))); // Spin up CFU service static CFU_CLIENT: OnceLock = OnceLock::new(); @@ -133,7 +131,12 @@ async fn task(spawner: Spawner) { ArrayEventReceivers::new([&wrapper.ports[0].proxy], [policy_receiver]), power_service, )); - spawner.must_spawn(type_c_service_task(type_c_service, [wrapper], cfu_client)); + spawner.must_spawn(type_c_service_task( + type_c_service, + EventReceiver::new(controller_context, power_policy_subscriber), + [wrapper], + cfu_client, + )); spawner.must_spawn(controller_task(wrapper, controller)); Timer::after_millis(1000).await; @@ -170,12 +173,13 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static ServiceType, + service: &'static Mutex, + event_receiver: EventReceiver<'static, PowerPolicyReceiverType>, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { info!("Starting type-c task"); - type_c_service::task::task(service, wrappers, cfu_client).await; + type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await; } fn create_wrapper( diff --git a/examples/std/src/bin/type_c/ucsi.rs b/examples/std/src/bin/type_c/ucsi.rs index bdee8ce6..bdb8a9a6 100644 --- a/examples/std/src/bin/type_c/ucsi.rs +++ b/examples/std/src/bin/type_c/ucsi.rs @@ -24,7 +24,7 @@ use static_cell::StaticCell; use std_examples::type_c::mock_controller; use type_c_interface::port::ControllerId; use type_c_interface::service::context::Context; -use type_c_service::service::Service; +use type_c_service::service::{EventReceiver, Service}; use type_c_service::service::config::Config; use type_c_service::wrapper::backing::Storage; use type_c_service::wrapper::proxy::PowerProxyDevice; @@ -48,6 +48,8 @@ type PowerPolicySenderType = MapSender< ) -> power_policy_interface::service::event::EventData, >; +type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, power_policy_service::service::Service< @@ -56,7 +58,7 @@ type PowerPolicyServiceType = Mutex< >, >; -type ServiceType = Service<'static, DynSubscriber<'static, power_policy_interface::service::event::EventData>>; +type ServiceType = Service<'static>; #[embassy_executor::task] async fn opm_task(_context: &'static Context, _state: [&'static mock_controller::ControllerState; NUM_PD_CONTROLLERS]) { @@ -82,12 +84,13 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static ServiceType, + service: &'static Mutex, + event_receiver: EventReceiver<'static, PowerPolicyReceiverType>, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { info!("Starting type-c task"); - type_c_service::task::task(service, wrappers, cfu_client).await; + type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await; } #[embassy_executor::task] @@ -218,8 +221,8 @@ async fn task(spawner: Spawner) { ))); // Create type-c service - static TYPE_C_SERVICE: StaticCell = StaticCell::new(); - let type_c_service = TYPE_C_SERVICE.init(Service::create( + static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create( Config { ucsi_capabilities: UcsiCapabilities { num_connectors: 2, @@ -245,8 +248,7 @@ async fn task(spawner: Spawner) { ..Default::default() }, controller_context, - power_policy_subscriber, - )); + ))); // Spin up CFU service static CFU_CLIENT: OnceLock = OnceLock::new(); @@ -260,7 +262,12 @@ async fn task(spawner: Spawner) { power_service, )); - spawner.must_spawn(type_c_service_task(type_c_service, [wrapper0, wrapper1], cfu_client)); + spawner.must_spawn(type_c_service_task( + type_c_service, + EventReceiver::new(controller_context, power_policy_subscriber), + [wrapper0, wrapper1], + cfu_client, + )); spawner.must_spawn(wrapper_task(wrapper0)); spawner.must_spawn(wrapper_task(wrapper1)); spawner.must_spawn(opm_task(controller_context, [state0, state1])); diff --git a/examples/std/src/bin/type_c/unconstrained.rs b/examples/std/src/bin/type_c/unconstrained.rs index 0044e07e..4ce1ddb6 100644 --- a/examples/std/src/bin/type_c/unconstrained.rs +++ b/examples/std/src/bin/type_c/unconstrained.rs @@ -19,12 +19,12 @@ use power_policy_service::service::registration::ArrayRegistration; use static_cell::StaticCell; use std_examples::type_c::mock_controller; use type_c_interface::port::ControllerId; -use type_c_service::service::Service; - -const NUM_PD_CONTROLLERS: usize = 3; +use type_c_service::service::{EventReceiver, Service}; use type_c_service::wrapper::backing::{IntermediateStorage, ReferencedStorage, Storage}; use type_c_service::wrapper::proxy::PowerProxyDevice; +const NUM_PD_CONTROLLERS: usize = 3; + const CONTROLLER0_ID: ControllerId = ControllerId(0); const PORT0_ID: GlobalPortId = GlobalPortId(0); const CFU0_ID: u8 = 0x00; @@ -50,6 +50,8 @@ type PowerPolicySenderType = MapSender< ) -> power_policy_interface::service::event::EventData, >; +type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>; + type PowerPolicyServiceType = Mutex< GlobalRawMutex, power_policy_service::service::Service< @@ -58,7 +60,7 @@ type PowerPolicyServiceType = Mutex< >, >; -type ServiceType = Service<'static, DynSubscriber<'static, power_policy_interface::service::event::EventData>>; +type ServiceType = Service<'static>; #[embassy_executor::task(pool_size = 3)] async fn controller_task(wrapper: &'static mock_controller::Wrapper<'static>) { @@ -216,12 +218,8 @@ async fn task(spawner: Spawner) { ))); // Create type-c service - static TYPE_C_SERVICE: StaticCell = StaticCell::new(); - let type_c_service = TYPE_C_SERVICE.init(Service::create( - Default::default(), - controller_context, - power_policy_subscriber, - )); + static TYPE_C_SERVICE: StaticCell> = StaticCell::new(); + let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create(Default::default(), controller_context))); // Spin up CFU service static CFU_CLIENT: OnceLock = OnceLock::new(); @@ -240,6 +238,7 @@ async fn task(spawner: Spawner) { )); spawner.must_spawn(type_c_service_task( type_c_service, + EventReceiver::new(controller_context, power_policy_subscriber), [wrapper0, wrapper1, wrapper2], cfu_client, )); @@ -307,12 +306,13 @@ async fn power_policy_task( #[embassy_executor::task] async fn type_c_service_task( - service: &'static ServiceType, + service: &'static Mutex, + event_receiver: EventReceiver<'static, PowerPolicyReceiverType>, wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS], cfu_client: &'static CfuClient, ) { info!("Starting type-c task"); - type_c_service::task::task(service, wrappers, cfu_client).await; + type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await; } fn main() { diff --git a/type-c-service/src/service/mod.rs b/type-c-service/src/service/mod.rs index aa037ec9..d2e4b5f6 100644 --- a/type-c-service/src/service/mod.rs +++ b/type-c-service/src/service/mod.rs @@ -1,6 +1,8 @@ +use core::cell::RefCell; +use core::future::pending; + use embassy_futures::select::{Either, select}; -use embassy_sync::mutex::Mutex; -use embedded_services::{GlobalRawMutex, debug, error, event::Receiver, info, trace}; +use embedded_services::{debug, error, event::Receiver, info, trace}; use embedded_usb_pd::GlobalPortId; use embedded_usb_pd::PdError as Error; use power_policy_interface::service::event::EventData as PowerPolicyEventData; @@ -12,7 +14,6 @@ use type_c_interface::service::event; pub mod config; pub mod pd; -mod port; mod power; mod ucsi; pub mod vdm; @@ -24,8 +25,6 @@ const MAX_SUPPORTED_PORTS: usize = 4; struct State { /// Current port status port_status: [PortStatus; MAX_SUPPORTED_PORTS], - /// Next port to check, this is used to round-robin through ports - port_event_streaming_state: Option, /// UCSI state ucsi: ucsi::State, } @@ -34,15 +33,13 @@ struct State { /// /// Constructing a Service is the first step in using the Type-C service. /// Arguments should be an initialized context -pub struct Service<'a, PowerReceiver: Receiver> { +pub struct Service<'a> { /// Type-C context pub(crate) context: &'a type_c_interface::service::context::Context, /// Current state - state: Mutex, + state: State, /// Config config: config::Config, - /// Power policy event subscriber - power_policy_event_subscriber: Mutex, } /// Power policy events @@ -70,31 +67,29 @@ pub enum Event { PowerPolicy(PowerPolicyEvent), } -impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { +impl<'a> Service<'a> { /// Create a new service the given configuration - pub fn create( - config: config::Config, - context: &'a type_c_interface::service::context::Context, - power_policy_subscriber: PowerReceiver, - ) -> Self { + pub fn create(config: config::Config, context: &'a type_c_interface::service::context::Context) -> Self { Self { context, - state: Mutex::new(State::default()), + state: State::default(), config, - power_policy_event_subscriber: Mutex::new(power_policy_subscriber), } } /// Get the cached port status - pub async fn get_cached_port_status(&self, port_id: GlobalPortId) -> Result { - let state = self.state.lock().await; - Ok(*state.port_status.get(port_id.0 as usize).ok_or(Error::InvalidPort)?) + pub fn get_cached_port_status(&self, port_id: GlobalPortId) -> Result { + Ok(*self + .state + .port_status + .get(port_id.0 as usize) + .ok_or(Error::InvalidPort)?) } /// Set the cached port status - async fn set_cached_port_status(&self, port_id: GlobalPortId, status: PortStatus) -> Result<(), Error> { - let mut state = self.state.lock().await; - *state + fn set_cached_port_status(&mut self, port_id: GlobalPortId, status: PortStatus) -> Result<(), Error> { + *self + .state .port_status .get_mut(port_id.0 as usize) .ok_or(Error::InvalidPort)? = status; @@ -103,12 +98,12 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive /// Process events for a specific port async fn process_port_event( - &self, + &mut self, port_id: GlobalPortId, event: PortStatusChanged, status: PortStatus, ) -> Result<(), Error> { - let old_status = self.get_cached_port_status(port_id).await?; + let old_status = self.get_cached_port_status(port_id)?; debug!("Port{}: Event: {:#?}", port_id.0, event); debug!("Port{} Previous status: {:#?}", port_id.0, old_status); @@ -131,14 +126,59 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive .await; } - self.set_cached_port_status(port_id, status).await?; + self.set_cached_port_status(port_id, status)?; self.handle_ucsi_port_event(port_id, event, &status).await; Ok(()) } + /// Process the given event + pub async fn process_event(&mut self, event: Event) -> Result<(), Error> { + match event { + Event::PortStatusChanged(port, event_kind, status) => { + trace!("Port{}: Processing port status changed", port.0); + self.process_port_event(port, event_kind, status).await + } + Event::PortNotification(port, notification) => { + // Other port notifications + info!("Port{}: Got port notification: {:?}", port.0, notification); + Ok(()) + } + Event::PowerPolicy(event) => { + trace!("Processing power policy event"); + self.process_power_policy_event(&event).await + } + } + } +} + +/// Event receiver for the Type-C service +pub struct EventReceiver<'a, PowerReceiver: Receiver> { + /// Type-C context + pub(crate) context: &'a type_c_interface::service::context::Context, + /// Next port to check, this is used to round-robin through ports + port_event_streaming_state: Option, + /// Power policy event subscriber + /// + /// Used to allow partial borrows of Self for the call to select + power_policy_event_subscriber: RefCell, +} + +impl<'a, PowerReceiver: Receiver> EventReceiver<'a, PowerReceiver> { + /// Create a new event receiver + pub fn new( + context: &'a type_c_interface::service::context::Context, + power_policy_event_subscriber: PowerReceiver, + ) -> Self { + Self { + context, + port_event_streaming_state: None, + power_policy_event_subscriber: RefCell::new(power_policy_event_subscriber), + } + } + /// Wait for the next event - pub async fn wait_next(&self) -> Result { + pub async fn wait_next(&mut self) -> Result { loop { match select(self.wait_port_flags(), self.wait_power_policy_event()).await { Either::First(mut stream) => { @@ -147,7 +187,7 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive .await? { let port_id = GlobalPortId(port_id as u8); - self.state.lock().await.port_event_streaming_state = Some(stream); + self.port_event_streaming_state = Some(stream); match event { PortEventVariant::StatusChanged(status_event) => { // Return a port status changed event @@ -161,7 +201,7 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive } } } else { - self.state.lock().await.port_event_streaming_state = None; + self.port_event_streaming_state = None; } } Either::Second(event) => return Ok(event), @@ -169,28 +209,43 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive } } - /// Process the given event - pub async fn process_event(&self, event: Event) -> Result<(), Error> { - match event { - Event::PortStatusChanged(port, event_kind, status) => { - trace!("Port{}: Processing port status changed", port.0); - self.process_port_event(port, event_kind, status).await - } - Event::PortNotification(port, notification) => { - // Other port notifications - info!("Port{}: Got port notification: {:?}", port.0, notification); - Ok(()) - } - Event::PowerPolicy(event) => { - trace!("Processing power policy event"); - self.process_power_policy_event(&event).await - } + /// Wait for port flags + async fn wait_port_flags(&self) -> PortEventStreamer { + if let Some(ref streamer) = self.port_event_streaming_state { + // If we have an existing iterator, return it + // Yield first to prevent starving other tasks + embassy_futures::yield_now().await; + *streamer + } else { + // Wait for the next port event and create a streamer + PortEventStreamer::new(self.context.get_unhandled_events().await.into_iter()) } } - /// Combined processing function - pub async fn process_next_event(&self) -> Result<(), Error> { - let event = self.wait_next().await?; - self.process_event(event).await + /// Wait for a power policy event + #[allow(clippy::await_holding_refcell_ref)] + async fn wait_power_policy_event(&self) -> Event { + let Ok(mut subscriber) = self.power_policy_event_subscriber.try_borrow_mut() else { + // This should never happen because this function is not public and is only called from wait_next, which takes &mut self + error!("Attempt to call `wait_power_policy_event` simultaneously"); + return pending().await; + }; + + loop { + match subscriber.wait_next().await { + power_policy_interface::service::event::EventData::Unconstrained(state) => { + return Event::PowerPolicy(PowerPolicyEvent::Unconstrained(state)); + } + power_policy_interface::service::event::EventData::ConsumerDisconnected => { + return Event::PowerPolicy(PowerPolicyEvent::ConsumerDisconnected); + } + power_policy_interface::service::event::EventData::ConsumerConnected(_) => { + return Event::PowerPolicy(PowerPolicyEvent::ConsumerConnected); + } + _ => { + // No other events currently implemented + } + } + } } } diff --git a/type-c-service/src/service/pd.rs b/type-c-service/src/service/pd.rs index 0c251e78..21934fa7 100644 --- a/type-c-service/src/service/pd.rs +++ b/type-c-service/src/service/pd.rs @@ -1,12 +1,10 @@ //! Power Delivery (PD) related functionality. -use embedded_services::event::Receiver; use embedded_usb_pd::{GlobalPortId, PdError, ado::Ado}; -use power_policy_interface::service::event::EventData as PowerPolicyEventData; use super::Service; -impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { +impl Service<'_> { /// Get the oldest unhandled PD alert for the given port. /// /// Returns [`None`] if no alerts are pending. diff --git a/type-c-service/src/service/port.rs b/type-c-service/src/service/port.rs deleted file mode 100644 index 3e26b210..00000000 --- a/type-c-service/src/service/port.rs +++ /dev/null @@ -1,18 +0,0 @@ -use super::*; -use crate::PortEventStreamer; -use power_policy_interface::service::event::EventData as PowerPolicyEventData; - -impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { - /// Wait for port flags - pub(super) async fn wait_port_flags(&self) -> PortEventStreamer { - if let Some(ref streamer) = self.state.lock().await.port_event_streaming_state { - // If we have an existing iterator, return it - // Yield first to prevent starving other tasks - embassy_futures::yield_now().await; - *streamer - } else { - // Wait for the next port event and create a streamer - PortEventStreamer::new(self.context.get_unhandled_events().await.into_iter()) - } - } -} diff --git a/type-c-service/src/service/power.rs b/type-c-service/src/service/power.rs index 833d4767..4ec9e380 100644 --- a/type-c-service/src/service/power.rs +++ b/type-c-service/src/service/power.rs @@ -1,31 +1,10 @@ use power_policy_interface::service as power_policy; -use power_policy_interface::service::event::EventData as PowerPolicyEventData; use super::*; -impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { - /// Wait for a power policy event - pub(super) async fn wait_power_policy_event(&self) -> Event { - loop { - match self.power_policy_event_subscriber.lock().await.wait_next().await { - power_policy_interface::service::event::EventData::Unconstrained(state) => { - return Event::PowerPolicy(PowerPolicyEvent::Unconstrained(state)); - } - power_policy_interface::service::event::EventData::ConsumerDisconnected => { - return Event::PowerPolicy(PowerPolicyEvent::ConsumerDisconnected); - } - power_policy_interface::service::event::EventData::ConsumerConnected(_) => { - return Event::PowerPolicy(PowerPolicyEvent::ConsumerConnected); - } - _ => { - // No other events currently implemented - } - } - } - } - +impl Service<'_> { /// Set the unconstrained state for all ports - pub(super) async fn set_unconstrained_all(&self, unconstrained: bool) -> Result<(), Error> { + pub(super) async fn set_unconstrained_all(&mut self, unconstrained: bool) -> Result<(), Error> { for port_index in 0..self.context.get_num_ports() { self.context .set_unconstrained_power(GlobalPortId(port_index as u8), unconstrained) @@ -36,12 +15,10 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive /// Processed unconstrained state change pub(super) async fn process_unconstrained_state_change( - &self, + &mut self, unconstrained_state: &power_policy::UnconstrainedState, ) -> Result<(), Error> { if unconstrained_state.unconstrained { - let state = self.state.lock().await; - if unconstrained_state.available > 1 { // There are multiple available unconstrained consumers, set all ports to unconstrained // TODO: determine if we need to consider if we need to consider @@ -52,7 +29,8 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive } else { // Only one unconstrained device is present, see if that's one of our ports let num_ports = self.context.get_num_ports(); - let unconstrained_port = state + let unconstrained_port = self + .state .port_status .iter() .take(num_ports) @@ -88,21 +66,19 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive } /// Process power policy events - pub(super) async fn process_power_policy_event(&self, message: &PowerPolicyEvent) -> Result<(), Error> { + pub(super) async fn process_power_policy_event(&mut self, message: &PowerPolicyEvent) -> Result<(), Error> { match message { PowerPolicyEvent::Unconstrained(state) => self.process_unconstrained_state_change(state).await, PowerPolicyEvent::ConsumerDisconnected => { - let mut state = self.state.lock().await; - state.ucsi.psu_connected = false; + self.state.ucsi.psu_connected = false; // Notify OPM because this can affect battery charging capability status - self.pend_ucsi_connected_ports(&mut state).await; + self.pend_ucsi_connected_ports().await; Ok(()) } PowerPolicyEvent::ConsumerConnected => { - let mut state = self.state.lock().await; - state.ucsi.psu_connected = true; + self.state.ucsi.psu_connected = true; // Notify OPM because this can affect battery charging capability status - self.pend_ucsi_connected_ports(&mut state).await; + self.pend_ucsi_connected_ports().await; Ok(()) } } diff --git a/type-c-service/src/service/ucsi.rs b/type-c-service/src/service/ucsi.rs index 0da530a2..62749cc8 100644 --- a/type-c-service/src/service/ucsi.rs +++ b/type-c-service/src/service/ucsi.rs @@ -7,7 +7,6 @@ use embedded_usb_pd::ucsi::ppm::state_machine::{ }; use embedded_usb_pd::ucsi::{GlobalCommand, ResponseData, lpm, ppm}; use embedded_usb_pd::{PdError, PowerRole}; -use power_policy_interface::service::event::EventData as PowerPolicyEventData; use type_c_interface::service::event::{Event, UsciChangeIndicator}; use super::*; @@ -42,19 +41,19 @@ pub(super) struct State { pub(super) psu_connected: bool, } -impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { +impl Service<'_> { /// PPM reset implementation - fn process_ppm_reset(&self, state: &mut State) { + fn process_ppm_reset(&mut self) { debug!("Resetting PPM"); - state.notifications_enabled = NotificationEnable::default(); - state.pending_ports.clear(); - state.valid_battery_charging_capability.clear(); + self.state.ucsi.notifications_enabled = NotificationEnable::default(); + self.state.ucsi.pending_ports.clear(); + self.state.ucsi.valid_battery_charging_capability.clear(); } /// Set notification enable implementation - fn process_set_notification_enable(&self, state: &mut State, enable: NotificationEnable) { + fn process_set_notification_enable(&mut self, enable: NotificationEnable) { debug!("Set Notification Enable: {:?}", enable); - state.notifications_enabled = enable; + self.state.ucsi.notifications_enabled = enable; } /// PPM get capabilities implementation @@ -65,14 +64,10 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive ppm::ResponseData::GetCapability(capabilities) } - fn process_ppm_command( - &self, - state: &mut State, - command: &ucsi::ppm::Command, - ) -> Result, PdError> { + fn process_ppm_command(&mut self, command: &ucsi::ppm::Command) -> Result, PdError> { match command { ppm::Command::SetNotificationEnable(enable) => { - self.process_set_notification_enable(state, enable.notification_enable); + self.process_set_notification_enable(enable.notification_enable); Ok(None) } ppm::Command::GetCapability => Ok(Some(self.process_get_capabilities())), @@ -83,12 +78,11 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive /// Determine the battery charging capability status for the given port fn determine_battery_charging_capability_status( &self, - state: &mut State, port_id: GlobalPortId, port_status: &PortStatus, ) -> Option { if port_status.power_role == PowerRole::Sink { - if state.valid_battery_charging_capability.contains(&port_id) && !state.psu_connected { + if self.state.ucsi.valid_battery_charging_capability.contains(&port_id) && !self.state.ucsi.psu_connected { // Only run this logic when no PSU is attached to prevent excessive notifications // when new type-C PSUs are attached let power_mw = port_status @@ -108,8 +102,7 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive } async fn process_lpm_command( - &self, - state: &mut super::State, + &mut self, command: &ucsi::lpm::GlobalCommand, ) -> Result, PdError> { debug!("Processing LPM command: {:?}", command); @@ -135,9 +128,9 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive }))) = response { let raw_port = command.port().0 as usize; - let port_status = state.port_status.get(raw_port).ok_or(PdError::InvalidPort)?; + let port_status = self.state.port_status.get(raw_port).ok_or(PdError::InvalidPort)?; *battery_charging_status = - self.determine_battery_charging_capability_status(&mut state.ucsi, command.port(), port_status); + self.determine_battery_charging_capability_status(command.port(), port_status); states_change.set_battery_charging_status_change(battery_charging_status.is_some()); } @@ -147,9 +140,9 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive } } - /// Upate the CCI connector change field based on the current pending port - fn set_cci_connector_change(&self, state: &mut State, cci: &mut GlobalCci) { - if let Some(current_port) = state.pending_ports.front() { + /// Update the CCI connector change field based on the current pending port + fn set_cci_connector_change(&self, cci: &mut GlobalCci) { + if let Some(current_port) = self.state.ucsi.pending_ports.front() { // UCSI connector numbers are 1-based cci.set_connector_change(GlobalPortId(current_port.0 + 1)); } else { @@ -159,10 +152,10 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive } /// Acknowledge the current connector change and move to the next if present - async fn ack_connector_change(&self, state: &mut State, cci: &mut GlobalCci) { + async fn ack_connector_change(&mut self, cci: &mut GlobalCci) { // Pop the just acknowledged port and move to the next if present - if let Some(_current_port) = state.pending_ports.pop_front() { - if let Some(next_port) = state.pending_ports.front() { + if let Some(_current_port) = self.state.ucsi.pending_ports.pop_front() { + if let Some(next_port) = self.state.ucsi.pending_ports.front() { debug!("ACK_CCI processed, next pending port: {:?}", next_port); self.context .broadcast_message(Event::UcsiCci(UsciChangeIndicator { @@ -178,12 +171,11 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive warn!("Received ACK_CCI with no pending connector changes"); } - self.set_cci_connector_change(state, cci); + self.set_cci_connector_change(cci); } /// Process a UCSI command - pub async fn process_ucsi_command(&self, command: &GlobalCommand) -> UcsiResponse { - let state = &mut self.state.lock().await; + pub async fn process_ucsi_command(&mut self, command: &GlobalCommand) -> UcsiResponse { let mut next_input = Some(PpmInput::Command(command)); let mut response = UcsiResponse { notify_opm: false, @@ -196,7 +188,7 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive // Using a loop allows all logic to be centralized loop { let output = if let Some(next_input) = next_input.take() { - state.ucsi.ppm_state_machine.consume(next_input) + self.state.ucsi.ppm_state_machine.consume(next_input) } else { error!("Unexpected end of state machine processing"); return UcsiResponse { @@ -226,12 +218,12 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive match command { ucsi::GlobalCommand::PpmCommand(ppm_command) => { response.data = self - .process_ppm_command(&mut state.ucsi, ppm_command) + .process_ppm_command(ppm_command) .map(|inner| inner.map(ResponseData::Ppm)); } ucsi::GlobalCommand::LpmCommand(lpm_command) => { response.data = self - .process_lpm_command(state, lpm_command) + .process_lpm_command(lpm_command) .await .map(|inner| inner.map(ResponseData::Lpm)); } @@ -240,20 +232,20 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive // Don't return yet, need to inform state machine that command is complete } PpmOutput::OpmNotifyCommandComplete => { - response.notify_opm = state.ucsi.notifications_enabled.cmd_complete(); + response.notify_opm = self.state.ucsi.notifications_enabled.cmd_complete(); response.cci.set_cmd_complete(true); response.cci.set_error(response.data.is_err()); - self.set_cci_connector_change(&mut state.ucsi, &mut response.cci); + self.set_cci_connector_change(&mut response.cci); return response; } PpmOutput::AckComplete(ack) => { - response.notify_opm = state.ucsi.notifications_enabled.cmd_complete(); + response.notify_opm = self.state.ucsi.notifications_enabled.cmd_complete(); if ack.command_complete() { response.cci.set_ack_command(true); } if ack.connector_change() { - self.ack_connector_change(&mut state.ucsi, &mut response.cci).await; + self.ack_connector_change(&mut response.cci).await; } return response; @@ -261,18 +253,18 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive PpmOutput::ResetComplete => { // Resets don't follow the normal command execution flow // So do any reset processing here - self.process_ppm_reset(&mut state.ucsi); + self.process_ppm_reset(); // Don't notify OPM because it'll poll response.notify_opm = false; response.cci = Cci::new_reset_complete(); - self.set_cci_connector_change(&mut state.ucsi, &mut response.cci); + self.set_cci_connector_change(&mut response.cci); return response; } PpmOutput::OpmNotifyBusy => { // Notify if notifications are enabled in general - response.notify_opm = !state.ucsi.notifications_enabled.is_empty(); + response.notify_opm = !self.state.ucsi.notifications_enabled.is_empty(); response.cci.set_busy(true); - self.set_cci_connector_change(&mut state.ucsi, &mut response.cci); + self.set_cci_connector_change(&mut response.cci); return response; } }, @@ -281,7 +273,7 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive response.notify_opm = false; response.cci = Cci::default(); response.data = Ok(None); - self.set_cci_connector_change(&mut state.ucsi, &mut response.cci); + self.set_cci_connector_change(&mut response.cci); return response; } } @@ -290,12 +282,11 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive /// Handle PD port events, update UCSI state, and generate corresponding UCSI notifications pub(super) async fn handle_ucsi_port_event( - &self, + &mut self, port_id: GlobalPortId, port_event: PortStatusChanged, port_status: &PortStatus, ) { - let state = &mut self.state.lock().await.ucsi; let mut ucsi_event = ConnectorStatusChange::default(); ucsi_event.set_connect_change(port_event.plug_inserted_or_removed()); @@ -317,36 +308,48 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive ucsi_event.set_battery_charging_status_change(true); // Power negotiation completed, battery charging capability status is now valid - if state.valid_battery_charging_capability.insert(port_id).is_err() { + if self + .state + .ucsi + .valid_battery_charging_capability + .insert(port_id) + .is_err() + { error!("Valid battery charging capability overflow for port {:?}", port_id); } } if !port_status.is_connected() { // Reset battery charging capability status when disconnected - let _ = state.valid_battery_charging_capability.remove(&port_id); + let _ = self.state.ucsi.valid_battery_charging_capability.remove(&port_id); } - if ucsi_event.filter_enabled(state.notifications_enabled).is_empty() { + if ucsi_event + .filter_enabled(self.state.ucsi.notifications_enabled) + .is_empty() + { trace!("{:?}: event received, but no UCSI notifications enabled", port_id); return; } - self.pend_ucsi_port(state, port_id).await; + self.pend_ucsi_port(port_id).await; } /// Pend UCSI events for all connected ports - pub(super) async fn pend_ucsi_connected_ports(&self, state: &mut super::State) { - for (port_id, port_status) in state.port_status.iter().enumerate() { - if port_status.is_connected() { - self.pend_ucsi_port(&mut state.ucsi, GlobalPortId(port_id as u8)).await; + pub(super) async fn pend_ucsi_connected_ports(&mut self) { + // Panic Safety: i is limited by the length of port_status + #[allow(clippy::indexing_slicing)] + for i in 0..self.state.port_status.len() { + let port_id = GlobalPortId(i as u8); + if self.state.port_status[i].is_connected() { + self.pend_ucsi_port(port_id).await; } } } /// Pend a UCSI event for the given port - async fn pend_ucsi_port(&self, state: &mut State, port_id: GlobalPortId) { - if state.pending_ports.iter().any(|pending| *pending == port_id) { + async fn pend_ucsi_port(&mut self, port_id: GlobalPortId) { + if self.state.ucsi.pending_ports.iter().any(|pending| *pending == port_id) { // Already have a pending event for this port, don't need to process it twice return; } @@ -354,8 +357,8 @@ impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceive // Only notifiy the OPM if we don't have any pending events // Once the OPM starts processing events, the next pending port will be sent as part // of the CCI response to the ACK_CC_CI command. See [`Self::set_cci_connector_change`] - let notify_opm = state.pending_ports.is_empty(); - if state.pending_ports.push_back(port_id).is_ok() { + let notify_opm = self.state.ucsi.pending_ports.is_empty(); + if self.state.ucsi.pending_ports.push_back(port_id).is_ok() { self.context .broadcast_message(Event::UcsiCci(UsciChangeIndicator { port: port_id, diff --git a/type-c-service/src/service/vdm.rs b/type-c-service/src/service/vdm.rs index 6a50ee33..d9259070 100644 --- a/type-c-service/src/service/vdm.rs +++ b/type-c-service/src/service/vdm.rs @@ -1,13 +1,11 @@ //! VDM (Vendor Defined Messages) related functionality. -use embedded_services::event::Receiver; use embedded_usb_pd::{GlobalPortId, PdError}; -use power_policy_interface::service::event::EventData as PowerPolicyEventData; use type_c_interface::port::{AttnVdm, OtherVdm}; use super::Service; -impl<'a, PowerReceiver: Receiver> Service<'a, PowerReceiver> { +impl Service<'_> { /// Get the other vdm for the given port pub async fn get_other_vdm(&self, port_id: GlobalPortId) -> Result { self.context.get_other_vdm(port_id).await diff --git a/type-c-service/src/task.rs b/type-c-service/src/task.rs index 84f30b61..ce446aaa 100644 --- a/type-c-service/src/task.rs +++ b/type-c-service/src/task.rs @@ -1,4 +1,3 @@ -use core::future::Future; use embedded_services::{ error, event::{self, Receiver}, @@ -7,36 +6,26 @@ use embedded_services::{ }; use power_policy_interface::service::event::EventData as PowerPolicyEventData; -use crate::{service::Service, wrapper::ControllerWrapper}; +use crate::{ + service::{EventReceiver, Service}, + wrapper::ControllerWrapper, +}; -/// Task to run the Type-C service, takes a closure to customize the event loop -pub async fn task_closure< - 'a, - M, - D, - S, - V, - PowerReceiver: Receiver, - Fut: Future, - F: Fn(&'a Service<'a, PowerReceiver>) -> Fut, - const N: usize, ->( - service: &'static Service<'a, PowerReceiver>, - wrappers: [&'a ControllerWrapper<'a, M, D, S, V>; N], - cfu_client: &'a cfu_service::CfuClient, - f: F, +/// Task to run the Type-C service, running the default event loop +pub async fn task, const N: usize>( + service: &'static impl Lockable>, + mut event_receiver: EventReceiver<'static, PowerReceiver>, + wrappers: [&'static ControllerWrapper<'static, M, D, S, V>; N], + cfu_client: &'static cfu_service::CfuClient, ) where M: embassy_sync::blocking_mutex::raw::RawMutex, - D: Lockable, + D: embedded_services::sync::Lockable, S: event::Sender, V: crate::wrapper::FwOfferValidator, - D::Inner: type_c_interface::port::Controller, + ::Inner: type_c_interface::port::Controller, { info!("Starting type-c task"); - // TODO: move this service to use the new power policy event subscribers and receivers - // See https://github.com/OpenDevicePartnership/embedded-services/issues/742 - for controller_wrapper in wrappers { if controller_wrapper.register(cfu_client).is_err() { error!("Failed to register a controller"); @@ -45,31 +34,16 @@ pub async fn task_closure< } loop { - f(service).await; - } -} - -/// Task to run the Type-C service, running the default event loop -pub async fn task<'a, M, D, S, V, PowerReceiver: Receiver, const N: usize>( - service: &'static Service<'a, PowerReceiver>, - wrappers: [&'a ControllerWrapper<'a, M, D, S, V>; N], - cfu_client: &'a cfu_service::CfuClient, -) where - M: embassy_sync::blocking_mutex::raw::RawMutex, - D: embedded_services::sync::Lockable, - S: event::Sender, - V: crate::wrapper::FwOfferValidator, - ::Inner: type_c_interface::port::Controller, -{ - task_closure( - service, - wrappers, - cfu_client, - |service: &Service<'_, PowerReceiver>| async { - if let Err(e) = service.process_next_event().await { - error!("Type-C service processing error: {:#?}", e); + let event = match event_receiver.wait_next().await { + Ok(event) => event, + Err(e) => { + error!("Error waiting for event: {:#?}", e); + continue; } - }, - ) - .await; + }; + + if let Err(e) = service.lock().await.process_event(event).await { + error!("Type-C service processing error: {:#?}", e); + } + } } From 07aa6e3139b590663401041408a6723bb2eb25f1 Mon Sep 17 00:00:00 2001 From: Robert Zieba Date: Wed, 1 Apr 2026 10:45:56 -0700 Subject: [PATCH 3/3] Restore commented out `opm_task` --- examples/std/src/bin/type_c/ucsi.rs | 131 +++++++++++++++++++++++++++- 1 file changed, 129 insertions(+), 2 deletions(-) diff --git a/examples/std/src/bin/type_c/ucsi.rs b/examples/std/src/bin/type_c/ucsi.rs index bdb8a9a6..42c16603 100644 --- a/examples/std/src/bin/type_c/ucsi.rs +++ b/examples/std/src/bin/type_c/ucsi.rs @@ -24,8 +24,8 @@ use static_cell::StaticCell; use std_examples::type_c::mock_controller; use type_c_interface::port::ControllerId; use type_c_interface::service::context::Context; -use type_c_service::service::{EventReceiver, Service}; use type_c_service::service::config::Config; +use type_c_service::service::{EventReceiver, Service}; use type_c_service::wrapper::backing::Storage; use type_c_service::wrapper::proxy::PowerProxyDevice; @@ -62,7 +62,134 @@ type ServiceType = Service<'static>; #[embassy_executor::task] async fn opm_task(_context: &'static Context, _state: [&'static mock_controller::ControllerState; NUM_PD_CONTROLLERS]) { - // ... rest of opm_task remains the same ... + // TODO: migrate this logic to an integration test when we move away from 'static lifetimes. + /*const CAPABILITY: PowerCapability = PowerCapability { + voltage_mv: 20000, + current_ma: 5000, + }; + + info!("Resetting PPM..."); + let response: UcsiResponseResult = context + .execute_ucsi_command_external(Command::PpmCommand(ppm::Command::PpmReset)) + .await + .into(); + let response = response.unwrap(); + if !response.cci.reset_complete() || response.cci.error() { + error!("PPM reset failed: {:?}", response.cci); + } else { + info!("PPM reset successful"); + } + + info!("Set Notification enable..."); + let mut notifications = NotificationEnable::default(); + notifications.set_cmd_complete(true); + notifications.set_connect_change(true); + let response: UcsiResponseResult = context + .execute_ucsi_command_external(Command::PpmCommand(ppm::Command::SetNotificationEnable( + ppm::set_notification_enable::Args { + notification_enable: notifications, + }, + ))) + .await + .into(); + let response = response.unwrap(); + if !response.cci.cmd_complete() || response.cci.error() { + error!("Set Notification enable failed: {:?}", response.cci); + } else { + info!("Set Notification enable successful"); + } + + info!("Sending command complete ack..."); + let response: UcsiResponseResult = context + .execute_ucsi_command_external(Command::PpmCommand(ppm::Command::AckCcCi(ppm::ack_cc_ci::Args { + ack: *Ack::default().set_command_complete(true), + }))) + .await + .into(); + let response = response.unwrap(); + if !response.cci.ack_command() || response.cci.error() { + error!("Sending command complete ack failed: {:?}", response.cci); + } else { + info!("Sending command complete ack successful"); + } + + info!("Connecting sink on port 0"); + state[0].connect_sink(CAPABILITY, false).await; + info!("Connecting sink on port 1"); + state[1].connect_sink(CAPABILITY, false).await; + + // Ensure connect flow has time to complete + embassy_time::Timer::after_millis(1000).await; + + info!("Port 0: Get connector status..."); + let response: UcsiResponseResult = context + .execute_ucsi_command_external(Command::LpmCommand(lpm::GlobalCommand::new( + GlobalPortId(0), + lpm::CommandData::GetConnectorStatus, + ))) + .await + .into(); + let response = response.unwrap(); + if !response.cci.cmd_complete() || response.cci.error() { + error!("Get connector status failed: {:?}", response.cci); + } else { + info!( + "Get connector status successful, connector change: {:?}", + response.cci.connector_change() + ); + } + + info!("Sending command complete ack..."); + let response: UcsiResponseResult = context + .execute_ucsi_command_external(Command::PpmCommand(ppm::Command::AckCcCi(ppm::ack_cc_ci::Args { + ack: *Ack::default().set_command_complete(true).set_connector_change(true), + }))) + .await + .into(); + let response = response.unwrap(); + if !response.cci.ack_command() || response.cci.error() { + error!("Sending command complete ack failed: {:?}", response.cci); + } else { + info!( + "Sending command complete ack successful, connector change: {:?}", + response.cci.connector_change() + ); + } + + info!("Port 1: Get connector status..."); + let response: UcsiResponseResult = context + .execute_ucsi_command_external(Command::LpmCommand(lpm::GlobalCommand::new( + GlobalPortId(1), + lpm::CommandData::GetConnectorStatus, + ))) + .await + .into(); + let response = response.unwrap(); + if !response.cci.cmd_complete() || response.cci.error() { + error!("Get connector status failed: {:?}", response.cci); + } else { + info!( + "Get connector status successful, connector change: {:?}", + response.cci.connector_change() + ); + } + + info!("Sending command complete ack..."); + let response: UcsiResponseResult = context + .execute_ucsi_command_external(Command::PpmCommand(ppm::Command::AckCcCi(ppm::ack_cc_ci::Args { + ack: *Ack::default().set_command_complete(true).set_connector_change(true), + }))) + .await + .into(); + let response = response.unwrap(); + if !response.cci.ack_command() || response.cci.error() { + error!("Sending command complete ack failed: {:?}", response.cci); + } else { + info!( + "Sending command complete ack successful, connector change: {:?}", + response.cci.connector_change() + ); + }*/ } #[embassy_executor::task(pool_size = 2)]