webber.core
Base module for abstract multiprocessing system - a directed acyclic graph.
1""" 2Base module for abstract multiprocessing system - a directed acyclic graph. 3""" 4import sys as _sys 5import typing as _T 6import uuid as _uuid 7import types as _types 8import logging as _logging 9import traceback as _traceback 10import contextlib as _contextlib 11import collections.abc as _abc 12import concurrent.futures as _futures 13import networkx as _nx 14import webber.edges as _edges 15import webber.xcoms as _xcoms 16import webber.queue as _queue 17 18from webber.edges import Condition, dotdict, edgedict 19 20import queue as _q 21import itertools as _it 22 23__all__ = ["DAG", "Condition", "QueueDAG"] 24 25def _iscallable(function: any): 26 return callable(function) 27 28class _OutputLogger: 29 """ 30 Basic logger for synchronizing parallel output of Webber tasks. 31 Adapted from John Paton: https://johnpaton.net/posts/redirect-logging/ 32 """ 33 def __init__(self, name="root", level="INFO", callable_name="root", file=_sys.stdout) -> None: 34 """Initializes logger for given scope.""" 35 self.logger = _logging.getLogger(name) 36 self.name = self.logger.name 37 self.callable = callable_name 38 self.level = getattr(_logging, level) 39 self.format = _logging.Formatter( 40 "{asctime} " + f"{self.callable:>15}:" + " {message}", 41 style="{", 42 ) 43 stream_handler = _logging.StreamHandler(file) 44 stream_handler.setFormatter(self.format) 45 self.logger.addHandler(stream_handler) 46 self.logger.setLevel(self.level) 47 self._redirector = _contextlib.redirect_stdout(self) 48 # elif file is _stderr: 49 # self._redirector = _contextlib.redirect_stderr(self) 50 51 def write(self, msg: str): 52 """Writes non-empty strings to the logger's output -- stdout, by default.""" 53 if msg and not msg.isspace(): 54 self.logger.log(self.level, msg) 55 56 def flush(self): 57 """Ignores flushing of output, since logs are not concerned with such characters. (?)""" 58 pass 59 60 def __enter__(self): 61 """Allows Python built-ins for printing to forward data in scope. 
(?)""" 62 self._redirector.__enter__() 63 return self 64 65 def __exit__(self, exc_type, exc_value, traceback): 66 """Allows Python built-ins to exit scope on error. (?)""" 67 self._redirector.__exit__(exc_type, exc_value, traceback) 68 69def _event_wrapper(_callable: callable, _name: str, _args, _kwargs): 70 """Wrapper used by Webber DAGs to log and execute a Python callables as a unit of work.""" 71 with _OutputLogger(str(_uuid.uuid4()), "INFO", _name) as _: 72 return _callable(*_args, **_kwargs) 73 74class DAG: 75 """ 76 Directed Acyclic Graph used to represent Pythonic tasks in parallel. 77 """ 78 def add_node(self, node, *args, **kwargs) -> str: 79 """ 80 Adds a callable with positional and keyword arguments to the DAG's underlying graph. 81 On success, return unique identifier for the new node. 82 """ 83 if not _iscallable(node): 84 err_msg = f"{node}: requested node is not a callable Python function." 85 raise TypeError(err_msg) 86 87 node_name = _edges.label_node(node) 88 89 args = tuple([ 90 arg if not isinstance(args, _xcoms.Promise) else self.resolve_promise(arg) 91 for arg in args 92 ]) 93 94 for k, val in kwargs.items(): 95 if isinstance(val, _xcoms.Promise): 96 kwargs[k] = self.resolve_promise(val) 97 98 self.graph.add_node( 99 node_for_adding=node_name, 100 callable=node, args=args, kwargs=kwargs, 101 name=node.__name__, 102 id=node_name 103 ) 104 105 return node_name 106 107 108 def add_edge( 109 self, 110 u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable], 111 continue_on: Condition = Condition.Success 112 ) -> _T.Tuple[str,str]: 113 """ 114 Adds an edge between nodes in the DAG's underlying graph, 115 so long as the requested edge is unique and has not been added previously. 116 117 On success, returns Tuple of the new edge's unique identifiers. 
118 """ 119 # Validate inputs prior to execution 120 # - Nodes must be identifiers or callables 121 # - Conditions must belong to the webber.edges.Condition class 122 if not (isinstance(u_of_edge,str) or _iscallable(u_of_edge)): 123 err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable" 124 raise TypeError(err_msg) 125 if not (isinstance(v_of_edge,str) or _iscallable(v_of_edge)): 126 err_msg = f"Outgoing node {v_of_edge} must be a string or a Python callable" 127 raise TypeError(err_msg) 128 if not isinstance(continue_on, Condition): 129 raise TypeError("Edge conditions must use the webber.edges.Condition class.") 130 131 # Base Case 0: No nodes are present in the DAG: 132 # Ensure that both nodes are callables, then add both to the graph and 133 # assign the outgoing node as a root. 134 if len(self.graph.nodes()) == 0: 135 if not _iscallable(u_of_edge): 136 err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope." 137 raise ValueError(err_msg) 138 if not _iscallable(v_of_edge): 139 err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope." 140 raise ValueError(err_msg) 141 outgoing_node = self.add_node(u_of_edge) 142 incoming_node = self.add_node(v_of_edge) 143 self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on) 144 return (outgoing_node, incoming_node) 145 146 node_names, node_callables = zip(*self.graph.nodes(data='callable')) 147 graph_edges = list(self.graph.edges(data=False)) 148 new_callables = dict() 149 150 if _iscallable(u_of_edge) and _iscallable(v_of_edge): 151 152 # Error Cases 0, 1: Either of the callables appear more than once in the DAG. 153 if node_callables.count(u_of_edge) > 1: 154 err_msg = f"Outgoing callable {u_of_edge.__name__} " \ 155 + "exists more than once in this DAG. " \ 156 + "Use the unique identifier of the required node or add a new node." 
157 raise ValueError(err_msg) 158 if node_callables.count(v_of_edge) > 1: 159 err_msg = f"Incoming callable {v_of_edge.__name__} " \ 160 + "exists more than once in this DAG. " \ 161 + "Use the unique string identifier of the required node." 162 raise ValueError(err_msg) 163 164 # Base Case 1: Both args are callables and will be present in the DAG scope no more than once. 165 # We will create new nodes if necessary, after validation, and get the unique string identifiers of the nodes. 166 try: 167 outgoing_node = self.node_id(u_of_edge) 168 except: 169 new_callables[u_of_edge] = u_of_edge 170 outgoing_node = _edges.label_node(u_of_edge) 171 try: 172 incoming_node = self.node_id(v_of_edge) 173 except: 174 new_callables[v_of_edge] = v_of_edge 175 incoming_node = _edges.label_node(v_of_edge) 176 177 else: 178 179 # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope. 180 if isinstance(u_of_edge, str) and u_of_edge not in node_names: 181 err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope." 182 raise ValueError(err_msg) 183 if isinstance(v_of_edge, str) and v_of_edge not in node_names: 184 err_msg = f"Incoming node {v_of_edge} not in DAG's current scope." 185 raise ValueError(err_msg) 186 187 # Both nodes' unique identifiers are present in the DAG 188 # and should be evaluated for a valid edge. 189 if isinstance(u_of_edge, str) and isinstance(v_of_edge, str): 190 outgoing_node = u_of_edge 191 incoming_node = v_of_edge 192 193 # Otherwise, one of the nodes is a callable, and the other is a valid unique identifier. 194 else: 195 for node in (u_of_edge, v_of_edge): 196 def _assign_node(n): 197 # For the argument that is a unique string identifier, assign and continue. 198 if not _iscallable(n): 199 return n 200 # Error Case 4: The requested callable exists more than once in the DAG. 201 if node_callables.count(n) > 1: 202 err_msg = f"Outgoing callable {n.__name__} " \ 203 + "exists more than once in this DAG. 
" \ 204 + "Use the unique ID of the required node or add a new node." 205 raise ValueError(err_msg) 206 # If the callable exists only once in the DAG, use its unique identifier to 207 # evaluate the requested edge. 208 if node_callables.count(n) == 1: 209 return node_names[ node_callables.index(n) ] 210 # Otherwise, the callable is new and needs to be added to the DAG scope. 211 else: 212 new_callables[n] = n 213 return _edges.label_node(n) 214 if node == u_of_edge: 215 outgoing_node = _assign_node(node) 216 else: 217 incoming_node = _assign_node(node) 218 219 # Error Case 5: Both callables exist only once in the DAG, 220 # but an edge already exists between them. 221 if (outgoing_node, incoming_node) in graph_edges: 222 err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \ 223 + "a definition in this DAG." 224 raise ValueError(err_msg) 225 226 # Ensure that no cycles will be created by adding this edge to the DAG. 227 test_edges: list = graph_edges + [(outgoing_node, incoming_node)] 228 229 # Error Case 6: Both callables exist only once in the DAG, 230 # but adding an edge between them creates circular dependencies. 231 if not _nx.is_directed_acyclic_graph(_nx.DiGraph(test_edges)): 232 err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \ 233 + "results in circular dependencies." 234 raise ValueError(err_msg) 235 236 # We can now add the edge to the DAG, since we are certain it will not result in 237 # illegal dependencies/behavior. 238 # First, we should account for potential new nodes. This also handles 239 # duplicates on first entry to the DAG (e.g.: edge == (print, print)) 240 if new_callables.get(u_of_edge) != None: 241 outgoing_node = self.add_node(new_callables[u_of_edge]) 242 if new_callables.get(v_of_edge) != None: 243 incoming_node = self.add_node(new_callables[v_of_edge]) 244 245 # Then we can add the new edge. 
246 self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on) 247 return (outgoing_node, incoming_node) 248 249 def remove_node(self, *posargs, **kwargs) -> None: 250 """ 251 Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG. 252 Throws error message and recommends safer methods. 253 """ 254 raise NotImplementedError("Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect.") 255 256 def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]: 257 """ 258 Removes an directed edge between nodes in the DAG's underlying graph. 259 Throws error if the edge does not exist. 260 On success, returns Tuple of the removed edge's unique identifiers. 261 """ 262 edge_id = (self.node_id(u_of_edge), self.node_id(v_of_edge)) 263 if edge_id not in self.graph.edges(data = False): 264 err_msg = "Requested edge does not exist in the DAG's scope" 265 raise ValueError(err_msg) 266 self.graph.remove_edge(edge_id[0], edge_id[1]) 267 return edge_id 268 269 def update_edges(self, *E, continue_on: Condition, filter: _types.LambdaType = None, data = None): 270 """ 271 Flexible function to update properties of edges in the DAG's scope, 272 based on unique identifier(s) (e.g.: string IDs or unique callables) or a 273 lambda filter using the edgedict syntax. 274 275 List of nodes to update or filter argument is expected. Valid edge lists include: 276 277 \t update_edges((node1, node2), ...) or update_edges([(node1, node2)], ...) 278 279 \t update_edges((node1, callable2), ...) or update_edges([(node1, callable2)], ...) 
280 281 \t update_edges(edgedict), where isinstance(edgedict, webber.edges.edgedict) == True 282 283 Parameters: 284 285 > continue_on: webber.edges.Condition value to update matching edges with, 286 287 \t Execute child on success, failure, or any exit state, of the parent node 288 289 > filter: lambda property that can be used instead of a list of edges to be updated. 290 291 \t filter = (lambda e: n.parent == print or e.child == print) 292 293 > data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set. 294 295 \t data = { 'continue_on': webber.edges.Condition } 296 297 """ 298 if len(E) == 0 and filter == None: 299 raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.") 300 301 elif isinstance(E, dict) or isinstance(E, edgedict): 302 E = [E] 303 304 elif len(E) == 1 and isinstance(E[0], _abc.Iterable): 305 try: 306 _ = self.get_edges(E[0]) 307 except: 308 E = E[0] 309 310 if filter != None: 311 edge_ids = self.filter_edges(filter, data = False) 312 else: 313 if isinstance(E[0], dict) or isinstance(E[0], edgedict): 314 try: 315 ids = [e['id'] for e in E] 316 except KeyError: 317 err_msg = 'In dictionary form, all given edges must be follow edgedict standards.' 
318 raise ValueError(err_msg) 319 else: 320 ids = E 321 edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids] 322 323 std_update = (continue_on == None) 324 325 if std_update: 326 for edge_id, e in zip(edge_ids, E): 327 if data != None: 328 self._update_edge(data, id = edge_id) 329 else: 330 self._update_edge(e, id = edge_id) 331 332 else: 333 if continue_on != None: 334 if not isinstance(continue_on, Condition): 335 err_msg = f"Condition assignment must use webber.edges.Condition" 336 raise TypeError(err_msg) 337 for e in edge_ids: 338 self.graph.edges[e]['Condition'] = continue_on 339 340 def _update_edge(self, edgedict: dict, id: tuple[str, str] = None, force: bool = False): 341 """ 342 Internal only. Update properties of an individual edge within a DAG's scope, 343 given a well-structured dictionary and the tuple identifier of the network edge. 344 Force argument bypasses validation, and should only be used internally. 345 """ 346 if id != None: 347 try: 348 if edgedict.get('id') and id != edgedict['id']: 349 raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {edgedict['id']}") 350 except ValueError as e: 351 if not force: 352 raise e 353 edgedict['id'] = id 354 355 expected_keys = ('parent', 'child', 'id', 'continue_on') 356 if not set(expected_keys).issuperset(set(edgedict.keys())): 357 raise ValueError(f"Expecting keys: {expected_keys}") 358 359 if not force: 360 361 e1, e2 = None, None 362 363 if edgedict.get('id'): 364 e1 = self.get_edge(edgedict['id'], data = False) 365 366 if edgedict.get('parent') or edgedict.get('child'): 367 e2 = self.get_edge(edgedict.get('parent'), edgedict.get('child'), data = False) 368 else: 369 e2 = e1 370 371 if e1 != e2: 372 raise ValueError('Edge vertices should not be changed using update functions.') 373 374 elif e1 == None: 375 raise ValueError('Requested edge was not given an identifier.') 376 377 if edgedict.get('continue_on') and not isinstance(edgedict['continue_on'], Condition): 378 
err_msg = f"Condition assignment must use webber.edges.Condition" 379 raise TypeError(err_msg) 380 381 edge_id = edgedict.pop('id') 382 edge = {k: v for k,v in edgedict.items() if k not in ('parent', 'child', 'id')} 383 self.graph.edges[(edge_id[0], edge_id[1])].update(edge) 384 385 def relabel_node(self, node: _T.Union[str, _T.Callable], label: str): 386 """ 387 Update the label, or name, given to a node in the DAG's scope, 388 given a Python string and a node identifier. 389 Well-structured wrapper for a common use-case of DAG.update_nodes. 390 """ 391 node_id = self.node_id(node) 392 if not isinstance(label, str) or len(label) == 0: 393 err_msg = "Node label must be a Python string with one or more characters." 394 raise ValueError(err_msg) 395 self.update_nodes(node_id, data = {'name': label}) 396 return label 397 398 def update_nodes(self, *N, filter: _types.LambdaType = None, data = None, callable = None, args = None, kwargs = None): 399 """ 400 Flexible function to update properties of nodes in the DAG's scope, 401 based on unique identifier(s) (e.g.: string IDs or unique callables) or a 402 lambda filter using the dotdict syntax. 403 404 List of nodes to update or filter argument is expected. Valid node lists include: 405 406 \t update_nodes(node_id, ...) or update_nodes([node_id], ...) 407 408 \t update_nodes(callable, ...) or update_nodes([callable], ...) 409 410 \t update_nodes(node1, node2, ...) or update_nodes([node1, node2], ...) 411 412 \t update_nodes(node1, callable2, ...) or update_nodes([node1, callable2], ...) 413 414 > update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id) 415 416 Parameters: 417 418 > filter: lambda property that can be used instead of a list of nodes to be updated. 419 420 \t filter = (lambda n: n.callable == print or 'Hello, World' in n.args) 421 422 > data: If given, expects a dictionary or dotdict to update node properties. 
At least one property should be defined if data argument is set. 423 Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method. 424 425 \t data = { 426 \t 'callable': print, 427 \t 'args': ['Hello', 'World'], 428 \t 'kwargs': {'sep': ', '}, 429 \t 'name': 'custom_label', 430 \t 'id': 'unique-identifier' 431 \t} 432 433 > args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List). 434 435 > kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary. 436 437 """ 438 if len(N) == 0 and filter == None: 439 raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.") 440 441 elif len(N) > 0 and filter != None: 442 raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.") 443 444 elif isinstance(N, dict) or isinstance(N, str): 445 N = [N] 446 447 elif len(N) == 1 and isinstance(N[0], _abc.Iterable): 448 if isinstance(N[0][0], dict): 449 N = N[0] 450 elif isinstance(N[0][0], str): 451 # BUG: A list of all single character IDs will fail to be updated. Please try another call method (i.e.: nested iterator). 
452 if sum(list(map(lambda n: len(n), N[0]))) != len(N[0]): 453 N = N[0] 454 455 456 if filter != None: 457 node_ids = self.filter_nodes(filter, data = False) 458 else: 459 if isinstance(N[0], dict): 460 ids = [n['id'] for n in N] 461 else: 462 ids = N 463 node_ids = [self.node_id(i) for i in ids] 464 465 std_update = (callable == None) and (args == None) and (kwargs == None) 466 467 if std_update: 468 for node_id, n in zip(node_ids, N): 469 if data != None: 470 self._update_node(data, id = node_id) 471 else: 472 self._update_node(n, id = node_id) 473 474 else: 475 if callable != None: 476 if not _iscallable(callable): 477 err_msg = f"Requested node is not assigned a callable Python function." 478 raise TypeError(err_msg) 479 for node_id in node_ids: 480 self.graph.nodes[node_id]['callable'] = callable 481 self.graph.nodes[node_id]['name'] = callable.__name__ 482 483 if args != None: 484 if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)): 485 err_msg = f"Requested node is not assigned a tuple of pos args." 486 raise TypeError(err_msg) 487 args = tuple(args) 488 for node_id in node_ids: 489 self.graph.nodes[node_id]['args'] = args 490 491 if kwargs != None: 492 if not isinstance(kwargs, dict): 493 err_msg = f"Requested node is not assigned a dictionary of kw args." 494 raise TypeError(err_msg) 495 for node_id in node_ids: 496 self.graph.nodes[node_id]['kwargs'] = kwargs 497 498 def get_edges(self, *N, data: bool = True) -> _T.Union[list[edgedict], list[tuple]]: 499 """ 500 Retrieval function for DAG edge data, based on tuple identifiers. 
501 Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2'])) 502 """ 503 if len(N) == 0: 504 if data == True: 505 return list(map(edgedict, self.graph.edges.data())) 506 return list(self.graph.edges.data(data=False)) 507 508 # elif len(N) == 1: 509 # if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], tuple): 510 # N = N[0] 511 512 if len(N) != len(set(N)) or False in map(lambda n: isinstance(n, _abc.Iterable) and len(n) == 2, N): 513 err_msg = 'All requested edges must be unique tuples of size 2.' 514 raise ValueError(err_msg) 515 516 edge_data = [self.get_edge(o, i, data=data) for (o, i) in N] 517 return edge_data 518 519 def get_edge(self, outgoing_node: _T.Union[str, callable], incoming_node: _T.Union[str, callable], data: bool = True) -> _T.Union[edgedict, tuple]: 520 """ 521 Retrieval function for a single directed edge between nodes in a DAG's scope. 522 """ 523 id = (self.node_id(outgoing_node), self.node_id(incoming_node)) 524 if not data: 525 return id 526 edge_data = self.graph.get_edge_data(u = id[0], v = id[1]) 527 if not edge_data: 528 err_msg = f'No match found for the directed edge requested: {id}' 529 raise ValueError(err_msg) 530 return edgedict(*id, **edge_data) 531 532 def get_node(self, n: _T.Union[str, callable]) -> dotdict: 533 """ 534 Given a unique identifier, returns a dictionary of node metadata 535 for a single node in the DAG's scope. 536 """ 537 node_id = self.node_id(n) 538 return dotdict(self.graph.nodes[node_id]) 539 540 def get_nodes(self, *N) -> list[dotdict]: 541 """ 542 Flexible function to retrieve DAG node data, based on node identifiers 543 (e.g.: string IDs or unique callables). 
544 """ 545 if len(N) == 0: 546 node_data = list(self.graph.nodes.values()) 547 return [dotdict(d) for d in node_data] 548 549 elif len(N) == 1: 550 if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], str): 551 N = N[0] 552 else: 553 node_id = self.node_id(N[0]) 554 node_data = [dotdict(self.graph.nodes[node_id])] 555 return node_data 556 557 if not len(N) == len(set(N)): 558 err_msg = 'All requested nodes must be unique identifiers.' 559 raise ValueError(err_msg) 560 561 node_ids = [self.node_id(n) for n in N] 562 node_data = [dotdict(self.graph.nodes[n]) for n in node_ids] 563 return node_data 564 565 def filter_nodes(self, filter: _types.LambdaType, data: bool = False): 566 """ 567 Given a lambda function, filter nodes in a DAG's scope based on its attributes. 568 Current limitation: Filters must use node identifier strings when referencing nodes. 569 Use get_nodes for more flexible controls. 570 """ 571 if not data: 572 return [node['id'] for node in self.graph.nodes.values() if filter(dotdict(node))] 573 return [dotdict(node) for node in self.graph.nodes.values() if filter(dotdict(node))] 574 575 def filter_edges(self, filter = _types.LambdaType, data: bool = False) -> list[edgedict]: 576 """ 577 Given a lambda function, filter edges in a DAG's scope based on its attributes. 578 Current limitation: Filters must use node identifier strings when referencing nodes. 579 Use get_edges for more flexible controls. 580 """ 581 if not data: 582 return [e[:2] for e in list(self.graph.edges.data()) if filter(edgedict(*e))] 583 return [edgedict(e) for e in list(self.graph.edges.data()) if filter(edgedict(*e))] 584 585 def retry_node(self, identifier: _T.Union[str,_T.Callable], count: int): 586 """ 587 Given a node identifier, set number of automatic retries in case of failure. 588 Re-attempts will begin as soon as possible. 
589 """ 590 if not isinstance(count, int) or not count >= 0: 591 raise ValueError("Retry count must be a non-negative integer.") 592 node = self.node_id(identifier) 593 self.graph.nodes[node]['retry'] = count 594 595 def skip_node(self, identifier: _T.Union[str,_T.Callable], skip: bool = True, as_failure = False): 596 """ 597 Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error). 598 Allows conditional control and testing over DAG's order of operations. 599 """ 600 if not isinstance(skip, bool): 601 raise ValueError("Skip argument must be a boolean value.") 602 node = self.node_id(identifier) 603 self.graph.nodes[node]['skip'] = (skip, as_failure) 604 605 def critical_path(self, nodes): 606 """ 607 Given a set of nodes, returns a subset of the DAG containing 608 only the node(s) and its parents, or upstream dependencies. 609 """ 610 if isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str): 611 node_ids = {self.node_id(n) for n in nodes} 612 else: 613 node_ids = {self.node_id(nodes)} 614 return self._subgraph(node_ids) 615 616 def execute(self, return_ref=False, print_exc=False): 617 """ 618 Basic wrapper for execution of the DAG's underlying callables. 619 """ 620 executor = self.DAGExecutor(self.graph, self.root, print_exc) 621 return executor if return_ref else None 622 623 def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] = None): 624 """ 625 Basic wrapper to visualize DAG using Vis.js and NetGraph libraries. 626 By default, visualization library only loaded in after DAG.visualize() is called, halving import times. 
627 """ 628 import webber.viz as _viz 629 630 match type: 631 case 'browser': 632 _viz.visualize_browser(self.graph) 633 634 case 'plt': 635 return _viz.visualize_plt(self.graph) 636 637 case 'gui': 638 # _visualize_gui(self.graph) 639 raise NotImplementedError 640 641 case None: 642 if _viz._in_notebook(): 643 return _viz.visualize_plt(self.graph) 644 else: 645 _viz.visualize_browser(self.graph) 646 647 case _: 648 err_msg = "Unknown visualization type requested." 649 raise NotImplementedError(err_msg) 650 651 @property 652 def root(self) -> list[str]: 653 """ 654 Return list of nodes with no dependencies. 655 Root nodes will occur first in DAG's order of operations. 656 """ 657 return list(filter( 658 lambda node: len(list(self.graph.predecessors(node))) < 1, 659 self.graph.nodes.keys() 660 )) 661 662 @property 663 def nodes(self): 664 return self.graph.nodes 665 666 def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str: 667 """ 668 Validate whether identifier given is a valid node within the DAG's scope. 669 Primarily for internal use, but useful for retrieving string identifiers 670 for a unique callable in a DAG. 671 """ 672 node_names, node_callables = zip(*self.graph.nodes(data='callable')) 673 674 if isinstance(identifier, str): 675 if identifier not in node_names: 676 err_msg = f"Node {identifier} is not defined in this DAG's scope." 677 raise ValueError(err_msg) 678 node = identifier 679 elif _iscallable(identifier): 680 match node_callables.count(identifier): 681 case 0: 682 err_msg = f"Callable {identifier} is not defined in this DAG's scope." 683 raise ValueError(err_msg) 684 case 1: 685 node = node_names[ node_callables.index(identifier) ] 686 case _: 687 err_msg = f"Callable {identifier.__name__} " \ 688 + "exists more than once in this DAG. " \ 689 + "Use the unique identifier of the required node." 
690 raise ValueError(err_msg) 691 else: 692 err_msg = f"Node {identifier} must be a string or a Python callable" 693 raise TypeError(err_msg) 694 return node 695 696 def _update_node(self, nodedict: dict, id: str = None, force: bool = False): 697 """ 698 Internal only. Update properties of single node within a DAG's scope, 699 given a well-structured dictionary and the tuple identifier of the network edge. 700 Force argument bypasses dictionary validation, and should only be used internally. 701 """ 702 if id != None: 703 try: 704 if nodedict.get('id') != None and id != nodedict['id']: 705 raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {nodedict['id']}") 706 except ValueError as e: 707 if not force: 708 raise e 709 nodedict['id'] = id 710 711 expected_keys = ('callable', 'args', 'kwargs', 'name', 'id') 712 if not set(expected_keys).issuperset(set(nodedict.keys())): 713 raise ValueError(f"Expecting keys: {expected_keys}") 714 715 if not force: 716 if nodedict.get('callable'): 717 if _iscallable(nodedict['callable']): 718 err_msg = f"Requested node is not assigned a callable Python function." 719 raise TypeError(err_msg) 720 if not nodedict.get('name'): 721 nodedict['name'] = nodedict['callable'].__name__ 722 723 if nodedict.get('name') and (not isinstance(nodedict['name'], str) or len(nodedict['name']) == 0): 724 err_msg = f"Requested node name must be a non-null Python string, will default to callable when not set." 725 raise TypeError(err_msg) 726 727 if nodedict.get('args'): 728 if not (isinstance(nodedict['args'], _abc.Iterable) or isinstance(nodedict['args'], str)): 729 err_msg = f"Requested node is not assigned a tuple of pos args." 730 raise TypeError(err_msg) 731 nodedict['args'] = tuple(nodedict['args']) 732 733 if nodedict.get('kwargs') and not isinstance(nodedict['kwargs'], dict): 734 err_msg = f"Requested node is not assigned a dictionary of kw args." 
735 raise TypeError(err_msg) 736 737 node_id = nodedict.pop('id') 738 self.graph.nodes[node_id].update(nodedict) 739 740 # Reset node name if implicitly requested. 741 if not nodedict.get('name'): 742 self.graph.nodes[node_id]['name'] = self.graph.nodes[node_id]['callable'].__name__ 743 744 def _subgraph(self, node_ids: set[str]): 745 """ 746 Internal only. Given a set of nodes, returns a subset of the DAG containing 747 only the node(s) and upstream dependencies. 748 """ 749 parent_nodes = set() 750 for node in node_ids: 751 parent_nodes = parent_nodes.union(set(self.graph.predecessors(node))) 752 while parent_nodes: 753 node_ids = node_ids.union(parent_nodes) 754 parent_nodes, child_nodes = set(), parent_nodes 755 for node in child_nodes: 756 parent_nodes = parent_nodes.union(set(self.graph.predecessors(node))) 757 subgraph = self.graph.subgraph(node_ids) 758 return DAG(subgraph, __force=True) 759 760 def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise: 761 """ 762 Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope. 763 Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope. 764 """ 765 try: 766 key = self.node_id(promise.key) 767 except Exception as e: 768 raise _xcoms.InvalidCallable(e) 769 return _xcoms.Promise(key) 770 771 def __init__(self, graph: _nx.DiGraph = None, **kwargs) -> None: 772 773 if graph is None: 774 self.graph = _nx.DiGraph() 775 return 776 777 # Meant for internal use only, creating DAGs from subgraphs. 778 if kwargs.get('__force') == True: 779 self.graph = graph 780 return 781 782 _edges.validate_dag(graph) 783 784 # Define framework specific logic as nested dictionaries. 
785 for node in graph.nodes.keys(): 786 graph.nodes[node]['callable'] = node 787 graph.nodes[node]['name'] = node.__name__ 788 graph.nodes[node]['args'] = [] 789 graph.nodes[node]['kwargs'] = {} 790 791 for e in graph.edges: 792 condition = graph.edges[e].get('Condition') 793 if condition is not None: 794 if condition not in Condition: 795 raise TypeError(e, 'Edge conditions must belong to IntEnum type Webber.Condition.') 796 else: 797 graph.edges[e]['Condition'] = Condition.Success 798 799 graph = _nx.relabel_nodes(graph, lambda node: _edges.label_node(node)) 800 for n in graph.nodes: 801 graph.nodes[n]['id'] = n 802 self.graph = _nx.DiGraph(graph) 803 804 class DAGExecutor: 805 """ 806 Base class used to execute DAG in embarrassingly parallel. 807 """ 808 def __init__(self, graph: _nx.DiGraph, roots: list, print_exc: bool = False) -> None: 809 810 with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _: 811 812 # Skip execution if there are no callables in scope. 813 if len(graph.nodes) == 0: 814 print('Given DAG has no callables in scope. Skipping execution...') 815 return 816 817 # Initialize local variables for execution. 818 complete, started, failed, skipped = set(), set(), set(), set() 819 events = set(roots) 820 refs: dict[str, _futures.Future] = {} 821 822 def raise_exc(message): 823 raise ValueError(message) 824 825 def run_conditions_met(n): 826 for p in graph.predecessors(n): 827 match graph.edges.get((p, n))['Condition']: 828 case Condition.Success: 829 if p not in complete: 830 return False 831 case Condition.Failure: 832 if p not in failed: 833 return False 834 case Condition.AnyCase: 835 if p not in failed and p not in complete: 836 return False 837 return True 838 839 skip = graph.nodes.data("skip", default=(False, False)) 840 retry = {n: [c+1, {}] for n,c in graph.nodes.data("retry", default=0)} 841 842 # Start execution of root node functions. 
843 with _futures.ThreadPoolExecutor() as executor: 844 845 def Submit(event, callable, name, args, kwargs): 846 if skip[event][0]: # NOTE: Internally, DAGExecutor tracks these skipped events as successes or failures. 847 retry[event][0] = 0 848 skip_callable = raise_exc if skip[event][1] else print 849 return executor.submit( 850 _event_wrapper, 851 _callable=skip_callable, 852 _name=graph.nodes[event]['name'], 853 _args=[f"Event {event} skipped..."], 854 _kwargs={} 855 ) 856 else: 857 retry[event][0] -= 1 858 if (retry[event][0] > 0) and (retry[event][1] == {}): 859 retry[event][1] = { 860 'callable': callable, 861 'name': name, 862 'args': args, 863 'kwargs': kwargs 864 } 865 return executor.submit( 866 _event_wrapper, 867 _callable=callable, 868 _name=name, 869 _args=args, 870 _kwargs=kwargs 871 ) 872 873 for event in events: 874 refs[event] = Submit( 875 event, 876 graph.nodes[event]['callable'], 877 graph.nodes[event]['name'], 878 graph.nodes[event]['args'], 879 graph.nodes[event]['kwargs'] 880 ) 881 started.add(event) 882 # Loop until all nodes in the network are executed. 
883 while (len(complete) + len(failed) + len(skipped)) != len(graph): 884 for event in events: 885 if refs[event].done() is True: 886 if refs[event].exception(timeout=0) is not None: 887 try: 888 raise refs[event].exception(timeout=0) 889 except: 890 if print_exc: 891 _traceback.print_exc() 892 893 if retry[event][0] > 0: 894 print(f"Event {event} exited with exception, retrying...") 895 refs[event] = Submit( 896 event, 897 callable=retry[event][1]['callable'], 898 name=retry[event][1]['name'], 899 args=retry[event][1]['args'], 900 kwargs=retry[event][1]['kwargs'] 901 ) 902 continue 903 904 print(f"Event {event} exited with exception...") 905 failed.add(event) 906 skipping = [ 907 e[1] for e in set(graph.out_edges(event)) 908 if not _edges.continue_on_failure(graph.edges.get(e)) 909 ] 910 else: 911 complete.add(event) 912 skipping = [ 913 e[1] for e in set(graph.out_edges(event)) 914 if not _edges.continue_on_success(graph.edges.get(e)) 915 ] 916 skipped = skipped.union(skipping) 917 for n in skipping: 918 skipped = skipped.union(_nx.descendants(graph, n)) 919 carryon = set(graph.successors(event)).difference(skipped) 920 starting = [ 921 successor for successor in carryon if 922 run_conditions_met(successor) 923 ] 924 for successor in starting: 925 _args = [ 926 a if not isinstance(a, _xcoms.Promise) else refs[a.key].result() 927 for a in graph.nodes[successor]['args'] 928 ] 929 _kwargs = { 930 k: v if not isinstance(v, _xcoms.Promise) else refs[v.key].result() 931 for k, v in graph.nodes[successor]['kwargs'].items() 932 } 933 refs[successor] = Submit( 934 successor, 935 graph.nodes[successor]['callable'], 936 graph.nodes[successor]['name'], 937 _args, 938 _kwargs 939 ) 940 started.add(successor) 941 # Initialized nodes that are incomplete or not yet documented as complete. 942 events = started.difference(complete.union(failed).union(skipped)) 943 944class QueueDAG(DAG): 945 """ 946 #### Experimental, as of v0.2. 
class QueueDAG(DAG):
    """
    #### Experimental, as of v0.2. ####

    Directed Acyclic Graph used to queue and execute Pythonic callables in parallel,
    while stringing the outputs of those callables in linear sequences.

    Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the
    behavior of root nodes to determine if and/or when the DAG run has been completed.

    Root nodes will be re-executed until culled by one of two conditions:
    1. A max number of iterations has been completed, or
    2. The output of the root node's callable matches a lambda halt_condition.

    Both conditions can be set at run-time.

    As of v0.2, QueueDAG experiences extreme latency when nested inside of a standard webber.DAG class.
    """

    # Class-level fallback kept for backward compatibility; every instance gets
    # its own dictionary in __init__ so that DAGs do not share halting rules.
    conditions = {}

    def __init__(self):
        super().__init__()
        # node id -> {'halt_condition': callable or None, 'iter_limit': int or None}
        self.conditions = {}

    def add_node(self, node, *args, **kwargs):
        """
        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
        On success, return unique identifier for the new node.

        Reserved key-words are used for Queue DAG definitions:

        - halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.

        \t halt_condition = (lambda output: output == None)

        - iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.

        - max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop.
        """
        halt_condition = kwargs.pop('halt_condition', None)
        iterator: int = kwargs.pop('iterator', None)
        max_iter: int = kwargs.pop('max_iter', None)

        return_val = super().add_node(node, *args, **kwargs)

        # max_iter takes precedence over iterator when both are given.
        if max_iter is not None:
            iter_limit = int(max_iter)
        elif iterator is not None:
            iter_limit = int(iterator)
        else:
            iter_limit = None

        self.conditions[return_val] = {
            'halt_condition': halt_condition,
            'iter_limit': iter_limit
        }

        return return_val

    def add_edge(self, u_of_edge, v_of_edge, continue_on = Condition.Success):
        """
        Adds an edge between nodes in the Queue DAG's underlying graph.
        Queue DAG nodes may have a maximum of one child and one parent worker.

        Raises AssertionError if the parent already has a child or the child already has a parent.
        """
        for position, node in enumerate((u_of_edge, v_of_edge)):
            try:
                node_id = self.node_id(node)
            except Exception:
                # Not (uniquely) in scope yet: nothing to check here, DAG.add_edge
                # performs the full validation below.
                continue

            # The role is decided by argument position, so an edge naming the same
            # node twice is still checked once as parent and once as child.
            if position == 0:
                occupied = self.filter_edges(lambda e: e.parent == node_id)
            else:
                occupied = self.filter_edges(lambda e: e.child == node_id)

            # Explicit raise: unlike an assert statement this survives `python -O`.
            if len(occupied) != 0:
                raise AssertionError("Queue DAG nodes may have a maximum of one child and one parent worker.")

        return super().add_edge(u_of_edge, v_of_edge, continue_on)

    def execute(self, *promises, return_ref=False, print_exc=False):
        """
        Basic wrapper for execution of the DAG's underlying callables.

        Every node is run by `webber.queue._worker` on a thread pool; each worker writes
        to its own LIFO queue and non-root workers read from their parent's queue.
        Returns the items left in the queue of the final node of the DAG as a list,
        or None when the DAG is empty. `return_ref` is accepted but currently unused.
        """
        queues = {}
        processes = {}

        promises: dict = { k: v for k, v in _it.pairwise(promises) } if len(promises) > 0 else {}

        with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:

            # Skip execution if there are no callables in scope.
            if len(self.graph.nodes) == 0:
                print('Given DAG has no callables in scope. Skipping execution...')
                return

            # The node whose queue is returned: the last childless node reached through
            # an edge, or -- for DAGs without edges -- the last childless root.
            end_proc = None
            for root_id in self.root:
                if len(list(self.graph.successors(root_id))) == 0:
                    end_proc = root_id
            for _parent, child_id in self.graph.edges:
                if len(list(self.graph.successors(child_id))) == 0:
                    end_proc = child_id

            with _futures.ThreadPoolExecutor() as executor:

                def launch(node):
                    return executor.submit(
                        _event_wrapper,
                        _callable=node['callable'],
                        _name=node['name'],
                        _args=node['args'],
                        _kwargs=node['kwargs']
                    )

                for root_id in self.root:
                    node = self.get_node(root_id)
                    queues[root_id] = _q.LifoQueue()
                    node.update({
                        'callable': _queue._worker,
                        'args': tuple(),
                        'kwargs': {
                            'work': node.callable,
                            'args': node.args, 'kwargs': node.kwargs,
                            'promises': promises,
                            'print_exc': print_exc,
                            'halt_condition': self.conditions[root_id]['halt_condition'],
                            'iter_limit': self.conditions[root_id]['iter_limit'],
                            'out_queue': queues.get(root_id)
                        }
                    })
                    processes[root_id] = launch(node)

                # Topological order guarantees that a parent's worker and queue exist
                # before its child is submitted, whatever order nodes were inserted in.
                for child_id in _nx.topological_sort(self.graph):
                    if child_id in processes:
                        continue
                    parent_id = next(iter(self.graph.predecessors(child_id)))
                    node = self.get_node(child_id)
                    queues[child_id] = _q.LifoQueue()
                    node.update({
                        'callable': _queue._worker,
                        'args': tuple(),
                        'kwargs': {
                            'work': node.callable,
                            'args': node.args, 'kwargs': node.kwargs,
                            'promises': promises,
                            'print_exc': print_exc,
                            'parent_id': parent_id,
                            'parent_process': processes[parent_id],
                            'in_queue': queues.get(parent_id),
                            'out_queue': queues.get(child_id)
                        }
                    })
                    processes[child_id] = launch(node)

                # Block until every worker has finished.
                _futures.wait(list(processes.values()))

            return_val = []
            while not queues[end_proc].empty():
                return_val.append(queues[end_proc].get())

            return return_val
