webber.core

Base module for abstract multiprocessing system - a directed acyclic graph.

   1"""
   2Base module for abstract multiprocessing system - a directed acyclic graph.
   3"""
   4import sys as _sys
   5import typing as _T
   6import uuid as _uuid
   7import types as _types
   8import logging as _logging
   9import traceback as _traceback
  10import contextlib as _contextlib
  11import collections.abc as _abc
  12import concurrent.futures as _futures
  13import networkx as _nx
  14import webber.edges as _edges
  15import webber.xcoms as _xcoms
  16import webber.queue as _queue
  17
  18from webber.edges import Condition, dotdict, edgedict
  19
  20import queue as _q
  21import itertools as _it
  22
  23__all__ = ["DAG", "Condition", "QueueDAG"]
  24
  25def _iscallable(function: any):
  26    return callable(function)
  27
  28class _OutputLogger:
  29    """
  30    Basic logger for synchronizing parallel output of Webber tasks.
  31    Adapted from John Paton: https://johnpaton.net/posts/redirect-logging/
  32    """
  33    def __init__(self, name="root", level="INFO", callable_name="root", file=_sys.stdout) -> None:
  34        """Initializes logger for given scope."""
  35        self.logger    = _logging.getLogger(name)
  36        self.name      = self.logger.name
  37        self.callable  = callable_name
  38        self.level     = getattr(_logging, level)
  39        self.format    = _logging.Formatter(
  40                            "{asctime} " + f"{self.callable:>15}:" + " {message}",
  41                            style="{",
  42                        )
  43        stream_handler = _logging.StreamHandler(file)
  44        stream_handler.setFormatter(self.format)
  45        self.logger.addHandler(stream_handler)
  46        self.logger.setLevel(self.level)
  47        self._redirector = _contextlib.redirect_stdout(self)
  48        # elif file is _stderr:
  49        #     self._redirector = _contextlib.redirect_stderr(self)
  50
  51    def write(self, msg: str):
  52        """Writes non-empty strings to the logger's output -- stdout, by default."""
  53        if msg and not msg.isspace():
  54            self.logger.log(self.level, msg)
  55
  56    def flush(self):
  57        """Ignores flushing of output, since logs are not concerned with such characters. (?)"""
  58        pass
  59
  60    def __enter__(self):
  61        """Allows Python built-ins for printing to forward data in scope. (?)"""
  62        self._redirector.__enter__()
  63        return self
  64
  65    def __exit__(self, exc_type, exc_value, traceback):
  66        """Allows Python built-ins to exit scope on error. (?)"""
  67        self._redirector.__exit__(exc_type, exc_value, traceback)
  68
  69def _event_wrapper(_callable: callable, _name: str, _args, _kwargs):
  70    """Wrapper used by Webber DAGs to log and execute a Python callables as a unit of work."""
  71    with _OutputLogger(str(_uuid.uuid4()), "INFO", _name) as _:
  72        return _callable(*_args, **_kwargs)
  73
  74class DAG:
  75    """
  76    Directed Acyclic Graph used to represent Pythonic tasks in parallel.
  77    """
  78    def add_node(self, node, *args, **kwargs) -> str:
  79        """
  80        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
  81        On success, return unique identifier for the new node.
  82        """
  83        if not _iscallable(node):
  84            err_msg = f"{node}: requested node is not a callable Python function."
  85            raise TypeError(err_msg)
  86
  87        node_name = _edges.label_node(node)
  88
  89        args = tuple([
  90            arg if not isinstance(args, _xcoms.Promise) else self.resolve_promise(arg)
  91            for arg in args
  92        ])
  93
  94        for k, val in kwargs.items():
  95            if isinstance(val, _xcoms.Promise):
  96                kwargs[k] = self.resolve_promise(val)
  97
  98        self.graph.add_node(
  99            node_for_adding=node_name,
 100            callable=node, args=args, kwargs=kwargs,
 101            name=node.__name__,
 102            id=node_name
 103        )
 104
 105        return node_name
 106
 107
 108    def add_edge(
 109            self, 
 110            u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable],
 111            continue_on: Condition = Condition.Success
 112        ) -> _T.Tuple[str,str]:
 113        """
 114        Adds an edge between nodes in the DAG's underlying graph,
 115        so long as the requested edge is unique and has not been added previously.
 116
 117        On success, returns Tuple of the new edge's unique identifiers.
 118        """
 119        # Validate inputs prior to execution
 120        # - Nodes must be identifiers or callables
 121        # - Conditions must belong to the webber.edges.Condition class
 122        if not (isinstance(u_of_edge,str) or _iscallable(u_of_edge)):
 123            err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable"
 124            raise TypeError(err_msg)
 125        if not (isinstance(v_of_edge,str) or _iscallable(v_of_edge)):
 126            err_msg = f"Outgoing node {v_of_edge} must be a string or a Python callable"
 127            raise TypeError(err_msg)
 128        if not isinstance(continue_on, Condition):
 129            raise TypeError("Edge conditions must use the webber.edges.Condition class.")
 130
 131        # Base Case 0: No nodes are present in the DAG:
 132        # Ensure that both nodes are callables, then add both to the graph and
 133        # assign the outgoing node as a root.
 134        if len(self.graph.nodes()) == 0:
 135            if not _iscallable(u_of_edge):
 136                err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope."
 137                raise ValueError(err_msg)
 138            if not _iscallable(v_of_edge):
 139                err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope."
 140                raise ValueError(err_msg)
 141            outgoing_node = self.add_node(u_of_edge)
 142            incoming_node = self.add_node(v_of_edge)
 143            self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
 144            return (outgoing_node, incoming_node)
 145
 146        node_names, node_callables = zip(*self.graph.nodes(data='callable'))
 147        graph_edges = list(self.graph.edges(data=False))
 148        new_callables = dict()
 149
 150        if _iscallable(u_of_edge) and _iscallable(v_of_edge):
 151
 152            # Error Cases 0, 1: Either of the callables appear more than once in the DAG.
 153            if node_callables.count(u_of_edge) > 1:
 154                err_msg = f"Outgoing callable {u_of_edge.__name__} " \
 155                        + "exists more than once in this DAG. " \
 156                        + "Use the unique identifier of the required node or add a new node."
 157                raise ValueError(err_msg)
 158            if node_callables.count(v_of_edge) > 1:
 159                err_msg = f"Incoming callable {v_of_edge.__name__} " \
 160                        + "exists more than once in this DAG. " \
 161                        + "Use the unique string identifier of the required node."
 162                raise ValueError(err_msg)
 163            
 164            # Base Case 1: Both args are callables and will be present in the DAG scope no more than once.
 165            # We will create new nodes if necessary, after validation, and get the unique string identifiers of the nodes.
 166            try:
 167                outgoing_node = self.node_id(u_of_edge)
 168            except:
 169                new_callables[u_of_edge] = u_of_edge
 170                outgoing_node = _edges.label_node(u_of_edge)
 171            try:
 172                incoming_node = self.node_id(v_of_edge)
 173            except:
 174                new_callables[v_of_edge] = v_of_edge
 175                incoming_node = _edges.label_node(v_of_edge)
 176
 177        else:
 178
 179            # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope.
 180            if isinstance(u_of_edge, str) and u_of_edge not in node_names:
 181                err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope."
 182                raise ValueError(err_msg)
 183            if isinstance(v_of_edge, str) and v_of_edge not in node_names:
 184                err_msg = f"Incoming node {v_of_edge} not in DAG's current scope."
 185                raise ValueError(err_msg)
 186
 187            # Both nodes' unique identifiers are present in the DAG
 188            # and should be evaluated for a valid edge.
 189            if isinstance(u_of_edge, str) and isinstance(v_of_edge, str):
 190                outgoing_node = u_of_edge
 191                incoming_node = v_of_edge
 192
 193            # Otherwise, one of the nodes is a callable, and the other is a valid unique identifier.
 194            else:
 195                for node in (u_of_edge, v_of_edge):
 196                    def _assign_node(n):
 197                        # For the argument that is a unique string identifier, assign and continue.
 198                        if not _iscallable(n):
 199                            return n
 200                        # Error Case 4: The requested callable exists more than once in the DAG.
 201                        if node_callables.count(n) > 1:
 202                            err_msg = f"Outgoing callable {n.__name__} " \
 203                                + "exists more than once in this DAG. " \
 204                                + "Use the unique ID of the required node or add a new node."
 205                            raise ValueError(err_msg)
 206                        # If the callable exists only once in the DAG, use its unique identifier to
 207                        # evaluate the requested edge.
 208                        if node_callables.count(n) == 1:
 209                            return node_names[ node_callables.index(n) ]
 210                        # Otherwise, the callable is new and needs to be added to the DAG scope.
 211                        else:
 212                            new_callables[n] = n
 213                            return _edges.label_node(n)
 214                    if node == u_of_edge:
 215                        outgoing_node = _assign_node(node)
 216                    else:
 217                        incoming_node = _assign_node(node)
 218
 219        # Error Case 5: Both callables exist only once in the DAG,
 220        # but an edge already exists between them.
 221        if (outgoing_node, incoming_node) in graph_edges:
 222            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \
 223                    + "a definition in this DAG."
 224            raise ValueError(err_msg)
 225
 226        # Ensure that no cycles will be created by adding this edge to the DAG.
 227        test_edges: list = graph_edges + [(outgoing_node, incoming_node)]
 228
 229        # Error Case 6: Both callables exist only once in the DAG,
 230        # but adding an edge between them creates circular dependencies.
 231        if not _nx.is_directed_acyclic_graph(_nx.DiGraph(test_edges)):
 232            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \
 233                    + "results in circular dependencies."
 234            raise ValueError(err_msg)
 235
 236        # We can now add the edge to the DAG, since we are certain it will not result in
 237        # illegal dependencies/behavior.
 238        # First, we should account for potential new nodes. This also handles
 239        # duplicates on first entry to the DAG (e.g.: edge == (print, print))
 240        if new_callables.get(u_of_edge) != None:
 241            outgoing_node = self.add_node(new_callables[u_of_edge])
 242        if new_callables.get(v_of_edge) != None:
 243            incoming_node = self.add_node(new_callables[v_of_edge])
 244
 245        # Then we can add the new edge.
 246        self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
 247        return (outgoing_node, incoming_node)
 248    
 249    def remove_node(self, *posargs, **kwargs) -> None:
 250        """
 251        Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG.
 252        Throws error message and recommends safer methods.
 253        """
 254        raise NotImplementedError("Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect.")
 255
 256    def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]:
 257        """
 258        Removes an directed edge between nodes in the DAG's underlying graph.
 259        Throws error if the edge does not exist.
 260        On success, returns Tuple of the removed edge's unique identifiers.
 261        """
 262        edge_id = (self.node_id(u_of_edge), self.node_id(v_of_edge))
 263        if edge_id not in self.graph.edges(data = False):
 264            err_msg = "Requested edge does not exist in the DAG's scope"
 265            raise ValueError(err_msg)
 266        self.graph.remove_edge(edge_id[0], edge_id[1])
 267        return edge_id
 268
 269    def update_edges(self, *E, continue_on: Condition, filter: _types.LambdaType = None, data = None):
 270        """
 271        Flexible function to update properties of edges in the DAG's scope, 
 272        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
 273        lambda filter using the edgedict syntax.
 274        
 275        List of nodes to update or filter argument is expected. Valid edge lists include:
 276
 277        \t update_edges((node1, node2), ...)       or update_edges([(node1, node2)], ...)
 278
 279        \t update_edges((node1, callable2), ...)   or update_edges([(node1, callable2)], ...)
 280        
 281        \t update_edges(edgedict),                 where isinstance(edgedict, webber.edges.edgedict) == True
 282
 283        Parameters:
 284
 285        > continue_on: webber.edges.Condition value to update matching edges with, 
 286        
 287        \t Execute child on success, failure, or any exit state, of the parent node
 288
 289        > filter: lambda property that can be used instead of a list of edges to be updated.
 290        
 291        \t filter = (lambda e: n.parent == print or e.child == print)
 292
 293        > data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set.
 294
 295        \t data = { 'continue_on': webber.edges.Condition }
 296
 297        """
 298        if len(E) == 0 and filter == None:
 299            raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.")
 300
 301        elif isinstance(E, dict) or isinstance(E, edgedict):
 302            E = [E]
 303
 304        elif len(E) == 1 and isinstance(E[0], _abc.Iterable):
 305            try:
 306                _ = self.get_edges(E[0])
 307            except:
 308                E = E[0]
 309
 310        if filter != None:
 311            edge_ids = self.filter_edges(filter, data = False)
 312        else:
 313            if isinstance(E[0], dict) or isinstance(E[0], edgedict):
 314                try:
 315                    ids = [e['id'] for e in E]
 316                except KeyError:
 317                    err_msg = 'In dictionary form, all given edges must be follow edgedict standards.'
 318                    raise ValueError(err_msg)
 319            else:
 320                ids = E
 321            edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids]
 322
 323        std_update = (continue_on == None)
 324
 325        if std_update:
 326            for edge_id, e in zip(edge_ids, E):
 327                if data != None:
 328                    self._update_edge(data, id = edge_id)
 329                else:
 330                    self._update_edge(e, id = edge_id)
 331        
 332        else:
 333            if continue_on != None:
 334                if not isinstance(continue_on, Condition):
 335                    err_msg = f"Condition assignment must use webber.edges.Condition"
 336                    raise TypeError(err_msg)
 337                for e in edge_ids:
 338                    self.graph.edges[e]['Condition'] = continue_on
 339
 340    def _update_edge(self, edgedict: dict, id: tuple[str, str] = None, force: bool = False):
 341        """
 342        Internal only. Update properties of an individual edge within a DAG's scope, 
 343        given a well-structured dictionary and the tuple identifier of the network edge. 
 344        Force argument bypasses validation, and should only be used internally.
 345        """
 346        if id != None:
 347            try:
 348                if edgedict.get('id') and id != edgedict['id']:
 349                    raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {edgedict['id']}")
 350            except ValueError as e:
 351                if not force:
 352                    raise e
 353            edgedict['id'] = id
 354
 355        expected_keys = ('parent', 'child', 'id', 'continue_on')
 356        if not set(expected_keys).issuperset(set(edgedict.keys())):
 357            raise ValueError(f"Expecting keys: {expected_keys}")
 358
 359        if not force:
 360            
 361            e1, e2 = None, None
 362
 363            if edgedict.get('id'):
 364                e1 = self.get_edge(edgedict['id'], data = False)
 365
 366            if edgedict.get('parent') or edgedict.get('child'):
 367                e2 = self.get_edge(edgedict.get('parent'), edgedict.get('child'), data = False)
 368            else:
 369                e2 = e1
 370
 371            if e1 != e2:
 372                raise ValueError('Edge vertices should not be changed using update functions.')
 373            
 374            elif e1 == None:
 375                raise ValueError('Requested edge was not given an identifier.')
 376
 377            if edgedict.get('continue_on') and not isinstance(edgedict['continue_on'], Condition):
 378                err_msg = f"Condition assignment must use webber.edges.Condition"
 379                raise TypeError(err_msg)
 380                       
 381        edge_id = edgedict.pop('id')
 382        edge = {k: v for k,v in edgedict.items() if k not in ('parent', 'child', 'id')}
 383        self.graph.edges[(edge_id[0], edge_id[1])].update(edge)
 384
 385    def relabel_node(self, node: _T.Union[str, _T.Callable], label: str):
 386        """
 387        Update the label, or name, given to a node in the DAG's scope, 
 388        given a Python string and a node identifier. 
 389        Well-structured wrapper for a common use-case of DAG.update_nodes.
 390        """
 391        node_id = self.node_id(node)
 392        if not isinstance(label, str) or len(label) == 0:
 393            err_msg = "Node label must be a Python string with one or more characters."
 394            raise ValueError(err_msg)
 395        self.update_nodes(node_id, data = {'name': label})
 396        return label
 397
 398    def update_nodes(self, *N, filter: _types.LambdaType = None, data = None, callable = None, args = None, kwargs = None):
 399        """
 400        Flexible function to update properties of nodes in the DAG's scope, 
 401        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
 402        lambda filter using the dotdict syntax.
 403        
 404        List of nodes to update or filter argument is expected. Valid node lists include:
 405
 406        \t update_nodes(node_id, ...)           or update_nodes([node_id], ...)
 407
 408        \t update_nodes(callable, ...)          or update_nodes([callable], ...)
 409        
 410        \t update_nodes(node1, node2, ...)      or update_nodes([node1, node2], ...)
 411        
 412        \t update_nodes(node1, callable2, ...)  or update_nodes([node1, callable2], ...)
 413
 414        > update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id)
 415
 416        Parameters:
 417
 418        > filter: lambda property that can be used instead of a list of nodes to be updated.
 419        
 420        \t filter = (lambda n: n.callable == print or 'Hello, World' in n.args)
 421
 422        > data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set.
 423            Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method.
 424            
 425        \t data = {
 426            \t 'callable': print,
 427            \t 'args': ['Hello', 'World'],
 428            \t 'kwargs': {'sep': ', '},
 429            \t 'name': 'custom_label',
 430            \t 'id': 'unique-identifier'
 431            \t}
 432
 433        > args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List).
 434
 435        > kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary.
 436
 437        """
 438        if len(N) == 0 and filter == None:
 439            raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.")
 440
 441        elif len(N) > 0 and filter != None:
 442            raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.")
 443
 444        elif isinstance(N, dict) or isinstance(N, str):
 445            N = [N]
 446
 447        elif len(N) == 1 and isinstance(N[0], _abc.Iterable):
 448            if isinstance(N[0][0], dict):
 449                N = N[0]
 450            elif isinstance(N[0][0], str):
 451                # BUG: A list of all single character IDs will fail to be updated. Please try another call method (i.e.: nested iterator).
 452                if sum(list(map(lambda n: len(n), N[0]))) != len(N[0]): 
 453                    N = N[0]
 454
 455
 456        if filter != None:
 457            node_ids = self.filter_nodes(filter, data = False)
 458        else:
 459            if isinstance(N[0], dict):
 460                ids = [n['id'] for n in N]
 461            else:
 462                ids = N
 463            node_ids = [self.node_id(i) for i in ids]
 464        
 465        std_update = (callable == None) and (args == None) and (kwargs == None)
 466
 467        if std_update:
 468            for node_id, n in zip(node_ids, N):
 469                if data != None:
 470                    self._update_node(data, id = node_id)
 471                else:
 472                    self._update_node(n, id = node_id)
 473        
 474        else:
 475            if callable != None:
 476                if not _iscallable(callable):
 477                    err_msg = f"Requested node is not assigned a callable Python function."
 478                    raise TypeError(err_msg)
 479                for node_id in node_ids:
 480                    self.graph.nodes[node_id]['callable'] = callable
 481                    self.graph.nodes[node_id]['name'] = callable.__name__
 482            
 483            if args != None:
 484                if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)):
 485                    err_msg = f"Requested node is not assigned a tuple of pos args."
 486                    raise TypeError(err_msg)
 487                args = tuple(args)
 488                for node_id in node_ids:
 489                    self.graph.nodes[node_id]['args'] = args
 490            
 491            if kwargs != None:
 492                if not isinstance(kwargs, dict):
 493                    err_msg = f"Requested node is not assigned a dictionary of kw args."
 494                    raise TypeError(err_msg)
 495                for node_id in node_ids:
 496                    self.graph.nodes[node_id]['kwargs'] = kwargs
 497
 498    def get_edges(self, *N, data: bool = True) -> _T.Union[list[edgedict], list[tuple]]:
 499        """
 500        Retrieval function for DAG edge data, based on tuple identifiers.
 501        Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2']))
 502        """
 503        if len(N) == 0:
 504            if data == True:
 505                return list(map(edgedict, self.graph.edges.data()))
 506            return list(self.graph.edges.data(data=False))
 507            
 508        # elif len(N) == 1:
 509        #     if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], tuple):
 510        #         N = N[0]
 511
 512        if len(N) != len(set(N)) or False in map(lambda n: isinstance(n, _abc.Iterable) and len(n) == 2, N):
 513            err_msg = 'All requested edges must be unique tuples of size 2.'
 514            raise ValueError(err_msg)
 515    
 516        edge_data = [self.get_edge(o, i, data=data) for (o, i) in N]
 517        return edge_data
 518    
 519    def get_edge(self, outgoing_node: _T.Union[str, callable], incoming_node: _T.Union[str, callable], data: bool = True) -> _T.Union[edgedict, tuple]:
 520        """
 521        Retrieval function for a single directed edge between nodes in a DAG's scope. 
 522        """
 523        id = (self.node_id(outgoing_node), self.node_id(incoming_node))
 524        if not data:
 525            return id
 526        edge_data = self.graph.get_edge_data(u = id[0], v = id[1])
 527        if not edge_data:
 528            err_msg = f'No match found for the directed edge requested: {id}'
 529            raise ValueError(err_msg)
 530        return edgedict(*id, **edge_data)
 531
 532    def get_node(self, n: _T.Union[str, callable]) -> dotdict:
 533        """
 534        Given a unique identifier, returns a dictionary of node metadata
 535        for a single node in the DAG's scope.
 536        """
 537        node_id = self.node_id(n)
 538        return dotdict(self.graph.nodes[node_id])
 539
 540    def get_nodes(self, *N) -> list[dotdict]:
 541        """
 542        Flexible function to retrieve DAG node data, based on node identifiers 
 543        (e.g.: string IDs or unique callables).
 544        """
 545        if len(N) == 0:
 546            node_data = list(self.graph.nodes.values())
 547            return [dotdict(d) for d in node_data]
 548
 549        elif len(N) == 1:
 550            if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], str):
 551                N = N[0]
 552            else:
 553                node_id = self.node_id(N[0])
 554                node_data = [dotdict(self.graph.nodes[node_id])]
 555                return node_data
 556            
 557        if not len(N) == len(set(N)):
 558            err_msg = 'All requested nodes must be unique identifiers.'
 559            raise ValueError(err_msg)
 560        
 561        node_ids  = [self.node_id(n) for n in N]
 562        node_data = [dotdict(self.graph.nodes[n]) for n in node_ids]
 563        return node_data
 564
 565    def filter_nodes(self, filter: _types.LambdaType, data: bool = False):
 566        """
 567        Given a lambda function, filter nodes in a DAG's scope based on its attributes.
 568        Current limitation: Filters must use node identifier strings when referencing nodes.
 569        Use get_nodes for more flexible controls.
 570        """
 571        if not data:
 572            return [node['id'] for node in self.graph.nodes.values() if filter(dotdict(node))]
 573        return [dotdict(node) for node in self.graph.nodes.values() if filter(dotdict(node))]
 574    
 575    def filter_edges(self, filter = _types.LambdaType, data: bool = False) -> list[edgedict]:
 576        """
 577        Given a lambda function, filter edges in a DAG's scope based on its attributes.
 578        Current limitation: Filters must use node identifier strings when referencing nodes.
 579        Use get_edges for more flexible controls.
 580        """
 581        if not data:
 582            return [e[:2] for e in list(self.graph.edges.data()) if filter(edgedict(*e))]
 583        return [edgedict(e) for e in list(self.graph.edges.data()) if filter(edgedict(*e))]
 584
 585    def retry_node(self, identifier: _T.Union[str,_T.Callable], count: int):
 586        """
 587        Given a node identifier, set number of automatic retries in case of failure.
 588        Re-attempts will begin as soon as possible.
 589        """
 590        if not isinstance(count, int) or not count >= 0:
 591            raise ValueError("Retry count must be a non-negative integer.")
 592        node = self.node_id(identifier)
 593        self.graph.nodes[node]['retry'] = count
 594
 595    def skip_node(self, identifier: _T.Union[str,_T.Callable], skip: bool = True, as_failure = False):
 596        """
 597        Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error).
 598        Allows conditional control and testing over DAG's order of operations.
 599        """
 600        if not isinstance(skip, bool):
 601            raise ValueError("Skip argument must be a boolean value.")
 602        node = self.node_id(identifier)
 603        self.graph.nodes[node]['skip'] = (skip, as_failure)
 604      
 605    def critical_path(self, nodes):
 606        """
 607        Given a set of nodes, returns a subset of the DAG containing
 608        only the node(s) and its parents, or upstream dependencies.
 609        """
 610        if isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str):
 611            node_ids = {self.node_id(n) for n in nodes}
 612        else:
 613            node_ids = {self.node_id(nodes)}
 614        return self._subgraph(node_ids)
 615
 616    def execute(self, return_ref=False, print_exc=False):
 617        """
 618        Basic wrapper for execution of the DAG's underlying callables.
 619        """
 620        executor = self.DAGExecutor(self.graph, self.root, print_exc)
 621        return executor if return_ref else None
 622
 623    def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] = None):
 624        """
 625        Basic wrapper to visualize DAG using Vis.js and NetGraph libraries.
 626        By default, visualization library only loaded in after DAG.visualize() is called, halving import times.
 627        """
 628        import webber.viz as _viz
 629
 630        match type:
 631            case 'browser':
 632                _viz.visualize_browser(self.graph)
 633
 634            case 'plt':
 635                return _viz.visualize_plt(self.graph)
 636
 637            case 'gui':
 638                # _visualize_gui(self.graph)
 639                raise NotImplementedError
 640
 641            case None: 
 642                if _viz._in_notebook():
 643                    return _viz.visualize_plt(self.graph)
 644                else:
 645                    _viz.visualize_browser(self.graph)
 646
 647            case _:
 648                err_msg = "Unknown visualization type requested."
 649                raise NotImplementedError(err_msg)
 650
 651    @property
 652    def root(self) -> list[str]:
 653        """
 654        Return list of nodes with no dependencies.
 655        Root nodes will occur first in DAG's order of operations.
 656        """
 657        return list(filter(
 658            lambda node: len(list(self.graph.predecessors(node))) < 1,
 659            self.graph.nodes.keys()
 660        ))
 661
 662    @property
 663    def nodes(self):
 664        return self.graph.nodes
 665
 666    def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str:
 667        """
 668        Validate whether identifier given is a valid node within the DAG's scope.
 669        Primarily for internal use, but useful for retrieving string identifiers
 670        for a unique callable in a DAG.
 671        """
 672        node_names, node_callables = zip(*self.graph.nodes(data='callable'))
 673
 674        if isinstance(identifier, str):
 675            if identifier not in node_names:
 676                err_msg = f"Node {identifier} is not defined in this DAG's scope."
 677                raise ValueError(err_msg)
 678            node = identifier
 679        elif _iscallable(identifier):
 680            match node_callables.count(identifier):
 681                case 0:
 682                    err_msg = f"Callable {identifier} is not defined in this DAG's scope."
 683                    raise ValueError(err_msg)
 684                case 1:
 685                    node = node_names[ node_callables.index(identifier) ]
 686                case _:
 687                    err_msg = f"Callable {identifier.__name__} " \
 688                            + "exists more than once in this DAG. " \
 689                            + "Use the unique identifier of the required node."
 690                    raise ValueError(err_msg)
 691        else:            
 692            err_msg = f"Node {identifier} must be a string or a Python callable"
 693            raise TypeError(err_msg)
 694        return node
 695    
 696    def _update_node(self, nodedict: dict, id: str = None, force: bool = False):
 697        """
 698        Internal only. Update properties of single node within a DAG's scope, 
 699        given a well-structured dictionary and the tuple identifier of the network edge. 
 700        Force argument bypasses dictionary validation, and should only be used internally.
 701        """
 702        if id != None:
 703            try:
 704                if nodedict.get('id') != None and id != nodedict['id']:
 705                    raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {nodedict['id']}")
 706            except ValueError as e:
 707                if not force:
 708                    raise e
 709            nodedict['id'] = id
 710        
 711        expected_keys = ('callable', 'args', 'kwargs', 'name', 'id')
 712        if not set(expected_keys).issuperset(set(nodedict.keys())):
 713            raise ValueError(f"Expecting keys: {expected_keys}")
 714        
 715        if not force:
 716            if nodedict.get('callable'):
 717                if _iscallable(nodedict['callable']):
 718                    err_msg = f"Requested node is not assigned a callable Python function."
 719                    raise TypeError(err_msg)
 720                if not nodedict.get('name'):
 721                    nodedict['name'] = nodedict['callable'].__name__
 722
 723            if nodedict.get('name') and (not isinstance(nodedict['name'], str) or len(nodedict['name']) == 0): 
 724                err_msg = f"Requested node name must be a non-null Python string, will default to callable when not set."
 725                raise TypeError(err_msg)
 726
 727            if nodedict.get('args'):
 728                if not (isinstance(nodedict['args'], _abc.Iterable) or isinstance(nodedict['args'], str)):
 729                    err_msg = f"Requested node is not assigned a tuple of pos args."
 730                    raise TypeError(err_msg)
 731                nodedict['args'] = tuple(nodedict['args'])
 732            
 733            if nodedict.get('kwargs') and not isinstance(nodedict['kwargs'], dict):
 734                err_msg = f"Requested node is not assigned a dictionary of kw args."
 735                raise TypeError(err_msg)                    
 736        
 737        node_id = nodedict.pop('id')
 738        self.graph.nodes[node_id].update(nodedict)
 739
 740        # Reset node name if implicitly requested.
 741        if not nodedict.get('name'):
 742            self.graph.nodes[node_id]['name'] = self.graph.nodes[node_id]['callable'].__name__
 743
 744    def _subgraph(self, node_ids: set[str]):
 745        """
 746        Internal only. Given a set of nodes, returns a subset of the DAG containing
 747        only the node(s) and upstream dependencies.
 748        """
 749        parent_nodes = set()
 750        for node in node_ids:
 751            parent_nodes = parent_nodes.union(set(self.graph.predecessors(node)))
 752        while parent_nodes:
 753            node_ids = node_ids.union(parent_nodes)
 754            parent_nodes, child_nodes = set(), parent_nodes
 755            for node in child_nodes:
 756                parent_nodes = parent_nodes.union(set(self.graph.predecessors(node)))
 757        subgraph = self.graph.subgraph(node_ids)
 758        return DAG(subgraph, __force=True)
 759
 760    def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise:
 761        """
 762        Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope.
 763        Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope.
 764        """
 765        try:
 766            key = self.node_id(promise.key)
 767        except Exception as e:
 768            raise _xcoms.InvalidCallable(e)
 769        return _xcoms.Promise(key)
 770
 771    def __init__(self, graph: _nx.DiGraph = None, **kwargs) -> None:
 772
 773        if graph is None:
 774            self.graph = _nx.DiGraph()
 775            return
 776        
 777        # Meant for internal use only, creating DAGs from subgraphs.
 778        if kwargs.get('__force') == True:
 779            self.graph = graph
 780            return
 781        
 782        _edges.validate_dag(graph)
 783
 784        # Define framework specific logic as nested dictionaries.
 785        for node in graph.nodes.keys():
 786            graph.nodes[node]['callable'] = node
 787            graph.nodes[node]['name'] = node.__name__
 788            graph.nodes[node]['args'] = []
 789            graph.nodes[node]['kwargs'] = {}
 790
 791        for e in graph.edges:
 792            condition = graph.edges[e].get('Condition')
 793            if condition is not None:
 794                if condition not in Condition:
 795                    raise TypeError(e, 'Edge conditions must belong to IntEnum type Webber.Condition.')
 796            else:
 797                graph.edges[e]['Condition'] = Condition.Success
 798            
 799        graph = _nx.relabel_nodes(graph, lambda node: _edges.label_node(node))
 800        for n in graph.nodes:
 801            graph.nodes[n]['id'] = n
 802        self.graph = _nx.DiGraph(graph)
 803
 804    class DAGExecutor:
 805        """
 806        Base class used to execute DAG in embarrassingly parallel.
 807        """
 808        def __init__(self, graph: _nx.DiGraph, roots: list, print_exc: bool = False) -> None:
 809            
 810            with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:
 811                
 812                # Skip execution if there are no callables in scope.
 813                if len(graph.nodes) == 0:
 814                    print('Given DAG has no callables in scope. Skipping execution...')
 815                    return
 816
 817                # Initialize local variables for execution.
 818                complete, started, failed, skipped = set(), set(), set(), set()
 819                events = set(roots)
 820                refs: dict[str, _futures.Future] = {}
 821
 822                def raise_exc(message):
 823                    raise ValueError(message)
 824
 825                def run_conditions_met(n):
 826                    for p in graph.predecessors(n):
 827                        match graph.edges.get((p, n))['Condition']:
 828                            case Condition.Success:
 829                                if p not in complete:
 830                                    return False
 831                            case Condition.Failure:
 832                                if p not in failed:
 833                                    return False
 834                            case Condition.AnyCase:
 835                                if p not in failed and p not in complete:
 836                                    return False
 837                    return True
 838
 839                skip  = graph.nodes.data("skip", default=(False, False))
 840                retry = {n: [c+1, {}] for n,c in graph.nodes.data("retry", default=0)}
 841
 842                # Start execution of root node functions.
 843                with _futures.ThreadPoolExecutor() as executor:
 844
 845                    def Submit(event, callable, name, args, kwargs):
 846                        if skip[event][0]: # NOTE: Internally, DAGExecutor tracks these skipped events as successes or failures.
 847                            retry[event][0] = 0
 848                            skip_callable = raise_exc if skip[event][1] else print
 849                            return executor.submit(
 850                                _event_wrapper,
 851                                _callable=skip_callable,
 852                                _name=graph.nodes[event]['name'],
 853                                _args=[f"Event {event} skipped..."],
 854                                _kwargs={}
 855                            )
 856                        else:
 857                            retry[event][0] -= 1
 858                            if (retry[event][0] > 0) and (retry[event][1] == {}):
 859                                retry[event][1] = {
 860                                    'callable': callable,
 861                                    'name': name,
 862                                    'args': args,
 863                                    'kwargs': kwargs
 864                                }
 865                            return executor.submit(
 866                                _event_wrapper,
 867                                _callable=callable,
 868                                _name=name,
 869                                _args=args,
 870                                _kwargs=kwargs
 871                            )
 872
 873                    for event in events:
 874                        refs[event] = Submit(
 875                            event,
 876                            graph.nodes[event]['callable'],
 877                            graph.nodes[event]['name'],
 878                            graph.nodes[event]['args'],
 879                            graph.nodes[event]['kwargs']
 880                        )
 881                        started.add(event)
 882                    # Loop until all nodes in the network are executed.
 883                    while (len(complete) + len(failed) + len(skipped)) != len(graph):
 884                        for event in events:
 885                            if refs[event].done() is True:
 886                                if refs[event].exception(timeout=0) is not None:
 887                                    try:
 888                                        raise refs[event].exception(timeout=0)
 889                                    except:
 890                                        if print_exc:
 891                                            _traceback.print_exc()
 892                                        
 893                                        if retry[event][0] > 0:
 894                                            print(f"Event {event} exited with exception, retrying...")
 895                                            refs[event] = Submit(
 896                                                event,
 897                                                callable=retry[event][1]['callable'],
 898                                                name=retry[event][1]['name'],
 899                                                args=retry[event][1]['args'],
 900                                                kwargs=retry[event][1]['kwargs']
 901                                            )
 902                                            continue                                            
 903
 904                                        print(f"Event {event} exited with exception...")
 905                                        failed.add(event)
 906                                        skipping = [
 907                                            e[1] for e in set(graph.out_edges(event)) 
 908                                            if not _edges.continue_on_failure(graph.edges.get(e))
 909                                        ]
 910                                else:
 911                                    complete.add(event)
 912                                    skipping = [
 913                                        e[1] for e in set(graph.out_edges(event)) 
 914                                        if not _edges.continue_on_success(graph.edges.get(e))
 915                                    ]
 916                                skipped  = skipped.union(skipping)
 917                                for n in skipping:
 918                                    skipped  = skipped.union(_nx.descendants(graph, n))
 919                                carryon  = set(graph.successors(event)).difference(skipped)
 920                                starting = [
 921                                    successor for successor in carryon if
 922                                    run_conditions_met(successor)
 923                                ]
 924                                for successor in starting:
 925                                    _args = [
 926                                        a if not isinstance(a, _xcoms.Promise) else refs[a.key].result()
 927                                        for a in graph.nodes[successor]['args']
 928                                    ]
 929                                    _kwargs = {
 930                                        k: v if not isinstance(v, _xcoms.Promise) else refs[v.key].result()
 931                                        for k, v in graph.nodes[successor]['kwargs'].items()
 932                                    }
 933                                    refs[successor] = Submit(
 934                                        successor,
 935                                        graph.nodes[successor]['callable'],
 936                                        graph.nodes[successor]['name'],
 937                                        _args,
 938                                        _kwargs
 939                                    )
 940                                    started.add(successor)
 941                        # Initialized nodes that are incomplete or not yet documented as complete.
 942                        events = started.difference(complete.union(failed).union(skipped))
 943
 944class QueueDAG(DAG):
 945    """
 946    #### Experimental, as of v0.2. ####
 947
 948    Directed Acyclic Graph used to queue and execute Pythonic callables in parallel,
 949    while stringing the outputs of those callables in linear sequences.
 950
 951    Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the 
 952    behavior of root nodes to determine if and/or when the DAG run has been completed.
 953
 954    Root nodes will be re-executed until culled by one of two conditions:
 955    1. A max number of iterations has been completed, or
 956    2. The output of the root node's callable matches a lambda halt_condition.
 957
 958    Both conditions can be set at run-time.
 959
 960    As of v0.2, QueueDAG experiences extreme latency when nested inside of a standard webber.DAG class.
 961    """
 962
 963    conditions = {}
 964
 965    def __init__(self):
 966        super().__init__()
 967
 968    def add_node(self, node, *args, **kwargs):
 969        """
 970        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
 971        On success, return unique identifier for the new node.
 972        
 973        Reserved key-words are used for Queue DAG definitions:
 974        
 975        - halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.
 976
 977        \t halt_condition = (lambda output: output == None) 
 978
 979        - iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.
 980
 981        - max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop.
 982        """
 983        halt_condition = kwargs.pop('halt_condition', None)
 984        iterator: int = kwargs.pop('iterator', None)
 985        max_iter: int = kwargs.pop('max_iter', None)
 986
 987        return_val = super().add_node(node, *args, **kwargs)
 988        
 989        if max_iter != None:
 990            iter_limit = int(max_iter)
 991        elif iterator != None:
 992            iter_limit = int(iterator)
 993        else:
 994            iter_limit = None
 995        
 996        self.conditions[return_val] = {
 997            'halt_condition': halt_condition,
 998            'iter_limit': iter_limit
 999        }
