diff crbeam.py @ 0:f40d05521dca draft default tip
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit de01e3c02a26cd6353a6b9b6f8d1be44de8ccd54
| author | astroteam |
|---|---|
| date | Fri, 25 Apr 2025 19:33:20 +0000 |
| parents | |
| children | |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/crbeam.py	Fri Apr 25 19:33:20 2025 +0000
@@ -0,0 +1,256 @@
+import subprocess
+import sys
+import tempfile
+from os import makedirs, path
+from os.path import join
+
+import numpy as np
+
+
+class objdict(dict):
+    def __getattr__(self, name):
+        if name in self:
+            return self[name]
+        else:
+            raise AttributeError("No such attribute: " + name)
+
+    def __setattr__(self, name, value):
+        self[name] = value
+
+    def __delattr__(self, name):
+        if name in self:
+            del self[name]
+        else:
+            raise AttributeError("No such attribute: " + name)
+
+
+class CRbeam(object):
+    default_parameters = dict(
+        nparticles=10000,
+        z=0.1,
+        emax=1e13,
+        emin=1e7,
+        emin_source=None,  # if None, emin is used
+        primary="photon",
+        EGMF=0.0,
+        lmaxEGMF=5,
+        lminEGMF=0.05,
+        background=12,
+    )
+
+    particles = dict(
+        electron=0, positron=1, photon=2, gamma=2, neutron=9, proton=10
+    )
+
+    prog_name = "crbeam"
+
+    # Here we always use a fixed power law E^{-1} for MC and then adjust the event weights if needed
+    power_law = 1
+
+    def __init__(self, **kwargs):
+        self._step = 0
+        self._cache = None
+        self._size_per_step = 100
+        p = objdict(self.default_parameters)
+        p.update(kwargs)
+        if p.emin_source is None:
+            p.emin_source = p.emin
+
+        self._cache_name_prefix = (
+            f"{p.primary}_z{p.z}_E_{p.emin:g}_{p.emax:g}_{p.background}"
+        )
+        if p.emin_source < p.emax:
+            self._cache_name_prefix += (
+                f"_pow{self.power_law}_Emin{p.emin_source:g}"
+            )
+        if p.EGMF > 0:
+            self._cache_name_prefix += (
+                f"_B{p.EGMF:g}_turbL{p.lminEGMF}-{p.lmaxEGMF}"
+            )
+
+        self._cache_name_prefix += "_N"
+
+        self._output_dir = self._cache_name_prefix + f"{p.nparticles}"
+
+        p.primary = self.particles[p.primary]
+
+        self.p = p
+
+    @property
+    def output_dir(self):
+        return self._output_dir
+
+    @property
+    def cache(self):
+        from cache import CRbeamCache
+
+        if self._cache is None:
+            self._cache = CRbeamCache()
+        return self._cache
+
+    @property
+    def command(self):
+        result = f"{self.prog_name} "
+        result += " ".join(
+            [f"--{key} {value}" for key, value in self.p.items()]
+        )
+        result += " --output " + self.output_dir
+        power = 1 if self.p.emin_source < self.p.emax else -1000
+        result += f" --power {power}"
+        if self.p.EGMF > 0:
+            # use a turbulent magnetic field with a unique random orientation for all particles
+            result += " -mft -mfr"
+        return result
+
+    @property
+    def output_path(self):
+        return join(self.output_dir, "z0")
+
+    def run(self, force_overwrite):
+        command_suffix = ""
+        if force_overwrite:
+            command_suffix = " --overwrite"
+        if path.isdir(self.output_path) and not force_overwrite:
+            return
+        logs_dir = "logs"
+        makedirs(logs_dir, exist_ok=True)
+        log_file = f"{logs_dir}/{self.output_dir}.log"
+        with tempfile.NamedTemporaryFile() as script_file:
+            with open(script_file.name, "wt") as out_script:
+                print("#!/bin/bash", file=out_script)
+                print(
+                    self.command + command_suffix,
+                    "1>" + log_file + " 2>&1",  # redirect both stdout and stderr to the log
+                    file=out_script,
+                )
+            subprocess.run(["bash", script_file.name])
+        return log_file
+
+    def run_cached(self, overwrite_local_cache=False):
+        import shutil
+
+        output_root = self.output_dir
+        if path.isdir(output_root):
+            if not overwrite_local_cache:
+                return self.output_path
+            shutil.rmtree(output_root)
+
+        size = self.cache.get_cache_size(self._cache_name_prefix)
+        print("s3 data size: ", size)
+        skip_paths = []
+        makedirs(self.output_path, exist_ok=False)
+        if size < self.p.nparticles:
+            try:
+                self.p.nparticles = self.p.nparticles - size
+                self.run(force_overwrite=True)
+                skip_paths.append(self.upload_cache())
+            finally:
+                self.p.nparticles = self.p.nparticles + size
+        else:
+            self.p.nparticles = size
+
+        if size > 0:  # append results from s3 cache to the local cache file
+            self.cache.load_results(
+                self.output_path + "/photon",
+                self._cache_name_prefix,
+                skip_paths=skip_paths,
+            )
+
+        return self.output_path
+
+    def start_multistage_run(self, overwrite_local_cache=False, n_steps=10):
+        """
+        Initialize the environment for running different parts of the simulation in subsequent calls of simulation_step.
+        This function is useful when showing progress by running code in different Jupyter cells.
+        :param overwrite_local_cache: remove local cache
+        :param n_steps: number of intermediate steps
+        :return: True if further simulation is needed, False if data is already available
+        """
+        import shutil
+
+        output_root = self.output_dir
+        if path.isdir(output_root):
+            if not overwrite_local_cache:
+                return False
+            shutil.rmtree(output_root)
+
+        size = self.cache.get_cache_size(self._cache_name_prefix)
+        print("s3 data size: ", size)
+        if size >= self.p.nparticles:
+            makedirs(self.output_path, exist_ok=False)
+            self.cache.load_results(
+                self.output_path + "/photon", self._cache_name_prefix
+            )
+            self.p.nparticles = size
+            return False
+        self.p.nparticles = self.p.nparticles - size
+        self._size_per_step = max(
+            100, int(np.ceil((self.p.nparticles) / n_steps))
+        )
+        self._step = 0
+        return True
+
+    def simulation_step(self):
+        size = self._size_per_step * self._step
+        target_nparticles = self.p.nparticles
+        if size >= target_nparticles:
+            return False
+        save_output_dir = self._output_dir
+        try:
+            adjusted_size_per_step = min(
+                self._size_per_step, target_nparticles - size
+            )
+            self.p.nparticles = adjusted_size_per_step
+            self._step += 1
+            self._output_dir = f"{self._output_dir}_step{self._step}"
+            self.run(force_overwrite=True)
+        finally:
+            self.p.nparticles = target_nparticles
+            self._output_dir = save_output_dir
+
+        return size + adjusted_size_per_step < target_nparticles
+
+    def end_multistep_run(self):
+        makedirs(self.output_path, exist_ok=False)
+        with tempfile.NamedTemporaryFile() as script_file:
+            with open(script_file.name, "wt") as task:
+                print(
+                    f"cat {self.output_dir}_step*/z0/photon >"
+                    f" {self.output_path}/photon",
+                    file=task,
+                )
+            subprocess.run(["bash", script_file.name])
+        try:
+            skip_paths = [self.upload_cache()]
+            self.cache.load_results(
+                self.output_path + "/photon",
+                self._cache_name_prefix,
+                skip_paths=skip_paths,
+            )
+            self.p.nparticles = self.cache.get_cache_size(
+                self._cache_name_prefix
+            )
+        except Exception as ex:
+            print(ex, file=sys.stderr)
+        return self.output_path
+
+    def upload_cache(self):
+        source_file = self.output_path + "/photon"
+        if not path.isfile(source_file):
+            if path.isdir(self.output_path):
+                print("empty result")
+                return None
+            else:
+                assert False, "result dir not found"
+        obj_name = self._cache_name_prefix + f"{self.p.nparticles}"
+        print("saving", source_file, "to s3 as", obj_name)
+        return self.cache.save(obj_name, source_file)
+
+    def remove_cache(self):
+        from cache import CRbeamCache
+
+        c = CRbeamCache()
+        prefix = (
+            self._cache_name_prefix
+        )  # todo: use metadata when metadata reading is implemented
+        c.detete_results(prefix)
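
For context, here is a minimal usage sketch of the class above. It is illustrative only: it assumes the crbeam binary is on PATH, and the parameter values (particle count, redshift, EGMF strength) are made up for the example.

from crbeam import CRbeam

# 10^4 primary photons injected at z = 0.1 through a 1e-15 G turbulent EGMF
sim = CRbeam(nparticles=10000, z=0.1, primary="photon", EGMF=1e-15)
print(sim.command)  # inspect the generated crbeam command line
log_file = sim.run(force_overwrite=False)  # plain local run, no S3 cache
print("results in", sim.output_path, "log in", log_file)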
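The cached and multi-stage entry points follow the same pattern. Below is a hedged sketch of the notebook-oriented flow, assuming the companion cache module (providing CRbeamCache and its S3 credentials) is importable; note the source itself mixes "multistage"/"multistep" in the method names, so they are kept verbatim here.

sim = CRbeam(nparticles=50000, z=0.14, primary="photon")
if sim.start_multistage_run(n_steps=10):
    # each call runs one chunk of particles; True means more steps remain,
    # so a progress bar can be updated between calls (or between cells)
    while sim.simulation_step():
        pass
    output_path = sim.end_multistep_run()
else:
    # data was already available locally or was fetched from the S3 cache
    output_path = sim.output_path

For a one-shot equivalent that handles the cache transparently, output_path = sim.run_cached() does the fetch, run, and upload in a single call.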