Mercurial > repos > astroteam > grb_detection_astro_tool
comparison detectgrb.py @ 0:c80db1ec6611 draft default tip
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit 5b1251e9c9cdff7f825bfcfebdfdb12714c13d8f
author | astroteam |
---|---|
date | Wed, 13 Aug 2025 19:10:53 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:c80db1ec6611 |
---|---|
1 #!/usr/bin/env python | |
2 # coding: utf-8 | |
3 | |
4 #!/usr/bin/env python | |
5 | |
6 # This script is generated with nb2galaxy | |
7 | |
8 # flake8: noqa | |
9 | |
10 import json | |
11 import os | |
12 import shutil | |
13 | |
14 from oda_api.json import CustomJSONEncoder | |
15 | |
16 T1 = "2023-01-16T04:53:33.9" # http://odahub.io/ontology#StartTime | |
17 T2 = "2023-01-16T04:55:33.9" # http://odahub.io/ontology#EndTime | |
18 detection_time_scales = "1,10" | |
19 lc_time_scale = 0.1 # https://odahub.io/ontology#TimeIntervalSeconds | |
20 background_age = 10 # oda:TimeIntervalSeconds | |
21 min_sn = 5 # https://odahub.io/ontology#SignalToNoiseRatio | |
22 | |
23 _galaxy_wd = os.getcwd() | |
24 | |
25 with open("inputs.json", "r") as fd: | |
26 inp_dic = json.load(fd) | |
27 if "C_data_product_" in inp_dic.keys(): | |
28 inp_pdic = inp_dic["C_data_product_"] | |
29 else: | |
30 inp_pdic = inp_dic | |
31 T1 = str(inp_pdic["T1"]) | |
32 T2 = str(inp_pdic["T2"]) | |
33 detection_time_scales = str(inp_pdic["detection_time_scales"]) | |
34 lc_time_scale = float(inp_pdic["lc_time_scale"]) | |
35 background_age = float(inp_pdic["background_age"]) | |
36 min_sn = int(inp_pdic["min_sn"]) | |
37 | |
38 import numpy as np | |
39 from astropy.time import Time | |
40 from matplotlib import pylab as plt | |
41 from oda_api.api import DispatcherAPI | |
42 from oda_api.data_products import LightCurveDataProduct, PictureProduct | |
43 | |
44 disp = DispatcherAPI( | |
45 url="https://www.astro.unige.ch/mmoda//dispatch-data", instrument="mock" | |
46 ) | |
47 | |
48 par_dict = { | |
49 "T1": T1, | |
50 "T2": T2, | |
51 "T_format": "isot", | |
52 "instrument": "spi_acs", | |
53 "product": "spi_acs_lc", | |
54 "product_type": "Real", | |
55 "time_bin": lc_time_scale, | |
56 "time_bin_format": "sec", | |
57 } | |
58 | |
59 data_collection = disp.get_product(**par_dict) | |
60 | |
61 lc = data_collection.spi_acs_lc_0_query.data_unit[1].data | |
62 | |
63 lc["TIME"] = ( | |
64 lc["TIME"] / 24 / 3600 + (Time(T1).mjd + Time(T2).mjd) / 2.0 | |
65 ) # TODO: more accurately | |
66 | |
67 def rebin(x, n): | |
68 N = int(len(x) / n) | |
69 return np.array(x[: N * n]).reshape((N, n)).sum(1) | |
70 | |
71 lc = LightCurveDataProduct.from_arrays( | |
72 times=Time(lc["TIME"], format="mjd"), fluxes=lc["RATE"], errors=lc["ERROR"] | |
73 ) | |
74 | |
75 obj_results = [lc.encode()] | |
76 | |
77 T0_ijd = lc.data_unit[1].data["TIME"][np.argmax(lc.data_unit[1].data["FLUX"])] | |
78 | |
79 ijd2plot = lambda x: (x - T0_ijd) * 24 * 3600 | |
80 | |
81 m_bkg = ijd2plot(lc.data_unit[1].data["TIME"]) < background_age | |
82 np.sum(m_bkg) | |
83 bkg = np.mean(lc.data_unit[1].data["FLUX"][m_bkg]) | |
84 bkg | |
85 | |
86 from matplotlib import pylab as plt | |
87 | |
88 plt.figure(figsize=(10, 6)) | |
89 | |
90 plt.errorbar( | |
91 ijd2plot(lc.data_unit[1].data["TIME"]), | |
92 lc.data_unit[1].data["FLUX"], | |
93 lc.data_unit[1].data["ERROR"], | |
94 label=f"{lc_time_scale} s", | |
95 alpha=0.5, | |
96 ) | |
97 | |
98 best_detection = {"sn": None, "timescale": None, "t_max_sn": None} | |
99 | |
100 for detection_time_scale in detection_time_scales.split(","): | |
101 n = int(float(detection_time_scale.strip()) / lc_time_scale) | |
102 | |
103 r_t = rebin(lc.data_unit[1].data["TIME"], n) / n | |
104 r_c = rebin(lc.data_unit[1].data["FLUX"], n) / n | |
105 r_ce = rebin(lc.data_unit[1].data["ERROR"] ** 2, n) ** 0.5 / n | |
106 | |
107 sn = (r_c - bkg) / r_ce | |
108 imax = sn.argmax() | |
109 | |
110 if best_detection["sn"] is None or best_detection["sn"] < sn[imax]: | |
111 best_detection["sn"] = sn[imax] | |
112 best_detection["timescale"] = detection_time_scale | |
113 best_detection["t_max_sn"] = Time(r_t[imax], format="mjd").isot | |
114 | |
115 plt.errorbar( | |
116 ijd2plot(r_t), | |
117 r_c, | |
118 r_ce, | |
119 label=f"{detection_time_scale} s, S/N = {sn[imax]:.1f}", | |
120 ) | |
121 | |
122 plt.axhline(bkg, lw=3, ls=":", c="k") | |
123 | |
124 plt.grid() | |
125 plt.legend() | |
126 | |
127 plt.ylabel("Flux") | |
128 plt.xlabel("Time, MJD") | |
129 | |
130 plt.title(best_detection) | |
131 | |
132 plt.savefig("annotated-lc.png") | |
133 | |
134 Time(r_t, format="mjd").isot | |
135 | |
136 bin_image = PictureProduct.from_file("annotated-lc.png") | |
137 | |
138 detection_note = str(dict(best_detection=best_detection)) | |
139 | |
140 import numpy as np | |
141 from astropy import wcs | |
142 from astropy.io import fits | |
143 from matplotlib import pylab as plt | |
144 | |
145 x, y = np.meshgrid(np.linspace(0, 10, 500), np.linspace(0, 10, 500)) | |
146 | |
147 image = np.exp(-((x - 5) ** 2 + (y - 5) ** 2)) | |
148 | |
149 pixsize = 0.1 | |
150 Npix = 500 | |
151 cdec = 1 | |
152 | |
153 RA = 0 | |
154 DEC = 0 | |
155 | |
156 # Create a new WCS object. The number of axes must be set | |
157 # from the start | |
158 w = wcs.WCS(naxis=2) | |
159 | |
160 # Set up an "Airy's zenithal" projection | |
161 # Vector properties may be set with Python lists, or Numpy arrays | |
162 w.wcs.crpix = [image.shape[0] / 2.0, image.shape[1] / 2.0] | |
163 w.wcs.cdelt = np.array([pixsize / cdec, pixsize]) | |
164 w.wcs.crval = [RA, DEC] | |
165 w.wcs.ctype = ["RA---CAR", "DEC--CAR"] | |
166 w.wcs.set_pv([(1, 1, 45.0)]) | |
167 | |
168 # Now, write out the WCS object as a FITS header | |
169 header = w.to_header() | |
170 | |
171 # header is an astropy.io.fits.Header object. We can use it to create a new | |
172 # PrimaryHDU and write it to a file. | |
173 hdu = fits.PrimaryHDU(image, header=header) | |
174 hdu.writeto("Image.fits", overwrite=True) | |
175 hdu = fits.open("Image.fits") | |
176 im = hdu[0].data | |
177 from astropy.wcs import WCS | |
178 | |
179 wcs = WCS(hdu[0].header) | |
180 plt.subplot(projection=wcs) | |
181 plt.imshow(im, origin="lower") | |
182 plt.grid(color="white", ls="solid") | |
183 plt.xlabel("RA") | |
184 plt.ylabel("Dec") | |
185 | |
186 from oda_api.data_products import ImageDataProduct | |
187 | |
188 fits_image = ImageDataProduct.from_fits_file("Image.fits") | |
189 | |
190 lc = lc # http://odahub.io/ontology#LightCurve | |
191 | |
192 detection_comment = detection_note # http://odahub.io/ontology#ODATextProduct | |
193 image_output = bin_image # http://odahub.io/ontology#ODAPictureProduct | |
194 image_fits = fits_image # http://odahub.io/ontology#Image | |
195 | |
196 # output gathering | |
197 _galaxy_meta_data = {} | |
198 _oda_outs = [] | |
199 _oda_outs.append(("out_detectgrb_lc", "lc_galaxy.output", lc)) | |
200 _oda_outs.append( | |
201 ( | |
202 "out_detectgrb_detection_comment", | |
203 "detection_comment_galaxy.output", | |
204 detection_comment, | |
205 ) | |
206 ) | |
207 _oda_outs.append( | |
208 ("out_detectgrb_image_output", "image_output_galaxy.output", image_output) | |
209 ) | |
210 _oda_outs.append( | |
211 ("out_detectgrb_image_fits", "image_fits_galaxy.output", image_fits) | |
212 ) | |
213 | |
214 for _outn, _outfn, _outv in _oda_outs: | |
215 _galaxy_outfile_name = os.path.join(_galaxy_wd, _outfn) | |
216 if isinstance(_outv, str) and os.path.isfile(_outv): | |
217 shutil.move(_outv, _galaxy_outfile_name) | |
218 _galaxy_meta_data[_outn] = {"ext": "_sniff_"} | |
219 elif getattr(_outv, "write_fits_file", None): | |
220 _outv.write_fits_file(_galaxy_outfile_name) | |
221 _galaxy_meta_data[_outn] = {"ext": "fits"} | |
222 elif getattr(_outv, "write_file", None): | |
223 _outv.write_file(_galaxy_outfile_name) | |
224 _galaxy_meta_data[_outn] = {"ext": "_sniff_"} | |
225 else: | |
226 with open(_galaxy_outfile_name, "w") as fd: | |
227 json.dump(_outv, fd, cls=CustomJSONEncoder) | |
228 _galaxy_meta_data[_outn] = {"ext": "json"} | |
229 | |
230 with open(os.path.join(_galaxy_wd, "galaxy.json"), "w") as fd: | |
231 json.dump(_galaxy_meta_data, fd) | |
232 print("*** Job finished successfully ***") |