1000
1001        return return_val
1002        
1003    def add_edge(self, u_of_edge, v_of_edge, continue_on = Condition.Success):
1004        """
1005        Adds an edge between nodes in the Queue DAG's underlying graph.
1006        Queue DAG nodes may have a maximum of one child and one parent worker.
1007        """
1008        for node in (u_of_edge, v_of_edge):
1009            try:
1010                node_id = self.node_id(node)
1011            except:
1012                continue
1013        
1014            filter = (lambda e: e.parent == node_id) if node == u_of_edge else (lambda e: e.child == node_id)
1015            try:
1016                assert(len(self.filter_edges(filter)) == 0)
1017            except Exception as e:
1018                e.add_note("Queue DAG nodes may have a maximum of one child and one parent worker.")
1019                raise e
1020
1021        return super().add_edge(u_of_edge, v_of_edge, continue_on)
1022
1023    def execute(self, *promises, return_ref=False, print_exc=False):
1024        """
1025        Basic wrapper for execution of the DAG's underlying callables.
1026        """
1027        queues = {}
1028        processes = {}
1029        join = set()
1030        
1031        promises: dict = { k: v for k, v in _it.pairwise(promises) } if len(promises) > 0 else {}
1032
1033        with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:
1034
1035            # Skip execution if there are no callables in scope.
1036            if len(self.graph.nodes) == 0:
1037                print('Given DAG has no callables in scope. Skipping execution...')
1038                return
1039
1040            with _futures.ThreadPoolExecutor() as executor:
1041
1042                for id in self.root:
1043                    node = self.get_node(id)
1044                    queues[id] = _q.LifoQueue()
1045                    node.update({
1046                        'callable': _queue._worker,
1047                        'args': tuple(),
1048                        'kwargs': {
1049                            'work': node.callable,
1050                            'args': node.args, 'kwargs': node.kwargs,
1051                            'promises': promises,
1052                            'print_exc': print_exc,
1053                            'halt_condition': self.conditions[id]['halt_condition'],
1054                            'iter_limit': self.conditions[id]['iter_limit'],
1055                            'out_queue': queues.get(id)
1056                        }
1057                    })
1058                    processes[id] = executor.submit(
1059                        _event_wrapper,
1060                        _callable=node['callable'],
1061                        _name=node['name'],
1062                        _args=node['args'],
1063                        _kwargs=node['kwargs']
1064                    )
1065                
1066                for parent_id, id in self.graph.edges:
1067                    node = self.get_node(id)
1068                    queues[id] = _q.LifoQueue()
1069                    if len(list(self.graph.successors(id))) == 0:
1070                        end_proc = id
1071                    node.update({
1072                        'callable': _queue._worker,
1073                        'args': tuple(),
1074                        'kwargs': {
1075                            'work': node.callable,
1076                            'args': node.args, 'kwargs': node.kwargs,
1077                            'promises': promises,
1078                            'print_exc': print_exc,
1079                            'parent_id': parent_id,
1080                            'parent_process': processes[parent_id],
1081                            'in_queue': queues.get(parent_id),
1082                            'out_queue': queues.get(id)
1083                        }
1084                    })
1085                    processes[id] = executor.submit(
1086                        _event_wrapper,
1087                        _callable=node['callable'],
1088                        _name=node['name'],
1089                        _args=node['args'],
1090                        _kwargs=node['kwargs']
1091                    )
1092
1093            
1094            while len(join) != len(self.graph.nodes):
1095                for node in self.graph.nodes:
1096                    if processes[node].done():
1097                        join.add(node)
1098            
1099            return_val = []
1100            while not queues[end_proc].empty():
1101                return_val.append(queues[end_proc].get())
1102            
1103            return return_val
class DAG:
 75class DAG:
 76    """
 77    Directed Acyclic Graph used to represent Pythonic tasks in parallel.
 78    """
 79    def add_node(self, node, *args, **kwargs) -> str:
 80        """
 81        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
 82        On success, return unique identifier for the new node.
 83        """
 84        if not _iscallable(node):
 85            err_msg = f"{node}: requested node is not a callable Python function."
 86            raise TypeError(err_msg)
 87
 88        node_name = _edges.label_node(node)
 89
 90        args = tuple([
 91            arg if not isinstance(args, _xcoms.Promise) else self.resolve_promise(arg)
 92            for arg in args
 93        ])
 94
 95        for k, val in kwargs.items():
 96            if isinstance(val, _xcoms.Promise):
 97                kwargs[k] = self.resolve_promise(val)
 98
 99        self.graph.add_node(
100            node_for_adding=node_name,
101            callable=node, args=args, kwargs=kwargs,
102            name=node.__name__,
103            id=node_name
104        )
105
106        return node_name
107
108
109    def add_edge(
110            self, 
111            u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable],
112            continue_on: Condition = Condition.Success
113        ) -> _T.Tuple[str,str]:
114        """
115        Adds an edge between nodes in the DAG's underlying graph,
116        so long as the requested edge is unique and has not been added previously.
117
118        On success, returns Tuple of the new edge's unique identifiers.
119        """
120        # Validate inputs prior to execution
121        # - Nodes must be identifiers or callables
122        # - Conditions must belong to the webber.edges.Condition class
123        if not (isinstance(u_of_edge,str) or _iscallable(u_of_edge)):
124            err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable"
125            raise TypeError(err_msg)
126        if not (isinstance(v_of_edge,str) or _iscallable(v_of_edge)):
127            err_msg = f"Outgoing node {v_of_edge} must be a string or a Python callable"
128            raise TypeError(err_msg)
129        if not isinstance(continue_on, Condition):
130            raise TypeError("Edge conditions must use the webber.edges.Condition class.")
131
132        # Base Case 0: No nodes are present in the DAG:
133        # Ensure that both nodes are callables, then add both to the graph and
134        # assign the outgoing node as a root.
135        if len(self.graph.nodes()) == 0:
136            if not _iscallable(u_of_edge):
137                err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope."
138                raise ValueError(err_msg)
139            if not _iscallable(v_of_edge):
140                err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope."
141                raise ValueError(err_msg)
142            outgoing_node = self.add_node(u_of_edge)
143            incoming_node = self.add_node(v_of_edge)
144            self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
145            return (outgoing_node, incoming_node)
146
147        node_names, node_callables = zip(*self.graph.nodes(data='callable'))
148        graph_edges = list(self.graph.edges(data=False))
149        new_callables = dict()
150
151        if _iscallable(u_of_edge) and _iscallable(v_of_edge):
152
153            # Error Cases 0, 1: Either of the callables appear more than once in the DAG.
154            if node_callables.count(u_of_edge) > 1:
155                err_msg = f"Outgoing callable {u_of_edge.__name__} " \
156                        + "exists more than once in this DAG. " \
157                        + "Use the unique identifier of the required node or add a new node."
158                raise ValueError(err_msg)
159            if node_callables.count(v_of_edge) > 1:
160                err_msg = f"Incoming callable {v_of_edge.__name__} " \
161                        + "exists more than once in this DAG. " \
162                        + "Use the unique string identifier of the required node."
163                raise ValueError(err_msg)
164            
165            # Base Case 1: Both args are callables and will be present in the DAG scope no more than once.
166            # We will create new nodes if necessary, after validation, and get the unique string identifiers of the nodes.
167            try:
168                outgoing_node = self.node_id(u_of_edge)
169            except:
170                new_callables[u_of_edge] = u_of_edge
171                outgoing_node = _edges.label_node(u_of_edge)
172            try:
173                incoming_node = self.node_id(v_of_edge)
174            except:
175                new_callables[v_of_edge] = v_of_edge
176                incoming_node = _edges.label_node(v_of_edge)
177
178        else:
179
180            # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope.
181            if isinstance(u_of_edge, str) and u_of_edge not in node_names:
182                err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope."
183                raise ValueError(err_msg)
184            if isinstance(v_of_edge, str) and v_of_edge not in node_names:
185                err_msg = f"Incoming node {v_of_edge} not in DAG's current scope."
186                raise ValueError(err_msg)
187
188            # Both nodes' unique identifiers are present in the DAG
189            # and should be evaluated for a valid edge.
190            if isinstance(u_of_edge, str) and isinstance(v_of_edge, str):
191                outgoing_node = u_of_edge
192                incoming_node = v_of_edge
193
194            # Otherwise, one of the nodes is a callable, and the other is a valid unique identifier.
195            else:
196                for node in (u_of_edge, v_of_edge):
197                    def _assign_node(n):
198                        # For the argument that is a unique string identifier, assign and continue.
199                        if not _iscallable(n):
200                            return n
201                        # Error Case 4: The requested callable exists more than once in the DAG.
202                        if node_callables.count(n) > 1:
203                            err_msg = f"Outgoing callable {n.__name__} " \
204                                + "exists more than once in this DAG. " \
205                                + "Use the unique ID of the required node or add a new node."
206                            raise ValueError(err_msg)
207                        # If the callable exists only once in the DAG, use its unique identifier to
208                        # evaluate the requested edge.
209                        if node_callables.count(n) == 1:
210                            return node_names[ node_callables.index(n) ]
211                        # Otherwise, the callable is new and needs to be added to the DAG scope.
212                        else:
213                            new_callables[n] = n
214                            return _edges.label_node(n)
215                    if node == u_of_edge:
216                        outgoing_node = _assign_node(node)
217                    else:
218                        incoming_node = _assign_node(node)
219
220        # Error Case 5: Both callables exist only once in the DAG,
221        # but an edge already exists between them.
222        if (outgoing_node, incoming_node) in graph_edges:
223            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \
224                    + "a definition in this DAG."
225            raise ValueError(err_msg)
226
227        # Ensure that no cycles will be created by adding this edge to the DAG.
228        test_edges: list = graph_edges + [(outgoing_node, incoming_node)]
229
230        # Error Case 6: Both callables exist only once in the DAG,
231        # but adding an edge between them creates circular dependencies.
232        if not _nx.is_directed_acyclic_graph(_nx.DiGraph(test_edges)):
233            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \
234                    + "results in circular dependencies."
235            raise ValueError(err_msg)
236
237        # We can now add the edge to the DAG, since we are certain it will not result in
238        # illegal dependencies/behavior.
239        # First, we should account for potential new nodes. This also handles
240        # duplicates on first entry to the DAG (e.g.: edge == (print, print))
241        if new_callables.get(u_of_edge) != None:
242            outgoing_node = self.add_node(new_callables[u_of_edge])
243        if new_callables.get(v_of_edge) != None:
244            incoming_node = self.add_node(new_callables[v_of_edge])
245
246        # Then we can add the new edge.
247        self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
248        return (outgoing_node, incoming_node)
249    
250    def remove_node(self, *posargs, **kwargs) -> None:
251        """
252        Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG.
253        Throws error message and recommends safer methods.
254        """
255        raise NotImplementedError("Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect.")
256
257    def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]:
258        """
259        Removes an directed edge between nodes in the DAG's underlying graph.
260        Throws error if the edge does not exist.
261        On success, returns Tuple of the removed edge's unique identifiers.
262        """
263        edge_id = (self.node_id(u_of_edge), self.node_id(v_of_edge))
264        if edge_id not in self.graph.edges(data = False):
265            err_msg = "Requested edge does not exist in the DAG's scope"
266            raise ValueError(err_msg)
267        self.graph.remove_edge(edge_id[0], edge_id[1])
268        return edge_id
269
270    def update_edges(self, *E, continue_on: Condition, filter: _types.LambdaType = None, data = None):
271        """
272        Flexible function to update properties of edges in the DAG's scope, 
273        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
274        lambda filter using the edgedict syntax.
275        
276        List of nodes to update or filter argument is expected. Valid edge lists include:
277
278        \t update_edges((node1, node2), ...)       or update_edges([(node1, node2)], ...)
279
280        \t update_edges((node1, callable2), ...)   or update_edges([(node1, callable2)], ...)
281        
282        \t update_edges(edgedict),                 where isinstance(edgedict, webber.edges.edgedict) == True
283
284        Parameters:
285
286        > continue_on: webber.edges.Condition value to update matching edges with, 
287        
288        \t Execute child on success, failure, or any exit state, of the parent node
289
290        > filter: lambda property that can be used instead of a list of edges to be updated.
291        
292        \t filter = (lambda e: n.parent == print or e.child == print)
293
294        > data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set.
295
296        \t data = { 'continue_on': webber.edges.Condition }
297
298        """
299        if len(E) == 0 and filter == None:
300            raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.")
301
302        elif isinstance(E, dict) or isinstance(E, edgedict):
303            E = [E]
304
305        elif len(E) == 1 and isinstance(E[0], _abc.Iterable):
306            try:
307                _ = self.get_edges(E[0])
308            except:
309                E = E[0]
310
311        if filter != None:
312            edge_ids = self.filter_edges(filter, data = False)
313        else:
314            if isinstance(E[0], dict) or isinstance(E[0], edgedict):
315                try:
316                    ids = [e['id'] for e in E]
317                except KeyError:
318                    err_msg = 'In dictionary form, all given edges must be follow edgedict standards.'
319                    raise ValueError(err_msg)
320            else:
321                ids = E
322            edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids]
323
324        std_update = (continue_on == None)
325
326        if std_update:
327            for edge_id, e in zip(edge_ids, E):
328                if data != None:
329                    self._update_edge(data, id = edge_id)
330                else:
331                    self._update_edge(e, id = edge_id)
332        
333        else:
334            if continue_on != None:
335                if not isinstance(continue_on, Condition):
336                    err_msg = f"Condition assignment must use webber.edges.Condition"
337                    raise TypeError(err_msg)
338                for e in edge_ids:
339                    self.graph.edges[e]['Condition'] = continue_on
340
341    def _update_edge(self, edgedict: dict, id: tuple[str, str] = None, force: bool = False):
342        """
343        Internal only. Update properties of an individual edge within a DAG's scope, 
344        given a well-structured dictionary and the tuple identifier of the network edge. 
345        Force argument bypasses validation, and should only be used internally.
346        """
347        if id != None:
348            try:
349                if edgedict.get('id') and id != edgedict['id']:
350                    raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {edgedict['id']}")
351            except ValueError as e:
352                if not force:
353                    raise e
354            edgedict['id'] = id
355
356        expected_keys = ('parent', 'child', 'id', 'continue_on')
357        if not set(expected_keys).issuperset(set(edgedict.keys())):
358            raise ValueError(f"Expecting keys: {expected_keys}")
359
360        if not force:
361            
362            e1, e2 = None, None
363
364            if edgedict.get('id'):
365                e1 = self.get_edge(edgedict['id'], data = False)
366
367            if edgedict.get('parent') or edgedict.get('child'):
368                e2 = self.get_edge(edgedict.get('parent'), edgedict.get('child'), data = False)
369            else:
370                e2 = e1
371
372            if e1 != e2:
373                raise ValueError('Edge vertices should not be changed using update functions.')
374            
375            elif e1 == None:
376                raise ValueError('Requested edge was not given an identifier.')
377
378            if edgedict.get('continue_on') and not isinstance(edgedict['continue_on'], Condition):
379                err_msg = f"Condition assignment must use webber.edges.Condition"
380                raise TypeError(err_msg)
381                       
382        edge_id = edgedict.pop('id')
383        edge = {k: v for k,v in edgedict.items() if k not in ('parent', 'child', 'id')}
384        self.graph.edges[(edge_id[0], edge_id[1])].update(edge)
385
386    def relabel_node(self, node: _T.Union[str, _T.Callable], label: str):
387        """
388        Update the label, or name, given to a node in the DAG's scope, 
389        given a Python string and a node identifier. 
390        Well-structured wrapper for a common use-case of DAG.update_nodes.
391        """
392        node_id = self.node_id(node)
393        if not isinstance(label, str) or len(label) == 0:
394            err_msg = "Node label must be a Python string with one or more characters."
395            raise ValueError(err_msg)
396        self.update_nodes(node_id, data = {'name': label})
397        return label
398
399    def update_nodes(self, *N, filter: _types.LambdaType = None, data = None, callable = None, args = None, kwargs = None):
400        """
401        Flexible function to update properties of nodes in the DAG's scope, 
402        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
403        lambda filter using the dotdict syntax.
404        
405        List of nodes to update or filter argument is expected. Valid node lists include:
406
407        \t update_nodes(node_id, ...)           or update_nodes([node_id], ...)
408
409        \t update_nodes(callable, ...)          or update_nodes([callable], ...)
410        
411        \t update_nodes(node1, node2, ...)      or update_nodes([node1, node2], ...)
412        
413        \t update_nodes(node1, callable2, ...)  or update_nodes([node1, callable2], ...)
414
415        > update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id)
416
417        Parameters:
418
419        > filter: lambda property that can be used instead of a list of nodes to be updated.
420        
421        \t filter = (lambda n: n.callable == print or 'Hello, World' in n.args)
422
423        > data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set.
424            Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method.
425            
426        \t data = {
427            \t 'callable': print,
428            \t 'args': ['Hello', 'World'],
429            \t 'kwargs': {'sep': ', '},
430            \t 'name': 'custom_label',
431            \t 'id': 'unique-identifier'
432            \t}
433
434        > args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List).
435
436        > kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary.
437
438        """
439        if len(N) == 0 and filter == None:
440            raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.")
441
442        elif len(N) > 0 and filter != None:
443            raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.")
444
445        elif isinstance(N, dict) or isinstance(N, str):
446            N = [N]
447
448        elif len(N) == 1 and isinstance(N[0], _abc.Iterable):
449            if isinstance(N[0][0], dict):
450                N = N[0]
451            elif isinstance(N[0][0], str):
452                # BUG: A list of all single character IDs will fail to be updated. Please try another call method (i.e.: nested iterator).
453                if sum(list(map(lambda n: len(n), N[0]))) != len(N[0]): 
454                    N = N[0]
455
456
457        if filter != None:
458            node_ids = self.filter_nodes(filter, data = False)
459        else:
460            if isinstance(N[0], dict):
461                ids = [n['id'] for n in N]
462            else:
463                ids = N
464            node_ids = [self.node_id(i) for i in ids]
465        
466        std_update = (callable == None) and (args == None) and (kwargs == None)
467
468        if std_update:
469            for node_id, n in zip(node_ids, N):
470                if data != None:
471                    self._update_node(data, id = node_id)
472                else:
473                    self._update_node(n, id = node_id)
474        
475        else:
476            if callable != None:
477                if not _iscallable(callable):
478                    err_msg = f"Requested node is not assigned a callable Python function."
479                    raise TypeError(err_msg)
480                for node_id in node_ids:
481                    self.graph.nodes[node_id]['callable'] = callable
482                    self.graph.nodes[node_id]['name'] = callable.__name__
483            
484            if args != None:
485                if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)):
486                    err_msg = f"Requested node is not assigned a tuple of pos args."
487                    raise TypeError(err_msg)
488                args = tuple(args)
489                for node_id in node_ids:
490                    self.graph.nodes[node_id]['args'] = args
491            
492            if kwargs != None:
493                if not isinstance(kwargs, dict):
494                    err_msg = f"Requested node is not assigned a dictionary of kw args."
495                    raise TypeError(err_msg)
496                for node_id in node_ids:
497                    self.graph.nodes[node_id]['kwargs'] = kwargs
498
499    def get_edges(self, *N, data: bool = True) -> _T.Union[list[edgedict], list[tuple]]:
500        """
501        Retrieval function for DAG edge data, based on tuple identifiers.
502        Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2']))
503        """
504        if len(N) == 0:
505            if data == True:
506                return list(map(edgedict, self.graph.edges.data()))
507            return list(self.graph.edges.data(data=False))
508            
509        # elif len(N) == 1:
510        #     if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], tuple):
511        #         N = N[0]
512
513        if len(N) != len(set(N)) or False in map(lambda n: isinstance(n, _abc.Iterable) and len(n) == 2, N):
514            err_msg = 'All requested edges must be unique tuples of size 2.'
515            raise ValueError(err_msg)
516    
517        edge_data = [self.get_edge(o, i, data=data) for (o, i) in N]
518        return edge_data
519    
520    def get_edge(self, outgoing_node: _T.Union[str, callable], incoming_node: _T.Union[str, callable], data: bool = True) -> _T.Union[edgedict, tuple]:
521        """
522        Retrieval function for a single directed edge between nodes in a DAG's scope. 
523        """
524        id = (self.node_id(outgoing_node), self.node_id(incoming_node))
525        if not data:
526            return id
527        edge_data = self.graph.get_edge_data(u = id[0], v = id[1])
528        if not edge_data:
529            err_msg = f'No match found for the directed edge requested: {id}'
530            raise ValueError(err_msg)
531        return edgedict(*id, **edge_data)
532
533    def get_node(self, n: _T.Union[str, callable]) -> dotdict:
534        """
535        Given a unique identifier, returns a dictionary of node metadata
536        for a single node in the DAG's scope.
537        """
538        node_id = self.node_id(n)
539        return dotdict(self.graph.nodes[node_id])
540
541    def get_nodes(self, *N) -> list[dotdict]:
542        """
543        Flexible function to retrieve DAG node data, based on node identifiers 
544        (e.g.: string IDs or unique callables).
545        """
546        if len(N) == 0:
547            node_data = list(self.graph.nodes.values())
548            return [dotdict(d) for d in node_data]
549
550        elif len(N) == 1:
551            if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], str):
552                N = N[0]
553            else:
554                node_id = self.node_id(N[0])
555                node_data = [dotdict(self.graph.nodes[node_id])]
556                return node_data
557            
558        if not len(N) == len(set(N)):
559            err_msg = 'All requested nodes must be unique identifiers.'
560            raise ValueError(err_msg)
561        
562        node_ids  = [self.node_id(n) for n in N]
563        node_data = [dotdict(self.graph.nodes[n]) for n in node_ids]
564        return node_data
565
566    def filter_nodes(self, filter: _types.LambdaType, data: bool = False):
567        """
568        Given a lambda function, filter nodes in a DAG's scope based on its attributes.
569        Current limitation: Filters must use node identifier strings when referencing nodes.
570        Use get_nodes for more flexible controls.
571        """
572        if not data:
573            return [node['id'] for node in self.graph.nodes.values() if filter(dotdict(node))]
574        return [dotdict(node) for node in self.graph.nodes.values() if filter(dotdict(node))]
575    
576    def filter_edges(self, filter = _types.LambdaType, data: bool = False) -> list[edgedict]:
577        """
578        Given a lambda function, filter edges in a DAG's scope based on its attributes.
579        Current limitation: Filters must use node identifier strings when referencing nodes.
580        Use get_edges for more flexible controls.
581        """
582        if not data:
583            return [e[:2] for e in list(self.graph.edges.data()) if filter(edgedict(*e))]
584        return [edgedict(e) for e in list(self.graph.edges.data()) if filter(edgedict(*e))]
585
586    def retry_node(self, identifier: _T.Union[str,_T.Callable], count: int):
587        """
588        Given a node identifier, set number of automatic retries in case of failure.
589        Re-attempts will begin as soon as possible.
590        """
591        if not isinstance(count, int) or not count >= 0:
592            raise ValueError("Retry count must be a non-negative integer.")
593        node = self.node_id(identifier)
594        self.graph.nodes[node]['retry'] = count
595
596    def skip_node(self, identifier: _T.Union[str,_T.Callable], skip: bool = True, as_failure = False):
597        """
598        Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error).
599        Allows conditional control and testing over DAG's order of operations.
600        """
601        if not isinstance(skip, bool):
602            raise ValueError("Skip argument must be a boolean value.")
603        node = self.node_id(identifier)
604        self.graph.nodes[node]['skip'] = (skip, as_failure)
605      
606    def critical_path(self, nodes):
607        """
608        Given a set of nodes, returns a subset of the DAG containing
609        only the node(s) and its parents, or upstream dependencies.
610        """
611        if isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str):
612            node_ids = {self.node_id(n) for n in nodes}
613        else:
614            node_ids = {self.node_id(nodes)}
615        return self._subgraph(node_ids)
616
617    def execute(self, return_ref=False, print_exc=False):
618        """
619        Basic wrapper for execution of the DAG's underlying callables.
620        """
621        executor = self.DAGExecutor(self.graph, self.root, print_exc)
622        return executor if return_ref else None
623
624    def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] = None):
625        """
626        Basic wrapper to visualize DAG using Vis.js and NetGraph libraries.
627        By default, visualization library only loaded in after DAG.visualize() is called, halving import times.
628        """
629        import webber.viz as _viz
630
631        match type:
632            case 'browser':
633                _viz.visualize_browser(self.graph)
634
635            case 'plt':
636                return _viz.visualize_plt(self.graph)
637
638            case 'gui':
639                # _visualize_gui(self.graph)
640                raise NotImplementedError
641
642            case None: 
643                if _viz._in_notebook():
644                    return _viz.visualize_plt(self.graph)
645                else:
646                    _viz.visualize_browser(self.graph)
647
648            case _:
649                err_msg = "Unknown visualization type requested."
650                raise NotImplementedError(err_msg)
651
652    @property
653    def root(self) -> list[str]:
654        """
655        Return list of nodes with no dependencies.
656        Root nodes will occur first in DAG's order of operations.
657        """
658        return list(filter(
659            lambda node: len(list(self.graph.predecessors(node))) < 1,
660            self.graph.nodes.keys()
661        ))
662
663    @property
664    def nodes(self):
665        return self.graph.nodes
666
667    def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str:
668        """
669        Validate whether identifier given is a valid node within the DAG's scope.
670        Primarily for internal use, but useful for retrieving string identifiers
671        for a unique callable in a DAG.
672        """
673        node_names, node_callables = zip(*self.graph.nodes(data='callable'))
674
675        if isinstance(identifier, str):
676            if identifier not in node_names:
677                err_msg = f"Node {identifier} is not defined in this DAG's scope."
678                raise ValueError(err_msg)
679            node = identifier
680        elif _iscallable(identifier):
681            match node_callables.count(identifier):
682                case 0:
683                    err_msg = f"Callable {identifier} is not defined in this DAG's scope."
684                    raise ValueError(err_msg)
685                case 1:
686                    node = node_names[ node_callables.index(identifier) ]
687                case _:
688                    err_msg = f"Callable {identifier.__name__} " \
689                            + "exists more than once in this DAG. " \
690                            + "Use the unique identifier of the required node."
691                    raise ValueError(err_msg)
692        else:            
693            err_msg = f"Node {identifier} must be a string or a Python callable"
694            raise TypeError(err_msg)
695        return node
696    
697    def _update_node(self, nodedict: dict, id: str = None, force: bool = False):
698        """
699        Internal only. Update properties of single node within a DAG's scope, 
700        given a well-structured dictionary and the tuple identifier of the network edge. 
701        Force argument bypasses dictionary validation, and should only be used internally.
702        """
703        if id != None:
704            try:
705                if nodedict.get('id') != None and id != nodedict['id']:
706                    raise ValueError(f"Given ID {id} inconsistent with dictionary identifier: {nodedict['id']}")
707            except ValueError as e:
708                if not force:
709                    raise e
710            nodedict['id'] = id
711        
712        expected_keys = ('callable', 'args', 'kwargs', 'name', 'id')
713        if not set(expected_keys).issuperset(set(nodedict.keys())):
714            raise ValueError(f"Expecting keys: {expected_keys}")
715        
716        if not force:
717            if nodedict.get('callable'):
718                if _iscallable(nodedict['callable']):
719                    err_msg = f"Requested node is not assigned a callable Python function."
720                    raise TypeError(err_msg)
721                if not nodedict.get('name'):
722                    nodedict['name'] = nodedict['callable'].__name__
723
724            if nodedict.get('name') and (not isinstance(nodedict['name'], str) or len(nodedict['name']) == 0): 
725                err_msg = f"Requested node name must be a non-null Python string, will default to callable when not set."
726                raise TypeError(err_msg)
727
728            if nodedict.get('args'):
729                if not (isinstance(nodedict['args'], _abc.Iterable) or isinstance(nodedict['args'], str)):
730                    err_msg = f"Requested node is not assigned a tuple of pos args."
731                    raise TypeError(err_msg)
732                nodedict['args'] = tuple(nodedict['args'])
733            
734            if nodedict.get('kwargs') and not isinstance(nodedict['kwargs'], dict):
735                err_msg = f"Requested node is not assigned a dictionary of kw args."
736                raise TypeError(err_msg)                    
737        
738        node_id = nodedict.pop('id')
739        self.graph.nodes[node_id].update(nodedict)
740
741        # Reset node name if implicitly requested.
742        if not nodedict.get('name'):
743            self.graph.nodes[node_id]['name'] = self.graph.nodes[node_id]['callable'].__name__
744
745    def _subgraph(self, node_ids: set[str]):
746        """
747        Internal only. Given a set of nodes, returns a subset of the DAG containing
748        only the node(s) and upstream dependencies.
749        """
750        parent_nodes = set()
751        for node in node_ids:
752            parent_nodes = parent_nodes.union(set(self.graph.predecessors(node)))
753        while parent_nodes:
754            node_ids = node_ids.union(parent_nodes)
755            parent_nodes, child_nodes = set(), parent_nodes
756            for node in child_nodes:
757                parent_nodes = parent_nodes.union(set(self.graph.predecessors(node)))
758        subgraph = self.graph.subgraph(node_ids)
759        return DAG(subgraph, __force=True)
760
761    def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise:
762        """
763        Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope.
764        Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope.
765        """
766        try:
767            key = self.node_id(promise.key)
768        except Exception as e:
769            raise _xcoms.InvalidCallable(e)
770        return _xcoms.Promise(key)
771
772    def __init__(self, graph: _nx.DiGraph = None, **kwargs) -> None:
773
774        if graph is None:
775            self.graph = _nx.DiGraph()
776            return
777        
778        # Meant for internal use only, creating DAGs from subgraphs.
779        if kwargs.get('__force') == True:
780            self.graph = graph
781            return
782        
783        _edges.validate_dag(graph)
784
785        # Define framework specific logic as nested dictionaries.
786        for node in graph.nodes.keys():
787            graph.nodes[node]['callable'] = node
788            graph.nodes[node]['name'] = node.__name__
789            graph.nodes[node]['args'] = []
790            graph.nodes[node]['kwargs'] = {}
791
792        for e in graph.edges:
793            condition = graph.edges[e].get('Condition')
794            if condition is not None:
795                if condition not in Condition:
796                    raise TypeError(e, 'Edge conditions must belong to IntEnum type Webber.Condition.')
797            else:
798                graph.edges[e]['Condition'] = Condition.Success
799            
800        graph = _nx.relabel_nodes(graph, lambda node: _edges.label_node(node))
801        for n in graph.nodes:
802            graph.nodes[n]['id'] = n
803        self.graph = _nx.DiGraph(graph)
804
    class DAGExecutor:
        """
        Base class used to execute DAG in embarrassingly parallel.
        All of the work happens in the constructor: creating an instance runs the graph.
        """
        def __init__(self, graph: _nx.DiGraph, roots: list, print_exc: bool = False) -> None:
            """
            Executes every node of the graph on a thread pool, starting from the roots.

            Parameters:

            > graph: DiGraph whose nodes carry 'callable', 'name', 'args' and 'kwargs'
              attributes (plus optional 'skip' and 'retry'), and whose edges carry a 'Condition'.

            > roots: node IDs to submit first.

            > print_exc: when truthy, the traceback of each failed node is printed.

            The loop polls the submitted futures until every node is counted as complete,
            failed or skipped, submitting successors as their edge conditions are met.
            """
            
            # Output printed while the executor runs goes through the module's output logger.
            with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:
                
                # Skip execution if there are no callables in scope.
                if len(graph.nodes) == 0:
                    print('Given DAG has no callables in scope. Skipping execution...')
                    return

                # Initialize local variables for execution.
                complete, started, failed, skipped = set(), set(), set(), set()
                events = set(roots)
                # Maps node ID -> the future of its most recent submission.
                refs: dict[str, _futures.Future] = {}

                def raise_exc(message):
                    # Stand-in callable for nodes that are skipped "as a failure".
                    raise ValueError(message)

                def run_conditions_met(n):
                    # A node may start only once every incoming edge's condition is
                    # satisfied by the recorded outcome of that edge's parent.
                    for p in graph.predecessors(n):
                        match graph.edges.get((p, n))['Condition']:
                            case Condition.Success:
                                if p not in complete:
                                    return False
                            case Condition.Failure:
                                if p not in failed:
                                    return False
                            case Condition.AnyCase:
                                if p not in failed and p not in complete:
                                    return False
                    return True

                # Per-node (skip, as_failure) pairs; nodes without the attribute get (False, False).
                skip  = graph.nodes.data("skip", default=(False, False))
                # Per-node [attempts left, cached submission arguments]; attempts = retries + 1.
                retry = {n: [c+1, {}] for n,c in graph.nodes.data("retry", default=0)}

                # Start execution of root node functions.
                with _futures.ThreadPoolExecutor() as executor:

                    def Submit(event, callable, name, args, kwargs):
                        if skip[event][0]: # NOTE: Internally, DAGExecutor tracks these skipped events as successes or failures.
                            # Skipped nodes are never retried.
                            retry[event][0] = 0
                            skip_callable = raise_exc if skip[event][1] else print
                            return executor.submit(
                                _event_wrapper,
                                _callable=skip_callable,
                                _name=graph.nodes[event]['name'],
                                _args=[f"Event {event} skipped..."],
                                _kwargs={}
                            )
                        else:
                            # Use up one attempt; while attempts remain, remember the arguments
                            # of the first submission so a retry can reuse them.
                            retry[event][0] -= 1
                            if (retry[event][0] > 0) and (retry[event][1] == {}):
                                retry[event][1] = {
                                    'callable': callable,
                                    'name': name,
                                    'args': args,
                                    'kwargs': kwargs
                                }
                            return executor.submit(
                                _event_wrapper,
                                _callable=callable,
                                _name=name,
                                _args=args,
                                _kwargs=kwargs
                            )

                    for event in events:
                        refs[event] = Submit(
                            event,
                            graph.nodes[event]['callable'],
                            graph.nodes[event]['name'],
                            graph.nodes[event]['args'],
                            graph.nodes[event]['kwargs']
                        )
                        started.add(event)
                    # Loop until all nodes in the network are executed.
                    # The loop re-checks the futures on every pass without blocking on them.
                    while (len(complete) + len(failed) + len(skipped)) != len(graph):
                        for event in events:
                            if refs[event].done() is True:
                                if refs[event].exception(timeout=0) is not None:
                                    # Re-raise the stored exception so print_exc has an
                                    # active exception to report.
                                    try:
                                        raise refs[event].exception(timeout=0)
                                    except:
                                        if print_exc:
                                            _traceback.print_exc()
                                        
                                        # Attempts left: resubmit with the cached arguments and
                                        # move on to the next event without recording a failure.
                                        if retry[event][0] > 0:
                                            print(f"Event {event} exited with exception, retrying...")
                                            refs[event] = Submit(
                                                event,
                                                callable=retry[event][1]['callable'],
                                                name=retry[event][1]['name'],
                                                args=retry[event][1]['args'],
                                                kwargs=retry[event][1]['kwargs']
                                            )
                                            continue                                            

                                        print(f"Event {event} exited with exception...")
                                        failed.add(event)
                                        # Children whose edge does not continue on failure are skipped.
                                        skipping = [
                                            e[1] for e in set(graph.out_edges(event)) 
                                            if not _edges.continue_on_failure(graph.edges.get(e))
                                        ]
                                else:
                                    complete.add(event)
                                    # Children whose edge does not continue on success are skipped.
                                    skipping = [
                                        e[1] for e in set(graph.out_edges(event)) 
                                        if not _edges.continue_on_success(graph.edges.get(e))
                                    ]
                                # A skipped child takes all of its descendants with it.
                                skipped  = skipped.union(skipping)
                                for n in skipping:
                                    skipped  = skipped.union(_nx.descendants(graph, n))
                                carryon  = set(graph.successors(event)).difference(skipped)
                                starting = [
                                    successor for successor in carryon if
                                    run_conditions_met(successor)
                                ]
                                for successor in starting:
                                    # Replace Promise arguments with the result of the node
                                    # they refer to before submitting the successor.
                                    _args = [
                                        a if not isinstance(a, _xcoms.Promise) else refs[a.key].result()
                                        for a in graph.nodes[successor]['args']
                                    ]
                                    _kwargs = {
                                        k: v if not isinstance(v, _xcoms.Promise) else refs[v.key].result()
                                        for k, v in graph.nodes[successor]['kwargs'].items()
                                    }
                                    refs[successor] = Submit(
                                        successor,
                                        graph.nodes[successor]['callable'],
                                        graph.nodes[successor]['name'],
                                        _args,
                                        _kwargs
                                    )
                                    started.add(successor)
                        # Initialized nodes that are incomplete or not yet documented as complete.
                        events = started.difference(complete.union(failed).union(skipped))

