shed package
shed.simple module
Nodes for translating between base data and event model
-
class shed.simple.AlignEventStreams(*upstreams, event_stream_name='--ALL THE DOCS--', stream_name=None, **kwargs)
-
class shed.simple.LastCache(upstream=None, upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)
Bases: rapidz.core.Stream
Cache the last event in all streams then emit them under their own descriptor when the stop document comes down.
-
class shed.simple.SimpleFromEventStream(doc_type, data_address, upstream=None, event_stream_name='--ALL THE DOCS--', stream_name=None, principle=False, **kwargs)
-
class shed.simple.SimpleToEventStream(upstream, data_keys=None, stream_name=None, data_key_md=None, **kwargs)
-
class shed.simple.align_event_streams(*upstreams, event_stream_name='--ALL THE DOCS--', stream_name=None, **kwargs)
Bases: rapidz.core.zip
Zips and aligns multiple streams of documents. Note that the last upstream takes precedence where merging is not possible. This requires the streams to be of equal length.
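As a rough illustration of the "last upstream takes precedence" rule, the sketch below merges equal-length sequences of (name, document) pairs using plain dictionaries. It does not use rapidz or shed. `merge_aligned` is a hypothetical helper written for this example, and the real node may merge documents differently.

```python
def merge_aligned(*streams):
    """Merge equal-length sequences of (name, doc) pairs position by position.

    Hypothetical helper for illustration only: keys from later streams
    overwrite keys from earlier ones.
    """
    lengths = {len(s) for s in streams}
    if len(lengths) > 1:
        raise ValueError("all streams must have the same length")
    merged = []
    for group in zip(*streams):
        names = {name for name, _ in group}
        if len(names) != 1:
            raise ValueError(f"misaligned documents: {sorted(names)}")
        doc = {}
        for _, d in group:
            doc.update(d)  # later streams win on conflicting keys
        merged.append((group[0][0], doc))
    return merged


first = [("start", {"uid": "a", "sample": "Ni"}), ("stop", {"uid": "b"})]
second = [("start", {"uid": "c", "plan": "scan"}), ("stop", {"uid": "d"})]
result = merge_aligned(first, second)
```

Here the conflicting `uid` key is taken from `second`, while keys that appear in only one stream are kept.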
-
shed.simple.build_upstream_node_set(node, s=None)
Build a set of all the nodes in a rapidz graph.
- Parameters
node (Stream) – The node to use as a starting point for building the set
s (set or None) – The set to put the nodes into. If None, a new set is created, filled with the nodes, and returned
- Returns
s – The set of nodes in the graph
- Return type
set
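The traversal described above can be sketched without rapidz, assuming each node exposes its parents through an `upstreams` attribute. The `Node` class and `collect_upstream_nodes` function below are stand-ins written for this example, not the library's implementation.

```python
class Node:
    """Stand-in for a stream node; only the `upstreams` list matters here."""

    def __init__(self, name, upstreams=()):
        self.name = name
        self.upstreams = list(upstreams)


def collect_upstream_nodes(node, s=None):
    """Recursively add `node` and everything upstream of it to the set `s`."""
    if s is None:
        s = set()  # no set supplied, so start a new one
    if node is None or node in s:
        return s  # nothing to add, or already visited
    s.add(node)
    for up in node.upstreams:
        collect_upstream_nodes(up, s)
    return s


# A small diamond-shaped graph: source -> (left, right) -> joined
source = Node("source")
left = Node("left", [source])
right = Node("right", [source])
joined = Node("joined", [left, right])
nodes = collect_upstream_nodes(joined)
```

Because the result is a set, the shared `source` node is recorded once even though it is reachable through both branches.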
-
class shed.simple.simple_from_event_stream(upstream, doc_type, data_address, event_stream_name='--ALL THE DOCS--', stream_name=None, principle=False, **kwargs)
Bases: rapidz.core.Stream
Extracts data from the event stream, and passes it downstream.
- Parameters
doc_type ({‘start’, ‘descriptor’, ‘event’, ‘stop’}) – The type of document to extract data from
data_address (tuple) – A tuple of successive keys walking through the document considered. If the tuple is empty, all the data from that document is returned as a dict
upstream (Stream instance or None, optional) – The upstream node to receive streams from, defaults to None
event_stream_name (str, optional) – Filter by an event stream name (see http://nsls-ii.github.io/databroker/api.html?highlight=stream_name#data)
stream_name (str, optional) – Name for this stream node
principle (bool, optional) – If True, when this node receives a stop document all downstream ToEventStream nodes will issue a stop document. Defaults to False. Note that one principle node is required for proper pipeline operation.
Notes
The result emitted from this stream no longer follows the document model.
This node also keeps track of when and which data came through the node.
Examples
>>> import uuid
>>> from rapidz import Stream
>>> from shed.translation import FromEventStream
>>> source = Stream()
>>> s2 = FromEventStream(source, 'event', ('data', 'motor1'))
>>> s3 = s2.map(print)
>>> from ophyd.sim import hw
>>> hw = hw()
>>> from bluesky.run_engine import RunEngine
>>> RE = RunEngine()
>>> import bluesky.plans as bp
>>> RE.subscribe(lambda *x: source.emit(x))
>>> RE(bp.scan([hw.motor1], hw.motor1, 0, 10, 11))
prints:
1
2
...
10
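The way a data_address tuple walks through a document can be sketched with plain dictionaries. `walk_address` is a hypothetical helper written for this example and the event document is made up; the node's actual lookup code may differ.

```python
def walk_address(doc, data_address):
    """Follow successive keys into a nested document (hypothetical helper)."""
    value = doc
    for key in data_address:
        value = value[key]  # step one level deeper for each key
    return value


# A made-up event document with the usual nested 'data' mapping
event = {"uid": "abc", "seq_num": 1, "data": {"motor1": 3.0, "det": 12}}

motor = walk_address(event, ("data", "motor1"))  # a single value
data = walk_address(event, ("data",))            # the whole 'data' dict
whole = walk_address(event, ())                  # empty tuple: the full document
```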
-
class shed.simple.simple_to_event_stream(upstream, data_keys=None, stream_name=None, data_key_md=None, **kwargs)
Bases: rapidz.core.Stream, shed.doc_gen.CreateDocs
Converts data into an event stream, and passes it downstream.
- Parameters
upstream – the upstream node to receive streams from
data_keys (tuple, optional) – Names of the data keys. If None, assume incoming data is a dict and use the keys from the dict. Defaults to None
stream_name (str, optional) – Name for this stream node
Notes
The result emitted from this stream follows the document model. This is essentially a state machine. Transitions are:
start -> stop
start -> descriptor -> event -> stop
Note that start -> start is not allowed; this node always issues a stop document so the data input times can be stored.
Examples
>>> import uuid
>>> from rapidz import Stream
>>> from shed.translation import FromEventStream, ToEventStream
>>> source = Stream()
>>> s2 = FromEventStream(source, 'event', ('data', 'det_image'),
...                      principle=True)
>>> s3 = ToEventStream(s2, ('det_image',))
>>> s3.sink(print)
>>> from ophyd.sim import hw
>>> hw = hw()
>>> from bluesky.run_engine import RunEngine
>>> RE = RunEngine()
>>> import bluesky.plans as bp
>>> RE.subscribe(lambda *x: source.emit(x))
>>> RE(bp.scan([hw.motor1], hw.motor1, 0, 10, 11))
prints:
('start',...)
('descriptor',...)
('event',...)
('stop',...)
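The state machine described in the Notes can be sketched as a transition table over document names. This is an illustrative check, not the node's implementation. `ALLOWED` and `check_sequence` are hypothetical names, and allowing repeated events or a new start after a stop is an assumption beyond the two chains listed in the Notes.

```python
# Allowed next document names for each current state (None = nothing seen yet)
ALLOWED = {
    None: {"start"},
    "start": {"descriptor", "stop"},
    "descriptor": {"event"},
    "event": {"event", "stop"},  # assumption: a run may carry many events
    "stop": {"start"},           # assumption: a new run may follow a stop
}


def check_sequence(names):
    """Return the final state, raising ValueError on an illegal transition."""
    state = None
    for name in names:
        if name not in ALLOWED[state]:
            raise ValueError(f"illegal transition {state!r} -> {name!r}")
        state = name
    return state


empty_run = check_sequence(["start", "stop"])
full_run = check_sequence(["start", "descriptor", "event", "event", "stop"])
```

A sequence such as `["start", "start"]` raises `ValueError`, which mirrors the rule that start -> start is not allowed.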
-
class shed.simple.simple_to_event_stream_new_api(descriptor_dicts, stream_name=None, **kwargs)
Bases: rapidz.core.Stream
Converts data into an event stream, and passes it downstream.
- Parameters
descriptor_dicts (dict) – Dictionary describing the mapping between streams and their associated metadata. Top-level keys are the streams to accept data from. The values for these keys are the entries of the descriptor document (data_keys, name, configuration, etc.); see https://nsls-ii.github.io/bluesky/event_descriptors.html for more details. Note that some of this data is automatically generated.
stream_name (str, optional) – Name for this stream node
Notes
The result emitted from this stream follows the document model. This is essentially a state machine. Transitions are:
start -> stop
start -> descriptor -> event -> stop
Note that start -> start is not allowed; this node always issues a stop document so the data input times can be stored.
Additionally, note that this takes advantage of Python 3.6+ order-stable dictionaries. Since the key order is stable, the data keys can be given in the same order as the elements of streams which contain multiple elements; see stream a in the example.
Examples
>>> from pprint import pprint
>>> from rapidz import Stream
>>> from shed import (simple_from_event_stream,
...                   simple_to_event_stream_new_api)
>>> import operator as op
>>> source = Stream()
>>> stop = simple_from_event_stream(source, 'stop', ())
>>> fes = simple_from_event_stream(source, 'event', ('data', 'motor1'),
...                                principle=True)
>>> fes2 = simple_from_event_stream(source, 'event', ('data', ),
...                                 principle=True)
>>> a = fes.map(op.add, 2).zip(fes)
>>> b = fes.combine_latest(stop, emit_on=stop).pluck(0)
>>> node = simple_to_event_stream_new_api(
...     {
...         a: {
...             'data_keys': {'motor1_2': {}, 'motor1': {}},
...         },
...         b: {
...             'name': 'final',
...             'data_keys': {'motor1': {}},
...             'configuration': {'motor1': {'hi': 'world'}}
...         },
...         fes2: {'name': 'raw'}
...     }
... )
>>> node2 = simple_to_event_stream_new_api(
...     {fes2: {'name': 'primary'}}
... )
>>> from ophyd.sim import hw
>>> hw = hw()
>>> from bluesky.run_engine import RunEngine
>>> RE = RunEngine()
>>> import bluesky.plans as bp
>>> node.sink(pprint)
>>> RE.subscribe(lambda *x: source.emit(x))
>>> RE(bp.scan([hw.motor1], hw.motor1, 0, 10, 11))
prints:
('start',...)
('descriptor',...)
('event',...)
('stop',...)
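The point about order-stable dictionaries can be shown in isolation: because a dict keeps its insertion order, the keys of data_keys can be paired positionally with the elements of a tuple emitted by a zipped stream such as stream a. `label_elements` is a hypothetical helper written for this sketch and is not part of shed.

```python
def label_elements(data_keys, value):
    """Pair data key names with tuple elements by position (hypothetical helper)."""
    keys = list(data_keys)  # dicts preserve insertion order
    if len(keys) != len(value):
        raise ValueError("number of data keys must match number of elements")
    return dict(zip(keys, value))


# Same key order as stream `a` in the example: (motor1 + 2, motor1)
data_keys = {"motor1_2": {}, "motor1": {}}
labelled = label_elements(data_keys, (5, 3))
```

With the keys declared in this order, the first tuple element is stored under `motor1_2` and the second under `motor1`.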
-
shed.simple.walk_to_translation(node, graph, prior_node=None)
Creates a graph that is a subset of the graph from the stream.
The walk starts at a translation ToEventStream node and ends at any instances of FromEventStream or ToEventStream. Each iteration of the walk goes up one node and determines whether that node is a FromEventStream node; if not, it walks one node down to see if there are any ToEventStream nodes; if not, it keeps walking up. The walk down allows us to replace live data with stored data/stubs when it comes time to get the parent uids. Graph nodes are hashes or uids of the node objects, with stream=node in the nodes.
- Parameters
node (Stream instance)
graph (DiGraph instance)
prior_node (Stream instance)
shed.simple_parallel module
-
class shed.simple_parallel.SimpleToEventStream(upstream, data_keys=None, stream_name=None, **kwargs)
Bases: rapidz.parallel.ParallelStream, shed.doc_gen.CreateDocs
Converts data into an event stream, and passes it downstream.
- Parameters
upstream – the upstream node to receive streams from
data_keys (tuple, optional) – Names of the data keys. If None, assume incoming data is a dict and use the keys from the dict. Defaults to None
stream_name (str, optional) – Name for this stream node
Notes
The result emitted from this stream follows the document model. This is essentially a state machine. Transitions are:
start -> stop
start -> descriptor -> event -> stop
Note that start -> start is not allowed; this node always issues a stop document so the data input times can be stored.
Examples
>>> import uuid
>>> from rapidz import Stream
>>> from shed.translation import FromEventStream, ToEventStream
>>> source = Stream()
>>> s2 = FromEventStream(source, 'event', ('data', 'det_image'),
...                      principle=True)
>>> s3 = ToEventStream(s2, ('det_image',))
>>> s3.sink(print)
>>> from ophyd.sim import hw
>>> hw = hw()
>>> from bluesky.run_engine import RunEngine
>>> RE = RunEngine()
>>> import bluesky.plans as bp
>>> RE.subscribe(lambda *x: source.emit(x))
>>> RE(bp.scan([hw.motor1], hw.motor1, 0, 10, 11))
prints:
('start',...)
('descriptor',...)
('event',...)
('stop',...)
shed.replay module
-
shed.replay.replay(db, hdr)
Replay data analysis
- Parameters
db (Broker instance) – The databroker to pull data from
hdr (Header instance) – The analyzed data header
- Returns
loaded_graph (DiGraph) – The data processing pipeline as a graph
parent_nodes (dict) – The source nodes for the graph
data (dict) – A map between the document uids and documents
vs (list) – List of document uid in time order
Notes
>>> graph, parents, data, vs = replay(db, hdr)
>>> for v in vs:
...     parents[v["node"]].update(data[v["uid"]])