class DAG:
    """
    Directed Acyclic Graph used to represent Pythonic tasks in parallel.
    """
    def add_node(self, node, *args, **kwargs) -> str:
        """
        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
        Promises among the arguments are resolved against the DAG's current scope.
        On success, return unique identifier for the new node.

        Raises TypeError if `node` is not callable.
        """
        if not _iscallable(node):
            err_msg = f"{node}: requested node is not a callable Python function."
            raise TypeError(err_msg)

        node_name = _edges.label_node(node)

        # Each positional argument is inspected on its own; promises are replaced by
        # promises keyed on the unique node identifier.
        args = tuple(
            self.resolve_promise(arg) if isinstance(arg, _xcoms.Promise) else arg
            for arg in args
        )

        for k, val in kwargs.items():
            if isinstance(val, _xcoms.Promise):
                kwargs[k] = self.resolve_promise(val)

        self.graph.add_node(
            node_for_adding=node_name,
            callable=node, args=args, kwargs=kwargs,
            name=node.__name__,
            id=node_name
        )

        return node_name

    def add_edge(
            self,
            u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable],
            continue_on: Condition = Condition.Success
        ) -> _T.Tuple[str,str]:
        """
        Adds an edge between nodes in the DAG's underlying graph,
        so long as the requested edge is unique and has not been added previously.
        Callables that are not yet in the DAG are added as new nodes.

        On success, returns Tuple of the new edge's unique identifiers.

        Raises TypeError for invalid argument types, and ValueError for identifiers out of
        scope, ambiguous callables, duplicate edges or edges that would create a cycle.
        """
        # Validate inputs prior to execution
        # - Nodes must be identifiers or callables
        # - Conditions must belong to the webber.edges.Condition class
        if not (isinstance(u_of_edge,str) or _iscallable(u_of_edge)):
            err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable"
            raise TypeError(err_msg)
        if not (isinstance(v_of_edge,str) or _iscallable(v_of_edge)):
            err_msg = f"Incoming node {v_of_edge} must be a string or a Python callable"
            raise TypeError(err_msg)
        if not isinstance(continue_on, Condition):
            raise TypeError("Edge conditions must use the webber.edges.Condition class.")

        # Base Case 0: No nodes are present in the DAG:
        # Ensure that both nodes are callables, then add both to the graph and
        # assign the outgoing node as a root.
        if len(self.graph.nodes()) == 0:
            if not _iscallable(u_of_edge):
                err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope."
                raise ValueError(err_msg)
            if not _iscallable(v_of_edge):
                err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope."
                raise ValueError(err_msg)
            outgoing_node = self.add_node(u_of_edge)
            incoming_node = self.add_node(v_of_edge)
            self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
            return (outgoing_node, incoming_node)

        node_names, node_callables = zip(*self.graph.nodes(data='callable'))
        graph_edges = list(self.graph.edges(data=False))
        new_callables = dict()

        def _existing_or_new(fn):
            # Returns the identifier of a callable present exactly once in the DAG;
            # otherwise stages it as a new node and returns a placeholder label
            # that is only used for the edge and cycle checks below.
            if node_callables.count(fn) == 1:
                return node_names[ node_callables.index(fn) ]
            new_callables[fn] = fn
            return _edges.label_node(fn)

        if _iscallable(u_of_edge) and _iscallable(v_of_edge):

            # Error Cases 0, 1: Either of the callables appear more than once in the DAG.
            if node_callables.count(u_of_edge) > 1:
                err_msg = f"Outgoing callable {u_of_edge.__name__} " \
                        + "exists more than once in this DAG. " \
                        + "Use the unique identifier of the required node or add a new node."
                raise ValueError(err_msg)
            if node_callables.count(v_of_edge) > 1:
                err_msg = f"Incoming callable {v_of_edge.__name__} " \
                        + "exists more than once in this DAG. " \
                        + "Use the unique string identifier of the required node."
                raise ValueError(err_msg)

            # Base Case 1: Both args are callables and will be present in the DAG scope no more than once.
            # We will create new nodes if necessary, after validation, and get the unique string identifiers of the nodes.
            outgoing_node = _existing_or_new(u_of_edge)
            incoming_node = _existing_or_new(v_of_edge)

        else:

            # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope.
            if isinstance(u_of_edge, str) and u_of_edge not in node_names:
                err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope."
                raise ValueError(err_msg)
            if isinstance(v_of_edge, str) and v_of_edge not in node_names:
                err_msg = f"Incoming node {v_of_edge} not in DAG's current scope."
                raise ValueError(err_msg)

            def _assign_node(n, role):
                # For the argument that is a unique string identifier, assign and continue.
                if not _iscallable(n):
                    return n
                # Error Case 4: The requested callable exists more than once in the DAG.
                if node_callables.count(n) > 1:
                    err_msg = f"{role} callable {n.__name__} " \
                            + "exists more than once in this DAG. " \
                            + "Use the unique ID of the required node or add a new node."
                    raise ValueError(err_msg)
                return _existing_or_new(n)

            # Either both identifiers are strings already in scope, or one of them is a
            # callable that is looked up (or staged as a new node).
            outgoing_node = _assign_node(u_of_edge, "Outgoing")
            incoming_node = _assign_node(v_of_edge, "Incoming")

        # Error Case 5: Both callables exist only once in the DAG,
        # but an edge already exists between them.
        if (outgoing_node, incoming_node) in graph_edges:
            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \
                    + "a definition in this DAG."
            raise ValueError(err_msg)

        # Ensure that no cycles will be created by adding this edge to the DAG.
        test_edges: list = graph_edges + [(outgoing_node, incoming_node)]

        # Error Case 6: Both callables exist only once in the DAG,
        # but adding an edge between them creates circular dependencies.
        if not _nx.is_directed_acyclic_graph(_nx.DiGraph(test_edges)):
            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \
                    + "results in circular dependencies."
            raise ValueError(err_msg)

        # We can now add the edge to the DAG, since we are certain it will not result in
        # illegal dependencies/behavior.
        # First, we should account for potential new nodes. This also handles
        # duplicates on first entry to the DAG (e.g.: edge == (print, print))
        if new_callables.get(u_of_edge) is not None:
            outgoing_node = self.add_node(new_callables[u_of_edge])
        if new_callables.get(v_of_edge) is not None:
            incoming_node = self.add_node(new_callables[v_of_edge])

        # Then we can add the new edge.
        self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
        return (outgoing_node, incoming_node)

    def remove_node(self, *posargs, **kwargs) -> None:
        """
        Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG.
        Throws error message and recommends safer methods.
        """
        raise NotImplementedError("Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect.")

    def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]:
        """
        Removes a directed edge between nodes in the DAG's underlying graph.
        Throws error if the edge does not exist.
        On success, returns Tuple of the removed edge's unique identifiers.
        """
        edge_id = (self.node_id(u_of_edge), self.node_id(v_of_edge))
        if edge_id not in self.graph.edges(data = False):
            err_msg = "Requested edge does not exist in the DAG's scope"
            raise ValueError(err_msg)
        self.graph.remove_edge(edge_id[0], edge_id[1])
        return edge_id
