Mercurial > repos > bebatut > convert_extract_sequence_file
comparison convert_extract_sequence_file.py @ 0:01c2b74b3a21 draft
planemo upload commit 23ef4b1699065b4f6200c58328bfecfb33dd7fd1-dirty
author | bebatut |
---|---|
date | Tue, 26 Apr 2016 08:18:18 -0400 |
parents | |
children | 158642ce204f |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:01c2b74b3a21 |
---|---|
1 #!/usr/bin/python | |
2 # -*- coding: utf-8 -*- | |
3 | |
4 import sys | |
5 import os | |
6 import argparse | |
7 import copy | |
8 import operator | |
9 | |
10 FASTA_FILE_LAST_POS = None | |
11 | |
12 ################# | |
13 # Parse methods # | |
14 ################# | |
15 def text_end_of_file(row): | |
16 if row == '': | |
17 return True | |
18 else: | |
19 return False | |
20 | |
21 def get_new_line(input_file, generate_error = True): | |
22 row = input_file.readline() | |
23 if text_end_of_file(row): | |
24 if generate_error : | |
25 string = os.path.basename(__file__) + ': ' | |
26 string += ' unexpected end of file' | |
27 raise ValueError(string) | |
28 else : | |
29 return None | |
30 else: | |
31 return row[:-1] | |
32 | |
33 def next_fasta_record(input_file): | |
34 global FASTA_FILE_LAST_POS | |
35 if FASTA_FILE_LAST_POS != None: | |
36 input_file.seek(FASTA_FILE_LAST_POS) | |
37 else: | |
38 FASTA_FILE_LAST_POS = input_file.tell() | |
39 | |
40 id_line = get_new_line(input_file, generate_error = False) | |
41 if id_line == None: | |
42 return None | |
43 split_line = id_line[1:].split(' ') | |
44 seq_id = split_line[0] | |
45 description = ' '.join(split_line[1:]) | |
46 new_line = get_new_line(input_file, generate_error = False) | |
47 seq = '' | |
48 while new_line != None: | |
49 if new_line[0] != '>': | |
50 seq += new_line | |
51 FASTA_FILE_LAST_POS = input_file.tell() | |
52 new_line = get_new_line(input_file, generate_error = False) | |
53 else: | |
54 new_line = None | |
55 return SeqRecord(seq_id, seq, description) | |
56 | |
57 def next_fastq_record(input_file): | |
58 id_line = get_new_line(input_file, generate_error = False) | |
59 if id_line == None: | |
60 return None | |
61 if id_line[0] != '@': | |
62 string = os.path.basename(__file__) + ': ' | |
63 string += ' issue in fastq file' | |
64 raise ValueError(string) | |
65 split_line = id_line[1:].split(' ') | |
66 seq_id = split_line[0] | |
67 description = ' '.join(split_line[1:]) | |
68 seq = get_new_line(input_file) | |
69 spacer = get_new_line(input_file) | |
70 quals = get_new_line(input_file) | |
71 return SeqRecord(seq_id, seq, description, quals) | |
72 | |
73 def next_record(input_file, file_format): | |
74 if file_format == 'fasta': | |
75 return next_fasta_record(input_file) | |
76 elif file_format == 'fastq': | |
77 return next_fastq_record(input_file) | |
78 else: | |
79 string = os.path.basename(__file__) + ': ' | |
80 string += file_format + ' is not managed' | |
81 raise ValueError(string) | |
82 | |
83 def write_fasta_record(record, output_sequence_file): | |
84 output_sequence_file.write('>' + record.get_id() + ' ' + | |
85 record.get_description() + '\n') | |
86 seq = record.get_sequence() | |
87 split_seq = [seq[i:i+60] for i in xrange(0,len(seq),60)] | |
88 for split in split_seq: | |
89 output_sequence_file.write(split + '\n') | |
90 | |
91 def format_qual_value(qual_score, sliding_value, authorized_range, qual_format): | |
92 ascii_value = ord(qual_score) | |
93 score = ascii_value-sliding_value | |
94 if score < authorized_range[0] or score > authorized_range[1]: | |
95 string = os.path.basename(__file__) + ': wrong score (' | |
96 string += str(score) + ') with quality format (' | |
97 string += qual_format | |
98 raise ValueError(string) | |
99 return score | |
100 | |
101 def format_qual_string(qual_string, qual_format): | |
102 if qual_format == 'sanger': | |
103 return format_qual_value(qual_string, 33 ,[0,40], qual_format) | |
104 elif qual_format == "solexa": | |
105 return format_qual_value(qual_string, 64 ,[-5,40], qual_format) | |
106 elif qual_format == "illumina_1_3": | |
107 return format_qual_value(qual_string, 33 ,[0,40], qual_format) | |
108 elif qual_format == "illumina_1_5": | |
109 return format_qual_value(qual_string, 33 ,[3,40], qual_format) | |
110 elif qual_format == "illumina_1_8": | |
111 return format_qual_value(qual_string, 33 ,[0,41], qual_format) | |
112 else: | |
113 string = os.path.basename(__file__) + ': quality format (' | |
114 string += qual_format + ') is not managed' | |
115 raise ValueError(string) | |
116 | |
117 def write_qual_record(record, output_qual_file, qual_format): | |
118 output_qual_file.write('>' + record.get_id() + ' ' + | |
119 record.get_description() + '\n') | |
120 qual = record.get_quality() | |
121 qual = [str(format_qual_string(qual_str,qual_format)) for qual_str in qual] | |
122 split_seq = [qual[i:i+60] for i in xrange(0,len(qual),60)] | |
123 for split in split_seq: | |
124 output_qual_file.write(' '.join(split) + '\n') | |
125 | |
126 def write_fastq_record(record, output_sequence_file): | |
127 output_sequence_file.write('@' + record.get_id() + ' ' + | |
128 record.get_description() + '\n') | |
129 output_sequence_file.write(record.get_sequence() + '\n') | |
130 output_sequence_file.write('+\n') | |
131 output_sequence_file.write(record.get_quality() + '\n') | |
132 | |
133 def write_information(record, output_file_formats, output_sequence_file, | |
134 output_qual_file, qual_format): | |
135 if "fasta" in output_file_formats: | |
136 write_fasta_record(record, output_sequence_file) | |
137 if "qual" in output_file_formats: | |
138 write_qual_record(record, output_qual_file, qual_format) | |
139 if "fastq" in output_file_formats: | |
140 write_fastq_record(record, output_sequence_file) | |
141 | |
142 def fast_test_element_in_list(element,list_to_test): | |
143 to_continue = True | |
144 i = 0 | |
145 while to_continue: | |
146 if i == len(list_to_test) or list_to_test[i] >= element: | |
147 to_continue = False | |
148 else: | |
149 i += 1 | |
150 | |
151 found = False | |
152 if i < len(list_to_test): | |
153 if list_to_test[i] == element: | |
154 found = True | |
155 | |
156 return found | |
157 | |
158 ######################### | |
159 # Constraint definition # | |
160 ######################### | |
161 constraints = { | |
162 'equal': operator.eq, | |
163 'different': operator.ne, | |
164 'lower': operator.le, | |
165 'strictly_lower': operator.lt, | |
166 'greater': operator.ge, | |
167 'strictly_greater': operator.gt, | |
168 'in': operator.contains, | |
169 'not_in': 'in' | |
170 } | |
171 | |
172 extractable_information = { | |
173 'id': str, | |
174 'length': int, | |
175 'description': str | |
176 } | |
177 | |
178 ########### | |
179 # Classes # | |
180 ########### | |
181 class SeqRecord: | |
182 | |
183 def __init__(self, seq_id, sequence, description, quality = ""): | |
184 self.id = seq_id | |
185 self.sequence = sequence | |
186 self.quality = quality | |
187 self.description = description | |
188 self.length = len(self.sequence) | |
189 | |
190 # Getters | |
191 def get_id(self): | |
192 return self.id | |
193 | |
194 def get_sequence(self): | |
195 return self.sequence | |
196 | |
197 def get_quality(self): | |
198 return self.quality | |
199 | |
200 def get_length(self): | |
201 return self.length | |
202 | |
203 def get_description(self): | |
204 return self.description | |
205 | |
206 def get(self, category): | |
207 if category == 'id': | |
208 return self.get_id() | |
209 elif category == 'length': | |
210 return self.get_length() | |
211 elif category == 'description': | |
212 return self.get_description() | |
213 else: | |
214 string = os.path.basename(__file__) + ': ' | |
215 string += category + ' can not be extracted from SeqRecord' | |
216 raise ValueError(string) | |
217 | |
218 # Other functions | |
219 def extract_information(self,to_extract): | |
220 extracted_info = [] | |
221 for info_to_extract in to_extract: | |
222 extracted_info.append(self.get(info_to_extract)) | |
223 return extracted_info | |
224 | |
225 def test_conservation(self, constraints): | |
226 to_conserve = True | |
227 for constrained_info in constraints: | |
228 record_value = self.get(constrained_info) | |
229 for constraint in constraints[constrained_info]: | |
230 to_conserve &= constraint.test_constraint(record_value) | |
231 return to_conserve | |
232 | |
233 class Records: | |
234 | |
235 def __init__(self, input_filepath, file_format, constraints): | |
236 self.records = [] | |
237 self.conserved_records = [] | |
238 with open(input_filepath, 'r') as input_file: | |
239 to_continue = True | |
240 while to_continue: | |
241 record = next_record(input_file, file_format) | |
242 if record != None: | |
243 self.records.append(record) | |
244 to_conserve = record.test_conservation(constraints) | |
245 if to_conserve: | |
246 self.conserved_records.append(copy.copy(record)) | |
247 else: | |
248 to_continue = False | |
249 | |
250 # Getters | |
251 def get_records(self): | |
252 return copy.copy(self.records) | |
253 | |
254 def get_record_nb(self): | |
255 return len(self.records) | |
256 | |
257 def get_conserved_records(self): | |
258 return copy.copy(self.conserved_records) | |
259 | |
260 def get_conserved_record_nb(self): | |
261 return len(self.conserved_records) | |
262 | |
263 # Other functions | |
264 def save_conserved_records(self,args): | |
265 if args.custom_extraction_type == 'True': | |
266 to_extract = args.to_extract[1:-1].split(',') | |
267 with open(args.output_information, 'w') as output_information_file: | |
268 output_information_file.write('\t'.join(to_extract) + '\n') | |
269 for record in self.conserved_records: | |
270 extracted_info = record.extract_information(to_extract) | |
271 string_info = [str(info) for info in extracted_info] | |
272 string = '\t'.join(string_info) | |
273 output_information_file.write(string + '\n') | |
274 else: | |
275 qual_format = None | |
276 if args.format == 'fasta': | |
277 output_file_formats = ['fasta'] | |
278 elif args.format == 'fastq': | |
279 if args.split == 'True': | |
280 output_file_formats = ['fasta','qual'] | |
281 qual_format = args.quality_format | |
282 else: | |
283 output_file_formats = ['fastq'] | |
284 | |
285 with open(args.output_sequence,'w') as output_sequence_file: | |
286 if "qual" in output_file_formats: | |
287 output_qual_file = open(args.output_quality, 'w') | |
288 else: | |
289 output_qual_file = None | |
290 for record in self.conserved_records: | |
291 write_information(record, output_file_formats, | |
292 output_sequence_file, output_qual_file, qual_format) | |
293 if "qual" in output_file_formats: | |
294 output_qual_file.close() | |
295 | |
296 class Constraint: | |
297 | |
298 def __init__(self, constraint_type, value, constrained_information): | |
299 if not constraints.has_key(constraint_type): | |
300 string = os.path.basename(__file__) + ': ' | |
301 string += constraint_type + ' is not a correct type of constraint' | |
302 raise ValueError(string) | |
303 self.raw_constraint_type = constraint_type | |
304 self.type = constraints[constraint_type] | |
305 | |
306 value_format = extractable_information[constrained_information] | |
307 if self.raw_constraint_type in ['in', 'not_in']: | |
308 self.values = [] | |
309 with open(value, 'r') as value_file: | |
310 for row in value_file.readlines(): | |
311 value = row[:-1] | |
312 self.values.append(value_format(value)) | |
313 else: | |
314 self.values = [value_format(value)] | |
315 self.values.sort() | |
316 | |
317 def get_raw_constraint_type(self): | |
318 return self.raw_constraint_type | |
319 | |
320 def get_type(self): | |
321 return self.type | |
322 | |
323 def get_values(self): | |
324 return self.values | |
325 | |
326 def test_constraint(self, similarity_info_value): | |
327 to_conserve = True | |
328 if self.raw_constraint_type == 'in': | |
329 to_conserve &= fast_test_element_in_list(similarity_info_value, | |
330 self.values) | |
331 elif self.raw_constraint_type == 'not_in': | |
332 to_conserve &= (not fast_test_element_in_list(similarity_info_value, | |
333 self.values)) | |
334 else: | |
335 to_conserve &= self.type(similarity_info_value, self.values[0]) | |
336 return to_conserve | |
337 | |
338 ################ | |
339 # Misc methods # | |
340 ################ | |
341 def test_input_filepath(input_filepath, tool, file_format): | |
342 if not os.path.exists(input_filepath): | |
343 string = os.path.basename(__file__) + ': ' | |
344 string += input_filepath + ' does not exist' | |
345 raise ValueError(string) | |
346 | |
347 def format_constraints(constraints): | |
348 formatted_constraints = {} | |
349 if constraints != None: | |
350 for constr in constraints: | |
351 split_constraint = constr.split(': ') | |
352 constrained_information = split_constraint[0] | |
353 constraint = Constraint(split_constraint[1], split_constraint[2], | |
354 constrained_information) | |
355 formatted_constraints.setdefault(constrained_information,[]).append( | |
356 constraint) | |
357 return formatted_constraints | |
358 | |
359 def convert_extract_sequence_file(args): | |
360 input_filepath = args.input | |
361 file_format = args.format | |
362 constraints = args.constraint | |
363 formatted_constraints = format_constraints(constraints) | |
364 | |
365 records = Records(input_filepath, file_format, formatted_constraints) | |
366 records.save_conserved_records(args) | |
367 | |
368 report_filepath = args.report | |
369 with open(report_filepath, 'w') as report_file: | |
370 | |
371 report_file.write('Information to extract:\n') | |
372 if args.custom_extraction_type == 'True': | |
373 for info in args.to_extract[1:-1].split(','): | |
374 report_file.write('\t' + info + '\n') | |
375 else: | |
376 report_file.write('\tsequences\n') | |
377 | |
378 if constraints != None: | |
379 report_file.write('Constraints on extraction:\n') | |
380 for constrained_info in formatted_constraints: | |
381 report_file.write('\tInfo to constraint: ' + constrained_info | |
382 + '\n') | |
383 for constraint in formatted_constraints[constrained_info]: | |
384 report_file.write('\t\tType of constraint: ' + | |
385 constraint.get_raw_constraint_type() | |
386 + '\n') | |
387 report_file.write('\t\tValues:\n') | |
388 values = constraint.get_values() | |
389 for value in values: | |
390 report_file.write('\t\t\t' + str(value) + '\n') | |
391 report_file.write('Number of similarity records: ' + | |
392 str(records.get_record_nb()) + '\n') | |
393 report_file.write('Number of extracted similarity records: ' + | |
394 str(records.get_conserved_record_nb()) + '\n') | |
395 | |
396 ######## | |
397 # Main # | |
398 ######## | |
399 if __name__ == "__main__": | |
400 parser = argparse.ArgumentParser() | |
401 parser.add_argument('--input', required=True) | |
402 parser.add_argument('--format', required=True) | |
403 parser.add_argument('--custom_extraction_type', required=True) | |
404 parser.add_argument('--to_extract') | |
405 parser.add_argument('--output_information') | |
406 parser.add_argument('--split') | |
407 parser.add_argument('--quality_format') | |
408 parser.add_argument('--output_sequence') | |
409 parser.add_argument('--output_quality') | |
410 parser.add_argument('--constraint', action='append') | |
411 parser.add_argument('--report', required=True) | |
412 args = parser.parse_args() | |
413 | |
414 convert_extract_sequence_file(args) |