Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 34 additions & 24 deletions agent/src/control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use tokio::{
net::UnixStream,
};

use protocol::{Decoder, FactoryInfo, Message, Payload};
use protocol::{
Decoder, FactoryInfo, Message, Payload, PayloadReq, PayloadRes,
};

pub(crate) mod protocol;
pub(crate) mod server;
Expand All @@ -33,10 +35,16 @@ impl Stuff {
Ok(())
}

async fn req(&mut self, payload: Payload) -> Result<Payload> {
let message = Message { id: self.ids.next().unwrap(), payload };
async fn req(&mut self, req: PayloadReq) -> Result<PayloadRes> {
let message = Message {
id: self.ids.next().unwrap(),
payload: Payload::Req(req),
};
match self.send_and_recv(&message).await {
Ok(response) => Ok(response.payload),
Ok(Message { payload: Payload::Resp(r), .. }) => Ok(r),
Ok(Message { payload: Payload::Req(r), .. }) => {
bail!("received a request instead of a response: {r:?}");
}
Err(e) => {
/*
* Requests to the agent are relatively simple and over a UNIX
Expand All @@ -54,10 +62,10 @@ impl Stuff {
* need to retry until we are able to get a successful response of some
* kind back from the server.
*/
async fn req_retry(&mut self, payload: Payload) -> Result<Payload> {
async fn req_retry(&mut self, req: PayloadReq) -> Result<PayloadRes> {
loop {
match self.req(payload.clone()).await? {
Payload::Error(e) => {
match self.req(req.clone()).await? {
PayloadRes::Error(e) => {
eprintln!(
"WARNING: control request failure (retrying): {e}"
);
Expand Down Expand Up @@ -147,11 +155,12 @@ async fn cmd_address_list(mut l: Level<Stuff>) -> Result<()> {

let filter = a.opts().opt_str("f");

let addrs = match l.context_mut().req(Payload::MetadataAddresses).await? {
Payload::Error(e) => {
let addrs = match l.context_mut().req(PayloadReq::MetadataAddresses).await?
{
PayloadRes::Error(e) => {
bail!("WARNING: control request failure: {e}");
}
Payload::MetadataAddressesResult(addrs) => addrs,
PayloadRes::MetadataAddresses(addrs) => addrs,
other => bail!("unexpected response: {other:?}"),
};

Expand Down Expand Up @@ -249,11 +258,11 @@ async fn cmd_eng(mut l: Level<Stuff>) -> Result<()> {
async fn cmd_eng_metadata(mut l: Level<Stuff>) -> Result<()> {
let _ = no_args!(l);

match l.context_mut().req(Payload::MetadataAddresses).await? {
Payload::Error(e) => {
match l.context_mut().req(PayloadReq::MetadataAddresses).await? {
PayloadRes::Error(e) => {
bail!("WARNING: control request failure: {e}");
}
Payload::MetadataAddressesResult(addrs) => {
PayloadRes::MetadataAddresses(addrs) => {
println!("addrs = {addrs:#?}");
Ok(())
}
Expand Down Expand Up @@ -284,10 +293,10 @@ async fn cmd_store_get(mut l: Level<Stuff>) -> Result<()> {
let no_wait = a.opts().opt_present("W");
let mut printed_wait = false;

let req = Payload::StoreGet(name.clone());
let req = PayloadReq::StoreGet(name.clone());
loop {
match l.context_mut().req_retry(req.clone()).await? {
Payload::StoreGetResult(Some(ent)) => {
PayloadRes::StoreGet(Some(ent)) => {
/*
* Output formatting here should be kept consistent with
* what "buildomat job store get" does outside a job;
Expand All @@ -300,7 +309,7 @@ async fn cmd_store_get(mut l: Level<Stuff>) -> Result<()> {
}
break Ok(());
}
Payload::StoreGetResult(None) => {
PayloadRes::StoreGet(None) => {
if no_wait {
bail!("the store has no value for {name:?}");
}
Expand Down Expand Up @@ -356,10 +365,11 @@ async fn cmd_store_put(mut l: Level<Stuff>) -> Result<()> {
};

let secret = a.opts().opt_present("s");
let req = Payload::StorePut(a.args()[0].to_string(), value.clone(), secret);
let req =
PayloadReq::StorePut(a.args()[0].to_string(), value.clone(), secret);

match l.context_mut().req_retry(req).await? {
Payload::Ack => Ok(()),
PayloadRes::Ack => Ok(()),
other => bail!("unexpected response: {other:?}"),
}
}
Expand All @@ -381,7 +391,7 @@ async fn cmd_process_start(mut l: Level<Stuff>) -> Result<()> {
bad_args!(l, "specify at least a process name and a command to run");
}

let payload = Payload::ProcessStart {
let payload = PayloadReq::ProcessStart {
name: a.args()[0].to_string(),
cmd: a.args()[1].to_string(),
args: a.args().iter().skip(2).cloned().collect::<Vec<_>>(),
Expand All @@ -401,28 +411,28 @@ async fn cmd_process_start(mut l: Level<Stuff>) -> Result<()> {
};

match l.context_mut().req(payload).await? {
Payload::Error(e) => {
PayloadRes::Error(e) => {
/*
* This request is purely local to the agent, so an
* error is not something we should retry indefinitely.
*/
bail!("could not start process: {e}");
}
Payload::Ack => Ok(()),
PayloadRes::Ack => Ok(()),
other => bail!("unexpected response: {other:?}"),
}
}

async fn factory_info(s: &mut Stuff) -> Result<FactoryInfo> {
match s.req(Payload::FactoryInfo).await? {
Payload::Error(e) => {
match s.req(PayloadReq::FactoryInfo).await? {
PayloadRes::Error(e) => {
/*
* This request is purely local to the agent, so an
* error is not something we should retry indefinitely.
*/
bail!("could not get factory info: {e}");
}
Payload::FactoryInfoResult(fi) => Ok(fi),
PayloadRes::FactoryInfo(fi) => Ok(fi),
other => bail!("unexpected response: {other:?}"),
}
}
Expand Down
27 changes: 16 additions & 11 deletions agent/src/control/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,10 @@ pub struct StoreEntry {
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum Payload {
Ack,
Error(String),

pub enum PayloadReq {
StoreGet(String),
StoreGetResult(Option<StoreEntry>),

StorePut(String, String, bool),

MetadataAddresses,
MetadataAddressesResult(Vec<metadata::FactoryAddresses>),

ProcessStart {
name: String,
cmd: String,
Expand All @@ -45,9 +37,22 @@ pub enum Payload {
uid: u32,
gid: u32,
},

FactoryInfo,
FactoryInfoResult(FactoryInfo),
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum PayloadRes {
Ack,
Error(String),
StoreGet(Option<StoreEntry>),
MetadataAddresses(Vec<metadata::FactoryAddresses>),
FactoryInfo(FactoryInfo),
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum Payload {
Req(PayloadReq),
Resp(PayloadRes),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand Down
24 changes: 11 additions & 13 deletions agent/src/control/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,22 @@ use tokio::{
};

use super::{
protocol::{Decoder, Message, Payload},
protocol::{Decoder, Message, Payload, PayloadReq, PayloadRes},
SOCKET_PATH,
};

#[derive(Debug)]
pub struct Request {
id: u64,
payload: Payload,
payload: PayloadReq,
conn: Arc<Connection>,
}

impl Request {
pub async fn reply(self, payload: Payload) {
let m = Message { id: self.id, payload }.pack().unwrap();
pub async fn reply(self, resp: PayloadRes) {
let m = Message { id: self.id, payload: Payload::Resp(resp) }
.pack()
.unwrap();

/*
* Put the serialised message on the write queue for the socket from
Expand All @@ -40,7 +42,7 @@ impl Request {
self.conn.notify.notify_one();
}

pub fn payload(&self) -> &Payload {
pub fn payload(&self) -> &PayloadReq {
&self.payload
}
}
Expand Down Expand Up @@ -201,25 +203,21 @@ async fn handle_client_turn(
while let Some(msg) = ci.decoder.take()? {
match ci.state {
ClientState::Running => match &msg.payload {
Payload::StoreGet(..)
| Payload::StorePut(..)
| Payload::MetadataAddresses
| Payload::ProcessStart { .. }
| Payload::FactoryInfo => {
Payload::Req(request) => {
/*
* These are requests from the control program. Pass them
* on to the main loop.
*/
let req = Request {
id: msg.id,
payload: msg.payload.clone(),
payload: request.clone(),
conn: Arc::clone(conn),
};

tx.send(req).await.unwrap();
}
other => {
bail!("unexpected message {:?}", other);
Payload::Resp(resp) => {
bail!("received response instead of request: {resp:?}");
}
},
}
Expand Down
29 changes: 14 additions & 15 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ mod exec;
mod shadow;
mod upload;

use control::protocol::{FactoryInfo, Payload};
use control::protocol::{FactoryInfo, PayloadReq, PayloadRes};
use exec::ExitDetails;

struct Agent {
Expand Down Expand Up @@ -1402,7 +1402,7 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
* Handle requests from the control program.
*/
let reply = match req.payload() {
Payload::StoreGet(name) => {
PayloadReq::StoreGet(name) => {
match cw
.client
.worker_job_store_get()
Expand All @@ -1411,17 +1411,17 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
.send()
.await
{
Ok(res) => Payload::StoreGetResult(
Ok(res) => PayloadRes::StoreGet(
res.into_inner().value.map(|v| StoreEntry {
name: name.to_string(),
value: v.value,
secret: v.secret,
}),
),
Err(e) => Payload::Error(e.to_string()),
Err(e) => PayloadRes::Error(e.to_string()),
}
}
Payload::StorePut(name, value, secret) => {
PayloadReq::StorePut(name, value, secret) => {
match cw
.client
.worker_job_store_put()
Expand All @@ -1431,17 +1431,17 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
.send()
.await
{
Ok(..) => Payload::Ack,
Err(e) => Payload::Error(e.to_string()),
Ok(..) => PayloadRes::Ack,
Err(e) => PayloadRes::Error(e.to_string()),
}
}
Payload::MetadataAddresses => Payload::MetadataAddressesResult(
PayloadReq::MetadataAddresses => PayloadRes::MetadataAddresses(
metadata
.as_ref()
.map(|md| md.addresses().to_vec())
.unwrap_or_default(),
),
Payload::ProcessStart {
PayloadReq::ProcessStart {
name,
cmd,
args,
Expand All @@ -1451,22 +1451,21 @@ async fn cmd_run(mut l: Level<Agent>) -> Result<()> {
gid,
} => {
match bgprocs.start(name, cmd, args, env, pwd, *uid, *gid) {
Ok(_) => Payload::Ack,
Err(e) => Payload::Error(e.to_string()),
Ok(_) => PayloadRes::Ack,
Err(e) => PayloadRes::Error(e.to_string()),
}
}
Payload::FactoryInfo => {
PayloadReq::FactoryInfo => {
if let Some(f) = &factory {
Payload::FactoryInfoResult(FactoryInfo {
PayloadRes::FactoryInfo(FactoryInfo {
id: f.id.to_string(),
name: f.name.to_string(),
private: f.private.clone(),
})
} else {
Payload::Error("factory info not available".into())
PayloadRes::Error("factory info not available".into())
}
}
_ => Payload::Error("unexpected message type".to_string()),
};

req.reply(reply).await;
Expand Down