comparison: crbeam.py @ 0:f40d05521dca (draft, default, tip)
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit de01e3c02a26cd6353a6b9b6f8d1be44de8ccd54
| author | astroteam |
|---|---|
| date | Fri, 25 Apr 2025 19:33:20 +0000 |
| parents | |
| children | |
Comparison of parent -1:000000000000 (file did not previously exist) with revision 0:f40d05521dca:
```python
import subprocess
import sys
import tempfile
from os import makedirs, path
from os.path import join

import numpy as np


class objdict(dict):
    def __getattr__(self, name):
        if name in self:
            return self[name]
        else:
            raise AttributeError("No such attribute: " + name)

    def __setattr__(self, name, value):
        self[name] = value

    def __delattr__(self, name):
        if name in self:
            del self[name]
        else:
            raise AttributeError("No such attribute: " + name)
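
# Example (illustrative): objdict exposes dictionary keys as attributes:
#     d = objdict(a=1)
#     d.b = 2        # stored as d["b"]
#     d.a            # -> 1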


class CRbeam(object):
    default_parameters = dict(
        nparticles=10000,
        z=0.1,
        emax=1e13,
        emin=1e7,
        emin_source=None,  # if None, emin is used
        primary="photon",
        EGMF=0.0,
        lmaxEGMF=5,
        lminEGMF=0.05,
        background=12,
    )
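
    # Parameter meanings (annotated here for reference; the units are
    # assumptions based on typical CRbeam CLI conventions, not stated in
    # this file):
    #   nparticles  -- number of primary particles to simulate
    #   z           -- source redshift
    #   emax/emin   -- maximum / minimum tracked energy (assumed eV)
    #   emin_source -- low edge of the injection spectrum (defaults to emin)
    #   primary     -- primary particle type (see `particles` below)
    #   EGMF        -- extragalactic magnetic field strength (assumed Gauss)
    #   lmaxEGMF/lminEGMF -- turbulence coherence scales (assumed Mpc)
    #   background  -- identifier of the photon background (EBL) model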

    particles = dict(
        electron=0, positron=1, photon=2, gamma=2, neutron=9, proton=10
    )

    prog_name = "crbeam"

    # We always sample a fixed E^{-1} power law in the MC and then adjust
    # the event weights if a different source spectrum is needed.
    power_law = 1
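
    # Reweighting sketch (an illustrative note, not part of the original
    # tool): for a sampling density ~ E^-1 and a desired source spectrum
    # ~ E^-alpha, standard importance sampling assigns each event the weight
    #     w(E) ~ E^(power_law - alpha) = E^(1 - alpha),
    # e.g. w(E) ~ E^-1 to mimic an E^-2 injection spectrum.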

    def __init__(self, **kwargs):
        self._step = 0
        self._cache = None
        self._size_per_step = 100
        p = objdict(self.default_parameters)
        p.update(kwargs)
        if p.emin_source is None:
            p.emin_source = p.emin

        self._cache_name_prefix = (
            f"{p.primary}_z{p.z}_E_{p.emin:g}_{p.emax:g}_{p.background}"
        )
        if p.emin_source < p.emax:
            self._cache_name_prefix += (
                f"_pow{self.power_law}_Emin{p.emin_source:g}"
            )
        if p.EGMF > 0:
            self._cache_name_prefix += (
                f"_B{p.EGMF:g}_turbL{p.lminEGMF}-{p.lmaxEGMF}"
            )

        self._cache_name_prefix += "_N"

        self._output_dir = self._cache_name_prefix + f"{p.nparticles}"

        p.primary = self.particles[p.primary]

        self.p = p

    @property
    def output_dir(self):
        return self._output_dir

    @property
    def cache(self):
        from cache import CRbeamCache

        if self._cache is None:
            self._cache = CRbeamCache()
        return self._cache

    @property
    def command(self):
        result = f"{self.prog_name} "
        result += " ".join(
            [f"--{key} {value}" for key, value in self.p.items()]
        )
        result += " --output " + self.output_dir
        power = 1 if self.p.emin_source < self.p.emax else -1000
        result += f" --power {power}"
        if self.p.EGMF > 0:
            # use a turbulent magnetic field with a unique random
            # orientation for all particles
            result += " -mft -mfr"
        return result

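    # For reference, with the default parameters above `command` evaluates to
    # (derived by tracing the code; line breaks added):
    #   crbeam --nparticles 10000 --z 0.1 --emax 10000000000000.0
    #     --emin 10000000.0 --emin_source 10000000.0 --primary 2 --EGMF 0.0
    #     --lmaxEGMF 5 --lminEGMF 0.05 --background 12
    #     --output photon_z0.1_E_1e+07_1e+13_12_pow1_Emin1e+07_N10000 --power 1
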
    @property
    def output_path(self):
        return join(self.output_dir, "z0")

    def run(self, force_overwrite):
        command_suffix = ""
        if force_overwrite:
            command_suffix = " --overwrite"
        if path.isdir(self.output_path) and not force_overwrite:
            return
        logs_dir = "logs"
        makedirs(logs_dir, exist_ok=True)
        log_file = f"{logs_dir}/{self.output_dir}.log"
        with tempfile.NamedTemporaryFile() as script_file:
            with open(script_file.name, "wt") as out_script:
                print("#!/bin/bash", file=out_script)
                print(
                    self.command + command_suffix,
                    # redirect order matters in bash: "1>log 2>&1" captures
                    # both stdout and stderr in the log file
                    "1>" + log_file + " 2>&1",
                    file=out_script,
                )
            subprocess.run(["bash", script_file.name])
        return log_file

    def run_cached(self, overwrite_local_cache=False):
        import shutil

        output_root = self.output_dir
        if path.isdir(output_root):
            if not overwrite_local_cache:
                return self.output_path
            shutil.rmtree(output_root)

        size = self.cache.get_cache_size(self._cache_name_prefix)
        print("s3 data size: ", size)
        skip_paths = []
        makedirs(self.output_path, exist_ok=False)
        if size < self.p.nparticles:
            try:
                self.p.nparticles = self.p.nparticles - size
                self.run(force_overwrite=True)
                skip_paths.append(self.upload_cache())
            finally:
                self.p.nparticles = self.p.nparticles + size
        else:
            self.p.nparticles = size

        if size > 0:  # append results from the s3 cache to the local cache file
            self.cache.load_results(
                self.output_path + "/photon",
                self._cache_name_prefix,
                skip_paths=skip_paths,
            )

        return self.output_path

    def start_multistage_run(self, overwrite_local_cache=False, n_steps=10):
        """
        Initialize the environment for running parts of the simulation in
        subsequent calls of simulation_step. This is useful for showing
        progress when running the code in separate Jupyter cells.
        :param overwrite_local_cache: remove the local cache
        :param n_steps: number of intermediate steps
        :return: True if further simulation is needed, False if the data
            is already available
        """
        import shutil

        output_root = self.output_dir
        if path.isdir(output_root):
            if not overwrite_local_cache:
                return False
            shutil.rmtree(output_root)

        size = self.cache.get_cache_size(self._cache_name_prefix)
        print("s3 data size: ", size)
        if size >= self.p.nparticles:
            makedirs(self.output_path, exist_ok=False)
            self.cache.load_results(
                self.output_path + "/photon", self._cache_name_prefix
            )
            self.p.nparticles = size
            return False
        self.p.nparticles = self.p.nparticles - size
        self._size_per_step = max(
            100, int(np.ceil(self.p.nparticles / n_steps))
        )
        self._step = 0
        return True

    def simulation_step(self):
        size = self._size_per_step * self._step
        target_nparticles = self.p.nparticles
        if size >= target_nparticles:
            return False
        save_output_dir = self._output_dir
        try:
            adjusted_size_per_step = min(
                self._size_per_step, target_nparticles - size
            )
            self.p.nparticles = adjusted_size_per_step
            self._step += 1
            self._output_dir = f"{self._output_dir}_step{self._step}"
            self.run(force_overwrite=True)
        finally:
            self.p.nparticles = target_nparticles
            self._output_dir = save_output_dir

        return size + adjusted_size_per_step < target_nparticles

    def end_multistep_run(self):
        makedirs(self.output_path, exist_ok=False)
        with tempfile.NamedTemporaryFile() as script_file:
            with open(script_file.name, "wt") as task:
                print(
                    f"cat {self.output_dir}_step*/z0/photon >"
                    f" {self.output_path}/photon",
                    file=task,
                )
            subprocess.run(["bash", script_file.name])
        try:
            skip_paths = [self.upload_cache()]
            self.cache.load_results(
                self.output_path + "/photon",
                self._cache_name_prefix,
                skip_paths=skip_paths,
            )
            self.p.nparticles = self.cache.get_cache_size(
                self._cache_name_prefix
            )
        except Exception as ex:
            print(ex, file=sys.stderr)
        return self.output_path

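    # Intended call sequence (a sketch following the docstring above; one
    # call per Jupyter cell to show progress):
    #     sim = CRbeam(z=0.1, nparticles=10000)
    #     if sim.start_multistage_run(n_steps=10):
    #         while sim.simulation_step():
    #             pass  # e.g. update a progress indicator here
    #         result_dir = sim.end_multistep_run()
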
    def upload_cache(self):
        source_file = self.output_path + "/photon"
        if not path.isfile(source_file):
            if path.isdir(self.output_path):
                print("empty result")
                return None
            else:
                assert False, "result dir not found"
        obj_name = self._cache_name_prefix + f"{self.p.nparticles}"
        print("saving", source_file, "to s3 as", obj_name)
        return self.cache.save(obj_name, source_file)

    def remove_cache(self):
        from cache import CRbeamCache

        c = CRbeamCache()
        # todo: use metadata when metadata reading is implemented
        prefix = self._cache_name_prefix
        # "detete_results" is kept as spelled here, matching the method name
        # exposed by the companion cache module
        c.detete_results(prefix)
```
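
A minimal usage sketch (not part of the repository file): it assumes the `crbeam` binary is on `PATH` and that the companion `cache` module used above is importable with valid S3 credentials; the parameter values are illustrative only.

```python
from crbeam import CRbeam

# Illustrative parameters: a photon primary injected at z = 0.1 with a weak
# turbulent EGMF (the field value is chosen only for the example)
sim = CRbeam(z=0.1, nparticles=10000, primary="photon", EGMF=1e-15)

print(sim.command)             # inspect the CLI invocation before running
result_dir = sim.run_cached()  # reuses local/S3 cached events when available
print("photon events:", result_dir + "/photon")
```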
