Coverage for src/carbs/utilities/graphs_subroutines.py: 92%
190 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-15 06:48 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-15 06:48 +0000
1""" Implements helper functions for graph search algorithms.
3This module contains the longer helper functions for the graph search
4algorithms.
5"""
6import numpy as np
9def assign_opt(dist_mat_in):
10 """ Optimal assignment used by Murty's m-best assignment algorithm.
12 This ports the implementation from
13 `here <https://ba-tuong.vo-au.com/codes.html>`_.
14 to python.
16 Args:
17 dist_mat_in (numpy array): Distance matrix to calculate assignment from
19 Returns:
20 tuple containing
22 - assign (numpy array): Optimal assignment
23 - cost (float): Cost of the assignment
24 """
26 dist_mat = dist_mat_in.copy()
27 if len(dist_mat.shape) == 1:
28 dist_mat = dist_mat.reshape((1, dist_mat.shape[0]))
29 elif len(dist_mat.shape) > 2:
30 raise RuntimeError('Distance matrix must have at most 2 dimensions')
31 dist_mat_orig = dist_mat.copy()
33 (n_rows, n_cols) = dist_mat.shape
34 assign = -np.ones(n_rows, dtype=int)
35 if (np.isfinite(dist_mat) & (dist_mat < 0)).any():
36 raise RuntimeError('All matrix values must be non-negative')
38 if np.isinf(dist_mat).all():
39 return (assign, 0)
41 # replace any infite values with a large number
42 if np.isinf(dist_mat).any():
43 max_finite = dist_mat[np.nonzero(np.isfinite(dist_mat))]
44 if len(max_finite) == 0:
45 return (assign, 0)
47 max_finite = max_finite.max()
48 if max_finite > 0:
49 inf_val = 10 * max_finite * n_rows * n_cols
50 else:
51 inf_val = 10
52 dist_mat[np.nonzero(np.isinf(dist_mat))] = inf_val
54 covered_cols = [False] * n_cols
55 covered_rows = [False] * n_rows
56 star_mat = np.zeros((n_rows, n_cols), dtype=bool)
57 prime_mat = np.zeros((n_rows, n_cols), dtype=bool)
59 if n_rows <= n_cols:
60 min_dim = n_rows
62 # find min of each row and subract from each element in the row
63 dist_mat = dist_mat - dist_mat.min(axis=1).reshape((n_rows, 1))
65 # steps 1 and 2a
66 for row in range(0, n_rows):
67 for col in range(0, n_cols):
68 if dist_mat[row, col] == 0:
69 if not covered_cols[col]:
70 star_mat[row, col] = True
71 covered_cols[col] = True
72 break
73 else:
74 min_dim = n_cols
76 # find min of each column and subract from each element in the column
77 dist_mat = dist_mat - dist_mat.min(axis=0)
79 # steps 1 and 2a
80 for col in range(0, n_cols):
81 for row in range(0, n_rows):
82 if dist_mat[row, col].squeeze() == 0:
83 if not covered_rows[row]:
84 star_mat[row, col] = True
85 covered_cols[col] = True
86 covered_rows[row] = True
87 break
89 for row in range(0, n_rows):
90 covered_rows[row] = False
92 __aop_step2b(assign, dist_mat, star_mat, prime_mat, covered_cols,
93 covered_rows, n_rows, n_cols, min_dim)
94 cost = __aop_comp_assign(assign, dist_mat_orig, n_rows)
96 return (assign, cost)
99def __aop_step2a(assign, dist_mat, star_mat, prime_mat, covered_cols,
100 covered_rows, n_rows, n_cols, min_dim):
101 for col in range(0, n_cols):
102 for row in range(0, n_rows):
103 if star_mat[row, col]:
104 covered_cols[col] = True
105 break
107 __aop_step2b(assign, dist_mat, star_mat, prime_mat, covered_cols,
108 covered_rows, n_rows, n_cols, min_dim)
111def __aop_step2b(assign, dist_mat, star_mat, prime_mat, covered_cols,
112 covered_rows, n_rows, n_cols, min_dim):
113 n_covered_cols = 0
114 for v in covered_cols:
115 if v:
116 n_covered_cols += 1
118 if n_covered_cols == min_dim:
119 __aop_build_assign(assign, star_mat, n_rows, n_cols)
121 else:
122 __aop_step3(assign, dist_mat, star_mat,
123 prime_mat, covered_cols, covered_rows, n_rows,
124 n_cols, min_dim)
127def __aop_step3(assign, dist_mat, star_mat, prime_mat, covered_cols,
128 covered_rows, n_rows, n_cols, min_dim):
129 z_found = True
130 while z_found:
131 z_found = False
132 for col in range(0, n_cols):
133 if not covered_cols[col]:
134 for row in range(0, n_rows):
135 cond1 = not covered_rows[row]
136 cond2 = dist_mat[row, col] == 0
137 if cond1 and cond2:
138 prime_mat[row, col] = True
140 star_col = 0
141 for ii in range(0, n_cols):
142 if star_mat[row, ii]:
143 star_col = ii
144 break
145 else:
146 star_col = ii + 1
148 if star_col == n_cols:
149 __aop_step4(assign, dist_mat, star_mat,
150 prime_mat, covered_cols, covered_rows,
151 n_rows, n_cols, min_dim, row, col)
152 return
153 else:
154 covered_rows[row] = True
155 covered_cols[star_col] = False
156 z_found = True
157 break
159 __aop_step5(assign, dist_mat, star_mat, prime_mat, covered_cols,
160 covered_rows, n_rows, n_cols, min_dim)
163def __aop_step4(assign, dist_mat, star_mat, prime_mat, covered_cols,
164 covered_rows, n_rows, n_cols, min_dim, row, col):
165 new_star_mat = star_mat.copy()
167 new_star_mat[row, col] = True
168 star_col = col
169 star_row = 0
170 for ii in range(0, n_rows):
171 star_row = ii
172 if star_mat[ii, star_col]:
173 break
174 else:
175 star_row = ii + 1
178 while star_row < n_rows:
179 new_star_mat[star_row, star_col] = False
181 prime_row = star_row
182 prime_col = 0
183 for ii in range(0, n_cols):
184 prime_col = ii
185 if prime_mat[prime_row, ii]:
186 break
187 else:
188 prime_col = ii + 1
190 new_star_mat[prime_row, prime_col] = True
192 star_col = prime_col
193 for ii in range(0, n_rows):
194 if star_mat[ii, star_col]:
195 star_row = ii
196 break
197 else:
198 star_row = ii + 1
200 for row in range(0, n_rows):
201 for col in range(0, n_cols):
202 prime_mat[row, col] = False
203 star_mat[row, col] = new_star_mat[row, col]
205 for row in range(0, n_rows):
206 covered_rows[row] = False
208 __aop_step2a(assign, dist_mat, star_mat, prime_mat, covered_cols,
209 covered_rows, n_rows, n_cols, min_dim)
212def __aop_step5(assign, dist_mat, star_mat, prime_mat, covered_cols,
213 covered_rows, n_rows, n_cols, min_dim):
214 h = np.inf
215 for row in range(0, n_rows):
216 if not covered_rows[row]:
217 for col in range(0, n_cols):
218 if not covered_cols[col]:
219 v = dist_mat[row, col]
220 if v < h:
221 h = v
223 for row in range(0, n_rows):
224 if covered_rows[row]:
225 for col in range(0, n_cols):
226 dist_mat[row, col] += h
228 for col in range(0, n_cols):
229 if not covered_cols[col]:
230 for row in range(0, n_rows):
231 dist_mat[row, col] -= h
233 __aop_step3(assign, dist_mat, star_mat, prime_mat, covered_cols,
234 covered_rows, n_rows, n_cols, min_dim)
237def __aop_build_assign(assign, star_mat, n_rows, n_cols):
238 for row in range(0, n_rows):
239 for col in range(0, n_cols):
240 if star_mat[row, col]:
241 assign[row] = col
242 break
245def __aop_comp_assign(assign, dist_mat, n_rows):
246 cost = 0
247 for row in range(0, n_rows):
248 col = assign[row]
249 if col >= 0:
250 v = dist_mat[row, col]
251 if np.isfinite(v):
252 cost += v
253 else:
254 assign[row] = -1
255 return cost
258class AStarNode:
259 """
260 A node class for A* Pathfinding
261 parent is parent of the current Node
262 position is current position of the Node in the maze
263 g is cost from start to current Node
264 h is heuristic based estimated cost for current Node to end Node
265 f is total cost of present node i.e. : f = g + h
266 x and y are Cartesian coordinates of the node
267 """
269 def __init__(self, parent=None, position=None):
270 self.parent = parent
271 self.position = position
273 self.g = 0
274 self.h = 0
275 self.f = 0
276 self.x = 0
277 self.y = 0
279 def __eq__(self, other):
280 return self.position == other.position
283def astar_return_path(current_node, maze):
284 path = []
285 no_rows, no_columns = np.shape(maze)
287 # here we create the initialized result maze with -1 in every position
288 result = [[-1 for i in range(no_columns)] for j in range(no_rows)]
289 current = current_node
290 total_cost = []
291 while current is not None:
292 path.append(current.position)
293 total_cost.append(current.h)
294 current = current.parent
296 # Return reversed path as we need to show from start to end path
297 path = path[::-1]
298 start_value = 0
300 # we update the path of start to end found by A-star serch with every step
301 # incremented by 1
302 for i in range(len(path)):
303 result[path[i][0]][path[i][1]] = start_value
304 start_value += 1
306 inds = np.argwhere(np.array(result) >= 0)
307 vals = [result[r][c] for r, c in inds]
308 s_inds = np.argsort(vals)
309 vals = [vals[ii] for ii in s_inds]
311 return (tuple([(inds[ii][0], inds[ii][1]) for ii in s_inds]), max(total_cost))