comparison crbeam.py @ 0:f40d05521dca draft default tip

planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit de01e3c02a26cd6353a6b9b6f8d1be44de8ccd54
author astroteam
date Fri, 25 Apr 2025 19:33:20 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:f40d05521dca
1 import subprocess
2 import sys
3 import tempfile
4 from os import makedirs, path
5 from os.path import join
6
7 import numpy as np
8
9
10 class objdict(dict):
11 def __getattr__(self, name):
12 if name in self:
13 return self[name]
14 else:
15 raise AttributeError("No such attribute: " + name)
16
17 def __setattr__(self, name, value):
18 self[name] = value
19
20 def __delattr__(self, name):
21 if name in self:
22 del self[name]
23 else:
24 raise AttributeError("No such attribute: " + name)
25
26
27 class CRbeam(object):
28 default_parameters = dict(
29 nparticles=10000,
30 z=0.1,
31 emax=1e13,
32 emin=1e7,
33 emin_source=None, # if None, emin is used
34 primary="photon",
35 EGMF=0.0,
36 lmaxEGMF=5,
37 lminEGMF=0.05,
38 background=12,
39 )
40
41 particles = dict(
42 electron=0, positron=1, photon=2, gamma=2, neutron=9, proton=10
43 )
44
45 prog_name = "crbeam"
46
47 # Here we always use fixed power law E^{-1} for MC and then adjust weights for the events if needed
48 power_law = 1
49
50 def __init__(self, **kwargs):
51 self._step = 0
52 self._cache = None
53 self._size_per_step = 100
54 p = objdict(self.default_parameters)
55 p.update(kwargs)
56 if p.emin_source is None:
57 p.emin_source = p.emin
58
59 self._cache_name_prefix = (
60 f"{p.primary}_z{p.z}_E_{p.emin:g}_{p.emax:g}_{p.background}"
61 )
62 if p.emin_source < p.emax:
63 self._cache_name_prefix += (
64 f"_pow{self.power_law}_Emin{p.emin_source:g}"
65 )
66 if p.EGMF > 0:
67 self._cache_name_prefix += (
68 f"_B{p.EGMF:g}_turbL{p.lminEGMF}-{p.lmaxEGMF}"
69 )
70
71 self._cache_name_prefix += "_N"
72
73 self._output_dir = self._cache_name_prefix + f"{p.nparticles}"
74
75 p.primary = self.particles[p.primary]
76
77 self.p = p
78
79 @property
80 def output_dir(self):
81 return self._output_dir
82
83 @property
84 def cache(self):
85 from cache import CRbeamCache
86
87 if self._cache is None:
88 self._cache = CRbeamCache()
89 return self._cache
90
91 @property
92 def command(self):
93 result = f"{self.prog_name} "
94 result += " ".join(
95 [f"--{key} {value}" for key, value in self.p.items()]
96 )
97 result += " --output " + self.output_dir
98 power = 1 if self.p.emin_source < self.p.emax else -1000
99 result += f" --power {power}"
100 if self.p.EGMF > 0:
101 # use turbulent magnetic field with unique random orientation for all particles
102 result += " -mft -mfr"
103 return result
104
105 @property
106 def output_path(self):
107 return join(self.output_dir, "z0")
108
109 def run(self, force_overwrite):
110 command_suffix = ""
111 if force_overwrite:
112 command_suffix = " --overwrite"
113 if path.isdir(self.output_path) and not force_overwrite:
114 return
115 logs_dir = "logs"
116 makedirs(logs_dir, exist_ok=True)
117 log_file = f"{logs_dir}/{self.output_dir}.log"
118 with tempfile.NamedTemporaryFile() as script_file:
119 with open(script_file.name, "wt") as out_script:
120 print("#!/bin/bash", file=out_script)
121 print(
122 self.command + command_suffix,
123 "2>&1 1>" + log_file,
124 file=out_script,
125 )
126 subprocess.run(["bash", script_file.name])
127 return log_file
128
129 def run_cached(self, overwrite_local_cache=False):
130 import shutil
131
132 output_root = self.output_dir
133 if path.isdir(output_root):
134 if not overwrite_local_cache:
135 return self.output_path
136 shutil.rmtree(output_root)
137
138 size = self.cache.get_cache_size(self._cache_name_prefix)
139 print("s3 data size: ", size)
140 skip_paths = []
141 makedirs(self.output_path, exist_ok=False)
142 if size < self.p.nparticles:
143 try:
144 self.p.nparticles = self.p.nparticles - size
145 self.run(force_overwrite=True)
146 skip_paths.append(self.upload_cache())
147 finally:
148 self.p.nparticles = self.p.nparticles + size
149 else:
150 self.p.nparticles = size
151
152 if size > 0: # append results from s3 cache to the local cach file
153 self.cache.load_results(
154 self.output_path + "/photon",
155 self._cache_name_prefix,
156 skip_paths=skip_paths,
157 )
158
159 return self.output_path
160
161 def start_multistage_run(self, overwrite_local_cache=False, n_steps=10):
162 """
163 Initialize environment for running different parts of simulation in subsiquent calls of simulation_step
164 This function is useful if we show progress by running code in different jupyter cells
165 :param overwrite_local_cache: remove local cache
166 :param n_steps: number intermediate steps
167 :return: True if further simulation is needed, False if data is already available
168 """
169 import shutil
170
171 output_root = self.output_dir
172 if path.isdir(output_root):
173 if not overwrite_local_cache:
174 return False
175 shutil.rmtree(output_root)
176
177 size = self.cache.get_cache_size(self._cache_name_prefix)
178 print("s3 data size: ", size)
179 if size >= self.p.nparticles:
180 makedirs(self.output_path, exist_ok=False)
181 self.cache.load_results(
182 self.output_path + "/photon", self._cache_name_prefix
183 )
184 self.p.nparticles = size
185 return False
186 self.p.nparticles = self.p.nparticles - size
187 self._size_per_step = max(
188 100, int(np.ceil((self.p.nparticles) / n_steps))
189 )
190 self._step = 0
191 return True
192
193 def simulation_step(self):
194 size = self._size_per_step * self._step
195 target_nparticles = self.p.nparticles
196 if size >= target_nparticles:
197 return False
198 save_output_dir = self._output_dir
199 try:
200 adjusted_size_per_step = min(
201 self._size_per_step, target_nparticles - size
202 )
203 self.p.nparticles = adjusted_size_per_step
204 self._step += 1
205 self._output_dir = f"{self._output_dir}_step{self._step}"
206 self.run(force_overwrite=True)
207 finally:
208 self.p.nparticles = target_nparticles
209 self._output_dir = save_output_dir
210
211 return size + adjusted_size_per_step < target_nparticles
212
213 def end_multistep_run(self):
214 makedirs(self.output_path, exist_ok=False)
215 with tempfile.NamedTemporaryFile() as script_file:
216 with open(script_file.name, "wt") as task:
217 print(
218 f"cat {self.output_dir}_step*/z0/photon >"
219 f" {self.output_path}/photon",
220 file=task,
221 )
222 subprocess.run(["bash", script_file.name])
223 try:
224 skip_paths = [self.upload_cache()]
225 self.cache.load_results(
226 self.output_path + "/photon",
227 self._cache_name_prefix,
228 skip_paths=skip_paths,
229 )
230 self.p.nparticles = self.cache.get_cache_size(
231 self._cache_name_prefix
232 )
233 except Exception as ex:
234 print(ex, file=sys.stderr)
235 return self.output_path
236
237 def upload_cache(self):
238 source_file = self.output_path + "/photon"
239 if not path.isfile(source_file):
240 if path.isdir(self.output_path):
241 print("empty result")
242 return None
243 else:
244 assert False, "result dir not found"
245 obj_name = self._cache_name_prefix + f"{self.p.nparticles}"
246 print("saving", source_file, "to s3 as", obj_name)
247 return self.cache.save(obj_name, source_file)
248
249 def remove_cache(self):
250 from cache import CRbeamCache
251
252 c = CRbeamCache()
253 prefix = (
254 self._cache_name_prefix
255 ) # todo: use metadata when metadata reading is implemented
256 c.detete_results(prefix)