diff ui/egapx.py @ 0:d9c5c5b87fec draft

planemo upload for repository https://github.com/ncbi/egapx commit 8173d01b08d9a91c9ec5f6cb50af346edc8020c4
author fubar
date Sat, 03 Aug 2024 11:16:53 +0000
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/ui/egapx.py	Sat Aug 03 11:16:53 2024 +0000
@@ -0,0 +1,810 @@
+#!/usr/bin/env python
+# requires pip install pyyaml
+#
+import shlex
+import shutil
+import sys
+import os
+import argparse
+import subprocess
+import tempfile
+import re
+import time
+import datetime
+from collections import defaultdict
+from ftplib import FTP
+from ftplib import FTP_TLS
+import ftplib
+from pathlib import Path
+from typing import List
+from urllib.request import urlopen
+import json
+import sqlite3
+import stat
+
+import yaml
+
+VERBOSITY_DEFAULT=0
+VERBOSITY_QUIET=-1
+VERBOSITY_VERBOSE=1
+
+FTP_EGAP_PROTOCOL = "https"
+FTP_EGAP_SERVER = "ftp.ncbi.nlm.nih.gov"
+FTP_EGAP_ROOT_PATH = "genomes/TOOLS/EGAP/support_data"
+FTP_EGAP_ROOT = f"{FTP_EGAP_PROTOCOL}://{FTP_EGAP_SERVER}/{FTP_EGAP_ROOT_PATH}"
+DATA_VERSION = "current"
+dataset_taxonomy_url = "https://api.ncbi.nlm.nih.gov/datasets/v2alpha/taxonomy/taxon/"
+
+user_cache_dir = ''
+
+def parse_args(argv):
+    parser = argparse.ArgumentParser(description="Main script for EGAPx")
+    group = parser.add_argument_group('run')
+    group.add_argument("filename", nargs='?', help="YAML file with input: section with at least genome: and reads: parameters")
+    group.add_argument("-o", "--output", help="Output path", default="")
+    parser.add_argument("-e", "--executor", help="Nextflow executor, one of docker, singularity, aws, or local (for NCBI internal use only). Uses corresponding Nextflow config file", default="local")
+    parser.add_argument("-c", "--config-dir", help="Directory for executor config files, default is ./egapx_config. Can be also set as env EGAPX_CONFIG_DIR", default="")
+    parser.add_argument("-w", "--workdir", help="Working directory for cloud executor", default="")
+    parser.add_argument("-r", "--report", help="Report file prefix for report (.report.html) and timeline (.timeline.html) files, default is in output directory", default="")
+    parser.add_argument("-n", "--dry-run", action="store_true", default=False)
+    parser.add_argument("-st", "--stub-run", action="store_true", default=False)
+    parser.add_argument("-so", "--summary-only", help="Print result statistics only if available, do not compute result", action="store_true", default=False)
+    group = parser.add_argument_group('download')
+    group.add_argument("-dl", "--download-only", help="Download external files to local storage, so that future runs can be isolated", action="store_true", default=False)
+    parser.add_argument("-lc", "--local-cache", help="Where to store the downloaded files", default="")
+    parser.add_argument("-q", "--quiet", dest='verbosity', action='store_const', const=VERBOSITY_QUIET, default=VERBOSITY_DEFAULT)
+    parser.add_argument("-v", "--verbose", dest='verbosity', action='store_const', const=VERBOSITY_VERBOSE, default=VERBOSITY_DEFAULT)
+    parser.add_argument("-fn", "--func_name", help="func_name", default="")
+    return parser.parse_args(argv[1:])
+
+
+class FtpDownloader:
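+    """Minimal FTP client wrapper used to recursively mirror the EGAPx support data tree."""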
+    def __init__(self):
+        self.ftp = None
+
+    def connect(self, host):
+        self.host = host
+        self.reconnect() 
+
+    def reconnect(self):
+        self.ftp = FTP(self.host)
+        self.ftp.login()
+        self.ftp.set_debuglevel(0)
+       
+    ##ftp_types = set()
+    def download_ftp_file(self, ftp_name, local_path):
+        # print(f"file: {ftp_name}")
+        # print(f"f: { os.path.dirname(local_path)}")
+
+        os.makedirs(os.path.dirname(local_path), exist_ok=True)
+        try:
+            with open(local_path, 'wb') as f:
+                self.ftp.retrbinary("RETR {0}".format(ftp_name), f.write)
+            # print("downloaded: {0}".format(local_path))
+            return True
+        except FileNotFoundError:
+            print("FAILED FNF: {0}".format(local_path))
+        except EOFError:
+            print("FAILED EOF: {0}".format(local_path))
+            time.sleep(1)
+            self.reconnect()
+            print("retrying...")
+            r = self.download_ftp_file(ftp_name, local_path)
+            print("downloaded on retry: {0}".format(local_path))
+            return r
+        except BrokenPipeError:
+            print("FAILED BP: {0}".format(local_path))
+        except IsADirectoryError:
+            ## same as the 550 case below, but the local path already exists as a directory
+            ## os.remove(local_path)
+            return 550
+        except ftplib.error_perm:
+            ## ftplib.error_perm: 550 genomes/TOOLS/EGAP/ortholog_references/9606/current: Not a regular file
+            ## it's a symlink to a dir.
+            os.remove(local_path)
+            return 550
+        return False
+
+    # item: ('Eublepharis_macularius', {'modify': '20240125225739', 'perm': 'fle', 'size': '4096', 'type': 'dir', 'unique': '6CU599079F', 'unix.group': '562', 'unix.mode': '0444', 'unix.owner': '14'}
+    def should_download_file(self, ftp_item, local_name):
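+        """Return True when the local copy is missing, differs in size, or is older than the remote entry."""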
+        metadata = ftp_item[1]
+        ftp_modify = datetime.datetime.strptime(metadata['modify'], '%Y%m%d%H%M%S')
+        ftp_size = int(metadata['size'])
+        ftp_type = metadata['type']
+
+        local_stat = []
+        try:
+            local_stat = os.stat(local_name)
+        except FileNotFoundError:
+            return True
+        except NotADirectoryError:
+            return True
+        local_is_dir = stat.S_ISDIR(local_stat.st_mode)
+        #print(f"item: {ftp_item}")
+        #print(f"stat: {local_stat}") 
+ 
+        local_stat_dt = datetime.datetime.fromtimestamp(local_stat.st_mtime)
+
+        #print(f"should_dl: {ftp_size != local_stat.st_size}  {ftp_modify > local_stat_dt}  ")
+
+        if (ftp_type == 'file' and ftp_size != local_stat.st_size) or (ftp_type=='OS.unix=symlink' and ftp_size >= local_stat.st_size):
+            return True
+
+        if ftp_modify > local_stat_dt:
+            return True
+
+        return False
+
+    ## types
+    ## cdir and pdir: . and ..: do nothing
+    ## file: a file
+    ## dir: a dir
+    ## OS.unix=symlink: symlink to a file, treat like a file
+    def download_ftp_dir(self, ftp_path, local_path):
+        """ replicates a directory on an ftp server recursively """
+        for ftp_item in self.ftp.mlsd(ftp_path):
+            # print(f"ftp_item: {ftp_item}")
+            ##print(f"  name: {ftp_item[0]}")
+            ##print(f"  type: {ftp_item[1]['type']}")
+            name = ftp_item[0]
+            next_ftp_name="/".join([ftp_path,name])
+            next_local_name=os.sep.join([local_path,name])
+            ftp_type = ftp_item[1]['type']
+            ##ftp_types.add(ftp_type)
+            if ftp_type=='dir':
+                self.download_ftp_dir(next_ftp_name, next_local_name)
+            elif ftp_type=='file' or ftp_type=='OS.unix=symlink':
+                if self.should_download_file(ftp_item, next_local_name):
+                    r = self.download_ftp_file(next_ftp_name, next_local_name)
+                    if r == 550:
+                        self.download_ftp_dir(next_ftp_name, next_local_name)
+                    ##time.sleep(0.1)
+            ## else: . or .. do nothing
+
+
+def download_egapx_ftp_data(local_cache_dir):
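+    """Download every data set listed in the remote manifest into local_cache_dir and, if a user cache dir is set, save the manifest there."""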
+    global user_cache_dir
+    manifest_url = f"{FTP_EGAP_ROOT}/{DATA_VERSION}.mft"
+    manifest = urlopen(manifest_url)
+    manifest_path = f"{user_cache_dir}/{DATA_VERSION}.mft"
+    manifest_list = []
+    for line in manifest:
+        line = line.decode("utf-8").strip()
+        if not line or line[0] == '#':
+            continue
+        manifest_list.append(line)
+        print(f"Downloading {line}")
+        ftpd = FtpDownloader()
+        ftpd.connect(FTP_EGAP_SERVER)
+        ftpd.download_ftp_dir(FTP_EGAP_ROOT_PATH+f"/{line}", f"{local_cache_dir}/{line}")
+    if user_cache_dir:
+        with open(manifest_path, 'wt') as f:
+            for line in manifest_list:
+                f.write(f"{line}\n")
+    return 0
+
+
+def repackage_inputs(run_inputs):
+    "Repackage input parameters into 'input' key if not already there"
+    if 'input' in run_inputs:
+        return run_inputs
+    new_inputs = {}
+    new_inputs['input'] = {}
+    for key in run_inputs:
+        if key not in { 'tasks', 'output' }:
+            new_inputs['input'][key] = run_inputs[key]
+        else:
+            new_inputs[key] = run_inputs[key]
+    return new_inputs
+
+
+def convert_value(value):
+    "Convert paths to absolute paths when possible, look into objects and lists"
+    if isinstance(value, dict):
+        return {k: convert_value(v) for k, v in value.items()}
+    elif isinstance(value, list):
+        return [convert_value(v) for v in value]
+    else:
+        if value == '' or re.match(r'[a-z0-9]{2,5}://', value) or not os.path.exists(value):
+            # do not convert - this is a URL or empty string or non-file
+            return value
+        # convert to absolute path
+        return str(Path(value).absolute())
+
+
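+# Input keys whose values are local paths (or lists of paths) that should be converted to absolute paths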
+path_inputs = { 'genome', 'hmm', 'softmask', 'reads_metadata', 'organelles', 'proteins', 'reads', 'rnaseq_alignments', 'protein_alignments' }
+def convert_paths(run_inputs):
+    "Convert paths to absolute paths where appropriate"
+    input_root = run_inputs['input']
+    for key in input_root:
+        if key in path_inputs:
+            if isinstance(input_root[key], list):
+                input_root[key] = [convert_value(v) for v in input_root[key]]
+            else:
+                input_root[key] = convert_value(input_root[key])
+    if 'output' in run_inputs:
+        run_inputs['output'] = convert_value(run_inputs['output'])
+
+
+def prepare_reads(run_inputs):
+    """Reformat reads input to be in 'fromPairs' format expected by egapx, i.e. [sample_id, [read1, read2]]
+    Generate reads metadata file with minimal information - paired/unpaired and valid for existing libraries"""
+    if 'reads' not in run_inputs['input'] or 'output' not in run_inputs:
+        return
+    prefixes = defaultdict(list)
+    reads = run_inputs['input']['reads']
+    if type(reads) == str:
+        del run_inputs['input']['reads']
+        run_inputs['input']['reads_query'] = reads
+        return
+    # Create a fake metadata file for reads containing only one meaningful piece of information - pairedness
+    # Have entries in this file only for SRA runs - files whose names start with SRR, ERR, or DRR followed by digits
+    # Non-SRA reads are handled by rnaseq_collapse internally
+    has_files = False
+    for rf in run_inputs['input']['reads']:
+        if type(rf) == str:
+            name = Path(rf).parts[-1]
+            mo = re.match(r'([^._]+)', name)
+            if mo and mo.group(1) != name:
+                has_files = True
+                prefixes[mo.group(1)].append(rf)
+        elif type(rf) == list:
+            has_files = True
+            names = list(map(lambda x: re.match(r'([^.]+)', Path(x).parts[-1]).group(1), rf))
+            # Find common prefix
+            names.sort()
+            if len(names) == 1:
+                sample_id = names[0]
+            else:
+                for i in range(min(len(names[0]), len(names[-1]))):
+                    if names[0][i] != names[-1][i]:
+                        break
+                sample_id = names[0][0:i]
+            # Don't end the prefix with . or _
+            i = len(sample_id) - 1
+            while i > 0 and sample_id[-1] in "._":
+                sample_id = sample_id[:-1]
+                i -= 1
+            prefixes[sample_id] = rf
+    if has_files: # len(prefixes):
+        # Always create metadata file even if it's empty
+        output = run_inputs['output']
+        with tempfile.NamedTemporaryFile(mode='w', delete=False, dir=output, prefix='egapx_reads_metadata_', suffix='.tsv') as f:
+            for k, v in prefixes.items():
+                if re.fullmatch(r'([DES]RR[0-9]+)', k):
+                    paired = 'paired' if len(v) == 2 else 'unpaired'
+                    # SRR9005248	NA	paired	2	2	NA	NA	NA	NA	NA	NA	NA	0
+                    rec = "\t".join([k, 'NA', paired, '2', '2', 'NA', 'NA', 'NA', 'NA', 'NA', 'NA', 'NA', '0'])
+                    f.write(rec + '\n')
+            f.flush()
+            run_inputs['input']['reads_metadata'] = f.name
+        run_inputs['input']['reads'] = [ [k, v] for k, v in prefixes.items() ]
+    elif reads:
+        del run_inputs['input']['reads']
+        run_inputs['input']['reads_query'] = "[Accession] OR ".join(reads) + "[Accession]"
+
+
+def expand_and_validate_params(run_inputs):
+    """ Expand implicit parameters and validate inputs
+    Args:
+        run_inputs (dict): Input parameters
+    Returns:
+        bool: True if parameters are valid, False otherwise
+    """
+    inputs = run_inputs['input']
+
+    if 'taxid' not in inputs:
+        print("ERROR: Missing parameter: 'taxid'")
+        return False
+
+    taxid = inputs['taxid']
+
+    if 'genome' not in inputs:
+        print("ERROR: Missing parameter: 'genome'")
+        return False
+
+    # Check for proteins input and if empty or no input at all, add closest protein bag
+    if 'proteins' not in inputs:
+        proteins = get_closest_protein_bag(taxid)
+        if not proteins:
+            # Proteins are not specified explicitly and not found by taxid
+            print(f"ERROR: Proteins are not found for tax id {inputs['taxid']}")
+            print("  This organism is not supported by current NCBI protein database")
+            print("  You can specify proteins manually using 'proteins' parameter")
+            return False
+        inputs['proteins'] = proteins
+
+    has_rnaseq = 'reads' in inputs or 'reads_ids' in inputs or 'reads_query' in inputs
+    if not has_rnaseq:
+        if inputs['proteins']:
+            print("WARNING: It is strongly advised to specify RNA-seq reads using 'reads' parameter\n")
+        else:
+            print("ERROR: Either proteins or RNA-seq reads should be provided for annotation")
+            return False
+
+    if 'hmm' not in inputs:
+        best_taxid, best_hmm = get_closest_hmm(taxid)
+        inputs['hmm'] = best_hmm
+        inputs['hmm_taxid'] = best_taxid
+    else:
+        # We assume the user knows what they're doing and the training is not necessary
+        inputs['hmm_taxid'] = taxid
+
+    if 'max_intron' not in inputs:
+        max_intron, genome_size_threshold = get_max_intron(taxid)
+        inputs['max_intron'] = max_intron
+        inputs['genome_size_threshold'] = genome_size_threshold
+    else:
+        # Given max_intron is a hard limit, no further calculation is necessary
+        inputs['genome_size_threshold'] = 0
+
+    return True
+
+
+def manage_workdir(args):
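+    """Set NXF_WORK from --workdir and remember it in a per-executor file; otherwise restore the last saved value (required at least once for the aws executor)."""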
+    workdir_file = f"work_dir_{args.executor}.last"
+    if args.workdir:
+        os.environ['NXF_WORK'] = args.workdir
+        with open(workdir_file, 'w') as f:
+            f.write(args.workdir)
+    else:
+        if os.path.exists(workdir_file):
+            with open(workdir_file) as f:
+                os.environ['NXF_WORK'] = f.read().strip()
+        else:
+            if args.executor == 'aws':
+                print("Work directory not set, use -w at least once")
+                return False
+    return True
+
+
+def get_cache_dir():
+    global user_cache_dir
+    if user_cache_dir and os.path.exists(user_cache_dir):
+        return user_cache_dir
+    ##elif os.path.exists("/am/ftp-genomes/TOOLS/EGAP"):
+    ##    return "/am/ftp-genomes/TOOLS/EGAP"
+    return ""
+
+
+data_version_cache = {}
+def get_versioned_path(subsystem, filename):
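+    """Map subsystem/filename to subsystem/version/filename using the data manifest (read from the local cache if present, otherwise fetched from the FTP site)."""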
+    global data_version_cache
+    global user_cache_dir
+    if not data_version_cache:
+        manifest_path = f"{user_cache_dir}/{DATA_VERSION}.mft"
+        if user_cache_dir and os.path.exists(manifest_path):
+            with open(manifest_path, 'rt') as f:
+                for line in f:
+                    line = line.strip()
+                    if not line or line[0] == '#':
+                        continue
+                    parts = line.split('/')
+                    if len(parts) == 2:
+                        data_version_cache[parts[0]] = parts[1]
+        else:
+            manifest_url = f"{FTP_EGAP_ROOT}/{DATA_VERSION}.mft"
+            manifest = urlopen(manifest_url)
+            manifest_list = []
+            for line in manifest:
+                line = line.decode("utf-8").strip()
+                if not line or line[0] == '#':
+                    continue
+                parts = line.split('/')
+                if len(parts) == 2:
+                    data_version_cache[parts[0]] = parts[1]
+                    manifest_list.append(line)
+            if user_cache_dir:
+                with open(manifest_path, 'wt') as f:
+                    for line in manifest_list:
+                        f.write(f"{line}\n")
+
+    if subsystem not in data_version_cache:
+        return os.path.join(subsystem, filename)
+    version = data_version_cache[subsystem]
+    return os.path.join(subsystem, version, filename)
+
+
+# Get file path for subsystem, either from cache or from remote server
+def get_file_path(subsystem, filename):
+    cache_dir = get_cache_dir()
+    vfn = get_versioned_path(subsystem, filename)
+    file_path = os.path.join(cache_dir, vfn)
+    file_url = f"{FTP_EGAP_ROOT}/{vfn}"
+    if os.path.exists(file_path):
+        return file_path
+    return file_url
+
+
+def get_config(script_directory, args):
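+    """Resolve the Nextflow config for the chosen executor plus mandatory default configs and
+    return them as a comma-separated string; on first use the config directory is seeded with
+    templates and '' is returned so the user can edit them before rerunning."""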
+    config_file = ""
+    config_dir = args.config_dir if args.config_dir else os.environ.get("EGAPX_CONFIG_DIR")
+    if not config_dir:
+        config_dir = Path(os.getcwd()) / "egapx_config"
+    if not Path(config_dir).is_dir():
+        # Create directory and copy executor config files there
+        from_dir = Path(script_directory) / 'assets' / 'config' / 'executor'
+        if args.verbosity >= VERBOSITY_VERBOSE:
+            print(f"Copy config files from {from_dir} to {config_dir}")
+        if not args.dry_run:
+            os.mkdir(config_dir)
+            ld = os.listdir(from_dir)
+            for f in ld:
+                shutil.copy2(from_dir / f, config_dir)
+            print(f"Edit config files in {config_dir} to reflect your actual configuration, then repeat the command")
+            return ""
+    config_file = Path(config_dir) / (args.executor + '.config')
+    if not config_file.is_file():
+        print(f"Executor {args.executor} not supported")
+        return ""
+    default_configs = [ "default.config" ]
+    with open(str(config_file), 'r') as f:
+        config_txt = f.read().replace('\n', ' ')
+        # Check whether the config sets the container
+        mo = re.search(r"process.+container *=", config_txt)
+        if not mo:
+            default_configs.append("docker_image.config")
+        # Check whether the config specifies process memory or CPUs
+        mo = re.search(r"process.+(memory|cpus) *=", config_txt)
+        if not mo:
+            default_configs.append("process_resources.config")
+    
+    # Add mandatory configs
+    config_files = [str(config_file)]
+    for cf in default_configs:
+        config_files.append(os.path.join(script_directory, "assets/config", cf))
+    return ",".join(config_files)
+
+
+lineage_cache = {}
+def get_lineage(taxid):
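+    """Return the taxonomic lineage from root to taxid, using the cached SQLite taxonomy database when available, otherwise the NCBI Datasets API."""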
+    global lineage_cache
+    if not taxid:
+        return []
+    if taxid in lineage_cache:
+        return lineage_cache[taxid]
+    # Try cached taxonomy database
+    taxonomy_db_name = os.path.join(get_cache_dir(), get_versioned_path("taxonomy", "taxonomy4blast.sqlite3"))
+    if os.path.exists(taxonomy_db_name):
+        conn = sqlite3.connect(taxonomy_db_name)
+        if conn:
+            c = conn.cursor()
+            lineage = [taxid]
+            cur_taxid = taxid
+            while cur_taxid != 1:
+                c.execute("SELECT parent FROM TaxidInfo WHERE taxid = ?", (cur_taxid,))
+                cur_taxid = c.fetchone()[0]
+                lineage.append(cur_taxid)
+            lineage.reverse()
+            lineage_cache[taxid] = lineage
+            return lineage
+    
+    # Fallback to API
+    taxon_json_file = urlopen(dataset_taxonomy_url+str(taxid))
+    taxon = json.load(taxon_json_file)["taxonomy_nodes"][0]
+    lineage = taxon["taxonomy"]["lineage"]
+    lineage.append(taxon["taxonomy"]["tax_id"])
+    lineage_cache[taxid] = lineage
+    return lineage
+
+
+def get_tax_file(subsystem, tax_path):
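+    """Return the raw lines of a taxid list file for a subsystem, from the local cache if present, otherwise from the FTP site."""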
+    vfn = get_versioned_path(subsystem, tax_path)
+    taxids_path = os.path.join(get_cache_dir(), vfn)
+    taxids_url = f"{FTP_EGAP_ROOT}/{vfn}"
+    taxids_file = []
+    if os.path.exists(taxids_path):
+        with open(taxids_path, "rb") as r:
+            taxids_file = r.readlines()
+    else:
+        taxids_file = urlopen(taxids_url)
+    return taxids_file
+
+def get_closest_protein_bag(taxid):
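+    """Return the path or URL of the protein set for the deepest ancestor of taxid that has one, or '' if none is found."""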
+    if not taxid:
+        return ''
+
+    taxids_file = get_tax_file("target_proteins", "taxid.list")
+    taxids_list = []
+    for line in taxids_file:
+        line = line.decode("utf-8").strip()
+        if len(line) == 0 or line[0] == '#':
+            continue
+        parts = line.split('\t')
+        if len(parts) > 0:
+            t = parts[0]
+            taxids_list.append(int(t))
+
+    lineage = get_lineage(taxid)
+
+    best_taxid = None
+    best_score = 0
+    for t in taxids_list:
+        try:
+            pos = lineage.index(t)
+        except ValueError:
+            continue
+        if pos > best_score:
+            best_score = pos
+            best_taxid = t
+
+    if best_score == 0:
+        return ''
+    # print(best_taxid, best_score)
+    return get_file_path("target_proteins", f"{best_taxid}.faa.gz")
+
+
+def get_closest_hmm(taxid):
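+    """Return (taxid, path) of the Gnomon HMM parameter set whose lineage best matches the lineage of taxid, or (0, "") if none is found."""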
+    if not taxid:
+        return 0, ""
+
+    taxids_file = get_tax_file("gnomon", "hmm_parameters/taxid.list")
+
+    taxids_list = []
+    lineages = []
+    for line in taxids_file:
+        parts = line.decode("utf-8").strip().split('\t')
+        if len(parts) > 0:
+            t = parts[0]
+            taxids_list.append(t)
+            if len(parts) > 1:
+                l = map(lambda x: int(x) if x[-1] != ';' else int(x[:-1]), parts[1].split())
+                lineages.append((int(t), list(l)+[int(t)]))
+
+    #if len(lineages) < len(taxids_list):
+    #    taxonomy_json_file = urlopen(dataset_taxonomy_url+','.join(taxids_list))
+    #    taxonomy = json.load(taxonomy_json_file)["taxonomy_nodes"]
+    #    lineages = [ (t["taxonomy"]["tax_id"], t["taxonomy"]["lineage"] + [t["taxonomy"]["tax_id"]]) for t in taxonomy ]
+
+    lineage = get_lineage(taxid)
+
+    best_lineage = None
+    best_taxid = None
+    best_score = 0
+    for (t, l) in lineages:
+        pos1 = 0
+        last_match = 0
+        for pos in range(len(lineage)):
+            tax_id = lineage[pos]
+            while tax_id != l[pos1]:
+                if pos1 + 1 < len(l):
+                    pos1 += 1
+                else:
+                    break
+            if tax_id == l[pos1]:
+                last_match = pos1
+            else:
+                break
+        if last_match > best_score:
+            best_score = last_match
+            best_taxid = t
+            best_lineage = l
+
+    if best_score == 0:
+        return 0, ""
+    # print(best_lineage)
+    # print(best_taxid, best_score)
+    return best_taxid, get_file_path("gnomon", f"hmm_parameters/{best_taxid}.params")
+
+
+PLANTS=33090
+VERTEBRATES=7742
+def get_max_intron(taxid):
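+    """Return (max_intron, genome_size_threshold) depending on whether taxid belongs to plants, vertebrates, or other organisms."""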
+    if not taxid:
+        return 0, 0
+    lineage = get_lineage(taxid)
+    if PLANTS in lineage:
+        return 300000, 3000000000
+    elif VERTEBRATES in lineage:
+        return 1200000, 2000000000
+    else:
+        return 600000, 500000000
+
+
+def to_dict(x: List[str]):
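+    """Parse a flat list of command-line tokens into an option -> value dict,
+    e.g. to_dict(['-a', '1', '-b']) == {'-a': '1', '-b': ''}."""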
+    d = {}
+    s = len(x)
+    i = 0
+    while i < s:
+        el = x[i]
+        i += 1
+        if el and el[0] == '-':
+            if i < s:
+                v = x[i]
+                if v and (v[0] != '-'  or (v[0] == '-' and ' ' in v)):
+                    d[el] = v
+                    i += 1
+                else:
+                    d[el] = ""
+            else:
+                d[el] = ""
+        else:
+            d[el] = ""
+    return d
+
+def merge_params(task_params, run_inputs):
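+    """Recursively merge run_inputs into task_params; plain string values are treated as
+    command-line option strings and merged option by option, with run_inputs taking precedence."""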
+    # Walk over the keys in run_inputs and merge them into task_params
+    for k in run_inputs.keys():
+        if k in task_params:
+            if isinstance(task_params[k], dict) and isinstance(run_inputs[k], dict):
+                task_params[k] = merge_params(task_params[k], run_inputs[k])
+            else:
+                task_params_dict = to_dict(shlex.split(task_params[k]))
+                run_inputs_dict = to_dict(shlex.split(run_inputs[k]))
+                task_params_dict.update(run_inputs_dict)
+                task_params_list = []
+                for k1, v in task_params_dict.items():
+                    task_params_list.append(k1)
+                    if v:
+                        task_params_list.append(v)
+                task_params[k] = shlex.join(task_params_list)
+        else:
+            task_params[k] = run_inputs[k]
+    return task_params
+
+
+def print_statistics(output):
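+    """Print counts of feature types (GFF column 3) found in <output>/accept.gff."""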
+    accept_gff = Path(output) / 'accept.gff'
+    print(f"Statistics for {accept_gff}")
+    counter = defaultdict(int)
+    if accept_gff.exists():
+        with open(accept_gff, 'rt') as f:
+            for line in f:
+                line = line.strip()
+                if not line or line[0] == '#':
+                    continue
+                parts = line.split()
+                if len(parts) < 3:
+                    continue
+                counter[parts[2]] += 1
+    keys = list(counter.keys())
+    keys.sort()
+    for k in keys:
+        print(f"{k:12s} {counter[k]}")
+
+
+def main(argv):
+    "Main script for EGAPx"
+    # Warn the user that this is an alpha release
+    print("\n!!WARNING!!\nThis is an alpha release with limited features and organism scope to collect initial feedback on execution. Outputs are not yet complete and not intended for production use.\n")
+
+    # Parse command line
+    args = parse_args(argv)
+
+    global user_cache_dir
+    if args.local_cache:
+        # print(f"Local cache: {args.local_cache}")
+        user_cache_dir = args.local_cache
+    if args.download_only:
+        if args.local_cache:
+            if not args.dry_run:
+                # print(f"Download only: {args.download_only}")
+                os.makedirs(args.local_cache, exist_ok=True)
+                download_egapx_ftp_data(args.local_cache)
+            else:
+                print(f"Download only to {args.local_cache}")
+            return 0
+        else:
+            print("Local cache not set")
+            return 1
+    else:
+        # Check that input and output set
+        if not args.filename or not args.output:
+            print("Input file and output directory must be set")
+            return 1
+
+    packaged_distro = bool(getattr(sys, '_MEIPASS', ''))
+    script_directory = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
+    config_file = get_config(script_directory, args)
+    if not config_file:
+        return 1
+    
+    # Check for workdir, set if not set, and manage last used workdir
+    if not manage_workdir(args):
+        return 1
+   
+    files_to_delete = []
+    
+    # Read default task parameters into a dict
+    task_params = yaml.safe_load(open(Path(script_directory) / 'assets' / 'default_task_params.yaml', 'r'))
+    run_inputs = repackage_inputs(yaml.safe_load(open(args.filename, 'r')))
+
+    if not expand_and_validate_params(run_inputs):
+        return 1
+
+    # Command line overrides manifest input
+    if args.output:
+        run_inputs['output'] = args.output
+
+    # Convert inputs to absolute paths
+    convert_paths(run_inputs)
+
+    # Create output directory if needed
+    os.makedirs(run_inputs['output'], exist_ok=True)
+
+    if args.summary_only:
+        print_statistics(run_inputs['output'])
+        return 0
+
+    # Reformat reads into pairs in fromPairs format and add reads_metadata.tsv file
+    prepare_reads(run_inputs)
+
+
+    ##if True or args.download_only:
+    ##    with open("./dumped_input.yaml", 'w') as f:
+    ##        yaml.dump(run_inputs, f)
+    ##        f.flush()
+    ##return 0 
+
+    # Merge into default task parameters; task parameters from the input file override the defaults
+    task_params = merge_params(task_params, run_inputs)
+
+    # Move output from YAML file to arguments to have more transparent Nextflow log
+    output = task_params['output']
+    del task_params['output']
+
+    if args.func_name:
+        task_params['func_name'] = args.func_name
+
+    # Run nextflow process
+    if args.verbosity >= VERBOSITY_VERBOSE:
+        task_params['verbose'] = True
+        print("Nextflow inputs:")
+        print(yaml.dump(task_params))
+        if 'reads_metadata' in run_inputs['input']:
+            print("Reads metadata:")
+            with open(run_inputs['input']['reads_metadata'], 'r') as f:
+                print(f.read())
+    if packaged_distro:
+        main_nf = Path(script_directory) / 'nf' / 'ui.nf'
+    else:
+        main_nf = Path(script_directory) / '..' / 'nf' / 'ui.nf'
+    nf_cmd = ["nextflow", "-C", config_file, "-log", f"{output}/nextflow.log", "run", main_nf, "--output", output]
+    if args.stub_run:
+        nf_cmd += ["-stub-run", "-profile", "stubrun"]
+    if args.report:
+        nf_cmd += ["-with-report", f"{args.report}.report.html", "-with-timeline", f"{args.report}.timeline.html"]
+    else:
+        nf_cmd += ["-with-report", f"{output}/run.report.html", "-with-timeline", f"{output}/run.timeline.html"]
+    
+    nf_cmd += ["-with-trace", f"{output}/run.trace.txt"]
+    # if output directory does not exist, it will be created
+    if not os.path.exists(output):
+        os.makedirs(output)
+    params_file = Path(output) / "run_params.yaml"
+    nf_cmd += ["-params-file", str(params_file)]
+
+    if args.dry_run:
+        print(" ".join(map(str, nf_cmd)))
+    else:
+        with open(params_file, 'w') as f:
+            yaml.dump(task_params, f)
+            f.flush()
+        if args.verbosity >= VERBOSITY_VERBOSE:
+            print(" ".join(map(str, nf_cmd)))
+        resume_file = Path(output) / "resume.sh"
+        with open(resume_file, 'w') as f:
+            f.write("#!/bin/bash\n")
+            f.write(" ".join(map(str, nf_cmd)))
+            f.write(" -resume")
+            if os.environ.get('NXF_WORK'):
+                f.write(" -work-dir " + os.environ['NXF_WORK'])
+            f.write("\n")
+        try:
+            subprocess.run(nf_cmd, check=True, capture_output=(args.verbosity <= VERBOSITY_QUIET), text=True)
+        except subprocess.CalledProcessError as e:
+            print(e.stderr)
+            print(f"To resume execution, run: sh {resume_file}")
+            if files_to_delete:
+                print(f"Don't forget to delete file(s) {' '.join(files_to_delete)}")
+            return 1
+    if not args.dry_run and not args.stub_run:
+        print_statistics(output)
+    # TODO: Use try-finally to delete the metadata file
+    for f in files_to_delete:
+        os.unlink(f)
+
+    return 0
+
+if __name__ == "__main__":
+    sys.exit(main(sys.argv))