Mercurial > repos > peterjc > tmhmm_and_signalp
diff tools/protein_analysis/seq_analysis_utils.py @ 19:f3ecd80850e2 draft
v0.2.9 Python style improvements
| field | value |
|---|---|
| author | peterjc |
| date | Wed, 01 Feb 2017 09:46:42 -0500 |
| parents | eb6ac44d4b8e |
| children | a19b3ded8f33 |
line wrap: on
line diff
--- a/tools/protein_analysis/seq_analysis_utils.py Tue Sep 01 09:56:36 2015 -0400 +++ b/tools/protein_analysis/seq_analysis_utils.py Wed Feb 01 09:46:42 2017 -0500 @@ -12,17 +12,12 @@ import subprocess from time import sleep -__version__ = "0.0.1" - -def sys_exit(msg, error_level=1): - """Print error message to stdout and quit with given error level.""" - sys.stderr.write("%s\n" % msg) - sys.exit(error_level) +__version__ = "0.0.2" try: from multiprocessing import cpu_count except ImportError: - #Must be under Python 2.5, this is copied from multiprocessing: + # Must be under Python 2.5, this is copied from multiprocessing: def cpu_count(): """Returns the number of CPUs in the system.""" if sys.platform == 'win32': @@ -54,18 +49,18 @@ def thread_count(command_line_arg, default=1): try: num = int(command_line_arg) - except: + except ValueError: num = default if num < 1: - sys_exit("Threads argument %r is not a positive integer" % command_line_arg) - #Cap this with the pysical limit of the machine, + sys.exit("Threads argument %r is not a positive integer" % command_line_arg) + # Cap this with the pysical limit of the machine, try: num = min(num, cpu_count()) except NotImplementedError: pass - #For debugging, - #hostname = os.environ.get("HOSTNAME", "this machine") - #sys.stderr.write("Using %i cores on %s\n" % (num, hostname)) + # For debugging, + # hostname = os.environ.get("HOSTNAME", "this machine") + # sys.stderr.write("Using %i cores on %s\n" % (num, hostname)) return num @@ -79,7 +74,7 @@ if truncate: seq = seq[:truncate] if max_len and len(seq) > max_len: - raise ValueError("Sequence %s is length %i, max length %i" \ + raise ValueError("Sequence %s is length %i, max length %i" % (title.split()[0], len(seq), max_len)) yield title, seq title = line[1:].rstrip() @@ -87,8 +82,8 @@ elif title: seq += line.strip() elif not line.strip() or line.startswith("#"): - #Ignore blank lines, and any comment lines - #between records (starting with hash). 
+ # Ignore blank lines, and any comment lines + # between records (starting with hash). pass else: handle.close() @@ -98,11 +93,12 @@ if truncate: seq = seq[:truncate] if max_len and len(seq) > max_len: - raise ValueError("Sequence %s is length %i, max length %i" \ + raise ValueError("Sequence %s is length %i, max length %i" % (title.split()[0], len(seq), max_len)) yield title, seq raise StopIteration + def split_fasta(input_filename, output_filename_base, n=500, truncate=None, keep_descr=False, max_len=None): """Split FASTA file into sub-files each of at most n sequences. @@ -132,20 +128,20 @@ for title, seq in records: handle.write(">%s\n" % title) for i in range(0, len(seq), 60): - handle.write(seq[i:i+60] + "\n") + handle.write(seq[i:i + 60] + "\n") else: for title, seq in records: handle.write(">%s\n" % title.split()[0]) for i in range(0, len(seq), 60): - handle.write(seq[i:i+60] + "\n") + handle.write(seq[i:i + 60] + "\n") handle.close() files.append(new_filename) - #print "%i records in %s" % (len(records), new_filename) + # print "%i records in %s" % (len(records), new_filename) except ValueError, err: - #Max length failure from parser - clean up + # Max length failure from parser - clean up try: handle.close() - except: + except Exception: pass for f in files: if os.path.isfile(f): @@ -155,35 +151,36 @@ assert os.path.isfile(f), "Missing split file %r (!??)" % f return files + def run_jobs(jobs, threads, pause=10, verbose=False): """Takes list of cmd strings, returns dict with error levels.""" pending = jobs[:] running = [] results = {} if threads == 1: - #Special case this for speed, don't need the waits + # Special case this for speed, don't need the waits for cmd in jobs: results[cmd] = subprocess.call(cmd, shell=True) return results while pending or running: - #See if any have finished + # See if any have finished for (cmd, process) in running: - return_code = process.poll() #non-blocking + return_code = process.poll() # non-blocking if return_code is 
not None: results[cmd] = return_code - running = [(cmd, process) for (cmd, process) in running \ + running = [(cmd, process) for (cmd, process) in running if cmd not in results] if verbose: print "%i jobs pending, %i running, %i completed" \ % (len(pending), len(running), len(results)) - #See if we can start any new threads + # See if we can start any new threads while pending and len(running) < threads: cmd = pending.pop(0) if verbose: print cmd process = subprocess.Popen(cmd, shell=True) running.append((cmd, process)) - #Loop... + # Loop... sleep(pause) if verbose: print "%i jobs completed" % len(results)