#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2024 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------
__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2024"
__license__ = "GPLv3"
"""
Routines for parameter inference and gradient evaluation in the SelfiSys
pipeline.
"""
import gc
from typing import Any, Tuple, List
from selfisys.utils.logger import getCustomLogger
logger = getCustomLogger(__name__)
[docs]
def Simbelmyne_worker(args) -> Tuple[float, Any]:
"""
Worker function used for implicit likelihood inference of
cosmological parameters.
Parameters
----------
args : tuple
A tuple of arguments to be unpacked for the worker routine:
(index, param_val, param_id, fsimdir, k_s, Pbins_bnd,
selection_params, norm_csts, P_ss_obj_path, obs_density,
lin_bias, noise, survey_mask_path, G_sim_path, G_ss_path, Np0,
Npm0, seedphase_init, seednoise_init, size, L,
radial_selection, sim_params, wd, batch_idx, dbg, modeldir,
local_mask_prefix, TimeStepDistribution, indices_steps_cumul,
eff_redshifts, poolname_abc, setup_only, prefix_mocks).
Returns
-------
tuple
(param_val, Phi) where param_val is the parameter value used,
and Phi is the resulting summary from evaluating the model.
Raises
------
OSError
If file I/O (reading or writing mock data) fails.
RuntimeError
For unexpected errors in the worker routine.
"""
import os
from pathlib import Path
try:
(
index,
param_val,
param_id,
fsimdir,
k_s,
Pbins_bnd,
selection_params,
norm_csts,
P_ss_obj_path,
obs_density,
lin_bias,
noise,
survey_mask_path,
G_sim_path,
G_ss_path,
Np0,
Npm0,
seedphase_init,
seednoise_init,
size,
L,
radial_selection,
sim_params,
wd,
batch_idx,
dbg,
modeldir,
local_mask_prefix,
TimeStepDistribution,
indices_steps_cumul,
eff_redshifts,
poolname_abc,
setup_only,
prefix_mocks,
) = args
spectrum_name = int(str(seedphase_init + index) + str(seednoise_init + index))
pooldir = (
fsimdir + "/pool/d" if not poolname_abc else fsimdir + "/pool/" + poolname_abc + "/d"
)
simdir_d = pooldir + str(spectrum_name) + "/"
Path(simdir_d).mkdir(parents=True, exist_ok=True)
if prefix_mocks is None:
fname_mocks = (
simdir_d + "mocks_d" + str(spectrum_name) + "_p" + str(batch_idx + index) + ".h5"
)
else:
fname_mocks = (
simdir_d
+ prefix_mocks
+ "_mocks_d"
+ str(spectrum_name)
+ "_p"
+ str(batch_idx + index)
+ ".h5"
)
if os.path.exists(fname_mocks):
from h5py import File
logger.debug("Mock file %s found, loading existing data...", fname_mocks)
with File(fname_mocks, "r") as f:
Phi = f["Phi"][:]
else:
logger.debug("No existing mock file at %s, generating new data...", fname_mocks)
from numpy.random import normal
from numpy import shape, max
from selfisys.global_parameters import BASELINE_SEEDNORM, omegas_gt
from selfisys.utils.tools import get_k_max
from selfisys.hiddenbox import HiddenBox
from selfisys.utils.tools import get_summary
P = len(Pbins_bnd) - 1
try:
BB_selfi = HiddenBox(
k_s=k_s,
P_ss_path=P_ss_obj_path,
Pbins_bnd=Pbins_bnd,
theta2P=None,
P=P * shape(selection_params)[1], # P * Npop
size=size,
L=L,
G_sim_path=G_sim_path,
G_ss_path=G_ss_path,
Np0=Np0,
Npm0=Npm0,
fsimdir=wd[:-1],
modeldir=modeldir,
noise_std=noise,
radial_selection=radial_selection,
selection_params=selection_params,
observed_density=obs_density,
linear_bias=lin_bias,
norm_csts=norm_csts,
survey_mask_path=survey_mask_path,
local_mask_prefix=local_mask_prefix,
sim_params=sim_params,
TimeStepDistribution=TimeStepDistribution,
TimeSteps=indices_steps_cumul,
eff_redshifts=eff_redshifts,
seedphase=seedphase_init,
seednoise=seednoise_init,
fixnoise=False,
seednorm=BASELINE_SEEDNORM,
reset=False,
save_frequency=5,
verbosity=2,
)
k_max = get_k_max(L, size)
except Exception as e:
logger.critical("Error instantiating HiddenBox: %s", str(e))
raise RuntimeError("Failed to set up HiddenBox.") from e
# Evaluate the param -> 'theta' using some get_summary logic
try:
theta = get_summary(param_val, param_id, omegas_gt, k_s, kmax=k_max)
except Exception:
max_tries = 10
perturb_std = 1e-8
param_val_init = param_val
logger.warning(
"get_summary failed for param_val=%s. Trying small perturbations...", param_val
)
for i in range(max_tries):
param_val = normal(param_val_init, perturb_std)
logger.diagnostic("Attempt #%d: param_val=%s", i + 1, param_val)
try:
theta = get_summary(param_val, param_id, omegas_gt, k_s, kmax=k_max)
logger.diagnostic(
"Success with param_val=%s on attempt #%d", param_val, i + 1
)
break
except Exception:
if i == max_tries - 1:
logger.critical(
"All attempts to get_summary failed for param_val=%s",
param_val_init,
)
raise RuntimeError("get_summary repeatedly failed.")
continue
from io import BytesIO
from selfisys.utils.low_level import stderr_redirector, stdout_redirector
cosmo_vect = omegas_gt
cosmo_vect[param_id] = param_val
logger.debug("Evaluating model with HPC redirection, setup_only=%s", setup_only)
f = BytesIO()
g = BytesIO()
try:
with stderr_redirector(f):
with stdout_redirector(g):
if setup_only:
BB_selfi.switch_setup()
else:
BB_selfi.switch_recompute_pool(prefix_mocks=prefix_mocks)
Phi = BB_selfi.evaluate(
theta,
spectrum_name,
seedphase_init + index,
seednoise_init + index,
i=batch_idx + index,
thetaIsP=True,
remove_sbmy=True,
force_powerspectrum=dbg,
force_parfiles=dbg,
check_output=dbg,
abc=poolname_abc,
cosmo_vect=cosmo_vect,
)
if setup_only:
BB_selfi.switch_setup()
else:
BB_selfi.switch_recompute_pool(prefix_mocks=prefix_mocks)
except Exception as e:
logger.critical("Error while evaluating model: %s", str(e))
raise RuntimeError("Simbelmyne_worker model evaluation failed.") from e
finally:
g.close()
f.close()
logger.debug("Returning param_val=%s with resulting Phi of shape %s", param_val, Phi.shape)
return param_val, Phi
except OSError as e:
logger.error("File I/O error in Simbelmyne_worker: %s", str(e))
raise
except Exception as e:
logger.critical("Unexpected error in Simbelmyne_worker: %s", str(e))
raise RuntimeError("Simbelmyne_worker HPC run failed.") from e
finally:
gc.collect()
[docs]
def worker_gradient_Symbelmyne(
coeff: float,
delta_x: float,
omega,
param_index: int,
k_s,
delta: float,
kmax: float,
):
"""
Worker function for evaluating the gradient of the power spectrum
using finite differences.
Parameters
----------
coeff : float
Coefficient for the finite difference.
delta_x : float
Step size in the parameter space.
omega : ndarray
Base cosmological parameter vector.
param_index : int
Index of the parameter being varied.
k_s : ndarray
Array of wavenumbers.
delta : float
Denominator for finite differences (scaled).
kmax : float
Maximum wavenumber for power spectrum.
Returns
-------
ndarray
The gradient of the power spectrum wrt the specified parameter.
Raises
------
RuntimeError
If the gradient evaluation fails.
"""
import numpy as np
from pysbmy.power import get_Pk
from selfisys.utils.tools import cosmo_vector_to_Simbelmyne_dict
omega_new = omega.copy()
try:
omega_new[param_index] += delta_x
ps = get_Pk(k_s, cosmo_vector_to_Simbelmyne_dict(omega_new, kmax=kmax))
contrib_to_grad = (coeff * ps) / delta
return np.array(contrib_to_grad)
except Exception as e:
logger.critical("Error in worker_gradient_Symbelmyne: %s", str(e))
raise RuntimeError("worker_gradient_Symbelmyne failed.") from e
finally:
gc.collect()
[docs]
def evaluate_gradient_of_Symbelmyne(
omega,
param_index: int,
k_s,
coeffs: List[float] = [2 / 3.0, -1 / 12.0],
deltas_x: List[float] = [0.01, 0.02],
delta: float = 1e-2,
kmax: float = 1.4,
):
"""
Estimate the gradient of CLASS with respect to the cosmological
parameters using central finite differences of arbitrary order.
Parameters
----------
omega : ndarray
Base cosmological parameter vector.
param_index : int
Index of the parameter to differentiate against.
k_s : ndarray
Wavenumbers for the power spectrum.
coeffs : list of float, optional
Coefficients for the finite-difference scheme, typically
[2/3, -1/12] etc. Default is [2/3.0, -1/12.0].
deltas_x : list of float, optional
Step sizes. The corresponding negative steps are generated
automatically. Default is [0.01, 0.02].
delta : float, optional
Scale for the finite difference in the denominator. Default is
1e-2.
kmax : float, optional
Maximum wavenumber for the power spectrum. Default is 1.4.
Returns
-------
ndarray
The gradient of the power spectrum wrt the specified parameter.
Raises
------
RuntimeError
If the gradient evaluation fails.
"""
import numpy as np
from multiprocessing import Pool
try:
grad = np.zeros(len(k_s))
full_coeffs = np.concatenate((-np.array(coeffs)[::-1], coeffs))
deltas_x_full = np.concatenate((-np.array(deltas_x)[::-1], deltas_x))
tasks = [
(c, dx, omega, param_index, k_s, delta, kmax)
for c, dx in zip(full_coeffs, deltas_x_full)
]
logger.diagnostic("Starting parallel HPC for gradient, tasks=%d", len(tasks))
with Pool() as mp_pool:
results = mp_pool.starmap(worker_gradient_Symbelmyne, tasks)
for contrib in results:
grad += contrib
logger.diagnostic("Gradient evaluation completed. Shape=%s", grad.shape)
return grad
except Exception as e:
logger.critical("Unexpected error in evaluate_gradient_of_Symbelmyne: %s", str(e))
raise RuntimeError("evaluate_gradient_of_Symbelmyne failed.") from e
finally:
gc.collect()