Source code for xpdan.startup.db_server

import fire

from rapidz import Stream
from shed.writers import NpyWriter
from xpdan.vend.callbacks.core import RunRouter
from xpdan.vend.callbacks.zmq import RemoteDispatcher
from xpdconf.conf import glbl_dict


def run_server(
    data_dir,
    outbound_proxy_address=glbl_dict["outbound_proxy_address"],
    prefix=b"an",
):
    """Launch the databroker server that stores analyzed data.

    A ``RemoteDispatcher`` is subscribed to a ``RunRouter`` that forwards
    the documents of runs whose start document carries an
    ``analysis_stage`` of ``"pdf"`` or ``"integration"`` into a stream.
    That stream is piped through ``Store(data_dir, NpyWriter)`` and then
    star-sunk into the ``insert`` method of ``glbl_dict["an_db"]``.

    Parameters
    ----------
    data_dir : str
        Directory into which the array data is saved.
    outbound_proxy_address : str, optional
        Address and port of the zmq proxy. Defaults to
        ``glbl_dict["outbound_proxy_address"]``.
    prefix : bytes or list of bytes, optional
        Publisher channels to listen to. Defaults to ``b"an"``.
    """
    dispatcher = RemoteDispatcher(outbound_proxy_address, prefix=prefix)
    analysis_db = glbl_dict["an_db"]

    # Pipeline: incoming documents -> Store(data_dir, NpyWriter) -> db insert.
    analyzed_docs = Stream()
    stored_docs = analyzed_docs.Store(data_dir, NpyWriter)
    stored_docs.starsink(analysis_db.insert)

    def _factory_for(stage):
        """Build a RunRouter factory that accepts only runs at ``stage``."""

        def _factory(start_doc):
            if start_doc.get("analysis_stage", None) == stage:
                # A fresh callback per matching run; it pushes its positional
                # arguments into the stream as a single tuple.
                return lambda *name_doc: analyzed_docs.emit(name_doc)
            # Returning None tells the router this factory has no callback
            # for the run.
            return None

        return _factory

    router = RunRouter(
        [
            _factory_for("pdf"),
            _factory_for("integration"),
        ]
    )
    dispatcher.subscribe(router)

    print("Starting DB Server")
    dispatcher.start()
def run_main():
    """Entry point: hand ``run_server`` to ``fire.Fire``."""
    fire.Fire(run_server)
# Only start the server when this module is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    run_main()