Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions embedded-service/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
//! Common traits for event senders and receivers
use core::{future::ready, marker::PhantomData};

use core::marker::PhantomData;
use crate::error;

use embassy_sync::channel::{DynamicReceiver, DynamicSender};
use embassy_sync::{
channel::{DynamicReceiver, DynamicSender},
pubsub::{DynImmediatePublisher, DynSubscriber, WaitResult},
};

/// Common event sender trait
pub trait Sender<E> {
Expand Down Expand Up @@ -44,6 +48,42 @@ impl<E> Receiver<E> for DynamicReceiver<'_, E> {
}
}

impl<E: Clone> Sender<E> for DynImmediatePublisher<'_, E> {
fn try_send(&mut self, event: E) -> Option<()> {
self.try_publish(event).ok()
}

fn send(&mut self, event: E) -> impl Future<Output = ()> {
self.publish_immediate(event);
ready(())
}
}

impl<E: Clone> Receiver<E> for DynSubscriber<'_, E> {
fn try_next(&mut self) -> Option<E> {
match self.try_next_message() {
Some(WaitResult::Message(e)) => Some(e),
Some(WaitResult::Lagged(e)) => {
error!("Subscriber lagged, skipping {} events", e);
None
}
_ => None,
}
}

async fn wait_next(&mut self) -> E {
loop {
match self.next_message().await {
WaitResult::Message(e) => return e,
WaitResult::Lagged(e) => {
error!("Subscriber lagged, skipping {} events", e);
continue;
}
}
}
}
}

/// A sender that discards all events sent to it.
pub struct NoopSender;

Expand Down
52 changes: 35 additions & 17 deletions examples/rt685s-evk/src/bin/type_c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ use embassy_imxrt::{bind_interrupts, peripherals};
use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender};
use embassy_sync::mutex::Mutex;
use embassy_sync::once_lock::OnceLock;
use embassy_sync::pubsub::PubSubChannel;
use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel};
use embassy_time::{self as _, Delay};
use embedded_cfu_protocol::protocol_definitions::{FwUpdateOffer, FwUpdateOfferResponse, FwVersion, HostToken};
use embedded_services::GlobalRawMutex;
use embedded_services::event::NoopSender;
use embedded_services::event::MapSender;
use embedded_services::{error, info};
use embedded_usb_pd::GlobalPortId;
use power_policy_interface::psu;
Expand All @@ -26,7 +26,7 @@ use static_cell::StaticCell;
use tps6699x::asynchronous::embassy as tps6699x;
use type_c_interface::port::ControllerId;
use type_c_service::driver::tps6699x::{self as tps6699x_drv};
use type_c_service::service::Service;
use type_c_service::service::{EventReceiver, Service};
use type_c_service::wrapper::ControllerWrapper;
use type_c_service::wrapper::backing::{IntermediateStorage, ReferencedStorage, Storage};
use type_c_service::wrapper::proxy::PowerProxyDevice;
Expand Down Expand Up @@ -66,11 +66,27 @@ type Wrapper<'a> = ControllerWrapper<
type Controller<'a> = tps6699x::controller::Controller<GlobalRawMutex, BusDevice<'a>>;
type Interrupt<'a> = tps6699x::Interrupt<'a, GlobalRawMutex, BusDevice<'a>>;

type PowerPolicySenderType = MapSender<
power_policy_interface::service::event::Event<'static, DeviceType>,
power_policy_interface::service::event::EventData,
DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>,
fn(
power_policy_interface::service::event::Event<'static, DeviceType>,
) -> power_policy_interface::service::event::EventData,
>;

type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>;

type PowerPolicyServiceType = Mutex<
GlobalRawMutex,
power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 2, NoopSender, 1>>,
power_policy_service::service::Service<
'static,
ArrayRegistration<'static, DeviceType, 2, PowerPolicySenderType, 1>,
>,
>;

type ServiceType = Service<'static>;

