import os
import time
import numpy as np
from bluesky.callbacks.core import CallbackBase
from skbeam.io import save_output
from skbeam.io.fit2d import fit2d_save
from tifffile import imsave
from xpdan.formatters import pfmt, clean_template, render2
from xpdan.io import pdf_saver, dump_yml
from xpdan.vend.callbacks.core import Retrieve
from xpdtools.dev_utils import _timestampstr
[docs]class StartStopCallback(CallbackBase):
"""Print the time for analysis"""
def __init__(self):
self.t0 = 0
self.event_t0 = None
self.total_analysis_time = 0
self.n_events = 1
[docs] def start(self, doc):
self.t0 = time.time()
self.total_analysis_time = 0
self.n_events = 1
print("START ANALYSIS ON {}".format(doc["uid"]))
[docs] def event(self, doc):
self.n_events = doc["seq_num"]
if not self.event_t0:
self.event_t0 = time.time()
else:
self.total_analysis_time += time.time() - self.event_t0
print(
"Single Event Analysis time {}".format(
time.time() - self.event_t0
)
)
self.event_t0 = time.time()
[docs] def stop(self, doc):
print("FINISH ANALYSIS ON {}".format(doc.get("run_start", "NA")))
print(
"Average Event analysis time {}".format(
self.total_analysis_time / self.n_events
)
)
print("Analysis time {}".format(time.time() - self.t0))
[docs]class SaveBaseClass(Retrieve):
"""Base class for saving files with human friendly file names.
For each document this class applys a format to the string based off the
document which has been seen. When the time comes for the file to be
written any templating which has not been formatted will be removed from
the filename.
Parameters
----------
template : str
The templated filename
handler_reg : dict
The registry of file handlers for loading files from disk
root_map : dict
Mapping between the old file root and a new root, used for loading
files from disk
kwargs : dict
All extra kwargs are passed to the filename formatter when the start
document is received
Notes
-----
Every instance of ``__independent_vars__`` will be replaced with
``{name}_{data}_{units}_`` for each independent variable of the experiment
in the template.
"""
def __init__(
self, template, handler_reg, root_map=None, base_folders=None, **kwargs
):
if base_folders is None:
base_folders = []
elif isinstance(base_folders, str):
base_folders = [base_folders]
self.base_folders = base_folders
self._template = template
self.start_template = ""
self.descriptor_templates = {}
self.dim_names = []
self.kwargs = kwargs
self.in_dep_shapes = {}
self.dep_shapes = {}
# If you see this filename something bad happened (no metadata was
# captured/formatted)
self.filenames = "something_horrible_happened.xxx"
super().__init__(handler_reg, root_map)
[docs] def start(self, doc):
# Get the independent vars
self.dim_names = [
d[0][0]
for d in doc.get("hints", {}).get("dimensions")
if d[0][0] != "time"
]
if "original_start_uid" not in doc:
doc["original_start_uid"] = doc["uid"]
if "original_start_time" not in doc:
doc["original_start_time"] = doc["time"]
# use the magic formatter to leave things behind
self.start_template = render2(
self._template,
start=doc,
human_timestamp=_timestampstr(doc["original_start_time"]),
**doc,
**self.kwargs,
)
return super().start(doc)
[docs] def descriptor(self, doc):
self.in_dep_shapes = {
n: doc["data_keys"][n]["shape"] for n in self.dim_names
}
self.dep_shapes = {
n: doc["data_keys"][n]["shape"]
for n in set(self.dim_names) ^ set(doc["data_keys"])
}
# Use independent vars to create the filename
independent_var_string = "_"
for dim in sorted(self.dim_names):
# Only use scalar data in filenames
if len(self.in_dep_shapes[dim]) == 0:
independent_var_string += "{name}_{data}_{units}_".format(
name=dim,
data=f"{{event[data][{dim}]:1.{doc['data_keys'][dim]['precision']}f}}",
# TODO: fill in the sig figs
units=f"{doc['data_keys'][dim].get('units', 'arb')}",
)
self.descriptor_templates[doc["uid"]] = pfmt.format(
self.start_template,
descriptor=doc,
__independent_vars__=independent_var_string,
)
return super().descriptor(doc)
[docs] def event(self, doc):
self.filenames = [
pfmt.format(
self.descriptor_templates[doc["descriptor"]],
event=doc,
base_folder=bf,
)
.replace(".", ",")
.replace("__", "_")
for bf in self.base_folders
]
# Note that formally there are more steps to the formatting, but we
# should have the folder by now
for filename in self.filenames:
print(f"Saving file to {filename}")
os.makedirs(os.path.dirname(filename), exist_ok=True)
return super().event(doc)
[docs]class SaveTiff(SaveBaseClass):
"""Callback for saving Tiff files"""
[docs] def event(self, doc):
# fill the document
doc = super().event(doc)
for two_d_var in [
k for k, v in self.dep_shapes.items() if len(v) == 2
]:
for filename in self.filenames:
imsave(
clean_template(
pfmt.format(filename, ext=f"_{two_d_var}.tiff")
),
doc["data"][two_d_var],
)
[docs]class SaveIntensity(SaveBaseClass):
"""Callback for saving Q and tth ``.chi`` files"""
[docs] def event(self, doc):
# fill the document
doc = super().event(doc)
for one_d_ind_var in [
k for k, v in self.in_dep_shapes.items() if len(v) == 1
]:
for one_d_dep_var in [
k for k, v in self.dep_shapes.items() if len(v) == 1
]:
for filename in self.filenames:
save_output(
doc["data"][one_d_ind_var],
doc["data"][one_d_dep_var],
clean_template(
pfmt.format(
filename,
ext=f"_{one_d_dep_var}_{one_d_ind_var}",
)
),
{"tth": "2theta", "q": "Q"}.get(one_d_ind_var),
)
[docs]class SaveMask(SaveBaseClass):
"""Callback for saving masks as ``.msk`` and ``.npy`` files"""
[docs] def event(self, doc):
# fill the document
doc = super().event(doc)
for two_d_var in [
k for k, v in self.dep_shapes.items() if len(v) == 2
]:
for filename in self.filenames:
fit2d_save(
np.flipud(doc["data"][two_d_var]),
clean_template(pfmt.format(filename, ext="")),
)
np.save(
clean_template(pfmt.format(filename, ext="_mask.npy")),
doc["data"][two_d_var],
)
[docs]class SavePDFgetx3(SaveBaseClass):
"""Callback for saving PDF, F(Q), S(Q) files"""
[docs] def event(self, doc):
# fill the document
doc = super().event(doc)
for one_d_ind_var in [
k for k, v in self.in_dep_shapes.items() if len(v) == 1
]:
for one_d_dep_var in [
k for k, v in self.dep_shapes.items() if len(v) == 1
]:
for filename in self.filenames:
pdf_saver(
doc["data"][one_d_ind_var],
doc["data"][one_d_dep_var],
doc["data"]["config"],
clean_template(
pfmt.format(filename, ext=f".{one_d_dep_var}")
),
)
[docs]class SaveCalib(SaveBaseClass):
"""Callback for saving pyFAI calibrations as ``.poni`` files"""
[docs] def event(self, doc):
doc = super().event(doc)
for filename in self.filenames:
doc["data"]["calibration"].save(
clean_template(pfmt.format(filename, ext=".poni"))
)
SAVER_MAP = {
"dark_sub": SaveTiff,
"integration": SaveIntensity,
"mask": SaveMask,
"pdf": SavePDFgetx3,
"fq": SavePDFgetx3,
"sq": SavePDFgetx3,
"calib": SaveCalib,
"raw": SaveMeta,
}