# Methods of class DAG; each receives the DAG instance as `self`.

def update_edges(self, *E, continue_on: Condition = None, filter: _types.LambdaType = None, data = None):
    """
    Flexible function to update properties of edges in the DAG's scope,
    based on unique identifier(s) (e.g.: string IDs or unique callables) or a
    lambda filter using the edgedict syntax.

    List of nodes to update or filter argument is expected. Valid edge lists include:

    \t update_edges((node1, node2), ...) or update_edges([(node1, node2)], ...)

    \t update_edges((node1, callable2), ...) or update_edges([(node1, callable2)], ...)

    \t update_edges(edgedict), where isinstance(edgedict, webber.edges.edgedict) == True

    Parameters:

    > continue_on: webber.edges.Condition value to update matching edges with,

    \t Execute child on success, failure, or any exit state, of the parent node

    > filter: lambda property that can be used instead of a list of edges to be updated.

    \t filter = (lambda e: e.parent == print or e.child == print)

    > data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set.

    \t data = { 'continue_on': webber.edges.Condition }

    When continue_on is None, edges are updated from `data` (or, without `data`, from the
    dictionaries passed in E). When both E and a filter are given, the filter decides
    which edges are updated.
    """
    if len(E) == 0 and filter is None:
        raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.")

    def _is_mapping(item):
        return isinstance(item, dict) or isinstance(item, edgedict)

    # A single argument may be one edge -- a mapping or a pair of node identifiers --
    # or a collection of edges. It is a collection only if every element is itself
    # an edge specification.
    if len(E) == 1 and not _is_mapping(E[0]) and not isinstance(E[0], str) \
            and isinstance(E[0], _abc.Iterable):
        members = list(E[0])
        if members and all(
            _is_mapping(m) or (isinstance(m, _abc.Iterable) and not isinstance(m, str))
            for m in members
        ):
            E = members

    if filter is not None:
        edge_ids = self.filter_edges(filter, data = False)
    else:
        if _is_mapping(E[0]):
            try:
                ids = [e['id'] for e in E]
            except KeyError:
                err_msg = 'In dictionary form, all given edges must be follow edgedict standards.'
                raise ValueError(err_msg)
        else:
            ids = E
        edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids]

    if continue_on is None:
        if data is not None:
            # Apply the same data to every selected edge, including edges chosen by
            # a filter. Each call gets its own copy because _update_edge edits the
            # dictionary it is given.
            for edge_id in edge_ids:
                self._update_edge(dict(data), id = edge_id)
        else:
            for edge_id, e in zip(edge_ids, E):
                self._update_edge(e, id = edge_id)
    else:
        if not isinstance(continue_on, Condition):
            err_msg = f"Condition assignment must use webber.edges.Condition"
            raise TypeError(err_msg)
        for e in edge_ids:
            self.graph.edges[e]['Condition'] = continue_on

