Coverage for src/serums/distribution_estimator.py: 83%

229 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2024-11-11 16:43 +0000

1"""Estimates Parameters of Various Distributions.""" 

2import numpy as np 

3from scipy.optimize import minimize 

4from scipy.stats import ks_2samp, cramervonmises_2samp, anderson_ksamp 

5import serums.models 

6import serums.enums as senums 

7import serums.errors 

8 

9 

10def _edparams_cost_factory(dist): 

11 def cost_function(x, rng, samples, method): 

12 # sample from distribution given by params in x 

13 if isinstance(dist, serums.models.Gaussian): 

14 x_samples = x[1] * rng.standard_normal(size=samples.size) + x[0] 

15 

16 elif isinstance(dist, serums.models.Cauchy): 

17 x_samples = x[1] * rng.standard_t(1, size=samples.size) + x[0] 

18 

19 elif isinstance(dist, serums.models.StudentsT): 

20 x_samples = x[1] * rng.standard_t(np.abs(x[2]), size=samples.size) + x[0] 

21 else: 

22 fmt = "Invalid distribution choice: {}" 

23 raise RuntimeError(fmt.format(type(dist).__name__)) 

24 

25 # Kolmogorov-Smirnov test to get Chi2 

26 if method is senums.DistEstimatorMethod.KOLMOGOROV_SMIRNOV: 

27 cost = ks_2samp(x_samples, samples)[0] 

28 elif method is senums.DistEstimatorMethod.CRAMER_VON_MISES: 

29 out = cramervonmises_2samp(x_samples, samples) 

30 cost = out.statistic 

31 elif method is senums.DistEstimatorMethod.ANDERSON_DARLING: 

32 cost = anderson_ksamp([x_samples, samples]).statistic 

33 else: 

34 fmt = "Invalid method choice: {}" 

35 raise RuntimeError(fmt.format(method)) 

36 return cost 

37 

38 return cost_function 

39 

40 

41def estimate_distribution_params(dist, method, samples, maxiter=100, disp=False): 

42 """Estimate distribution parameters from data. 

43 

44 Parameters 

45 ---------- 

46 dist : :class:`serums.models.BaseSingleModel` 

47 Desired target distribution to fit samples. 

48 method : :class:`serums.enums.DistEstimatorMethod` 

49 Selection of goodness of fit test to compare samples to target distribution. 

50 samples : N numpy array 

51 Samples to be fitted to the input distribution. 

52 maxiter : int, optional 

53 Maximum number of iterations to run the estimator. The default is 100. 

54 disp : bool, optional 

55 Flag for displaying extra information. The default is False. 

56 

57 Raises 

58 ------ 

59 RuntimeError 

60 Raised if the estimator fails. 

61 

62 Returns 

63 ------- 

64 output : :class:`serums.models.BaseSingleModel` 

65 Estimated output distribution. 

66 """ 

67 # Set outputs 

68 output = type(dist)() 

69 

70 if method in ( 

71 senums.DistEstimatorMethod.ANDERSON_DARLING, 

72 senums.DistEstimatorMethod.CRAMER_VON_MISES, 

73 senums.DistEstimatorMethod.KOLMOGOROV_SMIRNOV, 

74 ): 

75 # Select cost function 

76 costfun = _edparams_cost_factory(dist) 

77 if isinstance(dist, (serums.models.Gaussian, serums.models.Cauchy)): 

78 x0 = np.array([dist.location.item(), dist.scale.item()]) 

79 rng = np.random.default_rng() 

80 elif isinstance(dist, serums.models.StudentsT): 

81 x0 = np.array( 

82 [ 

83 dist.location.item(), 

84 dist.scale.item(), 

85 dist.degrees_of_freedom.item(), 

86 ] 

87 ) 

88 rng = np.random.default_rng() 

89 else: 

90 fmt = "Invalid distribution choice: {}" 

91 raise RuntimeError(fmt.format(type(dist).__name__)) 

92 

93 # optimize distribution parameters 

94 res = minimize( 

95 costfun, 

96 x0, 

97 args=(rng, samples, method), 

98 method="Powell", 

99 options={"maxiter": maxiter, "disp": disp}, 

100 ) 

101 

102 if not res.success: 

103 fmt = "Parameter estimation failed with:\n{}" 

