view RaGOO/ragoo_utilities/ContigAlignment.py @ 13:b9a3aeb162ab draft default tip

Uploaded
author dereeper
date Mon, 26 Jul 2021 18:22:37 +0000
parents
children
line wrap: on
line source

import operator
from collections import defaultdict

from ragoo_utilities.utilities import summarize_planesweep, binary_search


class ContigAlignment:
    """
    This object will represent a contig's alignment to a reference chromosome.
    Specifically, this class will organize alignments in a one-to-many fashion, where
    we will only have one object per contig, and each object will store all of the alignments
    of that contig.


    """

    def __init__(self, in_contig):
        """
        Each object will refer to a single contig via its header name.

        All other alignment metrics will be stored in lists, where the list offsets
        correspond to the alignment number.

        Don't actually add any alignments through object instantiation. Only use the add_alignment method.
        """
        self.contig = in_contig

        self.query_lens = []
        self.query_starts = []
        self.query_ends = []
        self.strands = []
        self.ref_headers = []
        self.ref_lens = []
        self.ref_starts = []
        self.ref_ends = []
        self.num_matches = []
        self.aln_lens = []
        self.mapqs = []

        self._is_unique_anchor_filtered = False
        self._is_merged = False

    def __repr__(self):
        return '<ContigAlignment' + self.contig + '>'

    def __str__(self):
        """ Return the alignments in sorted PAF format. """
        self.sort_by_query()
        all_alns = []
        for i in range(len(self.ref_headers)):
            all_alns.append(
                "\t".join([
                    self.contig,
                    str(self.query_lens[i]),
                    str(self.query_starts[i]),
                    str(self.query_ends[i]),
                    self.strands[i],
                    self.ref_headers[i],
                    str(self.ref_lens[i]),
                    str(self.ref_starts[i]),
                    str(self.ref_ends[i]),
                    str(self.num_matches[i]),
                    str(self.aln_lens[i]),
                    str(self.mapqs[i])
                ])
            )
        return "\n".join(all_alns)

    def get_attr_lens(self):
        all_lens = [
            len(self.query_lens),
            len(self.query_starts),
            len(self.query_ends),
            len(self.strands),
            len(self.ref_headers),
            len(self.ref_lens),
            len(self.ref_starts),
            len(self.ref_ends),
            len(self.num_matches),
            len(self.aln_lens),
            len(self.mapqs)
        ]
        return all_lens

    def add_alignment(self, paf_line):
        """
        The only way to add alignments for this contig. Only accepts a PAFLine type object which represents
        as single line of a paf file.
        """
        if not paf_line.contig == self.contig:
            raise ValueError('Only can add alignments with query header %s' % self.contig)

        self.query_lens.append(paf_line.query_len)
        self.query_starts.append(paf_line.query_start)
        self.query_ends.append(paf_line.query_end)
        self.strands.append(paf_line.strand)
        self.ref_headers.append(paf_line.ref_header)
        self.ref_lens.append(paf_line.ref_len)
        self.ref_starts.append(paf_line.ref_start)
        self.ref_ends.append(paf_line.ref_end)
        self.num_matches.append(paf_line.num_match)
        self.aln_lens.append(paf_line.aln_len)
        self.mapqs.append(paf_line.mapq)

        # Check that all attribute lists have same length
        all_lens = self.get_attr_lens()
        assert len(set(all_lens)) == 1

    def has_unique_chr_match(self):
        s1 = set(self.ref_headers)
        if len(s1) == 1:
            return True
        return False

    def count_chr_matches(self):
        s1 = set(self.ref_headers)
        return len(s1)

    def rearrange_alns(self, hits):
        """ Generally re structure the alignments according to 'hits', an ordered list of indices. """
        self.query_lens = [self.query_lens[i] for i in hits]
        self.query_starts = [self.query_starts[i] for i in hits]
        self.query_ends = [self.query_ends[i] for i in hits]
        self.strands = [self.strands[i] for i in hits]
        self.ref_headers = [self.ref_headers[i] for i in hits]
        self.ref_lens = [self.ref_lens[i] for i in hits]
        self.ref_starts = [self.ref_starts[i] for i in hits]
        self.ref_ends = [self.ref_ends[i] for i in hits]
        self.num_matches = [self.num_matches[i] for i in hits]
        self.aln_lens = [self.aln_lens[i] for i in hits]
        self.mapqs = [self.mapqs[i] for i in hits]

    def filter_ref_chroms(self, in_chroms):
        """
        Given a list of chromosomes, mutate this object so as to only include alignments to those chromosomes.
        """
        hits = []
        for i in range(len(self.ref_headers)):
            if self.ref_headers[i] in in_chroms:
                hits.append(i)

        self.rearrange_alns(hits)

    def sort_by_ref(self):
        ref_pos = []
        for i in range(len(self.ref_headers)):
            ref_pos.append((self.ref_headers[i], self.ref_starts[i], self.ref_ends[i], i))
        hits = [i[3] for i in sorted(ref_pos)]

        self.rearrange_alns(hits)

    def sort_by_query(self):
        q_pos = []
        for i in range(len(self.ref_headers)):
            q_pos.append((self.query_starts[i], self.query_ends[i], i))
        hits = [i[2] for i in sorted(q_pos)]

        self.rearrange_alns(hits)

    def exclude_ref_chroms(self, exclude_list):
        hits = []
        for i in range(len(self.ref_headers)):
            if self.ref_headers[i] not in exclude_list:
                hits.append(i)

        self.rearrange_alns(hits)

    def filter_lengths(self, l):
        hits = []
        for i in range(len(self.ref_headers)):
            if self.aln_lens[i] > l:
                hits.append(i)

        self.rearrange_alns(hits)

    def unique_anchor_filter(self):
        """
        The contents of this method are either influenced by or directly copied from "Assemblytics_uniq_anchor.py"
        written by Maria Nattestad. The original script can be found here:

        https://github.com/MariaNattestad/Assemblytics

        And the publication associated with Maria's work is here:

        Nattestad, Maria, and Michael C. Schatz. "Assemblytics: a
        web analytics tool for the detection of variants from an
        assembly." Bioinformatics 32.19 (2016): 3021-3023.
        """

        if not self._is_unique_anchor_filtered:
            lines_by_query = []
            for i, j in zip(self.query_starts, self.query_ends):
                lines_by_query.append((i, j))

            hits = summarize_planesweep(lines_by_query, 10000)
            self.rearrange_alns(hits)
            self._is_unique_anchor_filtered = True

    def merge_alns(self, merge_dist=100000):
        """
        Merge chains of alignments that are to the same target, have the same orientation, and are less than
        merge_dist away from each other.
        :param merge_dist:
        :return:
        """
        # Sort the alignments
        self.sort_by_ref()

        # Might also want to filter out low identity alignments

        # Keep track of which alignments we are comparing
        i = 0
        j = 1
        while j < len(self.ref_headers):
            if all([
                self.ref_headers[i] == self.ref_headers[j],
                self.strands[i] == self.strands[j],
                self.ref_starts[j] - self.ref_ends[i] <= merge_dist
            ]):
                # Merge the alignments in place of the first alignment
                self.ref_starts[i] = min(self.ref_starts[i], self.ref_starts[j])
                self.ref_ends[i] = max(self.ref_ends[i], self.ref_ends[j])
                self.query_starts[i] = min(self.query_starts[i], self.query_starts[j])
                self.query_ends[i] = max(self.query_ends[i], self.query_ends[j])

                self.num_matches[i] += self.num_matches[j]
                self.aln_lens[i] = self.ref_ends[i] - self.ref_starts[i]
                self.mapqs[i] = (self.mapqs[i] + self.mapqs[j])//2

                # Remove the redundant alignment
                self.query_lens.pop(j)
                self.query_starts.pop(j)
                self.query_ends.pop(j)
                self.strands.pop(j)
                self.ref_headers.pop(j)
                self.ref_lens.pop(j)
                self.ref_starts.pop(j)
                self.ref_ends.pop(j)
                self.num_matches.pop(j)
                self.aln_lens.pop(j)
                self.mapqs.pop(j)
            else:
                i += 1
                j += 1

        # remove contained alignments. contained w.r.t the contig
        cont_inds = []
        for i in range(len(self.ref_headers)):
            for j in range(len(self.ref_headers)):
                # Check if j is contained by i
                if i == j:
                    continue
                if self.query_starts[i] <= self.query_starts[j] and self.query_ends[i] >= self.query_ends[j]:
                    cont_inds.append(j)

        hits = [i for i in range(len(self.ref_headers)) if i not in cont_inds]
        self.rearrange_alns(hits)

        self._is_merged = True

    def get_break_candidates(self):
        if not self._is_merged:
            raise ValueError("Alignments must first be merged.")

        self.sort_by_query()
        all_candidates = []
        for i in range(1, len(self.ref_headers)):
            all_candidates.append(min(self.query_ends[i-1], self.query_starts[i]))
            all_candidates.append(max(self.query_ends[i-1], self.query_starts[i]))

        return all_candidates



