DataFrames¶
When handling large volumes of streaming tabular data it is often more efficient to pass around larger Pandas dataframes with many rows each rather than pass around individual Python tuples or dicts. Handling and computing on data with Pandas can be much faster than operating on Python objects.
So one could imagine building streaming dataframe pipelines using the .map and .accumulate streaming operators with functions that consume and produce Pandas dataframes, as in the following example:
from rapidz import Stream

def query(df):
    return df[df.name == 'Alice']

def aggregate(acc, df):
    return acc + df.amount.sum()

stream = Stream()
stream.map(query).accumulate(aggregate, start=0)
This is fine, and straightforward to do if you understand rapidz.core and Pandas and have some skill with developing algorithms.
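To make the data flow concrete, the sketch below reproduces what the pipeline above computes without using rapidz at all. Python's built-in map and itertools.accumulate stand in for the streaming operators, and the three batches are made-up sample data. It assumes that the stream produces one running total per incoming batch and does not emit the start value itself.

```python
from itertools import accumulate

import pandas as pd


def query(df):
    # Keep only Alice's rows from one batch.
    return df[df.name == 'Alice']


def aggregate(acc, df):
    # Fold one filtered batch into the running total.
    return acc + df.amount.sum()


# Hypothetical batches, each a small dataframe with many rows in practice.
batches = [
    pd.DataFrame({'name': ['Alice', 'Bob'], 'amount': [100, 200]}),
    pd.DataFrame({'name': ['Alice', 'Alice'], 'amount': [50, 25]}),
    pd.DataFrame({'name': ['Bob'], 'amount': [300]}),
]

filtered = map(query, batches)
# accumulate() yields the initial value first; drop it to keep one total per batch.
totals = list(accumulate(filtered, aggregate, initial=0))[1:]
running_totals = [int(t) for t in totals]
```

The last batch contains no rows for Alice, so the running total is unchanged after it.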
Streaming Dataframes¶
The rapidz.dataframe
module provides a streaming dataframe object that
implements many of these algorithms for you. It provides a Pandas-like
interface on streaming data. Our example above is rewritten below using
streaming dataframes:
import pandas as pd
from rapidz.dataframe import DataFrame
example = pd.DataFrame({'name': [], 'amount': []})
sdf = DataFrame(stream, example=example)
sdf[sdf.name == 'Alice'].amount.sum()
The two examples are identical in terms of performance and execution. The resulting streaming dataframe contains a .stream attribute which is equivalent to the stream produced in the first example. Streaming dataframes are only syntactic sugar on core streams.
Supported Operations¶
Streaming dataframes support the following classes of operations:
Elementwise operations like df.x + 1
Filtering like df[df.name == 'Alice']
Column addition like df['z'] = df.x + df.y
Reductions like df.amount.mean()
Groupby-aggregations like df.groupby(df.name).amount.mean()
Windowed aggregations (fixed length) like df.window(n=100).amount.sum()
Windowed aggregations (index valued) like df.window(value='2h').amount.sum()
Windowed groupby aggregations like df.window(value='2h').groupby('name').amount.sum()
DataFrame Aggregations¶
Dataframe aggregations are composed of an aggregation (like sum, mean, …) and a windowing scheme (fixed sized windows, index-valued, all time, …).
Aggregations¶
Streaming Dataframe aggregations are built from three methods:
initial: Creates initial state given an empty example dataframe
on_new: Updates state and produces new result to emit given new data
on_old: Updates state and produces new result to emit given decayed data
So a simple implementation of mean as an aggregation might look like the following:
from rapidz.dataframe import Aggregation

class Mean(Aggregation):
    def initial(self, new):
        # Start from an empty slice so total and count begin at zero
        state = new.iloc[:0].sum(), new.iloc[:0].count()
        return state

    def on_new(self, state, new):
        total, count = state
        total = total + new.sum()
        count = count + new.count()
        new_state = (total, count)
        new_value = total / count
        return new_state, new_value

    def on_old(self, state, old):
        total, count = state
        total = total - old.sum()  # switch + for - here
        count = count - old.count()  # switch + for - here
        new_state = (total, count)
        new_value = total / count
        return new_state, new_value
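The three-method contract can be exercised by hand, which helps when checking a new aggregation before attaching it to a stream. The sketch below uses a standalone class named RunningMean (a hypothetical name) that follows the same contract but deliberately does not subclass rapidz's Aggregation, so it runs with Pandas alone. The two series are made-up sample data.

```python
import pandas as pd


class RunningMean:
    """Standalone stand-in following the initial/on_new/on_old contract."""

    def initial(self, new):
        # An empty slice gives a zero total and a zero count.
        empty = new.iloc[:0]
        return empty.sum(), empty.count()

    def on_new(self, state, new):
        total, count = state
        total = total + new.sum()
        count = count + new.count()
        return (total, count), total / count

    def on_old(self, state, old):
        total, count = state
        total = total - old.sum()
        count = count - old.count()
        return (total, count), total / count


agg = RunningMean()
first = pd.Series([1.0, 2.0, 3.0])
second = pd.Series([5.0])

state = agg.initial(first)
state, mean_after_first = agg.on_new(state, first)    # mean of 1, 2, 3
state, mean_after_second = agg.on_new(state, second)  # mean of 1, 2, 3, 5
state, mean_after_decay = agg.on_old(state, first)    # only 5 remains
```

Note that on_old divides by the remaining count, so a caller that decays every element would divide by zero. How a real windowing scheme avoids or handles that case is not covered here.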
These aggregations can then be used in a variety of different windowing schemes with the aggregate method as follows:
df.aggregate(Mean())
df.window(n=100).aggregate(Mean())
df.window(value='60s').aggregate(Mean())
The windowing scheme's job is to deliver new and old data to your aggregation for processing.
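The division of labour between a windowing scheme and an aggregation can be sketched in plain Python. This is an illustration of the idea, not rapidz's implementation: the Sum class and the windowed function are hypothetical, batches are plain lists rather than dataframes, and initial takes no example argument for brevity.

```python
from collections import deque


class Sum:
    """Toy aggregation over lists of numbers."""

    def initial(self):
        return 0

    def on_new(self, state, new):
        state = state + sum(new)
        return state, state

    def on_old(self, state, old):
        state = state - sum(old)
        return state, state


def windowed(batches, agg, n):
    """Yield the aggregate over the last n elements after each batch."""
    state = agg.initial()
    window = deque()
    for batch in batches:
        # Deliver newly arrived data to the aggregation.
        state, result = agg.on_new(state, batch)
        window.extend(batch)
        # Deliver whatever has just fallen out of the window.
        overflow = max(0, len(window) - n)
        old = [window.popleft() for _ in range(overflow)]
        if old:
            state, result = agg.on_old(state, old)
        yield result


results = list(windowed([[1, 2], [3, 4], [5]], Sum(), n=3))
```

The aggregation never sees the window itself. It only receives the data that arrived and the data that left, so the same Sum class would work unchanged with a different eviction rule.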
Windowing Schemes¶
Different windowing schemes like fixed sized windows (last 100 elements) or value-indexed windows (last two hours of data) will track newly arrived and decaying data and call these methods accordingly. The mechanism to track data arriving and leaving is kept orthogonal from the aggregations themselves. These windowing schemes include the following:
All previous data. Only initial and on_new are called, on_old is never called.
>>> df.sum()
The previous n elements
>>> df.window(n=100).sum()
An index range, like a time range for a datetime index
>>> df.window(value='2h').sum()
Although this can be done for any range on any type of index, time is just a common case.
Windowing schemes generally maintain a deque of historical values within accumulated state. As new data comes in they inspect that state and eject data that no longer falls within the window.
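As an illustration of the deque-based approach described above, here is a minimal sketch of an index-valued window over plain (timestamp, value) pairs. The function name, the use of seconds as the index, and the choice to keep entries strictly newer than the latest timestamp minus the width are all assumptions made for the example. It also assumes events arrive in increasing time order and that the width is positive.

```python
from collections import deque


def time_window_sums(events, width):
    """Return the sum over the trailing `width` seconds after each event.

    `events` is an iterable of (timestamp_seconds, value) pairs in
    increasing timestamp order.
    """
    history = deque()
    total = 0
    sums = []
    for ts, value in events:
        history.append((ts, value))
        total += value
        # Eject entries that no longer fall within (ts - width, ts].
        # The entry just appended always stays, so history is never empty.
        while history[0][0] <= ts - width:
            _, old_value = history.popleft()
            total -= old_value
        sums.append(total)
    return sums


events = [(0, 10), (30, 20), (70, 5), (200, 1)]
sums = time_window_sums(events, width=60)
```

Because entries arrive in order, the oldest data is always at the left end of the deque, so eviction only ever pops from that end.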
Grouping¶
Groupby aggregations also maintain historical data on the grouper and perform a parallel aggregation on the number of times any key has been seen, removing that key once it is no longer present.
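The key-counting idea can be sketched as follows. This is a simplified, hypothetical GroupedSum class working on (key, amount) pairs rather than dataframes. It keeps a per-key count next to the per-key total because a total of zero does not by itself mean the key has left the window.

```python
from collections import Counter


class GroupedSum:
    """Toy grouped sum that forgets keys no longer present."""

    def __init__(self):
        self.totals = {}
        self.counts = Counter()

    def on_new(self, rows):
        for key, amount in rows:
            self.totals[key] = self.totals.get(key, 0) + amount
            self.counts[key] += 1
        return dict(self.totals)

    def on_old(self, rows):
        for key, amount in rows:
            self.totals[key] -= amount
            self.counts[key] -= 1
            if self.counts[key] == 0:
                # Last occurrence of this key has decayed; drop it.
                del self.totals[key]
                del self.counts[key]
        return dict(self.totals)


agg = GroupedSum()
after_new = agg.on_new([('Alice', 100), ('Bob', 200), ('Alice', 50)])
after_old = agg.on_old([('Bob', 200), ('Alice', 100)])
```

After the second call Bob no longer appears in the result at all, while Alice remains with the one row that is still inside the window.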
Dask¶
In all cases, dataframe operations are only implemented with the .map and .accumulate operators, and so are equally compatible with core Stream and DaskStream objects.
Not Yet Supported¶
Streaming dataframe algorithms do not currently pay special attention to data arriving out-of-order.