Directed Acyclic Graph used to represent Pythonic tasks in parallel.

DAG(graph: networkx.classes.digraph.DiGraph = None, **kwargs)
772    def __init__(self, graph: _nx.DiGraph = None, **kwargs) -> None:
773
774        if graph is None:
775            self.graph = _nx.DiGraph()
776            return
777        
778        # Meant for internal use only, creating DAGs from subgraphs.
779        if kwargs.get('__force') == True:
780            self.graph = graph
781            return
782        
783        _edges.validate_dag(graph)
784
785        # Define framework specific logic as nested dictionaries.
786        for node in graph.nodes.keys():
787            graph.nodes[node]['callable'] = node
788            graph.nodes[node]['name'] = node.__name__
789            graph.nodes[node]['args'] = []
790            graph.nodes[node]['kwargs'] = {}
791
792        for e in graph.edges:
793            condition = graph.edges[e].get('Condition')
794            if condition is not None:
795                if condition not in Condition:
796                    raise TypeError(e, 'Edge conditions must belong to IntEnum type Webber.Condition.')
797            else:
798                graph.edges[e]['Condition'] = Condition.Success
799            
800        graph = _nx.relabel_nodes(graph, lambda node: _edges.label_node(node))
801        for n in graph.nodes:
802            graph.nodes[n]['id'] = n
803        self.graph = _nx.DiGraph(graph)
def add_node(self, node, *args, **kwargs) -> str:
 79    def add_node(self, node, *args, **kwargs) -> str:
 80        """
 81        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
 82        On success, return unique identifier for the new node.
 83        """
 84        if not _iscallable(node):
 85            err_msg = f"{node}: requested node is not a callable Python function."
 86            raise TypeError(err_msg)
 87
 88        node_name = _edges.label_node(node)
 89
 90        args = tuple([
 91            arg if not isinstance(args, _xcoms.Promise) else self.resolve_promise(arg)
 92            for arg in args
 93        ])
 94
 95        for k, val in kwargs.items():
 96            if isinstance(val, _xcoms.Promise):
 97                kwargs[k] = self.resolve_promise(val)
 98
 99        self.graph.add_node(
100            node_for_adding=node_name,
101            callable=node, args=args, kwargs=kwargs,
102            name=node.__name__,
103            id=node_name
104        )
105
106        return node_name

