Mercurial > repos > petr-novak > repeatrxplorer
comparison lib/seqtools.py @ 0:1d1b9e1b2e2f draft
Uploaded
author | petr-novak |
---|---|
date | Thu, 19 Dec 2019 10:24:45 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:1d1b9e1b2e2f |
---|---|
1 #!/usr/bin/env python3 | |
import itertools
import logging
import os
import random
import shlex  # for command line arguments split
import sqlite3
import subprocess
import sys
import warnings
from collections import namedtuple

from lib.utils import format_query
from lib.parallel.parallel import parallel2 as parallel
from lib.parallel.parallel import get_max_proc

logger = logging.getLogger(__name__)
# minimal supported interpreter version; checked at import time below
REQUIRED_VERSION = (3, 4)
# maximum number of sequences printed by SequenceSet.__str__
MAX_PRINT = 10

if sys.version_info < REQUIRED_VERSION:
    raise Exception("\n\npython 3.4 or higher is required!\n")
20 | |
21 # additional functions | |
22 | |
23 | |
24 | |
25 def _hitsort_worker(query, blastdb, output, options): | |
26 | |
27 # output from blast is parsed from stdout | |
28 cmd = options.all2all_search_params.format(query = query, blastdb = blastdb) | |
29 print(cmd) | |
30 min_lcov, min_pid, min_ovl, min_scov, evalue_max = options.filtering_threshold | |
31 pair2insert = '' | |
32 signs ={'plus':'+', 'minus':'-'} | |
33 with open(output, 'w') as f: | |
34 with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p: | |
35 for i in p.stdout: | |
36 items = i.decode('utf-8').split() | |
37 if options.filter_self_hits: | |
38 if items[4] == items[0]: | |
39 continue | |
40 evalue = float(items[10]) | |
41 ovl_q = abs(float(items[2]) - float(items[3])) + 1 | |
42 ovl_h = abs(float(items[6]) - float(items[7])) + 1 | |
43 if (ovl_q >= min_ovl or ovl_h >= min_ovl) and float(items[8]) >= min_pid: | |
44 if float(items[1]) >= float(items[5]): | |
45 lcov = ovl_q * 100.0 / float(items[1]) | |
46 scov = ovl_h * 100.0 / float(items[5]) | |
47 else: | |
48 lcov = ovl_q * 100.0 / float(items[5]) | |
49 scov = ovl_h * 100.0 / float(items[1]) | |
50 # positive line: | |
51 if lcov >= min_lcov and scov >= min_scov and evalue_max > evalue: | |
52 pr = [items[0], items[4]] | |
53 # previous HSP | |
54 if pair2insert != "{0}\t{1}".format(pr[0], pr[1]): | |
55 pair2insert = "{0}\t{1}".format(pr[0], pr[1]) | |
56 if items[11] in ['plus', 'minus']: | |
57 items[11] = signs[items[11]] | |
58 f.write("{0}\t{1}\t{2}\n".format(pair2insert, items[9], "\t".join( | |
59 items[k] for k in [1, 2, 3, 5, 6, 7, 8, 10, 11]))) | |
60 | |
def blast_with_filter(fasta_file_filter, blastdb):
    '''
    Return the set of database (hit) sequence ids from ``blastdb`` that
    show similarity to the filter sequences in ``fasta_file_filter``.
    Uses mgblast for the search.

    Raises subprocess.CalledProcessError when mgblast exits with a
    non-zero status - a silent failure would be indistinguishable from
    "nothing matched the filter".
    '''
    params = '-p 85 -W18 -UT -X40 -KT -JF -F "m D" -v100000000 -b100000000 -D4 -C 30 -H 30'
    positive_hits = set()
    min_pid = 90
    min_ovl_perc = 90
    cmd = " ".join(['mgblast',
                    '-i', fasta_file_filter,
                    '-d', blastdb,
                    params
                    ])
    with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p:
        for raw_line in p.stdout:
            items = raw_line.decode('utf-8').split()
            # overlap relative to the hit length, in percent
            ovl_perc = (abs(float(items[6]) - float(items[7])) + 1) / float(items[5]) * 100
            pid = float(items[8])
            if pid > min_pid and ovl_perc > min_ovl_perc:
                # items[4] is the hit (database) sequence id
                positive_hits.add(items[4])
    if p.returncode:
        raise subprocess.CalledProcessError(p.returncode, cmd)
    return positive_hits
