view ui/egapx.py @ 8:1680e72e27be draft default tip

planemo upload for repository https://github.com/ncbi/egapx commit bdbe05027c2c40e217a2ff0c9e0556450c443e54
author fubar
date Mon, 05 Aug 2024 03:56:41 +0000
parents d9c5c5b87fec
children
line wrap: on
line source

#!/usr/bin/env python
# requires pip install pyyaml
#
import shlex
import shutil
import sys
import os
import argparse
import subprocess
import tempfile
import re
import time
import datetime
from collections import defaultdict
from ftplib import FTP
from ftplib import FTP_TLS
import ftplib
from pathlib import Path
from typing import List
from urllib.request import urlopen
import json
import sqlite3
import stat

import yaml

VERBOSITY_DEFAULT=0
VERBOSITY_QUIET=-1
VERBOSITY_VERBOSE=1

FTP_EGAP_PROTOCOL = "https"
FTP_EGAP_SERVER = "ftp.ncbi.nlm.nih.gov"
FTP_EGAP_ROOT_PATH = "genomes/TOOLS/EGAP/support_data"
FTP_EGAP_ROOT = f"{FTP_EGAP_PROTOCOL}://{FTP_EGAP_SERVER}/{FTP_EGAP_ROOT_PATH}"
DATA_VERSION = "current"
dataset_taxonomy_url = "https://api.ncbi.nlm.nih.gov/datasets/v2alpha/taxonomy/taxon/"

user_cache_dir = ''

def parse_args(argv):
    parser = argparse.ArgumentParser(description="Main script for EGAPx")
    group = parser.add_argument_group('run')
    group.add_argument("filename", nargs='?', help="YAML file with input: section with at least genome: and reads: parameters")
    group.add_argument("-o", "--output", help="Output path", default="")
    parser.add_argument("-e", "--executor", help="Nextflow executor, one of docker, singularity, aws, or local (for NCBI internal use only). Uses corresponding Nextflow config file", default="local")
    parser.add_argument("-c", "--config-dir", help="Directory for executor config files, default is ./egapx_config. Can be also set as env EGAPX_CONFIG_DIR", default="")
    parser.add_argument("-w", "--workdir", help="Working directory for cloud executor", default="")
    parser.add_argument("-r", "--report", help="Report file prefix for report (.report.html) and timeline (.timeline.html) files, default is in output directory", default="")
    parser.add_argument("-n", "--dry-run", action="store_true", default=False)
    parser.add_argument("-st", "--stub-run", action="store_true", default=False)
    parser.add_argument("-so", "--summary-only", help="Print result statistics only if available, do not compute result", action="store_true", default=False)
    group = parser.add_argument_group('download')
    group.add_argument("-dl", "--download-only", help="Download external files to local storage, so that future runs can be isolated", action="store_true", default=False)
    parser.add_argument("-lc", "--local-cache", help="Where to store the downloaded files", default="")
    parser.add_argument("-q", "--quiet", dest='verbosity', action='store_const', const=VERBOSITY_QUIET, default=VERBOSITY_DEFAULT)
    parser.add_argument("-v", "--verbose", dest='verbosity', action='store_const', const=VERBOSITY_VERBOSE, default=VERBOSITY_DEFAULT)
    parser.add_argument("-fn", "--func_name", help="func_name", default="")
    return parser.parse_args(argv[1:])


