Coverage for src/serums/models.py: 24%

972 statements  


1"""Defines various distribution models.""" 

2from __future__ import annotations 

3import numpy as np 

4import numpy.random as rnd 

5import scipy.stats as stats 

6from warnings import warn 

7import matplotlib.pyplot as plt 

8from scipy.stats import norm, halfnorm, genpareto 

9from copy import deepcopy 

10import probscale 

11 

12import serums.enums as enums 

13 

14 

15class BaseSingleModel: 

16 """Generic base class for distribution models. 

17 

18 This defines the required functions and provides their recommended function 

19 signature for inherited classes. It also defines base attributes for the 

20 distribution. 

21 

22 Attributes 

23 ---------- 

24 location : N x 1 numpy array 

25 location parameter of the distribution 

26 scale : N x N numpy array 

27 scale parameter of the distribution 

28 """ 

29 

30 def __init__(self, loc=None, scale=None, monte_carlo_size=int(1e4)): 

31 super().__init__() 

32 if isinstance(loc, np.ndarray): 

33 self.location = loc 

34 else: 

35 self.location = np.array([[loc]]) 

36 

37 if isinstance(scale, np.ndarray): 

38 self.scale = scale 

39 else: 

40 self.scale = np.array([[scale]]) 

41 

42 self.monte_carlo_size = monte_carlo_size 

43 

44 def sample(self, rng: rnd.Generator = None, num_samples: int = None) -> np.ndarray: 

45 """Draw a sample from the distribution. 

46 

47 This should be implemented by the child class. 

48 

49 Parameters 

50 ---------- 

51 rng : numpy random generator, optional 

52 random number generator to use. The default is None. 

53 

54 Returns 

55 ------- 

56 None. 

57 """ 

58 warn("sample not implemented by class {}".format(type(self).__name__)) 

59 

60 def pdf(self, x): 

61 """Calculate the PDF value at the given point. 

62 

63 This should be implemented by the child class. 

64 

65 Parameters 

66 ---------- 

67 x : N x 1 numpy array 

68 Point to evaluate the PDF. 

69 

70 Returns 

71 ------- 

72 float 

73 PDF value. 

74 """ 

75 warn("pdf not implemented by class {}".format(type(self).__name__)) 

76 return np.nan 

77 

78 def __str__(self): 

79 msg = "Location = " 

80 dim = self.location.size 

81 for ii in range(dim): 

82 if ii != 0: 

83 msg += "{:11s}".format("") 

84 

85 if ii == 0 and dim != 1: 

86 fmt = "\u2308{:.4e}\u2309\tScale = \u2308" 

87 fmt += "{:.4e}, " * (dim - 1) + "{:.4e}" + "\u2309" 

88 elif ii == (dim - 1) and dim != 1: 

89 fmt = ( 

90 "\u230A{:.4e}\u230B\t" 

91 + "{:8s}\u230A".format("") 

92 + "{:.4e}, " * (dim - 1) 

93 + "{:.4e}" 

94 + "\u230B" 

95 ) 

96 else: 

97 fmt = "|{:.4e}|\t" 

98 if dim == 1: 

99 fmt += "Scale = |" 

100 else: 

101 fmt += "{:8s}|".format("") 

102 fmt += "{:.4e}, " * (dim - 1) + "{:.4e}" + "|" 

103 msg += ( 

104 fmt.format(self.location.ravel()[ii], *self.scale[ii, :].tolist()) 

105 + "\n" 

106 ) 

107 return msg 

108 

109 def __sub__(self, other: BaseSingleModel | float | int) -> np.ndarray: 

110 if isinstance(other, BaseSingleModel): 

111 if self.location.size != other.location.size: 

112 raise RuntimeError( 

113 "Can not subtract distributions of different shapes ({:d} vs {:d})".format( 

114 self.location.size, other.location.size 

115 ) 

116 ) 

117 n_samps = np.max([self.monte_carlo_size, other.monte_carlo_size]).astype( 

118 int 

119 ) 

120 return self.sample(num_samples=n_samps) - other.sample(num_samples=n_samps) 

121 else: 

122 return self.sample(num_samples=self.monte_carlo_size) - other 

123 

124 def __neg__(self) -> np.ndarray: 

125 """Should only be used to redefine order of operations for a subtraction operation (i.e. -g1 + g2 vs g2 - g1)""" 

126 return -self.sample(num_samples=int(self.monte_carlo_size)) 

127 

128 def __add__(self, other: BaseSingleModel | float | int) -> np.ndarray: 

129 if isinstance(other, BaseSingleModel): 

130 if self.location.size != other.location.size: 

131 raise RuntimeError( 

132 "Can not add distributions of different shapes ({:d} vs {:d})".format( 

133 self.location.size, other.location.size 

134 ) 

135 ) 

136 n_samps = np.max([self.monte_carlo_size, other.monte_carlo_size]).astype( 

137 int 

138 ) 

139 return self.sample(num_samples=n_samps) + other.sample(num_samples=n_samps) 

140 else: 

141 return self.sample(num_samples=self.monte_carlo_size) + other 

142 

143 # define right multiplication to be the same as normal multiplication (allow scalar * distribution) 

144 def __rmul__(self, other: BaseSingleModel | float | int) -> np.ndarray: 

145 return self.__mul__(other) 

146 

147 def __mul__(self, other: BaseSingleModel | float | int) -> np.ndarray: 

148 if isinstance(other, BaseSingleModel): 

149 if self.location.size != other.location.size: 

150 raise RuntimeError( 

151 "Can not multiply distributions of different shapes ({:d} vs {:d})".format( 

152 self.location.size, other.location.size 

153 ) 

154 ) 

155 n_samps = np.max([self.monte_carlo_size, other.monte_carlo_size]).astype( 

156 int 

157 ) 

158 return self.sample(num_samples=n_samps) * other.sample(num_samples=n_samps) 

159 else: 

160 return self.sample(num_samples=self.monte_carlo_size) * other 

161 

162 def __truediv__(self, other: BaseSingleModel | float | int) -> np.ndarray: 

163 if isinstance(other, BaseSingleModel): 

164 if self.location.size != other.location.size: 

165 raise RuntimeError( 

166 "Can not divide distributions of different shapes ({:d} vs {:d})".format( 

167 self.location.size, other.location.size 

168 ) 

169 ) 

170 n_samps = np.max([self.monte_carlo_size, other.monte_carlo_size]).astype( 

171 int 

172 ) 

173 return self.sample(num_samples=n_samps) / other.sample(num_samples=n_samps) 

174 else: 

175 return self.sample(num_samples=self.monte_carlo_size) / other 

176 

177 def __rtruediv__(self, other: float | int) -> np.ndarray: 

178 return other / self.sample(num_samples=self.monte_carlo_size) 

179 

180 def __floordiv__(self, other: BaseSingleModel | float | int) -> np.ndarray: 

181 if isinstance(other, BaseSingleModel): 

182 if self.location.size != other.location.size: 

183 raise RuntimeError( 

184 "Can not floor divide distributions of different shapes ({:d} vs {:d})".format( 

185 self.location.size, other.location.size 

186 ) 

187 ) 

188 n_samps = np.max([self.monte_carlo_size, other.monte_carlo_size]).astype( 

189 int 

190 ) 

191 return self.sample(num_samples=n_samps) // other.sample(num_samples=n_samps) 

192 else: 

193 return self.sample(num_samples=self.monte_carlo_size) // other 

194 

195 def __rfloordiv__(self, other: float | int) -> np.ndarray: 

196 return other // self.sample(num_samples=self.monte_carlo_size) 

197 

198 def __pow__(self, power: int) -> np.ndarray: 

199 return self.sample(num_samples=int(self.monte_carlo_size)) ** power 

200 

201 
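# Illustrative sketch (not part of the original module): the arithmetic
# operators defined above combine distributions by Monte Carlo sampling, so an
# expression such as g1 - g2 returns a plain numpy array of sampled
# differences rather than a new distribution object. It uses the Gaussian
# subclass defined below; the parameter values are hypothetical.
def _example_operator_usage():
    g1 = Gaussian(mean=np.array([[1.0]]), covariance=np.array([[0.5]]))
    g2 = Gaussian(mean=np.array([[0.2]]), covariance=np.array([[0.1]]))
    diff = g1 - g2      # (monte_carlo_size, 1) array of sampled differences
    scaled = 2.0 * g1   # scalar multiplication also yields a sample array
    return diff.mean(), scaled.std()  # diff.mean() is roughly 0.8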

202class Gaussian(BaseSingleModel): 

203 """Represents a Gaussian distribution object.""" 

204 

205 def __init__(self, mean=None, covariance=None): 

206 """Initialize an object. 

207 

208 Parameters 

209 ---------- 

210 mean : N x 1 numpy array, optional 

211 Mean of the distribution. The default is None. 

212 covariance : N x N numpy array, optional 

213 Covariance of the distribution. The default is None. 

214 

215 Returns 

216 ------- 

217 None. 

218 """ 

219 super().__init__(loc=mean, scale=covariance) 

220 if covariance is not None: 

221 try: 

222 self.stdev = np.linalg.cholesky(covariance) 

223 except np.linalg.LinAlgError: 

224 self.stdev = None 

225 else: 

226 self.stdev = None 

227 

228 def __str__(self): 

229 msg = "Mean = " 

230 dim = self.mean.size 

231 for ii in range(dim): 

232 if ii != 0: 

233 msg += "{:7s}".format("") 

234 

235 if ii == 0 and dim != 1: 

236 fmt = "\u2308{:.4e}\u2309\tCovariance = \u2308" 

237 fmt += "{:.4e}, " * (dim - 1) + "{:.4e}" + "\u2309" 

238 elif ii == (dim - 1) and dim != 1: 

239 fmt = ( 

240 "\u230A{:.4e}\u230B\t" 

241 + "{:13s}\u230A".format("") 

242 + "{:.4e}, " * (dim - 1) 

243 + "{:.4e}" 

244 + "\u230B" 

245 ) 

246 else: 

247 fmt = "|{:.4e}|\t" 

248 if dim == 1: 

249 fmt += "Covariance = |" 

250 else: 

251 fmt += "{:13s}|".format("") 

252 fmt += "{:.4e}, " * (dim - 1) + "{:.4e}" + "|" 

253 msg += ( 

254 fmt.format(self.mean.ravel()[ii], *self.covariance[ii, :].tolist()) 

255 + "\n" 

256 ) 

257 return msg 

258 

259 @property 

260 def mean(self): 

261 """Mean of the distribution. 

262 

263 Returns 

264 ------- 

265 N x 1 numpy array. 

266 """ 

267 return self.location 

268 

269 @mean.setter 

270 def mean(self, val): 

271 self.location = val 

272 

273 @property 

274 def covariance(self): 

275 """Covariance of the distribution. 

276 

277 Returns 

278 ------- 

279 N x N numpy array. 

280 """ 

281 return self.scale 

282 

283 @covariance.setter 

284 def covariance(self, val): 

285 self.scale = val 

286 try: 

287 self.stdev = np.linalg.cholesky(val) 

288 except np.linalg.LinAlgError: 

289 self.stdev = None 

290 

291 def sample(self, rng=None, num_samples=None): 

292 """Draw a sample from the current mixture model. 

293 

294 Parameters 

295 ---------- 

296 rng : numpy random generator, optional 

297 Random number generator to use. If none is given then the numpy 

298 default is used. The default is None. 

299 

300 Returns 

301 ------- 

302 numpy array 

303 randomly sampled numpy array of the same shape as the mean. 

304 """ 

305 if rng is None: 

306 rng = rnd.default_rng() 

307 if num_samples is None: 

308 num_samples = 1 

309 return rng.multivariate_normal( 

310 self.mean.flatten(), self.covariance, size=int(num_samples) 

311 ) 

312 

313 def pdf(self, x): 

314 """Multi-variate probability density function for this distribution. 

315 

316 Returns 

317 ------- 

318 float 

319 PDF value of the state `x`. 

320 """ 

321 rv = stats.multivariate_normal 

322 return rv.pdf(x.flatten(), mean=self.mean.flatten(), cov=self.covariance) 

323 

324 def CI(self, alfa): 

325 """Return confidence interval of distribution given a significance level 'alfa'. 

326 

327 Parameters 

328 ---------- 

329 alfa : float 

330 significance level, i.e. confidence level = (1 - alfa). Must be 

331 a positive real number which is less than 1 

332 

333 Returns 

334 ------- 

335 1 x 2 numpy array 

336 Numpy array containing the upper and lower bound of the computed 

337 confidence interval. 

338 """ 

339 p = alfa / 2 

340 low = stats.norm.ppf(p, loc=self.mean, scale=np.sqrt(self.scale)) 

341 high = stats.norm.ppf(1 - p, loc=self.mean, scale=np.sqrt(self.scale)) 

342 return np.array([[low[0, 0], high[0, 0]]]) 

343 

344 def CDFplot(self, data): 

345 """Plot the overbound and DKW bound(s) against ECDF of input data. 

346 

347 Parameters 

348 ---------- 

349 data : N numpy array 

350 Contains the error sample data for which the overbound was computed. 

351 

352 Returns 

353 ------- 

354 matplotlib line plot 

355 Shows empirical cumulative distribution function of input error 

356 data, the associated DKW bound(s), and the computed overbound in the 

357 CDF domain. 

358 """ 

359 n = data.size 

360 ordered_abs_data = np.sort(np.abs(data)) 

361 ecdf_ords = np.zeros(n) 

362 for i in range(n): 

363 ecdf_ords[i] = (i + 1) / n 

364 

365 X_ECDF = ordered_abs_data 

366 X_OB = ordered_abs_data 

367 Y_ECDF = ecdf_ords 

368 Y_OB = halfnorm.cdf(X_OB, loc=0, scale=np.sqrt(self.scale[0, 0])) 

369 

370 confidence = 0.95 

371 alfa = 1 - confidence 

372 epsilon = np.sqrt(np.log(2 / alfa) / (2 * n)) 

373 plt.figure("Symmetric Gaussian Overbound Plot in Half-Gaussian CDF Domain") 

374 plt.plot(X_ECDF, Y_ECDF, label="Original ECDF") 

375 plt.plot(X_ECDF, np.add(Y_ECDF, epsilon), label="DKW Upper Band") 

376 plt.plot(X_ECDF, np.subtract(Y_ECDF, epsilon), label="DKW Lower Band") 

377 plt.plot(X_OB, Y_OB, label="Overbound CDF") 

378 plt.xlim(np.array([0, 1.1 * ordered_abs_data[-1]])) 

379 plt.ylim(np.array([0, 1])) 

380 plt.legend() 

381 plt.grid() 

382 plt.title("Symmetric Gaussian Overbound Plot in CDF Domain") 

383 plt.ylabel("Accumulated Probability") 

384 plt.xlabel("Error Magnitude") 

385 

386 def probscaleplot(self, data): 

387 """Generate probability plot of the ECDF, overbound, and DKW bound(s). 

388 

389 Parameters 

390 ---------- 

391 data : N numpy array 

392 numpy array containing the error data used to calculate the 

393 overbound 

394 

395 Returns 

396 ------- 

397 matplotlib line plot 

398 Shows empirical cumulative distribution function of input error 

399 data, the associated DKW bound(s), and the computed overbound 

400 in the CDF domain where the probability axis is represented with 

401 percentiles and is scaled such that a Gaussian CDF is linear. 

402 """ 

403 sorted_abs_data = np.sort(np.abs(data)) 

404 n = data.size 

405 ecdf_ords = np.zeros(n) 

406 for i in range(n): 

407 ecdf_ords[i] = (i + 1) / n 

408 x_ecdf = sorted_abs_data 

409 y_ecdf = ecdf_ords 

410 

411 confidence = 0.95 

412 alfa = 1 - confidence 

413 epsilon = np.sqrt(np.log(2 / alfa) / (2 * n)) 

414 

415 x_dkw = x_ecdf 

416 y_dkw = np.subtract(y_ecdf, epsilon) 

417 

418 x_ob = sorted_abs_data 

419 y_ob = halfnorm.cdf(x_ob, loc=0, scale=np.sqrt(self.scale[0, 0])) 

420 dist_type = halfnorm() 

421 

422 figure, ax = plt.subplots() 

423 ax.set_ylim(bottom=5, top=99.999) 

424 ax.set_yscale("prob", dist=dist_type) 

425 

426 plt.plot(x_ecdf, 100 * y_ecdf) 

427 plt.plot(x_dkw, 100 * y_dkw) 

428 plt.plot(x_ob, 100 * y_ob) 

429 plt.legend(["ECDF", "DKW Lower Bound", "Symmetric Gaussian Overbound"]) 

430 plt.title("Probability Plot of Symmetric Gaussian Overbound") 

431 plt.ylabel("CDF Percentiles") 

432 plt.xlabel("Error Magnitude") 

433 plt.grid() 

434 

435 
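# Illustrative sketch (not part of the original module): basic use of the
# Gaussian model for sampling, PDF evaluation, and a two-sided confidence
# interval. The seed, parameters, and 0.05 significance level are hypothetical.
def _example_gaussian_usage():
    rng = rnd.default_rng(12345)
    g = Gaussian(mean=np.array([[0.0]]), covariance=np.array([[4.0]]))
    samples = g.sample(rng=rng, num_samples=1000)  # (1000, 1) array
    density = g.pdf(np.array([[0.5]]))             # scalar PDF value
    interval = g.CI(0.05)                          # 1 x 2 array, roughly [-3.92, 3.92]
    return samples, density, interval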

436class PairedGaussian(BaseSingleModel): 

437 """Represents a Paired Gaussian Overbound Distribution Object.""" 

438 

439 def __init__(self, left: Gaussian, right: Gaussian): 

440 """Initialize an object. 

441 

442 Parameters 

443 ---------- 

444 left : :class:`serums.models.Gaussian` 

445 Gaussian model storing the parameters of the left component of the 

446 paired overbound. 

447 right : :class:`serums.models.Gaussian` 

448 Gaussian model storing the parameters of the right component of the 

449 paired overbound. 

450 

451 Returns 

452 ------- 

453 None. 

454 """ 

455 super().__init__() 

456 

457 self.left_gaussian = deepcopy(left) 

458 self.right_gaussian = deepcopy(right) 

459 

460 def sample(self, rng: rnd.Generator = None, num_samples: int = None) -> np.ndarray: 

461 """Generate a random sample from the distribution model. 

462 

463 Parameters 

464 ---------- 

465 num_samples : int 

466 Specify the size of the sample. 

467 

468 Returns 

469 ------- 

470 N numpy array 

471 Numpy array containing a random sample of the specified size from 

472 the distribution. 

473 """ 

474 if rng is None: 

475 rng = rnd.default_rng() 

476 if num_samples is None: 

477 num_samples = 1 

478 

479 p = rng.uniform(size=num_samples) 

480 rcount = int(np.count_nonzero(p >= 0.5)) 

481 lcount = int(num_samples) - rcount 

482 

483 samp = np.nan * np.ones((num_samples, self.right_gaussian.mean.size)) 

484 if rcount > 0: 

485 rsamp = self.right_gaussian.sample(rng=rng, num_samples=int(rcount)) 

486 samp[0:rcount] = ( 

487 np.abs(rsamp - self.right_gaussian.mean) + self.right_gaussian.mean 

488 ) 

489 

490 if lcount > 0: 

491 lsamp = self.left_gaussian.sample(rng=rng, num_samples=int(lcount)) 

492 samp[rcount:] = ( 

493 -np.abs(lsamp - self.left_gaussian.mean) + self.left_gaussian.mean 

494 ) 

495 

496 return samp 

497 

498 def CI(self, alfa): 

499 """Return confidence interval of distribution given a significance level 'alfa'. 

500 

501 Parameters 

502 ---------- 

503 alfa : float 

504 significance level, i.e. confidence level = (1 - alfa). Must be 

505 a positive real number which is less than 1 

506 

507 Returns 

508 ------- 

509 1 x 2 numpy array 

510 Numpy array containing the upper and lower bound of the computed 

511 confidence interval. 

512 """ 

513 e = alfa / 2 

514 left = norm.ppf( 

515 e, 

516 loc=self.left_gaussian.location, 

517 scale=np.sqrt(self.left_gaussian.scale), 

518 ) 

519 right = norm.ppf( 

520 1 - e, 

521 loc=self.right_gaussian.location, 

522 scale=np.sqrt(self.right_gaussian.scale), 

523 ) 

524 return np.array([[left[0, 0], right[0, 0]]]) 

525 

526 def CDFplot(self, data): 

527 """Plot the overbound and DKW bound(s) against ECDF of input data. 

528 

529 Parameters 

530 ---------- 

531 data : N numpy array 

532 Contains the error sample data for which the overbound was computed. 

533 

534 Returns 

535 ------- 

536 matplotlib line plot 

537 Shows empirical cumulative distribution function of input error 

538 data, the associated DKW bound(s), and the computed overbound in the 

539 CDF domain. 

540 """ 

541 data = np.sort(data) 

542 n = data.size 

543 ecdf_ords = np.zeros(n) 

544 for i in range(n): 

545 ecdf_ords[i] = (i + 1) / n 

546 

547 confidence = 0.95 

548 alfa = 1 - confidence 

549 epsilon = np.sqrt(np.log(2 / alfa) / (2 * n)) 

550 DKW_low = np.subtract(ecdf_ords, epsilon) 

551 DKW_high = np.add(ecdf_ords, epsilon) 

552 

553 left_mean = self.left_gaussian.mean 

554 left_std = np.sqrt(self.left_gaussian.covariance) 

555 right_mean = self.right_gaussian.mean 

556 right_std = np.sqrt(self.right_gaussian.covariance) 

557 

558 y_left_ob = np.reshape(norm.cdf(data, loc=left_mean, scale=left_std), (n,)) 

559 y_right_ob = np.reshape(norm.cdf(data, loc=right_mean, scale=right_std), (n,)) 

560 x_paired_ob = np.linspace(np.min(data) - 1, np.max(data) + 1, num=10000) 

561 y_paired_ob = np.zeros(x_paired_ob.size) 

562 left_pt = self.left_gaussian.mean 

563 right_pt = self.right_gaussian.mean 

564 

565 for i in range(y_paired_ob.size): 

566 if x_paired_ob[i] < left_pt: 

567 y_paired_ob[i] = norm.cdf(x_paired_ob[i], loc=left_mean, scale=left_std) 

568 elif x_paired_ob[i] > right_pt: 

569 y_paired_ob[i] = norm.cdf( 

570 x_paired_ob[i], loc=right_mean, scale=right_std 

571 ) 

572 else: 

573 y_paired_ob[i] = 0.5 

574 

575 plt.figure("Paired Overbound in CDF Domain") 

576 plt.plot(data, y_left_ob, label="Left OB", linestyle="--") 

577 plt.plot(data, y_right_ob, label="Right OB", linestyle="--") 

578 plt.plot(x_paired_ob, y_paired_ob, label="Paired OB") 

579 plt.plot(data, ecdf_ords, label="ECDF") 

580 plt.plot(data, DKW_high, label="Upper DKW Bound") 

581 plt.plot(data, DKW_low, label="Lower DKW Bound") 

582 plt.legend() 

583 plt.grid() 

584 plt.title("Paired Gaussian Overbound Plot in CDF Domain") 

585 plt.ylabel("Accumulated Probability") 

586 plt.xlabel("Error") 

587 

588 def probscaleplot(self, data): 

589 """Generate probability plot of the ECDF, overbound, and DKW bound(s). 

590 

591 Parameters 

592 ---------- 

593 data : N numpy array 

594 numpy array containing the error data used to calculate the 

595 overbound 

596 

597 Returns 

598 ------- 

599 matplotlib line plot 

600 Shows empirical cumulative distribution function of input error 

601 data, the associated DKW bound(s), and the computed overbound 

602 in the CDF domain where the probability axis is represented with 

603 percentiles and is scaled such that a Gaussian CDF is linear. 

604 """ 

605 n = data.size 

606 ecdf_ords = np.zeros(n) 

607 for i in range(n): 

608 ecdf_ords[i] = (i + 1) / n 

609 

610 confidence = 0.95 

611 alfa = 1 - confidence 

612 epsilon = np.sqrt(np.log(2 / alfa) / (2 * n)) 

613 DKW_low = np.subtract(ecdf_ords, epsilon) 

614 DKW_high = np.add(ecdf_ords, epsilon) 

615 

616 left_mean = self.left_gaussian.mean 

617 left_std = np.sqrt(self.left_gaussian.covariance) 

618 right_mean = self.right_gaussian.mean 

619 right_std = np.sqrt(self.right_gaussian.covariance) 

620 

621 y_left_ob = np.reshape(norm.cdf(data, loc=left_mean, scale=left_std), (n,)) 

622 y_right_ob = np.reshape(norm.cdf(data, loc=right_mean, scale=right_std), (n,)) 

623 x_paired_ob = np.linspace(np.min(data), np.max(data), num=10000) 

624 y_paired_ob = np.zeros(x_paired_ob.size) 

625 left_pt = self.left_gaussian.mean 

626 right_pt = self.right_gaussian.mean 

627 

628 for i in range(y_paired_ob.size): 

629 if x_paired_ob[i] < left_pt: 

630 y_paired_ob[i] = norm.cdf(x_paired_ob[i], loc=left_mean, scale=left_std) 

631 elif x_paired_ob[i] > right_pt: 

632 y_paired_ob[i] = norm.cdf( 

633 x_paired_ob[i], loc=right_mean, scale=right_std 

634 ) 

635 else: 

636 y_paired_ob[i] = 0.5 

637 

638 dist_type = norm() 

639 

640 x_ecdf = np.sort(data) 

641 y_ecdf = ecdf_ords 

642 

643 x_dkw_low = x_ecdf 

644 y_dkw_low = DKW_low 

645 

646 x_dkw_high = x_ecdf 

647 y_dkw_high = DKW_high 

648 

649 figure, ax = plt.subplots() 

650 ax.set_ylim(bottom=0.001, top=99.999) 

651 ax.set_yscale("prob", dist=dist_type) 

652 

653 plt.plot(x_ecdf, 100 * y_ecdf) 

654 plt.plot(x_dkw_low, 100 * y_dkw_low) 

655 plt.plot(x_dkw_high, 100 * y_dkw_high) 

656 plt.plot(x_paired_ob, 100 * y_paired_ob) 

657 plt.legend( 

658 [ 

659 "ECDF", 

660 "DKW Lower Bound", 

661 "DKW Upper Bound", 

662 "Symmetric Gaussian Overbound", 

663 ] 

664 ) 

665 plt.title("Probability Plot of Paired Gaussian Overbound") 

666 plt.ylabel("CDF Percentiles") 

667 plt.xlabel("Error") 

668 plt.grid() 

669 

670 
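# Illustrative sketch (not part of the original module): a paired overbound is
# built from separate left and right Gaussian components, and sampling reflects
# each component's draws away from its mean so the halves form one overbounding
# distribution. The component parameters below are hypothetical.
def _example_paired_gaussian_usage():
    left = Gaussian(mean=np.array([[-0.5]]), covariance=np.array([[1.0]]))
    right = Gaussian(mean=np.array([[0.5]]), covariance=np.array([[2.0]]))
    paired = PairedGaussian(left, right)
    samples = paired.sample(num_samples=500)  # (500, 1) array
    interval = paired.CI(0.05)                # lower and upper 95% bounds
    return samples, interval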

671class StudentsT(BaseSingleModel): 

672 """Represents a Student's t-distribution.""" 

673 

674 def __init__(self, mean=None, scale=None, dof=None): 

675 super().__init__(loc=mean, scale=scale) 

676 self._dof = dof 

677 

678 @property 

679 def mean(self): 

680 """Mean of the distribution. 

681 

682 Returns 

683 ------- 

684 N x 1 numpy array. 

685 """ 

686 return self.location 

687 

688 @mean.setter 

689 def mean(self, val): 

690 self.location = val 

691 

692 @property 

693 def degrees_of_freedom(self): 

694 """Degrees of freedom of the distribution, must be greater than 0.""" 

695 return self._dof 

696 

697 @degrees_of_freedom.setter 

698 def degrees_of_freedom(self, value): 

699 self._dof = value 

700 

701 @property 

702 def covariance(self): 

703 """Read only covariance of the distribution (if defined). 

704 

705 Returns 

706 ------- 

707 N x N numpy array. 

708 """ 

709 if self._dof <= 2: 

710 msg = "Degrees of freedom is {} and must be > 2" 

711 raise RuntimeError(msg.format(self._dof)) 

712 return self._dof / (self._dof - 2) * self.scale 

713 

714 @covariance.setter 

715 def covariance(self, val): 

716 warn("Covariance is read only.") 

717 

718 def pdf(self, x): 

719 """Multi-variate probability density function for this distribution. 

720 

721 Parameters 

722 ---------- 

723 x : N x 1 numpy array 

724 Value to evaluate the pdf at. 

725 

726 Returns 

727 ------- 

728 float 

729 PDF value of the state `x`. 

730 """ 

731 rv = stats.multivariate_t 

732 return rv.pdf( 

733 x.flatten(), 

734 loc=self.location.flatten(), 

735 shape=self.scale, 

736 df=self.degrees_of_freedom, 

737 ) 

738 

739 def sample(self, rng=None, num_samples=None): 

740 """Multi-variate probability density function for this distribution. 

741 

742 Parameters 

743 ---------- 

744 rng : numpy random generator, optional 

745 Random number generator to use. If none is given then the numpy 

746 default is used. The default is None. 

747 

748 Returns 

749 ------- 

750 numpy array 

751 Randomly sampled array from the distribution. 

752 """ 

753 if rng is None: 

754 rng = rnd.default_rng() 

755 if num_samples is None: 

756 num_samples = 1 

757 

758 rv = stats.multivariate_t 

759 rv.random_state = rng 

760 x = rv.rvs( 

761 loc=self.location.flatten(), 

762 shape=self.scale, 

763 df=self.degrees_of_freedom, 

764 size=int(num_samples), 

765 ) 

766 

767 return x.reshape((x.size, 1)) 

768 

769 
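# Illustrative sketch (not part of the original module): for the Student's t
# model the covariance is derived from the scale as dof / (dof - 2) * scale,
# so it is only defined for dof > 2. The parameter values are hypothetical.
def _example_students_t_usage():
    st = StudentsT(mean=np.array([[0.0]]), scale=np.array([[1.0]]), dof=5)
    cov = st.covariance                   # (1, 1) array equal to 5 / 3
    samples = st.sample(num_samples=100)  # (100, 1) array of t draws
    return cov, samples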

770class ChiSquared(BaseSingleModel): 

771 """Represents a Chi Squared distribution.""" 

772 

773 def __init__(self, mean=None, scale=None, dof=None): 

774 super().__init__(loc=mean, scale=scale) 

775 self._dof = dof 

776 

777 @property 

778 def mean(self): 

779 """Mean of the distribution. 

780 

781 Returns 

782 ------- 

783 N x 1 numpy array. 

784 """ 

785 return self.location 

786 

787 @mean.setter 

788 def mean(self, val): 

789 self.location = val 

790 

791 @property 

792 def degrees_of_freedom(self): 

793 """Degrees of freedom of the distribution, must be greater than 0.""" 

794 return self._dof 

795 

796 @degrees_of_freedom.setter 

797 def degrees_of_freedom(self, value): 

798 self._dof = value 

799 

800 @property 

801 def covariance(self): 

802 """Read only covariance of the distribution (if defined). 

803 

804 Returns 

805 ------- 

806 N x N numpy array. 

807 """ 

808 if self._dof <= 0: 

809 msg = "Degrees of freedom is {} and must be > 0" 

810 raise RuntimeError(msg.format(self._dof)) 

811 return (self._dof * 2) * (self.scale ** 2) 

812 

813 @covariance.setter 

814 def covariance(self, val): 

815 warn("Covariance is read only.") 

816 

817 def pdf(self, x): 

818 """Multi-variate probability density function for this distribution. 

819 

820 Parameters 

821 ---------- 

822 x : N x 1 numpy array 

823 Value to evaluate the pdf at. 

824 

825 Returns 

826 ------- 

827 float 

828 PDF value of the state `x`. 

829 """ 

830 rv = stats.chi2 

831 return rv.pdf( 

832 x.flatten(), 

833 self._dof, 

834 loc=self.location.flatten(), 

835 scale=self.scale, 

836 ) 

837 

838 def sample(self, rng=None, num_samples=None): 

839 """Multi-variate probability density function for this distribution. 

840 

841 Parameters 

842 ---------- 

843 rng : numpy random generator, optional 

844 Random number generator to use. If none is given then the numpy 

845 default is used. The default is None. 

846 

847 Returns 

848 ------- 

849 numpy array 

850 Randomly sampled array from the distribution. 

851 """ 

852 if rng is None: 

853 rng = rnd.default_rng() 

854 if num_samples is None: 

855 num_samples = 1 

856 

857 rv = stats.chi2 

858 rv.random_state = rng 

859 x = rv.rvs( 

860 self._dof, 

861 loc=self.location.flatten(), 

862 scale=self.scale, 

863 size=int(num_samples), 

864 ) 

865 if num_samples == 1: 

866 return x.reshape((-1, 1)) 

867 else: 

868 return x 

869 

870 

871class Cauchy(StudentsT): 

872 """Represents a Cauchy distribution. 

873 

874 This is a special case of the Student's t-distribution with the degrees of 

875 freedom fixed at 1. However, the mean and covariance do not exist for this 

876 distribution. 

877 """ 

878 

879 def __init__(self, location=None, scale=None): 

880 super().__init__(scale=scale, dof=1) 

881 self.location = location 

882 

883 @property 

884 def mean(self): 

885 """Mean of the distribution.""" 

886 warn("Mean does not exist for a Cauchy") 

887 

888 @mean.setter 

889 def mean(self, val): 

890 warn("Mean does not exist for a Cauchy") 

891 

892 @property 

893 def degrees_of_freedom(self): 

894 """Degrees of freedom of the distribution, fixed at 1.""" 

895 return super().degrees_of_freedom 

896 

897 @degrees_of_freedom.setter 

898 def degrees_of_freedom(self, value): 

899 warn("Degrees of freedom is 1 for a Cauchy") 

900 

901 @property 

902 def covariance(self): 

903 """Read only covariance of the distribution (if defined).""" 

904 warn("Covariance does not exist for a Cauchy.") 

905 

906 @covariance.setter 

907 def covariance(self, val): 

908 warn("Covariance does not exist for a Cauchy.") 

909 

910 

911class GaussianScaleMixture(BaseSingleModel): 

912 r"""Helper class for defining Gaussian Scale Mixture objects. 

913 

914 Note 

915 ---- 

916 This is an alternative method for representing heavy-tailed distributions 

917 by modeling them as a combination of a standard Gaussian, :math:`v`, and 

918 another positive random variable known as the generating variate, :math:`z` 

919 

920 .. math:: 

921 x \overset{d}{=} \sqrt{z} v 

922 

923 where :math:`\overset{d}{=}` means equal in distribution and :math:`x` 

924 follows a GSM distribution (in general, a heavy tailed distribution). 

925 This formulation is based on 

926 :cite:`VilaValls2012_NonlinearBayesianFilteringintheGaussianScaleMixtureContext`, 

927 :cite:`Wainwright1999_ScaleMixturesofGaussiansandtheStatisticsofNaturalImages`, and 

928 :cite:`Kuruoglu1998_ApproximationofAStableProbabilityDensitiesUsingFiniteGaussianMixtures`. 

929 

930 Attributes 

931 ---------- 

932 type : :class:`serums.enums.GSMTypes` 

933 Type of the distribution to represent as a GSM. 

934 location_range : tuple 

935 Minimum and maximum values for the location parameter. Useful if being 

936 fed to a filter for estimating the location parameter. Each element must 

937 match the type of the :attr:`BaseSingleModel.location` attribute. 

938 scale_range : tuple 

939 Minimum and maximum values for the scale parameter. Useful if being 

940 fed to a filter for estimating the scale parameter. Each element must 

941 match the type of the :attr:`BaseSingleModel.scale` attribute. The default is None. 

942 df_range : tuple 

943 Minimum and maximum values for the degree of freedom parameter. 

944 Useful if being fed to a filter for estimating the degree of freedom 

945 parameter. Each element must be a float. The default is None. 

946 """ 

947 

948 __df_types = (enums.GSMTypes.STUDENTS_T, enums.GSMTypes.CAUCHY) 

949 

950 def __init__( 

951 self, 

952 gsm_type, 

953 location=None, 

954 location_range=None, 

955 scale=None, 

956 scale_range=None, 

957 degrees_of_freedom=None, 

958 df_range=None, 

959 ): 

960 """Initialize a GSM Object. 

961 

962 Parameters 

963 ---------- 

964 gsm_type : :class:`serums.enums.GSMTypes` 

965 Type of the distribution to represent as a GSM. 

966 location : N x 1 numpy array, optional 

967 location parameter of the distribution. The default is None. 

968 location_range : tuple, optional 

969 Minimum and maximum values for the location parameter. Useful if being 

970 fed to a filter for estimating the location parameter. Each element must 

971 match the type of the :attr:`BaseSingleModel.location` attribute. The default is None 

972 scale : N x N numpy array, optional 

973 Scale parameter of the distribution being represented as a GSM. 

974 The default is None. 

975 scale_range : tuple, optional 

976 Minimum and maximum values for the scale parameter. Useful if being 

977 fed to a filter for estimating the scale parameter. Each element must 

978 match the type of the :attr:`BaseSingleModel.scale` attribute. The default is None. 

979 degrees_of_freedom : float, optional 

980 Degrees of freedom parameter of the distribution being represented 

981 as a GSM. This is not needed by all types. The default is None. 

982 df_range : tuple, optional 

983 Minimum and maximum values for the degree of freedom parameter. 

984 Useful if being fed to a filter for estimating the degree of freedom 

985 parameter. Each element must be a float. The default is None. 

986 

987 Raises 

988 ------ 

989 RuntimeError 

990 If a `gsm_type` is given that is of the incorrect data type. 

991 """ 

992 super().__init__(loc=location, scale=scale) 

993 

994 if not isinstance(gsm_type, enums.GSMTypes): 

995 raise RuntimeError("Type ({}) must be a GSMType".format(gsm_type)) 

996 

997 self.type = gsm_type 

998 

999 self._df = None 

1000 

1001 self.location_range = location_range 

1002 self.scale_range = scale_range 

1003 self.df_range = df_range 

1004 

1005 if degrees_of_freedom is not None: 

1006 self.degrees_of_freedom = degrees_of_freedom 

1007 

1008 if self.type is enums.GSMTypes.CAUCHY: 

1009 self._df = 1 

1010 

1011 @property 

1012 def degrees_of_freedom(self): 

1013 """Degrees of freedom parameter of the distribution being represented as a GSM. 

1014 

1015 Returns 

1016 ------- 

1017 float, optional 

1018 """ 

1019 if self.type in self.__df_types: 

1020 return self._df 

1021 else: 

1022 msg = "GSM type {:s} does not have a degree of freedom.".format(self.type) 

1023 warn(msg) 

1024 return None 

1025 

1026 @degrees_of_freedom.setter 

1027 def degrees_of_freedom(self, val): 

1028 if self.type in self.__df_types: 

1029 if self.type is enums.GSMTypes.CAUCHY: 

1030 warn("GSM type {:s} requires degree of freedom = 1".format(self.type)) 

1031 return 

1032 self._df = val 

1033 else: 

1034 msg = ( 

1035 "GSM type {:s} does not have a degree of freedom. " + "Skipping" 

1036 ).format(self.type) 

1037 warn(msg) 

1038 

1039 def sample(self, rng=None, num_samples=None): 

1040 """Draw a sample from the specified GSM type. 

1041 

1042 Parameters 

1043 ---------- 

1044 rng : numpy random generator, optional 

1045 Random number generator to use. If none is given then the numpy 

1046 default is used. The default is None. 

1047 

1048 Returns 

1049 ------- 

1050 float 

1051 randomly sampled value from the GSM. 

1052 """ 

1053 if rng is None: 

1054 rng = rnd.default_rng() 

1055 if num_samples is None: 

1056 num_samples = 1 

1057 

1058 if self.type in [enums.GSMTypes.STUDENTS_T, enums.GSMTypes.CAUCHY]: 

1059 return self._sample_student_t(rng, int(num_samples)) 

1060 

1061 elif self.type is enums.GSMTypes.SYMMETRIC_A_STABLE: 

1062 return self._sample_SaS(rng, int(num_samples)) 

1063 

1064 else: 

1065 raise RuntimeError("GSM type: {} is not supported".format(self.type)) 

1066 

1067 def _sample_student_t(self, rng, num_samples=None): 

1068 return stats.t.rvs( 

1069 self.degrees_of_freedom, 

1070 scale=np.diag(self.scale), 

1071 random_state=rng, 

1072 size=num_samples, 

1073 ) 

1074 

1075 def _sample_SaS(self, rng, num_samples): 

1076 raise RuntimeError("sampling SaS distribution not implemented") 

1077 

1078 
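# Illustrative sketch (not part of the original module): a Gaussian scale
# mixture draws from the heavy-tailed distribution named by its type; here a
# Student's t GSM with a hypothetical scale and degree of freedom.
def _example_gsm_usage():
    gsm = GaussianScaleMixture(
        enums.GSMTypes.STUDENTS_T,
        scale=np.array([[1.0]]),
        degrees_of_freedom=3,
    )
    samples = gsm.sample(num_samples=100)  # 100 heavy-tailed draws
    return samples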

1079class GeneralizedPareto(BaseSingleModel): 

1080 """Represents a Generalized Pareto distribution (GPD).""" 

1081 

1082 def __init__(self, location=None, scale=None, shape=None): 

1083 """Initialize an object. 

1084 

1085 Parameters 

1086 ---------- 

1087 location : N x 1 numpy array, optional 

1088 location of the distribution. The default is None. 

1089 scale : N x 1 numpy array, optional 

1090 scale of the distribution. The default is None. 

1091 shape : N x 1 numpy array, optional 

1092 shape of the distribution. The default is None. 

1093 

1094 Returns 

1095 ------- 

1096 None. 

1097 """ 

1098 super().__init__(loc=location, scale=scale) 

1099 self.shape = shape 

1100 

1101 def sample(self, rng=None, num_samples=None): 

1102 """Draw a sample from the current mixture model. 

1103 

1104 Parameters 

1105 ---------- 

1106 rng : numpy random generator, optional 

1107 Random number generator to use. If none is given then the numpy 

1108 default is used. The default is None. 

1109 

1110 Returns 

1111 ------- 

1112 numpy array 

1113 randomly sampled numpy array of the same shape as the mean. 

1114 """ 

1115 if rng is None: 

1116 rng = rnd.default_rng() 

1117 if num_samples is None: 

1118 num_samples = 1 

1119 

1120 rv = stats.genpareto 

1121 rv.random_state = rng 

1122 x = ( 

1123 self.scale * rv.rvs(self.shape, size=int(num_samples)) 

1124 ) + self.location.ravel() 

1125 if num_samples == 1: 

1126 return x.reshape((-1, 1)) 

1127 else: 

1128 return x 

1129 

1130 def pdf(self, x): 

1131 """Multi-variate probability density function for this distribution. 

1132 

1133 Returns 

1134 ------- 

1135 float 

1136 PDF value of the state `x`. 

1137 """ 

1138 rv = stats.genpareto 

1139 return ( 

1140 rv.pdf( 

1141 (x.flatten() - self.location) / self.scale, 

1142 c=self.shape.flatten(), 

1143 ) 

1144 / self.scale 

1145 ) 

1146 

1147 
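# Illustrative sketch (not part of the original module): the Generalized Pareto
# model shifts and scales scipy's genpareto draws by the stored location and
# scale. The location, scale, and shape values below are hypothetical.
def _example_gpd_usage():
    gpd = GeneralizedPareto(location=0.0, scale=1.5, shape=0.2)
    samples = gpd.sample(num_samples=200)  # array of shifted, scaled GPD draws
    return samples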

1148class Bernoulli(BaseSingleModel): 

1149 """Represents a Bernoulli distribution object""" 

1150 def __init__(self, prob=None, density=None, **kwargs): 

1151 """Initialize an object. 

1152 

1153 Parameters 

1154 ---------- 

1155 prob : float, optional 

1156 Existence probability of the distribution. The default is None. 

1157 density : serums.models.BaseSingleModel, optional 

1158 Probability density of the distribution. The default is None. If None, sample will return 1 or 0. 

1159 

1160 Returns 

1161 ------- 

1162 None. 

1163 """ 

1164 super().__init__(**kwargs) 

1165 self.prob = prob 

1166 self.density = density 

1167 

1168 def sample(self, rng=None, num_samples=1): 

1169 """Draw a sample from the current model. 

1170 

1171 Parameters 

1172 ---------- 

1173 rng : numpy random generator, optional 

1174 Random number generator to use. If none is given then the numpy 

1175 default is used. The default is None. 

1176 

1177 Returns 

1178 ------- 

1179 numpy array 

1180 randomly sampled numpy array of the same shape as the mean. 

1181 """ 

1182 if num_samples != 1: 

1183 num_samples = 1 

1184 warn("Bernoulli distribution only accepts a single sample.") 

1185 if rng is None: 

1186 rng = rnd.default_rng() 

1187 if self.density is None: 

1188 return rng.binomial(num_samples, self.prob) 

1189 else: 

1190 return rng.binomial(num_samples, self.prob) * self.density.sample(rng, 1) 

1191 

1192 
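# Illustrative sketch (not part of the original module): a Bernoulli component
# couples an existence probability with an optional density, so a draw is
# either zero or a sample from that density. The parameters are hypothetical.
def _example_bernoulli_usage():
    density = Gaussian(mean=np.array([[0.0]]), covariance=np.array([[1.0]]))
    b = Bernoulli(prob=0.3, density=density)
    draw = b.sample()  # zero with probability 0.7, otherwise a Gaussian draw
    return draw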

1193class BaseMixtureModelIterator: 

1194 """Iterator for :class:`serums.models.BaseMixutreModel`. 

1195 

1196 Attributes 

1197 ---------- 

1198 weights : list 

1199 Each element is a float for the weight of a distribution. 

1200 dists : list 

1201 Each element is a :class:`serums.models.BaseSingleModel`. 

1202 idx : int 

1203 Current index for the iterator. 

1204 """ 

1205 

1206 def __init__(self, weights, dists): 

1207 """Initialize an object. 

1208 

1209 Parameters 

1210 ---------- 

1211 weights : list 

1212 Each element is a float for the weight of a distribution. 

1213 dists : list 

1214 Each element is a :class:`serums.models.BaseSingleModel`. 

1215 """ 

1216 self.weights = weights 

1217 self.dists = dists 

1218 self.idx = 0 

1219 

1220 def __iter__(self): 

1221 """Returns the iterator object.""" 

1222 return self 

1223 

1224 def __next__(self): 

1225 """Get the next element in the iterator. 

1226 

1227 Raises 

1228 ------ 

1229 StopIteration 

1230 End of the iterator is reached. 

1231 

1232 Returns 

1233 ------- 

1234 float 

1235 weight of the distribution. 

1236 :class:`serums.models.BaseSingleModel` 

1237 distribution object. 

1238 """ 

1239 self.idx += 1 

1240 try: 

1241 return self.weights[self.idx - 1], self.dists[self.idx - 1] 

1242 except IndexError: 

1243 self.idx = 0 

1244 raise StopIteration # Done iterating. 

1245 

1246 

1247class BaseMixtureModel: 

1248 """Generic base class for mixture distribution models. 

1249 

1250 This defines the required functions and provides their recommended function 

1251 signature for inherited classes. It also defines base attributes for the 

1252 mixture model. 

1253 

1254 Attributes 

1255 ---------- 

1256 weights : list 

1257 weight of each distribution 

1258 """ 

1259 

1260 def __init__(self, distributions=None, weights=None, monte_carlo_size=int(1e4)): 

1261 """Initialize a mixture model object. 

1262 

1263 Parameters 

1264 ---------- 

1265 distributions : list, optional 

1266 Each element is a :class:`.BaseSingleModel`. The default is None. 

1267 weights : list, optional 

1268 Weight of each distribution. The default is None. 

1269 

1270 Returns 

1271 ------- 

1272 None. 

1273 

1274 """ 

1275 if distributions is None: 

1276 distributions = [] 

1277 if weights is None: 

1278 weights = [] 

1279 

1280 self._distributions = distributions 

1281 self.weights = weights 

1282 self.monte_carlo_size = monte_carlo_size 

1283 

1284 def __iter__(self): 

1285 """Allow iterating over mixture objects by (weight, distribution).""" 

1286 return BaseMixtureModelIterator(self.weights, self._distributions) 

1287 

1288 def __len__(self): 

1289 """Give the number of distributions in the mixture.""" 

1290 return len(self._distributions) 

1291 

1292 def sample(self, rng=None, num_samples=None): 

1293 """Draw a sample from the current mixture model. 

1294 

1295 Parameters 

1296 ---------- 

1297 rng : numpy random generator, optional 

1298 Random number generator to use. If none is given then the numpy 

1299 default is used. The default is None. 

1300 

1301 Returns 

1302 ------- 

1303 numpy array 

1304 randomly sampled numpy array of the same shape as the mean. 

1305 """ 

1306 if rng is None: 

1307 rng = rnd.default_rng() 

1308 if num_samples is None: 

1309 num_samples = 1 

1310 weights = np.array(self.weights, dtype=float) 

1311 weights /= np.sum(weights).astype(float) 

1312 if num_samples == 1: 

1313 mix_ind = rng.choice(np.arange(len(self), dtype=int), p=weights) 

1314 x = self._distributions[mix_ind].sample(rng=rng) 

1315 return x.reshape((x.size, 1)) 

1316 else: 

1317 x = np.nan * np.ones( 

1318 (int(num_samples), self._distributions[0].location.size) 

1319 ) 

1320 for ii in range(int(num_samples)): 

1321 mix_ind = rng.choice(np.arange(len(self), dtype=int), p=weights) 

1322 x[ii, :] = self._distributions[mix_ind].sample(rng=rng).ravel() 

1323 return x 

1324 

1325 def pdf(self, x): 

1326 """Multi-variate probability density function for this mixture. 

1327 

1328 Returns 

1329 ------- 

1330 float 

1331 PDF value of the state `x`. 

1332 """ 

1333 p = 0 

1334 for w, dist in self: 

1335 p += w * dist.pdf(x) 

1336 

1337 return p 

1338 

1339 def remove_components(self, indices): 

1340 """Remove component distributions from the mixture by index. 

1341 

1342 Parameters 

1343 ---------- 

1344 indices : list 

1345 indices of distributions to remove. 

1346 

1347 Returns 

1348 ------- 

1349 None. 

1350 """ 

1351 if not isinstance(indices, list): 

1352 indices = list(indices) 

1353 

1354 for index in sorted(indices, reverse=True): 

1355 del self._distributions[index] 

1356 del self.weights[index] 

1357 

1358 def add_components(self, *args): 

1359 """Add a component distribution to the mixture. 

1360 

1361 This should be implemented by the child class. 

1362 

1363 Parameters 

1364 ---------- 

1365 *args : tuple 

1366 Additional arguments specific to the child distribution. 

1367 

1368 Returns 

1369 ------- 

1370 None. 

1371 """ 

1372 warn("add_component not implemented by {}".format(type(self).__name__)) 

1373 

1374 

1375class _DistListWrapper(list): 

1376 """Helper class for wrapping lists of BaseSingleModel to get a list of a single parameter.""" 

1377 

1378 def __init__(self, dist_lst, attr): 

1379 """Give list of distributions and the attribute to access.""" 

1380 self.dist_lst = dist_lst 

1381 self.attr = attr 

1382 

1383 def __getitem__(self, index): 

1384 """Get the attribute of the item at the index in the list.""" 

1385 if isinstance(index, slice): 

1386 step = 1 

1387 if index.step is not None: 

1388 step = index.step 

1389 return [ 

1390 getattr(self.dist_lst[ii], self.attr) 

1391 for ii in range(index.start, index.stop, step) 

1392 ] 

1393 elif isinstance(index, int): 

1394 return getattr(self.dist_lst[index], self.attr) 

1395 

1396 else: 

1397 fmt = "Index must be an integer or slice not {}" 

1398 raise RuntimeError(fmt.format(type(index))) 

1399 

1400 def __setitem__(self, index, val): 

1401 """Set the attribute of the item at the index to the value.""" 

1402 if isinstance(index, slice): 

1403 step = 1 

1404 if index.step is not None: 

1405 step = index.step 

1406 for ii in range(index.start, index.stop, step): 

1407 setattr(self.dist_lst[ii], self.attr, val) 

1408 

1409 elif isinstance(index, int): 

1410 setattr(self.dist_lst[index], self.attr, val) 

1411 

1412 else: 

1413 fmt = "Index must be an integer or slice not {}" 

1414 raise RuntimeError(fmt.format(type(index))) 

1415 

1416 def __iter__(self): 

1417 self.n = 0 

1418 return self 

1419 

1420 def __next__(self): 

1421 if self.n < len(self.dist_lst): 

1422 self.n += 1 

1423 return getattr(self.dist_lst[self.n - 1], self.attr) 

1424 else: 

1425 raise StopIteration 

1426 

1427 def __repr__(self): 

1428 return str([getattr(d, self.attr) for d in self.dist_lst]) 

1429 

1430 def __len__(self): 

1431 return len(self.dist_lst) 

1432 

1433 def append(self, *args): 

1434 raise RuntimeError("Cannot append, use add_component function instead.") 

1435 

1436 def extend(self, *args): 

1437 raise RuntimeError("Cannot extend, use add_component function instead.") 

1438 

1439 

1440class GaussianMixture(BaseMixtureModel): 

1441 """Gaussian Mixture object.""" 

1442 

1443 def __init__(self, means=None, covariances=None, **kwargs): 

1444 """Initialize an object. 

1445 

1446 Parameters 

1447 ---------- 

1448 means : list, optional 

1449 Each element is a N x 1 numpy array. Will be used in place of supplied 

1450 distributions but requires covariances to also be given. The default is None. 

1451 covariances : list, optional 

1452 Each element is an N x N numpy array. Will be used in place of 

1453 supplied distributions but requires means to be given. The default is None. 

1454 **kwargs : dict, optional 

1455 See the base class for details. 

1456 

1457 Returns 

1458 ------- 

1459 None. 

1460 """ 

1461 if means is not None and covariances is not None: 

1462 kwargs["distributions"] = [ 

1463 Gaussian(mean=m, covariance=c) for m, c in zip(means, covariances) 

1464 ] 

1465 super().__init__(**kwargs) 

1466 

1467 @property 

1468 def means(self): 

1469 """List of Gaussian means, each is a N x 1 numpy array. Recommended to be read only.""" 

1470 return _DistListWrapper(self._distributions, "location") 

1471 

1472 @means.setter 

1473 def means(self, val): 

1474 if not isinstance(val, list): 

1475 warn("Must set means to a list") 

1476 return 

1477 

1478 if len(val) != len(self._distributions): 

1479 self.weights = [1 / len(val) for ii in range(len(val))] 

1480 self._distributions = [Gaussian() for ii in range(len(val))] 

1481 for ii, v in enumerate(val): 

1482 self._distributions[ii].mean = v 

1483 

1484 @property 

1485 def covariances(self): 

1486 """List of Gaussian covariances, each is a N x N numpy array. Recommended to be read only.""" 

1487 return _DistListWrapper(self._distributions, "scale") 

1488 

1489 @covariances.setter 

1490 def covariances(self, val): 

1491 if not isinstance(val, list): 

1492 warn("Must set covariances to a list") 

1493 return 

1494 

1495 if len(val) != len(self._distributions): 

1496 self.weights = [1 / len(val) for ii in range(len(val))] 

1497 self._distributions = [Gaussian() for ii in range(len(val))] 

1498 

1499 for ii, v in enumerate(val): 

1500 self._distributions[ii].covariance = v 

1501 

1502 def add_components(self, means, covariances, weights): 

1503 """Add Gaussian distributions to the mixture. 

1504 

1505 Parameters 

1506 ---------- 

1507 means : list 

1508 Each is a N x 1 numpy array of the mean of the distributions to add. 

1509 covariances : list 

1510 Each is a N x N numpy array of the covariance of the distributions 

1511 to add. 

1512 weights : list 

1513 Each is a float for the weight of the distributions to add. No 

1514 normalization is done. 

1515 

1516 Returns 

1517 ------- 

1518 None. 

1519 """ 

1520 if not isinstance(means, list): 

1521 means = [ 

1522 means, 

1523 ] 

1524 if not isinstance(covariances, list): 

1525 covariances = [ 

1526 covariances, 

1527 ] 

1528 if not isinstance(weights, list): 

1529 weights = [ 

1530 weights, 

1531 ] 

1532 

1533 self._distributions.extend( 

1534 [Gaussian(mean=m, covariance=c) for m, c in zip(means, covariances)] 

1535 ) 

1536 self.weights.extend(weights) 

1537 

1538 
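# Illustrative sketch (not part of the original module): a two component
# Gaussian mixture; weights are normalized inside sample() and the pdf is the
# weighted sum of the component PDFs. The parameters are hypothetical.
def _example_gaussian_mixture_usage():
    gm = GaussianMixture(
        means=[np.array([[0.0]]), np.array([[3.0]])],
        covariances=[np.array([[1.0]]), np.array([[0.5]])],
        weights=[0.7, 0.3],
    )
    one_draw = gm.sample()             # (1, 1) array from a random component
    value = gm.pdf(np.array([[1.0]]))  # weighted sum of component PDFs
    return one_draw, value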

1539class StudentsTMixture(BaseMixtureModel): 

1540 """Students T mixture object.""" 

1541 

1542 def __init__(self, means=None, scalings=None, dof=None, **kwargs): 

1543 if means is not None and scalings is not None and dof is not None: 

1544 if isinstance(dof, list): 

1545 dists = [ 

1546 StudentsT(mean=m, scale=s, dof=df) 

1547 for m, s, df in zip(means, scalings, dof) 

1548 ] 

1549 else: 

1550 dists = [ 

1551 StudentsT(mean=m, scale=s, dof=dof) for m, s in zip(means, scalings) 

1552 ] 

1553 kwargs["distributions"] = dists 

1554 super().__init__(**kwargs) 

1555 

1556 @property 

1557 def means(self): 

1558 """List of Gaussian means, each is a N x 1 numpy array. Recommended to be read only.""" 

1559 return _DistListWrapper(self._distributions, "location") 

1560 

1561 @means.setter 

1562 def means(self, val): 

1563 if not isinstance(val, list): 

1564 warn("Must set means to a list") 

1565 return 

1566 

1567 if len(val) != len(self._distributions): 

1568 self.weights = [1 / len(val) for ii in range(len(val))] 

1569 self._distributions = [StudentsT() for ii in range(len(val))] 

1570 

1571 for ii, v in enumerate(val): 

1572 self._distributions[ii].mean = v 

1573 

1574 @property 

1575 def covariances(self): 

1576 """Read only list of covariances, each is a N x N numpy array.""" 

1577 return _DistListWrapper(self._distributions, "covariance") 

1578 

1579 @property 

1580 def scalings(self): 

1581 """List of scalings, each is a N x N numpy array. Recommended to be read only.""" 

1582 return _DistListWrapper(self._distributions, "scale") 

1583 

1584 @scalings.setter 

1585 def scalings(self, val): 

1586 if not isinstance(val, list): 

1587 warn("Must set scalings to a list") 

1588 return 

1589 

1590 if len(val) != len(self._distributions): 

1591 self.weights = [1 / len(val) for ii in range(len(val))] 

1592 self._distributions = [StudentsT() for ii in range(len(val))] 

1593 

1594 for ii, v in enumerate(val): 

1595 self._distributions[ii].scale = v 

1596 

1597 @property 

1598 def dof(self): 

1599 """Most common degree of freedom for the mixture. Deprecated but kept for compatability, new code should use degrees_of_freedom.""" 

1600 vals, counts = np.unique( 

1601 [d.degrees_of_freedom for d in self._distributions], 

1602 return_counts=True, 

1603 ) 

1604 inds = np.argwhere(counts == np.max(counts)) 

1605 return vals[inds[0]].item() 

1606 

1607 @dof.setter 

1608 def dof(self, val): 

1609 for d in self._distributions: 

1610 d.degrees_of_freedom = val 

1611 

1612 @property 

1613 def degrees_of_freedom(self): 

1614 """List of degrees of freedom, each is a float. Recommended to be read only.""" 

1615 return _DistListWrapper(self._distributions, "degrees_of_freedom") 

1616 

1617 @degrees_of_freedom.setter 

1618 def degrees_of_freedom(self, val): 

1619 if not isinstance(val, list): 

1620 warn("Must set degrees of freedom to a list") 

1621 return 

1622 

1623 if len(val) != len(self._distributions): 

1624 self.weights = [1 / len(val) for ii in range(len(val))] 

1625 self._distributions = [StudentsT() for ii in range(len(val))] 

1626 

1627 for ii, v in enumerate(val): 

1628 self._distributions[ii].degrees_of_freedom = v 

1629 

1630 def add_components(self, means, scalings, dof_lst, weights): 

1631 """Add Student's t-distributions to the mixture. 

1632 

1633 Parameters 

1634 ---------- 

1635 means : list 

1636 Each is a N x 1 numpy array of the mean of the distributions to add. 

1637 scalings : list 

1638 Each is a N x N numpy array of the scale of the distributions 

1639 to add. 

1640 dof_lst : list 

1641 Each is a float representing the degrees of freedom of the distribution 

1642 to add. 

1643 weights : list 

1644 Each is a float for the weight of the distributions to add. No 

1645 normalization is done. 

1646 

1647 Returns 

1648 ------- 

1649 None. 

1650 """ 

1651 if not isinstance(means, list): 

1652 means = [ 

1653 means, 

1654 ] 

1655 if not isinstance(scalings, list): 

1656 scalings = [ 

1657 scalings, 

1658 ] 

1659 if not isinstance(dof_lst, list): 

1660 dof_lst = [ 

1661 dof_lst, 

1662 ] 

1663 if not isinstance(weights, list): 

1664 weights = [ 

1665 weights, 

1666 ] 

1667 

1668 self._distributions.extend( 

1669 [ 

1670 StudentsT(mean=m, scale=s, dof=df) 

1671 for m, s, df in zip(means, scalings, dof_lst) 

1672 ] 

1673 ) 

1674 self.weights.extend(weights) 

1675 

1676 
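# Illustrative sketch (not part of the original module): a Student's t mixture
# built directly from means, scalings, and a shared degree of freedom. The
# parameter values and weights are hypothetical.
def _example_students_t_mixture_usage():
    stm = StudentsTMixture(
        means=[np.array([[0.0]]), np.array([[2.0]])],
        scalings=[np.array([[1.0]]), np.array([[0.5]])],
        dof=4,
        weights=[0.5, 0.5],
    )
    value = stm.pdf(np.array([[0.5]]))  # weighted sum of the component PDFs
    return value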

1677class MultiBernoulliMixture(BaseMixtureModel): 
    """Multi-Bernoulli mixture object."""

1678 def __init__(self, probs=None, densities=None, **kwargs): 

1679 if probs is not None and densities is not None: 

1680 kwargs["distributions"] = [ 

1681 Bernoulli(prob=p, density=d) for p, d in zip(probs, densities) 

1682 ] 

1683 super().__init__(**kwargs) 

1684 

1685 @property 

1686 def probs(self): 

1687 """List of Bernoulli existance probabilities, each is a float between 0 and 1. Read only""" 

1688 return _DistListWrapper(self._distributions, "prob") 

1689 

1690 @probs.setter 

1691 def probs(self, val): 

1692 if not isinstance(val, list): 

1693 warn("Must set probs to a list") 

1694 return 

1695 if len(val) != len(self._distributions): 

1696 self.weights = [1 / len(val) for ii in range(len(val))] 

1697 self._distributions = [Bernoulli() for ii in range(len(val))] 

1698 for ii, v in enumerate(val): 

1699 self._distributions[ii].prob = v 

1700 

1701 @property 

1702 def densities(self): 

1703 """Returns a serums.BaseSingleModel object representing the density of the bernoulli distribution.""" 

1704 return _DistListWrapper(self._distributions, "density") 

1705 

1706 @densities.setter 

1707 def densities(self, val): 

1708 if not isinstance(val, list): 

1709 warn("Must set densities to a list") 

1710 return 

1711 if len(val) != len(self._distributions): 

1712 self.weights = [1 / len(val) for ii in range(len(val))] 

1713 self._distributions = [Bernoulli() for ii in range(len(val))] 

1714 for ii, v in enumerate(val): 

1715 self._distributions[ii].density = v 

1716 

1717 def add_components(self, probs, densities, weights): 

1718 """Add Bernoulli distributions to the mixture. 

1719 

1720 Parameters 

1721 ---------- 

1722 probs : list 

1723 Each is a float between 0 and 1 representing the existence probability of the Bernoulli distribution to add. 

1724 densities : list 

1725 Each is a serums.models.BaseSingleModel representing the density of the Bernoulli distribution 

1726 to add. 

1727 weights : list 

1728 Each is a float for the weight of the distributions to add. No 

1729 normalization is done. 

1730 

1731 Returns 

1732 ------- 

1733 None. 

1734 """ 

1735 if not isinstance(probs, list): 

1736 probs = [probs, ] 

1737 if not isinstance(densities, list): 

1738 densities = [densities, ] 

1739 if not isinstance(weights, list): 

1740 weights = [weights, ] 

1741 

1742 dists = [ 

1743 Bernoulli(p, d) for p, d in zip(probs, densities) 

1744 ] 

1745 self._distributions.extend(dists) 

1746 self.weights.extend(weights) 

1747 

1748 

1749class SymmetricGaussianPareto(BaseSingleModel): 

1750 """Represents a Symmetric Gaussian-Pareto Mixture Distribution object. 

1751 

1752 Attributes 

1753 ---------- 

1754 location : float 

1755 Location (mean) of the distribution. By definition, it is always 0. 

1756 scale : float 

1757 Standard deviation of the core Gaussian region. The default is None. 

1758 threshold : float 

1759 Location where the core Gaussian region meets the Pareto tail. The default is None. 

1760 tail_shape : float 

1761 GPD shape parameter of the distribution's tail. The default is None 

1762 tail_scale : float 

1763 GPD scale parameter of the distribution's tail. The default is None 

1764 

1765 """ 

1766 

1767 def __init__( 

1768 self, 

1769 location=0, 

1770 scale=None, 

1771 threshold=None, 

1772 tail_shape=None, 

1773 tail_scale=None, 

1774 ): 

1775 """Initialize an object. 

1776 

1777 Parameters 

1778 ---------- 

1779 location : float 

1780 Location (mean) of the distribution. By definition, it is always 0. 

1781 scale : float, optional 

1782 Sets the class attribute "scale". 

1783 threshold : float, optional 

1784 Sets the class attribute "threshold". 

1785 tail_shape : float, optional 

1786 Sets the class attribute "tail_shape". 

1787 tail_scale : float, optional 

1788 Sets the class attribute "tail_scale". 

1789 

1790 Returns 

1791 ------- 

1792 None. 

1793 """ 

1794 super().__init__(loc=np.zeros((1, 1)), scale=scale) 

1795 self.threshold = threshold 

1796 self.tail_shape = tail_shape 

1797 self.tail_scale = tail_scale 

1798 

1799 def sample(self, rng: rnd.Generator = None, num_samples: int = None) -> np.ndarray: 

1800 """Generate a random sample from the distribution model. 

1801 

1802 Parameters 

1803 ---------- 

1804 num_samples : int 

1805 Specify the size of the sample. 

1806 

1807 Returns 

1808 ------- 

1809 N numpy array 

1810 Numpy array containing a random sample of the specified size from 

1811 the distribution. 

1812 """ 

1813 if rng is None: 

1814 rng = rnd.default_rng() 

1815 if num_samples is None: 

1816 num_samples = 1 

1817 

1818 p = rng.uniform(size=num_samples) 

1819 p_sorted = np.sort(p) 

1820 F_mu = norm.cdf(-self.threshold, loc=self.location, scale=self.scale) 

1821 F_u = norm.cdf(self.threshold, loc=self.location, scale=self.scale) 

1822 

1823 idx_mu = int(np.argmax(p_sorted >= F_mu)) 

1824 idx_u = int(np.argmax(p_sorted >= F_u)) 

1825 

1826 sample = np.zeros((num_samples, self.location.size)) 

1827 transformed_left = np.add(np.negative(p_sorted[0:idx_mu]), 1) 

1828 transformed_left = np.subtract(transformed_left, F_u) 

1829 transformed_left = np.divide(transformed_left, (1 - F_u)) 

1830 left_samp = genpareto.ppf( 

1831 transformed_left, self.tail_shape, loc=0, scale=self.tail_scale 

1832 ) 

1833 left_samp = np.subtract(np.negative(left_samp), self.threshold) 

1834 

1835 transformed_right = np.subtract(p_sorted[idx_u:], F_u) 

1836 transformed_right = np.divide(transformed_right, (1 - F_u)) 

1837 right_samp = genpareto.ppf( 

1838 transformed_right, self.tail_shape, loc=0, scale=self.tail_scale 

1839 ) 

1840 right_samp = np.add(right_samp, self.threshold) 

1841 

1842 center_samp = norm.ppf( 

1843 p_sorted[idx_mu:idx_u], loc=self.location, scale=self.scale 

1844 ) 

1845 

1846 sample[0:idx_mu] = np.transpose(left_samp) 

1847 sample[idx_mu:idx_u] = np.transpose(center_samp) 

1848 sample[idx_u:] = np.transpose(right_samp) 

1849 

1850 return sample 
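# sample() is an inverse-CDF sampler over sorted uniform ordinates: ordinates
# below F(-threshold) are mapped through the negated, left-shifted GPD
# quantile function, ordinates above F(threshold) through the right-shifted
# GPD quantile function, and the remaining middle block through the Gaussian
# quantile function. A minimal usage sketch (parameter values are
# hypothetical, not fitted results):
#
#     ob = SymmetricGaussianPareto(
#         scale=1.0, threshold=1.5, tail_shape=0.2, tail_scale=0.8
#     )
#     samps = ob.sample(num_samples=10000)  # shape (10000, 1)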

1851 

1852 def CI(self, alfa): 

1853 """Return confidence interval of distribution given a significance level 'alfa'. 

1854 

1855 Parameters 

1856 ---------- 

1857 alfa : float 

1858 significance level, i.e. confidence level = (1 - alfa). Must be 

1859 a positive real number less than 1.

1860 

1861 Returns 

1862 ------- 

1863 1 x 2 numpy array 

1864 Numpy array containing the lower and upper bounds of the computed

1865 confidence interval. 

1866 """ 

1867 q_u = halfnorm.cdf(self.threshold, loc=self.location, scale=self.scale) 

1868 q_x = 1 - alfa 

1869 if q_x <= q_u: 

1870 value = halfnorm.ppf(q_x, loc=self.location, scale=self.scale) 

1871 else: 

1872 temp = (q_x - q_u) / (1 - q_u) 

1873 value = self.threshold + genpareto.ppf( 

1874 temp, self.tail_shape, loc=0, scale=self.tail_scale 

1875 ) 

1876 return np.array([[-value, value]]) 
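# CI() works on the folded (half-normal) representation: for confidence
# q_x = 1 - alfa, the bound is the half-normal quantile of q_x when q_x lies
# inside the core (q_x <= halfnorm.cdf(threshold)), and otherwise
# threshold + GPD quantile of (q_x - q_u) / (1 - q_u); the returned interval
# is the symmetric pair [-value, value]. A hypothetical call:
#
#     bounds = ob.CI(1e-5)  # symmetric 99.999 % interval [-b, b]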

1877 

1878 def CDFplot(self, data): 

1879 """Plot the overbound and DKW bound(s) against ECDF of input data. 

1880 

1881 Parameters 

1882 ---------- 

1883 data : N numpy array 

1884 Contains the error sample data for which the overbound was computed. 

1885 

1886 Returns 

1887 ------- 

1888 matplotlib line plot 

1889 Shows empirical cumulative distribution function of input error 

1890 data, the associated DKW bound(s), and the computed overbound in the 

1891 CDF domain. 

1892 """ 

1893 pos = np.absolute(data) 

1894 sorted_abs_data = np.sort(pos) 

1895 

1896 # Plot data ECDF, DKW lower bound, and Symmetric GPO in CDF domain 

1897 

1898 n = data.size 

1899 # confidence = 1 - 1e-6 

1900 confidence = 0.95 

1901 alfa = 1 - confidence 

1902 epsilon = np.sqrt(np.log(2 / alfa) / (2 * n)) 

1903 

1904 x_core = np.linspace(0, self.threshold, 10000) 

1905 x_tail = np.linspace(self.threshold, 1.2 * max(sorted_abs_data), 10000) 

1906 

1907 y_core = halfnorm.cdf(x_core, loc=self.location, scale=self.scale) 

1908 y_tail = genpareto.cdf( 

1909 x_tail - self.threshold, 

1910 self.tail_shape, 

1911 loc=self.location, 

1912 scale=self.tail_scale, 

1913 ) * ( 

1914 1 - (halfnorm.cdf(self.threshold, loc=self.location, scale=self.scale)) 

1915 ) + ( 

1916 halfnorm.cdf(self.threshold, loc=self.location, scale=self.scale) 

1917 ) 

1918 

1919 x = np.append(x_core, x_tail) 

1920 y = np.append(y_core, y_tail) 

1921 

1922 ecdf_ords = np.zeros(n) 

1923 

1924 for i in range(n): 

1925 ecdf_ords[i] = (i + 1) / n 

1926 

1927 DKW_lower_ords = np.subtract(ecdf_ords, epsilon) 

1928 

1929 plt.figure("Symmetric GPO Plot in Half-Gaussian CDF Domain") 

1930 plt.plot(sorted_abs_data, ecdf_ords, label="ECDF") 

1931 plt.plot(sorted_abs_data, DKW_lower_ords, label="DKW Lower Bound") 

1932 plt.plot(x, y, label="Symmetric GPO") 

1933 

1934 plt.xlim([0, 1.2 * max(sorted_abs_data)]) 

1935 plt.legend() 

1936 plt.grid() 

1937 plt.title("Symmetric GPO Plot in CDF Domain") 

1938 plt.ylabel("Accumulated Probability") 

1939 plt.xlabel("Error Magnitude") 
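# The folded (absolute-error) overbound CDF plotted above is the half-normal
# CDF up to the threshold u, and q_u + (1 - q_u) * G(x - u) beyond it, where
# q_u is the half-normal CDF at u and G is the GPD CDF. The DKW lower bound
# is ECDF - epsilon with epsilon = sqrt(ln(2 / alfa) / (2 n)); for example,
# with n = 1000 samples at confidence 0.95 (alfa = 0.05):
#
#     import numpy as np
#     epsilon = np.sqrt(np.log(2 / 0.05) / (2 * 1000))  # ~0.0429
#
# so each ECDF ordinate is lowered by roughly 4.3 percentage points.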

1940 

1941 def probscaleplot(self, data): 

1942 """Generate probability plot of the ECDF, overbound, and DKW bound(s). 

1943 

1944 Parameters 

1945 ---------- 

1946 data : N numpy array 

1947 numpy array containing the error data used to calculate the 

1948 overbound 

1949 

1950 Returns 

1951 ------- 

1952 matplotlib line plot 

1953 Shows empirical cumulative distribution function of input error 

1954 data, the associated DKW bound(s), and the computed overbound 

1955 in the CDF domain where the probability axis is represented with 

1956 percentiles and is scaled such that a Gaussian CDF is linear. 

1957 """ 

1958 pos = np.absolute(data) 

1959 sorted_abs_data = np.sort(pos) 

1960 n = data.size 

1961 confidence = 0.95 

1962 alfa = 1 - confidence 

1963 epsilon = np.sqrt(np.log(2 / alfa) / (2 * n)) 

1964 

1965 x_core = np.linspace(0, self.threshold, 10000) 

1966 x_tail = np.linspace(self.threshold, max(sorted_abs_data), 10000) 

1967 

1968 y_core = halfnorm.cdf(x_core, loc=self.location, scale=self.scale) 

1969 y_tail = genpareto.cdf( 

1970 x_tail - self.threshold, 

1971 self.tail_shape, 

1972 loc=self.location, 

1973 scale=self.tail_scale, 

1974 ) * ( 

1975 1 - (halfnorm.cdf(self.threshold, loc=self.location, scale=self.scale)) 

1976 ) + ( 

1977 halfnorm.cdf(self.threshold, loc=self.location, scale=self.scale) 

1978 ) 

1979 x_ob = np.append(x_core, x_tail) 

1980 y_ob = np.append(y_core, y_tail) 

1981 dist_type = halfnorm() 

1982 

1983 ecdf_ords = np.zeros(n) 

1984 for i in range(n): 

1985 ecdf_ords[i] = (i + 1) / n 

1986 DKW_lower_ords = np.subtract(ecdf_ords, epsilon) 

1987 

1988 x_ecdf = sorted_abs_data 

1989 y_ecdf = ecdf_ords 

1990 

1991 x_dkw = x_ecdf 

1992 y_dkw = DKW_lower_ords 

1993 

1994 figure, ax = plt.subplots() 

1995 ax.set_ylim(bottom=5, top=99.999) 

1996 ax.set_yscale("prob", dist=dist_type) 

1997 

1998 plt.plot(x_ecdf, 100 * y_ecdf) 

1999 plt.plot(x_dkw, 100 * y_dkw) 

2000 plt.plot(x_ob, 100 * y_ob) 

2001 plt.legend(["ECDF", "DKW Lower Bound", "Symmetric Gaussian-Pareto Overbound"]) 

2002 plt.title("Probability Plot of Symmetric Gaussian-Pareto Overbound") 

2003 plt.ylabel("CDF Percentiles") 

2004 plt.xlabel("Error Magnitude") 

2005 plt.grid() 
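# probscaleplot() relies on the probscale package: the y-axis is rescaled
# with ax.set_yscale("prob", dist=halfnorm()), so a half-normal CDF plots as
# a straight line and tail deviations of the overbound from half-Gaussian
# behaviour stand out. A hypothetical call (error_data is the 1-D numpy
# array of errors the overbound was computed from):
#
#     ob.probscaleplot(error_data)
#     plt.show()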

2006 

2007 

2008class PairedGaussianPareto(BaseSingleModel): 

2009 """Represents a Paired Gaussian-Pareto Mixture Distribution object. 

2010 

2011 Attributes 

2012 ---------- 

2013 left_tail_shape : float 

2014 GPD shape parameter (gamma) of the left tail. The default is None. 

2015 left_tail_scale : float 

2016 GPD scale parameter (beta) of the left tail. The default is None. 

2017 left_threshold : float 

2018 Location where the left tail meets the left Gaussian core region. The default is None. 

2019 left_mean : float 

2020 Mean of the left Gaussian core region. The default is None. 

2021 left_sigma : float 

2022 Standard deviation of the left Gaussian core region. The default is None. 

2023 right_tail_shape : float 

2024 GPD shape parameter (gamma) of the right tail. The default is None. 

2025 right_tail_scale : float 

2026 GPD scale parameter (beta) of the right tail. The default is None. 

2027 right_threshold : float 

2028 Location where the right tail meets the right Gaussian core region. The default is None. 

2029 right_mean : float 

2030 Mean of the right Gaussian core region. The default is None. 

2031 right_sigma : float 

2032 Standard deviation of the right Gaussian core region. The default is None. 

2033 """ 

2034 

2035 def __init__( 

2036 self, 

2037 location=None, 

2038 scale=None, 

2039 left_tail_shape=None, 

2040 left_tail_scale=None, 

2041 left_threshold=None, 

2042 left_mean=None, 

2043 left_sigma=None, 

2044 right_tail_shape=None, 

2045 right_tail_scale=None, 

2046 right_threshold=None, 

2047 right_mean=None, 

2048 right_sigma=None, 

2049 ): 

2050 """Initialize an object. 

2051 

2052 Parameters 

2053 ---------- 

2054 left_tail_shape : float 

2055 Sets the class attribute of the same name 

2056 left_tail_scale : float 

2057 Sets the class attribute of the same name 

2058 left_threshold : float 

2059 Sets the class attribute of the same name 

2060 left_mean : float 

2061 Sets the class attribute of the same name 

2062 left_sigma : float 

2063 Sets the class attribute of the same name 

2064 right_tail_shape : float 

2065 Sets the class attribute of the same name 

2066 right_tail_scale : float 

2067 Sets the class attribute of the same name 

2068 right_threshold : float 

2069 Sets the class attribute of the same name 

2070 right_mean : float 

2071 Sets the class attribute of the same name 

2072 right_sigma : float 

2073 Sets the class attribute of the same name 

2074 

2075 Returns 

2076 ------- 

2077 None. 

2078 """ 

2079 super().__init__(loc=np.zeros((1, 1)), scale=scale) 

2080 self.left_tail_shape = left_tail_shape 

2081 self.left_tail_scale = left_tail_scale 

2082 self.left_threshold = left_threshold 

2083 self.left_mean = left_mean 

2084 self.left_sigma = left_sigma 

2085 self.right_tail_shape = right_tail_shape 

2086 self.right_tail_scale = right_tail_scale 

2087 self.right_threshold = right_threshold 

2088 self.right_mean = right_mean 

2089 self.right_sigma = right_sigma 

2090 

2091 def sample(self, rng: rnd._generator = None, num_samples: int = None) -> np.ndarray: 

2092 """Generate a random sample from the distribution model. 

2093 

2094 Parameters 

2095 ---------- 

2096 num_samples : int 

2097 Specify the size of the sample. 

2098 

2099 Returns 

2100 ------- 

2101 N numpy array 

2102 Numpy array containing a random sample of the specified size from 

2103 the distribution. 

2104 """ 

2105 if rng is None: 

2106 rng = rnd.default_rng() 

2107 if num_samples is None: 

2108 num_samples = 1 

2109 

2110 p = rng.uniform(size=num_samples) 

2111 p_sorted = np.sort(p) 

2112 

2113 FuL = norm.cdf(self.left_threshold, loc=self.left_mean, scale=self.left_sigma) 

2114 FuR = norm.cdf( 

2115 self.right_threshold, loc=self.right_mean, scale=self.right_sigma 

2116 ) 

2117 

2118 lt_ords = p_sorted[p_sorted < FuL] 

2119 lc_ords = p_sorted[p_sorted >= FuL] 

2120 lc_ords = lc_ords[lc_ords <= 0.5] 

2121 rc_ords = p_sorted[p_sorted > 0.5] 

2122 rc_ords = rc_ords[rc_ords <= FuR] 

2123 rt_ords = p_sorted[p_sorted > FuR] 

2124 

2125 lt_ords = np.divide(np.add(np.negative(lt_ords), FuL), FuL) 

2126 lt_samp = np.transpose( 

2127 genpareto.ppf(lt_ords, self.left_tail_shape, scale=self.left_tail_scale) 

2128 ) 

2129 lt_samp = np.add(np.negative(lt_samp), self.left_threshold) 

2130 

2131 rt_ords = np.divide(np.subtract(rt_ords, FuR), (1 - FuR)) 

2132 rt_samp = np.transpose( 

2133 genpareto.ppf(rt_ords, self.right_tail_shape, scale=self.right_tail_scale) 

2134 ) 

2135 rt_samp = np.add(rt_samp, self.right_threshold) 

2136 

2137 lc_samp = np.transpose( 

2138 norm.ppf(lc_ords, loc=self.left_mean, scale=self.left_sigma) 

2139 ) 

2140 rc_samp = np.transpose( 

2141 norm.ppf(rc_ords, loc=self.right_mean, scale=self.right_sigma) 

2142 ) 

2143 

2144 samp = np.concatenate((lt_samp, lc_samp, rc_samp, rt_samp)) 

2145 

2146 return samp 
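# The paired sampler splits the sorted uniform ordinates into four blocks:
# below FuL (left GPD tail), FuL to 0.5 (left Gaussian core), 0.5 to FuR
# (right Gaussian core), and above FuR (right GPD tail), maps each block
# through the matching quantile function, and concatenates the results into
# a 1-D array of length num_samples. A hypothetical call (parameter values
# are placeholders, not fitted results):
#
#     ob = PairedGaussianPareto(
#         left_tail_shape=0.15, left_tail_scale=0.7, left_threshold=-1.8,
#         left_mean=-0.1, left_sigma=1.0,
#         right_tail_shape=0.25, right_tail_scale=0.9, right_threshold=2.1,
#         right_mean=0.1, right_sigma=1.1,
#     )
#     samps = ob.sample(num_samples=10000)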

2147 

2148 def CI(self, alfa): 

2149 """Return confidence interval of distribution given a significance level 'alfa'. 

2150 

2151 Parameters 

2152 ---------- 

2153 alfa : float 

2154 significance level, i.e. confidence level = (1 - alfa). Must be 

2155 a positive real number less than 1.

2156 

2157 Returns 

2158 ------- 

2159 1 x 2 numpy array 

2160 Numpy array containing the lower and upper bounds of the computed

2161 confidence interval. 

2162 """ 

2163 p = alfa / 2 

2164 

2165 FuL = norm.cdf(self.left_threshold, loc=self.left_mean, scale=self.left_sigma) 

2166 FuR = norm.cdf( 

2167 self.right_threshold, loc=self.right_mean, scale=self.right_sigma 

2168 ) 

2169 

2170 if p > FuL: 

2171 left = norm.ppf(p, loc=self.left_mean, scale=self.left_sigma) 

2172 else: 

2173 p_Lt = (FuL - p) / (FuL) 

2174 temp = genpareto.ppf(p_Lt, self.left_tail_shape, scale=self.left_tail_scale) 

2175 left = -temp + self.left_threshold 

2176 

2177 if (1 - p) < FuR: 

2178 right = norm.ppf((1 - p), loc=self.right_mean, scale=self.right_sigma) 

2179 else: 

2180 val = ((1 - p) - FuR) / (1 - FuR) 

2181 temp = genpareto.ppf( 

2182 val, self.right_tail_shape, scale=self.right_tail_scale 

2183 ) 

2184 right = temp + self.right_threshold 

2185 

2186 return np.array([[left, right]]) 
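# CI() here splits the significance level evenly between the two tails
# (p = alfa / 2) and inverts the appropriate branch of the paired CDF on each
# side: the Gaussian core quantile when p falls inside the core, otherwise
# the shifted GPD quantile. A hypothetical call:
#
#     bounds = ob.CI(1e-5)  # -> array([[left_bound, right_bound]])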

2187 

2188 def CDFplot(self, data): 

2189 """Plot the overbound and DKW bound(s) against ECDF of input data. 

2190 

2191 Parameters 

2192 ---------- 

2193 data : N numpy array 

2194 Contains the error sample data for which the overbound was computed. 

2195 

2196 Returns 

2197 ------- 

2198 matplotlib line plot 

2199 Shows empirical cumulative distribution function of input error 

2200 data, the associated DKW bound(s), and the computed overbound in the 

2201 CDF domain. 

2202 """ 

2203 n = data.size 

2204 data_sorted = np.sort(data) 

2205 

2206 ecdf_ords = np.zeros(n) 

2207 

2208 for i in range(n): 

2209 ecdf_ords[i] = (i + 1) / n 

2210 

2211 confidence = 0.95 

2212 alfa = 1 - confidence 

2213 epsilon = np.sqrt(np.log(2 / alfa) / (2 * n)) 

2214 

2215 dkw_high = np.add(ecdf_ords, epsilon) 

2216 dkw_low = np.subtract(ecdf_ords, epsilon) 

2217 

2218 x_ob_left_tail = np.linspace( 

2219 1.05 * data_sorted[0], self.left_threshold, num=1000 

2220 ) 

2221 sub = np.flip( 

2222 np.negative(np.subtract(x_ob_left_tail, self.left_threshold + 1e-14)) 

2223 ) 

2224 y_ob_left_tail = np.transpose( 

2225 genpareto.cdf( 

2226 sub, 

2227 self.left_tail_shape, 

2228 scale=self.left_tail_scale, 

2229 ) 

2230 ) 

2231 

2232 Fu = norm.cdf(self.left_threshold, loc=self.left_mean, scale=self.left_sigma) 

2233 y_ob_left_tail = np.flip( 

2234 np.add(np.negative(np.multiply(y_ob_left_tail, Fu)), Fu) 

2235 ) 

2236 

2237 x_ob_core = np.linspace(self.left_threshold, self.right_threshold, num=10000) 

2238 y_ob_core = np.zeros(10000) 

2239 

2240 for i in range(10000): 

2241 if x_ob_core[i] >= self.left_threshold and x_ob_core[i] < self.left_mean: 

2242 y_ob_core[i] = norm.cdf( 

2243 x_ob_core[i], loc=self.left_mean, scale=self.left_sigma 

2244 ) 

2245 elif x_ob_core[i] >= self.left_mean and x_ob_core[i] <= self.right_mean: 

2246 y_ob_core[i] = 0.5 

2247 elif ( 

2248 x_ob_core[i] > self.right_mean and x_ob_core[i] <= self.right_threshold 

2249 ): 

2250 y_ob_core[i] = norm.cdf( 

2251 x_ob_core[i], loc=self.right_mean, scale=self.right_sigma 

2252 ) 

2253 

2254 x_ob_right_tail = np.linspace( 

2255 self.right_threshold, 1.05 * data_sorted[-1], num=1000 

2256 ) 

2257 sub = np.subtract(x_ob_right_tail, self.right_threshold - 1e-14) 

2258 y_ob_right_tail = np.transpose( 

2259 genpareto.cdf(sub, self.right_tail_shape, scale=self.right_tail_scale) 

2260 ) 

2261 

2262 Fu = norm.cdf(self.right_threshold, loc=self.right_mean, scale=self.right_sigma) 

2263 y_ob_right_tail = np.add(np.multiply(y_ob_right_tail, (1 - Fu)), Fu) 

2264 

2265 x_ob = np.concatenate((x_ob_left_tail, x_ob_core, x_ob_right_tail)) 

2266 y_ob = np.concatenate((y_ob_left_tail, y_ob_core, y_ob_right_tail)) 

2267 

2268 plt.figure("Paired GPO Plot in CDF Domain") 

2269 plt.plot(data_sorted, ecdf_ords, label="ECDF") 

2270 plt.plot(data_sorted, dkw_high, label="DKW Upper Bound") 

2271 plt.plot(data_sorted, dkw_low, label="DKW Lower Bound") 

2272 plt.plot(x_ob, y_ob, label="Paired Gaussian-Pareto Overbound") 

2273 

2274 left_edge = 1.1 * x_ob_left_tail[0] 

2275 right_edge = 1.1 * x_ob_right_tail[-1] 

2276 plt.xlim([left_edge, right_edge]) 

2277 plt.legend() 

2278 plt.grid() 

2279 plt.title("Paired GPO Plot in CDF Domain") 

2280 plt.ylabel("Accumulated Probability") 

2281 plt.xlabel("Error") 
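# In CDF terms, the paired overbound plotted above is
#
#     F(x) = FuL * (1 - G_L(u_L - x))          for x <= u_L
#     F(x) = Phi((x - mu_L) / sigma_L)         for u_L < x < mu_L
#     F(x) = 0.5                               for mu_L <= x <= mu_R
#     F(x) = Phi((x - mu_R) / sigma_R)         for mu_R < x <= u_R
#     F(x) = FuR + (1 - FuR) * G_R(x - u_R)    for x > u_R
#
# where G_L and G_R are the left and right GPD CDFs, Phi is the Gaussian CDF,
# u_L and u_R are the thresholds, and FuL and FuR are the core CDF values at
# those thresholds.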

2282 

2283 def probscaleplot(self, data): 

2284 """Generate probability plot of the ECDF, overbound, and DKW bound(s). 

2285 

2286 Parameters 

2287 ---------- 

2288 data : N numpy array 

2289 numpy array containing the error data used to calculate the 

2290 overbound 

2291 

2292 Returns 

2293 ------- 

2294 matplotlib line plot 

2295 Shows empirical cumulative distribution function of input error 

2296 data, the associated DKW bound(s), and the computed overbound 

2297 in the CDF domain where the probability axis is represented with 

2298 percentiles and is scaled such that a Gaussian CDF is linear. 

2299 """ 

2300 n = data.size 

2301 data_sorted = np.sort(data) 

2302 

2303 ecdf_ords = np.zeros(n) 

2304 

2305 for i in range(n): 

2306 ecdf_ords[i] = (i + 1) / n 

2307 

2308 confidence = 0.95 

2309 alfa = 1 - confidence 

2310 epsilon = np.sqrt(np.log(2 / alfa) / (2 * n)) 

2311 

2312 dkw_high = np.add(ecdf_ords, epsilon) 

2313 dkw_low = np.subtract(ecdf_ords, epsilon) 

2314 

2315 x_ob_left_tail = np.linspace( 

2316 1.05 * data_sorted[0], self.left_threshold, num=1000 

2317 ) 

2318 sub = np.flip( 

2319 np.negative(np.subtract(x_ob_left_tail, self.left_threshold + 1e-14)) 

2320 ) 

2321 y_ob_left_tail = np.transpose( 

2322 genpareto.cdf( 

2323 sub, 

2324 self.left_tail_shape, 

2325 scale=self.left_tail_scale, 

2326 ) 

2327 ) 

2328 

2329 Fu = norm.cdf(self.left_threshold, loc=self.left_mean, scale=self.left_sigma) 

2330 y_ob_left_tail = np.flip( 

2331 np.add(np.negative(np.multiply(y_ob_left_tail, Fu)), Fu) 

2332 ) 

2333 

2334 x_ob_core = np.linspace(self.left_threshold, self.right_threshold, num=10000) 

2335 y_ob_core = np.zeros(10000) 

2336 

2337 for i in range(10000): 

2338 if x_ob_core[i] >= self.left_threshold and x_ob_core[i] < self.left_mean: 

2339 y_ob_core[i] = norm.cdf( 

2340 x_ob_core[i], loc=self.left_mean, scale=self.left_sigma 

2341 ) 

2342 elif x_ob_core[i] >= self.left_mean and x_ob_core[i] <= self.right_mean: 

2343 y_ob_core[i] = 0.5 

2344 elif ( 

2345 x_ob_core[i] > self.right_mean and x_ob_core[i] <= self.right_threshold 

2346 ): 

2347 y_ob_core[i] = norm.cdf( 

2348 x_ob_core[i], loc=self.right_mean, scale=self.right_sigma 

2349 ) 

2350 

2351 x_ob_right_tail = np.linspace( 

2352 self.right_threshold, 1.05 * data_sorted[-1], num=1000 

2353 ) 

2354 sub = np.subtract(x_ob_right_tail, self.right_threshold - 1e-14) 

2355 y_ob_right_tail = np.transpose( 

2356 genpareto.cdf(sub, self.right_tail_shape, scale=self.right_tail_scale) 

2357 ) 

2358 

2359 Fu = norm.cdf(self.right_threshold, loc=self.right_mean, scale=self.right_sigma) 

2360 y_ob_right_tail = np.add(np.multiply(y_ob_right_tail, (1 - Fu)), Fu) 

2361 

2362 x_ob = np.concatenate((x_ob_left_tail, x_ob_core, x_ob_right_tail)) 

2363 y_ob = np.concatenate((y_ob_left_tail, y_ob_core, y_ob_right_tail)) 

2364 

2365 figure, ax = plt.subplots() 

2366 ax.set_ylim(bottom=0.001, top=99.999) 

2367 ax.set_yscale("prob") 

2368 

2369 plt.plot(data_sorted, 100 * ecdf_ords) 

2370 plt.plot(data_sorted, 100 * dkw_high) 

2371 plt.plot(data_sorted, 100 * dkw_low) 

2372 plt.plot(x_ob, 100 * y_ob) 

2373 plt.legend( 

2374 [ 

2375 "ECDF", 

2376 "DKW Upper Bound", 

2377 "DKW Lower Bound", 

2378 "Paired Gaussian-Pareto Overbound", 

2379 ] 

2380 ) 

2381 plt.title("Probability Plot of Paired Gaussian-Pareto Overbound") 

2382 plt.ylabel("CDF Percentiles") 

2383 plt.xlabel("Error") 

2384 plt.grid()
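# A hypothetical end-to-end plotting workflow (assuming `ob` is an already
# constructed PairedGaussianPareto, e.g. as in the sketch after sample()
# above, and `error_data` is the 1-D numpy array of errors the overbound was
# computed from):
#
#     ob.CDFplot(error_data)        # ECDF, both DKW bounds, overbound CDF
#     ob.probscaleplot(error_data)  # same curves on a Gaussian-linear scale
#     plt.show()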