shed package

shed.simple module

Nodes for translating between base data and the event model

class shed.simple.AlignEventStreams(*upstreams, event_stream_name='--ALL THE DOCS--', stream_name=None, **kwargs)[source]

Bases: shed.simple.align_event_streams

class shed.simple.LastCache(upstream=None, upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)[source]

Bases: rapidz.core.Stream

Cache the last event in each stream, then emit the cached events under their own descriptors when the stop document comes down.
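The caching behavior can be sketched in plain Python. This is an illustrative sketch only, not the real rapidz node: `LastCacheSketch` and its `(name, doc)` call convention are hypothetical stand-ins for the stream machinery.

```python
# Sketch of LastCache's behavior: keep only the most recent event
# per descriptor, and flush the cache when a stop document arrives.
class LastCacheSketch:
    def __init__(self):
        self._last = {}  # descriptor uid -> last event seen for it

    def update(self, name, doc):
        emitted = []
        if name == "event":
            # overwrite any previously cached event for this descriptor
            self._last[doc["descriptor"]] = doc
        elif name == "stop":
            # re-emit each cached event under its own descriptor
            for event in self._last.values():
                emitted.append(("event", event))
            self._last.clear()
        return emitted

lc = LastCacheSketch()
lc.update("event", {"descriptor": "d1", "data": {"x": 1}})
lc.update("event", {"descriptor": "d1", "data": {"x": 2}})
out = lc.update("stop", {})
# only the most recent event for descriptor 'd1' survives
```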

update(x, who=None)[source]
class shed.simple.SimpleFromEventStream(doc_type, data_address, upstream=None, event_stream_name='--ALL THE DOCS--', stream_name=None, principle=False, **kwargs)[source]

Bases: shed.simple.simple_from_event_stream

class shed.simple.SimpleToEventStream(upstream, data_keys=None, stream_name=None, data_key_md=None, **kwargs)[source]

Bases: shed.simple.simple_to_event_stream

class shed.simple.align_event_streams(*upstreams, event_stream_name='--ALL THE DOCS--', stream_name=None, **kwargs)[source]

Bases: rapidz.core.zip

Zips and aligns multiple streams of documents. Note that the last upstream takes precedence where merging is not possible. This requires the streams to be of equal length.
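The "last upstream takes precedence" rule can be sketched with plain dicts. This mimics only the merge-precedence behavior described above, not the real zip node; `merge_with_precedence` is a hypothetical helper:

```python
def merge_with_precedence(*docs):
    # Later documents overwrite earlier ones on key collisions,
    # mirroring "the last upstream takes precedence".
    merged = {}
    for doc in docs:
        merged.update(doc)
    return merged

a = {"uid": "a", "motor": 1}
b = {"uid": "b", "motor": 2}
merged = merge_with_precedence(a, b)
# merged["uid"] == "b": the last upstream won
```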

update(x, who=None)[source]
shed.simple.build_upstream_node_set(node, s=None)[source]

Build a set of all the nodes in a rapidz graph

Parameters
  • node (Stream) – The node to use as a starting point for building the set

  • s (set or None) – The set to put the nodes into. If None, create and return a new set of nodes

Returns

s – The set of nodes in the graph

Return type

set
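The traversal can be sketched in plain Python, assuming each node exposes an ``upstreams`` list as rapidz nodes do. ``FakeNode`` is a hypothetical stand-in, not a real rapidz class:

```python
def build_upstream_node_set(node, s=None):
    # Sketch: recursively collect a node and everything upstream of it.
    if s is None:
        s = set()
    s.add(node)
    for up in getattr(node, "upstreams", []):
        if up not in s:
            build_upstream_node_set(up, s)
    return s

class FakeNode:
    """Hypothetical stand-in for a rapidz Stream node."""
    def __init__(self, upstreams=()):
        self.upstreams = list(upstreams)

a = FakeNode()
b = FakeNode([a])
c = FakeNode([a, b])
nodes = build_upstream_node_set(c)
# nodes contains a, b, and c
```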

class shed.simple.simple_from_event_stream(upstream, doc_type, data_address, event_stream_name='--ALL THE DOCS--', stream_name=None, principle=False, **kwargs)[source]

Bases: rapidz.core.Stream

Extracts data from the event stream, and passes it downstream.

Parameters
  • doc_type ({‘start’, ‘descriptor’, ‘event’, ‘stop’}) – The type of document to extract data from

  • data_address (tuple) – A tuple of successive keys walking through the document considered; if the tuple is empty, all the data from that document is returned as a dict

  • upstream (Stream instance or None, optional) – The upstream node to receive streams from, defaults to None

  • event_stream_name (str, optional) – Filter by an event stream name (see : http://nsls-ii.github.io/databroker/api.html?highlight=stream_name#data)

  • stream_name (str, optional) – Name for this stream node

  • principle (bool, optional) – If True, when this node receives a stop document all downstream ToEventStream nodes will issue a stop document. Defaults to False. Note that one principle node is required for proper pipeline operation.

Notes

The result emitted from this stream no longer follows the document model.

This node also keeps track of when and which data came through the node.

Examples

>>> from rapidz import Stream
>>> from shed.translation import FromEventStream
>>> source = Stream()
>>> s2 = FromEventStream(source, 'event', ('data', 'motor1'))
>>> s3 = s2.map(print)
>>> from ophyd.sim import hw
>>> hw = hw()
>>> from bluesky.run_engine import RunEngine
>>> RE = RunEngine()
>>> import bluesky.plans as bp
>>> RE.subscribe(lambda *x: source.emit(x))
>>> RE(bp.scan([hw.motor1], hw.motor1, 0, 10, 11))

prints:

>>> 0
>>> 1
>>> ...
>>> 10
update(x, who=None)[source]
class shed.simple.simple_to_event_stream(upstream, data_keys=None, stream_name=None, data_key_md=None, **kwargs)[source]

Bases: rapidz.core.Stream, shed.doc_gen.CreateDocs

Converts data into an event stream, and passes it downstream.

Parameters
  • upstream – the upstream node to receive streams from

  • data_keys (tuple, optional) – Names of the data keys. If None, assume the incoming data is a dict and use its keys. Defaults to None

  • stream_name (str, optional) – Name for this stream node

Notes

The result emitted from this stream follows the document model. This is essentially a state machine. The allowed transitions are start -> stop and start -> descriptor -> event -> stop. Note that start -> start is not allowed; this node always issues a stop document so that the data input times can be stored.
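The allowed transitions can be sketched as a small table. This is a minimal sketch of only the transitions named above (with event -> event assumed for runs containing multiple events); it is not part of the shed API:

```python
# Allowed document-type transitions for this node's state machine.
ALLOWED = {
    "start": {"descriptor", "stop"},
    "descriptor": {"event"},
    "event": {"event", "stop"},  # event -> event assumed for multi-event runs
    "stop": set(),
}

def valid(sequence):
    # A sequence must begin with 'start' and follow only allowed transitions.
    if not sequence or sequence[0] != "start":
        return False
    return all(b in ALLOWED[a] for a, b in zip(sequence, sequence[1:]))

valid(["start", "descriptor", "event", "event", "stop"])  # True
valid(["start", "start"])  # False: start -> start is not allowed
```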

Examples

>>> from rapidz import Stream
>>> from shed.translation import FromEventStream, ToEventStream
>>> source = Stream()
>>> s2 = FromEventStream(source, 'event', ('data', 'motor1'),
...                      principle=True)
>>> s3 = ToEventStream(s2, ('motor1',))
>>> s3.sink(print)
>>> from ophyd.sim import hw
>>> hw = hw()
>>> from bluesky.run_engine import RunEngine
>>> RE = RunEngine()
>>> import bluesky.plans as bp
>>> RE.subscribe(lambda *x: source.emit(x))
>>> RE(bp.scan([hw.motor1], hw.motor1, 0, 10, 11))

prints:

>>> ('start',...)
>>> ('descriptor',...)
>>> ('event',...)
>>> ('stop',...)
emit_start(x)[source]
emit_stop(x)[source]
update(x, who=None)[source]
class shed.simple.simple_to_event_stream_new_api(descriptor_dicts, stream_name=None, **kwargs)[source]

Bases: rapidz.core.Stream

Converts data into an event stream, and passes it downstream.

Parameters
  • descriptor_dicts (dict) – Dictionary describing the mapping between streams and their associated metadata. Top-level keys are the streams to accept data from. The values for these keys are the entries of the descriptor document (data_keys, name, configuration, etc.; see https://nsls-ii.github.io/bluesky/event_descriptors.html for more details). Note that some of this data is automatically generated if not provided.

  • stream_name (str, optional) – Name for this stream node

Notes

The result emitted from this stream follows the document model. This is essentially a state machine. The allowed transitions are start -> stop and start -> descriptor -> event -> stop. Note that start -> start is not allowed; this node always issues a stop document so that the data input times can be stored.

Additionally, note that this takes advantage of the insertion-order-preserving dictionaries of Python 3.6+. Since the key order is stable, the data keys can be given in the same order as the elements of streams that contain multiple elements; see stream a in the example.

Examples

>>> from pprint import pprint
>>> from rapidz import Stream
>>> from shed import (simple_from_event_stream,
...                   simple_to_event_stream_new_api)
>>> import operator as op
>>> source = Stream()
>>> stop = simple_from_event_stream(source, 'stop', ())
>>> fes = simple_from_event_stream(source, 'event', ('data', 'motor1'),
...                                principle=True)
>>> fes2 = simple_from_event_stream(source, 'event', ('data', ),
...                                 principle=True)
>>> a = fes.map(op.add, 2).zip(fes)
>>> b = fes.combine_latest(stop, emit_on=stop).pluck(0)
>>> node = simple_to_event_stream_new_api(
...     {
...         a: {
...             'data_keys': {'motor1_2': {}, 'motor1': {}},
...         },
...         b: {
...             'name': 'final',
...             'data_keys': {'motor1': {}},
...             'configuration': {'motor1': {'hi': 'world'}}
...         },
...         fes2: {'name': 'raw'}
...     }
... )
>>> node2 = simple_to_event_stream_new_api(
...     {fes2: {'name': 'primary'}}
... )
>>> from ophyd.sim import hw
>>> hw = hw()
>>> from bluesky.run_engine import RunEngine
>>> RE = RunEngine()
>>> import bluesky.plans as bp
>>> node.sink(pprint)
>>> RE.subscribe(lambda *x: source.emit(x))
>>> RE(bp.scan([hw.motor1], hw.motor1, 0, 10, 11))

prints:

>>> ('start',...)
>>> ('descriptor',...)
>>> ('event',...)
>>> ('stop',...)
emit_start(x)[source]
emit_stop(x)[source]
update(x, who=None)[source]
shed.simple.walk_to_translation(node, graph, prior_node=None)[source]

Creates a graph that is a subset of the graph from the stream.

The walk starts at a translation ToEventStream node and ends at any instances of FromEventStream or ToEventStream. Each iteration of the walk goes up one node and determines whether that node is a FromEventStream node; if not, it walks one node down to see if there are any ToEventStream nodes, and if not, it keeps walking up. The walk down allows us to replace live data with stored data/stubs when it comes time to get the parent uids. Graph nodes are hashes or uids of the node objects, with stream=node in the nodes.

Parameters
  • node (Stream instance)

  • graph (DiGraph instance)

  • prior_node (Stream instance)
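The upward part of the walk can be sketched in plain Python. This is a simplified illustration only: the node classes here are hypothetical stand-ins, and the real function also performs the downward checks and builds a networkx DiGraph rather than a set of names.

```python
# Sketch: walk up from a ToEventStream node, stopping at any
# translation (FromEventStream/ToEventStream) node encountered.
class Node:
    def __init__(self, name, upstreams=()):
        self.name = name
        self.upstreams = list(upstreams)

class FromEventStream(Node):
    pass

class ToEventStream(Node):
    pass

def walk_up(node, graph):
    graph.add(node.name)
    for up in node.upstreams:
        if isinstance(up, (FromEventStream, ToEventStream)):
            # translation node: record it and stop walking this branch
            graph.add(up.name)
        else:
            # ordinary node: keep walking up
            walk_up(up, graph)

src = FromEventStream("from")
m = Node("map", [src])
out = ToEventStream("to", [m])
graph = set()
walk_up(out, graph)
# graph now holds 'to', 'map', and 'from'
```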

shed.simple_parallel module

class shed.simple_parallel.SimpleToEventStream(upstream, data_keys=None, stream_name=None, **kwargs)[source]

Bases: rapidz.parallel.ParallelStream, shed.doc_gen.CreateDocs

Converts data into an event stream, and passes it downstream.

Parameters
  • upstream – the upstream node to receive streams from

  • data_keys (tuple, optional) – Names of the data keys. If None, assume the incoming data is a dict and use its keys. Defaults to None

  • stream_name (str, optional) – Name for this stream node

Notes

The result emitted from this stream follows the document model. This is essentially a state machine. The allowed transitions are start -> stop and start -> descriptor -> event -> stop. Note that start -> start is not allowed; this node always issues a stop document so that the data input times can be stored.

Examples

>>> from rapidz import Stream
>>> from shed.translation import FromEventStream, ToEventStream
>>> source = Stream()
>>> s2 = FromEventStream(source, 'event', ('data', 'motor1'),
...                      principle=True)
>>> s3 = ToEventStream(s2, ('motor1',))
>>> s3.sink(print)
>>> from ophyd.sim import hw
>>> hw = hw()
>>> from bluesky.run_engine import RunEngine
>>> RE = RunEngine()
>>> import bluesky.plans as bp
>>> RE.subscribe(lambda *x: source.emit(x))
>>> RE(bp.scan([hw.motor1], hw.motor1, 0, 10, 11))

prints:

>>> ('start',...)
>>> ('descriptor',...)
>>> ('event',...)
>>> ('stop',...)
descriptor(x)[source]
emit(x, asynchronous=False)[source]

Push data into the stream at this point

This is typically done only at source Streams but can theoretically be done at any point

emit_start(x)[source]
emit_stop(x)[source]
start_doc(x)[source]
update(x, who=None)[source]

shed.replay module

shed.replay.rebuild_node(node_dict, graph)[source]
shed.replay.replay(db, hdr)[source]

Replay data analysis

Parameters
  • db (Broker instance) – The databroker to pull data from

  • hdr (Header instance) – The analyzed data header

Returns

  • loaded_graph (DiGraph) – The data processing pipeline as a graph

  • parent_nodes (dict) – The source nodes for the graph

  • data (dict) – A map between the document uids and documents

  • vs (list) – List of document uids in time order

Notes

>>> graph, parents, data, vs = replay(db, hdr)
>>> for v in vs:
...     parents[v["node"]].update(data[v["uid"]])