def _update_edge(self, edgedict: dict, id: tuple[str, str] = None, force: bool = False):
    """
    Internal only. Update properties of an individual edge within a DAG's scope,
    given a well-structured dictionary and the tuple identifier of the network edge.
    Force argument bypasses validation, and should only be used internally.

    Note: the given dictionary is modified (its 'id' entry is set and then removed).
    """
    if id is not None:
        if edgedict.get('id') and id != edgedict['id'] and not force:
            raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {edgedict['id']}")
        edgedict['id'] = id

    expected_keys = ('parent', 'child', 'id', 'continue_on')
    if not set(expected_keys).issuperset(set(edgedict.keys())):
        raise ValueError(f"Expecting keys: {expected_keys}")

    if not force:

        e1, e2 = None, None

        if edgedict.get('id'):
            # The identifier is an (outgoing, incoming) pair and must be unpacked.
            e1 = self.get_edge(*edgedict['id'], data = False)

        if edgedict.get('parent') or edgedict.get('child'):
            e2 = self.get_edge(edgedict.get('parent'), edgedict.get('child'), data = False)
        else:
            e2 = e1

        if e1 != e2:
            raise ValueError('Edge vertices should not be changed using update functions.')

        elif e1 is None:
            raise ValueError('Requested edge was not given an identifier.')

        if edgedict.get('continue_on') is not None and not isinstance(edgedict['continue_on'], Condition):
            err_msg = f"Condition assignment must use webber.edges.Condition"
            raise TypeError(err_msg)

    edge_id = edgedict.pop('id')
    edge = {k: v for k,v in edgedict.items() if k not in ('parent', 'child', 'id')}
    # The graph stores the edge condition under 'Condition' (see add_edge and the
    # continue_on branch of update_edges); translate the dictionary key accordingly.
    if 'continue_on' in edge:
        condition = edge.pop('continue_on')
        if condition is not None:
            edge['Condition'] = condition
    self.graph.edges[(edge_id[0], edge_id[1])].update(edge)
# Methods of class DAG; each receives the DAG instance as `self`.

def relabel_node(self, node: _T.Union[str, _T.Callable], label: str):
    """
    Update the label, or name, given to a node in the DAG's scope,
    given a Python string and a node identifier.
    Well-structured wrapper for a common use-case of DAG.update_nodes.
    Returns the new label; raises ValueError if the label is not a non-empty string.
    """
    node_id = self.node_id(node)
    if not isinstance(label, str) or len(label) == 0:
        err_msg = "Node label must be a Python string with one or more characters."
        raise ValueError(err_msg)
    self.update_nodes(node_id, data = {'name': label})
    return label

def update_nodes(self, *N, filter: _types.LambdaType = None, data = None, callable = None, args = None, kwargs = None):
    """
    Flexible function to update properties of nodes in the DAG's scope,
    based on unique identifier(s) (e.g.: string IDs or unique callables) or a
    lambda filter using the dotdict syntax.

    List of nodes to update or filter argument is expected. Valid node lists include:

    \t update_nodes(node_id, ...)           or update_nodes([node_id], ...)

    \t update_nodes(callable, ...)          or update_nodes([callable], ...)

    \t update_nodes(node1, node2, ...)      or update_nodes([node1, node2], ...)

    \t update_nodes(node1, callable2, ...)  or update_nodes([node1, callable2], ...)

    > update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id)

    Parameters:

    > filter: lambda property that can be used instead of a list of nodes to be updated.

    \t filter = (lambda n: n.callable == print or 'Hello, World' in n.args)

    > data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set.
    Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method.

    \t data = {
    \t      'callable': print,
    \t      'args': ['Hello', 'World'],
    \t      'kwargs': {'sep': ', '},
    \t      'name': 'custom_label',
    \t      'id': 'unique-identifier'
    \t}

    > callable: Python callable to assign to every matching node; the node name is reset to the callable's name.

    > args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List).

    > kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary.

    When any of callable, args or kwargs is given, `data` is not applied.
    """
    if len(N) == 0 and filter is None:
        raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.")

    elif len(N) > 0 and filter is not None:
        raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.")

    # A single list / tuple / set argument is the node collection itself. A lone
    # string is one identifier and a lone dictionary is one node's data.
    elif len(N) == 1 and isinstance(N[0], _abc.Iterable) and not isinstance(N[0], (str, dict)):
        N = list(N[0])
        if len(N) == 0:
            raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.")

    if filter is not None:
        node_ids = self.filter_nodes(filter, data = False)
    else:
        if isinstance(N[0], dict):
            ids = [n['id'] for n in N]
        else:
            ids = N
        node_ids = [self.node_id(i) for i in ids]

    std_update = (callable is None) and (args is None) and (kwargs is None)

    if std_update:
        if data is not None:
            # Apply the same data to every selected node, including nodes chosen by
            # a filter. The id key is dropped, as documented, and every call gets
            # its own copy of the remaining properties.
            for node_id in node_ids:
                properties = {k: v for k, v in data.items() if k != 'id'}
                self._update_node(properties, id = node_id)
        else:
            for node_id, n in zip(node_ids, N):
                self._update_node(n, id = node_id)

    else:
        if callable is not None:
            # The `callable` parameter shadows the builtin here, hence the helper.
            if not _iscallable(callable):
                err_msg = f"Requested node is not assigned a callable Python function."
                raise TypeError(err_msg)
            for node_id in node_ids:
                self.graph.nodes[node_id]['callable'] = callable
                self.graph.nodes[node_id]['name'] = callable.__name__

        if args is not None:
            if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)):
                err_msg = f"Requested node is not assigned a tuple of pos args."
                raise TypeError(err_msg)
            args = tuple(args)
            for node_id in node_ids:
                self.graph.nodes[node_id]['args'] = args

        if kwargs is not None:
            if not isinstance(kwargs, dict):
                err_msg = f"Requested node is not assigned a dictionary of kw args."
                raise TypeError(err_msg)
            for node_id in node_ids:
                self.graph.nodes[node_id]['kwargs'] = kwargs
# Methods of class DAG; each receives the DAG instance as `self`.

def get_edges(self, *N, data: bool = True) -> _T.Union[list[edgedict], list[tuple]]:
    """
    Retrieval function for DAG edge data, based on tuple identifiers.
    Without arguments, every edge in the DAG is returned; with data=False only the
    (outgoing, incoming) identifier tuples are returned.
    Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2']))

    Raises ValueError unless all requested edges are unique tuples of size 2.
    """
    if len(N) == 0:
        if data == True:
            # NOTE(review): edgedict receives the whole (u, v, attributes) tuple here,
            # while get_edge passes the parts separately -- confirm that
            # webber.edges.edgedict supports both call forms.
            return list(map(edgedict, self.graph.edges.data()))
        return list(self.graph.edges.data(data=False))

    # Unhashable requests (e.g. lists) make set() raise TypeError here.
    is_pair = (lambda n: isinstance(n, _abc.Iterable) and len(n) == 2)
    if len(N) != len(set(N)) or not all(is_pair(n) for n in N):
        err_msg = 'All requested edges must be unique tuples of size 2.'
        raise ValueError(err_msg)

    return [self.get_edge(o, i, data=data) for (o, i) in N]

def get_edge(self, outgoing_node: _T.Union[str, _T.Callable], incoming_node: _T.Union[str, _T.Callable], data: bool = True) -> _T.Union[edgedict, tuple]:
    """
    Retrieval function for a single directed edge between nodes in a DAG's scope.
    With data=False only the identifier tuple is returned, and the edge itself is not
    looked up. Raises ValueError if data is requested for an edge that does not exist.
    """
    id = (self.node_id(outgoing_node), self.node_id(incoming_node))
    if not data:
        return id
    edge_data = self.graph.get_edge_data(u = id[0], v = id[1])
    # None means "no such edge"; an existing edge may carry an empty dictionary.
    if edge_data is None:
        err_msg = f'No match found for the directed edge requested: {id}'
        raise ValueError(err_msg)
    return edgedict(*id, **edge_data)

def get_node(self, n: _T.Union[str, _T.Callable]) -> dotdict:
    """
    Given a unique identifier, returns a dictionary of node metadata
    for a single node in the DAG's scope.
    """
    node_id = self.node_id(n)
    return dotdict(self.graph.nodes[node_id])

def get_nodes(self, *N) -> list[dotdict]:
    """
    Flexible function to retrieve DAG node data, based on node identifiers
    (e.g.: string IDs or unique callables).
    Without arguments, the data of every node is returned. A single non-string
    iterable is treated as the list of identifiers.
    Raises ValueError if an identifier is requested more than once.
    """
    if len(N) == 0:
        return [dotdict(d) for d in self.graph.nodes.values()]

    if len(N) == 1:
        if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], str):
            N = N[0]
        else:
            return [dotdict(self.graph.nodes[self.node_id(N[0])])]

    if len(N) != len(set(N)):
        err_msg = 'All requested nodes must be unique identifiers.'
        raise ValueError(err_msg)

    node_ids = [self.node_id(n) for n in N]
    return [dotdict(self.graph.nodes[n]) for n in node_ids]

def filter_nodes(self, filter: _types.LambdaType, data: bool = False):
    """
    Given a lambda function, filter nodes in a DAG's scope based on its attributes.
    Returns the matching node identifiers, or the matching node data when data=True.
    Current limitation: Filters must use node identifier strings when referencing nodes.
    Use get_nodes for more flexible controls.
    """
    matches = [
        node for node in map(dotdict, self.graph.nodes.values()) if filter(node)
    ]
    if not data:
        return [node['id'] for node in matches]
    return matches

# NOTE(review): `filter = _types.LambdaType` makes the type the default *value*; it
# was probably meant as an annotation. The signature is left as it is.
def filter_edges(self, filter = _types.LambdaType, data: bool = False) -> list[edgedict]:
    """
    Given a lambda function, filter edges in a DAG's scope based on its attributes.
    Returns the matching (outgoing, incoming) tuples, or the matching edge data when data=True.
    Current limitation: Filters must use node identifier strings when referencing nodes.
    Use get_edges for more flexible controls.
    """
    matches = []
    for e in list(self.graph.edges.data()):
        candidate = edgedict(*e)
        if filter(candidate):
            # Return exactly the object the predicate accepted.
            matches.append(candidate if data else e[:2])
    return matches
