comparison sequence_format_converter.py @ 0:a8aacccd79a3 draft

planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/master/tools/sequence_format_converter commit d6ef80f9db43eae4f58b33f58b5ef6f8209907db
author artbio
date Mon, 04 Sep 2017 07:13:28 -0400
parents
children 9ce7ccd468aa
comparison
equal deleted inserted replaced
-1:000000000000 0:a8aacccd79a3
1 #!/usr/bin/env python
2 #
3 import argparse
4 import logging
5 import sys
6 from collections import defaultdict
7
8
9 def Parser():
10 the_parser = argparse.ArgumentParser()
11 the_parser.add_argument(
12 '--input', action="store", type=str,
13 help="input file, accepted format: fastq, fasta, fasta_weigthed, \
14 tabular")
15 the_parser.add_argument(
16 '--output', action="store", type=str, help="output converted file")
17 the_parser.add_argument(
18 '--format', action="store", type=str,
19 help="select output format (fasta, fasta_weigthed, tabular")
20 args = the_parser.parse_args()
21 return args
22
23
24 class Sequencing:
25
26 def __init__(self, input, output, format):
27 self.input = input
28 self.output = open(output, 'w')
29 self.outputformat = format
30 self.inputformat = self.detectformat(self.input)
31 self.seqdic = defaultdict(int)
32 self.read(self.input, self.inputformat)
33 self.write(self.output, self.outputformat)
34
35 def detectformat(self, input):
36 input = open(input, 'r')
37 block = []
38 reference = ['A', 'T', 'G', 'C', 'N']
39 format = ''
40 try:
41 for l in range(4):
42 block.append(input.readline()[:-1])
43 except:
44 logging.info("File hasn't at leat four lines !")
45 sys.exit("File hasn't at leat four lines !")
46 input.close()
47 line1, line2, line3, line4 = block[0], block[1], block[2], block[3]
48 if line1[0] == '>' and line3[0] == '>':
49 logging.info("'>' detected in lines 1 and 3")
50 sequence = ''.join([line2, line4]).upper()
51 nucleotides = set([base for base in sequence])
52 for nucleotide in nucleotides:
53 if nucleotide not in reference:
54 logging.info("But other nucleotides that A, T, G, C or N")
55 sys.exit('input appears to be Fasta but with \
56 unexpected nucleotides')
57 format = 'fasta'
58 elif line1[0] == '>' and line4[0] == '>':
59 logging.info("'>' detected in lines 1 and 4")
60 sequence = ''.join([line2, line3]).upper()
61 nucleotides = set([base for base in sequence])
62 for nucleotide in nucleotides:
63 if nucleotide not in reference:
64 logging.info("But other nucleotides that A, T, G, C or N")
65 sys.exit('input appears to be Fasta but with \
66 unexpected nucleotides')
67 format = 'fasta'
68 elif line1[0] == '>':
69 logging.info("'>' detected in lines 1")
70 sequence = ''.join([line2, line3, line4]).upper()
71 nucleotides = set([base for base in sequence])
72 for nucleotide in nucleotides:
73 if nucleotide not in reference:
74 logging.info("But other nucleotides that A, T, G, C or N")
75 sys.exit('input appears to be Fasta but with \
76 unexpected nucleotides')
77 format = 'fasta'
78 if format == 'fasta':
79 try:
80 for line in block:
81 if line[0] == '>':
82 int(line.split('_')[-1])
83 return 'fastaw'
84 except:
85 return 'fasta'
86 if line1[0] == '@' and line3[0] == '+':
87 nucleotides = set([base for base in line2])
88 for nucleotide in nucleotides:
89 if nucleotide not in reference:
90 logging.info("Looks like fastq input but other nucleotides \
91 that A, T, G, C or N")
92 sys.exit("input appears to be Fastq \
93 but with unexpected nucleotides")
94 return 'fastq'
95 for line in block:
96 if len(line.split('\t')) != 2:
97 logging.info("No valid format detected")
98 sys.exit('No valid format detected')
99 try:
100 int(line.split('\t')[-1])
101 except:
102 logging.info("No valid format detected")
103 sys.exit('No valid format detected')
104 for nucleotide in line.split('\t')[0]:
105 if nucleotide not in reference:
106 logging.info("No valid format detected")
107 sys.exit('No valid format detected')
108 return 'tabular'
109
110 def read(self, input, format):
111 input = open(input, 'r')
112 if format == 'fasta':
113 try:
114 self.readfasta(input)
115 except:
116 logging.info("an error occured while reading fasta")
117 elif format == 'fastaw':
118 try:
119 self.readfastaw(input)
120 except:
121 logging.info("an error occured while reading fastaw")
122 elif format == 'tabular':
123 try:
124 self.readtabular(input)
125 except:
126 logging.info("an error occured while reading tabular")
127 elif format == 'fastq':
128 try:
129 self.readfastq(input)
130 except:
131 logging.info("an error occured while reading fastq")
132 else:
133 logging.info("no valid format detected")
134 sys.exit('No valid format detected')
135
136 def readfastaw(self, input):
137 for line in input:
138 if line[0] == ">":
139 weigth = int(line[:-1].split("_")[-1])
140 else:
141 self.seqdic[line[:-1]] += weigth
142 input.close()
143
144 def readfasta(self, input):
145 ''' this method is able to read multi-line fasta sequence'''
146 for line in input:
147 if line[0] == ">":
148 try:
149 # to dump the sequence of the previous item
150 # try because of first missing stringlist variable
151 self.seqdic["".join(stringlist)] += 1
152 except NameError:
153 pass
154 stringlist = []
155 else:
156 try:
157 stringlist.append(line[:-1])
158 except UnboundLocalError:
159 # if file went through filter and contains only empty lines
160 logging.info("first line is empty.")
161 try:
162 self.seqdic["".join(stringlist)] += 1 # for the last sequence
163 except NameError:
164 logging.info("input file has not fasta sequences.")
165 input.close()
166
167 def readtabular(self, input):
168 for line in input:
169 fields = line[:-1].split('\t')
170 self.seqdic[fields[0]] += int(fields[1])
171 input.close()
172
173 def readfastq(self, input):
174 linecount = 0
175 for line in input:
176 linecount += 1
177 if linecount % 4 == 2:
178 self.seqdic[line[:-1]] += 1
179 input.close()
180
181 def write(self, output, format='fasta'):
182 if format == 'fasta':
183 headercount = 0
184 for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True):
185 for i in range(self.seqdic[seq]):
186 headercount += 1
187 output.write('>%s\n%s\n' % (headercount, seq))
188 elif format == 'fastaw':
189 headercount = 0
190 for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True):
191 headercount += 1
192 output.write('>%s_%s\n%s\n' % (headercount,
193 self.seqdic[seq], seq))
194 elif format == 'tabular':
195 for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True):
196 output.write('%s\t%s\n' % (seq, self.seqdic[seq]))
197 output.close()
198
199
200 def main(input, output, format):
201 Sequencing(input, output, format)
202
203
204 if __name__ == "__main__":
205 args = Parser()
206 log = logging.getLogger(__name__)
207 logging.basicConfig(stream=sys.stdout, level=logging.INFO)
208 main(args.input, args.output, args.format)