webber.core

Base module for abstract multiprocessing system - a directed acyclic graph.

   1"""
   2Base module for abstract multiprocessing system - a directed acyclic graph.
   3"""
   4import sys as _sys
   5import typing as _T
   6import types as _types
   7import traceback as _traceback
   8import collections.abc as _abc
   9import concurrent.futures as _futures
  10import queue as _q
  11import itertools as _it
  12import threading as _threading
  13from datetime import datetime as _datetime
  14
  15import networkx as _nx
  16import webber.edges as _edges
  17import webber.xcoms as _xcoms
  18import webber.queue as _queue
  19
  20from webber.edges import Condition, dotdict, edgedict
  21
  22__all__ = ["DAG", "Condition", "QueueDAG"]
  23
  24def _iscallable(function: _T.Any) -> bool:
  25    return callable(function)
  26
  27
  28class _OutputLogger:
  29    """
  30    Thread-safe logger that uses a queue to serialize stdout writes.
  31    All threads put messages into a queue, a single consumer writes to stdout.
  32    This avoids race conditions from concurrent redirect_stdout calls.
  33    """
  34    _instance: _T.Optional['_OutputLogger'] = None
  35    _lock = _threading.Lock()
  36
  37    def __new__(cls, *args: _T.Any, **kwargs: _T.Any) -> '_OutputLogger':
  38        """Singleton pattern - one logger for all threads."""
  39        if cls._instance is None:
  40            with cls._lock:
  41                if cls._instance is None:
  42                    cls._instance = super().__new__(cls)
  43                    cls._instance._initialized = False
  44        return cls._instance
  45
  46    def __init__(self, file: _T.TextIO = _sys.stdout) -> None:
  47        if self._initialized:
  48            return
  49
  50        self._queue: _q.Queue[_T.Optional[str]] = _q.Queue()
  51        self._file = file
  52        self._running = True
  53
  54        # Start consumer thread
  55        self._consumer = _threading.Thread(target=self._consume, daemon=True)
  56        self._consumer.start()
  57        self._initialized = True
  58
  59    def _consume(self) -> None:
  60        """Consumer thread - reads from queue and writes to stdout."""
  61        while self._running or not self._queue.empty():
  62            try:
  63                record = self._queue.get(timeout=0.1)
  64                if record is None:  # Shutdown signal
  65                    self._queue.task_done()
  66                    break
  67                self._file.write(record + "\n")
  68                self._file.flush()
  69                self._queue.task_done()
  70            except _q.Empty:
  71                continue
  72
  73    def log(self, name: str, message: str) -> None:
  74        """Thread-safe log - puts message on queue."""
  75        if message and not message.isspace():
  76            timestamp = _datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
  77            formatted = f"{timestamp} {name:>15}: {message}"
  78            self._queue.put(formatted)
  79
  80    def shutdown(self) -> None:
  81        """Wait for queue to drain and stop consumer."""
  82        self._queue.join()
  83        self._running = False
  84        self._queue.put(None)
  85        self._consumer.join(timeout=1.0)
  86
  87    @classmethod
  88    def reset(cls) -> None:
  89        """Reset singleton instance (useful for testing)."""
  90        with cls._lock:
  91            if cls._instance is not None:
  92                cls._instance.shutdown()
  93                cls._instance = None
  94
  95
  96class _PrefixedStdout:
  97    """
  98    Stdout wrapper that prefixes each line with timestamp and task name.
  99    Thread-safe via thread-local task name storage.
 100    """
 101    _local = _threading.local()
 102
 103    def __init__(self, original_stdout: _T.TextIO) -> None:
 104        self._original = original_stdout
 105        self._lock = _threading.Lock()
 106
 107    @classmethod
 108    def set_task_name(cls, name: str | None) -> None:
 109        """Set the task name for the current thread."""
 110        cls._local.task_name = name
 111
 112    @classmethod
 113    def get_task_name(cls) -> str | None:
 114        """Get the task name for the current thread."""
 115        return getattr(cls._local, 'task_name', None)
 116
 117    def write(self, msg: str) -> int:
 118        """Write with timestamp/task prefix if task is set."""
 119        task_name = self.get_task_name()
 120        if task_name:
 121            # When in a task context, suppress standalone newlines/whitespace
 122            # (print() sends content and newline as separate write calls)
 123            if not msg or not msg.strip():
 124                return len(msg) if msg else 0
 125            timestamp = _datetime.now().strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
 126            # Handle multi-line messages
 127            lines = [line for line in msg.rstrip('\n').split('\n') if line.strip()]
 128            if lines:
 129                formatted_lines = [f"{timestamp} {task_name:>15}: {line}" for line in lines]
 130                formatted = '\n'.join(formatted_lines) + '\n'
 131                with self._lock:
 132                    return self._original.write(formatted)
 133            return len(msg)
 134        else:
 135            with self._lock:
 136                return self._original.write(msg)
 137
 138    def flush(self) -> None:
 139        self._original.flush()
 140
 141    def __getattr__(self, name: str) -> _T.Any:
 142        return getattr(self._original, name)
 143
 144
