Mercurial > repos > artbio > sequence_format_converter
comparison sequence_format_converter.py @ 0:a8aacccd79a3 draft
planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/master/tools/sequence_format_converter commit d6ef80f9db43eae4f58b33f58b5ef6f8209907db
author | artbio |
---|---|
date | Mon, 04 Sep 2017 07:13:28 -0400 |
parents | |
children | 9ce7ccd468aa |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:a8aacccd79a3 |
---|---|
1 #!/usr/bin/env python | |
2 # | |
3 import argparse | |
4 import logging | |
5 import sys | |
6 from collections import defaultdict | |
7 | |
8 | |
9 def Parser(): | |
10 the_parser = argparse.ArgumentParser() | |
11 the_parser.add_argument( | |
12 '--input', action="store", type=str, | |
13 help="input file, accepted format: fastq, fasta, fasta_weigthed, \ | |
14 tabular") | |
15 the_parser.add_argument( | |
16 '--output', action="store", type=str, help="output converted file") | |
17 the_parser.add_argument( | |
18 '--format', action="store", type=str, | |
19 help="select output format (fasta, fasta_weigthed, tabular") | |
20 args = the_parser.parse_args() | |
21 return args | |
22 | |
23 | |
24 class Sequencing: | |
25 | |
26 def __init__(self, input, output, format): | |
27 self.input = input | |
28 self.output = open(output, 'w') | |
29 self.outputformat = format | |
30 self.inputformat = self.detectformat(self.input) | |
31 self.seqdic = defaultdict(int) | |
32 self.read(self.input, self.inputformat) | |
33 self.write(self.output, self.outputformat) | |
34 | |
35 def detectformat(self, input): | |
36 input = open(input, 'r') | |
37 block = [] | |
38 reference = ['A', 'T', 'G', 'C', 'N'] | |
39 format = '' | |
40 try: | |
41 for l in range(4): | |
42 block.append(input.readline()[:-1]) | |
43 except: | |
44 logging.info("File hasn't at leat four lines !") | |
45 sys.exit("File hasn't at leat four lines !") | |
46 input.close() | |
47 line1, line2, line3, line4 = block[0], block[1], block[2], block[3] | |
48 if line1[0] == '>' and line3[0] == '>': | |
49 logging.info("'>' detected in lines 1 and 3") | |
50 sequence = ''.join([line2, line4]).upper() | |
51 nucleotides = set([base for base in sequence]) | |
52 for nucleotide in nucleotides: | |
53 if nucleotide not in reference: | |
54 logging.info("But other nucleotides that A, T, G, C or N") | |
55 sys.exit('input appears to be Fasta but with \ | |
56 unexpected nucleotides') | |
57 format = 'fasta' | |
58 elif line1[0] == '>' and line4[0] == '>': | |
59 logging.info("'>' detected in lines 1 and 4") | |
60 sequence = ''.join([line2, line3]).upper() | |
61 nucleotides = set([base for base in sequence]) | |
62 for nucleotide in nucleotides: | |
63 if nucleotide not in reference: | |
64 logging.info("But other nucleotides that A, T, G, C or N") | |
65 sys.exit('input appears to be Fasta but with \ | |
66 unexpected nucleotides') | |
67 format = 'fasta' | |
68 elif line1[0] == '>': | |
69 logging.info("'>' detected in lines 1") | |
70 sequence = ''.join([line2, line3, line4]).upper() | |
71 nucleotides = set([base for base in sequence]) | |
72 for nucleotide in nucleotides: | |
73 if nucleotide not in reference: | |
74 logging.info("But other nucleotides that A, T, G, C or N") | |
75 sys.exit('input appears to be Fasta but with \ | |
76 unexpected nucleotides') | |
77 format = 'fasta' | |
78 if format == 'fasta': | |
79 try: | |
80 for line in block: | |
81 if line[0] == '>': | |
82 int(line.split('_')[-1]) | |
83 return 'fastaw' | |
84 except: | |
85 return 'fasta' | |
86 if line1[0] == '@' and line3[0] == '+': | |
87 nucleotides = set([base for base in line2]) | |
88 for nucleotide in nucleotides: | |
89 if nucleotide not in reference: | |
90 logging.info("Looks like fastq input but other nucleotides \ | |
91 that A, T, G, C or N") | |
92 sys.exit("input appears to be Fastq \ | |
93 but with unexpected nucleotides") | |
94 return 'fastq' | |
95 for line in block: | |
96 if len(line.split('\t')) != 2: | |
97 logging.info("No valid format detected") | |
98 sys.exit('No valid format detected') | |
99 try: | |
100 int(line.split('\t')[-1]) | |
101 except: | |
102 logging.info("No valid format detected") | |
103 sys.exit('No valid format detected') | |
104 for nucleotide in line.split('\t')[0]: | |
105 if nucleotide not in reference: | |
106 logging.info("No valid format detected") | |
107 sys.exit('No valid format detected') | |
108 return 'tabular' | |
109 | |
110 def read(self, input, format): | |
111 input = open(input, 'r') | |
112 if format == 'fasta': | |
113 try: | |
114 self.readfasta(input) | |
115 except: | |
116 logging.info("an error occured while reading fasta") | |
117 elif format == 'fastaw': | |
118 try: | |
119 self.readfastaw(input) | |
120 except: | |
121 logging.info("an error occured while reading fastaw") | |
122 elif format == 'tabular': | |
123 try: | |
124 self.readtabular(input) | |
125 except: | |
126 logging.info("an error occured while reading tabular") | |
127 elif format == 'fastq': | |
128 try: | |
129 self.readfastq(input) | |
130 except: | |
131 logging.info("an error occured while reading fastq") | |
132 else: | |
133 logging.info("no valid format detected") | |
134 sys.exit('No valid format detected') | |
135 | |
136 def readfastaw(self, input): | |
137 for line in input: | |
138 if line[0] == ">": | |
139 weigth = int(line[:-1].split("_")[-1]) | |
140 else: | |
141 self.seqdic[line[:-1]] += weigth | |
142 input.close() | |
143 | |
144 def readfasta(self, input): | |
145 ''' this method is able to read multi-line fasta sequence''' | |
146 for line in input: | |
147 if line[0] == ">": | |
148 try: | |
149 # to dump the sequence of the previous item | |
150 # try because of first missing stringlist variable | |
151 self.seqdic["".join(stringlist)] += 1 | |
152 except NameError: | |
153 pass | |
154 stringlist = [] | |
155 else: | |
156 try: | |
157 stringlist.append(line[:-1]) | |
158 except UnboundLocalError: | |
159 # if file went through filter and contains only empty lines | |
160 logging.info("first line is empty.") | |
161 try: | |
162 self.seqdic["".join(stringlist)] += 1 # for the last sequence | |
163 except NameError: | |
164 logging.info("input file has not fasta sequences.") | |
165 input.close() | |
166 | |
167 def readtabular(self, input): | |
168 for line in input: | |
169 fields = line[:-1].split('\t') | |
170 self.seqdic[fields[0]] += int(fields[1]) | |
171 input.close() | |
172 | |
173 def readfastq(self, input): | |
174 linecount = 0 | |
175 for line in input: | |
176 linecount += 1 | |
177 if linecount % 4 == 2: | |
178 self.seqdic[line[:-1]] += 1 | |
179 input.close() | |
180 | |
181 def write(self, output, format='fasta'): | |
182 if format == 'fasta': | |
183 headercount = 0 | |
184 for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True): | |
185 for i in range(self.seqdic[seq]): | |
186 headercount += 1 | |
187 output.write('>%s\n%s\n' % (headercount, seq)) | |
188 elif format == 'fastaw': | |
189 headercount = 0 | |
190 for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True): | |
191 headercount += 1 | |
192 output.write('>%s_%s\n%s\n' % (headercount, | |
193 self.seqdic[seq], seq)) | |
194 elif format == 'tabular': | |
195 for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True): | |
196 output.write('%s\t%s\n' % (seq, self.seqdic[seq])) | |
197 output.close() | |
198 | |
199 | |
200 def main(input, output, format): | |
201 Sequencing(input, output, format) | |
202 | |
203 | |
204 if __name__ == "__main__": | |
205 args = Parser() | |
206 log = logging.getLogger(__name__) | |
207 logging.basicConfig(stream=sys.stdout, level=logging.INFO) | |
208 main(args.input, args.output, args.format) |