Source code for selfisys.setup_model

#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2024 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------

__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2024"
__license__ = "GPLv3"

"""
Set up parameters related to the grid and the fiducial power spectrum.
"""

import os.path
import gc
from typing import Optional, NamedTuple
import numpy as np
from h5py import File

from pysbmy.power import PowerSpectrum, FourierGrid, get_Pk
from selfisys.utils.logger import getCustomLogger
from selfisys.utils.tools import get_k_max

logger = getCustomLogger(__name__)


class ModelSetup(NamedTuple):
    size: int
    L: float
    P: int
    S: int
    G_sim_path: str
    G_ss_path: str
    Pbins_bnd: np.ndarray
    Pbins: np.ndarray
    k_s: np.ndarray
    P_ss_obj_path: str
    P_0: np.ndarray
    planck_Pk: np.ndarray

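# Illustrative note (not part of the original module): since ModelSetup is a
# NamedTuple, the value returned by setup_model below can be consumed either
# by field name or by unpacking. The variable names here are hypothetical.
#
#     setup = setup_model(...)               # see setup_model below
#     k_s, Pbins = setup.k_s, setup.Pbins    # access by field name
#     size, L, P, *_ = setup                 # positional unpacking also works
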
def setup_model(
    workdir: str,
    params_planck: dict,
    params_P0: dict,
    size: int = 256,
    L: float = 3600.0,
    S: int = 100,
    N_exact: int = 8,
    Pinit: int = 50,
    trim_threshold: int = 100,
    minval: Optional[float] = None,
    maxval: Optional[float] = None,
    force: bool = False,
) -> ModelSetup:
    """
    Set up the model by computing or loading necessary grids and parameters.

    Parameters
    ----------
    workdir : str
        Directory where the results will be stored.
    params_planck : dict
        Parameters for the Planck 2018 cosmology.
    params_P0 : dict
        Parameters for the normalisation power spectrum.
    size : int
        Number of elements in each direction of the box.
    L : float
        Comoving length of the box in Mpc/h.
    S : int
        Number of support wavenumbers for the input power spectra.
    N_exact : int
        Number of support wavenumbers matching the Fourier grid.
    Pinit : int
        Maximum number of bins for the summaries.
    trim_threshold : int
        Minimum number of modes required per bin.
    minval : float, optional
        Minimum k value for the summaries.
    maxval : float, optional
        Maximum k value for the summaries.
    force : bool
        If True, forces recomputation of the inputs.

    Returns
    -------
    ModelSetup
        A named tuple containing:
        - size (int): Number of elements in each direction of the box.
        - L (float): Comoving length of the box in Mpc/h.
        - P (int): Number of bins for the summaries.
        - S (int): Number of support wavenumbers for input power spectra.
        - G_sim_path (str): Path to the full Fourier grid file.
        - G_ss_path (str): Path to the Fourier grid for summaries file.
        - Pbins_bnd (np.ndarray): Boundaries of summary bins.
        - Pbins (np.ndarray): Centres of the bins for the summaries.
        - k_s (np.ndarray): Support wavenumbers for input power spectra.
        - P_ss_obj_path (str): Path to the summary power spectrum file.
        - P_0 (np.ndarray): Normalisation power spectrum values.
        - planck_Pk (np.ndarray): Planck 2018 power spectrum values.
    """
    # Check input parameters
    if N_exact < 0 or N_exact > S:
        raise ValueError("Parameter 'N_exact' must be between 0 and 'S'.")

    # Define file paths
    G_sim_path = os.path.join(workdir, "G_sim.h5")
    k_s_path = os.path.join(workdir, "k_s.npy")
    G_ss_path = os.path.join(workdir, "G_ss.h5")
    P_ss_obj_path = os.path.join(workdir, "P_ss_obj.h5")
    P_0_path = os.path.join(workdir, "P_0.npy")
    theta_planck_path = os.path.join(workdir, "theta_planck.npy")
    Pbins_path = os.path.join(workdir, "Pbins.npy")
    Pbins_bnd_path = os.path.join(workdir, "Pbins_bnd.npy")

    # Compute or load the full Fourier grid
    if not os.path.exists(G_sim_path) or force:
        logger.info("Computing Fourier grid...")
        G_sim = FourierGrid(L, L, L, size, size, size)
        G_sim.write(G_sim_path)
        logger.info("Computing Fourier grid done.")
    else:
        logger.info("Loading Fourier grid.")
        G_sim = FourierGrid.read(G_sim_path)

    # Determine minimum and maximum k values
    if minval is None:
        minval = np.min(G_sim.k_modes[G_sim.k_modes != 0])
    if maxval is None:
        maxval = np.pi * size / L  # 1D Nyquist frequency

    # Compute or load support wavenumbers for the input power spectrum
    if not os.path.exists(k_s_path) or force:
        logger.diagnostic("Computing input power spectrum support wavenumbers...")
        k_s = np.zeros(S)
        sorted_knorms = np.sort(G_sim.k_modes.flatten())
        unique_indices = np.unique(np.round(sorted_knorms, 5), return_index=True)[1]
        sorted_knorms_corrected = sorted_knorms[unique_indices]
        k_s[:N_exact] = sorted_knorms_corrected[1 : N_exact + 1]
        k_s_max = get_k_max(L, size)
        k_s[N_exact:] = np.logspace(
            np.log10(sorted_knorms_corrected[N_exact]),
            np.log10(k_s_max),
            S - N_exact + 1,
        )[1:]
        np.save(k_s_path, k_s)
        logger.diagnostic("Computing input power spectrum support wavenumbers done.")
    else:
        logger.diagnostic("Loading input power spectrum support wavenumbers.")
        try:
            k_s = np.load(k_s_path)
        except (IOError, FileNotFoundError) as e:
            logger.error(f"Failed to load k_s from {k_s_path}: {e}")
            raise

    # Initialise Pbins
    Pbins_left_bnds_init = np.logspace(
        np.log10(minval), np.log10(maxval), Pinit + 1, dtype=np.float32
    )
    Pbins_left_bnds_init = Pbins_left_bnds_init[:-1]

    # Compute or load Fourier grid for the summaries
    if not os.path.exists(G_ss_path) or force:
        G_ss = FourierGrid(
            L,
            L,
            L,
            size,
            size,
            size,
            k_modes=Pbins_left_bnds_init,
            kmax=maxval,
            trim_bins=True,
            trim_threshold=trim_threshold,
        )
        G_ss.write(G_ss_path)
    else:
        G_ss = FourierGrid.read(G_ss_path)
    P = G_ss.NUM_MODES

    # Compute or load Pbins and Pbins_bnd
    if not os.path.exists(Pbins_path) or not os.path.exists(Pbins_bnd_path) or force:
        k_ss_max_offset = Pbins_left_bnds_init[-1] - Pbins_left_bnds_init[-2]
        logger.diagnostic(f"k_ss_max_offset: {k_ss_max_offset:.5f}")
        Pbins_bnd = G_ss.k_modes
        Pbins_bnd = np.concatenate([Pbins_bnd, [Pbins_bnd[-1] + k_ss_max_offset]])
        Pbins = (Pbins_bnd[1:] + Pbins_bnd[:-1]) / 2
        np.save(Pbins_path, Pbins)
        np.save(Pbins_bnd_path, Pbins_bnd)
    else:
        try:
            Pbins = np.load(Pbins_path)
            Pbins_bnd = np.load(Pbins_bnd_path)
        except (IOError, FileNotFoundError) as e:
            logger.error(f"Failed to load Pbins or Pbins_bnd: {e}")
            raise

    # Compute or load BBKS spectrum for normalisation
    if not os.path.exists(P_0_path) or force:
        P_0 = get_Pk(k_s, params_P0)
        np.save(P_0_path, P_0)
    else:
        try:
            P_0 = np.load(P_0_path)
        except (IOError, FileNotFoundError) as e:
            logger.error(f"Failed to load P_0 from {P_0_path}: {e}")
            raise

    if not os.path.exists(P_ss_obj_path) or force:
        P_0_ss = get_Pk(G_ss.k_modes, params_P0)
        P_ss_obj = PowerSpectrum.from_FourierGrid(G_ss, powerspectrum=P_0_ss, cosmo=params_P0)
        P_ss_obj.write(P_ss_obj_path)
    else:
        P_ss_obj = PowerSpectrum.read(P_ss_obj_path)

    # Compute or load Planck power spectrum
    if not os.path.exists(theta_planck_path) or force:
        planck_Pk = get_Pk(k_s, params_planck)
        np.save(theta_planck_path, planck_Pk)
    else:
        try:
            planck_Pk = np.load(theta_planck_path)
        except (IOError, FileNotFoundError) as e:
            logger.error(f"Failed to load theta_planck from {theta_planck_path}: {e}")
            raise

    # Clean up
    del G_sim, G_ss, P_ss_obj, Pbins_left_bnds_init
    gc.collect()

    return ModelSetup(
        size,
        L,
        P,
        S,
        G_sim_path,
        G_ss_path,
        Pbins_bnd,
        Pbins,
        k_s,
        P_ss_obj_path,
        P_0,
        planck_Pk,
    )

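# Usage sketch (illustrative, not part of the original module): assuming
# `params_planck` and `params_P0` are cosmology dictionaries accepted by
# pysbmy's get_Pk, and `wd` is a writable working directory, setup_model
# writes its intermediate files to `wd` and returns a ModelSetup named tuple.
# The variable names below are hypothetical.
#
#     setup = setup_model(
#         workdir=wd,
#         params_planck=params_planck,
#         params_P0=params_P0,
#         size=256,
#         L=3600.0,
#     )
#     print(f"{setup.P} summary bins, k_s in [{setup.k_s[0]}, {setup.k_s[-1]}]")
#
# A second call with force=False (the default) reloads the cached files
# instead of recomputing them.
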
def compute_alpha_cv(
    workdir: str,
    k_s: np.ndarray,
    size: int,
    L: float,
    window_fct_path: Optional[str] = None,
    force: bool = False,
) -> None:
    """
    Compute the cosmic variance parameter alpha_cv.

    Parameters
    ----------
    workdir : str
        Directory where the results will be stored.
    k_s : np.ndarray
        Support wavenumbers.
    size : int
        Number of elements in each direction of the box.
    L : float
        Comoving length of the box in Mpc/h.
    window_fct_path : str, optional
        Path to the window function file.
    force : bool
        If True, forces recomputation of the inputs.
    """
    from scipy.optimize import curve_fit

    alpha_cv_path = os.path.join(workdir, "alpha_cv.npy")
    alpha_cv_eff_path = os.path.join(workdir, "alpha_cv_eff.npy")

    if not os.path.exists(alpha_cv_path) or force:
        logger.info("Computing cosmic variance alpha_cv...")
        k_s_bnd = np.concatenate([k_s, [np.inf]])
        G_sim = FourierGrid.read(os.path.join(workdir, "G_sim.h5")).k_modes.flatten()
        knorms = np.sort(G_sim)
        Nks, _ = np.histogram(knorms, bins=k_s_bnd)
        del knorms, G_sim

        nyquist_frequency = np.pi * size / L
        idx_nyquist = np.searchsorted(k_s, nyquist_frequency)

        def cubic_func(x, a):
            return a * x**3

        try:
            popt, _ = curve_fit(cubic_func, k_s[:idx_nyquist], Nks[:idx_nyquist])
        except RuntimeError as e:
            logger.error(f"Curve fitting failed: {e}")
            raise

        alpha_cv = np.sqrt(1 / popt[0])
        np.save(alpha_cv_path, alpha_cv)
        logger.info(f"Computing cosmic variance alpha_cv done. alpha_cv = {alpha_cv}")

        if window_fct_path is not None:
            # Compute alpha_cv with approximate correction for the effective volume
            nnz = 0
            with File(window_fct_path, "r") as f:
                for ipop in range(3):
                    mask = f["select_fct"][:][ipop]
                    nnz += np.sum(mask)
            nnz_size = nnz ** (1 / 3.0)  # Side length of a cube containing nnz voxels
            eff_L = nnz_size * L / size
            alpha_cv_eff = alpha_cv * (L / eff_L) ** 1.5
            logger.info(f"Effective length: {eff_L * 1e-3} Gpc/h")
            logger.info(f"Effective volume: {(eff_L * 1e-3) ** 3} (Gpc/h)^3")
            logger.info(f"alpha_cv_eff = {alpha_cv_eff}")
            np.save(alpha_cv_eff_path, alpha_cv_eff)

    gc.collect()
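
# Usage sketch (illustrative, not part of the original module): compute_alpha_cv
# expects the Fourier grid file G_sim.h5 written by setup_model to already exist
# in `wd`; it saves alpha_cv.npy (and alpha_cv_eff.npy when a window function
# file is provided) rather than returning a value. The call below is hypothetical
# and reuses the `setup` object from the sketch above.
#
#     compute_alpha_cv(
#         workdir=wd,
#         k_s=setup.k_s,
#         size=setup.size,
#         L=setup.L,
#     )
#     alpha_cv = np.load(os.path.join(wd, "alpha_cv.npy"))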