590 """ 591 if not isinstance(count, int) or not count >= 0: 592 raise ValueError("Retry count must be a non-negative integer.") 593 node = self.node_id(identifier) 594 self.graph.nodes[node]['retry'] = count 595 596 def skip_node(self, identifier: _T.Union[str,_T.Callable], skip: bool = True, as_failure = False): 597 """ 598 Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error). 599 Allows conditional control and testing over DAG's order of operations. 600 """ 601 if not isinstance(skip, bool): 602 raise ValueError("Skip argument must be a boolean value.") 603 node = self.node_id(identifier) 604 self.graph.nodes[node]['skip'] = (skip, as_failure) 605 606 def critical_path(self, nodes): 607 """ 608 Given a set of nodes, returns a subset of the DAG containing 609 only the node(s) and its parents, or upstream dependencies. 610 """ 611 if isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str): 612 node_ids = {self.node_id(n) for n in nodes} 613 else: 614 node_ids = {self.node_id(nodes)} 615 return self._subgraph(node_ids) 616 617 def execute(self, return_ref=False, print_exc=False): 618 """ 619 Basic wrapper for execution of the DAG's underlying callables. 620 """ 621 executor = self.DAGExecutor(self.graph, self.root, print_exc) 622 return executor if return_ref else None 623 624 def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] = None): 625 """ 626 Basic wrapper to visualize DAG using Vis.js and NetGraph libraries. 627 By default, visualization library only loaded in after DAG.visualize() is called, halving import times. 
628 """ 629 import webber.viz as _viz 630 631 match type: 632 case 'browser': 633 _viz.visualize_browser(self.graph) 634 635 case 'plt': 636 return _viz.visualize_plt(self.graph) 637 638 case 'gui': 639 # _visualize_gui(self.graph) 640 raise NotImplementedError 641 642 case None: 643 if _viz._in_notebook(): 644 return _viz.visualize_plt(self.graph) 645 else: 646 _viz.visualize_browser(self.graph) 647 648 case _: 649 err_msg = "Unknown visualization type requested." 650 raise NotImplementedError(err_msg) 651 652 @property 653 def root(self) -> list[str]: 654 """ 655 Return list of nodes with no dependencies. 656 Root nodes will occur first in DAG's order of operations. 657 """ 658 return list(filter( 659 lambda node: len(list(self.graph.predecessors(node))) < 1, 660 self.graph.nodes.keys() 661 )) 662 663 @property 664 def nodes(self): 665 return self.graph.nodes 666 667 def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str: 668 """ 669 Validate whether identifier given is a valid node within the DAG's scope. 670 Primarily for internal use, but useful for retrieving string identifiers 671 for a unique callable in a DAG. 672 """ 673 node_names, node_callables = zip(*self.graph.nodes(data='callable')) 674 675 if isinstance(identifier, str): 676 if identifier not in node_names: 677 err_msg = f"Node {identifier} is not defined in this DAG's scope." 678 raise ValueError(err_msg) 679 node = identifier 680 elif _iscallable(identifier): 681 match node_callables.count(identifier): 682 case 0: 683 err_msg = f"Callable {identifier} is not defined in this DAG's scope." 684 raise ValueError(err_msg) 685 case 1: 686 node = node_names[ node_callables.index(identifier) ] 687 case _: 688 err_msg = f"Callable {identifier.__name__} " \ 689 + "exists more than once in this DAG. " \ 690 + "Use the unique identifier of the required node." 
691 raise ValueError(err_msg) 692 else: 693 err_msg = f"Node {identifier} must be a string or a Python callable" 694 raise TypeError(err_msg) 695 return node 696 697 def _update_node(self, nodedict: dict, id: str = None, force: bool = False): 698 """ 699 Internal only. Update properties of single node within a DAG's scope, 700 given a well-structured dictionary and the tuple identifier of the network edge. 701 Force argument bypasses dictionary validation, and should only be used internally. 702 """ 703 if id != None: 704 try: 705 if nodedict.get('id') != None and id != nodedict['id']: 706 raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {nodedict['id']}") 707 except ValueError as e: 708 if not force: 709 raise e 710 nodedict['id'] = id 711 712 expected_keys = ('callable', 'args', 'kwargs', 'name', 'id') 713 if not set(expected_keys).issuperset(set(nodedict.keys())): 714 raise ValueError(f"Expecting keys: {expected_keys}") 715 716 if not force: 717 if nodedict.get('callable'): 718 if _iscallable(nodedict['callable']): 719 err_msg = f"Requested node is not assigned a callable Python function." 720 raise TypeError(err_msg) 721 if not nodedict.get('name'): 722 nodedict['name'] = nodedict['callable'].__name__ 723 724 if nodedict.get('name') and (not isinstance(nodedict['name'], str) or len(nodedict['name']) == 0): 725 err_msg = f"Requested node name must be a non-null Python string, will default to callable when not set." 726 raise TypeError(err_msg) 727 728 if nodedict.get('args'): 729 if not (isinstance(nodedict['args'], _abc.Iterable) or isinstance(nodedict['args'], str)): 730 err_msg = f"Requested node is not assigned a tuple of pos args." 731 raise TypeError(err_msg) 732 nodedict['args'] = tuple(nodedict['args']) 733 734 if nodedict.get('kwargs') and not isinstance(nodedict['kwargs'], dict): 735 err_msg = f"Requested node is not assigned a dictionary of kw args." 
736 raise TypeError(err_msg) 737 738 node_id = nodedict.pop('id') 739 self.graph.nodes[node_id].update(nodedict) 740 741 # Reset node name if implicitly requested. 742 if not nodedict.get('name'): 743 self.graph.nodes[node_id]['name'] = self.graph.nodes[node_id]['callable'].__name__ 744 745 def _subgraph(self, node_ids: set[str]): 746 """ 747 Internal only. Given a set of nodes, returns a subset of the DAG containing 748 only the node(s) and upstream dependencies. 749 """ 750 parent_nodes = set() 751 for node in node_ids: 752 parent_nodes = parent_nodes.union(set(self.graph.predecessors(node))) 753 while parent_nodes: 754 node_ids = node_ids.union(parent_nodes) 755 parent_nodes, child_nodes = set(), parent_nodes 756 for node in child_nodes: 757 parent_nodes = parent_nodes.union(set(self.graph.predecessors(node))) 758 subgraph = self.graph.subgraph(node_ids) 759 return DAG(subgraph, __force=True) 760 761 def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise: 762 """ 763 Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope. 764 Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope. 765 """ 766 try: 767 key = self.node_id(promise.key) 768 except Exception as e: 769 raise _xcoms.InvalidCallable(e) 770 return _xcoms.Promise(key) 771 772 def __init__(self, graph: _nx.DiGraph = None, **kwargs) -> None: 773 774 if graph is None: 775 self.graph = _nx.DiGraph() 776 return 777 778 # Meant for internal use only, creating DAGs from subgraphs. 779 if kwargs.get('__force') == True: 780 self.graph = graph 781 return 782 783 _edges.validate_dag(graph) 784 785 # Define framework specific logic as nested dictionaries. 
786 for node in graph.nodes.keys(): 787 graph.nodes[node]['callable'] = node 788 graph.nodes[node]['name'] = node.__name__ 789 graph.nodes[node]['args'] = [] 790 graph.nodes[node]['kwargs'] = {} 791 792 for e in graph.edges: 793 condition = graph.edges[e].get('Condition') 794 if condition is not None: 795 if condition not in Condition: 796 raise TypeError(e, 'Edge conditions must belong to IntEnum type Webber.Condition.') 797 else: 798 graph.edges[e]['Condition'] = Condition.Success 799 800 graph = _nx.relabel_nodes(graph, lambda node: _edges.label_node(node)) 801 for n in graph.nodes: 802 graph.nodes[n]['id'] = n 803 self.graph = _nx.DiGraph(graph) 804 805 class DAGExecutor: 806 """ 807 Base class used to execute DAG in embarrassingly parallel. 808 """ 809 def __init__(self, graph: _nx.DiGraph, roots: list, print_exc: bool = False) -> None: 810 811 with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _: 812 813 # Skip execution if there are no callables in scope. 814 if len(graph.nodes) == 0: 815 print('Given DAG has no callables in scope. Skipping execution...') 816 return 817 818 # Initialize local variables for execution. 819 complete, started, failed, skipped = set(), set(), set(), set() 820 events = set(roots) 821 refs: dict[str, _futures.Future] = {} 822 823 def raise_exc(message): 824 raise ValueError(message) 825 826 def run_conditions_met(n): 827 for p in graph.predecessors(n): 828 match graph.edges.get((p, n))['Condition']: 829 case Condition.Success: 830 if p not in complete: 831 return False 832 case Condition.Failure: 833 if p not in failed: 834 return False 835 case Condition.AnyCase: 836 if p not in failed and p not in complete: 837 return False 838 return True 839 840 skip = graph.nodes.data("skip", default=(False, False)) 841 retry = {n: [c+1, {}] for n,c in graph.nodes.data("retry", default=0)} 842 843 # Start execution of root node functions. 
844 with _futures.ThreadPoolExecutor() as executor: 845 846 def Submit(event, callable, name, args, kwargs): 847 if skip[event][0]: # NOTE: Internally, DAGExecutor tracks these skipped events as successes or failures. 848 retry[event][0] = 0 849 skip_callable = raise_exc if skip[event][1] else print 850 return executor.submit( 851 _event_wrapper, 852 _callable=skip_callable, 853 _name=graph.nodes[event]['name'], 854 _args=[f"Event {event} skipped..."], 855 _kwargs={} 856 ) 857 else: 858 retry[event][0] -= 1 859 if (retry[event][0] > 0) and (retry[event][1] == {}): 860 retry[event][1] = { 861 'callable': callable, 862 'name': name, 863 'args': args, 864 'kwargs': kwargs 865 } 866 return executor.submit( 867 _event_wrapper, 868 _callable=callable, 869 _name=name, 870 _args=args, 871 _kwargs=kwargs 872 ) 873 874 for event in events: 875 refs[event] = Submit( 876 event, 877 graph.nodes[event]['callable'], 878 graph.nodes[event]['name'], 879 graph.nodes[event]['args'], 880 graph.nodes[event]['kwargs'] 881 ) 882 started.add(event) 883 # Loop until all nodes in the network are executed. 
884 while (len(complete) + len(failed) + len(skipped)) != len(graph): 885 for event in events: 886 if refs[event].done() is True: 887 if refs[event].exception(timeout=0) is not None: 888 try: 889 raise refs[event].exception(timeout=0) 890 except: 891 if print_exc: 892 _traceback.print_exc() 893 894 if retry[event][0] > 0: 895 print(f"Event {event} exited with exception, retrying...") 896 refs[event] = Submit( 897 event, 898 callable=retry[event][1]['callable'], 899 name=retry[event][1]['name'], 900 args=retry[event][1]['args'], 901 kwargs=retry[event][1]['kwargs'] 902 ) 903 continue 904 905 print(f"Event {event} exited with exception...") 906 failed.add(event) 907 skipping = [ 908 e[1] for e in set(graph.out_edges(event)) 909 if not _edges.continue_on_failure(graph.edges.get(e)) 910 ] 911 else: 912 complete.add(event) 913 skipping = [ 914 e[1] for e in set(graph.out_edges(event)) 915 if not _edges.continue_on_success(graph.edges.get(e)) 916 ] 917 skipped = skipped.union(skipping) 918 for n in skipping: 919 skipped = skipped.union(_nx.descendants(graph, n)) 920 carryon = set(graph.successors(event)).difference(skipped) 921 starting = [ 922 successor for successor in carryon if 923 run_conditions_met(successor) 924 ] 925 for successor in starting: 926 _args = [ 927 a if not isinstance(a, _xcoms.Promise) else refs[a.key].result() 928 for a in graph.nodes[successor]['args'] 929 ] 930 _kwargs = { 931 k: v if not isinstance(v, _xcoms.Promise) else refs[v.key].result() 932 for k, v in graph.nodes[successor]['kwargs'].items() 933 } 934 refs[successor] = Submit( 935 successor, 936 graph.nodes[successor]['callable'], 937 graph.nodes[successor]['name'], 938 _args, 939 _kwargs 940 ) 941 started.add(successor) 942 # Initialized nodes that are incomplete or not yet documented as complete. 943 events = started.difference(complete.union(failed).union(skipped))
Directed Acyclic Graph used to represent Pythonic tasks in parallel.
def __init__(self, graph: _nx.DiGraph = None, **kwargs) -> None:
    """
    Build a DAG, optionally seeded from a NetworkX directed graph of callables.

    With no graph, an empty DiGraph is created. With the internal-only keyword
    `__force` set, the given graph is adopted as-is (used when creating DAGs
    from subgraphs). Otherwise the graph is validated, every node (expected to
    be a callable, since its `__name__` is read) is given the framework's
    attributes, edges without a 'Condition' default to Condition.Success, and
    nodes are relabelled through `webber.edges.label_node`.

    Note: node and edge attributes are written onto the graph passed in before
    it is relabelled and copied.
    """
    if graph is None:
        self.graph = _nx.DiGraph()
        return

    # Internal shortcut: trust the caller and skip validation and relabelling.
    if kwargs.get('__force') == True:
        self.graph = graph
        return

    _edges.validate_dag(graph)

    # Attach the framework-specific attributes to each callable node.
    for fn in graph.nodes.keys():
        attrs = graph.nodes[fn]
        attrs['callable'] = fn
        attrs['name'] = fn.__name__
        attrs['args'] = []
        attrs['kwargs'] = {}

    # Validate explicit edge conditions; default the missing ones to Success.
    for edge in graph.edges:
        edge_attrs = graph.edges[edge]
        condition = edge_attrs.get('Condition')
        if condition is None:
            edge_attrs['Condition'] = Condition.Success
        elif condition not in Condition:
            raise TypeError(edge, 'Edge conditions must belong to IntEnum type Webber.Condition.')

    # Swap callables for their string labels, and record each label as the node id.
    relabelled = _nx.relabel_nodes(graph, _edges.label_node)
    for node_id, node_attrs in relabelled.nodes.items():
        node_attrs['id'] = node_id
    self.graph = _nx.DiGraph(relabelled)
def add_node(self, node, *args, **kwargs) -> str:
    """
    Adds a callable with positional and keyword arguments to the DAG's underlying graph.
    On success, return unique identifier for the new node.

    Any positional or keyword argument that is a `webber.xcoms.Promise` is
    passed through `resolve_promise`, so it is stored with a string node
    identifier. `resolve_promise` raises `webber.xcoms.InvalidCallable` when
    the promised node is not in the DAG's scope.

    Raises TypeError if `node` is not callable.
    """
    if not _iscallable(node):
        err_msg = f"{node}: requested node is not a callable Python function."
        raise TypeError(err_msg)

    node_name = _edges.label_node(node)

    # Fix: test each argument, not the enclosing tuple. The previous check
    # (`isinstance(args, Promise)`) was never true, so positional Promises
    # were stored unresolved.
    args = tuple(
        self.resolve_promise(arg) if isinstance(arg, _xcoms.Promise) else arg
        for arg in args
    )

    kwargs = {
        key: self.resolve_promise(val) if isinstance(val, _xcoms.Promise) else val
        for key, val in kwargs.items()
    }

    self.graph.add_node(
        node_for_adding=node_name,
        callable=node, args=args, kwargs=kwargs,
        name=node.__name__,
        id=node_name
    )

    return node_name
Adds a callable with positional and keyword arguments to the DAG's underlying graph. On success, return unique identifier for the new node.
def add_edge(
        self,
        u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable],
        continue_on: Condition = Condition.Success
    ) -> _T.Tuple[str,str]:
    """
    Adds an edge between nodes in the DAG's underlying graph,
    so long as the requested edge is unique and has not been added previously.

    Each endpoint is either the string identifier of an existing node or a
    callable. A callable that is not yet in the DAG is added as a new node;
    a callable that appears exactly once resolves to that node; a callable
    that appears more than once is ambiguous and rejected.

    On success, returns Tuple of the new edge's unique identifiers.

    Raises:
        TypeError: an endpoint is neither a string nor a callable, or
            `continue_on` is not a `webber.edges.Condition`.
        ValueError: a string identifier is unknown, a callable is ambiguous,
            the edge already exists, or the edge would create a cycle.
    """
    # Validate inputs prior to execution.
    if not (isinstance(u_of_edge, str) or _iscallable(u_of_edge)):
        err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable"
        raise TypeError(err_msg)
    if not (isinstance(v_of_edge, str) or _iscallable(v_of_edge)):
        # Fix: this message previously mislabelled the incoming node as "Outgoing".
        err_msg = f"Incoming node {v_of_edge} must be a string or a Python callable"
        raise TypeError(err_msg)
    if not isinstance(continue_on, Condition):
        raise TypeError("Edge conditions must use the webber.edges.Condition class.")

    # Base Case 0: the DAG is empty, so both endpoints must be callables,
    # which are added as the first two nodes.
    if len(self.graph.nodes()) == 0:
        if not _iscallable(u_of_edge):
            err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope."
            raise ValueError(err_msg)
        if not _iscallable(v_of_edge):
            err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope."
            raise ValueError(err_msg)
        outgoing_node = self.add_node(u_of_edge)
        incoming_node = self.add_node(v_of_edge)
        self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
        return (outgoing_node, incoming_node)

    node_names, node_callables = zip(*self.graph.nodes(data='callable'))
    graph_edges = list(self.graph.edges(data=False))
    # Callables that must be added as nodes once the edge has been validated.
    new_callables = dict()

    def _assign_node(n, role):
        """Resolve one endpoint of a mixed (identifier, callable) edge to a node label."""
        # The string endpoint was already validated against node_names.
        if not _iscallable(n):
            return n
        # Error Case 4: the requested callable exists more than once in the DAG.
        occurrences = node_callables.count(n)
        if occurrences > 1:
            err_msg = f"{role} callable {n.__name__} " \
                    + "exists more than once in this DAG. " \
                    + "Use the unique ID of the required node or add a new node."
            raise ValueError(err_msg)
        if occurrences == 1:
            return node_names[ node_callables.index(n) ]
        # Otherwise the callable is new; use a provisional label for validation.
        new_callables[n] = n
        return _edges.label_node(n)

    if _iscallable(u_of_edge) and _iscallable(v_of_edge):

        # Error Cases 0, 1: Either of the callables appear more than once in the DAG.
        if node_callables.count(u_of_edge) > 1:
            err_msg = f"Outgoing callable {u_of_edge.__name__} " \
                    + "exists more than once in this DAG. " \
                    + "Use the unique identifier of the required node or add a new node."
            raise ValueError(err_msg)
        if node_callables.count(v_of_edge) > 1:
            err_msg = f"Incoming callable {v_of_edge.__name__} " \
                    + "exists more than once in this DAG. " \
                    + "Use the unique string identifier of the required node."
            raise ValueError(err_msg)

        # Base Case 1: each callable is present at most once. node_id raises
        # ValueError for a callable that is not in the DAG; treat it as new.
        try:
            outgoing_node = self.node_id(u_of_edge)
        except ValueError:
            new_callables[u_of_edge] = u_of_edge
            outgoing_node = _edges.label_node(u_of_edge)
        try:
            incoming_node = self.node_id(v_of_edge)
        except ValueError:
            new_callables[v_of_edge] = v_of_edge
            incoming_node = _edges.label_node(v_of_edge)

    else:

        # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope.
        if isinstance(u_of_edge, str) and u_of_edge not in node_names:
            err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope."
            raise ValueError(err_msg)
        if isinstance(v_of_edge, str) and v_of_edge not in node_names:
            err_msg = f"Incoming node {v_of_edge} not in DAG's current scope."
            raise ValueError(err_msg)

        if isinstance(u_of_edge, str) and isinstance(v_of_edge, str):
            # Both identifiers are present in the DAG.
            outgoing_node = u_of_edge
            incoming_node = v_of_edge
        else:
            # One endpoint is a callable, the other a valid identifier.
            outgoing_node = _assign_node(u_of_edge, "Outgoing")
            incoming_node = _assign_node(v_of_edge, "Incoming")

    # Error Case 5: an edge already exists between the two nodes.
    if (outgoing_node, incoming_node) in graph_edges:
        err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \
                + "a definition in this DAG."
        raise ValueError(err_msg)

    # Error Case 6: adding the edge would create circular dependencies.
    test_edges: list = graph_edges + [(outgoing_node, incoming_node)]
    if not _nx.is_directed_acyclic_graph(_nx.DiGraph(test_edges)):
        err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \
                + "results in circular dependencies."
        raise ValueError(err_msg)

    # The edge is valid. Add any new callables first; add_node returns the
    # label actually stored, which replaces the provisional one. This also
    # handles duplicates on first entry to the DAG (e.g.: edge == (print, print)).
    if new_callables.get(u_of_edge) is not None:
        outgoing_node = self.add_node(new_callables[u_of_edge])
    if new_callables.get(v_of_edge) is not None:
        incoming_node = self.add_node(new_callables[v_of_edge])

    self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
    return (outgoing_node, incoming_node)
