Mercurial > repos > bebatut > convert_extract_sequence_file
diff convert_extract_sequence_file.py @ 0:01c2b74b3a21 draft
planemo upload commit 23ef4b1699065b4f6200c58328bfecfb33dd7fd1-dirty
author | bebatut |
---|---|
date | Tue, 26 Apr 2016 08:18:18 -0400 |
parents | |
children | 158642ce204f |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/convert_extract_sequence_file.py Tue Apr 26 08:18:18 2016 -0400 @@ -0,0 +1,414 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +import sys +import os +import argparse +import copy +import operator + +FASTA_FILE_LAST_POS = None + +################# +# Parse methods # +################# +def text_end_of_file(row): + if row == '': + return True + else: + return False + +def get_new_line(input_file, generate_error = True): + row = input_file.readline() + if text_end_of_file(row): + if generate_error : + string = os.path.basename(__file__) + ': ' + string += ' unexpected end of file' + raise ValueError(string) + else : + return None + else: + return row[:-1] + +def next_fasta_record(input_file): + global FASTA_FILE_LAST_POS + if FASTA_FILE_LAST_POS != None: + input_file.seek(FASTA_FILE_LAST_POS) + else: + FASTA_FILE_LAST_POS = input_file.tell() + + id_line = get_new_line(input_file, generate_error = False) + if id_line == None: + return None + split_line = id_line[1:].split(' ') + seq_id = split_line[0] + description = ' '.join(split_line[1:]) + new_line = get_new_line(input_file, generate_error = False) + seq = '' + while new_line != None: + if new_line[0] != '>': + seq += new_line + FASTA_FILE_LAST_POS = input_file.tell() + new_line = get_new_line(input_file, generate_error = False) + else: + new_line = None + return SeqRecord(seq_id, seq, description) + +def next_fastq_record(input_file): + id_line = get_new_line(input_file, generate_error = False) + if id_line == None: + return None + if id_line[0] != '@': + string = os.path.basename(__file__) + ': ' + string += ' issue in fastq file' + raise ValueError(string) + split_line = id_line[1:].split(' ') + seq_id = split_line[0] + description = ' '.join(split_line[1:]) + seq = get_new_line(input_file) + spacer = get_new_line(input_file) + quals = get_new_line(input_file) + return SeqRecord(seq_id, seq, description, quals) + +def next_record(input_file, file_format): + if file_format == 'fasta': + return next_fasta_record(input_file) + elif file_format == 'fastq': + return next_fastq_record(input_file) + else: + string = os.path.basename(__file__) + ': ' + string += file_format + ' is not managed' + raise ValueError(string) + +def write_fasta_record(record, output_sequence_file): + output_sequence_file.write('>' + record.get_id() + ' ' + + record.get_description() + '\n') + seq = record.get_sequence() + split_seq = [seq[i:i+60] for i in xrange(0,len(seq),60)] + for split in split_seq: + output_sequence_file.write(split + '\n') + +def format_qual_value(qual_score, sliding_value, authorized_range, qual_format): + ascii_value = ord(qual_score) + score = ascii_value-sliding_value + if score < authorized_range[0] or score > authorized_range[1]: + string = os.path.basename(__file__) + ': wrong score (' + string += str(score) + ') with quality format (' + string += qual_format + raise ValueError(string) + return score + +def format_qual_string(qual_string, qual_format): + if qual_format == 'sanger': + return format_qual_value(qual_string, 33 ,[0,40], qual_format) + elif qual_format == "solexa": + return format_qual_value(qual_string, 64 ,[-5,40], qual_format) + elif qual_format == "illumina_1_3": + return format_qual_value(qual_string, 33 ,[0,40], qual_format) + elif qual_format == "illumina_1_5": + return format_qual_value(qual_string, 33 ,[3,40], qual_format) + elif qual_format == "illumina_1_8": + return format_qual_value(qual_string, 33 ,[0,41], qual_format) + else: + string = os.path.basename(__file__) + ': quality format (' + string += qual_format + ') is not managed' + raise ValueError(string) + +def write_qual_record(record, output_qual_file, qual_format): + output_qual_file.write('>' + record.get_id() + ' ' + + record.get_description() + '\n') + qual = record.get_quality() + qual = [str(format_qual_string(qual_str,qual_format)) for qual_str in qual] + split_seq = [qual[i:i+60] for i in xrange(0,len(qual),60)] + for split in split_seq: + output_qual_file.write(' '.join(split) + '\n') + +def write_fastq_record(record, output_sequence_file): + output_sequence_file.write('@' + record.get_id() + ' ' + + record.get_description() + '\n') + output_sequence_file.write(record.get_sequence() + '\n') + output_sequence_file.write('+\n') + output_sequence_file.write(record.get_quality() + '\n') + +def write_information(record, output_file_formats, output_sequence_file, + output_qual_file, qual_format): + if "fasta" in output_file_formats: + write_fasta_record(record, output_sequence_file) + if "qual" in output_file_formats: + write_qual_record(record, output_qual_file, qual_format) + if "fastq" in output_file_formats: + write_fastq_record(record, output_sequence_file) + +def fast_test_element_in_list(element,list_to_test): + to_continue = True + i = 0 + while to_continue: + if i == len(list_to_test) or list_to_test[i] >= element: + to_continue = False + else: + i += 1 + + found = False + if i < len(list_to_test): + if list_to_test[i] == element: + found = True + + return found + +######################### +# Constraint definition # +######################### +constraints = { + 'equal': operator.eq, + 'different': operator.ne, + 'lower': operator.le, + 'strictly_lower': operator.lt, + 'greater': operator.ge, + 'strictly_greater': operator.gt, + 'in': operator.contains, + 'not_in': 'in' +} + +extractable_information = { + 'id': str, + 'length': int, + 'description': str +} + +########### +# Classes # +########### +class SeqRecord: + + def __init__(self, seq_id, sequence, description, quality = ""): + self.id = seq_id + self.sequence = sequence + self.quality = quality + self.description = description + self.length = len(self.sequence) + + # Getters + def get_id(self): + return self.id + + def get_sequence(self): + return self.sequence + + def get_quality(self): + return self.quality + + def get_length(self): + return self.length + + def get_description(self): + return self.description + + def get(self, category): + if category == 'id': + return self.get_id() + elif category == 'length': + return self.get_length() + elif category == 'description': + return self.get_description() + else: + string = os.path.basename(__file__) + ': ' + string += category + ' can not be extracted from SeqRecord' + raise ValueError(string) + + # Other functions + def extract_information(self,to_extract): + extracted_info = [] + for info_to_extract in to_extract: + extracted_info.append(self.get(info_to_extract)) + return extracted_info + + def test_conservation(self, constraints): + to_conserve = True + for constrained_info in constraints: + record_value = self.get(constrained_info) + for constraint in constraints[constrained_info]: + to_conserve &= constraint.test_constraint(record_value) + return to_conserve + +class Records: + + def __init__(self, input_filepath, file_format, constraints): + self.records = [] + self.conserved_records = [] + with open(input_filepath, 'r') as input_file: + to_continue = True + while to_continue: + record = next_record(input_file, file_format) + if record != None: + self.records.append(record) + to_conserve = record.test_conservation(constraints) + if to_conserve: + self.conserved_records.append(copy.copy(record)) + else: + to_continue = False + + # Getters + def get_records(self): + return copy.copy(self.records) + + def get_record_nb(self): + return len(self.records) + + def get_conserved_records(self): + return copy.copy(self.conserved_records) + + def get_conserved_record_nb(self): + return len(self.conserved_records) + + # Other functions + def save_conserved_records(self,args): + if args.custom_extraction_type == 'True': + to_extract = args.to_extract[1:-1].split(',') + with open(args.output_information, 'w') as output_information_file: + output_information_file.write('\t'.join(to_extract) + '\n') + for record in self.conserved_records: + extracted_info = record.extract_information(to_extract) + string_info = [str(info) for info in extracted_info] + string = '\t'.join(string_info) + output_information_file.write(string + '\n') + else: + qual_format = None + if args.format == 'fasta': + output_file_formats = ['fasta'] + elif args.format == 'fastq': + if args.split == 'True': + output_file_formats = ['fasta','qual'] + qual_format = args.quality_format + else: + output_file_formats = ['fastq'] + + with open(args.output_sequence,'w') as output_sequence_file: + if "qual" in output_file_formats: + output_qual_file = open(args.output_quality, 'w') + else: + output_qual_file = None + for record in self.conserved_records: + write_information(record, output_file_formats, + output_sequence_file, output_qual_file, qual_format) + if "qual" in output_file_formats: + output_qual_file.close() + +class Constraint: + + def __init__(self, constraint_type, value, constrained_information): + if not constraints.has_key(constraint_type): + string = os.path.basename(__file__) + ': ' + string += constraint_type + ' is not a correct type of constraint' + raise ValueError(string) + self.raw_constraint_type = constraint_type + self.type = constraints[constraint_type] + + value_format = extractable_information[constrained_information] + if self.raw_constraint_type in ['in', 'not_in']: + self.values = [] + with open(value, 'r') as value_file: + for row in value_file.readlines(): + value = row[:-1] + self.values.append(value_format(value)) + else: + self.values = [value_format(value)] + self.values.sort() + + def get_raw_constraint_type(self): + return self.raw_constraint_type + + def get_type(self): + return self.type + + def get_values(self): + return self.values + + def test_constraint(self, similarity_info_value): + to_conserve = True + if self.raw_constraint_type == 'in': + to_conserve &= fast_test_element_in_list(similarity_info_value, + self.values) + elif self.raw_constraint_type == 'not_in': + to_conserve &= (not fast_test_element_in_list(similarity_info_value, + self.values)) + else: + to_conserve &= self.type(similarity_info_value, self.values[0]) + return to_conserve + +################ +# Misc methods # +################ +def test_input_filepath(input_filepath, tool, file_format): + if not os.path.exists(input_filepath): + string = os.path.basename(__file__) + ': ' + string += input_filepath + ' does not exist' + raise ValueError(string) + +def format_constraints(constraints): + formatted_constraints = {} + if constraints != None: + for constr in constraints: + split_constraint = constr.split(': ') + constrained_information = split_constraint[0] + constraint = Constraint(split_constraint[1], split_constraint[2], + constrained_information) + formatted_constraints.setdefault(constrained_information,[]).append( + constraint) + return formatted_constraints + +def convert_extract_sequence_file(args): + input_filepath = args.input + file_format = args.format + constraints = args.constraint + formatted_constraints = format_constraints(constraints) + + records = Records(input_filepath, file_format, formatted_constraints) + records.save_conserved_records(args) + + report_filepath = args.report + with open(report_filepath, 'w') as report_file: + + report_file.write('Information to extract:\n') + if args.custom_extraction_type == 'True': + for info in args.to_extract[1:-1].split(','): + report_file.write('\t' + info + '\n') + else: + report_file.write('\tsequences\n') + + if constraints != None: + report_file.write('Constraints on extraction:\n') + for constrained_info in formatted_constraints: + report_file.write('\tInfo to constraint: ' + constrained_info + + '\n') + for constraint in formatted_constraints[constrained_info]: + report_file.write('\t\tType of constraint: ' + + constraint.get_raw_constraint_type() + + '\n') + report_file.write('\t\tValues:\n') + values = constraint.get_values() + for value in values: + report_file.write('\t\t\t' + str(value) + '\n') + report_file.write('Number of similarity records: ' + + str(records.get_record_nb()) + '\n') + report_file.write('Number of extracted similarity records: ' + + str(records.get_conserved_record_nb()) + '\n') + +######## +# Main # +######## +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument('--input', required=True) + parser.add_argument('--format', required=True) + parser.add_argument('--custom_extraction_type', required=True) + parser.add_argument('--to_extract') + parser.add_argument('--output_information') + parser.add_argument('--split') + parser.add_argument('--quality_format') + parser.add_argument('--output_sequence') + parser.add_argument('--output_quality') + parser.add_argument('--constraint', action='append') + parser.add_argument('--report', required=True) + args = parser.parse_args() + + convert_extract_sequence_file(args)