class FtpDownloader:
    def __init__(self):
        self.ftp = None

    def connect(self, host):
        self.host = host
        self.reconnect() 

    def reconnect(self):
        self.ftp = FTP(self.host)
        self.ftp.login()
        self.ftp.set_debuglevel(0)
       
    ##ftp_types = set()
    def download_ftp_file(self, ftp_name, local_path):
        # print(f"file: {ftp_name}")
        # print(f"f: { os.path.dirname(local_path)}")

        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        try:
            with open(local_path, 'wb') as f:
                self.ftp.retrbinary("RETR {0}".format(ftp_name), f.write)
            # print("downloaded: {0}".format(local_path))
            return True
        except FileNotFoundError:
            print("FAILED FNF: {0}".format(local_path))
        except EOFError:
            print("FAILED EOF: {0}".format(local_path))
            time.sleep(1)
            self.reconnect()
            print("retrying...")
            r = self.download_ftp_file(ftp_name, local_path)
            print("downloaded on retry: {0}".format(local_path))
            return r
        except BrokenPipeError:
            print("FAILED BP: {0}".format(local_path))
        except IsADirectoryError:
            ## same as 550 but pre-exists
            ## os.remove(local_path)
            return 550
        except ftplib.error_perm:
            ## ftplib.error_perm: 550 genomes/TOOLS/EGAP/ortholog_references/9606/current: Not a regular file
            ## its a symlink to a dir.
            os.remove(local_path)
            return 550
        return False

    # item: ('Eublepharis_macularius', {'modify': '20240125225739', 'perm': 'fle', 'size': '4096', 'type': 'dir', 'unique': '6CU599079F', 'unix.group': '562', 'unix.mode': '0444', 'unix.owner': '14'}
    def should_download_file(self, ftp_item, local_name):
        metadata = ftp_item[1]
        ftp_modify = datetime.datetime.strptime(metadata['modify'], '%Y%m%d%H%M%S')
        ftp_size = int(metadata['size'])
        ftp_type = metadata['type']

        local_stat = []
        try:
            local_stat = os.stat(local_name)
        except FileNotFoundError:
            return True
        except NotADirectoryError:
            return True
        local_is_dir = stat.S_ISDIR(local_stat.st_mode)
        #print(f"item: {ftp_item}")
        #print(f"stat: {local_stat}") 
 
        local_stat_dt = datetime.datetime.fromtimestamp(local_stat.st_mtime)

        #print(f"should_dl: {ftp_size != local_stat.st_size}  {ftp_modify > local_stat_dt}  ")

        if (ftp_type == 'file' and ftp_size != local_stat.st_size) or (ftp_type=='OS.unix=symlink' and ftp_size >= local_stat.st_size):
            return True

        if ftp_modify > local_stat_dt:
            return True

        return False

    ## types
    ## cdir and pdir: . and ..: do nothing
    ## file: a file
    ## dir: a dir
    ## OS.unix=symlink: symlink to a file, treat like a file
    def download_ftp_dir(self, ftp_path, local_path):
        """ replicates a directory on an ftp server recursively """
        for ftp_item in self.ftp.mlsd(ftp_path):
            # print(f"ftp_item: {ftp_item}")
            ##print(f"  name: {ftp_item[0]}")
            ##print(f"  type: {ftp_item[1]['type']}")
            name = ftp_item[0]
            next_ftp_name="/".join([ftp_path,name])
            next_local_name=os.sep.join([local_path,name])
            ftp_type = ftp_item[1]['type']
            ##ftp_types.add(ftp_type)
            if ftp_type=='dir':
                self.download_ftp_dir(next_ftp_name, next_local_name)
            elif ftp_type=='file' or ftp_type=='OS.unix=symlink':
                if self.should_download_file(ftp_item, next_local_name):
                    r = self.download_ftp_file(next_ftp_name, next_local_name)
                    if r == 550:
                        self.download_ftp_dir(next_ftp_name, next_local_name)
                    ##time.sleep(0.1)
            ## else: . or .. do nothing


def download_egapx_ftp_data(local_cache_dir):
    global user_cache_dir
    manifest_url = f"{FTP_EGAP_ROOT}/{DATA_VERSION}.mft"
    manifest = urlopen(manifest_url)
    manifest_path = f"{user_cache_dir}/{DATA_VERSION}.mft"
    manifest_list = []
    for line in manifest:
        line = line.decode("utf-8").strip()
        if not line or line[0] == '#':
            continue
        manifest_list.append(line)
        print(f"Downloading {line}")
        ftpd = FtpDownloader()
        ftpd.connect(FTP_EGAP_SERVER)
        ftpd.download_ftp_dir(FTP_EGAP_ROOT_PATH+f"/{line}", f"{local_cache_dir}/{line}")
    if user_cache_dir:
        with open(manifest_path, 'wt') as f:
            for line in manifest_list:
                f.write(f"{line}\n")
    return 0


def repackage_inputs(run_inputs):
    "Repackage input parameters into 'input' key if not already there"
    if 'input' in run_inputs:
        return run_inputs
    new_inputs = {}
    new_inputs['input'] = {}
    for key in run_inputs:
        if key not in { 'tasks', 'output' }:
            new_inputs['input'][key] = run_inputs[key]
        else:
            new_inputs[key] = run_inputs[key]
    return new_inputs


