Core Streams
This document takes you through how to build basic streams and push data through them. We start with map and accumulate, talk about emitting data, then discuss flow control and finally back pressure. Examples are used throughout.
Map, emit, and sink
| emit | Push data into the stream at this point |
| map | Apply a function to every element in the stream |
| sink | Apply a function on every element |
You can create a basic pipeline by instantiating a Stream object from rapidz and then using methods like map, accumulate, and sink.
from rapidz import Stream

def increment(x):
    return x + 1

source = Stream()
source.map(increment).sink(print)
The map and sink methods both take a function and apply that function to every element in the stream. The map method returns a new stream with the modified elements, while sink is typically used at the end of a stream for final actions.
To push data through our pipeline we call emit:
>>> source.emit(1)
2
>>> source.emit(2)
3
>>> source.emit(10)
11
As we can see, whenever we push data in at the source, our pipeline calls increment on that data and then calls print on the result, so the incremented values are printed to the screen.
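To make the push model concrete, here is a minimal sketch of how emit could propagate through map and sink nodes. The ToyStream, ToyMap, and ToySink classes are hypothetical names invented for this illustration; they are not the rapidz implementation and only mirror the behaviour described above.

```python
class ToyStream:
    """Hypothetical stand-in for a stream node; not the rapidz implementation."""

    def __init__(self):
        self.downstreams = []  # nodes that receive whatever this node emits

    def emit(self, x):
        # Push x to every downstream node, in the order they were attached.
        for node in self.downstreams:
            node.update(x)

    def update(self, x):
        # A plain node forwards data unchanged.
        self.emit(x)

    def map(self, func):
        node = ToyMap(func)
        self.downstreams.append(node)
        return node

    def sink(self, func):
        node = ToySink(func)
        self.downstreams.append(node)
        return node


class ToyMap(ToyStream):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def update(self, x):
        # Transform the element, then pass the result downstream.
        self.emit(self.func(x))


class ToySink(ToyStream):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def update(self, x):
        # Terminal action: call func for its side effect, emit nothing.
        self.func(x)


results = []
source = ToyStream()
source.map(lambda x: x + 1).sink(results.append)
for value in (1, 2, 10):
    source.emit(value)
print(results)  # [2, 3, 11]
```

The point of the sketch is that nothing happens until emit is called: the pipeline is just a chain of callbacks waiting for data.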
Often we call emit from some other continuous process, like reading lines from a file:
import json

data = []
source = Stream()
source.map(json.loads).sink(data.append)

# assumes one JSON document per line; the file is closed when the loop ends
with open('myfile.json') as f:
    for line in f:
        source.emit(line)
Accumulating State
| accumulate | Accumulate results with previous state |
Map and sink both pass data directly through a stream: one piece of data comes in, and either one or zero pieces go out. Accumulate lets you track some state within the pipeline. It takes an accumulation function that receives the previous state and the new element, and returns the new state along with the value to emit. In the following example we make an accumulator that keeps a running total of the elements seen so far.
def add(x, y):
    return x + y

source = Stream()
source.accumulate(add).sink(print)
>>> source.emit(1)
1
>>> source.emit(2)
3
>>> source.emit(3)
6
>>> source.emit(4)
10
The accumulation function above is particularly simple: the state that we store and the value that we emit are the same. In more complex situations we might want to keep different state than we emit. For example, let's count the number of distinct elements that we have seen so far.
def num_distinct(state, new):
    state.add(new)
    return state, len(state)

source = Stream()
source.accumulate(num_distinct, returns_state=True, start=set()).sink(print)
>>> source.emit('cat')
1
>>> source.emit('dog')
2
>>> source.emit('cat')
2
>>> source.emit('mouse')
3
Accumulators allow us to build many interesting operations.
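As a further illustration of keeping different state than we emit, the sketch below computes a running mean. It is written as a plain generator so that it runs without rapidz; the accumulate helper here is a hypothetical stand-in that only mirrors the state-passing behaviour described above, and running_mean is an invented example function.

```python
def accumulate(func, items, start, returns_state=False):
    """Yield one output per input, threading state through func.

    Hypothetical helper that mirrors the behaviour described above;
    it is not rapidz code.
    """
    state = start
    for item in items:
        if returns_state:
            # func returns (new_state, value_to_emit)
            state, out = func(state, item)
        else:
            # the new state is also the emitted value
            state = out = func(state, item)
        yield out


def running_mean(state, new):
    # State is (total, count); the emitted value is their ratio.
    total, count = state
    total, count = total + new, count + 1
    return (total, count), total / count


means = list(accumulate(running_mean, [2, 4, 6, 8], start=(0, 0), returns_state=True))
totals = list(accumulate(lambda x, y: x + y, [1, 2, 3, 4], start=0))
print(means)   # [2.0, 3.0, 4.0, 5.0]
print(totals)  # [1, 3, 6, 10]
```

With rapidz itself, a function shaped like running_mean would presumably be passed to source.accumulate with returns_state=True and start=(0, 0), in the same way as num_distinct above.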
Flow Control
| buffer | Allow results to pile up at this point in the stream |
| flatten | Flatten streams of lists or iterables into a stream of elements |
| partition | Partition stream into tuples of equal size |
| sliding_window | Produce overlapping tuples of size n |
| union | Combine multiple streams into one |
| unique | Avoid sending through repeated elements |
You can batch and slice streams into streams of batches in various ways with operations like partition, buffer, and sliding_window:
source = Stream()
source.sliding_window(3).sink(print)
>>> source.emit(1)
>>> source.emit(2)
>>> source.emit(3)
(1, 2, 3)
>>> source.emit(4)
(2, 3, 4)
>>> source.emit(5)
(3, 4, 5)
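The difference between overlapping and non-overlapping batches can be sketched in plain Python. The two generator functions below are hypothetical stand-ins written for this illustration, assuming that a window or batch is only emitted once it is full, as in the output above; they are not the rapidz implementations.

```python
from collections import deque


def sliding_window(items, n):
    """Yield overlapping tuples of the last n items, once n have arrived."""
    window = deque(maxlen=n)  # the oldest item drops off automatically
    for item in items:
        window.append(item)
        if len(window) == n:
            yield tuple(window)


def partition(items, n):
    """Yield non-overlapping tuples of exactly n items."""
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == n:
            yield tuple(batch)
            batch = []  # an incomplete final batch is held back, not emitted


windows = list(sliding_window(range(1, 6), 3))
batches = list(partition(range(1, 8), 3))
print(windows)  # [(1, 2, 3), (2, 3, 4), (3, 4, 5)]
print(batches)  # [(1, 2, 3), (4, 5, 6)]
```

Note that in this sketch the seventh element never appears in a partition, because it is still waiting for two more elements to complete its batch.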
Branching and Joining
| combine_latest | Combine multiple streams together to a stream of tuples |
| zip | Combine streams together into a stream of tuples |
| zip_latest | Combine multiple streams together to a stream of tuples |
You can branch multiple streams off of a single stream. Elements that go into the input will pass through to both output streams.
def increment(x):
    return x + 1

def decrement(x):
    return x - 1

source = Stream()
a = source.map(increment).sink(print)
b = source.map(decrement).sink(print)
b.visualize(rankdir='LR')
>>> source.emit(1)
0
2
>>> source.emit(10)
9
11
Similarly, you can combine multiple streams together with operations like zip, which emits once both streams have provided a new element, or combine_latest, which emits when either stream has provided a new element.
source = Stream()
a = source.map(increment)
b = source.map(decrement)
c = a.zip(b).map(sum).sink(print)
>>> source.emit(10)
20 # 9 + 11
This branching and combining is where Python iterators break down, and projects like rapidz start becoming valuable.
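The two joining rules are easy to confuse, so here is a small sketch that feeds the same sequence of updates to both. ToyZip and ToyCombineLatest are hypothetical classes invented for this illustration; they follow the descriptions above (zip waits for a new element from every input, combine_latest fires on any update once every input has a value) and ignore details such as buffer limits that the real operations may have.

```python
from collections import deque


class ToyZip:
    """Pair up the i-th element of each input; hypothetical, not rapidz code."""

    def __init__(self, n_inputs, callback):
        self.queues = [deque() for _ in range(n_inputs)]
        self.callback = callback

    def update(self, index, x):
        self.queues[index].append(x)
        # Emit only when every input has an unconsumed element waiting.
        if all(self.queues):
            self.callback(tuple(q.popleft() for q in self.queues))


class ToyCombineLatest:
    """Emit the latest value of every input whenever any input changes."""

    _missing = object()  # marks inputs that have not produced a value yet

    def __init__(self, n_inputs, callback):
        self.latest = [self._missing] * n_inputs
        self.callback = callback

    def update(self, index, x):
        self.latest[index] = x
        # Stay silent until every input has produced at least one value.
        if all(v is not self._missing for v in self.latest):
            self.callback(tuple(self.latest))


zipped, combined = [], []
z = ToyZip(2, zipped.append)
c = ToyCombineLatest(2, combined.append)
for index, value in [(0, 'a1'), (0, 'a2'), (1, 'b1'), (1, 'b2')]:
    z.update(index, value)
    c.update(index, value)
print(zipped)    # [('a1', 'b1'), ('a2', 'b2')]
print(combined)  # [('a2', 'b1'), ('a2', 'b2')]
```

In this sketch zip preserves the pairing of first with first and second with second, while combine_latest always reports the most recent value from each side and so never sees 'a1' paired with anything.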
Processing Time and Back Pressure
| delay | Add a time delay to results |
| rate_limit | Limit the flow of data |
| timed_window | Emit a tuple of collected results every interval |
Time-based flow control depends on having an active Tornado event loop. Tornado is active by default within a Jupyter notebook, but otherwise you will need to learn at least a little about asynchronous programming in Python to use these features. Learning async programming is not mandatory: the rest of the project works fine without Tornado.
You can control the flow of data through your stream over time. For example you may want to batch all elements that have arrived in the last minute, or slow down the flow of data through sensitive parts of the pipeline, particularly when they may be writing to slow resources like databases.
rapidz supports this in two ways: with operations like delay, rate_limit, and timed_window, and by passing Tornado futures back through the pipeline. As data moves forward through the pipeline, futures that signal completed work move backwards. In this way you can reliably avoid buildup of data in slower parts of your pipeline.
Let's consider the following example that reads JSON data from a file and inserts it into a database using an async-aware insertion function.
async def write_to_database(...):
    ...

# build pipeline
source = Stream()
source.map(json.loads).sink(write_to_database)

async def process_file(fn):
    with open(fn) as f:
        for line in f:
            await source.emit(line)  # wait for pipeline to clear
As we call the write_to_database function on our parsed JSON data, it produces a future for us to signal that the writing process has finished. rapidz will ensure that this future is passed all the way back to the source.emit call, so that user code at the start of our pipeline can await on it. This allows us to avoid buildup even in very large and complex streams. We always pass futures back to ensure responsiveness.
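The effect of awaiting the sink can be sketched with plain asyncio, without rapidz or Tornado. In this hypothetical example, write_to_database is a stand-in that just sleeps briefly, and the run function (an invented name) counts how many writes are in flight at once, with and without waiting for each write to finish.

```python
import asyncio


async def run(records, wait_for_sink):
    """Return (written records, peak number of writes in flight)."""
    written = []
    in_flight = 0
    peak = 0

    async def write_to_database(record):
        # Stand-in for a slow async insert (hypothetical).
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.001)
        written.append(record)
        in_flight -= 1

    tasks = []
    for record in records:
        if wait_for_sink:
            # Back pressure: do not produce more until the sink is done.
            await write_to_database(record)
        else:
            # No back pressure: schedule the write and keep producing.
            tasks.append(asyncio.ensure_future(write_to_database(record)))
    if tasks:
        await asyncio.gather(*tasks)
    return written, peak


with_bp = asyncio.run(run(range(50), wait_for_sink=True))
without_bp = asyncio.run(run(range(50), wait_for_sink=False))
print(with_bp[1])     # 1: the producer never gets ahead of the sink
print(without_bp[1])  # greater than 1: pending writes pile up
```

Awaiting each write keeps at most one record pending, which is the property that awaiting source.emit is described as providing above.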
But wait, maybe we don't mind having a few messages in memory at once. This will help steady the flow of data so that we can continue to work even if our sources or sinks become less productive for brief periods. We might add a buffer just before writing to the database.
source.map(json.loads).buffer(100).sink(write_to_database)
And if we are pulling from an API with known limits, then we might want to introduce an artificial rate limit of one element every 10 ms.
source.rate_limit(0.010).map(json.loads).buffer(100).sink(write_to_database)
Operations like these (and more) allow us to shape the flow of data through our pipelines.
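For intuition, a bounded buffer combined with a crude rate limit can be sketched with an asyncio.Queue. This is only an analogy under stated assumptions: the producer and consumer coroutines are hypothetical, the queue's maxsize plays the role of buffer(100), and the fixed sleep plays the role of rate_limit(0.010); rapidz may implement both differently.

```python
import asyncio
import time


async def producer(queue, records, interval):
    for record in records:
        await queue.put(record)        # blocks while the buffer is full
        await asyncio.sleep(interval)  # crude rate limit between elements
    await queue.put(None)              # sentinel: no more data


async def consumer(queue, out):
    while True:
        record = await queue.get()
        if record is None:
            break
        out.append(record)


async def main():
    queue = asyncio.Queue(maxsize=100)  # plays the role of buffer(100)
    out = []
    start = time.perf_counter()
    await asyncio.gather(producer(queue, range(5), 0.010), consumer(queue, out))
    return out, time.perf_counter() - start


out, elapsed = asyncio.run(main())
print(out)  # [0, 1, 2, 3, 4]
print(f"took about {elapsed:.3f}s")
```

The bounded queue absorbs short bursts, while the call to put applies back pressure once the buffer is full.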
Modifying and Cleaning up Streams
When you call Stream you create a stream. When you call any method on a Stream, like Stream.map, you also create a stream. All operations can be chained together. Additionally, as discussed in the section on Branching, you can split multiple streams off of any point. Streams will pass their outputs on to all downstream streams so that anyone can hook in at any point and get a full view of what that stream is producing.
If you delete a part of a stream then it will stop getting data. rapidz follows normal Python garbage collection semantics, so once all references to a stream have been lost those operations will no longer occur. The one exception to this is sink, which is intended to be used with side effects and will stick around even without a reference.
Note
Sink streams store themselves in rapidz.core._global_sinks. You can remove them permanently by clearing that collection.
>>> source.map(print) # this doesn't do anything
>>> source.sink(print) # this stays active even without a reference
>>> s = source.map(print) # this works too because we have a handle to s
Recursion and Feedback
By connecting sources to sinks you can create feedback loops. As an example, here is a tiny web crawler:
import requests
from rapidz import Stream

# get_list_of_links is a user-supplied function (not shown) that
# extracts the URLs from a page's content
source = Stream()
pages = source.unique()
content = pages.map(requests.get).map(lambda x: x.content)
links = content.map(get_list_of_links).flatten()
links.sink(source.emit)  # pipe new links back into pages
pages.sink(print)
>>> source.emit('http://github.com')
http://github.com
http://github.com/features
http://github.com/business
http://github.com/explore
http://github.com/pricing
...
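The reason such a loop terminates is the unique step: a link that has already been seen is dropped instead of being fed back in. The sketch below shows the same idea without network access, using a hypothetical in-memory link graph (LINKS) and an explicit queue in place of the stream; the visiting order may differ from what a real pipeline produces.

```python
from collections import deque

# Hypothetical link graph standing in for real HTTP requests.
LINKS = {
    'a': ['b', 'c'],
    'b': ['a', 'd'],
    'c': ['d'],
    'd': [],
}


def crawl(start):
    seen = set()              # plays the role of unique()
    pending = deque([start])  # links fed back into the source
    visited = []
    while pending:
        page = pending.popleft()
        if page in seen:
            continue          # duplicates are dropped, which ends the loop
        seen.add(page)
        visited.append(page)
        pending.extend(LINKS[page])  # like flatten() plus feeding links back
    return visited


print(crawl('a'))  # ['a', 'b', 'c', 'd']
```

Without the seen check, the cycle between 'a' and 'b' would keep the loop running forever.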
Performance
rapidz adds overhead on the order of a few microseconds per element to normal Python operations.
from rapidz import Stream

source = Stream()

def inc(x):
    return x + 1

source.sink(inc)
In [5]: %timeit source.emit(1)
100000 loops, best of 3: 3.19 µs per loop
In [6]: %timeit inc(1)
10000000 loops, best of 3: 91.5 ns per loop
You may want to avoid pushing millions of individual elements per second through a stream. However, you can avoid performance issues by collecting lots of data into single elements, for example by pushing through Pandas dataframes instead of individual integers and strings. This will be faster regardless, just because projects like NumPy and Pandas can be much faster than Python generally.
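A rough way to see why batching helps is to compare paying a fixed per-call cost once per element with paying it once per batch. In the sketch below, call_through is a hypothetical stand-in for that fixed cost; it is not rapidz code, and the timings it prints will vary by machine.

```python
import timeit


def call_through(func, x):
    # Stand-in for the fixed per-element cost of pushing through a pipeline.
    return func(x)


def per_element(values):
    # One call_through per element.
    return sum(call_through(lambda v: v + 1, v) for v in values)


def batched(values):
    # One call_through for the whole batch.
    return call_through(lambda batch: sum(v + 1 for v in batch), values)


values = list(range(10_000))
assert per_element(values) == batched(values)  # same result either way

t_each = timeit.timeit(lambda: per_element(values), number=20)
t_batch = timeit.timeit(lambda: batched(values), number=20)
print(f"per element: {t_each:.4f}s, batched: {t_batch:.4f}s")
```

The gap widens further when the batch is processed by vectorized code rather than a Python loop.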
In the following example we pass filenames through a stream, convert them to Pandas dataframes, and then map pandas-level functions on those dataframes. For operations like this rapidz adds virtually no overhead.
from glob import glob
import pandas as pd

source = Stream()
s = source.map(pd.read_csv).map(lambda df: df.value.sum()).accumulate(add)

for fn in glob('data/2017-*-*.csv'):
    source.emit(fn)
rapidz provides higher-level APIs for situations just like this one. You may want to read further about collections.