Source code for xpdan.pipelines.main

import os

import numpy as np
from shed.simple import SimpleFromEventStream as FromEventStream
from rapidz import move_to_first, union
from xpdan.callbacks import StartStopCallback
from xpdan.db_utils import query_background, query_dark, temporal_prox
from xpdan.pipelines.pipeline_utils import _timestampstr, clear_combine_latest
from xpdan.vend.callbacks.core import Retrieve
from xpdconf.conf import glbl_dict
from xpdtools.calib import _save_calib_param
from xpdtools.pipelines.raw_pipeline import (
    image_process,
    calibration,
    scattering_correction,
    gen_mask,
    integration,
    pdf_gen,
)


# TODO: use oracle to get rid of this
def clear_geo_gen(source, geometry_img_shape, **kwargs):
    """Clear the geometry buffer when the calibration client uid changes.

    Parameters
    ----------
    source
        Document stream whose ``start`` documents are inspected for the
        ``detector_calibration_client_uid`` key.
    geometry_img_shape
        Node whose ``lossless_buffer`` is cleared.
    kwargs
        Not used by this function.
    """
    calibration_uid = FromEventStream(
        "start", ("detector_calibration_client_uid",), source
    )
    # NOTE(review): presumably this makes the node run ahead of the other
    # consumers of ``source`` so the cache is invalidated before new data
    # reaches them -- confirm against ``rapidz.move_to_first``.
    move_to_first(calibration_uid)
    # ``unique(history=1)`` only lets a uid through when it differs from
    # the previous one, so repeated scans with the same calibration keep
    # their buffer.
    changed_uid = calibration_uid.unique(history=1)
    changed_uid.sink(lambda _uid: geometry_img_shape.lossless_buffer.clear())
# TODO: use oracle to get rid of this
def clear_comp(source, iq_comp, **kwargs):
    """Reset the composition input of ``iq_comp`` on every start document.

    Parameters
    ----------
    source
        Document stream whose ``start`` documents trigger the reset.
    iq_comp
        Node handed to ``clear_combine_latest`` together with the index
        ``1``.
    kwargs
        Not used by this function.
    """
    # FIXME: Needs to go after the iq_comp is defined
    every_start = FromEventStream("start", (), source)
    # NOTE(review): presumably this puts the reset ahead of the other
    # consumers of ``source`` -- confirm against ``rapidz.move_to_first``.
    move_to_first(every_start)
    every_start.sink(lambda _start: clear_combine_latest(iq_comp, 1))
def save_cal(start_timestamp, gen_geo_cal, **kwargs):
    """Write each generated calibration to the configured calibration file.

    Parameters
    ----------
    start_timestamp
        Stream of start times; mapped through ``_timestampstr`` before use.
    gen_geo_cal
        Stream whose emissions carry the calibration as their first element.
    kwargs
        Not used by this function, but part of the returned namespace.

    Returns
    -------
    dict
        ``locals()`` of this function: the arguments plus ``h_timestamp``.
    """
    # NOTE: the function returns ``locals()``; adding or renaming a local
    # variable here changes the namespace handed back to the caller.
    h_timestamp = start_timestamp.map(_timestampstr)
    # Pair every new calibration with the most recent timestamp string.
    # The target path is looked up in ``glbl_dict`` at save time, not when
    # the pipeline is built.
    gen_geo_cal.pluck(0).zip_latest(h_timestamp).starsink(
        lambda calib, stamp: _save_calib_param(
            calib,
            stamp,
            os.path.join(
                glbl_dict["config_base"], glbl_dict["calib_config_name"]
            ),
        )
    )
    return locals()