def convert_value(value):
    "Convert paths to absolute paths when possible, look into objects and lists"
    if isinstance(value, dict):
        return {k: convert_value(v) for k, v in value.items()}
    elif isinstance(value, list):
        return [convert_value(v) for v in value]
    else:
        if value == '' or re.match(r'[a-z0-9]{2,5}://', value) or not os.path.exists(value):
            # do not convert - this is a URL or empty string or non-file
            return value
        # convert to absolute path
        return str(Path(value).absolute())


path_inputs = { 'genome', 'hmm', 'softmask', 'reads_metadata', 'organelles', 'proteins', 'reads', 'rnaseq_alignments', 'protein_alignments' }
def convert_paths(run_inputs):
    "Convert paths to absolute paths where appropriate"
    input_root = run_inputs['input']
    for key in input_root:
        if key in path_inputs:
            if isinstance(input_root[key], list):
                input_root[key] = [convert_value(v) for v in input_root[key]]
            else:
                input_root[key] = convert_value(input_root[key])
    if 'output' in run_inputs:
        run_inputs['output'] = convert_value(run_inputs['output'])


def prepare_reads(run_inputs):
    """Reformat reads input to be in 'fromPairs' format expected by egapx, i.e. [sample_id, [read1, read2]]
    Generate reads metadata file with minimal information - paired/unpaired and valid for existing libraries"""
    if 'reads' not in run_inputs['input'] or 'output' not in run_inputs:
        return
    prefixes = defaultdict(list)
    reads = run_inputs['input']['reads']
    if type(reads) == str:
        del run_inputs['input']['reads']
        run_inputs['input']['reads_query'] = reads
        return
    # Create fake metadata file for reads containing only one meaningful information piece - pairedness
    # Have an entry in this file only for SRA runs - files with prefixes SRR, ERR, and DRR and digits
    # Non-SRA reads are handled by rnaseq_collapse internally
    has_files = False
    for rf in run_inputs['input']['reads']:
        if type(rf) == str:
            name = Path(rf).parts[-1]
            mo = re.match(r'([^._]+)', name)
            if mo and mo.group(1) != name:
                has_files = True
                prefixes[mo.group(1)].append(rf)
        elif type(rf) == list:
            has_files = True
            names = list(map(lambda x: re.match(r'([^.]+)', Path(x).parts[-1]).group(1), rf))
            # Find common prefix
            names.sort()
            if len(names) == 1:
                sample_id = names[0]
            else:
                for i in range(min(len(names[0]), len(names[-1]))):
                    if names[0][i] != names[-1][i]:
                        break
                sample_id = names[0][0:i]
            # Dont end prefix with . or _
            i = len(sample_id) - 1
            while i > 0 and sample_id[-1] in "._":
                sample_id = sample_id[:-1]
                i -= 1
            prefixes[sample_id] = rf
    if has_files: # len(prefixes):
        # Always create metadata file even if it's empty
        output = run_inputs['output']
        with tempfile.NamedTemporaryFile(mode='w', delete=False, dir=output, prefix='egapx_reads_metadata_', suffix='.tsv') as f:
            for k, v in prefixes.items():
                if re.fullmatch(r'([DES]RR[0-9]+)', k):
                    paired = 'paired' if len(v) == 2 else 'unpaired'
                    # SRR9005248	NA	paired	2	2	NA	NA	NA	NA	NA	NA	NA	0
                    rec = "\t".join([k, 'NA', paired, '2', '2', 'NA', 'NA', 'NA', 'NA', 'NA', 'NA', 'NA', '0'])
                    f.write(rec + '\n')
            f.flush()
            run_inputs['input']['reads_metadata'] = f.name
        run_inputs['input']['reads'] = [ [k, v] for k, v in prefixes.items() ]
    elif reads:
        del run_inputs['input']['reads']
        run_inputs['input']['reads_query'] = "[Accession] OR ".join(reads) + "[Accession]"


