Mercurial > repos > astroteam > grb_detection_astro_tool
diff detectgrb.py @ 0:c80db1ec6611 draft default tip
planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit 5b1251e9c9cdff7f825bfcfebdfdb12714c13d8f
| author | astroteam | 
|---|---|
| date | Wed, 13 Aug 2025 19:10:53 +0000 | 
| parents | |
| children | 
line wrap: on
 line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/detectgrb.py Wed Aug 13 19:10:53 2025 +0000 @@ -0,0 +1,232 @@ +#!/usr/bin/env python +# coding: utf-8 + +#!/usr/bin/env python + +# This script is generated with nb2galaxy + +# flake8: noqa + +import json +import os +import shutil + +from oda_api.json import CustomJSONEncoder + +T1 = "2023-01-16T04:53:33.9" # http://odahub.io/ontology#StartTime +T2 = "2023-01-16T04:55:33.9" # http://odahub.io/ontology#EndTime +detection_time_scales = "1,10" +lc_time_scale = 0.1 # https://odahub.io/ontology#TimeIntervalSeconds +background_age = 10 # oda:TimeIntervalSeconds +min_sn = 5 # https://odahub.io/ontology#SignalToNoiseRatio + +_galaxy_wd = os.getcwd() + +with open("inputs.json", "r") as fd: + inp_dic = json.load(fd) +if "C_data_product_" in inp_dic.keys(): + inp_pdic = inp_dic["C_data_product_"] +else: + inp_pdic = inp_dic +T1 = str(inp_pdic["T1"]) +T2 = str(inp_pdic["T2"]) +detection_time_scales = str(inp_pdic["detection_time_scales"]) +lc_time_scale = float(inp_pdic["lc_time_scale"]) +background_age = float(inp_pdic["background_age"]) +min_sn = int(inp_pdic["min_sn"]) + +import numpy as np +from astropy.time import Time +from matplotlib import pylab as plt +from oda_api.api import DispatcherAPI +from oda_api.data_products import LightCurveDataProduct, PictureProduct + +disp = DispatcherAPI( + url="https://www.astro.unige.ch/mmoda//dispatch-data", instrument="mock" +) + +par_dict = { + "T1": T1, + "T2": T2, + "T_format": "isot", + "instrument": "spi_acs", + "product": "spi_acs_lc", + "product_type": "Real", + "time_bin": lc_time_scale, + "time_bin_format": "sec", +} + +data_collection = disp.get_product(**par_dict) + +lc = data_collection.spi_acs_lc_0_query.data_unit[1].data + +lc["TIME"] = ( + lc["TIME"] / 24 / 3600 + (Time(T1).mjd + Time(T2).mjd) / 2.0 +) # TODO: more accurately + +def rebin(x, n): + N = int(len(x) / n) + return np.array(x[: N * n]).reshape((N, n)).sum(1) + +lc = LightCurveDataProduct.from_arrays( + times=Time(lc["TIME"], format="mjd"), fluxes=lc["RATE"], errors=lc["ERROR"] +) + +obj_results = [lc.encode()] + +T0_ijd = lc.data_unit[1].data["TIME"][np.argmax(lc.data_unit[1].data["FLUX"])] + +ijd2plot = lambda x: (x - T0_ijd) * 24 * 3600 + +m_bkg = ijd2plot(lc.data_unit[1].data["TIME"]) < background_age +np.sum(m_bkg) +bkg = np.mean(lc.data_unit[1].data["FLUX"][m_bkg]) +bkg + +from matplotlib import pylab as plt + +plt.figure(figsize=(10, 6)) + +plt.errorbar( + ijd2plot(lc.data_unit[1].data["TIME"]), + lc.data_unit[1].data["FLUX"], + lc.data_unit[1].data["ERROR"], + label=f"{lc_time_scale} s", + alpha=0.5, +) + +best_detection = {"sn": None, "timescale": None, "t_max_sn": None} + +for detection_time_scale in detection_time_scales.split(","): + n = int(float(detection_time_scale.strip()) / lc_time_scale) + + r_t = rebin(lc.data_unit[1].data["TIME"], n) / n + r_c = rebin(lc.data_unit[1].data["FLUX"], n) / n + r_ce = rebin(lc.data_unit[1].data["ERROR"] ** 2, n) ** 0.5 / n + + sn = (r_c - bkg) / r_ce + imax = sn.argmax() + + if best_detection["sn"] is None or best_detection["sn"] < sn[imax]: + best_detection["sn"] = sn[imax] + best_detection["timescale"] = detection_time_scale + best_detection["t_max_sn"] = Time(r_t[imax], format="mjd").isot + + plt.errorbar( + ijd2plot(r_t), + r_c, + r_ce, + label=f"{detection_time_scale} s, S/N = {sn[imax]:.1f}", + ) + +plt.axhline(bkg, lw=3, ls=":", c="k") + +plt.grid() +plt.legend() + +plt.ylabel("Flux") +plt.xlabel("Time, MJD") + +plt.title(best_detection) + +plt.savefig("annotated-lc.png") + +Time(r_t, format="mjd").isot + +bin_image = PictureProduct.from_file("annotated-lc.png") + +detection_note = str(dict(best_detection=best_detection)) + +import numpy as np +from astropy import wcs +from astropy.io import fits +from matplotlib import pylab as plt + +x, y = np.meshgrid(np.linspace(0, 10, 500), np.linspace(0, 10, 500)) + +image = np.exp(-((x - 5) ** 2 + (y - 5) ** 2)) + +pixsize = 0.1 +Npix = 500 +cdec = 1 + +RA = 0 +DEC = 0 + +# Create a new WCS object. The number of axes must be set +# from the start +w = wcs.WCS(naxis=2) + +# Set up an "Airy's zenithal" projection +# Vector properties may be set with Python lists, or Numpy arrays +w.wcs.crpix = [image.shape[0] / 2.0, image.shape[1] / 2.0] +w.wcs.cdelt = np.array([pixsize / cdec, pixsize]) +w.wcs.crval = [RA, DEC] +w.wcs.ctype = ["RA---CAR", "DEC--CAR"] +w.wcs.set_pv([(1, 1, 45.0)]) + +# Now, write out the WCS object as a FITS header +header = w.to_header() + +# header is an astropy.io.fits.Header object. We can use it to create a new +# PrimaryHDU and write it to a file. +hdu = fits.PrimaryHDU(image, header=header) +hdu.writeto("Image.fits", overwrite=True) +hdu = fits.open("Image.fits") +im = hdu[0].data +from astropy.wcs import WCS + +wcs = WCS(hdu[0].header) +plt.subplot(projection=wcs) +plt.imshow(im, origin="lower") +plt.grid(color="white", ls="solid") +plt.xlabel("RA") +plt.ylabel("Dec") + +from oda_api.data_products import ImageDataProduct + +fits_image = ImageDataProduct.from_fits_file("Image.fits") + +lc = lc # http://odahub.io/ontology#LightCurve + +detection_comment = detection_note # http://odahub.io/ontology#ODATextProduct +image_output = bin_image # http://odahub.io/ontology#ODAPictureProduct +image_fits = fits_image # http://odahub.io/ontology#Image + +# output gathering +_galaxy_meta_data = {} +_oda_outs = [] +_oda_outs.append(("out_detectgrb_lc", "lc_galaxy.output", lc)) +_oda_outs.append( + ( + "out_detectgrb_detection_comment", + "detection_comment_galaxy.output", + detection_comment, + ) +) +_oda_outs.append( + ("out_detectgrb_image_output", "image_output_galaxy.output", image_output) +) +_oda_outs.append( + ("out_detectgrb_image_fits", "image_fits_galaxy.output", image_fits) +) + +for _outn, _outfn, _outv in _oda_outs: + _galaxy_outfile_name = os.path.join(_galaxy_wd, _outfn) + if isinstance(_outv, str) and os.path.isfile(_outv): + shutil.move(_outv, _galaxy_outfile_name) + _galaxy_meta_data[_outn] = {"ext": "_sniff_"} + elif getattr(_outv, "write_fits_file", None): + _outv.write_fits_file(_galaxy_outfile_name) + _galaxy_meta_data[_outn] = {"ext": "fits"} + elif getattr(_outv, "write_file", None): + _outv.write_file(_galaxy_outfile_name) + _galaxy_meta_data[_outn] = {"ext": "_sniff_"} + else: + with open(_galaxy_outfile_name, "w") as fd: + json.dump(_outv, fd, cls=CustomJSONEncoder) + _galaxy_meta_data[_outn] = {"ext": "json"} + +with open(os.path.join(_galaxy_wd, "galaxy.json"), "w") as fd: + json.dump(_galaxy_meta_data, fd) +print("*** Job finished successfully ***")
