comparison validate_fasta.py @ 0:7ae9d78b06f5 draft

"planemo upload for repository https://github.com/usegalaxy-au/galaxy-local-tools commit 7b79778448363aa8c9b14604337e81009e461bd2-dirty"
author galaxy-australia
date Fri, 28 Jan 2022 04:56:29 +0000
parents
children 6c92e000d684
comparison
equal deleted inserted replaced
-1:000000000000 0:7ae9d78b06f5
1
2
3 import argparse
4 from typing import List, TextIO
5
6
class Fasta:
    """A single FASTA record: one header line plus its sequence string."""

    def __init__(self, header_str: str, seq_str: str):
        # header is stored exactly as given; aa_seq is the sequence text
        # with no line breaks added or removed by this class.
        self.header, self.aa_seq = header_str, seq_str
11
12
class FastaLoader:
    """Parse a FASTA-like text file into a list of Fasta records.

    Records accumulate in ``self.fastas``; calling ``load`` more than once on
    the same instance appends to the records already collected.
    """

    def __init__(self):
        self.fastas: List["Fasta"] = []

    def load(self, fasta_path: str):
        """Read ``fasta_path`` and return the list of parsed records.

        The parser is deliberately permissive: the file may be regular FASTA
        (header line starting with '>' followed by sequence lines) or just a
        bare sequence with no header at all. Blank lines anywhere in the file
        are skipped rather than treated as end-of-input, so records that
        follow a blank separator line are still seen. Trailing whitespace on
        each line is discarded.

        Returns:
            ``self.fastas`` (the same list object held by the loader).
        """
        with open(fasta_path, 'r') as fp:
            header, first_chunk = self.interpret_first_line(fp)
            chunks = [first_chunk]

            for raw_line in fp:
                line = raw_line.rstrip()
                if not line:
                    # blank / whitespace-only line: ignore, keep reading
                    continue
                if line.startswith('>'):
                    # a new record starts: flush the one collected so far
                    self.update_fastas(header, ''.join(chunks))
                    header = line
                    chunks = []
                else:
                    chunks.append(line)

        # after reading the whole file, the buffers may hold a final record
        self.update_fastas(header, ''.join(chunks))
        return self.fastas

    def interpret_first_line(self, fp: TextIO):
        """Consume the first line of ``fp`` and classify it.

        Returns:
            ``(header, sequence)`` where exactly one of the two is non-empty
            for a non-blank first line: the line goes to ``header`` if it
            starts with '>', otherwise to ``sequence``. Both are '' when the
            first line is blank or the file is empty.
        """
        line = fp.readline().rstrip()
        if line.startswith('>'):
            return line, ''
        return '', line

    def update_fastas(self, header: str, sequence: str):
        """Append a record for ``header``/``sequence`` if there is a sequence.

        A header with no sequence is dropped. A sequence with no header gets a
        generated header of the form ``>sequence_<index>``, where the index is
        the number of records stored so far.
        """
        if not sequence:
            return
        if header == '':
            header = f'>sequence_{len(self.fastas)}'
        self.fastas.append(Fasta(header, sequence))
