0
|
1 #!/usr/bin/env python3
|
|
2 import sys
|
|
3 import logging
|
|
4 import subprocess
|
|
5 import os
|
|
6 import tempfile
|
|
7 import config
|
|
8 import shutil
|
|
9 import itertools
|
|
10 import pickle
|
|
11 import shlex
|
|
12 from lib.parallel.parallel import parallel2 as parallel
|
|
13 from lib.parallel.parallel import get_max_proc
|
|
14
|
|
# minimum interpreter version; checked at import time below
REQUIRED_VERSION = (3, 4)
# NOTE(review): not referenced in this file - presumably used by importers; confirm
MAX_BUFFER_SIZE = 100000
if sys.version_info < REQUIRED_VERSION:
    # fail fast on import rather than with an obscure syntax/runtime error later
    raise Exception("\n\npython 3.4 or higher is required!\n")
LOGGER = logging.getLogger(__name__)
# cap of fasta files written per tmp subdirectory (see assembly());
# keeps directory listings small when many small clusters are dumped
MAX_FASTA_IN_DIRECTORY = 1000
|
|
21
|
|
def assembly(sequences, hitsort, clusters_info, assembly_dir,
             contigs_file, min_size_of_cluster_for_assembly = 0):
    '''
    Runs assembly on sequences (SequenceSet). Assembly is
    performed on each cluster separatelly, clusters are taken
    from hitsort(Graph)
    Cluster_listing - list of Clusters
    if cluster.tandem_rank is 1 or 2 - no assembly is performed!!

    Large (annotated) clusters from clusters_info are assembled in
    parallel via cap3worker; clusters whose tandem_rank is in
    config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS reuse the TAREAN consensus
    instead.  Remaining small clusters are dumped to tmp fasta files and
    assembled in batches via cap3worker_multiple.  All contigs are
    finally concatenated into contigs_file.
    '''

    # iterate over large clusters, assembly is performed on sequences stored in
    # cluster_little[index].fasta_file_full - for annotated clusters
    # sequences of small clusters are retrieved from database
    fasta_seqs = [
        i.fasta_file_full
        for i in clusters_info
        if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS
    ]
    prefixes = ["CL{}".format(i.index)
                for i in clusters_info
                if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS]
    LOGGER.info("Number of clusters for assembly: {}".format(hitsort.number_of_clusters))
    LOGGER.info("Assembling large clusters")
    assembly_files = parallel(cap3worker, fasta_seqs, prefixes)
    LOGGER.info("Large clusters assembled")
    # some clusters - tanem rank 1 assembled
    j = 0  # cursor into assembly_files (holds only non-skipped clusters)
    for cl in clusters_info:
        if cl.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS:
            # no cap3 assembly - reuse tarean consensus as the "contigs" file
            cl.assembly_files = {i: None for i in config.CAP3_FILES_MAPPING}
            consensus_file = cl.dir + "/tarean_contigs.fasta"
            cl.assembly_files["{}.{}.contigs"] = consensus_file
            # copy consensus, renaming fasta headers to the CL<index>Contig scheme
            with open(cl.dir_tarean + "/tarean_contigs.fasta",
                      'r') as fin, open(consensus_file, 'w') as fout:
                for line in fin:
                    if line[0] == ">":
                        line = ">CL{}Contig{}".format(cl.index, line[1:])
                    fout.write(line)
        else:
            cl.assembly_files = assembly_files[j]
            j += 1
    # assembly of small clusters:
    # connection to files were results will be concatenated
    LOGGER.info("Assembly of small cluster - making tmp files")

    tmp_dir_root = tempfile.mkdtemp()
    tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root)
    nproc = get_max_proc()
    # one work list per parallel worker, filled round-robin below
    prefixes = [[] for i in range(nproc)]
    tmp_seq_files = [[] for i in range(nproc)]

    LOGGER.info("Assembly of small clusters - saving small cluster to tmp files")
    seq_dictionary = sequences.toDict()
    fasta_count = 0
    chunk_counter = itertools.cycle(range(nproc))
    # small clusters follow the annotated ones in hitsort numbering
    # (presumably ordered by decreasing size - TODO confirm; the break
    # below relies on that ordering)
    for index in range(len(clusters_info) + 1, hitsort.number_of_clusters):
        chunk = next(chunk_counter)
        ids = hitsort.get_cluster_reads(index)
        if len(ids) < min_size_of_cluster_for_assembly:
            break
        prefixes[chunk].append("CL{}".format(index))
        fasta_count += 1
        if fasta_count > MAX_FASTA_IN_DIRECTORY:
            # create new subdir to keep number of files in directory low
            fasta_count = 1
            tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root)
        fasta_file_name = "{}/{}".format(tmp_subdir, index)
        write_seqDict_to_fasta(file_name=fasta_file_name, sequences=seq_dictionary, subset=ids)
        tmp_seq_files[chunk].append(fasta_file_name)
    del seq_dictionary  # free the full sequence dict before running workers
    LOGGER.info("Assembly of small clusters running")
    pickled_fparts_small_contigs = parallel(cap3worker_multiple, tmp_seq_files, prefixes)
    LOGGER.info("Assembly of small clusters done")
    # append to all results
    LOGGER.info("Assembly of small clusters - appending results")
    # open one concatenated output per cap3 file kind; None means "discard"
    all_con = {}
    for fkey in config.CAP3_FILES_MAPPING:
        if config.CAP3_FILES_MAPPING[fkey]:
            all_con[fkey] = open("{}/{}".format(
                assembly_dir, config.CAP3_FILES_MAPPING[fkey]), 'w')
        else:
            all_con[fkey] = None
    for pickled_fparts_multiple in pickled_fparts_small_contigs:
        # each worker returns the path of a pickled list of per-cluster file dicts
        with open(pickled_fparts_multiple, 'rb') as fp:
            fparts_multiple = pickle.load(fp)
        os.unlink(pickled_fparts_multiple)
        for fparts in fparts_multiple:
            for fkey in fparts:
                if all_con[fkey]:
                    with open(fparts[fkey]) as f:
                        all_con[fkey].write(f.read())
                    os.unlink(fparts[fkey])
                else:
                    # this file kind is not kept - just remove the partial file
                    os.unlink(fparts[fkey])

    for fkey in all_con:
        if all_con[fkey]:
            all_con[fkey].close()
    LOGGER.info("Assembling - copying files to final location")
    # copy all contigs to one location: - contigs_file
    with open(contigs_file, 'w') as fout:
        for cl in clusters_info:
            # append contig from large clusters
            with open(cl.assembly_files["{}.{}.contigs"], 'r') as fin:
                for i in fin:
                    fout.write(i)
        # append contigs from small clusters
        small_aln_file = "{}/{}".format(
            assembly_dir, config.CAP3_FILES_MAPPING['{}.{}.aln'])
        # calculate profile on aln file of small clusters
        file_base_name = small_aln_file[:-4]
        os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=small_aln_file, out=file_base_name))
        os.system("select_and_sort_contigs.pl {fn}.info.fasta 5 2>&1".format(fn=file_base_name))
        small_contig_file = file_base_name + ".info.fasta"
        with open(small_contig_file, 'r') as fin:
            for i in fin:
                fout.write(i)
    shutil.rmtree(tmp_dir_root)
