Source code for xpdan.startup.portable_db_server

import io
import os

import fire
import yaml
from databroker import Broker
from rapidz import Stream
from shed.writers import NpyWriter
from xpdan.vend.callbacks.core import RunRouter, ExportCallback
from xpdan.vend.callbacks.zmq import RemoteDispatcher
from xpdconf.conf import glbl_dict

portable_template = """description: '{0} database'
metadatastore:
    module: 'databroker.headersource.sqlite'
    class: 'MDS'
    config:
        directory: '{0}'
        timezone: 'US/Eastern'
assets:
    module: 'databroker.assets.sqlite'
    class: 'Registry'
    config:
        dbpath: '{0}/assets.sqlite'
handlers:
    NPY_SEQ:
        module: 'ophyd.sim'
        class: 'NumpySeqHandler'
"""

load_script = """
from databroker import Broker
import yaml
dbs = {}
for yaml_file in ['raw', 'an']:
    with open(f'{yaml_file}.yml', 'r') as f:
        dbs[yaml_file] = Broker.from_config(yaml.load(f))
"""


[docs]def run_server( folder, outbound_proxy_address=glbl_dict["outbound_proxy_address"], prefix=None, handlers=None, ): """Start up the portable databroker server Parameters ---------- folder : str The location where to save the portable databrokers outbound_proxy_address : str, optional The address and port of the zmq proxy. Defaults to ``glbl_dict["outbound_proxy_address"]`` prefix : bytes or list of bytes, optional The Publisher channels to listen to. Defaults to ``[b"an", b"raw"]`` handlers : dict The map between handler specs and handler classes, defaults to the map used by the experimental databroker if possible """ # TODO: convert to bytestrings if needed # TODO: maybe separate this into different processes? # TODO: support multiple locations for folders if prefix is None: prefix = [b"an", b"raw"] d = RemoteDispatcher(outbound_proxy_address, prefix=prefix) portable_folder = folder portable_configs = {} for folder_name in ["an", "raw"]: fn = os.path.join(portable_folder, folder_name) os.makedirs(fn, exist_ok=True) # if the path doesn't exist then make the databrokers with open( os.path.join(portable_folder, f"{folder_name}.yml"), "w" ) as f: f.write(portable_template.format(folder_name)) print(portable_template.format(folder_name)) print(fn) portable_configs[folder_name] = yaml.load( io.StringIO(portable_template.format(fn)) ) os.makedirs(os.path.join(fn, "data"), exist_ok=True) # TODO: add more files here, eg. a databroker readme/tutorial with open(os.path.join(portable_folder, "db_load.py"), "w") as f: f.write(load_script) an_broker = Broker.from_config(portable_configs["an"]) an_source = Stream() zed = an_source.Store( os.path.join( portable_configs["an"]["metadatastore"]["config"]["directory"], "data", ), NpyWriter, ) zed.starsink(an_broker.insert) raw_broker = Broker.from_config(portable_configs["raw"]) if handlers is None: handlers = raw_broker.reg.handler_reg raw_source = Stream() raw_source.starmap( ExportCallback( os.path.join( portable_configs["raw"]["metadatastore"]["config"][ "directory" ], "data", ), handler_reg=handlers, ) ).starsink(raw_broker.insert) rr = RunRouter( [ lambda x: (lambda *nd: raw_source.emit(nd)) if x.get("analysis_stage", "") == "raw" else None ] + [ lambda x: (lambda *nd: an_source.emit(nd)) if x.get("analysis_stage", None) == "pdf" else None, lambda x: (lambda *nd: an_source.emit(nd)) if x.get("analysis_stage", None) == "integration" else None, ] ) d.subscribe(rr) print("Starting Portable DB Server") d.start()
[docs]def run_main(): fire.Fire(run_server)
if __name__ == "__main__": run_main()