Mercurial > repos > petr-novak > repeatrxplorer
comparison lib/assembly_tools.py @ 0:1d1b9e1b2e2f draft
Uploaded
author | petr-novak |
---|---|
date | Thu, 19 Dec 2019 10:24:45 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:1d1b9e1b2e2f |
---|---|
1 #!/usr/bin/env python3 | |
2 import sys | |
3 import logging | |
4 import subprocess | |
5 import os | |
6 import tempfile | |
7 import config | |
8 import shutil | |
9 import itertools | |
10 import pickle | |
11 import shlex | |
12 from lib.parallel.parallel import parallel2 as parallel | |
13 from lib.parallel.parallel import get_max_proc | |
14 | |
15 REQUIRED_VERSION = (3, 4) | |
16 MAX_BUFFER_SIZE = 100000 | |
17 if sys.version_info < REQUIRED_VERSION: | |
18 raise Exception("\n\npython 3.4 or higher is required!\n") | |
19 LOGGER = logging.getLogger(__name__) | |
20 MAX_FASTA_IN_DIRECTORY = 1000 | |
21 | |
22 def assembly(sequences, hitsort, clusters_info, assembly_dir, | |
23 contigs_file, min_size_of_cluster_for_assembly = 0): | |
24 ''' | |
25 Runs assembly on sequences (SequenceSet). Assembly is | |
26 performed on each cluster separatelly, clusters are taken | |
27 from hitsort(Graph) | |
28 Cluster_listing - list of Clusters | |
29 if cluster.tandem_rank is 1 or 2 - no assembly is performed!! | |
30 ''' | |
31 | |
32 # iterate over large clusters, assembly is performed on sequences stored in | |
33 # cluster_little[index].fasta_file_full - for annotated clusters | |
34 # sequences of small clusters are retrieved from database | |
35 fasta_seqs = [ | |
36 i.fasta_file_full | |
37 for i in clusters_info | |
38 if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS | |
39 ] | |
40 prefixes = ["CL{}".format(i.index) | |
41 for i in clusters_info | |
42 if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS] | |
43 LOGGER.info("Number of clusters for assembly: {}".format(hitsort.number_of_clusters)) | |
44 LOGGER.info("Assembling large clusters") | |
45 assembly_files = parallel(cap3worker, fasta_seqs, prefixes) | |
46 LOGGER.info("Large clusters assembled") | |
47 # some clusters - tanem rank 1 assembled | |
48 j = 0 | |
49 for cl in clusters_info: | |
50 if cl.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS: | |
51 cl.assembly_files = {i: None for i in config.CAP3_FILES_MAPPING} | |
52 consensus_file = cl.dir + "/tarean_contigs.fasta" | |
53 cl.assembly_files["{}.{}.contigs"] = consensus_file | |
54 with open(cl.dir_tarean + "/tarean_contigs.fasta", | |
55 'r') as fin, open(consensus_file, 'w') as fout: | |
56 for line in fin: | |
57 if line[0] == ">": | |
58 line = ">CL{}Contig{}".format(cl.index, line[1:]) | |
59 fout.write(line) | |
60 else: | |
61 cl.assembly_files = assembly_files[j] | |
62 j += 1 | |
63 # assembly of small clusters: | |
64 # connection to files were results will be concatenated | |
65 LOGGER.info("Assembly of small cluster - making tmp files") | |
66 | |
67 tmp_dir_root = tempfile.mkdtemp() | |
68 tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root) | |
69 nproc = get_max_proc() | |
70 prefixes = [[] for i in range(nproc)] | |
71 tmp_seq_files = [[] for i in range(nproc)] | |
72 | |
73 LOGGER.info("Assembly of small clusters - saving small cluster to tmp files") | |
74 seq_dictionary = sequences.toDict() | |
75 fasta_count = 0 | |
76 chunk_counter = itertools.cycle(range(nproc)) | |
77 for index in range(len(clusters_info) + 1, hitsort.number_of_clusters): | |
78 chunk = next(chunk_counter) | |
79 ids = hitsort.get_cluster_reads(index) | |
80 if len(ids) < min_size_of_cluster_for_assembly: | |
81 break | |
82 prefixes[chunk].append("CL{}".format(index)) | |
83 fasta_count += 1 | |
84 if fasta_count > MAX_FASTA_IN_DIRECTORY: | |
85 # create new subdir to keep number of files in directory low | |
86 fasta_count = 1 | |
87 tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root) | |
88 fasta_file_name = "{}/{}".format(tmp_subdir, index) | |
89 write_seqDict_to_fasta(file_name=fasta_file_name, sequences=seq_dictionary, subset=ids) | |
90 tmp_seq_files[chunk].append(fasta_file_name) | |
91 del seq_dictionary | |
92 LOGGER.info("Assembly of small clusters running") | |
93 pickled_fparts_small_contigs = parallel(cap3worker_multiple, tmp_seq_files, prefixes) | |
94 LOGGER.info("Assembly of small clusters done") | |
95 # append to all results | |
96 LOGGER.info("Assembly of small clusters - appending results") | |
97 all_con = {} | |
98 for fkey in config.CAP3_FILES_MAPPING: | |
99 if config.CAP3_FILES_MAPPING[fkey]: | |
100 all_con[fkey] = open("{}/{}".format( | |
101 assembly_dir, config.CAP3_FILES_MAPPING[fkey]), 'w') | |
102 else: | |
103 all_con[fkey] = None | |
104 for pickled_fparts_multiple in pickled_fparts_small_contigs: | |
105 with open(pickled_fparts_multiple, 'rb') as fp: | |
106 fparts_multiple = pickle.load(fp) | |
107 os.unlink(pickled_fparts_multiple) | |
108 for fparts in fparts_multiple: | |
109 for fkey in fparts: | |
110 if all_con[fkey]: | |
111 with open(fparts[fkey]) as f: | |
112 all_con[fkey].write(f.read()) | |
113 os.unlink(fparts[fkey]) | |
114 else: | |
115 os.unlink(fparts[fkey]) | |
116 | |
117 for fkey in all_con: | |
118 if all_con[fkey]: | |
119 all_con[fkey].close() | |
120 LOGGER.info("Assembling - copying files to final location") | |
121 # copy all contigs to one location: - contigs_file | |
122 with open(contigs_file, 'w') as fout: | |
123 for cl in clusters_info: | |
124 # append contig from large clusters | |
125 with open(cl.assembly_files["{}.{}.contigs"], 'r') as fin: | |
126 for i in fin: | |
127 fout.write(i) | |
128 # append contigs from small clusters | |
129 small_aln_file = "{}/{}".format( | |
130 assembly_dir, config.CAP3_FILES_MAPPING['{}.{}.aln']) | |
131 # calculate profile on aln file of small clusters | |
132 file_base_name = small_aln_file[:-4] | |
133 os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=small_aln_file, out=file_base_name)) | |
134 os.system("select_and_sort_contigs.pl {fn}.info.fasta 5 2>&1".format(fn=file_base_name)) | |
135 small_contig_file = file_base_name + ".info.fasta" | |
136 with open(small_contig_file, 'r') as fin: | |
137 for i in fin: | |
138 fout.write(i) | |
139 shutil.rmtree(tmp_dir_root) | |
140 | |
141 def write_seqDict_to_fasta(file_name, sequences, subset): | |
142 with open(file_name, 'w') as f: | |
143 for i in subset: | |
144 f.write(">{}\n{}\n".format(i, sequences[i])) | |
145 | |
146 | |
147 | |
148 | |
149 def cap3worker(seqfile, prefix="cap", cap3args=" -p 80 -o 40 "): | |
150 prefix2 = "cap" | |
151 cmd = "cap3 " + seqfile + cap3args + " -x " + prefix2 | |
152 with open(seqfile + "." + prefix2 + ".aln", "w") as aln: | |
153 subprocess.check_call(shlex.split(cmd), shell=False, stdout=aln) | |
154 # this generate three files | |
155 files_dict = {} | |
156 for fkey in config.CAP3_FILENAMES: | |
157 fn = fkey.format(seqfile, prefix2) | |
158 fn_tmp = "{}.tmp".format(fn) | |
159 files_dict[fkey] = fn | |
160 if config.CAP3_PATTERNS_REPLACE[fkey]: | |
161 pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey] | |
162 with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out: | |
163 for line in con_in: | |
164 con_out.write(line.replace(pattern, replacement.format( | |
165 prefix))) | |
166 os.rename(fn_tmp, fn) | |
167 | |
168 # make new meaningful names here | |
169 | |
170 for fkey in config.CAP3_FILES_GOODNAMES: | |
171 config.CAP3_FILES_GOODNAMES[fkey] | |
172 fn_goodname = os.path.dirname(files_dict[fkey]) + "/" + config.CAP3_FILES_GOODNAMES[fkey] | |
173 os.rename(files_dict[fkey], fn_goodname) | |
174 files_dict[fkey] = fn_goodname | |
175 | |
176 aln_file = files_dict["{}.{}.aln"] | |
177 file_base_name = aln_file[:-4] | |
178 os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=aln_file, out=file_base_name)) | |
179 os.system("select_and_sort_contigs.pl {fn}.info.fasta 5".format(fn=file_base_name)) | |
180 # TODO -add new file to files_dict | |
181 # replace simple fasta with info.fasta | |
182 files_dict["{}.{}.contigs"] = file_base_name + ".info.fasta" | |
183 return files_dict | |
184 | |
185 def cap3worker_multiple(many_seqfile, many_prefixes, cap3args=" -p 80 -o 40 "): | |
186 ''' | |
187 purpose of this script is to run multiple assemblies within single process | |
188 avoiding running high number of short parallel subprocesses | |
189 As starting subprocess for each cap3 assembly was very ineffective, | |
190 all ap3 commands are written to single file and run using single subprocess | |
191 command - | |
192 ''' | |
193 cmd_file = tempfile.NamedTemporaryFile(mode="w",delete=False).name | |
194 with open(cmd_file,'w') as cmdf: | |
195 for seqfile, prefix in zip(many_seqfile, many_prefixes): | |
196 cmd = "cap3 " + seqfile + cap3args + " -x " + prefix + " > " + seqfile + "." + prefix + ".aln\n" | |
197 cmdf.write(cmd) | |
198 os.system("sh "+ cmd_file) | |
199 # collect results: | |
200 files_dict_many = [] | |
201 for seqfile, prefix in zip(many_seqfile, many_prefixes): | |
202 files_dict = {} | |
203 for fkey in config.CAP3_FILENAMES: | |
204 fn = fkey.format(seqfile, prefix) | |
205 fn_tmp = "{}.tmp".format(fn) | |
206 files_dict[fkey] = fn | |
207 if config.CAP3_PATTERNS_REPLACE[fkey]: | |
208 pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey] | |
209 with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out: | |
210 for line in con_in: | |
211 con_out.write(line.replace(pattern, replacement.format( | |
212 prefix))) | |
213 os.rename(fn_tmp, fn) | |
214 files_dict_many.append(files_dict) | |
215 # this is too large to be return directly - use picking | |
216 f = tempfile.NamedTemporaryFile(delete=False) | |
217 os.unlink(cmd_file) | |
218 pickle.dump(files_dict_many,file=f) | |
219 return f.name | |
220 | |
221 |