lib/assembly_tools.py @ 0:1d1b9e1b2e2f (draft)

Uploaded
author petr-novak
date Thu, 19 Dec 2019 10:24:45 -0500
#!/usr/bin/env python3
import sys
import logging
import subprocess
import os
import tempfile
import config
import shutil
import itertools
import pickle
import shlex
from lib.parallel.parallel import parallel2 as parallel
from lib.parallel.parallel import get_max_proc

REQUIRED_VERSION = (3, 4)
MAX_BUFFER_SIZE = 100000
if sys.version_info < REQUIRED_VERSION:
    raise Exception("\n\npython 3.4 or higher is required!\n")
LOGGER = logging.getLogger(__name__)
MAX_FASTA_IN_DIRECTORY = 1000

def assembly(sequences, hitsort, clusters_info, assembly_dir,
             contigs_file, min_size_of_cluster_for_assembly=0):
    '''
    Runs assembly on sequences (SequenceSet). Assembly is
    performed on each cluster separately; clusters are taken
    from hitsort (Graph).
    clusters_info - list of Cluster objects.
    If cluster.tandem_rank is 1 or 2, no assembly is performed!
    '''

    # iterate over large clusters; assembly is performed on sequences stored in
    # clusters_info[index].fasta_file_full - for annotated clusters
    # sequences of small clusters are retrieved from the database
    fasta_seqs = [
        i.fasta_file_full
        for i in clusters_info
        if i.tandem_rank not in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS
    ]
    prefixes = ["CL{}".format(i.index)
                for i in clusters_info
                if i.tandem_rank not in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS]
    LOGGER.info("Number of clusters for assembly: {}".format(
        hitsort.number_of_clusters))
    LOGGER.info("Assembling large clusters")
    assembly_files = parallel(cap3worker, fasta_seqs, prefixes)
    LOGGER.info("Large clusters assembled")
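    # cap3worker returns one dict of output-file paths per assembled cluster,
    # in the same order as fasta_seqs / prefixes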
    # some clusters (tandem ranks in SKIP_CAP3_ASSEMBLY_TANDEM_RANKS) were
    # already assembled by tarean; only their consensus contigs are copied
    j = 0
    for cl in clusters_info:
        if cl.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS:
            cl.assembly_files = {i: None for i in config.CAP3_FILES_MAPPING}
            consensus_file = cl.dir + "/tarean_contigs.fasta"
            cl.assembly_files["{}.{}.contigs"] = consensus_file
            with open(cl.dir_tarean + "/tarean_contigs.fasta",
                      'r') as fin, open(consensus_file, 'w') as fout:
                for line in fin:
                    if line[0] == ">":
                        line = ">CL{}Contig{}".format(cl.index, line[1:])
                    fout.write(line)
        else:
            cl.assembly_files = assembly_files[j]
            j += 1
    # assembly of small clusters:
    # connection to files where results will be concatenated
    LOGGER.info("Assembly of small clusters - making tmp files")

    tmp_dir_root = tempfile.mkdtemp()
    tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root)
    nproc = get_max_proc()
    prefixes = [[] for i in range(nproc)]
    tmp_seq_files = [[] for i in range(nproc)]

    LOGGER.info("Assembly of small clusters - saving small clusters to tmp files")
    seq_dictionary = sequences.toDict()
    fasta_count = 0
    chunk_counter = itertools.cycle(range(nproc))
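    # small clusters are distributed round-robin over nproc batches; each batch
    # (one list of fasta files plus the matching prefixes) is later assembled
    # by a single cap3worker_multiple call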
    for index in range(len(clusters_info) + 1, hitsort.number_of_clusters):
        chunk = next(chunk_counter)
        ids = hitsort.get_cluster_reads(index)
        if len(ids) < min_size_of_cluster_for_assembly:
            break
        prefixes[chunk].append("CL{}".format(index))
        fasta_count += 1
        if fasta_count > MAX_FASTA_IN_DIRECTORY:
            # create a new subdir to keep the number of files per directory low
            fasta_count = 1
            tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root)
        fasta_file_name = "{}/{}".format(tmp_subdir, index)
        write_seqDict_to_fasta(file_name=fasta_file_name,
                               sequences=seq_dictionary, subset=ids)
        tmp_seq_files[chunk].append(fasta_file_name)
    del seq_dictionary
    LOGGER.info("Assembly of small clusters running")
    pickled_fparts_small_contigs = parallel(cap3worker_multiple,
                                            tmp_seq_files, prefixes)
    LOGGER.info("Assembly of small clusters done")
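    # each cap3worker_multiple call returns the path to a pickled list of
    # per-cluster file dictionaries; these are appended to the shared output
    # files below and then removed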
    # append to all results
    LOGGER.info("Assembly of small clusters - appending results")
    all_con = {}
    for fkey in config.CAP3_FILES_MAPPING:
        if config.CAP3_FILES_MAPPING[fkey]:
            all_con[fkey] = open("{}/{}".format(
                assembly_dir, config.CAP3_FILES_MAPPING[fkey]), 'w')
        else:
            all_con[fkey] = None
    for pickled_fparts_multiple in pickled_fparts_small_contigs:
        with open(pickled_fparts_multiple, 'rb') as fp:
            fparts_multiple = pickle.load(fp)
        os.unlink(pickled_fparts_multiple)
        for fparts in fparts_multiple:
            for fkey in fparts:
                if all_con[fkey]:
                    with open(fparts[fkey]) as f:
                        all_con[fkey].write(f.read())
                # partial files are removed whether appended or not
                os.unlink(fparts[fkey])

    for fkey in all_con:
        if all_con[fkey]:
            all_con[fkey].close()
    LOGGER.info("Assembling - copying files to final location")
    # copy all contigs to one location - contigs_file
    with open(contigs_file, 'w') as fout:
        for cl in clusters_info:
            # append contigs from large clusters
            with open(cl.assembly_files["{}.{}.contigs"], 'r') as fin:
                for i in fin:
                    fout.write(i)
        # append contigs from small clusters
        small_aln_file = "{}/{}".format(
            assembly_dir, config.CAP3_FILES_MAPPING['{}.{}.aln'])
        # calculate profile on the aln file of small clusters
        file_base_name = small_aln_file[:-4]
        os.system(
            "align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(
                fn=small_aln_file, out=file_base_name))
        os.system("select_and_sort_contigs.pl {fn}.info.fasta 5 2>&1".format(
            fn=file_base_name))
        small_contig_file = file_base_name + ".info.fasta"
        with open(small_contig_file, 'r') as fin:
            for i in fin:
                fout.write(i)
    shutil.rmtree(tmp_dir_root)

def write_seqDict_to_fasta(file_name, sequences, subset):
    with open(file_name, 'w') as f:
        for i in subset:
            f.write(">{}\n{}\n".format(i, sequences[i]))


def cap3worker(seqfile, prefix="cap", cap3args=" -p 80 -o 40 "):
    prefix2 = "cap"
    cmd = "cap3 " + seqfile + cap3args + " -x " + prefix2
    with open(seqfile + "." + prefix2 + ".aln", "w") as aln:
        subprocess.check_call(shlex.split(cmd), shell=False, stdout=aln)
    # this generates three files
    files_dict = {}
    for fkey in config.CAP3_FILENAMES:
        fn = fkey.format(seqfile, prefix2)
        fn_tmp = "{}.tmp".format(fn)
        files_dict[fkey] = fn
        if config.CAP3_PATTERNS_REPLACE[fkey]:
            pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey]
            with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out:
                for line in con_in:
                    con_out.write(line.replace(
                        pattern, replacement.format(prefix)))
            os.rename(fn_tmp, fn)

    # rename the cap3 output files to meaningful names
    for fkey in config.CAP3_FILES_GOODNAMES:
        fn_goodname = (os.path.dirname(files_dict[fkey]) + "/" +
                       config.CAP3_FILES_GOODNAMES[fkey])
        os.rename(files_dict[fkey], fn_goodname)
        files_dict[fkey] = fn_goodname

    aln_file = files_dict["{}.{}.aln"]
    file_base_name = aln_file[:-4]
    os.system(
        "align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(
            fn=aln_file, out=file_base_name))
    os.system("select_and_sort_contigs.pl {fn}.info.fasta 5".format(
        fn=file_base_name))
    # TODO: add the new file to files_dict
    # replace the simple fasta with info.fasta
    files_dict["{}.{}.contigs"] = file_base_name + ".info.fasta"
    return files_dict


def cap3worker_multiple(many_seqfile, many_prefixes, cap3args=" -p 80 -o 40 "):
    '''
    The purpose of this function is to run multiple assemblies within a single
    process, avoiding a high number of short-lived parallel subprocesses.
    Since starting a subprocess for each cap3 assembly was very inefficient,
    all cap3 commands are written to a single file and run using a single
    subprocess command.
    '''
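    # each line of the generated command file has the form (sketch, with the
    # default cap3args):
    #   cap3 <seqfile> -p 80 -o 40 -x <prefix> > <seqfile>.<prefix>.aln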
    cmd_file = tempfile.NamedTemporaryFile(mode="w", delete=False).name
    with open(cmd_file, 'w') as cmdf:
        for seqfile, prefix in zip(many_seqfile, many_prefixes):
            cmd = ("cap3 " + seqfile + cap3args + " -x " + prefix +
                   " > " + seqfile + "." + prefix + ".aln\n")
            cmdf.write(cmd)
    os.system("sh " + cmd_file)
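    # cap3 writes its output files next to each input fasta file; their paths
    # are reconstructed below from the config.CAP3_FILENAMES patterns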
    # collect results:
    files_dict_many = []
    for seqfile, prefix in zip(many_seqfile, many_prefixes):
        files_dict = {}
        for fkey in config.CAP3_FILENAMES:
            fn = fkey.format(seqfile, prefix)
            fn_tmp = "{}.tmp".format(fn)
            files_dict[fkey] = fn
            if config.CAP3_PATTERNS_REPLACE[fkey]:
                pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey]
                with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out:
                    for line in con_in:
                        con_out.write(line.replace(
                            pattern, replacement.format(prefix)))
                os.rename(fn_tmp, fn)
        files_dict_many.append(files_dict)
    # the result is too large to be returned directly - use pickling
    f = tempfile.NamedTemporaryFile(delete=False)
    os.unlink(cmd_file)
    pickle.dump(files_dict_many, file=f)
    f.close()  # close so the pickle is fully flushed before the path is returned
    return f.name