Coverage for src/carbs/utilities/sampling.py: 38%

80 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-15 06:48 +0000

1"""Implements general sampling methods.""" 

2import numpy as np 

3 

4 

def gibbs(in_costs, num_iters, rng=None):
    """Implements a Gibbs sampler.

    Notes
    -----
    Computationally efficient form of the Metropolis Hastings Markov Chain
    Monte Carlo algorithm, useful when direct sampling of the target
    multivariate distribution is difficult. Generates samples by selecting
    from a histogram constructed from the distribution and randomly sampling
    from the most likely bins in the histogram. This is based on the sampler in
    :cite:`Vo2017_AnEfficientImplementationoftheGeneralizedLabeledMultiBernoulliFilter`.

    Parameters
    ----------
    in_costs : N x M numpy array
        Cost matrix.
    num_iters : int
        Number of iterations to run.
    rng : numpy random generator, optional
        Random number generator to be used when sampling. The default is None
        which implies :code:`default_rng()`

    Returns
    -------
    assignments : M (max size) x N numpy array
        The unique entries from the sampling.
    costs : M x 1 numpy array
        Cost of each assignment.
    """
    if rng is None:
        rng = np.random.default_rng()

    # Determine size of cost matrix
    cost_size = np.shape(in_costs)

    # Initialize assignment and cost matrices
    assignments = np.zeros((num_iters, cost_size[0]))
    costs = np.zeros((num_iters, 1))

    # Initialize current solution: row i starts on column N + i so every
    # row begins on a distinct column (no assignment conflicts).
    cur_soln = np.arange(cost_size[0], 2 * cost_size[0])

    assignments[0] = cur_soln
    rows = np.arange(0, cost_size[0]).astype(int)
    costs[0] = np.sum(in_costs[rows, cur_soln])

    # Loop over all possible assignments and determine costs
    mask = np.ones(len(cur_soln), dtype=bool)
    for sol in range(1, num_iters):
        for var in range(0, cost_size[0]):
            # mask selects every row except the one being resampled so the
            # columns they currently occupy can be blocked out below.
            if var > 0:
                mask[var - 1] = True
            mask[var] = False
            temp_samp = np.exp(-in_costs[var])
            # Zero out columns already claimed by the other rows so the
            # sampled assignment stays conflict free.
            temp_samp[cur_soln[mask]] = 0

            pos = temp_samp > 0
            if np.any(pos):
                # Build a normalized cumulative histogram over the remaining
                # positive weights and invert a single uniform draw.
                hist_in_array = np.zeros(pos.sum() + 1)
                csum = np.cumsum(temp_samp[pos].ravel())
                hist_in_array[1:] = csum / csum[-1]

                # NOTE: use a scalar draw; assigning a (1, 1) array into a
                # single array element is deprecated (and now an error) in
                # modern NumPy. A scalar uniform consumes the same single
                # value from the generator stream.
                bin_idx = np.digitize(rng.uniform(), hist_in_array) - 1
                # Map the histogram bin back to the original column index.
                cur_soln[var] = np.nonzero(pos)[0][bin_idx]
            # else: no feasible column remains for this row; keep the
            # current assignment instead of crashing on an empty cumsum.

        mask[-1] = True
        assignments[sol] = cur_soln
        costs[sol] = np.sum(in_costs[rows, cur_soln])

    # Keep only the unique assignments and the cost of each one's first
    # occurrence.
    [assignments, I] = np.unique(assignments, return_index=True, axis=0)

    return assignments, costs[I]

76 

77# TODO: Multisensor Gibbs potentially 

