comparison ui/egapx.py @ 0:d9c5c5b87fec draft

planemo upload for repository https://github.com/ncbi/egapx commit 8173d01b08d9a91c9ec5f6cb50af346edc8020c4
author fubar
date Sat, 03 Aug 2024 11:16:53 +0000
1 #!/usr/bin/env python
2 # requires pip install pyyaml
3 #
4 import shlex
5 import shutil
6 import sys
7 import os
8 import argparse
9 import subprocess
10 import tempfile
11 import re
12 import time
13 import datetime
14 from collections import defaultdict
15 from ftplib import FTP
16 from ftplib import FTP_TLS
17 import ftplib
18 from pathlib import Path
19 from typing import List
20 from urllib.request import urlopen
21 import json
22 import sqlite3
23 import stat
24
25 import yaml
26
27 VERBOSITY_DEFAULT=0
28 VERBOSITY_QUIET=-1
29 VERBOSITY_VERBOSE=1
30
31 FTP_EGAP_PROTOCOL = "https"
32 FTP_EGAP_SERVER = "ftp.ncbi.nlm.nih.gov"
33 FTP_EGAP_ROOT_PATH = "genomes/TOOLS/EGAP/support_data"
34 FTP_EGAP_ROOT = f"{FTP_EGAP_PROTOCOL}://{FTP_EGAP_SERVER}/{FTP_EGAP_ROOT_PATH}"
35 DATA_VERSION = "current"
36 dataset_taxonomy_url = "https://api.ncbi.nlm.nih.gov/datasets/v2alpha/taxonomy/taxon/"
37
38 user_cache_dir = ''
39
40 def parse_args(argv):
41 parser = argparse.ArgumentParser(description="Main script for EGAPx")
42 group = parser.add_argument_group('run')
43 group.add_argument("filename", nargs='?', help="YAML file with input: section with at least genome: and reads: parameters")
44 group.add_argument("-o", "--output", help="Output path", default="")
45 parser.add_argument("-e", "--executor", help="Nextflow executor, one of docker, singularity, aws, or local (for NCBI internal use only). Uses corresponding Nextflow config file", default="local")
46 parser.add_argument("-c", "--config-dir", help="Directory for executor config files, default is ./egapx_config. Can be also set as env EGAPX_CONFIG_DIR", default="")
47 parser.add_argument("-w", "--workdir", help="Working directory for cloud executor", default="")
48 parser.add_argument("-r", "--report", help="Report file prefix for report (.report.html) and timeline (.timeline.html) files, default is in output directory", default="")
49 parser.add_argument("-n", "--dry-run", action="store_true", default=False)
50 parser.add_argument("-st", "--stub-run", action="store_true", default=False)
51 parser.add_argument("-so", "--summary-only", help="Print result statistics only if available, do not compute result", action="store_true", default=False)
52 group = parser.add_argument_group('download')
53 group.add_argument("-dl", "--download-only", help="Download external files to local storage, so that future runs can be isolated", action="store_true", default=False)
54 parser.add_argument("-lc", "--local-cache", help="Where to store the downloaded files", default="")
55 parser.add_argument("-q", "--quiet", dest='verbosity', action='store_const', const=VERBOSITY_QUIET, default=VERBOSITY_DEFAULT)
56 parser.add_argument("-v", "--verbose", dest='verbosity', action='store_const', const=VERBOSITY_VERBOSE, default=VERBOSITY_DEFAULT)
57 parser.add_argument("-fn", "--func_name", help="func_name", default="")
58 return parser.parse_args(argv[1:])
59
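# Illustrative only: a minimal input YAML of the shape described by the 'filename' help above.
# The taxid and file names are placeholders, not a tested dataset.
#
#   taxid: 6945
#   genome: ./genome.fna
#   reads:
#     - ./sample_R1.fastq.gz
#     - ./sample_R2.fastq.gz
#   output: ./egapx_out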
60
61 class FtpDownloader:
62 def __init__(self):
63 self.ftp = None
64
65 def connect(self, host):
66 self.host = host
67 self.reconnect()
68
69 def reconnect(self):
70 self.ftp = FTP(self.host)
71 self.ftp.login()
72 self.ftp.set_debuglevel(0)
73
74 ##ftp_types = set()
75 def download_ftp_file(self, ftp_name, local_path):
76 # print(f"file: {ftp_name}")
77 # print(f"f: { os.path.dirname(local_path)}")
78
79 os.makedirs(os.path.dirname(local_path), exist_ok=True)
80 try:
81 with open(local_path, 'wb') as f:
82 self.ftp.retrbinary("RETR {0}".format(ftp_name), f.write)
83 # print("downloaded: {0}".format(local_path))
84 return True
85 except FileNotFoundError:
86 print("FAILED FNF: {0}".format(local_path))
87 except EOFError:
88 print("FAILED EOF: {0}".format(local_path))
89 time.sleep(1)
90 self.reconnect()
91 print("retrying...")
92 r = self.download_ftp_file(ftp_name, local_path)
93 print("downloaded on retry: {0}".format(local_path))
94 return r
95 except BrokenPipeError:
96 print("FAILED BP: {0}".format(local_path))
97 except IsADirectoryError:
98 ## same as 550 but pre-exists
99 ## os.remove(local_path)
100 return 550
101 except ftplib.error_perm:
102 ## ftplib.error_perm: 550 genomes/TOOLS/EGAP/ortholog_references/9606/current: Not a regular file
103 ## it's a symlink to a dir.
104 os.remove(local_path)
105 return 550
106 return False
107
108 # item: ('Eublepharis_macularius', {'modify': '20240125225739', 'perm': 'fle', 'size': '4096', 'type': 'dir', 'unique': '6CU599079F', 'unix.group': '562', 'unix.mode': '0444', 'unix.owner': '14'})
109 def should_download_file(self, ftp_item, local_name):
110 metadata = ftp_item[1]
111 ftp_modify = datetime.datetime.strptime(metadata['modify'], '%Y%m%d%H%M%S')
112 ftp_size = int(metadata['size'])
113 ftp_type = metadata['type']
114
115 local_stat = []
116 try:
117 local_stat = os.stat(local_name)
118 except FileNotFoundError:
119 return True
120 except NotADirectoryError:
121 return True
122 local_is_dir = stat.S_ISDIR(local_stat.st_mode)
123 #print(f"item: {ftp_item}")
124 #print(f"stat: {local_stat}")
125
126 local_stat_dt = datetime.datetime.fromtimestamp(local_stat.st_mtime)
127
128 #print(f"should_dl: {ftp_size != local_stat.st_size} {ftp_modify > local_stat_dt} ")
129
130 if (ftp_type == 'file' and ftp_size != local_stat.st_size) or (ftp_type=='OS.unix=symlink' and ftp_size >= local_stat.st_size):
131 return True
132
133 if ftp_modify > local_stat_dt:
134 return True
135
136 return False
137
138 ## types
139 ## cdir and pdir: . and ..: do nothing
140 ## file: a file
141 ## dir: a dir
142 ## OS.unix=symlink: symlink to a file, treat like a file
143 def download_ftp_dir(self, ftp_path, local_path):
144 """ replicates a directory on an ftp server recursively """
145 for ftp_item in self.ftp.mlsd(ftp_path):
146 # print(f"ftp_item: {ftp_item}")
147 ##print(f" name: {ftp_item[0]}")
148 ##print(f" type: {ftp_item[1]['type']}")
149 name = ftp_item[0]
150 next_ftp_name="/".join([ftp_path,name])
151 next_local_name=os.sep.join([local_path,name])
152 ftp_type = ftp_item[1]['type']
153 ##ftp_types.add(ftp_type)
154 if ftp_type=='dir':
155 self.download_ftp_dir(next_ftp_name, next_local_name)
156 elif ftp_type=='file' or ftp_type=='OS.unix=symlink':
157 if self.should_download_file(ftp_item, next_local_name):
158 r = self.download_ftp_file(next_ftp_name, next_local_name)
159 if r == 550:
160 self.download_ftp_dir(next_ftp_name, next_local_name)
161 ##time.sleep(0.1)
162 ## else: . or .. do nothing
163
164
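# Usage sketch (mirrors download_egapx_ftp_data below; the subdirectory name is illustrative):
#   ftpd = FtpDownloader()
#   ftpd.connect(FTP_EGAP_SERVER)
#   ftpd.download_ftp_dir(FTP_EGAP_ROOT_PATH + "/gnomon", "./local_cache/gnomon")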
165 def download_egapx_ftp_data(local_cache_dir):
166 global user_cache_dir
167 manifest_url = f"{FTP_EGAP_ROOT}/{DATA_VERSION}.mft"
168 manifest = urlopen(manifest_url)
169 manifest_path = f"{user_cache_dir}/{DATA_VERSION}.mft"
170 manifest_list = []
171 for line in manifest:
172 line = line.decode("utf-8").strip()
173 if not line or line[0] == '#':
174 continue
175 manifest_list.append(line)
176 print(f"Downloading {line}")
177 ftpd = FtpDownloader()
178 ftpd.connect(FTP_EGAP_SERVER)
179 ftpd.download_ftp_dir(FTP_EGAP_ROOT_PATH+f"/{line}", f"{local_cache_dir}/{line}")
180 if user_cache_dir:
181 with open(manifest_path, 'wt') as f:
182 for line in manifest_list:
183 f.write(f"{line}\n")
184 return 0
185
186
187 def repackage_inputs(run_inputs):
188 "Repackage input parameters into 'input' key if not already there"
189 if 'input' in run_inputs:
190 return run_inputs
191 new_inputs = {}
192 new_inputs['input'] = {}
193 for key in run_inputs:
194 if key not in { 'tasks', 'output' }:
195 new_inputs['input'][key] = run_inputs[key]
196 else:
197 new_inputs[key] = run_inputs[key]
198 return new_inputs
199
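# For illustration (hypothetical values):
#   repackage_inputs({'genome': 'g.fa', 'taxid': 6945, 'output': 'out'})
# returns {'input': {'genome': 'g.fa', 'taxid': 6945}, 'output': 'out'};
# a dict that already contains an 'input' key is returned unchanged.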
200
201 def convert_value(value):
202 "Convert paths to absolute paths when possible, look into objects and lists"
203 if isinstance(value, dict):
204 return {k: convert_value(v) for k, v in value.items()}
205 elif isinstance(value, list):
206 return [convert_value(v) for v in value]
207 else:
208 if value == '' or re.match(r'[a-z0-9]{2,5}://', value) or not os.path.exists(value):
209 # do not convert - this is a URL or empty string or non-file
210 return value
211 # convert to absolute path
212 return str(Path(value).absolute())
213
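# For example (illustrative paths): convert_value('s3://bucket/reads.fastq') and convert_value('')
# are returned unchanged, while convert_value('./genome.fa') becomes an absolute path only if that
# file or directory exists locally.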
214
215 path_inputs = { 'genome', 'hmm', 'softmask', 'reads_metadata', 'organelles', 'proteins', 'reads', 'rnaseq_alignments', 'protein_alignments' }
216 def convert_paths(run_inputs):
217 "Convert paths to absolute paths where appropriate"
218 input_root = run_inputs['input']
219 for key in input_root:
220 if key in path_inputs:
221 if isinstance(input_root[key], list):
222 input_root[key] = [convert_value(v) for v in input_root[key]]
223 else:
224 input_root[key] = convert_value(input_root[key])
225 if 'output' in run_inputs:
226 run_inputs['output'] = convert_value(run_inputs['output'])
227
228
229 def prepare_reads(run_inputs):
230 """Reformat reads input to be in 'fromPairs' format expected by egapx, i.e. [sample_id, [read1, read2]]
231 Generate reads metadata file with minimal information - paired/unpaired and valid for existing libraries"""
232 if 'reads' not in run_inputs['input'] or 'output' not in run_inputs:
233 return
234 prefixes = defaultdict(list)
235 reads = run_inputs['input']['reads']
236 if type(reads) == str:
237 del run_inputs['input']['reads']
238 run_inputs['input']['reads_query'] = reads
239 return
240 # Create a minimal ("fake") metadata file for reads with only one meaningful piece of information - pairedness
241 # Add an entry only for SRA runs - file names with an SRR, ERR, or DRR prefix followed by digits
242 # Non-SRA reads are handled by rnaseq_collapse internally
243 has_files = False
244 for rf in run_inputs['input']['reads']:
245 if type(rf) == str:
246 name = Path(rf).parts[-1]
247 mo = re.match(r'([^._]+)', name)
248 if mo and mo.group(1) != name:
249 has_files = True
250 prefixes[mo.group(1)].append(rf)
251 elif type(rf) == list:
252 has_files = True
253 names = list(map(lambda x: re.match(r'([^.]+)', Path(x).parts[-1]).group(1), rf))
254 # Find the common prefix of the mate file names; os.path.commonprefix also covers the
255 # single-name case and avoids an off-by-one when one name is a prefix of another
256 sample_id = os.path.commonprefix(names)
263 # Don't end the prefix with '.' or '_'
264 i = len(sample_id) - 1
265 while i > 0 and sample_id[-1] in "._":
266 sample_id = sample_id[:-1]
267 i -= 1
268 prefixes[sample_id] = rf
269 if has_files: # len(prefixes):
270 # Always create metadata file even if it's empty
271 output = run_inputs['output']
272 with tempfile.NamedTemporaryFile(mode='w', delete=False, dir=output, prefix='egapx_reads_metadata_', suffix='.tsv') as f:
273 for k, v in prefixes.items():
274 if re.fullmatch(r'([DES]RR[0-9]+)', k):
275 paired = 'paired' if len(v) == 2 else 'unpaired'
276 # SRR9005248 NA paired 2 2 NA NA NA NA NA NA NA 0
277 rec = "\t".join([k, 'NA', paired, '2', '2', 'NA', 'NA', 'NA', 'NA', 'NA', 'NA', 'NA', '0'])
278 f.write(rec + '\n')
279 f.flush()
280 run_inputs['input']['reads_metadata'] = f.name
281 run_inputs['input']['reads'] = [ [k, v] for k, v in prefixes.items() ]
282 elif reads:
283 del run_inputs['input']['reads']
284 run_inputs['input']['reads_query'] = "[Accession] OR ".join(reads) + "[Accession]"
285
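# Sketch of the transformation above, with made-up file names:
#   reads: ['SRR000001_1.fastq', 'SRR000001_2.fastq']
# becomes
#   reads: [['SRR000001', ['SRR000001_1.fastq', 'SRR000001_2.fastq']]]
# plus one 'paired' row for SRR000001 in a reads_metadata .tsv file created in the output directory.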
286
287 def expand_and_validate_params(run_inputs):
288 """ Expand implicit parameters and validate inputs
289 Args:
290 run_inputs (dict): Input parameters
291 Returns:
292 bool: True if parameters are valid, False otherwise
293 """
294 inputs = run_inputs['input']
295
296 if 'taxid' not in inputs:
297 print("ERROR: Missing parameter: 'taxid'")
298 return False
299
300 taxid = inputs['taxid']
301
302 if 'genome' not in inputs:
303 print("ERROR: Missing parameter: 'genome'")
304 return False
305
306 # Check for proteins input and if empty or no input at all, add closest protein bag
307 if 'proteins' not in inputs:
308 proteins = get_closest_protein_bag(taxid)
309 if not proteins:
310 # Proteins are not specified explicitly and not found by taxid
311 print(f"ERROR: Proteins are not found for tax id {inputs['taxid']}")
312 print(" This organism is not supported by current NCBI protein database")
313 print(" You can specify proteins manually using 'proteins' parameter")
314 return False
315 inputs['proteins'] = proteins
316
317 has_rnaseq = 'reads' in inputs or 'reads_ids' in inputs or 'reads_query' in inputs
318 if not has_rnaseq:
319 if inputs['proteins']:
320 print("WARNING: It is strongly advised to specify RNA-seq reads using 'reads' parameter\n")
321 else:
322 print("ERROR: Either proteins or RNA-seq reads should be provided for annotation")
323 return False
324
325 if 'hmm' not in inputs:
326 best_taxid, best_hmm = get_closest_hmm(taxid)
327 inputs['hmm'] = best_hmm
328 inputs['hmm_taxid'] = best_taxid
329 else:
330 # We assume the user knows what they're doing and the training is not necessary
331 inputs['hmm_taxid'] = taxid
332
333 if 'max_intron' not in inputs:
334 max_intron, genome_size_threshold = get_max_intron(taxid)
335 inputs['max_intron'] = max_intron
336 inputs['genome_size_threshold'] = genome_size_threshold
337 else:
338 # Given max_intron is a hard limit, no further calculation is necessary
339 inputs['genome_size_threshold'] = 0
340
341 return True
342
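# In effect (illustrative): an input such as {'taxid': 6945, 'genome': 'g.fa', 'reads': [...]} is
# expanded with 'proteins', 'hmm'/'hmm_taxid' and 'max_intron'/'genome_size_threshold' values derived
# from the taxid, while a missing taxid or genome, or missing proteins and reads, makes it return False.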
343
344 def manage_workdir(args):
"""Set NXF_WORK from --workdir, or reuse the last work dir recorded in work_dir_<executor>.last."""
345 workdir_file = f"work_dir_{args.executor}.last"
346 if args.workdir:
347 os.environ['NXF_WORK'] = args.workdir
348 with open(workdir_file, 'w') as f:
349 f.write(args.workdir)
350 else:
351 if os.path.exists(workdir_file):
352 with open(workdir_file) as f:
353 os.environ['NXF_WORK'] = f.read().strip()
354 else:
355 if args.executor == 'aws':
356 print("Work directory not set, use -w at least once")
357 return False
358 return True
359
360
361 def get_cache_dir():
362 global user_cache_dir
363 if user_cache_dir and os.path.exists(user_cache_dir):
364 return user_cache_dir
365 ##elif os.path.exists("/am/ftp-genomes/TOOLS/EGAP"):
366 ## return "/am/ftp-genomes/TOOLS/EGAP"
367 return ""
368
369
370 data_version_cache = {}
371 def get_versioned_path(subsystem, filename):
"""Return subsystem/<version>/filename using the data version manifest, or subsystem/filename if no version is known."""
372 global data_version_cache
373 global user_cache_dir
374 if not data_version_cache:
375 manifest_path = f"{user_cache_dir}/{DATA_VERSION}.mft"
376 if user_cache_dir and os.path.exists(manifest_path):
377 with open(manifest_path, 'rt') as f:
378 for line in f:
379 line = line.strip()
380 if not line or line[0] == '#':
381 continue
382 parts = line.split('/')
383 if len(parts) == 2:
384 data_version_cache[parts[0]] = parts[1]
385 else:
386 manifest_url = f"{FTP_EGAP_ROOT}/{DATA_VERSION}.mft"
387 manifest = urlopen(manifest_url)
388 manifest_list = []
389 for line in manifest:
390 line = line.decode("utf-8").strip()
391 if not line or line[0] == '#':
392 continue
393 parts = line.split('/')
394 if len(parts) == 2:
395 data_version_cache[parts[0]] = parts[1]
396 manifest_list.append(line)
397 if user_cache_dir:
398 with open(manifest_path, 'wt') as f:
399 for line in manifest_list:
400 f.write(f"{line}\n")
401
402 if subsystem not in data_version_cache:
403 return os.path.join(subsystem, filename)
404 version = data_version_cache[subsystem]
405 return os.path.join(subsystem, version, filename)
406
407
408 # Get file path for subsystem, either from cache or from remote server
409 def get_file_path(subsystem, filename):
410 cache_dir = get_cache_dir()
411 vfn = get_versioned_path(subsystem, filename)
412 file_path = os.path.join(cache_dir, vfn)
413 file_url = f"{FTP_EGAP_ROOT}/{vfn}"
414 if os.path.exists(file_path):
415 return file_path
416 return file_url
417
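# Sketch (hypothetical version and paths): get_file_path("gnomon", "hmm_parameters/9606.params")
# returns "<cache_dir>/gnomon/<version>/hmm_parameters/9606.params" when the file is cached locally,
# otherwise the corresponding https URL under FTP_EGAP_ROOT.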
418
419 def get_config(script_directory, args):
"""Assemble the comma-separated list of Nextflow config files for the chosen executor; return "" on error."""
420 config_file = ""
421 config_dir = args.config_dir if args.config_dir else os.environ.get("EGAPX_CONFIG_DIR")
422 if not config_dir:
423 config_dir = Path(os.getcwd()) / "egapx_config"
424 if not Path(config_dir).is_dir():
425 # Create directory and copy executor config files there
426 from_dir = Path(script_directory) / 'assets' / 'config' / 'executor'
427 if args.verbosity >= VERBOSITY_VERBOSE:
428 print(f"Copy config files from {from_dir} to {config_dir}")
429 if not args.dry_run:
430 os.mkdir(config_dir)
431 ld = os.listdir(from_dir)
432 for f in ld:
433 shutil.copy2(from_dir / f, config_dir)
434 print(f"Edit config files in {config_dir} to reflect your actual configuration, then repeat the command")
435 return ""
436 config_file = Path(config_dir) / (args.executor + '.config')
437 if not config_file.is_file():
438 print(f"Executor {args.executor} not supported")
439 return ""
440 default_configs = [ "default.config" ]
441 with open(str(config_file), 'r') as f:
442 config_txt = f.read().replace('\n', ' ')
443 # Check whether the config sets the container
444 mo = re.search(r"process.+container *=", config_txt)
445 if not mo:
446 default_configs.append("docker_image.config")
447 # Check whether the config specifies process memory or CPUs
448 mo = re.search(r"process.+(memory|cpus) *=", config_txt)
449 if not mo:
450 default_configs.append("process_resources.config")
451
452 # Add mandatory configs
453 config_files = [str(config_file)]
454 for cf in default_configs:
455 config_files.append(os.path.join(script_directory, "assets/config", cf))
456 return ",".join(config_files)
457
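# The returned value is a comma-separated config list that main() hands to 'nextflow -C', e.g.
# (hypothetical paths):
#   ./egapx_config/docker.config,<script_dir>/assets/config/default.config,<script_dir>/assets/config/process_resources.config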
458
459 lineage_cache = {}
460 def get_lineage(taxid):
"""Return the taxonomy lineage for taxid (root first, taxid last), using the cached SQLite db or the NCBI Datasets API."""
461 global lineage_cache
462 if not taxid:
463 return []
464 if taxid in lineage_cache:
465 return lineage_cache[taxid]
466 # Try cached taxonomy database
467 taxonomy_db_name = os.path.join(get_cache_dir(), get_versioned_path("taxonomy", "taxonomy4blast.sqlite3"))
468 if os.path.exists(taxonomy_db_name):
469 conn = sqlite3.connect(taxonomy_db_name)
470 if conn:
471 c = conn.cursor()
472 lineage = [taxid]
473 cur_taxid = taxid
474 while cur_taxid != 1:
475 c.execute("SELECT parent FROM TaxidInfo WHERE taxid = ?", (cur_taxid,))
476 cur_taxid = c.fetchone()[0]
477 lineage.append(cur_taxid)
478 lineage.reverse()
479 lineage_cache[taxid] = lineage
480 return lineage
481
482 # Fallback to API
483 taxon_json_file = urlopen(dataset_taxonomy_url+str(taxid))
484 taxon = json.load(taxon_json_file)["taxonomy_nodes"][0]
485 lineage = taxon["taxonomy"]["lineage"]
486 lineage.append(taxon["taxonomy"]["tax_id"])
487 lineage_cache[taxid] = lineage
488 return lineage
489
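# For example, get_lineage(9606) yields the root-to-leaf chain of tax ids ending in 9606 (human),
# roughly [1, 131567, 2759, ..., 9606]; the exact content depends on the taxonomy snapshot used
# (local SQLite cache or the NCBI Datasets API).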
490
491 def get_tax_file(subsystem, tax_path):
"""Return the taxid list file as an iterable of byte lines, from the local cache if present, else from the FTP site."""
492 vfn = get_versioned_path(subsystem, tax_path)
493 taxids_path = os.path.join(get_cache_dir(), vfn)
494 taxids_url = f"{FTP_EGAP_ROOT}/{vfn}"
495 taxids_file = []
496 if os.path.exists(taxids_path):
497 with open(taxids_path, "rb") as r:
498 taxids_file = r.readlines()
499 else:
500 taxids_file = urlopen(taxids_url)
501 return taxids_file
502
503 def get_closest_protein_bag(taxid):
"""Return the path or URL of the target protein set for the deepest lineage match, or '' if none is found."""
504 if not taxid:
505 return ''
506
507 taxids_file = get_tax_file("target_proteins", "taxid.list")
508 taxids_list = []
509 for line in taxids_file:
510 line = line.decode("utf-8").strip()
511 if len(line) == 0 or line[0] == '#':
512 continue
513 parts = line.split('\t')
514 if len(parts) > 0:
515 t = parts[0]
516 taxids_list.append(int(t))
517
518 lineage = get_lineage(taxid)
519
520 best_taxid = None
521 best_score = 0
522 for t in taxids_list:
523 try:
524 pos = lineage.index(t)
525 except ValueError:
526 continue
527 if pos > best_score:
528 best_score = pos
529 best_taxid = t
530
531 if best_score == 0:
532 return ''
533 # print(best_taxid, best_score)
534 return get_file_path("target_proteins", f"{best_taxid}.faa.gz")
535
536
537 def get_closest_hmm(taxid):
"""Return (taxid, path) for the closest Gnomon HMM parameter file, or (0, "") if none is found."""
538 if not taxid:
539 return 0, ""
540
541 taxids_file = get_tax_file("gnomon", "hmm_parameters/taxid.list")
542
543 taxids_list = []
544 lineages = []
545 for line in taxids_file:
546 parts = line.decode("utf-8").strip().split('\t')
547 if len(parts) > 0:
548 t = parts[0]
549 taxids_list.append(t)
550 if len(parts) > 1:
551 l = map(lambda x: int(x) if x[-1] != ';' else int(x[:-1]), parts[1].split())
552 lineages.append((int(t), list(l)+[int(t)]))
553
554 #if len(lineages) < len(taxids_list):
555 # taxonomy_json_file = urlopen(dataset_taxonomy_url+','.join(taxids_list))
556 # taxonomy = json.load(taxonomy_json_file)["taxonomy_nodes"]
557 # lineages = [ (t["taxonomy"]["tax_id"], t["taxonomy"]["lineage"] + [t["taxonomy"]["tax_id"]]) for t in taxonomy ]
558
559 lineage = get_lineage(taxid)
560
561 best_lineage = None
562 best_taxid = None
563 best_score = 0
564 for (t, l) in lineages:
565 pos1 = 0
566 last_match = 0
567 for pos in range(len(lineage)):
568 tax_id = lineage[pos]
569 while tax_id != l[pos1]:
570 if pos1 + 1 < len(l):
571 pos1 += 1
572 else:
573 break
574 if tax_id == l[pos1]:
575 last_match = pos1
576 else:
577 break
578 if last_match > best_score:
579 best_score = last_match
580 best_taxid = t
581 best_lineage = l
582
583 if best_score == 0:
584 return 0, ""
585 # print(best_lineage)
586 # print(best_taxid, best_score)
587 return best_taxid, get_file_path("gnomon", f"hmm_parameters/{best_taxid}.params")
588
589
590 PLANTS=33090
591 VERTEBRATES=7742
592 def get_max_intron(taxid):
"""Return (max_intron, genome_size_threshold) heuristics based on the broad lineage (plants, vertebrates, other)."""
593 if not taxid:
594 return 0, 0
595 lineage = get_lineage(taxid)
596 if PLANTS in lineage:
597 return 300000, 3000000000
598 elif VERTEBRATES in lineage:
599 return 1200000, 2000000000
600 else:
601 return 600000, 500000000
602
603
604 def to_dict(x: List[str]):
"""Parse a shlex-split option list into a {flag: value} dict; flags without a value map to ''."""
605 d = {}
606 s = len(x)
607 i = 0
608 while i < s:
609 el = x[i]
610 i += 1
611 if el and el[0] == '-':
612 if i < s:
613 v = x[i]
614 if v and (v[0] != '-' or (v[0] == '-' and ' ' in v)):
615 d[el] = v
616 i += 1
617 else:
618 d[el] = ""
619 else:
620 d[el] = ""
621 else:
622 d[el] = ""
623 return d
624
625 def merge_params(task_params, run_inputs):
626 # Walk over the keys in run_inputs and merge them into task_params
627 for k in run_inputs.keys():
628 if k in task_params:
629 if isinstance(task_params[k], dict) and isinstance(run_inputs[k], dict):
630 task_params[k] = merge_params(task_params[k], run_inputs[k])
631 else:
632 task_params_dict = to_dict(shlex.split(task_params[k]))
633 run_inputs_dict = to_dict(shlex.split(run_inputs[k]))
634 task_params_dict.update(run_inputs_dict)
635 task_params_list = []
636 for k1, v in task_params_dict.items():
637 task_params_list.append(k1)
638 if v:
639 task_params_list.append(v)
640 task_params[k] = shlex.join(task_params_list)
641 else:
642 task_params[k] = run_inputs[k]
643 return task_params
644
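# Worked example with hypothetical option strings: merging a default
#   {'rnaseq_collapse': '-max-intron 600000'}
# with a user-supplied
#   {'rnaseq_collapse': '-max-intron 300000 -high-sensitivity'}
# yields '-max-intron 300000 -high-sensitivity': to_dict() turns each string into a {flag: value}
# mapping, the user mapping overrides the defaults, and shlex.join() reassembles the string.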
645
646 def print_statistics(output):
"""Print counts of feature types (GFF column 3) found in <output>/accept.gff."""
647 accept_gff = Path(output) / 'accept.gff'
648 print(f"Statistics for {accept_gff}")
649 counter = defaultdict(int)
650 if accept_gff.exists():
651 with open(accept_gff, 'rt') as f:
652 for line in f:
653 line = line.strip()
654 if not line or line[0] == '#':
655 continue
656 parts = line.split()
657 if len(parts) < 3:
658 continue
659 counter[parts[2]] += 1
660 keys = list(counter.keys())
661 keys.sort()
662 for k in keys:
663 print(f"{k:12s} {counter[k]}")
664
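# Example output (feature types from GFF column 3; counts are illustrative):
#   CDS          12034
#   exon         13211
#   gene          1542
#   mRNA          1650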
665
666 def main(argv):
667 "Main script for EGAPx"
668 # Warn the user that this is an alpha release
669 print("\n!!WARNING!!\nThis is an alpha release with limited features and organism scope to collect initial feedback on execution. Outputs are not yet complete and not intended for production use.\n")
670
671 # Parse command line
672 args = parse_args(argv)
673
674 global user_cache_dir
675 if args.local_cache:
676 # print(f"Local cache: {args.local_cache}")
677 user_cache_dir = args.local_cache
678 if args.download_only:
679 if args.local_cache:
680 if not args.dry_run:
681 # print(f"Download only: {args.download_only}")
682 os.makedirs(args.local_cache, exist_ok=True)
683 download_egapx_ftp_data(args.local_cache)
684 else:
685 print(f"Download only to {args.local_cache}")
686 return 0
687 else:
688 print("Local cache not set")
689 return 1
690 else:
691 # Check that input and output are set
692 if not args.filename or not args.output:
693 print("Input file and output directory must be set")
694 return 1
695
696 packaged_distro = bool(getattr(sys, '_MEIPASS', ''))
697 script_directory = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
698 config_file = get_config(script_directory, args)
699 if not config_file:
700 return 1
701
702 # Check for workdir, set if not set, and manage last used workdir
703 if not manage_workdir(args):
704 return 1
705
706 files_to_delete = []
707
708 # Read default task parameters into a dict
709 task_params = yaml.safe_load(open(Path(script_directory) / 'assets' / 'default_task_params.yaml', 'r'))
710 run_inputs = repackage_inputs(yaml.safe_load(open(args.filename, 'r')))
711
712 if not expand_and_validate_params(run_inputs):
713 return 1
714
715 # Command line overrides manifest input
716 if args.output:
717 run_inputs['output'] = args.output
718
719 # Convert inputs to absolute paths
720 convert_paths(run_inputs)
721
722 # Create output directory if needed
723 os.makedirs(run_inputs['output'], exist_ok=True)
724
725 if args.summary_only:
726 print_statistics(run_inputs['output'])
727 return 0
728
729 # Reformat reads into pairs in fromPairs format and add reads_metadata.tsv file
730 prepare_reads(run_inputs)
731
732
733 ##if True or args.download_only:
734 ## with open("./dumped_input.yaml", 'w') as f:
735 ## yaml.dump(run_inputs, f)
736 ## f.flush()
737 ##return 0
738
739 # Merge into the default task parameters; task parameters from the input file override the defaults
740 task_params = merge_params(task_params, run_inputs)
741
742 # Move output from YAML file to arguments to have more transparent Nextflow log
743 output = task_params['output']
744 del task_params['output']
745
746 if args.func_name:
747 task_params['func_name'] = args.func_name
748
749 # Run nextflow process
750 if args.verbosity >= VERBOSITY_VERBOSE:
751 task_params['verbose'] = True
752 print("Nextflow inputs:")
753 print(yaml.dump(task_params))
754 if 'reads_metadata' in run_inputs['input']:
755 print("Reads metadata:")
756 with open(run_inputs['input']['reads_metadata'], 'r') as f:
757 print(f.read())
758 if packaged_distro:
759 main_nf = Path(script_directory) / 'nf' / 'ui.nf'
760 else:
761 main_nf = Path(script_directory) / '..' / 'nf' / 'ui.nf'
762 nf_cmd = ["nextflow", "-C", config_file, "-log", f"{output}/nextflow.log", "run", main_nf, "--output", output]
763 if args.stub_run:
764 nf_cmd += ["-stub-run", "-profile", "stubrun"]
765 if args.report:
766 nf_cmd += ["-with-report", f"{args.report}.report.html", "-with-timeline", f"{args.report}.timeline.html"]
767 else:
768 nf_cmd += ["-with-report", f"{output}/run.report.html", "-with-timeline", f"{output}/run.timeline.html"]
769
770 nf_cmd += ["-with-trace", f"{output}/run.trace.txt"]
771 # if output directory does not exist, it will be created
772 if not os.path.exists(output):
773 os.makedirs(output)
774 params_file = Path(output) / "run_params.yaml"
775 nf_cmd += ["-params-file", str(params_file)]
776
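# For a docker run writing to 'out' (paths illustrative), the assembled command looks roughly like:
#   nextflow -C <config list> -log out/nextflow.log run <script_dir>/../nf/ui.nf --output out \
#     -with-report out/run.report.html -with-timeline out/run.timeline.html \
#     -with-trace out/run.trace.txt -params-file out/run_params.yaml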
777 if args.dry_run:
778 print(" ".join(map(str, nf_cmd)))
779 else:
780 with open(params_file, 'w') as f:
781 yaml.dump(task_params, f)
782 f.flush()
783 if args.verbosity >= VERBOSITY_VERBOSE:
784 print(" ".join(map(str, nf_cmd)))
785 resume_file = Path(output) / "resume.sh"
786 with open(resume_file, 'w') as f:
787 f.write("#!/bin/bash\n")
788 f.write(" ".join(map(str, nf_cmd)))
789 f.write(" -resume")
790 if os.environ.get('NXF_WORK'):
791 f.write(" -work-dir " + os.environ['NXF_WORK'])
792 f.write("\n")
793 try:
794 subprocess.run(nf_cmd, check=True, capture_output=(args.verbosity <= VERBOSITY_QUIET), text=True)
795 except subprocess.CalledProcessError as e:
796 print(e.stderr)
797 print(f"To resume execution, run: sh {resume_file}")
798 if files_to_delete:
799 print(f"Don't forget to delete file(s) {' '.join(files_to_delete)}")
800 return 1
801 if not args.dry_run and not args.stub_run:
802 print_statistics(output)
803 # TODO: Use try-finally to delete the metadata file
804 for f in files_to_delete:
805 os.unlink(f)
806
807 return 0
808
809 if __name__ == "__main__":
810 sys.exit(main(sys.argv))