view RaGOO/ragoo_utilities/ReadCoverage.py @ 13:b9a3aeb162ab draft default tip

Uploaded
author dereeper
date Mon, 26 Jul 2021 18:22:37 +0000
parents
children
line wrap: on
line source

import bisect

import numpy as np


class ReadCoverage:

    def __init__(self, in_paf):
        self.paf = in_paf
        self.glob_mean = None
        self.glob_std = None
        self.coverage_map = dict()
        self.ctg_lens = dict()
        self._make_coverage_map()
        self._get_glob_mean()

    @staticmethod
    def _tabulate_coverage(cov_list):
        current_coverage = 0
        coverage_list = []
        seen = set()

        for header, pos in cov_list:
            if header in seen:
                current_coverage -= 1
                coverage_list.append((pos, current_coverage))
                seen.remove(header)
            else:
                current_coverage += 1
                coverage_list.append((pos, current_coverage))
                seen.add(header)
        return coverage_list

    @staticmethod
    def _smooth_supported_breaks(sup_breaks, break_types):
        """
        If there are multiple low coverage breaks in close proximity,
        merge it into one break at the lowest coverage point.
        """
        i = 0
        j = 1
        while j < len(sup_breaks):
            if break_types[i] == 'l' and break_types[j] == 'l':
                if abs(sup_breaks[i][0] - sup_breaks[j][0]) < 100000:
                    # Merge these two break points
                    sup_breaks[i] = min([sup_breaks[i], sup_breaks[j]], key=lambda x: x[1])
                    sup_breaks.pop(j)
                else:
                    i += 1
                    j += 1
            else:
                i += 1
                j += 1

        return [z[0] for z in sup_breaks]

    def _trim_ends(self, dist=25000):
        """ Remove the ends of the contigs from the coverage map. """
        for i in self.coverage_map:
            # Start with the beginning of the contig
            start_idx = 0
            end_idx = 0
            for j in range(len(self.coverage_map[i])):
                if self.coverage_map[i][j][0] < dist:
                    start_idx = j

                if self.coverage_map[i][j][0] > self.ctg_lens[i] - dist:
                    end_idx = j-1
                    break
            self.coverage_map[i] = self.coverage_map[i][start_idx:end_idx]

        # Remove contigs which don't have coverage info.
        header_keys = list(self.coverage_map.keys())
        for i in header_keys:
            if not self.coverage_map[i]:
                self.coverage_map.pop(i)

    def _make_coverage_map(self):
        """
        Populate self.coverage_map. This is a dictionary that associates each contig header with a list of alignment
        positions and their coverage levels.
        """
        # Associate with each contig header, a list of (query header, start), (query header, end)
        alns_pos = dict()
        with open(self.paf, 'r') as f:
            for line in f:
                L1 = line.rstrip().split('\t')

                # Only consider an alignment if at least 75% of the read aligned.
                if abs((int(L1[3]) - int(L1[2])))/int(L1[1]) >= 0.75:
                    if L1[5] in alns_pos:
                        alns_pos[L1[5]].append((L1[0], int(L1[7])))
                        alns_pos[L1[5]].append((L1[0], int(L1[8])))
                    else:
                        alns_pos[L1[5]] = [(L1[0], int(L1[7])), (L1[0], int(L1[8]))]
                        self.ctg_lens[L1[5]] = int(L1[6])

        # Sort these coverage positions and get the coverage map
        for i in alns_pos:
            self.coverage_map[i] = self._tabulate_coverage(sorted(alns_pos[i], key=lambda x: x[1]))

        self._trim_ends()

    def _get_glob_mean(self):
        L1 = []
        for i in self.coverage_map:

            # In the case where we have multiple coverage values for one position, take the last one.
            last_pos = 0
            curr_val = 0
            for j in self.coverage_map[i]:
                if j[0] == last_pos:
                    curr_val = j[1]
                else:
                    last_pos = j[0]
                    L1.append(curr_val)
                    cur_val = j[1]

        L1 = np.asarray(L1, dtype=np.int32)
        self.glob_mean = np.median(L1)
        self.glob_std = np.sqrt(self.glob_mean)

    def _get_index_range(self, header, start_ind, distance):
        """
        Get the list of indices that are contained within a distance around the start index
        """
        all_inds = []
        low_counter = 1

        # Check if the start point is at the end of the contig
        if start_ind == len(self.coverage_map[header]):
            start_ind -= 1

        start_pos = self.coverage_map[header][start_ind][0]
        is_low = False

        # Get all coverage map indices representing regions 50kbp upstream of the start
        while not is_low:
            next_ind = start_ind-low_counter
            if next_ind < 0:
                is_low = True
            else:
                next_pos = self.coverage_map[header][next_ind][0]
                if start_pos - next_pos > distance:
                    is_low = True
                else:
                    all_inds.append(next_ind)
                    low_counter += 1

        # Repeat for 50kbp downstream
        high_counter = 1
        is_high = False
        while not is_high:
            next_ind = start_ind + high_counter
            if next_ind >= len(self.coverage_map[header]):
                is_high = True
            else:
                next_pos = self.coverage_map[header][next_ind][0]
                if next_pos - start_pos > distance:
                    is_high = True
                else:
                    all_inds.append(next_ind)
                    high_counter += 1

        return sorted(all_inds)

    def check_break_cov(self, header, in_breaks, min_cov=None, max_cov=None):
        """
        Given a list of potential break points, verify if those break points occur around low or high coverage
        areas. If so, replace the candidate break point with the low/high coverage break point.
        :param header: contig header for these breaks
        :param in_breaks: list of candidate break points
        :param min_cov: break at coverage levels below this value
        :param max_cov: break at coverage levels above this value
        :return: list of real break points, or empty list if not breaking is recommended
        """
        # Check that we have coverage info for this contig.
        if header not in self.coverage_map:
            return []

        if min_cov is None or max_cov is None:
            # Automatically calculate min and max coverage
            min_cov = max(self.glob_mean - (self.glob_std*3), 0)
            max_cov = self.glob_mean + (self.glob_std*3)

        supported_breaks = []
        break_types = []

        for i in in_breaks:
            # Get the coverage for the position closest to this potential break point
            ins_ind = bisect.bisect_left(self.coverage_map[header], (i, 0))
            # Get the coverage for positions within 50 kbp of the candidate coverage position
            # Exclude positions near the ends of the sequence.
            ind_range = self._get_index_range(header, ins_ind, 50000)

            if len(set(ind_range)) > 1:
                # Check for low coverage
                lowest_cov = min(self.coverage_map[header][ind_range[0]:ind_range[-1]], key=lambda x: x[1])
                if lowest_cov[1] < min_cov:
                    supported_breaks.append(lowest_cov)
                    break_types.append('l')
                    continue

                # Check for high coverage
                highest_cov = max(self.coverage_map[header][ind_range[0]:ind_range[-1]], key=lambda x: x[1])
                if highest_cov[1] > max_cov:
                    supported_breaks.append(highest_cov)
                    break_types.append('h')

        return self._smooth_supported_breaks(supported_breaks, break_types)