Mercurial > repos > iuc > decontaminator
view utils/preprocess.py @ 0:b856d3d95413 draft default tip
planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/decontaminator commit 3f8e87001f3dfe7d005d0765aeaa930225c93b72
author | iuc |
---|---|
date | Mon, 09 Jan 2023 13:27:09 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python # -*- coding: utf-8 -*- # Credits: Grigorii Sukhorukov, Macha Nikolski import math import os import pathlib import random import h5py import numpy as np from Bio import SeqIO from Bio.Seq import Seq from Bio.SeqRecord import SeqRecord from sklearn.utils import shuffle def reverse_complement(fragment): """ provides reverse complement to sequences Input: sequences - list with SeqRecord sequences in fasta format Output: complementary_sequences - list with SeqRecord complementary sequences in fasta format """ # complementary_sequences = [] # for sequence in sequences: # complementary_sequence = SeqRecord( # seq=Seq(sequence.seq).reverse_complement(), # id=sequence.id + "_reverse_complement", # ) # complementary_sequences.append(complementary_sequence) fragment = fragment[::-1].translate(str.maketrans('ACGT', 'TGCA')) return fragment def introduce_mutations(seqs, mut_rate, rs=None): """ Function that mutates sequences in the entering fasta file A proportion of nucleotides are changed to other nucleotide Not yet taking account of mutation for gaps mut_rate - proportion from 0.0 to 1.0, float """ random.seed(a=rs) assert 0.0 <= mut_rate <= 1.0 mutated_seqs = [] for seq in seqs: mut_seq = list(str(seq.seq)) l_ = len(mut_seq) mutated_sites_i = random.sample(range(l_), int(mut_rate * l_)) for mut_site_i in mutated_sites_i: mut_site = mut_seq[mut_site_i] mutations = ["A", "C", "T", "G"] if mut_site in mutations: mutations.remove(mut_site) mut_seq[mut_site_i] = random.sample(mutations, 1)[0] mutated_seq = SeqRecord( seq=Seq("".join(mut_seq)), id=seq.id + f"mut_{mut_rate}", name="", description="", ) mutated_seqs.append(mutated_seq) return mutated_seqs def separate_by_length(length_, seq_list, fold=None,): # TODO: add docs included = [] to_process = [] excluded = 0 for seq_ in seq_list: l_ = len(seq_.seq) if l_ >= length_: if fold is None: included.append(seq_) elif l_ < length_ * fold: included.append(seq_) else: to_process.append(seq_) else: excluded += 1 print(f"A total of {excluded} sequences was excluded due to being smaller than {length_}") return included, to_process def chunks(lst, n): """Yield successive n-sized chunks from lst. https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks""" for i in range(0, len(lst), n): yield lst[i:i + n] def correct(frag): """ leaves only unambiguous DNA code (ACTG-) Input: frag - string of nucleotides Output: pr_frag - corrected string of nucleotides """ pr_frag = frag.upper() pr_frag_s = set(pr_frag) if pr_frag_s != {"A", "C", "G", "T", "-"}: for letter in pr_frag_s - {"A", "C", "G", "T", "-"}: pr_frag = pr_frag.replace(letter, "-") return pr_frag def fragmenting(sequences, sl_wind_size, max_gap=0.05, sl_wind_step=None): """ slices sequences in fragments by sliding window based on its size and step. last fragment is padded by '-' fragments have ambiguous bases replaced by '-' fragments with many '-' are discarded Input: sequences - list with SeqRecord sequences in fasta format max_gap - max allowed proportion of '-' sl_wind_size - sliding window step sl_wind_step - sliding window step, by default equals sliding window size (None is replaced by it) Output: fragments - list with sequence fragments """ if sl_wind_step is None: sl_wind_step = sl_wind_size fragments = [] fragments_rc = [] out_sequences = [] for sequence in sequences: seq = str(sequence.seq) n_fragments = 1 + max(0, math.ceil((len(seq) - sl_wind_size) / sl_wind_step)) for n in range(n_fragments): if n + 1 != n_fragments: frag = seq[n * sl_wind_step: n * sl_wind_step + sl_wind_size] elif n_fragments == 1: # padding the shorter fragment to sl_wind_size frag_short = seq[n * sl_wind_step: n * sl_wind_step + sl_wind_size] frag = frag_short + (sl_wind_size - len(frag_short)) * "-" else: frag = seq[(len(seq) - sl_wind_size):] # replace ambiguous characters frag = correct(frag) assert len(frag) == sl_wind_size, f"{len(frag)} vs {sl_wind_size}" # skipping sequences with many gaps if frag.count("-") / sl_wind_size <= max_gap: fragments.append(frag) # generating reverse complement fragments_rc.append(reverse_complement(frag)) fr_seq = SeqRecord( seq=Seq(frag), id=f"{sequence.id}_{n*sl_wind_step}_{sl_wind_size}", name="", description="", ) out_sequences.append(fr_seq) return fragments, fragments_rc, out_sequences def label_fasta_fragments(sequences, label): """ Provides labels to generated fragments stored in fasta Input: sequences - list with SeqRecord sequences label - type of label (bacteria, virus, plant) Output: labeled_fragments - list with labeled SeqRecord sequences """ # assert label in ["virus", "plant", "bacteria"] labeled_fragments = [] for sequence in sequences: sequence.id = sequence.id + f"_{label}" labeled_fragments.append(sequence) return labeled_fragments def one_hot_encode(fragments): """ produces one-hot matrices from fragments and labels '-' is given all zeros Input: fragments - list with sequence fragments label - type of label (int <= depth) label_depth - number of possible labels Output: encoded_fragments - list with one-hot encoded fragments labels - list with one-hot encoded labels """ import tensorflow as tf encoded_fragments = [] map_dict = {"A": 0, "C": 1, "G": 2, "T": 3, "-": -1} for frag in fragments: frag_array = np.array(list(frag)) integer_encoded = np.int8(np.vectorize(map_dict.get)(frag_array)) one_hot_encoded = tf.one_hot(integer_encoded, depth=4, dtype=tf.int8).numpy() encoded_fragments.append(one_hot_encoded) encoded_fragments = np.stack(encoded_fragments) return encoded_fragments def prepare_labels(fragments, label, label_depth): """ produces one-hot labels '-' is given all zeros Input: fragments - list with sequence fragments label - type of label (int <= depth) label_depth - number of possible labels Output: labels - list with one-hot encoded labels """ import tensorflow as tf n_fragments = len(fragments) labels = np.int8(np.full(n_fragments, label)) labels = tf.one_hot(labels, depth=label_depth).numpy() return labels # TODO: write docs for functions def calculate_total_length(seq_path): """ Calculate total length of the sequences in the fasta file. Needed for weighted sampling Input: seq_path - path to the file with sequences Output: seq_length - total length of all sequences in the file """ seqs = list(SeqIO.parse(seq_path, "fasta")) seq_length = 0 for seq in seqs: seq_length += len(seq.seq) return seq_length def prepare_seq_lists(in_paths, n_fragments, weights=None,): """ selects files with sequences based on extension and calculates number of fragments to be sampled Input: in_paths - list of paths to folder with sequence files. Can be a string also a string n_fragments - number of fragments to be sampled weights - upsampling of fragments. fractions should sum to one Output: seqs_list - list with path to files with sequences n_fragments_list - number of fragments to be sampled lists are zipped to work with ray iterators """ # case when we recieve a single sequence file if type(in_paths) is str and in_paths.endswith(('.fna', '.fasta')): return [[in_paths, n_fragments]] else: # transform string to list if type(in_paths) is str or type(in_paths) is pathlib.PosixPath: in_paths = [in_paths] if weights: assert len(weights) == len(in_paths) assert 1.01 > round(sum(weights), 2) > 0.99 else: l_ = len(in_paths) weights = [1 / l_] * l_ n_fragments_list_all = [] seqs_list_all = [] for in_paths, w_ in zip(in_paths, weights): seqs_list = [] seq_length_list = [] total_length = 0 for file in os.listdir(in_paths): if file.endswith("fna") or file.endswith("fasta"): seq_path = (os.path.join(in_paths, file)) seqs_length = calculate_total_length(seq_path) seqs_list.append(seq_path) seq_length_list.append(seqs_length) total_length += seqs_length # + 1 may lead to a slightly bigger number than desired n_fragments_list = [((seq_length / total_length) * n_fragments * w_ + 1) for seq_length in seq_length_list] n_fragments_list_all.extend(n_fragments_list) seqs_list_all.extend(seqs_list) print("list calculation done") return list(zip(seqs_list_all, n_fragments_list_all)) def sample_fragments(seq_container, length, random_seed=1, limit=None, max_gap=0.05, sl_wind_step=None): """ Randomly samples fragments from sequences in the list. Is a bit cumbersome written to work with ray. Input: seq_container - list with each entry containing path to sequence, and n samples from this sequence. length - desired length of sampled fragments Output: fragments - list with sequence fragments """ random.seed(a=random_seed) total_fragments = [] total_fragments_rc = [] total_seqs = [] for entry in seq_container: seq = list(SeqIO.parse(entry[0], "fasta")) n_fragments = entry[1] seqs = [] fragments = [] fragments_rc = [] counter_1 = 0 counter_2 = 0 while counter_1 < n_fragments: # select chromosomes if there are any fragment_full = random.choice(seq) r_end = len(fragment_full.seq) - length try: r_start = random.randrange(r_end) fragment = SeqRecord( seq=fragment_full.seq[r_start:(r_start + length)], id=f"{fragment_full.id}_{length}_{r_start}", name="", description="", ) temp_, temp_rc, _ = fragmenting([fragment], length, max_gap, sl_wind_step=sl_wind_step) if temp_ and temp_rc: seqs.append(fragment) fragments.extend(temp_) fragments_rc.extend(temp_rc) counter_1 += 1 except ValueError: # print(f"{fragment_full.id} has length {len(fragment_full.seq)} and is too short to be sampled") pass counter_2 += 1 if limit: assert counter_2 <= limit * n_fragments, f"While cycle iterated more than {limit}, data is ambiguous." \ f" Only {len(fragments)} fragments were sampled out of {n_fragments}" total_fragments.extend(fragments) total_fragments_rc.extend(fragments_rc) total_seqs.extend(seqs) # print("sequence sampling done") return total_fragments, total_fragments_rc, total_seqs def prepare_ds_fragmenting(in_seq, label, label_int, fragment_length, sl_wind_step, max_gap=0.05, n_cpus=1): if sl_wind_step is None: sl_wind_step = int(fragment_length / 2) # generating viral fragments and labels seqs = list(SeqIO.parse(in_seq, "fasta")) frags, frags_rc, seqs_ = fragmenting(seqs, fragment_length, max_gap=max_gap, sl_wind_step=sl_wind_step) encoded = one_hot_encode(frags) encoded_rc = one_hot_encode(frags_rc) labs = prepare_labels(frags, label=label_int, label_depth=2) seqs_ = label_fasta_fragments(seqs_, label=label) # subsetting to unique fragments u_encoded, indices = np.unique(encoded, axis=0, return_index=True) u_encoded_rc = encoded_rc[indices] u_labs = labs[indices] u_seqs = [seqs_[i] for i in indices] assert (np.shape(u_encoded)[0] == np.shape(u_encoded_rc)[0]) print(f"Encoding {label} sequences finished") # print(f"{np.shape(u_encoded)[0]} forward fragments generated") n_frags = np.shape(u_encoded)[0] return u_encoded, u_encoded_rc, u_labs, u_seqs, n_frags def prepare_ds_sampling(in_seqs, fragment_length, n_frags, label, label_int, random_seed, n_cpus=1, limit=100): # generating plant fragments and labels seqs_list = prepare_seq_lists(in_seqs, n_frags) frags, frags_rc, seqs_ = sample_fragments(seqs_list, fragment_length, random_seed, limit=limit, max_gap=0.05) frags, frags_rc, seqs_ = shuffle(frags, frags_rc, seqs_, random_state=random_seed, n_samples=int(n_frags)) encoded = one_hot_encode(frags) encoded_rc = one_hot_encode(frags_rc) labs = prepare_labels(frags, label=label_int, label_depth=2) seqs_ = label_fasta_fragments(seqs_, label=label) assert (np.shape(encoded)[0] == np.shape(encoded_rc)[0]) print(f"Encoding {label} sequences finished") # print(f"{np.shape(encoded)[0]} forward fragments generated") return encoded, encoded_rc, labs, seqs_, n_frags def storing_encoded(encoded, encoded_rc, labs, out_path, ): f = h5py.File(out_path, "w") f.create_dataset("fragments", data=encoded) f.create_dataset("fragments_rc", data=encoded_rc) f.create_dataset("labels", data=labs) f.close()