Coverage for src/gncpy/filters/unscented_particle_filter.py: 92%

139 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-07-19 05:48 +0000

1import numpy as np 

2import scipy.stats as stats 

3from copy import deepcopy 

4from warnings import warn 

5 

6import gncpy.distributions as gdistrib 

7from gncpy.filters.mcmc_particle_filter_base import MCMCParticleFilterBase 

8from gncpy.filters.unscented_kalman_filter import UnscentedKalmanFilter 

9 

10class UnscentedParticleFilter(MCMCParticleFilterBase): 

11 """Implements an unscented particle filter. 

12 

13 Notes 

14 ----- 

15 For details on the filter see 

16 :cite:`VanDerMerwe2000_TheUnscentedParticleFilter` and 

17 :cite:`VanDerMerwe2001_TheUnscentedParticleFilter`. 

18 """ 

19 

20 require_copy_prop_parts = False 

21 

22 def __init__(self, **kwargs): 

23 self.candDist = None 

24 

25 self._filt = UnscentedKalmanFilter() 

26 

27 super().__init__(**kwargs) 

28 

29 def save_filter_state(self): 

30 """Saves filter variables so they can be restored later.""" 

31 filt_state = super().save_filter_state() 

32 

33 filt_state["candDist"] = deepcopy(self.candDist) 

34 filt_state["_filt"] = self._filt.save_filter_state() 

35 

36 return filt_state 

37 

38 def load_filter_state(self, filt_state): 

39 """Initializes filter using saved filter state. 

40 

41 Attributes 

42 ---------- 

43 filt_state : dict 

44 Dictionary generated by :meth:`save_filter_state`. 

45 """ 

46 super().load_filter_state(filt_state) 

47 

48 self.candDist = filt_state["candDist"] 

49 self._filt.load_filter_state(filt_state["_filt"]) 

50 

51 @property 

52 def meas_likelihood_fnc(self): 

53 r"""A function that returns the likelihood of the measurement. 

54 

55 This has the signature :code:`f(y, y_hat, *args)` where `y` is 

56 the measurement as an Nm x 1 numpy array, and `y_hat` is the estimated 

57 measurement. 

58 

59 Notes 

60 ----- 

61 This represents :math:`p(y_t \vert x_t)` in the importance 

62 weight 

63 

64 .. math:: 

65 

66 w_t \propto \frac{p(y_t \vert x_t) p(x_t \vert x_{t-1})}{q(x_t \vert x_{t-1}, y_t)} 

67 

68 and has the assumed form :math:`\mathcal{N}(y_t, R)` for measurement 

69 noise covariance :math:`R`. 

70 

71 Returns 

72 ------- 

73 callable 

74 function to return the measurement likelihood. 

75 """ 

76 return lambda y, y_hat: stats.multivariate_normal.pdf( 

77 y.ravel(), y_hat.ravel(), self._filt.meas_noise 

78 ) 

79 

80 @meas_likelihood_fnc.setter 

81 def meas_likelihood_fnc(self, val): 

82 warn("Measuremnet likelihood has an assumed form.") 

83 

84 @property 

85 def proposal_fnc(self): 

86 r"""A function that returns the probability for the proposal distribution. 

87 

88 This has the signature :code:`f(x_hat, x, y, *args)` where 

89 `x_hat` is a :class:`gncpy.distributions.Particle` of the estimated 

90 state, `x` is the particle it is conditioned on, and `y` is the 

91 measurement. 

92 

93 Notes 

94 ----- 

95 This represents :math:`q(x_t \vert x_{t-1}, y_t)` in the importance 

96 weight 

97 

98 .. math:: 

99 

100 w_t \propto \frac{p(y_t \vert x_t) p(x_t \vert x_{t-1})}{q(x_t \vert x_{t-1}, y_t)} 

101 

102 and has the assumed form :math:`\mathcal{N}(\bar{x}_{t}, \hat{P}_t)` 

103 

104 Returns 

105 ------- 

106 callable 

107 function to return the proposal probability. 

108 """ 

109 return lambda x_hat, part: stats.multivariate_normal.pdf( 

110 x_hat.ravel(), part.mean.ravel(), part.uncertainty 

111 ) 

