Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mdbook/src/chapter_0/chapter_0_1.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ timely::execute_from_args(std::env::args(), |worker| {
.exchange(|x| *x)
.inspect(move |x| println!("worker {}:\thello {}", index, x))
.probe()
.0
);

// introduce data and watch!
Expand Down
1 change: 1 addition & 0 deletions mdbook/src/chapter_1/chapter_1_1.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ fn main() {
.exchange(|x| *x)
.inspect(move |x| println!("worker {}:\thello {}", index, x))
.probe()
.0
);

// introduce data and watch!
Expand Down
1 change: 1 addition & 0 deletions mdbook/src/chapter_1/chapter_1_3.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fn main() {
.exchange(|x| *x)
.inspect(move |x| println!("worker {}:\thello {}", index, x))
.probe()
.0
);

// introduce data and watch!
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/event_driven.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
for _step in 0 .. length {
stream = stream.map(|x: ()| x);
}
let probe = stream.probe();
let (probe, _stream) = stream.probe();
inputs.push(input);
probes.push(probe);
});
Expand Down
1 change: 1 addition & 0 deletions timely/examples/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ fn main() {
.input_from(&mut input)
.exchange(|&x| x as u64)
.probe()
.0
);


Expand Down
16 changes: 9 additions & 7 deletions timely/src/dataflow/operators/core/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use crate::dataflow::channels::Message;
pub trait Probe<G: Scope, C: Container> {
/// Constructs a progress probe which indicates which timestamps have elapsed at the operator.
///
/// Returns a tuple of a probe handle and the input stream.
///
/// # Examples
/// ```
/// use timely::*;
Expand All @@ -31,8 +33,8 @@ pub trait Probe<G: Scope, C: Container> {
/// // add an input and base computation off of it
/// let (mut input, probe) = worker.dataflow(|scope| {
/// let (input, stream) = scope.new_input();
/// let probe = stream.inspect(|x| println!("hello {:?}", x))
/// .probe();
/// let (probe, _) = stream.inspect(|x| println!("hello {:?}", x))
/// .probe();
/// (input, probe)
/// });
///
Expand All @@ -44,7 +46,7 @@ pub trait Probe<G: Scope, C: Container> {
/// }
/// }).unwrap();
/// ```
fn probe(self) -> Handle<G::Timestamp>;
fn probe(self) -> (Handle<G::Timestamp>, Self);

/// Inserts a progress probe in a stream.
///
Expand Down Expand Up @@ -80,12 +82,12 @@ pub trait Probe<G: Scope, C: Container> {
}

impl<G: Scope, C: Container> Probe<G, C> for StreamCore<G, C> {
fn probe(self) -> Handle<G::Timestamp> {
fn probe(self) -> (Handle<G::Timestamp>, Self) {

// the frontier is shared state; scope updates, handle reads.
let handle = Handle::<G::Timestamp>::new();
self.probe_with(&handle);
handle
let stream = self.probe_with(&handle);
(handle, stream)
}
fn probe_with(self, handle: &Handle<G::Timestamp>) -> StreamCore<G, C> {

Expand Down Expand Up @@ -197,7 +199,7 @@ mod tests {
// create a new input, and inspect its output
let (mut input, probe) = worker.dataflow(move |scope| {
let (input, stream) = scope.new_input::<String>();
(input, stream.probe())
(input, stream.probe().0)
});

// introduce data and watch!
Expand Down
3 changes: 1 addition & 2 deletions timely/src/synchronization/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<A: Allocate> Barrier<A> {
use crate::dataflow::operators::{Input, Probe};
let (input, probe) = worker.dataflow(|scope| {
let (handle, stream) = scope.new_input::<()>();
(handle, stream.probe())
(handle, stream.probe().0)
});
Barrier { input, probe, worker: worker.clone() }
}
Expand Down Expand Up @@ -51,4 +51,3 @@ impl<A: Allocate> Barrier<A> {
!self.probe.less_than(self.input.time())
}
}

2 changes: 2 additions & 0 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ impl<A: Allocate> Worker<A> {
/// .to_stream(scope)
/// .inspect(|x| println!("{:?}", x))
/// .probe()
/// .0
/// });
///
/// worker.step_while(|| probe.less_than(&0));
Expand Down Expand Up @@ -477,6 +478,7 @@ impl<A: Allocate> Worker<A> {
/// .to_stream(scope)
/// .inspect(|x| println!("{:?}", x))
/// .probe()
/// .0
/// });
///
/// worker.step_or_park_while(None, || probe.less_than(&0));
Expand Down
1 change: 1 addition & 0 deletions timely/tests/gh_523.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ fn gh_523() {
})
.exchange(|x| *x)
.probe()
.0
});

for round in 0..2 {
Expand Down
Loading