图(Graph)是一种用来对某些现实问题进行建模的抽象的数学结构,这些问题从逻辑上可以被划分成一系列相互连接的 节点 。其中的节点称为 顶点 ( vertex ),顶点之间的连接称为 边 ( edge )。
比如地铁线路就可以看作由图表示成的运输网络。每一个顶点都代表一个地铁站,而顶点之间的边则表示两个地铁站之间的路径。如果想知道某个站点到另一个站点的最短路径,图算法就能发挥作用。实际上,图算法可以被应用到任何类型的网络问题中。
map as graph
# edge.py from __future__ import annotations from dataclasses import dataclass @dataclass class Edge: u: int # the "from" vertex v: int # the "to" vertex def reversed(self) -> Edge: return Edge(self.v, self.u) def __str__(self) -> str: return f"{self.u} -> {self.v}"
上面代码中的 Edge
类表示两个顶点之间的连接(即“边”),每个顶点都由整数索引表示。其中 u
用来表示第一个顶点, v
表示第二个顶点。
这里只关注非方向性的 graph,edge 是双向的。而在 有向图(digraph) 中,edge 可以是单向的。 reversed()
方法用来返回当前 edge 的逆向形式。
# graph.py from typing import TypeVar, Generic, List, Optional from edge import Edge V = TypeVar('V') # type of the vertices in the graph class Graph(Generic[V]): def __init__(self, vertices: List[V] = []) -> None: self._vertices: List[V] = vertices self._edges: List[List[Edge]] = [[] for _ in vertices] @property def vertex_count(self) -> int: return len(self._vertices) # Number of vertices @property def edge_count(self) -> int: return sum(map(len, self._edges)) # Number of edges # Add a vertex to the graph and return its index def add_vertex(self, vertex: V) -> int: self._vertices.append(vertex) self._edges.append([]) # Add empty list for containing edges return self.vertex_count - 1 # Return index of added vertex # This is an undirected graph, # so we always add edges in both directions def add_edge(self, edge: Edge) -> None: self._edges[edge.u].append(edge) self._edges[edge.v].append(edge.reversed()) # Add an edge using vertex indices (convenience method) def add_edge_by_indices(self, u: int, v: int) -> None: edge: Edge = Edge(u, v) self.add_edge(edge) # Add an edge by looking up vertex indices (convenience method) def add_edge_by_vertices(self, first: V, second: V) -> None: u: int = self._vertices.index(first) v: int = self._vertices.index(second) self.add_edge_by_indices(u, v) # Find the vertex at a specific index def vertex_at(self, index: int) -> V: return self._vertices[index] # Find the index of a vertex in the graph def index_of(self, vertex: V) -> int: return self._vertices.index(vertex) # Find the vertices that a vertex at some index is connected to def neighbors_for_index(self, index: int) -> List[V]: return list(map(self.vertex_at, [e.v for e in self._edges[index]])) # Look up a vertice's index and find its neighbors (convenience method) def neighbors_for_vertex(self, vertex: V) -> List[V]: return self.neighbors_for_index(self.index_of(vertex)) # Return all of the edges associated with a vertex at some index def edges_for_index(self, index: int) -> List[Edge]: return self._edges[index] # Look up the index of a vertex and return its edges (convenience method) def edges_for_vertex(self, vertex: V) -> List[Edge]: return self.edges_for_index(self.index_of(vertex)) # Make it easy to pretty-print a Graph def __str__(self) -> str: desc: str = "" for i in range(self.vertex_count): desc += f"{self.vertex_at(i)} -> {self.neighbors_for_index(i)}\n" return desc
Graph
类聚焦于 graph 的核心角色,即将顶点用边连接起来。
_vertices
列表是 Graph 类的核心,每个顶点都会被存储在该列表中。但是之后在实际引用时会使用顶点在列表中的索引。顶点本身有可能会是非常复杂的数据类型,但其索引一定会是 int 类型,相对而言更加方便使用。
graph 数据类型可以使用 adjacency lists 方式实现,每个顶点都拥有一个列表,里面包含了这个顶点连接的其他顶点。这里使用了由 edge 组成的列表再组成的列表( _edges
),每个顶点都拥有一个由 edge 组成的列表,这些 edge 表示该顶点与其他顶点的连接关系。
Graph
类中实现的方法的简单介绍:
vertex_count edge_count add_vertex add_edge add_edge_by_indices add_edge_by_vertices vertex_at index_of neighbors_for_index neighbors_for_vertex edges_for_index edges_for_vertex __str__
补充测试代码:
# graph.py continued if __name__ == "__main__": # test basic Graph construction city_graph: Graph[str] = Graph(["Seattle", "San Francisco", "Los Angeles", "Riverside", "Phoenix", "Chicago", "Boston", "New York", "Atlanta", "Miami", "Dallas", "Houston", "Detroit", "Philadelphia", "Washington"]) city_graph.add_edge_by_vertices("Seattle", "Chicago") city_graph.add_edge_by_vertices("Seattle", "San Francisco") city_graph.add_edge_by_vertices("San Francisco", "Riverside") city_graph.add_edge_by_vertices("San Francisco", "Los Angeles") city_graph.add_edge_by_vertices("Los Angeles", "Riverside") city_graph.add_edge_by_vertices("Los Angeles", "Phoenix") city_graph.add_edge_by_vertices("Riverside", "Phoenix") city_graph.add_edge_by_vertices("Riverside", "Chicago") city_graph.add_edge_by_vertices("Phoenix", "Dallas") city_graph.add_edge_by_vertices("Phoenix", "Houston") city_graph.add_edge_by_vertices("Dallas", "Chicago") city_graph.add_edge_by_vertices("Dallas", "Atlanta") city_graph.add_edge_by_vertices("Dallas", "Houston") city_graph.add_edge_by_vertices("Houston", "Atlanta") city_graph.add_edge_by_vertices("Houston", "Miami") city_graph.add_edge_by_vertices("Atlanta", "Chicago") city_graph.add_edge_by_vertices("Atlanta", "Washington") city_graph.add_edge_by_vertices("Atlanta", "Miami") city_graph.add_edge_by_vertices("Miami", "Washington") city_graph.add_edge_by_vertices("Chicago", "Detroit") city_graph.add_edge_by_vertices("Detroit", "Boston") city_graph.add_edge_by_vertices("Detroit", "Washington") city_graph.add_edge_by_vertices("Detroit", "New York") city_graph.add_edge_by_vertices("Boston", "New York") city_graph.add_edge_by_vertices("New York", "Philadelphia") city_graph.add_edge_by_vertices("Philadelphia", "Washington") print(city_graph)
运行结果:
Seattle -> ['Chicago', 'San Francisco'] San Francisco -> ['Seattle', 'Riverside', 'Los Angeles'] Los Angeles -> ['San Francisco', 'Riverside', 'Phoenix'] Riverside -> ['San Francisco', 'Los Angeles', 'Phoenix', 'Chicago'] Phoenix -> ['Los Angeles', 'Riverside', 'Dallas', 'Houston'] Chicago -> ['Seattle', 'Riverside', 'Dallas', 'Atlanta', 'Detroit'] Boston -> ['Detroit', 'New York'] New York -> ['Detroit', 'Boston', 'Philadelphia'] Atlanta -> ['Dallas', 'Houston', 'Chicago', 'Washington', 'Miami'] Miami -> ['Houston', 'Atlanta', 'Washington'] Dallas -> ['Phoenix', 'Chicago', 'Atlanta', 'Houston'] Houston -> ['Phoenix', 'Dallas', 'Atlanta', 'Miami'] Detroit -> ['Chicago', 'Boston', 'Washington', 'New York'] Philadelphia -> ['New York', 'Washington'] Washington -> ['Atlanta', 'Miami', 'Detroit', 'Philadelphia']
寻找最短路径
在 graph 理论中,任意两个顶点之间的所有连线(边)称为路径。即从一个顶点到达另一个顶点需要走过的所有路径。
在一个未加权的 graph 中(即不考虑边的长度),寻找最短的路径意味着从起始顶点到目标顶点之间经过的边最少。可以使用 宽度优先搜索(breadth-first search, BFS) 算法查找两个顶点之间的最短路径。(BFS 算法的具体实现可参考 基本算法问题的 Python 解法(递归与搜索) 中的迷宫问题)。
BFS 部分代码如下:
# generic_search.py from __future__ import annotations from typing import TypeVar, Generic, List, Callable, Deque, Set, Optional T = TypeVar('T') class Node(Generic[T]): def __init__(self, state: T, parent: Optional[Node]) -> None: self.state: T = state self.parent: Optional[Node] = parent class Queue(Generic[T]): def __init__(self) -> None: self._container: Deque[T] = Deque() @property def empty(self) -> bool: return not self._container # not is true for empty container def push(self, item: T) -> None: self._container.append(item) def pop(self) -> T: return self._container.popleft() # FIFO def __repr__(self) -> str: return repr(self._container) def bfs(initial: T, goal_test: Callable[[T], bool], successors: Callable[[T], List[T]]) -> Optional[Node[T]]: # frontier is where we've yet to go frontier: Queue[Node[T]] = Queue() frontier.push(Node(initial, None)) # explored is where we've been explored: Set[T] = {initial} # keep going while there is more to explore while not frontier.empty: current_node: Node[T] = frontier.pop() current_state: T = current_node.state # if we found the goal, we're done if goal_test(current_state): return current_node # check where we can go next and haven't explored for child in successors(current_state): if child in explored: # skip children we already explored continue explored.add(child) frontier.push(Node(child, current_node)) return None # went through everything and never found goal def node_to_path(node: Node[T]) -> List[T]: path: List[T] = [node.state] # work backwards from end to front while node.parent is not None: node = node.parent path.append(node.state) path.reverse() return path
继续补充 graph.py
代码如下:
# graph.py continued if __name__ == "__main__": # ... from generic_search import bfs, Node, node_to_path bfs_result: Optional[Node[V]] = bfs("Boston", lambda x: x == "Miami", city_graph.neighbors_for_vertex) if bfs_result is None: print("No solution found using breadth-first search!") else: path: List[V] = node_to_path(bfs_result) print("Path from Boston to Miami:") print(path)
bfs()
函数接受三个参数:初始状态、用于检测当前状态是否符合目标状态的 Callable(可调用对象)、用于寻找达成目标状态的路径的 Callable。
若需要寻找 Boston 到 Miami 的最短路径(不考虑加权的情况),则初始状态为顶点 “Boston”,用于状态检测的 Callable 则判断当前顶点是否为 “Miami”。
运行效果:
Path from Boston to Miami: ['Boston', 'Detroit', 'Washington', 'Miami']
加权图
之前的计算中,最短路径只考虑经过的站点最少,而未将站点之间的路程计算在内。若需要将路程包含进去,则可以为 edge 加上 权重 来表示该 edge 对应的距离。
为了实现加权的 graph,需要实现 Edge 的子类 WeightedEdge 以及 Graph 的子类 WeightedGraph。每一个 WeightedEdge 对象都有一个关联的 float 类型的属性用来表示权重。
# weighted_edge.py from __future__ import annotations from dataclasses import dataclass from edge import Edge @dataclass class WeightedEdge(Edge): weight: float def reversed(self) -> WeightedEdge: return WeightedEdge(self.v, self.u, self.weight) # so that we can order edges by weight to find the minimum weight edge def __lt__(self, other: WeightedEdge) -> bool: return self.weight < other.weight def __str__(self) -> str: return f"{self.u} {self.weight}> {self.v}"
WeightedEdge 子类添加了一个 weight
属性,通过 __lt__()
方法实现了 <
操作符,令 WeightedEdge 对象成为可比较的,使得返回 weight 最小的 edge 成为可能。
# weighted_graph.py from typing import TypeVar, Generic, List, Tuple from graph import Graph from weighted_edge import WeightedEdge V = TypeVar('V') # type of the vertices in the graph class WeightedGraph(Generic[V], Graph[V]): def __init__(self, vertices: List[V] = []) -> None: self._vertices: List[V] = vertices self._edges: List[List[WeightedEdge]] = [[] for _ in vertices] def add_edge_by_indices(self, u: int, v: int, weight: float) -> None: edge: WeightedEdge = WeightedEdge(u, v, weight) self.add_edge(edge) # call superclass version def add_edge_by_vertices(self, first: V, second: V, weight: float) -> None: u: int = self._vertices.index(first) v: int = self._vertices.index(second) self.add_edge_by_indices(u, v, weight) def neighbors_for_index_with_weights(self, index: int) -> List[Tuple[V, float]]: distance_tuples: List[Tuple[V, float]] = [] for edge in self.edges_for_index(index): distance_tuples.append((self.vertex_at(edge.v), edge.weight)) return distance_tuples def __str__(self) -> str: desc: str = "" for i in range(self.vertex_count): desc += f"{self.vertex_at(i)} -> {self.neighbors_for_index_with_weights(i)}\n" return desc
WeightedGraph 类继承自 Graph,在原来的基础上对某些需要适应 weight 属性的方法做了对应的修改。
补充 weighted_graph.py
代码,测试运行效果:
# weighted_graph.py continued if __name__ == "__main__": city_graph2: WeightedGraph[str] = WeightedGraph(["Seattle", "San Francisco", "Los Angeles", "Riverside", "Phoenix", "Chicago", "Boston", "New York", "Atlanta", "Miami", "Dallas", "Houston", "Detroit", "Philadelphia", "Washington"]) city_graph2.add_edge_by_vertices("Seattle", "Chicago", 1737) city_graph2.add_edge_by_vertices("Seattle", "San Francisco", 678) city_graph2.add_edge_by_vertices("San Francisco", "Riverside", 386) city_graph2.add_edge_by_vertices("San Francisco", "Los Angeles", 348) city_graph2.add_edge_by_vertices("Los Angeles", "Riverside", 50) city_graph2.add_edge_by_vertices("Los Angeles", "Phoenix", 357) city_graph2.add_edge_by_vertices("Riverside", "Phoenix", 307) city_graph2.add_edge_by_vertices("Riverside", "Chicago", 1704) city_graph2.add_edge_by_vertices("Phoenix", "Dallas", 887) city_graph2.add_edge_by_vertices("Phoenix", "Houston", 1015) city_graph2.add_edge_by_vertices("Dallas", "Chicago", 805) city_graph2.add_edge_by_vertices("Dallas", "Atlanta", 721) city_graph2.add_edge_by_vertices("Dallas", "Houston", 225) city_graph2.add_edge_by_vertices("Houston", "Atlanta", 702) city_graph2.add_edge_by_vertices("Houston", "Miami", 968) city_graph2.add_edge_by_vertices("Atlanta", "Chicago", 588) city_graph2.add_edge_by_vertices("Atlanta", "Washington", 543) city_graph2.add_edge_by_vertices("Atlanta", "Miami", 604) city_graph2.add_edge_by_vertices("Miami", "Washington", 923) city_graph2.add_edge_by_vertices("Chicago", "Detroit", 238) city_graph2.add_edge_by_vertices("Detroit", "Boston", 613) city_graph2.add_edge_by_vertices("Detroit", "Washington", 396) city_graph2.add_edge_by_vertices("Detroit", "New York", 482) city_graph2.add_edge_by_vertices("Boston", "New York", 190) city_graph2.add_edge_by_vertices("New York", "Philadelphia", 81) city_graph2.add_edge_by_vertices("Philadelphia", "Washington", 123) print(city_graph2)
运行效果:
Seattle -> [('Chicago', 1737), ('San Francisco', 678)] San Francisco -> [('Seattle', 678), ('Riverside', 386), ('Los Angeles', 348)] Los Angeles -> [('San Francisco', 348), ('Riverside', 50), ('Phoenix', 357)] Riverside -> [('San Francisco', 386), ('Los Angeles', 50), ('Phoenix', 307), ('Chicago', 1704)] Phoenix -> [('Los Angeles', 357), ('Riverside', 307), ('Dallas', 887), ('Houston', 1015)] Chicago -> [('Seattle', 1737), ('Riverside', 1704), ('Dallas', 805), ('Atlanta', 588), ('Detroit', 238)] Boston -> [('Detroit', 613), ('New York', 190)] New York -> [('Detroit', 482), ('Boston', 190), ('Philadelphia', 81)] Atlanta -> [('Dallas', 721), ('Houston', 702), ('Chicago', 588), ('Washington', 543), ('Miami', 604)] Miami -> [('Houston', 968), ('Atlanta', 604), ('Washington', 923)] Dallas -> [('Phoenix', 887), ('Chicago', 805), ('Atlanta', 721), ('Houston', 225)] Houston -> [('Phoenix', 1015), ('Dallas', 225), ('Atlanta', 702), ('Miami', 968)] Detroit -> [('Chicago', 238), ('Boston', 613), ('Washington', 396), ('New York', 482)] Philadelphia -> [('New York', 81), ('Washington', 123)] Washington -> [('Atlanta', 543), ('Miami', 923), ('Detroit', 396), ('Philadelphia', 123)]
在加权图中搜索最短路径
寻找某个起点城市到另一个城市的所有路线中花费最小的一条,属于单源头最短路径(single-source shortest path)问题,即从加权图中的某个顶点到任意的另外一个顶点的最短路径。
Dijkstra 算法可以用来解决单源头最短路径问题。该算法从某个起始顶点开始,可以找出加权图中所有其他顶点到起始顶点的最短路径。从某个顶点开始按照远近关系依次遍历完所有顶点并记录其总的花费(从起始顶点到当前顶点),若重复出现的顶点花费更小,则令其替换已有的记录。
具体步骤如下:
- 将起始顶点加入到优先级队列中
- 从优先级队列中弹出一个顶点(一开始就是起始顶点)作为当前顶点
- 查看与当前顶点临近的所有顶点,若某一个之前没有被记录到,或某个顶点按照当前路径的花费低于已有的最小记录,则记录其到起始顶点的距离(作为新的最小记录)及生成该距离的最后一条边(记录路径),并将该顶点 push 到优先级队列中(令其作为之后的“当前”顶点)
- 重复前面两步直到优先级队列为空
- 返回所有顶点到起始顶点的最小距离及路径
# priority_queue.py from typing import TypeVar, Generic, List from heapq import heappush, heappop T = TypeVar('T') class PriorityQueue(Generic[T]): def __init__(self) -> None: self._container: List[T] = [] @property def empty(self) -> bool: return not self._container def push(self, item: T) -> None: heappush(self._container, item) def pop(self) -> T: return heappop(self._container) def __repr__(self) -> str: return repr(self._container)
# dijkstra.py from __future__ import annotations from typing import TypeVar, List, Optional, Tuple, Dict from dataclasses import dataclass from mst import WeightedPath, print_weighted_path from weighted_graph import WeightedGraph from weighted_edge import WeightedEdge from priority_queue import PriorityQueue V = TypeVar('V') # type of the vertices in the graph @dataclass class DijkstraNode: vertex: int distance: float def __lt__(self, other: DijkstraNode) -> bool: return self.distance < other.distance def __eq__(self, other: DijkstraNode) -> bool: return self.distance == other.distance def dijkstra(wg: WeightedGraph[V], root: V) -> Tuple[List[Optional[float]], Dict[int, WeightedEdge]]: first: int = wg.index_of(root) # distances are unknown at first distances: List[Optional[float]] = [None] * wg.vertex_count distances[first] = 0 # the root is 0 away from the root path_dict: Dict[int, WeightedEdge] = {} # how we got to each vertex pq: PriorityQueue[DijkstraNode] = PriorityQueue() pq.push(DijkstraNode(first, 0)) while not pq.empty: u: int = pq.pop().vertex # explore the next closest vertex dist_u: float = distances[u] # should already have seen it # look at every edge/vertex from current vertex for we in wg.edges_for_index(u): # the old distance from starting vertex to this vertex dist_v: float = distances[we.v] # no old distance or found shorter path if dist_v is None or dist_v > we.weight + dist_u: # update distance to this vertex distances[we.v] = we.weight + dist_u # update the edge on the shortest path to this vertex path_dict[we.v] = we # explore this vertex soon pq.push(DijkstraNode(we.v, we.weight + dist_u)) return distances, path_dict # Helper function to get easier access to dijkstra results def distance_array_to_vertex_dict(wg: WeightedGraph[V], distances: List[Optional[float]]) -> Dict[V, Optional[float]]: distance_dict: Dict[V, Optional[float]] = {} for i in range(len(distances)): distance_dict[wg.vertex_at(i)] = distances[i] return distance_dict # Takes a dictionary of edges to reach each node and returns a list of # edges that goes from `start` ot `end` def path_dict_to_path(start: int, end: int, path_dict: Dict[int, WeightedEdge]) -> WeightedPath: if len(path_dict) == 0: return [] edge_path: WeightedPath = [] e: WeightedEdge = path_dict[end] edge_path.append(e) while e.u != start: e = path_dict[e.u] edge_path.append(e) return list(reversed(edge_path)) if __name__ == "__main__": city_graph2: WeightedGraph[str] = WeightedGraph(["Seattle", "San Francisco", "Los Angeles", "Riverside", "Phoenix", "Chicago", "Boston", "New York", "Atlanta", "Miami", "Dallas", "Houston", "Detroit", "Philadelphia", "Washington"]) city_graph2.add_edge_by_vertices("Seattle", "Chicago", 1737) city_graph2.add_edge_by_vertices("Seattle", "San Francisco", 678) city_graph2.add_edge_by_vertices("San Francisco", "Riverside", 386) city_graph2.add_edge_by_vertices("San Francisco", "Los Angeles", 348) city_graph2.add_edge_by_vertices("Los Angeles", "Riverside", 50) city_graph2.add_edge_by_vertices("Los Angeles", "Phoenix", 357) city_graph2.add_edge_by_vertices("Riverside", "Phoenix", 307) city_graph2.add_edge_by_vertices("Riverside", "Chicago", 1704) city_graph2.add_edge_by_vertices("Phoenix", "Dallas", 887) city_graph2.add_edge_by_vertices("Phoenix", "Houston", 1015) city_graph2.add_edge_by_vertices("Dallas", "Chicago", 805) city_graph2.add_edge_by_vertices("Dallas", "Atlanta", 721) city_graph2.add_edge_by_vertices("Dallas", "Houston", 225) city_graph2.add_edge_by_vertices("Houston", "Atlanta", 702) city_graph2.add_edge_by_vertices("Houston", "Miami", 968) city_graph2.add_edge_by_vertices("Atlanta", "Chicago", 588) city_graph2.add_edge_by_vertices("Atlanta", "Washington", 543) city_graph2.add_edge_by_vertices("Atlanta", "Miami", 604) city_graph2.add_edge_by_vertices("Miami", "Washington", 923) city_graph2.add_edge_by_vertices("Chicago", "Detroit", 238) city_graph2.add_edge_by_vertices("Detroit", "Boston", 613) city_graph2.add_edge_by_vertices("Detroit", "Washington", 396) city_graph2.add_edge_by_vertices("Detroit", "New York", 482) city_graph2.add_edge_by_vertices("Boston", "New York", 190) city_graph2.add_edge_by_vertices("New York", "Philadelphia", 81) city_graph2.add_edge_by_vertices("Philadelphia", "Washington", 123) distances, path_dict = dijkstra(city_graph2, "Los Angeles") name_distance: Dict[str, Optional[int]] = distance_array_to_vertex_dict(city_graph2, distances) print("Distances from Los Angeles:") for key, value in name_distance.items(): print(f"{key} : {value}") print("") print("Shortest path from Los Angelges to Boston:") path: WeightedPath = path_dict_to_path(city_graph2.index_of("Los Angeles"), city_graph2.index_of("Boston"), path_dict) print_weighted_path(city_graph2, path)
运行结果:
Distances from Los Angeles: Seattle : 1026 San Francisco : 348 Los Angeles : 0 Riverside : 50 Phoenix : 357 Chicago : 1754 Boston : 2605 New York : 2474 Atlanta : 1965 Miami : 2340 Dallas : 1244 Houston : 1372 Detroit : 1992 Philadelphia : 2511 Washington : 2388 Shortest path from Los Angelges to Boston: Los Angeles 50> Riverside Riverside 1704> Chicago Chicago 238> Detroit Detroit 613> Boston Total Weight: 2605