Source code for selfisys.hiddenbox

#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2024 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------

__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2024"
__license__ = "GPLv3"

"""This module provides the HiddenBox class, which is used to define
standalone stochastic forward models of large-scale spectroscopic
galaxy surveys.

The HiddenBox class is compatible with the pySELFI package.
"""

import os
from gc import collect
from typing import Callable, Any, List, Optional, Union

import h5py
import numpy as np
from selfisys.global_parameters import DEFAULT_VERBOSE_LEVEL
from selfisys.utils.parser import joinstrs_only


[docs] class HiddenBox: """This class represents custom forward data model of large-scale spectroscopic galaxy surveys.""" def __init__( self, k_s: Any, P_ss_path: str, Pbins_bnd: Any, theta2P: Callable, P: int, size: int, L: float, G_sim_path: str, G_ss_path: str, Np0: int, Npm0: int, fsimdir: str, noise_std: Optional[float] = None, radial_selection: Optional[str] = None, selection_params: Optional[List[Any]] = None, observed_density: Optional[float] = None, linear_bias: Optional[Union[float, List[float]]] = None, norm_csts: Optional[Union[float, List[float]]] = None, survey_mask_path: Optional[str] = None, local_mask_prefix: Optional[str] = None, sim_params: Optional[str] = None, eff_redshifts: Optional[List[float]] = None, TimeSteps: Optional[List[int]] = None, TimeStepDistribution: Optional[str] = None, seedphase: Optional[int] = None, seednoise: Optional[int] = None, fixnoise: bool = False, seednorm: Optional[int] = None, save_frequency: Optional[int] = None, reset: bool = False, verbosity: int = DEFAULT_VERBOSE_LEVEL, **kwargs, ): """ Initialise the HiddenBox. Parameters ---------- k_s : array-like Vector of input support wavenumbers. P_ss_path : str Path to the spectrum used to normalise the outputs. Pbins_bnd : array-like Vector of bin boundaries for the summary statistics. theta2P : Callable Function to convert theta vectors to initial power spectra. P : int Dimension of the output summary statistics. size : int Side length of the simulation box in voxels. L : float Side length of the simulation box in Mpc/h. G_sim_path : str Path to the simulation grid. G_ss_path : str Path to the summary grid. Np0 : int Number of dark matter particles per spatial dimension. Npm0 : int Side length of the particle-mesh grid in voxels. fsimdir : str Output directory. noise_std : float, optional Standard deviation for the Gaussian noise. Default is `None`. radial_selection : str, optional Type of radial selection mask. Default is `None`, corresponding to no radial selection. Available options: - 'multiple_lognormal': Use multiple log-normal radial selection functions. selection_params : list, optional List of parameters for the radial selection mask, for each population. Default is `None`. For the 'multiple_lognormal' radial selection, `selection_params` is a shape `(3, N_pop)` array comprising the three following parameters for each population: - selection_std : (float) Standard deviation of the distribution, e.g., constant × (1 + z). - selection_mean : (float) Mean of the distribution in Gpc/h. - selection_rescale : (float, optional) Individually rescale the distributions by the given value. If `None`, the global maximum is normalised to `1`. observed_density : float, optional Mean galaxy density. Default is `None`. linear_bias : float or list of float, optional First-order linear galaxy biases. If `None`, use the dark matter density to compute the summaries. Default is `None`. norm_csts : float or list of float, optional If not `None`, normalise the output of the hidden box accordingly. For `radial_selection == 'multiple_lognormal'`, `norm_csts` must be a list of `N_pop` values. Default is `None`. survey_mask_path : str, optional If not `None`, apply the corresponding survey mask to the observed field. Default is `None`. local_mask_prefix : str, optional Prefix for the local copy of the survey mask. If `None`, use the default name. Default is `None`. sim_params : str, optional Set of simulation parameters to be used for Simbelmynë. Check `setup_sbmy_parfiles` in `selfisys.sbmy_parser` for details. Default is `None`. eff_redshifts : list, optional Effective redshifts for the time steps. Default is `None`. TimeSteps : list, optional Number of time steps to reach the corresponding effective redshifts. Default is `None`. TimeStepDistribution : str, optional Path to the Simbelmynë time step distribution file. Default is `None`. seedphase : int, optional Seed to generate the initial white noise realisation. Default is `None`. seednoise : int, optional Initial state of the RNG to generate the noise sequence. If `fixnoise` is True, the seed used in `compute_pool` is always `seednoise`. If `fixnoise` is False, it is `seednoise + i` for realisation `i`. Default is `None`. fixnoise : bool, optional Whether to fix the noise realisation. If `True`, always use `seednoise`. Default is `False`. seednorm : int, optional Seed used for normalisation. Default is `None`. save_frequency : int, optional Save the outputs of the hidden box to disk every `save_frequency` evaluations. Default is `None`. reset : bool, optional Whether to always force reset the survey box when the hidden `HiddenBox` object is instantiated. Default is `False`. verbosity : int, optional Verbosity level of the hidden box. `0` is silent, `1` is minimal, `2` is verbose. Default is `DEFAULT_VERBOSE_LEVEL`. **kwargs Additional optional keyword arguments. """ # Mandatory attributes self.k_s = k_s self.P_ss_path = P_ss_path self.Pbins_bnd = Pbins_bnd self.theta2P = theta2P self.P = P self.size = size self.L = L self.G_sim_path = G_sim_path self.G_ss_path = G_ss_path self.Np0 = Np0 self.Npm0 = Npm0 self.fsimdir = fsimdir # Optional attributes self.noise_std = noise_std self.radial_selection = radial_selection self.selection_params = selection_params self.observed_density = observed_density self.linear_bias = linear_bias self.norm_csts = norm_csts self.survey_mask_path = survey_mask_path self.local_mask_prefix = local_mask_prefix self.sim_params = sim_params self.eff_redshifts = eff_redshifts self.TimeSteps = TimeSteps self.TimeStepDistribution = TimeStepDistribution self.seedphase = seedphase self.seednoise = seednoise self.fixnoise = fixnoise self.seednorm = seednorm self.save_frequency = save_frequency self.reset = reset self.verbosity = verbosity # Additional attributes from kwargs for key, value in kwargs.items(): setattr(self, key, value) # Compute default values self._set_defaults() # Create the window function W(n, r) = C(n) * R(r) self._init_survey_mask() self._init_radial_selection()
[docs] def reset_survey(self): """Re-initialise the survey mask C(n) and radial selection function R(r). """ self._init_survey_mask(reset=True) self._init_radial_selection(reset=True)
[docs] def update(self, **kwargs): """Updates the given parameter(s) of the hidden box with the given value(s). Parameters ---------- **kwargs : dict dictionary of parameters to update """ for key, value in kwargs.items(): setattr(self, key, value)
[docs] def make_data( self, cosmo, id, seedphase, seednoise, force_powerspectrum=False, force_parfiles=False, force_sim=False, force_cosmo=False, force_phase=False, d=-1, remove_sbmy=True, verbosity=None, return_g=False, RSDs=True, prefix_mocks=None, ): """Generate simulated data based on the given cosmological parameters. Parameters ---------- cosmo : dict Cosmological and infrastructure parameters. id : int or str, optional Identifier used as a suffix in the file names. Default is `0`. seedphase : int or list of int Seed to generate the initial white noise in Fourier space. seednoise : int or list of int Seed to generate the noise realisation. force_powerspectrum : bool, optional Force recomputing the input spectrum. Default is `False`. force_parfiles : bool, optional Force recomputing the parameter files. Default is `False`. force_sim : bool, optional Force recomputing the simulation. Default is `False`. force_cosmo : bool, optional Force recomputing the cosmological parameters. Default is `False`. force_phase : bool, optional Force recomputing the initial phase. Default is `False`. d : int, optional Direction in the parameter space. Default is `-1`. remove_sbmy : bool, optional Whether to remove most Simbelmynë output files after use for disk space management. Default is `True`. verbosity : int, optional Verbosity level. If `None`, use self.verbosity. Default is `None`. return_g : bool, optional Whether to return the full field alongside the summary. Default is `False`. RSDs : bool, optional Whether to compute the redshift-space distortions. Default is `True`. prefix_mocks : str, optional Prefix for the mock data. If None, use self.prefix_mocks. Default is `None`. Returns ------- Phi : ndarray Vector of summary statistics. g : ndarray or list of ndarray or None Observed field(s) if return_g is `True`; otherwise `None`. """ from selfisys.utils.path_utils import get_file_names from selfisys.sbmy_interface import get_power_spectrum_from_cosmo self._PrintMessage(0, f"Making mock data...", verbosity) self._indent() names = get_file_names( self.fsimdir, id, self.sim_params, self.TimeSteps, prefix_mocks or self.prefix_mocks, self.gravity_on, return_g, ) # Save cosmological parameters self._save_cosmo(cosmo, names["fname_cosmo"], force_cosmo) # Generate the input initial matter power spectrum self._PrintMessage(1, f"Computing initial power spectrum...", verbosity) get_power_spectrum_from_cosmo( self.L, self.size, cosmo, names["fname_power_spectrum"], force_powerspectrum ) self._PrintMessage(1, f"Computing initial power spectrum done.", verbosity) # Generate the simulated galaxy survey data self._PrintMessage(1, f"Running the forward model...", verbosity) Phi, g = self._aux_hiddenbox( d, seedphase, seednoise, names, force_parfiles=force_parfiles, force_sim=force_sim, force_phase=force_phase, return_g=return_g, RSDs=RSDs, ) self._PrintMessage(1, f"Running the forward model done.", verbosity) if remove_sbmy and self.gravity_on: self._clean_output(names["fname_outputinitialdensity"]) self._unindent() self._PrintMessage(1, f"Making mock data done.", verbosity) return (Phi, g) if return_g else Phi
[docs] def evaluate( self, theta, d, seedphase, seednoise, i=0, N=0, force_powerspectrum=False, force_parfiles=False, force_sim=False, remove_sbmy=False, theta_is_p=False, simspath=None, check_output=False, RSDs=True, abc=False, cosmo_vect=None, ): """Evaluate the hidden box for a given input power spectrum. The result is deterministic (the phase is fixed), except as it is modified by nuisance parameters if any. This routine is used by `pySELFI` to compute the gradient of the hidden box with respect to the power spectrum, and by ABC-PMC to evaluate the forward model. Parameters ---------- theta : ndarray Power spectrum values at the support wavenumbers. d : int Direction in parameter space, from `0` to `S`. seedphase : int or list of int Seed to generate the initial white noise in Fourier space. seednoise : int or list of int Seed to generate the noise realisation. i : int, optional Current evaluation index of the hidden box. Default is `0`. N : int, optional Total number of evaluations of the hidden box. Default is `0`. force_powerspectrum : bool, optional If `True`, force recomputation of the power spectrum at the values of the Fourier grid. Default is `False`. force_parfiles : bool, optional If `True`, overwrite existing parameter files. Default is `False`. force_sim : bool, optional If `True`, rerun the simulation even if the output density already exists. Default is `False`. remove_sbmy : bool, optional If `True`, remove Simbelmynë output files from disk. Default is `False`. theta_is_p : bool, optional Set to `True` if `theta` is already an unnormalised power spectrum. Default is `False`. simspath : str, optional Path to the simulations directory. Default is `None`. check_output : bool, optional If `True`, check the integrity of the output file and recompute if corrupted. Default is `False`. RSDs : bool, optional Whether to compute redshift-space distortions. Default is `True`. abc : bool or str, optional If not False, remove most output files after evaluation. Default is `False`. cosmo_vect : ndarray, optional Cosmological parameters. Required if `abc` is `True`. Returns ------- Phi : ndarray Vector of summary statistics. """ from selfisys.utils.path_utils import file_names_evaluate if abc and cosmo_vect is None: raise ValueError("cosmo_vect must be provided when using ABC.") if not abc and cosmo_vect is not None: raise ValueError("cosmo_vect must not be provided when not using ABC.") if not theta_is_p: self._PrintMessage( 1, f"Direction {d}. Evaluating hidden box (index {i}/{N - 1})...", ) self._indent() simdir = simspath or self.fsimdir simdir_d = os.path.join(simdir, joinstrs_only(["pool/", abc, "d", str(d)])) # Get the file names for the current evaluation names = file_names_evaluate( simdir, simdir_d, d, i, self.sim_params, self.TimeSteps, self.prefix_mocks, abc, self.gravity_on, ) fname_power_spectrum = names["fname_power_spectrum"] fname_simparfile = names["fname_simparfile"] fname_whitenoise = names["fname_whitenoise"] fname_outputinitialdensity = names["fname_outputinitialdensity"] # Interpolate the power spectrum values over the Fourier grid self._PrintMessage(1, f"Interpolating spectrum over the Fourier grid...") self._power_spectrum_from_theta( theta, fname_power_spectrum, theta_is_p, force=force_powerspectrum ) self._PrintMessage(1, f"Interpolating spectrum over the Fourier grid done.") # Compute the simulated data self._PrintMessage(1, f"Generating observed summary...") Phi, _ = self._aux_hiddenbox( d, seedphase, seednoise, names, force_parfiles, force_sim=force_sim, force_phase=False, return_g=False, check_output=check_output, RSDs=RSDs, sample=cosmo_vect, ) self._PrintMessage(1, f"Generating observed summary done.") # Clean up the output files if remove_sbmy and self.gravity_on: self._clean_output(fname_outputinitialdensity) if abc: self._clean_output(fname_power_spectrum) self._clean_output(fname_simparfile) self._clean_output(fname_whitenoise) for f in os.listdir(simdir_d): if self.TimeSteps is not None: if f.startswith(f"output_realdensity_d{d}_p{i}_{self.TimeSteps[0]}"): self._clean_output(os.path.join(simdir_d, f)) else: if f.startswith(f"output_realdensity_d{d}_p{i}"): self._clean_output(os.path.join(simdir_d, f)) if f.startswith(f"output_density_d{d}_p{i}"): self._clean_output(os.path.join(simdir_d, f)) if not self.gravity_on: self._clean_output(fname_outputinitialdensity) if not theta_is_p: self._unindent() self._PrintMessage( 1, f"Direction {d}. Evaluation done (index {i}/{N - 1}).", ) return Phi
[docs] def switch_recompute_pool(self, prefix_mocks=None): """Toggle recomputation of the pool for future `compute_pool` calls. Parameters ---------- prefix_mocks : str, optional Prefix for the future simulation files. Default is `None`. """ self._force_recompute_mocks = not self.force_recompute_mocks self._prefix_mocks = prefix_mocks if prefix_mocks is not None else None
[docs] def switch_setup(self): """Toggle the setup-only mode.""" self._setup_only = not self.setup_only
[docs] def compute_pool( self, theta, d, pool_fname, N, index=None, force_powerspectrum=False, force_parfiles=False, force_sim=False, remove_sbmy=False, theta_is_p=False, simspath=None, bar=False, ): """Compute a pool of realisations of the hidden box compatible with `pySELFI`. Parameters ---------- theta : ndarray Power spectrum values at the support wavenumbers. d : int Direction in parameter space, from 0 to S. pool_fname : str Filename for the pool. N : int Number of realisations required at the given direction. index : int, optional Index of a single simulation to run. Default is `None`. force_powerspectrum : bool, optional If True, force recomputation of the power spectrum. Default is `False`. force_parfiles : bool, optional If True, overwrite existing parameter files. Default is `False`. force_sim : bool, optional If True, rerun the simulation even if the output density already exists. Default is `False`. remove_sbmy : bool, optional If True, remove Simbelmynë output files from disk. Default is `False`. theta_is_p : bool, optional Set to True when `theta` is already an unnormalised power spectrum. Default is `False`. simspath : str, optional Path indicating where to store the simulations. Default is `None`. bar : bool, optional If True, display a progress bar. Default is `False`. Returns ------- p : Pool Simulation pool object. """ import tqdm.auto as tqdm from pyselfi.pool import pool as Pool self._PrintMessage(1, f"Computing a pool of realisations of the hidden box...") pool_fname = str(pool_fname) if self.force_recompute_mocks: if os.path.exists(pool_fname): os.remove(pool_fname) p = Pool(pool_fname, N, retro=False) ids = list(range(N)) if index is None else [index] def worker(i): this_seedphase = self._get_current_seed(self.__global_seedphase, False, i) this_seednoise = self._get_current_seed(self.__global_seednoise, self.fixnoise, i) Phi = self.evaluate( theta, d, this_seedphase, this_seednoise, i=i, N=N, force_powerspectrum=force_powerspectrum, force_parfiles=force_parfiles, force_sim=force_sim, remove_sbmy=remove_sbmy, theta_is_p=theta_is_p, simspath=simspath, ) p.add_sim(Phi, i) iterator = tqdm.tqdm(ids, desc=f"Direction {d}/{self.S}") if bar else ids for i in iterator: worker(i) if index is None: p.load_sims() p.save_all() return p
[docs] def load_pool(self, pool_fname, N): """Load a pool of realisations of the hidden box. Parameters ---------- pool_fname : str Filename of the pool to load. N : int Number of realisations in the pool. Returns ------- p : Pool Simulation pool object. """ from pyselfi.pool import pool as Pool pool_fname = str(pool_fname) p = Pool(pool_fname, N, retro=False) p.load_sims() p.save_all() return p
@property def Npop(self): """Number of populations.""" return self._Npop @Npop.setter def Npop(self, _): """Compute the number of populations.""" if self.radial_selection == "multiple_lognormal": if self.selection_params is not None: self._Npop = len(self.selection_params[0]) if self.linear_bias is not None and len(self.linear_bias) != self._Npop: raise ValueError("Length of linear_bias must match the number of populations.") if self.norm_csts is not None and len(self.norm_csts) != self._Npop: raise ValueError("Length of norm_csts must match the number of populations.") else: raise ValueError( "Selection parameters are required for multiple_lognormal radial selection." ) else: self._Npop = 1 @property def gravity_on(self): """Whether gravity is enabled.""" return self._gravity_on @gravity_on.setter def gravity_on(self, _): """Compute and set the gravity status.""" if self.sim_params: self._gravity_on = not self.sim_params.startswith("nograv") else: self._gravity_on = True @property def Ntimesteps(self): """Number of time steps.""" return self._Ntimesteps @Ntimesteps.setter def Ntimesteps(self, _): """Compute and set the number of time steps.""" if self.sim_params: if self.sim_params.startswith("splitLPT"): self._Ntimesteps = len(self.eff_redshifts) elif self.sim_params.startswith("split"): self._Ntimesteps = len(self.TimeStepDistribution) elif self.sim_params.startswith("nograv"): self._Ntimesteps = None else: self._Ntimesteps = None else: self._Ntimesteps = None @property def force_recompute_mocks(self): """Whether to force recomputation of mocks.""" return self._force_recompute_mocks @property def setup_only(self): """Whether to only set up the hidden box.""" return self._setup_only @property def prefix_mocks(self): """Prefix for the mocks.""" return self._prefix_mocks @property def modified_selfi(self): """Whether to use the modified selfi.""" return self._modified_selfi @property def force_neglect_lightcone(self): """Whether to force neglecting the lightcone.""" return self._force_neglect_lightcone @property def Psingle(self): """Number of summary statistics for each population.""" return self._Psingle @Psingle.setter def Psingle(self, _): """Dimension of the summary statistics for a single population, if relevant. """ if self.radial_selection == "multiple_lognormal": self._Psingle = self.P // self.Npop else: self._Psingle = self.P @property def __global_seedphase(self): return self.seedphase @property def __global_seednoise(self): return self.seednoise @property def __global_seednorm(self): """Global seed for the normalisation constants.""" return self.seednorm def _set_defaults(self): """Set default values and ensure consistency of the hidden box configuration. """ if not hasattr(self, "modeldir"): self.modeldir = os.path.join(self.fsimdir, "model") self._force_recompute_mocks = False self._setup_only = False self._prefix_mocks = None self._modified_selfi = True self._force_neglect_lightcone = False self.S = len(self.k_s) # The following attributes are set by the setters self.Ntimesteps = None self.gravity_on = True self.Npop = len(self.linear_bias) or 1 self.Psingle = None def _init_survey_mask(self, reset=False): """Initialise the survey mask and save it to disk in binary format. The survey mask C(n) represents the angular selection function of the survey. """ if self.local_mask_prefix: mask_filename = f"{self.local_mask_prefix}_survey_mask_binary.h5" else: mask_filename = "survey_mask_binary.h5" self.local_mask_path = os.path.join(self.modeldir, mask_filename) if not os.path.exists(self.local_mask_path) or reset or self.reset: if self.survey_mask_path is not None: survey_mask = np.load(self.survey_mask_path) survey_mask_binary = (survey_mask > 0).astype(int) del survey_mask else: survey_mask_binary = np.ones([self.size] * 3, dtype=int) with h5py.File(self.local_mask_path, "w") as f: f.create_dataset("survey_mask_binary", data=survey_mask_binary) del survey_mask_binary collect() def _init_radial_selection(self, reset=False): """Initialise the radial selection function R(r) and save it to disk. """ if self.radial_selection == "multiple_lognormal": from selfisys.selection_functions import LognormalSelection # Set the normalisation constants if self.norm_csts is None: self.norm_csts = [1.0] * self.Npop # Initialise the radial selection functions if self.local_mask_prefix: filename = f"{self.local_mask_prefix}_select_fct.h5" else: filename = "select_fct.h5" self.local_select_path = os.path.join(self.modeldir, filename) LogNorm = LognormalSelection( self.L, self.selection_params, survey_mask_path=self.survey_mask_path, local_select_path=self.local_select_path, size=self.size, ) LogNorm.init_selection(reset=self.reset or reset) del LogNorm elif self.radial_selection is None: if self.norm_csts is None: self.norm_csts = 1.0 else: raise ValueError( f"Unknown or unimplemented selection function: {self.radial_selection}" ) def _PrintMessage(self, required_verbosity, message, verbosity=None): """Print a message to standard output using PrintMessage from pyselfi.utils. """ from selfisys.selfi_interface import PrintMessage PrintMessage(required_verbosity, message, verbosity or self.verbosity) def _indent(self): """Indents the standard output using INDENT from pyselfi.utils. """ from selfisys.selfi_interface import indent indent() def _unindent(self): """Unindents the standard output using UNINDENT from pyselfi.utils. """ from selfisys.selfi_interface import unindent unindent() def _save_cosmo(self, cosmo, fname_cosmo, force_cosmo=False): """Save cosmological parameters in JSON format. Parameters ---------- cosmo : dict Cosmological parameters (and infrastructure parameters) to be saved. fname_cosmo : str Name of the output JSON file. force_cosmo : bool, optional If True, overwrite the file if it already exists. Default is `False`. """ from os.path import exists if not exists(fname_cosmo) or force_cosmo: from json import dump with open(fname_cosmo, "w") as fp: dump(cosmo, fp) def _add_noise(self, g, seednoise, field=None): """Add noise to a realisation in physical space. Parameters ---------- g : ndarray Field to which the noise is added (modified in place). seednoise : int or list of int Seed to generate the noise realisation. field : ndarray, optional Selection function to apply to the input field. Default is `None`. Returns ------- None """ if self.noise_std > 0: from numpy import random, sqrt, ones_like from h5py import File if seednoise is not None: rng = random.default_rng(seednoise) else: raise ValueError("Seednoise must be provided and cannot be None.") if field is None: field = ones_like(g) N = self.observed_density if self.observed_density is not None else 1.0 noise = rng.normal( size=(self.size, self.size, self.size), scale=self.noise_std * sqrt(N * field), ) with File(self.local_mask_path, "r") as f: mask = f["survey_mask_binary"][:] noise *= mask g += noise del noise collect() def _get_density_field(self, delta_g_dm, bias): """Apply galaxy bias to a dark matter overdensity field. Parameters ---------- delta_g_dm : ndarray Dark matter density contrast in physical space. bias : float Linear bias factor. Returns ------- delta_g : ndarray Galaxy density or overdensity field. """ if bias is None: bias = 1.0 if not isinstance(bias, float): raise TypeError("Bias must be a float.") if self.observed_density is None: delta_g = bias * delta_g_dm else: delta_g = self.observed_density * (1 + bias * delta_g_dm) return delta_g def _repaint_and_get_Phi( self, g_obj, norm, seednoise, bias=None, field=None, return_g=False, AliasingCorr=True, ): """Repaint a realisation in physical space and compute its summary statistics. Parameters ---------- g_obj : Field Input field object. norm : ndarray Normalisation constants for the summary statistics. seednoise : int or list of int Seed to generate the noise realisation. bias : float, optional Bias to apply to the input field. Default is `None`. field : ndarray, optional Selection function. Reused as output to save memory allocations. Default is `None`. return_g : bool, optional If True, returns the full field. Default is `False`. AliasingCorr : bool, optional Whether to apply aliasing correction. Default is `True`. Returns ------- Phi : ndarray Vector of summary statistics. delta_g : ndarray or None Realisation in physical space if return_g is True; None otherwise. """ import copy from selfisys.sbmy_interface import compute_Phi if bias is not None and not isinstance(bias, float): raise TypeError("Bias must be a float.") g_obj_local = copy.deepcopy(g_obj) g_obj_local.data = self._get_density_field(g_obj_local.data, bias) if field is not None: g_obj_local.data *= field self._add_noise(g_obj_local.data, seednoise=seednoise, field=field) Phi = compute_Phi( self.G_ss_path, self.P_ss_path, g_obj_local, norm, AliasingCorr, self.verbosity, ) delta_g = copy.deepcopy(g_obj_local.data) if return_g else None del g_obj_local collect() return Phi, delta_g def _apply_selection(self, fnames_outputdensity, seednoise, return_g=False, AliasingCorr=True): """Apply the selection function to a realisation in physical space. Parameters ---------- fnames_outputdensity : list of str Filenames of the output density fields. seednoise : int or list of int Seed to generate the noise realisation. return_g : bool, optional If True, returns the full field(s). Default is `False`. AliasingCorr : bool, optional Whether to apply aliasing correction. Default is `True`. Returns ------- Phi_tot : ndarray Concatenated summary statistics for each population. gs : list of ndarray or None List of full fields in physical space if return_g is True; None otherwise. """ from pysbmy.field import read_basefield split = any(self.sim_params.startswith(s) for s in ["split", "custom"]) if not split: g_obj = read_basefield(fnames_outputdensity[0]) elif self.force_neglect_lightcone: g_obj = read_basefield(fnames_outputdensity[0]) if self.radial_selection is not None: Phi_tot = [] gs = [] for ifct in range(self.Npop): if split and not self.force_neglect_lightcone: g_obj = read_basefield(fnames_outputdensity[ifct]) with h5py.File(self.local_select_path, "r") as f: field = f["select_fct"][ifct].astype(float) bias = float(self.linear_bias[ifct]) if self.linear_bias is not None else None Phi, g_out = self._repaint_and_get_Phi( g_obj, self.norm_csts[ifct], seednoise=seednoise, bias=bias, field=field, return_g=return_g, AliasingCorr=AliasingCorr, ) Phi_tot = np.concatenate([Phi_tot, Phi]) if return_g: gs.append(g_out) del g_out result = Phi_tot, gs if return_g else None else: from h5py import File if self.local_mask_path is not None: with File(self.local_mask_path, "r") as f: field = f["survey_mask_binary"][:] else: field = None if not split: bias = self.linear_bias if self.linear_bias is not None else None Phi_tot, delta_g = self._repaint_and_get_Phi( g_obj, self.norm_csts, seednoise=seednoise, bias=bias, field=field, return_g=return_g, AliasingCorr=AliasingCorr, ) result = (Phi_tot, [delta_g]) if return_g else (Phi_tot, None) else: Phi_tot = [] gs = [] for ifct in range(len(self.TimeSteps)): bias = ( self.linear_bias if isinstance(self.linear_bias, float) else (self.linear_bias[ifct] if self.linear_bias is not None else None) ) g_obj = read_basefield(fnames_outputdensity[ifct]) Phi, g_out = self._repaint_and_get_Phi( g_obj, self.norm_csts, seednoise=seednoise, bias=bias, field=field, return_g=return_g, AliasingCorr=AliasingCorr, ) Phi_tot = np.concatenate([Phi_tot, Phi]) if return_g: gs.append(g_out) del g_out result = Phi_tot, gs if return_g else None del field, g_obj collect() return result def _setup_parfiles( self, d, cosmology, file_names, force=False, ): """Sets up Simbelmynë parameter file given the necessary inputs (please refer to the Simbelmynë documentation for more details). Parameters ---------- d : int index giving the direction in parameter space: -1 for mock data, 0 for the expansion point, or from 1 to S cosmology : array, double, dimension=5 cosmological parameters file_names : dict Dictionary containing the names of the input and output files. force : bool, optional, default=False overwrite if files already exists? """ from selfisys.sbmy_interface import setup_sbmy_parfiles hiddenbox_params = { "Npop": self.Npop, "TimeSteps": self.TimeSteps, "eff_redshifts": self.eff_redshifts, "sim_params": self.sim_params, "Ntimesteps": self.Ntimesteps, "TimeStepDistribution": self.TimeStepDistribution, "modified_selfi": self.modified_selfi, "Np0": self.Np0, "Npm0": self.Npm0, "size": self.size, "L": self.L, "fsimdir": self.fsimdir, } setup_sbmy_parfiles( d, cosmology, file_names, hiddenbox_params, force, ) def _run_sim( self, fname_simparfile, fname_simlogs, fnames_outputdensity, force_sim=False, check_output=False, ): """Run a simulation with Simbelmynë. Parameters ---------- fname_simparfile : str Name of the input parameter file. fname_simlogs : str Name of the output Simbelmynë logs. fnames_outputdensity : list of str Names of the output density fields to be written. force_sim : bool, optional If True, force recomputation if output density already exists. Default is `False`. check_output : bool, optional If True, check the integrity of the output files and recompute if corrupted. Default is `False`. """ from glob import glob from selfisys.utils.parser import check_files_exist split = self.sim_params.startswith("split") if not check_files_exist(fnames_outputdensity) or force_sim: from pysbmy import pySbmy if not split: pySbmy(f"{fname_simparfile}_{self.Npop}.sbmy", fname_simlogs) else: for i in range(self.Ntimesteps): pySbmy(f"{fname_simparfile}_pop{i}.sbmy", fname_simlogs) elif check_output: from pysbmy.field import read_basefield try: for fname in fnames_outputdensity: g = read_basefield(fname) del g collect() except Exception: from pysbmy import pySbmy for fname in fnames_outputdensity: os.remove(fname) pySbmy(fname_simparfile, fname_simlogs) # Workaround to remove unwanted Simbelmynë outputs from disk temp_files = glob(os.path.join(self.fsimdir, "data", "cola_kick_*.h5")) for f in temp_files: os.remove(f) if split: temp_files = glob(os.path.join(self.fsimdir, "data", "cola_snapshot_*.h5")) temp_files += glob(os.path.join(self.fsimdir, "data", "lpt_psi*.h5")) for f in temp_files: os.remove(f) def _compute_mocks( self, seednoise, fnames_input, fname_mocks, return_g, fname_g=None, AliasingCorr=True, ): """Apply galaxy bias, observational effects, and compute the summary statistics. Parameters ---------- seednoise : int or list of int Seed to generate the noise realisation. fnames_input : list of str Filenames of the input density fields. fname_mocks : str Filename to save the computed summary statistics. return_g : bool If True, return the observed field(s). fname_g : str, optional Filename to save the observed field(s) if return_g is True. AliasingCorr : bool, optional Whether to apply aliasing correction. Default is `True`. Returns ------- Phi : ndarray Vector of summary statistics. g_out : ndarray or list of ndarray or None Observed field(s) if return_g is True; otherwise None. """ if return_g and fname_g is None: raise ValueError("Filename for the observed field must be provided.") if (not os.path.exists(fname_mocks)) or self.force_recompute_mocks: Phi, g_out = self._apply_selection( fnames_input, seednoise=seednoise, return_g=return_g, AliasingCorr=AliasingCorr, ) with h5py.File(fname_mocks, "w") as f: f.create_dataset("Phi", data=Phi) if return_g: with h5py.File(fname_g, "w") as f: f.create_dataset("g", data=g_out) else: self._PrintMessage(1, f"Using existing mock file: {fname_mocks}") with h5py.File(fname_mocks, "r") as f: Phi = f["Phi"][:] if return_g: with h5py.File(fname_g, "r") as f: g_out = f["g"][:] else: g_out = None return Phi, g_out def _sample_omega(self, seed): """Sample cosmological parameters from the prior. Parameters ---------- seed : int or list of int Seed for the random number generator. Returns ------- omega_sample : ndarray Sampled cosmological parameters. """ from selfisys.utils.tools import sample_omega_from_prior from selfisys.global_parameters import planck_mean, planck_cov ids = list(range(len(planck_mean))) return sample_omega_from_prior(1, planck_mean, planck_cov, ids, seed=seed)[0] def _aux_hiddenbox( self, d, seedphase, seednoise, file_names, force_parfiles=False, force_sim=False, force_phase=False, return_g=False, check_output=False, RSDs=True, sample=None, ): """Generate observations from the input initial matter power spectrum. Parameters ---------- d : int Index indicating the direction in parameter space (-1 for mock data, 0 for the expansion point, or from 1 to S for the gradient directions). seedphase : int or list of int Seed to generate the initial white noise in Fourier space. seednoise : int or list of int Seed to generate the observational noise realisation. file_names : dict Dictionary containing the names of the input and output files. force_parfiles : bool, optional If True, force recomputation of the parameter files. Default is `False`. force_sim : bool, optional If True, force recomputation of the simulation. Default is `False`. force_phase : bool, optional If True, force recomputation of the phase. Default is `False`. return_g : bool, optional If True, return the full realisation in physical space. Default is `False`. check_output : bool, optional If True, check the integrity of the output file and recompute if corrupted. Default is `False`. RSDs : bool, optional If True, include redshift-space distortions. Default is `True`. sample : ndarray, optional Cosmological parameters sample. If `None`, sample from the prior. Default is `None`. Returns ------- result : tuple A tuple containing: - Phi : (ndarray) Summary statistics for each population, concatenated. - g : (ndarray or list of ndarray or None) List of observed fields if return_g is True; None otherwise. """ fname_power_spectrum = file_names["fname_power_spectrum"] fname_simparfile = file_names["fname_simparfile"] fname_whitenoise = file_names["fname_whitenoise"] seedname_whitenoise = file_names["seedname_whitenoise"] fname_outputinitialdensity = file_names["fname_outputinitialdensity"] fnames_outputrealspacedensity = file_names["fnames_outputrealspacedensity"] fnames_outputdensity = file_names["fnames_outputdensity"] fname_simlogs = file_names["fname_simlogs"] fname_mocks = file_names["fname_mocks"] fname_g = file_names["fname_g"] if return_g else None sample = self._sample_omega(seedphase) if sample is None else sample if self.gravity_on: from selfisys.sbmy_interface import generate_white_noise_Field self._setup_parfiles( d, sample, file_names, force_parfiles, ) generate_white_noise_Field( self.L, self.size, seedphase, fname_whitenoise, seedname_whitenoise, force_phase, ) if not self.setup_only: if self.gravity_on: self._run_sim( fname_simparfile, fname_simlogs, fnames_outputdensity, force_sim, check_output, ) fnames = fnames_outputdensity if RSDs else fnames_outputrealspacedensity else: from selfisys.grf import primordial_grf primordial_grf( self.L, self.size, seedphase, fname_power_spectrum, fname_outputinitialdensity, force_sim, False, self.verbosity, ) fnames = [fname_outputinitialdensity] result = self._compute_mocks(seednoise, fnames, fname_mocks, return_g, fname_g) else: result = [], [] # lists are mandatory for compatibility return result def _power_spectrum_from_theta( self, theta, fname_power_spectrum, theta_is_p=False, force=False ): """Compute the power spectrum values using spline interpolation. Parameters ---------- theta : ndarray Vector of power spectrum values at the support wavenumbers. fname_power_spectrum : str Name of the input/output power spectrum file. theta_is_p : bool, optional If True, theta is already an unnormalised power spectrum. Default is `False`. force : bool, optional If True, force recomputation. Default is `False`. """ from pysbmy.power import PowerSpectrum if (not os.path.exists(fname_power_spectrum)) or force: from scipy.interpolate import InterpolatedUnivariateSpline from pysbmy.power import FourierGrid PP = theta if theta_is_p else self.theta2P(theta) Spline = InterpolatedUnivariateSpline(self.k_s, PP, k=5) G_sim = FourierGrid.read(self.G_sim_path) power_spectrum = Spline(G_sim.k_modes) power_spectrum[0] = 0.0 P = PowerSpectrum.from_FourierGrid(G_sim, powerspectrum=power_spectrum) P.write(fname_power_spectrum) del G_sim collect() def _clean_output(self, fname_output): """Remove a file from disk if it exists. Parameters ---------- fname_output : str Name of the file to remove. """ if fname_output is not None and os.path.exists(fname_output): os.remove(fname_output) def _get_current_seed(self, parent_seed, fixed_seed, i): """Return the current seed for the i-th realisation. Parameters ---------- parent_seed : int or list of int The parent seed. fixed_seed : bool If True, use the parent seed directly. i : int Index of the current realisation. Returns ------- this_seed : int or list of int The seed for the current realisation. """ if fixed_seed: this_seed = parent_seed else: this_seed = [i, parent_seed] return this_seed