Coverage for src/gncpy/data_fusion.py: 91%
35 statements
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-13 06:15 +0000
« prev ^ index » next coverage.py v7.6.1, created at 2024-09-13 06:15 +0000
1import types
3import numpy as np
4import numpy.linalg as la
5import scipy.optimize as sopt
8def GeneralizedCovarianceIntersection(est_list, cov_list, weight_list, meas_model_list, optimizer=None):
10 def obj_func(w_list, cov_list=None, meas_model_list=None):
11 if cov_list is None or meas_model_list is None:
12 raise ValueError("cov_list cannot be nonetype")
13 if len(cov_list) != len(w_list) or len(cov_list) != len(meas_model_list) or len(w_list) != len(meas_model_list):
14 raise ValueError("must be same amount of weights and covariances")
15 new_cov = np.zeros((meas_model_list[0].T @ cov_list[0] @ meas_model_list[0]).shape)
16 for ii, cov in enumerate(cov_list):
17 new_cov = new_cov + w_list[ii] * meas_model_list[ii].T @ la.inv(cov_list[ii]) @ meas_model_list[ii]
18 return np.trace(la.inv(new_cov))
20 obs_mat_list = []
21 for ii, model in enumerate(meas_model_list):
22 if isinstance(model, types.FunctionType):
23 obs_mat_list.append(model(0, est_list[ii]))
24 else:
25 obs_mat_list.append(model)
27 if optimizer is not None:
28 opt_out = sopt.minimize(obj_func, weight_list, method=optimizer, args=(cov_list, obs_mat_list,))
29 else:
30 opt_out = sopt.minimize(obj_func, weight_list, args=(cov_list, obs_mat_list,), constraints=(sopt.LinearConstraint(np.ones(len(est_list)).reshape([1, len(est_list)]), lb=1, ub=1), sopt.LinearConstraint(np.eye(len(est_list)), lb=0, ub=1)))
31 new_cov = np.zeros(np.shape(obs_mat_list[0].T @ cov_list[0] @ obs_mat_list[0]))
33 new_weight_list = []
34 for w in opt_out.x:
35 new_weight_list.append(w / np.sum(opt_out.x))
37 for ii, cov in enumerate(cov_list):
38 new_cov = new_cov + new_weight_list[ii] * obs_mat_list[ii].T @ la.inv(cov_list[ii]) @ obs_mat_list[ii]
40 new_cov = la.inv(new_cov)
41 gain_list = []
42 new_est = np.zeros(np.shape(new_cov @ obs_mat_list[0].T @ la.inv(cov_list[0]) @ est_list[0]))
43 for ii, cov in enumerate(cov_list):
44 gain_list.append(new_weight_list[ii] * new_cov @ obs_mat_list[ii].T @ la.inv(cov))
45 new_est = new_est + gain_list[ii] @ est_list[ii]
47 return new_est, new_cov, new_weight_list