|
|
140
|
|
def write_seqDict_to_fasta(file_name, sequences, subset):
    """Write selected records to *file_name* in FASTA format.

    *sequences* maps sequence ids to sequence strings; one record is
    written for every id in *subset*, in the given order.
    """
    with open(file_name, 'w') as out:
        out.writelines(">{}\n{}\n".format(seq_id, sequences[seq_id])
                       for seq_id in subset)
|
|
145
|
|
146
|
|
147
|
|
148
|
|
def cap3worker(seqfile, prefix="cap", cap3args=" -p 80 -o 40 "):
    """Assemble one cluster fasta file with cap3 and post-process outputs.

    Runs ``cap3`` on *seqfile*, tags contig names with *prefix* (the
    cluster id, e.g. "CL12"), renames cap3 output files to the
    "good names" from config, and parses the alignment into
    ``.info.fasta`` / ``.profile`` files via external perl scripts.

    Returns a dict mapping config.CAP3_FILENAMES keys to final file paths;
    the "{}.{}.contigs" entry points to the annotated info.fasta.
    """
    # cap3 names its output files after this literal prefix; the cluster
    # prefix is only substituted into contig names further below
    prefix2 = "cap"
    cmd = "cap3 " + seqfile + cap3args + " -x " + prefix2
    # cap3 writes the alignment to stdout - capture it into the .aln file
    with open(seqfile + "." + prefix2 + ".aln", "w") as aln:
        subprocess.check_call(shlex.split(cmd), shell=False, stdout=aln)
    # this generate three files
    files_dict = {}
    for fkey in config.CAP3_FILENAMES:
        fn = fkey.format(seqfile, prefix2)
        fn_tmp = "{}.tmp".format(fn)
        files_dict[fkey] = fn
        if config.CAP3_PATTERNS_REPLACE[fkey]:
            # tag generic cap3 names with the cluster prefix
            # (e.g. Contig1 -> CL12Contig1)
            pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey]
            with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out:
                for line in con_in:
                    con_out.write(line.replace(pattern, replacement.format(
                        prefix)))
            os.rename(fn_tmp, fn)

    # make new meaningful names here
    # (a no-op bare expression `config.CAP3_FILES_GOODNAMES[fkey]` was
    # removed from this loop - it had no effect)
    for fkey in config.CAP3_FILES_GOODNAMES:
        fn_goodname = os.path.dirname(files_dict[fkey]) + "/" + config.CAP3_FILES_GOODNAMES[fkey]
        os.rename(files_dict[fkey], fn_goodname)
        files_dict[fkey] = fn_goodname

    aln_file = files_dict["{}.{}.aln"]
    file_base_name = aln_file[:-4]  # strip the ".aln" suffix
    # NOTE(review): os.system with interpolated paths - paths are generated
    # internally, but this would break on names with shell metacharacters
    os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=aln_file, out=file_base_name))
    os.system("select_and_sort_contigs.pl {fn}.info.fasta 5".format(fn=file_base_name))
    # TODO -add new file to files_dict
    # replace simple fasta with info.fasta
    files_dict["{}.{}.contigs"] = file_base_name + ".info.fasta"
    return files_dict
