Coverage for src/gncpy/filters/unscented_particle_filter.py: 92%
139 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-19 05:48 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-07-19 05:48 +0000
1import numpy as np
2import scipy.stats as stats
3from copy import deepcopy
4from warnings import warn
6import gncpy.distributions as gdistrib
7from gncpy.filters.mcmc_particle_filter_base import MCMCParticleFilterBase
8from gncpy.filters.unscented_kalman_filter import UnscentedKalmanFilter
10class UnscentedParticleFilter(MCMCParticleFilterBase):
11 """Implements an unscented particle filter.
13 Notes
14 -----
15 For details on the filter see
16 :cite:`VanDerMerwe2000_TheUnscentedParticleFilter` and
17 :cite:`VanDerMerwe2001_TheUnscentedParticleFilter`.
18 """
20 require_copy_prop_parts = False
22 def __init__(self, **kwargs):
23 self.candDist = None
25 self._filt = UnscentedKalmanFilter()
27 super().__init__(**kwargs)
29 def save_filter_state(self):
30 """Saves filter variables so they can be restored later."""
31 filt_state = super().save_filter_state()
33 filt_state["candDist"] = deepcopy(self.candDist)
34 filt_state["_filt"] = self._filt.save_filter_state()
36 return filt_state
38 def load_filter_state(self, filt_state):
39 """Initializes filter using saved filter state.
41 Attributes
42 ----------
43 filt_state : dict
44 Dictionary generated by :meth:`save_filter_state`.
45 """
46 super().load_filter_state(filt_state)
48 self.candDist = filt_state["candDist"]
49 self._filt.load_filter_state(filt_state["_filt"])
51 @property
52 def meas_likelihood_fnc(self):
53 r"""A function that returns the likelihood of the measurement.
55 This has the signature :code:`f(y, y_hat, *args)` where `y` is
56 the measurement as an Nm x 1 numpy array, and `y_hat` is the estimated
57 measurement.
59 Notes
60 -----
61 This represents :math:`p(y_t \vert x_t)` in the importance
62 weight
64 .. math::
66 w_t \propto \frac{p(y_t \vert x_t) p(x_t \vert x_{t-1})}{q(x_t \vert x_{t-1}, y_t)}
68 and has the assumed form :math:`\mathcal{N}(y_t, R)` for measurement
69 noise covariance :math:`R`.
71 Returns
72 -------
73 callable
74 function to return the measurement likelihood.
75 """
76 return lambda y, y_hat: stats.multivariate_normal.pdf(
77 y.ravel(), y_hat.ravel(), self._filt.meas_noise
78 )
80 @meas_likelihood_fnc.setter
81 def meas_likelihood_fnc(self, val):
82 warn("Measuremnet likelihood has an assumed form.")
84 @property
85 def proposal_fnc(self):
86 r"""A function that returns the probability for the proposal distribution.
88 This has the signature :code:`f(x_hat, x, y, *args)` where
89 `x_hat` is a :class:`gncpy.distributions.Particle` of the estimated
90 state, `x` is the particle it is conditioned on, and `y` is the
91 measurement.
93 Notes
94 -----
95 This represents :math:`q(x_t \vert x_{t-1}, y_t)` in the importance
96 weight
98 .. math::
100 w_t \propto \frac{p(y_t \vert x_t) p(x_t \vert x_{t-1})}{q(x_t \vert x_{t-1}, y_t)}
102 and has the assumed form :math:`\mathcal{N}(\bar{x}_{t}, \hat{P}_t)`
104 Returns
105 -------
106 callable
107 function to return the proposal probability.
108 """
109 return lambda x_hat, part: stats.multivariate_normal.pdf(
110 x_hat.ravel(), part.mean.ravel(), part.uncertainty
111 )
113 @proposal_fnc.setter
114 def proposal_fnc(self, val):
115 warn("Proposal distribution has an assumed form.")
117 @property
118 def proposal_sampling_fnc(self):
119 r"""A function that returns a random sample from the proposal distribtion.
121 This should be consistent with the PDF specified in the
122 :meth:`gncpy.filters.ParticleFilter.proposal_fnc`.
124 Notes
125 -----
126 This assumes :math:`x` is drawn from :math:`\mathcal{N}(\bar{x}_{t}, \hat{P}_t)`
127 Returns
128 -------
129 callable
130 function to return a random sample.
131 """
132 return lambda part: self.rng.multivariate_normal(
133 part.mean.ravel(), part.uncertainty
134 ).reshape(part.mean.shape)
136 @proposal_sampling_fnc.setter
137 def proposal_sampling_fnc(self, val):
138 warn("Proposal sampling has an assumed form.")
140 @property
141 def transition_prob_fnc(self):
142 r"""A function that returns the transition probability for the state.
144 This has the signature :code:`f(x_hat, x, cov)` where
145 `x_hat` is an N x 1 numpy array representing the propagated state, and
146 `x` is the state it is conditioned on.
148 Notes
149 -----
150 This represents :math:`p(x_t \vert x_{t-1})` in the importance
151 weight
153 .. math::
155 w_t \propto \frac{p(y_t \vert x_t) p(x_t \vert x_{t-1})}{q(x_t \vert x_{t-1}, y_t)}
157 and has the assumed form :math:`\mathcal{N}(f(x_{t-1}), P_t)` for the
158 covariance :math:`P_t`.
160 Returns
161 -------
162 callable
163 function to return the transition probability.
164 """
165 return lambda x_hat, x, cov: stats.multivariate_normal.pdf(
166 x_hat.ravel(), x.ravel(), cov
167 )
169 @transition_prob_fnc.setter
170 def transition_prob_fnc(self, val):
171 warn("Transistion distribution has an assumed form.")
173 # @property
174 # def cov(self):
175 # """Read only covariance of the particles.
177 # This is a weighted sum of each particles uncertainty.
179 # Returns
180 # -------
181 # N x N numpy array
182 # covariance matrix.
184 # """
185 # return gmath.weighted_sum_mat(self._particleDist.weights,
186 # self._particleDist.uncertainties)
188 @property
189 def meas_noise(self):
190 """Measurement noise matrix.
192 This is a wrapper to keep the UPF measurement noise and the internal UKF
193 measurement noise synced.
195 Returns
196 -------
197 Nm x Nm numpy array
198 measurement noise.
199 """
200 return self._filt.meas_noise
202 @meas_noise.setter
203 def meas_noise(self, meas_noise):
204 self._filt.meas_noise = meas_noise
206 @property
207 def proc_noise(self):
208 """Process noise matrix.
210 This is a wrapper to keep the UPF process noise and the internal UKF
211 process noise synced.
213 Returns
214 -------
215 N x N numpy array
216 process noise.
217 """
218 return self._filt.proc_noise
220 @proc_noise.setter
221 def proc_noise(self, proc_noise):
222 self._filt.proc_noise = proc_noise
224 def set_state_model(self, **kwargs):
225 """Sets the state model for the filter.
227 This calls the UKF's set state function
228 (:meth:`gncpy.filters.UnscentedKalmanFilter.set_state_model`).
229 """
230 self._filt.set_state_model(**kwargs)
232 def set_measurement_model(self, **kwargs):
233 r"""Sets the measurement model for the filter.
235 This is a wrapper for the inner UKF's set_measurement model function.
236 It is assumed that the measurement model is the same as that of the UKF.
237 See :meth:`gncpy.filters.UnscentedKalmanFilter.set_measurement_model`
238 for details.
239 """
240 self._filt.set_measurement_model(**kwargs)
242 def _predict_loop(self, timestep, ukf_kwargs, dist):
243 newDist = gdistrib.ParticleDistribution()
244 num_parts = dist.num_particles
245 new_parts = [None] * num_parts
246 new_weights = [None] * num_parts
247 for ii, (origPart, w) in enumerate(dist):
248 part = gdistrib.Particle()
249 self._filt.cov = origPart.uncertainty.copy()
250 self._filt._stateSigmaPoints = deepcopy(origPart.sigmaPoints)
251 part.point = self._filt.predict(timestep, origPart.point, **ukf_kwargs)
252 part.uncertainty = self._filt.cov
253 part.sigmaPoints = self._filt._stateSigmaPoints
254 new_parts[ii] = part
255 new_weights[ii] = w
256 newDist.add_particle(new_parts, new_weights)
257 return newDist
259 def predict(self, timestep, ukf_kwargs={}):
260 """Prediction step of the UPF.
262 This calls the UKF prediction function on every particle.
264 Parameters
265 ----------
266 timestep : float
267 Current timestep.
268 ukf_kwargs : dict, optional
269 Additional arguments to pass to the UKF prediction function. The
270 default is {}.
272 Returns
273 -------
274 state : N x 1 numpy array
275 The predicted state.
276 """
277 self._particleDist = self._predict_loop(
278 timestep, ukf_kwargs, self._particleDist
279 )
280 if self.use_MCMC:
281 if self.candDist is None: # first timestep
282 self.candDist = deepcopy(self._particleDist)
283 else:
284 self.candDist = self._predict_loop(timestep, ukf_kwargs, self.candDist)
285 return self._calc_state()
287 def _inner_correct(self, timestep, meas, state, filt_kwargs):
288 """Wrapper so child class can override."""
289 return self._filt.correct(timestep, meas, state, **filt_kwargs)
291 def _est_meas(self, timestep, cur_state, n_meas, meas_fun_args):
292 return self._filt._est_meas(timestep, cur_state, n_meas, meas_fun_args)[0]
294 def _correct_loop(self, timestep, meas, ukf_kwargs, dist):
295 unnorm_weights = np.nan * np.ones(dist.num_particles)
296 new_parts = [None] * dist.num_particles
297 rel_likeli = [None] * dist.num_particles
298 for ii, (p, w) in enumerate(dist):
299 self._filt.cov = p.uncertainty.copy()
300 self._filt._stateSigmaPoints = deepcopy(p.sigmaPoints)
302 # create a new particle and correct the predicted point with the measurement
303 part = gdistrib.Particle()
304 part.point, rel_likeli[ii] = self._inner_correct(
305 timestep, meas, p.point, ukf_kwargs
306 )
307 part.uncertainty = self._filt.cov
309 # draw a sample from the proposal distribution using the corrected point
310 samp = self.proposal_sampling_fnc(part)
312 # transition probability of the sample given the predicted point
313 trans_prob = self.transition_prob_fnc(samp, p.point, p.uncertainty)
315 # probability of the sampled value given the corrected value
316 proposal_prob = self.proposal_fnc(samp, part)
318 # get new weight
319 if proposal_prob < np.finfo(float).eps:
320 proposal_prob = np.finfo(float).eps
321 unnorm_weights[ii] = rel_likeli[ii] * trans_prob / proposal_prob
323 # update the new particle with the sampled point
324 part.point = samp
325 part.sigmaPoints = self._filt._stateSigmaPoints
326 part.sigmaPoints.update_points(part.point, part.uncertainty)
327 new_parts[ii] = part
328 # update info for next UKF
329 newDist = gdistrib.ParticleDistribution()
330 tot = np.sum(unnorm_weights)
331 # protect against divide by 0
332 if tot < np.finfo(float).eps:
333 tot = np.finfo(float).eps
334 newDist.add_particle(new_parts, (unnorm_weights / tot).tolist())
336 return newDist, rel_likeli, unnorm_weights
338 def correct(self, timestep, meas, ukf_kwargs={}, move_kwargs={}):
339 """Correction step of the UPF.
341 This optionally can perform a MCMC move step as well.
343 Parameters
344 ----------
345 timestep : float
346 Current timestep.
347 meas : Nm x 1 numpy array
348 measurement.
349 ukf_kwargs : dict, optional
350 Additional arguments to pass to the UKF correction function. The
351 default is {}.
352 move_kwargs : dict, optional
353 Additional arguments to pass to the movement function.
354 The default is {}.
356 Returns
357 -------
358 state : N x 1 numpy array
359 corrected state.
360 rel_likeli : list
361 each element is a float representing the relative likelihood of the
362 particles (unnormalized).
363 inds_removed : list
364 each element is an int representing the index of any particles
365 that were removed during the selection process.
366 """
367 # if first timestep and have not called predict yet
368 if self.use_MCMC and self.candDist is None:
369 self.candDist = deepcopy(self._particleDist)
370 # call UKF correction on each particle
371 (self._particleDist, rel_likeli, unnorm_weights) = self._correct_loop(
372 timestep, meas, ukf_kwargs, self._particleDist
373 )
374 if self.use_MCMC:
375 (self.candDist, can_rel_likeli, can_unnorm_weights) = self._correct_loop(
376 timestep, meas, ukf_kwargs, self.candDist
377 )
378 # perform selection/resampling
379 (inds_removed, old_weights, rel_likeli) = self._selection(
380 unnorm_weights, rel_likeli_in=rel_likeli
381 )
383 # optionally move particles
384 if self.use_MCMC:
385 rel_likeli = self.move_particles(
386 timestep,
387 meas,
388 old_weights,
389 rel_likeli,
390 can_unnorm_weights,
391 can_rel_likeli,
392 **move_kwargs
393 )
394 return (self._calc_state(), rel_likeli, inds_removed)
396 def move_particles(
397 self, timestep, meas, old_weights, old_likeli, can_weight, can_likeli
398 ):
399 r"""Movement function for the MCMC move step.
401 This modifies the internal particle distribution but does not return a
402 value.
404 Notes
405 -----
406 This implements a metropolis-hastings algorithm.
408 Parameters
409 ----------
410 timestep : float
411 Current timestep.
412 meas : Nm x 1 numpy array
413 measurement.
414 old_weights : :class:`gncpy.distributions.ParticleDistribution`
415 Distribution before the measurement correction.
417 Returns
418 -------
419 None.
420 """
421 accept_prob = self.rng.random()
422 num_parts = self._particleDist.num_particles
423 new_parts = [None] * num_parts
424 new_likeli = [None] * num_parts
425 for ii, (can, exp, ex_weight, can_weight, ex_like, can_like) in enumerate(
426 zip(
427 self.candDist,
428 self._particleDist,
429 old_weights,
430 can_weight,
431 old_likeli,
432 can_likeli,
433 )
434 ):
436 if accept_prob <= np.min([1, can_weight / ex_weight]):
437 # accept move
438 new_parts[ii] = deepcopy(can[0])
439 new_likeli[ii] = can_like
440 else:
441 # reject move
442 new_parts[ii] = exp[0]
443 new_likeli[ii] = ex_like
444 self._particleDist.clear_particles()
445 self._particleDist.add_particle(new_parts, [1 / num_parts] * num_parts)
447 return new_likeli