Coverage for src/carbs/utilities/graphs_subroutines.py: 92%

190 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-15 06:48 +0000

1""" Implements helper functions for graph search algorithms. 

2 

3This module contains the longer helper functions for the graph search 

4algorithms. 

5""" 

6import numpy as np 

7 

8 

def assign_opt(dist_mat_in):
    """ Optimal assignment used by Murty's m-best assignment algorithm.

    This ports the implementation from
    `here <https://ba-tuong.vo-au.com/codes.html>`_.
    to python.

    The input is copied, so the caller's matrix is never modified. A 1-D
    input is treated as a single row. Infinite entries are temporarily
    replaced by a large finite value for the search; any row that ends up
    assigned to such an entry is reported as unassigned (-1) and does not
    contribute to the cost.

    Note:
        The step helpers invoke each other through nested calls instead of
        returning to a driver loop, so the call stack grows with every step
        transition.

    Args:
        dist_mat_in (numpy array): Distance matrix to calculate assignment from

    Returns:
        tuple containing

        - assign (numpy array): Optimal assignment
        - cost (float): Cost of the assignment

    Raises:
        RuntimeError: If the matrix has more than 2 dimensions or contains
            a finite negative value.
    """
    dist_mat = dist_mat_in.copy()
    n_dims = len(dist_mat.shape)
    if n_dims > 2:
        raise RuntimeError('Distance matrix must have at most 2 dimensions')
    if n_dims == 1:
        # promote a vector to a single-row matrix
        dist_mat = dist_mat.reshape((1, dist_mat.shape[0]))
    # untouched copy used at the end to compute the true cost
    dist_mat_orig = dist_mat.copy()

    n_rows, n_cols = dist_mat.shape
    assign = -np.ones(n_rows, dtype=int)

    finite_mask = np.isfinite(dist_mat)
    if (finite_mask & (dist_mat < 0)).any():
        raise RuntimeError('All matrix values must be non-negative')

    inf_mask = np.isinf(dist_mat)
    if inf_mask.all():
        # nothing can be assigned
        return (assign, 0)

    # replace any infinite values with a large number
    if inf_mask.any():
        finite_vals = dist_mat[np.nonzero(finite_mask)]
        if len(finite_vals) == 0:
            return (assign, 0)

        largest = finite_vals.max()
        inf_val = 10 * largest * n_rows * n_cols if largest > 0 else 10
        dist_mat[np.nonzero(inf_mask)] = inf_val

    covered_cols = [False] * n_cols
    covered_rows = [False] * n_rows
    star_mat = np.zeros((n_rows, n_cols), dtype=bool)
    prime_mat = np.zeros((n_rows, n_cols), dtype=bool)

    if n_rows > n_cols:
        min_dim = n_cols

        # find min of each column and subtract from each element in the column
        dist_mat = dist_mat - dist_mat.min(axis=0)

        # steps 1 and 2a: star the first zero of each column whose row is
        # still free
        for col in range(n_cols):
            for row in range(n_rows):
                if dist_mat[row, col] == 0 and not covered_rows[row]:
                    star_mat[row, col] = True
                    covered_cols[col] = True
                    covered_rows[row] = True
                    break

        # row covers were only bookkeeping for the starring pass above
        covered_rows[:] = [False] * n_rows
    else:
        min_dim = n_rows

        # find min of each row and subtract from each element in the row
        dist_mat = dist_mat - dist_mat.min(axis=1).reshape((n_rows, 1))

        # steps 1 and 2a: star the first zero of each row whose column is
        # still free
        for row in range(n_rows):
            for col in range(n_cols):
                if dist_mat[row, col] == 0 and not covered_cols[col]:
                    star_mat[row, col] = True
                    covered_cols[col] = True
                    break

    __aop_step2b(assign, dist_mat, star_mat, prime_mat, covered_cols,
                 covered_rows, n_rows, n_cols, min_dim)
    cost = __aop_comp_assign(assign, dist_mat_orig, n_rows)

    return (assign, cost)

97 

98 

def __aop_step2a(assign, dist_mat, star_mat, prime_mat, covered_cols,
                 covered_rows, n_rows, n_cols, min_dim):
    """Cover every column holding a starred entry, then run step 2b.

    ``covered_cols`` is updated in place; columns without a star keep
    whatever cover state they already had.
    """
    for col in range(n_cols):
        if any(star_mat[row, col] for row in range(n_rows)):
            covered_cols[col] = True

    __aop_step2b(assign, dist_mat, star_mat, prime_mat, covered_cols,
                 covered_rows, n_rows, n_cols, min_dim)

