"""Source code for shed.replay."""

import importlib
# MutableMapping/Hashable were removed from the ``collections`` namespace in
# Python 3.10; they live in ``collections.abc``.
from collections.abc import Hashable, MutableMapping

import networkx as nx
from rapidz import Stream

from shed import SimpleFromEventStream


# One problem we're facing is that the various pipelines handle document
# filling external to the logic of the data processing (outside of the
# captured graph). This is a good thing since assuming that the documents have
# the data we're looking for is nice. However, we need to do this on the
# replay as well
# we're looking for is nice. However, we need to do this on the replay as well
# since our pipeline expects the data to be loaded. This could be done in the
# creation of the data dict but that requires us loading up all the data at
# once which is a major anti-pattern. We could use a filler but then we
# run into issues since we don't actually track the resource and datum
# documents in the FromEventModel nodes since they didn't exist until recently
# (and they aren't really used in the data processing).


def replay(db, hdr):
    """Replay data analysis

    Parameters
    ----------
    db : Broker instance
        The databroker to pull data from
    hdr : Header instance
        The analyzed data header

    Returns
    -------
    loaded_graph : DiGraph
        The data processing pipeline as a graph
    parent_nodes : dict
        The source nodes for the graph
    data : dict
        A map between the document uids and documents
    vs : list
        List of document uid in time order

    Notes
    -----
    >>> graph, parents, data, vs = replay(db, hdr)
    >>> for v in vs:
    ...     parents[v["node"]].update(data[v["uid"]])
    """
    data = {}
    parent_nodes = {}
    # TODO: try either raw or analysis db (or stash something to know who
    # comes from where). Maybe take in a list of dbs?
    raw_hdrs = [db[u] for u in hdr["start"]["parent_node_map"].values()]

    # Load data from raw/partially analyzed headers, keyed by document uid
    # (datum documents carry "datum_id" instead of "uid").
    # TODO: figure out how to handle filling documents, since we don't want
    # to fill them here but they need to be filled.
    for raw_hdr in raw_hdrs:
        data.update(
            {
                d.get("uid", d.get("datum_id")): (n, d)
                for n, d in raw_hdr.documents()
            }
        )

    # Get information from the old analyzed header: per-document timestamps
    # and the serialized processing graph.
    times = hdr["stop"]["times"]
    graph = hdr["start"]["graph"]
    loaded_graph = nx.node_link_graph(graph)
    # Rebuild upstream nodes first so each node can reference its already
    # instantiated parents via the graph.
    for n in nx.topological_sort(loaded_graph):
        loaded_graph.nodes[n]["stream"] = rebuild_node(
            loaded_graph.nodes[n]["stream"], loaded_graph
        )

    for node_uid in hdr["start"]["parent_node_map"]:
        parent_nodes[node_uid] = loaded_graph.nodes[node_uid]["stream"]

    # Order the (uid, node) pairs by document time so the caller can push
    # documents through the pipeline in the original order.
    vs = [
        {"uid": dct["uid"], "node": dct["node"]}
        for dct in sorted(times, key=lambda d: d["time"])
    ]
    return loaded_graph, parent_nodes, data, vs
def rebuild_node(node_dict, graph):
    """Instantiate a live stream node from its serialized description.

    Parameters
    ----------
    node_dict : dict
        Serialized node. ``"mod"`` and ``"name"`` locate the class/callable
        to instantiate; ``"args"`` and ``"kwargs"`` hold its creation
        arguments. Argument values may themselves be serialized callables
        (mappings with ``"mod"``/``"name"``) or ids of already-rebuilt
        upstream nodes present in ``graph``.
    graph : nx.DiGraph
        Graph whose already-rebuilt nodes carry their live stream object in
        the ``"stream"`` node attribute.

    Returns
    -------
    The instantiated node.
    """
    d = dict(node_dict)
    node = getattr(importlib.import_module(d["mod"]), d["name"])
    d.pop("name")
    d.pop("mod")
    aa = []
    for a in d["args"]:
        # A mapping with "mod"/"name" is a serialized callable: import it.
        if isinstance(a, MutableMapping) and a.get("name") and a.get("mod"):
            aa.append(getattr(importlib.import_module(a["mod"]), a["name"]))
        elif isinstance(a, (tuple, list)):
            aa.append(a)
        # Guard with Hashable (as the kwargs loop below does): unhashable
        # values cannot be graph nodes and would make the membership test
        # raise TypeError.
        elif isinstance(a, Hashable) and a in graph.nodes:
            aa.append(graph.nodes[a]["stream"])
        else:
            aa.append(a)
    d["args"] = aa
    kk = {}
    for k, a in d["kwargs"].items():
        # We can't check if non hashables are in the graph (also I don't think
        # we can put non hashables as nodes in the graph)
        if isinstance(a, Hashable) and a in graph.nodes:
            kk[k] = graph.nodes[a]["stream"]
        elif isinstance(a, MutableMapping) and a.get("name") and a.get("mod"):
            kk[k] = getattr(importlib.import_module(a["mod"]), a["name"])
        # If there is an upstream node for our FromEventStream node then
        # it is out of scope, make a Placeholder node to keep the
        # instantiation happy
        elif issubclass(node, SimpleFromEventStream) and k == "upstream":
            kk[k] = Stream(stream_name="Placeholder")
        else:
            kk[k] = a
    d["kwargs"] = kk
    n = node(*d["args"], **d["kwargs"])
    return n