112 

113 @proposal_fnc.setter 

114 def proposal_fnc(self, val): 

115 warn("Proposal distribution has an assumed form.") 

116 

117 @property 

118 def proposal_sampling_fnc(self): 

119 r"""A function that returns a random sample from the proposal distribtion. 

120 

121 This should be consistent with the PDF specified in the 

122 :meth:`gncpy.filters.ParticleFilter.proposal_fnc`. 

123 

124 Notes 

125 ----- 

126 This assumes :math:`x` is drawn from :math:`\mathcal{N}(\bar{x}_{t}, \hat{P}_t)` 

127 Returns 

128 ------- 

129 callable 

130 function to return a random sample. 

131 """ 

132 return lambda part: self.rng.multivariate_normal( 

133 part.mean.ravel(), part.uncertainty 

134 ).reshape(part.mean.shape) 

135 

136 @proposal_sampling_fnc.setter 

137 def proposal_sampling_fnc(self, val): 

138 warn("Proposal sampling has an assumed form.") 

139 

140 @property 

141 def transition_prob_fnc(self): 

142 r"""A function that returns the transition probability for the state. 

143 

144 This has the signature :code:`f(x_hat, x, cov)` where 

145 `x_hat` is an N x 1 numpy array representing the propagated state, and 

146 `x` is the state it is conditioned on. 

147 

148 Notes 

149 ----- 

150 This represents :math:`p(x_t \vert x_{t-1})` in the importance 

151 weight 

152 

153 .. math:: 

154 

155 w_t \propto \frac{p(y_t \vert x_t) p(x_t \vert x_{t-1})}{q(x_t \vert x_{t-1}, y_t)} 

156 

157 and has the assumed form :math:`\mathcal{N}(f(x_{t-1}), P_t)` for the 

158 covariance :math:`P_t`. 

159 

160 Returns 

161 ------- 

162 callable 

163 function to return the transition probability. 

164 """ 

165 return lambda x_hat, x, cov: stats.multivariate_normal.pdf( 

166 x_hat.ravel(), x.ravel(), cov 

167 ) 

168 

169 @transition_prob_fnc.setter 

170 def transition_prob_fnc(self, val): 

171 warn("Transistion distribution has an assumed form.") 

172 

173 # @property 

174 # def cov(self): 

175 # """Read only covariance of the particles. 

176 

177 # This is a weighted sum of each particles uncertainty. 

178 

179 # Returns 

180 # ------- 

181 # N x N numpy array 

182 # covariance matrix. 

183 

184 # """ 

185 # return gmath.weighted_sum_mat(self._particleDist.weights, 

186 # self._particleDist.uncertainties) 

187 

188 @property 

189 def meas_noise(self): 

190 """Measurement noise matrix. 

191 

192 This is a wrapper to keep the UPF measurement noise and the internal UKF 

193 measurement noise synced. 

194 

195 Returns 

196 ------- 

197 Nm x Nm numpy array 

198 measurement noise. 

199 """ 

200 return self._filt.meas_noise 

201 

202 @meas_noise.setter 

203 def meas_noise(self, meas_noise): 

204 self._filt.meas_noise = meas_noise 

205 

206 @property 

207 def proc_noise(self): 

208 """Process noise matrix. 

209 

210 This is a wrapper to keep the UPF process noise and the internal UKF 

211 process noise synced. 

212 

213 Returns 

214 ------- 

215 N x N numpy array 

216 process noise. 

217 """ 

218 return self._filt.proc_noise 

219 

220 @proc_noise.setter 

221 def proc_noise(self, proc_noise): 

222 self._filt.proc_noise = proc_noise 

223 

224 def set_state_model(self, **kwargs): 

225 """Sets the state model for the filter. 

226 

227 This calls the UKF's set state function 

228 (:meth:`gncpy.filters.UnscentedKalmanFilter.set_state_model`). 

229 """ 

230 self._filt.set_state_model(**kwargs) 

231 

232 def set_measurement_model(self, **kwargs): 

