view detectgrb.py @ 0:c80db1ec6611 draft default tip

planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit 5b1251e9c9cdff7f825bfcfebdfdb12714c13d8f
author astroteam
date Wed, 13 Aug 2025 19:10:53 +0000
parents
children
line wrap: on
line source

#!/usr/bin/env python
# coding: utf-8

#!/usr/bin/env python

# This script is generated with nb2galaxy

# flake8: noqa

import json
import os
import shutil

from oda_api.json import CustomJSONEncoder

T1 = "2023-01-16T04:53:33.9"  # http://odahub.io/ontology#StartTime
T2 = "2023-01-16T04:55:33.9"  # http://odahub.io/ontology#EndTime
detection_time_scales = "1,10"
lc_time_scale = 0.1  # https://odahub.io/ontology#TimeIntervalSeconds
background_age = 10  # oda:TimeIntervalSeconds
min_sn = 5  # https://odahub.io/ontology#SignalToNoiseRatio

_galaxy_wd = os.getcwd()

with open("inputs.json", "r") as fd:
    inp_dic = json.load(fd)
if "C_data_product_" in inp_dic.keys():
    inp_pdic = inp_dic["C_data_product_"]
else:
    inp_pdic = inp_dic
T1 = str(inp_pdic["T1"])
T2 = str(inp_pdic["T2"])
detection_time_scales = str(inp_pdic["detection_time_scales"])
lc_time_scale = float(inp_pdic["lc_time_scale"])
background_age = float(inp_pdic["background_age"])
min_sn = int(inp_pdic["min_sn"])

import numpy as np
from astropy.time import Time
from matplotlib import pylab as plt
from oda_api.api import DispatcherAPI
from oda_api.data_products import LightCurveDataProduct, PictureProduct

disp = DispatcherAPI(
    url="https://www.astro.unige.ch/mmoda//dispatch-data", instrument="mock"
)

par_dict = {
    "T1": T1,
    "T2": T2,
    "T_format": "isot",
    "instrument": "spi_acs",
    "product": "spi_acs_lc",
    "product_type": "Real",
    "time_bin": lc_time_scale,
    "time_bin_format": "sec",
}

data_collection = disp.get_product(**par_dict)

lc = data_collection.spi_acs_lc_0_query.data_unit[1].data

lc["TIME"] = (
    lc["TIME"] / 24 / 3600 + (Time(T1).mjd + Time(T2).mjd) / 2.0
)  # TODO: more accurately

def rebin(x, n):
    N = int(len(x) / n)
    return np.array(x[: N * n]).reshape((N, n)).sum(1)

lc = LightCurveDataProduct.from_arrays(
    times=Time(lc["TIME"], format="mjd"), fluxes=lc["RATE"], errors=lc["ERROR"]
)

obj_results = [lc.encode()]

T0_ijd = lc.data_unit[1].data["TIME"][np.argmax(lc.data_unit[1].data["FLUX"])]

ijd2plot = lambda x: (x - T0_ijd) * 24 * 3600

m_bkg = ijd2plot(lc.data_unit[1].data["TIME"]) < background_age
np.sum(m_bkg)
bkg = np.mean(lc.data_unit[1].data["FLUX"][m_bkg])
bkg

from matplotlib import pylab as plt

plt.figure(figsize=(10, 6))

plt.errorbar(
    ijd2plot(lc.data_unit[1].data["TIME"]),
    lc.data_unit[1].data["FLUX"],
    lc.data_unit[1].data["ERROR"],
    label=f"{lc_time_scale} s",
    alpha=0.5,
)

best_detection = {"sn": None, "timescale": None, "t_max_sn": None}

for detection_time_scale in detection_time_scales.split(","):
    n = int(float(detection_time_scale.strip()) / lc_time_scale)

    r_t = rebin(lc.data_unit[1].data["TIME"], n) / n
    r_c = rebin(lc.data_unit[1].data["FLUX"], n) / n
    r_ce = rebin(lc.data_unit[1].data["ERROR"] ** 2, n) ** 0.5 / n

    sn = (r_c - bkg) / r_ce
    imax = sn.argmax()

    if best_detection["sn"] is None or best_detection["sn"] < sn[imax]:
        best_detection["sn"] = sn[imax]
        best_detection["timescale"] = detection_time_scale
        best_detection["t_max_sn"] = Time(r_t[imax], format="mjd").isot

    plt.errorbar(
        ijd2plot(r_t),
        r_c,
        r_ce,
        label=f"{detection_time_scale} s, S/N = {sn[imax]:.1f}",
    )

