Mercurial > repos > petr-novak > repeatrxplorer
view lib/tarean/kmer_counting.py @ 0:1d1b9e1b2e2f draft
Uploaded
author | petr-novak |
---|---|
date | Thu, 19 Dec 2019 10:24:45 -0500 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python3 import logging logger = logging.getLogger(__name__) import itertools import os import sys import random import sqlite3 import subprocess import shlex # for command line arguments split import operator REQUIRED_VERSION = (3, 4) MAX_PRINT = 10 MEGABLAST = "-task megablast" HITSORT = "-task megablast" if sys.version_info < REQUIRED_VERSION: raise Exception("\n\npython 3.4 or higher is required!\n") # additional functions class Sequence: def __init__(self, seq, name="", paired=False): # the mode os seq storage can be changed later to make it more # memory efficient self._seq = bytes(str(seq), "ascii") self.name = str(name) @property def seq(self): return self._seq.decode("utf-8") @seq.setter def seq(self, value): self._seq = bytes(str(value), "ascii") def __str__(self): return "{0} : {1}".format(self.name, self.seq) @staticmethod def read_fasta(fasta_file_name): ''' generator - reads sequences from fasta file return sequence one by one ''' with open(fasta_file_name, 'r') as f: header = None seqstr = None for rawline in f: line = rawline.strip() if line == "": continue if line[0] == ">": if header and seqstr: yield Sequence(seqstr, header) # reset seqstr = None header = line[1:] elif seqstr: Warning("sequence was not preceeded by header") else: header = line[1:] else: seqstr = line if not seqstr else seqstr + line # skip empty lines: if header and seqstr: yield Sequence(seqstr, header) return def write2fasta(self, file_object): file_object.write(">{0}\n{1}\n".format(self.name, self.seq)) def get_kmers(string, width=11): L = len(string) parts = [string[i:i + width] for i in range(L - width + 0)] return parts def count_kmers_from_file(f, width=11): counts = {} for i in Sequence.read_fasta(f): a = get_kmers(i.seq, width) for km in a: if "N" in km: continue if km in counts: counts[km] += 1 else: counts[km] = 1 sorted_counts = sorted(counts.items(), key=operator.itemgetter(1), reverse=True) return sorted_counts if __name__ == "__main__": L = len(sys.argv) - 1 kmer_length = int(sys.argv[-1]) files = sys.argv[1:-1] for fin in files: counts = count_kmers_from_file(fin, kmer_length) fout = "{}_{}.kmers".format(fin, kmer_length) with open(fout, "w") as f: for i in counts: f.write("{}\t{}\n".format(*i)) print(fout)