#[embassy_executor::task]
async fn pd_controller_task(controller: &'static Wrapper<'static>) {
loop {
Expand All @@ -95,12 +111,13 @@ async fn power_policy_task(

#[embassy_executor::task]
async fn type_c_service_task(
service: &'static Service<'static, DeviceType>,
service: &'static Mutex<GlobalRawMutex, ServiceType>,
event_receiver: EventReceiver<'static, PowerPolicyReceiverType>,
wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS],
cfu_client: &'static CfuClient,
) {
info!("Starting type-c task");
type_c_service::task::task(service, wrappers, cfu_client).await;
type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await;
}

#[embassy_executor::main]
Expand Down Expand Up @@ -196,11 +213,12 @@ async fn main(spawner: Spawner) {

// The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot
static POWER_POLICY_CHANNEL: StaticCell<
PubSubChannel<GlobalRawMutex, power_policy_interface::service::event::Event<'static, DeviceType>, 4, 1, 0>,
PubSubChannel<GlobalRawMutex, power_policy_interface::service::event::EventData, 4, 1, 0>,
> = StaticCell::new();

let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new());
let power_policy_publisher = power_policy_channel.dyn_immediate_publisher();
let power_policy_sender: PowerPolicySenderType =
MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into());
// Guaranteed to not panic since we initialized the channel above
let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap();

Expand All @@ -210,7 +228,7 @@ async fn main(spawner: Spawner) {

let power_policy_registration = ArrayRegistration {
psus: [&wrapper.ports[0].proxy, &wrapper.ports[1].proxy],
service_senders: [NoopSender],
service_senders: [power_policy_sender],
};

static POWER_SERVICE: StaticCell<PowerPolicyServiceType> = StaticCell::new();
Expand All @@ -220,20 +238,20 @@ async fn main(spawner: Spawner) {
power_policy_service::service::config::Config::default(),
)));

static TYPE_C_SERVICE: StaticCell<Service<'static, DeviceType>> = StaticCell::new();
let type_c_service = TYPE_C_SERVICE.init(Service::create(
Default::default(),
controller_context,
power_policy_publisher,
power_policy_subscriber,
));
static TYPE_C_SERVICE: StaticCell<Mutex<GlobalRawMutex, ServiceType>> = StaticCell::new();
let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create(Default::default(), controller_context)));

// Spin up CFU service
static CFU_CLIENT: OnceLock<CfuClient> = OnceLock::new();
let cfu_client = CfuClient::new(&CFU_CLIENT).await;

info!("Spawining type-c service task");
spawner.must_spawn(type_c_service_task(type_c_service, [wrapper], cfu_client));
spawner.must_spawn(type_c_service_task(
type_c_service,
EventReceiver::new(controller_context, power_policy_subscriber),
[wrapper],
cfu_client,
));