104 raise RuntimeError(fmt.format(res.message)) 

105 

106 output.location = res.x[0].reshape(np.shape(dist.location)) 

107 output.scale = np.abs(res.x[1]).reshape(np.shape(dist.scale)) 

108 

109 if isinstance(dist, (serums.models.Gaussian, serums.models.Cauchy)): 

110 pass 

111 

112 elif isinstance(dist, serums.models.StudentsT): 

113 output.degrees_of_freedom = np.abs(res.x[2]).item() 

114 

115 else: 

116 fmt = "Invalid distribution choice: {}" 

117 raise RuntimeError(fmt.format(type(dist).__name__)) 

118 

119 elif method is senums.DistEstimatorMethod.GRIMSHAW_MLE: 

120 if not isinstance(dist, serums.models.GeneralizedPareto): 

121 raise RuntimeError("Invalid distribution for Grimshaws MLE") 

122 try: 

123 shape, scale = grimshaw_MLE(samples)[:2] 

124 output.shape = np.array([[shape]]) 

125 output.scale = np.array([[scale]]) 

126 except serums.errors.DistributionEstimatorFailed: 

127 estimate_distribution_params( 

128 dist, 

129 senums.DistEstimatorMethod.METHOD_OF_MOMENTS, 

130 samples, 

131 maxiter=maxiter, 

132 disp=disp, 

133 ) 

134 

135 elif method is senums.DistEstimatorMethod.METHOD_OF_MOMENTS: 

136 if not isinstance(dist, serums.models.GeneralizedPareto): 

137 raise RuntimeError("Method of Moments only implemented for GPD") 

138 else: 

139 shape, scale = genpareto_MOM_est(samples)[:2] 

140 output.shape = np.array([[shape]]) 

141 output.scale = np.array([[scale]]) 

142 

143 elif method is senums.DistEstimatorMethod.PROBABILITY_WEIGHTED_MOMENTS: 

144 if not isinstance(dist, serums.models.GeneralizedPareto): 

145 raise RuntimeError("PWM only implemented for GPD") 

146 else: 

147 shape, scale = genpareto_PWM_est(samples)[:2] 

148 output.shape = np.array([[shape]]) 

149 output.scale = np.array([[scale]]) 

150 

151 return output 

152 

153 

154def grimshaw_MLE(dataset): 

155 """Finds estimates for GPD shape and scale using Grimshaw's MLE Procedure. 

156 

157 Notes 

158 ----- 

159 This method is taken from 

160 :cite:`Grimshaw1993_ComputingMLEforGeneralizedParetoDistribution` 

161 

162 Parameters 

163 ---------- 

164 dataset : N numpy array 

165 Contains positive error data above a threshold of 0. 

166 

167 Returns 

168 ------- 

169 shape_gamma : float 

170 Shape parameter as defined in chapter 3.2 of :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems` 

171 scale_beta : float 

172 Scale parameter as defined in chapter 3.2 of :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems` 

173 gpd_covar : 2x2 numpy array 

174 Contains the estimated covariance matrix for shape, scale based on 

175 observed Fisher information. For further information, see chapter 4.1 

176 of :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems` 

177 """ 

178 # Calculate sample size, mean, minimum, and maximum 

179 n = dataset.size 

180 x_bar = np.mean(dataset) 

181 x_min = np.amin(dataset) 

182 x_max = np.amax(dataset) 

183 

184 # GM Step 1: Choose epsilon 

185 epsilon = 1e-6 / x_bar 

186 

187 # GM Step 2: Compute lower and upper bounds for zeros of h(theta) 

188 theta_L = (2 * (x_min - x_bar)) / (x_min**2) 

189 theta_U = (1 / x_max) - epsilon 

190 

191 # Bypass GM Step 3: Search for zeros on both intervals regardless of h''(t) 

192 

193 # GM Steps 4 and 5 (Find zeros of h(theta) if possible) 

194 

195 def h(theta, data): 

196 """Returns value of h(theta) function for given theta and dataset.""" 

197 value = (1 + (1 / data.size) * np.sum(np.log(1 - theta * data))) * ( 

198 (1 / data.size) * np.sum(1 / (1 - theta * data)) 

199 ) - 1 

200 return value 

201 

202 def h_prime(theta, data): 

