"""Shortest paths and path lengths using the A* ("A star") algorithm.
"""
from heapq import heappop, heappush
from itertools import count
import networkx as nx
from networkx.algorithms.shortest_paths.weighted import _weight_function
__all__ = ["astar_path", "astar_path_length"]
[docs]
@nx._dispatchable(edge_attrs="weight", preserve_node_attrs="heuristic")
def astar_path(G, source, target, heuristic=None, weight="weight", *, cutoff=None):
"""Returns a list of nodes in a shortest path between source and target
using the A* ("A-star") algorithm.
There may be more than one shortest path. This returns only one.
Parameters
----------
G : NetworkX graph
source : node
Starting node for path
target : node
Ending node for path
heuristic : function
A function to evaluate the estimate of the distance
from the a node to the target. The function takes
two nodes arguments and must return a number.
If the heuristic is inadmissible (if it might
overestimate the cost of reaching the goal from a node),
the result may not be a shortest path.
The algorithm does not support updating heuristic
values for the same node due to caching the first
heuristic calculation per node.
weight : string or function
If this is a string, then edge weights will be accessed via the
edge attribute with this key (that is, the weight of the edge
joining `u` to `v` will be ``G.edges[u, v][weight]``). If no
such edge attribute exists, the weight of the edge is assumed to
be one.
If this is a function, the weight of an edge is the value
returned by the function. The function must accept exactly three
positional arguments: the two endpoints of an edge and the
dictionary of edge attributes for that edge. The function must
return a number or None to indicate a hidden edge.
cutoff : float, optional
If this is provided, the search will be bounded to this value. I.e. if
the evaluation function surpasses this value for a node n, the node will not
be expanded further and will be ignored. More formally, let h'(n) be the
heuristic function, and g(n) be the cost of reaching n from the source node. Then,
if g(n) + h'(n) > cutoff, the node will not be explored further.
Note that if the heuristic is inadmissible, it is possible that paths
are ignored even though they satisfy the cutoff.
Raises
------
NetworkXNoPath
If no path exists between source and target.
Examples
--------
>>> G = nx.path_graph(5)
>>> print(nx.astar_path(G, 0, 4))
[0, 1, 2, 3, 4]
>>> G = nx.grid_graph(dim=[3, 3]) # nodes are two-tuples (x,y)
>>> nx.set_edge_attributes(G, {e: e[1][0] * 2 for e in G.edges()}, "cost")
>>> def dist(a, b):
... (x1, y1) = a
... (x2, y2) = b
... return ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5
>>> print(nx.astar_path(G, (0, 0), (2, 2), heuristic=dist, weight="cost"))
[(0, 0), (0, 1), (0, 2), (1, 2), (2, 2)]
Notes
-----
Edge weight attributes must be numerical.
Distances are calculated as sums of weighted edges traversed.
The weight function can be used to hide edges by returning None.
So ``weight = lambda u, v, d: 1 if d['color']=="red" else None``
will find the shortest red path.
See Also
--------
shortest_path, dijkstra_path
"""
if source not in G or target not in G:
msg = f"Either source {source} or target {target} is not in G"
raise nx.NodeNotFound(msg)
if heuristic is None:
# The default heuristic is h=0 - same as Dijkstra's algorithm
def heuristic(u, v):
return 0
push = heappush
pop = heappop
weight = _weight_function(G, weight)
G_succ = G._adj # For speed-up (and works for both directed and undirected graphs)
# The queue stores priority, node, cost to reach, and parent.
# Uses Python heapq to keep in priority order.
# Add a counter to the queue to prevent the underlying heap from
# attempting to compare the nodes themselves. The hash breaks ties in the
# priority and is guaranteed unique for all nodes in the graph.
c = count()
queue = [(0, next(c), source, 0, None)]
# Maps enqueued nodes to distance of discovered paths and the
# computed heuristics to target. We avoid computing the heuristics
# more than once and inserting the node into the queue too many times.
enqueued = {}
# Maps explored nodes to parent closest to the source.
explored = {}
while queue:
# Pop the smallest item from queue.
_, __, curnode, dist, parent = pop(queue)
if curnode == target:
path = [curnode]
node = parent
while node is not None:
path.append(node)
node = explored[node]
path.reverse()
return path
if curnode in explored:
# Do not override the parent of starting node
if explored[curnode] is None:
continue
# Skip bad paths that were enqueued before finding a better one
qcost, h = enqueued[curnode]
if qcost < dist:
continue
explored[curnode] = parent
for neighbor, w in G_succ[curnode].items():
cost = weight(curnode, neighbor, w)
if cost is None:
continue
ncost = dist + cost
if neighbor in enqueued:
qcost, h = enqueued[neighbor]
# if qcost <= ncost, a less costly path from the
# neighbor to the source was already determined.
# Therefore, we won't attempt to push this neighbor
# to the queue
if qcost <= ncost:
continue
else:
h = heuristic(neighbor, target)
if cutoff and ncost + h > cutoff:
continue
enqueued[neighbor] = ncost, h
push(queue, (ncost + h, next(c), neighbor, ncost, curnode))
raise nx.NetworkXNoPath(f"Node {target} not reachable from {source}")
[docs]
@nx._dispatchable(edge_attrs="weight", preserve_node_attrs="heuristic")
def astar_path_length(
G, source, target, heuristic=None, weight="weight", *, cutoff=None
):
"""Returns the length of the shortest path between source and target using
the A* ("A-star") algorithm.
Parameters
----------
G : NetworkX graph
source : node
Starting node for path
target : node
Ending node for path
heuristic : function
A function to evaluate the estimate of the distance
from the a node to the target. The function takes
two nodes arguments and must return a number.
If the heuristic is inadmissible (if it might
overestimate the cost of reaching the goal from a node),
the result may not be a shortest path.
The algorithm does not support updating heuristic
values for the same node due to caching the first
heuristic calculation per node.
weight : string or function
If this is a string, then edge weights will be accessed via the
edge attribute with this key (that is, the weight of the edge
joining `u` to `v` will be ``G.edges[u, v][weight]``). If no
such edge attribute exists, the weight of the edge is assumed to
be one.
If this is a function, the weight of an edge is the value
returned by the function. The function must accept exactly three
positional arguments: the two endpoints of an edge and the
dictionary of edge attributes for that edge. The function must
return a number or None to indicate a hidden edge.
cutoff : float, optional
If this is provided, the search will be bounded to this value. I.e. if
the evaluation function surpasses this value for a node n, the node will not
be expanded further and will be ignored. More formally, let h'(n) be the
heuristic function, and g(n) be the cost of reaching n from the source node. Then,
if g(n) + h'(n) > cutoff, the node will not be explored further.
Note that if the heuristic is inadmissible, it is possible that paths
are ignored even though they satisfy the cutoff.
Raises
------
NetworkXNoPath
If no path exists between source and target.
See Also
--------
astar_path
"""
if source not in G or target not in G:
msg = f"Either source {source} or target {target} is not in G"
raise nx.NodeNotFound(msg)
weight = _weight_function(G, weight)
path = astar_path(G, source, target, heuristic, weight, cutoff=cutoff)
return sum(weight(u, v, G[u][v]) for u, v in zip(path[:-1], path[1:]))