view RaGOO/ragoo_utilities/utilities.py @ 13:b9a3aeb162ab draft default tip

Uploaded
author dereeper
date Mon, 26 Jul 2021 18:22:37 +0000
parents
children
line wrap: on
line source

import time
import subprocess
import operator

from ragoo_utilities.SeqReader import SeqReader

""" A collection of various helper functions"""

complements = str.maketrans("ACGTNURYSWKMBVDHacgtnuryswkmbvdh", "TGCANAYRSWMKVBHDtgcanayrswmkvbhd")


def reverse_complement(seq):
    """
    Reverse complement a nucleotide sequence.
    :param seq: Sequence to be reverse complemented
    :return: A reverse complemented sequence
    """
    return seq.translate(complements)[::-1]


def run(cmnd):
    """ Run command and report status. """
    log('Running : %s' % cmnd)
    if subprocess.call(cmnd, shell=True, executable='/bin/bash') != 0:
        raise RuntimeError('Failed : %s ' % cmnd)


def log(message):
    """ Log messages to standard output. """
    print(time.ctime() + ' --- ' + message, flush=True)


def read_contigs(in_file):
    d = dict()
    x = SeqReader(in_file)
    for header, seq in x.parse_fasta():
        d[header.replace('>', '').split(' ')[0]] = seq
    return d


def read_gz_contigs(in_file):
    d = dict()
    x = SeqReader(in_file)
    for header, seq in x.parse_gzip_fasta():
        d[header.replace('>', '').split(' ')[0]] = seq
    return d


def binary_search(query, numbers, left, right):
    """
    The contents of this method are either influenced by or directly copied from "Assemblytics_uniq_anchor.py"
    written by Maria Nattestad. The original script can be found here:

    https://github.com/MariaNattestad/Assemblytics

    And the publication associated with Maria's work is here:

    Nattestad, Maria, and Michael C. Schatz. "Assemblytics: a
    web analytics tool for the detection of variants from an
    assembly." Bioinformatics 32.19 (2016): 3021-3023.
    """
    #  Returns index of the matching element or the first element to the right

    if left >= right:
        return right
    mid = (right + left) // 2

    if query == numbers[mid]:
        return mid
    elif query < numbers[mid]:
        return binary_search(query, numbers, left, mid)
    else:  # if query > numbers[mid]:
        return binary_search(query, numbers, mid + 1, right)


def summarize_planesweep(lines, unique_length_required, keep_small_uniques=False):
    """
    The contents of this method are either influenced by or directly copied from "Assemblytics_uniq_anchor.py"
    written by Maria Nattestad. The original script can be found here:

    https://github.com/MariaNattestad/Assemblytics

    And the publication associated with Maria's work is here:

    Nattestad, Maria, and Michael C. Schatz. "Assemblytics: a
    web analytics tool for the detection of variants from an
    assembly." Bioinformatics 32.19 (2016): 3021-3023.
    """
    alignments_to_keep = []
    # print len(lines)

    # If no alignments:
    if len(lines) == 0:
        return []

    # If only one alignment:
    if len(lines) == 1:
        if keep_small_uniques == True or abs(lines[0][1] - lines[0][0]) >= unique_length_required:
            return [0]
        else:
            return []

    starts_and_stops = []
    for query_min, query_max in lines:
        # print query_min, query_max
        starts_and_stops.append((query_min, "start"))
        starts_and_stops.append((query_max, "stop"))

    sorted_starts_and_stops = sorted(starts_and_stops, key=operator.itemgetter(0))

    current_coverage = 0
    last_position = -1
    sorted_unique_intervals_left = []
    sorted_unique_intervals_right = []
    for pos, change in sorted_starts_and_stops:

        if current_coverage == 1:
            # sorted_unique_intervals.append((last_position,pos))
            sorted_unique_intervals_left.append(last_position)
            sorted_unique_intervals_right.append(pos)

        if change == "start":
            current_coverage += 1
        else:
            current_coverage -= 1
        last_position = pos

    linecounter = 0
    for query_min, query_max in lines:

        i = binary_search(query_min, sorted_unique_intervals_left, 0, len(sorted_unique_intervals_left))

        exact_match = False
        if sorted_unique_intervals_left[i] == query_min and sorted_unique_intervals_right[i] == query_max:
            exact_match = True
        sum_uniq = 0
        while i < len(sorted_unique_intervals_left) and sorted_unique_intervals_left[i] >= query_min and \
                        sorted_unique_intervals_right[i] <= query_max:
            sum_uniq += sorted_unique_intervals_right[i] - sorted_unique_intervals_left[i]
            i += 1

        # print query_min,query_max,sum_uniq
        if sum_uniq >= unique_length_required:
            alignments_to_keep.append(linecounter)
        elif keep_small_uniques == True and exact_match == True:
            alignments_to_keep.append(linecounter)
            # print "Keeping small alignment:", query_min, query_max
            # print sorted_unique_intervals_left[i-1],sorted_unique_intervals_right[i-1]

        linecounter += 1

    return alignments_to_keep