Adds a callable with positional and keyword arguments to the DAG's underlying graph. On success, return unique identifier for the new node.

def add_edge( self, u_of_edge: Union[str, Callable], v_of_edge: Union[str, Callable], continue_on: Condition = <Condition.Success: 0>) -> Tuple[str, str]:
109    def add_edge(
110            self, 
111            u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable],
112            continue_on: Condition = Condition.Success
113        ) -> _T.Tuple[str,str]:
114        """
115        Adds an edge between nodes in the DAG's underlying graph,
116        so long as the requested edge is unique and has not been added previously.
117
118        On success, returns Tuple of the new edge's unique identifiers.
119        """
120        # Validate inputs prior to execution
121        # - Nodes must be identifiers or callables
122        # - Conditions must belong to the webber.edges.Condition class
123        if not (isinstance(u_of_edge,str) or _iscallable(u_of_edge)):
124            err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable"
125            raise TypeError(err_msg)
126        if not (isinstance(v_of_edge,str) or _iscallable(v_of_edge)):
127            err_msg = f"Outgoing node {v_of_edge} must be a string or a Python callable"
128            raise TypeError(err_msg)
129        if not isinstance(continue_on, Condition):
130            raise TypeError("Edge conditions must use the webber.edges.Condition class.")
131
132        # Base Case 0: No nodes are present in the DAG:
133        # Ensure that both nodes are callables, then add both to the graph and
134        # assign the outgoing node as a root.
135        if len(self.graph.nodes()) == 0:
136            if not _iscallable(u_of_edge):
137                err_msg = f"Outgoing node {u_of_edge} is not defined in this DAG's scope."
138                raise ValueError(err_msg)
139            if not _iscallable(v_of_edge):
140                err_msg = f"Incoming node {v_of_edge} is not defined in this DAG's scope."
141                raise ValueError(err_msg)
142            outgoing_node = self.add_node(u_of_edge)
143            incoming_node = self.add_node(v_of_edge)
144            self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
145            return (outgoing_node, incoming_node)
146
147        node_names, node_callables = zip(*self.graph.nodes(data='callable'))
148        graph_edges = list(self.graph.edges(data=False))
149        new_callables = dict()
150
151        if _iscallable(u_of_edge) and _iscallable(v_of_edge):
152
153            # Error Cases 0, 1: Either of the callables appear more than once in the DAG.
154            if node_callables.count(u_of_edge) > 1:
155                err_msg = f"Outgoing callable {u_of_edge.__name__} " \
156                        + "exists more than once in this DAG. " \
157                        + "Use the unique identifier of the required node or add a new node."
158                raise ValueError(err_msg)
159            if node_callables.count(v_of_edge) > 1:
160                err_msg = f"Incoming callable {v_of_edge.__name__} " \
161                        + "exists more than once in this DAG. " \
162                        + "Use the unique string identifier of the required node."
163                raise ValueError(err_msg)
164            
165            # Base Case 1: Both args are callables and will be present in the DAG scope no more than once.
166            # We will create new nodes if necessary, after validation, and get the unique string identifiers of the nodes.
167            try:
168                outgoing_node = self.node_id(u_of_edge)
169            except:
170                new_callables[u_of_edge] = u_of_edge
171                outgoing_node = _edges.label_node(u_of_edge)
172            try:
173                incoming_node = self.node_id(v_of_edge)
174            except:
175                new_callables[v_of_edge] = v_of_edge
176                incoming_node = _edges.label_node(v_of_edge)
177
178        else:
179
180            # Error Cases 2, 3: Either of the requested IDs are not in the DAG's current scope.
181            if isinstance(u_of_edge, str) and u_of_edge not in node_names:
182                err_msg = f"Outgoing node {u_of_edge} not in DAG's current scope."
183                raise ValueError(err_msg)
184            if isinstance(v_of_edge, str) and v_of_edge not in node_names:
185                err_msg = f"Incoming node {v_of_edge} not in DAG's current scope."
186                raise ValueError(err_msg)
187
188            # Both nodes' unique identifiers are present in the DAG
189            # and should be evaluated for a valid edge.
190            if isinstance(u_of_edge, str) and isinstance(v_of_edge, str):
191                outgoing_node = u_of_edge
192                incoming_node = v_of_edge
193
194            # Otherwise, one of the nodes is a callable, and the other is a valid unique identifier.
195            else:
196                for node in (u_of_edge, v_of_edge):
197                    def _assign_node(n):
198                        # For the argument that is a unique string identifier, assign and continue.
199                        if not _iscallable(n):
200                            return n
201                        # Error Case 4: The requested callable exists more than once in the DAG.
202                        if node_callables.count(n) > 1:
203                            err_msg = f"Outgoing callable {n.__name__} " \
204                                + "exists more than once in this DAG. " \
205                                + "Use the unique ID of the required node or add a new node."
206                            raise ValueError(err_msg)
207                        # If the callable exists only once in the DAG, use its unique identifier to
208                        # evaluate the requested edge.
209                        if node_callables.count(n) == 1:
210                            return node_names[ node_callables.index(n) ]
211                        # Otherwise, the callable is new and needs to be added to the DAG scope.
212                        else:
213                            new_callables[n] = n
214                            return _edges.label_node(n)
215                    if node == u_of_edge:
216                        outgoing_node = _assign_node(node)
217                    else:
218                        incoming_node = _assign_node(node)
219
220        # Error Case 5: Both callables exist only once in the DAG,
221        # but an edge already exists between them.
222        if (outgoing_node, incoming_node) in graph_edges:
223            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) already has " \
224                    + "a definition in this DAG."
225            raise ValueError(err_msg)
226
227        # Ensure that no cycles will be created by adding this edge to the DAG.
228        test_edges: list = graph_edges + [(outgoing_node, incoming_node)]
229
230        # Error Case 6: Both callables exist only once in the DAG,
231        # but adding an edge between them creates circular dependencies.
232        if not _nx.is_directed_acyclic_graph(_nx.DiGraph(test_edges)):
233            err_msg = f"Requested edge ({outgoing_node}, {incoming_node}) " \
234                    + "results in circular dependencies."
235            raise ValueError(err_msg)
236
237        # We can now add the edge to the DAG, since we are certain it will not result in
238        # illegal dependencies/behavior.
239        # First, we should account for potential new nodes. This also handles
240        # duplicates on first entry to the DAG (e.g.: edge == (print, print))
241        if new_callables.get(u_of_edge) != None:
242            outgoing_node = self.add_node(new_callables[u_of_edge])
243        if new_callables.get(v_of_edge) != None:
244            incoming_node = self.add_node(new_callables[v_of_edge])
245
246        # Then we can add the new edge.
247        self.graph.add_edge(outgoing_node, incoming_node, Condition = continue_on)
248        return (outgoing_node, incoming_node)