# TODO: chunk this up a bit more so we can separate XRD from PDF
def start_gen(
    raw_source,
    image_names=glbl_dict["image_fields"],
    db=glbl_dict["exp_db"],
    calibration_md_folder=None,
    **kwargs,
):
    """Build the input nodes of the pipeline from the raw document stream.

    Parameters
    ----------
    raw_source
        Stream of raw documents. Scans whose start document has a truthy
        ``dark_frame`` entry are filtered out before any node below sees
        them.
    image_names
        Iterable of image data keys. Each name is looked up as
        ``("data", image_name)`` in events, and its part before the first
        ``"_"`` is used to find the detector type in the descriptor
        configuration.
    db : databroker.Broker
        The databroker to use. It supplies the handler registry and root
        map for ``Retrieve`` and is passed to the background and dark
        queries.
    calibration_md_folder
        When ``None`` it is replaced by
        ``{"folder": "xpdAcq_calib_info.yml"}``. It is not used any further
        inside this function; it is only exposed through the returned
        namespace.
    kwargs
        Not used by this function, but part of the returned namespace.

    Returns
    -------
    dict
        ``locals()`` of this function, i.e. every argument and every node
        created below, keyed by its variable name.

    Notes
    -----
    Statement order and local variable names are part of the behaviour:
    nodes are attached in the order they are created, and the returned
    namespace is keyed by the local names.
    """
    if calibration_md_folder is None:
        calibration_md_folder = {"folder": "xpdAcq_calib_info.yml"}
    # raw_source.sink(lambda x: print(x[0]))
    # Build the general pipeline from the raw_pipeline

    # TODO: change this when new dark logic comes
    # Check that the data isn't a dark (dark_frame = True when taking a dark)
    not_dark_scan = FromEventStream(
        "start", (), upstream=raw_source, stream_name="not dark scan"
    ).map(lambda x: not x.get("dark_frame", False))
    # Fill the raw event stream
    source = (
        # Emit on works here because we emit on the not_dark_scan first due
        # to the ordering of the nodes!
        raw_source.combine_latest(not_dark_scan, emit_on=0)
        .filter(lambda x: x[1])
        .pluck(0)
        .starmap(
            Retrieve(handler_reg=db.reg.handler_reg, root_map=db.reg.root_map)
        )
        # Resource and datum documents are dropped here.
        .filter(lambda x: x[0] not in ["resource", "datum"])
    )
    # source.sink(lambda x: print('Source says ', x))
    # Get all the documents
    start_docs = FromEventStream("start", (), source)
    descriptor_docs = FromEventStream(
        "descriptor", (), source, event_stream_name="primary"
    )
    event_docs = FromEventStream(
        "event", (), source, event_stream_name="primary"
    )
    # One dict per primary event, bundled with the latest start and
    # descriptor documents and a formatted start time.
    all_docs = event_docs.combine_latest(
        start_docs, descriptor_docs, emit_on=0, first=True
    ).starmap(
        lambda e, s, d: {
            "raw_event": e,
            "raw_start": s,
            "raw_descriptor": d,
            "human_timestamp": _timestampstr(s["time"]),
        }
    )

    # PDF specific
    composition = FromEventStream("start", ("composition_string",), source)

    # Calibration information
    wavelength = FromEventStream("start", ("bt_wavelength",), source).unique(
        history=1
    )
    calibrant = FromEventStream(
        "start", ("dSpacing",), source, principle=True
    ).unique(history=1)
    # The detector name comes either from the start document or from the
    # descriptor configuration of any of the image fields.
    detector = (
        FromEventStream("start", ("detector",), source)
        .union(
            *[
                FromEventStream(
                    "descriptor",
                    (
                        "configuration",
                        image_name.split("_")[0],
                        "data",
                        f'{image_name.split("_")[0]}_detector_type',
                    ),
                    source,
                    event_stream_name="primary",
                )
                for image_name in image_names
            ]
        )
        .unique(history=1)
    )

    is_calibration_img = FromEventStream("start", (), source).map(
        lambda x: "detector_calibration_server_uid" in x
    )
    # Only pass through new calibrations (prevents us from recalculating cals)
    geo_input = FromEventStream(
        "start", ("calibration_md",), source, principle=True
    ).unique(history=1)

    start_timestamp = FromEventStream("start", ("time",), source)

    # Clean out the cached darks and backgrounds on start
    # so that this will run regardless of background/dark status
    # note that we get the proper data (if it exists downstream)
    # FIXME: this is kinda an anti-pattern and needs to go lower in the
    #  pipeline
    # The lambdas below refer to nodes that are assigned further down; the
    # names are resolved when a start document arrives, not here.
    start_docs.sink(lambda x: raw_background_dark.emit(0.0))
    start_docs.sink(lambda x: raw_background.emit(0.0))
    start_docs.sink(lambda x: raw_foreground_dark.emit(0.0))

    bg_query = start_docs.map(query_background, db=db)
    bg_docs = (
        bg_query.zip(start_docs)
        .starmap(temporal_prox)
        .filter(lambda x: x != [])
        .map(lambda x: x[0].documents(fill=True))
        .flatten()
    )

    # Get foreground dark
    fg_dark_query = start_docs.map(query_dark, db=db)
    fg_dark_query.filter(lambda x: x == []).sink(
        lambda x: print("No dark found!")
    )
    raw_foreground_dark = union(
        *[
            FromEventStream(
                "event",
                ("data", image_name),
                fg_dark_query.filter(lambda x: x != [])
                .map(lambda x: x if not isinstance(x, list) else x[0])
                .map(lambda x: x.documents(fill=True))
                .flatten(),
                event_stream_name="primary",
            )
            for image_name in image_names
        ]
    ).map(np.float32)

    # Get bg dark
    bg_dark_query = FromEventStream("start", (), bg_docs).map(
        query_dark, db=db
    )
    raw_background_dark = union(
        *[
            FromEventStream(
                "event",
                ("data", image_name),
                bg_dark_query.filter(lambda x: x != [])
                .map(lambda x: x if not isinstance(x, list) else x[0])
                .map(lambda x: x.documents(fill=True))
                .flatten(),
                stream_name="raw_background_dark",
                event_stream_name="primary",
            )
            for image_name in image_names
        ]
    ).map(np.float32)

    # Pull darks from their stream if it exists
    # NOTE: this loop leaves ``image_name`` in the returned namespace.
    for image_name in image_names:
        (
            FromEventStream(
                "event", ("data", image_name), source, event_stream_name="dark"
            )
            .map(np.float32)
            .connect(raw_foreground_dark)
        )

    # Get background
    raw_background = union(
        *[
            FromEventStream(
                "event",
                ("data", image_name),
                bg_docs,
                stream_name="raw_background",
                event_stream_name="primary",
            )
            for image_name in image_names
        ]
    ).map(np.float32)

    # Get foreground
    img_counter = FromEventStream(
        "event",
        ("seq_num",),
        source,
        stream_name="seq_num",
        # This doesn't make sense in multi-stream settings
        event_stream_name="primary",
    )
    raw_foreground = union(
        *[
            FromEventStream(
                "event",
                ("data", image_name),
                source,
                principle=True,
                event_stream_name="primary",
                stream_name="raw_foreground",
            )
            for image_name in image_names
        ]
    ).map(np.float32)
    raw_source.starsink(StartStopCallback())
    return locals()
# Pipeline chunks in the order they are handed to ``link``.
pipeline_order = [
    start_gen,
    image_process,
    calibration,
    clear_geo_gen,
    save_cal,
    scattering_correction,
    gen_mask,
    integration,
    pdf_gen,
    clear_comp,
]

# When run as a script, link the pipeline and draw its graph.
if __name__ == "__main__":  # pragma: no cover
    from rapidz import Stream
    from rapidz.link import link

    raw_source = Stream(stream_name="raw_source")
    namespace = link(*pipeline_order, raw_source=raw_source)
    namespace["raw_source"].visualize(source_node=True)