Coverage for src/serums/distribution_estimator.py: 83%
229 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-11 16:43 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-11 16:43 +0000
1"""Estimates Parameters of Various Distributions."""
2import numpy as np
3from scipy.optimize import minimize
4from scipy.stats import ks_2samp, cramervonmises_2samp, anderson_ksamp
5import serums.models
6import serums.enums as senums
7import serums.errors
10def _edparams_cost_factory(dist):
11 def cost_function(x, rng, samples, method):
12 # sample from distribution given by params in x
13 if isinstance(dist, serums.models.Gaussian):
14 x_samples = x[1] * rng.standard_normal(size=samples.size) + x[0]
16 elif isinstance(dist, serums.models.Cauchy):
17 x_samples = x[1] * rng.standard_t(1, size=samples.size) + x[0]
19 elif isinstance(dist, serums.models.StudentsT):
20 x_samples = x[1] * rng.standard_t(np.abs(x[2]), size=samples.size) + x[0]
21 else:
22 fmt = "Invalid distribution choice: {}"
23 raise RuntimeError(fmt.format(type(dist).__name__))
25 # Kolmogorov-Smirnov test to get Chi2
26 if method is senums.DistEstimatorMethod.KOLMOGOROV_SMIRNOV:
27 cost = ks_2samp(x_samples, samples)[0]
28 elif method is senums.DistEstimatorMethod.CRAMER_VON_MISES:
29 out = cramervonmises_2samp(x_samples, samples)
30 cost = out.statistic
31 elif method is senums.DistEstimatorMethod.ANDERSON_DARLING:
32 cost = anderson_ksamp([x_samples, samples]).statistic
33 else:
34 fmt = "Invalid method choice: {}"
35 raise RuntimeError(fmt.format(method))
36 return cost
38 return cost_function
41def estimate_distribution_params(dist, method, samples, maxiter=100, disp=False):
42 """Estimate distribution parameters from data.
44 Parameters
45 ----------
46 dist : :class:`serums.models.BaseSingleModel`
47 Desired target distribution to fit samples.
48 method : :class:`serums.enums.DistEstimatorMethod`
49 Selection of goodness of fit test to compare samples to target distribution.
50 samples : N numpy array
51 Samples to be fitted to the input distribution.
52 maxiter : int, optional
53 Maximum number of iterations to run the estimator. The default is 100.
54 disp : bool, optional
55 Flag for displaying extra information. The default is False.
57 Raises
58 ------
59 RuntimeError
60 Raised if the estimator fails.
62 Returns
63 -------
64 output : :class:`serums.models.BaseSingleModel`
65 Estimated output distribution.
66 """
67 # Set outputs
68 output = type(dist)()
70 if method in (
71 senums.DistEstimatorMethod.ANDERSON_DARLING,
72 senums.DistEstimatorMethod.CRAMER_VON_MISES,
73 senums.DistEstimatorMethod.KOLMOGOROV_SMIRNOV,
74 ):
75 # Select cost function
76 costfun = _edparams_cost_factory(dist)
77 if isinstance(dist, (serums.models.Gaussian, serums.models.Cauchy)):
78 x0 = np.array([dist.location.item(), dist.scale.item()])
79 rng = np.random.default_rng()
80 elif isinstance(dist, serums.models.StudentsT):
81 x0 = np.array(
82 [
83 dist.location.item(),
84 dist.scale.item(),
85 dist.degrees_of_freedom.item(),
86 ]
87 )
88 rng = np.random.default_rng()
89 else:
90 fmt = "Invalid distribution choice: {}"
91 raise RuntimeError(fmt.format(type(dist).__name__))
93 # optimize distribution parameters
94 res = minimize(
95 costfun,
96 x0,
97 args=(rng, samples, method),
98 method="Powell",
99 options={"maxiter": maxiter, "disp": disp},
100 )
102 if not res.success:
103 fmt = "Parameter estimation failed with:\n{}"
104 raise RuntimeError(fmt.format(res.message))
106 output.location = res.x[0].reshape(np.shape(dist.location))
107 output.scale = np.abs(res.x[1]).reshape(np.shape(dist.scale))
109 if isinstance(dist, (serums.models.Gaussian, serums.models.Cauchy)):
110 pass
112 elif isinstance(dist, serums.models.StudentsT):
113 output.degrees_of_freedom = np.abs(res.x[2]).item()
115 else:
116 fmt = "Invalid distribution choice: {}"
117 raise RuntimeError(fmt.format(type(dist).__name__))
119 elif method is senums.DistEstimatorMethod.GRIMSHAW_MLE:
120 if not isinstance(dist, serums.models.GeneralizedPareto):
121 raise RuntimeError("Invalid distribution for Grimshaws MLE")
122 try:
123 shape, scale = grimshaw_MLE(samples)[:2]
124 output.shape = np.array([[shape]])
125 output.scale = np.array([[scale]])
126 except serums.errors.DistributionEstimatorFailed:
127 estimate_distribution_params(
128 dist,
129 senums.DistEstimatorMethod.METHOD_OF_MOMENTS,
130 samples,
131 maxiter=maxiter,
132 disp=disp,
133 )
135 elif method is senums.DistEstimatorMethod.METHOD_OF_MOMENTS:
136 if not isinstance(dist, serums.models.GeneralizedPareto):
137 raise RuntimeError("Method of Moments only implemented for GPD")
138 else:
139 shape, scale = genpareto_MOM_est(samples)[:2]
140 output.shape = np.array([[shape]])
141 output.scale = np.array([[scale]])
143 elif method is senums.DistEstimatorMethod.PROBABILITY_WEIGHTED_MOMENTS:
144 if not isinstance(dist, serums.models.GeneralizedPareto):
145 raise RuntimeError("PWM only implemented for GPD")
146 else:
147 shape, scale = genpareto_PWM_est(samples)[:2]
148 output.shape = np.array([[shape]])
149 output.scale = np.array([[scale]])
151 return output
154def grimshaw_MLE(dataset):
155 """Finds estimates for GPD shape and scale using Grimshaw's MLE Procedure.
157 Notes
158 -----
159 This method is taken from
160 :cite:`Grimshaw1993_ComputingMLEforGeneralizedParetoDistribution`
162 Parameters
163 ----------
164 dataset : N numpy array
165 Contains positive error data above a threshold of 0.
167 Returns
168 -------
169 shape_gamma : float
170 Shape parameter as defined in chapter 3.2 of :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems`
171 scale_beta : float
172 Scale parameter as defined in chapter 3.2 of :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems`
173 gpd_covar : 2x2 numpy array
174 Contains the estimated covariance matrix for shape, scale based on
175 observed Fisher information. For further information, see chapter 4.1
176 of :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems`
177 """
178 # Calculate sample size, mean, minimum, and maximum
179 n = dataset.size
180 x_bar = np.mean(dataset)
181 x_min = np.amin(dataset)
182 x_max = np.amax(dataset)
184 # GM Step 1: Choose epsilon
185 epsilon = 1e-6 / x_bar
187 # GM Step 2: Compute lower and upper bounds for zeros of h(theta)
188 theta_L = (2 * (x_min - x_bar)) / (x_min**2)
189 theta_U = (1 / x_max) - epsilon
191 # Bypass GM Step 3: Search for zeros on both intervals regardless of h''(t)
193 # GM Steps 4 and 5 (Find zeros of h(theta) if possible)
195 def h(theta, data):
196 """Returns value of h(theta) function for given theta and dataset."""
197 value = (1 + (1 / data.size) * np.sum(np.log(1 - theta * data))) * (
198 (1 / data.size) * np.sum(1 / (1 - theta * data))
199 ) - 1
200 return value
202 def h_prime(theta, data):
203 """Returns value of h'(theta) for a given theta and dataset."""
204 value = (1 / theta) * (
205 ((1 / data.size) * np.sum(1 / (1 - theta * data) ** 2))
206 - ((1 / data.size) * np.sum(1 / (1 - theta * data))) ** 2
207 - ((1 / data.size) * np.sum(np.log(1 - theta * data)))
208 * (
209 ((1 / data.size) * np.sum(1 / (1 - theta * data)))
210 - ((1 / data.size) * np.sum(1 / (1 - theta * data) ** 2))
211 )
212 )
213 return value
215 def mod_NR_root_search(init_guess_theta, low, high, data):
216 """Searches for a root using MNRM and returns the root if converged."""
217 max_iter = 1000
218 tol = 1e-15
220 def mid(t1, t2):
221 return (t1 + t2) / 2
223 t_old = init_guess_theta
224 bottom = low
225 top = high
227 for i in range(max_iter):
228 t_sub = t_old - h(t_old, data) / h_prime(t_old, data)
229 if t_sub > bottom and t_sub < top:
230 t_old = t_sub
231 else:
232 t_old = mid(top, bottom)
233 if h(bottom, dataset) * h(t_old, dataset) < 0:
234 top = t_old
235 else:
236 bottom = t_old
237 if abs(h(t_old, data)) < tol: # changed condition
238 conv = True
239 break
240 elif t_old == low or t_old == high:
241 conv = False
242 break
243 else:
244 conv = False
246 if conv is True:
247 return t_old
248 else:
249 return t_old
251 def mm_root_search(low, high, data):
252 """Finds a root of h(theta) using the midpoint method on given interval."""
253 max_iter = 1000
254 tol = 1e-15
255 bottom = low
256 top = high
258 for i in range(max_iter):
259 t_old = bottom + top / 2
260 if h(bottom, dataset) * h(t_old, dataset) < 0:
261 top = t_old
262 else:
263 bottom = t_old
264 if abs(h(t_old, data)) < tol:
265 conv = True
266 break
267 else:
268 conv = False
270 if conv is True:
271 return t_old
272 else:
273 return t_old
275 throots = np.zeros(4, dtype=float)
277 # Search for zeros of h(theta) on low interval
278 throots[0] = mod_NR_root_search(theta_L, theta_L, -epsilon, dataset)
280 if throots[0] > theta_L + epsilon and throots[0] < -epsilon * 2:
281 root_conv1 = True
282 lfs_test = h(theta_L, dataset) * h(throots[0] - epsilon, dataset)
283 rts_test = h(-epsilon, dataset) * h(throots[0] + epsilon, dataset)
284 if lfs_test < 0:
285 throots[1] = mm_root_search(theta_L, throots[0] - epsilon, dataset)
286 if rts_test < 0:
287 throots[1] = mm_root_search(throots[0] + epsilon, -epsilon, dataset)
288 else:
289 throots[1] = -epsilon
290 else:
291 root_conv1 = False
293 # Search for zeros of h(theta) on high interval
294 throots[2] = mod_NR_root_search(theta_U, epsilon, theta_U, dataset)
296 if throots[2] > 2 * epsilon and throots[2] < theta_U - epsilon:
297 root_conv2 = True
298 lfs_test = h(epsilon, dataset) * h(throots[2] - epsilon, dataset)
299 rts_test = h(throots[2] + epsilon, dataset) * h(theta_U, dataset)
300 if lfs_test < 0:
301 throots[3] = mm_root_search(epsilon, throots[2] - epsilon, dataset)
302 if rts_test < 0:
303 throots[3] = mm_root_search(throots[2] + epsilon, theta_U, dataset)
304 else:
305 throots[3] = theta_U
306 else:
307 root_conv2 = False
309 # GM Step 6: Compute candidate shape and scale parameters and
310 # evaluate GPD log-likelihood
311 def get_GPD_parameters(theta, data):
312 """Calculates shape, scale of GPD from given Theta."""
313 if abs(theta) < 1e-6:
314 val1 = 1
315 val2 = 100000000
316 else:
317 val1 = -(1 / data.size) * np.sum(np.log(1 - theta * dataset))
318 val2 = val1 / theta
320 tup = (-val1, val2)
321 return list(tup)
323 def GPD_log_likelihood(shape_gamma, scale_beta, data):
324 """Evaluates the GPD log-likelihood given shape and scale."""
325 k = -shape_gamma
326 alfa = scale_beta
327 if k == 0:
328 val = -data.size * np.log(alfa) - (1 / alfa) * np.sum(data)
329 else:
330 val = -data.size * np.log(alfa) + ((1 / k) - 1) * np.sum(
331 np.log(1 - (k * data) / alfa)
332 )
333 return val
335 param_list = np.zeros([8], dtype=float)
336 param_list[0:2] = get_GPD_parameters(throots[0], dataset)
337 param_list[2:4] = get_GPD_parameters(throots[1], dataset)
338 param_list[4:6] = get_GPD_parameters(throots[2], dataset)
339 param_list[6:8] = get_GPD_parameters(throots[3], dataset)
341 odds_list = np.zeros([4], dtype=float)
342 odds_list[0] = GPD_log_likelihood(param_list[0], param_list[1], dataset)
343 odds_list[1] = GPD_log_likelihood(param_list[2], param_list[3], dataset)
344 odds_list[2] = GPD_log_likelihood(param_list[4], param_list[5], dataset)
345 odds_list[3] = GPD_log_likelihood(param_list[6], param_list[7], dataset)
347 idx = np.argmax(odds_list)
348 crit = get_GPD_parameters(throots[idx], dataset)
350 # GM Step 7 and 8: Choose best parameter estimates
351 if root_conv1 is True or root_conv2 is True:
352 if odds_list[idx] > -n * np.log(x_max):
353 shape_gamma = crit[0]
354 scale_beta = crit[1]
355 else:
356 shape_gamma = -1
357 scale_beta = x_max
358 else:
359 raise serums.errors.DistributionEstimatorFailed("Grimshaw MLE failed")
361 # Calculate covariance of shape, scale parameters using CRLB based on
362 # observed Fisher information (eq. 4.2 in :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems`)
364 eye_11 = (
365 (2 / (shape_gamma**3))
366 * np.sum(np.log(1 + (shape_gamma / scale_beta) * dataset))
367 - (2 / (shape_gamma**2))
368 * np.sum((dataset / scale_beta) / (1 + (shape_gamma / scale_beta) * dataset))
369 - (1 + (1 / shape_gamma))
370 * np.sum(
371 ((dataset / scale_beta) / (1 + (shape_gamma / scale_beta) * dataset)) ** 2
372 )
373 )
374 eye_12 = -np.sum(
375 (dataset / scale_beta**2) / (1 + (shape_gamma / scale_beta) * dataset)
376 ) + (1 + shape_gamma) * np.sum(
377 (dataset**2 / scale_beta**3)
378 / (1 + (shape_gamma / scale_beta) * dataset) ** 2
379 )
380 eye_21 = eye_12
381 eye_22 = (
382 (-n / scale_beta**2)
383 + 2
384 * (shape_gamma + 1)
385 * np.sum(
386 (dataset / scale_beta**3) / (1 + (shape_gamma / scale_beta) * dataset)
387 )
388 - shape_gamma
389 * (shape_gamma + 1)
390 * np.sum(
391 (dataset**2 / scale_beta**4)
392 / (1 + (shape_gamma / scale_beta) * dataset) ** 2
393 )
394 )
396 sub = np.array([[eye_11, eye_12], [eye_21, eye_22]])
397 gpd_covar = np.linalg.inv(sub)
399 return shape_gamma, scale_beta, gpd_covar
402def genpareto_MOM_est(dataset):
403 """Finds estimates for GPD shape and scale using the method of moments.
405 Important: The covariance of these estimators is only shown to be
406 asymptotically normally distributed and given by the formula below if
407 (shape_gamma < 1/4) is true
409 Notes
410 -----
411 This method is taken from
412 :cite:`Hosking1987_ParameterEstimationforGeneralizedParetoDistribution`
414 Parameters
415 ----------
416 dataset : N x 1 numpy array
417 Contains positive error data above threshold
419 Returns
420 -------
421 shape_gamma : float
422 Shape parameter as defined by :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems`
423 scale_beta : float
424 Scale parameter as defined by :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems`
425 gpd_covar : 2 x 2 numpy array
426 Contains the estimated asymptotic covariance matrix for shape, scale
427 given in Hosking and Wallis
428 """
429 x_bar = np.mean(dataset)
430 svar = np.var(dataset, ddof=1)
432 alfahat = 0.5 * x_bar * (1 + (x_bar**2) / svar)
433 khat = 0.5 * (-1 + (x_bar**2) / svar)
435 shape_gamma = -khat
436 scale_beta = alfahat
438 # Calculate estimated asymptotic covariance matrix
439 n = dataset.size
440 gain = (1 / n) * (
441 ((1 + khat) ** 2) / ((1 + 2 * khat) * (1 + 3 * khat) * (1 + 4 * khat))
442 )
444 mat11 = 2 * (alfahat**2) * (1 + 6 * khat + 12 * khat**2)
445 mat12 = alfahat * (1 + 2 * khat) * (1 + 4 * khat + 12 * khat**2)
446 mat21 = mat12
447 mat22 = ((1 + 2 * khat) ** 2) * (1 + khat + 6 * khat**2)
449 gpd_covar = gain * np.array([[mat11, mat12], [mat21, mat22]])
451 return shape_gamma, scale_beta, gpd_covar
454def genpareto_PWM_est(dataset):
455 """Finds estimates for GPD shape and scale using the PWM method.
457 Important: The covariance matrix calculated below is only valid if the
458 shape parameter gamma is less than 0.5
460 Notes
461 -----
462 This method is taken from
463 :cite:`Hosking1987_ParameterEstimationforGeneralizedParetoDistribution`
465 Parameters
466 ----------
467 dataset : N x 1 numpy array
468 Contains positive error data above threshold
470 Returns
471 -------
472 shape_gamma : float
473 Shape parameter as defined by :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems`
474 scale_beta : float
475 Scale parameter as defined by :cite:`Larson2018_GaussianParetoOverboundingAMethodForManagingRiskInSafetyCriticalNavigationSystems`
476 gpd_covar : 2 x 2 numpy array
477 Contains the estimated asymptotic covariance matrix for shape, scale
478 given in Hosking and Wallis
479 """
480 n = dataset.size
481 ordered_list = np.sort(dataset)
483 gam0bar = (1 / n) * np.sum(dataset)
485 summate = 0
486 for ii in range(n):
487 temp = ((n - (ii + 1)) / (n - 1)) * ordered_list[ii]
488 summate = summate + temp
490 gam1bar = (1 / n) * summate
492 scale_beta = (2 * gam0bar * gam1bar) / (gam0bar - 2 * gam1bar)
493 shape_gamma = -(gam0bar / (gam0bar - 2 * gam1bar) - 2)
495 # Calculate estimated asymptotic covariance matrix
496 k = -shape_gamma
497 a = scale_beta
499 gain = (1 / n) * (1 / ((1 + 2 * k) * (3 + 2 * k)))
501 mat11 = (a**2) * (7 + 18 * k + 11 * k**2 + 2 * k**3)
502 mat12 = a * (2 + k) * (2 + 6 * k + 7 * k**2 + 2 * k**3)
503 mat21 = mat12
504 mat22 = (1 + k) * ((2 + k) ** 2) * (1 + k + 2 * k**2)
506 gpd_covar = gain * np.array([[mat11, mat12], [mat21, mat22]])
508 return shape_gamma, scale_beta, gpd_covar