# Capture the interpreter's stdout as it is at import time and build one shared
# prefixing wrapper around it. Nothing is installed here: _TaskLogger swaps the
# wrapper into sys.stdout when a task starts and restores the captured stream
# when leaving.
_original_stdout = _sys.stdout
_prefixed_stdout = _PrefixedStdout(_original_stdout)
 148
 149
 150class _TaskLogger:
 151    """
 152    Per-task logger context manager that sets up stdout prefixing.
 153    Used as a context manager for each task execution.
 154    """
 155    def __init__(self, task_name: str) -> None:
 156        self._name = task_name
 157        self._prev_name: str | None = None
 158
 159    def __enter__(self) -> '_TaskLogger':
 160        self._prev_name = _PrefixedStdout.get_task_name()
 161        _PrefixedStdout.set_task_name(self._name)
 162        _sys.stdout = _prefixed_stdout
 163        return self
 164
 165    def __exit__(
 166        self,
 167        exc_type: type[BaseException] | None,
 168        exc_value: BaseException | None,
 169        traceback: _types.TracebackType | None
 170    ) -> bool | None:
 171        _PrefixedStdout.set_task_name(self._prev_name)
 172        if self._prev_name is None:
 173            _sys.stdout = _original_stdout
 174        return None
 175
 176
 177def _event_wrapper(
 178    _callable: _T.Callable,
 179    _name: str,
 180    _args: _T.Union[tuple, list],
 181    _kwargs: _T.Dict[str, _T.Any],
 182    _verbose: bool = True
 183) -> _T.Any:
 184    """Wrapper used by Webber DAGs to log and execute Python callables as a unit of work."""
 185    if _verbose:
 186        with _TaskLogger(_name):
 187            # print(f"Starting {_name}", flush=True)
 188            result = _callable(*_args, **_kwargs)
 189            # print(f"Completed {_name}", flush=True)
 190            return result
 191    else:
 192        return _callable(*_args, **_kwargs)
 193
 194class DAG:
 195    """
 196    Directed Acyclic Graph used to represent Pythonic tasks in parallel.
 197    """
 198    graph: _nx.DiGraph
 199    _callable_to_id: _T.Dict[_T.Callable, _T.Optional[str]]  # O(1) callable->node_id lookup cache (None = duplicates)
 200
 201    def _callable_status(self, c: _T.Callable) -> _T.Literal['none', 'one', 'many']:
 202        """Returns 'none' if not in DAG, 'one' if unique, 'many' if duplicated.
 203        O(1) lookup using _callable_to_id cache."""
 204        if c not in self._callable_to_id:
 205            return 'none'
 206        return 'one' if self._callable_to_id[c] is not None else 'many'
 207
 208    def _assign_node(self, n: _T.Any, new_callables: _T.Dict[_T.Callable, _T.Callable]) -> str:
 209        """Helper to resolve node to its ID, adding to new_callables dict if needed."""
 210        if not _iscallable(n):
 211            return n
 212        status = self._callable_status(n)
 213        if status == 'many':
 214            err_msg = f"Callable {n.__name__} " \
 215                + "exists more than once in this DAG. " \
 216                + "Use the unique string identifier of the required node."
 217            raise ValueError(err_msg)
 218        if status == 'one':
 219            return self._callable_to_id[n]  # type: ignore[return-value]
 220        new_callables[n] = n
 221        return _edges.label_node(n)
 222
 223    def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str:
 224        """
 225        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
 226        On success, return unique identifier for the new node.
 227        """
 228        if not _iscallable(node):
 229            err_msg = f"{node}: requested node is not a callable Python function."
 230            raise TypeError(err_msg)
 231
 232        node_name = _edges.label_node(node)
 233
 234        args = tuple(
 235            arg if not isinstance(arg, _xcoms.Promise) else self.resolve_promise(arg)
 236            for arg in args
 237        )
 238
 239        for k, val in kwargs.items():
 240            if isinstance(val, _xcoms.Promise):
 241                kwargs[k] = self.resolve_promise(val)
 242
 243        self.graph.add_node(
 244            node_for_adding=node_name,
 245            callable=node, args=args, kwargs=kwargs,
 246            name=node.__name__,
 247            id=node_name
 248        )
 249
 250        # Populate callable->id cache for O(1) lookup
 251        # If callable already exists, mark as None to indicate duplicates
 252        if node in self._callable_to_id:
 253            self._callable_to_id[node] = None  # Multiple occurrences - must use string ID
 254        else:
 255            self._callable_to_id[node] = node_name
 256
 257        return node_name
 258
 259
 260    def add_edge(
 261            self, 
 262            u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable],
 263            continue_on: Condition = Condition.Success
 264        ) -> _T.Tuple[str,str]:
 265        """
 266        Adds an edge between nodes in the DAG's underlying graph,
 267        so long as the requested edge is unique and has not been added previously.
 268
 269        On success, returns Tuple of the new edge's unique identifiers.
 270        """
 271        # Validate inputs prior to execution
 272        # - Nodes must be identifiers or callables
 273        # - Conditions must belong to the webber.edges.Condition class
 274        if not (isinstance(u_of_edge,str) or _iscallable(u_of_edge)):
 275            err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable"
 276            raise TypeError(err_msg)
 277        if not (isinstance(v_of_edge,str) or _iscallable(v_of_edge)):
 278            err_msg = f"Outgoing node {v_of_edge} must be a string or a Python callable"
 279            raise TypeError(err_msg)
 280        if not isinstance(continue_on, Condition):
 281            raise TypeError("Edge conditions must use the webber.edges.Condition class.")
 282
 283        # Base Case 0: No nodes are present in the DAG:
 284        # Ensure that both nodes are callables, then add both to the graph and
 285        # assign the outgoing node as a root.
 286        if len(self.graph.nodes()) == 0:
 287            if not _iscallable(u_of_edge):
 288                err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope."
 289                raise ValueError(err_msg)
 290            if not _iscallable(v_of_edge):
 291                err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope."
 292                raise ValueError(err_msg)
 293            outgoing_node = self.add_node(u_of_edge)
 294            incoming_node = self.add_node(v_of_edge)
 295            self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
 296            return (outgoing_node, incoming_node)
 297
 298        new_callables: _T.Dict[_T.Callable, _T.Callable] = {}
 299
 300        if _iscallable(u_of_edge) and _iscallable(v_of_edge):
 301            # Type narrowing: we've verified both are callables
 302            u_callable = _T.cast(_T.Callable, u_of_edge)
 303            v_callable = _T.cast(_T.Callable, v_of_edge)
 304
 305            # Error Cases 0, 1: Either of the callables appear more than once in the DAG.
 306            if self._callable_status(u_callable) == 'many':
 307                err_msg = f"Callable {u_callable.__name__} " \
 308                        + "exists more than once in this DAG. " \
 309                        + "Use the unique string identifier of the required node."
 310                raise ValueError(err_msg)
 311            if self._callable_status(v_callable) == 'many':
 312                err_msg = f"Callable {v_callable.__name__} " \
 313                        + "exists more than once in this DAG. " \
 314                        + "Use the unique string identifier of the required node."
 315                raise ValueError(err_msg)
 316
 317            # Base Case 1: Both args are callables and will be present in the DAG scope no more than once.
 318            # We will create new nodes if necessary, after validation, and get the unique string identifiers of the nodes.
 319            cached_u = self._callable_to_id.get(u_callable)
 320            if cached_u is not None:
 321                outgoing_node = cached_u
 322            else:
 323                new_callables[u_callable] = u_callable
 324                outgoing_node = _edges.label_node(u_callable)
 325
 326            cached_v = self._callable_to_id.get(v_callable)
 327            if cached_v is not None:
 328                incoming_node = cached_v
 329            else:
 330                new_callables[v_callable] = v_callable
 331                incoming_node = _edges.label_node(v_callable)
 332
 333        else:
 334
 335            # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope.
 336            if isinstance(u_of_edge, str) and u_of_edge not in self.graph.nodes:
 337                err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope."
 338                raise ValueError(err_msg)
 339            if isinstance(v_of_edge, str) and v_of_edge not in self.graph.nodes:
 340                err_msg = f"Incoming node {v_of_edge} not in DAG's current scope."
 341                raise ValueError(err_msg)
 342
 343            # Both nodes' unique identifiers are present in the DAG
 344            # and should be evaluated for a valid edge.
 345            if isinstance(u_of_edge, str) and isinstance(v_of_edge, str):
 346                outgoing_node = u_of_edge
 347                incoming_node = v_of_edge
 348
 349            # Otherwise, one of the nodes is a callable, and the other is a valid unique identifier.
 350            else:
 351                for node in (u_of_edge, v_of_edge):
 352                    if node == u_of_edge:
 353                        outgoing_node = self._assign_node(node, new_callables)
 354                    else:
 355                        incoming_node = self._assign_node(node, new_callables)
 356
 357        # Error Case 5: Both callables exist only once in the DAG,
 358        # but an edge already exists between them. O(1) lookup with has_edge().
 359        if self.graph.has_edge(outgoing_node, incoming_node):
 360            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \
 361                    + "a definition in this DAG."
 362            raise ValueError(err_msg)
 363
 364        # Error Case 6: Adding an edge would create circular dependencies.
 365        # Use has_path() for O(V+E) incremental check instead of rebuilding entire graph.
 366        if incoming_node in self.graph and _nx.has_path(self.graph, incoming_node, outgoing_node):
 367            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \
 368                    + "results in circular dependencies."
 369            raise ValueError(err_msg)
 370
 371        # We can now add the edge to the DAG, since we are certain it will not result in
 372        # illegal dependencies/behavior.
 373        # First, we should account for potential new nodes. This also handles
 374        # duplicates on first entry to the DAG (e.g.: edge == (print, print))
 375        if _iscallable(u_of_edge) and new_callables.get(_T.cast(_T.Callable, u_of_edge)) is not None:
 376            outgoing_node = self.add_node(new_callables[_T.cast(_T.Callable, u_of_edge)])
 377        if _iscallable(v_of_edge) and new_callables.get(_T.cast(_T.Callable, v_of_edge)) is not None:
 378            incoming_node = self.add_node(new_callables[_T.cast(_T.Callable, v_of_edge)])
 379
 380        # Then we can add the new edge.
 381        self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
 382        return (outgoing_node, incoming_node)
 383    
 384    def remove_node(self, *posargs, **kwargs) -> None:
 385        """
 386        Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG.
 387        Throws error message and recommends safer methods.
 388        """
 389        raise NotImplementedError("Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect.")
 390
 391    def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]:
 392        """
 393        Removes an directed edge between nodes in the DAG's underlying graph.
 394        Throws error if the edge does not exist.
 395        On success, returns Tuple of the removed edge's unique identifiers.
 396        """
 397        edge_id = (self.node_id(u_of_edge), self.node_id(v_of_edge))
 398        if edge_id not in self.graph.edges(data = False):
 399            err_msg = "Requested edge does not exist in the DAG's scope"
 400            raise ValueError(err_msg)
 401        self.graph.remove_edge(edge_id[0], edge_id[1])
 402        return edge_id
 403
 404    def update_edges(self, *E: _T.Any, continue_on: Condition | None = None, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | edgedict | None = None) -> None:
 405        """
 406        Flexible function to update properties of edges in the DAG's scope, 
 407        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
 408        lambda filter using the edgedict syntax.
 409        
 410        List of nodes to update or filter argument is expected. Valid edge lists include:
 411
 412        \t update_edges((node1, node2), ...)       or update_edges([(node1, node2)], ...)
 413
 414        \t update_edges((node1, callable2), ...)   or update_edges([(node1, callable2)], ...)
 415        
 416        \t update_edges(edgedict),                 where isinstance(edgedict, webber.edges.edgedict) == True
 417
 418        Parameters:
 419
 420        > continue_on: webber.edges.Condition value to update matching edges with, 
 421        
 422        \t Execute child on success, failure, or any exit state, of the parent node
 423
 424        > filter: lambda property that can be used instead of a list of edges to be updated.
 425        
 426        \t filter = (lambda e: n.parent == print or e.child == print)
 427
 428        > data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set.
 429
 430        \t data = { 'continue_on': webber.edges.Condition }
 431
 432        """
 433        if len(E) == 0 and filter == None:
 434            raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.")
 435
 436        elif isinstance(E, dict) or isinstance(E, edgedict):
 437            E = [E]
 438
 439        elif len(E) == 1 and isinstance(E[0], _abc.Iterable):
 440            try:
 441                _ = self.get_edges(E[0])
 442            except:
 443                E = E[0]
 444
 445        if filter is not None:
 446            edge_ids = self.filter_edges(filter, data = False)
 447        else:
 448            if isinstance(E[0], dict) or isinstance(E[0], edgedict):
 449                try:
 450                    ids = [e['id'] for e in E]
 451                except KeyError:
 452                    err_msg = 'In dictionary form, all given edges must be follow edgedict standards.'
 453                    raise ValueError(err_msg)
 454            else:
 455                ids = E
 456            edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids]
 457
 458        std_update = (continue_on is None)
 459
 460        if std_update:
 461            for edge_id, e in zip(edge_ids, E):
 462                if data is not None:
 463                    self._update_edge(data, id = edge_id)
 464                else:
 465                    self._update_edge(e, id = edge_id)
 466        
 467        else:
 468            if continue_on is not None:
 469                if not isinstance(continue_on, Condition):
 470                    err_msg = f"Condition assignment must use webber.edges.Condition"
 471                    raise TypeError(err_msg)
 472                for e in edge_ids:
 473                    self.graph.edges[e]['Condition'] = continue_on
 474
 475    def _update_edge(self, edgedict: _T.Dict[str, _T.Any], id: _T.Tuple[str, str] | None = None, force: bool = False) -> None:
 476        """
 477        Internal only. Update properties of an individual edge within a DAG's scope, 
 478        given a well-structured dictionary and the tuple identifier of the network edge. 
 479        Force argument bypasses validation, and should only be used internally.
 480        """
 481        if id is not None:
 482            try:
 483                if edgedict.get('id') and id != edgedict['id']:
 484                    raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {edgedict['id']}")
 485            except ValueError as e:
 486                if not force:
 487                    raise e
 488            edgedict['id'] = id
 489
 490        expected_keys = ('parent', 'child', 'id', 'continue_on')
 491        if not set(expected_keys).issuperset(set(edgedict.keys())):
 492            raise ValueError(f"Expecting keys: {expected_keys}")
 493
 494        if not force:
 495            
 496            e1, e2 = None, None
 497
 498            if edgedict.get('id'):
 499                e1 = self.get_edge(edgedict['id'], data = False)
 500
 501            if edgedict.get('parent') and edgedict.get('child'):
 502                e2 = self.get_edge(edgedict['parent'], edgedict['child'], data = False)
 503            else:
 504                e2 = e1
 505
 506            if e1 != e2:
 507                raise ValueError('Edge vertices should not be changed using update functions.')
 508            
 509            elif e1 == None:
 510                raise ValueError('Requested edge was not given an identifier.')
 511
 512            if edgedict.get('continue_on') and not isinstance(edgedict['continue_on'], Condition):
 513                err_msg = f"Condition assignment must use webber.edges.Condition"
 514                raise TypeError(err_msg)
 515                       
 516        edge_id = edgedict.pop('id')
 517        edge = {k: v for k,v in edgedict.items() if k not in ('parent', 'child', 'id')}
 518        self.graph.edges[(edge_id[0], edge_id[1])].update(edge)
 519
 520    def relabel_node(self, node: _T.Union[str, _T.Callable], label: str) -> str:
 521        """
 522        Update the label, or name, given to a node in the DAG's scope, 
 523        given a Python string and a node identifier. 
 524        Well-structured wrapper for a common use-case of DAG.update_nodes.
 525        """
 526        node_id = self.node_id(node)
 527        if not isinstance(label, str) or len(label) == 0:
 528            err_msg = "Node label must be a Python string with one or more characters."
 529            raise ValueError(err_msg)
 530        self.update_nodes(node_id, data = {'name': label})
 531        return label
 532
 533    def update_nodes(self, *N: _T.Any, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | dotdict | None = None, callable: _T.Callable | None = None, args: _T.Iterable[_T.Any] | None = None, kwargs: _T.Dict[str, _T.Any] | None = None) -> None:
 534        """
 535        Flexible function to update properties of nodes in the DAG's scope, 
 536        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
 537        lambda filter using the dotdict syntax.
 538        
 539        List of nodes to update or filter argument is expected. Valid node lists include:
 540
 541        \t update_nodes(node_id, ...)           or update_nodes([node_id], ...)
 542
 543        \t update_nodes(callable, ...)          or update_nodes([callable], ...)
 544        
 545        \t update_nodes(node1, node2, ...)      or update_nodes([node1, node2], ...)
 546        
 547        \t update_nodes(node1, callable2, ...)  or update_nodes([node1, callable2], ...)
 548
 549        > update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id)
 550
 551        Parameters:
 552
 553        > filter: lambda property that can be used instead of a list of nodes to be updated.
 554        
 555        \t filter = (lambda n: n.callable == print or 'Hello, World' in n.args)
 556
 557        > data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set.
 558            Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method.
 559            
 560        \t data = {
 561            \t 'callable': print,
 562            \t 'args': ['Hello', 'World'],
 563            \t 'kwargs': {'sep': ', '},
 564            \t 'name': 'custom_label',
 565            \t 'id': 'unique-identifier'
 566            \t}
 567
 568        > args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List).
 569
 570        > kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary.
 571
 572        """
 573        if len(N) == 0 and filter == None:
 574            raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.")
 575
 576        elif len(N) > 0 and filter is not None:
 577            raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.")
 578
 579        elif isinstance(N, dict) or isinstance(N, str):
 580            N = [N]
 581
 582        elif len(N) == 1 and isinstance(N[0], _abc.Iterable):
 583            if isinstance(N[0][0], dict):
 584                N = N[0]
 585            elif isinstance(N[0][0], str):
 586                # BUG: A list of all single character IDs will fail to be updated. Please try another call method (i.e.: nested iterator).
 587                if sum(list(map(lambda n: len(n), N[0]))) != len(N[0]): 
 588                    N = N[0]
 589
 590
 591        if filter is not None:
 592            node_ids = self.filter_nodes(filter, data = False)
 593        else:
 594            if isinstance(N[0], dict):
 595                ids = [n['id'] for n in N]
 596            else:
 597                ids = N
 598            node_ids = [self.node_id(i) for i in ids]
 599        
 600        std_update = (callable == None) and (args == None) and (kwargs == None)
 601
 602        if std_update:
 603            for node_id, n in zip(node_ids, N):
 604                if data is not None:
 605                    self._update_node(data, id = node_id)
 606                else:
 607                    self._update_node(n, id = node_id)
 608        
 609        else:
 610            if callable is not None:
 611                if not _iscallable(callable):
 612                    err_msg = f"Requested node is not assigned a callable Python function."
 613                    raise TypeError(err_msg)
 614                for node_id in node_ids:
 615                    self.graph.nodes[node_id]['callable'] = callable
 616                    self.graph.nodes[node_id]['name'] = callable.__name__
 617            
 618            if args is not None:
 619                if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)):
 620                    err_msg = f"Requested node is not assigned a tuple of pos args."
 621                    raise TypeError(err_msg)
 622                args = tuple(args)
 623                for node_id in node_ids:
 624                    self.graph.nodes[node_id]['args'] = args
 625            
 626            if kwargs is not None:
 627                if not isinstance(kwargs, dict):
 628                    err_msg = f"Requested node is not assigned a dictionary of kw args."
 629                    raise TypeError(err_msg)
 630                for node_id in node_ids:
 631                    self.graph.nodes[node_id]['kwargs'] = kwargs
 632
 633    def get_edges(self, *N: _T.Any, data: bool = True) -> _T.Union[_T.List[edgedict], _T.List[_T.Tuple[str, str]]]:
 634        """
 635        Retrieval function for DAG edge data, based on tuple identifiers.
 636        Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2']))
 637        """
 638        if len(N) == 0:
 639            if data:
 640                return [edgedict(u, v, **d) for u, v, d in self.graph.edges.data()]
 641            return list(self.graph.edges.data(data=False))
 642            
 643        # elif len(N) == 1:
 644        #     if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], tuple):
 645        #         N = N[0]
 646
 647        if len(N) != len(set(N)) or not all(isinstance(n, _abc.Iterable) and len(n) == 2 for n in N):
 648            err_msg = 'All requested edges must be unique tuples of size 2.'
 649            raise ValueError(err_msg)
 650    
 651        edge_data = [self.get_edge(o, i, data=data) for (o, i) in N]
 652        return edge_data
 653    
 654    def get_edge(self, outgoing_node: _T.Union[str, _T.Callable], incoming_node: _T.Union[str, _T.Callable], data: bool = True) -> _T.Union[edgedict, _T.Tuple[str, str]]:
 655        """
 656        Retrieval function for a single directed edge between nodes in a DAG's scope. 
 657        """
 658        id = (self.node_id(outgoing_node), self.node_id(incoming_node))
 659        if not data:
 660            return id
 661        edge_data = self.graph.get_edge_data(u = id[0], v = id[1])
 662        if not edge_data:
 663            err_msg = f'No match found for the directed edge requested: {id}'
 664            raise ValueError(err_msg)
 665        assert edge_data is not None  # Type narrowing for Pylance
 666        return edgedict(*id, **edge_data)
 667
 668    def get_node(self, n: _T.Union[str, _T.Callable]) -> dotdict:
 669        """
 670        Given a unique identifier, returns a dictionary of node metadata
 671        for a single node in the DAG's scope.
 672        """
 673        node_id = self.node_id(n)
 674        return dotdict(self.graph.nodes[node_id])
 675
 676    def get_nodes(self, *N: _T.Any) -> _T.List[dotdict]:
 677        """
 678        Flexible function to retrieve DAG node data, based on node identifiers 
 679        (e.g.: string IDs or unique callables).
 680        """
 681        if len(N) == 0:
 682            node_data = list(self.graph.nodes.values())
 683            return [dotdict(d) for d in node_data]
 684
 685        elif len(N) == 1:
 686            if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], str):
 687                N = N[0]
 688            else:
 689                node_id = self.node_id(N[0])
 690                node_data = [dotdict(self.graph.nodes[node_id])]
 691                return node_data
 692            
 693        if not len(N) == len(set(N)):
 694            err_msg = 'All requested nodes must be unique identifiers.'
 695            raise ValueError(err_msg)
 696        
 697        node_ids  = [self.node_id(n) for n in N]
 698        node_data = [dotdict(self.graph.nodes[n]) for n in node_ids]
 699        return node_data
 700
 701    def filter_nodes(self, filter: _types.LambdaType, data: bool = False) -> _T.List[str] | _T.List[dotdict]:
 702        """
 703        Given a lambda function, filter nodes in a DAG's scope based on its attributes.
 704        Current limitation: Filters must use node identifier strings when referencing nodes.
 705        Use get_nodes for more flexible controls.
 706        """
 707        if not data:
 708            return [node['id'] for node in self.graph.nodes.values() if filter(dotdict(node))]
 709        return [dotdict(node) for node in self.graph.nodes.values() if filter(dotdict(node))]
 710    
 711    def filter_edges(self, filter: _types.LambdaType, data: bool = False) -> _T.List[_T.Tuple[str, str]] | _T.List[edgedict]:
 712        """
 713        Given a lambda function, filter edges in a DAG's scope based on its attributes.
 714        Current limitation: Filters must use node identifier strings when referencing nodes.
 715        Use get_edges for more flexible controls.
 716        """
 717        if not data:
 718            return [e[:2] for e in list(self.graph.edges.data()) if filter(edgedict(*e))]
 719        return [edgedict(*e) for e in list(self.graph.edges.data()) if filter(edgedict(*e))]
 720
 721    def retry_node(self, identifier: _T.Union[str,_T.Callable], count: int) -> None:
 722        """
 723        Given a node identifier, set number of automatic retries in case of failure.
 724        Re-attempts will begin as soon as possible.
 725        """
 726        if not isinstance(count, int) or not count >= 0:
 727            raise ValueError("Retry count must be a non-negative integer.")
 728        node = self.node_id(identifier)
 729        self.graph.nodes[node]['retry'] = count
 730
 731    def skip_node(self, identifier: _T.Union[str, _T.Callable], skip: bool = True, as_failure: bool = False) -> None:
 732        """
 733        Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error).
 734        Allows conditional control and testing over DAG's order of operations.
 735        """
 736        if not isinstance(skip, bool):
 737            raise ValueError("Skip argument must be a boolean value.")
 738        node = self.node_id(identifier)
 739        self.graph.nodes[node]['skip'] = (skip, as_failure)
 740      
 741    def critical_path(self, nodes: _T.Union[str, _T.Callable, _T.Iterable[_T.Union[str, _T.Callable]]]) -> 'DAG':
 742        """
 743        Given a set of nodes, returns a subset of the DAG containing
 744        only the node(s) and its parents, or upstream dependencies.
 745        """
 746        if isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str):
 747            node_ids = {self.node_id(n) for n in nodes}
 748        else:
 749            node_ids = {self.node_id(nodes)}
 750        return self._subgraph(node_ids)
 751
 752    def execute(
 753        self,
 754        return_ref: bool = False,
 755        print_exc: bool = False,
 756        max_workers: _T.Optional[int] = None,
 757        verbose: bool = True
 758    ) -> _T.Optional['DAG.DAGExecutor']:
 759        """
 760        Basic wrapper for execution of the DAG's underlying callables.
 761
 762        Args:
 763            return_ref: If True, return the DAGExecutor instance.
 764            print_exc: If True, print full exception tracebacks.
 765            max_workers: Maximum number of worker threads. Defaults to None (auto).
 766            verbose: If True, log task start/completion messages. Defaults to True.
 767        """
 768        executor = self.DAGExecutor(
 769            self.graph, self.root, print_exc,
 770            max_workers=max_workers, verbose=verbose
 771        )
 772        return executor if return_ref else None
 773
 774    def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] | None = None) -> _T.Any:
 775        """
 776        Basic wrapper to visualize DAG using Vis.js and NetGraph libraries.
 777        By default, visualization library only loaded in after DAG.visualize() is called, halving import times.
 778        """
 779        import webber.viz as _viz
 780
 781        match type:
 782            case 'browser':
 783                _viz.visualize_browser(self.graph)
 784
 785            case 'plt':
 786                return _viz.visualize_plt(self.graph)
 787
 788            case 'gui':
 789                # _visualize_gui(self.graph)
 790                raise NotImplementedError
 791
 792            case None: 
 793                if _viz._in_notebook():
 794                    return _viz.visualize_plt(self.graph)
 795                else:
 796                    _viz.visualize_browser(self.graph)
 797
 798            case _:
 799                err_msg = "Unknown visualization type requested."
 800                raise NotImplementedError(err_msg)
 801
 802    @property
 803    def root(self) -> _T.List[str]:
 804        """
 805        Return list of nodes with no dependencies.
 806        Root nodes will occur first in DAG's order of operations.
 807        Uses O(1) in_degree() instead of O(k) predecessors list creation.
 808        """
 809        return [node for node in self.graph.nodes if self.graph.in_degree(node) == 0]
 810
 811    @property
 812    def nodes(self) -> _T.Any:
 813        """Returns the NodeView of the underlying NetworkX graph containing all DAG nodes."""
 814        return self.graph.nodes
 815
 816    def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str:
 817        """
 818        Validate whether identifier given is a valid node within the DAG's scope.
 819        Primarily for internal use, but useful for retrieving string identifiers
 820        for a unique callable in a DAG.
 821        Uses O(1) cache lookup for callables instead of O(n) linear search.
 822        """
 823        if isinstance(identifier, str):
 824            # O(1) lookup using graph.nodes dict
 825            if identifier not in self.graph.nodes:
 826                err_msg = f"Node {identifier} is not defined in this DAG's scope."
 827                raise ValueError(err_msg)
 828            return identifier
 829        elif _iscallable(identifier):
 830            # O(1) lookup using _callable_to_id cache
 831            if identifier not in self._callable_to_id:
 832                err_msg = f"Callable {identifier} is not defined in this DAG's scope."
 833                raise ValueError(err_msg)
 834            cached_id = self._callable_to_id[identifier]
 835            if cached_id is None:
 836                err_msg = f"Callable {identifier.__name__} " \
 837                        + "exists more than once in this DAG. " \
 838                        + "Use the unique string identifier of the required node."
 839                raise ValueError(err_msg)
 840            return cached_id
 841        else:
 842            err_msg = f"Node {identifier} must be a string or a Python callable"
 843            raise TypeError(err_msg)
 844    
 845    def _update_node(self, nodedict: _T.Dict[str, _T.Any], id: str | None = None, force: bool = False) -> None:
 846        """
 847        Internal only. Update properties of single node within a DAG's scope, 
 848        given a well-structured dictionary and the tuple identifier of the network edge. 
 849        Force argument bypasses dictionary validation, and should only be used internally.
 850        """
 851        if id is not None:
 852            try:
 853                if nodedict.get('id') is not None and id != nodedict['id']:
 854                    raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {nodedict['id']}")
 855            except ValueError as e:
 856                if not force:
 857                    raise e
 858            nodedict['id'] = id
 859        
 860        expected_keys = ('callable', 'args', 'kwargs', 'name', 'id')
 861        if not set(expected_keys).issuperset(set(nodedict.keys())):
 862            raise ValueError(f"Expecting keys: {expected_keys}")
 863        
 864        if not force:
 865            if nodedict.get('callable'):
 866                if not _iscallable(nodedict['callable']):
 867                    err_msg = f"Requested node is not assigned a callable Python function."
 868                    raise TypeError(err_msg)
 869                if not nodedict.get('name'):
 870                    nodedict['name'] = nodedict['callable'].__name__
 871
 872            if nodedict.get('name') and (not isinstance(nodedict['name'], str) or len(nodedict['name']) == 0): 
 873                err_msg = f"Requested node name must be a non-null Python string, will default to callable when not set."
 874                raise TypeError(err_msg)
 875
 876            if nodedict.get('args'):
 877                if not (isinstance(nodedict['args'], _abc.Iterable) and not isinstance(nodedict['args'], str)):
 878                    err_msg = f"Requested node is not assigned a tuple of pos args."
 879                    raise TypeError(err_msg)
 880                nodedict['args'] = tuple(nodedict['args'])
 881            
 882            if nodedict.get('kwargs') and not isinstance(nodedict['kwargs'], dict):
 883                err_msg = f"Requested node is not assigned a dictionary of kw args."
 884                raise TypeError(err_msg)                    
 885        
 886        node_id = nodedict.pop('id')
 887        self.graph.nodes[node_id].update(nodedict)
 888
 889        # Reset node name if implicitly requested.
 890        if not nodedict.get('name'):
 891            self.graph.nodes[node_id]['name'] = self.graph.nodes[node_id]['callable'].__name__
 892
 893    def _subgraph(self, node_ids: set[str]) -> 'DAG':
 894        """
 895        Internal only. Given a set of nodes, returns a subset of the DAG containing
 896        only the node(s) and upstream dependencies.
 897        Uses nx.ancestors() for O(V+E) performance instead of manual traversal.
 898        """
 899        all_nodes = set(node_ids)
 900        for node in node_ids:
 901            all_nodes.update(_nx.ancestors(self.graph, node))
 902        subgraph = self.graph.subgraph(all_nodes)
 903        return DAG(subgraph, __force=True)
 904
 905    def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise:
 906        """
 907        Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope.
 908        Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope.
 909        """
 910        try:
 911            key = self.node_id(promise.key)
 912        except Exception as e:
 913            raise _xcoms.InvalidCallable(e)
 914        return _xcoms.Promise(key)
 915
 916    def __init__(self, graph: _T.Union[_nx.DiGraph, _nx.Graph, None] = None, **kwargs: _T.Any) -> None:
 917
 918        if graph is None:
 919            self.graph = _nx.DiGraph()
 920            self._callable_to_id = {}
 921            return
 922
 923        # Meant for internal use only, creating DAGs from subgraphs.
 924        if kwargs.get('__force') == True:
 925            self.graph = _T.cast(_nx.DiGraph, graph)
 926            # Build cache from existing subgraph
 927            self._callable_to_id = {}
 928            for node_id, data in self.graph.nodes(data=True):
 929                callable_fn = data.get('callable')
 930                if callable_fn is not None:
 931                    if callable_fn in self._callable_to_id:
 932                        self._callable_to_id[callable_fn] = None  # Duplicate
 933                    else:
 934                        self._callable_to_id[callable_fn] = node_id
 935            return
 936
 937        _edges.validate_dag(graph)
 938        # Type narrowing: validate_dag ensures graph is a DiGraph
 939        assert isinstance(graph, _nx.DiGraph)
 940
 941        # Define framework specific logic as nested dictionaries.
 942        for node in graph.nodes.keys():
 943            graph.nodes[node]['callable'] = node
 944            graph.nodes[node]['name'] = node.__name__
 945            graph.nodes[node]['args'] = []
 946            graph.nodes[node]['kwargs'] = {}
 947
 948        for e in graph.edges:
 949            condition = graph.edges[e].get('Condition')
 950            if condition is not None:
 951                if condition not in Condition:
 952                    raise TypeError(e, 'Edge conditions must belong to IntEnum type Webber.Condition.')
 953            else:
 954                graph.edges[e]['Condition'] = Condition.Success
 955
 956        graph = _nx.relabel_nodes(graph, lambda node: _edges.label_node(node))
 957        for n in graph.nodes:
 958            graph.nodes[n]['id'] = n
 959        self.graph = _nx.DiGraph(graph)
 960
 961        # Build callable->id cache after relabeling
 962        self._callable_to_id = {}
 963        for node_id, data in self.graph.nodes(data=True):
 964            callable_fn = data.get('callable')
 965            if callable_fn is not None:
 966                if callable_fn in self._callable_to_id:
 967                    self._callable_to_id[callable_fn] = None  # Duplicate
 968                else:
 969                    self._callable_to_id[callable_fn] = node_id
 970
    class DAGExecutor:
        """
        Runs the callables of a DAG on a thread pool, honouring edge conditions,
        per-node skip flags and per-node retry counts.

        All of the work happens inside `__init__`: constructing the object
        executes the graph. No attributes are assigned on the instance; the
        execution state lives in local variables of the constructor.
        """
        def __init__(
            self,
            graph: _nx.DiGraph,
            roots: _T.List[str],
            print_exc: bool = False,
            max_workers: _T.Optional[int] = None,
            verbose: bool = True
        ) -> None:
            """
            Execute `graph`, starting from `roots`.

            Args:
                graph: Graph to run. Node attributes 'callable', 'name', 'args' and
                    'kwargs' are read for every submitted node; 'skip' and 'retry'
                    are optional. Every edge must carry a 'Condition' attribute.
                roots: Node IDs submitted first.
                print_exc: If True, print the traceback of each failed attempt.
                max_workers: Passed to `ThreadPoolExecutor`.
                verbose: If True, print progress messages; also forwarded to
                    `_event_wrapper` as `_verbose`.
            """

            # Skip execution if there are no callables in scope.
            if len(graph.nodes) == 0:
                if verbose:
                    print('Given DAG has no callables in scope. Skipping execution...')
                return

            # Initialize local variables for execution.
            complete: set[str] = set()   # nodes whose future finished without exception
            started: set[str] = set()    # nodes submitted at least once (written, never read here)
            failed: set[str] = set()     # nodes that raised and had no retries left
            skipped: set[str] = set()    # nodes that will not run because an edge condition was not met
            refs: _T.Dict[str, _futures.Future[_T.Any]] = {}  # latest future per node ID

            def raise_exc(message: str) -> None:
                """Stand-in callable for nodes that are skipped 'as failure'."""
                raise ValueError(message)

            def run_conditions_met(n: str) -> bool:
                """Return True once every predecessor of `n` is in the state its edge condition requires."""
                for p in graph.predecessors(n):
                    match graph.edges[(p, n)]['Condition']:
                        case Condition.Success:
                            if p not in complete:
                                return False
                        case Condition.Failure:
                            if p not in failed:
                                return False
                        case Condition.AnyCase:
                            if p not in failed and p not in complete:
                                return False
                return True

            # skip[node] -> (skip, as_failure); nodes without the attribute default to (False, False).
            skip = graph.nodes.data("skip", default=(False, False))
            # retry[node] -> [attempts left (retry count + 1), cached Submit arguments for re-submission].
            retry: _T.Dict[str, _T.List[_T.Any]] = {
                n: [c + 1, {}] for n, c in graph.nodes.data("retry", default=0)
            }

            # Start execution of root node functions.
            with _futures.ThreadPoolExecutor(max_workers=max_workers) as executor:

                def Submit(
                    event: str,
                    callable: _T.Callable[..., _T.Any],
                    name: str,
                    args: _T.Union[tuple[_T.Any, ...], list[_T.Any]],
                    kwargs: _T.Dict[str, _T.Any]
                ) -> _futures.Future[_T.Any]:
                    """
                    Submit one attempt of node `event` to the pool via `_event_wrapper`.

                    Skipped nodes run `print` (or `raise_exc` when skipped as failure)
                    instead of their callable, and their retry budget is cleared.
                    """
                    if skip[event][0]:
                        # A skipped node is never retried.
                        retry[event][0] = 0
                        skip_callable = raise_exc if skip[event][1] else print
                        return executor.submit(
                            _event_wrapper,
                            _callable=skip_callable,
                            _name=graph.nodes[event]['name'],
                            _args=[f"Event {event} skipped..."],
                            _kwargs={},
                            _verbose=verbose
                        )
                    else:
                        # Consume one attempt; on the first attempt of a retryable node,
                        # remember the resolved arguments so a retry can reuse them.
                        retry[event][0] -= 1
                        if (retry[event][0] > 0) and (retry[event][1] == {}):
                            retry[event][1] = {
                                'callable': callable,
                                'name': name,
                                'args': args,
                                'kwargs': kwargs
                            }
                        return executor.submit(
                            _event_wrapper,
                            _callable=callable,
                            _name=name,
                            _args=args,
                            _kwargs=kwargs,
                            _verbose=verbose
                        )

                # Submit root nodes
                for event in roots:
                    refs[event] = Submit(
                        event,
                        graph.nodes[event]['callable'],
                        graph.nodes[event]['name'],
                        graph.nodes[event]['args'],
                        graph.nodes[event]['kwargs']
                    )
                    started.add(event)

                # Process futures as they complete (no busy-wait)
                pending = set(refs.values())
                future_to_event = {v: k for k, v in refs.items()}

                # Loop until every node is accounted for, or nothing is left running.
                while (len(complete) + len(failed) + len(skipped)) != len(graph):
                    if not pending:
                        break

                    # Wait for at least one future to complete
                    done, pending = _futures.wait(pending, return_when=_futures.FIRST_COMPLETED)

                    for future in done:
                        event = future_to_event[future]

                        if future.exception() is not None:
                            # Re-raise inside try/except so that print_exc() below has
                            # an active exception to report.
                            try:
                                raise future.exception()  # type: ignore
                            except:
                                if print_exc:
                                    _traceback.print_exc()

                                # Attempts remaining: re-submit with the cached arguments
                                # and move on to the next finished future.
                                if retry[event][0] > 0:
                                    if verbose:
                                        print(f"Event {event} exited with exception, retrying...")
                                    new_future = Submit(
                                        event,
                                        callable=retry[event][1]['callable'],
                                        name=retry[event][1]['name'],
                                        args=retry[event][1]['args'],
                                        kwargs=retry[event][1]['kwargs']
                                    )
                                    refs[event] = new_future
                                    pending.add(new_future)
                                    future_to_event[new_future] = event
                                    continue

                                if verbose:
                                    print(f"Event {event} exited with exception...")
                                failed.add(event)
                                # Children reached through edges for which
                                # _edges.continue_on_failure is falsy will not run.
                                skipping = [
                                    e[1] for e in set(graph.out_edges(event))
                                    if not _edges.continue_on_failure(graph.edges[e])
                                ]
                        else:
                            complete.add(event)
                            # Children reached through edges for which
                            # _edges.continue_on_success is falsy will not run.
                            skipping = [
                                e[1] for e in set(graph.out_edges(event))
                                if not _edges.continue_on_success(graph.edges[e])
                            ]

                        # A skipped child takes all of its descendants with it.
                        skipped = skipped.union(skipping)
                        for n in skipping:
                            skipped = skipped.union(_nx.descendants(graph, n))

                        # Remaining children start only when all their predecessors qualify.
                        carryon = set(graph.successors(event)).difference(skipped)
                        starting = [
                            successor for successor in carryon
                            if run_conditions_met(successor)
                        ]

                        for successor in starting:
                            # Replace Promise arguments with the result of the node they refer to.
                            _args = [
                                a if not isinstance(a, _xcoms.Promise) else refs[_T.cast(str, a.key)].result()
                                for a in graph.nodes[successor]['args']
                            ]
                            _kwargs = {
                                k: v if not isinstance(v, _xcoms.Promise) else refs[_T.cast(str, v.key)].result()
                                for k, v in graph.nodes[successor]['kwargs'].items()
                            }
                            new_future = Submit(
                                successor,
                                graph.nodes[successor]['callable'],
                                graph.nodes[successor]['name'],
                                _args,
                                _kwargs
                            )
                            refs[successor] = new_future
                            pending.add(new_future)
                            future_to_event[new_future] = successor
                            started.add(successor)
