Mercurial > repos > galaxy-australia > alphafold2
view scripts/validate_fasta.py @ 22:3f188450ca4f draft default tip
planemo upload for repository https://github.com/usegalaxy-au/tools-au commit d626bb28203543a70d3fc60d662cb054bc3cef7c
author | galaxy-australia |
---|---|
date | Wed, 30 Oct 2024 21:46:34 +0000 |
parents | 2f7702fd0a4c |
children |
line wrap: on
line source
"""Validate input FASTA sequence.""" import argparse import re import sys from typing import List DEFAULT_MAX_SEQUENCE_COUNT = 10 STRIP_SEQUENCE_CHARS = ['\n', '\r', '\t', ' '] class Fasta: def __init__(self, header_str: str, seq_str: str): self.header = header_str self.aa_seq = seq_str class FastaLoader: def __init__(self, fasta_path: str): """Initialize from FASTA file.""" self.fastas = [] self.load(fasta_path) def load(self, fasta_path: str): """Load bare or FASTA formatted sequence.""" with open(fasta_path, 'r') as f: self.content = f.read() if "__cn__" in self.content: # Pasted content with escaped characters self.newline = '__cn__' self.read_caret = '__gt__' else: # Uploaded file with normal content self.newline = '\n' self.read_caret = '>' self.lines = self.content.split(self.newline) if not self.lines[0].startswith(self.read_caret): # Fasta is headless, load as single sequence self.update_fastas( '', ''.join(self.lines) ) else: header = None sequence = None for line in self.lines: if line.startswith(self.read_caret): if header: self.update_fastas(header, sequence) header = '>' + self.strip_header(line) sequence = '' else: sequence += line.strip('\n ') self.update_fastas(header, sequence) def strip_header(self, line): """Strip characters escaped with underscores from pasted text.""" return re.sub(r'\_\_.{2}\_\_', '', line).strip('>') def update_fastas(self, header: str, sequence: str): # if we have a sequence if sequence: # create generic header if not exists if not header: fasta_count = len(self.fastas) header = f'>sequence_{fasta_count}' for char in STRIP_SEQUENCE_CHARS: sequence = sequence.replace(char, '') # Create new Fasta self.fastas.append(Fasta(header, sequence)) class FastaValidator: def __init__( self, min_length=None, max_length=None, multiple=False, max_sequence_count=None, ): self.multiple = multiple self.min_length = min_length self.max_length = max_length self.iupac_characters = { 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', '-' } self.max_sequence_count = ( max_sequence_count or DEFAULT_MAX_SEQUENCE_COUNT) def validate(self, fasta_list: List[Fasta]): """Perform FASTA validation.""" self.fasta_list = fasta_list self.validate_num_seqs() self.validate_length() self.validate_alphabet() # not checking for 'X' nucleotides at the moment. # alphafold can throw an error if it doesn't like it. # self.validate_x() return self.fasta_list def validate_num_seqs(self) -> None: """Assert that only one sequence has been provided.""" fasta_count = len(self.fasta_list) if self.multiple: if fasta_count < 2: raise ValueError( 'Error encountered validating FASTA:\n' 'Multimer mode requires multiple input sequence.' f' Only {fasta_count} sequences were detected in' ' the provided file.') elif fasta_count > self.max_sequence_count: raise ValueError( f'WARNING: detected {fasta_count} sequences but the' f' maximum allowed is {self.max_sequence_count}' ' sequences.') else: if fasta_count > 1: sys.stderr.write( 'WARNING: More than 1 sequence detected.' ' Using first FASTA sequence as input.\n') self.fasta_list = self.fasta_list[:1] elif len(self.fasta_list) == 0: raise ValueError( 'Error encountered validating FASTA:\n' ' no FASTA sequences detected in input file.') def validate_length(self): """Confirm whether sequence length is valid.""" fasta = self.fasta_list[0] if self.min_length: if len(fasta.aa_seq) < self.min_length: raise ValueError( 'Error encountered validating FASTA:\n Sequence too short' f' ({len(fasta.aa_seq)}AA).' f' Minimum length is {self.min_length}AA.') if self.max_length: if len(fasta.aa_seq) > self.max_length: raise ValueError( 'Error encountered validating FASTA:\n' f' Sequence too long ({len(fasta.aa_seq)}AA).' f' Maximum length is {self.max_length}AA.') def validate_alphabet(self): """Confirm whether the sequence conforms to IUPAC codes. If not, report the offending character and its position. """ fasta = self.fasta_list[0] for i, char in enumerate(fasta.aa_seq.upper()): if char not in self.iupac_characters: raise ValueError( 'Error encountered validating FASTA:\n Invalid amino acid' f' found at pos {i}: "{char}"') def validate_x(self): """Check for X bases.""" fasta = self.fasta_list[0] for i, char in enumerate(fasta.aa_seq.upper()): if char == 'X': raise ValueError( 'Error encountered validating FASTA:\n Unsupported AA code' f' "X" found at pos {i}') class FastaWriter: def __init__(self) -> None: self.line_wrap = 60 def write(self, fasta: Fasta): header = fasta.header seq = self.format_sequence(fasta.aa_seq) sys.stdout.write(header + '\n') sys.stdout.write(seq) def format_sequence(self, aa_seq: str): formatted_seq = '' for i in range(0, len(aa_seq), self.line_wrap): formatted_seq += aa_seq[i: i + self.line_wrap] + '\n' return formatted_seq.upper() def main(): # load fasta file try: args = parse_args() fas = FastaLoader(args.input) # validate fv = FastaValidator( min_length=args.min_length, max_length=args.max_length, multiple=args.multimer, max_sequence_count=args.max_sequence_count, ) clean_fastas = fv.validate(fas.fastas) # write clean data fw = FastaWriter() for fas in clean_fastas: fw.write(fas) sys.stderr.write("Validated FASTA sequence(s):\n\n") for fas in clean_fastas: sys.stderr.write(fas.header + '\n') sys.stderr.write(fas.aa_seq + '\n\n') except ValueError as exc: sys.stderr.write(f"{exc}\n\n") raise exc except Exception as exc: sys.stderr.write( "Input error: FASTA input is invalid. Please check your input.\n\n" ) raise exc def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument( "input", help="input fasta file", type=str ) parser.add_argument( "--min_length", dest='min_length', help="Minimum length of input protein sequence (AA)", default=None, type=int, ) parser.add_argument( "--max_length", dest='max_length', help="Maximum length of input protein sequence (AA)", default=None, type=int, ) parser.add_argument( "--max-sequences", dest='max_sequence_count', help="Maximum number of input sequences", default=None, type=int, ) parser.add_argument( "--multimer", action='store_true', help="Require multiple input sequences", ) return parser.parse_args() if __name__ == '__main__': main()