Mercurial > repos > peterjc > tmhmm_and_signalp
comparison tools/protein_analysis/seq_analysis_utils.py @ 0:bca9bc7fdaef
Migrated tool version 0.0.1 from old tool shed archive to new tool shed repository
author | peterjc |
---|---|
date | Tue, 07 Jun 2011 18:03:34 -0400 |
parents | |
children | f3b373a41f81 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:bca9bc7fdaef |
---|---|
1 """A few useful functions for working with FASTA files and running jobs. | |
2 | |
3 This module was originally written to hold common code used in both the TMHMM | |
4 and SignalP wrappers in Galaxy. | |
5 | |
6 Given Galaxy currently supports Python 2.4+ this cannot use the Python module | |
7 multiprocessing so the function run_jobs instead is a simple pool approach | |
8 using just the subprocess library. | |
9 """ | |
10 import sys | |
11 import os | |
12 import subprocess | |
13 from time import sleep | |
14 | |
15 __version__ = "0.0.1" | |
16 | |
17 def stop_err(msg, error_level=1): | |
18 """Print error message to stdout and quit with given error level.""" | |
19 sys.stderr.write("%s\n" % msg) | |
20 sys.exit(error_level) | |
21 | |
22 def fasta_iterator(filename, max_len=None, truncate=None): | |
23 """Simple FASTA parser yielding tuples of (title, sequence) strings.""" | |
24 handle = open(filename) | |
25 title, seq = "", "" | |
26 for line in handle: | |
27 if line.startswith(">"): | |
28 if title: | |
29 if truncate: | |
30 seq = seq[:truncate] | |
31 if max_len and len(seq) > max_len: | |
32 raise ValueError("Sequence %s is length %i, max length %i" \ | |
33 % (title.split()[0], len(seq), max_len)) | |
34 yield title, seq | |
35 title = line[1:].rstrip() | |
36 seq = "" | |
37 elif title: | |
38 seq += line.strip() | |
39 elif not line.strip() or line.startswith("#"): | |
40 #Ignore blank lines, and any comment lines | |
41 #between records (starting with hash). | |
42 pass | |
43 else: | |
44 raise ValueError("Bad FASTA line %r" % line) | |
45 handle.close() | |
46 if title: | |
47 if truncate: | |
48 seq = seq[:truncate] | |
49 if max_len and len(seq) > max_len: | |
50 raise ValueError("Sequence %s is length %i, max length %i" \ | |
51 % (title.split()[0], len(seq), max_len)) | |
52 yield title, seq | |
53 raise StopIteration | |
54 | |
55 def split_fasta(input_filename, output_filename_base, n=500, truncate=None, keep_descr=False, max_len=None): | |
56 """Split FASTA file into sub-files each of at most n sequences. | |
57 | |
58 Returns a list of the filenames used (based on the input filename). | |
59 Each sequence can also be truncated (since we only need the start for | |
60 SignalP), and have its description discarded (since we don't usually | |
61 care about it and some tools don't like very long title lines). | |
62 | |
63 If a max_len is given and any sequence exceeds it no temp files are | |
64 created and an exception is raised. | |
65 """ | |
66 iterator = fasta_iterator(input_filename, max_len, truncate) | |
67 files = [] | |
68 try: | |
69 while True: | |
70 records = [] | |
71 for i in range(n): | |
72 try: | |
73 records.append(iterator.next()) | |
74 except StopIteration: | |
75 break | |
76 if not records: | |
77 break | |
78 new_filename = "%s.%i.tmp" % (output_filename_base, len(files)) | |
79 handle = open(new_filename, "w") | |
80 if keep_descr: | |
81 for title, seq in records: | |
82 handle.write(">%s\n" % title) | |
83 for i in range(0, len(seq), 60): | |
84 handle.write(seq[i:i+60] + "\n") | |
85 else: | |
86 for title, seq in records: | |
87 handle.write(">%s\n" % title.split()[0]) | |
88 for i in range(0, len(seq), 60): | |
89 handle.write(seq[i:i+60] + "\n") | |
90 handle.close() | |
91 files.append(new_filename) | |
92 #print "%i records in %s" % (len(records), new_filename) | |
93 except ValueError, err: | |
94 #Max length failure from parser - clean up | |
95 try: | |
96 handle.close() | |
97 except: | |
98 pass | |
99 for f in files: | |
100 if os.path.isfile(f): | |
101 os.remove(f) | |
102 raise err | |
103 return files | |
104 | |
105 def run_jobs(jobs, threads): | |
106 """Takes list of cmd strings, returns dict with error levels.""" | |
107 pending = jobs[:] | |
108 running = [] | |
109 results = {} | |
110 while pending or running: | |
111 #print "%i jobs pending, %i running, %i completed" \ | |
112 # % (len(jobs), len(running), len(results)) | |
113 #See if any have finished | |
114 for (cmd, process) in running: | |
115 return_code = process.wait() | |
116 if return_code is not None: | |
117 results[cmd] = return_code | |
118 running = [(cmd, process) for (cmd, process) in running \ | |
119 if cmd not in results] | |
120 #See if we can start any new threads | |
121 while pending and len(running) < threads: | |
122 cmd = pending.pop(0) | |
123 process = subprocess.Popen(cmd, shell=True) | |
124 running.append((cmd, process)) | |
125 #Loop... | |
126 sleep(1) | |
127 #print "%i jobs completed" % len(results) | |
128 assert set(jobs) == set(results) | |
129 return results |