1149
class QueueDAG(DAG):
    """
    Directed Acyclic Graph used to queue and execute Pythonic callables in parallel,
    while stringing the outputs of those callables in linear sequences.

    Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the
    behavior of root nodes to determine if and/or when the DAG run has been completed.

    Root nodes will be re-executed until culled by one of two conditions:
    1. A max number of iterations has been completed, or
    2. The output of the root node's callable matches a lambda halt_condition.

    Both conditions can be set at run-time.

    QueueDAG can be nested inside a standard webber.DAG by passing qdag.execute as a callable.
    Benchmarks show moderate overhead of 1.3-2.5x (~1-2ms fixed cost) when nested, due to the
    outer DAG's ThreadPoolExecutor setup and task wrapper infrastructure. This is well within
    acceptable millisecond-scale latency for most real-time applications.
    """

    # Maps node ID -> {'halt_condition': ..., 'iter_limit': ...}.
    # The class-level default is kept so `QueueDAG.conditions` still exists;
    # each instance replaces it with its own dictionary in __init__.
    conditions: _T.Dict[str, _T.Dict[str, _T.Any]] = {}

    def __init__(self) -> None:
        super().__init__()
        # Per-instance registry: a shared class-level dict would leak node
        # conditions between unrelated QueueDAG objects.
        self.conditions = {}

    def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str:
        """
        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
        On success, return unique identifier for the new node.

        Reserved key-words are used for Queue DAG definitions:

        - halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.

        \t halt_condition = (lambda output: output == None)

        - iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.

        - max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop.

        When both `max_iter` and `iterator` are given, `max_iter` wins.
        """
        halt_condition = kwargs.pop('halt_condition', None)
        iterator: _T.Optional[int] = kwargs.pop('iterator', None)
        max_iter: _T.Optional[int] = kwargs.pop('max_iter', None)

        return_val = super().add_node(node, *args, **kwargs)

        if max_iter is not None:
            iter_limit: _T.Optional[int] = int(max_iter)
        elif iterator is not None:
            iter_limit = int(iterator)
        else:
            iter_limit = None

        self.conditions[return_val] = {
            'halt_condition': halt_condition,
            'iter_limit': iter_limit
        }

        return return_val

    def add_edge(self, u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable], continue_on: Condition = Condition.Success) -> _T.Tuple[str, str]:
        """
        Adds an edge between nodes in the Queue DAG's underlying graph.
        Queue DAG nodes may have a maximum of one child and one parent worker.

        Raises:
            AssertionError: if the parent already has a child, or the child
                already has a parent. The check is an explicit raise, so it is
                still enforced when Python runs with assertions disabled (`-O`).
        """
        for node in (u_of_edge, v_of_edge):
            try:
                node_id = self.node_id(node)
            except Exception:
                # Not registered in this DAG yet, so it cannot have edges to check.
                continue

            if node == u_of_edge:
                edge_filter = lambda e, node_id=node_id: e.parent == node_id
            else:
                edge_filter = lambda e, node_id=node_id: e.child == node_id

            if len(self.filter_edges(edge_filter)) != 0:
                error = AssertionError()
                error.add_note("Queue DAG nodes may have a maximum of one child and one parent worker.")
                raise error

        return super().add_edge(u_of_edge, v_of_edge, continue_on)

    def execute(self, *promises: _T.Any, return_ref: bool = False, print_exc: bool = False) -> _T.List[_T.Any] | None:
        """
        Basic wrapper for execution of the DAG's underlying callables.

        Every node is run by `webber.queue._worker` on a thread pool. Each worker
        gets its own output queue, and non-root workers additionally receive their
        parent's ID, future and output queue.

        Args:
            *promises: Paired up with `itertools.pairwise` and passed to every
                worker as its 'promises' mapping.
            return_ref: Accepted for signature compatibility; not used here.
            print_exc: Forwarded to every worker.

        Returns:
            None when the DAG has no nodes. Otherwise the items drained from the
            output queue of the terminal node (the last childless node reached
            through the edges, or the first root when the DAG has no edges).
        """
        queues: _T.Dict[str, _q.LifoQueue[_T.Any]] = {}
        processes: _T.Dict[str, _futures.Future[_T.Any]] = {}
        end_proc: _T.Optional[str] = None

        # NOTE(review): pairwise() yields overlapping pairs -- (a, b, c, d) becomes
        # (a, b), (b, c), (c, d). If `promises` is meant to be a flat key/value
        # sequence this only works for exactly two arguments. Confirm the intent.
        _promises: _T.Dict[str, _T.Any] = dict(_it.pairwise(promises))

        with _TaskLogger("root") as _:

            # Skip execution if there are no callables in scope.
            if len(self.graph.nodes) == 0:
                print('Given DAG has no callables in scope. Skipping execution...')
                return

            with _futures.ThreadPoolExecutor() as executor:

                def launch(node_id: str, **links: _T.Any) -> None:
                    # Create the node's output queue and submit its worker.
                    node = self.get_node(node_id)
                    queues[node_id] = _q.LifoQueue()
                    worker_kwargs = {
                        'work': node.callable,
                        'args': node.args, 'kwargs': node.kwargs,
                        'promises': _promises,
                        'print_exc': print_exc,
                        **links,
                        'out_queue': queues.get(node_id)
                    }
                    processes[node_id] = executor.submit(
                        _event_wrapper,
                        _callable=_queue._worker,
                        _name=node['name'],
                        _args=tuple(),
                        _kwargs=worker_kwargs
                    )

                for root_id in self.root:
                    launch(
                        root_id,
                        halt_condition=self.conditions[root_id]['halt_condition'],
                        iter_limit=self.conditions[root_id]['iter_limit']
                    )

                # Wire children after their parents. The graph's raw edge order
                # follows node insertion order, so a child registered before its
                # parent would otherwise look up a parent future that does not exist yet.
                topo_rank = {n: i for i, n in enumerate(_nx.topological_sort(self.graph))}
                ordered_edges = sorted(self.graph.edges, key=lambda edge: topo_rank[edge[0]])

                for parent_id, child_id in ordered_edges:
                    if self.graph.out_degree(child_id) == 0:
                        end_proc = child_id
                    launch(
                        child_id,
                        parent_id=parent_id,
                        parent_process=processes[parent_id],
                        in_queue=queues.get(parent_id)
                    )

            # For single-node DAGs with no edges, end_proc is the root node
            if end_proc is None and len(self.root) > 0:
                end_proc = self.root[0]

            # Block until every worker future has finished (no busy-wait).
            _futures.wait(list(processes.values()))

            return_val: _T.List[_T.Any] = []
            if end_proc is not None and end_proc in queues:
                while not queues[end_proc].empty():
                    return_val.append(queues[end_proc].get())

            return return_val
