webber.edges

Helper classes and functions for edge and DAG validation logic.

  1"""
  2Helper classes and functions for edge and DAG validation logic.
  3"""
  4import typing as _T
  5import uuid as _uuid
  6import networkx as _nx
  7import enum as _enum
  8
  9__all__ = ["valid_node", "valid_nodes", "valid_dag", "validate_nodes", "label_node"]
 10
 11class Condition(_enum.IntEnum):
 12    """Represents edge condition for a node execution, based on outcome(s) of predecessor(s)."""
 13    Success = 0
 14    Failure = 1
 15    AnyCase = 3
 16
 17class dotdict(dict):
 18    """dot.notation access to dictionary attributes"""
 19    __getattr__ = dict.get
 20    __setattr__ = dict.__setitem__
 21    __delattr__ = dict.__delitem__
 22
 23class edgedict(dotdict):
 24    """Dictionary subclass for representing DAG edges with dot notation access.
 25
 26    Provides convenient access to edge properties: parent, child, id, and Condition.
 27    """
 28    super(dotdict)
 29    def __init__(self, *E: _T.Any, **kwargs: _T.Any) -> None:
 30        """Initialize edge dictionary with parent/child nodes and optional attributes."""
 31        super().__init__({'parent': E[0], 'child': E[1], 'id': E[:2]})
 32        self.update(kwargs)
 33
 34def continue_on_failure(edge: _T.Dict[str, _T.Any]) -> bool:
 35    """Check edge condition for whether to continue on parent node's failure."""
 36    return edge['Condition'] in (Condition.Failure, Condition.AnyCase)
 37
 38def continue_on_success(edge: _T.Dict[str, _T.Any]) -> bool:
 39    """Check edge condition for whether to continue on parent node's success."""
 40    return edge['Condition'] in (Condition.Success, Condition.AnyCase)
 41
 42def label_node(node: _T.Callable) -> str:
 43    """Generates unique identifiers for Python callables in a DAG using UUIDs."""
 44    return f"{node.__name__}__{_uuid.uuid4()}"
 45
 46def get_root(graph: _nx.DiGraph) -> _T.List[str]:
 47    """Given a network graph, return list of all nodes without incoming edges or dependencies.
 48    Uses O(1) in_degree() instead of O(k) predecessors list creation."""
 49    return [node for node in graph.nodes if graph.in_degree(node) == 0]
 50
 51# TODO: Refactor logic for DAG and node validation.
 52
 53def valid_node(node: str | _T.Callable) -> bool:
 54    """Check whether given identifier represents a valid node (string or callable)."""
 55    return (isinstance(node,str) or callable(node))
 56
 57def valid_nodes(u_of_edge: str | _T.Callable, v_of_edge: str | _T.Callable) -> bool:
 58    """Check whether parent and child nodes represent valid nodes (string or callable)."""
 59    return valid_node(u_of_edge) and valid_node(v_of_edge)
 60
 61def validate_nodes(u_of_edge: str | _T.Callable, v_of_edge: str | _T.Callable) -> bool:
 62    """
 63    Given parent and child identifiers, validate that both represent valid nodes.
 64    Otherwise raise exceptions.
 65    """
 66    if not valid_node(u_of_edge):
 67        err_msg = f"Outgoing node {u_of_edge} must be a string or a Python callable"
 68        raise TypeError(err_msg)
 69
 70    if not valid_node(v_of_edge):
 71        err_msg = f"Incoming node {v_of_edge} must be a string or a Python callable"
 72        raise TypeError(err_msg)
 73
 74    return True
 75
 76def valid_dag(graph: _nx.Graph) -> bool:
 77    """
 78    Given a network graph, return whether network is a valid DAG and that all node-keys are Python callables.
 79    Meant for internal use, DAG initialization.
 80    """
 81    return (
 82        isinstance(graph, _nx.Graph) and
 83        _nx.is_directed_acyclic_graph(graph) and
 84        all(callable(node) for node in graph.nodes)
 85    )
 86
 87
 88def validate_dag(graph: _nx.DiGraph) -> None:
 89    """
 90    Given a network graph, validate whether graph is a valid Webber DAG. Otherwise, raise exceptions.
 91    Meant for internal use, DAG initialization.
 92    """
 93    if not graph.is_directed():
 94        err_msg = f"Directed graph must be defined as type {_nx.DiGraph.__name__}"
 95        raise TypeError(err_msg)
 96
 97    if any(not callable(node) for node in graph.nodes):
 98        err_msg = "All registered nodes must be callable Python functions."
 99        raise TypeError(err_msg)
