Mercurial > repos > astroteam > crbeam_astro_tool
comparison crbeam.py @ 0:f40d05521dca draft default tip
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit de01e3c02a26cd6353a6b9b6f8d1be44de8ccd54
author | astroteam |
---|---|
date | Fri, 25 Apr 2025 19:33:20 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:f40d05521dca |
---|---|
1 import subprocess | |
2 import sys | |
3 import tempfile | |
4 from os import makedirs, path | |
5 from os.path import join | |
6 | |
7 import numpy as np | |
8 | |
9 | |
10 class objdict(dict): | |
11 def __getattr__(self, name): | |
12 if name in self: | |
13 return self[name] | |
14 else: | |
15 raise AttributeError("No such attribute: " + name) | |
16 | |
17 def __setattr__(self, name, value): | |
18 self[name] = value | |
19 | |
20 def __delattr__(self, name): | |
21 if name in self: | |
22 del self[name] | |
23 else: | |
24 raise AttributeError("No such attribute: " + name) | |
25 | |
26 | |
27 class CRbeam(object): | |
28 default_parameters = dict( | |
29 nparticles=10000, | |
30 z=0.1, | |
31 emax=1e13, | |
32 emin=1e7, | |
33 emin_source=None, # if None, emin is used | |
34 primary="photon", | |
35 EGMF=0.0, | |
36 lmaxEGMF=5, | |
37 lminEGMF=0.05, | |
38 background=12, | |
39 ) | |
40 | |
41 particles = dict( | |
42 electron=0, positron=1, photon=2, gamma=2, neutron=9, proton=10 | |
43 ) | |
44 | |
45 prog_name = "crbeam" | |
46 | |
47 # Here we always use fixed power law E^{-1} for MC and then adjust weights for the events if needed | |
48 power_law = 1 | |
49 | |
50 def __init__(self, **kwargs): | |
51 self._step = 0 | |
52 self._cache = None | |
53 self._size_per_step = 100 | |
54 p = objdict(self.default_parameters) | |
55 p.update(kwargs) | |
56 if p.emin_source is None: | |
57 p.emin_source = p.emin | |
58 | |
59 self._cache_name_prefix = ( | |
60 f"{p.primary}_z{p.z}_E_{p.emin:g}_{p.emax:g}_{p.background}" | |
61 ) | |
62 if p.emin_source < p.emax: | |
63 self._cache_name_prefix += ( | |
64 f"_pow{self.power_law}_Emin{p.emin_source:g}" | |
65 ) | |
66 if p.EGMF > 0: | |
67 self._cache_name_prefix += ( | |
68 f"_B{p.EGMF:g}_turbL{p.lminEGMF}-{p.lmaxEGMF}" | |
69 ) | |
70 | |
71 self._cache_name_prefix += "_N" | |
72 | |
73 self._output_dir = self._cache_name_prefix + f"{p.nparticles}" | |
74 | |
75 p.primary = self.particles[p.primary] | |
76 | |
77 self.p = p | |
78 | |
79 @property | |
80 def output_dir(self): | |
81 return self._output_dir | |
82 | |
83 @property | |
84 def cache(self): | |
85 from cache import CRbeamCache | |
86 | |
87 if self._cache is None: | |
88 self._cache = CRbeamCache() | |
89 return self._cache | |
90 | |
91 @property | |
92 def command(self): | |
93 result = f"{self.prog_name} " | |
94 result += " ".join( | |
95 [f"--{key} {value}" for key, value in self.p.items()] | |
96 ) | |
97 result += " --output " + self.output_dir | |
98 power = 1 if self.p.emin_source < self.p.emax else -1000 | |
99 result += f" --power {power}" | |
100 if self.p.EGMF > 0: | |
101 # use turbulent magnetic field with unique random orientation for all particles | |
102 result += " -mft -mfr" | |
103 return result | |
104 | |
105 @property | |
106 def output_path(self): | |
107 return join(self.output_dir, "z0") | |
108 | |
109 def run(self, force_overwrite): | |
110 command_suffix = "" | |
111 if force_overwrite: | |
112 command_suffix = " --overwrite" | |
113 if path.isdir(self.output_path) and not force_overwrite: | |
114 return | |
115 logs_dir = "logs" | |
116 makedirs(logs_dir, exist_ok=True) | |
117 log_file = f"{logs_dir}/{self.output_dir}.log" | |
118 with tempfile.NamedTemporaryFile() as script_file: | |
119 with open(script_file.name, "wt") as out_script: | |
120 print("#!/bin/bash", file=out_script) | |
121 print( | |
122 self.command + command_suffix, | |
123 "2>&1 1>" + log_file, | |
124 file=out_script, | |
125 ) | |
126 subprocess.run(["bash", script_file.name]) | |
127 return log_file | |
128 | |
129 def run_cached(self, overwrite_local_cache=False): | |
130 import shutil | |
131 | |
132 output_root = self.output_dir | |
133 if path.isdir(output_root): | |
134 if not overwrite_local_cache: | |
135 return self.output_path | |
136 shutil.rmtree(output_root) | |
137 | |
138 size = self.cache.get_cache_size(self._cache_name_prefix) | |
139 print("s3 data size: ", size) | |
140 skip_paths = [] | |
141 makedirs(self.output_path, exist_ok=False) | |
142 if size < self.p.nparticles: | |
143 try: | |
144 self.p.nparticles = self.p.nparticles - size | |
145 self.run(force_overwrite=True) | |
146 skip_paths.append(self.upload_cache()) | |
147 finally: | |
148 self.p.nparticles = self.p.nparticles + size | |
149 else: | |
150 self.p.nparticles = size | |
151 | |
152 if size > 0: # append results from s3 cache to the local cach file | |
153 self.cache.load_results( | |
154 self.output_path + "/photon", | |
155 self._cache_name_prefix, | |
156 skip_paths=skip_paths, | |
157 ) | |
158 | |
159 return self.output_path | |
160 | |
161 def start_multistage_run(self, overwrite_local_cache=False, n_steps=10): | |
162 """ | |
163 Initialize environment for running different parts of simulation in subsiquent calls of simulation_step | |
164 This function is useful if we show progress by running code in different jupyter cells | |
165 :param overwrite_local_cache: remove local cache | |
166 :param n_steps: number intermediate steps | |
167 :return: True if further simulation is needed, False if data is already available | |
168 """ | |
169 import shutil | |
170 | |
171 output_root = self.output_dir | |
172 if path.isdir(output_root): | |
173 if not overwrite_local_cache: | |
174 return False | |
175 shutil.rmtree(output_root) | |
176 | |
177 size = self.cache.get_cache_size(self._cache_name_prefix) | |
178 print("s3 data size: ", size) | |
179 if size >= self.p.nparticles: | |
180 makedirs(self.output_path, exist_ok=False) | |
181 self.cache.load_results( | |
182 self.output_path + "/photon", self._cache_name_prefix | |
183 ) | |
184 self.p.nparticles = size | |
185 return False | |
186 self.p.nparticles = self.p.nparticles - size | |
187 self._size_per_step = max( | |
188 100, int(np.ceil((self.p.nparticles) / n_steps)) | |
189 ) | |
190 self._step = 0 | |
191 return True | |
192 | |
193 def simulation_step(self): | |
194 size = self._size_per_step * self._step | |
195 target_nparticles = self.p.nparticles | |
196 if size >= target_nparticles: | |
197 return False | |
198 save_output_dir = self._output_dir | |
199 try: | |
200 adjusted_size_per_step = min( | |
201 self._size_per_step, target_nparticles - size | |
202 ) | |
203 self.p.nparticles = adjusted_size_per_step | |
204 self._step += 1 | |
205 self._output_dir = f"{self._output_dir}_step{self._step}" | |
206 self.run(force_overwrite=True) | |
207 finally: | |
208 self.p.nparticles = target_nparticles | |
209 self._output_dir = save_output_dir | |
210 | |
211 return size + adjusted_size_per_step < target_nparticles | |
212 | |
213 def end_multistep_run(self): | |
214 makedirs(self.output_path, exist_ok=False) | |
215 with tempfile.NamedTemporaryFile() as script_file: | |
216 with open(script_file.name, "wt") as task: | |
217 print( | |
218 f"cat {self.output_dir}_step*/z0/photon >" | |
219 f" {self.output_path}/photon", | |
220 file=task, | |
221 ) | |
222 subprocess.run(["bash", script_file.name]) | |
223 try: | |
224 skip_paths = [self.upload_cache()] | |
225 self.cache.load_results( | |
226 self.output_path + "/photon", | |
227 self._cache_name_prefix, | |
228 skip_paths=skip_paths, | |
229 ) | |
230 self.p.nparticles = self.cache.get_cache_size( | |
231 self._cache_name_prefix | |
232 ) | |
233 except Exception as ex: | |
234 print(ex, file=sys.stderr) | |
235 return self.output_path | |
236 | |
237 def upload_cache(self): | |
238 source_file = self.output_path + "/photon" | |
239 if not path.isfile(source_file): | |
240 if path.isdir(self.output_path): | |
241 print("empty result") | |
242 return None | |
243 else: | |
244 assert False, "result dir not found" | |
245 obj_name = self._cache_name_prefix + f"{self.p.nparticles}" | |
246 print("saving", source_file, "to s3 as", obj_name) | |
247 return self.cache.save(obj_name, source_file) | |
248 | |
249 def remove_cache(self): | |
250 from cache import CRbeamCache | |
251 | |
252 c = CRbeamCache() | |
253 prefix = ( | |
254 self._cache_name_prefix | |
255 ) # todo: use metadata when metadata reading is implemented | |
256 c.detete_results(prefix) |