class DAG:
 195class DAG:
 196    """
 197    Directed Acyclic Graph used to represent Pythonic tasks in parallel.
 198    """
 199    graph: _nx.DiGraph
 200    _callable_to_id: _T.Dict[_T.Callable, _T.Optional[str]]  # O(1) callable->node_id lookup cache (None = duplicates)
 201
 202    def _callable_status(self, c: _T.Callable) -> _T.Literal['none', 'one', 'many']:
 203        """Returns 'none' if not in DAG, 'one' if unique, 'many' if duplicated.
 204        O(1) lookup using _callable_to_id cache."""
 205        if c not in self._callable_to_id:
 206            return 'none'
 207        return 'one' if self._callable_to_id[c] is not None else 'many'
 208
 209    def _assign_node(self, n: _T.Any, new_callables: _T.Dict[_T.Callable, _T.Callable]) -> str:
 210        """Helper to resolve node to its ID, adding to new_callables dict if needed."""
 211        if not _iscallable(n):
 212            return n
 213        status = self._callable_status(n)
 214        if status == 'many':
 215            err_msg = f"Callable {n.__name__} " \
 216                + "exists more than once in this DAG. " \
 217                + "Use the unique string identifier of the required node."
 218            raise ValueError(err_msg)
 219        if status == 'one':
 220            return self._callable_to_id[n]  # type: ignore[return-value]
 221        new_callables[n] = n
 222        return _edges.label_node(n)
 223
 224    def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str:
 225        """
 226        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
 227        On success, return unique identifier for the new node.
 228        """
 229        if not _iscallable(node):
 230            err_msg = f"{node}: requested node is not a callable Python function."
 231            raise TypeError(err_msg)
 232
 233        node_name = _edges.label_node(node)
 234
 235        args = tuple(
 236            arg if not isinstance(arg, _xcoms.Promise) else self.resolve_promise(arg)
 237            for arg in args
 238        )
 239
 240        for k, val in kwargs.items():
 241            if isinstance(val, _xcoms.Promise):
 242                kwargs[k] = self.resolve_promise(val)
 243
 244        self.graph.add_node(
 245            node_for_adding=node_name,
 246            callable=node, args=args, kwargs=kwargs,
 247            name=node.__name__,
 248            id=node_name
 249        )
 250
 251        # Populate callable->id cache for O(1) lookup
 252        # If callable already exists, mark as None to indicate duplicates
 253        if node in self._callable_to_id:
 254            self._callable_to_id[node] = None  # Multiple occurrences - must use string ID
 255        else:
 256            self._callable_to_id[node] = node_name
 257
 258        return node_name
 259
 260
 261    def add_edge(
 262            self, 
 263            u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable],
 264            continue_on: Condition = Condition.Success
 265        ) -> _T.Tuple[str,str]:
 266        """
 267        Adds an edge between nodes in the DAG's underlying graph,
 268        so long as the requested edge is unique and has not been added previously.
 269
 270        On success, returns Tuple of the new edge's unique identifiers.
 271        """
 272        # Validate inputs prior to execution
 273        # - Nodes must be identifiers or callables
 274        # - Conditions must belong to the webber.edges.Condition class
 275        if not (isinstance(u_of_edge,str) or _iscallable(u_of_edge)):
 276            err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable"
 277            raise TypeError(err_msg)
 278        if not (isinstance(v_of_edge,str) or _iscallable(v_of_edge)):
 279            err_msg = f"Outgoing node {v_of_edge} must be a string or a Python callable"
 280            raise TypeError(err_msg)
 281        if not isinstance(continue_on, Condition):
 282            raise TypeError("Edge conditions must use the webber.edges.Condition class.")
 283
 284        # Base Case 0: No nodes are present in the DAG:
 285        # Ensure that both nodes are callables, then add both to the graph and
 286        # assign the outgoing node as a root.
 287        if len(self.graph.nodes()) == 0:
 288            if not _iscallable(u_of_edge):
 289                err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope."
 290                raise ValueError(err_msg)
 291            if not _iscallable(v_of_edge):
 292                err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope."
 293                raise ValueError(err_msg)
 294            outgoing_node = self.add_node(u_of_edge)
 295            incoming_node = self.add_node(v_of_edge)
 296            self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
 297            return (outgoing_node, incoming_node)
 298
 299        new_callables: _T.Dict[_T.Callable, _T.Callable] = {}
 300
 301        if _iscallable(u_of_edge) and _iscallable(v_of_edge):
 302            # Type narrowing: we've verified both are callables
 303            u_callable = _T.cast(_T.Callable, u_of_edge)
 304            v_callable = _T.cast(_T.Callable, v_of_edge)
 305
 306            # Error Cases 0, 1: Either of the callables appear more than once in the DAG.
 307            if self._callable_status(u_callable) == 'many':
 308                err_msg = f"Callable {u_callable.__name__} " \
 309                        + "exists more than once in this DAG. " \
 310                        + "Use the unique string identifier of the required node."
 311                raise ValueError(err_msg)
 312            if self._callable_status(v_callable) == 'many':
 313                err_msg = f"Callable {v_callable.__name__} " \
 314                        + "exists more than once in this DAG. " \
 315                        + "Use the unique string identifier of the required node."
 316                raise ValueError(err_msg)
 317
 318            # Base Case 1: Both args are callables and will be present in the DAG scope no more than once.
 319            # We will create new nodes if necessary, after validation, and get the unique string identifiers of the nodes.
 320            cached_u = self._callable_to_id.get(u_callable)
 321            if cached_u is not None:
 322                outgoing_node = cached_u
 323            else:
 324                new_callables[u_callable] = u_callable
 325                outgoing_node = _edges.label_node(u_callable)
 326
 327            cached_v = self._callable_to_id.get(v_callable)
 328            if cached_v is not None:
 329                incoming_node = cached_v
 330            else:
 331                new_callables[v_callable] = v_callable
 332                incoming_node = _edges.label_node(v_callable)
 333
 334        else:
 335
 336            # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope.
 337            if isinstance(u_of_edge, str) and u_of_edge not in self.graph.nodes:
 338                err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope."
 339                raise ValueError(err_msg)
 340            if isinstance(v_of_edge, str) and v_of_edge not in self.graph.nodes:
 341                err_msg = f"Incoming node {v_of_edge} not in DAG's current scope."
 342                raise ValueError(err_msg)
 343
 344            # Both nodes' unique identifiers are present in the DAG
 345            # and should be evaluated for a valid edge.
 346            if isinstance(u_of_edge, str) and isinstance(v_of_edge, str):
 347                outgoing_node = u_of_edge
 348                incoming_node = v_of_edge
 349
 350            # Otherwise, one of the nodes is a callable, and the other is a valid unique identifier.
 351            else:
 352                for node in (u_of_edge, v_of_edge):
 353                    if node == u_of_edge:
 354                        outgoing_node = self._assign_node(node, new_callables)
 355                    else:
 356                        incoming_node = self._assign_node(node, new_callables)
 357
 358        # Error Case 5: Both callables exist only once in the DAG,
 359        # but an edge already exists between them. O(1) lookup with has_edge().
 360        if self.graph.has_edge(outgoing_node, incoming_node):
 361            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \
 362                    + "a definition in this DAG."
 363            raise ValueError(err_msg)
 364
 365        # Error Case 6: Adding an edge would create circular dependencies.
 366        # Use has_path() for O(V+E) incremental check instead of rebuilding entire graph.
 367        if incoming_node in self.graph and _nx.has_path(self.graph, incoming_node, outgoing_node):
 368            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \
 369                    + "results in circular dependencies."
 370            raise ValueError(err_msg)
 371
 372        # We can now add the edge to the DAG, since we are certain it will not result in
 373        # illegal dependencies/behavior.
 374        # First, we should account for potential new nodes. This also handles
 375        # duplicates on first entry to the DAG (e.g.: edge == (print, print))
 376        if _iscallable(u_of_edge) and new_callables.get(_T.cast(_T.Callable, u_of_edge)) is not None:
 377            outgoing_node = self.add_node(new_callables[_T.cast(_T.Callable, u_of_edge)])
 378        if _iscallable(v_of_edge) and new_callables.get(_T.cast(_T.Callable, v_of_edge)) is not None:
 379            incoming_node = self.add_node(new_callables[_T.cast(_T.Callable, v_of_edge)])
 380
 381        # Then we can add the new edge.
 382        self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
 383        return (outgoing_node, incoming_node)
 384    
 385    def remove_node(self, *posargs, **kwargs) -> None:
 386        """
 387        Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG.
 388        Throws error message and recommends safer methods.
 389        """
 390        raise NotImplementedError("Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect.")
 391
 392    def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]:
 393        """
 394        Removes an directed edge between nodes in the DAG's underlying graph.
 395        Throws error if the edge does not exist.
 396        On success, returns Tuple of the removed edge's unique identifiers.
 397        """
 398        edge_id = (self.node_id(u_of_edge), self.node_id(v_of_edge))
 399        if edge_id not in self.graph.edges(data = False):
 400            err_msg = "Requested edge does not exist in the DAG's scope"
 401            raise ValueError(err_msg)
 402        self.graph.remove_edge(edge_id[0], edge_id[1])
 403        return edge_id
 404
 405    def update_edges(self, *E: _T.Any, continue_on: Condition | None = None, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | edgedict | None = None) -> None:
 406        """
 407        Flexible function to update properties of edges in the DAG's scope, 
 408        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
 409        lambda filter using the edgedict syntax.
 410        
 411        List of nodes to update or filter argument is expected. Valid edge lists include:
 412
 413        \t update_edges((node1, node2), ...)       or update_edges([(node1, node2)], ...)
 414
 415        \t update_edges((node1, callable2), ...)   or update_edges([(node1, callable2)], ...)
 416        
 417        \t update_edges(edgedict),                 where isinstance(edgedict, webber.edges.edgedict) == True
 418
 419        Parameters:
 420
 421        > continue_on: webber.edges.Condition value to update matching edges with, 
 422        
 423        \t Execute child on success, failure, or any exit state, of the parent node
 424
 425        > filter: lambda property that can be used instead of a list of edges to be updated.
 426        
 427        \t filter = (lambda e: n.parent == print or e.child == print)
 428
 429        > data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set.
 430
 431        \t data = { 'continue_on': webber.edges.Condition }
 432
 433        """
 434        if len(E) == 0 and filter == None:
 435            raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.")
 436
 437        elif isinstance(E, dict) or isinstance(E, edgedict):
 438            E = [E]
 439
 440        elif len(E) == 1 and isinstance(E[0], _abc.Iterable):
 441            try:
 442                _ = self.get_edges(E[0])
 443            except:
 444                E = E[0]
 445
 446        if filter is not None:
 447            edge_ids = self.filter_edges(filter, data = False)
 448        else:
 449            if isinstance(E[0], dict) or isinstance(E[0], edgedict):
 450                try:
 451                    ids = [e['id'] for e in E]
 452                except KeyError:
 453                    err_msg = 'In dictionary form, all given edges must be follow edgedict standards.'
 454                    raise ValueError(err_msg)
 455            else:
 456                ids = E
 457            edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids]
 458
 459        std_update = (continue_on is None)
 460
 461        if std_update:
 462            for edge_id, e in zip(edge_ids, E):
 463                if data is not None:
 464                    self._update_edge(data, id = edge_id)
 465                else:
 466                    self._update_edge(e, id = edge_id)
 467        
 468        else:
 469            if continue_on is not None:
 470                if not isinstance(continue_on, Condition):
 471                    err_msg = f"Condition assignment must use webber.edges.Condition"
 472                    raise TypeError(err_msg)
 473                for e in edge_ids:
 474                    self.graph.edges[e]['Condition'] = continue_on
 475
 476    def _update_edge(self, edgedict: _T.Dict[str, _T.Any], id: _T.Tuple[str, str] | None = None, force: bool = False) -> None:
 477        """
 478        Internal only. Update properties of an individual edge within a DAG's scope, 
 479        given a well-structured dictionary and the tuple identifier of the network edge. 
 480        Force argument bypasses validation, and should only be used internally.
 481        """
 482        if id is not None:
 483            try:
 484                if edgedict.get('id') and id != edgedict['id']:
 485                    raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {edgedict['id']}")
 486            except ValueError as e:
 487                if not force:
 488                    raise e
 489            edgedict['id'] = id
 490
 491        expected_keys = ('parent', 'child', 'id', 'continue_on')
 492        if not set(expected_keys).issuperset(set(edgedict.keys())):
 493            raise ValueError(f"Expecting keys: {expected_keys}")
 494
 495        if not force:
 496            
 497            e1, e2 = None, None
 498
 499            if edgedict.get('id'):
 500                e1 = self.get_edge(edgedict['id'], data = False)
 501
 502            if edgedict.get('parent') and edgedict.get('child'):
 503                e2 = self.get_edge(edgedict['parent'], edgedict['child'], data = False)
 504            else:
 505                e2 = e1
 506
 507            if e1 != e2:
 508                raise ValueError('Edge vertices should not be changed using update functions.')
 509            
 510            elif e1 == None:
 511                raise ValueError('Requested edge was not given an identifier.')
 512
 513            if edgedict.get('continue_on') and not isinstance(edgedict['continue_on'], Condition):
 514                err_msg = f"Condition assignment must use webber.edges.Condition"
 515                raise TypeError(err_msg)
 516                       
 517        edge_id = edgedict.pop('id')
 518        edge = {k: v for k,v in edgedict.items() if k not in ('parent', 'child', 'id')}
 519        self.graph.edges[(edge_id[0], edge_id[1])].update(edge)
 520
 521    def relabel_node(self, node: _T.Union[str, _T.Callable], label: str) -> str:
 522        """
 523        Update the label, or name, given to a node in the DAG's scope, 
 524        given a Python string and a node identifier. 
 525        Well-structured wrapper for a common use-case of DAG.update_nodes.
 526        """
 527        node_id = self.node_id(node)
 528        if not isinstance(label, str) or len(label) == 0:
 529            err_msg = "Node label must be a Python string with one or more characters."
 530            raise ValueError(err_msg)
 531        self.update_nodes(node_id, data = {'name': label})
 532        return label
 533
 534    def update_nodes(self, *N: _T.Any, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | dotdict | None = None, callable: _T.Callable | None = None, args: _T.Iterable[_T.Any] | None = None, kwargs: _T.Dict[str, _T.Any] | None = None) -> None:
 535        """
 536        Flexible function to update properties of nodes in the DAG's scope, 
 537        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
 538        lambda filter using the dotdict syntax.
 539        
 540        List of nodes to update or filter argument is expected. Valid node lists include:
 541
 542        \t update_nodes(node_id, ...)           or update_nodes([node_id], ...)
 543
 544        \t update_nodes(callable, ...)          or update_nodes([callable], ...)
 545        
 546        \t update_nodes(node1, node2, ...)      or update_nodes([node1, node2], ...)
 547        
 548        \t update_nodes(node1, callable2, ...)  or update_nodes([node1, callable2], ...)
 549
 550        > update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id)
 551
 552        Parameters:
 553
 554        > filter: lambda property that can be used instead of a list of nodes to be updated.
 555        
 556        \t filter = (lambda n: n.callable == print or 'Hello, World' in n.args)
 557
 558        > data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set.
 559            Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method.
 560            
 561        \t data = {
 562            \t 'callable': print,
 563            \t 'args': ['Hello', 'World'],
 564            \t 'kwargs': {'sep': ', '},
 565            \t 'name': 'custom_label',
 566            \t 'id': 'unique-identifier'
 567            \t}
 568
 569        > args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List).
 570
 571        > kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary.
 572
 573        """
 574        if len(N) == 0 and filter == None:
 575            raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.")
 576
 577        elif len(N) > 0 and filter is not None:
 578            raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.")
 579
 580        elif isinstance(N, dict) or isinstance(N, str):
 581            N = [N]
 582
 583        elif len(N) == 1 and isinstance(N[0], _abc.Iterable):
 584            if isinstance(N[0][0], dict):
 585                N = N[0]
 586            elif isinstance(N[0][0], str):
 587                # BUG: A list of all single character IDs will fail to be updated. Please try another call method (i.e.: nested iterator).
 588                if sum(list(map(lambda n: len(n), N[0]))) != len(N[0]): 
 589                    N = N[0]
 590
 591
 592        if filter is not None:
 593            node_ids = self.filter_nodes(filter, data = False)
 594        else:
 595            if isinstance(N[0], dict):
 596                ids = [n['id'] for n in N]
 597            else:
 598                ids = N
 599            node_ids = [self.node_id(i) for i in ids]
 600        
 601        std_update = (callable == None) and (args == None) and (kwargs == None)
 602
 603        if std_update:
 604            for node_id, n in zip(node_ids, N):
 605                if data is not None:
 606                    self._update_node(data, id = node_id)
 607                else:
 608                    self._update_node(n, id = node_id)
 609        
 610        else:
 611            if callable is not None:
 612                if not _iscallable(callable):
 613                    err_msg = f"Requested node is not assigned a callable Python function."
 614                    raise TypeError(err_msg)
 615                for node_id in node_ids:
 616                    self.graph.nodes[node_id]['callable'] = callable
 617                    self.graph.nodes[node_id]['name'] = callable.__name__
 618            
 619            if args is not None:
 620                if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)):
 621                    err_msg = f"Requested node is not assigned a tuple of pos args."
 622                    raise TypeError(err_msg)
 623                args = tuple(args)
 624                for node_id in node_ids:
 625                    self.graph.nodes[node_id]['args'] = args
 626            
 627            if kwargs is not None:
 628                if not isinstance(kwargs, dict):
 629                    err_msg = f"Requested node is not assigned a dictionary of kw args."
 630                    raise TypeError(err_msg)
 631                for node_id in node_ids:
 632                    self.graph.nodes[node_id]['kwargs'] = kwargs
 633
 634    def get_edges(self, *N: _T.Any, data: bool = True) -> _T.Union[_T.List[edgedict], _T.List[_T.Tuple[str, str]]]:
 635        """
 636        Retrieval function for DAG edge data, based on tuple identifiers.
 637        Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2']))
 638        """
 639        if len(N) == 0:
 640            if data:
 641                return [edgedict(u, v, **d) for u, v, d in self.graph.edges.data()]
 642            return list(self.graph.edges.data(data=False))
 643            
 644        # elif len(N) == 1:
 645        #     if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], tuple):
 646        #         N = N[0]
 647
 648        if len(N) != len(set(N)) or not all(isinstance(n, _abc.Iterable) and len(n) == 2 for n in N):
 649            err_msg = 'All requested edges must be unique tuples of size 2.'
 650            raise ValueError(err_msg)
 651    
 652        edge_data = [self.get_edge(o, i, data=data) for (o, i) in N]
 653        return edge_data
 654    
 655    def get_edge(self, outgoing_node: _T.Union[str, _T.Callable], incoming_node: _T.Union[str, _T.Callable], data: bool = True) -> _T.Union[edgedict, _T.Tuple[str, str]]:
 656        """
 657        Retrieval function for a single directed edge between nodes in a DAG's scope. 
 658        """
 659        id = (self.node_id(outgoing_node), self.node_id(incoming_node))
 660        if not data:
 661            return id
 662        edge_data = self.graph.get_edge_data(u = id[0], v = id[1])
 663        if not edge_data:
 664            err_msg = f'No match found for the directed edge requested: {id}'
 665            raise ValueError(err_msg)
 666        assert edge_data is not None  # Type narrowing for Pylance
 667        return edgedict(*id, **edge_data)
 668
 669    def get_node(self, n: _T.Union[str, _T.Callable]) -> dotdict:
 670        """
 671        Given a unique identifier, returns a dictionary of node metadata
 672        for a single node in the DAG's scope.
 673        """
 674        node_id = self.node_id(n)
 675        return dotdict(self.graph.nodes[node_id])
 676
 677    def get_nodes(self, *N: _T.Any) -> _T.List[dotdict]:
 678        """
 679        Flexible function to retrieve DAG node data, based on node identifiers 
 680        (e.g.: string IDs or unique callables).
 681        """
 682        if len(N) == 0:
 683            node_data = list(self.graph.nodes.values())
 684            return [dotdict(d) for d in node_data]
 685
 686        elif len(N) == 1:
 687            if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], str):
 688                N = N[0]
 689            else:
 690                node_id = self.node_id(N[0])
 691                node_data = [dotdict(self.graph.nodes[node_id])]
 692                return node_data
 693            
 694        if not len(N) == len(set(N)):
 695            err_msg = 'All requested nodes must be unique identifiers.'
 696            raise ValueError(err_msg)
 697        
 698        node_ids  = [self.node_id(n) for n in N]
 699        node_data = [dotdict(self.graph.nodes[n]) for n in node_ids]
 700        return node_data
 701
 702    def filter_nodes(self, filter: _types.LambdaType, data: bool = False) -> _T.List[str] | _T.List[dotdict]:
 703        """
 704        Given a lambda function, filter nodes in a DAG's scope based on its attributes.
 705        Current limitation: Filters must use node identifier strings when referencing nodes.
 706        Use get_nodes for more flexible controls.
 707        """
 708        if not data:
 709            return [node['id'] for node in self.graph.nodes.values() if filter(dotdict(node))]
 710        return [dotdict(node) for node in self.graph.nodes.values() if filter(dotdict(node))]
 711    
 712    def filter_edges(self, filter: _types.LambdaType, data: bool = False) -> _T.List[_T.Tuple[str, str]] | _T.List[edgedict]:
 713        """
 714        Given a lambda function, filter edges in a DAG's scope based on its attributes.
 715        Current limitation: Filters must use node identifier strings when referencing nodes.
 716        Use get_edges for more flexible controls.
 717        """
 718        if not data:
 719            return [e[:2] for e in list(self.graph.edges.data()) if filter(edgedict(*e))]
 720        return [edgedict(*e) for e in list(self.graph.edges.data()) if filter(edgedict(*e))]
 721
 722    def retry_node(self, identifier: _T.Union[str,_T.Callable], count: int) -> None:
 723        """
 724        Given a node identifier, set number of automatic retries in case of failure.
 725        Re-attempts will begin as soon as possible.
 726        """
 727        if not isinstance(count, int) or not count >= 0:
 728            raise ValueError("Retry count must be a non-negative integer.")
 729        node = self.node_id(identifier)
 730        self.graph.nodes[node]['retry'] = count
 731
 732    def skip_node(self, identifier: _T.Union[str, _T.Callable], skip: bool = True, as_failure: bool = False) -> None:
 733        """
 734        Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error).
 735        Allows conditional control and testing over DAG's order of operations.
 736        """
 737        if not isinstance(skip, bool):
 738            raise ValueError("Skip argument must be a boolean value.")
 739        node = self.node_id(identifier)
 740        self.graph.nodes[node]['skip'] = (skip, as_failure)
 741      
 742    def critical_path(self, nodes: _T.Union[str, _T.Callable, _T.Iterable[_T.Union[str, _T.Callable]]]) -> 'DAG':
 743        """
 744        Given a set of nodes, returns a subset of the DAG containing
 745        only the node(s) and its parents, or upstream dependencies.
 746        """
 747        if isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str):
 748            node_ids = {self.node_id(n) for n in nodes}
 749        else:
 750            node_ids = {self.node_id(nodes)}
 751        return self._subgraph(node_ids)
 752
 753    def execute(
 754        self,
 755        return_ref: bool = False,
 756        print_exc: bool = False,
 757        max_workers: _T.Optional[int] = None,
 758        verbose: bool = True
 759    ) -> _T.Optional['DAG.DAGExecutor']:
 760        """
 761        Basic wrapper for execution of the DAG's underlying callables.
 762
 763        Args:
 764            return_ref: If True, return the DAGExecutor instance.
 765            print_exc: If True, print full exception tracebacks.
 766            max_workers: Maximum number of worker threads. Defaults to None (auto).
 767            verbose: If True, log task start/completion messages. Defaults to True.
 768        """
 769        executor = self.DAGExecutor(
 770            self.graph, self.root, print_exc,
 771            max_workers=max_workers, verbose=verbose
 772        )
 773        return executor if return_ref else None
 774
 775    def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] | None = None) -> _T.Any:
 776        """
 777        Basic wrapper to visualize DAG using Vis.js and NetGraph libraries.
 778        By default, visualization library only loaded in after DAG.visualize() is called, halving import times.
 779        """
 780        import webber.viz as _viz
 781
 782        match type:
 783            case 'browser':
 784                _viz.visualize_browser(self.graph)
 785
 786            case 'plt':
 787                return _viz.visualize_plt(self.graph)
 788
 789            case 'gui':
 790                # _visualize_gui(self.graph)
 791                raise NotImplementedError
 792
 793            case None: 
 794                if _viz._in_notebook():
 795                    return _viz.visualize_plt(self.graph)
 796                else:
 797                    _viz.visualize_browser(self.graph)
 798
 799            case _:
 800                err_msg = "Unknown visualization type requested."
 801                raise NotImplementedError(err_msg)
 802
 803    @property
 804    def root(self) -> _T.List[str]:
 805        """
 806        Return list of nodes with no dependencies.
 807        Root nodes will occur first in DAG's order of operations.
 808        Uses O(1) in_degree() instead of O(k) predecessors list creation.
 809        """
 810        return [node for node in self.graph.nodes if self.graph.in_degree(node) == 0]
 811
 812    @property
 813    def nodes(self) -> _T.Any:
 814        """Returns the NodeView of the underlying NetworkX graph containing all DAG nodes."""
 815        return self.graph.nodes
 816
 817    def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str:
 818        """
 819        Validate whether identifier given is a valid node within the DAG's scope.
 820        Primarily for internal use, but useful for retrieving string identifiers
 821        for a unique callable in a DAG.
 822        Uses O(1) cache lookup for callables instead of O(n) linear search.
 823        """
 824        if isinstance(identifier, str):
 825            # O(1) lookup using graph.nodes dict
 826            if identifier not in self.graph.nodes:
 827                err_msg = f"Node {identifier} is not defined in this DAG's scope."
 828                raise ValueError(err_msg)
 829            return identifier
 830        elif _iscallable(identifier):
 831            # O(1) lookup using _callable_to_id cache
 832            if identifier not in self._callable_to_id:
 833                err_msg = f"Callable {identifier} is not defined in this DAG's scope."
 834                raise ValueError(err_msg)
 835            cached_id = self._callable_to_id[identifier]
 836            if cached_id is None:
 837                err_msg = f"Callable {identifier.__name__} " \
 838                        + "exists more than once in this DAG. " \
 839                        + "Use the unique string identifier of the required node."
 840                raise ValueError(err_msg)
 841            return cached_id
 842        else:
 843            err_msg = f"Node {identifier} must be a string or a Python callable"
 844            raise TypeError(err_msg)
 845    
    def _update_node(self, nodedict: _T.Dict[str, _T.Any], id: str | None = None, force: bool = False) -> None:
        """
        Internal only. Update the properties of a single node within the DAG's scope,
        given a dictionary of node properties and, optionally, the node's string identifier.

        Parameters:

        > nodedict: properties to write onto the node. Only the keys 'callable', 'args',
        'kwargs', 'name' and 'id' are accepted. The dictionary is modified in place:
        'id' is set and then removed, and 'name' / 'args' may be normalized.

        > id: identifier of the node to update. When omitted, nodedict['id'] is used.

        > force: bypasses value validation and tolerates a mismatch between `id` and
        nodedict['id']. Should only be used internally.

        Raises ValueError for an inconsistent identifier (unless forced) or unexpected keys,
        and TypeError when a value fails validation.
        """
        if id is not None:
            try:
                if nodedict.get('id') is not None and id != nodedict['id']:
                    raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {nodedict['id']}")
            except ValueError as e:
                # With force=True the mismatch is ignored and the explicit `id` wins below.
                if not force:
                    raise e
            nodedict['id'] = id

        # Reject dictionaries that carry keys outside the node schema (checked even when forced).
        expected_keys = ('callable', 'args', 'kwargs', 'name', 'id')
        if not set(expected_keys).issuperset(set(nodedict.keys())):
            raise ValueError(f"Expecting keys: {expected_keys}")

        if not force:
            if nodedict.get('callable'):
                if not _iscallable(nodedict['callable']):
                    err_msg = f"Requested node is not assigned a callable Python function."
                    raise TypeError(err_msg)
                # A new callable without an explicit name takes the callable's own name.
                if not nodedict.get('name'):
                    nodedict['name'] = nodedict['callable'].__name__

            if nodedict.get('name') and (not isinstance(nodedict['name'], str) or len(nodedict['name']) == 0):
                err_msg = f"Requested node name must be a non-null Python string, will default to callable when not set."
                raise TypeError(err_msg)

            if nodedict.get('args'):
                # Any non-string iterable is accepted and stored as a tuple.
                if not (isinstance(nodedict['args'], _abc.Iterable) and not isinstance(nodedict['args'], str)):
                    err_msg = f"Requested node is not assigned a tuple of pos args."
                    raise TypeError(err_msg)
                nodedict['args'] = tuple(nodedict['args'])

            if nodedict.get('kwargs') and not isinstance(nodedict['kwargs'], dict):
                err_msg = f"Requested node is not assigned a dictionary of kw args."
                raise TypeError(err_msg)

        # NOTE(review): raises KeyError if neither `id` nor nodedict['id'] was supplied.
        node_id = nodedict.pop('id')
        self.graph.nodes[node_id].update(nodedict)
        # NOTE(review): self._callable_to_id is not touched here, so replacing a node's
        # 'callable' presumably leaves the callable->id cache stale -- confirm with callers.

        # Reset node name if implicitly requested.
        # NOTE(review): this also fires when 'name' is simply absent from nodedict (e.g. an
        # args-only update), which overwrites a previously customized name -- confirm intended.
        if not nodedict.get('name'):
            self.graph.nodes[node_id]['name'] = self.graph.nodes[node_id]['callable'].__name__
 893
 894    def _subgraph(self, node_ids: set[str]) -> 'DAG':
 895        """
 896        Internal only. Given a set of nodes, returns a subset of the DAG containing
 897        only the node(s) and upstream dependencies.
 898        Uses nx.ancestors() for O(V+E) performance instead of manual traversal.
 899        """
 900        all_nodes = set(node_ids)
 901        for node in node_ids:
 902            all_nodes.update(_nx.ancestors(self.graph, node))
 903        subgraph = self.graph.subgraph(all_nodes)
 904        return DAG(subgraph, __force=True)
 905
 906    def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise:
 907        """
 908        Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope.
 909        Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope.
 910        """
 911        try:
 912            key = self.node_id(promise.key)
 913        except Exception as e:
 914            raise _xcoms.InvalidCallable(e)
 915        return _xcoms.Promise(key)
 916
 917    def __init__(self, graph: _T.Union[_nx.DiGraph, _nx.Graph, None] = None, **kwargs: _T.Any) -> None:
 918
 919        if graph is None:
 920            self.graph = _nx.DiGraph()
 921            self._callable_to_id = {}
 922            return
 923
 924        # Meant for internal use only, creating DAGs from subgraphs.
 925        if kwargs.get('__force') == True:
 926            self.graph = _T.cast(_nx.DiGraph, graph)
 927            # Build cache from existing subgraph
 928            self._callable_to_id = {}
 929            for node_id, data in self.graph.nodes(data=True):
 930                callable_fn = data.get('callable')
 931                if callable_fn is not None:
 932                    if callable_fn in self._callable_to_id:
 933                        self._callable_to_id[callable_fn] = None  # Duplicate
 934                    else:
 935                        self._callable_to_id[callable_fn] = node_id
 936            return
 937
 938        _edges.validate_dag(graph)
 939        # Type narrowing: validate_dag ensures graph is a DiGraph
 940        assert isinstance(graph, _nx.DiGraph)
 941
 942        # Define framework specific logic as nested dictionaries.
 943        for node in graph.nodes.keys():
 944            graph.nodes[node]['callable'] = node
 945            graph.nodes[node]['name'] = node.__name__
 946            graph.nodes[node]['args'] = []
 947            graph.nodes[node]['kwargs'] = {}
 948
 949        for e in graph.edges:
 950            condition = graph.edges[e].get('Condition')
 951            if condition is not None:
 952                if condition not in Condition:
 953                    raise TypeError(e, 'Edge conditions must belong to IntEnum type Webber.Condition.')
 954            else:
 955                graph.edges[e]['Condition'] = Condition.Success
 956
 957        graph = _nx.relabel_nodes(graph, lambda node: _edges.label_node(node))
 958        for n in graph.nodes:
 959            graph.nodes[n]['id'] = n
 960        self.graph = _nx.DiGraph(graph)
 961
 962        # Build callable->id cache after relabeling
 963        self._callable_to_id = {}
 964        for node_id, data in self.graph.nodes(data=True):
 965            callable_fn = data.get('callable')
 966            if callable_fn is not None:
 967                if callable_fn in self._callable_to_id:
 968                    self._callable_to_id[callable_fn] = None  # Duplicate
 969                else:
 970                    self._callable_to_id[callable_fn] = node_id
 971
    class DAGExecutor:
        """
        Base class used to execute DAG in embarrassingly parallel.

        All of the work happens in the constructor: it submits the root nodes to a
        thread pool, then repeatedly waits for futures to finish and submits the
        successors whose edge conditions are satisfied, until every node is complete,
        failed or skipped (or no futures remain pending).
        """
        def __init__(
            self,
            graph: _nx.DiGraph,
            roots: _T.List[str],
            print_exc: bool = False,
            max_workers: _T.Optional[int] = None,
            verbose: bool = True
        ) -> None:
            """
            Execute the given graph.

            Parameters:

            > graph: graph whose nodes carry 'callable', 'name', 'args' and 'kwargs'
            attributes (optionally 'skip' and 'retry') and whose edges carry 'Condition'.

            > roots: identifiers of the nodes to submit first.

            > print_exc: print the traceback of every failed attempt.

            > max_workers: forwarded to ThreadPoolExecutor.

            > verbose: print progress messages; also forwarded to the event wrapper.
            """

            # Skip execution if there are no callables in scope.
            if len(graph.nodes) == 0:
                if verbose:
                    print('Given DAG has no callables in scope. Skipping execution...')
                return

            # Initialize local variables for execution.
            complete: set[str] = set()
            started: set[str] = set()  # written below, never read within this method
            failed: set[str] = set()
            skipped: set[str] = set()
            refs: _T.Dict[str, _futures.Future[_T.Any]] = {}  # latest future per node

            # Callable submitted in place of a skipped node that should count as a failure.
            def raise_exc(message: str) -> None:
                raise ValueError(message)

            # True when every incoming edge of `n` has its Condition satisfied
            # by the recorded state (complete / failed) of its predecessor.
            def run_conditions_met(n: str) -> bool:
                for p in graph.predecessors(n):
                    match graph.edges[(p, n)]['Condition']:
                        case Condition.Success:
                            if p not in complete:
                                return False
                        case Condition.Failure:
                            if p not in failed:
                                return False
                        case Condition.AnyCase:
                            if p not in failed and p not in complete:
                                return False
                return True

            # skip[node] is a pair: [0] = skip the node, [1] = treat the skip as a failure.
            skip = graph.nodes.data("skip", default=(False, False))
            # retry[node] = [attempts left (retry count + 1), cached submit parameters].
            retry: _T.Dict[str, _T.List[_T.Any]] = {
                n: [c + 1, {}] for n, c in graph.nodes.data("retry", default=0)
            }

            # Start execution of root node functions.
            with _futures.ThreadPoolExecutor(max_workers=max_workers) as executor:

                # Submit one attempt of `event` to the pool and return its future.
                def Submit(
                    event: str,
                    callable: _T.Callable[..., _T.Any],
                    name: str,
                    args: _T.Union[tuple[_T.Any, ...], list[_T.Any]],
                    kwargs: _T.Dict[str, _T.Any]
                ) -> _futures.Future[_T.Any]:
                    if skip[event][0]:
                        # Skipped nodes are never retried; they run `print` or `raise_exc`
                        # with a fixed message instead of the node's own callable.
                        retry[event][0] = 0
                        skip_callable = raise_exc if skip[event][1] else print
                        return executor.submit(
                            _event_wrapper,
                            _callable=skip_callable,
                            _name=graph.nodes[event]['name'],
                            _args=[f"Event {event} skipped..."],
                            _kwargs={},
                            _verbose=verbose
                        )
                    else:
                        # Consume one attempt; on the first submission, remember the
                        # (already resolved) arguments so a retry can reuse them.
                        retry[event][0] -= 1
                        if (retry[event][0] > 0) and (retry[event][1] == {}):
                            retry[event][1] = {
                                'callable': callable,
                                'name': name,
                                'args': args,
                                'kwargs': kwargs
                            }
                        return executor.submit(
                            _event_wrapper,
                            _callable=callable,
                            _name=name,
                            _args=args,
                            _kwargs=kwargs,
                            _verbose=verbose
                        )

                # Submit root nodes
                for event in roots:
                    refs[event] = Submit(
                        event,
                        graph.nodes[event]['callable'],
                        graph.nodes[event]['name'],
                        graph.nodes[event]['args'],
                        graph.nodes[event]['kwargs']
                    )
                    started.add(event)

                # Process futures as they complete (no busy-wait)
                pending = set(refs.values())
                future_to_event = {v: k for k, v in refs.items()}

                while (len(complete) + len(failed) + len(skipped)) != len(graph):
                    # Nothing left in flight: stop even if some nodes were never resolved.
                    if not pending:
                        break

                    # Wait for at least one future to complete
                    done, pending = _futures.wait(pending, return_when=_futures.FIRST_COMPLETED)

                    for future in done:
                        event = future_to_event[future]

                        if future.exception() is not None:
                            # Re-raise inside a try block so that print_exc() has an
                            # active exception to report.
                            try:
                                raise future.exception()  # type: ignore
                            except:
                                if print_exc:
                                    _traceback.print_exc()

                                # Attempts remain: resubmit with the cached parameters and
                                # move on without marking the node as failed.
                                if retry[event][0] > 0:
                                    if verbose:
                                        print(f"Event {event} exited with exception, retrying...")
                                    new_future = Submit(
                                        event,
                                        callable=retry[event][1]['callable'],
                                        name=retry[event][1]['name'],
                                        args=retry[event][1]['args'],
                                        kwargs=retry[event][1]['kwargs']
                                    )
                                    refs[event] = new_future
                                    pending.add(new_future)
                                    future_to_event[new_future] = event
                                    continue

                                if verbose:
                                    print(f"Event {event} exited with exception...")
                                failed.add(event)
                                # Children whose edge is rejected by continue_on_failure
                                # (presumably: edges that do not allow running after a failure).
                                skipping = [
                                    e[1] for e in set(graph.out_edges(event))
                                    if not _edges.continue_on_failure(graph.edges[e])
                                ]
                        else:
                            complete.add(event)
                            # Children whose edge is rejected by continue_on_success
                            # (presumably: edges that do not allow running after a success).
                            skipping = [
                                e[1] for e in set(graph.out_edges(event))
                                if not _edges.continue_on_success(graph.edges[e])
                            ]

                        # A skipped child takes all of its descendants with it.
                        skipped = skipped.union(skipping)
                        for n in skipping:
                            skipped = skipped.union(_nx.descendants(graph, n))

                        # Of the remaining children, start those whose incoming
                        # edge conditions are all satisfied.
                        carryon = set(graph.successors(event)).difference(skipped)
                        starting = [
                            successor for successor in carryon
                            if run_conditions_met(successor)
                        ]

                        for successor in starting:
                            # Replace Promise placeholders with the result of the node they
                            # reference. NOTE(review): Future.result() re-raises if that
                            # node's last attempt failed -- confirm this cannot happen here.
                            _args = [
                                a if not isinstance(a, _xcoms.Promise) else refs[_T.cast(str, a.key)].result()
                                for a in graph.nodes[successor]['args']
                            ]
                            _kwargs = {
                                k: v if not isinstance(v, _xcoms.Promise) else refs[_T.cast(str, v.key)].result()
                                for k, v in graph.nodes[successor]['kwargs'].items()
                            }
                            new_future = Submit(
                                successor,
                                graph.nodes[successor]['callable'],
                                graph.nodes[successor]['name'],
                                _args,
                                _kwargs
                            )
                            refs[successor] = new_future
                            pending.add(new_future)
                            future_to_event[new_future] = successor
                            started.add(successor)

