view Lightcurve.py @ 0:02e4bb4fa10c draft

planemo upload for repository https://github.com/esg-epfl-apc/tools-astro/tree/main/tools commit 2991f65b25d4e6d1b69458603fce917adff40f94
author astroteam
date Mon, 19 Feb 2024 10:56:44 +0000
parents
children 593c4b45eda5
line wrap: on
line source

#!/usr/bin/env python
# coding: utf-8

# flake8: noqa

import json
import os
import shutil

import matplotlib.pyplot as plt
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.time import Time
from numpy import sqrt
from oda_api.data_products import ODAAstropyTable, PictureProduct
from oda_api.json import CustomJSONEncoder

if os.path.exists("hess_dl3_dr1.tar.gz") == False:
    get_ipython().system(   # noqa: F821
        "wget https://zenodo.org/record/1421099/files/hess_dl3_dr1.tar.gz"
    )
    get_ipython().system("tar -zxvf hess_dl3_dr1.tar.gz")   # noqa: F821

src_name = "Crab"  # http://odahub.io/ontology#AstrophysicalObject
RA = 83.628700  # http://odahub.io/ontology#PointOfInterestRA
DEC = 22.014700  # http://odahub.io/ontology#PointOfInterestDEC
T1 = "2003-10-09T13:16:00.0"  # http://odahub.io/ontology#StartTime
T2 = "2005-10-10T13:16:00.0"  # http://odahub.io/ontology#EndTime
Radius = 2.5  # http://odahub.io/ontology#AngleDegrees
R_s = 0.2  # http://odahub.io/ontology#AngleDegrees
Emin = 100.0  # http://odahub.io/ontology#Energy_GeV
Emax = 10000.0  # http://odahub.io/ontology#Energy_GeV
NTbins = 10  # http://odahub.io/ontology#Integer

_galaxy_wd = os.getcwd()

with open("inputs.json", "r") as fd:
    inp_dic = json.load(fd)
if "_data_product" in inp_dic.keys():
    inp_pdic = inp_dic["_data_product"]
else:
    inp_pdic = inp_dic

for vn, vv in inp_pdic.items():
    if vn != "_selector":
        globals()[vn] = type(globals()[vn])(vv)

T1 = Time(T1, format="isot", scale="utc").mjd
T2 = Time(T2, format="isot", scale="utc").mjd
message = ""
RA_pnts = []
DEC_pnts = []
DL3_files = []
OBSIDs = []
Tstart = []
Tstop = []
flist = os.listdir("data")
for f in flist:
    if f[-7:] == "fits.gz":
        DL3_files.append(f)
        OBSIDs.append(int(f[20:26]))
        hdul = fits.open("data/" + f)
        RA_pnts.append(float(hdul[1].header["RA_PNT"]))
        DEC_pnts.append(float(hdul[1].header["DEC_PNT"]))
        Tstart.append(
            Time(
                hdul[1].header["DATE-OBS"] + "T" + hdul[1].header["TIME-OBS"],
                format="isot",
                scale="utc",
            ).mjd
        )
        Tstop.append(
            Time(
                hdul[1].header["DATE-END"] + "T" + hdul[1].header["TIME-END"],
                format="isot",
                scale="utc",
            ).mjd
        )
        hdul.close()

Coords_s = SkyCoord(RA, DEC, unit="degree")
COORDS_pnts = SkyCoord(RA_pnts, DEC_pnts, unit="degree")
seps = COORDS_pnts.separation(Coords_s).deg

mask = np.where((seps < Radius) & (Tstart > T1) & (Tstop < T2))[0]
OBSlist = []
Tbegs = []
for i in mask:
    OBSlist.append(DL3_files[i])
    Tbegs.append(Tstart[i])
if len(OBSlist) == 0:
    message = "No data found"
    raise RuntimeError("No data found")
message

Tbins = np.linspace(T1, T2, NTbins + 1)
Tmin = Tbins[:-1]
Tmax = Tbins[1:]
Tmean = (Tmin + Tmax) / 2.0
Tbins

