Mercurial > repos > astroteam > grb_detection_astro_tool
comparison detectgrb.py @ 0:c80db1ec6611 draft default tip
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit 5b1251e9c9cdff7f825bfcfebdfdb12714c13d8f
| author | astroteam |
|---|---|
| date | Wed, 13 Aug 2025 19:10:53 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:c80db1ec6611 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 # coding: utf-8 | |
| 3 | |
| 4 #!/usr/bin/env python | |
| 5 | |
| 6 # This script is generated with nb2galaxy | |
| 7 | |
| 8 # flake8: noqa | |
| 9 | |
| 10 import json | |
| 11 import os | |
| 12 import shutil | |
| 13 | |
| 14 from oda_api.json import CustomJSONEncoder | |
| 15 | |
| 16 T1 = "2023-01-16T04:53:33.9" # http://odahub.io/ontology#StartTime | |
| 17 T2 = "2023-01-16T04:55:33.9" # http://odahub.io/ontology#EndTime | |
| 18 detection_time_scales = "1,10" | |
| 19 lc_time_scale = 0.1 # https://odahub.io/ontology#TimeIntervalSeconds | |
| 20 background_age = 10 # oda:TimeIntervalSeconds | |
| 21 min_sn = 5 # https://odahub.io/ontology#SignalToNoiseRatio | |
| 22 | |
| 23 _galaxy_wd = os.getcwd() | |
| 24 | |
| 25 with open("inputs.json", "r") as fd: | |
| 26 inp_dic = json.load(fd) | |
| 27 if "C_data_product_" in inp_dic.keys(): | |
| 28 inp_pdic = inp_dic["C_data_product_"] | |
| 29 else: | |
| 30 inp_pdic = inp_dic | |
| 31 T1 = str(inp_pdic["T1"]) | |
| 32 T2 = str(inp_pdic["T2"]) | |
| 33 detection_time_scales = str(inp_pdic["detection_time_scales"]) | |
| 34 lc_time_scale = float(inp_pdic["lc_time_scale"]) | |
| 35 background_age = float(inp_pdic["background_age"]) | |
| 36 min_sn = int(inp_pdic["min_sn"]) | |
| 37 | |
| 38 import numpy as np | |
| 39 from astropy.time import Time | |
| 40 from matplotlib import pylab as plt | |
| 41 from oda_api.api import DispatcherAPI | |
| 42 from oda_api.data_products import LightCurveDataProduct, PictureProduct | |
| 43 | |
| 44 disp = DispatcherAPI( | |
| 45 url="https://www.astro.unige.ch/mmoda//dispatch-data", instrument="mock" | |
| 46 ) | |
| 47 | |
| 48 par_dict = { | |
| 49 "T1": T1, | |
| 50 "T2": T2, | |
| 51 "T_format": "isot", | |
| 52 "instrument": "spi_acs", | |
| 53 "product": "spi_acs_lc", | |
| 54 "product_type": "Real", | |
| 55 "time_bin": lc_time_scale, | |
| 56 "time_bin_format": "sec", | |
| 57 } | |
| 58 | |
| 59 data_collection = disp.get_product(**par_dict) | |
| 60 | |
| 61 lc = data_collection.spi_acs_lc_0_query.data_unit[1].data | |
| 62 | |
| 63 lc["TIME"] = ( | |
| 64 lc["TIME"] / 24 / 3600 + (Time(T1).mjd + Time(T2).mjd) / 2.0 | |
| 65 ) # TODO: more accurately | |
| 66 | |
| 67 def rebin(x, n): | |
| 68 N = int(len(x) / n) | |
| 69 return np.array(x[: N * n]).reshape((N, n)).sum(1) | |
| 70 | |
| 71 lc = LightCurveDataProduct.from_arrays( | |
| 72 times=Time(lc["TIME"], format="mjd"), fluxes=lc["RATE"], errors=lc["ERROR"] | |
| 73 ) | |
| 74 | |
| 75 obj_results = [lc.encode()] | |
| 76 | |
| 77 T0_ijd = lc.data_unit[1].data["TIME"][np.argmax(lc.data_unit[1].data["FLUX"])] | |
| 78 | |
| 79 ijd2plot = lambda x: (x - T0_ijd) * 24 * 3600 | |
| 80 | |
| 81 m_bkg = ijd2plot(lc.data_unit[1].data["TIME"]) < background_age | |
| 82 np.sum(m_bkg) | |
| 83 bkg = np.mean(lc.data_unit[1].data["FLUX"][m_bkg]) | |
| 84 bkg | |
| 85 | |
| 86 from matplotlib import pylab as plt | |
| 87 | |
| 88 plt.figure(figsize=(10, 6)) | |
| 89 | |
| 90 plt.errorbar( | |
| 91 ijd2plot(lc.data_unit[1].data["TIME"]), | |
| 92 lc.data_unit[1].data["FLUX"], | |
| 93 lc.data_unit[1].data["ERROR"], | |
| 94 label=f"{lc_time_scale} s", | |
| 95 alpha=0.5, | |
| 96 ) | |
| 97 | |
| 98 best_detection = {"sn": None, "timescale": None, "t_max_sn": None} | |
| 99 | |
| 100 for detection_time_scale in detection_time_scales.split(","): | |
| 101 n = int(float(detection_time_scale.strip()) / lc_time_scale) | |
| 102 | |
| 103 r_t = rebin(lc.data_unit[1].data["TIME"], n) / n | |
| 104 r_c = rebin(lc.data_unit[1].data["FLUX"], n) / n | |
| 105 r_ce = rebin(lc.data_unit[1].data["ERROR"] ** 2, n) ** 0.5 / n | |
| 106 | |
| 107 sn = (r_c - bkg) / r_ce | |
| 108 imax = sn.argmax() | |
| 109 | |
| 110 if best_detection["sn"] is None or best_detection["sn"] < sn[imax]: | |
| 111 best_detection["sn"] = sn[imax] | |
| 112 best_detection["timescale"] = detection_time_scale | |
| 113 best_detection["t_max_sn"] = Time(r_t[imax], format="mjd").isot | |
| 114 | |
| 115 plt.errorbar( | |
| 116 ijd2plot(r_t), | |
| 117 r_c, | |
| 118 r_ce, | |
| 119 label=f"{detection_time_scale} s, S/N = {sn[imax]:.1f}", | |
| 120 ) | |
| 121 | |
| 122 plt.axhline(bkg, lw=3, ls=":", c="k") | |
| 123 | |
| 124 plt.grid() | |
| 125 plt.legend() | |
| 126 | |
| 127 plt.ylabel("Flux") | |
| 128 plt.xlabel("Time, MJD") | |
| 129 | |
| 130 plt.title(best_detection) | |
| 131 | |
| 132 plt.savefig("annotated-lc.png") | |
| 133 | |
| 134 Time(r_t, format="mjd").isot | |
| 135 | |
| 136 bin_image = PictureProduct.from_file("annotated-lc.png") | |
| 137 | |
| 138 detection_note = str(dict(best_detection=best_detection)) | |
| 139 | |
| 140 import numpy as np | |
| 141 from astropy import wcs | |
| 142 from astropy.io import fits | |
| 143 from matplotlib import pylab as plt | |
| 144 | |
| 145 x, y = np.meshgrid(np.linspace(0, 10, 500), np.linspace(0, 10, 500)) | |
| 146 | |
| 147 image = np.exp(-((x - 5) ** 2 + (y - 5) ** 2)) | |
| 148 | |
| 149 pixsize = 0.1 | |
| 150 Npix = 500 | |
| 151 cdec = 1 | |
| 152 | |
| 153 RA = 0 | |
| 154 DEC = 0 | |
| 155 | |
| 156 # Create a new WCS object. The number of axes must be set | |
| 157 # from the start | |
| 158 w = wcs.WCS(naxis=2) | |
| 159 | |
| 160 # Set up an "Airy's zenithal" projection | |
| 161 # Vector properties may be set with Python lists, or Numpy arrays | |
| 162 w.wcs.crpix = [image.shape[0] / 2.0, image.shape[1] / 2.0] | |
| 163 w.wcs.cdelt = np.array([pixsize / cdec, pixsize]) | |
| 164 w.wcs.crval = [RA, DEC] | |
| 165 w.wcs.ctype = ["RA---CAR", "DEC--CAR"] | |
| 166 w.wcs.set_pv([(1, 1, 45.0)]) | |
| 167 | |
| 168 # Now, write out the WCS object as a FITS header | |
| 169 header = w.to_header() | |
| 170 | |
| 171 # header is an astropy.io.fits.Header object. We can use it to create a new | |
| 172 # PrimaryHDU and write it to a file. | |
| 173 hdu = fits.PrimaryHDU(image, header=header) | |
| 174 hdu.writeto("Image.fits", overwrite=True) | |
| 175 hdu = fits.open("Image.fits") | |
| 176 im = hdu[0].data | |
| 177 from astropy.wcs import WCS | |
| 178 | |
| 179 wcs = WCS(hdu[0].header) | |
| 180 plt.subplot(projection=wcs) | |
| 181 plt.imshow(im, origin="lower") | |
| 182 plt.grid(color="white", ls="solid") | |
| 183 plt.xlabel("RA") | |
| 184 plt.ylabel("Dec") | |
| 185 | |
| 186 from oda_api.data_products import ImageDataProduct | |
| 187 | |
| 188 fits_image = ImageDataProduct.from_fits_file("Image.fits") | |
| 189 | |
| 190 lc = lc # http://odahub.io/ontology#LightCurve | |
| 191 | |
| 192 detection_comment = detection_note # http://odahub.io/ontology#ODATextProduct | |
| 193 image_output = bin_image # http://odahub.io/ontology#ODAPictureProduct | |
| 194 image_fits = fits_image # http://odahub.io/ontology#Image | |
| 195 | |
| 196 # output gathering | |
| 197 _galaxy_meta_data = {} | |
| 198 _oda_outs = [] | |
| 199 _oda_outs.append(("out_detectgrb_lc", "lc_galaxy.output", lc)) | |
| 200 _oda_outs.append( | |
| 201 ( | |
| 202 "out_detectgrb_detection_comment", | |
| 203 "detection_comment_galaxy.output", | |
| 204 detection_comment, | |
| 205 ) | |
| 206 ) | |
| 207 _oda_outs.append( | |
| 208 ("out_detectgrb_image_output", "image_output_galaxy.output", image_output) | |
| 209 ) | |
| 210 _oda_outs.append( | |
| 211 ("out_detectgrb_image_fits", "image_fits_galaxy.output", image_fits) | |
| 212 ) | |
| 213 | |
| 214 for _outn, _outfn, _outv in _oda_outs: | |
| 215 _galaxy_outfile_name = os.path.join(_galaxy_wd, _outfn) | |
| 216 if isinstance(_outv, str) and os.path.isfile(_outv): | |
| 217 shutil.move(_outv, _galaxy_outfile_name) | |
| 218 _galaxy_meta_data[_outn] = {"ext": "_sniff_"} | |
| 219 elif getattr(_outv, "write_fits_file", None): | |
| 220 _outv.write_fits_file(_galaxy_outfile_name) | |
| 221 _galaxy_meta_data[_outn] = {"ext": "fits"} | |
| 222 elif getattr(_outv, "write_file", None): | |
| 223 _outv.write_file(_galaxy_outfile_name) | |
| 224 _galaxy_meta_data[_outn] = {"ext": "_sniff_"} | |
| 225 else: | |
| 226 with open(_galaxy_outfile_name, "w") as fd: | |
| 227 json.dump(_outv, fd, cls=CustomJSONEncoder) | |
| 228 _galaxy_meta_data[_outn] = {"ext": "json"} | |
| 229 | |
| 230 with open(os.path.join(_galaxy_wd, "galaxy.json"), "w") as fd: | |
| 231 json.dump(_galaxy_meta_data, fd) | |
| 232 print("*** Job finished successfully ***") |