def expand_and_validate_params(run_inputs):
    """ Expand implicit parameters and validate inputs
    Args:
        run_inputs (dict): Input parameters
    Returns:
        bool: True if parameters are valid, False otherwise
    """
    inputs = run_inputs['input']

    if 'taxid' not in inputs:
        print("ERROR: Missing parameter: 'taxid'")
        return False

    taxid = inputs['taxid']

    if 'genome' not in inputs:
        print("ERROR: Missing parameter: 'genome'")
        return False

    # Check for proteins input and if empty or no input at all, add closest protein bag
    if 'proteins' not in inputs:
        proteins = get_closest_protein_bag(taxid)
        if not proteins:
            # Proteins are not specified explicitly and not found by taxid
            print(f"ERROR: Proteins are not found for tax id {inputs['taxid']}")
            print("  This organism is not supported by current NCBI protein database")
            print("  You can specify proteins manually using 'proteins' parameter")
            return False
        inputs['proteins'] = proteins

    has_rnaseq = 'reads' in inputs or 'reads_ids' in inputs or 'reads_query' in inputs
    if not has_rnaseq:
        if inputs['proteins']:
            print("WARNING: It is strongly advised to specify RNA-seq reads using 'reads' parameter\n")
        else:
            print("ERROR: Either proteins or RNA-seq reads should be provided for annotation")
            return False

    if 'hmm' not in inputs:
        best_taxid, best_hmm = get_closest_hmm(taxid)
        inputs['hmm'] = best_hmm
        inputs['hmm_taxid'] = best_taxid
    else:
        # We assume the user knows what they're doing and the training is not necessary
        inputs['hmm_taxid'] = taxid

    if 'max_intron' not in inputs:
        max_intron, genome_size_threshold = get_max_intron(taxid)
        inputs['max_intron'] = max_intron
        inputs['genome_size_threshold'] = genome_size_threshold
    else:
        # Given max_intron is a hard limit, no further calculation is necessary
        inputs['genome_size_threshold'] = 0

    return True


def manage_workdir(args):
    workdir_file = f"work_dir_{args.executor}.last"
    if args.workdir:
        os.environ['NXF_WORK'] = args.workdir
        with open(workdir_file, 'w') as f:
            f.write(args.workdir)
    else:
        if os.path.exists(workdir_file):
            with open(workdir_file) as f:
                os.environ['NXF_WORK'] = f.read().strip()
        else:
            if args.executor == 'aws':
                print("Work directory not set, use -w at least once")
                return False
    return True


def get_cache_dir():
    global user_cache_dir
    if user_cache_dir and os.path.exists(user_cache_dir):
        return user_cache_dir
    ##elif os.path.exists("/am/ftp-genomes/TOOLS/EGAP"):
    ##    return "/am/ftp-genomes/TOOLS/EGAP"
    return ""


data_version_cache = {}
def get_versioned_path(subsystem, filename):
    global data_version_cache
    global user_cache_dir
    if not data_version_cache:
        manifest_path = f"{user_cache_dir}/{DATA_VERSION}.mft"
        if user_cache_dir and os.path.exists(manifest_path):
            with open(manifest_path, 'rt') as f:
                for line in f:
                    line = line.strip()
                    if not line or line[0] == '#':
                        continue
                    parts = line.split('/')
                    if len(parts) == 2:
                        data_version_cache[parts[0]] = parts[1]
        else:
            manifest_url = f"{FTP_EGAP_ROOT}/{DATA_VERSION}.mft"
            manifest = urlopen(manifest_url)
            manifest_list = []
            for line in manifest:
                line = line.decode("utf-8").strip()
                if not line or line[0] == '#':
                    continue
                parts = line.split('/')
                if len(parts) == 2:
                    data_version_cache[parts[0]] = parts[1]
                    manifest_list.append(line)
            if user_cache_dir:
                with open(manifest_path, 'wt') as f:
                    for line in manifest_list:
                        f.write(f"{line}\n")

    if subsystem not in data_version_cache:
        return os.path.join(subsystem, filename)
    version = data_version_cache[subsystem]
    return os.path.join(subsystem, version, filename)


# Get file path for subsystem, either from cache or from remote server
def get_file_path(subsystem, filename):
    cache_dir = get_cache_dir()
    vfn = get_versioned_path(subsystem, filename)
    file_path = os.path.join(cache_dir, vfn)
    file_url = f"{FTP_EGAP_ROOT}/{vfn}"
    if os.path.exists(file_path):
        return file_path
    return file_url


