Tutorials
These are tutorials designed to help you better understand what SHED is and what problems it attempts to solve.
Before beginning, we'll assume that you have the shed and rapidz libraries installed.
Tutorial 1: Simple Streams
First, we begin with a quick review of simple streams. Let's say we have a stream of incoming data and need to increment each value by one and print the result. The definition is as simple as the following:
from rapidz import Stream

def myadder(x):
    return x + 1

s = Stream()
s2 = s.map(myadder)   # increment each value by one
s3 = s2.sink(print)   # print each result

# now send the data here
s.emit(1)
s.emit(3)
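Each emitted value passes through myadder and then into print, so running the snippet above produces:
2
4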
Here, the stream definition is done via s = Stream(). The data is not input into the stream until s.emit(var) is called. The incrementing by one is done via the map method, which takes a function as its first argument.
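As a side note, map also forwards any extra positional arguments to the function; the pipelines below rely on this when calling map(op.mul, 5). Here is a minimal sketch, assuming rapidz follows the streamz convention of calling func(x, *args):
from operator import add
from rapidz import Stream

s = Stream()
# the extra argument 1 is forwarded, so each emitted x becomes add(x, 1)
s.map(add, 1).sink(print)
s.emit(1)  # prints 2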
If this makes sense, then we're ready to understand event streams. If it doesn't, we suggest reading the documentation and examples for the rapidz library first.
Tutorial 2: A Simple Event Stream
An event stream, as defined by NSLS-II, is a series of documents: a start document, followed by descriptor documents, event documents, and a stop document. Please refer to the NSLS-II documentation for more details.
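As a rough illustration (the exact fields vary; consult the event-model documentation for the authoritative schema), the (name, document) sequence for a single-point run looks something like:
('start',      {'uid': ..., 'time': ..., 'plan_name': ..., ...})
('descriptor', {'uid': ..., 'run_start': ..., 'data_keys': {'det_a': ...}, ...})
('event',      {'uid': ..., 'descriptor': ..., 'seq_num': 1, 'data': {'det_a': ...}, ...})
('stop',       {'uid': ..., 'run_start': ..., 'exit_status': ..., ...})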
SHED handles the translation between the event model and raw data, in both directions.
To better understand the general idea, let's begin with a simple event stream. We'll use the bluesky.run_engine.RunEngine and ophyd.sim.hw to mimic an experiment run at a beamline.
Here is an example pipeline which multiplies the output of a detector by 5 and then performs a running sum of the results. The results are then repackaged into the event model and printed via pprint. Finally, we run a scan with the ab_det detector:
from rapidz import Stream
from shed.simple import SimpleFromEventStream, SimpleToEventStream
import operator as op
from pprint import pprint
# where we'll input data
raw_source = Stream()
# extract from the events the values associated with 'data' and 'det_a'
# (principle=True means that we listen to this node for when to issue
# start and stop documents, all SHED pipelines must have at least one
# principle node)
raw_output = SimpleFromEventStream('event', ('data', 'det_a'), raw_source,
                                   principle=True)
# multiply by 5 and perform a cumulative sum
pipeline = raw_output.map(op.mul, 5).accumulate(op.add)
# repackage the data in the event model under the name 'result'
res = SimpleToEventStream(pipeline, ('result', ))
# print out the documents as they come out
res.sink(pprint)
Now, with the pipeline set up, we can connect it to the RunEngine and run our experiment:
import bluesky.plans as bp
from ophyd.sim import hw
from bluesky import RunEngine
# make some simulated devices
hw = hw()
RE = RunEngine()
# have the RunEngine send data into the pipeline
RE.subscribe(lambda *x: raw_source.emit(x))
# Run the scan
RE(bp.scan([hw.ab_det], hw.motor1, 0, 4, 5))
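The exact contents will differ from run to run, but since the pprint sink receives (name, document) pairs, the output should look roughly like this (field values elided):
('start', {...})
('descriptor', {..., 'data_keys': {'result': ...}, ...})
('event', {..., 'data': {'result': ...}, ...})
...
('stop', {...})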
We can also subscribe a BestEffortCallback to the pipeline for live visualization:
from bluesky.utils import install_qt_kicker
from bluesky.callbacks.best_effort import BestEffortCallback
install_qt_kicker()
bec = BestEffortCallback()
# AlignEventStreams so we inherit scan details
# starsink because we need to splay out the data as args
res.AlignEventStreams(raw_source).starsink(bec)
We can also extract data from other documents, for instance from the start document:
from_start = SimpleFromEventStream('start', ('my_number', ),
                                   upstream=raw_source)
from_start.sink(print)
RE(bp.count([hw.ab_det], 5), my_number=3)
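Because my_number=3 is passed to the RunEngine as metadata, it is recorded in the start document; the from_start node extracts it and prints 3 as soon as the run begins.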
Finally, we can send the data to a databroker:
from databroker import Broker
# create a temporary database
db = Broker.named('temp')
# we use starsink here because we need to splay out the (name, document)
# pair into the args
res.starsink(db.insert)
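With this subscription in place, the processed documents are stored like any other run. As a minimal sketch of pulling them back out with the databroker v1 interface (assuming a scan has been run after subscribing, so the most recent run in db is the processed one):
# retrieve the most recent run and tabulate its event data;
# the 'result' column holds the pipeline output
hdr = db[-1]
print(hdr.table())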
Tutorial 3: Replay
To capture the provenance of the data processing, we need to use the full translation nodes (rather than the Simple nodes).
We can use the pipeline from above with a small modification:
from rapidz import Stream
from shed.translation import FromEventStream, ToEventStream
import operator as op
from databroker import Broker
import bluesky.plans as bp
from ophyd.sim import hw
from bluesky import RunEngine
from pprint import pprint
# where we'll input data
raw_source = Stream()
# extract from the events the values associated with 'data' and 'det_a'
# (principle=True means that we listen to this node for when to issue
# start and stop documents, all SHED pipelines must have at least one
# principle node)
raw_output = FromEventStream('event', ('data', 'det_a'),
                             upstream=raw_source, principle=True)
# multiply by 5 and perform a cumulative sum
pipeline = raw_output.map(op.mul, 5).accumulate(op.add)
# repackage the data in the event model under the name 'result'
res = ToEventStream(pipeline, ('result', ))
# print out the documents as they come out
res.sink(pprint)
# create a temporary database
db = Broker.named('temp')
# Make certain that the data is DB friendly (serialize the graph)
# we use starsink here because we need to splay out the (name, document)
# pair into the args
res.DBFriendly().starsink(db.insert)
# make some simulated devices
hw = hw()
RE = RunEngine()
# Send raw data to the databroker as well
RE.subscribe(db.insert)
# have the RunEngine send data into the pipeline
RE.subscribe(lambda *x: raw_source.emit(x))
# Run the scan
RE(bp.count([hw.ab_det], 5))
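Note that the databroker now holds two runs per scan: the raw run (stored via RE.subscribe(db.insert)) and the processed run (stored via the DBFriendly sink). The replay below is handed db[-1], the most recently started run, which here is the processed one.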
Now that we have created the pipeline, run the experiment, and captured the results in the databroker, we can replay the analysis:
from shed.replay import replay
from rapidz.graph import _clean_text, readable_graph
# get the graph and data
graph, parents, data, vs = replay(db, db[-1])
# make a graph with human readable names
for k, v in graph.nodes.items():
    v.update(label=_clean_text(str(v['stream'])).strip())
graph = readable_graph(graph)
# create a plot of the graph so we can look at it and figure out what
# the node names are
# the file will be named ``mystream.png``
graph.nodes['data det_a FromEventStream']['stream'].visualize()
# print the results
graph.nodes['result ToEventStream']['stream'].sink(pprint)
# change the multiplication factor from 5 to 10
graph.nodes['map; mul']['stream'].args = (10, )
# rerun the analysis and print the results
for v in vs:
    dd = data[v['uid']]
    parents[v['node']].update(dd)
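Pushing the stored documents back through the graph re-executes the analysis; the pprint sink attached to the 'result ToEventStream' node now prints documents whose result values reflect a multiplication factor of 10 rather than 5.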