class UniqueContigAlignment:
    """
    A class representing the reference chromosome to which a particular contig will be assigned to.
    At first, the incoming alignments may be to multiple chromosomes, but this class will assign a single
    reference chromosome to an input contig.
    """

    def __init__(self, in_contig_aln):
        if not isinstance(in_contig_aln, ContigAlignment):
            message = 'Can only make a unique alignment from a ContigAlignment object. Got type %s instead.' % str(type(in_contig_aln))
            raise ValueError(message)

        self.contig = in_contig_aln.contig

        # Find the chromosome which was covered the most in these alignments.
        self.ref_chrom = None
        self.confidence = 0.0
        self._get_best_ref_cov(in_contig_aln)

    def __str__(self):
        return '<UniqueContigAlignment:' + self.contig + ' - ' + self.ref_chrom + '>'

    def _get_best_ref_cov(self, alns):
        # Get list of all references chromosomes
        all_chroms = set(alns.ref_headers)
        if len(all_chroms) == 1:
            self.ref_chrom = alns.ref_headers[0]
            self.confidence = 1
            return

        # Initialize coverage counts for each chromosome
        ranges = dict()
        for i in all_chroms:
            ranges[i] = 0

        # Get all the ranges in reference to all chromosomes
        all_intervals = defaultdict(list)
        for i in range(len(alns.ref_headers)):
            this_range = (alns.ref_starts[i], alns.ref_ends[i])
            this_chrom = alns.ref_headers[i]
            all_intervals[this_chrom].append(this_range)

        # For each chromosome, sort the range and get the union interval length.
        # Algorithm and pseudocode given by Michael Kirsche
        for i in all_intervals.keys():
            sorted_intervals = sorted(all_intervals[i], key=lambda tup: tup[0])
            max_end = -1
            for j in sorted_intervals:
                start_new_terr = max(j[0], max_end)
                ranges[i] += max(0, j[1] - start_new_terr)
                max_end = max(max_end, j[1])

        assert ranges

        # I convert to a list and sort the ranges.items() in order to have ties broken in a deterministic way.
        max_chrom = max(sorted(list(ranges.items())), key=operator.itemgetter(1))[0]
        self.ref_chrom = max_chrom

        # Now get the confidence of this chromosome assignment
        # Equal to the max range over all ranges
        self.confidence = ranges[max_chrom]/sum(ranges.values())
        assert self.confidence >= 0
        assert self.confidence <= 1


class LongestContigAlignment:
    """
    Given a ContigAlignment object, find the longest alignment with respect to a given references chromosome.

    1. Filter out any alignments that are not aligned to specified chromosome.
    2. Find longest alignment remaining.
    """

    def __init__(self, in_contig_aln):
        self.best_index = self._get_best_index(in_contig_aln)
        self.contig = in_contig_aln.contig
        self.ref_start = in_contig_aln.ref_starts[self.best_index]
        self.ref_end = in_contig_aln.ref_ends[self.best_index]
        self.aln_len = in_contig_aln.aln_lens[self.best_index]
        self.strand = in_contig_aln.strands[self.best_index]
        self.interval = (self.ref_start, self.ref_end)

    def _get_best_index(self, aln):
        max_index = -1
        max_len = -1
        for i in range(len(aln.aln_lens)):
            if aln.aln_lens[i] > max_len:
                max_len = aln.aln_lens[i]
                max_index = i
        return max_index