60
61
class FastaValidator:
    """Check that a parsed input holds exactly one acceptable sequence.

    Args:
        fasta_list: records to validate; each must expose an ``aa_seq`` string.
        min_length: shortest accepted sequence length (inclusive).
        max_length: longest accepted sequence length (inclusive).

    Every failed check raises ``Exception`` with a message that starts with
    'Error encountered validating fasta:'.
    """

    def __init__(self, fasta_list: List["Fasta"], min_length: int = 30, max_length: int = 2000):
        self.fasta_list = fasta_list
        self.min_length = min_length
        self.max_length = max_length
        # characters accepted by validate_alphabet (compared after upper-casing)
        self.iupac_characters = {
            'A', 'B', 'C', 'D', 'E', 'F', 'G',
            'H', 'I', 'K', 'L', 'M', 'N', 'P',
            'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X',
            'Y', 'Z', '-'
        }

    def validate(self):
        """Run the record-count, length and alphabet checks, in that order."""
        self.validate_num_seqs()
        self.validate_length()
        self.validate_alphabet()
        # NOTE: validate_x() is intentionally not called at the moment;
        # alphafold can throw its own error if it does not like 'X' residues.

    def validate_num_seqs(self) -> None:
        """Raise unless exactly one record is present."""
        num_seqs = len(self.fasta_list)
        if num_seqs > 1:
            raise Exception(f'Error encountered validating fasta: More than 1 sequence detected ({num_seqs}). Please use single fasta sequence as input')
        if num_seqs == 0:
            raise Exception('Error encountered validating fasta: input file has no fasta sequences')

    def validate_length(self):
        """Raise if the first sequence is outside [min_length, max_length].

        The message reports the configured bound, and states it as inclusive
        because a sequence of exactly min_length or max_length is accepted.
        """
        seq_len = len(self.fasta_list[0].aa_seq)
        if seq_len < self.min_length:
            raise Exception(f'Error encountered validating fasta: Sequence too short ({seq_len}aa). Must be >= {self.min_length}aa')
        if seq_len > self.max_length:
            raise Exception(f'Error encountered validating fasta: Sequence too long ({seq_len}aa). Must be <= {self.max_length}aa')

    def validate_alphabet(self):
        """Raise on the first character not in ``iupac_characters``.

        The comparison is case-insensitive; the reported position is the
        0-based index into the sequence and the reported character is the
        upper-cased form.
        """
        fasta = self.fasta_list[0]
        for i, char in enumerate(fasta.aa_seq.upper()):
            if char not in self.iupac_characters:
                raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: {char}')

    def validate_x(self):
        """Raise on the first 'X' (case-insensitive) in the first sequence.

        Not part of validate(); TODO check whether alphafold accepts X.
        """
        fasta = self.fasta_list[0]
        for i, char in enumerate(fasta.aa_seq.upper()):
            if char == 'X':
                raise Exception(f'Error encountered validating fasta: Unsupported aa code "X" found at pos {i}')
113
114
class FastaWriter:
    """Write a single record to disk as line-wrapped FASTA.

    Args:
        outfile: path of the file to create or overwrite.
        formatted_line_len: number of sequence characters per output line.

    Raises:
        ValueError: if ``formatted_line_len`` is smaller than 1.
    """

    def __init__(self, outfile: str = 'alphafold.fasta', formatted_line_len: int = 60) -> None:
        if formatted_line_len < 1:
            raise ValueError('formatted_line_len must be a positive integer')
        self.outfile = outfile
        self.formatted_line_len = formatted_line_len

    def write(self, fasta: "Fasta"):
        """Write ``fasta.header`` then the wrapped ``fasta.aa_seq`` to ``self.outfile``."""
        with open(self.outfile, 'w') as fp:
            fp.write(fasta.header + '\n')
            # format_sequence already terminates every line with '\n';
            # adding another one here would leave a blank line at the end.
            fp.write(self.format_sequence(fasta.aa_seq))

    def format_sequence(self, aa_seq: str):
        """Return ``aa_seq`` split into newline-terminated lines.

        Each line holds at most ``self.formatted_line_len`` characters. An
        empty sequence yields an empty string.
        """
        width = self.formatted_line_len
        return ''.join(
            aa_seq[start:start + width] + '\n'
            for start in range(0, len(aa_seq), width)
        )
132
133
def main():
    """Load the input FASTA, validate it, and write the cleaned copy.

    The input path comes from the command line (see parse_args). Any
    exception raised while loading, validating or writing propagates to the
    caller.
    """
    cli_args = parse_args()

    # parse the input file into records
    records = FastaLoader().load(cli_args.input_fasta)

    # raises if the input is not exactly one acceptable sequence
    FastaValidator(records).validate()

    # write the (single) validated record in normalised form
    FastaWriter().write(records[0])
147
148
def parse_args() -> argparse.Namespace:
    """Parse the command line; the only argument is the positional ``input_fasta`` path."""
    cli = argparse.ArgumentParser()
    cli.add_argument("input_fasta", type=str, help="input fasta file")
    return cli.parse_args()
157
158
159
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()