Coverage for src/carbs/utilities/sampling.py: 38%
80 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-15 06:48 +0000
1"""Implements general sampling methods."""
2import numpy as np
def gibbs(in_costs, num_iters, rng=None):
    """Implements a Gibbs sampler.

    Notes
    -----
    Computationally efficient form of the Metropolis Hastings Markov Chain
    Monte Carlo algorithm, useful when direct sampling of the target
    multivariate distribution is difficult. Generates samples by selecting
    from a histogram constructed from the distribution and randomly sampling
    from the most likely bins in the histogram. This is based on the sampler in
    :cite:`Vo2017_AnEfficientImplementationoftheGeneralizedLabeledMultiBernoulliFilter`.

    Parameters
    ----------
    in_costs : N x M numpy array
        Cost matrix.
    num_iters : int
        Number of iterations to run.
    rng : numpy random generator, optional
        Random number generator to be used when sampling. The default is None
        which implies :code:`default_rng()`

    Returns
    -------
    assignments : M (max size) x N numpy array
        The unique entries from the sampling.
    costs : M x 1 numpy array
        Cost of each assignment.
    """
    if rng is None:
        rng = np.random.default_rng()

    # Determine size of cost matrix (N variables x M candidate columns)
    cost_size = np.shape(in_costs)

    # Initialize assignment and cost matrices
    assignments = np.zeros((num_iters, cost_size[0]))
    costs = np.zeros((num_iters, 1))

    # Initialize current solution: variable i starts on column N + i
    # (assumes M >= 2 * N so these columns exist -- TODO confirm against callers)
    cur_soln = np.arange(cost_size[0], 2 * cost_size[0])

    assignments[0] = cur_soln
    rows = np.arange(0, cost_size[0]).astype(int)
    costs[0] = np.sum(in_costs[rows, cur_soln])

    # mask selects every variable EXCEPT the one currently being resampled
    mask = np.ones(len(cur_soln), dtype=bool)
    for sol in range(1, num_iters):
        for var in range(0, cost_size[0]):
            if var > 0:
                mask[var - 1] = True
            mask[var] = False

            # Unnormalized conditional weights for this variable's columns
            temp_samp = np.exp(-in_costs[var])
            # Exclude columns already taken by the other variables
            temp_samp[cur_soln[mask]] = 0

            # Only sample when at least one candidate column has positive
            # weight; previously an all-zero row crashed on csum[-1] with an
            # empty cumulative sum before the positivity guard was reached.
            valid = np.nonzero(temp_samp > 0)[0]
            if valid.size > 0:
                # Build the normalized CDF over valid columns and invert a
                # single uniform draw (inverse-CDF / histogram sampling).
                csum = np.cumsum(temp_samp[valid].ravel())
                hist_in_array = np.zeros(valid.size + 1)
                hist_in_array[1:] = csum / csum[-1]

                # Scalar draw: uniform() consumes the same single variate as
                # the old uniform(size=(1, 1)) but avoids the deprecated
                # (1,1)-array-to-scalar assignment removed in newer NumPy.
                cur_soln[var] = valid[np.digitize(rng.uniform(), hist_in_array) - 1]
        mask[-1] = True
        assignments[sol] = cur_soln
        costs[sol] = np.sum(in_costs[rows, cur_soln])

    # Keep only the unique assignments, with the cost of each one's first
    # occurrence.
    assignments, inds = np.unique(assignments, return_index=True, axis=0)

    return assignments, costs[inds]
