comparison detectgrb.py @ 0:c80db1ec6611 draft default tip

planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit 5b1251e9c9cdff7f825bfcfebdfdb12714c13d8f
author astroteam
date Wed, 13 Aug 2025 19:10:53 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:c80db1ec6611
1 #!/usr/bin/env python
2 # coding: utf-8
3
4 #!/usr/bin/env python
5
6 # This script is generated with nb2galaxy
7
8 # flake8: noqa
9
10 import json
11 import os
12 import shutil
13
14 from oda_api.json import CustomJSONEncoder
15
16 T1 = "2023-01-16T04:53:33.9" # http://odahub.io/ontology#StartTime
17 T2 = "2023-01-16T04:55:33.9" # http://odahub.io/ontology#EndTime
18 detection_time_scales = "1,10"
19 lc_time_scale = 0.1 # https://odahub.io/ontology#TimeIntervalSeconds
20 background_age = 10 # oda:TimeIntervalSeconds
21 min_sn = 5 # https://odahub.io/ontology#SignalToNoiseRatio
22
23 _galaxy_wd = os.getcwd()
24
25 with open("inputs.json", "r") as fd:
26 inp_dic = json.load(fd)
27 if "C_data_product_" in inp_dic.keys():
28 inp_pdic = inp_dic["C_data_product_"]
29 else:
30 inp_pdic = inp_dic
31 T1 = str(inp_pdic["T1"])
32 T2 = str(inp_pdic["T2"])
33 detection_time_scales = str(inp_pdic["detection_time_scales"])
34 lc_time_scale = float(inp_pdic["lc_time_scale"])
35 background_age = float(inp_pdic["background_age"])
36 min_sn = int(inp_pdic["min_sn"])
37
38 import numpy as np
39 from astropy.time import Time
40 from matplotlib import pylab as plt
41 from oda_api.api import DispatcherAPI
42 from oda_api.data_products import LightCurveDataProduct, PictureProduct
43
44 disp = DispatcherAPI(
45 url="https://www.astro.unige.ch/mmoda//dispatch-data", instrument="mock"
46 )
47
48 par_dict = {
49 "T1": T1,
50 "T2": T2,
51 "T_format": "isot",
52 "instrument": "spi_acs",
53 "product": "spi_acs_lc",
54 "product_type": "Real",
55 "time_bin": lc_time_scale,
56 "time_bin_format": "sec",
57 }
58
59 data_collection = disp.get_product(**par_dict)
60
61 lc = data_collection.spi_acs_lc_0_query.data_unit[1].data
62
63 lc["TIME"] = (
64 lc["TIME"] / 24 / 3600 + (Time(T1).mjd + Time(T2).mjd) / 2.0
65 ) # TODO: more accurately
66
67 def rebin(x, n):
68 N = int(len(x) / n)
69 return np.array(x[: N * n]).reshape((N, n)).sum(1)
70
71 lc = LightCurveDataProduct.from_arrays(
72 times=Time(lc["TIME"], format="mjd"), fluxes=lc["RATE"], errors=lc["ERROR"]
73 )
74
75 obj_results = [lc.encode()]
76
77 T0_ijd = lc.data_unit[1].data["TIME"][np.argmax(lc.data_unit[1].data["FLUX"])]
78
79 ijd2plot = lambda x: (x - T0_ijd) * 24 * 3600
80
81 m_bkg = ijd2plot(lc.data_unit[1].data["TIME"]) < background_age
82 np.sum(m_bkg)
83 bkg = np.mean(lc.data_unit[1].data["FLUX"][m_bkg])
84 bkg
85
86 from matplotlib import pylab as plt
87
88 plt.figure(figsize=(10, 6))
89
90 plt.errorbar(
91 ijd2plot(lc.data_unit[1].data["TIME"]),
92 lc.data_unit[1].data["FLUX"],
93 lc.data_unit[1].data["ERROR"],
94 label=f"{lc_time_scale} s",
95 alpha=0.5,
96 )
97
98 best_detection = {"sn": None, "timescale": None, "t_max_sn": None}
99
100 for detection_time_scale in detection_time_scales.split(","):
101 n = int(float(detection_time_scale.strip()) / lc_time_scale)
102
103 r_t = rebin(lc.data_unit[1].data["TIME"], n) / n
104 r_c = rebin(lc.data_unit[1].data["FLUX"], n) / n
105 r_ce = rebin(lc.data_unit[1].data["ERROR"] ** 2, n) ** 0.5 / n
106
107 sn = (r_c - bkg) / r_ce
108 imax = sn.argmax()
109
110 if best_detection["sn"] is None or best_detection["sn"] < sn[imax]:
111 best_detection["sn"] = sn[imax]
112 best_detection["timescale"] = detection_time_scale
113 best_detection["t_max_sn"] = Time(r_t[imax], format="mjd").isot
114
115 plt.errorbar(
116 ijd2plot(r_t),
117 r_c,
118 r_ce,
119 label=f"{detection_time_scale} s, S/N = {sn[imax]:.1f}",
120 )
121
122 plt.axhline(bkg, lw=3, ls=":", c="k")
123
124 plt.grid()
125 plt.legend()
126
127 plt.ylabel("Flux")
128 plt.xlabel("Time, MJD")
129
130 plt.title(best_detection)
131
132 plt.savefig("annotated-lc.png")
133
134 Time(r_t, format="mjd").isot
135
136 bin_image = PictureProduct.from_file("annotated-lc.png")
137
138 detection_note = str(dict(best_detection=best_detection))
139
140 import numpy as np
141 from astropy import wcs
142 from astropy.io import fits
143 from matplotlib import pylab as plt
144
145 x, y = np.meshgrid(np.linspace(0, 10, 500), np.linspace(0, 10, 500))
146
147 image = np.exp(-((x - 5) ** 2 + (y - 5) ** 2))
148
149 pixsize = 0.1
150 Npix = 500
151 cdec = 1
152
153 RA = 0
154 DEC = 0
155
156 # Create a new WCS object. The number of axes must be set
157 # from the start
158 w = wcs.WCS(naxis=2)
159
160 # Set up an "Airy's zenithal" projection
161 # Vector properties may be set with Python lists, or Numpy arrays
162 w.wcs.crpix = [image.shape[0] / 2.0, image.shape[1] / 2.0]
163 w.wcs.cdelt = np.array([pixsize / cdec, pixsize])
164 w.wcs.crval = [RA, DEC]
165 w.wcs.ctype = ["RA---CAR", "DEC--CAR"]
166 w.wcs.set_pv([(1, 1, 45.0)])
167
168 # Now, write out the WCS object as a FITS header
169 header = w.to_header()
170
171 # header is an astropy.io.fits.Header object. We can use it to create a new
172 # PrimaryHDU and write it to a file.
173 hdu = fits.PrimaryHDU(image, header=header)
174 hdu.writeto("Image.fits", overwrite=True)
175 hdu = fits.open("Image.fits")
176 im = hdu[0].data
177 from astropy.wcs import WCS
178
179 wcs = WCS(hdu[0].header)
180 plt.subplot(projection=wcs)
181 plt.imshow(im, origin="lower")
182 plt.grid(color="white", ls="solid")
183 plt.xlabel("RA")
184 plt.ylabel("Dec")
185
186 from oda_api.data_products import ImageDataProduct
187
188 fits_image = ImageDataProduct.from_fits_file("Image.fits")
189
190 lc = lc # http://odahub.io/ontology#LightCurve
191
192 detection_comment = detection_note # http://odahub.io/ontology#ODATextProduct
193 image_output = bin_image # http://odahub.io/ontology#ODAPictureProduct
194 image_fits = fits_image # http://odahub.io/ontology#Image
195
196 # output gathering
197 _galaxy_meta_data = {}
198 _oda_outs = []
199 _oda_outs.append(("out_detectgrb_lc", "lc_galaxy.output", lc))
200 _oda_outs.append(
201 (
202 "out_detectgrb_detection_comment",
203 "detection_comment_galaxy.output",
204 detection_comment,
205 )
206 )
207 _oda_outs.append(
208 ("out_detectgrb_image_output", "image_output_galaxy.output", image_output)
209 )
210 _oda_outs.append(
211 ("out_detectgrb_image_fits", "image_fits_galaxy.output", image_fits)
212 )
213
214 for _outn, _outfn, _outv in _oda_outs:
215 _galaxy_outfile_name = os.path.join(_galaxy_wd, _outfn)
216 if isinstance(_outv, str) and os.path.isfile(_outv):
217 shutil.move(_outv, _galaxy_outfile_name)
218 _galaxy_meta_data[_outn] = {"ext": "_sniff_"}
219 elif getattr(_outv, "write_fits_file", None):
220 _outv.write_fits_file(_galaxy_outfile_name)
221 _galaxy_meta_data[_outn] = {"ext": "fits"}
222 elif getattr(_outv, "write_file", None):
223 _outv.write_file(_galaxy_outfile_name)
224 _galaxy_meta_data[_outn] = {"ext": "_sniff_"}
225 else:
226 with open(_galaxy_outfile_name, "w") as fd:
227 json.dump(_outv, fd, cls=CustomJSONEncoder)
228 _galaxy_meta_data[_outn] = {"ext": "json"}
229
230 with open(os.path.join(_galaxy_wd, "galaxy.json"), "w") as fd:
231 json.dump(_galaxy_meta_data, fd)
232 print("*** Job finished successfully ***")