83 | |
84 | |
85 | |
86 # TODO test task | |
87 # predefine blast params | |
def blast_worker(query, blastdb, output, params):
    '''
    Run a blast-family search of ``query`` against ``blastdb`` and write
    the filtered tabular hits to ``output``.

    params is a dict with keys:
      program         - 'blastn', 'blastx' or 'diamond blastx'
      args            - extra command line arguments (string)
      output_columns  - space separated column names; must include
                        qseqid and sseqid
      column_types    - callables converting each column from str
      filter_function - predicate on the parsed row deciding whether
                        a hit line is kept

    Only the first kept HSP of each query/subject pair is written.
    Raises ValueError for an unsupported program (the original fell
    through with ``cmd`` undefined, producing a NameError later).
    '''
    if params['program'] in ['blastx', 'blastn']:
        default_columns = ' -outfmt "6 {}"'.format(params['output_columns'])
        cmd = "{} -query {} -db {} {} {}".format(
            params['program'], query, blastdb, params['args'], default_columns)
    elif params['program'] == 'diamond blastx':
        # diamond uses slightly different option names than blastx
        default_columns = ' --outfmt 6 {}'.format(params['output_columns'])
        cmd = "{} -q {} -d {} {} {}".format(
            params['program'], query, blastdb, params['args'], default_columns)
    else:
        raise ValueError(
            "unsupported search program: {}".format(params['program']))
    print(cmd)
    preceeding_pair = ['', '']
    BlastValues = namedtuple("BlastValues", params['output_columns'])
    with open(output, 'wb') as f:
        with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p:
            for line in p.stdout:
                items = line.decode('utf-8').split()
                blast_values = BlastValues(
                    *[convert(value) for value, convert
                      in zip(items, params['column_types'])])
                if params['filter_function'](blast_values):
                    # write only the first HSP of each pair
                    if preceeding_pair != [blast_values.qseqid, blast_values.sseqid]:
                        f.write(line)
                        preceeding_pair = [blast_values.qseqid, blast_values.sseqid]
112 | |
113 | |
class Sequence:
    '''A named biological sequence.

    The sequence is stored internally as ASCII ``bytes`` to reduce
    memory footprint; the ``seq`` property converts to/from ``str``.
    '''

    def __init__(self, seq, name, paired=False):
        # the mode of seq storage can be changed later to make it more
        # memory efficient
        self._seq = bytes(str(seq), "ascii")
        self.name = str(name)
        # kept for API compatibility; pairing logic lives in SequenceSet
        self.paired = paired

    @property
    def seq(self):
        '''Sequence as a ``str``.'''
        return self._seq.decode("utf-8")

    @seq.setter
    def seq(self, value):
        self._seq = bytes(str(value), "ascii")

    def __str__(self):
        return "{0} : {1}".format(self.name, self.seq)

    @staticmethod
    def read_fasta(fasta_file_name):
        '''
        Generator - read sequences from a fasta file and yield them one
        by one as ``Sequence`` objects.  Blank lines are skipped and
        multi-line sequence data is concatenated.  Sequence data that is
        not preceded by a header is discarded with a warning.
        '''
        with open(fasta_file_name, 'r') as f:
            header = None
            seqstr = None
            for rawline in f:
                line = rawline.strip()
                if line == "":
                    continue
                if line[0] == ">":
                    if header and seqstr:
                        yield Sequence(seqstr, header)
                        # reset for the next record
                        seqstr = None
                        header = line[1:]
                    elif seqstr:
                        # orphan data before the first header: the original
                        # code built a bare Warning object (a no-op) and let
                        # the orphan data leak into the next record - warn
                        # and start a clean record instead
                        warnings.warn("sequence was not preceeded by header")
                        seqstr = None
                        header = line[1:]
                    else:
                        header = line[1:]
                else:
                    seqstr = line if not seqstr else seqstr + line
            # yield the last record, if any
            if header and seqstr:
                yield Sequence(seqstr, header)
            return

    def write2fasta(self, file_object):
        '''Append this sequence in fasta format to an open file object.'''
        file_object.write(">{0}\n{1}\n".format(self.name, self.seq))