Directed Acyclic Graph used to represent Pythonic tasks in parallel.

DAG( graph: Union[networkx.classes.digraph.DiGraph, networkx.classes.graph.Graph, NoneType] = None, **kwargs: Any)
917    def __init__(self, graph: _T.Union[_nx.DiGraph, _nx.Graph, None] = None, **kwargs: _T.Any) -> None:
918
919        if graph is None:
920            self.graph = _nx.DiGraph()
921            self._callable_to_id = {}
922            return
923
924        # Meant for internal use only, creating DAGs from subgraphs.
925        if kwargs.get('__force') == True:
926            self.graph = _T.cast(_nx.DiGraph, graph)
927            # Build cache from existing subgraph
928            self._callable_to_id = {}
929            for node_id, data in self.graph.nodes(data=True):
930                callable_fn = data.get('callable')
931                if callable_fn is not None:
932                    if callable_fn in self._callable_to_id:
933                        self._callable_to_id[callable_fn] = None  # Duplicate
934                    else:
935                        self._callable_to_id[callable_fn] = node_id
936            return
937
938        _edges.validate_dag(graph)
939        # Type narrowing: validate_dag ensures graph is a DiGraph
940        assert isinstance(graph, _nx.DiGraph)
941
942        # Define framework specific logic as nested dictionaries.
943        for node in graph.nodes.keys():
944            graph.nodes[node]['callable'] = node
945            graph.nodes[node]['name'] = node.__name__
946            graph.nodes[node]['args'] = []
947            graph.nodes[node]['kwargs'] = {}
948
949        for e in graph.edges:
950            condition = graph.edges[e].get('Condition')
951            if condition is not None:
952                if condition not in Condition:
953                    raise TypeError(e, 'Edge conditions must belong to IntEnum type Webber.Condition.')
954            else:
955                graph.edges[e]['Condition'] = Condition.Success
956
957        graph = _nx.relabel_nodes(graph, lambda node: _edges.label_node(node))
958        for n in graph.nodes:
959            graph.nodes[n]['id'] = n
960        self.graph = _nx.DiGraph(graph)
961
962        # Build callable->id cache after relabeling
963        self._callable_to_id = {}
964        for node_id, data in self.graph.nodes(data=True):
965            callable_fn = data.get('callable')
966            if callable_fn is not None:
967                if callable_fn in self._callable_to_id:
968                    self._callable_to_id[callable_fn] = None  # Duplicate
969                else:
970                    self._callable_to_id[callable_fn] = node_id
graph: networkx.classes.digraph.DiGraph
def add_node(self, node: Any, *args: Any, **kwargs: Any) -> str:
224    def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str:
225        """
226        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
227        On success, return unique identifier for the new node.
228        """
229        if not _iscallable(node):
230            err_msg = f"{node}: requested node is not a callable Python function."
231            raise TypeError(err_msg)
232
233        node_name = _edges.label_node(node)
234
235        args = tuple(
236            arg if not isinstance(arg, _xcoms.Promise) else self.resolve_promise(arg)
237            for arg in args
238        )
239
240        for k, val in kwargs.items():
241            if isinstance(val, _xcoms.Promise):
242                kwargs[k] = self.resolve_promise(val)
243
244        self.graph.add_node(
245            node_for_adding=node_name,
246            callable=node, args=args, kwargs=kwargs,
247            name=node.__name__,
248            id=node_name
249        )
250
251        # Populate callable->id cache for O(1) lookup
252        # If callable already exists, mark as None to indicate duplicates
253        if node in self._callable_to_id:
254            self._callable_to_id[node] = None  # Multiple occurrences - must use string ID
255        else:
256            self._callable_to_id[node] = node_name
257
258        return node_name

