Mercurial > repos > cstrittmatter > test_eurl_vtec_wgs_pt
view scripts/modules/utils.py @ 2:6837f733b4aa draft
planemo upload commit 15239f1674081ab51ab8dd75a9a40cf1bfaa93e8
author | cstrittmatter |
---|---|
date | Wed, 22 Jan 2020 09:10:12 -0500 |
parents | 965517909457 |
children | 0cbed1c0a762 |
line wrap: on
line source
import pickle import traceback import shlex import subprocess from threading import Timer import shutil import time import functools import os.path import sys import argparse def start_logger(workdir): time_str = time.strftime("%Y%m%d-%H%M%S") sys.stdout = Logger(workdir, time_str) logfile = sys.stdout.getLogFile() return logfile, time_str class Logger(object): def __init__(self, out_directory, time_str): self.logfile = os.path.join(out_directory, str('run.' + time_str + '.log')) self.terminal = sys.stdout self.log = open(self.logfile, "w") def write(self, message): self.terminal.write(message) self.log.write(message) self.log.flush() def flush(self): pass def getLogFile(self): return self.logfile def checkPrograms(programs_version_dictionary): print '\n' + 'Checking dependencies...' programs = programs_version_dictionary which_program = ['which', ''] listMissings = [] for program in programs: which_program[1] = program run_successfully, stdout, stderr = runCommandPopenCommunicate(which_program, False, None, False) if not run_successfully: listMissings.append(program + ' not found in PATH.') else: print stdout.splitlines()[0] if programs[program][0] is None: print program + ' (impossible to determine programme version) found at: ' + stdout.splitlines()[0] else: if program.endswith('.jar'): check_version = ['java', '-jar', stdout.splitlines()[0], programs[program][0]] programs[program].append(stdout.splitlines()[0]) else: check_version = [stdout.splitlines()[0], programs[program][0]] run_successfully, stdout, stderr = runCommandPopenCommunicate(check_version, False, None, False) if stdout == '': stdout = stderr if program == 'wget': version_line = stdout.splitlines()[0].split(' ', 3)[2] else: version_line = stdout.splitlines()[0].split(' ')[-1] replace_characters = ['"', 'v', 'V', '+'] for i in replace_characters: version_line = version_line.replace(i, '') print program + ' (' + version_line + ') found' if programs[program][1] == '>=': program_found_version = version_line.split('.') program_version_required = programs[program][2].split('.') if len(program_version_required) == 3: if len(program_found_version) == 2: program_found_version.append(0) else: program_found_version[2] = program_found_version[2].split('_')[0] for i in range(0, len(program_version_required)): if isinstance(program_found_version[i], (int, long)): if int(program_found_version[i]) < int(program_version_required[i]): listMissings.append('It is required ' + program + ' with version ' + programs[program][1] + ' ' + programs[program][2]) else: if version_line != programs[program][2]: listMissings.append('It is required ' + program + ' with version ' + programs[program][1] + ' ' + programs[program][2]) return listMissings def requiredPrograms(): programs_version_dictionary = {} programs_version_dictionary['rematch.py'] = ['--version', '>=', '3.2'] missingPrograms = checkPrograms(programs_version_dictionary) if len(missingPrograms) > 0: sys.exit('\n' + 'Errors:' + '\n' + '\n'.join(missingPrograms)) def general_information(logfile, version, outdir, time_str): # Check if output directory exists print '\n' + '==========> patho_typing <==========' print '\n' + 'Program start: ' + time.ctime() # Tells where the logfile will be stored print '\n' + 'LOGFILE:' print logfile # Print command print '\n' + 'COMMAND:' script_path = os.path.abspath(sys.argv[0]) print sys.executable + ' ' + script_path + ' ' + ' '.join(sys.argv[1:]) # Print directory where programme was lunch print '\n' + 'PRESENT DIRECTORY:' present_directory = os.path.abspath(os.getcwd()) print present_directory # Print program version print '\n' + 'VERSION:' scriptVersionGit(version, present_directory, script_path) # Check programms requiredPrograms() return script_path def setPATHvariable(doNotUseProvidedSoftware, script_path): path_variable = os.environ['PATH'] script_folder = os.path.dirname(script_path) # Set path to use provided softwares if not doNotUseProvidedSoftware: bowtie2 = os.path.join(script_folder, 'src', 'bowtie2-2.2.9') samtools = os.path.join(script_folder, 'src', 'samtools-1.3.1', 'bin') bcftools = os.path.join(script_folder, 'src', 'bcftools-1.3.1', 'bin') os.environ['PATH'] = str(':'.join([bowtie2, samtools, bcftools, path_variable])) # Print PATH variable print '\n' + 'PATH variable:' print os.environ['PATH'] def scriptVersionGit(version, directory, script_path): print 'Version ' + version try: os.chdir(os.path.dirname(script_path)) command = ['git', 'log', '-1', '--date=local', '--pretty=format:"%h (%H) - Commit by %cn, %cd) : %s"'] run_successfully, stdout, stderr = runCommandPopenCommunicate(command, False, 15, False) print stdout command = ['git', 'remote', 'show', 'origin'] run_successfully, stdout, stderr = runCommandPopenCommunicate(command, False, 15, False) print stdout os.chdir(directory) except: print 'HARMLESS WARNING: git command possibly not found. The GitHub repository information will not be obtained.' def runTime(start_time): end_time = time.time() time_taken = end_time - start_time hours, rest = divmod(time_taken, 3600) minutes, seconds = divmod(rest, 60) print 'Runtime :' + str(hours) + 'h:' + str(minutes) + 'm:' + str(round(seconds, 2)) + 's' return round(time_taken, 2) def timer(function, name): @functools.wraps(function) def wrapper(*args, **kwargs): print('\n' + 'RUNNING {0}\n'.format(name)) start_time = time.time() results = list(function(*args, **kwargs)) # guarantees return is a list to allow .insert() time_taken = runTime(start_time) print('END {0}'.format(name)) results.insert(0, time_taken) return results return wrapper def removeDirectory(directory): if os.path.isdir(directory): shutil.rmtree(directory) def saveVariableToPickle(variableToStore, pickleFile): with open(pickleFile, 'wb') as writer: pickle.dump(variableToStore, writer) def extractVariableFromPickle(pickleFile): with open(pickleFile, 'rb') as reader: variable = pickle.load(reader) return variable def trace_unhandled_exceptions(func): @functools.wraps(func) def wrapped_func(*args, **kwargs): try: func(*args, **kwargs) except: print 'Exception in ' + func.__name__ traceback.print_exc() return wrapped_func def kill_subprocess_Popen(subprocess_Popen, command): print 'Command run out of time: ' + str(command) subprocess_Popen.kill() def runCommandPopenCommunicate(command, shell_True, timeout_sec_None, print_comand_True): run_successfully = False if not isinstance(command, basestring): command = ' '.join(command) command = shlex.split(command) if print_comand_True: print 'Running: ' + ' '.join(command) if shell_True: command = ' '.join(command) proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) else: proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) not_killed_by_timer = True if timeout_sec_None is None: stdout, stderr = proc.communicate() else: timer = Timer(timeout_sec_None, kill_subprocess_Popen, args=(proc, command,)) timer.start() stdout, stderr = proc.communicate() timer.cancel() not_killed_by_timer = timer.isAlive() if proc.returncode == 0: run_successfully = True else: if not print_comand_True and not_killed_by_timer: print 'Running: ' + str(command) if len(stdout) > 0: print 'STDOUT' print stdout.decode("utf-8") if len(stderr) > 0: print 'STDERR' print stderr.decode("utf-8") return run_successfully, stdout, stderr def required_length(tuple_length_options, argument_name): class RequiredLength(argparse.Action): def __call__(self, parser, args, values, option_string=None): if len(values) not in tuple_length_options: msg = 'Option {argument_name} requires one of the following number of arguments: {tuple_length_options}'.format( argument_name=self.argument_name, tuple_length_options=tuple_length_options) raise argparse.ArgumentTypeError(msg) setattr(args, self.dest, values) return RequiredLength def get_sequence_information(fasta_file, length_extra_seq): sequence_dict = {} headers = {} with open(fasta_file, 'rtU') as reader: blank_line_found = False sequence_counter = 0 temp_sequence_dict = {} for line in reader: line = line.splitlines()[0] if len(line) > 0: if not blank_line_found: if line.startswith('>'): if len(temp_sequence_dict) > 0: if temp_sequence_dict.values()[0]['length'] - 2 * length_extra_seq > 0: sequence_dict[temp_sequence_dict.keys()[0]] = temp_sequence_dict.values()[0] headers[temp_sequence_dict.values()[0]['header'].lower()] = sequence_counter else: print temp_sequence_dict.values()[0]['header'] + ' sequence ignored due to length <= 0' temp_sequence_dict = {} if line[1:].lower() in headers: sys.exit('Found duplicated sequence headers') sequence_counter += 1 temp_sequence_dict[sequence_counter] = {'header': line[1:].lower(), 'sequence': '', 'length': 0} else: temp_sequence_dict[sequence_counter]['sequence'] += line.upper() temp_sequence_dict[sequence_counter]['length'] += len(line) else: sys.exit('It was found a blank line between the fasta file above line ' + line) else: blank_line_found = True if len(temp_sequence_dict) > 0: if temp_sequence_dict.values()[0]['length'] - 2 * length_extra_seq > 0: sequence_dict[temp_sequence_dict.keys()[0]] = temp_sequence_dict.values()[0] headers[temp_sequence_dict.values()[0]['header'].lower()] = sequence_counter else: print temp_sequence_dict.values()[0]['header'] + ' sequence ignored due to length <= 0' return sequence_dict, headers def simplify_sequence_dict(sequence_dict): simple_sequence_dict = {} for counter, info in sequence_dict.items(): simple_sequence_dict[info['header']] = info del simple_sequence_dict[info['header']]['header'] return simple_sequence_dict def chunkstring(string, length): return (string[0 + i:length + i] for i in range(0, len(string), length)) def clean_headers_sequences(sequence_dict): problematic_characters = ["|", " ", ",", ".", "(", ")", "'", "/", ":"] # print 'Checking if reference sequences contain ' + str(problematic_characters) + '\n' headers_changed = False new_headers = {} for i in sequence_dict: if any(x in sequence_dict[i]['header'] for x in problematic_characters): for x in problematic_characters: sequence_dict[i]['header'] = sequence_dict[i]['header'].replace(x, '_') headers_changed = True new_headers[sequence_dict[i]['header'].lower()] = i if headers_changed: print 'At least one of the those characters was found. Replacing those with _' + '\n' return sequence_dict, new_headers