from itertools import combinations, chain
from networkx.utils import pairwise, not_implemented_for
import networkx as nx
__all__ = ['metric_closure', 'steiner_tree']
@not_implemented_for('directed')
def metric_closure(G, weight='weight'):
    """Return the metric closure of a graph.

    The metric closure of a graph *G* is the complete graph in which each
    edge is weighted by the shortest path distance between the nodes in *G*.

    Parameters
    ----------
    G : NetworkX graph

    weight : string, optional (default='weight')
        Edge weight specification, forwarded unchanged to
        `all_pairs_dijkstra`.

    Returns
    -------
    NetworkX graph
        Metric closure of the graph `G`. Every edge ``(u, v)`` carries two
        attributes: ``distance``, the shortest path length between `u` and
        `v` in `G`, and ``path``, the corresponding shortest path as
        reported by `all_pairs_dijkstra`. If `G` has no nodes, an empty
        graph is returned.

    Raises
    ------
    NetworkXError
        If `G` is not connected.
    """
    M = nx.Graph()
    Gnodes = set(G)

    if not Gnodes:
        # With no nodes, all_pairs_dijkstra yields nothing and the next()
        # call below would leak a bare StopIteration to the caller.  The
        # closure of the empty graph is simply the empty graph.
        return M

    # check for connected graph while processing first node: in a connected
    # graph, the first source must reach every node.
    all_paths_iter = nx.all_pairs_dijkstra(G, weight=weight)
    u, (distance, path) = next(all_paths_iter)
    if Gnodes - set(distance):
        msg = "G is not a connected graph. metric_closure is not defined."
        raise nx.NetworkXError(msg)

    # Gnodes shrinks as sources are consumed, so each unordered pair
    # {u, v} is added exactly once.
    Gnodes.remove(u)
    for v in Gnodes:
        M.add_edge(u, v, distance=distance[v], path=path[v])

    # first node done -- now process the rest
    for u, (distance, path) in all_paths_iter:
        Gnodes.remove(u)
        for v in Gnodes:
            M.add_edge(u, v, distance=distance[v], path=path[v])

    return M
@not_implemented_for('multigraph')
@not_implemented_for('directed')
def steiner_tree(G, terminal_nodes, weight='weight'):
    """Return an approximation to the minimum Steiner tree of a graph.

    Parameters
    ----------
    G : NetworkX graph

    terminal_nodes : list
        A list of terminal nodes for which minimum steiner tree is
        to be found.

    weight : string, optional (default='weight')
        Edge weight specification, forwarded unchanged to `metric_closure`.

    Returns
    -------
    NetworkX graph
        Approximation to the minimum steiner tree of `G` induced by
        `terminal_nodes`, obtained as an edge subgraph of `G`.

    Notes
    -----
    The approximation works in three steps:

    1. Build the metric closure of *G*, i.e. the complete graph in which
       each edge is weighted by the shortest path distance between its
       endpoints in *G*.
    2. Compute a minimum spanning tree of the subgraph of that closure
       induced by the terminal nodes.
    3. Expand every spanning-tree edge back into the shortest path of *G*
       it stands for, and take the union of those paths' edges.

    This algorithm produces a tree whose weight is within a (2 - (2 / t))
    factor of the weight of the optimal Steiner tree where *t* is number of
    terminal nodes.
    """
    # Metric closure of G, restricted to the terminal nodes.
    terminal_closure = metric_closure(G, weight=weight).subgraph(terminal_nodes)

    # The closure stores shortest-path lengths under the 'distance' key, so
    # that is the attribute the spanning tree must minimise.
    spanning_edges = nx.minimum_spanning_edges(
        terminal_closure, weight='distance', data=True
    )

    # Each closure edge remembers the shortest path it represents; unroll
    # those paths into consecutive node pairs.  Duplicate edges are harmless.
    tree_edges = (
        edge
        for _, _, attrs in spanning_edges
        for edge in pairwise(attrs['path'])
    )
    return G.edge_subgraph(tree_edges)