diff --git a/README.md b/README.md index 394a57577..124a3067d 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ use timely::dataflow::operators::*; fn main() { timely::example(|scope| { (0..10).to_stream(scope) + .container::<Vec<_>>() .inspect(|x| println!("seen: {:?}", x)); }); } @@ -66,6 +67,7 @@ fn main() { // create a new input, exchange data, and inspect its output worker.dataflow(|scope| { scope.input_from(&mut input) + .container::<Vec<_>>() .exchange(|x| *x) .inspect(move |x| println!("worker {}:\thello {}", index, x)) .probe_with(&mut probe); @@ -177,14 +179,6 @@ With the current interfaces there is not much to be done. One possible change wo The timely communication layer currently discards most buffers it moves through exchange channels, because it doesn't have a sane way of rate controlling the output, nor a sane way to determine how many buffers should be cached. If either of these problems were fixed, it would make sense to recycle the buffers to avoid random allocations, especially for small batches. These changes have something like a 10%-20% performance impact in the `dataflow-join` triangle computation workload. -## Support for non-serializable types - -The communication layer is based on a type `Content` which can be backed by typed or binary data. Consequently, it requires that the type it supports be serializable, because it needs to have logic for the case that the data is binary, even if this case is not used. It seems like the `Stream` type should be extendable to be parametric in the type of storage used for the data, so that we can express the fact that some types are not serializable and that this is ok. - -**NOTE**: Differential dataflow demonstrates how to do this at the user level in its `operators/arrange.rs`, if somewhat sketchily (with a wrapper that lies about the properties of the type it transports). - -This would allow us to safely pass `Rc` types around, as long as we use the `Pipeline` parallelization contract.
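The buffer-recycling idea discussed above can be sketched in plain Rust, independent of timely's channels. The `BufferPool` type below is our own illustration, not timely's API; the fixed `capacity` stands in for the missing rate control and cache-sizing decisions the text mentions:

```rust
/// A tiny buffer pool: completed batches return their allocation here
/// instead of being dropped, so later batches can reuse it.
struct BufferPool {
    spare: Vec<Vec<u64>>,
    capacity: usize, // cap on cached buffers, a stand-in for rate control
}

impl BufferPool {
    fn new(capacity: usize) -> Self {
        BufferPool { spare: Vec::new(), capacity }
    }
    /// Hand out a recycled buffer if one is cached, else allocate fresh.
    fn acquire(&mut self) -> Vec<u64> {
        self.spare.pop().unwrap_or_default()
    }
    /// Return a buffer; drop it if the pool is already full.
    fn release(&mut self, mut buf: Vec<u64>) {
        if self.spare.len() < self.capacity {
            buf.clear(); // keep the allocation, discard the contents
            self.spare.push(buf);
        }
    }
}

fn main() {
    let mut pool = BufferPool::new(4);
    let mut buf = pool.acquire();
    buf.extend(0u64..10);
    let ptr_before = buf.as_ptr();
    pool.release(buf);
    // The next acquire reuses the same allocation rather than allocating anew.
    let buf = pool.acquire();
    assert_eq!(buf.as_ptr(), ptr_before);
    assert!(buf.is_empty());
    println!("reused allocation: {:p}", buf.as_ptr());
}
```

Avoiding the per-batch allocation is exactly the "recycle the buffers" effect the README estimates at 10%-20% in the `dataflow-join` workload.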
- ## Coarse- vs fine-grained timestamps The progress tracking machinery involves some non-trivial overhead per timestamp. This means that using very fine-grained timestamps, for example the nanosecond at which a record is processed, can swamp the progress tracking logic. By contrast, the logging infrastructure demotes nanoseconds to data, part of the logged payload, and approximates batches of events with the smallest timestamp in the batch. This is less accurate from a progress tracking point of view, but more performant. It may be possible to generalize this so that users can write programs without thinking about granularity of timestamp, and the system automatically coarsens when possible (essentially boxcar-ing times). diff --git a/mdbook/src/chapter_0/chapter_0_0.md b/mdbook/src/chapter_0/chapter_0_0.md index 5399f3386..ee761300d 100644 --- a/mdbook/src/chapter_0/chapter_0_0.md +++ b/mdbook/src/chapter_0/chapter_0_0.md @@ -5,7 +5,7 @@ Let's start with what may be the simplest non-trivial timely dataflow program. 
```rust # extern crate timely; -use timely::dataflow::operators::{ToStream, Inspect}; +use timely::dataflow::operators::{vec::ToStream, Inspect}; timely::example(|scope| { (0..10).to_stream(scope) diff --git a/mdbook/src/chapter_0/chapter_0_1.md b/mdbook/src/chapter_0/chapter_0_1.md index 2858d9780..e2f0a8b3c 100644 --- a/mdbook/src/chapter_0/chapter_0_1.md +++ b/mdbook/src/chapter_0/chapter_0_1.md @@ -19,6 +19,7 @@ timely::execute_from_args(std::env::args(), |worker| { // create a new input, exchange data, and inspect its output let probe = worker.dataflow(|scope| scope.input_from(&mut input) + .container::<Vec<_>>() .exchange(|x| *x) .inspect(move |x| println!("worker {}:\thello {}", index, x)) .probe() diff --git a/mdbook/src/chapter_0/chapter_0_2.md b/mdbook/src/chapter_0/chapter_0_2.md index e759b5ad5..c79e9135f 100644 --- a/mdbook/src/chapter_0/chapter_0_2.md +++ b/mdbook/src/chapter_0/chapter_0_2.md @@ -4,7 +4,7 @@ Timely dataflow may be a different programming model than you are used to, but i * **Data Parallelism**: The operators in timely dataflow are largely "data-parallel", meaning they can operate on independent parts of the data concurrently. This allows the underlying system to distribute timely dataflow computations across multiple parallel workers. These can be threads on your computer, or even threads across computers in a cluster you have access to. This distribution typically improves the throughput of the system, and lets you scale to larger problems with access to more resources (computation, communication, and memory). -* **Streaming Data**: The core data type in timely dataflow is a *stream* of data, an unbounded collection of data not all of which is available right now, but which instead arrives as the computation proceeds. Streams are a helpful generalization of static data sets, which are assumed available at the start of the computation.
By expressing your program as a computation on streams, you've explained both how it should respond to static input data sets (feed all the data in at once) but also how it should react to new data that might arrive later on. +* **Streaming Data**: The core data type in timely dataflow is a *stream* of data, an unbounded volume of data not all of which is available right now, but which instead arrives as the computation proceeds. Streams are a helpful generalization of static data sets, which are assumed available at the start of the computation. By expressing your program as a computation on streams, you've explained not only how it should respond to static input data sets (feed all the data in at once) but also how it should react to new data that might arrive later on. * **Expressivity**: Timely dataflow's main addition over traditional stream processors is its ability to express higher-level control constructs, like iteration. This moves stream computations from the limitations of straight line code to the world of *algorithms*. Many of the advantages of timely dataflow computations come from our ability to express a more intelligent algorithm than the alternative systems, which can only express more primitive computations. diff --git a/mdbook/src/chapter_1/chapter_1.md b/mdbook/src/chapter_1/chapter_1.md index 2ea3f6dad..1f62ba90f 100644 --- a/mdbook/src/chapter_1/chapter_1.md +++ b/mdbook/src/chapter_1/chapter_1.md @@ -1,6 +1,6 @@ # Chapter 1: Core Concepts -Timely dataflow relies on two fundamental concepts: **timestamps** and **dataflow**, which together lead to the concept of **progress**. We will want to break down these concepts because they play a fundamental role in understanding how timely dataflow programs are structured. +Timely dataflow relies on two fundamental concepts: **dataflow** and **timestamps**, which together lead to the concept of **progress**.
We will want to break down these concepts because they play a fundamental role in understanding how timely dataflow programs are structured. ## Dataflow diff --git a/mdbook/src/chapter_1/chapter_1_1.md b/mdbook/src/chapter_1/chapter_1_1.md index 9d5c75381..0e42bd260 100644 --- a/mdbook/src/chapter_1/chapter_1_1.md +++ b/mdbook/src/chapter_1/chapter_1_1.md @@ -6,7 +6,8 @@ Dataflow programming is fundamentally about describing your program as independe Let's write an overly simple dataflow program. Remember our `examples/hello.rs` program? We are going to revisit that, but with some **timestamp** aspects removed. The goal is to get a sense for dataflow with all of its warts, and to get you excited for the next section where we bring back the timestamps. :) -Here is a reduced version of `examples/hello.rs` that just feeds data into our dataflow, without paying any attention to progress made. In particular, we have removed the `probe()` operation, the resulting `probe` variable, and the use of `probe` to determine how long we should step the worker before introducing more data. +Here is a reduced version of `examples/hello.rs` that just feeds data into our dataflow, without paying any attention to progress made. +In particular, we have commented out the line that holds back the introduction of data until `probe` and `input` agree on a time. ```rust #![allow(unused_variables)] @@ -25,6 +26,7 @@ fn main() { // create a new input, exchange data, and inspect its output let probe = worker.dataflow(|scope| scope.input_from(&mut input) + .container::<Vec<_>>() .exchange(|x| *x) .inspect(move |x| println!("worker {}:\thello {}", index, x)) .probe() @@ -42,9 +44,9 @@ fn main() { } ``` -This program is a *dataflow program*. There are two dataflow operators here, `exchange` and `inspect`, each of which is asked to do a thing in response to input data.
The `exchange` operator takes each datum and hands it to a downstream worker based on the value it sees; with two workers, one will get all the even numbers and the other all the odd numbers. The `inspect` operator takes an action for each datum, in this case printing something to the screen. - -Importantly, we haven't imposed any constraints on how these operators need to run. We removed the code that caused the input to be delayed until a certain amount of progress had been made, and it shows in the results when we run with more than one worker: +It turns out this handshake between `probe` and `input` was part of what made the output make any sense. +We waited for `probe` to confirm that the system was caught up before introducing more data to `input`. +When we remove these constraints we get a more haphazard output. ```ignore Echidnatron% cargo run --example hello -- -w2 diff --git a/mdbook/src/chapter_1/chapter_1_2.md b/mdbook/src/chapter_1/chapter_1_2.md index 5e8dd829e..25fe5684f 100644 --- a/mdbook/src/chapter_1/chapter_1_2.md +++ b/mdbook/src/chapter_1/chapter_1_2.md @@ -34,20 +34,20 @@ The output we get with two workers is now: Echidnatron% cargo run --example hello -- -w2 Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs Running `target/debug/examples/hello -w2` - worker 1: hello 1 @ (Root, 1) - worker 1: hello 3 @ (Root, 3) - worker 1: hello 5 @ (Root, 5) - worker 0: hello 0 @ (Root, 0) - worker 0: hello 2 @ (Root, 2) - worker 0: hello 4 @ (Root, 4) - worker 0: hello 6 @ (Root, 6) - worker 0: hello 8 @ (Root, 8) - worker 1: hello 7 @ (Root, 7) - worker 1: hello 9 @ (Root, 9) + worker 1: hello 1 @ 1 + worker 1: hello 3 @ 3 + worker 1: hello 5 @ 5 + worker 0: hello 0 @ 0 + worker 0: hello 2 @ 2 + worker 0: hello 4 @ 4 + worker 0: hello 6 @ 6 + worker 0: hello 8 @ 8 + worker 1: hello 7 @ 7 + worker 1: hello 9 @ 9 Echidnatron% ``` -The timestamps are the `(Root, i)` things for various values of `i`. 
These happen to correspond to the data themselves, but had we provided random input data rather than `i` itself we would still be able to make sense of the output and put it back "in order". +The timestamps are the `@ i` things for various values of `i`. These happen to correspond to the data themselves, but had we provided random input data rather than `i` itself we would still be able to make sense of the output and put it back "in order". ## Timestamps for dataflow operators diff --git a/mdbook/src/chapter_1/chapter_1_3.md b/mdbook/src/chapter_1/chapter_1_3.md index 0d7107ec7..a05be688e 100644 --- a/mdbook/src/chapter_1/chapter_1_3.md +++ b/mdbook/src/chapter_1/chapter_1_3.md @@ -20,6 +20,7 @@ fn main() { // create a new input, exchange data, and inspect its output let probe = worker.dataflow(|scope| scope.input_from(&mut input) + .container::<Vec<_>>() .exchange(|x| *x) .inspect(move |x| println!("worker {}:\thello {}", index, x)) .probe() @@ -48,13 +49,13 @@ Let's talk about each of them. ## Input capabilities -The `input` structure is how we provide data to a timely dataflow computation, and it has a timestamp associated with it. Initially this timestamp is the default value, usually something like `0` for integers. Whatever timestamp `input` has, it can introduce data with that timestamp or greater. We can advance this timestamp, via the `advance_to` method, which restricts the timestamps we can use to those greater or equal to whatever timestamp is supplied as the argument. +The `input` structure is how we provide data to a timely dataflow computation, and it has a timestamp associated with it. Initially this timestamp is the default value, usually something like `0` for unsigned integers. Whatever timestamp `input` has, it can introduce data with that timestamp or greater. We can advance this timestamp, via the `advance_to` method, which permanently restricts the timestamps we can use to those greater or equal to whatever timestamp is supplied as the argument.
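The `advance_to` contract described above can be sketched without timely at all. The `ToyInput` type below is our own illustration (timely's real `InputHandle` does much more); it only shows the rule that data carries the handle's current timestamp and that the timestamp moves forward monotonically:

```rust
/// A toy input handle that enforces the `advance_to` contract:
/// data may only be introduced at the current timestamp or later,
/// and the timestamp can only move forward. (Illustrative only.)
struct ToyInput {
    time: u64,
    sent: Vec<(u64, u64)>, // (timestamp, datum)
}

impl ToyInput {
    fn new() -> Self { ToyInput { time: 0, sent: Vec::new() } }
    /// Introduce a datum at the handle's current timestamp.
    fn send(&mut self, datum: u64) {
        self.sent.push((self.time, datum));
    }
    /// Promise that no further data will carry a smaller timestamp.
    fn advance_to(&mut self, time: u64) {
        assert!(time >= self.time, "timestamps may only advance");
        self.time = time;
    }
}

fn main() {
    let mut input = ToyInput::new();
    input.send(10);      // introduced at time 0
    input.advance_to(1); // promise: no more data at time 0
    input.send(20);      // introduced at time 1
    assert_eq!(input.sent, vec![(0, 10), (1, 20)]);
    println!("{:?}", input.sent);
}
```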
The `advance_to` method is a big deal. This is the moment in the computation where our program reveals to the system, and through the system to all other dataflow workers, that we might soon be able to announce a timestamp as complete. There may still be records in flight bearing that timestamp, but as they are retired the system can finally report that progress has been made. ## Output possibilities -The `probe` structure is how we learn about the possibility of timestamped data at some point in the dataflow graph. We can, at any point, consult a probe with the `less_than` method and ask whether it is still possible that we might see a time less than the argument at that point in the dataflow graph. There is also a `less_equal` method, if you prefer that. +The `probe` structure is how we learn about the possibility of receiving timestamped data at some point in the dataflow graph. We can, at any point, consult a probe with the `less_than` method and ask whether it is still possible that we might see a time less than the argument at that point in the dataflow graph. There is also a `less_equal` method, if you prefer that. Putting a probe after the `inspect` operator, which passes through all data it receives as input only after invoking its method, tells us whether we should expect to see the method associated with `inspect` fire again for a given timestamp. If we are told we won't see any more messages with timestamp `t` after the `inspect`, then the `inspect` won't see any either. 
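The probe's `less_than` question can also be sketched in miniature. The `ToyProbe` below is our simplification, not timely's progress-tracking code: it just counts outstanding records per timestamp and answers whether a smaller time might still appear:

```rust
use std::collections::HashMap;

/// A toy progress tracker: counts outstanding records per timestamp,
/// and answers "might I still see a time less than t?" the way a
/// probe's `less_than` does. (Our simplification, illustrative only.)
struct ToyProbe {
    outstanding: HashMap<u64, usize>,
}

impl ToyProbe {
    fn new() -> Self { ToyProbe { outstanding: HashMap::new() } }
    /// A record with this timestamp is now in flight.
    fn record_sent(&mut self, time: u64) {
        *self.outstanding.entry(time).or_insert(0) += 1;
    }
    /// A record with this timestamp has been retired.
    fn record_done(&mut self, time: u64) {
        let count = self.outstanding.get_mut(&time).expect("unknown time");
        *count -= 1;
        if *count == 0 { self.outstanding.remove(&time); }
    }
    /// Is it still possible to see a record with timestamp < t?
    fn less_than(&self, t: u64) -> bool {
        self.outstanding.keys().any(|&time| time < t)
    }
}

fn main() {
    let mut probe = ToyProbe::new();
    probe.record_sent(0);
    probe.record_sent(1);
    assert!(probe.less_than(1));  // a time-0 record is still in flight
    probe.record_done(0);
    assert!(!probe.less_than(1)); // time 0 is now complete
    assert!(probe.less_than(2));  // but time 1 is not
}
```

Once every record at a time has been retired, the probe stops reporting that time as possible, which is exactly the signal the `hello.rs` loop waits for before introducing more data.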
diff --git a/mdbook/src/chapter_2/chapter_2.md b/mdbook/src/chapter_2/chapter_2.md index 7fa23fc20..60c85080c 100644 --- a/mdbook/src/chapter_2/chapter_2.md +++ b/mdbook/src/chapter_2/chapter_2.md @@ -14,6 +14,7 @@ use timely::dataflow::operators::{ToStream, Inspect}; fn main() { timely::example(|scope| { (0..10).to_stream(scope) + .container::<Vec<_>>() .inspect(|x| println!("seen: {:?}", x)); }); } diff --git a/mdbook/src/chapter_2/chapter_2_1.md b/mdbook/src/chapter_2/chapter_2_1.md index bccfaa913..27b91c3ff 100644 --- a/mdbook/src/chapter_2/chapter_2_1.md +++ b/mdbook/src/chapter_2/chapter_2_1.md @@ -16,14 +16,13 @@ fn main() { // initializes and runs a timely dataflow. timely::execute_from_args(std::env::args(), |worker| { - let mut input = InputHandle::<(), String>::new(); + let mut input = InputHandle::new(); // define a new dataflow - worker.dataflow(|scope| { - - let stream1 = input.to_stream(scope); - let stream2 = (0 .. 10).to_stream(scope); + worker.dataflow::<(),_,_>(|scope| { + let stream1 = input.to_stream(scope).container::<Vec<_>>(); + let stream2 = (0 .. 10).to_stream(scope).container::<Vec<_>>(); }); }).unwrap(); @@ -32,6 +31,12 @@ fn main() { There will be more to do to get data into `input`, and we aren't going to worry about that at the moment. But, now you know two of the places you can get data from! +At this point you may also have questions about the `container()` method and its many symbols. +Rust is statically typed, and needs to know the type of things that each program will manipulate. +Timely dataflow bundles your individual data atoms into batches backed by a "container" type, and we need to communicate this type to Rust as well. +In our first case, we've said we have an input but we haven't provided any clues about the type of data; we must do so to compile the program, even though the type doesn't matter for this example.
+In our second case, we've shown some data (integers) but we haven't revealed how we want to hold on to them, as we communicate the `Vec` structure and leave the data type unspecified with `_` (which Rust can fill in from the type of the integers). + ## Other sources There are other sources of input that are a bit more advanced. Once we learn how to create custom operators, the `source` method will allow us to create a custom operator with zero input streams and one output stream, which looks like a source of data (hence the name). There are also the `Capture` and `Replay` traits that allow us to exfiltrate the contents of a stream from one dataflow (using `capture_into`) and re-load it in another dataflow (using `replay_into`). diff --git a/mdbook/src/chapter_2/chapter_2_2.md b/mdbook/src/chapter_2/chapter_2_2.md index f16c69c19..8f2152aed 100644 --- a/mdbook/src/chapter_2/chapter_2_2.md +++ b/mdbook/src/chapter_2/chapter_2_2.md @@ -14,6 +14,7 @@ fn main() { worker.dataflow::<(),_,_>(|scope| { (0 .. 10) .to_stream(scope) + .container::<Vec<_>>() .inspect(|x| println!("hello: {}", x)); }); }).unwrap(); @@ -36,6 +37,7 @@ fn main() { worker.dataflow::<(),_,_>(|scope| { (0 .. 10) .to_stream(scope) + .container::<Vec<_>>() .inspect_batch(|t, xs| println!("hello: {:?} @ {:?}", xs, t)); }); }).unwrap(); @@ -58,8 +60,8 @@ use timely::dataflow::operators::capture::Extract; fn main() { let (data1, data2) = timely::example(|scope| { - let data1 = (0..3).to_stream(scope).capture(); - let data2 = vec![0,1,2].to_stream(scope).capture(); + let data1 = (0..3).to_stream(scope).container::<Vec<_>>().capture(); + let data2 = vec![0,1,2].to_stream(scope).container::<Vec<_>>().capture(); (data1, data2) }); diff --git a/mdbook/src/chapter_2/chapter_2_3.md b/mdbook/src/chapter_2/chapter_2_3.md index 5ca859a33..b0c6b31ef 100644 --- a/mdbook/src/chapter_2/chapter_2_3.md +++ b/mdbook/src/chapter_2/chapter_2_3.md @@ -13,7 +13,7 @@ The following program should print out the numbers one through ten.
```rust extern crate timely; -use timely::dataflow::operators::{ToStream, Inspect, Map}; +use timely::dataflow::operators::{ToStream, Inspect, vec::Map}; fn main() { timely::execute_from_args(std::env::args(), |worker| { @@ -27,12 +27,12 @@ fn main() { } ``` -The closure `map` takes *owned* data as input, which means you are able to mutate it as you like without cloning or copying the data. For example, if you have a stream of `String` data, then you could upper-case the string contents without having to make a second copy; your closure owns the data that comes in, with all the benefits that entails. +This `map` operator consumes and produces *owned* data, which allows you to mutate the data without necessarily reallocating. For example, if you have a stream of `String` data, then you could upper-case the string contents without having to make a second copy; your closure owns the data that comes in, with all the benefits that entails. ```rust extern crate timely; -use timely::dataflow::operators::{ToStream, Inspect, Map}; +use timely::dataflow::operators::{ToStream, Inspect, vec::Map}; fn main() { timely::execute_from_args(std::env::args(), |worker| { @@ -56,7 +56,7 @@ For example, the `map_in_place` method takes a closure which receives a mutable ```rust extern crate timely; -use timely::dataflow::operators::{ToStream, Inspect, Map}; +use timely::dataflow::operators::{ToStream, Inspect, vec::Map}; fn main() { timely::execute_from_args(std::env::args(), |worker| { @@ -76,7 +76,7 @@ Alternately, the `flat_map` method takes input data and allows your closure to t ```rust extern crate timely; -use timely::dataflow::operators::{ToStream, Inspect, Map}; +use timely::dataflow::operators::{ToStream, Inspect, vec::Map}; fn main() { timely::execute_from_args(std::env::args(), |worker| { @@ -97,7 +97,7 @@ Another fundamental operation is *filtering*, in which a predicate dictates a su ```rust extern crate timely; -use timely::dataflow::operators::{ToStream, Inspect, Filter}; 
+use timely::dataflow::operators::{ToStream, Inspect, vec::Filter}; fn main() { timely::execute_from_args(std::env::args(), |worker| { @@ -122,7 +122,7 @@ The `partition` operator takes two arguments, a number of resulting streams to p ```rust extern crate timely; -use timely::dataflow::operators::{ToStream, Partition, Inspect}; +use timely::dataflow::operators::{ToStream, Inspect, vec::Partition}; fn main() { timely::example(|scope| { @@ -143,7 +143,7 @@ In the other direction, `concat` takes two streams and produces one output strea ```rust extern crate timely; -use timely::dataflow::operators::{ToStream, Partition, Concat, Inspect}; +use timely::dataflow::operators::{ToStream, Concat, Inspect, vec::Partition}; fn main() { timely::example(|scope| { @@ -163,7 +163,7 @@ There is also a `concatenate` method defined for scopes which collects all strea ```rust extern crate timely; -use timely::dataflow::operators::{ToStream, Partition, Concatenate, Inspect}; +use timely::dataflow::operators::{ToStream, Concatenate, Inspect, vec::Partition}; fn main() { timely::example(|scope| { diff --git a/mdbook/src/chapter_2/chapter_2_4.md b/mdbook/src/chapter_2/chapter_2_4.md index 3fa4d2f59..49e71eaf7 100644 --- a/mdbook/src/chapter_2/chapter_2_4.md +++ b/mdbook/src/chapter_2/chapter_2_4.md @@ -2,23 +2,22 @@ What if there isn't an operator that does what you want to do? What if what you want to do is better written as imperative code rather than a tangle of dataflow operators? Not a problem! Timely dataflow has you covered. -Timely has several "generic" dataflow operators that are pretty much ready to run, except someone (you) needs to supply their implementation. This isn't as scary as it sounds; you just need to write a closure that says "given a handle to my inputs and outputs, what do I do when timely asks me to run?". +Timely has several "generic" dataflow operators that are pretty much ready to run, except that someone (you) needs to supply their implementation. 
This isn't as scary as it sounds; you just need to write a closure that says "given a handle to my inputs and outputs, what do I do when timely asks me to run?". Let's look at an example ```rust extern crate timely; -use timely::dataflow::operators::ToStream; -use timely::dataflow::operators::generic::operator::Operator; +use timely::dataflow::operators::{ToStream, Operator}; use timely::dataflow::channels::pact::Pipeline; fn main() { timely::example(|scope| { (0u64..10) .to_stream(scope) + .container::<Vec<_>>() .unary(Pipeline, "increment", |capability, info| { - move |input, output| { input.for_each_time(|time, data| { let mut session = output.session(&time); @@ -33,15 +32,25 @@ fn main() { } ``` -What is going on here? The heart of the mess is the dataflow operator `unary`, which is a ready-to-assemble dataflow operator with one input and one output. The `unary` operator takes three arguments (it looks like so many more!): (i) instructions about how it should distribute its inputs, (ii) a tasteful name, and (iii) the logic it should execute whenever timely gives it a chance to do things. +What is going on here? The heart of the mess is the dataflow operator `unary`, which is a ready-to-assemble dataflow operator with one input and one output. The `unary` operator takes three arguments (it looks like so many more!): (i) instructions about how its input data should be distributed, (ii) a tasteful name, and (iii) the logic it should execute whenever timely gives it a chance to do things. Most of what is interesting lies in the closure, so let's first tidy up some loose ends before we dive in there. There are a few ways to request how input data should be distributed, and `Pipeline` is the one that says "don't move anything". The string "increment" is utterly arbitrary; this happens to be what the operator does, but you could change it to be your name, or a naughty word, or whatever you like.
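The per-timestamp shape of the operator logic can be sketched without any of timely's machinery. The function below is our own stand-in, not timely's API: it plays the role of the "increment" closure, consuming (time, batch) pairs and producing an output batch at the same time:

```rust
/// A toy stand-in for `unary`'s closure: for each (time, batch) pair,
/// produce an output batch at the same time. Mirrors the "increment"
/// example, minus all of timely's machinery. (Illustrative only.)
fn increment_operator(input: Vec<(u64, Vec<u64>)>) -> Vec<(u64, Vec<u64>)> {
    input
        .into_iter()
        .map(|(time, batch)| {
            // "open a session at `time`" and send each incremented datum
            let out: Vec<u64> = batch.into_iter().map(|x| x + 1).collect();
            (time, out)
        })
        .collect()
}

fn main() {
    let input = vec![(0, vec![0, 1, 2]), (1, vec![3, 4])];
    let output = increment_operator(input);
    assert_eq!(output, vec![(0, vec![1, 2, 3]), (1, vec![4, 5])]);
    println!("{:?}", output);
}
```

The real operator differs mainly in that batches arrive incrementally across scheduling invocations, rather than all at once as a `Vec`.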
The `|capability|` stuff should be ignored for now; we'll explain in just a moment (it has to do with whether you would like the ability to send data before you receive any). The heart of the logic lies in the closure that binds `input` and `output`. These two are handles respectively to the operator's input (from which it can read records) and the operator's output (to which it can send records). -The input handle `input` has one primary method, `next`, which may return a pair consisting of a `CapabilityRef` and a batch of data. Rust really likes you to demonstrate a commitment to only looking at valid data, and our `while` loop does what is called deconstruction: we acknowledge the optional structure and only execute in the case the `Option` variant is `Some`, containing data. The `next` method could also return `None`, indicating that there is no more data available at the moment. It is strongly recommended that you take the hint and stop trying to read inputs at that point; timely gives you the courtesy of executing whatever code you want in this closure, but if you never release control back to the system you'll break things (timely employs ["cooperative multitasking"](https://en.wikipedia.org/wiki/Cooperative_multitasking)). +The input handle `input` has one primary method, `for_each_time`, which is invoked for each distinct timestamp with a `CapabilityRef` and a list of batches of data. +This method should be called on each scheduling invocation, if for nothing else than to move the batches of data from the input to some operator-local storage. + +The output handle `output` has one primary method, `session`, which starts up an output session at the indicated time. The resulting session can be given data in various ways: (i) an element at a time with `give`, (ii) an iterator at a time with `give_iterator`, and (iii) a container at a time with `give_container`.
Internally it is buffering up the output and flushing automatically when the session goes out of scope, which happens above as `for_each_time` moves from one timestamp to the next. + +### Containers + +The example above has two calls to `container::<Vec<_>>()`. +These exist because timely allows you to be flexible about how you would like to store your data in batches, perhaps in a `Vec` or perhaps using some other type. +The method signals the container type the stream should have, and mostly exists to inform Rust's type inference of your preferences when they are not otherwise available. +In this case `unary`, as well as the other generic operators, works with arbitrary containers on its inputs and outputs, and needs this prescriptive guidance. -The output handle `output` has one primary method, `session`, which starts up an output session at the indicated time. The resulting session can be given data in various ways: (i) element at a time with `give`, (ii) iterator at a time with `give_iterator`, and (iii) vector at a time with `give_content`. Internally it is buffering up the output and flushing automatically when the session goes out of scope, which happens above when we go around the `while` loop. +We use `Vec` as a container for demonstration code, but the `columnar.rs` example shows how to build an alternate container that is able to lay out the data differently, in its case using fewer and larger allocations, even for types like `String` which can own small allocations. ### Other shapes @@ -131,6 +140,7 @@ fn main() { timely::example(|scope| { (0u64..10) .to_stream(scope) + .container::<Vec<_>>() .unary(Pipeline, "increment", |capability, info| { let mut maximum = 0; // define this here; use in the closure @@ -179,8 +189,8 @@ use timely::dataflow::channels::pact::Pipeline; fn main() { timely::example(|scope| { - let in1 = (0 .. 10).to_stream(scope); - let in2 = (0 .. 10).to_stream(scope); + let in1 = (0 .. 10).to_stream(scope).container::<Vec<_>>(); + let in2 = (0 ..
10).to_stream(scope).container::<Vec<_>>(); in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| { @@ -230,8 +240,8 @@ use timely::dataflow::channels::pact::Pipeline; fn main() { timely::example(|scope| { - let in1 = (0 .. 10).to_stream(scope); - let in2 = (0 .. 10).to_stream(scope); + let in1 = (0 .. 10).to_stream(scope).container::<Vec<_>>(); + let in2 = (0 .. 10).to_stream(scope).container::<Vec<_>>(); in1.binary_frontier(in2, Pipeline, Pipeline, "concat_buffer", |capability, info| { diff --git a/mdbook/src/chapter_2/chapter_2_5.md b/mdbook/src/chapter_2/chapter_2_5.md index a0100a2e7..2dc4eeb4c 100644 --- a/mdbook/src/chapter_2/chapter_2_5.md +++ b/mdbook/src/chapter_2/chapter_2_5.md @@ -51,6 +51,7 @@ fn main() { // build a new dataflow. worker.dataflow(|scope| { input.to_stream(scope) + .container::<Vec<_>>() .inspect(|x| println!("seen: {:?}", x)) .probe_with(&mut probe); }); @@ -119,7 +120,7 @@ Rather than repeat all the code up above, I'm just going to show you the fragmen # extern crate timely; # # use timely::dataflow::{InputHandle, ProbeHandle}; -# use timely::dataflow::operators::{Inspect, Probe, Map}; +# use timely::dataflow::operators::{Inspect, Probe, vec::Map}; # # fn main() { # // initializes and runs a timely dataflow. @@ -173,7 +174,7 @@ As before, I'm just going to show you the new code, which now lives just after ` # extern crate timely; # # use timely::dataflow::{InputHandle, ProbeHandle}; -# use timely::dataflow::operators::{Inspect, Probe, Map}; +# use timely::dataflow::operators::{Inspect, Probe, vec::Map}; # # use std::collections::HashMap; # use timely::dataflow::channels::pact::Exchange; diff --git a/mdbook/src/chapter_4/chapter_4_1.md b/mdbook/src/chapter_4/chapter_4_1.md index 963e690a2..70248ce88 100644 --- a/mdbook/src/chapter_4/chapter_4_1.md +++ b/mdbook/src/chapter_4/chapter_4_1.md @@ -50,7 +50,7 @@ use timely::dataflow::operators::*; fn main() { timely::example(|scope| { - let stream = (0 ..
10).to_stream(scope); + let stream = (0 .. 10).to_stream(scope).container::<Vec<_>>(); // Create a new scope with the same (u64) timestamp. let result = scope.scoped::<u64,_,_>("SubScope", |subscope| { @@ -80,7 +80,7 @@ use timely::dataflow::operators::*; fn main() { timely::example(|scope| { - let stream = (0 .. 10).to_stream(scope); + let stream = (0 .. 10).to_stream(scope).container::<Vec<_>>(); // Create a new scope with the same (u64) timestamp. let result = scope.region(|subscope| { @@ -112,7 +112,7 @@ use timely::dataflow::operators::*; fn main() { timely::example(|scope| { - let stream = (0 .. 10).to_stream(scope); + let stream = (0 .. 10).to_stream(scope).container::<Vec<_>>(); // Create a new scope with a (u64, u32) timestamp. let result = scope.iterative::<u32,_,_>(|subscope| { diff --git a/mdbook/src/chapter_4/chapter_4_2.md b/mdbook/src/chapter_4/chapter_4_2.md index 45230367a..36244e976 100644 --- a/mdbook/src/chapter_4/chapter_4_2.md +++ b/mdbook/src/chapter_4/chapter_4_2.md @@ -13,7 +13,8 @@ We are going to check the [Collatz conjecture](https://en.wikipedia.org/wiki/Col ```rust extern crate timely; -use timely::dataflow::operators::*; +use timely::dataflow::operators::{Feedback, ToStream, Concat, Inspect, ConnectLoop}; +use timely::dataflow::operators::vec::{Map, Filter, BranchWhen}; fn main() { timely::example(|scope| { @@ -49,7 +50,8 @@ Perhaps you are a very clever person, and you've realized that we don't need to ```rust extern crate timely; -use timely::dataflow::operators::*; +use timely::dataflow::operators::{Feedback, ToStream, Concat, Inspect, ConnectLoop}; +use timely::dataflow::operators::vec::{Map, Filter, Partition}; fn main() { timely::example(|scope| { @@ -87,7 +89,8 @@ Of course, you can do all of this in a nested scope, if that is appropriate.
In ```rust extern crate timely; -use timely::dataflow::operators::*; +use timely::dataflow::operators::{ToStream, Enter, Concat, Inspect, LoopVariable, ConnectLoop}; +use timely::dataflow::operators::vec::{Map, Filter}; use timely::dataflow::Scope; fn main() { diff --git a/mdbook/src/chapter_4/chapter_4_3.md b/mdbook/src/chapter_4/chapter_4_3.md index 9fd014ac0..9315c1d2e 100644 --- a/mdbook/src/chapter_4/chapter_4_3.md +++ b/mdbook/src/chapter_4/chapter_4_3.md @@ -9,7 +9,7 @@ Let's consider a simple example, where we take an input stream of numbers, and p ```rust extern crate timely; -use timely::dataflow::operators::*; +use timely::dataflow::operators::{ToStream, vec::Map}; fn main() { timely::example(|scope| { @@ -36,7 +36,7 @@ Let's take a simple example, where we have a stream of timestamped numbers comin ```rust,no_run extern crate timely; -use timely::dataflow::operators::*; +use timely::dataflow::operators::{ToStream, vec::Map}; fn main() { timely::example(|scope| { @@ -59,7 +59,8 @@ The idea here is to take our stream of work, and to use the `delay` operator to ```rust,no_run extern crate timely; -use timely::dataflow::operators::*; +use timely::dataflow::operators::{Feedback, ToStream, Operator, ConnectLoop}; +use timely::dataflow::operators::vec::{Delay, Map, Filter}; use timely::dataflow::channels::pact::Pipeline; fn main() { diff --git a/mdbook/src/chapter_4/chapter_4_4.md b/mdbook/src/chapter_4/chapter_4_4.md index 6ba0108b5..1b2f7fdbb 100644 --- a/mdbook/src/chapter_4/chapter_4_4.md +++ b/mdbook/src/chapter_4/chapter_4_4.md @@ -52,7 +52,7 @@ One nice aspect of `capture_into` is that it really does reveal everything that At *its* core, `replay_into` takes some sequence of `Event` items and reproduces the stream, as it was recorded. 
It is also fairly simple, and we can just look at its implementation as well: ```rust,ignore - fn replay_into>(self, scope: &mut S) -> Stream{ + fn replay_into>(self, scope: &mut S) -> StreamVec{ let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone()); let (targets, stream) = builder.new_output(); @@ -130,6 +130,7 @@ fn main() { worker.dataflow::(|scope| (0..10u64) .to_stream(scope) + .container::>() .capture_into(EventWriter::new(send)) ); }).unwrap(); diff --git a/mdbook/src/chapter_4/chapter_4_5.md b/mdbook/src/chapter_4/chapter_4_5.md index 1d494fc61..2f01f5b98 100644 --- a/mdbook/src/chapter_4/chapter_4_5.md +++ b/mdbook/src/chapter_4/chapter_4_5.md @@ -1,6 +1,6 @@ -# Custom Datatypes +# Custom Datatypes and Containers -**WORK IN PROGRESS** +**THIS TEXT IS LARGELY INCORRECT AND NEEDS IMPROVEMENT** Timely dataflow allows you to use a variety of Rust types, but you may also find that you need (or would prefer) your own `struct` and `enum` types. diff --git a/timely/examples/bfs.rs b/timely/examples/bfs.rs index b57d1cf84..ed3176f60 100644 --- a/timely/examples/bfs.rs +++ b/timely/examples/bfs.rs @@ -39,7 +39,8 @@ fn main() { .map(move |i| (hasher.hash_one(&(i,index,0)) as usize % nodes, hasher.hash_one(&(i,index,1)) as usize % nodes)) .map(|(src,dst)| (src as u32, dst as u32)) - .to_stream(scope); + .to_stream(scope) + .container::>(); // define a loop variable, for the (node, worker) pairs. 
         let (handle, stream) = scope.feedback(1usize);
diff --git a/timely/examples/capture_send.rs b/timely/examples/capture_send.rs
index 37a34aff0..7e26b3635 100644
--- a/timely/examples/capture_send.rs
+++ b/timely/examples/capture_send.rs
@@ -11,6 +11,7 @@ fn main() {
         worker.dataflow::<u64,_,_>(|scope|
             (0..10u64)
                 .to_stream(scope)
+                .container::<Vec<_>>()
                 .capture_into(EventWriter::new(send))
         );
     }).unwrap();
diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs
index 122a3d95f..c8c3b8400 100644
--- a/timely/examples/columnar.rs
+++ b/timely/examples/columnar.rs
@@ -6,7 +6,7 @@ use columnar::Index;
 use timely::Accountable;
 use timely::container::CapacityContainerBuilder;
 use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
-use timely::dataflow::InputHandleCore;
+use timely::dataflow::InputHandle;
 use timely::dataflow::operators::{InspectCore, Operator, Probe};
 use timely::dataflow::ProbeHandle;
 
@@ -32,7 +32,7 @@ fn main() {
     // initializes and runs a timely dataflow.
     timely::execute(config, |worker| {
 
-        let mut input = <InputHandleCore<u64, CapacityContainerBuilder<Column<(u64, u64)>>>>::new();
+        let mut input = <InputHandle<u64, CapacityContainerBuilder<Column<(u64, u64)>>>>::new();
         let probe = ProbeHandle::new();
 
         // create a new input, exchange data, and inspect its output
diff --git a/timely/examples/event_driven.rs b/timely/examples/event_driven.rs
index a8956c253..7e2393192 100644
--- a/timely/examples/event_driven.rs
+++ b/timely/examples/event_driven.rs
@@ -1,5 +1,5 @@
-// use timely::dataflow::{InputHandle, ProbeHandle};
-use timely::dataflow::operators::{Input, Map, Probe};
+use timely::dataflow::operators::{Input, Probe};
+use timely::dataflow::operators::vec::Map;
 
 fn main() {
     // initializes and runs a timely dataflow.
diff --git a/timely/examples/exchange.rs b/timely/examples/exchange.rs
index 895fc4c34..19720242f 100644
--- a/timely/examples/exchange.rs
+++ b/timely/examples/exchange.rs
@@ -13,6 +13,7 @@ fn main() {
         let probe = worker.dataflow(|scope|
             scope
                 .input_from(&mut input)
+                .container::<Vec<_>>()
                 .exchange(|&x| x as u64)
                 .probe()
         );
diff --git a/timely/examples/flow_controlled.rs b/timely/examples/flow_controlled.rs
index 77817b045..e9a8881eb 100644
--- a/timely/examples/flow_controlled.rs
+++ b/timely/examples/flow_controlled.rs
@@ -1,4 +1,4 @@
-use timely::dataflow::operators::flow_controlled::{iterator_source, IteratorSourceInput};
+use timely::dataflow::operators::vec::flow_controlled::{iterator_source, IteratorSourceInput};
 use timely::dataflow::operators::{probe, Probe, Inspect};
 
 fn main() {
diff --git a/timely/examples/hello.rs b/timely/examples/hello.rs
index 3353fed68..75abf9b26 100644
--- a/timely/examples/hello.rs
+++ b/timely/examples/hello.rs
@@ -12,6 +12,7 @@ fn main() {
         // create a new input, exchange data, and inspect its output
         worker.dataflow(|scope| {
             scope.input_from(&mut input)
+                 .container::<Vec<_>>()
                  .exchange(|x| *x)
                  .inspect(move |x| println!("worker {}:\thello {}", index, x))
                  .probe_with(&probe);
diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs
index f74106da5..898c5a984 100644
--- a/timely/examples/logging-send.rs
+++ b/timely/examples/logging-send.rs
@@ -76,6 +76,7 @@ fn main() {
         worker.dataflow(|scope| {
             scope
                 .input_from(&mut input)
+                .container::<Vec<_>>()
                 .exchange(|&x| x as u64)
                 .probe_with(&probe);
         });
diff --git a/timely/examples/loopdemo.rs b/timely/examples/loopdemo.rs
index 681d8d69e..7c1499a38 100644
--- a/timely/examples/loopdemo.rs
+++ b/timely/examples/loopdemo.rs
@@ -1,5 +1,6 @@
 use timely::dataflow::{InputHandle, ProbeHandle};
-use timely::dataflow::operators::{Input, Feedback, Concat, Map, Filter, ConnectLoop, Probe};
+use timely::dataflow::operators::{Input, Feedback, Concat, ConnectLoop, Probe};
+use timely::dataflow::operators::vec::{Map, Filter};
 
 fn main() {
diff --git a/timely/examples/openloop.rs b/timely/examples/openloop.rs
index 4174667cc..a3f374eff 100644
--- a/timely/examples/openloop.rs
+++ b/timely/examples/openloop.rs
@@ -1,5 +1,6 @@
 use timely::dataflow::{InputHandle, ProbeHandle};
-use timely::dataflow::operators::{Input, Filter, Probe};
+use timely::dataflow::operators::{Input, Probe};
+use timely::dataflow::operators::vec::Filter;
 
 fn main() {
diff --git a/timely/examples/pingpong.rs b/timely/examples/pingpong.rs
index edd644d38..e674879a0 100644
--- a/timely/examples/pingpong.rs
+++ b/timely/examples/pingpong.rs
@@ -1,4 +1,4 @@
-use timely::dataflow::operators::*;
+use timely::dataflow::operators::{ToStream, Exchange, Feedback, Concat, ConnectLoop, vec::{Map, BranchWhen}};
 
 fn main() {
diff --git a/timely/examples/rc.rs b/timely/examples/rc.rs
index d481675b7..c8f273cd4 100644
--- a/timely/examples/rc.rs
+++ b/timely/examples/rc.rs
@@ -16,6 +16,7 @@ fn main() {
         let probe = ProbeHandle::new();
         worker.dataflow(|scope| {
             scope.input_from(&mut input)
+                 .container::<Vec<_>>()
                  //.exchange(|x| *x) // <-- cannot exchange this; Rc is not Send.
                  .inspect(move |x| println!("worker {}:\thello {:?}", index, x))
                  .probe_with(&probe);
diff --git a/timely/examples/simple.rs b/timely/examples/simple.rs
index 83e11999d..34f19c718 100644
--- a/timely/examples/simple.rs
+++ b/timely/examples/simple.rs
@@ -1,4 +1,4 @@
-use timely::dataflow::operators::*;
+use timely::dataflow::operators::{Inspect, vec::ToStream};
 
 fn main() {
     timely::example(|scope| {
diff --git a/timely/examples/threadless.rs b/timely/examples/threadless.rs
index a25273c57..ebf8ee390 100644
--- a/timely/examples/threadless.rs
+++ b/timely/examples/threadless.rs
@@ -16,6 +16,7 @@ fn main() {
     worker.dataflow(|scope| {
         input
             .to_stream(scope)
+            .container::<Vec<_>>()
             .inspect(|x| println!("{:?}", x))
             .probe_with(&probe);
     });
diff --git a/timely/examples/unionfind.rs b/timely/examples/unionfind.rs
index 606238775..9eb41cebb 100644
--- a/timely/examples/unionfind.rs
+++ b/timely/examples/unionfind.rs
@@ -53,8 +53,8 @@ trait UnionFind {
     fn union_find(self) -> Self;
 }
 
-impl<G: Scope> UnionFind for Stream<G, (usize, usize)> {
-    fn union_find(self) -> Stream<G, (usize, usize)> {
+impl<G: Scope> UnionFind for StreamVec<G, (usize, usize)> {
+    fn union_find(self) -> StreamVec<G, (usize, usize)> {
 
         self.unary(Pipeline, "UnionFind", |_,_| {
diff --git a/timely/examples/unordered_input.rs b/timely/examples/unordered_input.rs
index da469dce5..b4209bc63 100644
--- a/timely/examples/unordered_input.rs
+++ b/timely/examples/unordered_input.rs
@@ -1,11 +1,11 @@
-use timely::dataflow::operators::*;
+use timely::dataflow::operators::{core::UnorderedInput, Inspect};
 use timely::Config;
 
 fn main() {
     timely::execute(Config::thread(), |worker| {
 
         let (mut input, mut cap) = worker.dataflow::<u64,_,_>(|scope| {
             let (input, stream) = scope.new_unordered_input();
-            stream.inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
+            stream.container::<Vec<_>>().inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
             input
         });
diff --git a/timely/examples/wordcount.rs b/timely/examples/wordcount.rs
index 9d6ad6018..87cc1770e 100644
--- a/timely/examples/wordcount.rs
+++ b/timely/examples/wordcount.rs
@@ -1,7 +1,8 @@
 use std::collections::HashMap;
 
 use timely::dataflow::{InputHandle, ProbeHandle};
-use timely::dataflow::operators::{Map, Operator, Inspect, Probe};
+use timely::dataflow::operators::{Operator, Inspect, Probe};
+use timely::dataflow::operators::vec::Map;
 use timely::dataflow::channels::pact::Exchange;
 
 fn main() {
@@ -17,13 +18,11 @@ fn main() {
         // create a new input, exchange data, and inspect its output
         worker.dataflow::<usize,_,_>(|scope| {
             input.to_stream(scope)
-                .container::<Vec<_>>()
-                .flat_map(|(text, diff): (String, i64)|
+                 .flat_map(|(text, diff): (String, i64)|
                     text.split_whitespace()
                         .map(move |word| (word.to_owned(), diff))
                         .collect::<Vec<_>>()
                  )
-                .container::<Vec<_>>()
                  .unary_frontier(exchange, "WordCount", |_capability, _info| {
 
                     let mut queues = HashMap::new();
diff --git a/timely/src/dataflow/mod.rs b/timely/src/dataflow/mod.rs
index e2ac17e14..eeffc4106 100644
--- a/timely/src/dataflow/mod.rs
+++ b/timely/src/dataflow/mod.rs
@@ -9,15 +9,16 @@
 //!
 //! timely::example(|scope| {
 //!     (0..10).to_stream(scope)
+//!            .container::<Vec<_>>()
 //!            .inspect(|x| println!("seen: {:?}", x));
 //! });
 //! ```
 
-pub use self::stream::{Stream, StreamCore};
+pub use self::stream::{Stream, StreamVec};
 pub use self::scopes::{Scope, ScopeParent};
 
-pub use self::operators::core::input::Handle as InputHandleCore;
-pub use self::operators::input::Handle as InputHandle;
+pub use self::operators::core::input::Handle as InputHandle;
+pub use self::operators::vec::input::Handle as InputHandleVec;
 pub use self::operators::probe::Handle as ProbeHandle;
 
 pub mod operators;
diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs
index 251f84d50..f7a2b4b82 100644
--- a/timely/src/dataflow/operators/capability.rs
+++ b/timely/src/dataflow/operators/capability.rs
@@ -404,7 +404,10 @@ impl CapabilitySet {
     /// use timely::dataflow::operators::CapabilitySet;
     ///
     /// timely::example(|scope| {
-    ///     vec![()].into_iter().to_stream(scope)
+    ///     vec![()]
+    ///         .into_iter()
+    ///         .to_stream(scope)
+    ///         .container::<Vec<_>>()
     ///         .unary_frontier(Pipeline, "example", |default_cap, _info| {
     ///             let mut cap = CapabilitySet::from_elem(default_cap);
     ///             move |(input, frontier), output| {
diff --git a/timely/src/dataflow/operators/core/capture/capture.rs b/timely/src/dataflow/operators/core/capture/capture.rs
index b20a63005..07545ddab 100644
--- a/timely/src/dataflow/operators/core/capture/capture.rs
+++ b/timely/src/dataflow/operators/core/capture/capture.rs
@@ -5,7 +5,7 @@
 //! and there are several default implementations, including a linked-list, Rust's MPSC
 //! queue, and a binary serializer wrapping any `W: Write`.
-use crate::dataflow::{Scope, StreamCore};
+use crate::dataflow::{Scope, Stream};
 use crate::dataflow::channels::pact::Pipeline;
 use crate::dataflow::channels::pullers::Counter as PullCounter;
 use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
@@ -47,6 +47,7 @@ pub trait Capture : Sized {
     ///
     /// worker.dataflow::<u64,_,_>(|scope1|
     ///     (0..10).to_stream(scope1)
+    ///            .container::<Vec<_>>()
     ///            .capture_into(handle1)
     /// );
     ///
@@ -89,6 +90,7 @@ pub trait Capture : Sized {
     ///     s.spawn(move || timely::example(move |scope1| {
     ///         (0..10u64)
     ///             .to_stream(scope1)
+    ///             .container::<Vec<_>>()
     ///             .capture_into(EventWriter::new(send))
     ///     }));
     ///     s.spawn(move || timely::example(move |scope2| {
@@ -113,7 +115,7 @@ pub trait Capture : Sized {
     }
 }
 
-impl<S: Scope, C: Container> Capture<S::Timestamp, C> for StreamCore<S, C> {
+impl<S: Scope, C: Container> Capture<S::Timestamp, C> for Stream<S, C> {
     fn capture_into<P: EventPusher<S::Timestamp, C>+'static>(self, mut event_pusher: P) {
 
         let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
diff --git a/timely/src/dataflow/operators/core/capture/extract.rs b/timely/src/dataflow/operators/core/capture/extract.rs
index c0af57d37..e6ed7b9cc 100644
--- a/timely/src/dataflow/operators/core/capture/extract.rs
+++ b/timely/src/dataflow/operators/core/capture/extract.rs
@@ -35,6 +35,7 @@ pub trait Extract {
     ///
     /// worker.dataflow::<u64,_,_>(|scope1|
     ///     (0..10).to_stream(scope1)
+    ///            .container::<Vec<_>>()
     ///            .capture_into(handle1)
     /// );
     ///
diff --git a/timely/src/dataflow/operators/core/capture/mod.rs b/timely/src/dataflow/operators/core/capture/mod.rs
index d92286bfa..c76033a80 100644
--- a/timely/src/dataflow/operators/core/capture/mod.rs
+++ b/timely/src/dataflow/operators/core/capture/mod.rs
@@ -33,6 +33,7 @@
 //!
 //! worker.dataflow::<u64,_,_>(|scope1|
 //!     (0..10).to_stream(scope1)
+//!            .container::<Vec<_>>()
 //!            .capture_into(handle1)
 //! );
 //!
@@ -69,6 +70,7 @@
 //! worker.dataflow::<u64,_,_>(|scope1|
 //!     (0..10u64)
 //!         .to_stream(scope1)
+//!         .container::<Vec<_>>()
 //!         .capture_into(EventWriter::new(send))
 //! );
 //!
diff --git a/timely/src/dataflow/operators/core/capture/replay.rs b/timely/src/dataflow/operators/core/capture/replay.rs
index 7772d5ed9..1883110ea 100644
--- a/timely/src/dataflow/operators/core/capture/replay.rs
+++ b/timely/src/dataflow/operators/core/capture/replay.rs
@@ -38,7 +38,7 @@
 //! allowing the replay to occur in a timely dataflow computation with more or fewer workers
 //! than that in which the stream was captured.
 
-use crate::dataflow::{Scope, StreamCore};
+use crate::dataflow::{Scope, Stream};
 use crate::dataflow::channels::pushers::Counter as PushCounter;
 use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
 use crate::progress::Timestamp;
@@ -50,16 +50,16 @@ use crate::dataflow::channels::Message;
 
 /// Replay a capture stream into a scope with the same timestamp.
 pub trait Replay<T: Timestamp, C: Container> : Sized {
-    /// Replays `self` into the provided scope, as a `StreamCore`.
-    fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> StreamCore<S, C> {
+    /// Replays `self` into the provided scope, as a `Stream`.
+    fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, C> {
         self.replay_core(scope, Some(std::time::Duration::new(0, 0)))
     }
-    /// Replays `self` into the provided scope, as a `StreamCore`.
+    /// Replays `self` into the provided scope, as a `Stream`.
     ///
     /// The `period` argument allows the specification of a re-activation period, where the operator
     /// will re-activate itself every so often. The `None` argument instructs the operator not to
     /// re-activate itself.
-    fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C>;
+    fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> Stream<S, C>;
 }
 
 impl<T: Timestamp, C: Container, I> Replay<T, C> for I
@@ -67,7 +67,7 @@ where
     I : IntoIterator,
     <I as IntoIterator>::Item: EventIterator<T, C>+'static,
 {
-    fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> StreamCore<S, C> {
+    fn replay_core<S: Scope<Timestamp=T>>(self, scope: &mut S, period: Option<std::time::Duration>) -> Stream<S, C> {
 
         let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());
diff --git a/timely/src/dataflow/operators/core/concat.rs b/timely/src/dataflow/operators/core/concat.rs
index b94db00bf..a9813de81 100644
--- a/timely/src/dataflow/operators/core/concat.rs
+++ b/timely/src/dataflow/operators/core/concat.rs
@@ -2,7 +2,7 @@
 
 use crate::Container;
 use crate::dataflow::channels::pact::Pipeline;
-use crate::dataflow::{StreamCore, Scope};
+use crate::dataflow::{Stream, Scope};
 
 /// Merge the contents of two streams.
 pub trait Concat<G: Scope, C: Container> {
@@ -15,15 +15,17 @@ pub trait Concat<G: Scope, C: Container> {
     /// timely::example(|scope| {
     ///
     ///     let stream = (0..10).to_stream(scope);
-    ///     stream.clone().concat(stream)
+    ///     stream.clone()
+    ///           .concat(stream)
+    ///           .container::<Vec<_>>()
     ///           .inspect(|x| println!("seen: {:?}", x));
     /// });
     /// ```
-    fn concat(self, other: StreamCore<G, C>) -> StreamCore<G, C>;
+    fn concat(self, other: Stream<G, C>) -> Stream<G, C>;
 }
 
-impl<G: Scope, C: Container> Concat<G, C> for StreamCore<G, C> {
-    fn concat(self, other: StreamCore<G, C>) -> StreamCore<G, C> {
+impl<G: Scope, C: Container> Concat<G, C> for Stream<G, C> {
+    fn concat(self, other: Stream<G, C>) -> Stream<G, C> {
         self.scope().concatenate([self, other])
     }
 }
@@ -43,18 +45,19 @@ pub trait Concatenate<G: Scope, C: Container> {
     ///                    (0..10).to_stream(scope)];
     ///
     ///     scope.concatenate(streams)
+    ///          .container::<Vec<_>>()
     ///          .inspect(|x| println!("seen: {:?}", x));
     /// });
     /// ```
-    fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
+    fn concatenate<I>(&self, sources: I) -> Stream<G, C>
     where
-        I: IntoIterator<Item=StreamCore<G, C>>;
+        I: IntoIterator<Item=Stream<G, C>>;
 }
 
 impl<G: Scope, C: Container> Concatenate<G, C> for G {
-    fn concatenate<I>(&self, sources: I) -> StreamCore<G, C>
+    fn concatenate<I>(&self, sources: I) -> Stream<G, C>
     where
-        I: IntoIterator<Item=StreamCore<G, C>>
+        I: IntoIterator<Item=Stream<G, C>>
     {
         // create an operator builder.
diff --git a/timely/src/dataflow/operators/core/enterleave.rs b/timely/src/dataflow/operators/core/enterleave.rs
index 3a1ab0b93..c4c8f0682 100644
--- a/timely/src/dataflow/operators/core/enterleave.rs
+++ b/timely/src/dataflow/operators/core/enterleave.rs
@@ -10,7 +10,7 @@
 //! use timely::dataflow::operators::{Enter, Leave, ToStream, Inspect};
 //!
 //! timely::example(|outer| {
-//!     let stream = (0..9).to_stream(outer);
+//!     let stream = (0..9).to_stream(outer).container::<Vec<_>>();
 //!     let output = outer.region(|inner| {
 //!         stream.enter(inner)
 //!               .inspect(|x| println!("in nested scope: {:?}", x))
@@ -31,7 +31,7 @@ use crate::communication::Push;
 use crate::dataflow::channels::pushers::{Counter, Tee};
 use crate::dataflow::channels::Message;
 use crate::worker::AsWorker;
-use crate::dataflow::{StreamCore, Scope};
+use crate::dataflow::{Stream, Scope};
 use crate::dataflow::scopes::Child;
 
 /// Extension trait to move a `Stream` into a child of its current `Scope`.
@@ -44,17 +44,17 @@ pub trait Enter<G: Scope, T: Timestamp+Refines<G::Timestamp>, C> {
     /// use timely::dataflow::operators::{Enter, Leave, ToStream};
     ///
     /// timely::example(|outer| {
-    ///     let stream = (0..9).to_stream(outer);
+    ///     let stream = (0..9).to_stream(outer).container::<Vec<_>>();
     ///     let output = outer.region(|inner| {
     ///         stream.enter(inner).leave()
     ///     });
     /// });
     /// ```
-    fn enter<'a>(self, _: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C>;
+    fn enter<'a>(self, _: &Child<'a, G, T>) -> Stream<Child<'a, G, T>, C>;
 }
 
-impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> Enter<G, T, C> for StreamCore<G, C> {
-    fn enter<'a>(self, scope: &Child<'a, G, T>) -> StreamCore<Child<'a, G, T>, C> {
+impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> Enter<G, T, C> for Stream<G, C> {
+    fn enter<'a>(self, scope: &Child<'a, G, T>) -> Stream<Child<'a, G, T>, C> {
 
         use crate::scheduling::Scheduler;
 
@@ -76,7 +76,7 @@ impl<G: Scope, T: Timestamp+Refines<G::Timestamp>, C: Container> Enter<G, T, C>
             self.connect_to(input, ingress, channel_id);
         }
 
-        StreamCore::new(
+        Stream::new(
             Source::new(0, input.port),
             registrar,
             scope.clone(),
@@ -94,17 +94,17 @@ pub trait Leave<G: Scope, C> {
     /// use timely::dataflow::operators::{Enter, Leave, ToStream};
     ///
     /// timely::example(|outer| {
-    ///     let stream = (0..9).to_stream(outer);
+    ///     let stream = (0..9).to_stream(outer).container::<Vec<_>>();
     ///     let output = outer.region(|inner| {
     ///         stream.enter(inner).leave()
     ///     });
     /// });
     /// ```
-    fn leave(self) -> StreamCore<G, C>;
+    fn leave(self) -> Stream<G, C>;
 }
 
-impl<G: Scope, C: Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for StreamCore<Child<'_, G, T>, C> {
-    fn leave(self) -> StreamCore<G, C> {
+impl<G: Scope, C: Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for Stream<Child<'_, G, T>, C> {
+    fn leave(self) -> Stream<G, C> {
 
         let scope = self.scope();
 
@@ -121,7 +121,7 @@ impl<G: Scope, C: Container, T: Timestamp+Refines<G::Timestamp>> Leave<G, C> for
             self.connect_to(target, egress, channel_id);
         }
 
-        StreamCore::new(
+        Stream::new(
             output,
             registrar,
             scope.parent,
@@ -243,7 +243,7 @@ mod test {
     fn test_nested() {
 
         use crate::dataflow::{InputHandle, ProbeHandle};
-        use crate::dataflow::operators::{Input, Inspect, Probe};
+        use crate::dataflow::operators::{vec::Input, Inspect, Probe};
         use crate::dataflow::Scope;
         use crate::dataflow::operators::{Enter, Leave};
 
diff --git a/timely/src/dataflow/operators/core/exchange.rs b/timely/src/dataflow/operators/core/exchange.rs
index cfab5f716..b980cdd86 100644
--- a/timely/src/dataflow/operators/core/exchange.rs
+++ b/timely/src/dataflow/operators/core/exchange.rs
@@ -4,7 +4,7 @@ use crate::Container;
 use crate::container::{DrainContainer, SizableContainer, PushInto};
 use crate::dataflow::channels::pact::ExchangeCore;
 use crate::dataflow::operators::generic::operator::Operator;
-use crate::dataflow::{Scope, StreamCore};
+use crate::dataflow::{Scope, Stream};
 
 /// Exchange records between workers.
 pub trait Exchange<C> {
@@ -19,6 +19,7 @@ pub trait Exchange<C> {
     ///
     /// timely::example(|scope| {
     ///     (0..10).to_stream(scope)
+    ///            .container::<Vec<_>>()
     ///            .exchange(|x| *x)
     ///            .inspect(|x| println!("seen: {:?}", x));
     /// });
@@ -28,7 +29,7 @@ pub trait Exchange<C> {
         for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static;
 }
 
-impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
+impl<G: Scope, C> Exchange<C> for Stream<G, C>
 where
     C: Container
      + SizableContainer
@@ -37,7 +38,7 @@ where
      + crate::dataflow::channels::ContainerBytes
      + for<'a> PushInto<C::Item<'a>>,
 {
-    fn exchange<F>(self, route: F) -> StreamCore<G, C>
+    fn exchange<F>(self, route: F) -> Stream<G, C>
     where
         for<'a> F: FnMut(&C::Item<'a>) -> u64 + 'static,
     {
diff --git a/timely/src/dataflow/operators/core/feedback.rs b/timely/src/dataflow/operators/core/feedback.rs
index 2a20f1278..e13a22426 100644
--- a/timely/src/dataflow/operators/core/feedback.rs
+++ b/timely/src/dataflow/operators/core/feedback.rs
@@ -4,56 +4,60 @@ use crate::Container;
 use crate::dataflow::channels::pact::Pipeline;
 use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
 use crate::dataflow::scopes::child::Iterative;
-use crate::dataflow::{StreamCore, Scope};
+use crate::dataflow::{Stream, Scope};
 use crate::order::Product;
 use crate::progress::frontier::Antichain;
 use crate::progress::{Timestamp, PathSummary};
 
-/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
+/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
 pub trait Feedback<G: Scope> {
-    /// Creates a [StreamCore] and a [Handle] to later bind the source of that `StreamCore`.
+    /// Creates a [Stream] and a [Handle] to later bind the source of that `Stream`.
     ///
-    /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
+    /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
     /// its `Handle` passed as an argument. Containers passed through the stream will have their
     /// timestamps advanced by `summary`.
     ///
     /// # Examples
     /// ```
     /// use timely::dataflow::Scope;
-    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
+    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect};
+    /// use timely::dataflow::operators::vec::BranchWhen;
     ///
     /// timely::example(|scope| {
     ///     // circulate 0..10 for 100 iterations.
     ///     let (handle, cycle) = scope.feedback(1);
     ///     (0..10).to_stream(scope)
+    ///            .container::<Vec<_>>()
     ///            .concat(cycle)
     ///            .inspect(|x| println!("seen: {:?}", x))
     ///            .branch_when(|t| t < &100).1
     ///            .connect_loop(handle);
     /// });
     /// ```
-    fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>);
+    fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, Stream<G, C>);
 }
 
-/// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
+/// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
 pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
-    /// Creates a `StreamCore` and a `Handle` to later bind the source of that `StreamCore`.
+    /// Creates a `Stream` and a `Handle` to later bind the source of that `Stream`.
     ///
-    /// The resulting `StreamCore` will have its data defined by a future call to `connect_loop` with
+    /// The resulting `Stream` will have its data defined by a future call to `connect_loop` with
     /// its `Handle` passed as an argument. Containers passed through the stream will have their
     /// timestamps advanced by `summary`.
     ///
     /// # Examples
     /// ```
     /// use timely::dataflow::Scope;
-    /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
+    /// use timely::dataflow::operators::{LoopVariable, ConnectLoop, ToStream, Concat, Inspect};
+    /// use timely::dataflow::operators::vec::BranchWhen;
     ///
     /// timely::example(|scope| {
     ///     // circulate 0..10 for 100 iterations.
     ///     scope.iterative::<u32,_,_>(|inner| {
     ///         let (handle, cycle) = inner.loop_variable(1);
     ///         (0..10).to_stream(inner)
+    ///                .container::<Vec<_>>()
     ///                .concat(cycle)
     ///                .inspect(|x| println!("seen: {:?}", x))
     ///                .branch_when(|t| t.inner < 100).1
@@ -61,12 +65,12 @@ pub trait LoopVariable<'a, G: Scope, T: Timestamp> {
     ///                .connect_loop(handle);
     ///     });
     /// });
     /// ```
-    fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>);
+    fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, Stream<Iterative<'a, G, T>, C>);
 }
 
 impl<G: Scope> Feedback<G> for G {
 
-    fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, StreamCore<G, C>) {
+    fn feedback<C: Container>(&mut self, summary: <G::Timestamp as Timestamp>::Summary) -> (Handle<G, C>, Stream<G, C>) {
 
         let mut builder = OperatorBuilder::new("Feedback".to_owned(), self.clone());
         builder.set_notify(false);
@@ -77,7 +81,7 @@ impl<G: Scope> Feedback<G> for G {
 }
 
 impl<'a, G: Scope, T: Timestamp> LoopVariable<'a, G, T> for Iterative<'a, G, T> {
-    fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, StreamCore<Iterative<'a, G, T>, C>) {
+    fn loop_variable<C: Container>(&mut self, summary: T::Summary) -> (Handle<Iterative<'a, G, T>, C>, Stream<Iterative<'a, G, T>, C>) {
         self.feedback(Product::new(Default::default(), summary))
     }
 }
@@ -89,12 +93,14 @@ pub trait ConnectLoop<G: Scope, C: Container> {
     /// # Examples
     /// ```
     /// use timely::dataflow::Scope;
-    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect, BranchWhen};
+    /// use timely::dataflow::operators::{Feedback, ConnectLoop, ToStream, Concat, Inspect};
+    /// use timely::dataflow::operators::vec::BranchWhen;
     ///
     /// timely::example(|scope| {
     ///     // circulate 0..10 for 100 iterations.
     ///     let (handle, cycle) = scope.feedback(1);
     ///     (0..10).to_stream(scope)
+    ///            .container::<Vec<_>>()
     ///            .concat(cycle)
     ///            .inspect(|x| println!("seen: {:?}", x))
     ///            .branch_when(|t| t < &100).1
@@ -104,7 +110,7 @@ pub trait ConnectLoop<G: Scope, C: Container> {
     fn connect_loop(self, handle: Handle<G, C>);
 }
 
-impl<G: Scope, C: Container> ConnectLoop<G, C> for StreamCore<G, C> {
+impl<G: Scope, C: Container> ConnectLoop<G, C> for Stream<G, C> {
     fn connect_loop(self, handle: Handle<G, C>) {
 
         let mut builder = handle.builder;
diff --git a/timely/src/dataflow/operators/core/filter.rs b/timely/src/dataflow/operators/core/filter.rs
index 376cc9c09..f291fbdf0 100644
--- a/timely/src/dataflow/operators/core/filter.rs
+++ b/timely/src/dataflow/operators/core/filter.rs
@@ -2,7 +2,7 @@
 use crate::container::{DrainContainer, SizableContainer, PushInto};
 use crate::Container;
 use crate::dataflow::channels::pact::Pipeline;
-use crate::dataflow::{Scope, StreamCore};
+use crate::dataflow::{Scope, Stream};
 use crate::dataflow::operators::generic::operator::Operator;
 
 /// Extension trait for filtering.
@@ -11,8 +11,8 @@ pub trait Filter<C: Container> {
     ///
     /// # Examples
     /// ```
-    /// use timely::dataflow::operators::ToStream;
-    /// use timely::dataflow::operators::core::{Filter, Inspect};
+    /// use timely::dataflow::operators::{ToStream, Inspect};
+    /// use timely::dataflow::operators::vec::Filter;
     ///
     /// timely::example(|scope| {
     ///     (0..10).to_stream(scope)
@@ -23,11 +23,11 @@ pub trait Filter<C: Container> {
     fn filter<P: for<'a> FnMut(&C::Item<'a>)->bool+'static>(self, predicate: P) -> Self;
 }
 
-impl<G: Scope, C: Container> Filter<C> for StreamCore<G, C>
+impl<G: Scope, C: Container> Filter<C> for Stream<G, C>
 where
     for<'a> C: PushInto<C::Item<'a>>
 {
-    fn filter<P: for<'a> FnMut(&C::Item<'a>)->bool+'static>(self, mut predicate: P) -> StreamCore<G, C> {
+    fn filter<P: for<'a> FnMut(&C::Item<'a>)->bool+'static>(self, mut predicate: P) -> Stream<G, C> {
         self.unary(Pipeline, "Filter", move |_,_| move |input, output| {
             input.for_each_time(|time, data| {
                 output.session(&time)
diff --git a/timely/src/dataflow/operators/core/input.rs b/timely/src/dataflow/operators/core/input.rs
index cc558fb0e..42c080c7b 100644
--- a/timely/src/dataflow/operators/core/input.rs
+++ b/timely/src/dataflow/operators/core/input.rs
@@ -12,7 +12,7 @@ use crate::progress::Source;
 use crate::progress::operate::Connectivity;
 use crate::{Accountable, Container, ContainerBuilder};
 use crate::communication::Push;
-use crate::dataflow::{Scope, ScopeParent, StreamCore};
+use crate::dataflow::{Scope, ScopeParent, Stream};
 use crate::dataflow::channels::pushers::{Tee, Counter};
 use crate::dataflow::channels::Message;
 
@@ -26,9 +26,9 @@ use crate::dataflow::channels::Message;
 
 /// Create a new `Stream` and `Handle` through which to supply input.
 pub trait Input : Scope {
-    /// Create a new [StreamCore] and [Handle] through which to supply input.
+    /// Create a new [Stream] and [Handle] through which to supply input.
     ///
-    /// The `new_input` method returns a pair `(Handle, StreamCore)` where the [StreamCore] can be used
+    /// The `new_input` method returns a pair `(Handle, Stream)` where the [Stream] can be used
     /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
     /// data into the timely dataflow computation.
     ///
@@ -39,7 +39,7 @@ pub trait Input : Scope {
     /// # Examples
     /// ```
     /// use timely::*;
-    /// use timely::dataflow::operators::core::{Input, Inspect};
+    /// use timely::dataflow::operators::{Input, Inspect};
     ///
     /// // construct and execute a timely dataflow
     /// timely::execute(Config::thread(), |worker| {
@@ -59,11 +59,11 @@ pub trait Input : Scope {
     ///     }
     /// });
     /// ```
-    fn new_input<C: Container>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, StreamCore<Self, C>);
+    fn new_input<C: Container>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, Stream<Self, C>);
 
-    /// Create a new [StreamCore] and [Handle] through which to supply input.
+    /// Create a new [Stream] and [Handle] through which to supply input.
     ///
-    /// The `new_input` method returns a pair `(Handle, StreamCore)` where the [StreamCore] can be used
+    /// The `new_input` method returns a pair `(Handle, Stream)` where the [Stream] can be used
     /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
     /// data into the timely dataflow computation.
     ///
@@ -75,7 +75,7 @@ pub trait Input : Scope {
     /// ```
     /// use std::rc::Rc;
     /// use timely::*;
-    /// use timely::dataflow::operators::core::{Input, InspectCore};
+    /// use timely::dataflow::operators::{Input, InspectCore};
     /// use timely::container::CapacityContainerBuilder;
     ///
     /// // construct and execute a timely dataflow
@@ -96,7 +96,7 @@ pub trait Input : Scope {
     ///     }
     /// });
     /// ```
-    fn new_input_with_builder<CB: ContainerBuilder>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CB>, StreamCore<Self, CB::Container>);
+    fn new_input_with_builder<CB: ContainerBuilder>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, CB>, Stream<Self, CB::Container>);
 
     /// Create a new stream from a supplied interactive handle.
     ///
@@ -107,14 +107,14 @@ pub trait Input : Scope {
     /// # Examples
     /// ```
     /// use timely::*;
-    /// use timely::dataflow::operators::core::{Input, Inspect};
-    /// use timely::dataflow::operators::core::input::Handle;
+    /// use timely::dataflow::InputHandle;
+    /// use timely::dataflow::operators::{Input, Inspect};
     ///
     /// // construct and execute a timely dataflow
     /// timely::execute(Config::thread(), |worker| {
     ///
     ///     // add an input and base computation off of it
-    ///     let mut input = Handle::new();
+    ///     let mut input = InputHandle::new();
     ///     worker.dataflow(|scope| {
     ///         scope.input_from(&mut input)
     ///              .container::<Vec<_>>()
@@ -129,24 +129,24 @@ pub trait Input : Scope {
     ///     }
     /// });
     /// ```
-    fn input_from<CB: ContainerBuilder>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, CB>) -> StreamCore<Self, CB::Container>;
+    fn input_from<CB: ContainerBuilder>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, CB>) -> Stream<Self, CB::Container>;
 }
 
 use crate::order::TotalOrder;
 
 impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
-    fn new_input<C: Container>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, StreamCore<G, C>) {
+    fn new_input<C: Container>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CapacityContainerBuilder<C>>, Stream<G, C>) {
         let mut handle = Handle::new();
         let stream = self.input_from(&mut handle);
         (handle, stream)
     }
 
-    fn new_input_with_builder<CB: ContainerBuilder>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CB>, StreamCore<G, CB::Container>) {
+    fn new_input_with_builder<CB: ContainerBuilder>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, CB>, Stream<G, CB::Container>) {
         let mut handle = Handle::new_with_builder();
         let stream = self.input_from(&mut handle);
         (handle, stream)
     }
 
-    fn input_from<CB: ContainerBuilder>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, CB>) -> StreamCore<G, CB::Container> {
+    fn input_from<CB: ContainerBuilder>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, CB>) -> Stream<G, CB::Container> {
 
         let (output, registrar) = Tee::<<G as ScopeParent>::Timestamp, CB::Container>::new();
         let counter = Counter::new(output);
         let produced = Rc::clone(counter.produced());
@@ -171,7 +171,7 @@ impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
             copies,
         }), index);
 
-        StreamCore::new(Source::new(index, 0), registrar, self.clone())
+        Stream::new(Source::new(index, 0), registrar, self.clone())
     }
 }
 
@@ -213,7 +213,7 @@ impl Operate for Operator {
 
 }
 
-/// A handle to an input `StreamCore`, used to introduce data to a timely dataflow computation.
+/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation.
#[derive(Debug)] pub struct Handle> { activate: Vec, @@ -230,14 +230,14 @@ impl Handle> { /// # Examples /// ``` /// use timely::*; - /// use timely::dataflow::operators::core::{Input, Inspect}; - /// use timely::dataflow::operators::core::input::Handle; + /// use timely::dataflow::InputHandle; + /// use timely::dataflow::operators::{Input, Inspect}; /// /// // construct and execute a timely dataflow /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it - /// let mut input = Handle::new(); + /// let mut input = InputHandle::new(); /// worker.dataflow(|scope| { /// scope.input_from(&mut input) /// .container::>() @@ -270,15 +270,15 @@ impl> Handle { /// # Examples /// ``` /// use timely::*; - /// use timely::dataflow::operators::core::{Input, Inspect}; - /// use timely::dataflow::operators::core::input::Handle; + /// use timely::dataflow::InputHandle; + /// use timely::dataflow::operators::{Input, Inspect}; /// use timely_container::CapacityContainerBuilder; /// /// // construct and execute a timely dataflow /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it - /// let mut input = Handle::<_, CapacityContainerBuilder<_>>::new_with_builder(); + /// let mut input = InputHandle::<_, CapacityContainerBuilder<_>>::new_with_builder(); /// worker.dataflow(|scope| { /// scope.input_from(&mut input) /// .container::>() @@ -309,14 +309,14 @@ impl> Handle { /// # Examples /// ``` /// use timely::*; - /// use timely::dataflow::operators::core::{Input, Inspect}; - /// use timely::dataflow::operators::core::input::Handle; + /// use timely::dataflow::InputHandle; + /// use timely::dataflow::operators::{Input, Inspect}; /// /// // construct and execute a timely dataflow /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it - /// let mut input = Handle::new(); + /// let mut input = InputHandle::new(); /// worker.dataflow(|scope| { 
/// input.to_stream(scope) /// .container::>() @@ -331,7 +331,7 @@ impl> Handle { /// } /// }); /// ``` - pub fn to_stream(&mut self, scope: &mut G) -> StreamCore + pub fn to_stream(&mut self, scope: &mut G) -> Stream where T: TotalOrder, G: Scope, @@ -410,21 +410,21 @@ impl> Handle { } } - /// Sends a batch of records into the corresponding timely dataflow [StreamCore], at the current epoch. + /// Sends a batch of records into the corresponding timely dataflow [Stream], at the current epoch. /// /// This method flushes single elements previously sent with `send`, to keep the insertion order. /// /// # Examples /// ``` /// use timely::*; - /// use timely::dataflow::operators::core::{Input, InspectCore}; - /// use timely::dataflow::operators::core::input::Handle; + /// use timely::dataflow::InputHandle; + /// use timely::dataflow::operators::{Input, InspectCore}; /// /// // construct and execute a timely dataflow /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it - /// let mut input = Handle::new(); + /// let mut input = InputHandle::new(); /// worker.dataflow(|scope| { /// scope.input_from(&mut input) /// .inspect_container(|x| println!("hello {:?}", x)); @@ -498,14 +498,14 @@ impl> Handle { /// # Examples /// ``` /// use timely::*; - /// use timely::dataflow::operators::core::{Input, Inspect}; - /// use timely::dataflow::operators::core::input::Handle; + /// use timely::dataflow::InputHandle; + /// use timely::dataflow::operators::{Input, Inspect}; /// /// // construct and execute a timely dataflow /// timely::execute(Config::thread(), |worker| { /// /// // add an input and base computation off of it - /// let mut input = Handle::new(); + /// let mut input = InputHandle::new(); /// worker.dataflow(|scope| { /// scope.input_from(&mut input) /// .container::>() diff --git a/timely/src/dataflow/operators/core/inspect.rs b/timely/src/dataflow/operators/core/inspect.rs index a4b235455..eb98ded73 100644 --- 
a/timely/src/dataflow/operators/core/inspect.rs +++ b/timely/src/dataflow/operators/core/inspect.rs @@ -2,7 +2,7 @@ use crate::Container; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::operators::generic::Operator; /// Methods to inspect records and batches of records on a stream. @@ -14,10 +14,11 @@ where /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Map, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect}; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) + /// .container::>() /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` @@ -34,10 +35,11 @@ where /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Map, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect}; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) + /// .container::>() /// .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x)); /// }); /// ``` @@ -56,10 +58,11 @@ where /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Map, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect}; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) + /// .container::>() /// .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len())); /// }); /// ``` @@ -78,10 +81,11 @@ where /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Map, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect}; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) + /// .container::>() /// .inspect_core(|event| { /// match event { /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()), @@ -93,7 +97,7 @@ where fn inspect_core(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } -impl Inspect for StreamCore +impl Inspect for Stream where 
for<'a> &'a C: IntoIterator, { @@ -111,10 +115,11 @@ pub trait InspectCore { /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Map, InspectCore}; + /// use timely::dataflow::operators::{ToStream, InspectCore}; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) + /// .container::>() /// .inspect_container(|event| { /// match event { /// Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()), @@ -126,7 +131,7 @@ pub trait InspectCore { fn inspect_container(self, func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static; } -impl InspectCore for StreamCore { +impl InspectCore for Stream { fn inspect_container(self, mut func: F) -> Self where F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static diff --git a/timely/src/dataflow/operators/core/map.rs b/timely/src/dataflow/operators/core/map.rs index 57ab43b0a..df74965b8 100644 --- a/timely/src/dataflow/operators/core/map.rs +++ b/timely/src/dataflow/operators/core/map.rs @@ -1,8 +1,8 @@ -//! Extension methods for `StreamCore` based on record-by-record transformation. +//! Extension methods for `Stream` based on record-by-record transformation. 
use crate::container::{DrainContainer, SizableContainer, PushInto}; use crate::Container; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; @@ -12,17 +12,18 @@ pub trait Map : Sized { /// /// # Examples /// ``` - /// use timely::dataflow::operators::ToStream; - /// use timely::dataflow::operators::core::{Map, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect}; + /// use timely::dataflow::operators::core::Map; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) + /// .container::>() /// .map(|x| x + 1) /// .container::>() /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map(self, mut logic: L) -> StreamCore + fn map(self, mut logic: L) -> Stream where C2: Container + SizableContainer + PushInto, L: FnMut(C::Item<'_>)->D2 + 'static, @@ -33,17 +34,18 @@ pub trait Map : Sized { /// /// # Examples /// ``` - /// use timely::dataflow::operators::ToStream; - /// use timely::dataflow::operators::core::{Map, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect}; + /// use timely::dataflow::operators::core::Map; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) + /// .container::>() /// .flat_map(|x| (0..x)) /// .container::>() /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn flat_map(self, logic: L) -> StreamCore + fn flat_map(self, logic: L) -> Stream where I: IntoIterator, C2: Container + SizableContainer + PushInto, @@ -59,12 +61,14 @@ pub trait Map : Sized { /// /// # Examples /// ``` - /// use timely::dataflow::operators::{Capture, ToStream, core::Map}; + /// use timely::dataflow::operators::{Capture, ToStream}; + /// use timely::dataflow::operators::core::Map; /// use timely::dataflow::operators::capture::Extract; /// /// let data = timely::example(|scope| { /// (0..10i32) /// .to_stream(scope) + /// .container::>() /// .flat_map_builder(|x| 
x + 1) /// .map(|x| x + 1) /// .map(|x| x + 1) @@ -84,11 +88,11 @@ pub trait Map : Sized { } } -impl Map for StreamCore { +impl Map for Stream { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. - fn flat_map(self, mut logic: L) -> StreamCore + fn flat_map(self, mut logic: L) -> Stream where I: IntoIterator, C2: Container + SizableContainer + PushInto, @@ -133,7 +137,7 @@ where } } /// Convert the wrapper into a stream. - pub fn into_stream(self) -> StreamCore + pub fn into_stream(self) -> Stream where I: IntoIterator, S: Scope, @@ -152,7 +156,7 @@ mod tests { #[test] fn test_builder() { let data = crate::example(|scope| { - let stream = (0..10i32).to_stream(scope); + let stream = (0..10i32).to_stream(scope).container::>(); stream.flat_map_builder(|x| x + 1) .map(|x| x + 1) .map(|x| x + 1) diff --git a/timely/src/dataflow/operators/core/mod.rs b/timely/src/dataflow/operators/core/mod.rs index 3674fc49e..a90350cfd 100644 --- a/timely/src/dataflow/operators/core/mod.rs +++ b/timely/src/dataflow/operators/core/mod.rs @@ -1,4 +1,4 @@ -//! Extension traits for `StreamCore` implementing various operators that +//! Extension traits for `Stream` implementing various operators that //! are independent of specific container types. 
pub mod capture; diff --git a/timely/src/dataflow/operators/core/ok_err.rs b/timely/src/dataflow/operators/core/ok_err.rs index 0b54d20dc..7e7d983a9 100644 --- a/timely/src/dataflow/operators/core/ok_err.rs +++ b/timely/src/dataflow/operators/core/ok_err.rs @@ -5,7 +5,7 @@ use crate::container::{DrainContainer, SizableContainer, PushInto}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; use crate::dataflow::operators::generic::OutputBuilder; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; /// Extension trait for `Stream`. pub trait OkErr { @@ -17,12 +17,13 @@ pub trait OkErr { /// /// # Examples /// ``` - /// use timely::dataflow::operators::ToStream; - /// use timely::dataflow::operators::core::{OkErr, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect}; + /// use timely::dataflow::operators::core::OkErr; /// /// timely::example(|scope| { /// let (odd, even) = (0..10) /// .to_stream(scope) + /// .container::>() /// .ok_err(|x| if x % 2 == 0 { Ok(x) } else { Err(x) }); /// /// even.container::>().inspect(|x| println!("even: {:?}", x)); @@ -32,7 +33,7 @@ pub trait OkErr { fn ok_err( self, logic: L, - ) -> (StreamCore, StreamCore) + ) -> (Stream, Stream) where C1: Container + SizableContainer + PushInto, C2: Container + SizableContainer + PushInto, @@ -40,11 +41,11 @@ pub trait OkErr { ; } -impl OkErr for StreamCore { +impl OkErr for Stream { fn ok_err( self, mut logic: L, - ) -> (StreamCore, StreamCore) + ) -> (Stream, Stream) where C1: Container + SizableContainer + PushInto, C2: Container + SizableContainer + PushInto, diff --git a/timely/src/dataflow/operators/core/partition.rs b/timely/src/dataflow/operators/core/partition.rs index 9d6f32aee..b2ac992e7 100644 --- a/timely/src/dataflow/operators/core/partition.rs +++ b/timely/src/dataflow/operators/core/partition.rs @@ -4,7 +4,7 @@ use std::collections::BTreeMap; use 
crate::container::{DrainContainer, PushInto}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::{Container, ContainerBuilder}; /// Partition a stream of records into multiple streams. @@ -13,12 +13,13 @@ pub trait Partition { /// /// # Examples /// ``` - /// use timely::dataflow::operators::ToStream; - /// use timely::dataflow::operators::core::{Partition, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect}; + /// use timely::dataflow::operators::core::Partition; /// use timely_container::CapacityContainerBuilder; /// /// timely::example(|scope| { /// let streams = (0..10).to_stream(scope) + /// .container::>() /// .partition::>, _, _>(3, |x| (x % 3, x)); /// /// for (idx, stream) in streams.into_iter().enumerate() { @@ -27,14 +28,14 @@ pub trait Partition { /// } /// }); /// ``` - fn partition(self, parts: u64, route: F) -> Vec> + fn partition(self, parts: u64, route: F) -> Vec> where CB: ContainerBuilder + PushInto, F: FnMut(C::Item<'_>) -> (u64, D2) + 'static; } -impl Partition for StreamCore { - fn partition(self, parts: u64, mut route: F) -> Vec> +impl Partition for Stream { + fn partition(self, parts: u64, mut route: F) -> Vec> where CB: ContainerBuilder + PushInto, F: FnMut(C::Item<'_>) -> (u64, D2) + 'static, diff --git a/timely/src/dataflow/operators/core/probe.rs b/timely/src/dataflow/operators/core/probe.rs index 7625e390c..899e7b085 100644 --- a/timely/src/dataflow/operators/core/probe.rs +++ b/timely/src/dataflow/operators/core/probe.rs @@ -11,7 +11,7 @@ use crate::dataflow::channels::pullers::Counter as PullCounter; use crate::dataflow::operators::generic::builder_raw::OperatorBuilder; -use crate::dataflow::{StreamCore, Scope}; +use crate::dataflow::{Stream, Scope}; use crate::Container; use crate::dataflow::channels::Message; @@ -30,7 +30,7 @@ pub trait Probe { /// /// // add 
an input and base computation off of it /// let (mut input, probe) = worker.dataflow(|scope| { - /// let (input, stream) = scope.new_input(); + /// let (input, stream) = scope.new_input::>(); /// let probe = stream.inspect(|x| println!("hello {:?}", x)) /// .probe(); /// (input, probe) @@ -61,7 +61,7 @@ pub trait Probe { /// // add an input and base computation off of it /// let mut probe = Handle::new(); /// let mut input = worker.dataflow(|scope| { - /// let (input, stream) = scope.new_input(); + /// let (input, stream) = scope.new_input::>(); /// stream.probe_with(&mut probe) /// .inspect(|x| println!("hello {:?}", x)); /// @@ -76,10 +76,10 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe_with(self, handle: &Handle) -> StreamCore; + fn probe_with(self, handle: &Handle) -> Stream; } -impl Probe for StreamCore { +impl Probe for Stream { fn probe(self) -> Handle { // the frontier is shared state; scope updates, handle reads. @@ -87,7 +87,7 @@ impl Probe for StreamCore { self.probe_with(&handle); handle } - fn probe_with(self, handle: &Handle) -> StreamCore { + fn probe_with(self, handle: &Handle) -> Stream { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); @@ -196,7 +196,7 @@ mod tests { // create a new input, and inspect its output let (mut input, probe) = worker.dataflow(move |scope| { - let (input, stream) = scope.new_input::(); + let (input, stream) = scope.new_input::>(); (input, stream.probe()) }); diff --git a/timely/src/dataflow/operators/core/rc.rs b/timely/src/dataflow/operators/core/rc.rs index 11174b1a9..28818088d 100644 --- a/timely/src/dataflow/operators/core/rc.rs +++ b/timely/src/dataflow/operators/core/rc.rs @@ -2,7 +2,7 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::Operator; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::Container; use std::rc::Rc; @@ -17,15 
+17,16 @@ pub trait SharedStream { /// /// timely::example(|scope| { /// (0..10).to_stream(scope) + /// .container::>() /// .shared() /// .inspect_container(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn shared(self) -> StreamCore>; + fn shared(self) -> Stream>; } -impl SharedStream for StreamCore { - fn shared(self) -> StreamCore> { +impl SharedStream for Stream { + fn shared(self) -> Stream> { self.unary(Pipeline, "Shared", move |_, _| { move |input, output| { input.for_each_time(|time, data| { diff --git a/timely/src/dataflow/operators/core/reclock.rs b/timely/src/dataflow/operators/core/reclock.rs index 9dca4dc59..5e1747085 100644 --- a/timely/src/dataflow/operators/core/reclock.rs +++ b/timely/src/dataflow/operators/core/reclock.rs @@ -2,7 +2,7 @@ use crate::Container; use crate::order::PartialOrder; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; @@ -19,7 +19,8 @@ pub trait Reclock { /// # Examples /// /// ``` - /// use timely::dataflow::operators::{ToStream, Delay, Map, Reclock, Capture}; + /// use timely::dataflow::operators::{ToStream, Reclock, Capture}; + /// use timely::dataflow::operators::vec::{Delay, Map}; /// use timely::dataflow::operators::capture::Extract; /// /// let captured = timely::example(|scope| { @@ -45,11 +46,11 @@ pub trait Reclock { /// assert_eq!(extracted[1], (5, vec![4,5])); /// assert_eq!(extracted[2], (8, vec![6,7,8])); /// ``` - fn reclock(self, clock: StreamCore) -> Self; + fn reclock(self, clock: Stream) -> Self; } -impl Reclock for StreamCore { - fn reclock(self, clock: StreamCore) -> StreamCore { +impl Reclock for Stream { + fn reclock(self, clock: Stream) -> Stream { let mut stash = vec![]; diff --git a/timely/src/dataflow/operators/core/to_stream.rs b/timely/src/dataflow/operators/core/to_stream.rs index 82b03e1dc..139f8a540 100644 --- 
a/timely/src/dataflow/operators/core/to_stream.rs +++ b/timely/src/dataflow/operators/core/to_stream.rs @@ -1,13 +1,13 @@ -//! Conversion to the `StreamCore` type from iterators. +//! Conversion to the `Stream` type from iterators. use crate::container::{CapacityContainerBuilder, SizableContainer, PushInto}; use crate::{Container, ContainerBuilder}; use crate::dataflow::operators::generic::operator::source; -use crate::dataflow::{StreamCore, Scope}; +use crate::dataflow::{Stream, Scope}; -/// Converts to a timely [StreamCore], using a container builder. +/// Converts to a timely [Stream], using a container builder. pub trait ToStreamBuilder { - /// Converts to a timely [StreamCore], using the supplied container builder type. + /// Converts to a timely [Stream], using the supplied container builder type. /// /// # Examples /// @@ -28,11 +28,11 @@ pub trait ToStreamBuilder { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream_with_builder(self, scope: &mut S) -> StreamCore; + fn to_stream_with_builder(self, scope: &mut S) -> Stream; } impl ToStreamBuilder for I where CB: PushInto { - fn to_stream_with_builder(self, scope: &mut S) -> StreamCore { + fn to_stream_with_builder(self, scope: &mut S) -> Stream { source::<_, CB, _, _>(scope, "ToStreamBuilder", |capability, info| { @@ -59,10 +59,10 @@ impl ToStreamBuilder for I wh } } -/// Converts to a timely [StreamCore]. Equivalent to [`ToStreamBuilder`] but +/// Converts to a timely [Stream]. Equivalent to [`ToStreamBuilder`] but /// uses a [`CapacityContainerBuilder`]. pub trait ToStream { - /// Converts to a timely [StreamCore]. + /// Converts to a timely [Stream]. 
/// /// # Examples /// @@ -78,11 +78,11 @@ pub trait ToStream { /// /// assert_eq!(data1.extract(), data2.extract()); /// ``` - fn to_stream(self, scope: &mut S) -> StreamCore; + fn to_stream(self, scope: &mut S) -> Stream; } impl ToStream for I where C: PushInto { - fn to_stream(self, scope: &mut S) -> StreamCore { + fn to_stream(self, scope: &mut S) -> Stream { ToStreamBuilder::>::to_stream_with_builder(self, scope) } } diff --git a/timely/src/dataflow/operators/core/unordered_input.rs b/timely/src/dataflow/operators/core/unordered_input.rs index ac4f40f97..c7ab1d899 100644 --- a/timely/src/dataflow/operators/core/unordered_input.rs +++ b/timely/src/dataflow/operators/core/unordered_input.rs @@ -1,4 +1,4 @@ -//! Create new `StreamCore`s connected to external inputs. +//! Create new `Stream`s connected to external inputs. use std::rc::Rc; use std::cell::RefCell; @@ -14,16 +14,16 @@ use crate::progress::operate::Connectivity; use crate::dataflow::channels::pushers::{Counter, Output, Tee}; use crate::dataflow::operators::generic::{OutputBuilder, OutputBuilderSession}; use crate::dataflow::operators::{ActivateCapability, Capability}; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::scheduling::Activations; /// Create a new `Stream` and `Handle` through which to supply input. pub trait UnorderedInput { - /// Create a new capability-based [StreamCore] and [UnorderedHandle] through which to supply input. This + /// Create a new capability-based [Stream] and [UnorderedHandle] through which to supply input. This /// input supports multiple open epochs (timestamps) at the same time. 
/// - /// The `new_unordered_input_core` method returns `((HandleCore, Capability), StreamCore)` where the `StreamCore` can be used + /// The `new_unordered_input_core` method returns `((HandleCore, Capability), Stream)` where the `Stream` can be used /// immediately for timely dataflow construction, `HandleCore` and `Capability` are later used to introduce /// data into the timely dataflow computation. /// @@ -42,7 +42,6 @@ pub trait UnorderedInput { /// use timely::*; /// use timely::dataflow::operators::{capture::Extract, Capture}; /// use timely::dataflow::operators::core::{UnorderedInput}; - /// use timely::dataflow::Stream; /// /// // get send and recv endpoints, wrap send to share /// let (send, recv) = ::std::sync::mpsc::channel(); @@ -75,11 +74,11 @@ pub trait UnorderedInput { /// assert_eq!(extract[i], (i, vec![i])); /// } /// ``` - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore); + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream); } impl UnorderedInput for G { - fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), StreamCore) { + fn new_unordered_input(&mut self) -> ((UnorderedHandle, ActivateCapability), Stream) { let (output, registrar) = Tee::::new(); let internal = Rc::new(RefCell::new(ChangeBatch::new())); @@ -106,7 +105,7 @@ impl UnorderedInput for G { peers, }), index); - ((helper, cap), StreamCore::new(Source::new(index, 0), registrar, self.clone())) + ((helper, cap), Stream::new(Source::new(index, 0), registrar, self.clone())) } } @@ -145,7 +144,7 @@ impl Operate for UnorderedOperator { fn notify_me(&self) -> bool { false } } -/// A handle to an input [StreamCore], used to introduce data to a timely dataflow computation. +/// A handle to an input [Stream], used to introduce data to a timely dataflow computation. 
pub struct UnorderedHandle { output: OutputBuilder, address: Rc<[usize]>, diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index b23206046..e393af227 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -14,7 +14,7 @@ use crate::progress::{Source, Target}; use crate::progress::{Timestamp, Operate, operate::SharedProgress, Antichain}; use crate::progress::operate::{Connectivity, PortConnectivity}; use crate::Container; -use crate::dataflow::{StreamCore, Scope}; +use crate::dataflow::{Stream, Scope}; use crate::dataflow::channels::pushers::Tee; use crate::dataflow::channels::pact::ParallelizationContract; use crate::dataflow::operators::generic::operator_info::OperatorInfo; @@ -104,7 +104,7 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input(&mut self, stream: StreamCore, pact: P) -> P::Puller + pub fn new_input(&mut self, stream: Stream, pact: P) -> P::Puller where P: ParallelizationContract { @@ -113,7 +113,7 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. - pub fn new_input_connection(&mut self, stream: StreamCore, pact: P, connection: I) -> P::Puller + pub fn new_input_connection(&mut self, stream: Stream, pact: P, connection: I) -> P::Puller where P: ParallelizationContract, I: IntoIterator::Summary>)>, @@ -133,14 +133,14 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (Tee, StreamCore) { + pub fn new_output(&mut self) -> (Tee, Stream) { let connection = (0 .. 
self.shape.inputs).map(|i| (i, Antichain::from_elem(Default::default()))); self.new_output_connection(connection) } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output_connection(&mut self, connection: I) -> (Tee, StreamCore) + pub fn new_output_connection(&mut self, connection: I) -> (Tee, Stream) where I: IntoIterator::Summary>)>, { @@ -148,7 +148,7 @@ impl OperatorBuilder { self.shape.outputs += 1; let (target, registrar) = Tee::new(); let source = Source::new(self.index, new_output); - let stream = StreamCore::new(source, registrar, self.scope.clone()); + let stream = Stream::new(source, registrar, self.scope.clone()); for (input, entry) in connection { self.summary[input].add_port(new_output, entry); diff --git a/timely/src/dataflow/operators/generic/builder_rc.rs b/timely/src/dataflow/operators/generic/builder_rc.rs index f98f9bf6d..e89edc8af 100644 --- a/timely/src/dataflow/operators/generic/builder_rc.rs +++ b/timely/src/dataflow/operators/generic/builder_rc.rs @@ -9,7 +9,7 @@ use crate::progress::operate::SharedProgress; use crate::progress::frontier::{Antichain, MutableAntichain}; use crate::Container; -use crate::dataflow::{Scope, StreamCore}; +use crate::dataflow::{Scope, Stream}; use crate::dataflow::channels::pushers::Counter as PushCounter; use crate::dataflow::channels::pushers; use crate::dataflow::channels::pact::ParallelizationContract; @@ -54,7 +54,7 @@ impl OperatorBuilder { } /// Adds a new input to a generic operator builder, returning the `Pull` implementor to use. 
- pub fn new_input(&mut self, stream: StreamCore, pact: P) -> InputHandleCore + pub fn new_input(&mut self, stream: Stream, pact: P) -> InputHandleCore where P: ParallelizationContract { @@ -70,7 +70,7 @@ impl OperatorBuilder { /// /// Commonly the connections are either the unit summary, indicating the same timestamp might be produced as output, or an empty /// antichain indicating that there is no connection from the input to the output. - pub fn new_input_connection(&mut self, stream: StreamCore, pact: P, connection: I) -> InputHandleCore + pub fn new_input_connection(&mut self, stream: Stream, pact: P, connection: I) -> InputHandleCore where P: ParallelizationContract, I: IntoIterator::Summary>)> + Clone, @@ -88,7 +88,7 @@ impl OperatorBuilder { } /// Adds a new output to a generic operator builder, returning the `Push` implementor to use. - pub fn new_output(&mut self) -> (pushers::Output, StreamCore) { + pub fn new_output(&mut self) -> (pushers::Output, Stream) { let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default()))); self.new_output_connection(connection) } @@ -103,7 +103,7 @@ impl OperatorBuilder { /// antichain indicating that there is no connection from the input to the output. 
pub fn new_output_connection(&mut self, connection: I) -> ( pushers::Output, - StreamCore, + Stream, ) where I: IntoIterator::Summary>)> + Clone, diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index e0e13b59e..e62fee677 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -54,6 +54,7 @@ impl<'a, T: Timestamp> Notificator<'a, T> { /// /// timely::example(|scope| { /// (0..10).to_stream(scope) + /// .container::>() /// .unary_notify(Pipeline, "example", Some(0), |input, output, notificator| { /// input.for_each_time(|cap, data| { /// output.session(&cap).give_containers(data); @@ -186,8 +187,8 @@ fn notificator_delivers_notifications_in_topo_order() { /// /// timely::execute(timely::Config::thread(), |worker| { /// let (mut in1, mut in2) = worker.dataflow::(|scope| { -/// let (in1_handle, in1) = scope.new_input(); -/// let (in2_handle, in2) = scope.new_input(); +/// let (in1_handle, in1) = scope.new_input::>(); +/// let (in2_handle, in2) = scope.new_input::>(); /// in1.binary_frontier(in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { /// let mut notificator = FrontierNotificator::default(); /// let mut stash = HashMap::new(); @@ -261,6 +262,7 @@ impl FrontierNotificator { /// /// timely::example(|scope| { /// (0..10).to_stream(scope) + /// .container::>() /// .unary_frontier(Pipeline, "example", |_, _| { /// let mut notificator = FrontierNotificator::default(); /// move |(input, frontier), output| { @@ -391,6 +393,7 @@ impl FrontierNotificator { /// /// timely::example(|scope| { /// (0..10).to_stream(scope) + /// .container::>() /// .unary_frontier(Pipeline, "example", |_, _| { /// let mut notificator = FrontierNotificator::default(); /// move |(input, frontier), output| { diff --git a/timely/src/dataflow/operators/generic/operator.rs b/timely/src/dataflow/operators/generic/operator.rs index 
b95cf48eb..9a4319437 100644
--- a/timely/src/dataflow/operators/generic/operator.rs
+++ b/timely/src/dataflow/operators/generic/operator.rs
@@ -7,7 +7,7 @@
 use crate::dataflow::channels::pact::ParallelizationContract;
 use crate::dataflow::operators::generic::handles::{InputHandleCore, OutputBuilderSession, OutputBuilder};
 use crate::dataflow::operators::capability::Capability;
-use crate::dataflow::{Scope, StreamCore};
+use crate::dataflow::{Scope, Stream};
 use super::builder_rc::OperatorBuilder;
 use crate::dataflow::operators::generic::OperatorInfo;
@@ -29,7 +29,9 @@ pub trait Operator {
     /// use timely::dataflow::channels::pact::Pipeline;
     ///
     /// timely::example(|scope| {
-    ///     (0u64..10).to_stream(scope)
+    ///     (0u64..10)
+    ///         .to_stream(scope)
+    ///         .container::<Vec<_>>()
     ///         .unary_frontier(Pipeline, "example", |default_cap, _info| {
     ///             let mut cap = Some(default_cap.delayed(&12));
     ///             let mut notificator = FrontierNotificator::default();
@@ -53,7 +55,7 @@ pub trait Operator {
     ///         .container::<Vec<_>>();
     /// });
     /// ```
-    fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
+    fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<G, CB::Container>
     where
         CB: ContainerBuilder,
         B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
@@ -75,6 +77,7 @@ pub trait Operator {
     /// timely::example(|scope| {
     ///     (0u64..10)
     ///         .to_stream(scope)
+    ///         .container::<Vec<_>>()
     ///         .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
     ///             input.for_each_time(|time, data| {
     ///                 output.session(&time).give_containers(data);
@@ -91,7 +94,7 @@ pub trait Operator {
                 &mut OutputBuilderSession<'_, G::Timestamp, CB>,
                 &mut Notificator<G::Timestamp>)+'static,
          P: ParallelizationContract<G::Timestamp, C1>>
-    (self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
+    (self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> Stream<G, CB::Container>;

     /// Creates a new dataflow operator that partitions its input stream by a parallelization
     /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
@@ -105,7 +108,9 @@ pub trait Operator {
     /// use timely::dataflow::Scope;
     ///
     /// timely::example(|scope| {
-    ///     (0u64..10).to_stream(scope)
+    ///     (0u64..10)
+    ///         .to_stream(scope)
+    ///         .container::<Vec<_>>()
     ///         .unary(Pipeline, "example", |default_cap, _info| {
     ///             let mut cap = Some(default_cap.delayed(&12));
     ///             move |input, output| {
@@ -119,7 +124,7 @@ pub trait Operator {
     ///         });
     ///     });
     /// ```
-    fn unary<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
+    fn unary<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<G, CB::Container>
     where
         CB: ContainerBuilder,
         B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
@@ -140,8 +145,8 @@ pub trait Operator {
     ///
     /// timely::execute(timely::Config::thread(), |worker| {
     ///     let (mut in1, mut in2) = worker.dataflow::<u64,_,_>(|scope| {
-    ///         let (in1_handle, in1) = scope.new_input();
-    ///         let (in2_handle, in2) = scope.new_input();
+    ///         let (in1_handle, in1) = scope.new_input::<Vec<_>>();
+    ///         let (in2_handle, in2) = scope.new_input::<Vec<_>>();
    ///         in1.binary_frontier(in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
     ///             let mut notificator = FrontierNotificator::default();
     ///             let mut stash = HashMap::new();
@@ -175,7 +180,7 @@ pub trait Operator {
     ///     }
     /// }).unwrap();
     /// ```
-    fn binary_frontier<C2, CB, B, L, P1, P2>(self, other: StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
+    fn binary_frontier<C2, CB, B, L, P1, P2>(self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<G, CB::Container>
     where
         C2: Container,
         CB: ContainerBuilder,
@@ -199,8 +204,8 @@ pub trait Operator {
     ///
     /// timely::execute(timely::Config::thread(), |worker| {
     ///     let (mut in1, mut in2) = worker.dataflow::<u64,_,_>(|scope| {
-    ///         let (in1_handle, in1) = scope.new_input();
-    ///         let (in2_handle, in2) = scope.new_input();
+    ///         let (in1_handle, in1) = scope.new_input::<Vec<_>>();
+    ///         let (in2_handle, in2) = scope.new_input::<Vec<_>>();
     ///
     ///         in1.binary_notify(in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
     ///             input1.for_each_time(|time, data| {
@@ -235,7 +240,7 @@ pub trait Operator {
                 &mut Notificator<G::Timestamp>)+'static,
          P1: ParallelizationContract<G::Timestamp, C1>,
          P2: ParallelizationContract<G::Timestamp, C2>>
-    (self, other: StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, CB::Container>;
+    (self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> Stream<G, CB::Container>;

     /// Creates a new dataflow operator that partitions its input streams by a parallelization
     /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
@@ -249,8 +254,9 @@ pub trait Operator {
     /// use timely::dataflow::Scope;
     ///
     /// timely::example(|scope| {
-    ///     let stream2 = (0u64..10).to_stream(scope);
-    ///     (0u64..10).to_stream(scope)
+    ///     let stream1 = (0u64..10).to_stream(scope).container::<Vec<_>>();
+    ///     let stream2 = (0u64..10).to_stream(scope).container::<Vec<_>>();
+    ///     stream1
     ///         .binary(stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
     ///             let mut cap = Some(default_cap.delayed(&12));
     ///             move |input1, input2, output| {
@@ -263,7 +269,7 @@ pub trait Operator {
     ///         }).inspect(|x| println!("{:?}", x));
     /// });
     /// ```
-    fn binary<C2, CB, B, L, P1, P2>(self, other: StreamCore<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, CB::Container>
+    fn binary<C2, CB, B, L, P1, P2>(self, other: Stream<G, C2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream<G, CB::Container>
     where
         C2: Container,
         CB: ContainerBuilder,
@@ -288,6 +294,7 @@ pub trait Operator {
     /// timely::example(|scope| {
     ///     (0u64..10)
     ///         .to_stream(scope)
+    ///         .container::<Vec<_>>()
     ///         .sink(Pipeline, "example", |(input, frontier)| {
     ///             input.for_each_time(|time, data| {
     ///                 for datum in data.flatten() {
@@ -303,9 +310,9 @@ pub trait Operator {
         P: ParallelizationContract<G::Timestamp, C1>;
 }

-impl<G: Scope, C1: Container> Operator<G, C1> for StreamCore<G, C1> {
+impl<G: Scope, C1: Container> Operator<G, C1> for Stream<G, C1> {

-    fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> StreamCore<G, CB::Container>
+    fn unary_frontier<CB, B, L, P>(self, pact: P, name: &str, constructor: B) -> Stream<G, CB::Container>
     where
         CB: ContainerBuilder,
         B:
FnOnce(Capability, OperatorInfo) -> L, @@ -338,7 +345,7 @@ impl Operator for StreamCore { &mut OutputBuilderSession<'_, G::Timestamp, CB>, &mut Notificator)+'static, P: ParallelizationContract> - (self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { + (self, pact: P, name: &str, init: impl IntoIterator, mut logic: L) -> Stream { self.unary_frontier(pact, name, move |capability, _info| { let mut notificator = FrontierNotificator::default(); @@ -354,7 +361,7 @@ impl Operator for StreamCore { }) } - fn unary(self, pact: P, name: &str, constructor: B) -> StreamCore + fn unary(self, pact: P, name: &str, constructor: B) -> Stream where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, @@ -380,7 +387,7 @@ impl Operator for StreamCore { stream } - fn binary_frontier(self, other: StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary_frontier(self, other: Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream where C2: Container, CB: ContainerBuilder, @@ -420,7 +427,7 @@ impl Operator for StreamCore { &mut Notificator)+'static, P1: ParallelizationContract, P2: ParallelizationContract> - (self, other: StreamCore, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> StreamCore { + (self, other: Stream, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator, mut logic: L) -> Stream { self.binary_frontier(other, pact1, pact2, name, |capability, _info| { let mut notificator = FrontierNotificator::default(); @@ -438,7 +445,7 @@ impl Operator for StreamCore { } - fn binary(self, other: StreamCore, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore + fn binary(self, other: Stream, pact1: P1, pact2: P2, name: &str, constructor: B) -> Stream where C2: Container, CB: ContainerBuilder, @@ -529,7 +536,7 @@ impl Operator for StreamCore { /// .inspect(|x| println!("number: {:?}", x)); /// }); /// ``` -pub fn source(scope: &G, name: &str, constructor: B) -> 
StreamCore +pub fn source(scope: &G, name: &str, constructor: B) -> Stream where CB: ContainerBuilder, B: FnOnce(Capability, OperatorInfo) -> L, @@ -574,7 +581,7 @@ where /// /// }); /// ``` -pub fn empty(scope: &G) -> StreamCore { +pub fn empty(scope: &G) -> Stream { source::<_, CapacityContainerBuilder, _, _>(scope, "Empty", |_capability, _info| |_output| { // drop capability, do nothing }) diff --git a/timely/src/dataflow/operators/mod.rs b/timely/src/dataflow/operators/mod.rs index f6d212a58..fb0ab9c7d 100644 --- a/timely/src/dataflow/operators/mod.rs +++ b/timely/src/dataflow/operators/mod.rs @@ -1,61 +1,42 @@ -//! Extension traits for `Stream` implementing various operators. +//! Extension traits for `Stream` and `StreamVec` implementing various operators. //! -//! A collection of functions taking typed `Stream` objects as input and producing new `Stream` +//! A collection of functions taking typed stream objects as input and producing new stream //! objects as output. Many of the operators provide simple, composable functionality. Some of the //! operators are more complicated, for use with advanced timely dataflow features. //! +//! The [`core`](core) module defines operators that work for streams of general containers. +//! The [`vec`](vec) module defines operators that work for streams of vector containers. +//! //! The [`Operator`](generic::operator) trait provides general //! operators whose behavior can be supplied using closures accepting input and output handles. //! Most of the operators in this module are defined using these two general operators. 
-pub use self::input::Input; -pub use self::unordered_input::UnorderedInput; -pub use self::partition::Partition; -pub use self::map::Map; pub use self::inspect::{Inspect, InspectCore}; -pub use self::filter::Filter; -pub use self::delay::Delay; pub use self::exchange::Exchange; -pub use self::broadcast::Broadcast; -pub use self::branch::{Branch, BranchWhen}; -pub use self::result::ResultStream; -pub use self::to_stream::ToStream; - pub use self::generic::Operator; pub use self::generic::{Notificator, FrontierNotificator}; pub use self::reclock::Reclock; -pub use self::count::Accumulate; pub mod core; +pub mod vec; pub use self::core::enterleave::{self, Enter, Leave}; -pub mod input; -pub mod flow_controlled; -pub mod unordered_input; pub use self::core::feedback::{self, Feedback, LoopVariable, ConnectLoop}; pub use self::core::concat::{self, Concat, Concatenate}; -pub mod partition; -pub mod map; pub use self::core::inspect; -pub mod filter; -pub mod delay; pub use self::core::exchange; -pub mod broadcast; pub use self::core::probe::{self, Probe}; -pub mod to_stream; pub use self::core::capture::{self, Capture}; -pub mod branch; pub use self::core::ok_err::{self, OkErr}; pub use self::core::rc; -pub mod result; +pub use self::core::to_stream::ToStream; +pub use self::core::input::Input; -pub mod aggregation; pub mod generic; pub use self::core::reclock; -pub mod count; // keep "mint" module-private mod capability; diff --git a/timely/src/dataflow/operators/aggregation/aggregate.rs b/timely/src/dataflow/operators/vec/aggregation/aggregate.rs similarity index 86% rename from timely/src/dataflow/operators/aggregation/aggregate.rs rename to timely/src/dataflow/operators/vec/aggregation/aggregate.rs index 27bd594ae..bac0a8905 100644 --- a/timely/src/dataflow/operators/aggregation/aggregate.rs +++ b/timely/src/dataflow/operators/vec/aggregation/aggregate.rs @@ -3,7 +3,7 @@ use std::hash::Hash; use std::collections::HashMap; use crate::ExchangeData; -use 
crate::dataflow::{Stream, Scope};
+use crate::dataflow::{StreamVec, Scope};
 use crate::dataflow::operators::generic::operator::Operator;
 use crate::dataflow::channels::pact::Exchange;
@@ -24,8 +24,8 @@ pub trait Aggregate {
     ///
     /// # Examples
     /// ```
-    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
-    /// use timely::dataflow::operators::aggregation::Aggregate;
+    /// use timely::dataflow::operators::{ToStream, Inspect};
+    /// use timely::dataflow::operators::vec::{Map, aggregation::Aggregate};
     ///
     /// timely::example(|scope| {
     ///
@@ -45,12 +45,13 @@ pub trait Aggregate {
     /// obviously do more efficiently; imagine we were doing a hash instead).
     ///
     /// ```
-    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
-    /// use timely::dataflow::operators::aggregation::Aggregate;
+    /// use timely::dataflow::operators::{ToStream, Inspect};
+    /// use timely::dataflow::operators::vec::{Map, aggregation::Aggregate};
     ///
     /// timely::example(|scope| {
     ///
-    ///     (0..10).to_stream(scope)
+    ///     (0..10)
+    ///         .to_stream(scope)
     ///         .map(|x| (x % 2, x))
     ///         .aggregate::<_,Vec<u64>,_,_,_>(
     ///             |_key, val, agg| { agg.push(val); },
@@ -64,16 +65,16 @@ pub trait Aggregate {
         self,
         fold: F,
         emit: E,
-        hash: H) -> Stream<S, R> where S::Timestamp: Eq;
+        hash: H) -> StreamVec<S, R> where S::Timestamp: Eq;
 }

-impl<S: Scope, K: ExchangeData+Clone+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)> {
+impl<S: Scope, K: ExchangeData+Clone+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for StreamVec<S, (K, V)> {
     fn aggregate<R: 'static, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
         self,
         fold: F,
         emit: E,
-        hash: H) -> Stream<S, R> where S::Timestamp: Eq {
+        hash: H) -> StreamVec<S, R> where S::Timestamp: Eq {

         let mut aggregates = HashMap::new();
         self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {
diff --git a/timely/src/dataflow/operators/aggregation/mod.rs b/timely/src/dataflow/operators/vec/aggregation/mod.rs
similarity index 100%
rename from timely/src/dataflow/operators/aggregation/mod.rs
rename to
timely/src/dataflow/operators/vec/aggregation/mod.rs diff --git a/timely/src/dataflow/operators/aggregation/state_machine.rs b/timely/src/dataflow/operators/vec/aggregation/state_machine.rs similarity index 92% rename from timely/src/dataflow/operators/aggregation/state_machine.rs rename to timely/src/dataflow/operators/vec/aggregation/state_machine.rs index 77c117409..f1f2a669f 100644 --- a/timely/src/dataflow/operators/aggregation/state_machine.rs +++ b/timely/src/dataflow/operators/vec/aggregation/state_machine.rs @@ -3,7 +3,7 @@ use std::hash::Hash; use std::collections::HashMap; use crate::ExchangeData; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{StreamVec, Scope}; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::channels::pact::Exchange; @@ -26,8 +26,8 @@ pub trait StateMachine { /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Map, Inspect}; - /// use timely::dataflow::operators::aggregation::StateMachine; + /// use timely::dataflow::operators::{ToStream, Inspect}; + /// use timely::dataflow::operators::vec::{Map, aggregation::StateMachine}; /// /// timely::example(|scope| { /// @@ -51,17 +51,17 @@ pub trait StateMachine { I: IntoIterator, // type of output iterator F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic H: Fn(&K)->u64+'static, // "hash" function for keys - >(self, fold: F, hash: H) -> Stream where S::Timestamp : Hash+Eq ; + >(self, fold: F, hash: H) -> StreamVec where S::Timestamp : Hash+Eq ; } -impl StateMachine for Stream { +impl StateMachine for StreamVec { fn state_machine< R: 'static, // output type D: Default+'static, // per-key state (data) I: IntoIterator, // type of output iterator F: Fn(&K, V, &mut D)->(bool, I)+'static, // state update logic H: Fn(&K)->u64+'static, // "hash" function for keys - >(self, fold: F, hash: H) -> Stream where S::Timestamp : Hash+Eq { + >(self, fold: F, hash: H) -> StreamVec where S::Timestamp : Hash+Eq { let mut 
pending: HashMap<_, Vec<(K, V)>> = HashMap::new(); // times -> (keys -> state) let mut states = HashMap::new(); // keys -> state diff --git a/timely/src/dataflow/operators/branch.rs b/timely/src/dataflow/operators/vec/branch.rs similarity index 90% rename from timely/src/dataflow/operators/branch.rs rename to timely/src/dataflow/operators/vec/branch.rs index 9b618ee07..62bb427b3 100644 --- a/timely/src/dataflow/operators/branch.rs +++ b/timely/src/dataflow/operators/vec/branch.rs @@ -3,10 +3,10 @@ use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::OutputBuilder; use crate::dataflow::operators::generic::builder_rc::OperatorBuilder; -use crate::dataflow::{Scope, Stream, StreamCore}; +use crate::dataflow::{Scope, StreamVec, Stream}; use crate::Container; -/// Extension trait for `Stream`. +/// Extension trait for `StreamVec`. pub trait Branch { /// Takes one input stream and splits it into two output streams. /// For each record, the supplied closure is called with a reference to @@ -18,7 +18,7 @@ pub trait Branch { /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Branch, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect, vec::Branch}; /// /// timely::example(|scope| { /// let (odd, even) = (0..10) @@ -32,14 +32,14 @@ pub trait Branch { fn branch( self, condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, - ) -> (Stream, Stream); + ) -> (StreamVec, StreamVec); } -impl Branch for Stream { +impl Branch for StreamVec { fn branch( self, condition: impl Fn(&S::Timestamp, &D) -> bool + 'static, - ) -> (Stream, Stream) { + ) -> (StreamVec, StreamVec) { let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope()); builder.set_notify(false); @@ -82,11 +82,13 @@ pub trait BranchWhen: Sized { /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, BranchWhen, Inspect, Delay}; + /// use timely::dataflow::operators::{ToStream, Inspect}; + /// use 
timely::dataflow::operators::vec::{BranchWhen, Delay};
     ///
     /// timely::example(|scope| {
     ///     let (before_five, after_five) = (0..10)
     ///         .to_stream(scope)
+    ///         .container::<Vec<_>>()
     ///         .delay(|x,t| *x) // data 0..10 at time 0..10
     ///         .branch_when(|time| time >= &5);
     ///
@@ -97,7 +99,7 @@ pub trait BranchWhen: Sized {
     fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
 }

-impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
+impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for Stream<S, C> {
     fn branch_when(self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
         let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
         builder.set_notify(false);
diff --git a/timely/src/dataflow/operators/broadcast.rs b/timely/src/dataflow/operators/vec/broadcast.rs
similarity index 73%
rename from timely/src/dataflow/operators/broadcast.rs
rename to timely/src/dataflow/operators/vec/broadcast.rs
index 8a1979d12..8bf4397ca 100644
--- a/timely/src/dataflow/operators/broadcast.rs
+++ b/timely/src/dataflow/operators/vec/broadcast.rs
@@ -1,8 +1,8 @@
 //! Broadcast records to all workers.

 use crate::ExchangeData;
-use crate::dataflow::{Stream, Scope};
-use crate::dataflow::operators::{Map, Exchange};
+use crate::dataflow::{StreamVec, Scope};
+use crate::dataflow::operators::{vec::Map, Exchange};

 /// Broadcast records to all workers.
 pub trait Broadcast<D: ExchangeData> {
@@ -10,7 +10,7 @@ pub trait Broadcast {
     ///
     /// # Examples
     /// ```
-    /// use timely::dataflow::operators::{ToStream, Broadcast, Inspect};
+    /// use timely::dataflow::operators::{ToStream, Inspect, vec::Broadcast};
     ///
     /// timely::example(|scope| {
     ///     (0..10).to_stream(scope)
@@ -21,8 +21,8 @@ pub trait Broadcast {
     fn broadcast(self) -> Self;
 }

-impl<G: Scope, D: ExchangeData> Broadcast<D> for Stream<G, D> {
-    fn broadcast(self) -> Stream<G, D> {
+impl<G: Scope, D: ExchangeData> Broadcast<D> for StreamVec<G, D> {
+    fn broadcast(self) -> StreamVec<G, D> {

         // NOTE: Simplified implementation due to underlying motion
         // in timely dataflow internals.
Optimize once they have diff --git a/timely/src/dataflow/operators/count.rs b/timely/src/dataflow/operators/vec/count.rs similarity index 81% rename from timely/src/dataflow/operators/count.rs rename to timely/src/dataflow/operators/vec/count.rs index 349b542f6..24a1363c8 100644 --- a/timely/src/dataflow/operators/count.rs +++ b/timely/src/dataflow/operators/vec/count.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{StreamVec, Scope}; use crate::dataflow::operators::generic::operator::Operator; /// Accumulates records within a timestamp. @@ -12,7 +12,8 @@ pub trait Accumulate : Sized { /// # Examples /// /// ``` - /// use timely::dataflow::operators::{ToStream, Accumulate, Capture}; + /// use timely::dataflow::operators::{ToStream, Capture}; + /// use timely::dataflow::operators::vec::count::Accumulate; /// use timely::dataflow::operators::capture::Extract; /// /// let captured = timely::example(|scope| { @@ -24,13 +25,14 @@ pub trait Accumulate : Sized { /// let extracted = captured.extract(); /// assert_eq!(extracted, vec![(0, vec![45])]); /// ``` - fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream; + fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> StreamVec; /// Counts the number of records observed at each time. 
/// /// # Examples /// /// ``` - /// use timely::dataflow::operators::{ToStream, Accumulate, Capture}; + /// use timely::dataflow::operators::{ToStream, Capture}; + /// use timely::dataflow::operators::vec::count::Accumulate; /// use timely::dataflow::operators::capture::Extract; /// /// let captured = timely::example(|scope| { @@ -42,13 +44,13 @@ pub trait Accumulate : Sized { /// let extracted = captured.extract(); /// assert_eq!(extracted, vec![(0, vec![10])]); /// ``` - fn count(self) -> Stream { + fn count(self) -> StreamVec { self.accumulate(0, |sum, data| *sum += data.len()) } } -impl, D: 'static> Accumulate for Stream { - fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> Stream { +impl, D: 'static> Accumulate for StreamVec { + fn accumulate(self, default: A, logic: impl Fn(&mut A, &mut Vec)+'static) -> StreamVec { let mut accums = HashMap::new(); self.unary_notify(Pipeline, "Accumulate", vec![], move |input, output, notificator| { diff --git a/timely/src/dataflow/operators/delay.rs b/timely/src/dataflow/operators/vec/delay.rs similarity index 93% rename from timely/src/dataflow/operators/delay.rs rename to timely/src/dataflow/operators/vec/delay.rs index c9151964f..3ff0596f5 100644 --- a/timely/src/dataflow/operators/delay.rs +++ b/timely/src/dataflow/operators/vec/delay.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use crate::order::{PartialOrder, TotalOrder}; use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{StreamVec, Scope}; use crate::dataflow::operators::generic::operator::Operator; /// Methods to advance the timestamps of records or batches of records. @@ -22,7 +22,8 @@ pub trait Delay { /// and delays each element `i` to time `i`. 
/// /// ``` - /// use timely::dataflow::operators::{ToStream, Delay, Operator}; + /// use timely::dataflow::operators::{ToStream, Operator}; + /// use timely::dataflow::operators::vec::Delay; /// use timely::dataflow::channels::pact::Pipeline; /// /// timely::example(|scope| { @@ -49,7 +50,8 @@ pub trait Delay { /// and delays each element `i` to time `i`. /// /// ``` - /// use timely::dataflow::operators::{ToStream, Delay, Operator}; + /// use timely::dataflow::operators::{ToStream, Operator}; + /// use timely::dataflow::operators::vec::Delay; /// use timely::dataflow::channels::pact::Pipeline; /// /// timely::example(|scope| { @@ -77,7 +79,8 @@ pub trait Delay { /// and delays each batch (there is just one) to time `1`. /// /// ``` - /// use timely::dataflow::operators::{ToStream, Delay, Operator}; + /// use timely::dataflow::operators::{ToStream, Operator}; + /// use timely::dataflow::operators::vec::Delay; /// use timely::dataflow::channels::pact::Pipeline; /// /// timely::example(|scope| { @@ -93,7 +96,7 @@ pub trait Delay { fn delay_batchG::Timestamp+'static>(self, func: L) -> Self; } -impl, D: 'static> Delay for Stream { +impl, D: 'static> Delay for StreamVec { fn delayG::Timestamp+'static>(self, mut func: L) -> Self { let mut elements = HashMap::new(); self.unary_notify(Pipeline, "Delay", vec![], move |input, output, notificator| { diff --git a/timely/src/dataflow/operators/filter.rs b/timely/src/dataflow/operators/vec/filter.rs similarity index 81% rename from timely/src/dataflow/operators/filter.rs rename to timely/src/dataflow/operators/vec/filter.rs index 0481dac35..091e9dc9d 100644 --- a/timely/src/dataflow/operators/filter.rs +++ b/timely/src/dataflow/operators/vec/filter.rs @@ -1,7 +1,7 @@ //! Filters a stream by a predicate. use crate::dataflow::channels::pact::Pipeline; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{StreamVec, Scope}; use crate::dataflow::operators::generic::operator::Operator; /// Extension trait for filtering. 
@@ -10,7 +10,8 @@ pub trait Filter { /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Filter, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect}; + /// use timely::dataflow::operators::vec::Filter; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) @@ -21,8 +22,8 @@ pub trait Filter { fn filterbool+'static>(self, predicate: P) -> Self; } -impl Filter for Stream { - fn filterbool+'static>(self, mut predicate: P) -> Stream { +impl Filter for StreamVec { + fn filterbool+'static>(self, mut predicate: P) -> StreamVec { self.unary(Pipeline, "Filter", move |_,_| move |input, output| { input.for_each_time(|time, data| { let mut session = output.session(&time); diff --git a/timely/src/dataflow/operators/flow_controlled.rs b/timely/src/dataflow/operators/vec/flow_controlled.rs similarity index 96% rename from timely/src/dataflow/operators/flow_controlled.rs rename to timely/src/dataflow/operators/vec/flow_controlled.rs index 1f8600d89..5274406f3 100644 --- a/timely/src/dataflow/operators/flow_controlled.rs +++ b/timely/src/dataflow/operators/vec/flow_controlled.rs @@ -4,7 +4,7 @@ use crate::order::{PartialOrder, TotalOrder}; use crate::progress::timestamp::Timestamp; use crate::dataflow::operators::generic::operator::source; use crate::dataflow::operators::probe::Handle; -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{StreamVec, Scope}; /// Output of the input reading function for iterator_source. 
pub struct IteratorSourceInput, I: IntoIterator> { @@ -35,7 +35,7 @@ pub struct IteratorSourceInput, I /// /// # Example /// ```rust -/// use timely::dataflow::operators::flow_controlled::{iterator_source, IteratorSourceInput}; +/// use timely::dataflow::operators::vec::flow_controlled::{iterator_source, IteratorSourceInput}; /// use timely::dataflow::operators::{probe, Probe, Inspect}; /// /// timely::execute_from_args(std::env::args(), |worker| { @@ -78,7 +78,7 @@ pub fn iterator_source< name: &str, mut input_f: F, probe: Handle, - ) -> Stream where G::Timestamp: TotalOrder { + ) -> StreamVec where G::Timestamp: TotalOrder { let mut target = G::Timestamp::minimum(); source(scope, name, |cap, info| { diff --git a/timely/src/dataflow/operators/input.rs b/timely/src/dataflow/operators/vec/input.rs similarity index 79% rename from timely/src/dataflow/operators/input.rs rename to timely/src/dataflow/operators/vec/input.rs index b9aba80eb..5e6ef4b51 100644 --- a/timely/src/dataflow/operators/input.rs +++ b/timely/src/dataflow/operators/vec/input.rs @@ -1,7 +1,7 @@ //! Create new `Streams` connected to external inputs. use crate::container::CapacityContainerBuilder; -use crate::dataflow::{Stream, ScopeParent, Scope}; +use crate::dataflow::{StreamVec, ScopeParent, Scope}; use crate::dataflow::operators::core::{Input as InputCore}; // TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something @@ -11,11 +11,11 @@ use crate::dataflow::operators::core::{Input as InputCore}; // NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long. // NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a. -/// Create a new `Stream` and `Handle` through which to supply input. +/// Create a new `StreamVec` and `Handle` through which to supply input. pub trait Input : Scope { - /// Create a new `Stream` and `Handle` through which to supply input. + /// Create a new `StreamVec` and `Handle` through which to supply input. 
 ///
-    /// The `new_input` method returns a pair `(Handle, Stream)` where the `Stream` can be used
+    /// The `new_input` method returns a pair `(Handle, StreamVec)` where the `StreamVec` can be used
     /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
     /// data into the timely dataflow computation.
     ///
@@ -33,7 +33,7 @@ pub trait Input : Scope {
     ///
     ///     // add an input and base computation off of it
     ///     let mut input = worker.dataflow(|scope| {
-    ///         let (input, stream) = scope.new_input();
+    ///         let (input, stream) = scope.new_input::<Vec<_>>();
     ///         stream.inspect(|x| println!("hello {:?}", x));
     ///         input
     ///     });
@@ -46,7 +46,7 @@ pub trait Input : Scope {
     ///     }
     /// });
     /// ```
-    fn new_input<D: Data>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, D>, Stream<Self, D>);
+    fn new_input<D: Data>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, D>, StreamVec<Self, D>);

     /// Create a new stream from a supplied interactive handle.
     ///
@@ -58,15 +58,16 @@ pub trait Input : Scope {
     /// ```
     /// use timely::*;
     /// use timely::dataflow::operators::{Input, Inspect};
-    /// use timely::dataflow::operators::input::Handle;
+    /// use timely::dataflow::InputHandle;
     ///
     /// // construct and execute a timely dataflow
     /// timely::execute(Config::thread(), |worker| {
     ///
     ///     // add an input and base computation off of it
-    ///     let mut input = Handle::new();
+    ///     let mut input = InputHandle::new();
     ///     worker.dataflow(|scope| {
     ///         scope.input_from(&mut input)
+    ///             .container::<Vec<_>>()
     ///             .inspect(|x| println!("hello {:?}", x));
     ///     });
     ///
@@ -78,19 +79,19 @@ pub trait Input : Scope {
     ///     }
     /// });
     /// ```
-    fn input_from<D: Data>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, D>) -> Stream<Self, D>;
+    fn input_from<D: Data>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, D>) -> StreamVec<Self, D>;
 }

 use crate::order::TotalOrder;

 impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
-    fn new_input<D: Data>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, D>, Stream<Self, D>) {
+    fn new_input<D: Data>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, D>, StreamVec<Self, D>) {
         InputCore::new_input(self)
     }

-    fn input_from(&mut
self, handle: &mut Handle<::Timestamp, D>) -> Stream { + fn input_from(&mut self, handle: &mut Handle<::Timestamp, D>) -> StreamVec { InputCore::input_from(self, handle) } } -/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation. +/// A handle to an input `StreamVec`, used to introduce data to a timely dataflow computation. pub type Handle = crate::dataflow::operators::core::input::Handle>>; diff --git a/timely/src/dataflow/operators/map.rs b/timely/src/dataflow/operators/vec/map.rs similarity index 78% rename from timely/src/dataflow/operators/map.rs rename to timely/src/dataflow/operators/vec/map.rs index abb65f517..66b4b8b2a 100644 --- a/timely/src/dataflow/operators/map.rs +++ b/timely/src/dataflow/operators/vec/map.rs @@ -1,17 +1,17 @@ -//! Extension methods for `Stream` based on record-by-record transformation. +//! Extension methods for `StreamVec` based on record-by-record transformation. -use crate::dataflow::{Stream, Scope}; +use crate::dataflow::{StreamVec, Scope}; use crate::dataflow::channels::pact::Pipeline; use crate::dataflow::operators::generic::operator::Operator; use crate::dataflow::operators::core::{Map as MapCore}; -/// Extension trait for `Stream`. +/// Extension trait for `StreamVec`. pub trait Map : Sized { /// Consumes each element of the stream and yields a new element. /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Map, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect, vec::Map}; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) @@ -19,14 +19,14 @@ pub trait Map : Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn mapD2+'static>(self, mut logic: L) -> Stream { + fn mapD2+'static>(self, mut logic: L) -> StreamVec { self.flat_map(move |x| std::iter::once(logic(x))) } /// Updates each element of the stream and yields the element, re-using memory where possible. 
/// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Map, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect, vec::Map}; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) @@ -34,12 +34,12 @@ pub trait Map : Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn map_in_place(self, logic: L) -> Stream; + fn map_in_place(self, logic: L) -> StreamVec; /// Consumes each element of the stream and yields some number of new elements. /// /// # Examples /// ``` - /// use timely::dataflow::operators::{ToStream, Map, Inspect}; + /// use timely::dataflow::operators::{ToStream, Inspect, vec::Map}; /// /// timely::example(|scope| { /// (0..10).to_stream(scope) @@ -47,11 +47,11 @@ pub trait Map : Sized { /// .inspect(|x| println!("seen: {:?}", x)); /// }); /// ``` - fn flat_mapI+'static>(self, logic: L) -> Stream where I::Item: 'static; + fn flat_mapI+'static>(self, logic: L) -> StreamVec where I::Item: 'static; } -impl Map for Stream { - fn map_in_place(self, mut logic: L) -> Stream { +impl Map for StreamVec { + fn map_in_place(self, mut logic: L) -> StreamVec { self.unary(Pipeline, "MapInPlace", move |_,_| move |input, output| { input.for_each_time(|time, data| { let mut session = output.session(&time); @@ -65,7 +65,7 @@ impl Map for Stream { // TODO : This would be more robust if it captured an iterator and then pulled an appropriate // TODO : number of elements from the iterator. This would allow iterators that produce many // TODO : records without taking arbitrarily long and arbitrarily much memory. 
-    fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(self, logic: L) -> Stream<S, I::Item> where I::Item: 'static {
+    fn flat_map<I: IntoIterator, L: FnMut(D)->I+'static>(self, logic: L) -> StreamVec<S, I::Item> where I::Item: 'static {
         MapCore::flat_map(self, logic)
     }
 }
diff --git a/timely/src/dataflow/operators/vec/mod.rs b/timely/src/dataflow/operators/vec/mod.rs
new file mode 100644
index 000000000..d3f1920a4
--- /dev/null
+++ b/timely/src/dataflow/operators/vec/mod.rs
@@ -0,0 +1,34 @@
+//! Extension methods and implementations for streams of vector containers.
+//!
+//! These methods operate on containers that are vectors of individual items,
+//! and much of their behavior can be described item-by-item.
+//!
+//! Vector containers are a natural entry point before one forms a stronger
+//! opinion on the containers one would like to use. While they are easy to
+//! use, vector containers (and owned data) invite performance antipatterns
+//! around resource management (allocation and deallocation, across threads).
+
+pub mod input;
+pub mod flow_controlled;
+pub mod unordered_input;
+pub mod partition;
+pub mod map;
+pub mod filter;
+pub mod delay;
+pub mod broadcast;
+pub mod to_stream;
+pub mod branch;
+pub mod result;
+pub mod aggregation;
+pub mod count;
+
+pub use self::input::Input;
+pub use self::unordered_input::UnorderedInput;
+pub use self::partition::Partition;
+pub use self::map::Map;
+pub use self::filter::Filter;
+pub use self::delay::Delay;
+pub use self::broadcast::Broadcast;
+pub use self::branch::{Branch, BranchWhen};
+pub use self::result::ResultStream;
+pub use self::to_stream::ToStream;
diff --git a/timely/src/dataflow/operators/partition.rs b/timely/src/dataflow/operators/vec/partition.rs
similarity index 76%
rename from timely/src/dataflow/operators/partition.rs
rename to timely/src/dataflow/operators/vec/partition.rs
index eae748645..1a8548468 100644
--- a/timely/src/dataflow/operators/partition.rs
+++ b/timely/src/dataflow/operators/vec/partition.rs
@@ -2,7 +2,7 @@
 use crate::container::CapacityContainerBuilder;
 use crate::dataflow::operators::core::Partition as PartitionCore;
-use crate::dataflow::{Scope, Stream};
+use crate::dataflow::{Scope, StreamVec};

 /// Partition a stream of records into multiple streams.
 pub trait Partition<S: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {
@@ -10,7 +10,7 @@ pub trait Partition<S: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {
     ///
     /// # Examples
     /// ```
-    /// use timely::dataflow::operators::{ToStream, Partition, Inspect};
+    /// use timely::dataflow::operators::{ToStream, Inspect, vec::Partition};
     ///
     /// timely::example(|scope| {
     ///     let mut streams = (0..10).to_stream(scope)
@@ -21,11 +21,11 @@ pub trait Partition<S: Scope, D: Data, D2: Data, F: Fn(D) -> (u64, D2)> {
     ///     streams.pop().unwrap().inspect(|x| println!("seen 0: {:?}", x));
     /// });
     /// ```
-    fn partition(self, parts: u64, route: F) -> Vec<Stream<S, D2>>;
+    fn partition(self, parts: u64, route: F) -> Vec<StreamVec<S, D2>>;
 }

-impl<S: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<S, D, D2, F> for Stream<S, D> {
-    fn partition(self, parts: u64, route: F) -> Vec<Stream<S, D2>> {
+impl<S: Scope, D: Data, D2: Data, F: Fn(D)->(u64, D2)+'static> Partition<S, D, D2, F> for StreamVec<S, D> {
+    fn partition(self, parts: u64, route: F) -> Vec<StreamVec<S, D2>> {
         PartitionCore::partition::<CapacityContainerBuilder<_>, _, _>(self, parts, route)
     }
 }
diff --git a/timely/src/dataflow/operators/queue.rs b/timely/src/dataflow/operators/vec/queue.rs
similarity index 86%
rename from timely/src/dataflow/operators/queue.rs
rename to timely/src/dataflow/operators/vec/queue.rs
index 09313b2e5..fa1395591 100644
--- a/timely/src/dataflow/operators/queue.rs
+++ b/timely/src/dataflow/operators/vec/queue.rs
@@ -2,15 +2,15 @@
 use std::collections::HashMap;

 use Data;
 use dataflow::channels::pact::Pipeline;
-use dataflow::{Stream, Scope};
+use dataflow::{StreamVec, Scope};
 use dataflow::operators::unary::Unary;

 pub trait Queue {
     fn queue(&self) -> Self;
 }

-impl<S: Scope, D: Data> Queue for Stream<S, D> {
-    fn queue(&self) -> Stream<S, D> {
+impl<S: Scope, D: Data> Queue for StreamVec<S, D> {
+    fn queue(&self) -> StreamVec<S, D> {
         let mut elements = HashMap::new();
         self.unary_notify(Pipeline, "Queue", vec![], move |input, output, notificator| {
             while let Some((time, data)) = input.next() {
diff --git a/timely/src/dataflow/operators/result.rs b/timely/src/dataflow/operators/vec/result.rs
similarity index 78%
rename from timely/src/dataflow/operators/result.rs
rename to timely/src/dataflow/operators/vec/result.rs
index b57124956..cecff9223 100644
--- a/timely/src/dataflow/operators/result.rs
+++ b/timely/src/dataflow/operators/vec/result.rs
@@ -1,15 +1,15 @@
-//! Extension methods for `Stream` containing `Result`s.
+//! Extension methods for `StreamVec` containing `Result`s.

-use crate::dataflow::operators::Map;
-use crate::dataflow::{Scope, Stream};
+use crate::dataflow::operators::vec::Map;
+use crate::dataflow::{Scope, StreamVec};

-/// Extension trait for `Stream`.
+/// Extension trait for `StreamVec`.
 pub trait ResultStream<S: Scope, T: Data, E: Data> {
     /// Returns a new instance of `self` containing only `ok` records.
     ///
     /// # Examples
     /// ```
-    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
+    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
     ///
     /// timely::example(|scope| {
     ///     vec![Ok(0), Err(())].to_stream(scope)
@@ -17,13 +17,13 @@ pub trait ResultStream<S: Scope, T: Data, E: Data> {
     ///         .inspect(|x| println!("seen: {:?}", x));
     /// });
     /// ```
-    fn ok(self) -> Stream<S, T>;
+    fn ok(self) -> StreamVec<S, T>;

     /// Returns a new instance of `self` containing only `err` records.
     ///
     /// # Examples
     /// ```
-    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
+    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
     ///
     /// timely::example(|scope| {
     ///     vec![Ok(0), Err(())].to_stream(scope)
@@ -31,13 +31,13 @@ pub trait ResultStream<S: Scope, T: Data, E: Data> {
     ///         .inspect(|x| println!("seen: {:?}", x));
     /// });
     /// ```
-    fn err(self) -> Stream<S, E>;
+    fn err(self) -> StreamVec<S, E>;

     /// Returns a new instance of `self` applying `logic` on all `Ok` records.
     ///
     /// # Examples
     /// ```
-    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
+    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
     ///
     /// timely::example(|scope| {
     ///     vec![Ok(0), Err(())].to_stream(scope)
@@ -45,13 +45,13 @@ pub trait ResultStream<S: Scope, T: Data, E: Data> {
     ///         .inspect(|x| println!("seen: {:?}", x));
     /// });
     /// ```
-    fn map_ok<T2: Data, L: FnMut(T) -> T2 + 'static>(self, logic: L) -> Stream<S, Result<T2, E>>;
+    fn map_ok<T2: Data, L: FnMut(T) -> T2 + 'static>(self, logic: L) -> StreamVec<S, Result<T2, E>>;

     /// Returns a new instance of `self` applying `logic` on all `Err` records.
     ///
     /// # Examples
     /// ```
-    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
+    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
     ///
     /// timely::example(|scope| {
     ///     vec![Ok(0), Err(())].to_stream(scope)
@@ -59,14 +59,14 @@ pub trait ResultStream<S: Scope, T: Data, E: Data> {
     ///         .inspect(|x| println!("seen: {:?}", x));
     /// });
     /// ```
-    fn map_err<E2: Data, L: FnMut(E) -> E2 + 'static>(self, logic: L) -> Stream<S, Result<T, E2>>;
+    fn map_err<E2: Data, L: FnMut(E) -> E2 + 'static>(self, logic: L) -> StreamVec<S, Result<T, E2>>;

     /// Returns a new instance of `self` applying `logic` on all `Ok` records, passes through `Err`
     /// records.
     ///
     /// # Examples
     /// ```
-    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
+    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
     ///
     /// timely::example(|scope| {
     ///     vec![Ok(0), Err(())].to_stream(scope)
@@ -77,13 +77,13 @@ pub trait ResultStream<S: Scope, T: Data, E: Data> {
     fn and_then<T2: Data, L: FnMut(T) -> Result<T2, E> + 'static>(
         self,
         logic: L,
-    ) -> Stream<S, Result<T2, E>>;
+    ) -> StreamVec<S, Result<T2, E>>;

     /// Returns a new instance of `self` applying `logic` on all `Ok` records.
     ///
     /// # Examples
     /// ```
-    /// use timely::dataflow::operators::{ToStream, Inspect, ResultStream};
+    /// use timely::dataflow::operators::{ToStream, Inspect, vec::ResultStream};
     ///
     /// timely::example(|scope| {
     ///     vec![Ok(1), Err(())].to_stream(scope)
@@ -91,38 +91,38 @@ pub trait ResultStream<S: Scope, T: Data, E: Data> {
     ///         .inspect(|x| println!("seen: {:?}", x));
     /// });
     /// ```
-    fn unwrap_or_else<L: FnMut(E) -> T + 'static>(self, logic: L) -> Stream<S, T>;
+    fn unwrap_or_else<L: FnMut(E) -> T + 'static>(self, logic: L) -> StreamVec<S, T>;
 }

-impl<S: Scope, T: Data, E: Data> ResultStream<S, T, E> for Stream<S, Result<T, E>> {
-    fn ok(self) -> Stream<S, T> {
+impl<S: Scope, T: Data, E: Data> ResultStream<S, T, E> for StreamVec<S, Result<T, E>> {
+    fn ok(self) -> StreamVec<S, T> {
         self.flat_map(Result::ok)
     }

-    fn err(self) -> Stream<S, E> {
+    fn err(self) -> StreamVec<S, E> {
         self.flat_map(Result::err)
     }

-    fn map_ok<T2: Data, L: FnMut(T) -> T2 + 'static>(self, mut logic: L) -> Stream<S, Result<T2, E>> {
+    fn map_ok<T2: Data, L: FnMut(T) -> T2 + 'static>(self, mut logic: L) -> StreamVec<S, Result<T2, E>> {
         self.map(move |r| r.map(&mut logic))
     }

-    fn map_err<E2: Data, L: FnMut(E) -> E2 + 'static>(self, mut logic: L) -> Stream<S, Result<T, E2>> {
+    fn map_err<E2: Data, L: FnMut(E) -> E2 + 'static>(self, mut logic: L) -> StreamVec<S, Result<T, E2>> {
         self.map(move |r| r.map_err(&mut logic))
     }

-    fn and_then<T2: Data, L: FnMut(T) -> Result<T2, E> + 'static>(self, mut logic: L) -> Stream<S, Result<T2, E>> {
+    fn and_then<T2: Data, L: FnMut(T) -> Result<T2, E> + 'static>(self, mut logic: L) -> StreamVec<S, Result<T2, E>> {
         self.map(move |r| r.and_then(&mut logic))
     }

-    fn unwrap_or_else<L: FnMut(E) -> T + 'static>(self, mut logic: L) -> Stream<S, T> {
+    fn unwrap_or_else<L: FnMut(E) -> T + 'static>(self, mut logic: L) -> StreamVec<S, T> {
         self.map(move |r| r.unwrap_or_else(&mut logic))
     }
 }

 #[cfg(test)]
 mod tests {
-    use crate::dataflow::operators::{ToStream, ResultStream, Capture, capture::Extract};
+    use crate::dataflow::operators::{vec::{ToStream, ResultStream}, Capture, capture::Extract};

     #[test]
     fn test_ok() {
diff --git a/timely/src/dataflow/operators/to_stream.rs b/timely/src/dataflow/operators/vec/to_stream.rs
similarity index 53%
rename from timely/src/dataflow/operators/to_stream.rs
rename to timely/src/dataflow/operators/vec/to_stream.rs
index b7916fb1c..fba5898c6 100644
--- a/timely/src/dataflow/operators/to_stream.rs
+++ b/timely/src/dataflow/operators/vec/to_stream.rs
@@ -1,11 +1,11 @@
-//! Conversion to the `Stream` type from iterators.
+//! Conversion to the `StreamVec` type from iterators.

-use crate::dataflow::{Stream, Scope};
+use crate::dataflow::{StreamVec, Scope};
 use crate::dataflow::operators::core::{ToStream as ToStreamCore};

-/// Converts to a timely `Stream`.
+/// Converts to a timely `StreamVec`.
 pub trait ToStream<D: Data> {
-    /// Converts to a timely `Stream`.
+    /// Converts to a timely `StreamVec`.
     ///
     /// # Examples
     ///
     /// ```
@@ -14,18 +14,18 @@ pub trait ToStream<D: Data> {
     /// use timely::dataflow::operators::capture::Extract;
     ///
     /// let (data1, data2) = timely::example(|scope| {
-    ///     let data1 = (0..3).to_stream(scope).capture();
-    ///     let data2 = vec![0,1,2].to_stream(scope).capture();
+    ///     let data1 = (0..3).to_stream(scope).container::<Vec<_>>().capture();
+    ///     let data2 = vec![0,1,2].to_stream(scope).container::<Vec<_>>().capture();
     ///     (data1, data2)
     /// });
     ///
     /// assert_eq!(data1.extract(), data2.extract());
     /// ```
-    fn to_stream<S: Scope>(self, scope: &mut S) -> Stream<S, D>;
+    fn to_stream<S: Scope>(self, scope: &mut S) -> StreamVec<S, D>;
 }

 impl<I: IntoIterator> ToStream<I::Item> for I {
-    fn to_stream<S: Scope>(self, scope: &mut S) -> Stream<S, I::Item> {
+    fn to_stream<S: Scope>(self, scope: &mut S) -> StreamVec<S, I::Item> {
         ToStreamCore::to_stream(self, scope)
     }
 }
diff --git a/timely/src/dataflow/operators/unordered_input.rs b/timely/src/dataflow/operators/vec/unordered_input.rs
similarity index 88%
rename from timely/src/dataflow/operators/unordered_input.rs
rename to timely/src/dataflow/operators/vec/unordered_input.rs
index afdc2253a..63f195ede 100644
--- a/timely/src/dataflow/operators/unordered_input.rs
+++ b/timely/src/dataflow/operators/vec/unordered_input.rs
@@ -3,14 +3,14 @@
 use crate::container::CapacityContainerBuilder;
 use crate::dataflow::operators::{ActivateCapability};
 use crate::dataflow::operators::core::{UnorderedInput as UnorderedInputCore, UnorderedHandle as UnorderedHandleCore};
-use crate::dataflow::{Stream, Scope};
+use crate::dataflow::{StreamVec, Scope};

-/// Create a new `Stream` and `Handle` through which to supply input.
+/// Create a new `StreamVec` and `Handle` through which to supply input.
 pub trait UnorderedInput<G: Scope> {
-    /// Create a new capability-based `Stream` and `Handle` through which to supply input. This
+    /// Create a new capability-based `StreamVec` and `Handle` through which to supply input. This
     /// input supports multiple open epochs (timestamps) at the same time.
     ///
-    /// The `new_unordered_input` method returns `((Handle, Capability), Stream)` where the `Stream` can be used
+    /// The `new_unordered_input` method returns `((Handle, Capability), StreamVec)` where the `StreamVec` can be used
     /// immediately for timely dataflow construction, `Handle` and `Capability` are later used to introduce
     /// data into the timely dataflow computation.
     ///
@@ -28,8 +28,8 @@ pub trait UnorderedInput<G: Scope> {
     ///
     /// use timely::*;
     /// use timely::dataflow::operators::*;
+    /// use timely::dataflow::operators::vec::UnorderedInput;
     /// use timely::dataflow::operators::capture::Extract;
-    /// use timely::dataflow::Stream;
     ///
     /// // get send and recv endpoints, wrap send to share
     /// let (send, recv) = ::std::sync::mpsc::channel();
@@ -60,12 +60,12 @@ pub trait UnorderedInput<G: Scope> {
     ///     assert_eq!(extract[i], (i, vec![i]));
     /// }
     /// ```
-    fn new_unordered_input<D: Data>(&mut self) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>);
+    fn new_unordered_input<D: Data>(&mut self) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), StreamVec<G, D>);
 }

 impl<G: Scope> UnorderedInput<G> for G {
-    fn new_unordered_input<D: Data>(&mut self) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), Stream<G, D>) {
+    fn new_unordered_input<D: Data>(&mut self) -> ((UnorderedHandle<G::Timestamp, D>, ActivateCapability<G::Timestamp>), StreamVec<G, D>) {
         UnorderedInputCore::new_unordered_input(self)
     }
 }
diff --git a/timely/src/dataflow/scopes/mod.rs b/timely/src/dataflow/scopes/mod.rs
index 511bfe492..106d5f9d4 100644
--- a/timely/src/dataflow/scopes/mod.rs
+++ b/timely/src/dataflow/scopes/mod.rs
@@ -88,7 +88,7 @@ pub trait Scope: ScopeParent {
     /// timely::execute_from_args(std::env::args(), |worker| {
     ///     // must specify types as nothing else drives inference.
     ///     let input = worker.dataflow::<u64,_,_>(|child1| {
-    ///         let (input, stream) = child1.new_input::<String>();
+    ///         let (input, stream) = child1.new_input::<Vec<String>>();
     ///         let output = child1.scoped::<Product<u64,u32>,_,_>("ScopeName", |child2| {
     ///             stream.enter(child2).leave()
     ///         });
@@ -115,7 +115,7 @@ pub trait Scope: ScopeParent {
     /// timely::execute_from_args(std::env::args(), |worker| {
     ///     // must specify types as nothing else drives inference.
     ///     let input = worker.dataflow::<u64,_,_>(|child1| {
-    ///         let (input, stream) = child1.new_input::<String>();
+    ///         let (input, stream) = child1.new_input::<Vec<String>>();
     ///         let output = child1.iterative::<u32,_,_>(|child2| {
     ///             stream.enter(child2).leave()
     ///         });
@@ -145,7 +145,7 @@ pub trait Scope: ScopeParent {
     /// timely::execute_from_args(std::env::args(), |worker| {
     ///     // must specify types as nothing else drives inference.
     ///     let input = worker.dataflow::<u64,_,_>(|child1| {
-    ///         let (input, stream) = child1.new_input::<String>();
+    ///         let (input, stream) = child1.new_input::<Vec<String>>();
     ///         let output = child1.region(|child2| {
     ///             stream.enter(child2).leave()
     ///         });
@@ -177,7 +177,7 @@ pub trait Scope: ScopeParent {
     /// timely::execute_from_args(std::env::args(), |worker| {
     ///     // must specify types as nothing else drives inference.
     ///     let input = worker.dataflow::<u64,_,_>(|child1| {
-    ///         let (input, stream) = child1.new_input::<String>();
+    ///         let (input, stream) = child1.new_input::<Vec<String>>();
     ///         let output = child1.region_named("region", |child2| {
     ///             stream.enter(child2).leave()
     ///         });
diff --git a/timely/src/dataflow/stream.rs b/timely/src/dataflow/stream.rs
index 1544ad0b6..25ab446d1 100644
--- a/timely/src/dataflow/stream.rs
+++ b/timely/src/dataflow/stream.rs
@@ -18,7 +18,7 @@ use std::fmt::{self, Debug};
 ///
 /// Internally `Stream` maintains a list of data recipients who should be presented with data
 /// produced by the source of the stream.
-pub struct StreamCore<S: Scope, C> {
+pub struct Stream<S: Scope, C> {
     /// The progress identifier of the stream's data source.
     name: Source,
     /// The `Scope` containing the stream.
@@ -27,7 +27,7 @@ pub struct StreamCore<S: Scope, C> {
     ports: TeeHelper<S::Timestamp, C>,
 }

-impl<S: Scope, C> Clone for StreamCore<S, C> {
+impl<S: Scope, C> Clone for Stream<S, C> {
     fn clone(&self) -> Self {
         Self {
             name: self.name,
@@ -43,10 +43,10 @@ impl<S: Scope, C> Clone for StreamCore<S, C> {
     }
 }

-/// A stream batching data in vectors.
-pub type Stream<S, D> = StreamCore<S, Vec<D>>;
+/// A stream batching data in owning vectors.
+pub type StreamVec<S, D> = Stream<S, Vec<D>>;

-impl<S: Scope, C> StreamCore<S, C> {
+impl<S: Scope, C> Stream<S, C> {
     /// Connects the stream to a destination.
     ///
     /// The destination is described both by a `Target`, for progress tracking information, and a `P: Push` where the
@@ -75,20 +75,34 @@ impl<S: Scope, C> StreamCore<S, C> {
     pub fn scope(&self) -> S { self.scope.clone() }

     /// Allows the assertion of a container type, for the benefit of type inference.
-    pub fn container<D>(self) -> StreamCore<S, D> where Self: AsStream<S, D> { self.as_stream() }
+    ///
+    /// This method may be needed when the container type of a stream is unconstrained,
+    /// most commonly after creating an input, or when bracketing wholly generic operators.
+    ///
+    /// # Examples
+    /// ```
+    /// use timely::dataflow::operators::{ToStream, Inspect};
+    ///
+    /// timely::example(|scope| {
+    ///     (0..10).to_stream(scope)
+    ///            .container::<Vec<_>>()
+    ///            .inspect(|x| println!("seen: {:?}", x));
+    /// });
+    /// ```
+    pub fn container<D>(self) -> Stream<S, D> where Self: AsStream<S, D> { self.as_stream() }
 }

-/// A type that can be translated to a [StreamCore].
+/// A type that can be translated to a [Stream].
 pub trait AsStream<S: Scope, C> {
-    /// Translate `self` to a [StreamCore].
-    fn as_stream(self) -> StreamCore<S, C>;
+    /// Translate `self` to a [Stream].
+    fn as_stream(self) -> Stream<S, C>;
 }

-impl<S: Scope, C> AsStream<S, C> for StreamCore<S, C> {
+impl<S: Scope, C> AsStream<S, C> for Stream<S, C> {
     fn as_stream(self) -> Self { self }
 }

-impl<S, C> Debug for StreamCore<S, C>
+impl<S, C> Debug for Stream<S, C>
 where
     S: Scope,
 {
@@ -112,6 +126,7 @@ mod tests {
     crate::example(|scope| {
         let _ = [NotClone]
             .to_stream(scope)
+            .container::<Vec<_>>()
             .sink(Pipeline, "check non-clone", |(input, _frontier)| {
                 input.for_each(|_cap, data| {
                     for datum in data.drain(..) {
diff --git a/timely/src/execute.rs b/timely/src/execute.rs
index b9673a98e..f03e74df9 100644
--- a/timely/src/execute.rs
+++ b/timely/src/execute.rs
@@ -97,6 +97,7 @@ impl Config {
 ///
 /// timely::example(|scope| {
 ///     (0..10).to_stream(scope)
+///            .container::<Vec<_>>()
 ///            .inspect(|x| println!("seen: {:?}", x));
 /// });
 /// ```
@@ -112,6 +113,7 @@ impl Config {
 ///
 /// let data = timely::example(|scope| {
 ///     (0..10).to_stream(scope)
+///            .container::<Vec<_>>()
 ///            .inspect(|x| println!("seen: {:?}", x))
 ///            .capture()
 /// });
@@ -144,6 +146,7 @@ where
 /// timely::execute_directly(|worker| {
 ///     worker.dataflow::<(),_,_>(|scope| {
 ///         (0..10).to_stream(scope)
+///                .container::<Vec<_>>()
 ///                .inspect(|x| println!("seen: {:?}", x));
 ///     })
 /// });
@@ -188,6 +191,7 @@ where
 /// timely::execute(timely::Config::process(3), |worker| {
 ///     worker.dataflow::<(),_,_>(|scope| {
 ///         (0..10).to_stream(scope)
+///                .container::<Vec<_>>()
 ///                .inspect(|x| println!("seen: {:?}", x));
 ///     })
 /// }).unwrap();
@@ -211,6 +215,7 @@ where
 ///     let send = send.lock().unwrap().clone();
 ///     worker.dataflow::<(),_,_>(move |scope| {
 ///         (0..10).to_stream(scope)
+///                .container::<Vec<_>>()
 ///                .inspect(|x| println!("seen: {:?}", x))
 ///                .capture_into(send);
 ///     });
@@ -266,6 +271,7 @@ where
 /// timely::execute_from_args(std::env::args(), |worker| {
 ///     worker.dataflow::<(),_,_>(|scope| {
 ///         (0..10).to_stream(scope)
+///                .container::<Vec<_>>()
 ///                .inspect(|x| println!("seen: {:?}", x));
 ///     })
 /// }).unwrap();
@@ -305,6 +311,7 @@ pub fn execute_from_args<I, T, F>(iter: I, func: F) -> Result<WorkerGuards<T>,St
 /// timely::execute::execute_from(builders, other, WorkerConfig::default(), |worker| {
 ///     worker.dataflow::<(),_,_>(|scope| {
 ///         (0..10).to_stream(scope)
+///                .container::<Vec<_>>()
 ///                .inspect(|x| println!("seen: {:?}", x));
 ///     })
 /// }).unwrap();
diff --git a/timely/src/lib.rs b/timely/src/lib.rs
index a4c8c1326..39625f7e3 100644
--- a/timely/src/lib.rs
+++ b/timely/src/lib.rs
@@ -30,7 +30,7 @@
 //!
 //!         // add an input and base computation off of it
 //!         let mut input = worker.dataflow(|scope| {
-//!             let (input, stream) = scope.new_input();
+//!             let (input, stream) = scope.new_input::<Vec<_>>();
 //!             stream.inspect(|x| println!("hello {:?}", x));
 //!             input
 //!         });
diff --git a/timely/src/synchronization/barrier.rs b/timely/src/synchronization/barrier.rs
index e7d1fe460..4ccdba3cf 100644
--- a/timely/src/synchronization/barrier.rs
+++ b/timely/src/synchronization/barrier.rs
@@ -1,12 +1,12 @@
 //! Barrier synchronization.

 use crate::communication::Allocate;
-use crate::dataflow::{InputHandle, ProbeHandle};
+use crate::dataflow::{InputHandleVec, ProbeHandle};
 use crate::worker::Worker;

 /// A re-usable barrier synchronization mechanism.
 pub struct Barrier<A: Allocate> {
-    input: InputHandle<usize, ()>,
+    input: InputHandleVec<usize, ()>,
     probe: ProbeHandle<usize>,
     worker: Worker<A>,
 }
@@ -17,7 +17,7 @@ impl<A: Allocate> Barrier<A> {
     pub fn new(worker: &mut Worker<A>) -> Self {
         use crate::dataflow::operators::{Input, Probe};
         let (input, probe) = worker.dataflow(|scope| {
-            let (handle, stream) = scope.new_input::<()>();
+            let (handle, stream) = scope.new_input::<Vec<()>>();
             (handle, stream.probe())
         });
         Barrier { input, probe, worker: worker.clone() }
diff --git a/timely/src/worker.rs b/timely/src/worker.rs
index 29f40aeda..aa5040695 100644
--- a/timely/src/worker.rs
+++ b/timely/src/worker.rs
@@ -309,6 +309,7 @@ impl<A: Allocate> Worker<A> {
     ///     worker.dataflow::<usize,_,_>(|scope| {
     ///         (0 .. 10)
     ///             .to_stream(scope)
+    ///             .container::<Vec<_>>()
     ///             .inspect(|x| println!("{:?}", x));
     ///     });
     ///
@@ -340,6 +341,7 @@ impl<A: Allocate> Worker<A> {
     ///     worker.dataflow::<usize,_,_>(|scope| {
     ///         (0 .. 10)
     ///             .to_stream(scope)
+    ///             .container::<Vec<_>>()
     ///             .inspect(|x| println!("{:?}", x));
     ///     });
     ///
@@ -446,6 +448,7 @@ impl<A: Allocate> Worker<A> {
     ///     worker.dataflow::<usize,_,_>(|scope| {
     ///         (0 .. 10)
     ///             .to_stream(scope)
+    ///             .container::<Vec<_>>()
     ///             .inspect(|x| println!("{:?}", x))
     ///             .probe()
     ///     });
@@ -475,6 +478,7 @@ impl<A: Allocate> Worker<A> {
     ///     worker.dataflow::<usize,_,_>(|scope| {
     ///         (0 .. 10)
     ///             .to_stream(scope)
+    ///             .container::<Vec<_>>()
     ///             .inspect(|x| println!("{:?}", x))
     ///             .probe()
     ///     });
diff --git a/timely/tests/gh_523.rs b/timely/tests/gh_523.rs
index 589420081..9b360f77f 100644
--- a/timely/tests/gh_523.rs
+++ b/timely/tests/gh_523.rs
@@ -10,6 +10,7 @@ fn gh_523() {
     let probe = worker.dataflow::<u64,_,_>(|scope| {
         scope
             .input_from(&mut input)
+            .container::<Vec<_>>()
             .unary(Pipeline, "Test", move |_, _| {
                 move |input, output| {
                     input.for_each_time(|cap, data| {
diff --git a/timely/tests/shape_scaling.rs b/timely/tests/shape_scaling.rs
index 755b6f365..19e811a6b 100644
--- a/timely/tests/shape_scaling.rs
+++ b/timely/tests/shape_scaling.rs
@@ -14,7 +14,7 @@ fn operator_scaling(scale: u64) {
     timely::execute(Config::thread(), move |worker| {
         let mut input = InputHandle::new();
         worker.dataflow::<u64,_,_>(|scope| {
-            use timely::dataflow::operators::Partition;
+            use timely::dataflow::operators::vec::Partition;

             let parts = scope
                 .input_from(&mut input)
@@ -58,7 +58,7 @@ fn subgraph_scaling(scale: u64) {
     timely::execute(Config::thread(), move |worker| {
         let mut input = InputHandle::new();
         worker.dataflow::<u64,_,_>(|scope| {
-            use timely::dataflow::operators::Partition;
+            use timely::dataflow::operators::vec::Partition;

             let parts = scope
                 .input_from(&mut input)
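
The renaming this patch performs can be summarized with a minimal, self-contained sketch. The stand-in types below are hypothetical and elide all of timely's real bounds and machinery; they only illustrate the pattern: the container-generic `StreamCore<S, C>` becomes `Stream<S, C>`, the old vector-specialized `Stream<S, D>` alias becomes `StreamVec<S, D>`, and `.container::<Vec<_>>()` pins down an otherwise unconstrained container type for inference.

```rust
use std::marker::PhantomData;

// Hypothetical stand-in for a timely scope; the real `S: Scope` bound is elided.
struct ExampleScope;

// Container-generic stream, in the role of the renamed `Stream<S, C>`.
struct Stream<S, C> {
    _scope: PhantomData<S>,
    _container: PhantomData<C>,
}

impl<S, C> Stream<S, C> {
    fn new() -> Self {
        Stream { _scope: PhantomData, _container: PhantomData }
    }

    // Asserts a container type for the benefit of type inference, mirroring
    // the `.container::<Vec<_>>()` calls the patch inserts after `to_stream`.
    fn container<C2>(self) -> Stream<S, C2> {
        Stream::new()
    }
}

// The new alias: a stream batching data in owned vectors.
type StreamVec<S, D> = Stream<S, Vec<D>>;

fn main() {
    // An otherwise unconstrained stream has its container pinned down explicitly.
    let stream: Stream<ExampleScope, ()> = Stream::new();
    let _vec_stream: StreamVec<ExampleScope, u64> = stream.container::<Vec<u64>>();

    // The alias and the explicit container form name the same type.
    assert_eq!(
        std::any::type_name::<StreamVec<ExampleScope, u64>>(),
        std::any::type_name::<Stream<ExampleScope, Vec<u64>>>(),
    );
}
```

Because `StreamVec` is a plain type alias rather than a wrapper, every operator defined on `Stream<S, Vec<D>>` remains directly available on it, which is why the moved `vec::*` operators can simply swap the name in their signatures.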