comparison validate_fasta.py @ 1:6c92e000d684 draft

"planemo upload for repository https://github.com/usegalaxy-au/galaxy-local-tools commit a510e97ebd604a5e30b1f16e5031f62074f23e86"
author galaxy-australia
date Tue, 01 Mar 2022 02:53:05 +0000
parents 7ae9d78b06f5
children 04e95886cf24
comparison
equal deleted inserted replaced
0:7ae9d78b06f5 1:6c92e000d684
1 """Validate input FASTA sequence."""
1 2
2 3 import re
3 import argparse 4 import argparse
4 from typing import List, TextIO 5 from typing import List, TextIO
5 6
6 7
7 class Fasta: 8 class Fasta:
9 self.header = header_str 10 self.header = header_str
10 self.aa_seq = seq_str 11 self.aa_seq = seq_str
11 12
12 13
13 class FastaLoader: 14 class FastaLoader:
14 def __init__(self): 15 def __init__(self, fasta_path: str):
15 """creates a Fasta() from a file""" 16 """Initialize from FASTA file."""
16 self.fastas: List[Fasta] = [] 17 self.fastas = []
18 self.load(fasta_path)
19 print("Loaded FASTA sequences:")
20 for f in self.fastas:
21 print(f.header)
22 print(f.aa_seq)
17 23
18 def load(self, fasta_path: str): 24 def load(self, fasta_path: str):
19 """ 25 """Load bare or FASTA formatted sequence."""
20 load function has to be very flexible. 26 with open(fasta_path, 'r') as f:
21 file may be normal fasta format (header, seq) or can just be a bare sequence. 27 self.content = f.read()
22 """ 28
23 with open(fasta_path, 'r') as fp: 29 if "__cn__" in self.content:
24 header, sequence = self.interpret_first_line(fp) 30 # Pasted content with escaped characters
25 line = fp.readline().rstrip('\n') 31 self.newline = '__cn__'
26 32 self.caret = '__gt__'
27 while line: 33 else:
28 if line.startswith('>'): 34 # Uploaded file with normal content
29 self.update_fastas(header, sequence) 35 self.newline = '\n'
30 header = line 36 self.caret = '>'
31 sequence = '' 37
32 else: 38 self.lines = self.content.split(self.newline)
33 sequence += line 39 header, sequence = self.interpret_first_line()
34 line = fp.readline().rstrip('\n') 40
41 i = 0
42 while i < len(self.lines):
43 line = self.lines[i]
44 if line.startswith(self.caret):
45 self.update_fastas(header, sequence)
46 header = '>' + self.strip_header(line)
47 sequence = ''
48 else:
49 sequence += line.strip('\n ')
50 i += 1
35 51
36 # after reading whole file, header & sequence buffers might be full 52 # after reading whole file, header & sequence buffers might be full
37 self.update_fastas(header, sequence) 53 self.update_fastas(header, sequence)
38 return self.fastas
39 54
40 def interpret_first_line(self, fp: TextIO): 55 def interpret_first_line(self):
41 header = '' 56 line = self.lines[0]
42 sequence = '' 57 if line.startswith(self.caret):
43 line = fp.readline().rstrip('\n') 58 header = '>' + self.strip_header(line)
44 if line.startswith('>'): 59 return header, ''
45 header = line
46 else: 60 else:
47 sequence += line 61 return '', line
48 return header, sequence 62
49 63 def strip_header(self, line):
64 """Strip characters escaped with underscores from pasted text."""
65 return re.sub(r'\_\_.{2}\_\_', '', line).strip('>')
66
50 def update_fastas(self, header: str, sequence: str): 67 def update_fastas(self, header: str, sequence: str):
51 # if we have a sequence 68 # if we have a sequence
52 if not sequence == '': 69 if sequence:
53 # create generic header if not exists 70 # create generic header if not exists
54 if header == '': 71 if not header:
55 fasta_count = len(self.fastas) 72 fasta_count = len(self.fastas)
56 header = f'>sequence_{fasta_count}' 73 header = f'>sequence_{fasta_count}'
57 74
58 # create new Fasta 75 # Create new Fasta
59 self.fastas.append(Fasta(header, sequence)) 76 self.fastas.append(Fasta(header, sequence))
60 77
61 78
62 class FastaValidator: 79 class FastaValidator:
63 def __init__(self, fasta_list: List[Fasta]): 80 def __init__(self, fasta_list: List[Fasta]):
64 self.fasta_list = fasta_list 81 self.fasta_list = fasta_list
65 self.min_length = 30 82 self.min_length = 30
66 self.max_length = 2000 83 self.max_length = 2000
67 self.iupac_characters = { 84 self.iupac_characters = {
68 'A', 'B', 'C', 'D', 'E', 'F', 'G', 85 'A', 'B', 'C', 'D', 'E', 'F', 'G',
69 'H', 'I', 'K', 'L', 'M', 'N', 'P', 86 'H', 'I', 'K', 'L', 'M', 'N', 'P',
70 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 87 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
71 'Y', 'Z', '-' 88 'Y', 'Z', '-'
72 } 89 }
73 90
74 def validate(self): 91 def validate(self):
75 """performs fasta validation""" 92 """performs fasta validation"""
76 self.validate_num_seqs() 93 self.validate_num_seqs()
77 self.validate_length() 94 self.validate_length()
78 self.validate_alphabet() 95 self.validate_alphabet()
79 # not checking for 'X' nucleotides at the moment. 96 # not checking for 'X' nucleotides at the moment.
80 # alphafold can throw an error if it doesn't like it. 97 # alphafold can throw an error if it doesn't like it.
81 #self.validate_x() 98 #self.validate_x()
82 99
83 def validate_num_seqs(self) -> None: 100 def validate_num_seqs(self) -> None:
84 if len(self.fasta_list) > 1: 101 if len(self.fasta_list) > 1:
85 raise Exception(f'Error encountered validating fasta: More than 1 sequence detected ({len(self.fasta_list)}). Please use single fasta sequence as input') 102 raise Exception(f'Error encountered validating fasta: More than 1 sequence detected ({len(self.fasta_list)}). Please use single fasta sequence as input')
86 elif len(self.fasta_list) == 0: 103 elif len(self.fasta_list) == 0:
91 fasta = self.fasta_list[0] 108 fasta = self.fasta_list[0]
92 if len(fasta.aa_seq) < self.min_length: 109 if len(fasta.aa_seq) < self.min_length:
93 raise Exception(f'Error encountered validating fasta: Sequence too short ({len(fasta.aa_seq)}aa). Must be > 30aa') 110 raise Exception(f'Error encountered validating fasta: Sequence too short ({len(fasta.aa_seq)}aa). Must be > 30aa')
94 if len(fasta.aa_seq) > self.max_length: 111 if len(fasta.aa_seq) > self.max_length:
95 raise Exception(f'Error encountered validating fasta: Sequence too long ({len(fasta.aa_seq)}aa). Must be < 2000aa') 112 raise Exception(f'Error encountered validating fasta: Sequence too long ({len(fasta.aa_seq)}aa). Must be < 2000aa')
96 113
97 def validate_alphabet(self): 114 def validate_alphabet(self):
98 """ 115 """
99 Confirms whether the sequence conforms to IUPAC codes. 116 Confirms whether the sequence conforms to IUPAC codes.
100 If not, reports the offending character and its position. 117 If not, reports the offending character and its position.
101 """ 118 """
102 fasta = self.fasta_list[0] 119 fasta = self.fasta_list[0]
103 for i, char in enumerate(fasta.aa_seq.upper()): 120 for i, char in enumerate(fasta.aa_seq.upper()):
104 if char not in self.iupac_characters: 121 if char not in self.iupac_characters:
105 raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: {char}') 122 raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: "{char}"')
106 123
107 def validate_x(self): 124 def validate_x(self):
108 """checks if any bases are X. TODO check whether alphafold accepts X bases. """ 125 """checks if any bases are X. TODO check whether alphafold accepts X bases. """
109 fasta = self.fasta_list[0] 126 fasta = self.fasta_list[0]
110 for i, char in enumerate(fasta.aa_seq.upper()): 127 for i, char in enumerate(fasta.aa_seq.upper()):
111 if char == 'X': 128 if char == 'X':
112 raise Exception(f'Error encountered validating fasta: Unsupported aa code "X" found at pos {i}') 129 raise Exception(f'Error encountered validating fasta: Unsupported aa code "X" found at pos {i}')
113 130
132 149
133 150
134 def main(): 151 def main():
135 # load fasta file 152 # load fasta file
136 args = parse_args() 153 args = parse_args()
137 fl = FastaLoader() 154 fas = FastaLoader(args.input_fasta)
138 fastas = fl.load(args.input_fasta)
139 155
140 # validate 156 # validate
141 fv = FastaValidator(fastas) 157 fv = FastaValidator(fas.fastas)
142 fv.validate() 158 fv.validate()
143 159
144 # write cleaned version 160 # write cleaned version
145 fw = FastaWriter() 161 fw = FastaWriter()
146 fw.write(fastas[0]) 162 fw.write(fas.fastas[0])
147 163
148 164
149 def parse_args() -> argparse.Namespace: 165 def parse_args() -> argparse.Namespace:
150 parser = argparse.ArgumentParser() 166 parser = argparse.ArgumentParser()
151 parser.add_argument( 167 parser.add_argument(
152 "input_fasta", 168 "input_fasta",
153 help="input fasta file", 169 help="input fasta file",
154 type=str 170 type=str
155 ) 171 )
156 return parser.parse_args() 172 return parser.parse_args()
157 173
158 174
159 175
160 if __name__ == '__main__': 176 if __name__ == '__main__':