109 

110 

def __aop_step2b(assign, dist_mat, star_mat, prime_mat, covered_cols,
                 covered_rows, n_rows, n_cols, min_dim):
    """Finish if enough columns are covered, otherwise continue to step 3.

    When the number of covered columns equals ``min_dim`` the starred
    entries are written into ``assign``; in every other case the search
    proceeds with step 3.
    """
    n_covered = sum(1 for is_covered in covered_cols if is_covered)

    if n_covered != min_dim:
        __aop_step3(assign, dist_mat, star_mat,
                    prime_mat, covered_cols, covered_rows, n_rows,
                    n_cols, min_dim)
    else:
        __aop_build_assign(assign, star_mat, n_rows, n_cols)

125 

126 

def __aop_step3(assign, dist_mat, star_mat, prime_mat, covered_cols,
                covered_rows, n_rows, n_cols, min_dim):
    """Prime uncovered zeros until one is found in a row without a star.

    Each uncovered zero is primed. If its row has no starred entry, step 4
    is started from that zero and this function returns. Otherwise the row
    is covered, the column of the row's star is uncovered and the scan is
    repeated. When a full scan finds no uncovered zero, step 5 is run.
    """
    rescan = True
    while rescan:
        rescan = False
        for col in range(n_cols):
            if covered_cols[col]:
                continue

            for row in range(n_rows):
                if covered_rows[row] or dist_mat[row, col] != 0:
                    continue

                prime_mat[row, col] = True

                # column of the starred entry in this row, n_cols if none
                star_col = next(
                    (ii for ii in range(n_cols) if star_mat[row, ii]),
                    n_cols,
                )

                if star_col == n_cols:
                    __aop_step4(assign, dist_mat, star_mat,
                                prime_mat, covered_cols, covered_rows,
                                n_rows, n_cols, min_dim, row, col)
                    return

                covered_rows[row] = True
                covered_cols[star_col] = False
                rescan = True
                # this column is done; move on to the next one
                break

    __aop_step5(assign, dist_mat, star_mat, prime_mat, covered_cols,
                covered_rows, n_rows, n_cols, min_dim)

161 

162 

def __aop_step4(assign, dist_mat, star_mat, prime_mat, covered_cols,
                covered_rows, n_rows, n_cols, min_dim, row, col):
    """Flip stars along the alternating star/prime chain from (row, col).

    Starting at the primed entry ``(row, col)``, the chain alternates
    between the starred entry in the current column and the primed entry
    in that star's row. Stars on the chain are removed and primes on the
    chain become stars. Afterwards all primes are erased, all row covers
    are cleared, and step 2a is run. ``star_mat``, ``prime_mat`` and
    ``covered_rows`` are updated in place.
    """
    # searches always read the unmodified star_mat; changes go to the copy
    new_star_mat = star_mat.copy()
    new_star_mat[row, col] = True

    star_col = col
    # row of the star in star_col, n_rows if the column has none
    star_row = next(
        (ii for ii in range(n_rows) if star_mat[ii, star_col]), n_rows)

    while star_row < n_rows:
        new_star_mat[star_row, star_col] = False

        prime_row = star_row
        prime_col = next(
            (ii for ii in range(n_cols) if prime_mat[prime_row, ii]), n_cols)
        new_star_mat[prime_row, prime_col] = True

        star_col = prime_col
        star_row = next(
            (ii for ii in range(n_rows) if star_mat[ii, star_col]), n_rows)

    # write back in place so the caller's arrays/lists see the changes
    prime_mat[:n_rows, :n_cols] = False
    star_mat[:n_rows, :n_cols] = new_star_mat[:n_rows, :n_cols]
    covered_rows[:n_rows] = [False] * n_rows

    __aop_step2a(assign, dist_mat, star_mat, prime_mat, covered_cols,
                 covered_rows, n_rows, n_cols, min_dim)

210 

211 

