"""Algorithms for finding the lowest common ancestor of trees and DAGs."""
from collections import defaultdict
from collections.abc import Mapping, Set
from itertools import combinations_with_replacement
import networkx as nx
from networkx.utils import UnionFind, arbitrary_element, not_implemented_for
__all__ = [
"all_pairs_lowest_common_ancestor",
"tree_all_pairs_lowest_common_ancestor",
"lowest_common_ancestor",
]
[docs]
@not_implemented_for("undirected")
@nx._dispatch
def all_pairs_lowest_common_ancestor(G, pairs=None):
"""Return the lowest common ancestor of all pairs or the provided pairs
Parameters
----------
G : NetworkX directed graph
pairs : iterable of pairs of nodes, optional (default: all pairs)
The pairs of nodes of interest.
If None, will find the LCA of all pairs of nodes.
Yields
------
((node1, node2), lca) : 2-tuple
Where lca is least common ancestor of node1 and node2.
Note that for the default case, the order of the node pair is not considered,
e.g. you will not get both ``(a, b)`` and ``(b, a)``
Raises
------
NetworkXPointlessConcept
If `G` is null.
NetworkXError
If `G` is not a DAG.
Examples
--------
The default behavior is to yield the lowest common ancestor for all
possible combinations of nodes in `G`, including self-pairings:
>>> G = nx.DiGraph([(0, 1), (0, 3), (1, 2)])
>>> dict(nx.all_pairs_lowest_common_ancestor(G))
{(0, 0): 0, (0, 1): 0, (0, 3): 0, (0, 2): 0, (1, 1): 1, (1, 3): 0, (1, 2): 1, (3, 3): 3, (3, 2): 0, (2, 2): 2}
The pairs argument can be used to limit the output to only the
specified node pairings:
>>> dict(nx.all_pairs_lowest_common_ancestor(G, pairs=[(1, 2), (2, 3)]))
{(1, 2): 1, (2, 3): 0}
Notes
-----
Only defined on non-null directed acyclic graphs.
See Also
--------
lowest_common_ancestor
"""
if not nx.is_directed_acyclic_graph(G):
raise nx.NetworkXError("LCA only defined on directed acyclic graphs.")
if len(G) == 0:
raise nx.NetworkXPointlessConcept("LCA meaningless on null graphs.")
if pairs is None:
pairs = combinations_with_replacement(G, 2)
else:
# Convert iterator to iterable, if necessary. Trim duplicates.
pairs = dict.fromkeys(pairs)
# Verify that each of the nodes in the provided pairs is in G
nodeset = set(G)
for pair in pairs:
if set(pair) - nodeset:
raise nx.NodeNotFound(
f"Node(s) {set(pair) - nodeset} from pair {pair} not in G."
)
# Once input validation is done, construct the generator
def generate_lca_from_pairs(G, pairs):
ancestor_cache = {}
for v, w in pairs:
if v not in ancestor_cache:
ancestor_cache[v] = nx.ancestors(G, v)
ancestor_cache[v].add(v)
if w not in ancestor_cache:
ancestor_cache[w] = nx.ancestors(G, w)
ancestor_cache[w].add(w)
common_ancestors = ancestor_cache[v] & ancestor_cache[w]
if common_ancestors:
common_ancestor = next(iter(common_ancestors))
while True:
successor = None
for lower_ancestor in G.successors(common_ancestor):
if lower_ancestor in common_ancestors:
successor = lower_ancestor
break
if successor is None:
break
common_ancestor = successor
yield ((v, w), common_ancestor)
return generate_lca_from_pairs(G, pairs)
[docs]
@not_implemented_for("undirected")
@nx._dispatch
def lowest_common_ancestor(G, node1, node2, default=None):
"""Compute the lowest common ancestor of the given pair of nodes.
Parameters
----------
G : NetworkX directed graph
node1, node2 : nodes in the graph.
default : object
Returned if no common ancestor between `node1` and `node2`
Returns
-------
The lowest common ancestor of node1 and node2,
or default if they have no common ancestors.
Examples
--------
>>> G = nx.DiGraph()
>>> nx.add_path(G, (0, 1, 2, 3))
>>> nx.add_path(G, (0, 4, 3))
>>> nx.lowest_common_ancestor(G, 2, 4)
0
See Also
--------
all_pairs_lowest_common_ancestor"""
ans = list(all_pairs_lowest_common_ancestor(G, pairs=[(node1, node2)]))
if ans:
assert len(ans) == 1
return ans[0][1]
return default
[docs]
@not_implemented_for("undirected")
@nx._dispatch
def tree_all_pairs_lowest_common_ancestor(G, root=None, pairs=None):
r"""Yield the lowest common ancestor for sets of pairs in a tree.
Parameters
----------
G : NetworkX directed graph (must be a tree)
root : node, optional (default: None)
The root of the subtree to operate on.
If None, assume the entire graph has exactly one source and use that.
pairs : iterable or iterator of pairs of nodes, optional (default: None)
The pairs of interest. If None, Defaults to all pairs of nodes
under `root` that have a lowest common ancestor.
Returns
-------
lcas : generator of tuples `((u, v), lca)` where `u` and `v` are nodes
in `pairs` and `lca` is their lowest common ancestor.
Examples
--------
>>> import pprint
>>> G = nx.DiGraph([(1, 3), (2, 4), (1, 2)])
>>> pprint.pprint(dict(nx.tree_all_pairs_lowest_common_ancestor(G)))
{(1, 1): 1,
(2, 1): 1,
(2, 2): 2,
(3, 1): 1,
(3, 2): 1,
(3, 3): 3,
(3, 4): 1,
(4, 1): 1,
(4, 2): 2,
(4, 4): 4}
We can also use `pairs` argument to specify the pairs of nodes for which we
want to compute lowest common ancestors. Here is an example:
>>> dict(nx.tree_all_pairs_lowest_common_ancestor(G, pairs=[(1, 4), (2, 3)]))
{(2, 3): 1, (1, 4): 1}
Notes
-----
Only defined on non-null trees represented with directed edges from
parents to children. Uses Tarjan's off-line lowest-common-ancestors
algorithm. Runs in time $O(4 \times (V + E + P))$ time, where 4 is the largest
value of the inverse Ackermann function likely to ever come up in actual
use, and $P$ is the number of pairs requested (or $V^2$ if all are needed).
Tarjan, R. E. (1979), "Applications of path compression on balanced trees",
Journal of the ACM 26 (4): 690-715, doi:10.1145/322154.322161.
See Also
--------
all_pairs_lowest_common_ancestor: similar routine for general DAGs
lowest_common_ancestor: just a single pair for general DAGs
"""
if len(G) == 0:
raise nx.NetworkXPointlessConcept("LCA meaningless on null graphs.")
# Index pairs of interest for efficient lookup from either side.
if pairs is not None:
pair_dict = defaultdict(set)
# See note on all_pairs_lowest_common_ancestor.
if not isinstance(pairs, (Mapping, Set)):
pairs = set(pairs)
for u, v in pairs:
for n in (u, v):
if n not in G:
msg = f"The node {str(n)} is not in the digraph."
raise nx.NodeNotFound(msg)
pair_dict[u].add(v)
pair_dict[v].add(u)
# If root is not specified, find the exactly one node with in degree 0 and
# use it. Raise an error if none are found, or more than one is. Also check
# for any nodes with in degree larger than 1, which would imply G is not a
# tree.
if root is None:
for n, deg in G.in_degree:
if deg == 0:
if root is not None:
msg = "No root specified and tree has multiple sources."
raise nx.NetworkXError(msg)
root = n
# checking deg>1 is not sufficient for MultiDiGraphs
elif deg > 1 and len(G.pred[n]) > 1:
msg = "Tree LCA only defined on trees; use DAG routine."
raise nx.NetworkXError(msg)
if root is None:
raise nx.NetworkXError("Graph contains a cycle.")
# Iterative implementation of Tarjan's offline lca algorithm
# as described in CLRS on page 521 (2nd edition)/page 584 (3rd edition)
uf = UnionFind()
ancestors = {}
for node in G:
ancestors[node] = uf[node]
colors = defaultdict(bool)
for node in nx.dfs_postorder_nodes(G, root):
colors[node] = True
for v in pair_dict[node] if pairs is not None else G:
if colors[v]:
# If the user requested both directions of a pair, give it.
# Otherwise, just give one.
if pairs is not None and (node, v) in pairs:
yield (node, v), ancestors[uf[v]]
if pairs is None or (v, node) in pairs:
yield (v, node), ancestors[uf[v]]
if node != root:
parent = arbitrary_element(G.pred[node])
uf.union(parent, node)
ancestors[uf[parent]] = parent