Mercurial > repos > petr-novak > repeatrxplorer
view lib/assembly_tools.py @ 0:1d1b9e1b2e2f draft
Uploaded
author | petr-novak |
---|---|
date | Thu, 19 Dec 2019 10:24:45 -0500 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python3 import sys import logging import subprocess import os import tempfile import config import shutil import itertools import pickle import shlex from lib.parallel.parallel import parallel2 as parallel from lib.parallel.parallel import get_max_proc REQUIRED_VERSION = (3, 4) MAX_BUFFER_SIZE = 100000 if sys.version_info < REQUIRED_VERSION: raise Exception("\n\npython 3.4 or higher is required!\n") LOGGER = logging.getLogger(__name__) MAX_FASTA_IN_DIRECTORY = 1000 def assembly(sequences, hitsort, clusters_info, assembly_dir, contigs_file, min_size_of_cluster_for_assembly = 0): ''' Runs assembly on sequences (SequenceSet). Assembly is performed on each cluster separatelly, clusters are taken from hitsort(Graph) Cluster_listing - list of Clusters if cluster.tandem_rank is 1 or 2 - no assembly is performed!! ''' # iterate over large clusters, assembly is performed on sequences stored in # cluster_little[index].fasta_file_full - for annotated clusters # sequences of small clusters are retrieved from database fasta_seqs = [ i.fasta_file_full for i in clusters_info if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS ] prefixes = ["CL{}".format(i.index) for i in clusters_info if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS] LOGGER.info("Number of clusters for assembly: {}".format(hitsort.number_of_clusters)) LOGGER.info("Assembling large clusters") assembly_files = parallel(cap3worker, fasta_seqs, prefixes) LOGGER.info("Large clusters assembled") # some clusters - tanem rank 1 assembled j = 0 for cl in clusters_info: if cl.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS: cl.assembly_files = {i: None for i in config.CAP3_FILES_MAPPING} consensus_file = cl.dir + "/tarean_contigs.fasta" cl.assembly_files["{}.{}.contigs"] = consensus_file with open(cl.dir_tarean + "/tarean_contigs.fasta", 'r') as fin, open(consensus_file, 'w') as fout: for line in fin: if line[0] == ">": line = ">CL{}Contig{}".format(cl.index, line[1:]) fout.write(line) else: cl.assembly_files = assembly_files[j] j += 1 # assembly of small clusters: # connection to files were results will be concatenated LOGGER.info("Assembly of small cluster - making tmp files") tmp_dir_root = tempfile.mkdtemp() tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root) nproc = get_max_proc() prefixes = [[] for i in range(nproc)] tmp_seq_files = [[] for i in range(nproc)] LOGGER.info("Assembly of small clusters - saving small cluster to tmp files") seq_dictionary = sequences.toDict() fasta_count = 0 chunk_counter = itertools.cycle(range(nproc)) for index in range(len(clusters_info) + 1, hitsort.number_of_clusters): chunk = next(chunk_counter) ids = hitsort.get_cluster_reads(index) if len(ids) < min_size_of_cluster_for_assembly: break prefixes[chunk].append("CL{}".format(index)) fasta_count += 1 if fasta_count > MAX_FASTA_IN_DIRECTORY: # create new subdir to keep number of files in directory low fasta_count = 1 tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root) fasta_file_name = "{}/{}".format(tmp_subdir, index) write_seqDict_to_fasta(file_name=fasta_file_name, sequences=seq_dictionary, subset=ids) tmp_seq_files[chunk].append(fasta_file_name) del seq_dictionary LOGGER.info("Assembly of small clusters running") pickled_fparts_small_contigs = parallel(cap3worker_multiple, tmp_seq_files, prefixes) LOGGER.info("Assembly of small clusters done") # append to all results LOGGER.info("Assembly of small clusters - appending results") all_con = {} for fkey in config.CAP3_FILES_MAPPING: if config.CAP3_FILES_MAPPING[fkey]: all_con[fkey] = open("{}/{}".format( assembly_dir, config.CAP3_FILES_MAPPING[fkey]), 'w') else: all_con[fkey] = None for pickled_fparts_multiple in pickled_fparts_small_contigs: with open(pickled_fparts_multiple, 'rb') as fp: fparts_multiple = pickle.load(fp) os.unlink(pickled_fparts_multiple) for fparts in fparts_multiple: for fkey in fparts: if all_con[fkey]: with open(fparts[fkey]) as f: all_con[fkey].write(f.read()) os.unlink(fparts[fkey]) else: os.unlink(fparts[fkey]) for fkey in all_con: if all_con[fkey]: all_con[fkey].close() LOGGER.info("Assembling - copying files to final location") # copy all contigs to one location: - contigs_file with open(contigs_file, 'w') as fout: for cl in clusters_info: # append contig from large clusters with open(cl.assembly_files["{}.{}.contigs"], 'r') as fin: for i in fin: fout.write(i) # append contigs from small clusters small_aln_file = "{}/{}".format( assembly_dir, config.CAP3_FILES_MAPPING['{}.{}.aln']) # calculate profile on aln file of small clusters file_base_name = small_aln_file[:-4] os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=small_aln_file, out=file_base_name)) os.system("select_and_sort_contigs.pl {fn}.info.fasta 5 2>&1".format(fn=file_base_name)) small_contig_file = file_base_name + ".info.fasta" with open(small_contig_file, 'r') as fin: for i in fin: fout.write(i) shutil.rmtree(tmp_dir_root) def write_seqDict_to_fasta(file_name, sequences, subset): with open(file_name, 'w') as f: for i in subset: f.write(">{}\n{}\n".format(i, sequences[i])) def cap3worker(seqfile, prefix="cap", cap3args=" -p 80 -o 40 "): prefix2 = "cap" cmd = "cap3 " + seqfile + cap3args + " -x " + prefix2 with open(seqfile + "." + prefix2 + ".aln", "w") as aln: subprocess.check_call(shlex.split(cmd), shell=False, stdout=aln) # this generate three files files_dict = {} for fkey in config.CAP3_FILENAMES: fn = fkey.format(seqfile, prefix2) fn_tmp = "{}.tmp".format(fn) files_dict[fkey] = fn if config.CAP3_PATTERNS_REPLACE[fkey]: pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey] with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out: for line in con_in: con_out.write(line.replace(pattern, replacement.format( prefix))) os.rename(fn_tmp, fn) # make new meaningful names here for fkey in config.CAP3_FILES_GOODNAMES: config.CAP3_FILES_GOODNAMES[fkey] fn_goodname = os.path.dirname(files_dict[fkey]) + "/" + config.CAP3_FILES_GOODNAMES[fkey] os.rename(files_dict[fkey], fn_goodname) files_dict[fkey] = fn_goodname aln_file = files_dict["{}.{}.aln"] file_base_name = aln_file[:-4] os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=aln_file, out=file_base_name)) os.system("select_and_sort_contigs.pl {fn}.info.fasta 5".format(fn=file_base_name)) # TODO -add new file to files_dict # replace simple fasta with info.fasta files_dict["{}.{}.contigs"] = file_base_name + ".info.fasta" return files_dict def cap3worker_multiple(many_seqfile, many_prefixes, cap3args=" -p 80 -o 40 "): ''' purpose of this script is to run multiple assemblies within single process avoiding running high number of short parallel subprocesses As starting subprocess for each cap3 assembly was very ineffective, all ap3 commands are written to single file and run using single subprocess command - ''' cmd_file = tempfile.NamedTemporaryFile(mode="w",delete=False).name with open(cmd_file,'w') as cmdf: for seqfile, prefix in zip(many_seqfile, many_prefixes): cmd = "cap3 " + seqfile + cap3args + " -x " + prefix + " > " + seqfile + "." + prefix + ".aln\n" cmdf.write(cmd) os.system("sh "+ cmd_file) # collect results: files_dict_many = [] for seqfile, prefix in zip(many_seqfile, many_prefixes): files_dict = {} for fkey in config.CAP3_FILENAMES: fn = fkey.format(seqfile, prefix) fn_tmp = "{}.tmp".format(fn) files_dict[fkey] = fn if config.CAP3_PATTERNS_REPLACE[fkey]: pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey] with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out: for line in con_in: con_out.write(line.replace(pattern, replacement.format( prefix))) os.rename(fn_tmp, fn) files_dict_many.append(files_dict) # this is too large to be return directly - use picking f = tempfile.NamedTemporaryFile(delete=False) os.unlink(cmd_file) pickle.dump(files_dict_many,file=f) return f.name