233 r"""Sets the measurement model for the filter. 

234 

235 This is a wrapper for the inner UKF's set_measurement model function. 

236 It is assumed that the measurement model is the same as that of the UKF. 

237 See :meth:`gncpy.filters.UnscentedKalmanFilter.set_measurement_model` 

238 for details. 

239 """ 

240 self._filt.set_measurement_model(**kwargs) 

241 

242 def _predict_loop(self, timestep, ukf_kwargs, dist): 

243 newDist = gdistrib.ParticleDistribution() 

244 num_parts = dist.num_particles 

245 new_parts = [None] * num_parts 

246 new_weights = [None] * num_parts 

247 for ii, (origPart, w) in enumerate(dist): 

248 part = gdistrib.Particle() 

249 self._filt.cov = origPart.uncertainty.copy() 

250 self._filt._stateSigmaPoints = deepcopy(origPart.sigmaPoints) 

251 part.point = self._filt.predict(timestep, origPart.point, **ukf_kwargs) 

252 part.uncertainty = self._filt.cov 

253 part.sigmaPoints = self._filt._stateSigmaPoints 

254 new_parts[ii] = part 

255 new_weights[ii] = w 

256 newDist.add_particle(new_parts, new_weights) 

257 return newDist 

258 

259 def predict(self, timestep, ukf_kwargs={}): 

260 """Prediction step of the UPF. 

261 

262 This calls the UKF prediction function on every particle. 

263 

264 Parameters 

265 ---------- 

266 timestep : float 

267 Current timestep. 

268 ukf_kwargs : dict, optional 

269 Additional arguments to pass to the UKF prediction function. The 

270 default is {}. 

271 

272 Returns 

273 ------- 

274 state : N x 1 numpy array 

275 The predicted state. 

276 """ 

277 self._particleDist = self._predict_loop( 

278 timestep, ukf_kwargs, self._particleDist 

279 ) 

280 if self.use_MCMC: 

281 if self.candDist is None: # first timestep 

282 self.candDist = deepcopy(self._particleDist) 

283 else: 

284 self.candDist = self._predict_loop(timestep, ukf_kwargs, self.candDist) 

285 return self._calc_state() 

286 

287 def _inner_correct(self, timestep, meas, state, filt_kwargs): 

288 """Wrapper so child class can override.""" 

289 return self._filt.correct(timestep, meas, state, **filt_kwargs) 

290 

291 def _est_meas(self, timestep, cur_state, n_meas, meas_fun_args): 

292 return self._filt._est_meas(timestep, cur_state, n_meas, meas_fun_args)[0] 

293 

294 def _correct_loop(self, timestep, meas, ukf_kwargs, dist): 

295 unnorm_weights = np.nan * np.ones(dist.num_particles) 

296 new_parts = [None] * dist.num_particles 

297 rel_likeli = [None] * dist.num_particles 

298 for ii, (p, w) in enumerate(dist): 

299 self._filt.cov = p.uncertainty.copy() 

300 self._filt._stateSigmaPoints = deepcopy(p.sigmaPoints) 

301 

302 # create a new particle and correct the predicted point with the measurement 

303 part = gdistrib.Particle() 

304 part.point, rel_likeli[ii] = self._inner_correct( 

305 timestep, meas, p.point, ukf_kwargs 

306 ) 

307 part.uncertainty = self._filt.cov 

308 

309 # draw a sample from the proposal distribution using the corrected point 

310 samp = self.proposal_sampling_fnc(part) 

311 

312 # transition probability of the sample given the predicted point 

313 trans_prob = self.transition_prob_fnc(samp, p.point, p.uncertainty) 

314 

315 # probability of the sampled value given the corrected value 

316 proposal_prob = self.proposal_fnc(samp, part) 

317 

318 # get new weight 

319 if proposal_prob < np.finfo(float).eps: 

320 proposal_prob = np.finfo(float).eps 

321 unnorm_weights[ii] = rel_likeli[ii] * trans_prob / proposal_prob 

322 

323 # update the new particle with the sampled point 

324 part.point = samp 

325 part.sigmaPoints = self._filt._stateSigmaPoints 

326 part.sigmaPoints.update_points(part.point, part.uncertainty) 

327 new_parts[ii] = part 

328 # update info for next UKF 