flux = np.zeros(NTbins)
flux_err = np.zeros(NTbins)
flux_b = np.zeros(NTbins)
flux_b_err = np.zeros(NTbins)
Expos = np.zeros(NTbins)
for count, f in enumerate(OBSlist):
    hdul = fits.open("data/" + f)
    RA_pnt = hdul[1].header["RA_PNT"]
    DEC_pnt = hdul[1].header["DEC_PNT"]
    Texp = hdul[1].header["LIVETIME"]
    Trun_start = hdul[1].header["TSTART"]
    dRA = RA - RA_pnt
    dDEC = DEC - DEC_pnt
    RA_b = RA_pnt - dRA
    DEC_b = DEC_pnt - dDEC
    Coords_b = SkyCoord(RA_b, DEC_b, unit="degree")
    Coords_pnt = SkyCoord(RA_pnt, DEC_pnt, unit="degree")
    dist = Coords_pnt.separation(Coords_s).deg

    ev = hdul["EVENTS"].data
    ev_ra = ev["RA"]
    ev_dec = ev["DEC"]
    ev_en = ev["ENERGY"]
    ev_time = (ev["TIME"] - Trun_start) / 86400.0 + Tbegs[count]
    print(ev_time[0])
    ev_coords = SkyCoord(ev_ra, ev_dec, unit="degree")
    sep_s = ev_coords.separation(Coords_s).deg
    sep_b = ev_coords.separation(Coords_b).deg

    hdu = hdul["AEFF"].data
    EEmin = hdu["ENERG_LO"][0]
    EEmax = hdu["ENERG_HI"][0]
    EE = sqrt(EEmin * EEmax)
    EEbins = np.concatenate((EEmin, [EEmax[-1]]))
    AA = hdu["EFFAREA"][0] + 1e-10
    Thmin = hdu["THETA_LO"][0]
    Thmax = hdu["THETA_HI"][0]
    ind = np.argmin((Thmin - dist) ** 2)
    AA = AA[ind] * Texp * 1e4
    mask = np.where((sep_s < R_s))
    cts1 = np.histogram2d(ev_time[mask], ev_en[mask], bins=[Tbins, EEbins])[0]
    mask = sep_b < R_s
    cts2 = np.histogram2d(ev_time[mask], ev_en[mask], bins=[Tbins, EEbins])[0]
    src = cts1 - cts2
    src_err = sqrt(cts1 + cts2)
    flux += np.sum(src / AA, axis=1)
    flux_err += np.sum(src_err / AA, axis=1)
    flux_b += np.sum(cts2 / AA, axis=1)
    flux_b_err += np.sum(sqrt(cts2) / AA, axis=1)
    hdul.close()

if message == "":
    plt.errorbar(
        Tmean,
        flux,
        yerr=flux_err,
        xerr=[Tmean - Tmin, Tmax - Tmean],
        linestyle="none",
        label="source",
    )
    plt.errorbar(
        Tmean,
        flux_b,
        yerr=flux_b_err,
        xerr=[Tmean - Tmin, Tmax - Tmean],
        linestyle="none",
        label="background",
    )
    plt.xlabel("Time, MJD")
    plt.ylabel("Flux, cts/cm$^2$s")
    plt.yscale("log")
    ymin = min(min(flux - flux_err), min(flux_b - flux_b_err))
    ymax = max(max(flux + flux_err), max(flux_b + flux_b_err))
    plt.ylim(ymin / 2.0, 2 * ymax)
    plt.legend(loc="lower left")
    plt.savefig("Lightcurve.png", format="png")

if message == "":
    bin_image = PictureProduct.from_file("Lightcurve.png")
    from astropy.table import Table

    data = [Tmean, Tmin, Tmax, flux, flux_err, flux_b, flux_b_err]
    names = (
        "Tmean[MJD]",
        "Tmin[MJD]",
        "Tmax[MJD]",
        "Flux[counts/cm2s]",
        "Flux_error[counts/cm2s]",
        "Background[counts/cm2s]",
        "Background_error[counts/cm2s]",
    )
    lc = ODAAstropyTable(Table(data, names=names))

picture = bin_image  # http://odahub.io/ontology#ODAPictureProduct
lightcurve_astropy_table = lc  # http://odahub.io/ontology#ODAAstropyTable

# output gathering
_galaxy_meta_data = {}
_oda_outs = []
_oda_outs.append(("out_Lightcurve_picture", "picture_galaxy.output", picture))
_oda_outs.append(
    (
        "out_Lightcurve_lightcurve_astropy_table",
        "lightcurve_astropy_table_galaxy.output",
        lightcurve_astropy_table,
    )
)

for _outn, _outfn, _outv in _oda_outs:
    _galaxy_outfile_name = os.path.join(_galaxy_wd, _outfn)
    if isinstance(_outv, str) and os.path.isfile(_outv):
        shutil.move(_outv, _galaxy_outfile_name)
        _galaxy_meta_data[_outn] = {"ext": "_sniff_"}
    elif getattr(_outv, "write_fits_file", None):
        _outv.write_fits_file(_galaxy_outfile_name)
        _galaxy_meta_data[_outn] = {"ext": "fits"}
    elif getattr(_outv, "write_file", None):
        _outv.write_file(_galaxy_outfile_name)
        _galaxy_meta_data[_outn] = {"ext": "_sniff_"}
    else:
        with open(_galaxy_outfile_name, "w") as fd:
            json.dump(_outv, fd, cls=CustomJSONEncoder)
        _galaxy_meta_data[_outn] = {"ext": "json"}

with open(os.path.join(_galaxy_wd, "galaxy.json"), "w") as fd:
    json.dump(_galaxy_meta_data, fd)
print("*** Job finished successfully ***")