Adds an edge between nodes in the DAG's underlying graph, so long as the requested edge is unique and has not been added previously.
On success, returns Tuple of the new edge's unique identifiers.
def remove_node(self, *posargs, **kwargs) -> None:
    """
    Node removal is not supported. Always raises NotImplementedError, with a
    message pointing to skip_node or a new DAG as safer alternatives.
    Any arguments given are ignored.
    """
    message = "Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect."
    raise NotImplementedError(message)
Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG. Throws error message and recommends safer methods.
def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]:
    """
    Removes a directed edge between two nodes in the DAG's underlying graph.

    Both endpoints are resolved through node_id, so each may be a string
    identifier or a unique callable.
    Raises ValueError if the resolved edge does not exist.
    On success, returns Tuple of the removed edge's unique identifiers.
    """
    source = self.node_id(u_of_edge)
    target = self.node_id(v_of_edge)
    edge_id = (source, target)
    known_edges = self.graph.edges(data = False)
    if edge_id in known_edges:
        self.graph.remove_edge(source, target)
        return edge_id
    err_msg = "Requested edge does not exist in the DAG's scope"
    raise ValueError(err_msg)
Removes a directed edge between nodes in the DAG's underlying graph. Throws error if the edge does not exist. On success, returns Tuple of the removed edge's unique identifiers.
def update_edges(self, *E, continue_on: Condition = None, filter: _types.LambdaType = None, data = None):
    """
    Flexible function to update properties of edges in the DAG's scope,
    based on unique identifier(s) (e.g.: string IDs or unique callables) or a
    lambda filter using the edgedict syntax.

    A list of edges to update or a filter argument is expected. Valid edge lists include:

        update_edges((node1, node2), ...) or update_edges([(node1, node2)], ...)

        update_edges((node1, callable2), ...) or update_edges([(node1, callable2)], ...)

        update_edges(edgedict), where isinstance(edgedict, webber.edges.edgedict) == True

    Parameters:

    > continue_on: webber.edges.Condition value to assign to every matching edge
      (execute child on success, failure, or any exit state of the parent node).
      When given, `data` is not used. Defaults to None.

    > filter: lambda that can be used instead of a list of edges to be updated.

        filter = (lambda e: e.parent == print or e.child == print)

    > data: If given (and continue_on is not), a dictionary or edgedict of edge
      properties applied to every matching edge. Currently, only the continue_on
      property should be set.

        data = { 'continue_on': webber.edges.Condition }

    Raises ValueError when neither edges nor a filter are given, or when
    dictionary-form edges lack an 'id'; TypeError when continue_on is not a
    webber.edges.Condition.
    """
    if len(E) == 0 and filter is None:
        raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.")

    # A single positional argument is either one edge tuple or a collection
    # of edges. Probe it as a single edge; if that fails, treat it as the
    # collection. Dictionary-form edges are never unwrapped (previously a lone
    # edgedict was replaced by itself and then indexed by position).
    if len(E) == 1 and isinstance(E[0], _abc.Iterable) and not isinstance(E[0], (dict, edgedict)):
        try:
            _ = self.get_edges(E[0])
        except Exception:
            E = E[0]

    if filter is not None:
        edge_ids = self.filter_edges(filter, data = False)
    else:
        if isinstance(E[0], (dict, edgedict)):
            try:
                ids = [e['id'] for e in E]
            except KeyError:
                err_msg = 'In dictionary form, all given edges must be follow edgedict standards.'
                raise ValueError(err_msg)
        else:
            ids = E
        edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids]

    if continue_on is None:
        if data is not None:
            # Apply the same properties to every selected edge. This also
            # covers filter-selected edges, which were previously skipped
            # because the loop zipped against the empty E.
            for edge_id in edge_ids:
                self._update_edge(data, id = edge_id)
        else:
            # No shared data: each given edge carries its own properties.
            for edge_id, e in zip(edge_ids, E):
                self._update_edge(e, id = edge_id)
        return

    if not isinstance(continue_on, Condition):
        err_msg = "Condition assignment must use webber.edges.Condition"
        raise TypeError(err_msg)
    for e in edge_ids:
        self.graph.edges[e]['Condition'] = continue_on
Flexible function to update properties of edges in the DAG's scope, based on unique identifier(s) (e.g.: string IDs or unique callables) or a lambda filter using the edgedict syntax.
List of edges to update or filter argument is expected. Valid edge lists include:
update_edges((node1, node2), ...) or update_edges([(node1, node2)], ...)
update_edges((node1, callable2), ...) or update_edges([(node1, callable2)], ...)
update_edges(edgedict), where isinstance(edgedict, webber.edges.edgedict) == True
Parameters:
continue_on: Condition value to update matching edges with,
Execute child on success, failure, or any exit state, of the parent node
filter: lambda property that can be used instead of a list of edges to be updated.
filter = (lambda e: e.parent == print or e.child == print)
data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set.
data = { 'continue_on': Condition }
def relabel_node(self, node: _T.Union[str, _T.Callable], label: str):
    """
    Set the label (the 'name' property) of a node in the DAG's scope.

    The node is resolved through node_id first, then the label is validated:
    it must be a non-empty Python string, otherwise ValueError is raised.
    The change is delegated to update_nodes; the label is returned.
    """
    target = self.node_id(node)
    label_is_valid = isinstance(label, str) and len(label) != 0
    if not label_is_valid:
        raise ValueError("Node label must be a Python string with one or more characters.")
    self.update_nodes(target, data = {'name': label})
    return label
Update the label, or name, given to a node in the DAG's scope, given a Python string and a node identifier. Well-structured wrapper for a common use-case of DAG.update_nodes.
def update_nodes(self, *N, filter: _types.LambdaType = None, data = None, callable = None, args = None, kwargs = None):
    """
    Flexible function to update properties of nodes in the DAG's scope,
    based on unique identifier(s) (e.g.: string IDs or unique callables) or a
    lambda filter using the dotdict syntax.

    A list of nodes to update or a filter argument is expected (not both).
    Valid node lists include:

        update_nodes(node_id, ...) or update_nodes([node_id], ...)

        update_nodes(callable, ...) or update_nodes([callable], ...)

        update_nodes(node1, node2, ...) or update_nodes([node1, node2], ...)

        update_nodes(node1, callable2, ...) or update_nodes([node1, callable2], ...)

        update_nodes(nodedict, ...), where nodedict carries an 'id' key

    Parameters:

    > filter: lambda that can be used instead of a list of nodes to be updated.

        filter = (lambda n: n.callable == print or 'Hello, World' in n.args)

    > data: If given, a dictionary or dotdict of node properties applied to every
      matching node. Any value given to the id key is ignored. Only used when
      callable, args and kwargs are all left unset.

        data = {
            'callable': print,
            'args': ['Hello', 'World'],
            'kwargs': {'sep': ', '},
            'name': 'custom_label',
            'id': 'unique-identifier'
        }

    > callable: Replacement callable for matching nodes; the node name is reset
      to the callable's __name__.

    > args: Positional arguments for matching nodes, as a non-string Python
      iterable (e.g.: Tuple or List). Stored as a tuple.

    > kwargs: Keyword arguments for matching nodes, as a Python dictionary.

    Raises ValueError when no nodes can be identified or when both a node list
    and a filter are given; TypeError when callable, args or kwargs have the
    wrong type.
    """
    if len(N) == 0 and filter is None:
        raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.")

    if len(N) > 0 and filter is not None:
        raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.")

    # A single non-string, non-dict iterable is a collection of nodes: unwrap it.
    # The previous heuristic indexed N[0][0], which failed for a lone dict and
    # left lists of callables or single-character ids wrapped.
    if len(N) == 1 and isinstance(N[0], _abc.Iterable) and not isinstance(N[0], (str, dict)):
        N = tuple(N[0])
        if len(N) == 0:
            raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.")

    if filter is not None:
        node_ids = self.filter_nodes(filter, data = False)
    else:
        # Dictionary-form nodes are identified by their 'id' key.
        ids = [n['id'] if isinstance(n, dict) else n for n in N]
        node_ids = [self.node_id(i) for i in ids]

    if callable is None and args is None and kwargs is None:
        if data is not None:
            # Apply the same properties to every selected node. Work on a copy
            # without 'id' so the documented "id is ignored" holds and the
            # caller's dictionary is not mutated by _update_node. This also
            # covers filter-selected nodes, previously skipped because the
            # loop zipped against the empty N.
            for node_id in node_ids:
                payload = {key: val for key, val in data.items() if key != 'id'}
                self._update_node(payload, id = node_id)
        else:
            # No shared data: each given node dictionary carries its own properties.
            for node_id, n in zip(node_ids, N):
                self._update_node(n, id = node_id)
        return

    if callable is not None:
        if not _iscallable(callable):
            err_msg = "Requested node is not assigned a callable Python function."
            raise TypeError(err_msg)
        for node_id in node_ids:
            self.graph.nodes[node_id]['callable'] = callable
            self.graph.nodes[node_id]['name'] = callable.__name__

    if args is not None:
        if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)):
            err_msg = "Requested node is not assigned a tuple of pos args."
            raise TypeError(err_msg)
        args = tuple(args)
        for node_id in node_ids:
            self.graph.nodes[node_id]['args'] = args

    if kwargs is not None:
        if not isinstance(kwargs, dict):
            err_msg = "Requested node is not assigned a dictionary of kw args."
            raise TypeError(err_msg)
        for node_id in node_ids:
            self.graph.nodes[node_id]['kwargs'] = kwargs
Flexible function to update properties of nodes in the DAG's scope, based on unique identifier(s) (e.g.: string IDs or unique callables) or a lambda filter using the dotdict syntax.
List of nodes to update or filter argument is expected. Valid node lists include:
update_nodes(node_id, ...) or update_nodes([node_id], ...)
update_nodes(callable, ...) or update_nodes([callable], ...)
update_nodes(node1, node2, ...) or update_nodes([node1, node2], ...)
update_nodes(node1, callable2, ...) or update_nodes([node1, callable2], ...)
update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id)
Parameters:
filter: lambda property that can be used instead of a list of nodes to be updated.
filter = (lambda n: n.callable == print or 'Hello, World' in n.args)
data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set. Any value given to the id key will be ignored. Allowed for ease of use with the DAG.get_nodes method.
data = { 'callable': print, 'args': ['Hello', 'World'], 'kwargs': {'sep': ', '}, 'name': 'custom_label', 'id': 'unique-identifier' }
args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List).
kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary.
def get_edges(self, *N, data: bool = True) -> _T.Union[list[edgedict], list[tuple]]:
    """
    Retrieve DAG edge data for the given (outgoing, incoming) pairs.

    With no positional arguments, every edge in the graph is returned: as
    edgedicts when data is True, otherwise as the graph's plain edge tuples.
    Otherwise each argument must be a unique, hashable iterable of size 2,
    which is looked up through get_edge; ValueError is raised when the
    arguments are not unique pairs.
    Use filter_edges for more flexible controls.
    """
    if not N:
        if data == True:
            return [edgedict(edge) for edge in self.graph.edges.data()]
        return list(self.graph.edges.data(data=False))

    has_duplicates = len(N) != len(set(N))
    if has_duplicates or not all(isinstance(pair, _abc.Iterable) and len(pair) == 2 for pair in N):
        raise ValueError('All requested edges must be unique tuples of size 2.')

    return [self.get_edge(outgoing, incoming, data=data) for (outgoing, incoming) in N]
Retrieval function for DAG edge data, based on tuple identifiers. Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2']))
def get_edge(self, outgoing_node: _T.Union[str, callable], incoming_node: _T.Union[str, callable], data: bool = True) -> _T.Union[edgedict, tuple]:
    """
    Retrieve a single directed edge between two nodes in the DAG's scope.

    Both endpoints are resolved through node_id. When data is falsy, only the
    tuple of resolved identifiers is returned, without checking that the edge
    exists. Otherwise the edge's attributes are returned as an edgedict, and
    ValueError is raised when the graph holds no data for that edge.
    """
    source = self.node_id(outgoing_node)
    target = self.node_id(incoming_node)
    edge_key = (source, target)
    if data:
        attributes = self.graph.get_edge_data(u = source, v = target)
        if attributes:
            return edgedict(*edge_key, **attributes)
        err_msg = f'No match found for the directed edge requested: {edge_key}'
        raise ValueError(err_msg)
    return edge_key
Retrieval function for a single directed edge between nodes in a DAG's scope.
def get_node(self, n: _T.Union[str, callable]) -> dotdict:
    """
    Return the metadata of a single node in the DAG's scope as a dotdict.
    The identifier (string ID or unique callable) is resolved through node_id.
    """
    attributes = self.graph.nodes[self.node_id(n)]
    return dotdict(attributes)
Given a unique identifier, returns a dictionary of node metadata for a single node in the DAG's scope.
def get_nodes(self, *N) -> list[dotdict]:
    """
    Retrieve DAG node data as a list of dotdicts, based on node identifiers
    (e.g.: string IDs or unique callables).

    With no arguments, every node is returned. A single non-string iterable
    argument is treated as the collection of identifiers. Raises ValueError
    when the requested identifiers are not unique.
    """
    if len(N) == 0:
        return [dotdict(attrs) for attrs in list(self.graph.nodes.values())]

    if len(N) == 1:
        only = N[0]
        if not isinstance(only, _abc.Iterable) or isinstance(only, str):
            # A single identifier: resolve it and return a one-element list.
            return [dotdict(self.graph.nodes[self.node_id(only)])]
        # A single collection of identifiers: unwrap it.
        N = only

    if len(N) != len(set(N)):
        raise ValueError('All requested nodes must be unique identifiers.')

    resolved = [self.node_id(identifier) for identifier in N]
    return [dotdict(self.graph.nodes[node_id]) for node_id in resolved]
Flexible function to retrieve DAG node data, based on node identifiers (e.g.: string IDs or unique callables).
def filter_nodes(self, filter: _types.LambdaType, data: bool = False):
    """
    Select the nodes in the DAG's scope for which the given lambda is truthy.
    The lambda receives each node's attributes wrapped in a dotdict.

    Returns the matching nodes' 'id' values when data is falsy, otherwise
    their attributes as dotdicts.
    Current limitation: Filters must use node identifier strings when referencing nodes.
    Use get_nodes for more flexible controls.
    """
    # Lazily yields matching attribute dicts, in graph order.
    matching = (attrs for attrs in self.graph.nodes.values() if filter(dotdict(attrs)))
    if data:
        return [dotdict(attrs) for attrs in matching]
    return [attrs['id'] for attrs in matching]