203 """Returns value of h'(theta) for a given theta and dataset.""" 

204 value = (1 / theta) * ( 

205 ((1 / data.size) * np.sum(1 / (1 - theta * data) ** 2)) 

206 - ((1 / data.size) * np.sum(1 / (1 - theta * data))) ** 2 

207 - ((1 / data.size) * np.sum(np.log(1 - theta * data))) 

208 * ( 

209 ((1 / data.size) * np.sum(1 / (1 - theta * data))) 

210 - ((1 / data.size) * np.sum(1 / (1 - theta * data) ** 2)) 

211 ) 

212 ) 

213 return value 

214 

215 def mod_NR_root_search(init_guess_theta, low, high, data): 

216 """Searches for a root using MNRM and returns the root if converged.""" 

217 max_iter = 1000 

218 tol = 1e-15 

219 

220 def mid(t1, t2): 

221 return (t1 + t2) / 2 

222 

223 t_old = init_guess_theta 

224 bottom = low 

225 top = high 

226 

227 for i in range(max_iter): 

228 t_sub = t_old - h(t_old, data) / h_prime(t_old, data) 

229 if t_sub > bottom and t_sub < top: 

230 t_old = t_sub 

231 else: 

232 t_old = mid(top, bottom) 

233 if h(bottom, dataset) * h(t_old, dataset) < 0: 

234 top = t_old 

235 else: 

236 bottom = t_old 

237 if abs(h(t_old, data)) < tol: # changed condition 

238 conv = True 

239 break 

240 elif t_old == low or t_old == high: 

241 conv = False 

242 break 

243 else: 

244 conv = False 

245 

246 if conv is True: 

247 return t_old 

248 else: 

249 return t_old 

250 

251 def mm_root_search(low, high, data): 

252 """Finds a root of h(theta) using the midpoint method on given interval.""" 

253 max_iter = 1000 

254 tol = 1e-15 

255 bottom = low 

256 top = high 

257 

258 for i in range(max_iter): 

259 t_old = bottom + top / 2 

260 if h(bottom, dataset) * h(t_old, dataset) < 0: 

261 top = t_old 

262 else: 

263 bottom = t_old 

264 if abs(h(t_old, data)) < tol: 

265 conv = True 

266 break 

267 else: 

268 conv = False 

269 

270 if conv is True: 

271 return t_old 

272 else: 

273 return t_old 

274 

275 throots = np.zeros(4, dtype=float) 

276 

277 # Search for zeros of h(theta) on low interval 

278 throots[0] = mod_NR_root_search(theta_L, theta_L, -epsilon, dataset) 

279 

280 if throots[0] > theta_L + epsilon and throots[0] < -epsilon * 2: 

281 root_conv1 = True 

282 lfs_test = h(theta_L, dataset) * h(throots[0] - epsilon, dataset) 

283 rts_test = h(-epsilon, dataset) * h(throots[0] + epsilon, dataset) 

284 if lfs_test < 0: 

285 throots[1] = mm_root_search(theta_L, throots[0] - epsilon, dataset) 

286 if rts_test < 0: 

287 throots[1] = mm_root_search(throots[0] + epsilon, -epsilon, dataset) 

288 else: 

289 throots[1] = -epsilon 

290 else: 

291 root_conv1 = False 

292 

293 # Search for zeros of h(theta) on high interval 

294 throots[2] = mod_NR_root_search(theta_U, epsilon, theta_U, dataset) 

295 

296 if throots[2] > 2 * epsilon and throots[2] < theta_U - epsilon: 

297 root_conv2 = True 

298 lfs_test = h(epsilon, dataset) * h(throots[2] - epsilon, dataset) 

299 rts_test = h(throots[2] + epsilon, dataset) * h(theta_U, dataset) 

300 if lfs_test < 0: 

301 throots[3] = mm_root_search(epsilon, throots[2] - epsilon, dataset) 

302 if rts_test < 0: 

303 throots[3] = mm_root_search(throots[2] + epsilon, theta_U, dataset) 

304 else: 

305 throots[3] = theta_U 

306 else: 

307 root_conv2 = False 

308 

309 # GM Step 6: Compute candidate shape and scale parameters and 

310 # evaluate GPD log-likelihood 

311 def get_GPD_parameters(theta, data): 

312 """Calculates shape, scale of GPD from given Theta.""" 