Adds a callable with positional and keyword arguments to the DAG's underlying graph. On success, return unique identifier for the new node.

def add_edge( self, u_of_edge: Union[str, Callable], v_of_edge: Union[str, Callable], continue_on: Condition = <Condition.Success: 0>) -> Tuple[str, str]:
261    def add_edge(
262            self, 
263            u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable],
264            continue_on: Condition = Condition.Success
265        ) -> _T.Tuple[str,str]:
266        """
267        Adds an edge between nodes in the DAG's underlying graph,
268        so long as the requested edge is unique and has not been added previously.
269
270        On success, returns Tuple of the new edge's unique identifiers.
271        """
272        # Validate inputs prior to execution
273        # - Nodes must be identifiers or callables
274        # - Conditions must belong to the webber.edges.Condition class
275        if not (isinstance(u_of_edge,str) or _iscallable(u_of_edge)):
276            err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable"
277            raise TypeError(err_msg)
278        if not (isinstance(v_of_edge,str) or _iscallable(v_of_edge)):
279            err_msg = f"Outgoing node {v_of_edge} must be a string or a Python callable"
280            raise TypeError(err_msg)
281        if not isinstance(continue_on, Condition):
282            raise TypeError("Edge conditions must use the webber.edges.Condition class.")
283
284        # Base Case 0: No nodes are present in the DAG:
285        # Ensure that both nodes are callables, then add both to the graph and
286        # assign the outgoing node as a root.
287        if len(self.graph.nodes()) == 0:
288            if not _iscallable(u_of_edge):
289                err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope."
290                raise ValueError(err_msg)
291            if not _iscallable(v_of_edge):
292                err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope."
293                raise ValueError(err_msg)
294            outgoing_node = self.add_node(u_of_edge)
295            incoming_node = self.add_node(v_of_edge)
296            self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
297            return (outgoing_node, incoming_node)
298
299        new_callables: _T.Dict[_T.Callable, _T.Callable] = {}
300
301        if _iscallable(u_of_edge) and _iscallable(v_of_edge):
302            # Type narrowing: we've verified both are callables
303            u_callable = _T.cast(_T.Callable, u_of_edge)
304            v_callable = _T.cast(_T.Callable, v_of_edge)
305
306            # Error Cases 0, 1: Either of the callables appear more than once in the DAG.
307            if self._callable_status(u_callable) == 'many':
308                err_msg = f"Callable {u_callable.__name__} " \
309                        + "exists more than once in this DAG. " \
310                        + "Use the unique string identifier of the required node."
311                raise ValueError(err_msg)
312            if self._callable_status(v_callable) == 'many':
313                err_msg = f"Callable {v_callable.__name__} " \
314                        + "exists more than once in this DAG. " \
315                        + "Use the unique string identifier of the required node."
316                raise ValueError(err_msg)
317
318            # Base Case 1: Both args are callables and will be present in the DAG scope no more than once.
319            # We will create new nodes if necessary, after validation, and get the unique string identifiers of the nodes.
320            cached_u = self._callable_to_id.get(u_callable)
321            if cached_u is not None:
322                outgoing_node = cached_u
323            else:
324                new_callables[u_callable] = u_callable
325                outgoing_node = _edges.label_node(u_callable)
326
327            cached_v = self._callable_to_id.get(v_callable)
328            if cached_v is not None:
329                incoming_node = cached_v
330            else:
331                new_callables[v_callable] = v_callable
332                incoming_node = _edges.label_node(v_callable)
333
334        else:
335
336            # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope.
337            if isinstance(u_of_edge, str) and u_of_edge not in self.graph.nodes:
338                err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope."
339                raise ValueError(err_msg)
340            if isinstance(v_of_edge, str) and v_of_edge not in self.graph.nodes:
341                err_msg = f"Incoming node {v_of_edge} not in DAG's current scope."
342                raise ValueError(err_msg)
343
344            # Both nodes' unique identifiers are present in the DAG
345            # and should be evaluated for a valid edge.
346            if isinstance(u_of_edge, str) and isinstance(v_of_edge, str):
347                outgoing_node = u_of_edge
348                incoming_node = v_of_edge
349
350            # Otherwise, one of the nodes is a callable, and the other is a valid unique identifier.
351            else:
352                for node in (u_of_edge, v_of_edge):
353                    if node == u_of_edge:
354                        outgoing_node = self._assign_node(node, new_callables)
355                    else:
356                        incoming_node = self._assign_node(node, new_callables)
357
358        # Error Case 5: Both callables exist only once in the DAG,
359        # but an edge already exists between them. O(1) lookup with has_edge().
360        if self.graph.has_edge(outgoing_node, incoming_node):
361            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \
362                    + "a definition in this DAG."
363            raise ValueError(err_msg)
364
365        # Error Case 6: Adding an edge would create circular dependencies.
366        # Use has_path() for O(V+E) incremental check instead of rebuilding entire graph.
367        if incoming_node in self.graph and _nx.has_path(self.graph, incoming_node, outgoing_node):
368            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \
369                    + "results in circular dependencies."
370            raise ValueError(err_msg)
371
372        # We can now add the edge to the DAG, since we are certain it will not result in
373        # illegal dependencies/behavior.
374        # First, we should account for potential new nodes. This also handles
375        # duplicates on first entry to the DAG (e.g.: edge == (print, print))
376        if _iscallable(u_of_edge) and new_callables.get(_T.cast(_T.Callable, u_of_edge)) is not None:
377            outgoing_node = self.add_node(new_callables[_T.cast(_T.Callable, u_of_edge)])
378        if _iscallable(v_of_edge) and new_callables.get(_T.cast(_T.Callable, v_of_edge)) is not None:
379            incoming_node = self.add_node(new_callables[_T.cast(_T.Callable, v_of_edge)])
380
381        # Then we can add the new edge.
382        self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
383        return (outgoing_node, incoming_node)

Adds an edge between nodes in the DAG's underlying graph, so long as the requested edge is unique and has not been added previously.

On success, returns Tuple of the new edge's unique identifiers.

def remove_node(self, *posargs, **kwargs) -> None:
385    def remove_node(self, *posargs, **kwargs) -> None:
386        """
387        Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG.
388        Throws error message and recommends safer methods.
389        """
390        raise NotImplementedError("Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect.")

Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG. Throws error message and recommends safer methods.

def remove_edge( self, u_of_edge: Union[str, Callable], v_of_edge: Union[str, Callable]) -> Tuple[str, str]:
392    def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]:
393        """
394        Removes an directed edge between nodes in the DAG's underlying graph.
395        Throws error if the edge does not exist.
396        On success, returns Tuple of the removed edge's unique identifiers.
397        """
398        edge_id = (self.node_id(u_of_edge), self.node_id(v_of_edge))
399        if edge_id not in self.graph.edges(data = False):
400            err_msg = "Requested edge does not exist in the DAG's scope"
401            raise ValueError(err_msg)
402        self.graph.remove_edge(edge_id[0], edge_id[1])
403        return edge_id

Removes a directed edge between nodes in the DAG's underlying graph. Throws error if the edge does not exist. On success, returns Tuple of the removed edge's unique identifiers.

def update_edges( self, *E: Any, continue_on: Condition | None = None, filter: function | None = None, data: Union[Dict[str, Any], webber.edges.edgedict, NoneType] = None) -> None:
405    def update_edges(self, *E: _T.Any, continue_on: Condition | None = None, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | edgedict | None = None) -> None:
406        """
407        Flexible function to update properties of edges in the DAG's scope, 
408        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
409        lambda filter using the edgedict syntax.
410        
411        List of nodes to update or filter argument is expected. Valid edge lists include:
412
413        \t update_edges((node1, node2), ...)       or update_edges([(node1, node2)], ...)
414
415        \t update_edges((node1, callable2), ...)   or update_edges([(node1, callable2)], ...)
416        
417        \t update_edges(edgedict),                 where isinstance(edgedict, webber.edges.edgedict) == True
418
419        Parameters:
420
421        > continue_on: webber.edges.Condition value to update matching edges with, 
422        
423        \t Execute child on success, failure, or any exit state, of the parent node
424
425        > filter: lambda property that can be used instead of a list of edges to be updated.
426        
427        \t filter = (lambda e: n.parent == print or e.child == print)
428
429        > data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set.
430
431        \t data = { 'continue_on': webber.edges.Condition }
432
433        """
434        if len(E) == 0 and filter == None:
435            raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.")
436
437        elif isinstance(E, dict) or isinstance(E, edgedict):
438            E = [E]
439
440        elif len(E) == 1 and isinstance(E[0], _abc.Iterable):
441            try:
442                _ = self.get_edges(E[0])
443            except:
444                E = E[0]
445
446        if filter is not None:
447            edge_ids = self.filter_edges(filter, data = False)
448        else:
449            if isinstance(E[0], dict) or isinstance(E[0], edgedict):
450                try:
451                    ids = [e['id'] for e in E]
452                except KeyError:
453                    err_msg = 'In dictionary form, all given edges must be follow edgedict standards.'
454                    raise ValueError(err_msg)
455            else:
456                ids = E
457            edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids]
458
459        std_update = (continue_on is None)
460
461        if std_update:
462            for edge_id, e in zip(edge_ids, E):
463                if data is not None:
464                    self._update_edge(data, id = edge_id)
465                else:
466                    self._update_edge(e, id = edge_id)
467        
468        else:
469            if continue_on is not None:
470                if not isinstance(continue_on, Condition):
471                    err_msg = f"Condition assignment must use webber.edges.Condition"
472                    raise TypeError(err_msg)
473                for e in edge_ids:
474                    self.graph.edges[e]['Condition'] = continue_on

Flexible function to update properties of edges in the DAG's scope, based on unique identifier(s) (e.g.: string IDs or unique callables) or a lambda filter using the edgedict syntax.

List of nodes to update or filter argument is expected. Valid edge lists include:

     update_edges((node1, node2), ...)       or update_edges([(node1, node2)], ...)

     update_edges((node1, callable2), ...)   or update_edges([(node1, callable2)], ...)

     update_edges(edgedict),                 where isinstance(edgedict, webber.edges.edgedict) == True

Parameters:

continue_on: webber.edges.Condition value to update matching edges with,

     Execute child on success, failure, or any exit state, of the parent node

filter: lambda property that can be used instead of a list of edges to be updated.

     filter = (lambda e: e.parent == print or e.child == print)

data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set.

     data = { 'continue_on': webber.edges.Condition }
def relabel_node(self, node: Union[str, Callable], label: str) -> str:
521    def relabel_node(self, node: _T.Union[str, _T.Callable], label: str) -> str:
522        """
523        Update the label, or name, given to a node in the DAG's scope, 
524        given a Python string and a node identifier. 
525        Well-structured wrapper for a common use-case of DAG.update_nodes.
526        """
527        node_id = self.node_id(node)
528        if not isinstance(label, str) or len(label) == 0:
529            err_msg = "Node label must be a Python string with one or more characters."
530            raise ValueError(err_msg)
531        self.update_nodes(node_id, data = {'name': label})
532        return label

Update the label, or name, given to a node in the DAG's scope, given a Python string and a node identifier. Well-structured wrapper for a common use-case of DAG.update_nodes.

def update_nodes( self, *N: Any, filter: function | None = None, data: Union[Dict[str, Any], webber.edges.dotdict, NoneType] = None, callable: Optional[Callable] = None, args: Optional[Iterable[Any]] = None, kwargs: Optional[Dict[str, Any]] = None) -> None:
534    def update_nodes(self, *N: _T.Any, filter: _types.LambdaType | None = None, data: _T.Dict[str, _T.Any] | dotdict | None = None, callable: _T.Callable | None = None, args: _T.Iterable[_T.Any] | None = None, kwargs: _T.Dict[str, _T.Any] | None = None) -> None:
535        """
536        Flexible function to update properties of nodes in the DAG's scope, 
537        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
538        lambda filter using the dotdict syntax.
539        
540        List of nodes to update or filter argument is expected. Valid node lists include:
541
542        \t update_nodes(node_id, ...)           or update_nodes([node_id], ...)
543
544        \t update_nodes(callable, ...)          or update_nodes([callable], ...)
545        
546        \t update_nodes(node1, node2, ...)      or update_nodes([node1, node2], ...)
547        
548        \t update_nodes(node1, callable2, ...)  or update_nodes([node1, callable2], ...)
549
550        > update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id)
551
552        Parameters:
553
554        > filter: lambda property that can be used instead of a list of nodes to be updated.
555        
556        \t filter = (lambda n: n.callable == print or 'Hello, World' in n.args)
557
558        > data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set.
559            Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method.
560            
561        \t data = {
562            \t 'callable': print,
563            \t 'args': ['Hello', 'World'],
564            \t 'kwargs': {'sep': ', '},
565            \t 'name': 'custom_label',
566            \t 'id': 'unique-identifier'
567            \t}
568
569        > args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List).
570
571        > kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary.
572
573        """
574        if len(N) == 0 and filter == None:
575            raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.")
576
577        elif len(N) > 0 and filter is not None:
578            raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.")
579
580        elif isinstance(N, dict) or isinstance(N, str):
581            N = [N]
582
583        elif len(N) == 1 and isinstance(N[0], _abc.Iterable):
584            if isinstance(N[0][0], dict):
585                N = N[0]
586            elif isinstance(N[0][0], str):
587                # BUG: A list of all single character IDs will fail to be updated. Please try another call method (i.e.: nested iterator).
588                if sum(list(map(lambda n: len(n), N[0]))) != len(N[0]): 
589                    N = N[0]
590
591
592        if filter is not None:
593            node_ids = self.filter_nodes(filter, data = False)
594        else:
595            if isinstance(N[0], dict):
596                ids = [n['id'] for n in N]
597            else:
598                ids = N
599            node_ids = [self.node_id(i) for i in ids]
600        
601        std_update = (callable == None) and (args == None) and (kwargs == None)
602
603        if std_update:
604            for node_id, n in zip(node_ids, N):
605                if data is not None:
606                    self._update_node(data, id = node_id)
607                else:
608                    self._update_node(n, id = node_id)
609        
610        else:
611            if callable is not None:
612                if not _iscallable(callable):
613                    err_msg = f"Requested node is not assigned a callable Python function."
614                    raise TypeError(err_msg)
615                for node_id in node_ids:
616                    self.graph.nodes[node_id]['callable'] = callable
617                    self.graph.nodes[node_id]['name'] = callable.__name__
618            
619            if args is not None:
620                if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)):
621                    err_msg = f"Requested node is not assigned a tuple of pos args."
622                    raise TypeError(err_msg)
623                args = tuple(args)
624                for node_id in node_ids:
625                    self.graph.nodes[node_id]['args'] = args
626            
627            if kwargs is not None:
628                if not isinstance(kwargs, dict):
629                    err_msg = f"Requested node is not assigned a dictionary of kw args."
630                    raise TypeError(err_msg)
631                for node_id in node_ids:
632                    self.graph.nodes[node_id]['kwargs'] = kwargs

Flexible function to update properties of nodes in the DAG's scope, based on unique identifier(s) (e.g.: string IDs or unique callables) or a lambda filter using the dotdict syntax.

List of nodes to update or filter argument is expected. Valid node lists include:

     update_nodes(node_id, ...)           or update_nodes([node_id], ...)

     update_nodes(callable, ...)          or update_nodes([callable], ...)

     update_nodes(node1, node2, ...)      or update_nodes([node1, node2], ...)

     update_nodes(node1, callable2, ...)  or update_nodes([node1, callable2], ...)

update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id)

Parameters:

filter: lambda property that can be used instead of a list of nodes to be updated.

     filter = (lambda n: n.callable == print or 'Hello, World' in n.args)

data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set. Any value given to the id key will be ignored. Allowed for ease of use with the DAG.get_nodes method.

     data = {
     'callable': print,
     'args': ['Hello', 'World'],
     'kwargs': {'sep': ', '},
     'name': 'custom_label',
     'id': 'unique-identifier'
    }

args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List).

kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary.

def get_edges( self, *N: Any, data: bool = True) -> Union[List[webber.edges.edgedict], List[Tuple[str, str]]]:
634    def get_edges(self, *N: _T.Any, data: bool = True) -> _T.Union[_T.List[edgedict], _T.List[_T.Tuple[str, str]]]:
635        """
636        Retrieval function for DAG edge data, based on tuple identifiers.
637        Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2']))
638        """
639        if len(N) == 0:
640            if data:
641                return [edgedict(u, v, **d) for u, v, d in self.graph.edges.data()]
642            return list(self.graph.edges.data(data=False))
643            
644        # elif len(N) == 1:
645        #     if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], tuple):
646        #         N = N[0]
647
648        if len(N) != len(set(N)) or not all(isinstance(n, _abc.Iterable) and len(n) == 2 for n in N):
649            err_msg = 'All requested edges must be unique tuples of size 2.'
650            raise ValueError(err_msg)
651    
652        edge_data = [self.get_edge(o, i, data=data) for (o, i) in N]
653        return edge_data

Retrieval function for DAG edge data, based on tuple identifiers. Use filter_edges for more flexible controls (e.g.: filter_edges(lambda e: e.parent == 'node_1'))

def get_edge( self, outgoing_node: Union[str, Callable], incoming_node: Union[str, Callable], data: bool = True) -> Union[webber.edges.edgedict, Tuple[str, str]]:
655    def get_edge(self, outgoing_node: _T.Union[str, _T.Callable], incoming_node: _T.Union[str, _T.Callable], data: bool = True) -> _T.Union[edgedict, _T.Tuple[str, str]]:
656        """
657        Retrieval function for a single directed edge between nodes in a DAG's scope. 
658        """
659        id = (self.node_id(outgoing_node), self.node_id(incoming_node))
660        if not data:
661            return id
662        edge_data = self.graph.get_edge_data(u = id[0], v = id[1])
663        if not edge_data:
664            err_msg = f'No match found for the directed edge requested: {id}'
665            raise ValueError(err_msg)
666        assert edge_data is not None  # Type narrowing for Pylance
667        return edgedict(*id, **edge_data)

