Mercurial > repos > peterjc > tmhmm_and_signalp
diff tools/protein_analysis/seq_analysis_utils.py @ 0:bca9bc7fdaef
Migrated tool version 0.0.1 from old tool shed archive to new tool shed repository
author | peterjc |
---|---|
date | Tue, 07 Jun 2011 18:03:34 -0400 |
parents | |
children | f3b373a41f81 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/protein_analysis/seq_analysis_utils.py Tue Jun 07 18:03:34 2011 -0400 @@ -0,0 +1,129 @@ +"""A few useful functions for working with FASTA files and running jobs. + +This module was originally written to hold common code used in both the TMHMM +and SignalP wrappers in Galaxy. + +Given Galaxy currently supports Python 2.4+ this cannot use the Python module +multiprocessing so the function run_jobs instead is a simple pool approach +using just the subprocess library. +""" +import sys +import os +import subprocess +from time import sleep + +__version__ = "0.0.1" + +def stop_err(msg, error_level=1): + """Print error message to stdout and quit with given error level.""" + sys.stderr.write("%s\n" % msg) + sys.exit(error_level) + +def fasta_iterator(filename, max_len=None, truncate=None): + """Simple FASTA parser yielding tuples of (title, sequence) strings.""" + handle = open(filename) + title, seq = "", "" + for line in handle: + if line.startswith(">"): + if title: + if truncate: + seq = seq[:truncate] + if max_len and len(seq) > max_len: + raise ValueError("Sequence %s is length %i, max length %i" \ + % (title.split()[0], len(seq), max_len)) + yield title, seq + title = line[1:].rstrip() + seq = "" + elif title: + seq += line.strip() + elif not line.strip() or line.startswith("#"): + #Ignore blank lines, and any comment lines + #between records (starting with hash). + pass + else: + raise ValueError("Bad FASTA line %r" % line) + handle.close() + if title: + if truncate: + seq = seq[:truncate] + if max_len and len(seq) > max_len: + raise ValueError("Sequence %s is length %i, max length %i" \ + % (title.split()[0], len(seq), max_len)) + yield title, seq + raise StopIteration + +def split_fasta(input_filename, output_filename_base, n=500, truncate=None, keep_descr=False, max_len=None): + """Split FASTA file into sub-files each of at most n sequences. + + Returns a list of the filenames used (based on the input filename). + Each sequence can also be truncated (since we only need the start for + SignalP), and have its description discarded (since we don't usually + care about it and some tools don't like very long title lines). + + If a max_len is given and any sequence exceeds it no temp files are + created and an exception is raised. + """ + iterator = fasta_iterator(input_filename, max_len, truncate) + files = [] + try: + while True: + records = [] + for i in range(n): + try: + records.append(iterator.next()) + except StopIteration: + break + if not records: + break + new_filename = "%s.%i.tmp" % (output_filename_base, len(files)) + handle = open(new_filename, "w") + if keep_descr: + for title, seq in records: + handle.write(">%s\n" % title) + for i in range(0, len(seq), 60): + handle.write(seq[i:i+60] + "\n") + else: + for title, seq in records: + handle.write(">%s\n" % title.split()[0]) + for i in range(0, len(seq), 60): + handle.write(seq[i:i+60] + "\n") + handle.close() + files.append(new_filename) + #print "%i records in %s" % (len(records), new_filename) + except ValueError, err: + #Max length failure from parser - clean up + try: + handle.close() + except: + pass + for f in files: + if os.path.isfile(f): + os.remove(f) + raise err + return files + +def run_jobs(jobs, threads): + """Takes list of cmd strings, returns dict with error levels.""" + pending = jobs[:] + running = [] + results = {} + while pending or running: + #print "%i jobs pending, %i running, %i completed" \ + # % (len(jobs), len(running), len(results)) + #See if any have finished + for (cmd, process) in running: + return_code = process.wait() + if return_code is not None: + results[cmd] = return_code + running = [(cmd, process) for (cmd, process) in running \ + if cmd not in results] + #See if we can start any new threads + while pending and len(running) < threads: + cmd = pending.pop(0) + process = subprocess.Popen(cmd, shell=True) + running.append((cmd, process)) + #Loop... + sleep(1) + #print "%i jobs completed" % len(results) + assert set(jobs) == set(results) + return results