info!("Spawining power policy task");
spawner.must_spawn(power_policy_task(
Expand Down
68 changes: 43 additions & 25 deletions examples/rt685s-evk/src/bin/type_c_cfu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ use embassy_imxrt::{bind_interrupts, peripherals};
use embassy_sync::channel::{Channel, DynamicReceiver, DynamicSender};
use embassy_sync::mutex::Mutex;
use embassy_sync::once_lock::OnceLock;
use embassy_sync::pubsub::PubSubChannel;
use embassy_sync::pubsub::{DynImmediatePublisher, DynSubscriber, PubSubChannel};
use embassy_time::Timer;
use embassy_time::{self as _, Delay};
use embedded_cfu_protocol::protocol_definitions::*;
use embedded_cfu_protocol::protocol_definitions::{FwUpdateOffer, FwUpdateOfferResponse, FwVersion};
use embedded_services::GlobalRawMutex;
use embedded_services::event::NoopSender;
use embedded_services::event::MapSender;
use embedded_services::{error, info};
use embedded_usb_pd::GlobalPortId;
use power_policy_interface::psu;
Expand All @@ -29,7 +29,7 @@ use static_cell::StaticCell;
use tps6699x::asynchronous::embassy as tps6699x;
use type_c_interface::port::ControllerId;
use type_c_service::driver::tps6699x::{self as tps6699x_drv};
use type_c_service::service::Service;
use type_c_service::service::{EventReceiver, Service};
use type_c_service::wrapper::ControllerWrapper;
use type_c_service::wrapper::backing::{IntermediateStorage, ReferencedStorage, Storage};
use type_c_service::wrapper::proxy::PowerProxyDevice;
Expand Down Expand Up @@ -64,11 +64,27 @@ type Wrapper<'a> = ControllerWrapper<
type Controller<'a> = tps6699x::controller::Controller<GlobalRawMutex, BusDevice<'a>>;
type Interrupt<'a> = tps6699x::Interrupt<'a, GlobalRawMutex, BusDevice<'a>>;

type PowerPolicySenderType = MapSender<
power_policy_interface::service::event::Event<'static, DeviceType>,
power_policy_interface::service::event::EventData,
DynImmediatePublisher<'static, power_policy_interface::service::event::EventData>,
fn(
power_policy_interface::service::event::Event<'static, DeviceType>,
) -> power_policy_interface::service::event::EventData,
>;

type PowerPolicyReceiverType = DynSubscriber<'static, power_policy_interface::service::event::EventData>;

type PowerPolicyServiceType = Mutex<
GlobalRawMutex,
power_policy_service::service::Service<'static, ArrayRegistration<'static, DeviceType, 2, NoopSender, 1>>,
power_policy_service::service::Service<
'static,
ArrayRegistration<'static, DeviceType, 2, PowerPolicySenderType, 1>,
>,
>;

type ServiceType = Service<'static>;

const NUM_PD_CONTROLLERS: usize = 1;
const CONTROLLER0_ID: ControllerId = ControllerId(0);
const CONTROLLER0_CFU_ID: ComponentId = 0x12;
Expand Down Expand Up @@ -179,12 +195,13 @@ async fn power_policy_task(

#[embassy_executor::task]
async fn type_c_service_task(
service: &'static Service<'static, DeviceType>,
service: &'static Mutex<GlobalRawMutex, ServiceType>,
event_receiver: EventReceiver<'static, PowerPolicyReceiverType>,
wrappers: [&'static Wrapper<'static>; NUM_PD_CONTROLLERS],
cfu_client: &'static CfuClient,
) {
info!("Starting type-c task");
type_c_service::task::task(service, wrappers, cfu_client).await;
type_c_service::task::task(service, event_receiver, wrappers, cfu_client).await;
}

#[embassy_executor::main]
Expand Down Expand Up @@ -286,9 +303,20 @@ async fn main(spawner: Spawner) {
static POWER_SERVICE_CONTEXT: StaticCell<power_policy_service::service::context::Context> = StaticCell::new();
let power_service_context = POWER_SERVICE_CONTEXT.init(power_policy_service::service::context::Context::new());

// The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot
static POWER_POLICY_CHANNEL: StaticCell<
PubSubChannel<GlobalRawMutex, power_policy_interface::service::event::EventData, 4, 1, 0>,
> = StaticCell::new();

let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new());
let power_policy_sender: PowerPolicySenderType =
MapSender::new(power_policy_channel.dyn_immediate_publisher(), |e| e.into());
// Guaranteed to not panic since we initialized the channel above
let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap();

let power_policy_registration = ArrayRegistration {
psus: [&wrapper.ports[0].proxy, &wrapper.ports[1].proxy],
service_senders: [NoopSender],
service_senders: [power_policy_sender],
};

static POWER_SERVICE: StaticCell<PowerPolicyServiceType> = StaticCell::new();
Expand All @@ -298,30 +326,20 @@ async fn main(spawner: Spawner) {
power_policy_service::service::config::Config::default(),
)));

// The service is the only receiver and we only use a DynImmediatePublisher, which doesn't take a publisher slot
static POWER_POLICY_CHANNEL: StaticCell<
PubSubChannel<GlobalRawMutex, power_policy_interface::service::event::Event<'static, DeviceType>, 4, 1, 0>,
> = StaticCell::new();

let power_policy_channel = POWER_POLICY_CHANNEL.init(PubSubChannel::new());
let power_policy_publisher = power_policy_channel.dyn_immediate_publisher();
// Guaranteed to not panic since we initialized the channel above
let power_policy_subscriber = power_policy_channel.dyn_subscriber().unwrap();

static TYPE_C_SERVICE: StaticCell<Service<'static, DeviceType>> = StaticCell::new();
let type_c_service = TYPE_C_SERVICE.init(Service::create(
Default::default(),
controller_context,
power_policy_publisher,
power_policy_subscriber,
));
static TYPE_C_SERVICE: StaticCell<Mutex<GlobalRawMutex, ServiceType>> = StaticCell::new();
let type_c_service = TYPE_C_SERVICE.init(Mutex::new(Service::create(Default::default(), controller_context)));

// Spin up CFU service
static CFU_CLIENT: OnceLock<CfuClient> = OnceLock::new();
let cfu_client = CfuClient::new(&CFU_CLIENT).await;

info!("Spawining type-c service task");
spawner.must_spawn(type_c_service_task(type_c_service, [wrapper], cfu_client));
spawner.must_spawn(type_c_service_task(
type_c_service,
EventReceiver::new(controller_context, power_policy_subscriber),
[wrapper],
cfu_client,
));

info!("Spawining power policy task");
spawner.must_spawn(power_policy_task(
Expand Down
Loading
Loading