#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2024 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------
__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2024"
__license__ = "GPLv3"
"""
Priors for the SelfiSys pipeline. This module provides:
- a Planck2018-based prior class (`planck_prior`) compatible with
pySelfi, adapted for the logic of the SelfiSys pipeline;
- wrappers for the selfi2019 prior from [leclercq2019primordial].
Raises
------
OSError
If file or directory paths are inaccessible.
RuntimeError
If unexpected HPC or multi-processing errors arise.
"""
import gc
from selfisys.utils.logger import getCustomLogger
logger = getCustomLogger(__name__)
[docs]
def get_summary(x, bins, normalisation=None, kmax=1.4):
"""
Compute a power-spectrum summary for given cosmological parameters.
Parameters
----------
x : array-like
Cosmological parameters [h, Omega_b, Omega_m, n_s, sigma_8].
bins : array-like
Wavenumber bins.
normalisation : float or None, optional
Normalisation constant to scale the resulting spectrum.
kmax : float, optional
Maximum wavenumber for get_Pk.
Returns
-------
theta : ndarray
The computed power-spectrum values, optionally normalised.
Raises
------
RuntimeError
If the power-spectrum computation fails unexpectedly.
"""
from numpy import array
from pysbmy.power import get_Pk
from selfisys.utils.tools import cosmo_vector_to_Simbelmyne_dict
try:
theta = get_Pk(bins, cosmo_vector_to_Simbelmyne_dict(x, kmax=kmax))
if normalisation is not None:
theta /= normalisation
return array(theta)
except Exception as e:
logger.critical("Unexpected error in get_summary: %s", str(e))
raise RuntimeError("Failed to compute power spectrum summary.") from e
finally:
gc.collect()
[docs]
def worker_class(params):
"""
Worker function to compute power spectra with CLASS, compatible with
Python multiprocessing.
Parameters
----------
params : tuple
(x, bins, normalisation, kmax) where x is an array-like of
cosmological parameters, bins is the wavenumber array,
normalisation is a float or None, and kmax is a float.
Returns
-------
theta : ndarray
Power-spectrum summary from `get_summary`.
"""
x, bins, normalisation, kmax = params
return get_summary(x, bins, normalisation, kmax)
[docs]
class planck_prior:
"""
Custom prior for the SelfiSys pipeline. This is the prior used in
[hoellinger2024diagnosing], based on the Planck 2018 cosmological
parameters.
This class provides methods to compute a power-spectrum prior from a
prior distribution of cosmological parameters, using a Gaussian fit.
See equation (7) in [hoellinger2024diagnosing].
Parameters
----------
Omega_mean : array-like
Mean of the prior distribution on cosmological parameters.
Omega_cov : array-like
Covariance matrix of the prior distribution on cosmological
parameters.
bins : array-like
Wavenumbers where the power spectrum is evaluated.
normalisation : float or None
If not None, divide the power spectra by the normalisation.
kmax : float
Maximum wavenumber for computations.
nsamples : int, optional
Number of samples drawn from the prior on the cosmological
parameters. Default is 10,000.
nthreads : int, optional
Number of CPU threads for parallel tasks. Default is -1, that
is, auto-detect the number of available threads.
EPS_K : float, optional
Regularisation parameter for covariance inversion. Default 1e-7.
EPS_residual : float, optional
Additional cutoff for matrix inversion. Default 1e-3.
filename : str or None, optional
Path to a .npy file to store or load precomputed power spectra.
Attributes
----------
mean : ndarray
Mean of the computed power spectra.
covariance : ndarray
Covariance matrix of the computed power spectra.
inv_covariance : ndarray
Inverse of the covariance matrix.
Raises
------
OSError
If file reading or writing fails.
RuntimeError
For unexpected HPC or multi-processing errors.
"""
def __init__(
self,
Omega_mean,
Omega_cov,
bins,
normalisation,
kmax,
nsamples=10000,
nthreads=-1,
EPS_K=1e-7,
EPS_residual=1e-3,
filename=None,
):
from numpy import where
from multiprocessing import cpu_count
self.Omega_mean = Omega_mean
self.Omega_cov = Omega_cov
self.bins = bins
self.normalisation = normalisation
self.kmax = kmax
self.nsamples = nsamples
self.EPS_K = EPS_K
self.EPS_residual = EPS_residual
self.filename = filename
if nthreads == -1:
# Use #CPU - 1 or fallback to 1 if a single CPU is available
self.nthreads = cpu_count() - 1 or 1
else:
self.nthreads = nthreads
self._Nbin_min = where(self.bins >= 0.01)[0].min()
self._Nbin_max = where(self.bins <= self.kmax)[0].max() + 1
# Attributes set after compute()
self.mean = None
self.covariance = None
self.inv_covariance = None
self.thetas = None
@property
def Nbin_min(self, k_min):
"""Index of the minimal wavenumber given k_min."""
return self._Nbin_min
@property
def Nbin_max(self, k_min):
"""Index of the maximal wavenumber given self.kmax."""
return self._Nbin_max
[docs]
def compute(self):
"""
Compute the prior (mean, covariance, and inverse covariance).
If `self.filename` exists, tries to load the prior. Otherwise,
samples from the prior distribution on cosmological parameters
and evaluates the power spectra in parallel.
Raises
------
OSError
If self.filename is not writable/accessible.
RuntimeError
If multi-processing or power-spectra computations fail.
"""
from os.path import exists
import numpy as np
try:
if self.filename and exists(self.filename):
logger.info("Loading precomputed thetas from %s", self.filename)
self.thetas = np.load(self.filename)
else:
from time import time
from multiprocessing import Pool
import tqdm.auto as tqdm
logger.info("Sampling %d cosmological parameter sets...", self.nsamples)
OO = np.random.multivariate_normal(
np.array(self.Omega_mean), np.array(self.Omega_cov), self.nsamples
)
eps = 1e-5
OO = np.clip(OO, eps, 1 - eps)
liste = [(o, self.bins, self.normalisation, self.kmax) for o in OO]
logger.info(
"Computing prior power spectra in parallel using %d threads...", self.nthreads
)
start = time()
with Pool(self.nthreads) as pool:
thetas = []
for theta in tqdm.tqdm(pool.imap(worker_class, liste), total=len(liste)):
thetas.append(theta)
thetas = np.array(thetas)
end = time()
logger.info("Done computing power spectra in %.2f seconds.", end - start)
self.thetas = thetas
if self.filename:
logger.info("Saving thetas to %s", self.filename)
np.save(self.filename, thetas)
# Compute stats
self.mean = np.mean(self.thetas, axis=0)
self.covariance = np.cov(self.thetas.T)
logger.info("Regularising and inverting the prior covariance matrix.")
from pyselfi.utils import regular_inv
self.inv_covariance = regular_inv(self.covariance, self.EPS_K, self.EPS_residual)
except OSError as e:
logger.error("File I/O error: %s", str(e))
raise
except Exception as e:
logger.critical("Error during prior computation: %s", str(e))
raise RuntimeError("planck_prior computation failed.") from e
finally:
gc.collect()
[docs]
def logpdf(self, theta, theta_mean, theta_covariance, theta_icov):
"""
Return the log prior probability at a given point in parameter
space.
Parameters
----------
theta : ndarray
Evaluation point in parameter space.
theta_mean : ndarray
Prior mean vector.
theta_covariance : ndarray
Prior covariance matrix.
theta_icov : ndarray
Inverse of the prior covariance matrix.
Returns
-------
float
Log prior probability value.
"""
import numpy as np
diff = theta - theta_mean
val = -0.5 * diff.dot(theta_icov).dot(diff)
val -= 0.5 * np.linalg.slogdet(2 * np.pi * theta_covariance)[1]
return val
[docs]
def sample(self, seedsample=None):
"""
Draw a random sample from the prior distribution.
Parameters
----------
seedsample : int, optional
Seed for the random number generator.
Returns
-------
ndarray
A single sample from the prior distribution.
"""
from numpy.random import seed, multivariate_normal
if seedsample is not None:
seed(seedsample)
return multivariate_normal(self.mean, self.covariance)
[docs]
def save(self, fname):
"""
Save the prior to an output file.
Parameters
----------
fname : str
Output HDF5 filename to store the prior data.
Raises
------
OSError
If the file cannot be accessed or written.
"""
import h5py
from ctypes import c_double
from pyselfi.utils import PrintMessage, save_replace_dataset, save_replace_attr
try:
PrintMessage(3, f"Writing prior in data file '{fname}'...")
with h5py.File(fname, "r+") as hf:
def save_to_hf(name, data, **kwargs):
save_replace_dataset(hf, f"/prior/{name}", data, dtype=c_double, **kwargs)
# Hyperparameters
save_to_hf("thetas", self.thetas, maxshape=(None, None))
save_to_hf("Omega_mean", self.Omega_mean, maxshape=(None,))
save_to_hf("Omega_cov", self.Omega_cov, maxshape=(None, None))
save_to_hf("bins", self.bins, maxshape=(None,))
save_replace_attr(hf, "/prior/normalisation", self.normalisation, dtype=c_double)
save_replace_attr(hf, "/prior/kmax", self.kmax, dtype=c_double)
# Mandatory attributes
save_to_hf("mean", self.mean, maxshape=(None,))
save_to_hf("covariance", self.covariance, maxshape=(None, None))
save_to_hf("inv_covariance", self.inv_covariance, maxshape=(None, None))
PrintMessage(3, f"Writing prior in data file '{fname}' done.")
except OSError as e:
logger.error("Failed to save prior to '%s': %s", fname, str(e))
raise
finally:
gc.collect()
[docs]
@classmethod
def load(cls, fname):
"""
Load the prior from input file.
Parameters
----------
fname : str
Input HDF5 filename.
Returns
-------
prior
The prior object.
Raises
------
OSError
If the file cannot be read or is invalid.
"""
from h5py import File
from numpy import array
from ctypes import c_double
from pyselfi.utils import PrintMessage
try:
PrintMessage(3, f"Reading prior in data file '{fname}'...")
with File(fname, "r") as hf:
# Load constructor parameters
Omega_mean = array(hf.get("/prior/Omega_mean"), dtype=c_double)
Omega_cov = array(hf.get("/prior/Omega_cov"), dtype=c_double)
bins = array(hf.get("/prior/bins"), dtype=c_double)
normalisation = hf.attrs["/prior/normalisation"]
kmax = hf.attrs["/prior/kmax"]
# Instantiate class
prior = cls(Omega_mean, Omega_cov, bins, normalisation, kmax)
# Load mandatory arrays
prior.mean = array(hf.get("prior/mean"), dtype=c_double)
prior.covariance = array(hf.get("/prior/covariance"), dtype=c_double)
prior.inv_covariance = array(hf.get("/prior/inv_covariance"), dtype=c_double)
PrintMessage(3, f"Reading prior in data file '{fname}' done.")
return prior
except OSError as e:
logger.error("Failed to read prior from '%s': %s", fname, str(e))
raise
finally:
gc.collect()
[docs]
def logposterior_hyperparameters_parallel(
selfi,
theta_fiducial,
Nbin_min,
Nbin_max,
theta_norm,
k_corr,
alpha_cv,
):
"""
Compute the log-posterior for the hyperparameters of the prior from
[leclercq2019primordial], for use within the SelfiSys pipeline.
Parameters
----------
selfi : object
The selfi object.
theta_fiducial : ndarray
Fiducial spectrum.
Nbin_min : int
Minimum bin index for the wavenumber range.
Nbin_max : int
Maximum bin index for the wavenumber range.
theta_norm : float
Hyperparameter controlling the overall uncertainty.
k_corr : float
Hyperparameter controlling correlation scale.
alpha_cv : float
Cosmic variance strength.
Returns
-------
float
The log-posterior value for the given hyperparameters.
Raises
------
RuntimeError
If the log-posterior computation fails unexpectedly.
"""
try:
return selfi.logposterior_hyperparameters(
theta_fiducial, Nbin_min, Nbin_max, theta_norm, k_corr, alpha_cv
)
except Exception as e:
logger.critical("Unexpected error in logposterior_hyperparameters_parallel: %s", str(e))
raise RuntimeError("logposterior_hyperparameters_parallel failed.") from e
finally:
gc.collect()