def get_config(script_directory, args):
    config_file = ""
    config_dir = args.config_dir if args.config_dir else os.environ.get("EGAPX_CONFIG_DIR")
    if not config_dir:
        config_dir = Path(os.getcwd()) / "egapx_config"
    if not Path(config_dir).is_dir():
        # Create directory and copy executor config files there
        from_dir = Path(script_directory) / 'assets' / 'config' / 'executor'
        if args.verbosity >= VERBOSITY_VERBOSE:
            print(f"Copy config files from {from_dir} to {config_dir}")
        if not args.dry_run:
            os.mkdir(config_dir)
            ld = os.listdir(from_dir)
            for f in ld:
                shutil.copy2(from_dir / f, config_dir)
            print(f"Edit config files in {config_dir} to reflect your actual configuration, then repeat the command")
            return ""
    config_file = Path(config_dir) / (args.executor + '.config')
    if not config_file.is_file():
        print(f"Executor {args.executor} not supported")
        return ""
    default_configs = [ "default.config" ]
    with open(str(config_file), 'r') as f:
        config_txt = f.read().replace('\n', ' ')
        # Check whether the config sets the container
        mo = re.search(r"process.+container *=", config_txt)
        if not mo:
            default_configs.append("docker_image.config")
        # Check whether the config specifies proccess memory or CPUs
        mo = re.search(r"process.+(memory|cpus) *=", config_txt)
        if not mo:
            default_configs.append("process_resources.config")
    
    # Add mandatory configs
    config_files = [str(config_file)]
    for cf in default_configs:
        config_files.append(os.path.join(script_directory, "assets/config", cf))
    return ",".join(config_files)


lineage_cache = {}
def get_lineage(taxid):
    global lineage_cache
    if not taxid:
        return []
    if taxid in lineage_cache:
        return lineage_cache[taxid]
    # Try cached taxonomy database
    taxonomy_db_name = os.path.join(get_cache_dir(), get_versioned_path("taxonomy", "taxonomy4blast.sqlite3"))
    if os.path.exists(taxonomy_db_name):
        conn = sqlite3.connect(taxonomy_db_name)
        if conn:
            c = conn.cursor()
            lineage = [taxid]
            cur_taxid = taxid
            while cur_taxid != 1:
                c.execute("SELECT parent FROM TaxidInfo WHERE taxid = ?", (cur_taxid,))
                cur_taxid = c.fetchone()[0]
                lineage.append(cur_taxid)
            lineage.reverse()
            lineage_cache[taxid] = lineage
            return lineage
    
    # Fallback to API
    taxon_json_file = urlopen(dataset_taxonomy_url+str(taxid))
    taxon = json.load(taxon_json_file)["taxonomy_nodes"][0]
    lineage = taxon["taxonomy"]["lineage"]
    lineage.append(taxon["taxonomy"]["tax_id"])
    lineage_cache[taxid] = lineage
    return lineage


def get_tax_file(subsystem, tax_path):
    vfn = get_versioned_path(subsystem, tax_path)
    taxids_path = os.path.join(get_cache_dir(), vfn)
    taxids_url = f"{FTP_EGAP_ROOT}/{vfn}"
    taxids_file = []
    if os.path.exists(taxids_path):
        with open(taxids_path, "rb") as r:
            taxids_file = r.readlines()
    else:
        taxids_file = urlopen(taxids_url)
    return taxids_file

def get_closest_protein_bag(taxid):
    if not taxid:
        return ''

    taxids_file = get_tax_file("target_proteins", "taxid.list")
    taxids_list = []
    for line in taxids_file:
        line = line.decode("utf-8").strip()
        if len(line) == 0 or line[0] == '#':
            continue
        parts = line.split('\t')
        if len(parts) > 0:
            t = parts[0]
            taxids_list.append(int(t))

    lineage = get_lineage(taxid)

    best_taxid = None
    best_score = 0
    for t in taxids_list:
        try:
            pos = lineage.index(t)
        except ValueError:
            continue
        if pos > best_score:
            best_score = pos
            best_taxid = t

    if best_score == 0:
        return ''
    # print(best_taxid, best_score)
    return get_file_path("target_proteins", f"{best_taxid}.faa.gz")