def mm_gibbs(in_costs, num_iters, rng=None):
    """Implements a minimally-Markovian Gibbs sampler.

    Notes
    -----
    Computationally efficient form of the Metropolis Hastings Markov Chain
    Monte Carlo algorithm, useful when direct sampling of the target
    multivariate distribution is difficult. Generates samples by selecting
    from a histogram constructed from the distribution and randomly sampling
    from the most likely bins in the histogram. This is based on the sampler in
    :cite:`Vo2017_AnEfficientImplementationoftheGeneralizedLabeledMultiBernoulliFilter`.

    Parameters
    ----------
    in_costs : S x N x M numpy array
        Cost matrix.
    num_iters : int
        Number of iterations to run.
    rng : numpy random generator, optional
        Random number generator to be used when sampling. The default is None
        which implies :code:`default_rng()`

    Returns
    -------
    assignments : M (max size) x N x S numpy array
        The unique entries from the sampling.
    costs : M x S numpy array
        Cost of each assignment for each sensor.
    """
    # NOTE(review): this function is unfinished (see the TODOs below) and
    # contains several latent defects flagged inline; confirm intended
    # behavior against the cited Vo 2017 sampler before relying on it.

    if rng is None:
        rng = np.random.default_rng()

    # Determine size of cost matrix
    # cost_size = (num_sensors S, num_rows N, num_cols M)
    cost_size = np.shape(in_costs)

    # Initialize assignment and cost matrices
    #assignments is iterations x sensors x solution
    assignments = np.zeros((num_iters, cost_size[0], cost_size[1]))
    costs = np.zeros((num_iters, 1))
    # P_n / Q_n: per-row probabilities of taking the "+" (sample) vs "-"
    # (set row to -1) branch in the loop below.
    P_n = []
    Q_n = []
    #TODO: calculation for P_n and Q_n
    for var in range(0, cost_size[1]):
        prod_gamma_n = 1
        prod_v_n = 1
        for s in range(0, cost_size[0]):
            gamma_n = np.sum(in_costs[s, var][in_costs[s, var] != -1]) #third index needs to represent -1 vs non neg
            prod_gamma_n = prod_gamma_n*gamma_n
            prod_v_n = prod_v_n * np.sum(in_costs[s, var][in_costs[s, var] == -1]) # index where in_costs are -1
        P_n.append(prod_gamma_n/(prod_v_n + prod_gamma_n))
        Q_n.append(1 - P_n[var])

    # Initialize current solution
    # Each sensor's row i starts on column N + i (mirrors gibbs()).
    costsum = 0
    rows = np.arange(0, cost_size[1]).astype(int)
    cur_soln = np.zeros((cost_size[0], len(np.arange(cost_size[1], 2*cost_size[1]))),dtype=int)
    for s in range(0, cost_size[0]):
        cur_soln[s, :] = np.arange(cost_size[1], 2 * cost_size[1])
        costsum += np.sum(in_costs[s, rows, np.arange(cost_size[1], 2 * cost_size[1]).astype(int)])

    assignments[0, :, :] = cur_soln
    costs[0] = costsum

    # Loop over all possible assignments and determine costs
    mask = np.ones(np.shape(cur_soln), dtype=bool)
    for sol in range(1, num_iters):
        costsum = 0
        for var in range(0, cost_size[1]):
            #TODO: write first categorical
            # sample markovian stationary distribution,
            # if greater than some probability as calculated above by pn,
            # then perform mm-gibbs, otherwise all values are set to -1.
            #
            bins = np.array([0.0, Q_n[var], 1.0])
            # NOTE(review): np.digitize with a (1, 1) uniform yields a
            # (1, 1) array; the same pattern is assigned into a scalar slot
            # below, which is deprecated/raises in modern NumPy — should be
            # a scalar rng.uniform() draw.
            i_n = np.digitize(rng.uniform(size=(1, 1)), bins) - 1
            "i_n = Categorical ('+', '-', ), [Pn(Lambda(1:num_sensors)), Qn(Lambda(1:num_sensors)]"
            if i_n > 0: # i_n = "+"

                for s in range(0, cost_size[0]):
                    if var > 0:
                        mask[s, var - 1] = True
                    mask[s, var] = False
                temp_samp = np.exp(-in_costs[:, var, :])
                # NOTE(review): cur_soln[mask] is a 1-D array of *column*
                # indices, but used here it fancy-indexes the first axis
                # (sensors) of the 2-D temp_samp — presumably meant to zero
                # the already-claimed columns per sensor; verify.
                temp_samp[cur_soln[mask]] = 0
                for s in range(0, cost_size[0]):
                    hist_in_array = np.zeros(temp_samp[s][temp_samp[s] > 0].size + 1)
                    # NOTE(review): if no entries are positive, csum is
                    # empty and csum[-1] raises before the guard below
                    # (same latent issue as in gibbs()).
                    csum = np.cumsum(temp_samp[s][temp_samp[s] > 0].ravel())
                    hist_in_array[1:] = csum / csum[-1]
                    cur_soln[s, var] = np.digitize(rng.uniform(size=(1, 1)), hist_in_array) - 1
                    if np.any(temp_samp[s] > 0):
                        # NOTE(review): looks like this should index
                        # temp_samp[s] (per sensor), not temp_samp — confirm.
                        cur_soln[s, var] = np.nonzero(temp_samp > 0)[0][cur_soln[s, var]]

            else: # i_n = "-"
                # NOTE(review): np.ones(1, cost_size[0]) passes the sensor
                # count as the *dtype* argument and raises TypeError at
                # runtime; likely intended -np.ones(cost_size[0], dtype=int)
                # (or simply cur_soln[:, var] = -1).
                cur_soln[:, var] = -1 * np.ones(1, cost_size[0])

        assignments[sol, :, :] = cur_soln
        # NOTE(review): 2-D cur_soln used as a fancy index here; also -1
        # entries from the "-" branch would index the last column — the
        # author's own comment marks this as provisional.
        costs[sol] = np.sum(in_costs[:, rows, cur_soln]) #some variant of this


    [assignments, I] = np.unique(assignments, return_index=True, axis=0)

    return assignments, costs[I]

181