313 if abs(theta) < 1e-6: 

314 val1 = 1 

315 val2 = 100000000 

316 else: 

317 val1 = -(1 / data.size) * np.sum(np.log(1 - theta * dataset)) 

318 val2 = val1 / theta 

319 

320 tup = (-val1, val2) 

321 return list(tup) 

322 

323 def GPD_log_likelihood(shape_gamma, scale_beta, data): 

324 """Evaluates the GPD log-likelihood given shape and scale.""" 

325 k = -shape_gamma 

326 alfa = scale_beta 

327 if k == 0: 

328 val = -data.size * np.log(alfa) - (1 / alfa) * np.sum(data) 

329 else: 

330 val = -data.size * np.log(alfa) + ((1 / k) - 1) * np.sum( 

331 np.log(1 - (k * data) / alfa) 

332 ) 

333 return val 

334 

335 param_list = np.zeros([8], dtype=float) 

336 param_list[0:2] = get_GPD_parameters(throots[0], dataset) 

337 param_list[2:4] = get_GPD_parameters(throots[1], dataset) 

338 param_list[4:6] = get_GPD_parameters(throots[2], dataset) 

339 param_list[6:8] = get_GPD_parameters(throots[3], dataset) 

340 

341 odds_list = np.zeros([4], dtype=float) 

342 odds_list[0] = GPD_log_likelihood(param_list[0], param_list[1], dataset) 

343 odds_list[1] = GPD_log_likelihood(param_list[2], param_list[3], dataset) 

344 odds_list[2] = GPD_log_likelihood(param_list[4], param_list[5], dataset) 

345 odds_list[3] = GPD_log_likelihood(param_list[6], param_list[7], dataset) 

346 

347 idx = np.argmax(odds_list) 

348 crit = get_GPD_parameters(throots[idx], dataset) 

349 

350 # GM Step 7 and 8: Choose best parameter estimates 

351 if root_conv1 is True or root_conv2 is True: 

352 if odds_list[idx] > -n * np.log(x_max): 

353 shape_gamma = crit[0] 

354 scale_beta = crit[1] 

355 else: 

356 shape_gamma = -1 

357 scale_beta = x_max 

358 else: 

359 raise serums.errors.DistributionEstimatorFailed("Grimshaw MLE failed") 

360 

361 # Calculate covariance of shape, scale parameters using CRLB based on 

362 # observed Fisher information (eq. 4.2 in :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems`) 

363 

364 eye_11 = ( 

365 (2 / (shape_gamma**3)) 

366 * np.sum(np.log(1 + (shape_gamma / scale_beta) * dataset)) 

367 - (2 / (shape_gamma**2)) 

368 * np.sum((dataset / scale_beta) / (1 + (shape_gamma / scale_beta) * dataset)) 

369 - (1 + (1 / shape_gamma)) 

370 * np.sum( 

371 ((dataset / scale_beta) / (1 + (shape_gamma / scale_beta) * dataset)) ** 2 

372 ) 

373 ) 

374 eye_12 = -np.sum( 

375 (dataset / scale_beta**2) / (1 + (shape_gamma / scale_beta) * dataset) 

376 ) + (1 + shape_gamma) * np.sum( 

377 (dataset**2 / scale_beta**3) 

378 / (1 + (shape_gamma / scale_beta) * dataset) ** 2 

379 ) 

380 eye_21 = eye_12 

381 eye_22 = ( 

382 (-n / scale_beta**2) 

383 + 2 

384 * (shape_gamma + 1) 

385 * np.sum( 

386 (dataset / scale_beta**3) / (1 + (shape_gamma / scale_beta) * dataset) 

387 ) 

388 - shape_gamma 

389 * (shape_gamma + 1) 

390 * np.sum( 

391 (dataset**2 / scale_beta**4) 

392 / (1 + (shape_gamma / scale_beta) * dataset) ** 2 

393 ) 

394 ) 

395 

396 sub = np.array([[eye_11, eye_12], [eye_21, eye_22]]) 

397 gpd_covar = np.linalg.inv(sub) 

398 

399 return shape_gamma, scale_beta, gpd_covar 

400 

401 

402def genpareto_MOM_est(dataset): 