100
101    if not _nx.is_directed_acyclic_graph(graph):
102        err_msg = "Directed acyclic graph must be properly defined --" \
103                + "no cycles and one or more root nodes."
104        raise ValueError(err_msg)
def valid_node(node: Union[str, Callable]) -> bool:
54def valid_node(node: str | _T.Callable) -> bool:
55    """Check whether given identifier represents a valid node (string or callable)."""
56    return (isinstance(node,str) or callable(node))

Check whether given identifier represents a valid node (string or callable).

def valid_nodes(u_of_edge: Union[str, Callable], v_of_edge: Union[str, Callable]) -> bool:
def valid_nodes(u_of_edge: str | _T.Callable, v_of_edge: str | _T.Callable) -> bool:
    """Return True only if both the parent and the child are valid nodes (string or callable)."""
    # The parent is checked first; the child is not examined when the parent is invalid.
    return all(valid_node(endpoint) for endpoint in (u_of_edge, v_of_edge))

Check whether parent and child nodes represent valid nodes (string or callable).

def valid_dag(graph: networkx.classes.graph.Graph) -> bool:
def valid_dag(graph: _nx.Graph) -> bool:
    """
    Report whether the given object is a networkx graph that forms a directed
    acyclic graph and whose node keys are all Python callables.
    Meant for internal use, DAG initialization.
    """
    # Checks run in order; the first one that fails decides the result.
    if not isinstance(graph, _nx.Graph):
        return False
    if not _nx.is_directed_acyclic_graph(graph):
        return False
    return all(callable(key) for key in graph.nodes)

Given a network graph, return whether the network is a valid DAG whose node keys are all Python callables. Meant for internal use during DAG initialization.

def validate_nodes(u_of_edge: Union[str, Callable], v_of_edge: Union[str, Callable]) -> bool:
def validate_nodes(u_of_edge: str | _T.Callable, v_of_edge: str | _T.Callable) -> bool:
    """
    Validate that the parent and child identifiers of an edge are both valid nodes.

    Returns True when both pass. Raises TypeError naming the offending node
    otherwise; the parent (outgoing node) is checked before the child (incoming node).
    """
    parent_ok = valid_node(u_of_edge)
    if not parent_ok:
        raise TypeError(f"Outgoing node {u_of_edge} must be a string or a Python callable")

    child_ok = valid_node(v_of_edge)
    if not child_ok:
        raise TypeError(f"Incoming node {v_of_edge} must be a string or a Python callable")

    return True

Given parent and child identifiers, validate that both represent valid nodes. Otherwise raise exceptions.

def label_node(node: Callable) -> str:
def label_node(node: _T.Callable) -> str:
    """Generate a unique identifier for a Python callable in a DAG using a UUID.

    The label has the form ``<name>__<uuid4>``. ``<name>`` is the callable's
    ``__name__``; callables without one (e.g. ``functools.partial`` objects or
    instances defining ``__call__``) fall back to the name of their type
    instead of raising ``AttributeError``.
    """
    name = getattr(node, "__name__", None)
    if name is None:
        name = type(node).__name__
    return f"{name}__{_uuid.uuid4()}"

Generates unique identifiers for Python callables in a DAG using UUIDs.