Mercurial > repos > astroteam > crbeam_astro_tool
view crbeam.py @ 0:f40d05521dca draft default tip
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit de01e3c02a26cd6353a6b9b6f8d1be44de8ccd54
author | astroteam |
---|---|
date | Fri, 25 Apr 2025 19:33:20 +0000 |
parents | |
children |
line wrap: on
line source
import subprocess import sys import tempfile from os import makedirs, path from os.path import join import numpy as np class objdict(dict): def __getattr__(self, name): if name in self: return self[name] else: raise AttributeError("No such attribute: " + name) def __setattr__(self, name, value): self[name] = value def __delattr__(self, name): if name in self: del self[name] else: raise AttributeError("No such attribute: " + name) class CRbeam(object): default_parameters = dict( nparticles=10000, z=0.1, emax=1e13, emin=1e7, emin_source=None, # if None, emin is used primary="photon", EGMF=0.0, lmaxEGMF=5, lminEGMF=0.05, background=12, ) particles = dict( electron=0, positron=1, photon=2, gamma=2, neutron=9, proton=10 ) prog_name = "crbeam" # Here we always use fixed power law E^{-1} for MC and then adjust weights for the events if needed power_law = 1 def __init__(self, **kwargs): self._step = 0 self._cache = None self._size_per_step = 100 p = objdict(self.default_parameters) p.update(kwargs) if p.emin_source is None: p.emin_source = p.emin self._cache_name_prefix = ( f"{p.primary}_z{p.z}_E_{p.emin:g}_{p.emax:g}_{p.background}" ) if p.emin_source < p.emax: self._cache_name_prefix += ( f"_pow{self.power_law}_Emin{p.emin_source:g}" ) if p.EGMF > 0: self._cache_name_prefix += ( f"_B{p.EGMF:g}_turbL{p.lminEGMF}-{p.lmaxEGMF}" ) self._cache_name_prefix += "_N" self._output_dir = self._cache_name_prefix + f"{p.nparticles}" p.primary = self.particles[p.primary] self.p = p @property def output_dir(self): return self._output_dir @property def cache(self): from cache import CRbeamCache if self._cache is None: self._cache = CRbeamCache() return self._cache @property def command(self): result = f"{self.prog_name} " result += " ".join( [f"--{key} {value}" for key, value in self.p.items()] ) result += " --output " + self.output_dir power = 1 if self.p.emin_source < self.p.emax else -1000 result += f" --power {power}" if self.p.EGMF > 0: # use turbulent magnetic field with unique random orientation for all particles result += " -mft -mfr" return result @property def output_path(self): return join(self.output_dir, "z0") def run(self, force_overwrite): command_suffix = "" if force_overwrite: command_suffix = " --overwrite" if path.isdir(self.output_path) and not force_overwrite: return logs_dir = "logs" makedirs(logs_dir, exist_ok=True) log_file = f"{logs_dir}/{self.output_dir}.log" with tempfile.NamedTemporaryFile() as script_file: with open(script_file.name, "wt") as out_script: print("#!/bin/bash", file=out_script) print( self.command + command_suffix, "2>&1 1>" + log_file, file=out_script, ) subprocess.run(["bash", script_file.name]) return log_file def run_cached(self, overwrite_local_cache=False): import shutil output_root = self.output_dir if path.isdir(output_root): if not overwrite_local_cache: return self.output_path shutil.rmtree(output_root) size = self.cache.get_cache_size(self._cache_name_prefix) print("s3 data size: ", size) skip_paths = [] makedirs(self.output_path, exist_ok=False) if size < self.p.nparticles: try: self.p.nparticles = self.p.nparticles - size self.run(force_overwrite=True) skip_paths.append(self.upload_cache()) finally: self.p.nparticles = self.p.nparticles + size else: self.p.nparticles = size if size > 0: # append results from s3 cache to the local cach file self.cache.load_results( self.output_path + "/photon", self._cache_name_prefix, skip_paths=skip_paths, ) return self.output_path def start_multistage_run(self, overwrite_local_cache=False, n_steps=10): """ Initialize environment for running different parts of simulation in subsiquent calls of simulation_step This function is useful if we show progress by running code in different jupyter cells :param overwrite_local_cache: remove local cache :param n_steps: number intermediate steps :return: True if further simulation is needed, False if data is already available """ import shutil output_root = self.output_dir if path.isdir(output_root): if not overwrite_local_cache: return False shutil.rmtree(output_root) size = self.cache.get_cache_size(self._cache_name_prefix) print("s3 data size: ", size) if size >= self.p.nparticles: makedirs(self.output_path, exist_ok=False) self.cache.load_results( self.output_path + "/photon", self._cache_name_prefix ) self.p.nparticles = size return False self.p.nparticles = self.p.nparticles - size self._size_per_step = max( 100, int(np.ceil((self.p.nparticles) / n_steps)) ) self._step = 0 return True def simulation_step(self): size = self._size_per_step * self._step target_nparticles = self.p.nparticles if size >= target_nparticles: return False save_output_dir = self._output_dir try: adjusted_size_per_step = min( self._size_per_step, target_nparticles - size ) self.p.nparticles = adjusted_size_per_step self._step += 1 self._output_dir = f"{self._output_dir}_step{self._step}" self.run(force_overwrite=True) finally: self.p.nparticles = target_nparticles self._output_dir = save_output_dir return size + adjusted_size_per_step < target_nparticles def end_multistep_run(self): makedirs(self.output_path, exist_ok=False) with tempfile.NamedTemporaryFile() as script_file: with open(script_file.name, "wt") as task: print( f"cat {self.output_dir}_step*/z0/photon >" f" {self.output_path}/photon", file=task, ) subprocess.run(["bash", script_file.name]) try: skip_paths = [self.upload_cache()] self.cache.load_results( self.output_path + "/photon", self._cache_name_prefix, skip_paths=skip_paths, ) self.p.nparticles = self.cache.get_cache_size( self._cache_name_prefix ) except Exception as ex: print(ex, file=sys.stderr) return self.output_path def upload_cache(self): source_file = self.output_path + "/photon" if not path.isfile(source_file): if path.isdir(self.output_path): print("empty result") return None else: assert False, "result dir not found" obj_name = self._cache_name_prefix + f"{self.p.nparticles}" print("saving", source_file, "to s3 as", obj_name) return self.cache.save(obj_name, source_file) def remove_cache(self): from cache import CRbeamCache c = CRbeamCache() prefix = ( self._cache_name_prefix ) # todo: use metadata when metadata reading is implemented c.detete_results(prefix)