403 """Finds estimates for GPD shape and scale using the method of moments. 

404 

405 Important: The covariance of these estimators is only shown to be 

406 asymptotically normally distributed and given by the formula below if 

407 (shape_gamma < 1/4) is true 

408 

409 Notes 

410 ----- 

411 This method is taken from 

412 :cite:`Hosking1987_ParameterEstimationforGeneralizedParetoDistribution` 

413 

414 Parameters 

415 ---------- 

416 dataset : N x 1 numpy array 

417 Contains positive error data above threshold 

418 

419 Returns 

420 ------- 

421 shape_gamma : float 

422 Shape parameter as defined by :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems` 

423 scale_beta : float 

424 Scale parameter as defined by :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems` 

425 gpd_covar : 2 x 2 numpy array 

426 Contains the estimated asymptotic covariance matrix for shape, scale 

427 given in Hosking and Wallis 

428 """ 

429 x_bar = np.mean(dataset) 

430 svar = np.var(dataset, ddof=1) 

431 

432 alfahat = 0.5 * x_bar * (1 + (x_bar**2) / svar) 

433 khat = 0.5 * (-1 + (x_bar**2) / svar) 

434 

435 shape_gamma = -khat 

436 scale_beta = alfahat 

437 

438 # Calculate estimated asymptotic covariance matrix 

439 n = dataset.size 

440 gain = (1 / n) * ( 

441 ((1 + khat) ** 2) / ((1 + 2 * khat) * (1 + 3 * khat) * (1 + 4 * khat)) 

442 ) 

443 

444 mat11 = 2 * (alfahat**2) * (1 + 6 * khat + 12 * khat**2) 

445 mat12 = alfahat * (1 + 2 * khat) * (1 + 4 * khat + 12 * khat**2) 

446 mat21 = mat12 

447 mat22 = ((1 + 2 * khat) ** 2) * (1 + khat + 6 * khat**2) 

448 

449 gpd_covar = gain * np.array([[mat11, mat12], [mat21, mat22]]) 

450 

451 return shape_gamma, scale_beta, gpd_covar 

452 

453 

454def genpareto_PWM_est(dataset): 

455 """Finds estimates for GPD shape and scale using the PWM method. 

456 

457 Important: The covariance matrix calculated below is only valid if the 

458 shape parameter gamma is less than 0.5 

459 

460 Notes 

461 ----- 

462 This method is taken from 

463 :cite:`Hosking1987_ParameterEstimationforGeneralizedParetoDistribution` 

464 

465 Parameters 

466 ---------- 

467 dataset : N x 1 numpy array 

468 Contains positive error data above threshold 

469 

470 Returns 

471 ------- 

472 shape_gamma : float 

473 Shape parameter as defined by :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems` 

474 scale_beta : float 

475 Scale parameter as defined by :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems` 

476 gpd_covar : 2 x 2 numpy array 

477 Contains the estimated asymptotic covariance matrix for shape, scale 

478 given in Hosking and Wallis 

479 """ 

480 n = dataset.size 

481 ordered_list = np.sort(dataset) 

482 

483 gam0bar = (1 / n) * np.sum(dataset) 

484 

485 summate = 0 

486 for ii in range(n): 

487 temp = ((n - (ii + 1)) / (n - 1)) * ordered_list[ii] 

488 summate = summate + temp 

489 

490 gam1bar = (1 / n) * summate 

491 

492 scale_beta = (2 * gam0bar * gam1bar) / (gam0bar - 2 * gam1bar) 

493 shape_gamma = -(gam0bar / (gam0bar - 2 * gam1bar) - 2) 

494 

495 # Calculate estimated asymptotic covariance matrix 

496 k = -shape_gamma 

497 a = scale_beta 

498 

499 gain = (1 / n) * (1 / ((1 + 2 * k) * (3 + 2 * k))) 

500 

501 mat11 = (a**2) * (7 + 18 * k + 11 * k**2 + 2 * k**3) 

502 mat12 = a * (2 + k) * (2 + 6 * k + 7 * k**2 + 2 * k**3) 

503 mat21 = mat12 

504 mat22 = (1 + k) * ((2 + k) ** 2) * (1 + k + 2 * k**2) 

505 

506 gpd_covar = gain * np.array([[mat11, mat12], [mat21, mat22]]) 

507 

508 return shape_gamma, scale_beta, gpd_covar