165 | |
166 | |
class SequenceSet:
    '''
    Collection of named sequences backed by an sqlite database (file or
    in-memory).  Provides dict-like access by name, reproducible
    sampling, fasta import/export, blast database creation and
    all-to-all search ("hitsort") helpers.
    '''

    def __init__(self, filename=None, source=None, sample_size=0, new=False,
                 paired=False, prefix_length=0, rename=False, seed=123,
                 fasta=None):
        '''
        filename: path to sqlite database, if None database is stored in memory
        source: path to fasta file, if None an empty database is created
        sample_size: use only a sample of sample_size from the fasta file,
            if 0 all sequences are used
        new: should a new database be created (an existing file is removed)?
        paired: paired reads
        prefix_length: int, length of the name prefix recorded in
            prefix_codes statistics
        rename: use integers as sequence names
        fasta: path where a fasta copy of the set is (or will be) stored

        If source is given a new database is created - if filename exists
        it will be deleted!
        '''
        # seeding here makes subsequent sampling reproducible
        random.seed(seed)

        self.source = source
        self.sample_size = sample_size
        self.paired = paired
        self.filename = filename
        self.prefix_length = prefix_length
        self.prefix_codes = {}
        self.rename = rename
        self.fasta = fasta
        self._length = None
        self._original_length = None
        self.blastdb = None
        self.blastdb_legacy = None
        self.lastdb = None
        self.chunks = None
        self.ids_kept = None
        self.annotations = []
        self.hitsort = None
        # initialized unconditionally so __setitem__ works even when the
        # set was created without a source fasta
        self._commit_immediately = True
        if filename:
            logger.debug("Creating database in file")
            if os.path.isfile(filename) and (new or source):
                # remove old database if it exists
                os.remove(filename)
            # sqlite database in file
            self.conn = sqlite3.connect(filename)
        else:
            # sqlite database in memory
            logger.debug("Creating database in memory")
            self.conn = sqlite3.connect(":memory:")
        c = self.conn.cursor()
        c.execute("create table sequences (name text, sequence text)")

        if source:
            self._read_from_fasta()
            c.execute("CREATE TABLE prefix_codes (prefix TEXT, N INTEGER)")
            self.conn.executemany(
                "INSERT INTO prefix_codes (prefix, N) values (?,?)",
                self.prefix_codes.items())
            self.conn.commit()
            self._commit_immediately = True
            self.makeblastdb(legacy=True)
            # store some attributes in the database
            self._update_database()
            logger.info("sequences loaded")
            logger.info(self)

    def _update_database(self):
        '''Persist selected scalar attributes into the ``seqinfo`` table.'''
        c = self.conn.cursor()
        stored_attributes = [
            ("sample_size", "integer"),
            ("_length", "integer"),
            ("_original_length", "integer"),
            ("paired", "integer"),
        ]
        columns = ", ".join(i[0] + " " + i[1] for i in stored_attributes)
        try:
            c.execute((
                "CREATE TABLE seqinfo ( {} )".format(columns)
            ))
        except sqlite3.OperationalError:
            pass  # table already exists
        # keep a single row holding the current values
        values = tuple(getattr(self, i[0]) for i in stored_attributes)
        placeholder = ", ".join(["?"] * len(values))
        c.execute("DELETE FROM seqinfo")
        c.execute("INSERT INTO seqinfo VALUES ({})".format(placeholder), values)

    def _read_from_fasta(self):
        '''
        Load sequences from ``self.source`` into the database, with
        optional sampling, renaming and name-prefix bookkeeping.
        Changes are committed once after the whole file is loaded - this
        is faster than per-insert commits.
        '''
        self._commit_immediately = False
        if self.fasta:
            f = open(self.fasta, mode='w')
        # define sampling
        if self.sample_size:
            logger.debug(
                'sampling sequences: {} sample size'.format(self.sample_size))
            N = self.fasta_length(self.source)
            if self.paired:
                # sample odd positions and always include the mate that
                # immediately follows
                s = itertools.chain(
                    [[i, i + 1] for i in sorted(
                        random.sample(range(1, N, 2),
                                      int(self.sample_size / 2)))])
                sample = itertools.chain(
                    [item for sublist in s for item in sublist])
            else:
                sample = itertools.chain(
                    sorted(random.sample(range(1, N + 1), self.sample_size)))
                logger.debug("sampling unpaired reads")
        else:
            # no sampling - use all
            sample = itertools.count(1)

        # define counter used when renaming
        if self.rename:
            if self.paired:
                # both mates of a pair share the same integer part
                counter = itertools.count(1, 0.5)
            else:
                counter = itertools.count(1, 1)
        else:
            counter = itertools.cycle([""])
        # suffix distinguishing forward/reverse mates
        if self.paired:
            pair = itertools.cycle(['f', 'r'])
        else:
            pair = itertools.cycle([''])
        position = 0

        seq2write = next(sample)

        for i in Sequence.read_fasta(self.source):
            position += 1
            if position == seq2write:
                # create name:
                prefix = i.name[0:self.prefix_length]  # could be empty ""
                if self.rename:
                    i.name = "{0}{1}{2}".format(
                        prefix,
                        int(next(counter)),
                        next(pair)
                    )
                if self.prefix_length:
                    if prefix in self.prefix_codes:
                        self.prefix_codes[prefix] += 1
                    else:
                        self.prefix_codes[prefix] = 1
                self[i.name] = i.seq
                if self.fasta:
                    i.write2fasta(f)
                try:
                    seq2write = next(sample)
                except StopIteration:
                    # no more sequences to sample
                    break

        self.conn.commit()  # save changes
        if self.fasta:
            f.close()
        self._commit_immediately = True

    def __getitem__(self, item):
        '''Return the sequence string stored under name ``item``.'''
        c = self.conn.cursor()
        # parameterized query - names may contain quote characters
        c.execute("select * from sequences where name=?", (item,))
        name, sequence = c.fetchone()  # should be only one
        return sequence

    def __setitem__(self, item, value):
        '''Insert a sequence under name ``item`` (no uniqueness check).'''
        c = self.conn.cursor()
        c.execute("insert into sequences values (?, ?)", (item, value))
        if self._commit_immediately:
            self.conn.commit()  # save changes

    def __iter__(self):
        '''Iterate over sequence names.'''
        c = self.conn.cursor()
        c.execute("select name from sequences")
        for i in c:
            yield i[0]

    def items(self):
        '''Generator of ``Sequence`` objects for the whole set.'''
        c = self.conn.cursor()
        c.execute("select name, sequence from sequences")
        for name, seq in c:
            yield Sequence(seq, name)

    def toDict(self):
        '''Return the whole set as a plain ``{name: sequence}`` dict.'''
        c = self.conn.cursor()
        c.execute("select name, sequence from sequences")
        d = {}
        for name, seq in c:
            d[name] = seq
        return d

    def __len__(self):
        c = self.conn.cursor()
        c.execute("select count(*) from sequences")
        length = c.fetchone()[0]
        return length

    def __str__(self):
        # show at most MAX_PRINT sequences plus a total count
        out = ""
        c = self.conn.cursor()
        c.execute("select * from sequences limit {0}".format(MAX_PRINT))
        for n, s in c:
            out = out + "{0} : {1}\n".format(n, s)
        out = out + "...\n"
        out = out + str(len(self)) + " sequence total\n"
        return out

    def insert(self, sequence, commit=True):
        '''Insert a ``Sequence`` object, optionally deferring the commit.'''
        self._commit_immediately = commit
        self[sequence.name] = sequence.seq
        self._commit_immediately = True

    def keys(self):
        '''
        Return names of all sequences, in rowid (insertion) order.
        '''
        c = self.conn.cursor()
        c.execute('select name from sequences order by rowid')
        names = []
        for i in c.fetchall():
            names.append(i[0])
        return names

    def sample(self, size, seed=123):
        '''
        Generator - reproducible random sample of ``size`` sequences.
        For paired sets whole pairs are yielded (size/2 pairs, mate
        rowid + 1 following its partner).
        '''
        max_index = len(self)
        random.seed(seed)
        c = self.conn.cursor()
        if self.paired:
            size = int(size / 2)
            # odd rowids only; the even-rowid mate is yielded alongside
            candidates = range(1, max_index, 2)
        else:
            # original used range(1, max_index) and could never sample
            # the last sequence in the set
            candidates = range(1, max_index + 1)
        for i in sorted(random.sample(candidates, size)):
            c.execute(
                "select name, sequence from sequences where rowid={}".format(i))
            name, sequence = c.fetchone()
            yield Sequence(sequence, name)
            if self.paired:
                c.execute(
                    "select name, sequence from sequences where rowid={}".format(i + 1))
                name, sequence = c.fetchone()
                yield Sequence(sequence, name)

    def sample2db(self, db_file=None, fasta_file_name=None, size=100, seed=123):
        '''
        Save a reproducible sample of the set into a new SequenceSet
        database and/or a fasta file.  Returns the new SequenceSet when
        ``db_file`` was given (otherwise an unbound local would have
        been returned - callers must pass db_file to use the result).
        '''
        if (not db_file) and (not fasta_file_name):
            raise IOError("no db_file nor fasta_file_name were defined")
        # open files
        if db_file:
            db = SequenceSet(filename=db_file, source=None, new=True)
        if fasta_file_name:
            f = open(fasta_file_name, 'w')
        # write in files
        for i in self.sample(size, seed):
            if db_file:
                db.insert(i, commit=False)
            if fasta_file_name:
                i.write2fasta(f)
        # close files
        if fasta_file_name:
            f.close()
        if db_file:
            db.conn.commit()
        return db

    def save2fasta(self, fasta_file_name, keep=True, subset=None):
        '''
        Write the set (or only names in ``subset``) to a fasta file.
        With keep=True the path is remembered in self.fasta.
        '''
        if subset:
            with open(fasta_file_name, 'w') as f:
                c = self.conn.cursor()
                # assumes names contain no single quotes - TODO confirm
                c.execute("select name, sequence from sequences where name in ('{}')".format(
                    "','".join(subset)))
                for name, sequence in c:
                    s = Sequence(sequence, name)
                    s.write2fasta(f)
        else:
            with open(fasta_file_name, 'w') as f:
                for i in self.items():
                    i.write2fasta(f)
        if keep:
            self.fasta = fasta_file_name

    def save_annotation(self, annotation_name, subset, dir):
        '''
        Export annotation table ``annotation_name`` restricted to names
        in ``subset`` as a tab separated file; return its path.
        '''
        annotation_file = "{}/{}_annotation.csv".format(dir, annotation_name)
        with open(annotation_file, "w") as f:
            c = self.conn.cursor()
            c.execute(
                "select * from {} where name in ('{}')".format(
                    annotation_name, "','".join(subset)
                )
            )
            header = [description[0] for description in c.description]
            f.write("\t".join(str(j) for j in header) + "\n")
            for i in c:
                f.write("\t".join(str(j) for j in i) + "\n")
        return annotation_file

    def make_chunks(self, file_name=None, chunk_size=5000):
        '''
        Split the SequenceSet into chunks of ``chunk_size`` sequences,
        save them as fasta files file_name.0, file_name.1, ... and
        return the list of chunk file names (also kept in self.chunks).
        '''
        logger.debug("creating chunks from fasta file: ")
        # original condition raised whenever file_name WAS supplied
        if not file_name:
            if self.fasta:
                file_name = self.fasta
            else:
                raise Exception('file name for chunks is not defined!')

        seqs = iter(self.items())
        fn_chunk_names = []
        f = None  # guards against an empty sequence set (f was unbound)
        for i in itertools.count():
            try:
                fn = "{0}.{1}".format(file_name, i)
                logger.info("saving chunk {}".format(fn))
                for n in range(chunk_size):
                    s = next(seqs)
                    if not n:
                        f = open(fn, 'w')
                    s.write2fasta(f)
                f.close()
                fn_chunk_names.append(fn)
            except StopIteration:
                # flush the last, possibly partial, chunk
                if f is not None and not f.closed:
                    f.close()
                    fn_chunk_names.append(fn)
                break
        self.chunks = fn_chunk_names
        logger.debug(self.chunks)
        return fn_chunk_names

    def makeblastdb(self, blastdb=None, dbtype='nucl', legacy=False, lastdb=False):
        '''
        Create a blast database from the fasta copy of the set.
        dbtype: either 'nucl' or 'prot'
        legacy: additionally create a legacy (formatdb) database
        lastdb: additionally create a lastdb database
        '''
        logger.info("creating blast database")
        # check if fasta file on disk is present
        if not self.fasta:
            try:
                self.save2fasta(blastdb)
            except TypeError:
                print("\nEither blastdb or fasta must be ",
                      "defined when making blast database!!!\n")
                raise
        else:
            blastdb = blastdb if blastdb else self.fasta

        cmd = "makeblastdb -in {0} -out {1} -dbtype {2}".format(self.fasta,
                                                                blastdb,
                                                                dbtype)
        args = shlex.split(cmd)
        # check_call raises CalledProcessError on non-zero exit, so the
        # warning branches below are only a safety net
        if 0 == subprocess.check_call(args, stderr=sys.stdout):
            self.blastdb = blastdb
        else:
            # original called Warning(...) which constructs an unused
            # object and does nothing
            logger.warning("makeblastdb failed")

        if legacy:
            logger.info("creating legacy blast database")
            prot = {'nucl': 'F', 'prot': 'T'}[dbtype]
            cmd = "formatdb -i {0} -p {1} -n {2}.legacy".format(self.fasta,
                                                                prot,
                                                                blastdb)
            args = shlex.split(cmd)
            if subprocess.check_call(args) == 0:
                self.blastdb_legacy = "{}.legacy".format(blastdb)
            else:
                logger.warning("formatdb failed")
        if lastdb:
            logger.info("creating lastdb database")
            cmd = "lastdb -u NEAR {fasta} {fasta}".format(fasta=self.fasta)
            args = shlex.split(cmd)
            if subprocess.check_call(args) == 0:
                self.lastdb = self.fasta
            else:
                # original message said "formatdb" here - copy/paste slip
                logger.warning("lastdb failed")

    def create_hitsort(self, options, output=None):
        '''
        Run all-to-all search of self.chunks against this set's own
        database and produce a deduplicated "hitsort" file.
        options must provide: database (name of an attribute on self,
        e.g. 'blastdb_legacy'), all2all_search_params,
        filtering_threshold and filter_self_hits.
        Returns the path of the hitsort file (also kept in self.hitsort).
        '''
        logger.info('running all to all blast')
        query = self.chunks
        if not output:
            output = "{}.blast".format(self.fasta)
        database = getattr(self, options.database)
        if len(query) > 1:
            output_parts = [output + "." + str(i) for i in range(len(query))]
        else:
            # single chunk: intermediate result goes to a distinct file,
            # otherwise the cleanup below would delete the final hitsort
            output_parts = [output + ".0"]
            # NOTE(review): wrapping the single-chunk list again hands
            # parallel() a list of lists - confirm this is what parallel2
            # expects for a single job
            query = [query]

        parallel(_hitsort_worker, query, [database],
                 output_parts, [options])
        logger.info('all to all blast finished')
        logger.info('removing duplicates from all to all blast results')
        self._clean_hitsort(
            hitsort_files=output_parts, filtered_hitsort=output)
        # remove temp files:
        for f in output_parts:
            os.unlink(f)
        self.hitsort = output
        return output

    def _clean_hitsort(self, hitsort_files, filtered_hitsort):
        '''
        Remove duplicated (reciprocal) pairs from all-to-all results.
        Lines are first distributed into several part files keyed by the
        lexicographically smaller id of the pair so each part fits in
        memory, then every part is deduplicated separately.
        Returns (total_input_lines, written_output_lines).
        '''
        ids = self.keys()
        # number of part files scales with total input size (~5 GB each)
        Nfiles = 5 + int(sum([os.path.getsize(i)
                              for i in hitsort_files]) / 5e9)
        hitsort_parts = [
            "{0}.{1}.parts".format(filtered_hitsort, i) for i in range(Nfiles)]
        # open files
        fs = []
        for i in hitsort_parts:
            fs.append(open(i, 'w'))

        # assign every sequence id to one part file, round robin
        ids_destination = {}
        fs_iter = itertools.cycle(fs)
        for i in ids:
            ids_destination[i] = next(fs_iter)
        lines_out = lines_total = 0

        for i in hitsort_files:
            f = open(i, 'r')
            while True:
                line = f.readline()
                if not line:
                    break
                lines_total += 1
                line_items = line.split()
                # do not assume that input is sorted!
                ids_destination[min(line_items[0:2])].write(line)
            f.close()  # original leaked the input file handles here
        # close all files
        for i in fs:
            i.close()

        # load parts one by one and exclude duplicates:
        hit_out = open(filtered_hitsort, 'w')
        for i in hitsort_parts:
            f = open(i, 'r')
            hitsort_shelve = {}
            while True:
                line = f.readline()
                if not line:
                    break
                line_items = line.split()
                key = "\t".join(sorted(line_items[0:2]))
                value = line_items
                bitscore = float(line_items[2])
                if key in hitsort_shelve:
                    # NOTE(review): this keeps the line with the LOWER
                    # score of a reciprocal pair - confirm that min (and
                    # not max) is the intended rule
                    if float(hitsort_shelve[key][2]) > bitscore:
                        hitsort_shelve[key] = value
                    hit_out.write("\t".join(hitsort_shelve[key]) + "\n")
                    del hitsort_shelve[key]
                    lines_out += 1
                else:
                    hitsort_shelve[key] = value
            f.close()
            # pairs seen only once in this part
            for key in hitsort_shelve:
                hit_out.write("\t".join(hitsort_shelve[key]) + "\n")
                lines_out += 1
        # clean up
        for i in hitsort_parts:
            os.unlink(i)
        hit_out.close()
        return lines_total, lines_out

    def _check_database(self, database):
        '''
        database is a path to a fasta file; a formatted database with the
        same prefix should exist.  It should create the database as a
        temporary file if it does not exist.
        '''
        pass
        # TODO

    def remove_sequences_using_filter(self, fasta_file_filter, keep_proportion,
                                      omitted_sequences_file,
                                      kept_sequences_file):
        '''
        Run blast with fasta_file_filter against the legacy blastdb of
        the SequenceSet to detect sequences to be removed; a proportion
        ``keep_proportion`` of the matching sequences is kept.  Complete
        pairs are removed when paired sequences are used.
        Returns the number of removed sequences.
        '''
        ids = blast_with_filter(fasta_file_filter, self.blastdb_legacy)
        # check against database and remove sequences

        c = self.conn.cursor()
        if self.paired:
            c.execute("SELECT rowid FROM sequences WHERE name IN {}".format(
                format_query(ids)
            ))
            # normalize every hit to the odd rowid of its pair
            odd_rowids = set()
            for i in c.fetchall():
                if i[0] % 2 == 0:
                    odd_rowids.add(i[0] - 1)
                else:
                    odd_rowids.add(i[0])
            # random.sample on a set is a TypeError since Python 3.11;
            # sorting first also makes seeded runs reproducible
            odd_rowids_keep = random.sample(
                sorted(odd_rowids), round(keep_proportion * len(odd_rowids)))
            c.execute("SELECT name FROM sequences WHERE rowid IN {} ORDER BY rowid".format(
                format_query(odd_rowids_keep + [i + 1 for i in odd_rowids_keep])
            ))
            self.ids_kept = [i[0] for i in c.fetchall()]
            odd_rowids_omit = list(odd_rowids.difference(odd_rowids_keep))
            c.execute("SELECT name FROM sequences WHERE rowid IN {} ORDER BY rowid".format(
                format_query(odd_rowids_omit + [i + 1 for i in odd_rowids_omit])
            ))
            ids_omit = [i[0] for i in c.fetchall()]

        else:
            self.ids_kept = random.sample(
                sorted(ids), round(keep_proportion * len(ids)))
            ids_omit = list(ids.difference(self.ids_kept))
        self.save2fasta(omitted_sequences_file, keep=False, subset=ids_omit)
        self.save2fasta(kept_sequences_file, keep=False, subset=self.ids_kept)
        self.drop_sequences(ids_omit)
        # save info about omitted sequences
        c.execute("CREATE TABLE ids_kept (name TEXT)")
        c.executemany("INSERT INTO ids_kept (name) values (?)",
                      [(i,) for i in self.ids_kept])
        return len(ids_omit)

    def drop_sequences(self, ids_to_drop):
        '''
        Remove sequences from the set by name.  Removed sequences are
        preserved in table ``omited_sequences``; the fasta file and the
        blast databases are rebuilt afterwards.
        '''
        self._original_length = len(self)
        c = self.conn.cursor()
        idslist = format_query(ids_to_drop)
        c.execute("CREATE TABLE omited_sequences AS SELECT * FROM sequences WHERE name IN {}".format(idslist))
        c.execute("DELETE FROM sequences WHERE name IN {}".format(
            idslist
        ))
        # new databases must be created - without the omitted sequences!
        self.save2fasta(fasta_file_name=self.fasta)  # replaces the original file!
        self._length = len(self)
        self.makeblastdb(legacy=True)
        self._update_database()

    def annotate(self, database, annotation_name, directory="", params=""):
        '''
        database: path to a blast formatted database
        params: dict as accepted by blast_worker (program must be
        'blastn', 'blastx' or 'diamond blastx')
        The annotation is saved into ``directory`` and into an sqlite
        table named ``annotation_name``.
        '''
        logger.info("annotating reads with {} ".format(annotation_name))
        self._check_database(database)
        if "parallelize" in params:
            parallelize = params['parallelize']
        else:
            parallelize = True
        if parallelize:
            if not self.chunks:
                self.make_chunks()
            output = [
                "{}/{}_{}".format(directory, annotation_name, os.path.basename(i)) for i in self.chunks]
            parallel(blast_worker, self.chunks, [database], output, [params])
        else:
            # no explicit parallelization using parallel
            single_output = "{}/{}_hits".format(directory, annotation_name)
            params['args'] = params['args'].format(max_proc=get_max_proc())
            blast_worker(self.fasta, database, single_output, params)
            output = [single_output]

        # put into a new attribute and sqlite table
        c = self.conn.cursor()
        c.execute("create table {} (name text, db_id text, length real, bitscore real , pid real)".format(
            annotation_name))
        for blast_results in output:
            with open(blast_results, 'r') as results_file:
                for line in results_file:
                    items = line.split()
                    types = [str, str, float, float, float, float, float]
                    qseqid, sseqid, qlen, slen, length, pident, bitscore = [
                        convert(value) for value, convert in zip(items, types)]
                    # parameterized insert; the original interpolated
                    # quoted strings directly into the SQL
                    c.execute(
                        'insert into {} values (?, ?, ?, ?, ?)'.format(annotation_name),
                        (qseqid, sseqid, length, bitscore, pident))
        self.conn.commit()
        self.annotations.append(annotation_name)

    @staticmethod
    def fasta_length(fasta_file_name):
        '''
        Count the number of sequences (header lines) in a fasta file.
        '''
        with open(fasta_file_name, 'r') as f:
            N = 0
            for line in f:
                stripped = line.strip()
                # guard against blank lines - original indexed [0] and
                # crashed with IndexError on an empty line
                if stripped and stripped[0] == ">":
                    N += 1
        return N
