Mercurial > repos > galaxy-australia > alphafold2
comparison validate_fasta.py @ 0:7ae9d78b06f5 draft
"planemo upload for repository https://github.com/usegalaxy-au/galaxy-local-tools commit 7b79778448363aa8c9b14604337e81009e461bd2-dirty"
| author | galaxy-australia |
|---|---|
| date | Fri, 28 Jan 2022 04:56:29 +0000 |
| parents | |
| children | 6c92e000d684 |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:7ae9d78b06f5 |
|---|---|
| 1 | |
| 2 | |
| 3 import argparse | |
| 4 from typing import List, TextIO | |
| 5 | |
| 6 | |
| 7 class Fasta: | |
| 8 def __init__(self, header_str: str, seq_str: str): | |
| 9 self.header = header_str | |
| 10 self.aa_seq = seq_str | |
| 11 | |
| 12 | |
| 13 class FastaLoader: | |
| 14 def __init__(self): | |
| 15 """creates a Fasta() from a file""" | |
| 16 self.fastas: List[Fasta] = [] | |
| 17 | |
| 18 def load(self, fasta_path: str): | |
| 19 """ | |
| 20 load function has to be very flexible. | |
| 21 file may be normal fasta format (header, seq) or can just be a bare sequence. | |
| 22 """ | |
| 23 with open(fasta_path, 'r') as fp: | |
| 24 header, sequence = self.interpret_first_line(fp) | |
| 25 line = fp.readline().rstrip('\n') | |
| 26 | |
| 27 while line: | |
| 28 if line.startswith('>'): | |
| 29 self.update_fastas(header, sequence) | |
| 30 header = line | |
| 31 sequence = '' | |
| 32 else: | |
| 33 sequence += line | |
| 34 line = fp.readline().rstrip('\n') | |
| 35 | |
| 36 # after reading whole file, header & sequence buffers might be full | |
| 37 self.update_fastas(header, sequence) | |
| 38 return self.fastas | |
| 39 | |
| 40 def interpret_first_line(self, fp: TextIO): | |
| 41 header = '' | |
| 42 sequence = '' | |
| 43 line = fp.readline().rstrip('\n') | |
| 44 if line.startswith('>'): | |
| 45 header = line | |
| 46 else: | |
| 47 sequence += line | |
| 48 return header, sequence | |
| 49 | |
| 50 def update_fastas(self, header: str, sequence: str): | |
| 51 # if we have a sequence | |
| 52 if not sequence == '': | |
| 53 # create generic header if not exists | |
| 54 if header == '': | |
| 55 fasta_count = len(self.fastas) | |
| 56 header = f'>sequence_{fasta_count}' | |
| 57 | |
| 58 # create new Fasta | |
| 59 self.fastas.append(Fasta(header, sequence)) | |
| 60 | |
| 61 | |
| 62 class FastaValidator: | |
| 63 def __init__(self, fasta_list: List[Fasta]): | |
| 64 self.fasta_list = fasta_list | |
| 65 self.min_length = 30 | |
| 66 self.max_length = 2000 | |
| 67 self.iupac_characters = { | |
| 68 'A', 'B', 'C', 'D', 'E', 'F', 'G', | |
| 69 'H', 'I', 'K', 'L', 'M', 'N', 'P', | |
| 70 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', | |
| 71 'Y', 'Z', '-' | |
| 72 } | |
| 73 | |
| 74 def validate(self): | |
| 75 """performs fasta validation""" | |
| 76 self.validate_num_seqs() | |
| 77 self.validate_length() | |
| 78 self.validate_alphabet() | |
| 79 # not checking for 'X' nucleotides at the moment. | |
| 80 # alphafold can throw an error if it doesn't like it. | |
| 81 #self.validate_x() | |
| 82 | |
| 83 def validate_num_seqs(self) -> None: | |
| 84 if len(self.fasta_list) > 1: | |
| 85 raise Exception(f'Error encountered validating fasta: More than 1 sequence detected ({len(self.fasta_list)}). Please use single fasta sequence as input') | |
| 86 elif len(self.fasta_list) == 0: | |
| 87 raise Exception(f'Error encountered validating fasta: input file has no fasta sequences') | |
| 88 | |
| 89 def validate_length(self): | |
| 90 """Confirms whether sequence length is valid. """ | |
| 91 fasta = self.fasta_list[0] | |
| 92 if len(fasta.aa_seq) < self.min_length: | |
| 93 raise Exception(f'Error encountered validating fasta: Sequence too short ({len(fasta.aa_seq)}aa). Must be > 30aa') | |
| 94 if len(fasta.aa_seq) > self.max_length: | |
| 95 raise Exception(f'Error encountered validating fasta: Sequence too long ({len(fasta.aa_seq)}aa). Must be < 2000aa') | |
| 96 | |
| 97 def validate_alphabet(self): | |
| 98 """ | |
| 99 Confirms whether the sequence conforms to IUPAC codes. | |
| 100 If not, reports the offending character and its position. | |
| 101 """ | |
| 102 fasta = self.fasta_list[0] | |
| 103 for i, char in enumerate(fasta.aa_seq.upper()): | |
| 104 if char not in self.iupac_characters: | |
| 105 raise Exception(f'Error encountered validating fasta: Invalid amino acid found at pos {i}: {char}') | |
| 106 | |
| 107 def validate_x(self): | |
| 108 """checks if any bases are X. TODO check whether alphafold accepts X bases. """ | |
| 109 fasta = self.fasta_list[0] | |
| 110 for i, char in enumerate(fasta.aa_seq.upper()): | |
| 111 if char == 'X': | |
| 112 raise Exception(f'Error encountered validating fasta: Unsupported aa code "X" found at pos {i}') | |
| 113 | |
| 114 | |
| 115 class FastaWriter: | |
| 116 def __init__(self) -> None: | |
| 117 self.outfile = 'alphafold.fasta' | |
| 118 self.formatted_line_len = 60 | |
| 119 | |
| 120 def write(self, fasta: Fasta): | |
| 121 with open(self.outfile, 'w') as fp: | |
| 122 header = fasta.header | |
| 123 seq = self.format_sequence(fasta.aa_seq) | |
| 124 fp.write(header + '\n') | |
| 125 fp.write(seq + '\n') | |
| 126 | |
| 127 def format_sequence(self, aa_seq: str): | |
| 128 formatted_seq = '' | |
| 129 for i in range(0, len(aa_seq), self.formatted_line_len): | |
| 130 formatted_seq += aa_seq[i: i + self.formatted_line_len] + '\n' | |
| 131 return formatted_seq | |
| 132 | |
| 133 | |
| 134 def main(): | |
| 135 # load fasta file | |
| 136 args = parse_args() | |
| 137 fl = FastaLoader() | |
| 138 fastas = fl.load(args.input_fasta) | |
| 139 | |
| 140 # validate | |
| 141 fv = FastaValidator(fastas) | |
| 142 fv.validate() | |
| 143 | |
| 144 # write cleaned version | |
| 145 fw = FastaWriter() | |
| 146 fw.write(fastas[0]) | |
| 147 | |
| 148 | |
| 149 def parse_args() -> argparse.Namespace: | |
| 150 parser = argparse.ArgumentParser() | |
| 151 parser.add_argument( | |
| 152 "input_fasta", | |
| 153 help="input fasta file", | |
| 154 type=str | |
| 155 ) | |
| 156 return parser.parse_args() | |
| 157 | |
| 158 | |
| 159 | |
| 160 if __name__ == '__main__': | |
| 161 main() |