Retrieval function for a single directed edge between nodes in a DAG's scope.

def get_node(self, n: Union[str, Callable]) -> webber.edges.dotdict:
669    def get_node(self, n: _T.Union[str, _T.Callable]) -> dotdict:
670        """
671        Given a unique identifier, returns a dictionary of node metadata
672        for a single node in the DAG's scope.
673        """
674        node_id = self.node_id(n)
675        return dotdict(self.graph.nodes[node_id])

Given a unique identifier, returns a dictionary of node metadata for a single node in the DAG's scope.

def get_nodes(self, *N: Any) -> List[webber.edges.dotdict]:
677    def get_nodes(self, *N: _T.Any) -> _T.List[dotdict]:
678        """
679        Flexible function to retrieve DAG node data, based on node identifiers 
680        (e.g.: string IDs or unique callables).
681        """
682        if len(N) == 0:
683            node_data = list(self.graph.nodes.values())
684            return [dotdict(d) for d in node_data]
685
686        elif len(N) == 1:
687            if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], str):
688                N = N[0]
689            else:
690                node_id = self.node_id(N[0])
691                node_data = [dotdict(self.graph.nodes[node_id])]
692                return node_data
693            
694        if not len(N) == len(set(N)):
695            err_msg = 'All requested nodes must be unique identifiers.'
696            raise ValueError(err_msg)
697        
698        node_ids  = [self.node_id(n) for n in N]
699        node_data = [dotdict(self.graph.nodes[n]) for n in node_ids]
700        return node_data

Flexible function to retrieve DAG node data, based on node identifiers (e.g.: string IDs or unique callables).

def filter_nodes( self, filter: function, data: bool = False) -> Union[List[str], List[webber.edges.dotdict]]:
702    def filter_nodes(self, filter: _types.LambdaType, data: bool = False) -> _T.List[str] | _T.List[dotdict]:
703        """
704        Given a lambda function, filter nodes in a DAG's scope based on its attributes.
705        Current limitation: Filters must use node identifier strings when referencing nodes.
706        Use get_nodes for more flexible controls.
707        """
708        if not data:
709            return [node['id'] for node in self.graph.nodes.values() if filter(dotdict(node))]
710        return [dotdict(node) for node in self.graph.nodes.values() if filter(dotdict(node))]

Given a lambda function, filter nodes in a DAG's scope based on its attributes. Current limitation: Filters must use node identifier strings when referencing nodes. Use get_nodes for more flexible controls.

def filter_edges( self, filter: function, data: bool = False) -> Union[List[Tuple[str, str]], List[webber.edges.edgedict]]:
712    def filter_edges(self, filter: _types.LambdaType, data: bool = False) -> _T.List[_T.Tuple[str, str]] | _T.List[edgedict]:
713        """
714        Given a lambda function, filter edges in a DAG's scope based on its attributes.
715        Current limitation: Filters must use node identifier strings when referencing nodes.
716        Use get_edges for more flexible controls.
717        """
718        if not data:
719            return [e[:2] for e in list(self.graph.edges.data()) if filter(edgedict(*e))]
720        return [edgedict(*e) for e in list(self.graph.edges.data()) if filter(edgedict(*e))]

Given a lambda function, filter edges in a DAG's scope based on its attributes. Current limitation: Filters must use node identifier strings when referencing nodes. Use get_edges for more flexible controls.

def retry_node(self, identifier: Union[str, Callable], count: int) -> None:
722    def retry_node(self, identifier: _T.Union[str,_T.Callable], count: int) -> None:
723        """
724        Given a node identifier, set number of automatic retries in case of failure.
725        Re-attempts will begin as soon as possible.
726        """
727        if not isinstance(count, int) or not count >= 0:
728            raise ValueError("Retry count must be a non-negative integer.")
729        node = self.node_id(identifier)
730        self.graph.nodes[node]['retry'] = count

Given a node identifier, set number of automatic retries in case of failure. Re-attempts will begin as soon as possible.

def skip_node( self, identifier: Union[str, Callable], skip: bool = True, as_failure: bool = False) -> None:
732    def skip_node(self, identifier: _T.Union[str, _T.Callable], skip: bool = True, as_failure: bool = False) -> None:
733        """
734        Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error).
735        Allows conditional control and testing over DAG's order of operations.
736        """
737        if not isinstance(skip, bool):
738            raise ValueError("Skip argument must be a boolean value.")
739        node = self.node_id(identifier)
740        self.graph.nodes[node]['skip'] = (skip, as_failure)

Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error). Allows conditional control and testing over DAG's order of operations.

def critical_path( self, nodes: Union[str, Callable, Iterable[Union[str, Callable]]]) -> DAG:
742    def critical_path(self, nodes: _T.Union[str, _T.Callable, _T.Iterable[_T.Union[str, _T.Callable]]]) -> 'DAG':
743        """
744        Given a set of nodes, returns a subset of the DAG containing
745        only the node(s) and its parents, or upstream dependencies.
746        """
747        if isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str):
748            node_ids = {self.node_id(n) for n in nodes}
749        else:
750            node_ids = {self.node_id(nodes)}
751        return self._subgraph(node_ids)

Given a set of nodes, returns a subset of the DAG containing only the node(s) and its parents, or upstream dependencies.

def execute( self, return_ref: bool = False, print_exc: bool = False, max_workers: Optional[int] = None, verbose: bool = True) -> Optional[DAG.DAGExecutor]:
753    def execute(
754        self,
755        return_ref: bool = False,
756        print_exc: bool = False,
757        max_workers: _T.Optional[int] = None,
758        verbose: bool = True
759    ) -> _T.Optional['DAG.DAGExecutor']:
760        """
761        Basic wrapper for execution of the DAG's underlying callables.
762
763        Args:
764            return_ref: If True, return the DAGExecutor instance.
765            print_exc: If True, print full exception tracebacks.
766            max_workers: Maximum number of worker threads. Defaults to None (auto).
767            verbose: If True, log task start/completion messages. Defaults to True.
768        """
769        executor = self.DAGExecutor(
770            self.graph, self.root, print_exc,
771            max_workers=max_workers, verbose=verbose
772        )
773        return executor if return_ref else None

Basic wrapper for execution of the DAG's underlying callables.

Args: return_ref: If True, return the DAGExecutor instance. print_exc: If True, print full exception tracebacks. max_workers: Maximum number of worker threads. Defaults to None (auto). verbose: If True, log task start/completion messages. Defaults to True.

def visualize(self, type: Optional[Literal['gui', 'browser', 'plt']] = None) -> Any:
775    def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] | None = None) -> _T.Any:
776        """
777        Basic wrapper to visualize DAG using Vis.js and NetGraph libraries.
778        By default, visualization library only loaded in after DAG.visualize() is called, halving import times.
779        """
780        import webber.viz as _viz
781
782        match type:
783            case 'browser':
784                _viz.visualize_browser(self.graph)
785
786            case 'plt':
787                return _viz.visualize_plt(self.graph)
788
789            case 'gui':
790                # _visualize_gui(self.graph)
791                raise NotImplementedError
792
793            case None: 
794                if _viz._in_notebook():
795                    return _viz.visualize_plt(self.graph)
796                else:
797                    _viz.visualize_browser(self.graph)
798
799            case _:
800                err_msg = "Unknown visualization type requested."
801                raise NotImplementedError(err_msg)

Basic wrapper to visualize DAG using Vis.js and NetGraph libraries. By default, visualization library only loaded in after DAG.visualize() is called, halving import times.

root: List[str]
803    @property
804    def root(self) -> _T.List[str]:
805        """
806        Return list of nodes with no dependencies.
807        Root nodes will occur first in DAG's order of operations.
808        Uses O(1) in_degree() instead of O(k) predecessors list creation.
809        """
810        return [node for node in self.graph.nodes if self.graph.in_degree(node) == 0]

Return list of nodes with no dependencies. Root nodes will occur first in DAG's order of operations. Uses O(1) in_degree() instead of O(k) predecessors list creation.

nodes: Any
812    @property
813    def nodes(self) -> _T.Any:
814        """Returns the NodeView of the underlying NetworkX graph containing all DAG nodes."""
815        return self.graph.nodes

Returns the NodeView of the underlying NetworkX graph containing all DAG nodes.

def node_id(self, identifier: Union[str, Callable]) -> str:
817    def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str:
818        """
819        Validate whether identifier given is a valid node within the DAG's scope.
820        Primarily for internal use, but useful for retrieving string identifiers
821        for a unique callable in a DAG.
822        Uses O(1) cache lookup for callables instead of O(n) linear search.
823        """
824        if isinstance(identifier, str):
825            # O(1) lookup using graph.nodes dict
826            if identifier not in self.graph.nodes:
827                err_msg = f"Node {identifier} is not defined in this DAG's scope."
828                raise ValueError(err_msg)
829            return identifier
830        elif _iscallable(identifier):
831            # O(1) lookup using _callable_to_id cache
832            if identifier not in self._callable_to_id:
833                err_msg = f"Callable {identifier} is not defined in this DAG's scope."
834                raise ValueError(err_msg)
835            cached_id = self._callable_to_id[identifier]
836            if cached_id is None:
837                err_msg = f"Callable {identifier.__name__} " \
838                        + "exists more than once in this DAG. " \
839                        + "Use the unique string identifier of the required node."
840                raise ValueError(err_msg)
841            return cached_id
842        else:
843            err_msg = f"Node {identifier} must be a string or a Python callable"
844            raise TypeError(err_msg)

Validate whether identifier given is a valid node within the DAG's scope. Primarily for internal use, but useful for retrieving string identifiers for a unique callable in a DAG. Uses O(1) cache lookup for callables instead of O(n) linear search.

def resolve_promise(self, promise: webber.xcoms.Promise) -> webber.xcoms.Promise:
906    def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise:
907        """
908        Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope.
909        Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope.
910        """
911        try:
912            key = self.node_id(promise.key)
913        except Exception as e:
914            raise _xcoms.InvalidCallable(e)
915        return _xcoms.Promise(key)

Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope. Raises webber.xcoms.InvalidCallable if Promise requests a callable that is out of scope.

class DAG.DAGExecutor:
    class DAGExecutor:
        """
        Base class used to execute DAG in embarrassingly parallel.

        All work happens inside the constructor: creating an instance submits
        the root nodes to a thread pool and keeps scheduling successors as
        futures finish, until every node has completed, failed or been skipped.
        """
        def __init__(
            self,
            graph: _nx.DiGraph,
            roots: _T.List[str],
            print_exc: bool = False,
            max_workers: _T.Optional[int] = None,
            verbose: bool = True
        ) -> None:
            """
            Execute the callables stored on the nodes of ``graph``.

            Args:
                graph: Directed graph whose nodes carry 'callable', 'name',
                    'args' and 'kwargs' attributes (optionally 'skip' and
                    'retry'), and whose edges carry a 'Condition' attribute.
                roots: Node IDs to submit first.
                print_exc: If True, print the traceback of each failed event.
                max_workers: Passed through to the ThreadPoolExecutor.
                verbose: If True, print progress messages; also forwarded to
                    the event wrapper.
            """

            # Skip execution if there are no callables in scope.
            if len(graph.nodes) == 0:
                if verbose:
                    print('Given DAG has no callables in scope. Skipping execution...')
                return

            # Initialize local variables for execution.
            complete: set[str] = set()   # events whose future finished without exception
            started: set[str] = set()    # events that were submitted (only written to in this method)
            failed: set[str] = set()     # events that raised and have no retries left
            skipped: set[str] = set()    # events that will never be submitted
            refs: _T.Dict[str, _futures.Future[_T.Any]] = {}  # latest future per event

            def raise_exc(message: str) -> None:
                """Stand-in callable for nodes skipped 'as failure'."""
                raise ValueError(message)

            def run_conditions_met(n: str) -> bool:
                """Return True when every predecessor of n satisfies its edge Condition."""
                for p in graph.predecessors(n):
                    match graph.edges[(p, n)]['Condition']:
                        case Condition.Success:
                            if p not in complete:
                                return False
                        case Condition.Failure:
                            if p not in failed:
                                return False
                        case Condition.AnyCase:
                            if p not in failed and p not in complete:
                                return False
                return True

            # Per-node (skip, as_failure) flags; nodes without the attribute get (False, False).
            skip = graph.nodes.data("skip", default=(False, False))
            # Per-node [attempts remaining, cached submission parameters].
            # Attempts start at the configured retry count plus the first run.
            retry: _T.Dict[str, _T.List[_T.Any]] = {
                n: [c + 1, {}] for n, c in graph.nodes.data("retry", default=0)
            }

            # Start execution of root node functions.
            with _futures.ThreadPoolExecutor(max_workers=max_workers) as executor:

                def Submit(
                    event: str,
                    callable: _T.Callable[..., _T.Any],
                    name: str,
                    args: _T.Union[tuple[_T.Any, ...], list[_T.Any]],
                    kwargs: _T.Dict[str, _T.Any]
                ) -> _futures.Future[_T.Any]:
                    """Submit one event to the pool, honouring its skip and retry settings."""
                    if skip[event][0]:
                        # Skipped events never retry; they run a stand-in that
                        # either prints or raises the skip message.
                        retry[event][0] = 0
                        skip_callable = raise_exc if skip[event][1] else print
                        return executor.submit(
                            _event_wrapper,
                            _callable=skip_callable,
                            _name=graph.nodes[event]['name'],
                            _args=[f"Event {event} skipped..."],
                            _kwargs={},
                            _verbose=verbose
                        )
                    else:
                        # Consume one attempt; on the first submission of a
                        # retryable event, cache the resolved parameters so
                        # later retries reuse them.
                        retry[event][0] -= 1
                        if (retry[event][0] > 0) and (retry[event][1] == {}):
                            retry[event][1] = {
                                'callable': callable,
                                'name': name,
                                'args': args,
                                'kwargs': kwargs
                            }
                        return executor.submit(
                            _event_wrapper,
                            _callable=callable,
                            _name=name,
                            _args=args,
                            _kwargs=kwargs,
                            _verbose=verbose
                        )

                # Submit root nodes
                for event in roots:
                    refs[event] = Submit(
                        event,
                        graph.nodes[event]['callable'],
                        graph.nodes[event]['name'],
                        graph.nodes[event]['args'],
                        graph.nodes[event]['kwargs']
                    )
                    started.add(event)

                # Process futures as they complete (no busy-wait)
                pending = set(refs.values())
                future_to_event = {v: k for k, v in refs.items()}

                # Loop until every node is accounted for, or nothing is left running.
                while (len(complete) + len(failed) + len(skipped)) != len(graph):
                    if not pending:
                        break

                    # Wait for at least one future to complete
                    done, pending = _futures.wait(pending, return_when=_futures.FIRST_COMPLETED)

                    for future in done:
                        event = future_to_event[future]

                        if future.exception() is not None:
                            # Re-raise inside try/except so print_exc() has an
                            # active exception to report.
                            try:
                                raise future.exception()  # type: ignore
                            except:
                                if print_exc:
                                    _traceback.print_exc()

                                # Attempts remain: resubmit with the cached
                                # parameters and move on to the next future.
                                if retry[event][0] > 0:
                                    if verbose:
                                        print(f"Event {event} exited with exception, retrying...")
                                    new_future = Submit(
                                        event,
                                        callable=retry[event][1]['callable'],
                                        name=retry[event][1]['name'],
                                        args=retry[event][1]['args'],
                                        kwargs=retry[event][1]['kwargs']
                                    )
                                    refs[event] = new_future
                                    pending.add(new_future)
                                    future_to_event[new_future] = event
                                    continue

                                if verbose:
                                    print(f"Event {event} exited with exception...")
                                failed.add(event)
                                # Children whose edge does not continue on failure are skipped.
                                skipping = [
                                    e[1] for e in set(graph.out_edges(event))
                                    if not _edges.continue_on_failure(graph.edges[e])
                                ]
                        else:
                            complete.add(event)
                            # Children whose edge does not continue on success are skipped.
                            skipping = [
                                e[1] for e in set(graph.out_edges(event))
                                if not _edges.continue_on_success(graph.edges[e])
                            ]

                        # A skipped child takes all of its descendants with it.
                        skipped = skipped.union(skipping)
                        for n in skipping:
                            skipped = skipped.union(_nx.descendants(graph, n))

                        # Remaining successors start once all their predecessors' conditions hold.
                        carryon = set(graph.successors(event)).difference(skipped)
                        starting = [
                            successor for successor in carryon
                            if run_conditions_met(successor)
                        ]

                        for successor in starting:
                            # Replace Promise placeholders with the result of
                            # the referenced event's future.
                            _args = [
                                a if not isinstance(a, _xcoms.Promise) else refs[_T.cast(str, a.key)].result()
                                for a in graph.nodes[successor]['args']
                            ]
                            _kwargs = {
                                k: v if not isinstance(v, _xcoms.Promise) else refs[_T.cast(str, v.key)].result()
                                for k, v in graph.nodes[successor]['kwargs'].items()
                            }
                            new_future = Submit(
                                successor,
                                graph.nodes[successor]['callable'],
                                graph.nodes[successor]['name'],
                                _args,
                                _kwargs
                            )
                            refs[successor] = new_future
                            pending.add(new_future)
                            future_to_event[new_future] = successor
                            started.add(successor)

Base class used to execute DAG in embarrassingly parallel.

DAG.DAGExecutor( graph: networkx.classes.digraph.DiGraph, roots: List[str], print_exc: bool = False, max_workers: Optional[int] = None, verbose: bool = True)
 976        def __init__(
 977            self,
 978            graph: _nx.DiGraph,
 979            roots: _T.List[str],
 980            print_exc: bool = False,
 981            max_workers: _T.Optional[int] = None,
 982            verbose: bool = True
 983        ) -> None:
 984
 985            # Skip execution if there are no callables in scope.
 986            if len(graph.nodes) == 0:
 987                if verbose:
 988                    print('Given DAG has no callables in scope. Skipping execution...')
 989                return
 990
 991            # Initialize local variables for execution.
 992            complete: set[str] = set()
 993            started: set[str] = set()
 994            failed: set[str] = set()
 995            skipped: set[str] = set()
 996            refs: _T.Dict[str, _futures.Future[_T.Any]] = {}
 997
 998            def raise_exc(message: str) -> None:
 999                raise ValueError(message)