Adds an edge between nodes in the DAG's underlying graph, so long as the requested edge is unique and has not been added previously.

On success, returns Tuple of the new edge's unique identifiers.

def remove_node(self, *posargs, **kwargs) -> None:
250    def remove_node(self, *posargs, **kwargs) -> None:
251        """
252        Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG.
253        Throws error message and recommends safer methods.
254        """
255        raise NotImplementedError("Node removals can lead to unexpected behavior in a DAG without special care. Please consider using the skip_node operation or define a new DAG to achieve the same effect.")

Currently out-of-scope. Node-removal can lead to unexpected behavior in a DAG. Throws error message and recommends safer methods.

def remove_edge( self, u_of_edge: Union[str, Callable], v_of_edge: Union[str, Callable]) -> Tuple[str, str]:
257    def remove_edge(self, u_of_edge: _T.Union[str,_T.Callable], v_of_edge: _T.Union[str,_T.Callable]) -> _T.Tuple[str,str]:
258        """
259        Removes an directed edge between nodes in the DAG's underlying graph.
260        Throws error if the edge does not exist.
261        On success, returns Tuple of the removed edge's unique identifiers.
262        """
263        edge_id = (self.node_id(u_of_edge), self.node_id(v_of_edge))
264        if edge_id not in self.graph.edges(data = False):
265            err_msg = "Requested edge does not exist in the DAG's scope"
266            raise ValueError(err_msg)
267        self.graph.remove_edge(edge_id[0], edge_id[1])
268        return edge_id