|
|
184
|
|
def cap3worker_multiple(many_seqfile, many_prefixes, cap3args=" -p 80 -o 40 "):
    '''
    Run multiple assemblies within a single process, avoiding a high
    number of short parallel subprocesses.

    Starting a subprocess for each cap3 assembly was very ineffective,
    so all cap3 commands are written to a single shell script which is
    executed by one subprocess.

    Returns the path of a pickle file holding the list of per-cluster
    file dicts (one dict per input fasta, same keys as cap3worker's
    return value); the list itself can be too large to return directly
    across the parallel-worker boundary.
    '''
    cmd_file = tempfile.NamedTemporaryFile(mode="w", delete=False).name
    with open(cmd_file, 'w') as cmdf:
        for seqfile, prefix in zip(many_seqfile, many_prefixes):
            # each line: run cap3, redirect alignment (stdout) to .aln file
            cmd = "cap3 " + seqfile + cap3args + " -x " + prefix + " > " + seqfile + "." + prefix + ".aln\n"
            cmdf.write(cmd)
    os.system("sh " + cmd_file)
    os.unlink(cmd_file)
    # collect results:
    files_dict_many = []
    for seqfile, prefix in zip(many_seqfile, many_prefixes):
        files_dict = {}
        for fkey in config.CAP3_FILENAMES:
            fn = fkey.format(seqfile, prefix)
            fn_tmp = "{}.tmp".format(fn)
            files_dict[fkey] = fn
            if config.CAP3_PATTERNS_REPLACE[fkey]:
                # tag generic cap3 names with the cluster prefix
                pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey]
                with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out:
                    for line in con_in:
                        con_out.write(line.replace(pattern, replacement.format(
                            prefix)))
                os.rename(fn_tmp, fn)
        files_dict_many.append(files_dict)
    # this is too large to be returned directly - use pickling; the file
    # must be closed (flushed) before its name is handed to the caller,
    # otherwise the reader may see a truncated pickle
    with tempfile.NamedTemporaryFile(delete=False) as f:
        pickle.dump(files_dict_many, file=f)
        return f.name
|
|
220
|
|
221
|