from itertools import islice
from pprint import pprint
from .dev_utils import _timestampstr
import collections
from databroker._core import Header
[docs]def sort_scans_by_hdr_key(hdrs, key, verbose=True):
"""In a list of hdrs, group the scans by header-key.
Use this function to find all the scans in the list of headers that
have a particular key value, such as 'sample_name'='Ni'
Function returns a list of indices for the position in the list,
associated with the metadata key values.
Parameters
----------
hdrs: list of Header objects
The dictionary containing {'key-value':[list of scan indices]}.
For example a search over 'sample_name' might return
{'Ni':[0,1,2,3,4,9,10],'gold nanoparticles':[5,6,7,8]}
key: str
The scans will be sorted by the values of this metadata key,
e.g., 'sample_name'
verbose: bool, optional
If true prints the results. Defaults to True
Returns
-------
dict:
The dictionary containing {'key-value':[list of scan indices]}.
For example a search over 'sample_name' might return
{'Ni':[0,1,2,3,4,9,10],'gold nanoparticles':[5,6,7,8]}
"""
d = {}
for i, hdr in enumerate(hdrs):
if key in hdr["start"].keys():
if hdr["start"][key] in d.keys():
d[hdr["start"][key]].append(i)
else:
d[hdr["start"][key]] = [i]
if verbose:
pprint(d)
return d
[docs]def scan_diff(hdrs, verbose=True, blacklist=None):
"""Get the metadata differences between scans in hdrs list
Parameters
----------
hdrs: list of Header objects
The headers to be diffed
verbose: bool, optional
If true prints the results. Defaults to True
blacklist: list of str, optional
List of keys to not be included in diff. If None, defaults to `uid`
Returns
-------
dict:
The dictionary of keys with at least one different value across the
scans. The values are the results for each header.
"""
if blacklist is None:
blacklist = ["uid"]
keys = set(
[k for hdr in hdrs for k in hdr["start"].keys() if k not in blacklist]
)
kv = {}
for k in keys:
# TODO: eventually support dict differences
# See http://stackoverflow.com/a/11092607/5100330
v = [
hdr["start"][k]
for hdr in hdrs
if k in hdr["start"].keys()
if isinstance(hdr["start"][k], collections.Hashable)
]
if len(set(v)) != 1:
kv[k] = v
if verbose:
pprint(kv)
return kv
[docs]def scan_summary(hdrs, fields=None, verbose=True):
"""Provide one line summaries of headers
Parameters
----------
hdrs: list of headers
The headers from the databroker
fields: list of str, optional
'Specify a list of fields to summarize. If None, the following
will be returned
``['sample_name', 'sp_type', 'sp_startingT', 'sp_endingT']``
defaults to None
verbose: bool, optional
If True print the summary
Returns
-------
list:
List of summary strings
"""
if fields is None:
fields = ["sample_name", "sp_type", "sp_startingT", "sp_endingT"]
fields = fields
datas = []
for i, hdr in enumerate(hdrs):
data = [
hdr["start"][key] for key in fields if key in hdr["start"].keys()
]
data2 = {
key: hdr["start"][key]
for key in fields
if key in hdr["start"].keys()
}
data2["time"] = _timestampstr(hdr["start"]["time"])
data = [_timestampstr(hdr["start"]["time"])] + data
data2["uid"] = hdr["start"]["uid"][:6]
data += [hdr["start"]["uid"][:6]]
data = [str(d) for d in data]
data = "_".join(data)
if verbose:
print((i, data2))
datas.append(data)
return datas
[docs]def query_dark(docs, db, schema=1):
"""Get dark data from databroker
Parameters
----------
db: Broker instance
docs: tuple of dict
schema: int
Schema version
Returns
-------
list of Header :
The list of headers which meet the criteria
"""
if schema == 1:
if isinstance(docs, (tuple, list)):
doc = docs[0]
else:
doc = docs
dk_uid = doc.get("sc_dk_field_uid")
if dk_uid:
try:
z = db[dk_uid]
except ValueError:
return []
else:
return z
else:
return []
[docs]def query_flat_field(docs, db, schema=1):
"""Get flat_field data from databroker
Parameters
----------
db: Broker instance
docs: tuple of dict
schema: int
Schema version
Returns
-------
list of Header :
The list of headers which meet the criteria
"""
if schema == 1:
if isinstance(docs, (tuple, list)):
doc = docs[0]
else:
doc = docs
dk_uid = doc.get("sc_flat_field_uid")
if dk_uid:
try:
z = db[dk_uid]
except ValueError:
return []
else:
return z
else:
return []
[docs]def query_background(docs, db, schema=1):
"""Get background data from databroker
Parameters
----------
db: Broker instance
docs: tuple of dict
schema: int
Schema version
Returns
-------
list of Header :
The list of headers which meet the criteria
"""
if schema == 1:
if isinstance(docs, (tuple, list)):
doc = docs[0]
else:
doc = docs
sample_name = doc.get("bkgd_sample_name")
if sample_name:
return db(
sample_name=sample_name,
bt_uid=doc["bt_uid"],
is_dark={"$exists": False},
)
else:
return []
[docs]def temporal_prox(res, docs):
# If there is only one result just use that one
if isinstance(res, Header):
return [res]
if isinstance(docs, (tuple, list)):
doc = docs[0]
else:
doc = docs
t = doc["time"]
dt_sq = [(t - r["start"]["time"]) ** 2 for r in res]
if dt_sq:
i = dt_sq.index(min(dt_sq))
min_r = [next(islice(res, i, i + 1))]
else:
min_r = []
return min_r