1000
1001            def run_conditions_met(n: str) -> bool:
1002                for p in graph.predecessors(n):
1003                    match graph.edges[(p, n)]['Condition']:
1004                        case Condition.Success:
1005                            if p not in complete:
1006                                return False
1007                        case Condition.Failure:
1008                            if p not in failed:
1009                                return False
1010                        case Condition.AnyCase:
1011                            if p not in failed and p not in complete:
1012                                return False
1013                return True
1014
1015            skip = graph.nodes.data("skip", default=(False, False))
1016            retry: _T.Dict[str, _T.List[_T.Any]] = {
1017                n: [c + 1, {}] for n, c in graph.nodes.data("retry", default=0)
1018            }
1019
1020            # Start execution of root node functions.
1021            with _futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
1022
1023                def Submit(
1024                    event: str,
1025                    callable: _T.Callable[..., _T.Any],
1026                    name: str,
1027                    args: _T.Union[tuple[_T.Any, ...], list[_T.Any]],
1028                    kwargs: _T.Dict[str, _T.Any]
1029                ) -> _futures.Future[_T.Any]:
1030                    if skip[event][0]:
1031                        retry[event][0] = 0
1032                        skip_callable = raise_exc if skip[event][1] else print
1033                        return executor.submit(
1034                            _event_wrapper,
1035                            _callable=skip_callable,
1036                            _name=graph.nodes[event]['name'],
1037                            _args=[f"Event {event} skipped..."],
1038                            _kwargs={},
1039                            _verbose=verbose
1040                        )
1041                    else:
1042                        retry[event][0] -= 1
1043                        if (retry[event][0] > 0) and (retry[event][1] == {}):
1044                            retry[event][1] = {
1045                                'callable': callable,
1046                                'name': name,
1047                                'args': args,
1048                                'kwargs': kwargs
1049                            }
1050                        return executor.submit(
1051                            _event_wrapper,
1052                            _callable=callable,
1053                            _name=name,
1054                            _args=args,
1055                            _kwargs=kwargs,
1056                            _verbose=verbose
1057                        )
1058
1059                # Submit root nodes
1060                for event in roots:
1061                    refs[event] = Submit(
1062                        event,
1063                        graph.nodes[event]['callable'],
1064                        graph.nodes[event]['name'],
1065                        graph.nodes[event]['args'],
1066                        graph.nodes[event]['kwargs']
1067                    )
1068                    started.add(event)
1069
1070                # Process futures as they complete (no busy-wait)
1071                pending = set(refs.values())
1072                future_to_event = {v: k for k, v in refs.items()}
1073
1074                while (len(complete) + len(failed) + len(skipped)) != len(graph):
1075                    if not pending:
1076                        break
1077
1078                    # Wait for at least one future to complete
1079                    done, pending = _futures.wait(pending, return_when=_futures.FIRST_COMPLETED)
1080
1081                    for future in done:
1082                        event = future_to_event[future]
1083
1084                        if future.exception() is not None:
1085                            try:
1086                                raise future.exception()  # type: ignore
1087                            except:
1088                                if print_exc:
1089                                    _traceback.print_exc()
1090
1091                                if retry[event][0] > 0:
1092                                    if verbose:
1093                                        print(f"Event {event} exited with exception, retrying...")
1094                                    new_future = Submit(
1095                                        event,
1096                                        callable=retry[event][1]['callable'],
1097                                        name=retry[event][1]['name'],
1098                                        args=retry[event][1]['args'],
1099                                        kwargs=retry[event][1]['kwargs']
1100                                    )
1101                                    refs[event] = new_future
1102                                    pending.add(new_future)
1103                                    future_to_event[new_future] = event
1104                                    continue
1105
1106                                if verbose:
1107                                    print(f"Event {event} exited with exception...")
1108                                failed.add(event)
1109                                skipping = [
1110                                    e[1] for e in set(graph.out_edges(event))
1111                                    if not _edges.continue_on_failure(graph.edges[e])
1112                                ]
1113                        else:
1114                            complete.add(event)
1115                            skipping = [
1116                                e[1] for e in set(graph.out_edges(event))
1117                                if not _edges.continue_on_success(graph.edges[e])
1118                            ]
1119
1120                        skipped = skipped.union(skipping)
1121                        for n in skipping:
1122                            skipped = skipped.union(_nx.descendants(graph, n))
1123
1124                        carryon = set(graph.successors(event)).difference(skipped)
1125                        starting = [
1126                            successor for successor in carryon
1127                            if run_conditions_met(successor)
1128                        ]
1129
1130                        for successor in starting:
1131                            _args = [
1132                                a if not isinstance(a, _xcoms.Promise) else refs[_T.cast(str, a.key)].result()
1133                                for a in graph.nodes[successor]['args']
1134                            ]
1135                            _kwargs = {
1136                                k: v if not isinstance(v, _xcoms.Promise) else refs[_T.cast(str, v.key)].result()
1137                                for k, v in graph.nodes[successor]['kwargs'].items()
1138                            }
1139                            new_future = Submit(
1140                                successor,
1141                                graph.nodes[successor]['callable'],
1142                                graph.nodes[successor]['name'],
1143                                _args,
1144                                _kwargs
1145                            )
1146                            refs[successor] = new_future
1147                            pending.add(new_future)
1148                            future_to_event[new_future] = successor
1149                            started.add(successor)
class Condition(enum.IntEnum):
12class Condition(_enum.IntEnum):
13    """Represents edge condition for a node execution, based on outcome(s) of predecessor(s)."""
14    Success = 0  # successor may run only once the predecessor has finished without an exception
15    Failure = 1  # successor may run only once the predecessor has finished with an exception
16    AnyCase = 3  # successor may run once the predecessor has finished, whatever the outcome

Represents edge condition for a node execution, based on outcome(s) of predecessor(s).

Success = <Condition.Success: 0>
Failure = <Condition.Failure: 1>
AnyCase = <Condition.AnyCase: 3>
class QueueDAG(DAG):
class QueueDAG(DAG):
    """
    Directed Acyclic Graph used to queue and execute Pythonic callables in parallel,
    while stringing the outputs of those callables in linear sequences.

    Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the
    behavior of root nodes to determine if and/or when the DAG run has been completed.

    Root nodes will be re-executed until culled by one of two conditions:
    1. A max number of iterations has been completed, or
    2. The output of the root node's callable matches a lambda halt_condition.

    Both conditions can be set at run-time.

    QueueDAG can be nested inside a standard webber.DAG by passing qdag.execute as a callable.
    Benchmarks show moderate overhead of 1.3-2.5x (~1-2ms fixed cost) when nested, due to the
    outer DAG's ThreadPoolExecutor setup and task wrapper infrastructure. This is well within
    acceptable millisecond-scale latency for most real-time applications.
    """

    # Maps node identifier -> {'halt_condition': ..., 'iter_limit': ...}.
    # NOTE(review): this dict lives on the class, so its entries are shared by every
    # QueueDAG instance -- confirm that is intended before moving it onto the instance.
    conditions: _T.Dict[str, _T.Dict[str, _T.Any]] = {}

    def __init__(self) -> None:
        super().__init__()

    def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str:
        """
        Registers a callable, with its positional and keyword arguments, on the underlying graph
        and returns the unique identifier of the new node.

        Three keyword arguments are reserved for Queue DAG definitions and are removed from
        ``kwargs`` before the node is created:

        - halt_condition: Callable applied to a node's output to decide when repeated execution stops,
          e.g. ``halt_condition = (lambda output: output is None)``.

        - iterator: Discrete number of times the node should be executed. Meant to be
          mutually exclusive with halt_condition.

        - max_iter: Maximum number of times the node should be executed. Meant for use with
          halt_condition in order to prevent a forever loop. Takes precedence over iterator.
        """
        halt_condition = kwargs.pop('halt_condition', None)
        iterator: _T.Optional[int] = kwargs.pop('iterator', None)
        max_iter: _T.Optional[int] = kwargs.pop('max_iter', None)

        node_key = super().add_node(node, *args, **kwargs)

        # max_iter wins over iterator; with neither given there is no iteration limit.
        requested_limit = max_iter if max_iter is not None else iterator
        iter_limit = None if requested_limit is None else int(requested_limit)

        self.conditions[node_key] = {
            'halt_condition': halt_condition,
            'iter_limit': iter_limit
        }

        return node_key

    def add_edge(self, u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable], continue_on: Condition = Condition.Success) -> _T.Tuple[str, str]:
        """
        Adds an edge between nodes in the Queue DAG's underlying graph.
        Queue DAG nodes may have a maximum of one child and one parent worker.

        Raises:
            AssertionError: if ``u_of_edge`` already has a child or ``v_of_edge`` already has a parent.
        """
        for node, is_parent in ((u_of_edge, True), (v_of_edge, False)):
            try:
                node_id = self.node_id(node)
            except Exception:
                # Identifier lookup failed; skip the fan-out check for this endpoint.
                continue

            if is_parent:
                existing = self.filter_edges(lambda e: e.parent == node_id)
            else:
                existing = self.filter_edges(lambda e: e.child == node_id)

            # Explicit raise instead of `assert`, so the check survives `python -O`.
            if len(existing) != 0:
                error = AssertionError()
                error.add_note("Queue DAG nodes may have a maximum of one child and one parent worker.")
                raise error

        return super().add_edge(u_of_edge, v_of_edge, continue_on)

    def execute(self, *promises: _T.Any, return_ref: bool = False, print_exc: bool = False) -> _T.List[_T.Any] | None:
        """
        Runs every node of the Queue DAG as a queue worker and collects the final node's output.

        Root nodes are wrapped with their halt condition and iteration limit; every other
        node is wrapped with its parent's future and output queue as its input.

        Args:
            promises: Values combined with itertools.pairwise into the 'promises' mapping
                that is handed to every worker.
            return_ref: Accepted for interface compatibility; not read by this method.
            print_exc: Forwarded to every worker as 'print_exc'.

        Returns:
            None when the DAG has no nodes; otherwise the items drained from the output
            queue of the final node (a LIFO queue, so most recently queued first).
        """
        queues: _T.Dict[str, _q.LifoQueue[_T.Any]] = {}
        processes: _T.Dict[str, _futures.Future[_T.Any]] = {}
        end_proc: _T.Optional[str] = None

        # NOTE(review): pairwise yields overlapping pairs (p0, p1), (p1, p2), ... -- confirm
        # that this, rather than disjoint key/value pairs, is the intended pairing.
        _promises: _T.Dict[str, _T.Any] = dict(_it.pairwise(promises))

        with _TaskLogger("root") as _:

            # Skip execution if there are no callables in scope.
            if len(self.graph.nodes) == 0:
                print('Given DAG has no callables in scope. Skipping execution...')
                return

            # The result node is the last childless edge target in the graph's own edge order.
            for _parent, child_id in self.graph.edges:
                if len(list(self.graph.successors(child_id))) == 0:
                    end_proc = child_id

            # Submit edges parent-first: a child needs its parent's future and queue,
            # and the graph's own edge order follows node insertion order instead.
            position = {n: i for i, n in enumerate(_nx.topological_sort(self.graph))}
            ordered_edges = sorted(self.graph.edges, key=lambda edge: position[edge[0]])

            with _futures.ThreadPoolExecutor() as executor:

                for root_id in self.root:
                    node = self.get_node(root_id)
                    queues[root_id] = _q.LifoQueue()
                    node.update({
                        'callable': _queue._worker,
                        'args': tuple(),
                        'kwargs': {
                            'work': node.callable,
                            'args': node.args, 'kwargs': node.kwargs,
                            'promises': _promises,
                            'print_exc': print_exc,
                            'halt_condition': self.conditions[root_id]['halt_condition'],
                            'iter_limit': self.conditions[root_id]['iter_limit'],
                            'out_queue': queues.get(root_id)
                        }
                    })
                    processes[root_id] = executor.submit(
                        _event_wrapper,
                        _callable=node['callable'],
                        _name=node['name'],
                        _args=node['args'],
                        _kwargs=node['kwargs']
                    )

                for parent_id, node_id in ordered_edges:
                    node = self.get_node(node_id)
                    queues[node_id] = _q.LifoQueue()
                    node.update({
                        'callable': _queue._worker,
                        'args': tuple(),
                        'kwargs': {
                            'work': node.callable,
                            'args': node.args, 'kwargs': node.kwargs,
                            'promises': _promises,
                            'print_exc': print_exc,
                            'parent_id': parent_id,
                            'parent_process': processes[parent_id],
                            'in_queue': queues.get(parent_id),
                            'out_queue': queues.get(node_id)
                        }
                    })
                    processes[node_id] = executor.submit(
                        _event_wrapper,
                        _callable=node['callable'],
                        _name=node['name'],
                        _args=node['args'],
                        _kwargs=node['kwargs']
                    )

            # For single-node DAGs with no edges, end_proc is the root node
            if end_proc is None and len(self.root) > 0:
                end_proc = self.root[0]

            # Leaving the executor context has already joined every worker; this blocking
            # wait is a cheap guard that replaces the former spin loop.
            _futures.wait(processes.values())

            return_val: _T.List[_T.Any] = []
            if end_proc is not None and end_proc in queues:
                while not queues[end_proc].empty():
                    return_val.append(queues[end_proc].get())

            return return_val

Directed Acyclic Graph used to queue and execute Pythonic callables in parallel, while stringing the outputs of those callables in linear sequences.

Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the behavior of root nodes to determine if and/or when the DAG run has been completed.

Root nodes will be re-executed until culled by one of two conditions:

  1. A max number of iterations has been completed, or
  2. The output of the root node's callable matches a lambda halt_condition.

Both conditions can be set at run-time.

QueueDAG can be nested inside a standard webber.DAG by passing qdag.execute as a callable. Benchmarks show moderate overhead of 1.3-2.5x (~1-2ms fixed cost) when nested, due to the outer DAG's ThreadPoolExecutor setup and task wrapper infrastructure. This is well within acceptable millisecond-scale latency for most real-time applications.

conditions: Dict[str, Dict[str, Any]] = {}
def add_node(self, node: Any, *args: Any, **kwargs: Any) -> str:
1176    def add_node(self, node: _T.Any, *args: _T.Any, **kwargs: _T.Any) -> str:
1177        """
1178        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
1179        On success, return unique identifier for the new node.
1180        
1181        Reserved key-words are used for Queue DAG definitions:
1182        
1183        - halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.
1184
1185        \t halt_condition = (lambda output: output == None) 
1186
1187        - iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.
1188
1189        - max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop.
1190        """
1191        halt_condition = kwargs.pop('halt_condition', None)
1192        iterator: int = kwargs.pop('iterator', None)
1193        max_iter: int = kwargs.pop('max_iter', None)
1194
1195        return_val = super().add_node(node, *args, **kwargs)
1196        
1197        if max_iter is not None:
1198            iter_limit = int(max_iter)
1199        elif iterator is not None:
1200            iter_limit = int(iterator)
1201        else:
1202            iter_limit = None
1203        
1204        self.conditions[return_val] = {
1205            'halt_condition': halt_condition,
1206            'iter_limit': iter_limit
1207        }
1208
1209        return return_val

Adds a callable with positional and keyword arguments to the DAG's underlying graph. On success, return unique identifier for the new node.

Reserved key-words are used for Queue DAG definitions:

  • halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.

     halt_condition = (lambda output: output == None)
    
  • iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.

  • max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop.

def add_edge( self, u_of_edge: Union[str, Callable], v_of_edge: Union[str, Callable], continue_on: Condition = <Condition.Success: 0>) -> Tuple[str, str]:
1211    def add_edge(self, u_of_edge: _T.Union[str, _T.Callable], v_of_edge: _T.Union[str, _T.Callable], continue_on: Condition = Condition.Success) -> _T.Tuple[str, str]:
1212        """
1213        Adds an edge between nodes in the Queue DAG's underlying graph.
1214        Queue DAG nodes may have a maximum of one child and one parent worker.
1215        """
1216        for node in (u_of_edge, v_of_edge):
1217            try:
1218                node_id = self.node_id(node)
1219            except:
1220                continue
1221        
1222            filter = (lambda e: e.parent == node_id) if node == u_of_edge else (lambda e: e.child == node_id)
1223            try:
1224                assert(len(self.filter_edges(filter)) == 0)
1225            except Exception as e:
1226                e.add_note("Queue DAG nodes may have a maximum of one child and one parent worker.")
1227                raise e
1228
1229        return super().add_edge(u_of_edge, v_of_edge, continue_on)

Adds an edge between nodes in the Queue DAG's underlying graph. Queue DAG nodes may have a maximum of one child and one parent worker.

def execute( self, *promises: Any, return_ref: bool = False, print_exc: bool = False) -> Optional[List[Any]]:
1231    def execute(self, *promises: _T.Any, return_ref: bool = False, print_exc: bool = False) -> _T.List[_T.Any] | None:
1232        """
1233        Basic wrapper for execution of the DAG's underlying callables.
1234        """
1235        queues: _T.Dict[str, _q.LifoQueue[_T.Any]] = {}
1236        processes: _T.Dict[str, _futures.Future[_T.Any]] = {}
1237        join: set[str] = set()
1238        end_proc: _T.Optional[str] = None
1239
1240        _promises: _T.Dict[str, _T.Any] = { k: v for k, v in _it.pairwise(promises) } if len(promises) > 0 else {}
1241
1242        with _TaskLogger("root") as _:
1243
1244            # Skip execution if there are no callables in scope.
1245            if len(self.graph.nodes) == 0:
1246                print('Given DAG has no callables in scope. Skipping execution...')
1247                return
1248
1249            with _futures.ThreadPoolExecutor() as executor:
1250
1251                for id in self.root:
1252                    node = self.get_node(id)
1253                    queues[id] = _q.LifoQueue()
1254                    node.update({
1255                        'callable': _queue._worker,
1256                        'args': tuple(),
1257                        'kwargs': {
1258                            'work': node.callable,
1259                            'args': node.args, 'kwargs': node.kwargs,
1260                            'promises': _promises,
1261                            'print_exc': print_exc,
1262                            'halt_condition': self.conditions[id]['halt_condition'],
1263                            'iter_limit': self.conditions[id]['iter_limit'],
1264                            'out_queue': queues.get(id)
1265                        }
1266                    })
1267                    processes[id] = executor.submit(
1268                        _event_wrapper,
1269                        _callable=node['callable'],
1270                        _name=node['name'],
1271                        _args=node['args'],
1272                        _kwargs=node['kwargs']
1273                    )
1274                
1275                for parent_id, id in self.graph.edges:
1276                    node = self.get_node(id)
1277                    queues[id] = _q.LifoQueue()
1278                    if len(list(self.graph.successors(id))) == 0:
1279                        end_proc = id
1280                    node.update({
1281                        'callable': _queue._worker,
1282                        'args': tuple(),
1283                        'kwargs': {
1284                            'work': node.callable,
1285                            'args': node.args, 'kwargs': node.kwargs,
1286                            'promises': _promises,
1287                            'print_exc': print_exc,
1288                            'parent_id': parent_id,
1289                            'parent_process': processes[parent_id],
1290                            'in_queue': queues.get(parent_id),
1291                            'out_queue': queues.get(id)
1292                        }
1293                    })
1294                    processes[id] = executor.submit(
1295                        _event_wrapper,
1296                        _callable=node['callable'],
1297                        _name=node['name'],
1298                        _args=node['args'],
1299                        _kwargs=node['kwargs']
1300                    )
1301
1302            # For single-node DAGs with no edges, end_proc is the root node
1303            if end_proc is None and len(self.root) > 0:
1304                end_proc = self.root[0]
1305
1306            while len(join) != len(self.graph.nodes):
1307                for node in self.graph.nodes:
1308                    if processes[node].done():
1309                        join.add(node)
1310
1311            return_val: _T.List[_T.Any] = []
1312            if end_proc is not None and end_proc in queues:
1313                while not queues[end_proc].empty():
1314                    return_val.append(queues[end_proc].get())
1315
1316            return return_val

Basic wrapper for execution of the DAG's underlying callables.