# API

## Stream
| Operation | Description |
| --- | --- |
| `Stream` | A Stream is an infinite sequence of data |
| `accumulate` | Accumulate results with previous state |
| `buffer` | Allow results to pile up at this point in the stream |
| `collect` | Hold elements in a cache and emit them as a collection when flushed |
| `combine_latest` | Combine multiple streams together to a stream of tuples |
| `connect` | Connect this stream to a downstream element |
| `delay` | Add a time delay to results |
| `destroy` | Disconnect this stream from any upstream sources |
| `disconnect` | Disconnect this stream from a downstream element |
| `filter` | Only pass through elements that satisfy the predicate |
| `flatten` | Flatten streams of lists or iterables into a stream of elements |
| `map` | Apply a function to every element in the stream |
| `partition` | Partition stream into tuples of equal size |
| `rate_limit` | Limit the flow of data |
| `scatter` | Convert local stream to Dask Stream |
| `sink` | Apply a function on every element |
| `sliding_window` | Produce overlapping tuples of size n |
| `starmap` | Apply a function to every element in the stream, splayed out |
| `timed_window` | Emit a tuple of collected results every interval |
| `union` | Combine multiple streams into one |
| `unique` | Avoid sending through repeated elements |
| `pluck` | Select elements from elements in the stream |
| `zip` | Combine streams together into a stream of tuples |
| `zip_latest` | Combine multiple streams together to a stream of tuples |
## Sources

| Source | Description |
| --- | --- |
| `filenames` | Stream over filenames in a directory |
| `from_kafka` | Accepts messages from Kafka |
| `from_textfile` | Stream data from a text file |
## DaskStream

| Operation | Description |
| --- | --- |
| `DaskStream` | A Parallel stream using Dask |
| `gather` | Wait on and gather results from DaskStream to local Stream |
## Definitions
### rapidz.accumulate(upstream, func, start='--no-default--', returns_state=False, reset_stream=None, **kwargs)

Accumulate results with previous state.

This performs running or cumulative reductions, applying the function to the previous total and the new element. The function should take two arguments, the previous accumulated state and the next element, and it should return a new accumulated state.

Parameters

- `func`: callable
- `start`: object
  Initial value. Defaults to the first submitted element.
- `returns_state`: boolean, optional
  If True then func should return both the state and the value to emit. If False then both values are the same, and func returns one value.
- `reset_stream`: Stream instance or None, optional
  If not None, when the `reset_stream` stream emits, the accumulate node's state will revert to the initial state (set by `start`). Defaults to None.
- `**kwargs`:
  Keyword arguments to pass to func.

Examples

    >>> source = Stream()
    >>> source.accumulate(lambda acc, x: acc + x).sink(print)
    >>> for i in range(5):
    ...     source.emit(i)
    1
    3
    6
    10
### rapidz.buffer(upstream, n, **kwargs)

Allow results to pile up at this point in the stream.

This allows results to buffer in place at various points in the stream. This can help to smooth flow through the system when backpressure is applied.
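A minimal sketch of where a buffer might sit in a pipeline; the `slow` function is a hypothetical stand-in for any expensive downstream step, and the trailing sleep only gives the background event loop time to drain:

```python
import time
from rapidz import Stream

def slow(x):
    time.sleep(0.1)  # hypothetical expensive work
    return x

source = Stream()
# Up to 1000 unfinished elements may pile up here instead of
# applying backpressure all the way back to source.emit
source.buffer(1000).map(slow).sink(print)

for i in range(5):
    source.emit(i)  # returns quickly; work continues in the background

time.sleep(1)       # let the buffered elements drain
```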
### rapidz.collect(upstream, cache=None, **kwargs)

Hold elements in a cache and emit them as a collection when flushed.

Examples

    >>> source1 = Stream()
    >>> source2 = Stream()
    >>> collector = collect(source1)
    >>> collector.sink(print)
    >>> source2.sink(collector.flush)
    >>> source1.emit(1)
    >>> source1.emit(2)
    >>> source2.emit('anything')  # flushes collector
    ...
    [1, 2]
### rapidz.combine_latest(*upstreams, **kwargs)

Combine multiple streams together to a stream of tuples.

This will emit a new tuple of all of the most recent elements seen from any stream.

Parameters

- `emit_on`: stream or list of streams or None
  Only emit upon update of the streams listed. If None, emit on update from any stream.

See also: `Stream.zip`, `Stream.zip_latest`
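A short sketch of the `emit_on` behavior; the stream names here are purely illustrative:

```python
from rapidz import Stream

prices = Stream()
volumes = Stream()

# Emit a (price, volume) tuple only when `volumes` updates,
# carrying along the most recent price seen so far
prices.combine_latest(volumes, emit_on=volumes).sink(print)

prices.emit(10.0)   # no output yet: nothing has arrived on volumes
volumes.emit(100)   # prints (10.0, 100)
prices.emit(10.5)   # no output: we only emit on volumes updates
volumes.emit(250)   # prints (10.5, 250)
```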
### rapidz.delay(upstream, interval, **kwargs)

Add a time delay to results.
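A minimal sketch; `delay` runs on an event loop, so the trailing sleep simply keeps the script alive long enough to see the output:

```python
import time
from rapidz import Stream

source = Stream()
# Hold every element for two seconds before passing it downstream
source.delay(2).sink(print)

source.emit('hello')  # returns immediately
time.sleep(3)         # 'hello' prints roughly two seconds after the emit
```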
### rapidz.filter(upstream, predicate, *args, **kwargs)

Only pass through elements that satisfy the predicate.

Parameters

- `predicate`: function
  The predicate. Should return True or False, where True means that the predicate is satisfied.

Examples

    >>> source = Stream()
    >>> source.filter(lambda x: x % 2 == 0).sink(print)
    >>> for i in range(5):
    ...     source.emit(i)
    0
    2
    4
### rapidz.flatten(upstream=None, upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)

Flatten streams of lists or iterables into a stream of elements.

See also: `Stream.partition`

Examples

    >>> source = Stream()
    >>> source.flatten().sink(print)
    >>> for x in [[1, 2, 3], [4, 5], [6, 7, 7]]:
    ...     source.emit(x)
    1
    2
    3
    4
    5
    6
    7
    7
### rapidz.map(upstream, func, *args, **kwargs)

Apply a function to every element in the stream.

Parameters

- `func`: callable
- `*args`:
  The arguments to pass to the function.
- `**kwargs`:
  Keyword arguments to pass to func.

Examples

    >>> source = Stream()
    >>> source.map(lambda x: 2*x).sink(print)
    >>> for i in range(5):
    ...     source.emit(i)
    0
    2
    4
    6
    8
### rapidz.partition(upstream, n, **kwargs)

Partition stream into tuples of equal size.

Examples

    >>> source = Stream()
    >>> source.partition(3).sink(print)
    >>> for i in range(10):
    ...     source.emit(i)
    (0, 1, 2)
    (3, 4, 5)
    (6, 7, 8)
### rapidz.rate_limit(upstream, interval, **kwargs)

Limit the flow of data.

This stops two elements from streaming through in an interval shorter than the provided value.

Parameters

- `interval`: float
  Time in seconds.
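A minimal sketch of throttling a hot loop (the half-second interval is arbitrary):

```python
from rapidz import Stream

source = Stream()
# Downstream nodes see at most one element every 0.5 seconds
source.rate_limit(0.5).sink(print)

for i in range(3):
    source.emit(i)  # emits are paced so elements arrive 0.5 s apart
```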
### rapidz.sink(upstream, func, *args, **kwargs)

Apply a function on every element.

See also: `map`, `Stream.sink_to_list`

Examples

    >>> source = Stream()
    >>> L = list()
    >>> source.sink(L.append)
    >>> source.sink(print)
    >>> source.sink(print)
    >>> source.emit(123)
    123
    123
    >>> L
    [123]
### rapidz.sliding_window(upstream, n, **kwargs)

Produce overlapping tuples of size n.

Examples

    >>> source = Stream()
    >>> source.sliding_window(3).sink(print)
    >>> for i in range(8):
    ...     source.emit(i)
    (0, 1, 2)
    (1, 2, 3)
    (2, 3, 4)
    (3, 4, 5)
    (4, 5, 6)
    (5, 6, 7)
### rapidz.Stream(upstream=None, upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)

A Stream is an infinite sequence of data.

Streams subscribe to each other, passing and transforming data between them. A Stream object listens for updates from upstream, reacts to these updates, and then emits more data to flow downstream to all Stream objects that subscribe to it. Downstream Stream objects may connect at any point of a Stream graph to get a full view of the data coming off of that point to do with as they will.

Parameters

- `asynchronous`: boolean or None
  Whether or not this stream will be used in asynchronous functions or normal Python functions. Leave as None if you don't know. True will cause operations like emit to return awaitable Futures. False will use an event loop in another thread (starting it if necessary).
- `ensure_io_loop`: boolean
  Ensure that some IOLoop will be created. If asynchronous is None or False then this will be in a separate thread, otherwise it will be IOLoop.current.

Examples

    >>> def inc(x):
    ...     return x + 1

    >>> source = Stream()  # Create a stream object
    >>> s = source.map(inc).map(str)  # Subscribe to make new streams
    >>> s.sink(print)  # take an action whenever an element reaches the end

    >>> L = list()
    >>> s.sink(L.append)  # or take multiple actions (streams can branch)

    >>> for i in range(5):
    ...     source.emit(i)  # push data in at the source
    '1'
    '2'
    '3'
    '4'
    '5'
    >>> L  # and the actions happen at the sinks
    ['1', '2', '3', '4', '5']
### rapidz.timed_window(upstream, interval, **kwargs)

Emit a tuple of collected results every interval.

Every `interval` seconds this emits a tuple of all of the results seen so far. This can help to batch data coming off of a high-volume stream.
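A rough sketch of batching a fast producer; the interval and sleep values are arbitrary, and the exact batch boundaries depend on timing:

```python
import time
from rapidz import Stream

source = Stream()
# Every 0.5 seconds, flush everything collected since the last batch
source.timed_window(0.5).sink(print)

for i in range(10):
    source.emit(i)
    time.sleep(0.1)
time.sleep(1)  # allow the final window to flush
# downstream receives batches roughly like (0, 1, 2, 3, 4) then (5, 6, 7, 8, 9)
```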
### rapidz.union(*upstreams, **kwargs)

Combine multiple streams into one.

Every element from any of the upstream streams will immediately flow into the output stream. They will not be combined with elements from other streams.

See also: `Stream.zip`, `Stream.combine_latest`
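A minimal sketch of merging two streams:

```python
from rapidz import Stream

a = Stream()
b = Stream()
# Elements from either stream flow straight into the combined stream
a.union(b).sink(print)

a.emit(1)  # prints 1
b.emit(2)  # prints 2
a.emit(3)  # prints 3
```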
### rapidz.unique(upstream, history=None, key=<function identity>, **kwargs)

Avoid sending through repeated elements.

This deduplicates a stream so that only new elements pass through. You can control how much of a history is stored with the `history=` parameter. For example, setting `history=1` avoids sending through elements when one is repeated right after the other.

Examples

    >>> source = Stream()
    >>> source.unique(history=1).sink(print)
    >>> for x in [1, 1, 2, 2, 2, 1, 3]:
    ...     source.emit(x)
    1
    2
    1
    3
### rapidz.pluck(upstream, pick, **kwargs)

Select elements from elements in the stream.

Parameters

- `pick`: object, list
  The element(s) to pick from the incoming element in the stream. If an instance of list, will pick multiple elements.

Examples

    >>> source = Stream()
    >>> source.pluck([0, 3]).sink(print)
    >>> for x in [[1, 2, 3, 4], [4, 5, 6, 7], [8, 9, 10, 11]]:
    ...     source.emit(x)
    (1, 4)
    (4, 7)
    (8, 11)

    >>> source = Stream()
    >>> source.pluck('name').sink(print)
    >>> for x in [{'name': 'Alice', 'x': 123}, {'name': 'Bob', 'x': 456}]:
    ...     source.emit(x)
    'Alice'
    'Bob'
### rapidz.zip(*upstreams, **kwargs)

Combine streams together into a stream of tuples.

We emit a new tuple once all streams have produced a new element.

See also: `Stream.combine_latest`, `Stream.zip_latest`
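A short sketch showing zip's lossless pairing:

```python
from rapidz import Stream

a = Stream()
b = Stream()
a.zip(b).sink(print)

a.emit(1)     # nothing yet: b has no element to pair with
b.emit('x')   # prints (1, 'x')
a.emit(2)     # buffered losslessly until b produces its next element
b.emit('y')   # prints (2, 'y')
```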
### rapidz.zip_latest(lossless, *upstreams, **kwargs)

Combine multiple streams together to a stream of tuples.

The stream which this is called from is lossless. All elements from the lossless stream are emitted regardless of when they came in. This will emit a new tuple consisting of an element from the lossless stream paired with the latest elements from the other streams. Elements are only emitted when an element on the lossless stream is received, similar to `combine_latest` with the `emit_on` flag.

See also: `Stream.combine_latest`, `Stream.zip`
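A minimal sketch of the lossless pairing; the stream names are illustrative:

```python
from rapidz import Stream

lossless = Stream()
latest = Stream()
# Every element of `lossless` is emitted, paired with the most
# recent value seen on `latest`
lossless.zip_latest(latest).sink(print)

latest.emit('a')
lossless.emit(1)   # prints (1, 'a')
lossless.emit(2)   # prints (2, 'a')
latest.emit('b')   # no output: only `lossless` triggers emission
lossless.emit(3)   # prints (3, 'b')
```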
### rapidz.filenames(path, poll_interval=0.1, start=False, **kwargs)

Stream over filenames in a directory.

Parameters

- `path`: string
  Directory path or globstring over which to search for files.
- `poll_interval`: Number
  Seconds between checking path.
- `start`: bool (False)
  Whether to start running immediately; otherwise call stream.start() explicitly.

Examples

    >>> source = Stream.filenames('path/to/dir')  # doctest: +SKIP
    >>> source = Stream.filenames('path/to/*.csv', poll_interval=0.500)  # doctest: +SKIP
### rapidz.from_kafka(topics, consumer_params, poll_interval=0.1, start=False, **kwargs)

Accepts messages from Kafka.

Uses the confluent-kafka library: https://docs.confluent.io/current/clients/confluent-kafka-python/

Parameters

- `topics`: list of str
  Labels of Kafka topics to consume from.
- `consumer_params`: dict
  Settings to set up the stream; see
  https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration and
  https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  Examples: `bootstrap.servers`, the connection string(s) (host:port) by which to reach Kafka; `group.id`, the identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.
- `poll_interval`: number
  Seconds that elapse between polling Kafka for new messages.
- `start`: bool (False)
  Whether to start polling upon instantiation.
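A hedged sketch of wiring a Kafka topic into a stream; the broker address, topic name, and group id below are placeholders for your own deployment:

```python
from rapidz import Stream

# Hypothetical connection settings; adjust to your Kafka deployment
consumer_params = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'rapidz-example',
}

source = Stream.from_kafka(['my-topic'], consumer_params, poll_interval=0.1)
# Kafka delivers raw bytes; decode before further processing
source.map(lambda msg: msg.decode('utf-8')).sink(print)
source.start()  # begin polling Kafka
```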
### rapidz.from_textfile(f, poll_interval=0.1, delimiter='\n', start=False, **kwargs)

Stream data from a text file.

Parameters

- `f`: file or string
- `poll_interval`: Number
  Interval to poll file for new data in seconds.
- `delimiter`: str ('\n')
  Character(s) to use to split the data into parts.
- `start`: bool (False)
  Whether to start running immediately; otherwise call stream.start() explicitly.

Returns

- Stream
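A short sketch of tailing a file; the filename is a placeholder:

```python
from rapidz import Stream

# Follow a (hypothetical) log file, emitting one chunk per line
source = Stream.from_textfile('server.log', poll_interval=0.5)
source.map(str.strip).sink(print)
source.start()  # begin polling the file for new data
```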
### rapidz.dask.DaskStream(*args, **kwargs)

A Parallel stream using Dask.

This object is fully compliant with the `rapidz.core.Stream` object but uses a Dask client for execution. Operations like `map` and `accumulate` submit functions to run on the Dask instance using `dask.distributed.Client.submit` and pass around Dask futures. Time-based operations like `timed_window`, `buffer`, and so on operate as normal.

Typically one transfers between normal Stream and DaskStream objects using the `Stream.scatter()` and `DaskStream.gather()` methods.

See also: `dask.distributed.Client`

Examples

    >>> from dask.distributed import Client
    >>> client = Client()

    >>> from rapidz import Stream
    >>> source = Stream()
    >>> source.scatter().map(func).accumulate(binop).gather().sink(...)
### rapidz.dask.gather(upstream=None, upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)

Wait on and gather results from DaskStream to local Stream.

This waits on every result in the stream and then gathers that result back to the local stream. Warning: this can restrict parallelism. It is common to combine a `gather()` node with a `buffer()` to allow unfinished futures to pile up.

See also: `buffer`, `scatter`

Examples

    >>> local_stream = dask_stream.buffer(20).gather()
- Wait on and gather results from DaskStream to local Stream - This waits on every result in the stream and then gathers that result back to the local stream. Warning, this can restrict parallelism. It is common to combine a - gather()node with a- buffer()to allow unfinished futures to pile up.- See also - buffer,- scatter- Examples - >>> local_stream = dask_stream.buffer(20).gather()