def __aop_step5(assign, dist_mat, star_mat, prime_mat, covered_cols,
                covered_rows, n_rows, n_cols, min_dim):
    """Shift the matrix by its smallest uncovered value, then redo step 3.

    The smallest entry lying in neither a covered row nor a covered column
    is added to every entry of each covered row and subtracted from every
    entry of each uncovered column. ``dist_mat`` is modified in place.
    """
    open_rows = [r for r in range(n_rows) if not covered_rows[r]]
    open_cols = [c for c in range(n_cols) if not covered_cols[c]]

    # smallest value that is not covered by any line
    h = np.inf
    for r in open_rows:
        for c in open_cols:
            if dist_mat[r, c] < h:
                h = dist_mat[r, c]

    for r in range(n_rows):
        if covered_rows[r]:
            for c in range(n_cols):
                dist_mat[r, c] += h

    for c in open_cols:
        for r in range(n_rows):
            dist_mat[r, c] -= h

    __aop_step3(assign, dist_mat, star_mat, prime_mat, covered_cols,
                covered_rows, n_rows, n_cols, min_dim)

235 

236 

def __aop_build_assign(assign, star_mat, n_rows, n_cols):
    """Write the column of the first starred entry of each row to ``assign``.

    Rows without a starred entry keep their current value in ``assign``.
    """
    for row in range(n_rows):
        starred = next(
            (col for col in range(n_cols) if star_mat[row, col]), None)
        if starred is not None:
            assign[row] = starred

243 

244 

def __aop_comp_assign(assign, dist_mat, n_rows):
    """Sum the cost of the assigned entries and drop non-finite ones.

    For each row with a non-negative entry in ``assign`` the matching
    value of ``dist_mat`` is added to the cost when it is finite;
    otherwise that row is marked as unassigned (-1) in ``assign``.

    Returns:
        The accumulated cost (0 when nothing was added).
    """
    total = 0
    for row in range(n_rows):
        col = assign[row]
        if col < 0:
            continue

        entry = dist_mat[row, col]
        if not np.isfinite(entry):
            assign[row] = -1
            continue

        total += entry
    return total

256 

257 

class AStarNode:
    """
    A node class for A* Pathfinding
    parent is parent of the current Node
    position is current position of the Node in the maze
    g is cost from start to current Node
    h is heuristic based estimated cost for current Node to end Node
    f is total cost of present node i.e. : f = g + h
    x and y are Cartesian coordinates of the node

    Two nodes compare equal when their positions are equal. Because
    ``__eq__`` is defined without ``__hash__``, instances are not hashable.
    """

    def __init__(self, parent=None, position=None):
        self.parent = parent
        self.position = position

        # all costs and coordinates start at zero; callers fill them in
        self.g = 0
        self.h = 0
        self.f = 0
        self.x = 0
        self.y = 0

    def __eq__(self, other):
        # Defer to Python's default handling for foreign types instead of
        # raising AttributeError on objects without a ``position``.
        if not isinstance(other, AStarNode):
            return NotImplemented
        return self.position == other.position

281 

282 

def astar_return_path(current_node, maze):
    """Trace a node chain back to its root and return the path and cost.

    The chain is followed through the ``parent`` attribute starting at
    ``current_node`` and then reversed, so the path runs from the root to
    ``current_node``. A cell that appears more than once on the chain is
    reported once, at the position of its last occurrence.

    Args:
        current_node: Last node of the path; each node on the chain must
            provide ``position``, ``h`` and ``parent``.
        maze: 2-D array-like; only its shape is used.

    Returns:
        tuple containing

        - path (tuple): ``(row, col)`` index pairs ordered from start to end
        - cost: Largest ``h`` value found on the chain
    """
    n_rows, n_cols = np.shape(maze)

    # walk back to the root, collecting positions and heuristic values
    positions = []
    heuristics = []
    node = current_node
    while node is not None:
        positions.append(node.position)
        heuristics.append(node.h)
        node = node.parent

    # show the path from start to end
    positions.reverse()

    # grid of step numbers, -1 wherever the path does not pass
    step_grid = [[-1] * n_cols for _ in range(n_rows)]
    for step, pos in enumerate(positions):
        step_grid[pos[0]][pos[1]] = step

    # recover the visited cells and order them by their step number
    visited = np.argwhere(np.array(step_grid) >= 0)
    order = np.argsort([step_grid[r][c] for r, c in visited])
    ordered_path = tuple([(visited[k][0], visited[k][1]) for k in order])

    return (ordered_path, max(heuristics))