788 | |
789 | |
# test
if __name__ == "__main__":
    # Smoke test / demo: requires the repository's test_data and tmp
    # directories plus the external makeblastdb/formatdb binaries.
    # get root directory
    MAIN_DIR = os.path.realpath(
        os.path.dirname(os.path.realpath(__file__)) + "/..")
    print("MAIN_DIR:")
    print(MAIN_DIR)
    TMP_DIR = "{}/tmp".format(MAIN_DIR)
    TEST_DATA_DIR = "{}/test_data".format(MAIN_DIR)

    # some data:
    fn = "{}/pair_reads.fasta".format(TEST_DATA_DIR)
    os.chdir(TMP_DIR)
    # s = SequenceSet(filename='sqlite.db')
    print("number of sequences in fasta file:")
    print(SequenceSet.fasta_length(fn))

    print("loading sequences...")
    s = SequenceSet(source=fn, paired=True, rename=False,
                    filename='sqlite.db', prefix_length=4, fasta='sample2.fasta')
    print(s)

    print("creating blast database")
    s.makeblastdb()

    print("creating chunks from fasta file: ")
    # NOTE(review): make_chunks raises when file_name is supplied (its
    # guard condition rejects an explicit name) - confirm this demo ever ran
    file_list = s.make_chunks("fasta_chunks", chunk_size=500)
    print(file_list)

    print('creating hitsort')
    # NOTE(review): create_hitsort(options, output=None) expects an options
    # object (with .database, .all2all_search_params, ...) as its first
    # argument; passing file_list here looks wrong - confirm intended call
    s.create_hitsort(file_list, "hitsort")

    print("saving to fasta file")
    s.save2fasta('sampleX.fasta', keep=False)