Removes a directed edge between nodes in the DAG's underlying graph. Throws an error if the edge does not exist. On success, returns a Tuple of the removed edge's unique identifiers.

def update_edges( self, *E, continue_on: Condition, filter: function = None, data=None):
    def update_edges(self, *E, continue_on: Condition, filter: _types.LambdaType = None, data = None):
        """
        Flexible function to update properties of edges in the DAG's scope, 
        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
        lambda filter using the edgedict syntax.
        
        List of nodes to update or filter argument is expected. Valid edge lists include:

        \t update_edges((node1, node2), ...)       or update_edges([(node1, node2)], ...)

        \t update_edges((node1, callable2), ...)   or update_edges([(node1, callable2)], ...)
        
        \t update_edges(edgedict),                 where isinstance(edgedict, webber.edges.edgedict) == True

        Parameters:

        > continue_on: webber.edges.Condition value to update matching edges with 
        (keyword-only and required; pass None to update from `data` / the given edges instead).
        
        \t Execute child on success, failure, or any exit state, of the parent node

        > filter: lambda property that can be used instead of a list of edges to be updated.
        
        \t filter = (lambda e: e.parent == print or e.child == print)

        > data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set.

        \t data = { 'continue_on': webber.edges.Condition }

        Raises:

        > ValueError: neither edges nor a filter were given, or a dictionary-form edge has no 'id' key.

        > TypeError: continue_on is given but is not a webber.edges.Condition.

        """
        if len(E) == 0 and filter == None:
            raise ValueError("Either an array of edge IDs / edgedicts (E) or a filter must be passed to this function.")

        # NOTE(review): E is collected by *E and is therefore always a tuple,
        # so this branch cannot match for any call.
        elif isinstance(E, dict) or isinstance(E, edgedict):
            E = [E]

        # A single iterable argument is either one edge or a collection of edges.
        # get_edges is used as a probe: if it raises for E[0], the argument is
        # unwrapped and treated as the collection itself.
        elif len(E) == 1 and isinstance(E[0], _abc.Iterable):
            try:
                _ = self.get_edges(E[0])
            except:
                E = E[0]

        # Resolve the (outgoing, incoming) identifiers of every edge to update.
        if filter != None:
            edge_ids = self.filter_edges(filter, data = False)
        else:
            if isinstance(E[0], dict) or isinstance(E[0], edgedict):
                try:
                    ids = [e['id'] for e in E]
                except KeyError:
                    err_msg = 'In dictionary form, all given edges must be follow edgedict standards.'
                    raise ValueError(err_msg)
            else:
                ids = E
            edge_ids = [self.get_edge(i[0], i[1], data=False) for i in ids]

        # Without an explicit condition, fall back to a property update
        # driven by `data` or by the edge definitions themselves.
        std_update = (continue_on == None)

        if std_update:
            # NOTE(review): zip stops at the shorter sequence, so when edges are
            # selected by filter and no positional edges are given (E is empty),
            # this loop performs no update even if `data` is set.
            for edge_id, e in zip(edge_ids, E):
                if data != None:
                    self._update_edge(data, id = edge_id)
                else:
                    self._update_edge(e, id = edge_id)
        
        else:
            if continue_on != None:
                if not isinstance(continue_on, Condition):
                    err_msg = f"Condition assignment must use webber.edges.Condition"
                    raise TypeError(err_msg)
                # Assign the new condition to every matched edge.
                for e in edge_ids:
                    self.graph.edges[e]['Condition'] = continue_on

Flexible function to update properties of edges in the DAG's scope, based on unique identifier(s) (e.g.: string IDs or unique callables) or a lambda filter using the edgedict syntax.

List of nodes to update or filter argument is expected. Valid edge lists include:

     update_edges((node1, node2), ...)       or update_edges([(node1, node2)], ...)

     update_edges((node1, callable2), ...)   or update_edges([(node1, callable2)], ...)

     update_edges(edgedict),                 where isinstance(edgedict, webber.edges.edgedict) == True

Parameters:

continue_on: Condition value to update matching edges with,

     Execute child on success, failure, or any exit state, of the parent node

filter: lambda property that can be used instead of a list of edges to be updated.

     filter = (lambda e: e.parent == print or e.child == print)

data: If given, expects a dictionary or edgedict to update edge properties. Currently, only continue_on property should be set.

     data = { 'continue_on': Condition }
def relabel_node(self, node: Union[str, Callable], label: str):
386    def relabel_node(self, node: _T.Union[str, _T.Callable], label: str):
387        """
388        Update the label, or name, given to a node in the DAG's scope, 
389        given a Python string and a node identifier. 
390        Well-structured wrapper for a common use-case of DAG.update_nodes.
391        """
392        node_id = self.node_id(node)
393        if not isinstance(label, str) or len(label) == 0:
394            err_msg = "Node label must be a Python string with one or more characters."
395            raise ValueError(err_msg)
396        self.update_nodes(node_id, data = {'name': label})
397        return label

Update the label, or name, given to a node in the DAG's scope, given a Python string and a node identifier. Well-structured wrapper for a common use-case of DAG.update_nodes.

def update_nodes( self, *N, filter: function = None, data=None, callable=None, args=None, kwargs=None):
399    def update_nodes(self, *N, filter: _types.LambdaType = None, data = None, callable = None, args = None, kwargs = None):
400        """
401        Flexible function to update properties of nodes in the DAG's scope, 
402        based on unique identifier(s) (e.g.: string IDs or unique callables) or a
403        lambda filter using the dotdict syntax.
404        
405        List of nodes to update or filter argument is expected. Valid node lists include:
406
407        \t update_nodes(node_id, ...)           or update_nodes([node_id], ...)
408
409        \t update_nodes(callable, ...)          or update_nodes([callable], ...)
410        
411        \t update_nodes(node1, node2, ...)      or update_nodes([node1, node2], ...)
412        
413        \t update_nodes(node1, callable2, ...)  or update_nodes([node1, callable2], ...)
414
415        > update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id)
416
417        Parameters:
418
419        > filter: lambda property that can be used instead of a list of nodes to be updated.
420        
421        \t filter = (lambda n: n.callable == print or 'Hello, World' in n.args)
422
423        > data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if data argument is set.
424            Any value given to the id key will be ignored. Allowed for ease of use with DAG.get nodes method.
425            
426        \t data = {
427            \t 'callable': print,
428            \t 'args': ['Hello', 'World'],
429            \t 'kwargs': {'sep': ', '},
430            \t 'name': 'custom_label',
431            \t 'id': 'unique-identifier'
432            \t}
433
434        > args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List).
435
436        > kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary.
437
438        """
439        if len(N) == 0 and filter == None:
440            raise ValueError("Either an array of node IDs or node data (N) or a filter must be passed to this function.")
441
442        elif len(N) > 0 and filter != None:
443            raise ValueError("Node data array (N) and filter argument are mutually exclusive, and cannot both be defined to identify nodes to update DAG's scope.")
444
445        elif isinstance(N, dict) or isinstance(N, str):
446            N = [N]
447
448        elif len(N) == 1 and isinstance(N[0], _abc.Iterable):
449            if isinstance(N[0][0], dict):
450                N = N[0]
451            elif isinstance(N[0][0], str):
452                # BUG: A list of all single character IDs will fail to be updated. Please try another call method (i.e.: nested iterator).
453                if sum(list(map(lambda n: len(n), N[0]))) != len(N[0]): 
454                    N = N[0]
455
456
457        if filter != None:
458            node_ids = self.filter_nodes(filter, data = False)
459        else:
460            if isinstance(N[0], dict):
461                ids = [n['id'] for n in N]
462            else:
463                ids = N
464            node_ids = [self.node_id(i) for i in ids]
465        
466        std_update = (callable == None) and (args == None) and (kwargs == None)
467
468        if std_update:
469            for node_id, n in zip(node_ids, N):
470                if data != None:
471                    self._update_node(data, id = node_id)
472                else:
473                    self._update_node(n, id = node_id)
474        
475        else:
476            if callable != None:
477                if not _iscallable(callable):
478                    err_msg = f"Requested node is not assigned a callable Python function."
479                    raise TypeError(err_msg)
480                for node_id in node_ids:
481                    self.graph.nodes[node_id]['callable'] = callable
482                    self.graph.nodes[node_id]['name'] = callable.__name__
483            
484            if args != None:
485                if not (isinstance(args, _abc.Iterable) and not isinstance(args, str)):
486                    err_msg = f"Requested node is not assigned a tuple of pos args."
487                    raise TypeError(err_msg)
488                args = tuple(args)
489                for node_id in node_ids:
490                    self.graph.nodes[node_id]['args'] = args
491            
492            if kwargs != None:
493                if not isinstance(kwargs, dict):
494                    err_msg = f"Requested node is not assigned a dictionary of kw args."
495                    raise TypeError(err_msg)
496                for node_id in node_ids:
497                    self.graph.nodes[node_id]['kwargs'] = kwargs

Flexible function to update properties of nodes in the DAG's scope, based on unique identifier(s) (e.g.: string IDs or unique callables) or a lambda filter using the dotdict syntax.

List of nodes to update or filter argument is expected. Valid node lists include:

     update_nodes(node_id, ...)           or update_nodes([node_id], ...)

     update_nodes(callable, ...)          or update_nodes([callable], ...)

     update_nodes(node1, node2, ...)      or update_nodes([node1, node2], ...)

     update_nodes(node1, callable2, ...)  or update_nodes([node1, callable2], ...)

update_nodes(node_id, ...) is equivalent to update_nodes(filter = lambda n: n.id == node_id)

Parameters:

filter: lambda property that can be used instead of a list of nodes to be updated.

     filter = (lambda n: n.callable == print or 'Hello, World' in n.args)

data: If given, expects a dictionary or dotdict to update node properties. At least one property should be defined if the data argument is set. Any value given to the id key will be ignored; the key is allowed for ease of use with the DAG.get_nodes method.

     data = {
     'callable': print,
     'args': ['Hello', 'World'],
     'kwargs': {'sep': ', '},
     'name': 'custom_label',
     'id': 'unique-identifier'
    }

args: Positional arguments to be passed to matching callables in the DAG's scope, using a Python iterable (e.g.: Tuple or List).

kwargs: Keyword arguments to be passed to matching callables in the DAG's scope, using a Python dictionary.

def get_edges( self, *N, data: bool = True) -> Union[list[webber.edges.edgedict], list[tuple]]:
499    def get_edges(self, *N, data: bool = True) -> _T.Union[list[edgedict], list[tuple]]:
500        """
501        Retrieval function for DAG edge data, based on tuple identifiers.
502        Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2']))
503        """
504        if len(N) == 0:
505            if data == True:
506                return list(map(edgedict, self.graph.edges.data()))
507            return list(self.graph.edges.data(data=False))
508            
509        # elif len(N) == 1:
510        #     if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], tuple):
511        #         N = N[0]
512
513        if len(N) != len(set(N)) or False in map(lambda n: isinstance(n, _abc.Iterable) and len(n) == 2, N):
514            err_msg = 'All requested edges must be unique tuples of size 2.'
515            raise ValueError(err_msg)
516    
517        edge_data = [self.get_edge(o, i, data=data) for (o, i) in N]
518        return edge_data

Retrieval function for DAG edge data, based on tuple identifiers. Use filter_edges for more flexible controls (e.g.: filter_edges(in=['node_1', 'node_2']))

def get_edge( self, outgoing_node: Union[str, <built-in function callable>], incoming_node: Union[str, <built-in function callable>], data: bool = True) -> Union[webber.edges.edgedict, tuple]:
520    def get_edge(self, outgoing_node: _T.Union[str, callable], incoming_node: _T.Union[str, callable], data: bool = True) -> _T.Union[edgedict, tuple]:
521        """
522        Retrieval function for a single directed edge between nodes in a DAG's scope. 
523        """
524        id = (self.node_id(outgoing_node), self.node_id(incoming_node))
525        if not data:
526            return id
527        edge_data = self.graph.get_edge_data(u = id[0], v = id[1])
528        if not edge_data:
529            err_msg = f'No match found for the directed edge requested: {id}'
530            raise ValueError(err_msg)
531        return edgedict(*id, **edge_data)

Retrieval function for a single directed edge between nodes in a DAG's scope.

def get_node( self, n: Union[str, <built-in function callable>]) -> webber.edges.dotdict:
533    def get_node(self, n: _T.Union[str, callable]) -> dotdict:
534        """
535        Given a unique identifier, returns a dictionary of node metadata
536        for a single node in the DAG's scope.
537        """
538        node_id = self.node_id(n)
539        return dotdict(self.graph.nodes[node_id])

Given a unique identifier, returns a dictionary of node metadata for a single node in the DAG's scope.