77# TODO: Multisensor Gibbs potentially
78def mm_gibbs(in_costs, num_iters, rng=None):
79 """Implements a minimally-Markovian Gibbs sampler.
81 Notes
82 -----
83 Computationally efficient form of the Metropolis Hastings Markov Chain
84 Monte Carlo algorithm, useful when direct sampling of the target
85 multivariate distribution is difficult. Generates samples by selecting
86 from a histogram constructed from the distribution and randomly sampling
87 from the most likely bins in the histogram. This is based on the sampler in
88 :cite:`Vo2017_AnEfficientImplementationoftheGeneralizedLabeledMultiBernoulliFilter`.
90 Parameters
91 ----------
92 in_costs : S x N x M numpy array
93 Cost matrix.
94 num_iters : int
95 Number of iterations to run.
96 rng : numpy random generator, optional
97 Random number generator to be used when sampling. The default is None
98 which implies :code:`default_rng()`
100 Returns
101 -------
102 assignments : M (max size) x N x S numpy array
103 The unique entries from the sampling.
104 costs : M x S numpy array
105 Cost of each assignment for each sensor.
106 """
108 if rng is None:
109 rng = np.random.default_rng()
111 # Determine size of cost matrix
112 cost_size = np.shape(in_costs)
114 # Initialize assignment and cost matrices
115 #assignments is iterations x sensors x solution
116 assignments = np.zeros((num_iters, cost_size[0], cost_size[1]))
117 costs = np.zeros((num_iters, 1))
118 P_n = []
119 Q_n = []
120 #TODO: calculation for P_n and Q_n
121 for var in range(0, cost_size[1]):
122 prod_gamma_n = 1
123 prod_v_n = 1
124 for s in range(0, cost_size[0]):
125 gamma_n = np.sum(in_costs[s, var][in_costs[s, var] != -1]) #third index needs to represent -1 vs non neg
126 prod_gamma_n = prod_gamma_n*gamma_n
127 prod_v_n = prod_v_n * np.sum(in_costs[s, var][in_costs[s, var] == -1]) # index where in_costs are -1
128 P_n.append(prod_gamma_n/(prod_v_n + prod_gamma_n))
129 Q_n.append(1 - P_n[var])
131 # Initialize current solution
132 costsum = 0
133 rows = np.arange(0, cost_size[1]).astype(int)
134 cur_soln = np.zeros((cost_size[0], len(np.arange(cost_size[1], 2*cost_size[1]))),dtype=int)
135 for s in range(0, cost_size[0]):
136 cur_soln[s, :] = np.arange(cost_size[1], 2 * cost_size[1])
137 costsum += np.sum(in_costs[s, rows, np.arange(cost_size[1], 2 * cost_size[1]).astype(int)])
139 assignments[0, :, :] = cur_soln
140 costs[0] = costsum
142 # Loop over all possible assignments and determine costs
143 mask = np.ones(np.shape(cur_soln), dtype=bool)
144 for sol in range(1, num_iters):
145 costsum = 0
146 for var in range(0, cost_size[1]):
147 #TODO: write first categorical
148 # sample markovian stationary distribution,
149 # if greater than some probability as calculated above by pn,
150 # then perform mm-gibbs, otherwise all values are set to -1.
151 #
152 bins = np.array([0.0, Q_n[var], 1.0])
153 i_n = np.digitize(rng.uniform(size=(1, 1)), bins) - 1
154 "i_n = Categorical ('+', '-', ), [Pn(Lambda(1:num_sensors)), Qn(Lambda(1:num_sensors)]"
155 if i_n > 0: # i_n = "+"
157 for s in range(0, cost_size[0]):
158 if var > 0:
159 mask[s, var - 1] = True
160 mask[s, var] = False
161 temp_samp = np.exp(-in_costs[:, var, :])
162 temp_samp[cur_soln[mask]] = 0
163 for s in range(0, cost_size[0]):
164 hist_in_array = np.zeros(temp_samp[s][temp_samp[s] > 0].size + 1)
165 csum = np.cumsum(temp_samp[s][temp_samp[s] > 0].ravel())
166 hist_in_array[1:] = csum / csum[-1]
167 cur_soln[s, var] = np.digitize(rng.uniform(size=(1, 1)), hist_in_array) - 1
168 if np.any(temp_samp[s] > 0):
169 cur_soln[s, var] = np.nonzero(temp_samp > 0)[0][cur_soln[s, var]]
171 else: # i_n = "-"
172 cur_soln[:, var] = -1 * np.ones(1, cost_size[0])
174 assignments[sol, :, :] = cur_soln
175 costs[sol] = np.sum(in_costs[:, rows, cur_soln]) #some variant of this
178 [assignments, I] = np.unique(assignments, return_index=True, axis=0)
180 return assignments, costs[I]