def get_closest_hmm(taxid):
    if not taxid:
        return 0, ""

    taxids_file = get_tax_file("gnomon", "hmm_parameters/taxid.list")

    taxids_list = []
    lineages = []
    for line in taxids_file:
        parts = line.decode("utf-8").strip().split('\t')
        if len(parts) > 0:
            t = parts[0]
            taxids_list.append(t)
            if len(parts) > 1:
                l = map(lambda x: int(x) if x[-1] != ';' else int(x[:-1]), parts[1].split())
                lineages.append((int(t), list(l)+[int(t)]))

    #if len(lineages) < len(taxids_list):
    #    taxonomy_json_file = urlopen(dataset_taxonomy_url+','.join(taxids_list))
    #    taxonomy = json.load(taxonomy_json_file)["taxonomy_nodes"]
    #    lineages = [ (t["taxonomy"]["tax_id"], t["taxonomy"]["lineage"] + [t["taxonomy"]["tax_id"]]) for t in taxonomy ]

    lineage = get_lineage(taxid)

    best_lineage = None
    best_taxid = None
    best_score = 0
    for (t, l) in lineages:
        pos1 = 0
        last_match = 0
        for pos in range(len(lineage)):
            tax_id = lineage[pos]
            while tax_id != l[pos1]:
                if pos1 + 1 < len(l):
                    pos1 += 1
                else:
                    break
            if tax_id == l[pos1]:
                last_match = pos1
            else:
                break
        if last_match > best_score:
            best_score = last_match
            best_taxid = t
            best_lineage = l

    if best_score == 0:
        return 0, ""
    # print(best_lineage)
    # print(best_taxid, best_score)
    return best_taxid, get_file_path("gnomon", f"hmm_parameters/{best_taxid}.params")


PLANTS=33090
VERTEBRATES=7742
def get_max_intron(taxid):
    if not taxid:
        return 0, 0
    lineage = get_lineage(taxid)
    if PLANTS in lineage:
        return 300000, 3000000000
    elif VERTEBRATES in lineage:
        return 1200000, 2000000000
    else:
        return 600000, 500000000


def to_dict(x: List[str]):
    d = {}
    s = len(x)
    i = 0
    while i < s:
        el = x[i]
        i += 1
        if el and el[0] == '-':
            if i < s:
                v = x[i]
                if v and (v[0] != '-'  or (v[0] == '-' and ' ' in v)):
                    d[el] = v
                    i += 1
                else:
                    d[el] = ""
            else:
                d[el] = ""
        else:
            d[el] = ""
    return d

def merge_params(task_params, run_inputs):
    # Walk over the keys in run_inputs and merge them into task_params
    for k in run_inputs.keys():
        if k in task_params:
            if isinstance(task_params[k], dict) and isinstance(run_inputs[k], dict):
                task_params[k] = merge_params(task_params[k], run_inputs[k])
            else:
                task_params_dict = to_dict(shlex.split(task_params[k]))
                run_inputs_dict = to_dict(shlex.split(run_inputs[k]))
                task_params_dict.update(run_inputs_dict)
                task_params_list = []
                for k1, v in task_params_dict.items():
                    task_params_list.append(k1)
                    if v:
                        task_params_list.append(v)
                task_params[k] = shlex.join(task_params_list)
        else:
            task_params[k] = run_inputs[k]
    return task_params


def print_statistics(output):
    accept_gff = Path(output) / 'accept.gff'
    print(f"Statistics for {accept_gff}")
    counter = defaultdict(int)
    if accept_gff.exists():
        with open(accept_gff, 'rt') as f:
            for line in f:
                line = line.strip()
                if not line or line[0] == '#':
                    continue
                parts = line.split()
                if len(parts) < 3:
                    continue
                counter[parts[2]] += 1
    keys = list(counter.keys())
    keys.sort()
    for k in keys:
        print(f"{k:12s} {counter[k]}")