def get_nodes(self, *N) -> list[webber.edges.dotdict]:
541    def get_nodes(self, *N) -> list[dotdict]:
542        """
543        Flexible function to retrieve DAG node data, based on node identifiers 
544        (e.g.: string IDs or unique callables).
545        """
546        if len(N) == 0:
547            node_data = list(self.graph.nodes.values())
548            return [dotdict(d) for d in node_data]
549
550        elif len(N) == 1:
551            if isinstance(N[0], _abc.Iterable) and not isinstance(N[0], str):
552                N = N[0]
553            else:
554                node_id = self.node_id(N[0])
555                node_data = [dotdict(self.graph.nodes[node_id])]
556                return node_data
557            
558        if not len(N) == len(set(N)):
559            err_msg = 'All requested nodes must be unique identifiers.'
560            raise ValueError(err_msg)
561        
562        node_ids  = [self.node_id(n) for n in N]
563        node_data = [dotdict(self.graph.nodes[n]) for n in node_ids]
564        return node_data

Flexible function to retrieve DAG node data, based on node identifiers (e.g.: string IDs or unique callables).

def filter_nodes(self, filter: function, data: bool = False):
566    def filter_nodes(self, filter: _types.LambdaType, data: bool = False):
567        """
568        Given a lambda function, filter nodes in a DAG's scope based on its attributes.
569        Current limitation: Filters must use node identifier strings when referencing nodes.
570        Use get_nodes for more flexible controls.
571        """
572        if not data:
573            return [node['id'] for node in self.graph.nodes.values() if filter(dotdict(node))]
574        return [dotdict(node) for node in self.graph.nodes.values() if filter(dotdict(node))]

Given a lambda function, filter nodes in a DAG's scope based on its attributes. Current limitation: Filters must use node identifier strings when referencing nodes. Use get_nodes for more flexible controls.

def filter_edges( self, filter=<class 'function'>, data: bool = False) -> list[webber.edges.edgedict]:
576    def filter_edges(self, filter = _types.LambdaType, data: bool = False) -> list[edgedict]:
577        """
578        Given a lambda function, filter edges in a DAG's scope based on its attributes.
579        Current limitation: Filters must use node identifier strings when referencing nodes.
580        Use get_edges for more flexible controls.
581        """
582        if not data:
583            return [e[:2] for e in list(self.graph.edges.data()) if filter(edgedict(*e))]
584        return [edgedict(e) for e in list(self.graph.edges.data()) if filter(edgedict(*e))]

Given a lambda function, filter edges in a DAG's scope based on its attributes. Current limitation: Filters must use node identifier strings when referencing nodes. Use get_edges for more flexible controls.

def retry_node(self, identifier: Union[str, Callable], count: int):
586    def retry_node(self, identifier: _T.Union[str,_T.Callable], count: int):
587        """
588        Given a node identifier, set number of automatic retries in case of failure.
589        Re-attempts will begin as soon as possible.
590        """
591        if not isinstance(count, int) or not count >= 0:
592            raise ValueError("Retry count must be a non-negative integer.")
593        node = self.node_id(identifier)
594        self.graph.nodes[node]['retry'] = count

Given a node identifier, set number of automatic retries in case of failure. Re-attempts will begin as soon as possible.

def skip_node( self, identifier: Union[str, Callable], skip: bool = True, as_failure=False):
596    def skip_node(self, identifier: _T.Union[str,_T.Callable], skip: bool = True, as_failure = False):
597        """
598        Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error).
599        Allows conditional control and testing over DAG's order of operations.
600        """
601        if not isinstance(skip, bool):
602            raise ValueError("Skip argument must be a boolean value.")
603        node = self.node_id(identifier)
604        self.graph.nodes[node]['skip'] = (skip, as_failure)

Given a node identifier, set DAG to skip node execution as a success (stdout print) or a failure (exception error). Allows conditional control and testing over DAG's order of operations.

def critical_path(self, nodes):
606    def critical_path(self, nodes):
607        """
608        Given a set of nodes, returns a subset of the DAG containing
609        only the node(s) and its parents, or upstream dependencies.
610        """
611        if isinstance(nodes, _abc.Iterable) and not isinstance(nodes, str):
612            node_ids = {self.node_id(n) for n in nodes}
613        else:
614            node_ids = {self.node_id(nodes)}
615        return self._subgraph(node_ids)

Given a set of nodes, returns a subset of the DAG containing only the node(s) and its parents, or upstream dependencies.

def execute(self, return_ref=False, print_exc=False):
617    def execute(self, return_ref=False, print_exc=False):
618        """
619        Basic wrapper for execution of the DAG's underlying callables.
620        """
621        executor = self.DAGExecutor(self.graph, self.root, print_exc)
622        return executor if return_ref else None

Basic wrapper for execution of the DAG's underlying callables.

def visualize(self, type: Literal['gui', 'browser', 'plt'] = None):
624    def visualize(self, type: _T.Literal['gui', 'browser', 'plt'] = None):
625        """
626        Basic wrapper to visualize DAG using Vis.js and NetGraph libraries.
627        By default, visualization library only loaded in after DAG.visualize() is called, halving import times.
628        """
629        import webber.viz as _viz
630
631        match type:
632            case 'browser':
633                _viz.visualize_browser(self.graph)
634
635            case 'plt':
636                return _viz.visualize_plt(self.graph)
637
638            case 'gui':
639                # _visualize_gui(self.graph)
640                raise NotImplementedError
641
642            case None: 
643                if _viz._in_notebook():
644                    return _viz.visualize_plt(self.graph)
645                else:
646                    _viz.visualize_browser(self.graph)
647
648            case _:
649                err_msg = "Unknown visualization type requested."
650                raise NotImplementedError(err_msg)

Basic wrapper to visualize DAG using Vis.js and NetGraph libraries. By default, visualization library only loaded in after DAG.visualize() is called, halving import times.

root: list[str]
652    @property
653    def root(self) -> list[str]:
654        """
655        Return list of nodes with no dependencies.
656        Root nodes will occur first in DAG's order of operations.
657        """
658        return list(filter(
659            lambda node: len(list(self.graph.predecessors(node))) < 1,
660            self.graph.nodes.keys()
661        ))

Return list of nodes with no dependencies. Root nodes will occur first in DAG's order of operations.

nodes
663    @property
664    def nodes(self):
665        return self.graph.nodes
def node_id(self, identifier: Union[str, Callable]) -> str:
667    def node_id(self, identifier: _T.Union[str,_T.Callable]) -> str:
668        """
669        Validate whether identifier given is a valid node within the DAG's scope.
670        Primarily for internal use, but useful for retrieving string identifiers
671        for a unique callable in a DAG.
672        """
673        node_names, node_callables = zip(*self.graph.nodes(data='callable'))
674
675        if isinstance(identifier, str):
676            if identifier not in node_names:
677                err_msg = f"Node {identifier} is not defined in this DAG's scope."
678                raise ValueError(err_msg)
679            node = identifier
680        elif _iscallable(identifier):
681            match node_callables.count(identifier):
682                case 0:
683                    err_msg = f"Callable {identifier} is not defined in this DAG's scope."
684                    raise ValueError(err_msg)
685                case 1:
686                    node = node_names[ node_callables.index(identifier) ]
687                case _:
688                    err_msg = f"Callable {identifier.__name__} " \
689                            + "exists more than once in this DAG. " \
690                            + "Use the unique identifier of the required node."
691                    raise ValueError(err_msg)
692        else:            
693            err_msg = f"Node {identifier} must be a string or a Python callable"
694            raise TypeError(err_msg)
695        return node

Validate whether identifier given is a valid node within the DAG's scope. Primarily for internal use, but useful for retrieving string identifiers for a unique callable in a DAG.

def resolve_promise(self, promise: webber.xcoms.Promise) -> webber.xcoms.Promise:
761    def resolve_promise(self, promise: _xcoms.Promise) -> _xcoms.Promise:
762        """
763        Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope.
764        Raises `webber.xcoms.InvalidCallable` if Promise requests a callable that is out of scope.
765        """
766        try:
767            key = self.node_id(promise.key)
768        except Exception as e:
769            raise _xcoms.InvalidCallable(e)
770        return _xcoms.Promise(key)

Returns a Promise with a unique string identifier, if a given Promise is valid, based on the DAG's current scope. Raises webber.xcoms.InvalidCallable if Promise requests a callable that is out of scope.

graph
class DAG.DAGExecutor:
    class DAGExecutor:
        """
        Runs the callables stored on a DAG's graph on a thread pool, starting each
        node once the conditions on its incoming edges are met.

        All work happens inside `__init__`: constructing an instance executes the graph.
        """
        def __init__(self, graph: _nx.DiGraph, roots: list, print_exc: bool = False) -> None:
            """
            Execute `graph`, starting from `roots`, and return once every node has
            completed, failed, or been skipped.

            graph: directed graph whose nodes carry 'callable', 'name', 'args' and 'kwargs'
                attributes (plus optional 'skip' and 'retry'), and whose edges carry a 'Condition'.
            roots: nodes submitted first, without any dependency checks.
            print_exc: when True, print the traceback of every exception raised by a node.
            """
            
            with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:
                
                # Skip execution if there are no callables in scope.
                if len(graph.nodes) == 0:
                    print('Given DAG has no callables in scope. Skipping execution...')
                    return

                # Initialize local variables for execution.
                # complete/failed/skipped partition finished nodes; started holds every submitted node.
                complete, started, failed, skipped = set(), set(), set(), set()
                events = set(roots)
                # Future of the most recent submission for each node.
                refs: dict[str, _futures.Future] = {}

                def raise_exc(message):
                    # Stand-in callable for skipped events that should be recorded as failures.
                    raise ValueError(message)

                def run_conditions_met(n):
                    # True only if every predecessor of n finished in the state its edge requires.
                    for p in graph.predecessors(n):
                        match graph.edges.get((p, n))['Condition']:
                            case Condition.Success:
                                if p not in complete:
                                    return False
                            case Condition.Failure:
                                if p not in failed:
                                    return False
                            case Condition.AnyCase:
                                if p not in failed and p not in complete:
                                    return False
                    return True

                # skip[node] is a pair: (skip this node?, treat the skip as a failure?).
                skip  = graph.nodes.data("skip", default=(False, False))
                # retry[node] is [attempts remaining (retry count + 1), cached submission arguments].
                retry = {n: [c+1, {}] for n,c in graph.nodes.data("retry", default=0)}

                # Start execution of root node functions.
                with _futures.ThreadPoolExecutor() as executor:

                    def Submit(event, callable, name, args, kwargs):
                        # Submits one attempt of `event` to the pool and returns its Future.
                        if skip[event][0]: # NOTE: Internally, DAGExecutor tracks these skipped events as successes or failures.
                            # Skipped events are never retried; a placeholder runs in place of the callable:
                            # `print` (finishes normally) or `raise_exc` (finishes with an exception).
                            retry[event][0] = 0
                            skip_callable = raise_exc if skip[event][1] else print
                            return executor.submit(
                                _event_wrapper,
                                _callable=skip_callable,
                                _name=graph.nodes[event]['name'],
                                _args=[f"Event {event} skipped..."],
                                _kwargs={}
                            )
                        else:
                            # Consume one attempt; on the first attempt of a retryable event,
                            # remember the exact arguments so a retry can resubmit them.
                            retry[event][0] -= 1
                            if (retry[event][0] > 0) and (retry[event][1] == {}):
                                retry[event][1] = {
                                    'callable': callable,
                                    'name': name,
                                    'args': args,
                                    'kwargs': kwargs
                                }
                            return executor.submit(
                                _event_wrapper,
                                _callable=callable,
                                _name=name,
                                _args=args,
                                _kwargs=kwargs
                            )

                    for event in events:
                        refs[event] = Submit(
                            event,
                            graph.nodes[event]['callable'],
                            graph.nodes[event]['name'],
                            graph.nodes[event]['args'],
                            graph.nodes[event]['kwargs']
                        )
                        started.add(event)
                    # Loop until all nodes in the network are executed.
                    # NOTE(review): this loop polls Future.done() continuously without blocking or sleeping.
                    while (len(complete) + len(failed) + len(skipped)) != len(graph):
                        for event in events:
                            if refs[event].done() is True:
                                if refs[event].exception(timeout=0) is not None:
                                    # The exception is re-raised only so print_exc() has an active
                                    # exception to report.
                                    # NOTE(review): bare `except` also catches BaseException subclasses.
                                    try:
                                        raise refs[event].exception(timeout=0)
                                    except:
                                        if print_exc:
                                            _traceback.print_exc()
                                        
                                        if retry[event][0] > 0:
                                            print(f"Event {event} exited with exception, retrying...")
                                            refs[event] = Submit(
                                                event,
                                                callable=retry[event][1]['callable'],
                                                name=retry[event][1]['name'],
                                                args=retry[event][1]['args'],
                                                kwargs=retry[event][1]['kwargs']
                                            )
                                            continue                                            

                                        print(f"Event {event} exited with exception...")
                                        failed.add(event)
                                        # Children reached through edges that do not continue on failure
                                        # (as judged by `_edges.continue_on_failure`) are skipped.
                                        skipping = [
                                            e[1] for e in set(graph.out_edges(event)) 
                                            if not _edges.continue_on_failure(graph.edges.get(e))
                                        ]
                                else:
                                    complete.add(event)
                                    # Children reached through edges that do not continue on success
                                    # (as judged by `_edges.continue_on_success`) are skipped.
                                    skipping = [
                                        e[1] for e in set(graph.out_edges(event)) 
                                        if not _edges.continue_on_success(graph.edges.get(e))
                                    ]
                                # A skipped node takes all of its descendants with it.
                                skipped  = skipped.union(skipping)
                                for n in skipping:
                                    skipped  = skipped.union(_nx.descendants(graph, n))
                                # Remaining successors start only once all of their own predecessors qualify.
                                carryon  = set(graph.successors(event)).difference(skipped)
                                starting = [
                                    successor for successor in carryon if
                                    run_conditions_met(successor)
                                ]
                                for successor in starting:
                                    # Replace each Promise argument with the result of the node it refers to.
                                    # NOTE(review): Future.result() re-raises if that node ended with an exception.
                                    _args = [
                                        a if not isinstance(a, _xcoms.Promise) else refs[a.key].result()
                                        for a in graph.nodes[successor]['args']
                                    ]
                                    _kwargs = {
                                        k: v if not isinstance(v, _xcoms.Promise) else refs[v.key].result()
                                        for k, v in graph.nodes[successor]['kwargs'].items()
                                    }
                                    refs[successor] = Submit(
                                        successor,
                                        graph.nodes[successor]['callable'],
                                        graph.nodes[successor]['name'],
                                        _args,
                                        _kwargs
                                    )
                                    started.add(successor)
                        # Initialized nodes that are incomplete or not yet documented as complete.
                        events = started.difference(complete.union(failed).union(skipped))

Base class used to execute a DAG in an embarrassingly parallel fashion.