Given a lambda function, filter nodes in a DAG's scope based on its attributes. Current limitation: Filters must use node identifier strings when referencing nodes. Use get_nodes for more flexible controls.
def filter_edges(self, filter: _types.LambdaType = None, data: bool = False) -> list[edgedict]:
    """
    Select the edges in the DAG's scope for which the given lambda is truthy.
    The lambda receives each edge as an edgedict built from the graph's
    (outgoing, incoming, attributes) triple.

    Parameters:

    > filter: lambda applied to each edge. When omitted (None), every edge
      matches. (The signature previously used the LambdaType class itself as
      the default value rather than as an annotation.)

    > data: when falsy, (outgoing, incoming) tuples are returned; otherwise
      edgedicts are returned.

    Current limitation: Filters must use node identifier strings when referencing nodes.
    Use get_edges for more flexible controls.
    """
    # NOTE(review): the data branch builds edgedict(e) from the whole triple,
    # while the filter receives edgedict(*e) — kept as in the original;
    # confirm which form webber.edges.edgedict expects.
    edges = list(self.graph.edges.data())
    if filter is None:
        matching = edges
    else:
        matching = [e for e in edges if filter(edgedict(*e))]
    if not data:
        return [e[:2] for e in matching]
    return [edgedict(e) for e in matching]
Given a lambda function, filter edges in a DAG's scope based on its attributes. Current limitation: Filters must use node identifier strings when referencing nodes. Use get_edges for more flexible controls.
def retry_node(self, identifier: _T.Union[str,_T.Callable], count: int):
    """
    Configure how many automatic retries a node gets after a failure.

    `identifier` is resolved through `node_id`; `count` is stored under the
    node's `'retry'` attribute. Raises `ValueError` unless `count` is an
    integer greater than or equal to zero.
    """
    count_is_valid = isinstance(count, int) and count >= 0
    if not count_is_valid:
        raise ValueError("Retry count must be a non-negative integer.")
    target = self.node_id(identifier)
    self.graph.nodes[target]['retry'] = count
Given a node identifier, set number of automatic retries in case of failure. Re-attempts will begin as soon as possible.
def skip_node(self, identifier: _T.Union[str,_T.Callable], skip: bool = True, as_failure = False):
    """
    Mark a node to be skipped (or not) when the DAG is executed.

    The pair `(skip, as_failure)` is stored under the node's `'skip'`
    attribute. A skipped node is reported as a success (stdout print) or,
    when `as_failure` is set, as a failure (exception error), which allows
    conditional control and testing over the DAG's order of operations.

    Raises `ValueError` if `skip` is not a boolean.
    """
    if isinstance(skip, bool):
        target = self.node_id(identifier)
        self.graph.nodes[target]['skip'] = (skip, as_failure)
        return
    raise ValueError("Skip argument must be a boolean value.")
Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error). Allows conditional control and testing over DAG's order of operations.
def critical_path(self, nodes):
    """
    Build the subset of the DAG needed to reach the given node(s).

    `nodes` may be a single identifier or a non-string iterable of
    identifiers. Each one is resolved through `node_id`, and the resulting
    set of IDs is handed to `_subgraph`, whose result is returned.
    """
    is_collection = isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str)
    # Normalise a lone identifier to a one-element tuple.
    requested = nodes if is_collection else (nodes,)
    return self._subgraph({self.node_id(item) for item in requested})
Given a set of nodes, returns a subset of the DAG containing only the node(s) and its parents, or upstream dependencies.
def execute(self, return_ref=False, print_exc=False):
    """
    Run the DAG by constructing a `DAGExecutor` over the graph and its roots.

    `print_exc` is forwarded to the executor. The executor object is
    returned only when `return_ref` is truthy; otherwise `None` is returned.
    """
    runner = self.DAGExecutor(self.graph, self.root, print_exc)
    if return_ref:
        return runner
    return None
Basic wrapper for execution of the DAG's underlying callables.
def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] = None):
    """
    Render the DAG through the `webber.viz` helpers.

    - `'browser'`: calls `visualize_browser` and returns `None`.
    - `'plt'`: returns the result of `visualize_plt`.
    - `'gui'`: not implemented; raises `NotImplementedError`.
    - `None` (default): behaves like `'plt'` when `_in_notebook()` reports a
      notebook, otherwise like `'browser'`.

    Any other value raises `NotImplementedError`.
    """
    # Imported lazily so the visualization library is only loaded on demand.
    import webber.viz as _viz

    if type is None:
        type = 'plt' if _viz._in_notebook() else 'browser'

    if type == 'browser':
        _viz.visualize_browser(self.graph)
        return None
    if type == 'plt':
        return _viz.visualize_plt(self.graph)
    if type == 'gui':
        # _visualize_gui(self.graph)
        raise NotImplementedError

    err_msg = "Unknown visualization type requested."
    raise NotImplementedError(err_msg)
Basic wrapper to visualize DAG using Vis.js and NetGraph libraries. By default, the visualization library is only loaded after DAG.visualize() is called, halving import times.
@property
def root(self) -> list[str]:
    """
    List the nodes that have no predecessors in the graph.

    These nodes have no dependencies, so they occur first in the DAG's
    order of operations.
    """
    graph = self.graph
    return [
        node for node in graph.nodes.keys()
        if not list(graph.predecessors(node))
    ]
Return list of nodes with no dependencies. Root nodes will occur first in DAG's order of operations.
def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str:
    """
    Validate whether identifier given is a valid node within the DAG's scope.
    Primarily for internal use, but useful for retrieving string identifiers
    for a unique callable in a DAG.

    Returns the node's string identifier.

    Raises `ValueError` if the identifier is not in the DAG, or if a callable
    is registered more than once (the string ID must then be used instead).
    Raises `TypeError` if the identifier is neither a string nor a callable.
    """
    registered = list(self.graph.nodes(data='callable'))
    if registered:
        node_names, node_callables = zip(*registered)
    else:
        # An empty graph has nothing to unzip; fall through to the regular
        # "not defined" errors instead of failing on tuple unpacking.
        node_names, node_callables = (), ()

    if isinstance(identifier, str):
        if identifier not in node_names:
            err_msg = f"Node {identifier} is not defined in this DAG's scope."
            raise ValueError(err_msg)
        return identifier

    if not _iscallable(identifier):
        err_msg = f"Node {identifier} must be a string or a Python callable"
        raise TypeError(err_msg)

    occurrences = node_callables.count(identifier)
    if occurrences == 0:
        err_msg = f"Callable {identifier} is not defined in this DAG's scope."
        raise ValueError(err_msg)
    if occurrences > 1:
        # Not every callable carries __name__ (e.g. functools.partial objects).
        label = getattr(identifier, '__name__', repr(identifier))
        err_msg = f"Callable {label} " \
                + "exists more than once in this DAG. " \
                + "Use the unique identifier of the required node."
        raise ValueError(err_msg)
    return node_names[node_callables.index(identifier)]
Validate whether identifier given is a valid node within the DAG's scope. Primarily for internal use, but useful for retrieving string identifiers for a unique callable in a DAG.
def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise:
    """
    Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope.

    The promise's `key` is resolved through `node_id`, and a new Promise
    keyed by the resulting string identifier is returned.

    Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope.
    The underlying lookup error is attached as the exception's cause.
    """
    try:
        key = self.node_id(promise.key)
    except Exception as e:
        # Chain explicitly so the traceback shows the lookup failure as the cause.
        raise _xcoms.InvalidCallable(e) from e
    return _xcoms.Promise(key)
Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope.
Raises webber.xcoms.InvalidCallable if the Promise requests a callable that is out of scope.
class DAGExecutor:
    """
    Executes the callables of a DAG on a thread pool, honouring edge conditions.

    All of the work happens inside `__init__`: constructing the object runs the
    graph to completion (or returns immediately for an empty graph).
    """
    def __init__(self, graph: _nx.DiGraph, roots: list, print_exc: bool = False) -> None:
        """
        Run every node of `graph`, starting from `roots`.

        - graph: directed graph whose nodes carry 'callable', 'name', 'args'
          and 'kwargs' attributes, plus optional 'skip' and 'retry' attributes,
          and whose edges carry a 'Condition' attribute.
        - roots: node identifiers to submit first.
        - print_exc: when truthy, print the traceback of each failed node.
        """

        # stdout produced while executing is routed through the logger.
        with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:

            # Skip execution if there are no callables in scope.
            if len(graph.nodes) == 0:
                print('Given DAG has no callables in scope. Skipping execution...')
                return

            # Initialize local variables for execution.
            # complete/failed: finished nodes by outcome; started: submitted nodes;
            # skipped: nodes that will never be submitted.
            complete, started, failed, skipped = set(), set(), set(), set()
            # Nodes currently being polled for completion.
            events = set(roots)
            # Latest future submitted for each node identifier.
            refs: dict[str, _futures.Future] = {}

            def raise_exc(message):
                """Stand-in callable for nodes skipped 'as failure'."""
                raise ValueError(message)

            def run_conditions_met(n):
                """Return True when every predecessor of `n` finished with the outcome its edge requires."""
                for p in graph.predecessors(n):
                    match graph.edges.get((p, n))['Condition']:
                        case Condition.Success:
                            if p not in complete:
                                return False
                        case Condition.Failure:
                            if p not in failed:
                                return False
                        case Condition.AnyCase:
                            if p not in failed and p not in complete:
                                return False
                return True

            # Per-node (skip, as_failure) flags; nodes without the attribute get (False, False).
            skip = graph.nodes.data("skip", default=(False, False))
            # Per-node [attempts remaining, cached submission parameters];
            # attempts = configured retries + the initial run.
            retry = {n: [c+1, {}] for n,c in graph.nodes.data("retry", default=0)}

            # Start execution of root node functions.
            with _futures.ThreadPoolExecutor() as executor:

                def Submit(event, callable, name, args, kwargs):
                    """Submit a node to the pool (or its skip stand-in) and return the future."""
                    if skip[event][0]: # NOTE: Internally, DAGExecutor tracks these skipped events as successes or failures.
                        # Skipped nodes are never retried.
                        retry[event][0] = 0
                        skip_callable = raise_exc if skip[event][1] else print
                        return executor.submit(
                            _event_wrapper,
                            _callable=skip_callable,
                            _name=graph.nodes[event]['name'],
                            _args=[f"Event {event} skipped..."],
                            _kwargs={}
                        )
                    else:
                        # Consume one attempt for this submission.
                        retry[event][0] -= 1
                        # Cache the first submission's parameters so a retry reuses them as-is.
                        if (retry[event][0] > 0) and (retry[event][1] == {}):
                            retry[event][1] = {
                                'callable': callable,
                                'name': name,
                                'args': args,
                                'kwargs': kwargs
                            }
                        return executor.submit(
                            _event_wrapper,
                            _callable=callable,
                            _name=name,
                            _args=args,
                            _kwargs=kwargs
                        )

                for event in events:
                    refs[event] = Submit(
                        event,
                        graph.nodes[event]['callable'],
                        graph.nodes[event]['name'],
                        graph.nodes[event]['args'],
                        graph.nodes[event]['kwargs']
                    )
                    started.add(event)
                # Loop until all nodes in the network are executed.
                # NOTE(review): this polls the futures continuously with no wait between passes.
                while (len(complete) + len(failed) + len(skipped)) != len(graph):
                    for event in events:
                        if refs[event].done() is True:
                            if refs[event].exception(timeout=0) is not None:
                                # Re-raise inside a try block so print_exc() has an active exception to print.
                                try:
                                    raise refs[event].exception(timeout=0)
                                except:
                                    if print_exc:
                                        _traceback.print_exc()

                                    # Attempts left: resubmit with the cached parameters and poll again later.
                                    if retry[event][0] > 0:
                                        print(f"Event {event} exited with exception, retrying...")
                                        refs[event] = Submit(
                                            event,
                                            callable=retry[event][1]['callable'],
                                            name=retry[event][1]['name'],
                                            args=retry[event][1]['args'],
                                            kwargs=retry[event][1]['kwargs']
                                        )
                                        continue

                                    print(f"Event {event} exited with exception...")
                                    failed.add(event)
                                    # Children whose edge does not allow running after a failure
                                    # (presumably what continue_on_failure reports -- confirm in webber.edges).
                                    skipping = [
                                        e[1] for e in set(graph.out_edges(event))
                                        if not _edges.continue_on_failure(graph.edges.get(e))
                                    ]
                            else:
                                complete.add(event)
                                # Children whose edge does not allow running after a success
                                # (presumably what continue_on_success reports -- confirm in webber.edges).
                                skipping = [
                                    e[1] for e in set(graph.out_edges(event))
                                    if not _edges.continue_on_success(graph.edges.get(e))
                                ]
                            # A skipped child takes all of its descendants with it.
                            skipped = skipped.union(skipping)
                            for n in skipping:
                                skipped = skipped.union(_nx.descendants(graph, n))
                            carryon = set(graph.successors(event)).difference(skipped)
                            # Only start successors whose predecessors have all reached the required outcome.
                            starting = [
                                successor for successor in carryon if
                                run_conditions_met(successor)
                            ]
                            for successor in starting:
                                # Replace Promise placeholders with the result of the node they reference.
                                _args = [
                                    a if not isinstance(a, _xcoms.Promise) else refs[a.key].result()
                                    for a in graph.nodes[successor]['args']
                                ]
                                _kwargs = {
                                    k: v if not isinstance(v, _xcoms.Promise) else refs[v.key].result()
                                    for k, v in graph.nodes[successor]['kwargs'].items()
                                }
                                refs[successor] = Submit(
                                    successor,
                                    graph.nodes[successor]['callable'],
                                    graph.nodes[successor]['name'],
                                    _args,
                                    _kwargs
                                )
                                started.add(successor)
                    # Initialized nodes that are incomplete or not yet documented as complete.
                    events = started.difference(complete.union(failed).union(skipped))