def main(argv):
    "Main script for EGAPx"
    #warn user that this is an alpha release
    print("\n!!WARNING!!\nThis is an alpha release with limited features and organism scope to collect initial feedback on execution. Outputs are not yet complete and not intended for production use.\n")

    # Parse command line
    args = parse_args(argv)

    global user_cache_dir
    if args.local_cache:
        # print(f"Local cache: {args.local_cache}")
        user_cache_dir = args.local_cache
    if args.download_only:
        if args.local_cache:
            if not args.dry_run:
                # print(f"Download only: {args.download_only}")
                os.makedirs(args.local_cache, exist_ok=True)
                download_egapx_ftp_data(args.local_cache)
            else:
                print(f"Download only to {args.local_cache}")
            return 0
        else:
            print("Local cache not set")
            return 1
    else:
        # Check that input and output set
        if not args.filename or not args.output:
            print("Input file and output directory must be set")
            return 1

    packaged_distro = bool(getattr(sys, '_MEIPASS', ''))
    script_directory = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
    config_file = get_config(script_directory, args)
    if not config_file:
        return 1
    
    # Check for workdir, set if not set, and manage last used workdir
    if not manage_workdir(args):
        return 1
   
    files_to_delete = []
    
    # Read default task parameters into a dict
    task_params = yaml.safe_load(open(Path(script_directory) / 'assets' / 'default_task_params.yaml', 'r'))
    run_inputs = repackage_inputs(yaml.safe_load(open(args.filename, 'r')))

    if not expand_and_validate_params(run_inputs):
        return 1

    # Command line overrides manifest input
    if args.output:
        run_inputs['output'] = args.output

    # Convert inputs to absolute paths
    convert_paths(run_inputs)

    # Create output directory if needed
    os.makedirs(run_inputs['output'], exist_ok=True)

    if args.summary_only:
        print_statistics(run_inputs['output'])
        return 0

    # Reformat reads into pairs in fromPairs format and add reads_metadata.tsv file
    prepare_reads(run_inputs)


    ##if True or args.download_only:
    ##    with open("./dumped_input.yaml", 'w') as f:
    ##        yaml.dump(run_inputs, f)
    ##        f.flush()
    ##return 0 

    # Add to default task parameters, if input file has some task parameters they will override the default
    task_params = merge_params(task_params, run_inputs)

    # Move output from YAML file to arguments to have more transparent Nextflow log
    output = task_params['output']
    del task_params['output']

    if args.func_name:
        task_params['func_name'] = args.func_name

    # Run nextflow process
    if args.verbosity >= VERBOSITY_VERBOSE:
        task_params['verbose'] = True
        print("Nextflow inputs:")
        print(yaml.dump(task_params))
        if 'reads_metadata' in run_inputs['input']:
            print("Reads metadata:")
            with open(run_inputs['input']['reads_metadata'], 'r') as f:
                print(f.read())
    if packaged_distro:
        main_nf = Path(script_directory) / 'nf' / 'ui.nf'
    else:
        main_nf = Path(script_directory) / '..' / 'nf' / 'ui.nf'
    nf_cmd = ["nextflow", "-C", config_file, "-log", f"{output}/nextflow.log", "run", main_nf, "--output", output]
    if args.stub_run:
        nf_cmd += ["-stub-run", "-profile", "stubrun"]
    if args.report:
        nf_cmd += ["-with-report", f"{args.report}.report.html", "-with-timeline", f"{args.report}.timeline.html"]
    else:
        nf_cmd += ["-with-report", f"{output}/run.report.html", "-with-timeline", f"{output}/run.timeline.html"]
    
    nf_cmd += ["-with-trace", f"{output}/run.trace.txt"]
    # if output directory does not exist, it will be created
    if not os.path.exists(output):
        os.makedirs(output)
    params_file = Path(output) / "run_params.yaml"
    nf_cmd += ["-params-file", str(params_file)]

    if args.dry_run:
        print(" ".join(map(str, nf_cmd)))
    else:
        with open(params_file, 'w') as f:
            yaml.dump(task_params, f)
            f.flush()
        if args.verbosity >= VERBOSITY_VERBOSE:
            print(" ".join(map(str, nf_cmd)))
        resume_file = Path(output) / "resume.sh"
        with open(resume_file, 'w') as f:
            f.write("#!/bin/bash\n")
            f.write(" ".join(map(str, nf_cmd)))
            f.write(" -resume")
            if os.environ.get('NXF_WORK'):
                f.write(" -work-dir " + os.environ['NXF_WORK'])
            f.write("\n")
        try:
            subprocess.run(nf_cmd, check=True, capture_output=(args.verbosity <= VERBOSITY_QUIET), text=True)
        except subprocess.CalledProcessError as e:
            print(e.stderr)
            print(f"To resume execution, run: sh {resume_file}")
            if files_to_delete:
                print(f"Don't forget to delete file(s) {' '.join(files_to_delete)}")
            return 1
    if not args.dry_run and not args.stub_run:
        print_statistics(output)
    # TODO: Use try-finally to delete the metadata file
    for f in files_to_delete:
        os.unlink(f)

    return 0

if __name__ == "__main__":
    sys.exit(main(sys.argv))