DAG.DAGExecutor( graph: networkx.classes.digraph.DiGraph, roots: list, print_exc: bool = False)
        def __init__(self, graph: _nx.DiGraph, roots: list, print_exc: bool = False) -> None:
            """
            Execute `graph`, starting from `roots`, and return once every node has
            completed, failed, or been skipped.

            graph: directed graph whose nodes carry 'callable', 'name', 'args' and 'kwargs'
                attributes (plus optional 'skip' and 'retry'), and whose edges carry a 'Condition'.
            roots: nodes submitted first, without any dependency checks.
            print_exc: when True, print the traceback of every exception raised by a node.
            """
            
            with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:
                
                # Skip execution if there are no callables in scope.
                if len(graph.nodes) == 0:
                    print('Given DAG has no callables in scope. Skipping execution...')
                    return

                # Initialize local variables for execution.
                # complete/failed/skipped partition finished nodes; started holds every submitted node.
                complete, started, failed, skipped = set(), set(), set(), set()
                events = set(roots)
                # Future of the most recent submission for each node.
                refs: dict[str, _futures.Future] = {}

                def raise_exc(message):
                    # Stand-in callable for skipped events that should be recorded as failures.
                    raise ValueError(message)

                def run_conditions_met(n):
                    # True only if every predecessor of n finished in the state its edge requires.
                    for p in graph.predecessors(n):
                        match graph.edges.get((p, n))['Condition']:
                            case Condition.Success:
                                if p not in complete:
                                    return False
                            case Condition.Failure:
                                if p not in failed:
                                    return False
                            case Condition.AnyCase:
                                if p not in failed and p not in complete:
                                    return False
                    return True

                # skip[node] is a pair: (skip this node?, treat the skip as a failure?).
                skip  = graph.nodes.data("skip", default=(False, False))
                # retry[node] is [attempts remaining (retry count + 1), cached submission arguments].
                retry = {n: [c+1, {}] for n,c in graph.nodes.data("retry", default=0)}

                # Start execution of root node functions.
                with _futures.ThreadPoolExecutor() as executor:

                    def Submit(event, callable, name, args, kwargs):
                        # Submits one attempt of `event` to the pool and returns its Future.
                        if skip[event][0]: # NOTE: Internally, DAGExecutor tracks these skipped events as successes or failures.
                            # Skipped events are never retried; a placeholder runs in place of the callable:
                            # `print` (finishes normally) or `raise_exc` (finishes with an exception).
                            retry[event][0] = 0
                            skip_callable = raise_exc if skip[event][1] else print
                            return executor.submit(
                                _event_wrapper,
                                _callable=skip_callable,
                                _name=graph.nodes[event]['name'],
                                _args=[f"Event {event} skipped..."],
                                _kwargs={}
                            )
                        else:
                            # Consume one attempt; on the first attempt of a retryable event,
                            # remember the exact arguments so a retry can resubmit them.
                            retry[event][0] -= 1
                            if (retry[event][0] > 0) and (retry[event][1] == {}):
                                retry[event][1] = {
                                    'callable': callable,
                                    'name': name,
                                    'args': args,
                                    'kwargs': kwargs
                                }
                            return executor.submit(
                                _event_wrapper,
                                _callable=callable,
                                _name=name,
                                _args=args,
                                _kwargs=kwargs
                            )

                    for event in events:
                        refs[event] = Submit(
                            event,
                            graph.nodes[event]['callable'],
                            graph.nodes[event]['name'],
                            graph.nodes[event]['args'],
                            graph.nodes[event]['kwargs']
                        )
                        started.add(event)
                    # Loop until all nodes in the network are executed.
                    # NOTE(review): this loop polls Future.done() continuously without blocking or sleeping.
                    while (len(complete) + len(failed) + len(skipped)) != len(graph):
                        for event in events:
                            if refs[event].done() is True:
                                if refs[event].exception(timeout=0) is not None:
                                    # The exception is re-raised only so print_exc() has an active
                                    # exception to report.
                                    # NOTE(review): bare `except` also catches BaseException subclasses.
                                    try:
                                        raise refs[event].exception(timeout=0)
                                    except:
                                        if print_exc:
                                            _traceback.print_exc()
                                        
                                        if retry[event][0] > 0:
                                            print(f"Event {event} exited with exception, retrying...")
                                            refs[event] = Submit(
                                                event,
                                                callable=retry[event][1]['callable'],
                                                name=retry[event][1]['name'],
                                                args=retry[event][1]['args'],
                                                kwargs=retry[event][1]['kwargs']
                                            )
                                            continue                                            

                                        print(f"Event {event} exited with exception...")
                                        failed.add(event)
                                        # Children reached through edges that do not continue on failure
                                        # (as judged by `_edges.continue_on_failure`) are skipped.
                                        skipping = [
                                            e[1] for e in set(graph.out_edges(event)) 
                                            if not _edges.continue_on_failure(graph.edges.get(e))
                                        ]
                                else:
                                    complete.add(event)
                                    # Children reached through edges that do not continue on success
                                    # (as judged by `_edges.continue_on_success`) are skipped.
                                    skipping = [
                                        e[1] for e in set(graph.out_edges(event)) 
                                        if not _edges.continue_on_success(graph.edges.get(e))
                                    ]
                                # A skipped node takes all of its descendants with it.
                                skipped  = skipped.union(skipping)
                                for n in skipping:
                                    skipped  = skipped.union(_nx.descendants(graph, n))
                                # Remaining successors start only once all of their own predecessors qualify.
                                carryon  = set(graph.successors(event)).difference(skipped)
                                starting = [
                                    successor for successor in carryon if
                                    run_conditions_met(successor)
                                ]
                                for successor in starting:
                                    # Replace each Promise argument with the result of the node it refers to.
                                    # NOTE(review): Future.result() re-raises if that node ended with an exception.
                                    _args = [
                                        a if not isinstance(a, _xcoms.Promise) else refs[a.key].result()
                                        for a in graph.nodes[successor]['args']
                                    ]
                                    _kwargs = {
                                        k: v if not isinstance(v, _xcoms.Promise) else refs[v.key].result()
                                        for k, v in graph.nodes[successor]['kwargs'].items()
                                    }
                                    refs[successor] = Submit(
                                        successor,
                                        graph.nodes[successor]['callable'],
                                        graph.nodes[successor]['name'],
                                        _args,
                                        _kwargs
                                    )
                                    started.add(successor)
                        # Initialized nodes that are incomplete or not yet documented as complete.
                        events = started.difference(complete.union(failed).union(skipped))
class Condition(enum.IntEnum):
class Condition(_enum.IntEnum):
    """Represents edge condition for a node execution, based on outcome(s) of predecessor(s)."""
    # The child runs only if the parent finished without an exception.
    Success = 0
    # The child runs only if the parent finished with an exception.
    Failure = 1
    # The child runs once the parent has finished, whether it succeeded or failed.
    AnyCase = 3

Represents edge condition for a node execution, based on outcome(s) of predecessor(s).

Success = <Condition.Success: 0>
Failure = <Condition.Failure: 1>
AnyCase = <Condition.AnyCase: 3>
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
bit_count
to_bytes
from_bytes
as_integer_ratio
is_integer
real
imag
numerator
denominator
class QueueDAG(DAG):
 945class QueueDAG(DAG):
 946    """
 947    #### Experimental, as of v0.2. ####
 948
 949    Directed Acyclic Graph used to queue and execute Pythonic callables in parallel,
 950    while stringing the outputs of those callables in linear sequences.
 951
 952    Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the 
 953    behavior of root nodes to determine if and/or when the DAG run has been completed.
 954
 955    Root nodes will be re-executed until culled by one of two conditions:
 956    1. A max number of iterations has been completed, or
 957    2. The output of the root node's callable matches a lambda halt_condition.
 958
 959    Both conditions can be set at run-time.
 960
 961    As of v0.2, QueueDAG experiences extreme latency when nested inside of a standard webber.DAG class.
 962    """
 963
 964    conditions = {}
 965
 966    def __init__(self):
 967        super().__init__()
 968
 969    def add_node(self, node, *args, **kwargs):
 970        """
 971        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
 972        On success, return unique identifier for the new node.
 973        
 974        Reserved key-words are used for Queue DAG definitions:
 975        
 976        - halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.
 977
 978        \t halt_condition = (lambda output: output == None) 
 979
 980        - iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.
 981
 982        - max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop.
 983        """
 984        halt_condition = kwargs.pop('halt_condition', None)
 985        iterator: int = kwargs.pop('iterator', None)
 986        max_iter: int = kwargs.pop('max_iter', None)
 987
 988        return_val = super().add_node(node, *args, **kwargs)
 989        
 990        if max_iter != None:
 991            iter_limit = int(max_iter)
 992        elif iterator != None:
 993            iter_limit = int(iterator)
 994        else:
 995            iter_limit = None
 996        
 997        self.conditions[return_val] = {
 998            'halt_condition': halt_condition,
 999            'iter_limit': iter_limit
1000        }
1001
1002        return return_val
1003        
1004    def add_edge(self, u_of_edge, v_of_edge, continue_on = Condition.Success):
1005        """
1006        Adds an edge between nodes in the Queue DAG's underlying graph.
1007        Queue DAG nodes may have a maximum of one child and one parent worker.
1008        """
1009        for node in (u_of_edge, v_of_edge):
1010            try:
1011                node_id = self.node_id(node)
1012            except:
1013                continue
1014        
1015            filter = (lambda e: e.parent == node_id) if node == u_of_edge else (lambda e: e.child == node_id)
1016            try:
1017                assert(len(self.filter_edges(filter)) == 0)
1018            except Exception as e:
1019                e.add_note("Queue DAG nodes may have a maximum of one child and one parent worker.")
1020                raise e
1021
1022        return super().add_edge(u_of_edge, v_of_edge, continue_on)
1023
1024    def execute(self, *promises, return_ref=False, print_exc=False):
1025        """
1026        Basic wrapper for execution of the DAG's underlying callables.
1027        """
1028        queues = {}
1029        processes = {}
1030        join = set()
1031        
1032        promises: dict = { k: v for k, v in _it.pairwise(promises) } if len(promises) > 0 else {}
1033
1034        with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:
1035
1036            # Skip execution if there are no callables in scope.
1037            if len(self.graph.nodes) == 0:
1038                print('Given DAG has no callables in scope. Skipping execution...')
1039                return
1040
1041            with _futures.ThreadPoolExecutor() as executor:
1042
1043                for id in self.root:
1044                    node = self.get_node(id)
1045                    queues[id] = _q.LifoQueue()
1046                    node.update({
1047                        'callable': _queue._worker,
1048                        'args': tuple(),
1049                        'kwargs': {
1050                            'work': node.callable,
1051                            'args': node.args, 'kwargs': node.kwargs,
1052                            'promises': promises,
1053                            'print_exc': print_exc,
1054                            'halt_condition': self.conditions[id]['halt_condition'],
1055                            'iter_limit': self.conditions[id]['iter_limit'],
1056                            'out_queue': queues.get(id)
1057                        }
1058                    })
1059                    processes[id] = executor.submit(
1060                        _event_wrapper,
1061                        _callable=node['callable'],
1062                        _name=node['name'],
1063                        _args=node['args'],
1064                        _kwargs=node['kwargs']
1065                    )
1066                
1067                for parent_id, id in self.graph.edges:
1068                    node = self.get_node(id)
1069                    queues[id] = _q.LifoQueue()
1070                    if len(list(self.graph.successors(id))) == 0:
1071                        end_proc = id
1072                    node.update({
1073                        'callable': _queue._worker,
1074                        'args': tuple(),
1075                        'kwargs': {
1076                            'work': node.callable,
1077                            'args': node.args, 'kwargs': node.kwargs,
1078                            'promises': promises,
1079                            'print_exc': print_exc,
1080                            'parent_id': parent_id,
1081                            'parent_process': processes[parent_id],
1082                            'in_queue': queues.get(parent_id),
1083                            'out_queue': queues.get(id)
1084                        }
1085                    })
1086                    processes[id] = executor.submit(
1087                        _event_wrapper,
1088                        _callable=node['callable'],
1089                        _name=node['name'],
1090                        _args=node['args'],
1091                        _kwargs=node['kwargs']
1092                    )
1093
1094            
1095            while len(join) != len(self.graph.nodes):
1096                for node in self.graph.nodes:
1097                    if processes[node].done():
1098                        join.add(node)
1099            
1100            return_val = []
1101            while not queues[end_proc].empty():
1102                return_val.append(queues[end_proc].get())
1103            
1104            return return_val

Experimental, as of v0.2.

Directed Acyclic Graph used to queue and execute Pythonic callables in parallel, while stringing the outputs of those callables in linear sequences.

Queue DAG nodes are repeated until the DAG executor completes or is killed, depending on the behavior of root nodes to determine if and/or when the DAG run has been completed.

Root nodes will be re-executed until culled by one of two conditions:

  1. A max number of iterations has been completed, or
  2. The output of the root node's callable matches a lambda halt_condition.

Both conditions can be set at run-time.

As of v0.2, QueueDAG experiences extreme latency when nested inside of a standard webber.DAG class.

conditions = {}
def add_node(self, node, *args, **kwargs):
 969    def add_node(self, node, *args, **kwargs):
 970        """
 971        Adds a callable with positional and keyword arguments to the DAG's underlying graph.
 972        On success, return unique identifier for the new node.
 973        
 974        Reserved key-words are used for Queue DAG definitions:
 975        
 976        - halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.
 977
 978        \t halt_condition = (lambda output: output == None) 
 979
 980        - iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.
 981
 982        - max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent forever loop.
 983        """
 984        halt_condition = kwargs.pop('halt_condition', None)
 985        iterator: int = kwargs.pop('iterator', None)
 986        max_iter: int = kwargs.pop('max_iter', None)
 987
 988        return_val = super().add_node(node, *args, **kwargs)
 989        
 990        if max_iter != None:
 991            iter_limit = int(max_iter)
 992        elif iterator != None:
 993            iter_limit = int(iterator)
 994        else:
 995            iter_limit = None
 996        
 997        self.conditions[return_val] = {
 998            'halt_condition': halt_condition,
 999            'iter_limit': iter_limit
1000        }
1001
1002        return return_val

Adds a callable with positional and keyword arguments to the DAG's underlying graph. On success, return unique identifier for the new node.

Reserved key-words are used for Queue DAG definitions:

  • halt_condition: Lambda function used to halt repeated execution of a Queue DAG node that is independent of other callables.

     halt_condition = (lambda output: output == None)
    
  • iterator: Discrete number of times that Queue DAG node should be executed. Meant to be mutually-exclusive of halt_condition argument.

  • max_iter: Maximum number of times that Queue DAG node should be executed. Meant for use with halt_condition in order to prevent an infinite loop.

def add_edge(self, u_of_edge, v_of_edge, continue_on=<Condition.Success: 0>):
1004    def add_edge(self, u_of_edge, v_of_edge, continue_on = Condition.Success):
1005        """
1006        Adds an edge between nodes in the Queue DAG's underlying graph.
1007        Queue DAG nodes may have a maximum of one child and one parent worker.
1008        """
1009        for node in (u_of_edge, v_of_edge):
1010            try:
1011                node_id = self.node_id(node)
1012            except:
1013                continue
1014        
1015            filter = (lambda e: e.parent == node_id) if node == u_of_edge else (lambda e: e.child == node_id)
1016            try:
1017                assert(len(self.filter_edges(filter)) == 0)
1018            except Exception as e:
1019                e.add_note("Queue DAG nodes may have a maximum of one child and one parent worker.")
1020                raise e
1021
1022        return super().add_edge(u_of_edge, v_of_edge, continue_on)

Adds an edge between nodes in the Queue DAG's underlying graph. Queue DAG nodes may have a maximum of one child and one parent worker.

def execute(self, *promises, return_ref=False, print_exc=False):
1024    def execute(self, *promises, return_ref=False, print_exc=False):
1025        """
1026        Basic wrapper for execution of the DAG's underlying callables.
1027        """
1028        queues = {}
1029        processes = {}
1030        join = set()
1031        
1032        promises: dict = { k: v for k, v in _it.pairwise(promises) } if len(promises) > 0 else {}
1033
1034        with _OutputLogger(str(_uuid.uuid4()), "INFO", "root") as _:
1035
1036            # Skip execution if there are no callables in scope.
1037            if len(self.graph.nodes) == 0:
1038                print('Given DAG has no callables in scope. Skipping execution...')
1039                return
1040
1041            with _futures.ThreadPoolExecutor() as executor:
1042
1043                for id in self.root:
1044                    node = self.get_node(id)
1045                    queues[id] = _q.LifoQueue()
1046                    node.update({
1047                        'callable': _queue._worker,
1048                        'args': tuple(),
1049                        'kwargs': {
1050                            'work': node.callable,
1051                            'args': node.args, 'kwargs': node.kwargs,
1052                            'promises': promises,
1053                            'print_exc': print_exc,
1054                            'halt_condition': self.conditions[id]['halt_condition'],
1055                            'iter_limit': self.conditions[id]['iter_limit'],
1056                            'out_queue': queues.get(id)
1057                        }
1058                    })
1059                    processes[id] = executor.submit(
1060                        _event_wrapper,
1061                        _callable=node['callable'],
1062                        _name=node['name'],
1063                        _args=node['args'],
1064                        _kwargs=node['kwargs']
1065                    )
1066                
1067                for parent_id, id in self.graph.edges:
1068                    node = self.get_node(id)
1069                    queues[id] = _q.LifoQueue()
1070                    if len(list(self.graph.successors(id))) == 0:
1071                        end_proc = id
1072                    node.update({
1073                        'callable': _queue._worker,
1074                        'args': tuple(),
1075                        'kwargs': {
1076                            'work': node.callable,
1077                            'args': node.args, 'kwargs': node.kwargs,
1078                            'promises': promises,
1079                            'print_exc': print_exc,
1080                            'parent_id': parent_id,
1081                            'parent_process': processes[parent_id],
1082                            'in_queue': queues.get(parent_id),
1083                            'out_queue': queues.get(id)
1084                        }
1085                    })
1086                    processes[id] = executor.submit(
1087                        _event_wrapper,
1088                        _callable=node['callable'],
1089                        _name=node['name'],
1090                        _args=node['args'],
1091                        _kwargs=node['kwargs']
1092                    )
1093
1094            
1095            while len(join) != len(self.graph.nodes):
1096                for node in self.graph.nodes:
1097                    if processes[node].done():
1098                        join.add(node)
1099            
1100            return_val = []
1101            while not queues[end_proc].empty():
1102                return_val.append(queues[end_proc].get())
1103            
1104            return return_val

Basic wrapper for execution of the DAG's underlying callables.