plt.axhline(bkg, lw=3, ls=":", c="k")

plt.grid()
plt.legend()

plt.ylabel("Flux")
plt.xlabel("Time, MJD")

plt.title(best_detection)

plt.savefig("annotated-lc.png")

Time(r_t, format="mjd").isot

bin_image = PictureProduct.from_file("annotated-lc.png")

detection_note = str(dict(best_detection=best_detection))

import numpy as np
from astropy import wcs
from astropy.io import fits
from matplotlib import pylab as plt

x, y = np.meshgrid(np.linspace(0, 10, 500), np.linspace(0, 10, 500))

image = np.exp(-((x - 5) ** 2 + (y - 5) ** 2))

pixsize = 0.1
Npix = 500
cdec = 1

RA = 0
DEC = 0

# Create a new WCS object.  The number of axes must be set
# from the start
w = wcs.WCS(naxis=2)

# Set up an "Airy's zenithal" projection
# Vector properties may be set with Python lists, or Numpy arrays
w.wcs.crpix = [image.shape[0] / 2.0, image.shape[1] / 2.0]
w.wcs.cdelt = np.array([pixsize / cdec, pixsize])
w.wcs.crval = [RA, DEC]
w.wcs.ctype = ["RA---CAR", "DEC--CAR"]
w.wcs.set_pv([(1, 1, 45.0)])

# Now, write out the WCS object as a FITS header
header = w.to_header()

# header is an astropy.io.fits.Header object.  We can use it to create a new
# PrimaryHDU and write it to a file.
hdu = fits.PrimaryHDU(image, header=header)
hdu.writeto("Image.fits", overwrite=True)
hdu = fits.open("Image.fits")
im = hdu[0].data
from astropy.wcs import WCS

wcs = WCS(hdu[0].header)
plt.subplot(projection=wcs)
plt.imshow(im, origin="lower")
plt.grid(color="white", ls="solid")
plt.xlabel("RA")
plt.ylabel("Dec")

from oda_api.data_products import ImageDataProduct

fits_image = ImageDataProduct.from_fits_file("Image.fits")

lc = lc  # http://odahub.io/ontology#LightCurve

detection_comment = detection_note  # http://odahub.io/ontology#ODATextProduct
image_output = bin_image  # http://odahub.io/ontology#ODAPictureProduct
image_fits = fits_image  # http://odahub.io/ontology#Image

# output gathering
_galaxy_meta_data = {}
_oda_outs = []
_oda_outs.append(("out_detectgrb_lc", "lc_galaxy.output", lc))
_oda_outs.append(
    (
        "out_detectgrb_detection_comment",
        "detection_comment_galaxy.output",
        detection_comment,
    )
)
_oda_outs.append(
    ("out_detectgrb_image_output", "image_output_galaxy.output", image_output)
)
_oda_outs.append(
    ("out_detectgrb_image_fits", "image_fits_galaxy.output", image_fits)
)

for _outn, _outfn, _outv in _oda_outs:
    _galaxy_outfile_name = os.path.join(_galaxy_wd, _outfn)
    if isinstance(_outv, str) and os.path.isfile(_outv):
        shutil.move(_outv, _galaxy_outfile_name)
        _galaxy_meta_data[_outn] = {"ext": "_sniff_"}
    elif getattr(_outv, "write_fits_file", None):
        _outv.write_fits_file(_galaxy_outfile_name)
        _galaxy_meta_data[_outn] = {"ext": "fits"}
    elif getattr(_outv, "write_file", None):
        _outv.write_file(_galaxy_outfile_name)
        _galaxy_meta_data[_outn] = {"ext": "_sniff_"}
    else:
        with open(_galaxy_outfile_name, "w") as fd:
            json.dump(_outv, fd, cls=CustomJSONEncoder)
        _galaxy_meta_data[_outn] = {"ext": "json"}

with open(os.path.join(_galaxy_wd, "galaxy.json"), "w") as fd:
    json.dump(_galaxy_meta_data, fd)
print("*** Job finished successfully ***")