webber.core
Base module for abstract multiprocessing system - a directed acyclic graph.
1""" 2Base module for abstract multiprocessing system - a directed acyclic graph. 3""" 4import sys as _sys 5import typing as _T 6import types as _types 7import traceback as _traceback 8import collections.abc as _abc 9import concurrent.futures as _futures 10import queue as _q 11import itertools as _it 12import threading as _threading 13from datetime import datetime as _datetime 14 15import networkx as _nx 16import webber.edges as _edges 17import webber.xcoms as _xcoms 18import webber.queue as _queue 19 20from webber.edges import Condition, dotdict, edgedict 21 22__all__ = ["DAG", "Condition", "QueueDAG"] 23 24def _iscallable(function: _T.Any) -> bool: 25 return callable(function) 26 27 28class _OutputLogger: 29 """ 30 Thread-safe logger that uses a queue to serialize stdout writes. 31 All threads put messages into a queue, a single consumer writes to stdout. 32 This avoids race conditions from concurrent redirect_stdout calls. 33 """ 34 _instance: _T.Optional['_OutputLogger'] = None 35 _lock = _threading.Lock() 36 37 def __new__(cls, *args: _T.Any, **kwargs: _T.Any) -> '_OutputLogger': 38 """Singleton pattern - one logger for all threads.""" 39 if cls._instance is None: 40 with cls._lock: 41 if cls._instance is None: 42 cls._instance = super().__new__(cls) 43 cls._instance._initialized = False 44 return cls._instance 45 46 def __init__(self, file: _T.TextIO = _sys.stdout) -> None: 47 if self._initialized: 48 return 49 50 self._queue: _q.Queue[_T.Optional[str]] = _q.Queue() 51 self._file = file 52 self._running = True 53 54 # Start consumer thread 55 self._consumer = _threading.Thread(target=self._consume, daemon=True) 56 self._consumer.start() 57 self._initialized = True 58 59 def _consume(self) -> None: 60 """Consumer thread - reads from queue and writes to stdout.""" 61 while self._running or not self._queue.empty(): 62 try: 63 record = self._queue.get(timeout=0.1) 64 if record is None: # Shutdown signal 65 self._queue.task_done() 66 break 67 self._file.write(record + "\n") 68 self._file.flush() 69 self._queue.task_done() 70 except _q.Empty: 71 continue 72 73 def log(self, name: str, message: str) -> None: 74 """Thread-safe log - puts message on queue.""" 75 if message and not message.isspace(): 76 timestamp = _datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f")[:-3] 77 formatted = f"{timestamp} {name:>15}: {message}" 78 self._queue.put(formatted) 79 80 def shutdown(self) -> None: 81 """Wait for queue to drain and stop consumer.""" 82 self._queue.join() 83 self._running = False 84 self._queue.put(None) 85 self._consumer.join(timeout=1.0) 86 87 @classmethod 88 def reset(cls) -> None: 89 """Reset singleton instance (useful for testing).""" 90 with cls._lock: 91 if cls._instance is not None: 92 cls._instance.shutdown() 93 cls._instance = None 94 95 96class _PrefixedStdout: 97 """ 98 Stdout wrapper that prefixes each line with timestamp and task name. 99 Thread-safe via thread-local task name storage. 100 """ 101 _local = _threading.local() 102 103 def __init__(self, original_stdout: _T.TextIO) -> None: 104 self._original = original_stdout 105 self._lock = _threading.Lock() 106 107 @classmethod 108 def set_task_name(cls, name: str | None) -> None: 109 """Set the task name for the current thread.""" 110 cls._local.task_name = name 111 112 @classmethod 113 def get_task_name(cls) -> str | None: 114 """Get the task name for the current thread.""" 115 return getattr(cls._local, 'task_name', None) 116 117 def write(self, msg: str) -> int: 118 """Write with timestamp/task prefix if task is set.""" 119 task_name = self.get_task_name() 120 if task_name: 121 # When in a task context, suppress standalone newlines/whitespace 122 # (print() sends content and newline as separate write calls) 123 if not msg or not msg.strip(): 124 return len(msg) if msg else 0 125 timestamp = _datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f")[:-3] 126 # Handle multi-line messages 127 lines = [line for line in msg.rstrip('\n').split('\n') if line.strip()] 128 if lines: 129 formatted_lines = [f"{timestamp} {task_name:>15}: {line}" for line in lines] 130 formatted = '\n'.join(formatted_lines) + '\n' 131 with self._lock: 132 return self._original.write(formatted) 133 return len(msg) 134 else: 135 with self._lock: 136 return self._original.write(msg) 137 138 def flush(self) -> None: 139 self._original.flush() 140 141 def __getattr__(self, name: str) -> _T.Any: 142 return getattr(self._original, name) 143 144 145# Install prefixed stdout globally (once) 146_original_stdout = _sys.stdout 147_prefixed_stdout = _PrefixedStdout(_original_stdout) 148 149 150class _TaskLogger: 151 """ 152 Per-task logger context manager that sets up stdout prefixing. 153 Used as a context manager for each task execution. 154 """ 155 def __init__(self, task_name: str) -> None: 156 self._name = task_name 157 self._prev_name: str | None = None 158 159 def __enter__(self) -> '_TaskLogger': 160 self._prev_name = _PrefixedStdout.get_task_name() 161 _PrefixedStdout.set_task_name(self._name) 162 _sys.stdout = _prefixed_stdout 163 return self 164 165 def __exit__( 166 self, 167 exc_type: type[BaseException] | None, 168 exc_value: BaseException | None, 169 traceback: _types.TracebackType | None 170 ) -> bool | None: 171 _PrefixedStdout.set_task_name(self._prev_name) 172 if self._prev_name is None: 173 _sys.stdout = _original_stdout 174 return None 175 176 177def _event_wrapper( 178 _callable: _T.Callable, 179 _name: str, 180 _args: _T.Union[tuple, list], 181 _kwargs: _T.Dict[str, _T.Any], 182 _verbose: bool = True 183) -> _T.Any: 184 """Wrapper used by Webber DAGs to log and execute Python callables as a unit of work.""" 185 if _verbose: 186 with _TaskLogger(_name): 187 # print(f"Starting {_name}", flush=True) 188 result = _callable(*_args, **_kwargs) 189 # print(f"Completed {_name}", flush=True) 190 return result 191 else: 192 return _callable(*_args, **_kwargs) 193 194class DAG: 195 """ 196 Directed Acyclic Graph used to represent Pythonic tasks in parallel. 197 """ 198 graph: _nx.DiGraph 199 _callable_to_id: _T.Dict[_T.Callable, _T.Optional[str]] # O(1) callable->node_id lookup cache (None = duplicates) 200 201 def _callable_status(self, c: _T.Callable) -> _T.Literal['none', 'one', 'many']: 202 """Returns 'none' if not in DAG, 'one' if unique, 'many' if duplicated. 203 O(1) lookup using _callable_to_id cache.""" 204 if c not in self._callable_to_id: 205 return 'none' 206 return 'one' if self._callable_to_id[c] is not None else 'many' 207 208 def _assign_node(self, n: _T.Any, new_callables: _T.Dict[_T.Callable, _T.Callable]) -> str: 209 """Helper to resolve node to its ID, adding to new_callables dict if needed.""" 210 if not _iscallable(n): 211 return n 212 status = self._callable_status(n) 213 if status == 'many': 214 err_msg = f"Callable {n.__name__} " \ 215 + "exists more than once in this DAG. " \ 216 + "Use the unique string identifier of the required node." 217 raise ValueError(err_msg) 218 if status == 'one': 219 return self._callable_to_id[n] # type: ignore[return-value] 220 new_callables[n] = n 221 return _edges.label_node(n) 222 223 def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str: 224 """ 225 Adds a callable with positional and keyword arguments to the DAG's underlying graph. 226 On success, return unique identifier for the new node. 227 """ 228 if not _iscallable(node): 229 err_msg = f"{node}: requested node is not a callable Python function." 230 raise TypeError(err_msg) 231 232 node_name = _edges.label_node(node) 233 234 args = tuple( 235 arg if not isinstance(arg, _xcoms.Promise) else self.resolve_promise(arg) 236 for arg in args 237 ) 238 239 for k, val in kwargs.items(): 240 if isinstance(val, _xcoms.Promise): 241 kwargs[k] = self.resolve_promise(val) 242 243 self.graph.add_node( 244 node_for_adding=node_name, 245 callable=node, args=args, kwargs=kwargs, 246 name=node.__name__, 247 id=node_name 248 ) 249 250 # Populate callable->id cache for O(1) lookup 251 # If callable already exists, mark as None to indicate duplicates 252 if node in self._callable_to_id: 253 self._callable_to_id[node] = None # Multiple occurrences - must use string ID 254 else: 255 self._callable_to_id[node] = node_name 256 257 return node_name 258 259 260 def add_edge( 261 self, 262 u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable], 263 continue_on: Condition = Condition.Success 264 ) -> _T.Tuple[str,str]: 265 """ 266 Adds an edge between nodes in the DAG's underlying graph, 267 so long as the requested edge is unique and has not been added previously. 268 269 On success, returns Tuple of the new edge's unique identifiers. 270 """ 271 # Validate inputs prior to execution 272 # - Nodes must be identifiers or callables 273 # - Conditions must belong to the webber.edges.Condition class 274 if not (isinstance(u_of_edge,str) or _iscallable(u_of_edge)): 275 err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable" 276 raise TypeError(err_msg) 277 if not (isinstance(v_of_edge,str) or _iscallable(v_of_edge)): 278 err_msg = f"Outgoing node {v_of_edge} must be a string or a Python callable" 279 raise TypeError(err_msg) 280 if not isinstance(continue_on, Condition): 281 raise TypeError("Edge conditions must use the webber.edges.Condition class.") 282 283 # Base Case 0: No nodes are present in the DAG: 284 # Ensure that both nodes are callables, then add both to the graph and 285 # assign the outgoing node as a root. 286 if len(self.graph.nodes()) == 0: 287 if not _iscallable(u_of_edge): 288 err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope." 289 raise ValueError(err_msg) 290 if not _iscallable(v_of_edge): 291 err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope." 292 raise ValueError(err_msg) 293 outgoing_node = self.add_node(u_of_edge) 294 incoming_node = self.add_node(v_of_edge) 295 self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on) 296 return (outgoing_node, incoming_node) 297 298 new_callables: _T.Dict[_T.Callable, _T.Callable] = {} 299 300 if _iscallable(u_of_edge) and _iscallable(v_of_edge): 301 # Type narrowing: we've verified both are callables 302 u_callable = _T.cast(_T.Callable, u_of_edge) 303 v_callable = _T.cast(_T.Callable, v_of_edge) 304 305 # Error Cases 0, 1: Either of the callables appear more than once in the DAG. 306 if self._callable_status(u_callable) == 'many': 307 err_msg = f"Callable {u_callable.__name__} " \ 308 + "exists more than once in this DAG. " \ 309 + "Use the unique string identifier of the required node." 310 raise ValueError(err_msg) 311 if self._callable_status(v_callable) == 'many': 312 err_msg = f"Callable {v_callable.__name__} " \ 313 + "exists more than once in this DAG. " \ 314 + "Use the unique string identifier of the required node." 315 raise ValueError(err_msg) 316 317 # Base Case 1: Both args are callables and will be present in the DAG scope no more than once. 318 # We will create new nodes if necessary, after validation, and get the unique string identifiers of the nodes. 319 cached_u = self._callable_to_id.get(u_callable) 320 if cached_u is not None: 321 outgoing_node = cached_u 322 else: 323 new_callables[u_callable] = u_callable 324 outgoing_node = _edges.label_node(u_callable) 325 326 cached_v = self._callable_to_id.get(v_callable) 327 if cached_v is not None: 328 incoming_node = cached_v 329 else: 330 new_callables[v_callable] = v_callable 331 incoming_node = _edges.label_node(v_callable) 332 333 else: 334 335 # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope. 336 if isinstance(u_of_edge, str) and u_of_edge not in self.graph.nodes: 337 err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope." 338 raise ValueError(err_msg) 339 if isinstance(v_of_edge, str) and v_of_edge not in self.graph.nodes: 340 err_msg = f"Incoming node {v_of_edge} not in DAG's current scope." 341 raise ValueError(err_msg) 342 343 # Both nodes' unique identifiers are present in the DAG 344 # and should be evaluated for a valid edge. 345 if isinstance(u_of_edge, str) and isinstance(v_of_edge, str): 346 outgoing_node = u_of_edge 347 incoming_node = v_of_edge 348 349 # Otherwise, one of the nodes is a callable, and the other is a valid unique identifier. 350 else: 351 for node in (u_of_edge, v_of_edge): 352 if node == u_of_edge: 353 outgoing_node = self._assign_node(node, new_callables) 354 else: 355 incoming_node = self._assign_node(node, new_callables) 356 357 # Error Case 5: Both callables exist only once in the DAG, 358 # but an edge already exists between them. O(1) lookup with has_edge(). 359 if self.graph.has_edge(outgoing_node, incoming_node): 360 err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \ 361 + "a definition in this DAG." 362 raise ValueError(err_msg) 363 364 # Error Case 6: Adding an edge would create circular dependencies. 365 # Use has_path() for O(V+E) incremental check instead of rebuilding entire graph. 366 if incoming_node in self.graph and _nx.has_path(self.graph, incoming_node, outgoing_node): 367 err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \ 368 + "results in circular dependencies." 369 raise ValueError(err_msg) 370 371 # We can now add the edge to the DAG, since we are certain it will not result in 372 # illegal dependencies/behavior. 373 # First, we should account for potential new nodes. This also handles 374 # duplicates on first entry to the DAG (e.g.: edge == (print, print)) 375 if _iscallable(u_of_edge) and new_callables.get(_T.cast(_T.Callable, u_of_edge)) is not None: 376 outgoing_node = self.add_node(new_callables[_T.cast(_T.Callable, u_of_edge)]) 377 if _iscallable(v_of_edge) and new_callables.get(_T.cast(_T.Callable, v_of_edge)) is not None: 378 incoming_node = self.add_node(new_callables[_T.cast(_T.Callable, v_of_edge)]) 379 380 # Then we can add the new edge. 381 self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on) 382 return (outgoing_node, incoming_node) 383 384 def remove_node(self, *posargs, **kwargs) -> None: 385 """ 386 Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG. 387 Throws error message and recommends safer methods. 388 """ 389 raise NotImplementedError("Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect.") 390 391 def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]: 392 """ 393 Removes an directed edge between nodes in the DAG's underlying graph. 394 Throws error if the edge does not exist. 395 On success, returns Tuple of the removed edge's unique identifiers. 396 """ 397 edge_id = (self.node_id(u_of_edge), self.node_id(v_of_edge)) 398 if edge_id not in self.graph.edges(data = False): 399 err_msg = "Requested edge does not exist in the DAG's scope" 400 raise ValueError(err_msg) 401 self.graph.remove_edge(edge_id[0], edge_id[1]) 402 return edge_id 403 404 def update_edges(self, *E: _T.Any, continue_on: Condition | None = None, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | edgedict | None = None) -> None: 405 """ 406 Flexible function to update properties of edges in the DAG's scope, 407 based on unique identifier(s) (e.g.: string IDs or unique callables) or a 408 lambda filter using the edgedict syntax. 409 410 List of nodes to update or filter argument is expected. Valid edge lists include: 411 412 \t update_edges((node1, node2), ...) or update_edges([(node1, node2)], ...) 413 414 \t update_edges((node1, callable2), ...) or update_edges([(node1, callable2)], ...) 415 416 \t update_edges(edgedict), where isinstance(edgedict, webber.edges.edgedict) == True 417 418 Parameters: 419 420 > continue_on: webber.edges.Condition value to update matching edges with, 421 422 \t Execute child on success, failure, or any exit state, of the parent node 423 424 > filter: lambda property that can be used instead of a list of edges to be updated. 425 426 \t filter = (lambda e: n.parent == print or e.child == print) 427 428 > data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set. 429 430 \t data = { 'continue_on': webber.edges.Condition } 431 432 """ 433 if len(E) == 0 and filter == None: 434 raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.") 435 436 elif isinstance(E, dict) or isinstance(E, edgedict): 437 E = [E] 438 439 elif len(E) == 1 and isinstance(E[0], _abc.Iterable): 440 try: 441 _ = self.get_edges(E[0]) 442 except: 443 E = E[0] 444 445 if filter is not None: 446 edge_ids = self.filter_edges(filter, data = False) 447 else: 448 if isinstance(E[0], dict) or isinstance(E[0], edgedict): 449 try: 450 ids = [e['id'] for e in E] 451 except KeyError: 452 err_msg = 'In dictionary form, all given edges must be follow edgedict standards.' 453 raise ValueError(err_msg) 454 else: 455 ids = E 456 edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids] 457 458 std_update = (continue_on is None) 459 460 if std_update: 461 for edge_id, e in zip(edge_ids, E): 462 if data is not None: 463 self._update_edge(data, id = edge_id) 464 else: 465 self._update_edge(e, id = edge_id) 466 467 else: 468 if continue_on is not None: 469 if not isinstance(continue_on, Condition): 470 err_msg = f"Condition assignment must use webber.edges.Condition" 471 raise TypeError(err_msg) 472 for e in edge_ids: 473 self.graph.edges[e]['Condition'] = continue_on 474 475 def _update_edge(self, edgedict: _T.Dict[str, _T.Any], id: _T.Tuple[str, str] | None = None, force: bool = False) -> None: 476 """ 477 Internal only. Update properties of an individual edge within a DAG's scope, 478 given a well-structured dictionary and the tuple identifier of the network edge. 479 Force argument bypasses validation, and should only be used internally. 480 """ 481 if id is not None: 482 try: 483 if edgedict.get('id') and id != edgedict['id']: 484 raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {edgedict['id']}") 485 except ValueError as e: 486 if not force: 487 raise e 488 edgedict['id'] = id 489 490 expected_keys = ('parent', 'child', 'id', 'continue_on') 491 if not set(expected_keys).issuperset(set(edgedict.keys())): 492 raise ValueError(f"Expecting keys: {expected_keys}") 493 494 if not force: 495 496 e1, e2 = None, None 497 498 if edgedict.get('id'): 499 e1 = self.get_edge(edgedict['id'], data = False) 500 501 if edgedict.get('parent') and edgedict.get('child'): 502 e2 = self.get_edge(edgedict['parent'], edgedict['child'], data = False) 503 else: 504 e2 = e1 505 506 if e1 != e2: 507 raise ValueError('Edge vertices should not be changed using update functions.') 508 509 elif e1 == None: 510 raise ValueError('Requested edge was not given an identifier.') 511 512 if edgedict.get('continue_on') and not isinstance(edgedict['continue_on'], Condition): 513 err_msg = f"Condition assignment must use webber.edges.Condition" 514 raise TypeError(err_msg) 515 516 edge_id = edgedict.pop('id') 517 edge = {k: v for k,v in edgedict.items() if k not in ('parent', 'child', 'id')} 518 self.graph.edges[(edge_id[0], edge_id[1])].update(edge) 519 520 def relabel_node(self, node: _T.Union[str, _T.Callable], label: str) -> str: 521 """ 522 Update the label, or name, given to a node in the DAG's scope, 523 given a Python string and a node identifier. 524 Well-structured wrapper for a common use-case of DAG.update_nodes. 525 """ 526 node_id = self.node_id(node) 527 if not isinstance(label, str) or len(label) == 0: 528 err_msg = "Node label must be a Python string with one or more characters." 529 raise ValueError(err_msg) 530 self.update_nodes(node_id, data = {'name': label}) 531 return label 532 533 def update_nodes(self, *N: _T.Any, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | dotdict | None = None, callable: _T.Callable | None = None, args: _T.Iterable[_T.Any] | None = None, kwargs: _T.Dict[str, _T.Any] | None = None) -> None: 534 """ 535 Flexible function to update properties of nodes in the DAG's scope, 536 based on unique identifier(s) (e.g.: string IDs or unique callables) or a 537 lambda filter using the dotdict syntax. 538 539 List of nodes to update or filter argument is expected. Valid node lists include: 540 541 \t update_nodes(node_id, ...) or update_nodes([node_id], ...) 542 543 \t update_nodes(callable, ...) or update_nodes([callable], ...) 544 545 \t update_nodes(node1, node2, ...) or update_nodes([node1, node2], ...) 546 547 \t update_nodes(node1, callable2, ...) or update_nodes([node1, callable2], ...) 548 549 > update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id) 550 551 Parameters: 552 553 > filter: lambda property that can be used instead of a list of nodes to be updated. 554 555 \t filter = (lambda n: n.callable == print or 'Hello, World' in n.args) 556 557 > data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set. 558 Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method. 559 560 \t data = { 561 \t 'callable': print, 562 \t 'args': ['Hello', 'World'], 563 \t 'kwargs': {'sep': ', '}, 564 \t 'name': 'custom_label', 565 \t 'id': 'unique-identifier' 566 \t} 567 568 > args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List). 569 570 > kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary. 571 572 """ 573 if len(N) == 0 and filter == None: 574 raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.") 575 576 elif len(N) > 0 and filter is not None: 577 raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.") 578 579 elif isinstance(N, dict) or isinstance(N, str): 580 N = [N] 581 582 elif len(N) == 1 and isinstance(N[0], _abc.Iterable): 583 if isinstance(N[0][0], dict): 584 N = N[0] 585 elif isinstance(N[0][0], str): 586 # BUG: A list of all single character IDs will fail to be updated. Please try another call method (i.e.: nested iterator). 587 if sum(list(map(lambda n: len(n), N[0]))) != len(N[0]): 588 N = N[0] 589 590 591 if filter is not None: 592 node_ids = self.filter_nodes(filter, data = False) 593 else: 594 if isinstance(N[0], dict): 595 ids = [n['id'] for n in N] 596 else: 597 ids = N 598 node_ids = [self.node_id(i) for i in ids] 599 600 std_update = (callable == None) and (args == None) and (kwargs == None) 601 602 if std_update: 603 for node_id, n in zip(node_ids, N): 604 if data is not None: 605 self._update_node(data, id = node_id) 606 else: 607 self._update_node(n, id = node_id) 608 609 else: 610 if callable is not None: 611 if not _iscallable(callable): 612 err_msg = f"Requested node is not assigned a callable Python function." 613 raise TypeError(err_msg) 614 for node_id in node_ids: 615 self.graph.nodes[node_id]['callable'] = callable 616 self.graph.nodes[node_id]['name'] = callable.__name__ 617 618 if args is not None: 619 if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)): 620 err_msg = f"Requested node is not assigned a tuple of pos args." 621 raise TypeError(err_msg) 622 args = tuple(args) 623 for node_id in node_ids: 624 self.graph.nodes[node_id]['args'] = args 625 626 if kwargs is not None: 627 if not isinstance(kwargs, dict): 628 err_msg = f"Requested node is not assigned a dictionary of kw args." 629 raise TypeError(err_msg) 630 for node_id in node_ids: 631 self.graph.nodes[node_id]['kwargs'] = kwargs 632 633 def get_edges(self, *N: _T.Any, data: bool = True) -> _T.Union[_T.List[edgedict], _T.List[_T.Tuple[str, str]]]: 634 """ 635 Retrieval function for DAG edge data, based on tuple identifiers. 636 Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2'])) 637 """ 638 if len(N) == 0: 639 if data: 640 return [edgedict(u, v, **d) for u, v, d in self.graph.edges.data()] 641 return list(self.graph.edges.data(data=False)) 642 643 # elif len(N) == 1: 644 # if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], tuple): 645 # N = N[0] 646 647 if len(N) != len(set(N)) or not all(isinstance(n, _abc.Iterable) and len(n) == 2 for n in N): 648 err_msg = 'All requested edges must be unique tuples of size 2.' 649 raise ValueError(err_msg) 650 651 edge_data = [self.get_edge(o, i, data=data) for (o, i) in N] 652 return edge_data 653 654 def get_edge(self, outgoing_node: _T.Union[str, _T.Callable], incoming_node: _T.Union[str, _T.Callable], data: bool = True) -> _T.Union[edgedict, _T.Tuple[str, str]]: 655 """ 656 Retrieval function for a single directed edge between nodes in a DAG's scope. 657 """ 658 id = (self.node_id(outgoing_node), self.node_id(incoming_node)) 659 if not data: 660 return id 661 edge_data = self.graph.get_edge_data(u = id[0], v = id[1]) 662 if not edge_data: 663 err_msg = f'No match found for the directed edge requested: {id}' 664 raise ValueError(err_msg) 665 assert edge_data is not None # Type narrowing for Pylance 666 return edgedict(*id, **edge_data) 667 668 def get_node(self, n: _T.Union[str, _T.Callable]) -> dotdict: 669 """ 670 Given a unique identifier, returns a dictionary of node metadata 671 for a single node in the DAG's scope. 672 """ 673 node_id = self.node_id(n) 674 return dotdict(self.graph.nodes[node_id]) 675 676 def get_nodes(self, *N: _T.Any) -> _T.List[dotdict]: 677 """ 678 Flexible function to retrieve DAG node data, based on node identifiers 679 (e.g.: string IDs or unique callables). 680 """ 681 if len(N) == 0: 682 node_data = list(self.graph.nodes.values()) 683 return [dotdict(d) for d in node_data] 684 685 elif len(N) == 1: 686 if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], str): 687 N = N[0] 688 else: 689 node_id = self.node_id(N[0]) 690 node_data = [dotdict(self.graph.nodes[node_id])] 691 return node_data 692 693 if not len(N) == len(set(N)): 694 err_msg = 'All requested nodes must be unique identifiers.' 695 raise ValueError(err_msg) 696 697 node_ids = [self.node_id(n) for n in N] 698 node_data = [dotdict(self.graph.nodes[n]) for n in node_ids] 699 return node_data 700 701 def filter_nodes(self, filter: _types.LambdaType, data: bool = False) -> _T.List[str] | _T.List[dotdict]: 702 """ 703 Given a lambda function, filter nodes in a DAG's scope based on its attributes. 704 Current limitation: Filters must use node identifier strings when referencing nodes. 705 Use get_nodes for more flexible controls. 706 """ 707 if not data: 708 return [node['id'] for node in self.graph.nodes.values() if filter(dotdict(node))] 709 return [dotdict(node) for node in self.graph.nodes.values() if filter(dotdict(node))] 710 711 def filter_edges(self, filter: _types.LambdaType, data: bool = False) -> _T.List[_T.Tuple[str, str]] | _T.List[edgedict]: 712 """ 713 Given a lambda function, filter edges in a DAG's scope based on its attributes. 714 Current limitation: Filters must use node identifier strings when referencing nodes. 715 Use get_edges for more flexible controls. 716 """ 717 if not data: 718 return [e[:2] for e in list(self.graph.edges.data()) if filter(edgedict(*e))] 719 return [edgedict(*e) for e in list(self.graph.edges.data()) if filter(edgedict(*e))] 720 721 def retry_node(self, identifier: _T.Union[str,_T.Callable], count: int) -> None: 722 """ 723 Given a node identifier, set number of automatic retries in case of failure. 724 Re-attempts will begin as soon as possible. 725 """ 726 if not isinstance(count, int) or not count >= 0: 727 raise ValueError("Retry count must be a non-negative integer.") 728 node = self.node_id(identifier) 729 self.graph.nodes[node]['retry'] = count 730 731 def skip_node(self, identifier: _T.Union[str, _T.Callable], skip: bool = True, as_failure: bool = False) -> None: 732 """ 733 Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error). 734 Allows conditional control and testing over DAG's order of operations. 735 """ 736 if not isinstance(skip, bool): 737 raise ValueError("Skip argument must be a boolean value.") 738 node = self.node_id(identifier) 739 self.graph.nodes[node]['skip'] = (skip, as_failure) 740 741 def critical_path(self, nodes: _T.Union[str, _T.Callable, _T.Iterable[_T.Union[str, _T.Callable]]]) -> 'DAG': 742 """ 743 Given a set of nodes, returns a subset of the DAG containing 744 only the node(s) and its parents, or upstream dependencies. 745 """ 746 if isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str): 747 node_ids = {self.node_id(n) for n in nodes} 748 else: 749 node_ids = {self.node_id(nodes)} 750 return self._subgraph(node_ids) 751 752 def execute( 753 self, 754 return_ref: bool = False, 755 print_exc: bool = False, 756 max_workers: _T.Optional[int] = None, 757 verbose: bool = True 758 ) -> _T.Optional['DAG.DAGExecutor']: 759 """ 760 Basic wrapper for execution of the DAG's underlying callables. 761 762 Args: 763 return_ref: If True, return the DAGExecutor instance. 764 print_exc: If True, print full exception tracebacks. 765 max_workers: Maximum number of worker threads. Defaults to None (auto). 766 verbose: If True, log task start/completion messages. Defaults to True. 767 """ 768 executor = self.DAGExecutor( 769 self.graph, self.root, print_exc, 770 max_workers=max_workers, verbose=verbose 771 ) 772 return executor if return_ref else None 773 774 def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] | None = None) -> _T.Any: 775 """ 776 Basic wrapper to visualize DAG using Vis.js and NetGraph libraries. 777 By default, visualization library only loaded in after DAG.visualize() is called, halving import times. 778 """ 779 import webber.viz as _viz 780 781 match type: 782 case 'browser': 783 _viz.visualize_browser(self.graph) 784 785 case 'plt': 786 return _viz.visualize_plt(self.graph) 787 788 case 'gui': 789 # _visualize_gui(self.graph) 790 raise NotImplementedError 791 792 case None: 793 if _viz._in_notebook(): 794 return _viz.visualize_plt(self.graph) 795 else: 796 _viz.visualize_browser(self.graph) 797 798 case _: 799 err_msg = "Unknown visualization type requested." 800 raise NotImplementedError(err_msg) 801 802 @property 803 def root(self) -> _T.List[str]: 804 """ 805 Return list of nodes with no dependencies. 806 Root nodes will occur first in DAG's order of operations. 807 Uses O(1) in_degree() instead of O(k) predecessors list creation. 808 """ 809 return [node for node in self.graph.nodes if self.graph.in_degree(node) == 0] 810 811 @property 812 def nodes(self) -> _T.Any: 813 """Returns the NodeView of the underlying NetworkX graph containing all DAG nodes.""" 814 return self.graph.nodes 815 816 def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str: 817 """ 818 Validate whether identifier given is a valid node within the DAG's scope. 819 Primarily for internal use, but useful for retrieving string identifiers 820 for a unique callable in a DAG. 821 Uses O(1) cache lookup for callables instead of O(n) linear search. 822 """ 823 if isinstance(identifier, str): 824 # O(1) lookup using graph.nodes dict 825 if identifier not in self.graph.nodes: 826 err_msg = f"Node {identifier} is not defined in this DAG's scope." 827 raise ValueError(err_msg) 828 return identifier 829 elif _iscallable(identifier): 830 # O(1) lookup using _callable_to_id cache 831 if identifier not in self._callable_to_id: 832 err_msg = f"Callable {identifier} is not defined in this DAG's scope." 833 raise ValueError(err_msg) 834 cached_id = self._callable_to_id[identifier] 835 if cached_id is None: 836 err_msg = f"Callable {identifier.__name__} " \ 837 + "exists more than once in this DAG. " \ 838 + "Use the unique string identifier of the required node." 839 raise ValueError(err_msg) 840 return cached_id 841 else: 842 err_msg = f"Node {identifier} must be a string or a Python callable" 843 raise TypeError(err_msg) 844 845 def _update_node(self, nodedict: _T.Dict[str, _T.Any], id: str | None = None, force: bool = False) -> None: 846 """ 847 Internal only. Update properties of single node within a DAG's scope, 848 given a well-structured dictionary and the tuple identifier of the network edge. 849 Force argument bypasses dictionary validation, and should only be used internally. 850 """ 851 if id is not None: 852 try: 853 if nodedict.get('id') is not None and id != nodedict['id']: 854 raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {nodedict['id']}") 855 except ValueError as e: 856 if not force: 857 raise e 858 nodedict['id'] = id 859 860 expected_keys = ('callable', 'args', 'kwargs', 'name', 'id') 861 if not set(expected_keys).issuperset(set(nodedict.keys())): 862 raise ValueError(f"Expecting keys: {expected_keys}") 863 864 if not force: 865 if nodedict.get('callable'): 866 if not _iscallable(nodedict['callable']): 867 err_msg = f"Requested node is not assigned a callable Python function." 868 raise TypeError(err_msg) 869 if not nodedict.get('name'): 870 nodedict['name'] = nodedict['callable'].__name__ 871 872 if nodedict.get('name') and (not isinstance(nodedict['name'], str) or len(nodedict['name']) == 0): 873 err_msg = f"Requested node name must be a non-null Python string, will default to callable when not set." 874 raise TypeError(err_msg) 875 876 if nodedict.get('args'): 877 if not (isinstance(nodedict['args'], _abc.Iterable) and not isinstance(nodedict['args'], str)): 878 err_msg = f"Requested node is not assigned a tuple of pos args." 879 raise TypeError(err_msg) 880 nodedict['args'] = tuple(nodedict['args']) 881 882 if nodedict.get('kwargs') and not isinstance(nodedict['kwargs'], dict): 883 err_msg = f"Requested node is not assigned a dictionary of kw args." 884 raise TypeError(err_msg) 885 886 node_id = nodedict.pop('id') 887 self.graph.nodes[node_id].update(nodedict) 888 889 # Reset node name if implicitly requested. 890 if not nodedict.get('name'): 891 self.graph.nodes[node_id]['name'] = self.graph.nodes[node_id]['callable'].__name__ 892 893 def _subgraph(self, node_ids: set[str]) -> 'DAG': 894 """ 895 Internal only. Given a set of nodes, returns a subset of the DAG containing 896 only the node(s) and upstream dependencies. 897 Uses nx.ancestors() for O(V+E) performance instead of manual traversal. 898 """ 899 all_nodes = set(node_ids) 900 for node in node_ids: 901 all_nodes.update(_nx.ancestors(self.graph, node)) 902 subgraph = self.graph.subgraph(all_nodes) 903 return DAG(subgraph, __force=True) 904 905 def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise: 906 """ 907 Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope. 908 Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope. 909 """ 910 try: 911 key = self.node_id(promise.key) 912 except Exception as e: 913 raise _xcoms.InvalidCallable(e) 914 return _xcoms.Promise(key) 915 916 def __init__(self, graph: _T.Union[_nx.DiGraph, _nx.Graph, None] = None, **kwargs: _T.Any) -> None: 917 918 if graph is None: 919 self.graph = _nx.DiGraph() 920 self._callable_to_id = {} 921 return 922 923 # Meant for internal use only, creating DAGs from subgraphs. 924 if kwargs.get('__force') == True: 925 self.graph = _T.cast(_nx.DiGraph, graph) 926 # Build cache from existing subgraph 927 self._callable_to_id = {} 928 for node_id, data in self.graph.nodes(data=True): 929 callable_fn = data.get('callable') 930 if callable_fn is not None: 931 if callable_fn in self._callable_to_id: 932 self._callable_to_id[callable_fn] = None # Duplicate 933 else: 934 self._callable_to_id[callable_fn] = node_id 935 return 936 937 _edges.validate_dag(graph) 938 # Type narrowing: validate_dag ensures graph is a DiGraph 939 assert isinstance(graph, _nx.DiGraph) 940 941 # Define framework specific logic as nested dictionaries. 942 for node in graph.nodes.keys(): 943 graph.nodes[node]['callable'] = node 944 graph.nodes[node]['name'] = node.__name__ 945 graph.nodes[node]['args'] = [] 946 graph.nodes[node]['kwargs'] = {} 947 948 for e in graph.edges: 949 condition = graph.edges[e].get('Condition') 950 if condition is not None: 951 if condition not in Condition: 952 raise TypeError(e, 'Edge conditions must belong to IntEnum type Webber.Condition.') 953 else: 954 graph.edges[e]['Condition'] = Condition.Success 955 956 graph = _nx.relabel_nodes(graph, lambda node: _edges.label_node(node)) 957 for n in graph.nodes: 958 graph.nodes[n]['id'] = n 959 self.graph = _nx.DiGraph(graph) 960 961 # Build callable->id cache after relabeling 962 self._callable_to_id = {} 963 for node_id, data in self.graph.nodes(data=True): 964 callable_fn = data.get('callable') 965 if callable_fn is not None: 966 if callable_fn in self._callable_to_id: 967 self._callable_to_id[callable_fn] = None # Duplicate 968 else: 969 self._callable_to_id[callable_fn] = node_id 970 971 class DAGExecutor: 972 """ 973 Base class used to execute DAG in embarrassingly parallel. 974 """ 975 def __init__( 976 self, 977 graph: _nx.DiGraph, 978 roots: _T.List[str], 979 print_exc: bool = False, 980 max_workers: _T.Optional[int] = None, 981 verbose: bool = True 982 ) -> None: 983 984 # Skip execution if there are no callables in scope. 985 if len(graph.nodes) == 0: 986 if verbose: 987 print('Given DAG has no callables in scope. Skipping execution...') 988 return 989 990 # Initialize local variables for execution. 991 complete: set[str] = set() 992 started: set[str] = set() 993 failed: set[str] = set() 994 skipped: set[str] = set() 995 refs: _T.Dict[str, _futures.Future[_T.Any]] = {} 996 997 def raise_exc(message: str) -> None: 998 raise ValueError(message) 999 1000 def run_conditions_met(n: str) -> bool: 1001 for p in graph.predecessors(n): 1002 match graph.edges[(p, n)]['Condition']: 1003 case Condition.Success: 1004 if p not in complete: 1005 return False 1006 case Condition.Failure: 1007 if p not in failed: 1008 return False 1009 case Condition.AnyCase: 1010 if p not in failed and p not in complete: 1011 return False 1012 return True 1013 1014 skip = graph.nodes.data("skip", default=(False, False)) 1015 retry: _T.Dict[str, _T.List[_T.Any]] = { 1016 n: [c + 1, {}] for n, c in graph.nodes.data("retry", default=0) 1017 } 1018 1019 # Start execution of root node functions. 1020 with _futures.ThreadPoolExecutor(max_workers=max_workers) as executor: 1021 1022 def Submit( 1023 event: str, 1024 callable: _T.Callable[..., _T.Any], 1025 name: str, 1026 args: _T.Union[tuple[_T.Any, ...], list[_T.Any]], 1027 kwargs: _T.Dict[str, _T.Any] 1028 ) -> _futures.Future[_T.Any]: 1029 if skip[event][0]: 1030 retry[event][0] = 0 1031 skip_callable = raise_exc if skip[event][1] else print 1032 return executor.submit( 1033 _event_wrapper, 1034 _callable=skip_callable, 1035 _name=graph.nodes[event]['name'], 1036 _args=[f"Event {event} skipped..."], 1037 _kwargs={}, 1038 _verbose=verbose 1039 ) 1040 else: 1041 retry[event][0] -= 1 1042 if (retry[event][0] > 0) and (retry[event][1] == {}): 1043 retry[event][1] = { 1044 'callable': callable, 1045 'name': name, 1046 'args': args, 1047 'kwargs': kwargs 1048 } 1049 return executor.submit( 1050 _event_wrapper, 1051 _callable=callable, 1052 _name=name, 1053 _args=args, 1054 _kwargs=kwargs, 1055 _verbose=verbose 1056 ) 1057 1058 # Submit root nodes 1059 for event in roots: 1060 refs[event] = Submit( 1061 event, 1062 graph.nodes[event]['callable'], 1063 graph.nodes[event]['name'], 1064 graph.nodes[event]['args'], 1065 graph.nodes[event]['kwargs'] 1066 ) 1067 started.add(event) 1068 1069 # Process futures as they complete (no busy-wait) 1070 pending = set(refs.values()) 1071 future_to_event = {v: k for k, v in refs.items()} 1072 1073 while (len(complete) + len(failed) + len(skipped)) != len(graph): 1074 if not pending: 1075 break 1076 1077 # Wait for at least one future to complete 1078 done, pending = _futures.wait(pending, return_when=_futures.FIRST_COMPLETED) 1079 1080 for future in done: 1081 event = future_to_event[future] 1082 1083 if future.exception() is not None: 1084 try: 1085 raise future.exception() # type: ignore 1086 except: 1087 if print_exc: 1088 _traceback.print_exc() 1089 1090 if retry[event][0] > 0: 1091 if verbose: 1092 print(f"Event {event} exited with exception, retrying...") 1093 new_future = Submit( 1094 event, 1095 callable=retry[event][1]['callable'], 1096 name=retry[event][1]['name'], 1097 args=retry[event][1]['args'], 1098 kwargs=retry[event][1]['kwargs'] 1099 ) 1100 refs[event] = new_future 1101 pending.add(new_future) 1102 future_to_event[new_future] = event 1103 continue 1104 1105 if verbose: 1106 print(f"Event {event} exited with exception...") 1107 failed.add(event) 1108 skipping = [ 1109 e[1] for e in set(graph.out_edges(event)) 1110 if not _edges.continue_on_failure(graph.edges[e]) 1111 ] 1112 else: 1113 complete.add(event) 1114 skipping = [ 1115 e[1] for e in set(graph.out_edges(event)) 1116 if not _edges.continue_on_success(graph.edges[e]) 1117 ] 1118 1119 skipped = skipped.union(skipping) 1120 for n in skipping: 1121 skipped = skipped.union(_nx.descendants(graph, n)) 1122 1123 carryon = set(graph.successors(event)).difference(skipped) 1124 starting = [ 1125 successor for successor in carryon 1126 if run_conditions_met(successor) 1127 ] 1128 1129 for successor in starting: 1130 _args = [ 1131 a if not isinstance(a, _xcoms.Promise) else refs[_T.cast(str, a.key)].result() 1132 for a in graph.nodes[successor]['args'] 1133 ] 1134 _kwargs = { 1135 k: v if not isinstance(v, _xcoms.Promise) else refs[_T.cast(str, v.key)].result() 1136 for k, v in graph.nodes[successor]['kwargs'].items() 1137 } 1138 new_future = Submit( 1139 successor, 1140 graph.nodes[successor]['callable'], 1141 graph.nodes[successor]['name'], 1142 _args, 1143 _kwargs 1144 ) 1145 refs[successor] = new_future 1146 pending.add(new_future) 1147 future_to_event[new_future] = successor 1148 started.add(successor) 1149 1150class QueueDAG(DAG): 1151 """ 1152 Directed Acyclic Graph used to queue and execute Pythonic callables in parallel, 1153 while stringing the outputs of those callables in linear sequences. 1154 1155 Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the 1156 behavior of root nodes to determine if and/or when the DAG run has been completed. 1157 1158 Root nodes will be re-executed until culled by one of two conditions: 1159 1. A max number of iterations has been completed, or 1160 2. The output of the root node's callable matches a lambda halt_condition. 1161 1162 Both conditions can be set at run-time. 1163 1164 QueueDAG can be nested inside a standard webber.DAG by passing qdag.execute as a callable. 1165 Benchmarks show moderate overhead of 1.3-2.5x (~1-2ms fixed cost) when nested, due to the 1166 outer DAG's ThreadPoolExecutor setup and task wrapper infrastructure. This is well within 1167 acceptable millisecond-scale latency for most real-time applications. 1168 """ 1169 1170 conditions: _T.Dict[str, _T.Dict[str, _T.Any]] = {} 1171 1172 def __init__(self) -> None: 1173 super().__init__() 1174 1175 def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str: 1176 """ 1177 Adds a callable with positional and keyword arguments to the DAG's underlying graph. 1178 On success, return unique identifier for the new node. 1179 1180 Reserved key-words are used for Queue DAG definitions: 1181 1182 - halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables. 1183 1184 \t halt_condition = (lambda output: output == None) 1185 1186 - iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument. 1187 1188 - max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop. 1189 """ 1190 halt_condition = kwargs.pop('halt_condition', None) 1191 iterator: int = kwargs.pop('iterator', None) 1192 max_iter: int = kwargs.pop('max_iter', None) 1193 1194 return_val = super().add_node(node, *args, **kwargs) 1195 1196 if max_iter is not None: 1197 iter_limit = int(max_iter) 1198 elif iterator is not None: 1199 iter_limit = int(iterator) 1200 else: 1201 iter_limit = None 1202 1203 self.conditions[return_val] = { 1204 'halt_condition': halt_condition, 1205 'iter_limit': iter_limit 1206 } 1207 1208 return return_val 1209 1210 def add_edge(self, u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable], continue_on: Condition = Condition.Success) -> _T.Tuple[str, str]: 1211 """ 1212 Adds an edge between nodes in the Queue DAG's underlying graph. 1213 Queue DAG nodes may have a maximum of one child and one parent worker. 1214 """ 1215 for node in (u_of_edge, v_of_edge): 1216 try: 1217 node_id = self.node_id(node) 1218 except: 1219 continue 1220 1221 filter = (lambda e: e.parent == node_id) if node == u_of_edge else (lambda e: e.child == node_id) 1222 try: 1223 assert(len(self.filter_edges(filter)) == 0) 1224 except Exception as e: 1225 e.add_note("Queue DAG nodes may have a maximum of one child and one parent worker.") 1226 raise e 1227 1228 return super().add_edge(u_of_edge, v_of_edge, continue_on) 1229 1230 def execute(self, *promises: _T.Any, return_ref: bool = False, print_exc: bool = False) -> _T.List[_T.Any] | None: 1231 """ 1232 Basic wrapper for execution of the DAG's underlying callables. 1233 """ 1234 queues: _T.Dict[str, _q.LifoQueue[_T.Any]] = {} 1235 processes: _T.Dict[str, _futures.Future[_T.Any]] = {} 1236 join: set[str] = set() 1237 end_proc: _T.Optional[str] = None 1238 1239 _promises: _T.Dict[str, _T.Any] = { k: v for k, v in _it.pairwise(promises) } if len(promises) > 0 else {} 1240 1241 with _TaskLogger("root") as _: 1242 1243 # Skip execution if there are no callables in scope. 1244 if len(self.graph.nodes) == 0: 1245 print('Given DAG has no callables in scope. Skipping execution...') 1246 return 1247 1248 with _futures.ThreadPoolExecutor() as executor: 1249 1250 for id in self.root: 1251 node = self.get_node(id) 1252 queues[id] = _q.LifoQueue() 1253 node.update({ 1254 'callable': _queue._worker, 1255 'args': tuple(), 1256 'kwargs': { 1257 'work': node.callable, 1258 'args': node.args, 'kwargs': node.kwargs, 1259 'promises': _promises, 1260 'print_exc': print_exc, 1261 'halt_condition': self.conditions[id]['halt_condition'], 1262 'iter_limit': self.conditions[id]['iter_limit'], 1263 'out_queue': queues.get(id) 1264 } 1265 }) 1266 processes[id] = executor.submit( 1267 _event_wrapper, 1268 _callable=node['callable'], 1269 _name=node['name'], 1270 _args=node['args'], 1271 _kwargs=node['kwargs'] 1272 ) 1273 1274 for parent_id, id in self.graph.edges: 1275 node = self.get_node(id) 1276 queues[id] = _q.LifoQueue() 1277 if len(list(self.graph.successors(id))) == 0: 1278 end_proc = id 1279 node.update({ 1280 'callable': _queue._worker, 1281 'args': tuple(), 1282 'kwargs': { 1283 'work': node.callable, 1284 'args': node.args, 'kwargs': node.kwargs, 1285 'promises': _promises, 1286 'print_exc': print_exc, 1287 'parent_id': parent_id, 1288 'parent_process': processes[parent_id], 1289 'in_queue': queues.get(parent_id), 1290 'out_queue': queues.get(id) 1291 } 1292 }) 1293 processes[id] = executor.submit( 1294 _event_wrapper, 1295 _callable=node['callable'], 1296 _name=node['name'], 1297 _args=node['args'], 1298 _kwargs=node['kwargs'] 1299 ) 1300 1301 # For single-node DAGs with no edges, end_proc is the root node 1302 if end_proc is None and len(self.root) > 0: 1303 end_proc = self.root[0] 1304 1305 while len(join) != len(self.graph.nodes): 1306 for node in self.graph.nodes: 1307 if processes[node].done(): 1308 join.add(node) 1309 1310 return_val: _T.List[_T.Any] = [] 1311 if end_proc is not None and end_proc in queues: 1312 while not queues[end_proc].empty(): 1313 return_val.append(queues[end_proc].get()) 1314 1315 return return_val
195class DAG: 196 """ 197 Directed Acyclic Graph used to represent Pythonic tasks in parallel. 198 """ 199 graph: _nx.DiGraph 200 _callable_to_id: _T.Dict[_T.Callable, _T.Optional[str]] # O(1) callable->node_id lookup cache (None = duplicates) 201 202 def _callable_status(self, c: _T.Callable) -> _T.Literal['none', 'one', 'many']: 203 """Returns 'none' if not in DAG, 'one' if unique, 'many' if duplicated. 204 O(1) lookup using _callable_to_id cache.""" 205 if c not in self._callable_to_id: 206 return 'none' 207 return 'one' if self._callable_to_id[c] is not None else 'many' 208 209 def _assign_node(self, n: _T.Any, new_callables: _T.Dict[_T.Callable, _T.Callable]) -> str: 210 """Helper to resolve node to its ID, adding to new_callables dict if needed.""" 211 if not _iscallable(n): 212 return n 213 status = self._callable_status(n) 214 if status == 'many': 215 err_msg = f"Callable {n.__name__} " \ 216 + "exists more than once in this DAG. " \ 217 + "Use the unique string identifier of the required node." 218 raise ValueError(err_msg) 219 if status == 'one': 220 return self._callable_to_id[n] # type: ignore[return-value] 221 new_callables[n] = n 222 return _edges.label_node(n) 223 224 def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str: 225 """ 226 Adds a callable with positional and keyword arguments to the DAG's underlying graph. 227 On success, return unique identifier for the new node. 228 """ 229 if not _iscallable(node): 230 err_msg = f"{node}: requested node is not a callable Python function." 231 raise TypeError(err_msg) 232 233 node_name = _edges.label_node(node) 234 235 args = tuple( 236 arg if not isinstance(arg, _xcoms.Promise) else self.resolve_promise(arg) 237 for arg in args 238 ) 239 240 for k, val in kwargs.items(): 241 if isinstance(val, _xcoms.Promise): 242 kwargs[k] = self.resolve_promise(val) 243 244 self.graph.add_node( 245 node_for_adding=node_name, 246 callable=node, args=args, kwargs=kwargs, 247 name=node.__name__, 248 id=node_name 249 ) 250 251 # Populate callable->id cache for O(1) lookup 252 # If callable already exists, mark as None to indicate duplicates 253 if node in self._callable_to_id: 254 self._callable_to_id[node] = None # Multiple occurrences - must use string ID 255 else: 256 self._callable_to_id[node] = node_name 257 258 return node_name 259 260 261 def add_edge( 262 self, 263 u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable], 264 continue_on: Condition = Condition.Success 265 ) -> _T.Tuple[str,str]: 266 """ 267 Adds an edge between nodes in the DAG's underlying graph, 268 so long as the requested edge is unique and has not been added previously. 269 270 On success, returns Tuple of the new edge's unique identifiers. 271 """ 272 # Validate inputs prior to execution 273 # - Nodes must be identifiers or callables 274 # - Conditions must belong to the webber.edges.Condition class 275 if not (isinstance(u_of_edge,str) or _iscallable(u_of_edge)): 276 err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable" 277 raise TypeError(err_msg) 278 if not (isinstance(v_of_edge,str) or _iscallable(v_of_edge)): 279 err_msg = f"Outgoing node {v_of_edge} must be a string or a Python callable" 280 raise TypeError(err_msg) 281 if not isinstance(continue_on, Condition): 282 raise TypeError("Edge conditions must use the webber.edges.Condition class.") 283 284 # Base Case 0: No nodes are present in the DAG: 285 # Ensure that both nodes are callables, then add both to the graph and 286 # assign the outgoing node as a root. 287 if len(self.graph.nodes()) == 0: 288 if not _iscallable(u_of_edge): 289 err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope." 290 raise ValueError(err_msg) 291 if not _iscallable(v_of_edge): 292 err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope." 293 raise ValueError(err_msg) 294 outgoing_node = self.add_node(u_of_edge) 295 incoming_node = self.add_node(v_of_edge) 296 self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on) 297 return (outgoing_node, incoming_node) 298 299 new_callables: _T.Dict[_T.Callable, _T.Callable] = {} 300 301 if _iscallable(u_of_edge) and _iscallable(v_of_edge): 302 # Type narrowing: we've verified both are callables 303 u_callable = _T.cast(_T.Callable, u_of_edge) 304 v_callable = _T.cast(_T.Callable, v_of_edge) 305 306 # Error Cases 0, 1: Either of the callables appear more than once in the DAG. 307 if self._callable_status(u_callable) == 'many': 308 err_msg = f"Callable {u_callable.__name__} " \ 309 + "exists more than once in this DAG. " \ 310 + "Use the unique string identifier of the required node." 311 raise ValueError(err_msg) 312 if self._callable_status(v_callable) == 'many': 313 err_msg = f"Callable {v_callable.__name__} " \ 314 + "exists more than once in this DAG. " \ 315 + "Use the unique string identifier of the required node." 316 raise ValueError(err_msg) 317 318 # Base Case 1: Both args are callables and will be present in the DAG scope no more than once. 319 # We will create new nodes if necessary, after validation, and get the unique string identifiers of the nodes. 320 cached_u = self._callable_to_id.get(u_callable) 321 if cached_u is not None: 322 outgoing_node = cached_u 323 else: 324 new_callables[u_callable] = u_callable 325 outgoing_node = _edges.label_node(u_callable) 326 327 cached_v = self._callable_to_id.get(v_callable) 328 if cached_v is not None: 329 incoming_node = cached_v 330 else: 331 new_callables[v_callable] = v_callable 332 incoming_node = _edges.label_node(v_callable) 333 334 else: 335 336 # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope. 337 if isinstance(u_of_edge, str) and u_of_edge not in self.graph.nodes: 338 err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope." 339 raise ValueError(err_msg) 340 if isinstance(v_of_edge, str) and v_of_edge not in self.graph.nodes: 341 err_msg = f"Incoming node {v_of_edge} not in DAG's current scope." 342 raise ValueError(err_msg) 343 344 # Both nodes' unique identifiers are present in the DAG 345 # and should be evaluated for a valid edge. 346 if isinstance(u_of_edge, str) and isinstance(v_of_edge, str): 347 outgoing_node = u_of_edge 348 incoming_node = v_of_edge 349 350 # Otherwise, one of the nodes is a callable, and the other is a valid unique identifier. 351 else: 352 for node in (u_of_edge, v_of_edge): 353 if node == u_of_edge: 354 outgoing_node = self._assign_node(node, new_callables) 355 else: 356 incoming_node = self._assign_node(node, new_callables) 357 358 # Error Case 5: Both callables exist only once in the DAG, 359 # but an edge already exists between them. O(1) lookup with has_edge(). 360 if self.graph.has_edge(outgoing_node, incoming_node): 361 err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \ 362 + "a definition in this DAG." 363 raise ValueError(err_msg) 364 365 # Error Case 6: Adding an edge would create circular dependencies. 366 # Use has_path() for O(V+E) incremental check instead of rebuilding entire graph. 367 if incoming_node in self.graph and _nx.has_path(self.graph, incoming_node, outgoing_node): 368 err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \ 369 + "results in circular dependencies." 370 raise ValueError(err_msg) 371 372 # We can now add the edge to the DAG, since we are certain it will not result in 373 # illegal dependencies/behavior. 374 # First, we should account for potential new nodes. This also handles 375 # duplicates on first entry to the DAG (e.g.: edge == (print, print)) 376 if _iscallable(u_of_edge) and new_callables.get(_T.cast(_T.Callable, u_of_edge)) is not None: 377 outgoing_node = self.add_node(new_callables[_T.cast(_T.Callable, u_of_edge)]) 378 if _iscallable(v_of_edge) and new_callables.get(_T.cast(_T.Callable, v_of_edge)) is not None: 379 incoming_node = self.add_node(new_callables[_T.cast(_T.Callable, v_of_edge)]) 380 381 # Then we can add the new edge. 382 self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on) 383 return (outgoing_node, incoming_node) 384 385 def remove_node(self, *posargs, **kwargs) -> None: 386 """ 387 Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG. 388 Throws error message and recommends safer methods. 389 """ 390 raise NotImplementedError("Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect.") 391 392 def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]: 393 """ 394 Removes an directed edge between nodes in the DAG's underlying graph. 395 Throws error if the edge does not exist. 396 On success, returns Tuple of the removed edge's unique identifiers. 397 """ 398 edge_id = (self.node_id(u_of_edge), self.node_id(v_of_edge)) 399 if edge_id not in self.graph.edges(data = False): 400 err_msg = "Requested edge does not exist in the DAG's scope" 401 raise ValueError(err_msg) 402 self.graph.remove_edge(edge_id[0], edge_id[1]) 403 return edge_id 404 405 def update_edges(self, *E: _T.Any, continue_on: Condition | None = None, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | edgedict | None = None) -> None: 406 """ 407 Flexible function to update properties of edges in the DAG's scope, 408 based on unique identifier(s) (e.g.: string IDs or unique callables) or a 409 lambda filter using the edgedict syntax. 410 411 List of nodes to update or filter argument is expected. Valid edge lists include: 412 413 \t update_edges((node1, node2), ...) or update_edges([(node1, node2)], ...) 414 415 \t update_edges((node1, callable2), ...) or update_edges([(node1, callable2)], ...) 416 417 \t update_edges(edgedict), where isinstance(edgedict, webber.edges.edgedict) == True 418 419 Parameters: 420 421 > continue_on: webber.edges.Condition value to update matching edges with, 422 423 \t Execute child on success, failure, or any exit state, of the parent node 424 425 > filter: lambda property that can be used instead of a list of edges to be updated. 426 427 \t filter = (lambda e: n.parent == print or e.child == print) 428 429 > data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set. 430 431 \t data = { 'continue_on': webber.edges.Condition } 432 433 """ 434 if len(E) == 0 and filter == None: 435 raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.") 436 437 elif isinstance(E, dict) or isinstance(E, edgedict): 438 E = [E] 439 440 elif len(E) == 1 and isinstance(E[0], _abc.Iterable): 441 try: 442 _ = self.get_edges(E[0]) 443 except: 444 E = E[0] 445 446 if filter is not None: 447 edge_ids = self.filter_edges(filter, data = False) 448 else: 449 if isinstance(E[0], dict) or isinstance(E[0], edgedict): 450 try: 451 ids = [e['id'] for e in E] 452 except KeyError: 453 err_msg = 'In dictionary form, all given edges must be follow edgedict standards.' 454 raise ValueError(err_msg) 455 else: 456 ids = E 457 edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids] 458 459 std_update = (continue_on is None) 460 461 if std_update: 462 for edge_id, e in zip(edge_ids, E): 463 if data is not None: 464 self._update_edge(data, id = edge_id) 465 else: 466 self._update_edge(e, id = edge_id) 467 468 else: 469 if continue_on is not None: 470 if not isinstance(continue_on, Condition): 471 err_msg = f"Condition assignment must use webber.edges.Condition" 472 raise TypeError(err_msg) 473 for e in edge_ids: 474 self.graph.edges[e]['Condition'] = continue_on 475 476 def _update_edge(self, edgedict: _T.Dict[str, _T.Any], id: _T.Tuple[str, str] | None = None, force: bool = False) -> None: 477 """ 478 Internal only. Update properties of an individual edge within a DAG's scope, 479 given a well-structured dictionary and the tuple identifier of the network edge. 480 Force argument bypasses validation, and should only be used internally. 481 """ 482 if id is not None: 483 try: 484 if edgedict.get('id') and id != edgedict['id']: 485 raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {edgedict['id']}") 486 except ValueError as e: 487 if not force: 488 raise e 489 edgedict['id'] = id 490 491 expected_keys = ('parent', 'child', 'id', 'continue_on') 492 if not set(expected_keys).issuperset(set(edgedict.keys())): 493 raise ValueError(f"Expecting keys: {expected_keys}") 494 495 if not force: 496 497 e1, e2 = None, None 498 499 if edgedict.get('id'): 500 e1 = self.get_edge(edgedict['id'], data = False) 501 502 if edgedict.get('parent') and edgedict.get('child'): 503 e2 = self.get_edge(edgedict['parent'], edgedict['child'], data = False) 504 else: 505 e2 = e1 506 507 if e1 != e2: 508 raise ValueError('Edge vertices should not be changed using update functions.') 509 510 elif e1 == None: 511 raise ValueError('Requested edge was not given an identifier.') 512 513 if edgedict.get('continue_on') and not isinstance(edgedict['continue_on'], Condition): 514 err_msg = f"Condition assignment must use webber.edges.Condition" 515 raise TypeError(err_msg) 516 517 edge_id = edgedict.pop('id') 518 edge = {k: v for k,v in edgedict.items() if k not in ('parent', 'child', 'id')} 519 self.graph.edges[(edge_id[0], edge_id[1])].update(edge) 520 521 def relabel_node(self, node: _T.Union[str, _T.Callable], label: str) -> str: 522 """ 523 Update the label, or name, given to a node in the DAG's scope, 524 given a Python string and a node identifier. 525 Well-structured wrapper for a common use-case of DAG.update_nodes. 526 """ 527 node_id = self.node_id(node) 528 if not isinstance(label, str) or len(label) == 0: 529 err_msg = "Node label must be a Python string with one or more characters." 530 raise ValueError(err_msg) 531 self.update_nodes(node_id, data = {'name': label}) 532 return label 533 534 def update_nodes(self, *N: _T.Any, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | dotdict | None = None, callable: _T.Callable | None = None, args: _T.Iterable[_T.Any] | None = None, kwargs: _T.Dict[str, _T.Any] | None = None) -> None: 535 """ 536 Flexible function to update properties of nodes in the DAG's scope, 537 based on unique identifier(s) (e.g.: string IDs or unique callables) or a 538 lambda filter using the dotdict syntax. 539 540 List of nodes to update or filter argument is expected. Valid node lists include: 541 542 \t update_nodes(node_id, ...) or update_nodes([node_id], ...) 543 544 \t update_nodes(callable, ...) or update_nodes([callable], ...) 545 546 \t update_nodes(node1, node2, ...) or update_nodes([node1, node2], ...) 547 548 \t update_nodes(node1, callable2, ...) or update_nodes([node1, callable2], ...) 549 550 > update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id) 551 552 Parameters: 553 554 > filter: lambda property that can be used instead of a list of nodes to be updated. 555 556 \t filter = (lambda n: n.callable == print or 'Hello, World' in n.args) 557 558 > data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set. 559 Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method. 560 561 \t data = { 562 \t 'callable': print, 563 \t 'args': ['Hello', 'World'], 564 \t 'kwargs': {'sep': ', '}, 565 \t 'name': 'custom_label', 566 \t 'id': 'unique-identifier' 567 \t} 568 569 > args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List). 570 571 > kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary. 572 573 """ 574 if len(N) == 0 and filter == None: 575 raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.") 576 577 elif len(N) > 0 and filter is not None: 578 raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.") 579 580 elif isinstance(N, dict) or isinstance(N, str): 581 N = [N] 582 583 elif len(N) == 1 and isinstance(N[0], _abc.Iterable): 584 if isinstance(N[0][0], dict): 585 N = N[0] 586 elif isinstance(N[0][0], str): 587 # BUG: A list of all single character IDs will fail to be updated. Please try another call method (i.e.: nested iterator). 588 if sum(list(map(lambda n: len(n), N[0]))) != len(N[0]): 589 N = N[0] 590 591 592 if filter is not None: 593 node_ids = self.filter_nodes(filter, data = False) 594 else: 595 if isinstance(N[0], dict): 596 ids = [n['id'] for n in N] 597 else: 598 ids = N 599 node_ids = [self.node_id(i) for i in ids] 600 601 std_update = (callable == None) and (args == None) and (kwargs == None) 602 603 if std_update: 604 for node_id, n in zip(node_ids, N): 605 if data is not None: 606 self._update_node(data, id = node_id) 607 else: 608 self._update_node(n, id = node_id) 609 610 else: 611 if callable is not None: 612 if not _iscallable(callable): 613 err_msg = f"Requested node is not assigned a callable Python function." 614 raise TypeError(err_msg) 615 for node_id in node_ids: 616 self.graph.nodes[node_id]['callable'] = callable 617 self.graph.nodes[node_id]['name'] = callable.__name__ 618 619 if args is not None: 620 if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)): 621 err_msg = f"Requested node is not assigned a tuple of pos args." 622 raise TypeError(err_msg) 623 args = tuple(args) 624 for node_id in node_ids: 625 self.graph.nodes[node_id]['args'] = args 626 627 if kwargs is not None: 628 if not isinstance(kwargs, dict): 629 err_msg = f"Requested node is not assigned a dictionary of kw args." 630 raise TypeError(err_msg) 631 for node_id in node_ids: 632 self.graph.nodes[node_id]['kwargs'] = kwargs 633 634 def get_edges(self, *N: _T.Any, data: bool = True) -> _T.Union[_T.List[edgedict], _T.List[_T.Tuple[str, str]]]: 635 """ 636 Retrieval function for DAG edge data, based on tuple identifiers. 637 Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2'])) 638 """ 639 if len(N) == 0: 640 if data: 641 return [edgedict(u, v, **d) for u, v, d in self.graph.edges.data()] 642 return list(self.graph.edges.data(data=False)) 643 644 # elif len(N) == 1: 645 # if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], tuple): 646 # N = N[0] 647 648 if len(N) != len(set(N)) or not all(isinstance(n, _abc.Iterable) and len(n) == 2 for n in N): 649 err_msg = 'All requested edges must be unique tuples of size 2.' 650 raise ValueError(err_msg) 651 652 edge_data = [self.get_edge(o, i, data=data) for (o, i) in N] 653 return edge_data 654 655 def get_edge(self, outgoing_node: _T.Union[str, _T.Callable], incoming_node: _T.Union[str, _T.Callable], data: bool = True) -> _T.Union[edgedict, _T.Tuple[str, str]]: 656 """ 657 Retrieval function for a single directed edge between nodes in a DAG's scope. 658 """ 659 id = (self.node_id(outgoing_node), self.node_id(incoming_node)) 660 if not data: 661 return id 662 edge_data = self.graph.get_edge_data(u = id[0], v = id[1]) 663 if not edge_data: 664 err_msg = f'No match found for the directed edge requested: {id}' 665 raise ValueError(err_msg) 666 assert edge_data is not None # Type narrowing for Pylance 667 return edgedict(*id, **edge_data) 668 669 def get_node(self, n: _T.Union[str, _T.Callable]) -> dotdict: 670 """ 671 Given a unique identifier, returns a dictionary of node metadata 672 for a single node in the DAG's scope. 673 """ 674 node_id = self.node_id(n) 675 return dotdict(self.graph.nodes[node_id]) 676 677 def get_nodes(self, *N: _T.Any) -> _T.List[dotdict]: 678 """ 679 Flexible function to retrieve DAG node data, based on node identifiers 680 (e.g.: string IDs or unique callables). 681 """ 682 if len(N) == 0: 683 node_data = list(self.graph.nodes.values()) 684 return [dotdict(d) for d in node_data] 685 686 elif len(N) == 1: 687 if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], str): 688 N = N[0] 689 else: 690 node_id = self.node_id(N[0]) 691 node_data = [dotdict(self.graph.nodes[node_id])] 692 return node_data 693 694 if not len(N) == len(set(N)): 695 err_msg = 'All requested nodes must be unique identifiers.' 696 raise ValueError(err_msg) 697 698 node_ids = [self.node_id(n) for n in N] 699 node_data = [dotdict(self.graph.nodes[n]) for n in node_ids] 700 return node_data 701 702 def filter_nodes(self, filter: _types.LambdaType, data: bool = False) -> _T.List[str] | _T.List[dotdict]: 703 """ 704 Given a lambda function, filter nodes in a DAG's scope based on its attributes. 705 Current limitation: Filters must use node identifier strings when referencing nodes. 706 Use get_nodes for more flexible controls. 707 """ 708 if not data: 709 return [node['id'] for node in self.graph.nodes.values() if filter(dotdict(node))] 710 return [dotdict(node) for node in self.graph.nodes.values() if filter(dotdict(node))] 711 712 def filter_edges(self, filter: _types.LambdaType, data: bool = False) -> _T.List[_T.Tuple[str, str]] | _T.List[edgedict]: 713 """ 714 Given a lambda function, filter edges in a DAG's scope based on its attributes. 715 Current limitation: Filters must use node identifier strings when referencing nodes. 716 Use get_edges for more flexible controls. 717 """ 718 if not data: 719 return [e[:2] for e in list(self.graph.edges.data()) if filter(edgedict(*e))] 720 return [edgedict(*e) for e in list(self.graph.edges.data()) if filter(edgedict(*e))] 721 722 def retry_node(self, identifier: _T.Union[str,_T.Callable], count: int) -> None: 723 """ 724 Given a node identifier, set number of automatic retries in case of failure. 725 Re-attempts will begin as soon as possible. 726 """ 727 if not isinstance(count, int) or not count >= 0: 728 raise ValueError("Retry count must be a non-negative integer.") 729 node = self.node_id(identifier) 730 self.graph.nodes[node]['retry'] = count 731 732 def skip_node(self, identifier: _T.Union[str, _T.Callable], skip: bool = True, as_failure: bool = False) -> None: 733 """ 734 Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error). 735 Allows conditional control and testing over DAG's order of operations. 736 """ 737 if not isinstance(skip, bool): 738 raise ValueError("Skip argument must be a boolean value.") 739 node = self.node_id(identifier) 740 self.graph.nodes[node]['skip'] = (skip, as_failure) 741 742 def critical_path(self, nodes: _T.Union[str, _T.Callable, _T.Iterable[_T.Union[str, _T.Callable]]]) -> 'DAG': 743 """ 744 Given a set of nodes, returns a subset of the DAG containing 745 only the node(s) and its parents, or upstream dependencies. 746 """ 747 if isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str): 748 node_ids = {self.node_id(n) for n in nodes} 749 else: 750 node_ids = {self.node_id(nodes)} 751 return self._subgraph(node_ids) 752 753 def execute( 754 self, 755 return_ref: bool = False, 756 print_exc: bool = False, 757 max_workers: _T.Optional[int] = None, 758 verbose: bool = True 759 ) -> _T.Optional['DAG.DAGExecutor']: 760 """ 761 Basic wrapper for execution of the DAG's underlying callables. 762 763 Args: 764 return_ref: If True, return the DAGExecutor instance. 765 print_exc: If True, print full exception tracebacks. 766 max_workers: Maximum number of worker threads. Defaults to None (auto). 767 verbose: If True, log task start/completion messages. Defaults to True. 768 """ 769 executor = self.DAGExecutor( 770 self.graph, self.root, print_exc, 771 max_workers=max_workers, verbose=verbose 772 ) 773 return executor if return_ref else None 774 775 def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] | None = None) -> _T.Any: 776 """ 777 Basic wrapper to visualize DAG using Vis.js and NetGraph libraries. 778 By default, visualization library only loaded in after DAG.visualize() is called, halving import times. 779 """ 780 import webber.viz as _viz 781 782 match type: 783 case 'browser': 784 _viz.visualize_browser(self.graph) 785 786 case 'plt': 787 return _viz.visualize_plt(self.graph) 788 789 case 'gui': 790 # _visualize_gui(self.graph) 791 raise NotImplementedError 792 793 case None: 794 if _viz._in_notebook(): 795 return _viz.visualize_plt(self.graph) 796 else: 797 _viz.visualize_browser(self.graph) 798 799 case _: 800 err_msg = "Unknown visualization type requested." 801 raise NotImplementedError(err_msg) 802 803 @property 804 def root(self) -> _T.List[str]: 805 """ 806 Return list of nodes with no dependencies. 807 Root nodes will occur first in DAG's order of operations. 808 Uses O(1) in_degree() instead of O(k) predecessors list creation. 809 """ 810 return [node for node in self.graph.nodes if self.graph.in_degree(node) == 0] 811 812 @property 813 def nodes(self) -> _T.Any: 814 """Returns the NodeView of the underlying NetworkX graph containing all DAG nodes.""" 815 return self.graph.nodes 816 817 def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str: 818 """ 819 Validate whether identifier given is a valid node within the DAG's scope. 820 Primarily for internal use, but useful for retrieving string identifiers 821 for a unique callable in a DAG. 822 Uses O(1) cache lookup for callables instead of O(n) linear search. 823 """ 824 if isinstance(identifier, str): 825 # O(1) lookup using graph.nodes dict 826 if identifier not in self.graph.nodes: 827 err_msg = f"Node {identifier} is not defined in this DAG's scope." 828 raise ValueError(err_msg) 829 return identifier 830 elif _iscallable(identifier): 831 # O(1) lookup using _callable_to_id cache 832 if identifier not in self._callable_to_id: 833 err_msg = f"Callable {identifier} is not defined in this DAG's scope." 834 raise ValueError(err_msg) 835 cached_id = self._callable_to_id[identifier] 836 if cached_id is None: 837 err_msg = f"Callable {identifier.__name__} " \ 838 + "exists more than once in this DAG. " \ 839 + "Use the unique string identifier of the required node." 840 raise ValueError(err_msg) 841 return cached_id 842 else: 843 err_msg = f"Node {identifier} must be a string or a Python callable" 844 raise TypeError(err_msg) 845 846 def _update_node(self, nodedict: _T.Dict[str, _T.Any], id: str | None = None, force: bool = False) -> None: 847 """ 848 Internal only. Update properties of single node within a DAG's scope, 849 given a well-structured dictionary and the tuple identifier of the network edge. 850 Force argument bypasses dictionary validation, and should only be used internally. 851 """ 852 if id is not None: 853 try: 854 if nodedict.get('id') is not None and id != nodedict['id']: 855 raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {nodedict['id']}") 856 except ValueError as e: 857 if not force: 858 raise e 859 nodedict['id'] = id 860 861 expected_keys = ('callable', 'args', 'kwargs', 'name', 'id') 862 if not set(expected_keys).issuperset(set(nodedict.keys())): 863 raise ValueError(f"Expecting keys: {expected_keys}") 864 865 if not force: 866 if nodedict.get('callable'): 867 if not _iscallable(nodedict['callable']): 868 err_msg = f"Requested node is not assigned a callable Python function." 869 raise TypeError(err_msg) 870 if not nodedict.get('name'): 871 nodedict['name'] = nodedict['callable'].__name__ 872 873 if nodedict.get('name') and (not isinstance(nodedict['name'], str) or len(nodedict['name']) == 0): 874 err_msg = f"Requested node name must be a non-null Python string, will default to callable when not set." 875 raise TypeError(err_msg) 876 877 if nodedict.get('args'): 878 if not (isinstance(nodedict['args'], _abc.Iterable) and not isinstance(nodedict['args'], str)): 879 err_msg = f"Requested node is not assigned a tuple of pos args." 880 raise TypeError(err_msg) 881 nodedict['args'] = tuple(nodedict['args']) 882 883 if nodedict.get('kwargs') and not isinstance(nodedict['kwargs'], dict): 884 err_msg = f"Requested node is not assigned a dictionary of kw args." 885 raise TypeError(err_msg) 886 887 node_id = nodedict.pop('id') 888 self.graph.nodes[node_id].update(nodedict) 889 890 # Reset node name if implicitly requested. 891 if not nodedict.get('name'): 892 self.graph.nodes[node_id]['name'] = self.graph.nodes[node_id]['callable'].__name__ 893 894 def _subgraph(self, node_ids: set[str]) -> 'DAG': 895 """ 896 Internal only. Given a set of nodes, returns a subset of the DAG containing 897 only the node(s) and upstream dependencies. 898 Uses nx.ancestors() for O(V+E) performance instead of manual traversal. 899 """ 900 all_nodes = set(node_ids) 901 for node in node_ids: 902 all_nodes.update(_nx.ancestors(self.graph, node)) 903 subgraph = self.graph.subgraph(all_nodes) 904 return DAG(subgraph, __force=True) 905 906 def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise: 907 """ 908 Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope. 909 Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope. 910 """ 911 try: 912 key = self.node_id(promise.key) 913 except Exception as e: 914 raise _xcoms.InvalidCallable(e) 915 return _xcoms.Promise(key) 916 917 def __init__(self, graph: _T.Union[_nx.DiGraph, _nx.Graph, None] = None, **kwargs: _T.Any) -> None: 918 919 if graph is None: 920 self.graph = _nx.DiGraph() 921 self._callable_to_id = {} 922 return 923 924 # Meant for internal use only, creating DAGs from subgraphs. 925 if kwargs.get('__force') == True: 926 self.graph = _T.cast(_nx.DiGraph, graph) 927 # Build cache from existing subgraph 928 self._callable_to_id = {} 929 for node_id, data in self.graph.nodes(data=True): 930 callable_fn = data.get('callable') 931 if callable_fn is not None: 932 if callable_fn in self._callable_to_id: 933 self._callable_to_id[callable_fn] = None # Duplicate 934 else: 935 self._callable_to_id[callable_fn] = node_id 936 return 937 938 _edges.validate_dag(graph) 939 # Type narrowing: validate_dag ensures graph is a DiGraph 940 assert isinstance(graph, _nx.DiGraph) 941 942 # Define framework specific logic as nested dictionaries. 943 for node in graph.nodes.keys(): 944 graph.nodes[node]['callable'] = node 945 graph.nodes[node]['name'] = node.__name__ 946 graph.nodes[node]['args'] = [] 947 graph.nodes[node]['kwargs'] = {} 948 949 for e in graph.edges: 950 condition = graph.edges[e].get('Condition') 951 if condition is not None: 952 if condition not in Condition: 953 raise TypeError(e, 'Edge conditions must belong to IntEnum type Webber.Condition.') 954 else: 955 graph.edges[e]['Condition'] = Condition.Success 956 957 graph = _nx.relabel_nodes(graph, lambda node: _edges.label_node(node)) 958 for n in graph.nodes: 959 graph.nodes[n]['id'] = n 960 self.graph = _nx.DiGraph(graph) 961 962 # Build callable->id cache after relabeling 963 self._callable_to_id = {} 964 for node_id, data in self.graph.nodes(data=True): 965 callable_fn = data.get('callable') 966 if callable_fn is not None: 967 if callable_fn in self._callable_to_id: 968 self._callable_to_id[callable_fn] = None # Duplicate 969 else: 970 self._callable_to_id[callable_fn] = node_id 971 972 class DAGExecutor: 973 """ 974 Base class used to execute DAG in embarrassingly parallel. 975 """ 976 def __init__( 977 self, 978 graph: _nx.DiGraph, 979 roots: _T.List[str], 980 print_exc: bool = False, 981 max_workers: _T.Optional[int] = None, 982 verbose: bool = True 983 ) -> None: 984 985 # Skip execution if there are no callables in scope. 986 if len(graph.nodes) == 0: 987 if verbose: 988 print('Given DAG has no callables in scope. Skipping execution...') 989 return 990 991 # Initialize local variables for execution. 992 complete: set[str] = set() 993 started: set[str] = set() 994 failed: set[str] = set() 995 skipped: set[str] = set() 996 refs: _T.Dict[str, _futures.Future[_T.Any]] = {} 997 998 def raise_exc(message: str) -> None: 999 raise ValueError(message) 1000 1001 def run_conditions_met(n: str) -> bool: 1002 for p in graph.predecessors(n): 1003 match graph.edges[(p, n)]['Condition']: 1004 case Condition.Success: 1005 if p not in complete: 1006 return False 1007 case Condition.Failure: 1008 if p not in failed: 1009 return False 1010 case Condition.AnyCase: 1011 if p not in failed and p not in complete: 1012 return False 1013 return True 1014 1015 skip = graph.nodes.data("skip", default=(False, False)) 1016 retry: _T.Dict[str, _T.List[_T.Any]] = { 1017 n: [c + 1, {}] for n, c in graph.nodes.data("retry", default=0) 1018 } 1019 1020 # Start execution of root node functions. 1021 with _futures.ThreadPoolExecutor(max_workers=max_workers) as executor: 1022 1023 def Submit( 1024 event: str, 1025 callable: _T.Callable[..., _T.Any], 1026 name: str, 1027 args: _T.Union[tuple[_T.Any, ...], list[_T.Any]], 1028 kwargs: _T.Dict[str, _T.Any] 1029 ) -> _futures.Future[_T.Any]: 1030 if skip[event][0]: 1031 retry[event][0] = 0 1032 skip_callable = raise_exc if skip[event][1] else print 1033 return executor.submit( 1034 _event_wrapper, 1035 _callable=skip_callable, 1036 _name=graph.nodes[event]['name'], 1037 _args=[f"Event {event} skipped..."], 1038 _kwargs={}, 1039 _verbose=verbose 1040 ) 1041 else: 1042 retry[event][0] -= 1 1043 if (retry[event][0] > 0) and (retry[event][1] == {}): 1044 retry[event][1] = { 1045 'callable': callable, 1046 'name': name, 1047 'args': args, 1048 'kwargs': kwargs 1049 } 1050 return executor.submit( 1051 _event_wrapper, 1052 _callable=callable, 1053 _name=name, 1054 _args=args, 1055 _kwargs=kwargs, 1056 _verbose=verbose 1057 ) 1058 1059 # Submit root nodes 1060 for event in roots: 1061 refs[event] = Submit( 1062 event, 1063 graph.nodes[event]['callable'], 1064 graph.nodes[event]['name'], 1065 graph.nodes[event]['args'], 1066 graph.nodes[event]['kwargs'] 1067 ) 1068 started.add(event) 1069 1070 # Process futures as they complete (no busy-wait) 1071 pending = set(refs.values()) 1072 future_to_event = {v: k for k, v in refs.items()} 1073 1074 while (len(complete) + len(failed) + len(skipped)) != len(graph): 1075 if not pending: 1076 break 1077 1078 # Wait for at least one future to complete 1079 done, pending = _futures.wait(pending, return_when=_futures.FIRST_COMPLETED) 1080 1081 for future in done: 1082 event = future_to_event[future] 1083 1084 if future.exception() is not None: 1085 try: 1086 raise future.exception() # type: ignore 1087 except: 1088 if print_exc: 1089 _traceback.print_exc() 1090 1091 if retry[event][0] > 0: 1092 if verbose: 1093 print(f"Event {event} exited with exception, retrying...") 1094 new_future = Submit( 1095 event, 1096 callable=retry[event][1]['callable'], 1097 name=retry[event][1]['name'], 1098 args=retry[event][1]['args'], 1099 kwargs=retry[event][1]['kwargs'] 1100 ) 1101 refs[event] = new_future 1102 pending.add(new_future) 1103 future_to_event[new_future] = event 1104 continue 1105 1106 if verbose: 1107 print(f"Event {event} exited with exception...") 1108 failed.add(event) 1109 skipping = [ 1110 e[1] for e in set(graph.out_edges(event)) 1111 if not _edges.continue_on_failure(graph.edges[e]) 1112 ] 1113 else: 1114 complete.add(event) 1115 skipping = [ 1116 e[1] for e in set(graph.out_edges(event)) 1117 if not _edges.continue_on_success(graph.edges[e]) 1118 ] 1119 1120 skipped = skipped.union(skipping) 1121 for n in skipping: 1122 skipped = skipped.union(_nx.descendants(graph, n)) 1123 1124 carryon = set(graph.successors(event)).difference(skipped) 1125 starting = [ 1126 successor for successor in carryon 1127 if run_conditions_met(successor) 1128 ] 1129 1130 for successor in starting: 1131 _args = [ 1132 a if not isinstance(a, _xcoms.Promise) else refs[_T.cast(str, a.key)].result() 1133 for a in graph.nodes[successor]['args'] 1134 ] 1135 _kwargs = { 1136 k: v if not isinstance(v, _xcoms.Promise) else refs[_T.cast(str, v.key)].result() 1137 for k, v in graph.nodes[successor]['kwargs'].items() 1138 } 1139 new_future = Submit( 1140 successor, 1141 graph.nodes[successor]['callable'], 1142 graph.nodes[successor]['name'], 1143 _args, 1144 _kwargs 1145 ) 1146 refs[successor] = new_future 1147 pending.add(new_future) 1148 future_to_event[new_future] = successor 1149 started.add(successor)
Directed Acyclic Graph used to represent Pythonic tasks in parallel.
917 def __init__(self, graph: _T.Union[_nx.DiGraph, _nx.Graph, None] = None, **kwargs: _T.Any) -> None: 918 919 if graph is None: 920 self.graph = _nx.DiGraph() 921 self._callable_to_id = {} 922 return 923 924 # Meant for internal use only, creating DAGs from subgraphs. 925 if kwargs.get('__force') == True: 926 self.graph = _T.cast(_nx.DiGraph, graph) 927 # Build cache from existing subgraph 928 self._callable_to_id = {} 929 for node_id, data in self.graph.nodes(data=True): 930 callable_fn = data.get('callable') 931 if callable_fn is not None: 932 if callable_fn in self._callable_to_id: 933 self._callable_to_id[callable_fn] = None # Duplicate 934 else: 935 self._callable_to_id[callable_fn] = node_id 936 return 937 938 _edges.validate_dag(graph) 939 # Type narrowing: validate_dag ensures graph is a DiGraph 940 assert isinstance(graph, _nx.DiGraph) 941 942 # Define framework specific logic as nested dictionaries. 943 for node in graph.nodes.keys(): 944 graph.nodes[node]['callable'] = node 945 graph.nodes[node]['name'] = node.__name__ 946 graph.nodes[node]['args'] = [] 947 graph.nodes[node]['kwargs'] = {} 948 949 for e in graph.edges: 950 condition = graph.edges[e].get('Condition') 951 if condition is not None: 952 if condition not in Condition: 953 raise TypeError(e, 'Edge conditions must belong to IntEnum type Webber.Condition.') 954 else: 955 graph.edges[e]['Condition'] = Condition.Success 956 957 graph = _nx.relabel_nodes(graph, lambda node: _edges.label_node(node)) 958 for n in graph.nodes: 959 graph.nodes[n]['id'] = n 960 self.graph = _nx.DiGraph(graph) 961 962 # Build callable->id cache after relabeling 963 self._callable_to_id = {} 964 for node_id, data in self.graph.nodes(data=True): 965 callable_fn = data.get('callable') 966 if callable_fn is not None: 967 if callable_fn in self._callable_to_id: 968 self._callable_to_id[callable_fn] = None # Duplicate 969 else: 970 self._callable_to_id[callable_fn] = node_id
224 def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str: 225 """ 226 Adds a callable with positional and keyword arguments to the DAG's underlying graph. 227 On success, return unique identifier for the new node. 228 """ 229 if not _iscallable(node): 230 err_msg = f"{node}: requested node is not a callable Python function." 231 raise TypeError(err_msg) 232 233 node_name = _edges.label_node(node) 234 235 args = tuple( 236 arg if not isinstance(arg, _xcoms.Promise) else self.resolve_promise(arg) 237 for arg in args 238 ) 239 240 for k, val in kwargs.items(): 241 if isinstance(val, _xcoms.Promise): 242 kwargs[k] = self.resolve_promise(val) 243 244 self.graph.add_node( 245 node_for_adding=node_name, 246 callable=node, args=args, kwargs=kwargs, 247 name=node.__name__, 248 id=node_name 249 ) 250 251 # Populate callable->id cache for O(1) lookup 252 # If callable already exists, mark as None to indicate duplicates 253 if node in self._callable_to_id: 254 self._callable_to_id[node] = None # Multiple occurrences - must use string ID 255 else: 256 self._callable_to_id[node] = node_name 257 258 return node_name
Adds a callable with positional and keyword arguments to the DAG's underlying graph. On success, return unique identifier for the new node.
261 def add_edge( 262 self, 263 u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable], 264 continue_on: Condition = Condition.Success 265 ) -> _T.Tuple[str,str]: 266 """ 267 Adds an edge between nodes in the DAG's underlying graph, 268 so long as the requested edge is unique and has not been added previously. 269 270 On success, returns Tuple of the new edge's unique identifiers. 271 """ 272 # Validate inputs prior to execution 273 # - Nodes must be identifiers or callables 274 # - Conditions must belong to the webber.edges.Condition class 275 if not (isinstance(u_of_edge,str) or _iscallable(u_of_edge)): 276 err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable" 277 raise TypeError(err_msg) 278 if not (isinstance(v_of_edge,str) or _iscallable(v_of_edge)): 279 err_msg = f"Outgoing node {v_of_edge} must be a string or a Python callable" 280 raise TypeError(err_msg) 281 if not isinstance(continue_on, Condition): 282 raise TypeError("Edge conditions must use the webber.edges.Condition class.") 283 284 # Base Case 0: No nodes are present in the DAG: 285 # Ensure that both nodes are callables, then add both to the graph and 286 # assign the outgoing node as a root. 287 if len(self.graph.nodes()) == 0: 288 if not _iscallable(u_of_edge): 289 err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope." 290 raise ValueError(err_msg) 291 if not _iscallable(v_of_edge): 292 err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope." 293 raise ValueError(err_msg) 294 outgoing_node = self.add_node(u_of_edge) 295 incoming_node = self.add_node(v_of_edge) 296 self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on) 297 return (outgoing_node, incoming_node) 298 299 new_callables: _T.Dict[_T.Callable, _T.Callable] = {} 300 301 if _iscallable(u_of_edge) and _iscallable(v_of_edge): 302 # Type narrowing: we've verified both are callables 303 u_callable = _T.cast(_T.Callable, u_of_edge) 304 v_callable = _T.cast(_T.Callable, v_of_edge) 305 306 # Error Cases 0, 1: Either of the callables appear more than once in the DAG. 307 if self._callable_status(u_callable) == 'many': 308 err_msg = f"Callable {u_callable.__name__} " \ 309 + "exists more than once in this DAG. " \ 310 + "Use the unique string identifier of the required node." 311 raise ValueError(err_msg) 312 if self._callable_status(v_callable) == 'many': 313 err_msg = f"Callable {v_callable.__name__} " \ 314 + "exists more than once in this DAG. " \ 315 + "Use the unique string identifier of the required node." 316 raise ValueError(err_msg) 317 318 # Base Case 1: Both args are callables and will be present in the DAG scope no more than once. 319 # We will create new nodes if necessary, after validation, and get the unique string identifiers of the nodes. 320 cached_u = self._callable_to_id.get(u_callable) 321 if cached_u is not None: 322 outgoing_node = cached_u 323 else: 324 new_callables[u_callable] = u_callable 325 outgoing_node = _edges.label_node(u_callable) 326 327 cached_v = self._callable_to_id.get(v_callable) 328 if cached_v is not None: 329 incoming_node = cached_v 330 else: 331 new_callables[v_callable] = v_callable 332 incoming_node = _edges.label_node(v_callable) 333 334 else: 335 336 # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope. 337 if isinstance(u_of_edge, str) and u_of_edge not in self.graph.nodes: 338 err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope." 339 raise ValueError(err_msg) 340 if isinstance(v_of_edge, str) and v_of_edge not in self.graph.nodes: 341 err_msg = f"Incoming node {v_of_edge} not in DAG's current scope." 342 raise ValueError(err_msg) 343 344 # Both nodes' unique identifiers are present in the DAG 345 # and should be evaluated for a valid edge. 346 if isinstance(u_of_edge, str) and isinstance(v_of_edge, str): 347 outgoing_node = u_of_edge 348 incoming_node = v_of_edge 349 350 # Otherwise, one of the nodes is a callable, and the other is a valid unique identifier. 351 else: 352 for node in (u_of_edge, v_of_edge): 353 if node == u_of_edge: 354 outgoing_node = self._assign_node(node, new_callables) 355 else: 356 incoming_node = self._assign_node(node, new_callables) 357 358 # Error Case 5: Both callables exist only once in the DAG, 359 # but an edge already exists between them. O(1) lookup with has_edge(). 360 if self.graph.has_edge(outgoing_node, incoming_node): 361 err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \ 362 + "a definition in this DAG." 363 raise ValueError(err_msg) 364 365 # Error Case 6: Adding an edge would create circular dependencies. 366 # Use has_path() for O(V+E) incremental check instead of rebuilding entire graph. 367 if incoming_node in self.graph and _nx.has_path(self.graph, incoming_node, outgoing_node): 368 err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \ 369 + "results in circular dependencies." 370 raise ValueError(err_msg) 371 372 # We can now add the edge to the DAG, since we are certain it will not result in 373 # illegal dependencies/behavior. 374 # First, we should account for potential new nodes. This also handles 375 # duplicates on first entry to the DAG (e.g.: edge == (print, print)) 376 if _iscallable(u_of_edge) and new_callables.get(_T.cast(_T.Callable, u_of_edge)) is not None: 377 outgoing_node = self.add_node(new_callables[_T.cast(_T.Callable, u_of_edge)]) 378 if _iscallable(v_of_edge) and new_callables.get(_T.cast(_T.Callable, v_of_edge)) is not None: 379 incoming_node = self.add_node(new_callables[_T.cast(_T.Callable, v_of_edge)]) 380 381 # Then we can add the new edge. 382 self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on) 383 return (outgoing_node, incoming_node)
Adds an edge between nodes in the DAG's underlying graph, so long as the requested edge is unique and has not been added previously.
On success, returns Tuple of the new edge's unique identifiers.
385 def remove_node(self, *posargs, **kwargs) -> None: 386 """ 387 Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG. 388 Throws error message and recommends safer methods. 389 """ 390 raise NotImplementedError("Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect.")
Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG. Throws error message and recommends safer methods.
392 def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]: 393 """ 394 Removes an directed edge between nodes in the DAG's underlying graph. 395 Throws error if the edge does not exist. 396 On success, returns Tuple of the removed edge's unique identifiers. 397 """ 398 edge_id = (self.node_id(u_of_edge), self.node_id(v_of_edge)) 399 if edge_id not in self.graph.edges(data = False): 400 err_msg = "Requested edge does not exist in the DAG's scope" 401 raise ValueError(err_msg) 402 self.graph.remove_edge(edge_id[0], edge_id[1]) 403 return edge_id
Removes an directed edge between nodes in the DAG's underlying graph. Throws error if the edge does not exist. On success, returns Tuple of the removed edge's unique identifiers.
405 def update_edges(self, *E: _T.Any, continue_on: Condition | None = None, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | edgedict | None = None) -> None: 406 """ 407 Flexible function to update properties of edges in the DAG's scope, 408 based on unique identifier(s) (e.g.: string IDs or unique callables) or a 409 lambda filter using the edgedict syntax. 410 411 List of nodes to update or filter argument is expected. Valid edge lists include: 412 413 \t update_edges((node1, node2), ...) or update_edges([(node1, node2)], ...) 414 415 \t update_edges((node1, callable2), ...) or update_edges([(node1, callable2)], ...) 416 417 \t update_edges(edgedict), where isinstance(edgedict, webber.edges.edgedict) == True 418 419 Parameters: 420 421 > continue_on: webber.edges.Condition value to update matching edges with, 422 423 \t Execute child on success, failure, or any exit state, of the parent node 424 425 > filter: lambda property that can be used instead of a list of edges to be updated. 426 427 \t filter = (lambda e: n.parent == print or e.child == print) 428 429 > data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set. 430 431 \t data = { 'continue_on': webber.edges.Condition } 432 433 """ 434 if len(E) == 0 and filter == None: 435 raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.") 436 437 elif isinstance(E, dict) or isinstance(E, edgedict): 438 E = [E] 439 440 elif len(E) == 1 and isinstance(E[0], _abc.Iterable): 441 try: 442 _ = self.get_edges(E[0]) 443 except: 444 E = E[0] 445 446 if filter is not None: 447 edge_ids = self.filter_edges(filter, data = False) 448 else: 449 if isinstance(E[0], dict) or isinstance(E[0], edgedict): 450 try: 451 ids = [e['id'] for e in E] 452 except KeyError: 453 err_msg = 'In dictionary form, all given edges must be follow edgedict standards.' 454 raise ValueError(err_msg) 455 else: 456 ids = E 457 edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids] 458 459 std_update = (continue_on is None) 460 461 if std_update: 462 for edge_id, e in zip(edge_ids, E): 463 if data is not None: 464 self._update_edge(data, id = edge_id) 465 else: 466 self._update_edge(e, id = edge_id) 467 468 else: 469 if continue_on is not None: 470 if not isinstance(continue_on, Condition): 471 err_msg = f"Condition assignment must use webber.edges.Condition" 472 raise TypeError(err_msg) 473 for e in edge_ids: 474 self.graph.edges[e]['Condition'] = continue_on
Flexible function to update properties of edges in the DAG's scope, based on unique identifier(s) (e.g.: string IDs or unique callables) or a lambda filter using the edgedict syntax.
List of nodes to update or filter argument is expected. Valid edge lists include:
update_edges((node1, node2), ...) or update_edges([(node1, node2)], ...)
update_edges((node1, callable2), ...) or update_edges([(node1, callable2)], ...)
update_edges(edgedict), where isinstance(edgedict, webber.edges.edgedict) == True
Parameters:
continue_on: webber.edges.Condition value to update matching edges with,
Execute child on success, failure, or any exit state, of the parent nodefilter: lambda property that can be used instead of a list of edges to be updated.
filter = (lambda e: n.parent == print or e.child == print)data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set.
data = { 'continue_on': webber.edges.Condition }
521 def relabel_node(self, node: _T.Union[str, _T.Callable], label: str) -> str: 522 """ 523 Update the label, or name, given to a node in the DAG's scope, 524 given a Python string and a node identifier. 525 Well-structured wrapper for a common use-case of DAG.update_nodes. 526 """ 527 node_id = self.node_id(node) 528 if not isinstance(label, str) or len(label) == 0: 529 err_msg = "Node label must be a Python string with one or more characters." 530 raise ValueError(err_msg) 531 self.update_nodes(node_id, data = {'name': label}) 532 return label
Update the label, or name, given to a node in the DAG's scope, given a Python string and a node identifier. Well-structured wrapper for a common use-case of DAG.update_nodes.
534 def update_nodes(self, *N: _T.Any, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | dotdict | None = None, callable: _T.Callable | None = None, args: _T.Iterable[_T.Any] | None = None, kwargs: _T.Dict[str, _T.Any] | None = None) -> None: 535 """ 536 Flexible function to update properties of nodes in the DAG's scope, 537 based on unique identifier(s) (e.g.: string IDs or unique callables) or a 538 lambda filter using the dotdict syntax. 539 540 List of nodes to update or filter argument is expected. Valid node lists include: 541 542 \t update_nodes(node_id, ...) or update_nodes([node_id], ...) 543 544 \t update_nodes(callable, ...) or update_nodes([callable], ...) 545 546 \t update_nodes(node1, node2, ...) or update_nodes([node1, node2], ...) 547 548 \t update_nodes(node1, callable2, ...) or update_nodes([node1, callable2], ...) 549 550 > update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id) 551 552 Parameters: 553 554 > filter: lambda property that can be used instead of a list of nodes to be updated. 555 556 \t filter = (lambda n: n.callable == print or 'Hello, World' in n.args) 557 558 > data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set. 559 Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method. 560 561 \t data = { 562 \t 'callable': print, 563 \t 'args': ['Hello', 'World'], 564 \t 'kwargs': {'sep': ', '}, 565 \t 'name': 'custom_label', 566 \t 'id': 'unique-identifier' 567 \t} 568 569 > args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List). 570 571 > kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary. 572 573 """ 574 if len(N) == 0 and filter == None: 575 raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.") 576 577 elif len(N) > 0 and filter is not None: 578 raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.") 579 580 elif isinstance(N, dict) or isinstance(N, str): 581 N = [N] 582 583 elif len(N) == 1 and isinstance(N[0], _abc.Iterable): 584 if isinstance(N[0][0], dict): 585 N = N[0] 586 elif isinstance(N[0][0], str): 587 # BUG: A list of all single character IDs will fail to be updated. Please try another call method (i.e.: nested iterator). 588 if sum(list(map(lambda n: len(n), N[0]))) != len(N[0]): 589 N = N[0] 590 591 592 if filter is not None: 593 node_ids = self.filter_nodes(filter, data = False) 594 else: 595 if isinstance(N[0], dict): 596 ids = [n['id'] for n in N] 597 else: 598 ids = N 599 node_ids = [self.node_id(i) for i in ids] 600 601 std_update = (callable == None) and (args == None) and (kwargs == None) 602 603 if std_update: 604 for node_id, n in zip(node_ids, N): 605 if data is not None: 606 self._update_node(data, id = node_id) 607 else: 608 self._update_node(n, id = node_id) 609 610 else: 611 if callable is not None: 612 if not _iscallable(callable): 613 err_msg = f"Requested node is not assigned a callable Python function." 614 raise TypeError(err_msg) 615 for node_id in node_ids: 616 self.graph.nodes[node_id]['callable'] = callable 617 self.graph.nodes[node_id]['name'] = callable.__name__ 618 619 if args is not None: 620 if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)): 621 err_msg = f"Requested node is not assigned a tuple of pos args." 622 raise TypeError(err_msg) 623 args = tuple(args) 624 for node_id in node_ids: 625 self.graph.nodes[node_id]['args'] = args 626 627 if kwargs is not None: 628 if not isinstance(kwargs, dict): 629 err_msg = f"Requested node is not assigned a dictionary of kw args." 630 raise TypeError(err_msg) 631 for node_id in node_ids: 632 self.graph.nodes[node_id]['kwargs'] = kwargs
Flexible function to update properties of nodes in the DAG's scope, based on unique identifier(s) (e.g.: string IDs or unique callables) or a lambda filter using the dotdict syntax.
List of nodes to update or filter argument is expected. Valid node lists include:
update_nodes(node_id, ...) or update_nodes([node_id], ...)
update_nodes(callable, ...) or update_nodes([callable], ...)
update_nodes(node1, node2, ...) or update_nodes([node1, node2], ...)
update_nodes(node1, callable2, ...) or update_nodes([node1, callable2], ...)
update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id)
Parameters:
filter: lambda property that can be used instead of a list of nodes to be updated.
filter = (lambda n: n.callable == print or 'Hello, World' in n.args)data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set. Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method.
data = { 'callable': print, 'args': ['Hello', 'World'], 'kwargs': {'sep': ', '}, 'name': 'custom_label', 'id': 'unique-identifier' }args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List).
kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary.
634 def get_edges(self, *N: _T.Any, data: bool = True) -> _T.Union[_T.List[edgedict], _T.List[_T.Tuple[str, str]]]: 635 """ 636 Retrieval function for DAG edge data, based on tuple identifiers. 637 Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2'])) 638 """ 639 if len(N) == 0: 640 if data: 641 return [edgedict(u, v, **d) for u, v, d in self.graph.edges.data()] 642 return list(self.graph.edges.data(data=False)) 643 644 # elif len(N) == 1: 645 # if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], tuple): 646 # N = N[0] 647 648 if len(N) != len(set(N)) or not all(isinstance(n, _abc.Iterable) and len(n) == 2 for n in N): 649 err_msg = 'All requested edges must be unique tuples of size 2.' 650 raise ValueError(err_msg) 651 652 edge_data = [self.get_edge(o, i, data=data) for (o, i) in N] 653 return edge_data
Retrieval function for DAG edge data, based on tuple identifiers. Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2']))
655 def get_edge(self, outgoing_node: _T.Union[str, _T.Callable], incoming_node: _T.Union[str, _T.Callable], data: bool = True) -> _T.Union[edgedict, _T.Tuple[str, str]]: 656 """ 657 Retrieval function for a single directed edge between nodes in a DAG's scope. 658 """ 659 id = (self.node_id(outgoing_node), self.node_id(incoming_node)) 660 if not data: 661 return id 662 edge_data = self.graph.get_edge_data(u = id[0], v = id[1]) 663 if not edge_data: 664 err_msg = f'No match found for the directed edge requested: {id}' 665 raise ValueError(err_msg) 666 assert edge_data is not None # Type narrowing for Pylance 667 return edgedict(*id, **edge_data)
Retrieval function for a single directed edge between nodes in a DAG's scope.
669 def get_node(self, n: _T.Union[str, _T.Callable]) -> dotdict: 670 """ 671 Given a unique identifier, returns a dictionary of node metadata 672 for a single node in the DAG's scope. 673 """ 674 node_id = self.node_id(n) 675 return dotdict(self.graph.nodes[node_id])
Given a unique identifier, returns a dictionary of node metadata for a single node in the DAG's scope.
677 def get_nodes(self, *N: _T.Any) -> _T.List[dotdict]: 678 """ 679 Flexible function to retrieve DAG node data, based on node identifiers 680 (e.g.: string IDs or unique callables). 681 """ 682 if len(N) == 0: 683 node_data = list(self.graph.nodes.values()) 684 return [dotdict(d) for d in node_data] 685 686 elif len(N) == 1: 687 if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], str): 688 N = N[0] 689 else: 690 node_id = self.node_id(N[0]) 691 node_data = [dotdict(self.graph.nodes[node_id])] 692 return node_data 693 694 if not len(N) == len(set(N)): 695 err_msg = 'All requested nodes must be unique identifiers.' 696 raise ValueError(err_msg) 697 698 node_ids = [self.node_id(n) for n in N] 699 node_data = [dotdict(self.graph.nodes[n]) for n in node_ids] 700 return node_data
Flexible function to retrieve DAG node data, based on node identifiers (e.g.: string IDs or unique callables).
702 def filter_nodes(self, filter: _types.LambdaType, data: bool = False) -> _T.List[str] | _T.List[dotdict]: 703 """ 704 Given a lambda function, filter nodes in a DAG's scope based on its attributes. 705 Current limitation: Filters must use node identifier strings when referencing nodes. 706 Use get_nodes for more flexible controls. 707 """ 708 if not data: 709 return [node['id'] for node in self.graph.nodes.values() if filter(dotdict(node))] 710 return [dotdict(node) for node in self.graph.nodes.values() if filter(dotdict(node))]
Given a lambda function, filter nodes in a DAG's scope based on its attributes. Current limitation: Filters must use node identifier strings when referencing nodes. Use get_nodes for more flexible controls.
712 def filter_edges(self, filter: _types.LambdaType, data: bool = False) -> _T.List[_T.Tuple[str, str]] | _T.List[edgedict]: 713 """ 714 Given a lambda function, filter edges in a DAG's scope based on its attributes. 715 Current limitation: Filters must use node identifier strings when referencing nodes. 716 Use get_edges for more flexible controls. 717 """ 718 if not data: 719 return [e[:2] for e in list(self.graph.edges.data()) if filter(edgedict(*e))] 720 return [edgedict(*e) for e in list(self.graph.edges.data()) if filter(edgedict(*e))]
Given a lambda function, filter edges in a DAG's scope based on its attributes. Current limitation: Filters must use node identifier strings when referencing nodes. Use get_edges for more flexible controls.
722 def retry_node(self, identifier: _T.Union[str,_T.Callable], count: int) -> None: 723 """ 724 Given a node identifier, set number of automatic retries in case of failure. 725 Re-attempts will begin as soon as possible. 726 """ 727 if not isinstance(count, int) or not count >= 0: 728 raise ValueError("Retry count must be a non-negative integer.") 729 node = self.node_id(identifier) 730 self.graph.nodes[node]['retry'] = count
Given a node identifier, set number of automatic retries in case of failure. Re-attempts will begin as soon as possible.
732 def skip_node(self, identifier: _T.Union[str, _T.Callable], skip: bool = True, as_failure: bool = False) -> None: 733 """ 734 Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error). 735 Allows conditional control and testing over DAG's order of operations. 736 """ 737 if not isinstance(skip, bool): 738 raise ValueError("Skip argument must be a boolean value.") 739 node = self.node_id(identifier) 740 self.graph.nodes[node]['skip'] = (skip, as_failure)
Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error). Allows conditional control and testing over DAG's order of operations.
742 def critical_path(self, nodes: _T.Union[str, _T.Callable, _T.Iterable[_T.Union[str, _T.Callable]]]) -> 'DAG': 743 """ 744 Given a set of nodes, returns a subset of the DAG containing 745 only the node(s) and its parents, or upstream dependencies. 746 """ 747 if isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str): 748 node_ids = {self.node_id(n) for n in nodes} 749 else: 750 node_ids = {self.node_id(nodes)} 751 return self._subgraph(node_ids)
Given a set of nodes, returns a subset of the DAG containing only the node(s) and its parents, or upstream dependencies.
753 def execute( 754 self, 755 return_ref: bool = False, 756 print_exc: bool = False, 757 max_workers: _T.Optional[int] = None, 758 verbose: bool = True 759 ) -> _T.Optional['DAG.DAGExecutor']: 760 """ 761 Basic wrapper for execution of the DAG's underlying callables. 762 763 Args: 764 return_ref: If True, return the DAGExecutor instance. 765 print_exc: If True, print full exception tracebacks. 766 max_workers: Maximum number of worker threads. Defaults to None (auto). 767 verbose: If True, log task start/completion messages. Defaults to True. 768 """ 769 executor = self.DAGExecutor( 770 self.graph, self.root, print_exc, 771 max_workers=max_workers, verbose=verbose 772 ) 773 return executor if return_ref else None
Basic wrapper for execution of the DAG's underlying callables.
Args: return_ref: If True, return the DAGExecutor instance. print_exc: If True, print full exception tracebacks. max_workers: Maximum number of worker threads. Defaults to None (auto). verbose: If True, log task start/completion messages. Defaults to True.
775 def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] | None = None) -> _T.Any: 776 """ 777 Basic wrapper to visualize DAG using Vis.js and NetGraph libraries. 778 By default, visualization library only loaded in after DAG.visualize() is called, halving import times. 779 """ 780 import webber.viz as _viz 781 782 match type: 783 case 'browser': 784 _viz.visualize_browser(self.graph) 785 786 case 'plt': 787 return _viz.visualize_plt(self.graph) 788 789 case 'gui': 790 # _visualize_gui(self.graph) 791 raise NotImplementedError 792 793 case None: 794 if _viz._in_notebook(): 795 return _viz.visualize_plt(self.graph) 796 else: 797 _viz.visualize_browser(self.graph) 798 799 case _: 800 err_msg = "Unknown visualization type requested." 801 raise NotImplementedError(err_msg)
Basic wrapper to visualize DAG using Vis.js and NetGraph libraries. By default, visualization library only loaded in after DAG.visualize() is called, halving import times.
803 @property 804 def root(self) -> _T.List[str]: 805 """ 806 Return list of nodes with no dependencies. 807 Root nodes will occur first in DAG's order of operations. 808 Uses O(1) in_degree() instead of O(k) predecessors list creation. 809 """ 810 return [node for node in self.graph.nodes if self.graph.in_degree(node) == 0]
Return list of nodes with no dependencies. Root nodes will occur first in DAG's order of operations. Uses O(1) in_degree() instead of O(k) predecessors list creation.
812 @property 813 def nodes(self) -> _T.Any: 814 """Returns the NodeView of the underlying NetworkX graph containing all DAG nodes.""" 815 return self.graph.nodes
Returns the NodeView of the underlying NetworkX graph containing all DAG nodes.
817 def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str: 818 """ 819 Validate whether identifier given is a valid node within the DAG's scope. 820 Primarily for internal use, but useful for retrieving string identifiers 821 for a unique callable in a DAG. 822 Uses O(1) cache lookup for callables instead of O(n) linear search. 823 """ 824 if isinstance(identifier, str): 825 # O(1) lookup using graph.nodes dict 826 if identifier not in self.graph.nodes: 827 err_msg = f"Node {identifier} is not defined in this DAG's scope." 828 raise ValueError(err_msg) 829 return identifier 830 elif _iscallable(identifier): 831 # O(1) lookup using _callable_to_id cache 832 if identifier not in self._callable_to_id: 833 err_msg = f"Callable {identifier} is not defined in this DAG's scope." 834 raise ValueError(err_msg) 835 cached_id = self._callable_to_id[identifier] 836 if cached_id is None: 837 err_msg = f"Callable {identifier.__name__} " \ 838 + "exists more than once in this DAG. " \ 839 + "Use the unique string identifier of the required node." 840 raise ValueError(err_msg) 841 return cached_id 842 else: 843 err_msg = f"Node {identifier} must be a string or a Python callable" 844 raise TypeError(err_msg)
Validate whether identifier given is a valid node within the DAG's scope. Primarily for internal use, but useful for retrieving string identifiers for a unique callable in a DAG. Uses O(1) cache lookup for callables instead of O(n) linear search.
906 def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise: 907 """ 908 Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope. 909 Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope. 910 """ 911 try: 912 key = self.node_id(promise.key) 913 except Exception as e: 914 raise _xcoms.InvalidCallable(e) 915 return _xcoms.Promise(key)
Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope.
Raises webber.xcoms.InvalidCallable if Promise requests a callable that is out of scope.
972 class DAGExecutor: 973 """ 974 Base class used to execute DAG in embarrassingly parallel. 975 """ 976 def __init__( 977 self, 978 graph: _nx.DiGraph, 979 roots: _T.List[str], 980 print_exc: bool = False, 981 max_workers: _T.Optional[int] = None, 982 verbose: bool = True 983 ) -> None: 984 985 # Skip execution if there are no callables in scope. 986 if len(graph.nodes) == 0: 987 if verbose: 988 print('Given DAG has no callables in scope. Skipping execution...') 989 return 990 991 # Initialize local variables for execution. 992 complete: set[str] = set() 993 started: set[str] = set() 994 failed: set[str] = set() 995 skipped: set[str] = set() 996 refs: _T.Dict[str, _futures.Future[_T.Any]] = {} 997 998 def raise_exc(message: str) -> None: 999 raise ValueError(message) 1000 1001 def run_conditions_met(n: str) -> bool: 1002 for p in graph.predecessors(n): 1003 match graph.edges[(p, n)]['Condition']: 1004 case Condition.Success: 1005 if p not in complete: 1006 return False 1007 case Condition.Failure: 1008 if p not in failed: 1009 return False 1010 case Condition.AnyCase: 1011 if p not in failed and p not in complete: 1012 return False 1013 return True 1014 1015 skip = graph.nodes.data("skip", default=(False, False)) 1016 retry: _T.Dict[str, _T.List[_T.Any]] = { 1017 n: [c + 1, {}] for n, c in graph.nodes.data("retry", default=0) 1018 } 1019 1020 # Start execution of root node functions. 1021 with _futures.ThreadPoolExecutor(max_workers=max_workers) as executor: 1022 1023 def Submit( 1024 event: str, 1025 callable: _T.Callable[..., _T.Any], 1026 name: str, 1027 args: _T.Union[tuple[_T.Any, ...], list[_T.Any]], 1028 kwargs: _T.Dict[str, _T.Any] 1029 ) -> _futures.Future[_T.Any]: 1030 if skip[event][0]: 1031 retry[event][0] = 0 1032 skip_callable = raise_exc if skip[event][1] else print 1033 return executor.submit( 1034 _event_wrapper, 1035 _callable=skip_callable, 1036 _name=graph.nodes[event]['name'], 1037 _args=[f"Event {event} skipped..."], 1038 _kwargs={}, 1039 _verbose=verbose 1040 ) 1041 else: 1042 retry[event][0] -= 1 1043 if (retry[event][0] > 0) and (retry[event][1] == {}): 1044 retry[event][1] = { 1045 'callable': callable, 1046 'name': name, 1047 'args': args, 1048 'kwargs': kwargs 1049 } 1050 return executor.submit( 1051 _event_wrapper, 1052 _callable=callable, 1053 _name=name, 1054 _args=args, 1055 _kwargs=kwargs, 1056 _verbose=verbose 1057 ) 1058 1059 # Submit root nodes 1060 for event in roots: 1061 refs[event] = Submit( 1062 event, 1063 graph.nodes[event]['callable'], 1064 graph.nodes[event]['name'], 1065 graph.nodes[event]['args'], 1066 graph.nodes[event]['kwargs'] 1067 ) 1068 started.add(event) 1069 1070 # Process futures as they complete (no busy-wait) 1071 pending = set(refs.values()) 1072 future_to_event = {v: k for k, v in refs.items()} 1073 1074 while (len(complete) + len(failed) + len(skipped)) != len(graph): 1075 if not pending: 1076 break 1077 1078 # Wait for at least one future to complete 1079 done, pending = _futures.wait(pending, return_when=_futures.FIRST_COMPLETED) 1080 1081 for future in done: 1082 event = future_to_event[future] 1083 1084 if future.exception() is not None: 1085 try: 1086 raise future.exception() # type: ignore 1087 except: 1088 if print_exc: 1089 _traceback.print_exc() 1090 1091 if retry[event][0] > 0: 1092 if verbose: 1093 print(f"Event {event} exited with exception, retrying...") 1094 new_future = Submit( 1095 event, 1096 callable=retry[event][1]['callable'], 1097 name=retry[event][1]['name'], 1098 args=retry[event][1]['args'], 1099 kwargs=retry[event][1]['kwargs'] 1100 ) 1101 refs[event] = new_future 1102 pending.add(new_future) 1103 future_to_event[new_future] = event 1104 continue 1105 1106 if verbose: 1107 print(f"Event {event} exited with exception...") 1108 failed.add(event) 1109 skipping = [ 1110 e[1] for e in set(graph.out_edges(event)) 1111 if not _edges.continue_on_failure(graph.edges[e]) 1112 ] 1113 else: 1114 complete.add(event) 1115 skipping = [ 1116 e[1] for e in set(graph.out_edges(event)) 1117 if not _edges.continue_on_success(graph.edges[e]) 1118 ] 1119 1120 skipped = skipped.union(skipping) 1121 for n in skipping: 1122 skipped = skipped.union(_nx.descendants(graph, n)) 1123 1124 carryon = set(graph.successors(event)).difference(skipped) 1125 starting = [ 1126 successor for successor in carryon 1127 if run_conditions_met(successor) 1128 ] 1129 1130 for successor in starting: 1131 _args = [ 1132 a if not isinstance(a, _xcoms.Promise) else refs[_T.cast(str, a.key)].result() 1133 for a in graph.nodes[successor]['args'] 1134 ] 1135 _kwargs = { 1136 k: v if not isinstance(v, _xcoms.Promise) else refs[_T.cast(str, v.key)].result() 1137 for k, v in graph.nodes[successor]['kwargs'].items() 1138 } 1139 new_future = Submit( 1140 successor, 1141 graph.nodes[successor]['callable'], 1142 graph.nodes[successor]['name'], 1143 _args, 1144 _kwargs 1145 ) 1146 refs[successor] = new_future 1147 pending.add(new_future) 1148 future_to_event[new_future] = successor 1149 started.add(successor)
Base class used to execute DAG in embarrassingly parallel.
976 def __init__( 977 self, 978 graph: _nx.DiGraph, 979 roots: _T.List[str], 980 print_exc: bool = False, 981 max_workers: _T.Optional[int] = None, 982 verbose: bool = True 983 ) -> None: 984 985 # Skip execution if there are no callables in scope. 986 if len(graph.nodes) == 0: 987 if verbose: 988 print('Given DAG has no callables in scope. Skipping execution...') 989 return 990 991 # Initialize local variables for execution. 992 complete: set[str] = set() 993 started: set[str] = set() 994 failed: set[str] = set() 995 skipped: set[str] = set() 996 refs: _T.Dict[str, _futures.Future[_T.Any]] = {} 997 998 def raise_exc(message: str) -> None: 999 raise ValueError(message) 1000 1001 def run_conditions_met(n: str) -> bool: 1002 for p in graph.predecessors(n): 1003 match graph.edges[(p, n)]['Condition']: 1004 case Condition.Success: 1005 if p not in complete: 1006 return False 1007 case Condition.Failure: 1008 if p not in failed: 1009 return False 1010 case Condition.AnyCase: 1011 if p not in failed and p not in complete: 1012 return False 1013 return True 1014 1015 skip = graph.nodes.data("skip", default=(False, False)) 1016 retry: _T.Dict[str, _T.List[_T.Any]] = { 1017 n: [c + 1, {}] for n, c in graph.nodes.data("retry", default=0) 1018 } 1019 1020 # Start execution of root node functions. 1021 with _futures.ThreadPoolExecutor(max_workers=max_workers) as executor: 1022 1023 def Submit( 1024 event: str, 1025 callable: _T.Callable[..., _T.Any], 1026 name: str, 1027 args: _T.Union[tuple[_T.Any, ...], list[_T.Any]], 1028 kwargs: _T.Dict[str, _T.Any] 1029 ) -> _futures.Future[_T.Any]: 1030 if skip[event][0]: 1031 retry[event][0] = 0 1032 skip_callable = raise_exc if skip[event][1] else print 1033 return executor.submit( 1034 _event_wrapper, 1035 _callable=skip_callable, 1036 _name=graph.nodes[event]['name'], 1037 _args=[f"Event {event} skipped..."], 1038 _kwargs={}, 1039 _verbose=verbose 1040 ) 1041 else: 1042 retry[event][0] -= 1 1043 if (retry[event][0] > 0) and (retry[event][1] == {}): 1044 retry[event][1] = { 1045 'callable': callable, 1046 'name': name, 1047 'args': args, 1048 'kwargs': kwargs 1049 } 1050 return executor.submit( 1051 _event_wrapper, 1052 _callable=callable, 1053 _name=name, 1054 _args=args, 1055 _kwargs=kwargs, 1056 _verbose=verbose 1057 ) 1058 1059 # Submit root nodes 1060 for event in roots: 1061 refs[event] = Submit( 1062 event, 1063 graph.nodes[event]['callable'], 1064 graph.nodes[event]['name'], 1065 graph.nodes[event]['args'], 1066 graph.nodes[event]['kwargs'] 1067 ) 1068 started.add(event) 1069 1070 # Process futures as they complete (no busy-wait) 1071 pending = set(refs.values()) 1072 future_to_event = {v: k for k, v in refs.items()} 1073 1074 while (len(complete) + len(failed) + len(skipped)) != len(graph): 1075 if not pending: 1076 break 1077 1078 # Wait for at least one future to complete 1079 done, pending = _futures.wait(pending, return_when=_futures.FIRST_COMPLETED) 1080 1081 for future in done: 1082 event = future_to_event[future] 1083 1084 if future.exception() is not None: 1085 try: 1086 raise future.exception() # type: ignore 1087 except: 1088 if print_exc: 1089 _traceback.print_exc() 1090 1091 if retry[event][0] > 0: 1092 if verbose: 1093 print(f"Event {event} exited with exception, retrying...") 1094 new_future = Submit( 1095 event, 1096 callable=retry[event][1]['callable'], 1097 name=retry[event][1]['name'], 1098 args=retry[event][1]['args'], 1099 kwargs=retry[event][1]['kwargs'] 1100 ) 1101 refs[event] = new_future 1102 pending.add(new_future) 1103 future_to_event[new_future] = event 1104 continue 1105 1106 if verbose: 1107 print(f"Event {event} exited with exception...") 1108 failed.add(event) 1109 skipping = [ 1110 e[1] for e in set(graph.out_edges(event)) 1111 if not _edges.continue_on_failure(graph.edges[e]) 1112 ] 1113 else: 1114 complete.add(event) 1115 skipping = [ 1116 e[1] for e in set(graph.out_edges(event)) 1117 if not _edges.continue_on_success(graph.edges[e]) 1118 ] 1119 1120 skipped = skipped.union(skipping) 1121 for n in skipping: 1122 skipped = skipped.union(_nx.descendants(graph, n)) 1123 1124 carryon = set(graph.successors(event)).difference(skipped) 1125 starting = [ 1126 successor for successor in carryon 1127 if run_conditions_met(successor) 1128 ] 1129 1130 for successor in starting: 1131 _args = [ 1132 a if not isinstance(a, _xcoms.Promise) else refs[_T.cast(str, a.key)].result() 1133 for a in graph.nodes[successor]['args'] 1134 ] 1135 _kwargs = { 1136 k: v if not isinstance(v, _xcoms.Promise) else refs[_T.cast(str, v.key)].result() 1137 for k, v in graph.nodes[successor]['kwargs'].items() 1138 } 1139 new_future = Submit( 1140 successor, 1141 graph.nodes[successor]['callable'], 1142 graph.nodes[successor]['name'], 1143 _args, 1144 _kwargs 1145 ) 1146 refs[successor] = new_future 1147 pending.add(new_future) 1148 future_to_event[new_future] = successor 1149 started.add(successor)
12class Condition(_enum.IntEnum): 13 """Represents edge condition for a node execution, based on outcome(s) of predecessor(s).""" 14 Success = 0 15 Failure = 1 16 AnyCase = 3
Represents edge condition for a node execution, based on outcome(s) of predecessor(s).
1151class QueueDAG(DAG): 1152 """ 1153 Directed Acyclic Graph used to queue and execute Pythonic callables in parallel, 1154 while stringing the outputs of those callables in linear sequences. 1155 1156 Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the 1157 behavior of root nodes to determine if and/or when the DAG run has been completed. 1158 1159 Root nodes will be re-executed until culled by one of two conditions: 1160 1. A max number of iterations has been completed, or 1161 2. The output of the root node's callable matches a lambda halt_condition. 1162 1163 Both conditions can be set at run-time. 1164 1165 QueueDAG can be nested inside a standard webber.DAG by passing qdag.execute as a callable. 1166 Benchmarks show moderate overhead of 1.3-2.5x (~1-2ms fixed cost) when nested, due to the 1167 outer DAG's ThreadPoolExecutor setup and task wrapper infrastructure. This is well within 1168 acceptable millisecond-scale latency for most real-time applications. 1169 """ 1170 1171 conditions: _T.Dict[str, _T.Dict[str, _T.Any]] = {} 1172 1173 def __init__(self) -> None: 1174 super().__init__() 1175 1176 def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str: 1177 """ 1178 Adds a callable with positional and keyword arguments to the DAG's underlying graph. 1179 On success, return unique identifier for the new node. 1180 1181 Reserved key-words are used for Queue DAG definitions: 1182 1183 - halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables. 1184 1185 \t halt_condition = (lambda output: output == None) 1186 1187 - iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument. 1188 1189 - max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop. 1190 """ 1191 halt_condition = kwargs.pop('halt_condition', None) 1192 iterator: int = kwargs.pop('iterator', None) 1193 max_iter: int = kwargs.pop('max_iter', None) 1194 1195 return_val = super().add_node(node, *args, **kwargs) 1196 1197 if max_iter is not None: 1198 iter_limit = int(max_iter) 1199 elif iterator is not None: 1200 iter_limit = int(iterator) 1201 else: 1202 iter_limit = None 1203 1204 self.conditions[return_val] = { 1205 'halt_condition': halt_condition, 1206 'iter_limit': iter_limit 1207 } 1208 1209 return return_val 1210 1211 def add_edge(self, u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable], continue_on: Condition = Condition.Success) -> _T.Tuple[str, str]: 1212 """ 1213 Adds an edge between nodes in the Queue DAG's underlying graph. 1214 Queue DAG nodes may have a maximum of one child and one parent worker. 1215 """ 1216 for node in (u_of_edge, v_of_edge): 1217 try: 1218 node_id = self.node_id(node) 1219 except: 1220 continue 1221 1222 filter = (lambda e: e.parent == node_id) if node == u_of_edge else (lambda e: e.child == node_id) 1223 try: 1224 assert(len(self.filter_edges(filter)) == 0) 1225 except Exception as e: 1226 e.add_note("Queue DAG nodes may have a maximum of one child and one parent worker.") 1227 raise e 1228 1229 return super().add_edge(u_of_edge, v_of_edge, continue_on) 1230 1231 def execute(self, *promises: _T.Any, return_ref: bool = False, print_exc: bool = False) -> _T.List[_T.Any] | None: 1232 """ 1233 Basic wrapper for execution of the DAG's underlying callables. 1234 """ 1235 queues: _T.Dict[str, _q.LifoQueue[_T.Any]] = {} 1236 processes: _T.Dict[str, _futures.Future[_T.Any]] = {} 1237 join: set[str] = set() 1238 end_proc: _T.Optional[str] = None 1239 1240 _promises: _T.Dict[str, _T.Any] = { k: v for k, v in _it.pairwise(promises) } if len(promises) > 0 else {} 1241 1242 with _TaskLogger("root") as _: 1243 1244 # Skip execution if there are no callables in scope. 1245 if len(self.graph.nodes) == 0: 1246 print('Given DAG has no callables in scope. Skipping execution...') 1247 return 1248 1249 with _futures.ThreadPoolExecutor() as executor: 1250 1251 for id in self.root: 1252 node = self.get_node(id) 1253 queues[id] = _q.LifoQueue() 1254 node.update({ 1255 'callable': _queue._worker, 1256 'args': tuple(), 1257 'kwargs': { 1258 'work': node.callable, 1259 'args': node.args, 'kwargs': node.kwargs, 1260 'promises': _promises, 1261 'print_exc': print_exc, 1262 'halt_condition': self.conditions[id]['halt_condition'], 1263 'iter_limit': self.conditions[id]['iter_limit'], 1264 'out_queue': queues.get(id) 1265 } 1266 }) 1267 processes[id] = executor.submit( 1268 _event_wrapper, 1269 _callable=node['callable'], 1270 _name=node['name'], 1271 _args=node['args'], 1272 _kwargs=node['kwargs'] 1273 ) 1274 1275 for parent_id, id in self.graph.edges: 1276 node = self.get_node(id) 1277 queues[id] = _q.LifoQueue() 1278 if len(list(self.graph.successors(id))) == 0: 1279 end_proc = id 1280 node.update({ 1281 'callable': _queue._worker, 1282 'args': tuple(), 1283 'kwargs': { 1284 'work': node.callable, 1285 'args': node.args, 'kwargs': node.kwargs, 1286 'promises': _promises, 1287 'print_exc': print_exc, 1288 'parent_id': parent_id, 1289 'parent_process': processes[parent_id], 1290 'in_queue': queues.get(parent_id), 1291 'out_queue': queues.get(id) 1292 } 1293 }) 1294 processes[id] = executor.submit( 1295 _event_wrapper, 1296 _callable=node['callable'], 1297 _name=node['name'], 1298 _args=node['args'], 1299 _kwargs=node['kwargs'] 1300 ) 1301 1302 # For single-node DAGs with no edges, end_proc is the root node 1303 if end_proc is None and len(self.root) > 0: 1304 end_proc = self.root[0] 1305 1306 while len(join) != len(self.graph.nodes): 1307 for node in self.graph.nodes: 1308 if processes[node].done(): 1309 join.add(node) 1310 1311 return_val: _T.List[_T.Any] = [] 1312 if end_proc is not None and end_proc in queues: 1313 while not queues[end_proc].empty(): 1314 return_val.append(queues[end_proc].get()) 1315 1316 return return_val
Directed Acyclic Graph used to queue and execute Pythonic callables in parallel, while stringing the outputs of those callables in linear sequences.
Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the behavior of root nodes to determine if and/or when the DAG run has been completed.
Root nodes will be re-executed until culled by one of two conditions:
- A max number of iterations has been completed, or
- The output of the root node's callable matches a lambda halt_condition.
Both conditions can be set at run-time.
QueueDAG can be nested inside a standard webber.DAG by passing qdag.execute as a callable. Benchmarks show moderate overhead of 1.3-2.5x (~1-2ms fixed cost) when nested, due to the outer DAG's ThreadPoolExecutor setup and task wrapper infrastructure. This is well within acceptable millisecond-scale latency for most real-time applications.
1176 def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str: 1177 """ 1178 Adds a callable with positional and keyword arguments to the DAG's underlying graph. 1179 On success, return unique identifier for the new node. 1180 1181 Reserved key-words are used for Queue DAG definitions: 1182 1183 - halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables. 1184 1185 \t halt_condition = (lambda output: output == None) 1186 1187 - iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument. 1188 1189 - max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop. 1190 """ 1191 halt_condition = kwargs.pop('halt_condition', None) 1192 iterator: int = kwargs.pop('iterator', None) 1193 max_iter: int = kwargs.pop('max_iter', None) 1194 1195 return_val = super().add_node(node, *args, **kwargs) 1196 1197 if max_iter is not None: 1198 iter_limit = int(max_iter) 1199 elif iterator is not None: 1200 iter_limit = int(iterator) 1201 else: 1202 iter_limit = None 1203 1204 self.conditions[return_val] = { 1205 'halt_condition': halt_condition, 1206 'iter_limit': iter_limit 1207 } 1208 1209 return return_val
Adds a callable with positional and keyword arguments to the DAG's underlying graph. On success, return unique identifier for the new node.
Reserved key-words are used for Queue DAG definitions:
halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.
halt_condition = (lambda output: output == None)iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.
max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop.
1211 def add_edge(self, u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable], continue_on: Condition = Condition.Success) -> _T.Tuple[str, str]: 1212 """ 1213 Adds an edge between nodes in the Queue DAG's underlying graph. 1214 Queue DAG nodes may have a maximum of one child and one parent worker. 1215 """ 1216 for node in (u_of_edge, v_of_edge): 1217 try: 1218 node_id = self.node_id(node) 1219 except: 1220 continue 1221 1222 filter = (lambda e: e.parent == node_id) if node == u_of_edge else (lambda e: e.child == node_id) 1223 try: 1224 assert(len(self.filter_edges(filter)) == 0) 1225 except Exception as e: 1226 e.add_note("Queue DAG nodes may have a maximum of one child and one parent worker.") 1227 raise e 1228 1229 return super().add_edge(u_of_edge, v_of_edge, continue_on)
Adds an edge between nodes in the Queue DAG's underlying graph. Queue DAG nodes may have a maximum of one child and one parent worker.
1231 def execute(self, *promises: _T.Any, return_ref: bool = False, print_exc: bool = False) -> _T.List[_T.Any] | None: 1232 """ 1233 Basic wrapper for execution of the DAG's underlying callables. 1234 """ 1235 queues: _T.Dict[str, _q.LifoQueue[_T.Any]] = {} 1236 processes: _T.Dict[str, _futures.Future[_T.Any]] = {} 1237 join: set[str] = set() 1238 end_proc: _T.Optional[str] = None 1239 1240 _promises: _T.Dict[str, _T.Any] = { k: v for k, v in _it.pairwise(promises) } if len(promises) > 0 else {} 1241 1242 with _TaskLogger("root") as _: 1243 1244 # Skip execution if there are no callables in scope. 1245 if len(self.graph.nodes) == 0: 1246 print('Given DAG has no callables in scope. Skipping execution...') 1247 return 1248 1249 with _futures.ThreadPoolExecutor() as executor: 1250 1251 for id in self.root: 1252 node = self.get_node(id) 1253 queues[id] = _q.LifoQueue() 1254 node.update({ 1255 'callable': _queue._worker, 1256 'args': tuple(), 1257 'kwargs': { 1258 'work': node.callable, 1259 'args': node.args, 'kwargs': node.kwargs, 1260 'promises': _promises, 1261 'print_exc': print_exc, 1262 'halt_condition': self.conditions[id]['halt_condition'], 1263 'iter_limit': self.conditions[id]['iter_limit'], 1264 'out_queue': queues.get(id) 1265 } 1266 }) 1267 processes[id] = executor.submit( 1268 _event_wrapper, 1269 _callable=node['callable'], 1270 _name=node['name'], 1271 _args=node['args'], 1272 _kwargs=node['kwargs'] 1273 ) 1274 1275 for parent_id, id in self.graph.edges: 1276 node = self.get_node(id) 1277 queues[id] = _q.LifoQueue() 1278 if len(list(self.graph.successors(id))) == 0: 1279 end_proc = id 1280 node.update({ 1281 'callable': _queue._worker, 1282 'args': tuple(), 1283 'kwargs': { 1284 'work': node.callable, 1285 'args': node.args, 'kwargs': node.kwargs, 1286 'promises': _promises, 1287 'print_exc': print_exc, 1288 'parent_id': parent_id, 1289 'parent_process': processes[parent_id], 1290 'in_queue': queues.get(parent_id), 1291 'out_queue': queues.get(id) 1292 } 1293 }) 1294 processes[id] = executor.submit( 1295 _event_wrapper, 1296 _callable=node['callable'], 1297 _name=node['name'], 1298 _args=node['args'], 1299 _kwargs=node['kwargs'] 1300 ) 1301 1302 # For single-node DAGs with no edges, end_proc is the root node 1303 if end_proc is None and len(self.root) > 0: 1304 end_proc = self.root[0] 1305 1306 while len(join) != len(self.graph.nodes): 1307 for node in self.graph.nodes: 1308 if processes[node].done(): 1309 join.add(node) 1310 1311 return_val: _T.List[_T.Any] = [] 1312 if end_proc is not None and end_proc in queues: 1313 while not queues[end_proc].empty(): 1314 return_val.append(queues[end_proc].get()) 1315 1316 return return_val
Basic wrapper for execution of the DAG's underlying callables.