Merged
10 changes: 2 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ use timely::dataflow::operators::*;
fn main() {
timely::example(|scope| {
(0..10).to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("seen: {:?}", x));
});
}
@@ -66,6 +67,7 @@ fn main() {
// create a new input, exchange data, and inspect its output
worker.dataflow(|scope| {
scope.input_from(&mut input)
.container::<Vec<_>>()
.exchange(|x| *x)
.inspect(move |x| println!("worker {}:\thello {}", index, x))
.probe_with(&mut probe);
@@ -177,14 +179,6 @@ With the current interfaces there is not much to be done. One possible change wo

The timely communication layer currently discards most buffers it moves through exchange channels, because it doesn't have a sane way of rate controlling the output, nor a sane way to determine how many buffers should be cached. If either of these problems were fixed, it would make sense to recycle the buffers to avoid random allocations, especially for small batches. These changes have something like a 10%-20% performance impact in the `dataflow-join` triangle computation workload.
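The recycling idea can be pictured with a small buffer pool (a sketch in plain Rust; the names, sizes, and cache policy here are illustrative, not timely's actual internals):

```rust
use std::collections::VecDeque;

/// A bounded pool of byte buffers, recycled to avoid fresh allocations.
struct BufferPool {
    free: VecDeque<Vec<u8>>,
    capacity: usize,   // size of each buffer handed out
    max_cached: usize, // cap on how many buffers we retain (the open question above)
}

impl BufferPool {
    fn new(capacity: usize, max_cached: usize) -> Self {
        BufferPool { free: VecDeque::new(), capacity, max_cached }
    }

    /// Hand out a cached buffer if one exists, otherwise allocate.
    fn checkout(&mut self) -> Vec<u8> {
        let capacity = self.capacity;
        self.free.pop_front().unwrap_or_else(|| Vec::with_capacity(capacity))
    }

    /// Return a buffer; discard it if the cache is already full.
    fn checkin(&mut self, mut buffer: Vec<u8>) {
        if self.free.len() < self.max_cached {
            buffer.clear();
            self.free.push_back(buffer);
        }
    }
}

fn main() {
    let mut pool = BufferPool::new(1024, 2);
    let buffer = pool.checkout();
    let allocation = buffer.as_ptr();
    pool.checkin(buffer);
    // The next checkout reuses the same allocation rather than allocating again.
    assert_eq!(pool.checkout().as_ptr(), allocation);
}
```

The hard part alluded to above is choosing `max_cached`: too small and buffers are still discarded, too large and idle memory accumulates.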

## Support for non-serializable types

The communication layer is based on a type `Content<T>` which can be backed by typed or binary data. Consequently, it requires that the type it supports be serializable, because it needs to have logic for the case that the data is binary, even if this case is not used. It seems like the `Stream` type should be extendable to be parametric in the type of storage used for the data, so that we can express the fact that some types are not serializable and that this is ok.

**NOTE**: Differential dataflow demonstrates how to do this at the user level in its `operators/arrange.rs`, if somewhat sketchily (with a wrapper that lies about the properties of the type it transports).

This would allow us to safely pass `Rc<T>` types around, as long as we use the `Pipeline` parallelization contract.
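As a reminder of why this matters (plain Rust, no timely involved): `Rc<T>` maintains a non-atomic reference count, so it can neither cross threads nor be serialized, but it works fine within a single worker, which is exactly the situation the `Pipeline` contract guarantees.

```rust
use std::rc::Rc;

fn main() {
    // Cloning an Rc bumps a non-atomic reference count; no data is copied.
    let shared = Rc::new(vec![1, 2, 3]);
    let alias = Rc::clone(&shared);
    assert_eq!(Rc::strong_count(&shared), 2);
    assert_eq!(alias[0], 1);
    // Rc is !Send, so the compiler rejects moving `alias` to another thread,
    // which is the same restriction a Pipeline-only stream would enforce.
}
```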

## Coarse- vs fine-grained timestamps

The progress tracking machinery involves some non-trivial overhead per timestamp. This means that using very fine-grained timestamps, for example the nanosecond at which a record is processed, can swamp the progress tracking logic. By contrast, the logging infrastructure demotes nanoseconds to data, part of the logged payload, and approximates batches of events with the smallest timestamp in the batch. This is less accurate from a progress tracking point of view, but more performant. It may be possible to generalize this so that users can write programs without thinking about granularity of timestamp, and the system automatically coarsens when possible (essentially boxcar-ing times).
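The coarsening idea can be sketched in isolation (plain Rust; a toy, not timely's logging code): assign every event in a batch the smallest timestamp in that batch, so progress tracking handles one time per batch rather than one per record.

```rust
/// Approximate each batch of events by the smallest timestamp it contains,
/// trading timestamp resolution for fewer distinct times to track.
fn coarsen(mut events: Vec<(u64, String)>, batch: usize) -> Vec<(u64, String)> {
    events.sort_by_key(|&(time, _)| time);
    let mut out = Vec::with_capacity(events.len());
    for chunk in events.chunks(batch) {
        let min_time = chunk[0].0; // sorted, so the first time is the smallest
        for (_, data) in chunk {
            out.push((min_time, data.clone()));
        }
    }
    out
}

fn main() {
    let events = vec![
        (1_000_000_007, "a".to_string()),
        (1_000_000_003, "b".to_string()),
        (1_000_000_009, "c".to_string()),
    ];
    // With batches of two, "b" and "a" share time ...003 and "c" keeps ...009.
    for (time, data) in coarsen(events, 2) {
        println!("{} @ {}", data, time);
    }
}
```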
2 changes: 1 addition & 1 deletion mdbook/src/chapter_0/chapter_0_0.md
@@ -5,7 +5,7 @@ Let's start with what may be the simplest non-trivial timely dataflow program.
```rust
# extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect};
use timely::dataflow::operators::{vec::ToStream, Inspect};

timely::example(|scope| {
(0..10).to_stream(scope)
1 change: 1 addition & 0 deletions mdbook/src/chapter_0/chapter_0_1.md
@@ -19,6 +19,7 @@ timely::execute_from_args(std::env::args(), |worker| {
// create a new input, exchange data, and inspect its output
let probe = worker.dataflow(|scope|
scope.input_from(&mut input)
.container::<Vec<_>>()
.exchange(|x| *x)
.inspect(move |x| println!("worker {}:\thello {}", index, x))
.probe()
2 changes: 1 addition & 1 deletion mdbook/src/chapter_0/chapter_0_2.md
@@ -4,7 +4,7 @@ Timely dataflow may be a different programming model than you are used to, but i

* **Data Parallelism**: The operators in timely dataflow are largely "data-parallel", meaning they can operate on independent parts of the data concurrently. This allows the underlying system to distribute timely dataflow computations across multiple parallel workers. These can be threads on your computer, or even threads across computers in a cluster you have access to. This distribution typically improves the throughput of the system, and lets you scale to larger problems with access to more resources (computation, communication, and memory).

* **Streaming Data**: The core data type in timely dataflow is a *stream* of data, an unbounded collection of data not all of which is available right now, but which instead arrives as the computation proceeds. Streams are a helpful generalization of static data sets, which are assumed available at the start of the computation. By expressing your program as a computation on streams, you've explained both how it should respond to static input data sets (feed all the data in at once) but also how it should react to new data that might arrive later on.
* **Streaming Data**: The core data type in timely dataflow is a *stream* of data, an unbounded volume of data not all of which is available right now, but which instead arrives as the computation proceeds. Streams are a helpful generalization of static data sets, which are assumed available at the start of the computation. By expressing your program as a computation on streams, you've explained both how it should respond to static input data sets (feed all the data in at once) but also how it should react to new data that might arrive later on.

* **Expressivity**: Timely dataflow's main addition over traditional stream processors is its ability to express higher-level control constructs, like iteration. This moves stream computations from the limitations of straight line code to the world of *algorithms*. Many of the advantages of timely dataflow computations come from our ability to express a more intelligent algorithm than the alternative systems, which can only express more primitive computations.

2 changes: 1 addition & 1 deletion mdbook/src/chapter_1/chapter_1.md
@@ -1,6 +1,6 @@
# Chapter 1: Core Concepts

Timely dataflow relies on two fundamental concepts: **timestamps** and **dataflow**, which together lead to the concept of **progress**. We will want to break down these concepts because they play a fundamental role in understanding how timely dataflow programs are structured.
Timely dataflow relies on two fundamental concepts: **dataflow** and **timestamps**, which together lead to the concept of **progress**. We will want to break down these concepts because they play a fundamental role in understanding how timely dataflow programs are structured.

## Dataflow

10 changes: 6 additions & 4 deletions mdbook/src/chapter_1/chapter_1_1.md
@@ -6,7 +6,8 @@ Dataflow programming is fundamentally about describing your program as independe

Let's write an overly simple dataflow program. Remember our `examples/hello.rs` program? We are going to revisit that, but with some **timestamp** aspects removed. The goal is to get a sense for dataflow with all of its warts, and to get you excited for the next section where we bring back the timestamps. :)

Here is a reduced version of `examples/hello.rs` that just feeds data into our dataflow, without paying any attention to progress made. In particular, we have removed the `probe()` operation, the resulting `probe` variable, and the use of `probe` to determine how long we should step the worker before introducing more data.
Here is a reduced version of `examples/hello.rs` that just feeds data into our dataflow, without paying any attention to progress made.
In particular, we have commented out the line that holds back the introduction of data until `probe` and `input` agree on a time.

```rust
#![allow(unused_variables)]
@@ -25,6 +26,7 @@ fn main() {
// create a new input, exchange data, and inspect its output
let probe = worker.dataflow(|scope|
scope.input_from(&mut input)
.container::<Vec<_>>()
.exchange(|x| *x)
.inspect(move |x| println!("worker {}:\thello {}", index, x))
.probe()
@@ -42,9 +44,9 @@ fn main() {
}
```

This program is a *dataflow program*. There are two dataflow operators here, `exchange` and `inspect`, each of which is asked to do a thing in response to input data. The `exchange` operator takes each datum and hands it to a downstream worker based on the value it sees; with two workers, one will get all the even numbers and the other all the odd numbers. The `inspect` operator takes an action for each datum, in this case printing something to the screen.

Importantly, we haven't imposed any constraints on how these operators need to run. We removed the code that caused the input to be delayed until a certain amount of progress had been made, and it shows in the results when we run with more than one worker:
It turns out this handshake between `probe` and `input` was part of what made the output make sense.
We waited for `probe` to confirm that the system had caught up before introducing more data to `input`.
When we remove this constraint we get more haphazard output.

```ignore
Echidnatron% cargo run --example hello -- -w2
22 changes: 11 additions & 11 deletions mdbook/src/chapter_1/chapter_1_2.md
@@ -34,20 +34,20 @@ The output we get with two workers is now:
Echidnatron% cargo run --example hello -- -w2
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/examples/hello -w2`
worker 1: hello 1 @ (Root, 1)
worker 1: hello 3 @ (Root, 3)
worker 1: hello 5 @ (Root, 5)
worker 0: hello 0 @ (Root, 0)
worker 0: hello 2 @ (Root, 2)
worker 0: hello 4 @ (Root, 4)
worker 0: hello 6 @ (Root, 6)
worker 0: hello 8 @ (Root, 8)
worker 1: hello 7 @ (Root, 7)
worker 1: hello 9 @ (Root, 9)
worker 1: hello 1 @ 1
worker 1: hello 3 @ 3
worker 1: hello 5 @ 5
worker 0: hello 0 @ 0
worker 0: hello 2 @ 2
worker 0: hello 4 @ 4
worker 0: hello 6 @ 6
worker 0: hello 8 @ 8
worker 1: hello 7 @ 7
worker 1: hello 9 @ 9
Echidnatron%
```

The timestamps are the `(Root, i)` things for various values of `i`. These happen to correspond to the data themselves, but had we provided random input data rather than `i` itself we would still be able to make sense of the output and put it back "in order".
The timestamps are the `@ i` things for various values of `i`. These happen to correspond to the data themselves, but had we provided random input data rather than `i` itself we would still be able to make sense of the output and put it back "in order".

## Timestamps for dataflow operators

5 changes: 3 additions & 2 deletions mdbook/src/chapter_1/chapter_1_3.md
@@ -20,6 +20,7 @@ fn main() {
// create a new input, exchange data, and inspect its output
let probe = worker.dataflow(|scope|
scope.input_from(&mut input)
.container::<Vec<_>>()
.exchange(|x| *x)
.inspect(move |x| println!("worker {}:\thello {}", index, x))
.probe()
@@ -48,13 +49,13 @@ Let's talk about each of them.

## Input capabilities

The `input` structure is how we provide data to a timely dataflow computation, and it has a timestamp associated with it. Initially this timestamp is the default value, usually something like `0` for integers. Whatever timestamp `input` has, it can introduce data with that timestamp or greater. We can advance this timestamp, via the `advance_to` method, which restricts the timestamps we can use to those greater or equal to whatever timestamp is supplied as the argument.
The `input` structure is how we provide data to a timely dataflow computation, and it has a timestamp associated with it. Initially this timestamp is the default value, usually something like `0` for unsigned integers. Whatever timestamp `input` has, it can introduce data with that timestamp or greater. We can advance this timestamp, via the `advance_to` method, which permanently restricts the timestamps we can use to those greater than or equal to whatever timestamp is supplied as the argument.

The `advance_to` method is a big deal. This is the moment in the computation where our program reveals to the system, and through the system to all other dataflow workers, that we might soon be able to announce a timestamp as complete. There may still be records in flight bearing that timestamp, but as they are retired the system can finally report that progress has been made.

## Output possibilities

The `probe` structure is how we learn about the possibility of timestamped data at some point in the dataflow graph. We can, at any point, consult a probe with the `less_than` method and ask whether it is still possible that we might see a time less than the argument at that point in the dataflow graph. There is also a `less_equal` method, if you prefer that.
The `probe` structure is how we learn about the possibility of receiving timestamped data at some point in the dataflow graph. We can, at any point, consult a probe with the `less_than` method and ask whether it is still possible that we might see a time less than the argument at that point in the dataflow graph. There is also a `less_equal` method, if you prefer that.

Putting a probe after the `inspect` operator, which passes through all data it receives as input only after invoking its method, tells us whether we should expect to see the method associated with `inspect` fire again for a given timestamp. If we are told we won't see any more messages with timestamp `t` after the `inspect`, then the `inspect` won't see any either.
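The question `less_than` answers can be modeled with a toy frontier (this is not timely's actual progress machinery, just a sketch of the contract): track how many records are outstanding at each timestamp, and report whether any outstanding time is strictly less than the query.

```rust
use std::collections::BTreeMap;

/// A toy model of a probe: counts of in-flight records per timestamp.
struct ToyProbe {
    outstanding: BTreeMap<u64, usize>,
}

impl ToyProbe {
    fn new() -> Self {
        ToyProbe { outstanding: BTreeMap::new() }
    }
    /// A record with timestamp `time` enters the dataflow.
    fn send(&mut self, time: u64) {
        *self.outstanding.entry(time).or_insert(0) += 1;
    }
    /// A record with timestamp `time` has been fully processed.
    fn retire(&mut self, time: u64) {
        let empty = match self.outstanding.get_mut(&time) {
            Some(count) => { *count -= 1; *count == 0 }
            None => false,
        };
        if empty { self.outstanding.remove(&time); }
    }
    /// Could we still see a record with a time strictly less than `time`?
    fn less_than(&self, time: u64) -> bool {
        self.outstanding.keys().next().map_or(false, |&min| min < time)
    }
}

fn main() {
    let mut probe = ToyProbe::new();
    probe.send(0);
    probe.send(1);
    assert!(probe.less_than(1));   // a record at time 0 is still in flight
    probe.retire(0);
    assert!(!probe.less_than(1));  // time 0 is now complete
}
```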

1 change: 1 addition & 0 deletions mdbook/src/chapter_2/chapter_2.md
@@ -14,6 +14,7 @@ use timely::dataflow::operators::{ToStream, Inspect};
fn main() {
timely::example(|scope| {
(0..10).to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("seen: {:?}", x));
});
}
15 changes: 10 additions & 5 deletions mdbook/src/chapter_2/chapter_2_1.md
@@ -16,14 +16,13 @@ fn main() {
// initializes and runs a timely dataflow.
timely::execute_from_args(std::env::args(), |worker| {

let mut input = InputHandle::<(), String>::new();
let mut input = InputHandle::new();

// define a new dataflow
worker.dataflow(|scope| {

let stream1 = input.to_stream(scope);
let stream2 = (0 .. 10).to_stream(scope);
worker.dataflow::<(),_,_>(|scope| {

let stream1 = input.to_stream(scope).container::<Vec<String>>();
let stream2 = (0 .. 10).to_stream(scope).container::<Vec<_>>();
});

}).unwrap();
@@ -32,6 +31,12 @@ fn main() {

There will be more to do to get data into `input`, and we aren't going to worry about that at the moment. But, now you know two of the places you can get data from!

At this point you may also have questions about the `container()` method and its many symbols.
Rust is statically typed, and needs to know the type of the things that each program will manipulate.
Timely dataflow bundles your individual data items into batches backed by a "container" type, and we need to communicate this type to Rust as well.
In our first case, we've said we have an input but we haven't provided any clues about the type of data, and we must do so to compile the program even though we don't need it for the example.
In our second case, we've shown some data (integers) but we haven't revealed how we want to hold on to them, so we communicate the `Vec` structure and leave the data type unspecified with `_` (which Rust can fill in from the type of the integers).
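The `::<Vec<_>>` part is Rust's general "turbofish" syntax for supplying a type parameter explicitly; it is not specific to timely. A standalone illustration:

```rust
fn main() {
    // The turbofish `::<T>` fixes a type parameter the compiler cannot guess.
    let parsed = "42".parse::<u32>().unwrap();
    // `_` leaves part of the type for inference: Vec<_> becomes Vec<i32> here.
    let numbers = (0..3).collect::<Vec<_>>();
    assert_eq!(parsed, 42);
    assert_eq!(numbers, vec![0, 1, 2]);
}
```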

## Other sources

There are other sources of input that are a bit more advanced. Once we learn how to create custom operators, the `source` method will allow us to create a custom operator with zero input streams and one output stream, which looks like a source of data (hence the name). There are also the `Capture` and `Replay` traits that allow us to exfiltrate the contents of a stream from one dataflow (using `capture_into`) and re-load it in another dataflow (using `replay_into`).
6 changes: 4 additions & 2 deletions mdbook/src/chapter_2/chapter_2_2.md
@@ -14,6 +14,7 @@ fn main() {
worker.dataflow::<(),_,_>(|scope| {
(0 .. 10)
.to_stream(scope)
.container::<Vec<_>>()
.inspect(|x| println!("hello: {}", x));
});
}).unwrap();
@@ -36,6 +37,7 @@ fn main() {
worker.dataflow::<(),_,_>(|scope| {
(0 .. 10)
.to_stream(scope)
.container::<Vec<_>>()
.inspect_batch(|t, xs| println!("hello: {:?} @ {:?}", xs, t));
});
}).unwrap();
@@ -58,8 +60,8 @@ use timely::dataflow::operators::capture::Extract;

fn main() {
let (data1, data2) = timely::example(|scope| {
let data1 = (0..3).to_stream(scope).capture();
let data2 = vec![0,1,2].to_stream(scope).capture();
let data1 = (0..3).to_stream(scope).container::<Vec<_>>().capture();
let data2 = vec![0,1,2].to_stream(scope).container::<Vec<_>>().capture();
(data1, data2)
});

18 changes: 9 additions & 9 deletions mdbook/src/chapter_2/chapter_2_3.md
@@ -13,7 +13,7 @@ The following program should print out the numbers one through ten.
```rust
extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect, Map};
use timely::dataflow::operators::{ToStream, Inspect, vec::Map};

fn main() {
timely::execute_from_args(std::env::args(), |worker| {
@@ -27,12 +27,12 @@ fn main() {
}
```

The closure `map` takes *owned* data as input, which means you are able to mutate it as you like without cloning or copying the data. For example, if you have a stream of `String` data, then you could upper-case the string contents without having to make a second copy; your closure owns the data that comes in, with all the benefits that entails.
This `map` operator consumes and produces *owned* data, which allows you to mutate the data without necessarily reallocating. For example, if you have a stream of `String` data, then you could upper-case the string contents without having to make a second copy; your closure owns the data that comes in, with all the benefits that entails.

```rust
extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect, Map};
use timely::dataflow::operators::{ToStream, Inspect, vec::Map};

fn main() {
timely::execute_from_args(std::env::args(), |worker| {
@@ -56,7 +56,7 @@ For example, the `map_in_place` method takes a closure which receives a mutable
```rust
extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect, Map};
use timely::dataflow::operators::{ToStream, Inspect, vec::Map};

fn main() {
timely::execute_from_args(std::env::args(), |worker| {
@@ -76,7 +76,7 @@ Alternately, the `flat_map` method takes input data and allows your closure to t
```rust
extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect, Map};
use timely::dataflow::operators::{ToStream, Inspect, vec::Map};

fn main() {
timely::execute_from_args(std::env::args(), |worker| {
@@ -97,7 +97,7 @@ Another fundamental operation is *filtering*, in which a predicate dictates a su
```rust
extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect, Filter};
use timely::dataflow::operators::{ToStream, Inspect, vec::Filter};

fn main() {
timely::execute_from_args(std::env::args(), |worker| {
@@ -122,7 +122,7 @@ The `partition` operator takes two arguments, a number of resulting streams to p
```rust
extern crate timely;

use timely::dataflow::operators::{ToStream, Partition, Inspect};
use timely::dataflow::operators::{ToStream, Inspect, vec::Partition};

fn main() {
timely::example(|scope| {
@@ -143,7 +143,7 @@ In the other direction, `concat` takes two streams and produces one output strea
```rust
extern crate timely;

use timely::dataflow::operators::{ToStream, Partition, Concat, Inspect};
use timely::dataflow::operators::{ToStream, Concat, Inspect, vec::Partition};

fn main() {
timely::example(|scope| {
@@ -163,7 +163,7 @@ There is also a `concatenate` method defined for scopes which collects all strea
```rust
extern crate timely;

use timely::dataflow::operators::{ToStream, Partition, Concatenate, Inspect};
use timely::dataflow::operators::{ToStream, Concatenate, Inspect, vec::Partition};

fn main() {
timely::example(|scope| {