Base class used to execute a DAG in an embarrassingly parallel fashion.
def __init__(self, graph: _nx.DiGraph, roots: list, print_exc: bool = False) -> None:
    """
    Run every node of `graph` on a thread pool, starting from `roots`.

    - graph: directed graph whose nodes carry 'callable', 'name', 'args'
      and 'kwargs' attributes, plus optional 'skip' and 'retry' attributes,
      and whose edges carry a 'Condition' attribute.
    - roots: node identifiers to submit first.
    - print_exc: when truthy, print the traceback of each failed node.

    Returns immediately (after printing a notice) when the graph is empty.
    """

    # stdout produced while executing is routed through the logger.
    with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:

        # Skip execution if there are no callables in scope.
        if len(graph.nodes) == 0:
            print('Given DAG has no callables in scope. Skipping execution...')
            return

        # Initialize local variables for execution.
        # complete/failed: finished nodes by outcome; started: submitted nodes;
        # skipped: nodes that will never be submitted.
        complete, started, failed, skipped = set(), set(), set(), set()
        # Nodes currently being polled for completion.
        events = set(roots)
        # Latest future submitted for each node identifier.
        refs: dict[str, _futures.Future] = {}

        def raise_exc(message):
            """Stand-in callable for nodes skipped 'as failure'."""
            raise ValueError(message)

        def run_conditions_met(n):
            """Return True when every predecessor of `n` finished with the outcome its edge requires."""
            for p in graph.predecessors(n):
                match graph.edges.get((p, n))['Condition']:
                    case Condition.Success:
                        if p not in complete:
                            return False
                    case Condition.Failure:
                        if p not in failed:
                            return False
                    case Condition.AnyCase:
                        if p not in failed and p not in complete:
                            return False
            return True

        # Per-node (skip, as_failure) flags; nodes without the attribute get (False, False).
        skip = graph.nodes.data("skip", default=(False, False))
        # Per-node [attempts remaining, cached submission parameters];
        # attempts = configured retries + the initial run.
        retry = {n: [c+1, {}] for n,c in graph.nodes.data("retry", default=0)}

        # Start execution of root node functions.
        with _futures.ThreadPoolExecutor() as executor:

            def Submit(event, callable, name, args, kwargs):
                """Submit a node to the pool (or its skip stand-in) and return the future."""
                if skip[event][0]: # NOTE: Internally, DAGExecutor tracks these skipped events as successes or failures.
                    # Skipped nodes are never retried.
                    retry[event][0] = 0
                    skip_callable = raise_exc if skip[event][1] else print
                    return executor.submit(
                        _event_wrapper,
                        _callable=skip_callable,
                        _name=graph.nodes[event]['name'],
                        _args=[f"Event {event} skipped..."],
                        _kwargs={}
                    )
                else:
                    # Consume one attempt for this submission.
                    retry[event][0] -= 1
                    # Cache the first submission's parameters so a retry reuses them as-is.
                    if (retry[event][0] > 0) and (retry[event][1] == {}):
                        retry[event][1] = {
                            'callable': callable,
                            'name': name,
                            'args': args,
                            'kwargs': kwargs
                        }
                    return executor.submit(
                        _event_wrapper,
                        _callable=callable,
                        _name=name,
                        _args=args,
                        _kwargs=kwargs
                    )

            for event in events:
                refs[event] = Submit(
                    event,
                    graph.nodes[event]['callable'],
                    graph.nodes[event]['name'],
                    graph.nodes[event]['args'],
                    graph.nodes[event]['kwargs']
                )
                started.add(event)
            # Loop until all nodes in the network are executed.
            # NOTE(review): this polls the futures continuously with no wait between passes.
            while (len(complete) + len(failed) + len(skipped)) != len(graph):
                for event in events:
                    if refs[event].done() is True:
                        if refs[event].exception(timeout=0) is not None:
                            # Re-raise inside a try block so print_exc() has an active exception to print.
                            try:
                                raise refs[event].exception(timeout=0)
                            except:
                                if print_exc:
                                    _traceback.print_exc()

                                # Attempts left: resubmit with the cached parameters and poll again later.
                                if retry[event][0] > 0:
                                    print(f"Event {event} exited with exception, retrying...")
                                    refs[event] = Submit(
                                        event,
                                        callable=retry[event][1]['callable'],
                                        name=retry[event][1]['name'],
                                        args=retry[event][1]['args'],
                                        kwargs=retry[event][1]['kwargs']
                                    )
                                    continue

                                print(f"Event {event} exited with exception...")
                                failed.add(event)
                                # Children whose edge does not allow running after a failure
                                # (presumably what continue_on_failure reports -- confirm in webber.edges).
                                skipping = [
                                    e[1] for e in set(graph.out_edges(event))
                                    if not _edges.continue_on_failure(graph.edges.get(e))
                                ]
                        else:
                            complete.add(event)
                            # Children whose edge does not allow running after a success
                            # (presumably what continue_on_success reports -- confirm in webber.edges).
                            skipping = [
                                e[1] for e in set(graph.out_edges(event))
                                if not _edges.continue_on_success(graph.edges.get(e))
                            ]
                        # A skipped child takes all of its descendants with it.
                        skipped = skipped.union(skipping)
                        for n in skipping:
                            skipped = skipped.union(_nx.descendants(graph, n))
                        carryon = set(graph.successors(event)).difference(skipped)
                        # Only start successors whose predecessors have all reached the required outcome.
                        starting = [
                            successor for successor in carryon if
                            run_conditions_met(successor)
                        ]
                        for successor in starting:
                            # Replace Promise placeholders with the result of the node they reference.
                            _args = [
                                a if not isinstance(a, _xcoms.Promise) else refs[a.key].result()
                                for a in graph.nodes[successor]['args']
                            ]
                            _kwargs = {
                                k: v if not isinstance(v, _xcoms.Promise) else refs[v.key].result()
                                for k, v in graph.nodes[successor]['kwargs'].items()
                            }
                            refs[successor] = Submit(
                                successor,
                                graph.nodes[successor]['callable'],
                                graph.nodes[successor]['name'],
                                _args,
                                _kwargs
                            )
                            started.add(successor)
                # Initialized nodes that are incomplete or not yet documented as complete.
                events = started.difference(complete.union(failed).union(skipped))
class Condition(_enum.IntEnum):
    """Represents edge condition for a node execution, based on outcome(s) of predecessor(s)."""
    # The child runs only after the parent has completed successfully.
    Success = 0
    # The child runs only after the parent has failed.
    Failure = 1
    # The child runs once the parent has finished, whether it succeeded or failed.
    AnyCase = 3
Represents edge condition for a node execution, based on outcome(s) of predecessor(s).
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- is_integer
- real
- imag
- numerator
- denominator
class QueueDAG(DAG):
    """
    #### Experimental, as of v0.2. ####

    Directed Acyclic Graph used to queue and execute Pythonic callables in parallel,
    while stringing the outputs of those callables in linear sequences.

    Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the
    behavior of root nodes to determine if and/or when the DAG run has been completed.

    Root nodes will be re-executed until culled by one of two conditions:
    1. A max number of iterations has been completed, or
    2. The output of the root node's callable matches a lambda halt_condition.

    Both conditions can be set at run-time.

    As of v0.2, QueueDAG experiences extreme latency when nested inside of a standard webber.DAG class.
    """

    # Class-level attribute kept so `QueueDAG.conditions` still resolves;
    # each instance replaces it with its own mapping in __init__.
    conditions = {}

    def __init__(self):
        # Per-instance mapping of node id -> {'halt_condition', 'iter_limit'}.
        # A single class-level dict would be shared by every QueueDAG instance.
        self.conditions = {}
        super().__init__()

    def add_node(self, node, *args, **kwargs):
        """
        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
        On success, return unique identifier for the new node.

        Reserved key-words are used for Queue DAG definitions:

        - halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.

              halt_condition = (lambda output: output is None)

        - iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.

        - max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop.

        When both `max_iter` and `iterator` are given, `max_iter` takes precedence.
        """
        # Reserved keywords are removed before the rest is forwarded to DAG.add_node.
        halt_condition = kwargs.pop('halt_condition', None)
        iterator: int = kwargs.pop('iterator', None)
        max_iter: int = kwargs.pop('max_iter', None)

        return_val = super().add_node(node, *args, **kwargs)

        if max_iter is not None:
            iter_limit = int(max_iter)
        elif iterator is not None:
            iter_limit = int(iterator)
        else:
            iter_limit = None

        self.conditions[return_val] = {
            'halt_condition': halt_condition,
            'iter_limit': iter_limit
        }

        return return_val

    def add_edge(self, u_of_edge, v_of_edge, continue_on = Condition.Success):
        """
        Adds an edge between nodes in the Queue DAG's underlying graph.
        Queue DAG nodes may have a maximum of one child and one parent worker.

        Raises `AssertionError` if `u_of_edge` already has a child or
        `v_of_edge` already has a parent.
        """
        for role, node in (('parent', u_of_edge), ('child', v_of_edge)):
            try:
                node_id = self.node_id(node)
            except Exception:
                # Identifier is not (yet) resolvable in this DAG, so it cannot
                # have existing edges; leave its handling to DAG.add_edge.
                continue

            # Explicit check rather than `assert`, which is stripped under -O.
            if self.filter_edges(lambda e: getattr(e, role) == node_id):
                raise AssertionError(
                    "Queue DAG nodes may have a maximum of one child and one parent worker."
                )

        return super().add_edge(u_of_edge, v_of_edge, continue_on)

    def execute(self, *promises, return_ref=False, print_exc=False):
        """
        Basic wrapper for execution of the DAG's underlying callables.

        Every node is run through `webber.queue._worker` on a thread pool,
        with one LIFO queue per node connecting a parent's output to its child.
        Returns the items left in the output queue of the terminal node (the
        node without successors), or `None` when the DAG is empty.

        `return_ref` is accepted for signature compatibility and is not used.
        """
        queues = {}
        processes = {}
        # Node whose output queue is drained into the return value.
        end_proc = None

        # NOTE(review): pairwise yields overlapping pairs (a, b), (b, c), ...;
        # confirm this is the intended key/value pairing for promises.
        promises: dict = dict(_it.pairwise(promises)) if len(promises) > 0 else {}

        with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:

            # Skip execution if there are no callables in scope.
            if len(self.graph.nodes) == 0:
                print('Given DAG has no callables in scope. Skipping execution...')
                return

            with _futures.ThreadPoolExecutor() as executor:

                # Root workers have no input queue; they stop on their halting rules.
                for node_key in self.root:
                    node = self.get_node(node_key)
                    queues[node_key] = _q.LifoQueue()
                    # A root without successors is itself a terminal node, so a
                    # DAG without edges still has an output queue to drain.
                    if len(list(self.graph.successors(node_key))) == 0:
                        end_proc = node_key
                    node.update({
                        'callable': _queue._worker,
                        'args': tuple(),
                        'kwargs': {
                            'work': node.callable,
                            'args': node.args, 'kwargs': node.kwargs,
                            'promises': promises,
                            'print_exc': print_exc,
                            'halt_condition': self.conditions[node_key]['halt_condition'],
                            'iter_limit': self.conditions[node_key]['iter_limit'],
                            'out_queue': queues.get(node_key)
                        }
                    })
                    processes[node_key] = executor.submit(
                        _event_wrapper,
                        _callable=node['callable'],
                        _name=node['name'],
                        _args=node['args'],
                        _kwargs=node['kwargs']
                    )

                # Child workers read from their parent's queue.
                # NOTE(review): relies on graph.edges listing a parent's edge
                # before its children's edges, so processes[parent_id] exists.
                for parent_id, node_key in self.graph.edges:
                    node = self.get_node(node_key)
                    queues[node_key] = _q.LifoQueue()
                    if len(list(self.graph.successors(node_key))) == 0:
                        end_proc = node_key
                    node.update({
                        'callable': _queue._worker,
                        'args': tuple(),
                        'kwargs': {
                            'work': node.callable,
                            'args': node.args, 'kwargs': node.kwargs,
                            'promises': promises,
                            'print_exc': print_exc,
                            'parent_id': parent_id,
                            'parent_process': processes[parent_id],
                            'in_queue': queues.get(parent_id),
                            'out_queue': queues.get(node_key)
                        }
                    })
                    processes[node_key] = executor.submit(
                        _event_wrapper,
                        _callable=node['callable'],
                        _name=node['name'],
                        _args=node['args'],
                        _kwargs=node['kwargs']
                    )

                # Block until every worker has finished, without busy-polling.
                _futures.wait(list(processes.values()))

            return_val = []
            while not queues[end_proc].empty():
                return_val.append(queues[end_proc].get())

            return return_val
Experimental, as of v0.2.
Directed Acyclic Graph used to queue and execute Pythonic callables in parallel, while stringing the outputs of those callables in linear sequences.
Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the behavior of root nodes to determine if and/or when the DAG run has been completed.
Root nodes will be re-executed until culled by one of two conditions:
- A max number of iterations has been completed, or
- The output of the root node's callable matches a lambda halt_condition.
Both conditions can be set at run-time.
As of v0.2, QueueDAG experiences extreme latency when nested inside of a standard webber.DAG class.
def add_node(self, node, *args, **kwargs):
    """
    Adds a callable with positional and keyword arguments to the DAG's underlying graph.
    On success, return unique identifier for the new node.

    Reserved key-words are used for Queue DAG definitions:

    - halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.

          halt_condition = (lambda output: output is None)

    - iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.

    - max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop.

    When both `max_iter` and `iterator` are given, `max_iter` takes precedence.
    The resulting rules are stored in `self.conditions` under the new node's identifier.
    """
    # Reserved keywords are removed before the rest is forwarded to the parent add_node.
    halt_condition = kwargs.pop('halt_condition', None)
    iterator: int = kwargs.pop('iterator', None)
    max_iter: int = kwargs.pop('max_iter', None)

    return_val = super().add_node(node, *args, **kwargs)

    # Identity comparison is the reliable test for "argument not supplied".
    if max_iter is not None:
        iter_limit = int(max_iter)
    elif iterator is not None:
        iter_limit = int(iterator)
    else:
        iter_limit = None

    self.conditions[return_val] = {
        'halt_condition': halt_condition,
        'iter_limit': iter_limit
    }

    return return_val
Adds a callable with positional and keyword arguments to the DAG's underlying graph. On success, return unique identifier for the new node.
Reserved key-words are used for Queue DAG definitions:
halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.
halt_condition = (lambda output: output == None)
iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.
max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop.
def add_edge(self, u_of_edge, v_of_edge, continue_on = Condition.Success):
    """
    Adds an edge between nodes in the Queue DAG's underlying graph.
    Queue DAG nodes may have a maximum of one child and one parent worker.

    Raises `AssertionError` if `u_of_edge` already has a child or
    `v_of_edge` already has a parent. Otherwise the call is forwarded to
    the parent class's `add_edge` and its result is returned.
    """
    for role, node in (('parent', u_of_edge), ('child', v_of_edge)):
        try:
            node_id = self.node_id(node)
        except Exception:
            # Identifier is not (yet) resolvable in this DAG, so it cannot
            # have existing edges; leave its handling to the parent add_edge.
            continue

        # Explicit check rather than `assert`, which is stripped under -O.
        if self.filter_edges(lambda e: getattr(e, role) == node_id):
            raise AssertionError(
                "Queue DAG nodes may have a maximum of one child and one parent worker."
            )

    return super().add_edge(u_of_edge, v_of_edge, continue_on)
Adds an edge between nodes in the Queue DAG's underlying graph. Queue DAG nodes may have a maximum of one child and one parent worker.
def execute(self, *promises, return_ref=False, print_exc=False):
    """
    Basic wrapper for execution of the DAG's underlying callables.

    Every node is run through `webber.queue._worker` on a thread pool,
    with one LIFO queue per node connecting a parent's output to its child.
    Returns the items left in the output queue of the terminal node (the
    node without successors), or `None` when the DAG is empty.

    `return_ref` is accepted for signature compatibility and is not used.
    """
    queues = {}
    processes = {}
    # Node whose output queue is drained into the return value.
    end_proc = None

    # NOTE(review): pairwise yields overlapping pairs (a, b), (b, c), ...;
    # confirm this is the intended key/value pairing for promises.
    promises: dict = dict(_it.pairwise(promises)) if len(promises) > 0 else {}

    with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:

        # Skip execution if there are no callables in scope.
        if len(self.graph.nodes) == 0:
            print('Given DAG has no callables in scope. Skipping execution...')
            return

        with _futures.ThreadPoolExecutor() as executor:

            # Root workers have no input queue; they stop on their halting rules.
            for node_key in self.root:
                node = self.get_node(node_key)
                queues[node_key] = _q.LifoQueue()
                # A root without successors is itself a terminal node, so a
                # DAG without edges still has an output queue to drain.
                if len(list(self.graph.successors(node_key))) == 0:
                    end_proc = node_key
                node.update({
                    'callable': _queue._worker,
                    'args': tuple(),
                    'kwargs': {
                        'work': node.callable,
                        'args': node.args, 'kwargs': node.kwargs,
                        'promises': promises,
                        'print_exc': print_exc,
                        'halt_condition': self.conditions[node_key]['halt_condition'],
                        'iter_limit': self.conditions[node_key]['iter_limit'],
                        'out_queue': queues.get(node_key)
                    }
                })
                processes[node_key] = executor.submit(
                    _event_wrapper,
                    _callable=node['callable'],
                    _name=node['name'],
                    _args=node['args'],
                    _kwargs=node['kwargs']
                )

            # Child workers read from their parent's queue.
            # NOTE(review): relies on graph.edges listing a parent's edge
            # before its children's edges, so processes[parent_id] exists.
            for parent_id, node_key in self.graph.edges:
                node = self.get_node(node_key)
                queues[node_key] = _q.LifoQueue()
                if len(list(self.graph.successors(node_key))) == 0:
                    end_proc = node_key
                node.update({
                    'callable': _queue._worker,
                    'args': tuple(),
                    'kwargs': {
                        'work': node.callable,
                        'args': node.args, 'kwargs': node.kwargs,
                        'promises': promises,
                        'print_exc': print_exc,
                        'parent_id': parent_id,
                        'parent_process': processes[parent_id],
                        'in_queue': queues.get(parent_id),
                        'out_queue': queues.get(node_key)
                    }
                })
                processes[node_key] = executor.submit(
                    _event_wrapper,
                    _callable=node['callable'],
                    _name=node['name'],
                    _args=node['args'],
                    _kwargs=node['kwargs']
                )

            # Block until every worker has finished, without busy-polling.
            _futures.wait(list(processes.values()))

        return_val = []
        while not queues[end_proc].empty():
            return_val.append(queues[end_proc].get())

        return return_val
Basic wrapper for execution of the DAG's underlying callables.