Mercurial > repos > galaxy-australia > alphafold2
comparison validate_fasta.py @ 1:6c92e000d684 draft
"planemo upload for repository https://github.com/usegalaxy-au/galaxy-local-tools commit a510e97ebd604a5e30b1f16e5031f62074f23e86"
author | galaxy-australia |
---|---|
date | Tue, 01 Mar 2022 02:53:05 +0000 |
parents | 7ae9d78b06f5 |
children | 04e95886cf24 |
comparison
equal
deleted
inserted
replaced
0:7ae9d78b06f5 | 1:6c92e000d684 |
---|---|
1 """Validate input FASTA sequence.""" | |
1 | 2 |
2 | 3 import re |
3 import argparse | 4 import argparse |
4 from typing import List, TextIO | 5 from typing import List, TextIO |
5 | 6 |
6 | 7 |
7 class Fasta: | 8 class Fasta: |
9 self.header = header_str | 10 self.header = header_str |
10 self.aa_seq = seq_str | 11 self.aa_seq = seq_str |
11 | 12 |
12 | 13 |
13 class FastaLoader: | 14 class FastaLoader: |
14 def __init__(self): | 15 def __init__(self, fasta_path: str): |
15 """creates a Fasta() from a file""" | 16 """Initialize from FASTA file.""" |
16 self.fastas: List[Fasta] = [] | 17 self.fastas = [] |
18 self.load(fasta_path) | |
19 print("Loaded FASTA sequences:") | |
20 for f in self.fastas: | |
21 print(f.header) | |
22 print(f.aa_seq) | |
17 | 23 |
18 def load(self, fasta_path: str): | 24 def load(self, fasta_path: str): |
19 """ | 25 """Load bare or FASTA formatted sequence.""" |
20 load function has to be very flexible. | 26 with open(fasta_path, 'r') as f: |
21 file may be normal fasta format (header, seq) or can just be a bare sequence. | 27 self.content = f.read() |
22 """ | 28 |
23 with open(fasta_path, 'r') as fp: | 29 if "__cn__" in self.content: |
24 header, sequence = self.interpret_first_line(fp) | 30 # Pasted content with escaped characters |
25 line = fp.readline().rstrip('\n') | 31 self.newline = '__cn__' |
26 | 32 self.caret = '__gt__' |
27 while line: | 33 else: |
28 if line.startswith('>'): | 34 # Uploaded file with normal content |
29 self.update_fastas(header, sequence) | 35 self.newline = '\n' |
30 header = line | 36 self.caret = '>' |
31 sequence = '' | 37 |
32 else: | 38 self.lines = self.content.split(self.newline) |
33 sequence += line | 39 header, sequence = self.interpret_first_line() |
34 line = fp.readline().rstrip('\n') | 40 |
41 i = 0 | |
42 while i < len(self.lines): | |
43 line = self.lines[i] | |
44 if line.startswith(self.caret): | |
45 self.update_fastas(header, sequence) | |
46 header = '>' + self.strip_header(line) | |
47 sequence = '' | |
48 else: | |
49 sequence += line.strip('\n ') | |
50 i += 1 | |
35 | 51 |
36 # after reading whole file, header & sequence buffers might be full | 52 # after reading whole file, header & sequence buffers might be full |
37 self.update_fastas(header, sequence) | 53 self.update_fastas(header, sequence) |
38 return self.fastas | |
39 | 54 |
40 def interpret_first_line(self, fp: TextIO): | 55 def interpret_first_line(self): |
41 header = '' | 56 line = self.lines[0] |
42 sequence = '' | 57 if line.startswith(self.caret): |
43 line = fp.readline().rstrip('\n') | 58 header = '>' + self.strip_header(line) |
44 if line.startswith('>'): | 59 return header, '' |
45 header = line | |
46 else: | 60 else: |
47 sequence += line | 61 return '', line |
48 return header, sequence | 62 |
49 | 63 def strip_header(self, line): |
64 """Strip characters escaped with underscores from pasted text.""" | |
65 return re.sub(r'\_\_.{2}\_\_', '', line).strip('>') | |
66 | |
50 def update_fastas(self, header: str, sequence: str): | 67 def update_fastas(self, header: str, sequence: str): |
51 # if we have a sequence | 68 # if we have a sequence |
52 if not sequence == '': | 69 if sequence: |
53 # create generic header if not exists | 70 # create generic header if not exists |
54 if header == '': | 71 if not header: |
55 fasta_count = len(self.fastas) | 72 fasta_count = len(self.fastas) |
56 header = f'>sequence_{fasta_count}' | 73 header = f'>sequence_{fasta_count}' |
57 | 74 |
58 # create new Fasta | 75 # Create new Fasta |
59 self.fastas.append(Fasta(header, sequence)) | 76 self.fastas.append(Fasta(header, sequence)) |
60 | 77 |
61 | 78 |
62 class FastaValidator: | 79 class FastaValidator: |
63 def __init__(self, fasta_list: List[Fasta]): | 80 def __init__(self, fasta_list: List[Fasta]): |
64 self.fasta_list = fasta_list | 81 self.fasta_list = fasta_list |
65 self.min_length = 30 | 82 self.min_length = 30 |
66 self.max_length = 2000 | 83 self.max_length = 2000 |
67 self.iupac_characters = { | 84 self.iupac_characters = { |
68 'A', 'B', 'C', 'D', 'E', 'F', 'G', | 85 'A', 'B', 'C', 'D', 'E', 'F', 'G', |
69 'H', 'I', 'K', 'L', 'M', 'N', 'P', | 86 'H', 'I', 'K', 'L', 'M', 'N', 'P', |
70 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', | 87 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', |
71 'Y', 'Z', '-' | 88 'Y', 'Z', '-' |
72 } | 89 } |
73 | 90 |
74 def validate(self): | 91 def validate(self): |
75 """performs fasta validation""" | 92 """performs fasta validation""" |
76 self.validate_num_seqs() | 93 self.validate_num_seqs() |
77 self.validate_length() | 94 self.validate_length() |
78 self.validate_alphabet() | 95 self.validate_alphabet() |
79 # not checking for 'X' nucleotides at the moment. | 96 # not checking for 'X' nucleotides at the moment. |
80 # alphafold can throw an error if it doesn't like it. | 97 # alphafold can throw an error if it doesn't like it. |
81 #self.validate_x() | 98 #self.validate_x() |
82 | 99 |
83 def validate_num_seqs(self) -> None: | 100 def validate_num_seqs(self) -> None: |
84 if len(self.fasta_list) > 1: | 101 if len(self.fasta_list) > 1: |
85 raise Exception(f'Error encountered validating fasta: More than 1 sequence detected ({len(self.fasta_list)}). Please use single fasta sequence as input') | 102 raise Exception(f'Error encountered validating fasta: More than 1 sequence detected ({len(self.fasta_list)}). Please use single fasta sequence as input') |
86 elif len(self.fasta_list) == 0: | 103 elif len(self.fasta_list) == 0: |
91 fasta = self.fasta_list[0] | 108 fasta = self.fasta_list[0] |
92 if len(fasta.aa_seq) < self.min_length: | 109 if len(fasta.aa_seq) < self.min_length: |
93 raise Exception(f'Error encountered validating fasta: Sequence too short ({len(fasta.aa_seq)}aa). Must be > 30aa') | 110 raise Exception(f'Error encountered validating fasta: Sequence too short ({len(fasta.aa_seq)}aa). Must be > 30aa') |
94 if len(fasta.aa_seq) > self.max_length: | 111 if len(fasta.aa_seq) > self.max_length: |
95 raise Exception(f'Error encountered validating fasta: Sequence too long ({len(fasta.aa_seq)}aa). Must be < 2000aa') | 112 raise Exception(f'Error encountered validating fasta: Sequence too long ({len(fasta.aa_seq)}aa). Must be < 2000aa') |
96 | 113 |
97 def validate_alphabet(self): | 114 def validate_alphabet(self): |
98 """ | 115 """ |
99 Confirms whether the sequence conforms to IUPAC codes. | 116 Confirms whether the sequence conforms to IUPAC codes. |
100 If not, reports the offending character and its position. | 117 If not, reports the offending character and its position. |
101 """ | 118 """ |
102 fasta = self.fasta_list[0] | 119 fasta = self.fasta_list[0] |
103 for i, char in enumerate(fasta.aa_seq.upper()): | 120 for i, char in enumerate(fasta.aa_seq.upper()): |
104 if char not in self.iupac_characters: | 121 if char not in self.iupac_characters: |
105 raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: {char}') | 122 raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: "{char}"') |
106 | 123 |
107 def validate_x(self): | 124 def validate_x(self): |
108 """checks if any bases are X. TODO check whether alphafold accepts X bases. """ | 125 """checks if any bases are X. TODO check whether alphafold accepts X bases. """ |
109 fasta = self.fasta_list[0] | 126 fasta = self.fasta_list[0] |
110 for i, char in enumerate(fasta.aa_seq.upper()): | 127 for i, char in enumerate(fasta.aa_seq.upper()): |
111 if char == 'X': | 128 if char == 'X': |
112 raise Exception(f'Error encountered validating fasta: Unsupported aa code "X" found at pos {i}') | 129 raise Exception(f'Error encountered validating fasta: Unsupported aa code "X" found at pos {i}') |
113 | 130 |
132 | 149 |
133 | 150 |
134 def main(): | 151 def main(): |
135 # load fasta file | 152 # load fasta file |
136 args = parse_args() | 153 args = parse_args() |
137 fl = FastaLoader() | 154 fas = FastaLoader(args.input_fasta) |
138 fastas = fl.load(args.input_fasta) | |
139 | 155 |
140 # validate | 156 # validate |
141 fv = FastaValidator(fastas) | 157 fv = FastaValidator(fas.fastas) |
142 fv.validate() | 158 fv.validate() |
143 | 159 |
144 # write cleaned version | 160 # write cleaned version |
145 fw = FastaWriter() | 161 fw = FastaWriter() |
146 fw.write(fastas[0]) | 162 fw.write(fas.fastas[0]) |
147 | 163 |
148 | 164 |
149 def parse_args() -> argparse.Namespace: | 165 def parse_args() -> argparse.Namespace: |
150 parser = argparse.ArgumentParser() | 166 parser = argparse.ArgumentParser() |
151 parser.add_argument( | 167 parser.add_argument( |
152 "input_fasta", | 168 "input_fasta", |
153 help="input fasta file", | 169 help="input fasta file", |
154 type=str | 170 type=str |
155 ) | 171 ) |
156 return parser.parse_args() | 172 return parser.parse_args() |
157 | 173 |
158 | 174 |
159 | 175 |
160 if __name__ == '__main__': | 176 if __name__ == '__main__': |