Mercurial > repos > galaxy-australia > alphafold2
comparison validate_fasta.py @ 6:04e95886cf24 draft
"planemo upload for repository https://github.com/usegalaxy-au/tools-au commit 724a7a389c878dded1c0332f3b6e507e0c4cd52a-dirty"
author | galaxy-australia |
---|---|
date | Mon, 04 Apr 2022 01:46:22 +0000 |
parents | 6c92e000d684 |
children | eb085b3dbaf8 |
comparison
equal
deleted
inserted
replaced
5:138feebde7d3 | 6:04e95886cf24 |
---|---|
1 """Validate input FASTA sequence.""" | 1 """Validate input FASTA sequence.""" |
2 | 2 |
3 import re | 3 import re |
4 import sys | |
4 import argparse | 5 import argparse |
5 from typing import List, TextIO | 6 from typing import List, TextIO |
6 | 7 |
7 | 8 |
8 class Fasta: | 9 class Fasta: |
14 class FastaLoader: | 15 class FastaLoader: |
15 def __init__(self, fasta_path: str): | 16 def __init__(self, fasta_path: str): |
16 """Initialize from FASTA file.""" | 17 """Initialize from FASTA file.""" |
17 self.fastas = [] | 18 self.fastas = [] |
18 self.load(fasta_path) | 19 self.load(fasta_path) |
19 print("Loaded FASTA sequences:") | |
20 for f in self.fastas: | |
21 print(f.header) | |
22 print(f.aa_seq) | |
23 | 20 |
24 def load(self, fasta_path: str): | 21 def load(self, fasta_path: str): |
25 """Load bare or FASTA formatted sequence.""" | 22 """Load bare or FASTA formatted sequence.""" |
26 with open(fasta_path, 'r') as f: | 23 with open(fasta_path, 'r') as f: |
27 self.content = f.read() | 24 self.content = f.read() |
28 | 25 |
29 if "__cn__" in self.content: | 26 if "__cn__" in self.content: |
30 # Pasted content with escaped characters | 27 # Pasted content with escaped characters |
31 self.newline = '__cn__' | 28 self.newline = '__cn__' |
32 self.caret = '__gt__' | 29 self.read_caret = '__gt__' |
33 else: | 30 else: |
34 # Uploaded file with normal content | 31 # Uploaded file with normal content |
35 self.newline = '\n' | 32 self.newline = '\n' |
36 self.caret = '>' | 33 self.read_caret = '>' |
37 | 34 |
38 self.lines = self.content.split(self.newline) | 35 self.lines = self.content.split(self.newline) |
39 header, sequence = self.interpret_first_line() | 36 |
40 | 37 if not self.lines[0].startswith(self.read_caret): |
41 i = 0 | 38 # Fasta is headless, load as single sequence |
42 while i < len(self.lines): | 39 self.update_fastas( |
43 line = self.lines[i] | 40 '', ''.join(self.lines) |
44 if line.startswith(self.caret): | 41 ) |
45 self.update_fastas(header, sequence) | 42 |
46 header = '>' + self.strip_header(line) | |
47 sequence = '' | |
48 else: | |
49 sequence += line.strip('\n ') | |
50 i += 1 | |
51 | |
52 # after reading whole file, header & sequence buffers might be full | |
53 self.update_fastas(header, sequence) | |
54 | |
55 def interpret_first_line(self): | |
56 line = self.lines[0] | |
57 if line.startswith(self.caret): | |
58 header = '>' + self.strip_header(line) | |
59 return header, '' | |
60 else: | 43 else: |
61 return '', line | 44 header = None |
45 sequence = None | |
46 for line in self.lines: | |
47 if line.startswith(self.read_caret): | |
48 if header: | |
49 self.update_fastas(header, sequence) | |
50 header = '>' + self.strip_header(line) | |
51 sequence = '' | |
52 else: | |
53 sequence += line.strip('\n ') | |
54 self.update_fastas(header, sequence) | |
62 | 55 |
63 def strip_header(self, line): | 56 def strip_header(self, line): |
64 """Strip characters escaped with underscores from pasted text.""" | 57 """Strip characters escaped with underscores from pasted text.""" |
65 return re.sub(r'\_\_.{2}\_\_', '', line).strip('>') | 58 return re.sub(r'\_\_.{2}\_\_', '', line).strip('>') |
66 | 59 |
75 # Create new Fasta | 68 # Create new Fasta |
76 self.fastas.append(Fasta(header, sequence)) | 69 self.fastas.append(Fasta(header, sequence)) |
77 | 70 |
78 | 71 |
79 class FastaValidator: | 72 class FastaValidator: |
80 def __init__(self, fasta_list: List[Fasta]): | 73 def __init__( |
74 self, | |
75 fasta_list: List[Fasta], | |
76 min_length=None, | |
77 max_length=None): | |
78 self.min_length = min_length | |
79 self.max_length = max_length | |
81 self.fasta_list = fasta_list | 80 self.fasta_list = fasta_list |
82 self.min_length = 30 | |
83 self.max_length = 2000 | |
84 self.iupac_characters = { | 81 self.iupac_characters = { |
85 'A', 'B', 'C', 'D', 'E', 'F', 'G', | 82 'A', 'B', 'C', 'D', 'E', 'F', 'G', |
86 'H', 'I', 'K', 'L', 'M', 'N', 'P', | 83 'H', 'I', 'K', 'L', 'M', 'N', 'P', |
87 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', | 84 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', |
88 'Y', 'Z', '-' | 85 'Y', 'Z', '-' |
91 def validate(self): | 88 def validate(self): |
92 """performs fasta validation""" | 89 """performs fasta validation""" |
93 self.validate_num_seqs() | 90 self.validate_num_seqs() |
94 self.validate_length() | 91 self.validate_length() |
95 self.validate_alphabet() | 92 self.validate_alphabet() |
93 | |
96 # not checking for 'X' nucleotides at the moment. | 94 # not checking for 'X' nucleotides at the moment. |
97 # alphafold can throw an error if it doesn't like it. | 95 # alphafold can throw an error if it doesn't like it. |
98 #self.validate_x() | 96 # self.validate_x() |
99 | 97 |
100 def validate_num_seqs(self) -> None: | 98 def validate_num_seqs(self) -> None: |
99 """Assert that only one sequence has been provided.""" | |
101 if len(self.fasta_list) > 1: | 100 if len(self.fasta_list) > 1: |
102 raise Exception(f'Error encountered validating fasta: More than 1 sequence detected ({len(self.fasta_list)}). Please use single fasta sequence as input') | 101 raise Exception( |
102 'Error encountered validating fasta:' | |
103 f' More than 1 sequence detected ({len(self.fasta_list)}).' | |
104 ' Please use single fasta sequence as input.') | |
103 elif len(self.fasta_list) == 0: | 105 elif len(self.fasta_list) == 0: |
104 raise Exception(f'Error encountered validating fasta: input file has no fasta sequences') | 106 raise Exception( |
107 'Error encountered validating fasta:' | |
108 ' input file has no fasta sequences') | |
105 | 109 |
106 def validate_length(self): | 110 def validate_length(self): |
107 """Confirms whether sequence length is valid. """ | 111 """Confirm whether sequence length is valid.""" |
108 fasta = self.fasta_list[0] | 112 fasta = self.fasta_list[0] |
109 if len(fasta.aa_seq) < self.min_length: | 113 if self.min_length: |
110 raise Exception(f'Error encountered validating fasta: Sequence too short ({len(fasta.aa_seq)}aa). Must be > 30aa') | 114 if len(fasta.aa_seq) < self.min_length: |
111 if len(fasta.aa_seq) > self.max_length: | 115 raise Exception( |
112 raise Exception(f'Error encountered validating fasta: Sequence too long ({len(fasta.aa_seq)}aa). Must be < 2000aa') | 116 'Error encountered validating fasta: Sequence too short' |
117 f' ({len(fasta.aa_seq)}AA).' | |
118 f' Minimum length is {self.min_length}AA.') | |
119 if self.max_length: | |
120 if len(fasta.aa_seq) > self.max_length: | |
121 raise Exception( | |
122 'Error encountered validating fasta:' | |
123 f' Sequence too long ({len(fasta.aa_seq)}AA).' | |
124 f' Maximum length is {self.max_length}AA.') | |
113 | 125 |
114 def validate_alphabet(self): | 126 def validate_alphabet(self): |
115 """ | 127 """ |
116 Confirms whether the sequence conforms to IUPAC codes. | 128 Confirm whether the sequence conforms to IUPAC codes. |
117 If not, reports the offending character and its position. | 129 If not, report the offending character and its position. |
118 """ | 130 """ |
119 fasta = self.fasta_list[0] | 131 fasta = self.fasta_list[0] |
120 for i, char in enumerate(fasta.aa_seq.upper()): | 132 for i, char in enumerate(fasta.aa_seq.upper()): |
121 if char not in self.iupac_characters: | 133 if char not in self.iupac_characters: |
122 raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: "{char}"') | 134 raise Exception( |
135 'Error encountered validating fasta: Invalid amino acid' | |
136 f' found at pos {i}: "{char}"') | |
123 | 137 |
124 def validate_x(self): | 138 def validate_x(self): |
125 """checks if any bases are X. TODO check whether alphafold accepts X bases. """ | 139 """Check for X bases.""" |
126 fasta = self.fasta_list[0] | 140 fasta = self.fasta_list[0] |
127 for i, char in enumerate(fasta.aa_seq.upper()): | 141 for i, char in enumerate(fasta.aa_seq.upper()): |
128 if char == 'X': | 142 if char == 'X': |
129 raise Exception(f'Error encountered validating fasta: Unsupported aa code "X" found at pos {i}') | 143 raise Exception( |
144 'Error encountered validating fasta: Unsupported AA code' | |
145 f' "X" found at pos {i}') | |
130 | 146 |
131 | 147 |
132 class FastaWriter: | 148 class FastaWriter: |
133 def __init__(self) -> None: | 149 def __init__(self) -> None: |
134 self.outfile = 'alphafold.fasta' | 150 self.line_wrap = 60 |
135 self.formatted_line_len = 60 | |
136 | 151 |
137 def write(self, fasta: Fasta): | 152 def write(self, fasta: Fasta): |
138 with open(self.outfile, 'w') as fp: | 153 header = fasta.header |
139 header = fasta.header | 154 seq = self.format_sequence(fasta.aa_seq) |
140 seq = self.format_sequence(fasta.aa_seq) | 155 sys.stdout.write(header + '\n') |
141 fp.write(header + '\n') | 156 sys.stdout.write(seq) |
142 fp.write(seq + '\n') | |
143 | 157 |
144 def format_sequence(self, aa_seq: str): | 158 def format_sequence(self, aa_seq: str): |
145 formatted_seq = '' | 159 formatted_seq = '' |
146 for i in range(0, len(aa_seq), self.formatted_line_len): | 160 for i in range(0, len(aa_seq), self.line_wrap): |
147 formatted_seq += aa_seq[i: i + self.formatted_line_len] + '\n' | 161 formatted_seq += aa_seq[i: i + self.line_wrap] + '\n' |
148 return formatted_seq | 162 return formatted_seq |
149 | 163 |
150 | 164 |
151 def main(): | 165 def main(): |
152 # load fasta file | 166 # load fasta file |
153 args = parse_args() | 167 args = parse_args() |
154 fas = FastaLoader(args.input_fasta) | 168 fas = FastaLoader(args.input) |
155 | 169 |
156 # validate | 170 # validate |
157 fv = FastaValidator(fas.fastas) | 171 fv = FastaValidator( |
172 fas.fastas, | |
173 min_length=args.min_length, | |
174 max_length=args.max_length, | |
175 ) | |
158 fv.validate() | 176 fv.validate() |
159 | 177 |
160 # write cleaned version | 178 # write cleaned version |
161 fw = FastaWriter() | 179 fw = FastaWriter() |
162 fw.write(fas.fastas[0]) | 180 fw.write(fas.fastas[0]) |
163 | 181 |
164 | 182 |
165 def parse_args() -> argparse.Namespace: | 183 def parse_args() -> argparse.Namespace: |
166 parser = argparse.ArgumentParser() | 184 parser = argparse.ArgumentParser() |
167 parser.add_argument( | 185 parser.add_argument( |
168 "input_fasta", | 186 "input", |
169 help="input fasta file", | 187 help="input fasta file", |
170 type=str | 188 type=str |
171 ) | 189 ) |
190 parser.add_argument( | |
191 "--min_length", | |
192 dest='min_length', | |
193 help="Minimum length of input protein sequence (AA)", | |
194 default=None, | |
195 type=int, | |
196 ) | |
197 parser.add_argument( | |
198 "--max_length", | |
199 dest='max_length', | |
200 help="Maximum length of input protein sequence (AA)", | |
201 default=None, | |
202 type=int, | |
203 ) | |
172 return parser.parse_args() | 204 return parser.parse_args() |
173 | |
174 | 205 |
175 | 206 |
176 if __name__ == '__main__': | 207 if __name__ == '__main__': |
177 main() | 208 main() |