comparison ui/egapx.py @ 0:d9c5c5b87fec draft
planemo upload for repository https://github.com/ncbi/egapx commit 8173d01b08d9a91c9ec5f6cb50af346edc8020c4
author | fubar |
---|---|
date | Sat, 03 Aug 2024 11:16:53 +0000 |
parents | |
children | |
#!/usr/bin/env python
# requires pip install pyyaml
#
import shlex
import shutil
import sys
import os
import argparse
import subprocess
import tempfile
import re
import time
import datetime
from collections import defaultdict
from ftplib import FTP
from ftplib import FTP_TLS
import ftplib
from pathlib import Path
from typing import List
from urllib.request import urlopen
import json
import sqlite3
import stat

import yaml

VERBOSITY_DEFAULT=0
VERBOSITY_QUIET=-1
VERBOSITY_VERBOSE=1

FTP_EGAP_PROTOCOL = "https"
FTP_EGAP_SERVER = "ftp.ncbi.nlm.nih.gov"
FTP_EGAP_ROOT_PATH = "genomes/TOOLS/EGAP/support_data"
FTP_EGAP_ROOT = f"{FTP_EGAP_PROTOCOL}://{FTP_EGAP_SERVER}/{FTP_EGAP_ROOT_PATH}"
DATA_VERSION = "current"
dataset_taxonomy_url = "https://api.ncbi.nlm.nih.gov/datasets/v2alpha/taxonomy/taxon/"

user_cache_dir = ''

def parse_args(argv):
    parser = argparse.ArgumentParser(description="Main script for EGAPx")
    group = parser.add_argument_group('run')
    group.add_argument("filename", nargs='?', help="YAML file with input: section with at least genome: and reads: parameters")
    group.add_argument("-o", "--output", help="Output path", default="")
    parser.add_argument("-e", "--executor", help="Nextflow executor, one of docker, singularity, aws, or local (for NCBI internal use only). Uses corresponding Nextflow config file", default="local")
    parser.add_argument("-c", "--config-dir", help="Directory for executor config files, default is ./egapx_config. Can be also set as env EGAPX_CONFIG_DIR", default="")
    parser.add_argument("-w", "--workdir", help="Working directory for cloud executor", default="")
    parser.add_argument("-r", "--report", help="Report file prefix for report (.report.html) and timeline (.timeline.html) files, default is in output directory", default="")
    parser.add_argument("-n", "--dry-run", action="store_true", default=False)
    parser.add_argument("-st", "--stub-run", action="store_true", default=False)
    parser.add_argument("-so", "--summary-only", help="Print result statistics only if available, do not compute result", action="store_true", default=False)
    group = parser.add_argument_group('download')
    group.add_argument("-dl", "--download-only", help="Download external files to local storage, so that future runs can be isolated", action="store_true", default=False)
    parser.add_argument("-lc", "--local-cache", help="Where to store the downloaded files", default="")
    parser.add_argument("-q", "--quiet", dest='verbosity', action='store_const', const=VERBOSITY_QUIET, default=VERBOSITY_DEFAULT)
    parser.add_argument("-v", "--verbose", dest='verbosity', action='store_const', const=VERBOSITY_VERBOSE, default=VERBOSITY_DEFAULT)
    parser.add_argument("-fn", "--func_name", help="func_name", default="")
    return parser.parse_args(argv[1:])

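# Illustrative shape of the input YAML passed as `filename` (values here are
# hypothetical; 'taxid' and 'genome' are required by expand_and_validate_params
# below, other keys are optional):
#
#   input:
#     taxid: 6954
#     genome: ./genome.fasta
#     reads:
#       - SRR000001
#       - SRR000002
#   output: ./egapx_out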

class FtpDownloader:
    def __init__(self):
        self.ftp = None

    def connect(self, host):
        self.host = host
        self.reconnect()

    def reconnect(self):
        self.ftp = FTP(self.host)
        self.ftp.login()
        self.ftp.set_debuglevel(0)

    ##ftp_types = set()
    def download_ftp_file(self, ftp_name, local_path):
        # print(f"file: {ftp_name}")
        # print(f"f: { os.path.dirname(local_path)}")

        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        try:
            with open(local_path, 'wb') as f:
                self.ftp.retrbinary("RETR {0}".format(ftp_name), f.write)
            # print("downloaded: {0}".format(local_path))
            return True
        except FileNotFoundError:
            print("FAILED FNF: {0}".format(local_path))
        except EOFError:
            print("FAILED EOF: {0}".format(local_path))
            time.sleep(1)
            self.reconnect()
            print("retrying...")
            r = self.download_ftp_file(ftp_name, local_path)
            print("downloaded on retry: {0}".format(local_path))
            return r
        except BrokenPipeError:
            print("FAILED BP: {0}".format(local_path))
        except IsADirectoryError:
            ## same as 550 but pre-exists
            ## os.remove(local_path)
            return 550
        except ftplib.error_perm:
            ## ftplib.error_perm: 550 genomes/TOOLS/EGAP/ortholog_references/9606/current: Not a regular file
            ## it's a symlink to a dir.
            os.remove(local_path)
            return 550
        return False

    # item: ('Eublepharis_macularius', {'modify': '20240125225739', 'perm': 'fle', 'size': '4096', 'type': 'dir', 'unique': '6CU599079F', 'unix.group': '562', 'unix.mode': '0444', 'unix.owner': '14'}
    def should_download_file(self, ftp_item, local_name):
        metadata = ftp_item[1]
        ftp_modify = datetime.datetime.strptime(metadata['modify'], '%Y%m%d%H%M%S')
        ftp_size = int(metadata['size'])
        ftp_type = metadata['type']

        local_stat = []
        try:
            local_stat = os.stat(local_name)
        except FileNotFoundError:
            return True
        except NotADirectoryError:
            return True
        local_is_dir = stat.S_ISDIR(local_stat.st_mode)
        #print(f"item: {ftp_item}")
        #print(f"stat: {local_stat}")

        local_stat_dt = datetime.datetime.fromtimestamp(local_stat.st_mtime)

        #print(f"should_dl: {ftp_size != local_stat.st_size} {ftp_modify > local_stat_dt} ")

        if (ftp_type == 'file' and ftp_size != local_stat.st_size) or (ftp_type=='OS.unix=symlink' and ftp_size >= local_stat.st_size):
            return True

        if ftp_modify > local_stat_dt:
            return True

        return False

    ## types
    ## cdir and pdir: . and ..: do nothing
    ## file: a file
    ## dir: a dir
    ## OS.unix=symlink: symlink to a file, treat like a file
    def download_ftp_dir(self, ftp_path, local_path):
        """ replicates a directory on an ftp server recursively """
        for ftp_item in self.ftp.mlsd(ftp_path):
            # print(f"ftp_item: {ftp_item}")
            ##print(f" name: {ftp_item[0]}")
            ##print(f" type: {ftp_item[1]['type']}")
            name = ftp_item[0]
            next_ftp_name="/".join([ftp_path,name])
            next_local_name=os.sep.join([local_path,name])
            ftp_type = ftp_item[1]['type']
            ##ftp_types.add(ftp_type)
            if ftp_type=='dir':
                self.download_ftp_dir(next_ftp_name, next_local_name)
            elif ftp_type=='file' or ftp_type=='OS.unix=symlink':
                if self.should_download_file(ftp_item, next_local_name):
                    r = self.download_ftp_file(next_ftp_name, next_local_name)
                    if r == 550:
                        self.download_ftp_dir(next_ftp_name, next_local_name)
            ##time.sleep(0.1)
            ## else: . or .. do nothing

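# Mirror every support-data subdirectory listed in the remote current.mft
# manifest into local_cache_dir, and keep a copy of the manifest in the user
# cache directory when one is configured.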
def download_egapx_ftp_data(local_cache_dir):
    global user_cache_dir
    manifest_url = f"{FTP_EGAP_ROOT}/{DATA_VERSION}.mft"
    manifest = urlopen(manifest_url)
    manifest_path = f"{user_cache_dir}/{DATA_VERSION}.mft"
    manifest_list = []
    for line in manifest:
        line = line.decode("utf-8").strip()
        if not line or line[0] == '#':
            continue
        manifest_list.append(line)
        print(f"Downloading {line}")
        ftpd = FtpDownloader()
        ftpd.connect(FTP_EGAP_SERVER)
        ftpd.download_ftp_dir(FTP_EGAP_ROOT_PATH+f"/{line}", f"{local_cache_dir}/{line}")
    if user_cache_dir:
        with open(manifest_path, 'wt') as f:
            for line in manifest_list:
                f.write(f"{line}\n")
    return 0


def repackage_inputs(run_inputs):
    "Repackage input parameters into 'input' key if not already there"
    if 'input' in run_inputs:
        return run_inputs
    new_inputs = {}
    new_inputs['input'] = {}
    for key in run_inputs:
        if key not in { 'tasks', 'output' }:
            new_inputs['input'][key] = run_inputs[key]
        else:
            new_inputs[key] = run_inputs[key]
    return new_inputs


def convert_value(value):
    "Convert paths to absolute paths when possible, look into objects and lists"
    if isinstance(value, dict):
        return {k: convert_value(v) for k, v in value.items()}
    elif isinstance(value, list):
        return [convert_value(v) for v in value]
    else:
        if value == '' or re.match(r'[a-z0-9]{2,5}://', value) or not os.path.exists(value):
            # do not convert - this is a URL or empty string or non-file
            return value
        # convert to absolute path
        return str(Path(value).absolute())


path_inputs = { 'genome', 'hmm', 'softmask', 'reads_metadata', 'organelles', 'proteins', 'reads', 'rnaseq_alignments', 'protein_alignments' }
def convert_paths(run_inputs):
    "Convert paths to absolute paths where appropriate"
    input_root = run_inputs['input']
    for key in input_root:
        if key in path_inputs:
            if isinstance(input_root[key], list):
                input_root[key] = [convert_value(v) for v in input_root[key]]
            else:
                input_root[key] = convert_value(input_root[key])
    if 'output' in run_inputs:
        run_inputs['output'] = convert_value(run_inputs['output'])

def prepare_reads(run_inputs):
    """Reformat reads input to be in 'fromPairs' format expected by egapx, i.e. [sample_id, [read1, read2]]
    Generate reads metadata file with minimal information - paired/unpaired and valid for existing libraries"""
    if 'reads' not in run_inputs['input'] or 'output' not in run_inputs:
        return
    prefixes = defaultdict(list)
    reads = run_inputs['input']['reads']
    if type(reads) == str:
        del run_inputs['input']['reads']
        run_inputs['input']['reads_query'] = reads
        return
    # Create fake metadata file for reads containing only one meaningful information piece - pairedness
    # Have an entry in this file only for SRA runs - files with prefixes SRR, ERR, and DRR and digits
    # Non-SRA reads are handled by rnaseq_collapse internally
    has_files = False
    for rf in run_inputs['input']['reads']:
        if type(rf) == str:
            name = Path(rf).parts[-1]
            mo = re.match(r'([^._]+)', name)
            if mo and mo.group(1) != name:
                has_files = True
                prefixes[mo.group(1)].append(rf)
        elif type(rf) == list:
            has_files = True
            names = list(map(lambda x: re.match(r'([^.]+)', Path(x).parts[-1]).group(1), rf))
            # Find common prefix
            names.sort()
            if len(names) == 1:
                sample_id = names[0]
            else:
                for i in range(min(len(names[0]), len(names[-1]))):
                    if names[0][i] != names[-1][i]:
                        break
                sample_id = names[0][0:i]
            # Don't end prefix with . or _
            i = len(sample_id) - 1
            while i > 0 and sample_id[-1] in "._":
                sample_id = sample_id[:-1]
                i -= 1
            prefixes[sample_id] = rf
    if has_files: # len(prefixes):
        # Always create metadata file even if it's empty
        output = run_inputs['output']
        with tempfile.NamedTemporaryFile(mode='w', delete=False, dir=output, prefix='egapx_reads_metadata_', suffix='.tsv') as f:
            for k, v in prefixes.items():
                if re.fullmatch(r'([DES]RR[0-9]+)', k):
                    paired = 'paired' if len(v) == 2 else 'unpaired'
                    # SRR9005248 NA paired 2 2 NA NA NA NA NA NA NA 0
                    rec = "\t".join([k, 'NA', paired, '2', '2', 'NA', 'NA', 'NA', 'NA', 'NA', 'NA', 'NA', '0'])
                    f.write(rec + '\n')
            f.flush()
            run_inputs['input']['reads_metadata'] = f.name
        run_inputs['input']['reads'] = [ [k, v] for k, v in prefixes.items() ]
    elif reads:
        del run_inputs['input']['reads']
        run_inputs['input']['reads_query'] = "[Accession] OR ".join(reads) + "[Accession]"


def expand_and_validate_params(run_inputs):
    """ Expand implicit parameters and validate inputs
    Args:
        run_inputs (dict): Input parameters
    Returns:
        bool: True if parameters are valid, False otherwise
    """
    inputs = run_inputs['input']

    if 'taxid' not in inputs:
        print("ERROR: Missing parameter: 'taxid'")
        return False

    taxid = inputs['taxid']

    if 'genome' not in inputs:
        print("ERROR: Missing parameter: 'genome'")
        return False

    # Check for proteins input and if empty or no input at all, add closest protein bag
    if 'proteins' not in inputs:
        proteins = get_closest_protein_bag(taxid)
        if not proteins:
            # Proteins are not specified explicitly and not found by taxid
            print(f"ERROR: Proteins are not found for tax id {inputs['taxid']}")
            print("  This organism is not supported by current NCBI protein database")
            print("  You can specify proteins manually using 'proteins' parameter")
            return False
        inputs['proteins'] = proteins

    has_rnaseq = 'reads' in inputs or 'reads_ids' in inputs or 'reads_query' in inputs
    if not has_rnaseq:
        if inputs['proteins']:
            print("WARNING: It is strongly advised to specify RNA-seq reads using 'reads' parameter\n")
        else:
            print("ERROR: Either proteins or RNA-seq reads should be provided for annotation")
            return False

    if 'hmm' not in inputs:
        best_taxid, best_hmm = get_closest_hmm(taxid)
        inputs['hmm'] = best_hmm
        inputs['hmm_taxid'] = best_taxid
    else:
        # We assume the user knows what they're doing and the training is not necessary
        inputs['hmm_taxid'] = taxid

    if 'max_intron' not in inputs:
        max_intron, genome_size_threshold = get_max_intron(taxid)
        inputs['max_intron'] = max_intron
        inputs['genome_size_threshold'] = genome_size_threshold
    else:
        # Given max_intron is a hard limit, no further calculation is necessary
        inputs['genome_size_threshold'] = 0

    return True

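# Remember the last used Nextflow work directory per executor in
# work_dir_<executor>.last: -w stores the value and sets NXF_WORK, later runs
# reuse it; the aws executor refuses to run until -w has been given at least once.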
def manage_workdir(args):
    workdir_file = f"work_dir_{args.executor}.last"
    if args.workdir:
        os.environ['NXF_WORK'] = args.workdir
        with open(workdir_file, 'w') as f:
            f.write(args.workdir)
    else:
        if os.path.exists(workdir_file):
            with open(workdir_file) as f:
                os.environ['NXF_WORK'] = f.read().strip()
        else:
            if args.executor == 'aws':
                print("Work directory not set, use -w at least once")
                return False
    return True


def get_cache_dir():
    global user_cache_dir
    if user_cache_dir and os.path.exists(user_cache_dir):
        return user_cache_dir
    ##elif os.path.exists("/am/ftp-genomes/TOOLS/EGAP"):
    ##    return "/am/ftp-genomes/TOOLS/EGAP"
    return ""

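# Map 'subsystem/filename' to 'subsystem/<version>/filename' using the
# subsystem->version pairs from the current.mft manifest (read from the local
# cache when available, otherwise fetched from the FTP root and cached);
# unknown subsystems fall back to the unversioned path.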
data_version_cache = {}
def get_versioned_path(subsystem, filename):
    global data_version_cache
    global user_cache_dir
    if not data_version_cache:
        manifest_path = f"{user_cache_dir}/{DATA_VERSION}.mft"
        if user_cache_dir and os.path.exists(manifest_path):
            with open(manifest_path, 'rt') as f:
                for line in f:
                    line = line.strip()
                    if not line or line[0] == '#':
                        continue
                    parts = line.split('/')
                    if len(parts) == 2:
                        data_version_cache[parts[0]] = parts[1]
        else:
            manifest_url = f"{FTP_EGAP_ROOT}/{DATA_VERSION}.mft"
            manifest = urlopen(manifest_url)
            manifest_list = []
            for line in manifest:
                line = line.decode("utf-8").strip()
                if not line or line[0] == '#':
                    continue
                parts = line.split('/')
                if len(parts) == 2:
                    data_version_cache[parts[0]] = parts[1]
                    manifest_list.append(line)
            if user_cache_dir:
                with open(manifest_path, 'wt') as f:
                    for line in manifest_list:
                        f.write(f"{line}\n")

    if subsystem not in data_version_cache:
        return os.path.join(subsystem, filename)
    version = data_version_cache[subsystem]
    return os.path.join(subsystem, version, filename)


# Get file path for subsystem, either from cache or from remote server
def get_file_path(subsystem, filename):
    cache_dir = get_cache_dir()
    vfn = get_versioned_path(subsystem, filename)
    file_path = os.path.join(cache_dir, vfn)
    file_url = f"{FTP_EGAP_ROOT}/{vfn}"
    if os.path.exists(file_path):
        return file_path
    return file_url


def get_config(script_directory, args):
    config_file = ""
    config_dir = args.config_dir if args.config_dir else os.environ.get("EGAPX_CONFIG_DIR")
    if not config_dir:
        config_dir = Path(os.getcwd()) / "egapx_config"
    if not Path(config_dir).is_dir():
        # Create directory and copy executor config files there
        from_dir = Path(script_directory) / 'assets' / 'config' / 'executor'
        if args.verbosity >= VERBOSITY_VERBOSE:
            print(f"Copy config files from {from_dir} to {config_dir}")
        if not args.dry_run:
            os.mkdir(config_dir)
            ld = os.listdir(from_dir)
            for f in ld:
                shutil.copy2(from_dir / f, config_dir)
        print(f"Edit config files in {config_dir} to reflect your actual configuration, then repeat the command")
        return ""
    config_file = Path(config_dir) / (args.executor + '.config')
    if not config_file.is_file():
        print(f"Executor {args.executor} not supported")
        return ""
    default_configs = [ "default.config" ]
    with open(str(config_file), 'r') as f:
        config_txt = f.read().replace('\n', ' ')
        # Check whether the config sets the container
        mo = re.search(r"process.+container *=", config_txt)
        if not mo:
            default_configs.append("docker_image.config")
        # Check whether the config specifies process memory or CPUs
        mo = re.search(r"process.+(memory|cpus) *=", config_txt)
        if not mo:
            default_configs.append("process_resources.config")

    # Add mandatory configs
    config_files = [str(config_file)]
    for cf in default_configs:
        config_files.append(os.path.join(script_directory, "assets/config", cf))
    return ",".join(config_files)

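# Return the taxonomic lineage (root ... taxid) for a tax id, walking parent
# links in the cached taxonomy4blast.sqlite3 database when present and falling
# back to the NCBI Datasets taxonomy API; results are memoized in lineage_cache.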
lineage_cache = {}
def get_lineage(taxid):
    global lineage_cache
    if not taxid:
        return []
    if taxid in lineage_cache:
        return lineage_cache[taxid]
    # Try cached taxonomy database
    taxonomy_db_name = os.path.join(get_cache_dir(), get_versioned_path("taxonomy", "taxonomy4blast.sqlite3"))
    if os.path.exists(taxonomy_db_name):
        conn = sqlite3.connect(taxonomy_db_name)
        if conn:
            c = conn.cursor()
            lineage = [taxid]
            cur_taxid = taxid
            while cur_taxid != 1:
                c.execute("SELECT parent FROM TaxidInfo WHERE taxid = ?", (cur_taxid,))
                cur_taxid = c.fetchone()[0]
                lineage.append(cur_taxid)
            lineage.reverse()
            lineage_cache[taxid] = lineage
            return lineage

    # Fallback to API
    taxon_json_file = urlopen(dataset_taxonomy_url+str(taxid))
    taxon = json.load(taxon_json_file)["taxonomy_nodes"][0]
    lineage = taxon["taxonomy"]["lineage"]
    lineage.append(taxon["taxonomy"]["tax_id"])
    lineage_cache[taxid] = lineage
    return lineage


def get_tax_file(subsystem, tax_path):
    vfn = get_versioned_path(subsystem, tax_path)
    taxids_path = os.path.join(get_cache_dir(), vfn)
    taxids_url = f"{FTP_EGAP_ROOT}/{vfn}"
    taxids_file = []
    if os.path.exists(taxids_path):
        with open(taxids_path, "rb") as r:
            taxids_file = r.readlines()
    else:
        taxids_file = urlopen(taxids_url)
    return taxids_file

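# Choose the protein set whose tax id sits deepest in this organism's lineage
# and return its target_proteins/<taxid>.faa.gz path or URL; returns '' when no
# ancestor has a protein bag.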
def get_closest_protein_bag(taxid):
    if not taxid:
        return ''

    taxids_file = get_tax_file("target_proteins", "taxid.list")
    taxids_list = []
    for line in taxids_file:
        line = line.decode("utf-8").strip()
        if len(line) == 0 or line[0] == '#':
            continue
        parts = line.split('\t')
        if len(parts) > 0:
            t = parts[0]
            taxids_list.append(int(t))

    lineage = get_lineage(taxid)

    best_taxid = None
    best_score = 0
    for t in taxids_list:
        try:
            pos = lineage.index(t)
        except ValueError:
            continue
        if pos > best_score:
            best_score = pos
            best_taxid = t

    if best_score == 0:
        return ''
    # print(best_taxid, best_score)
    return get_file_path("target_proteins", f"{best_taxid}.faa.gz")

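# Choose the Gnomon HMM parameter file whose recorded lineage matches this
# organism's lineage most deeply; returns (taxid, params path) or (0, "") when
# nothing matches.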
def get_closest_hmm(taxid):
    if not taxid:
        return 0, ""

    taxids_file = get_tax_file("gnomon", "hmm_parameters/taxid.list")

    taxids_list = []
    lineages = []
    for line in taxids_file:
        parts = line.decode("utf-8").strip().split('\t')
        if len(parts) > 0:
            t = parts[0]
            taxids_list.append(t)
            if len(parts) > 1:
                l = map(lambda x: int(x) if x[-1] != ';' else int(x[:-1]), parts[1].split())
                lineages.append((int(t), list(l)+[int(t)]))

    #if len(lineages) < len(taxids_list):
    #    taxonomy_json_file = urlopen(dataset_taxonomy_url+','.join(taxids_list))
    #    taxonomy = json.load(taxonomy_json_file)["taxonomy_nodes"]
    #    lineages = [ (t["taxonomy"]["tax_id"], t["taxonomy"]["lineage"] + [t["taxonomy"]["tax_id"]]) for t in taxonomy ]

    lineage = get_lineage(taxid)

    best_lineage = None
    best_taxid = None
    best_score = 0
    for (t, l) in lineages:
        pos1 = 0
        last_match = 0
        for pos in range(len(lineage)):
            tax_id = lineage[pos]
            while tax_id != l[pos1]:
                if pos1 + 1 < len(l):
                    pos1 += 1
                else:
                    break
            if tax_id == l[pos1]:
                last_match = pos1
            else:
                break
        if last_match > best_score:
            best_score = last_match
            best_taxid = t
            best_lineage = l

    if best_score == 0:
        return 0, ""
    # print(best_lineage)
    # print(best_taxid, best_score)
    return best_taxid, get_file_path("gnomon", f"hmm_parameters/{best_taxid}.params")

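# Taxon-dependent defaults: returns (max_intron, genome_size_threshold), with
# larger values for plants and vertebrates than for other organisms.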
PLANTS=33090
VERTEBRATES=7742
def get_max_intron(taxid):
    if not taxid:
        return 0, 0
    lineage = get_lineage(taxid)
    if PLANTS in lineage:
        return 300000, 3000000000
    elif VERTEBRATES in lineage:
        return 1200000, 2000000000
    else:
        return 600000, 500000000

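# Parse a shlex-split argument list into an option->value dict: tokens starting
# with '-' become keys, and the next token is taken as that key's value unless
# it looks like another option; everything else maps to "".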
def to_dict(x: List[str]):
    d = {}
    s = len(x)
    i = 0
    while i < s:
        el = x[i]
        i += 1
        if el and el[0] == '-':
            if i < s:
                v = x[i]
                if v and (v[0] != '-' or (v[0] == '-' and ' ' in v)):
                    d[el] = v
                    i += 1
                else:
                    d[el] = ""
            else:
                d[el] = ""
        else:
            d[el] = ""
    return d

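# Merge run_inputs into task_params recursively: nested dicts merge key by key,
# while string values are treated as command lines whose options (parsed with
# to_dict) from run_inputs override those already in task_params.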
def merge_params(task_params, run_inputs):
    # Walk over the keys in run_inputs and merge them into task_params
    for k in run_inputs.keys():
        if k in task_params:
            if isinstance(task_params[k], dict) and isinstance(run_inputs[k], dict):
                task_params[k] = merge_params(task_params[k], run_inputs[k])
            else:
                task_params_dict = to_dict(shlex.split(task_params[k]))
                run_inputs_dict = to_dict(shlex.split(run_inputs[k]))
                task_params_dict.update(run_inputs_dict)
                task_params_list = []
                for k1, v in task_params_dict.items():
                    task_params_list.append(k1)
                    if v:
                        task_params_list.append(v)
                task_params[k] = shlex.join(task_params_list)
        else:
            task_params[k] = run_inputs[k]
    return task_params

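# Print a per-feature-type count (third GFF column) of the accept.gff file in
# the output directory, if the run produced one.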
def print_statistics(output):
    accept_gff = Path(output) / 'accept.gff'
    print(f"Statistics for {accept_gff}")
    counter = defaultdict(int)
    if accept_gff.exists():
        with open(accept_gff, 'rt') as f:
            for line in f:
                line = line.strip()
                if not line or line[0] == '#':
                    continue
                parts = line.split()
                if len(parts) < 3:
                    continue
                counter[parts[2]] += 1
    keys = list(counter.keys())
    keys.sort()
    for k in keys:
        print(f"{k:12s} {counter[k]}")

def main(argv):
    "Main script for EGAPx"
    #warn user that this is an alpha release
    print("\n!!WARNING!!\nThis is an alpha release with limited features and organism scope to collect initial feedback on execution. Outputs are not yet complete and not intended for production use.\n")

    # Parse command line
    args = parse_args(argv)

    global user_cache_dir
    if args.local_cache:
        # print(f"Local cache: {args.local_cache}")
        user_cache_dir = args.local_cache
    if args.download_only:
        if args.local_cache:
            if not args.dry_run:
                # print(f"Download only: {args.download_only}")
                os.makedirs(args.local_cache, exist_ok=True)
                download_egapx_ftp_data(args.local_cache)
            else:
                print(f"Download only to {args.local_cache}")
            return 0
        else:
            print("Local cache not set")
            return 1
    else:
        # Check that input and output set
        if not args.filename or not args.output:
            print("Input file and output directory must be set")
            return 1

    packaged_distro = bool(getattr(sys, '_MEIPASS', ''))
    script_directory = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
    config_file = get_config(script_directory, args)
    if not config_file:
        return 1

    # Check for workdir, set if not set, and manage last used workdir
    if not manage_workdir(args):
        return 1

    files_to_delete = []

    # Read default task parameters into a dict
    task_params = yaml.safe_load(open(Path(script_directory) / 'assets' / 'default_task_params.yaml', 'r'))
    run_inputs = repackage_inputs(yaml.safe_load(open(args.filename, 'r')))

    if not expand_and_validate_params(run_inputs):
        return 1

    # Command line overrides manifest input
    if args.output:
        run_inputs['output'] = args.output

    # Convert inputs to absolute paths
    convert_paths(run_inputs)

    # Create output directory if needed
    os.makedirs(run_inputs['output'], exist_ok=True)

    if args.summary_only:
        print_statistics(run_inputs['output'])
        return 0

    # Reformat reads into pairs in fromPairs format and add reads_metadata.tsv file
    prepare_reads(run_inputs)


    ##if True or args.download_only:
    ##    with open("./dumped_input.yaml", 'w') as f:
    ##        yaml.dump(run_inputs, f)
    ##        f.flush()
    ##return 0

    # Add to default task parameters, if input file has some task parameters they will override the default
    task_params = merge_params(task_params, run_inputs)

    # Move output from YAML file to arguments to have more transparent Nextflow log
    output = task_params['output']
    del task_params['output']

    if args.func_name:
        task_params['func_name'] = args.func_name

    # Run nextflow process
    if args.verbosity >= VERBOSITY_VERBOSE:
        task_params['verbose'] = True
        print("Nextflow inputs:")
        print(yaml.dump(task_params))
        if 'reads_metadata' in run_inputs['input']:
            print("Reads metadata:")
            with open(run_inputs['input']['reads_metadata'], 'r') as f:
                print(f.read())
    if packaged_distro:
        main_nf = Path(script_directory) / 'nf' / 'ui.nf'
    else:
        main_nf = Path(script_directory) / '..' / 'nf' / 'ui.nf'
    nf_cmd = ["nextflow", "-C", config_file, "-log", f"{output}/nextflow.log", "run", main_nf, "--output", output]
    if args.stub_run:
        nf_cmd += ["-stub-run", "-profile", "stubrun"]
    if args.report:
        nf_cmd += ["-with-report", f"{args.report}.report.html", "-with-timeline", f"{args.report}.timeline.html"]
    else:
        nf_cmd += ["-with-report", f"{output}/run.report.html", "-with-timeline", f"{output}/run.timeline.html"]

    nf_cmd += ["-with-trace", f"{output}/run.trace.txt"]
    # if output directory does not exist, it will be created
    if not os.path.exists(output):
        os.makedirs(output)
    params_file = Path(output) / "run_params.yaml"
    nf_cmd += ["-params-file", str(params_file)]

    if args.dry_run:
        print(" ".join(map(str, nf_cmd)))
    else:
        with open(params_file, 'w') as f:
            yaml.dump(task_params, f)
            f.flush()
        if args.verbosity >= VERBOSITY_VERBOSE:
            print(" ".join(map(str, nf_cmd)))
        resume_file = Path(output) / "resume.sh"
        with open(resume_file, 'w') as f:
            f.write("#!/bin/bash\n")
            f.write(" ".join(map(str, nf_cmd)))
            f.write(" -resume")
            if os.environ.get('NXF_WORK'):
                f.write(" -work-dir " + os.environ['NXF_WORK'])
            f.write("\n")
        try:
            subprocess.run(nf_cmd, check=True, capture_output=(args.verbosity <= VERBOSITY_QUIET), text=True)
        except subprocess.CalledProcessError as e:
            print(e.stderr)
            print(f"To resume execution, run: sh {resume_file}")
            if files_to_delete:
                print(f"Don't forget to delete file(s) {' '.join(files_to_delete)}")
            return 1
    if not args.dry_run and not args.stub_run:
        print_statistics(output)
    # TODO: Use try-finally to delete the metadata file
    for f in files_to_delete:
        os.unlink(f)

    return 0

if __name__ == "__main__":
    sys.exit(main(sys.argv))