329 newDist = gdistrib.ParticleDistribution() 

330 tot = np.sum(unnorm_weights) 

331 # protect against divide by 0 

332 if tot < np.finfo(float).eps: 

333 tot = np.finfo(float).eps 

334 newDist.add_particle(new_parts, (unnorm_weights / tot).tolist()) 

335 

336 return newDist, rel_likeli, unnorm_weights 

337 

338 def correct(self, timestep, meas, ukf_kwargs={}, move_kwargs={}): 

339 """Correction step of the UPF. 

340 

341 This optionally can perform a MCMC move step as well. 

342 

343 Parameters 

344 ---------- 

345 timestep : float 

346 Current timestep. 

347 meas : Nm x 1 numpy array 

348 measurement. 

349 ukf_kwargs : dict, optional 

350 Additional arguments to pass to the UKF correction function. The 

351 default is {}. 

352 move_kwargs : dict, optional 

353 Additional arguments to pass to the movement function. 

354 The default is {}. 

355 

356 Returns 

357 ------- 

358 state : N x 1 numpy array 

359 corrected state. 

360 rel_likeli : list 

361 each element is a float representing the relative likelihood of the 

362 particles (unnormalized). 

363 inds_removed : list 

364 each element is an int representing the index of any particles 

365 that were removed during the selection process. 

366 """ 

367 # if first timestep and have not called predict yet 

368 if self.use_MCMC and self.candDist is None: 

369 self.candDist = deepcopy(self._particleDist) 

370 # call UKF correction on each particle 

371 (self._particleDist, rel_likeli, unnorm_weights) = self._correct_loop( 

372 timestep, meas, ukf_kwargs, self._particleDist 

373 ) 

374 if self.use_MCMC: 

375 (self.candDist, can_rel_likeli, can_unnorm_weights) = self._correct_loop( 

376 timestep, meas, ukf_kwargs, self.candDist 

377 ) 

378 # perform selection/resampling 

379 (inds_removed, old_weights, rel_likeli) = self._selection( 

380 unnorm_weights, rel_likeli_in=rel_likeli 

381 ) 

382 

383 # optionally move particles 

384 if self.use_MCMC: 

385 rel_likeli = self.move_particles( 

386 timestep, 

387 meas, 

388 old_weights, 

389 rel_likeli, 

390 can_unnorm_weights, 

391 can_rel_likeli, 

392 **move_kwargs 

393 ) 

394 return (self._calc_state(), rel_likeli, inds_removed) 

395 

396 def move_particles( 

397 self, timestep, meas, old_weights, old_likeli, can_weight, can_likeli 

398 ): 

399 r"""Movement function for the MCMC move step. 

400 

401 This modifies the internal particle distribution but does not return a 

402 value. 

403 

404 Notes 

405 ----- 

406 This implements a metropolis-hastings algorithm. 

407 

408 Parameters 

409 ---------- 

410 timestep : float 

411 Current timestep. 

412 meas : Nm x 1 numpy array 

413 measurement. 

414 old_weights : :class:`gncpy.distributions.ParticleDistribution` 

415 Distribution before the measurement correction. 

416 

417 Returns 

418 ------- 

419 None. 

420 """ 

421 accept_prob = self.rng.random() 

422 num_parts = self._particleDist.num_particles 

423 new_parts = [None] * num_parts 

424 new_likeli = [None] * num_parts 

425 for ii, (can, exp, ex_weight, can_weight, ex_like, can_like) in enumerate( 

426 zip( 

427 self.candDist, 

428 self._particleDist, 

429 old_weights, 

430 can_weight, 

431 old_likeli, 

432 can_likeli, 

433 ) 

434 ): 

435 

436 if accept_prob <= np.min([1, can_weight / ex_weight]): 

437 # accept move 

438 new_parts[ii] = deepcopy(can[0]) 

439 new_likeli[ii] = can_like 

440 else: 

441 # reject move 

442 new_parts[ii] = exp[0] 

443 new_likeli[ii] = ex_like 

444 self._particleDist.clear_particles() 

445 self._particleDist.add_particle(new_parts, [1 / num_parts] * num_parts) 

446 

447 return new_likeli