Source code for selfisys.normalise_hb

#!/usr/bin/env python3
# ----------------------------------------------------------------------
# Copyright (C) 2024 Tristan Hoellinger
# Distributed under the GNU General Public License v3.0 (GPLv3).
# See the LICENSE file in the root directory for details.
# SPDX-License-Identifier: GPL-3.0-or-later
# ----------------------------------------------------------------------

__author__ = "Tristan Hoellinger"
__version__ = "0.1.0"
__date__ = "2024"
__license__ = "GPLv3"

"""Tools to define normalisation constants for the hidden box."""

import os
import numpy as np
from typing import Tuple, Dict, Any

from selfisys.global_parameters import MIN_K_NORMALISATION
from selfisys.hiddenbox import HiddenBox


[docs] def worker_normalisation( hidden_box: HiddenBox, params: Tuple[Dict[str, Any], list, list, bool], ) -> np.ndarray: """Worker function to compute the normalisation constants, compatible with Python multiprocessing. Parameters ---------- hidden_box : HiddenBox Instance of the HiddenBox class. params : tuple A tuple containing (cosmo, seedphase, seednoise, force). Returns ------- phi : ndarray Computed summary statistics. """ ( cosmo, seedphase, seednoise, force, ) = params name = ( "norm" + "__" + "_".join([str(int(s)) for s in seedphase]) + "__" + "_".join([str(int(s)) for s in seednoise]) ) if hidden_box.verbosity > 1: hidden_box._PrintMessage(1, "Running simulation...") hidden_box._indent() phi = hidden_box.make_data( cosmo, name, seedphase, seednoise, force, force, force, force, ) hidden_box._unindent() elif hidden_box.verbosity > 0: from selfisys.utils.low_level import ( stdout_redirector, ) from io import BytesIO f = BytesIO() with stdout_redirector(f): phi = hidden_box.make_data( cosmo, name, seedphase, seednoise, force, force, force, force, ) f.close() else: from selfisys.utils.low_level import ( stdout_redirector, stderr_redirector, ) from io import BytesIO f = BytesIO() g = BytesIO() with stdout_redirector(f), stderr_redirector(g): phi = hidden_box.make_data( cosmo, name, seedphase, seednoise, force, force, force, force, ) f.close() g.close() return phi
[docs] def worker_normalisation_wrapper(args): """Wrapper function for the worker_normalisation function. Parameters ---------- args : tuple A tuple containing (hidden_box, params). Returns ------- phi : ndarray Computed summary statistics. """ hidden_box, params = args return worker_normalisation(hidden_box, params)
[docs] def worker_normalisation_public( hidden_box, cosmo: Dict[str, Any], N: int, i: int, ): """Run the i-th simulation required to compute the normalisation constants. Parameters ---------- hidden_box : HiddenBox Instance of the HiddenBox class. cosmo : dict Cosmological and some infrastructure parameters. N : int Total number of realisations required. i : int Index of the simulation to be computed. """ params = ( cosmo, [ i, hidden_box._HiddenBox__global_seednorm, ], [ i + N, hidden_box._HiddenBox__global_seednorm, ], False, ) worker_normalisation(hidden_box, params)
[docs] def define_normalisation( hidden_box: HiddenBox, Pbins: np.ndarray, cosmo: Dict[str, Any], N: int, min_k_norma: float = MIN_K_NORMALISATION, npar: int = 1, force: bool = False, ) -> np.ndarray: """Define the normalisation constants for the HiddenBox instance. Parameters ---------- hidden_box : HiddenBox Instance of the HiddenBox class. Pbins : ndarray Array of P bin values. cosmo : dict Cosmological and infrastructure parameters. N : int Number of realisations required. min_k_norma : float, optional Minimum k value to compute the normalisation constants. npar : int, optional Number of parallel processes to use. Default is 1. force : bool, optional If True, force recomputation. Default is False. Returns ------- norm_csts : ndarray Normalisation constants for the HiddenBox instance. """ import tqdm.auto as tqdm from multiprocessing import Pool hidden_box._PrintMessage( 0, "Defining normalisation constants...", ) hidden_box._indent() indices = np.where(Pbins > min_k_norma) tasks = [ ( hidden_box, ( cosmo, [ i, hidden_box._HiddenBox__global_seednorm, ], [ i + N, hidden_box._HiddenBox__global_seednorm, ], force, ), ) for i in range(N) ] ncors = os.cpu_count() nprocs = min(npar, ncors) norm_csts_list = np.zeros((hidden_box._Npop, N)) if npar > 1: with Pool(nprocs) as p: for j, val in enumerate( tqdm.tqdm( p.imap( worker_normalisation_wrapper, tasks, ), total=N, ) ): norm_csts_list[:, j] = np.array( [ np.mean( val[i * hidden_box.Psingle : (i + 1) * hidden_box.Psingle][indices] ) for i in range(hidden_box._Npop) ] ) else: for j, val in enumerate( tqdm.tqdm( map( worker_normalisation_wrapper, tasks, ), total=N, ) ): val = np.array(val) norm_csts_list[:, j] = np.array( [ np.mean(val[i * hidden_box.Psingle : (i + 1) * hidden_box.Psingle][indices]) for i in range(hidden_box._Npop) ] ) norm_csts = np.mean(norm_csts_list, axis=1) hidden_box._unindent() hidden_box._PrintMessage( 0, "Defining normalisation constants done.", ) return norm_csts