view xmfa2gff3.py @ 5:ce795616bd9c draft default tip

"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/progressivemauve commit fcddceef74e33d544e239d1036467f65ef11767f"
author iuc
date Sat, 27 Nov 2021 12:13:25 +0000
parents 97a43bcbf44d
children
line wrap: on
line source

#!/usr/bin/env python
import argparse
import logging
import sys

from BCBio import GFF
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqFeature import (
    FeatureLocation,
    SeqFeature
)
from Bio.SeqRecord import SeqRecord

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)


# Patch bcbio gff to work around url encoding issue. This is clearly
# sub-optimal but we should transition to the newer library.
def _new_format_keyvals(self, keyvals):
    return ";".join(["%s=%s" % (k, ",".join(v)) for (k, v) in sorted(keyvals.items())])


GFF.GFFOutput.GFF3Writer._format_keyvals = _new_format_keyvals


def parse_xmfa(xmfa):
    """Simple XMFA parser until https://github.com/biopython/biopython/pull/544
    """
    current_lcb = []
    current_seq = {}
    for line in xmfa.readlines():
        if line.startswith('#'):
            continue

        if line.strip() == '=':
            if 'id' in current_seq:
                current_lcb.append(current_seq)
                current_seq = {}
            yield current_lcb
            current_lcb = []
        else:
            line = line.strip()
            if line.startswith('>'):
                if 'id' in current_seq:
                    current_lcb.append(current_seq)
                    current_seq = {}
                data = line.strip().split()
                id, loc = data[1].split(':')
                start, end = loc.split('-')
                current_seq = {
                    'rid': '_'.join(data[1:]),
                    'id': id,
                    'start': int(start),
                    'end': int(end),
                    'strand': 1 if data[2] == '+' else -1,
                    'seq': ''
                }
            else:
                current_seq['seq'] += line.strip()


def _percent_identity(a, b):
    """Calculate % identity, ignoring gaps in the host sequence
    """
    match = 0
    mismatch = 0
    for char_a, char_b in zip(list(a), list(b)):
        if char_a == '-':
            continue
        if char_a == char_b:
            match += 1
        else:
            mismatch += 1

    if match + mismatch == 0:
        return 0
    return 100 * float(match) / (match + mismatch)


def _id_tn_dict(sequences):
    """Figure out sequence IDs
    """
    label_convert = {}
    if sequences is not None:
        if len(sequences) == 1:
            for i, record in enumerate(SeqIO.parse(sequences[0], 'fasta')):
                label_convert[str(i + 1)] = record.id
        else:
            for i, sequence in enumerate(sequences):
                for record in SeqIO.parse(sequence, 'fasta'):
                    label_convert[str(i + 1)] = record.id
                    continue
    return label_convert


def convert_xmfa_to_gff3(xmfa_file, relative_to='1', sequences=None, window_size=1000):
    label_convert = _id_tn_dict(sequences)

    lcbs = parse_xmfa(xmfa_file)

    records = [SeqRecord(Seq("A"), id=label_convert.get(relative_to, relative_to))]
    for lcb in lcbs:
        ids = [seq['id'] for seq in lcb]

        # Doesn't match part of our sequence
        if relative_to not in ids:
            continue

        # Skip sequences that are JUST our "relative_to" genome
        if len(ids) == 1:
            continue

        parent = [seq for seq in lcb if seq['id'] == relative_to][0]
        others = [seq for seq in lcb if seq['id'] != relative_to]

        for other in others:
            other['feature'] = SeqFeature(
                FeatureLocation(parent['start'], parent['end'] + 1),
                type="match", strand=parent['strand'],
                qualifiers={
                    "source": "progressiveMauve",
                    "Target": " ".join(map(str, [label_convert.get(other['id'], other['id']), other['start'], other['end'], '+' if other['strand'] > 0 else '-'])),
                    "ID": label_convert.get(other['id'], 'xmfa_' + other['rid'])
                }
            )

        for i in range(0, len(lcb[0]['seq']), window_size):
            block_seq = parent['seq'][i:i + window_size]
            real_window_size = len(block_seq)
            real_start = abs(parent['start']) - parent['seq'][0:i].count('-') + i
            real_end = real_start + real_window_size - block_seq.count('-')

            if (real_end - real_start) < 10:
                continue

            if parent['start'] < 0:
                strand = -1
            else:
                strand = 1

            for other in others:
                pid = _percent_identity(block_seq, other['seq'][i:i + real_window_size])
                # Ignore 0% identity sequences
                if pid == 0:
                    continue

                # Support for Biopython 1.68 and above, which removed sub_features
                if not hasattr(other['feature'], "sub_features"):
                    other['feature'].sub_features = []
                other['feature'].sub_features.append(
                    SeqFeature(
                        FeatureLocation(real_start, real_end),
                        type="match_part", strand=strand,
                        qualifiers={
                            "source": "progressiveMauve",
                            'score': pid
                        }
                    )
                )

        for other in others:
            records[0].features.append(other['feature'])
    return records


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Convert XMFA alignments to gff3', prog='xmfa2gff3')
    parser.add_argument('xmfa_file', type=argparse.FileType('r'), help='XMFA File')
    parser.add_argument('--window_size', type=int, help='Window size for analysis', default=1000)
    parser.add_argument('--relative_to', type=str, help='Index of the parent sequence in the MSA', default='1')
    parser.add_argument('--sequences', type=argparse.FileType('r'), nargs='+',
                        help='Fasta files (in same order) passed to parent for reconstructing proper IDs')
    parser.add_argument('--version', action='version', version='%(prog)s 1.0')

    args = parser.parse_args()

    result = convert_xmfa_to_gff3(**vars(args))
    GFF.write(result, sys.stdout)