Mercurial > repos > artbio > sequence_format_converter
comparison sequence_format_converter.py @ 0:a8aacccd79a3 draft
planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/master/tools/sequence_format_converter commit d6ef80f9db43eae4f58b33f58b5ef6f8209907db
| author | artbio | 
|---|---|
| date | Mon, 04 Sep 2017 07:13:28 -0400 | 
| parents | |
| children | 9ce7ccd468aa | 
   comparison
  equal
  deleted
  inserted
  replaced
| -1:000000000000 | 0:a8aacccd79a3 | 
|---|---|
| 1 #!/usr/bin/env python | |
| 2 # | |
| 3 import argparse | |
| 4 import logging | |
| 5 import sys | |
| 6 from collections import defaultdict | |
| 7 | |
| 8 | |
| 9 def Parser(): | |
| 10 the_parser = argparse.ArgumentParser() | |
| 11 the_parser.add_argument( | |
| 12 '--input', action="store", type=str, | |
| 13 help="input file, accepted format: fastq, fasta, fasta_weigthed, \ | |
| 14 tabular") | |
| 15 the_parser.add_argument( | |
| 16 '--output', action="store", type=str, help="output converted file") | |
| 17 the_parser.add_argument( | |
| 18 '--format', action="store", type=str, | |
| 19 help="select output format (fasta, fasta_weigthed, tabular") | |
| 20 args = the_parser.parse_args() | |
| 21 return args | |
| 22 | |
| 23 | |
| 24 class Sequencing: | |
| 25 | |
| 26 def __init__(self, input, output, format): | |
| 27 self.input = input | |
| 28 self.output = open(output, 'w') | |
| 29 self.outputformat = format | |
| 30 self.inputformat = self.detectformat(self.input) | |
| 31 self.seqdic = defaultdict(int) | |
| 32 self.read(self.input, self.inputformat) | |
| 33 self.write(self.output, self.outputformat) | |
| 34 | |
| 35 def detectformat(self, input): | |
| 36 input = open(input, 'r') | |
| 37 block = [] | |
| 38 reference = ['A', 'T', 'G', 'C', 'N'] | |
| 39 format = '' | |
| 40 try: | |
| 41 for l in range(4): | |
| 42 block.append(input.readline()[:-1]) | |
| 43 except: | |
| 44 logging.info("File hasn't at leat four lines !") | |
| 45 sys.exit("File hasn't at leat four lines !") | |
| 46 input.close() | |
| 47 line1, line2, line3, line4 = block[0], block[1], block[2], block[3] | |
| 48 if line1[0] == '>' and line3[0] == '>': | |
| 49 logging.info("'>' detected in lines 1 and 3") | |
| 50 sequence = ''.join([line2, line4]).upper() | |
| 51 nucleotides = set([base for base in sequence]) | |
| 52 for nucleotide in nucleotides: | |
| 53 if nucleotide not in reference: | |
| 54 logging.info("But other nucleotides that A, T, G, C or N") | |
| 55 sys.exit('input appears to be Fasta but with \ | |
| 56 unexpected nucleotides') | |
| 57 format = 'fasta' | |
| 58 elif line1[0] == '>' and line4[0] == '>': | |
| 59 logging.info("'>' detected in lines 1 and 4") | |
| 60 sequence = ''.join([line2, line3]).upper() | |
| 61 nucleotides = set([base for base in sequence]) | |
| 62 for nucleotide in nucleotides: | |
| 63 if nucleotide not in reference: | |
| 64 logging.info("But other nucleotides that A, T, G, C or N") | |
| 65 sys.exit('input appears to be Fasta but with \ | |
| 66 unexpected nucleotides') | |
| 67 format = 'fasta' | |
| 68 elif line1[0] == '>': | |
| 69 logging.info("'>' detected in lines 1") | |
| 70 sequence = ''.join([line2, line3, line4]).upper() | |
| 71 nucleotides = set([base for base in sequence]) | |
| 72 for nucleotide in nucleotides: | |
| 73 if nucleotide not in reference: | |
| 74 logging.info("But other nucleotides that A, T, G, C or N") | |
| 75 sys.exit('input appears to be Fasta but with \ | |
| 76 unexpected nucleotides') | |
| 77 format = 'fasta' | |
| 78 if format == 'fasta': | |
| 79 try: | |
| 80 for line in block: | |
| 81 if line[0] == '>': | |
| 82 int(line.split('_')[-1]) | |
| 83 return 'fastaw' | |
| 84 except: | |
| 85 return 'fasta' | |
| 86 if line1[0] == '@' and line3[0] == '+': | |
| 87 nucleotides = set([base for base in line2]) | |
| 88 for nucleotide in nucleotides: | |
| 89 if nucleotide not in reference: | |
| 90 logging.info("Looks like fastq input but other nucleotides \ | |
| 91 that A, T, G, C or N") | |
| 92 sys.exit("input appears to be Fastq \ | |
| 93 but with unexpected nucleotides") | |
| 94 return 'fastq' | |
| 95 for line in block: | |
| 96 if len(line.split('\t')) != 2: | |
| 97 logging.info("No valid format detected") | |
| 98 sys.exit('No valid format detected') | |
| 99 try: | |
| 100 int(line.split('\t')[-1]) | |
| 101 except: | |
| 102 logging.info("No valid format detected") | |
| 103 sys.exit('No valid format detected') | |
| 104 for nucleotide in line.split('\t')[0]: | |
| 105 if nucleotide not in reference: | |
| 106 logging.info("No valid format detected") | |
| 107 sys.exit('No valid format detected') | |
| 108 return 'tabular' | |
| 109 | |
| 110 def read(self, input, format): | |
| 111 input = open(input, 'r') | |
| 112 if format == 'fasta': | |
| 113 try: | |
| 114 self.readfasta(input) | |
| 115 except: | |
| 116 logging.info("an error occured while reading fasta") | |
| 117 elif format == 'fastaw': | |
| 118 try: | |
| 119 self.readfastaw(input) | |
| 120 except: | |
| 121 logging.info("an error occured while reading fastaw") | |
| 122 elif format == 'tabular': | |
| 123 try: | |
| 124 self.readtabular(input) | |
| 125 except: | |
| 126 logging.info("an error occured while reading tabular") | |
| 127 elif format == 'fastq': | |
| 128 try: | |
| 129 self.readfastq(input) | |
| 130 except: | |
| 131 logging.info("an error occured while reading fastq") | |
| 132 else: | |
| 133 logging.info("no valid format detected") | |
| 134 sys.exit('No valid format detected') | |
| 135 | |
| 136 def readfastaw(self, input): | |
| 137 for line in input: | |
| 138 if line[0] == ">": | |
| 139 weigth = int(line[:-1].split("_")[-1]) | |
| 140 else: | |
| 141 self.seqdic[line[:-1]] += weigth | |
| 142 input.close() | |
| 143 | |
| 144 def readfasta(self, input): | |
| 145 ''' this method is able to read multi-line fasta sequence''' | |
| 146 for line in input: | |
| 147 if line[0] == ">": | |
| 148 try: | |
| 149 # to dump the sequence of the previous item | |
| 150 # try because of first missing stringlist variable | |
| 151 self.seqdic["".join(stringlist)] += 1 | |
| 152 except NameError: | |
| 153 pass | |
| 154 stringlist = [] | |
| 155 else: | |
| 156 try: | |
| 157 stringlist.append(line[:-1]) | |
| 158 except UnboundLocalError: | |
| 159 # if file went through filter and contains only empty lines | |
| 160 logging.info("first line is empty.") | |
| 161 try: | |
| 162 self.seqdic["".join(stringlist)] += 1 # for the last sequence | |
| 163 except NameError: | |
| 164 logging.info("input file has not fasta sequences.") | |
| 165 input.close() | |
| 166 | |
| 167 def readtabular(self, input): | |
| 168 for line in input: | |
| 169 fields = line[:-1].split('\t') | |
| 170 self.seqdic[fields[0]] += int(fields[1]) | |
| 171 input.close() | |
| 172 | |
| 173 def readfastq(self, input): | |
| 174 linecount = 0 | |
| 175 for line in input: | |
| 176 linecount += 1 | |
| 177 if linecount % 4 == 2: | |
| 178 self.seqdic[line[:-1]] += 1 | |
| 179 input.close() | |
| 180 | |
| 181 def write(self, output, format='fasta'): | |
| 182 if format == 'fasta': | |
| 183 headercount = 0 | |
| 184 for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True): | |
| 185 for i in range(self.seqdic[seq]): | |
| 186 headercount += 1 | |
| 187 output.write('>%s\n%s\n' % (headercount, seq)) | |
| 188 elif format == 'fastaw': | |
| 189 headercount = 0 | |
| 190 for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True): | |
| 191 headercount += 1 | |
| 192 output.write('>%s_%s\n%s\n' % (headercount, | |
| 193 self.seqdic[seq], seq)) | |
| 194 elif format == 'tabular': | |
| 195 for seq in sorted(self.seqdic, key=self.seqdic.get, reverse=True): | |
| 196 output.write('%s\t%s\n' % (seq, self.seqdic[seq])) | |
| 197 output.close() | |
| 198 | |
| 199 | |
| 200 def main(input, output, format): | |
| 201 Sequencing(input, output, format) | |
| 202 | |
| 203 | |
| 204 if __name__ == "__main__": | |
| 205 args = Parser() | |
| 206 log = logging.getLogger(__name__) | |
| 207 logging.basicConfig(stream=sys.stdout, level=logging.INFO) | |
| 208 main(args.input, args.output, args.format) | 
