1 #!/usr/bin/env python3
2 import logging
3 logger = logging.getLogger(__name__)
4 import itertools
5 import os
6 import sys
7 import random
8 import sqlite3
9 import subprocess
10 import shlex # for command line arguments split
11 from collections import namedtuple
12 from lib.utils import format_query
13 from lib.parallel.parallel import parallel2 as parallel
14 from lib.parallel.parallel import get_max_proc
16 MAX_PRINT = 10
18 if sys.version_info < REQUIRED_VERSION:
19 raise Exception("\n\npython 3.4 or higher is required!\n")
21 # additional functions
25 def _hitsort_worker(query, blastdb, output, options):
27 # output from blast is parsed from stdout
28 cmd = options.all2all_search_params.format(query = query, blastdb = blastdb)
29 print(cmd)
30 min_lcov, min_pid, min_ovl, min_scov, evalue_max = options.filtering_threshold
31 pair2insert = ''
32 signs ={'plus':'+', 'minus':'-'}
33 with open(output, 'w') as f:
34 with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p:
35 for i in p.stdout:
36 items = i.decode('utf-8').split()
37 if options.filter_self_hits:
38 if items[4] == items[0]:
39 continue
40 evalue = float(items[10])
41 ovl_q = abs(float(items[2]) - float(items[3])) + 1
42 ovl_h = abs(float(items[6]) - float(items[7])) + 1
43 if (ovl_q >= min_ovl or ovl_h >= min_ovl) and float(items[8]) >= min_pid:
44 if float(items[1]) >= float(items[5]):
45 lcov = ovl_q * 100.0 / float(items[1])
46 scov = ovl_h * 100.0 / float(items[5])
47 else:
48 lcov = ovl_q * 100.0 / float(items[5])
49 scov = ovl_h * 100.0 / float(items[1])
50 # positive line:
51 if lcov >= min_lcov and scov >= min_scov and evalue_max > evalue:
52 pr = [items[0], items[4]]
53 # previous HSP
54 if pair2insert != "{0}\t{1}".format(pr[0], pr[1]):
55 pair2insert = "{0}\t{1}".format(pr[0], pr[1])
56 if items[11] in ['plus', 'minus']:
57 items[11] = signs[items[11]]
58 f.write("{0}\t{1}\t{2}\n".format(pair2insert, items[9], "\t".join(
59 items[k] for k in [1, 2, 3, 5, 6, 7, 8, 10, 11])))
61 def blast_with_filter(fasta_file_filter, blastdb):
62 '''
63 Return list of sequences query id which has similarity to filter
64 It uses mgblast for search
65 '''
66 params = '-p 85 -W18 -UT -X40 -KT -JF -F "m D" -v100000000 -b100000000 -D4 -C 30 -H 30'
67 positive_hits = set()
68 min_pid = 90
69 min_ovl_perc = 90
70 cmd = " ".join(['mgblast',
71 '-i', fasta_file_filter,
72 '-d', blastdb,
73 params
74 ])
75 with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p:
76 for i in p.stdout:
77 items = i.decode('utf-8').split()
78 ovl_perc = (abs(float(items[6]) - float(items[7])) + 1) / float(items[5]) * 100
79 pid = float(items[8])
80 if pid > min_pid and ovl_perc > min_ovl_perc:
81 positive_hits.add(items[4])
82 return(positive_hits)
86 # TODO test task
87 # predefine blast params
88 def blast_worker(query, blastdb, output, params):
89 if params['program'] in ['blastx', 'blastn']:
90 default_columns = ' -outfmt "6 {}"'.format(params['output_columns'])
91 cmd = "{} -query {} -db {} {} {}".format(
92 params['program'], query, blastdb, params['args'], default_columns)
93 elif params['program']=='diamond blastx':
94 # diomand have slightly different format than blastx
95 default_columns = ' --outfmt 6 {}'.format(params['output_columns'])
96 cmd = "{} -q {} -d {} {} {}".format(
97 params['program'], query, blastdb, params['args'], default_columns)
98 print(cmd)
99 preceeding_pair = ['', '']
100 BlastValues = namedtuple("BlastValues", params['output_columns'])
101 with open(output, 'wb') as f:
102 with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p:
103 for i in p.stdout:
104 items = i.decode('utf-8').split()
105 blast_values = BlastValues(*[f(i) for i, f in zip(items, params['column_types'])])
106 #qseqid, sseqid, qlen, slen, length, ppos, bitscore = [
107 # f(i) for i, f in zip(items, params['column_types'])]
108 if params['filter_function'](blast_values):
109 if preceeding_pair != [blast_values.qseqid, blast_values.sseqid]:
110 f.write(i)
111 preceeding_pair = [blast_values.qseqid, blast_values.sseqid]
114 class Sequence:
116 def __init__(self, seq, name, paired=False):
117 # the mode os seq storage can be changed later to make it more
118 # memory efficient
119 self._seq = bytes(str(seq), "ascii")
120 self.name = str(name)
122 @property
123 def seq(self):
124 return self._seq.decode("utf-8")
126 @seq.setter
127 def seq(self, value):
128 self._seq = bytes(str(value), "ascii")
130 def __str__(self):
131 return "{0} : {1}".format(self.name, self.seq)
133 @staticmethod
134 def read_fasta(fasta_file_name):
135 '''
136 generator - reads sequences from fasta file
137 return sequence one by one
138 '''
139 with open(fasta_file_name, 'r') as f:
140 header = None
141 seqstr = None
142 for rawline in f:
143 line = rawline.strip()
144 if line == "":
145 continue
146 if line[0] == ">":
147 if header and seqstr:
148 yield Sequence(seqstr, header)
149 # reset
150 seqstr = None
151 header = line[1:]
152 elif seqstr:
153 Warning("sequence was not preceeded by header")
154 else:
155 header = line[1:]
156 else:
157 seqstr = line if not seqstr else seqstr + line
158 # skip empty lines:
159 if header and seqstr:
160 yield Sequence(seqstr, header)
161 return
163 def write2fasta(self, file_object):
164 file_object.write(">{0}\n{1}\n".format(self.name, self.seq))
167 class SequenceSet:
169 def __init__(self, filename=None, source=None, sample_size=0, new=False, paired=False, prefix_length=0, rename=False, seed=123, fasta=None):
170 '''
171 filename: path to sqlite database, if none database is stored in memory
172 source: path to fasta file, if none empty database is created
173 sample_size: use only sample of sample_size from fasta file, if 0 all is used
174 new: should be old database created?
175 paired - paired reads
176 prefix_length -int
177 rename -- use int as names
179 creates SequenceSet, either empty or from fasta file,
180 it can be stored as dictionary or in shelve in file
181 only sample of all sequences can be loaded. if sample_size = 0
182 all sequences are loaded
183 if source is give, new database is created - if filename exist - it will be deleted!
184 '''
185 #======================================================================
186 # TODO name checking
187 # TODO detect unique prefixes
188 #======================================================================
189 # type of storage
190 random.seed(seed)
192 self.source = source
193 self.sample_size = sample_size
194 self.paired = paired
195 self.filename = filename
196 self.prefix_length = prefix_length
197 self.prefix_codes = {}
198 self.rename = rename
199 self.fasta = fasta
200 self._length = None
201 self._original_length = None
202 self.blastdb = None
203 self.blastdb_legacy = None
204 self.chunks = None
205 self.ids_kept = None
206 self.annotations = []
207 self.hitsort = None
208 if filename:
209 logger.debug("Creating database in file")
210 if os.path.isfile(filename) and (new or source):
211 # remove old database if exists
212 os.remove(filename)
213 # sqlite database in file
214 self.conn = sqlite3.connect(filename)
215 else:
216 # sqlite database in memory
217 logger.debug("Creating database in memory")
218 self.conn = sqlite3.connect(":memory:")
219 c = self.conn.cursor()
220 c.execute("create table sequences (name text, sequence text)")
222 if source:
223 self._read_from_fasta()
224 c.execute("CREATE TABLE prefix_codes (prefix TEXT, N INTEGER)")
225 self.conn.executemany("INSERT INTO prefix_codes (prefix, N) values (?,?)", self.prefix_codes.items())
226 self.conn.commit()
227 self._commit_immediately = True
228 self.makeblastdb(legacy=True)
229 # store some attribudes in database
230 self._update_database()
231 logger.info("sequences loaded")
232 logger.info(self)
234 def _update_database(self):
235 # pass all atributes - which are either float, ind and str to extra table
236 c = self.conn.cursor()
237 stored_attributes = [
238 ("sample_size", "integer"),
239 ("_length", "integer"),
240 ("_original_length", "integer"),
241 ("paired", "integer"),
242 ]
243 columns = ", ".join(i[0] + " " + i[1] for i in stored_attributes)
244 try:
245 c.execute((
246 "CREATE TABLE seqinfo ( {} )".format(columns)
247 ))
248 except sqlite3.OperationalError:
249 pass # table already exists
250 # get current values
251 values = tuple(getattr(self, i[0]) for i in stored_attributes)
252 placeholder = ", ".join(["?"] * len(values))
253 c.execute("DELETE FROM seqinfo")
254 c.execute("INSERT INTO seqinfo VALUES ({})".format(placeholder), values)
257 def _read_from_fasta(self):
258 # changed will be commited after whole file is loaded - this is faster
259 self._commit_immediately = False
260 if self.fasta:
261 f = open(self.fasta, mode='w')
262 # define sampling
263 if self.sample_size:
264 logger.debug(
265 'sampling sequences: {} sample size'.format(self.sample_size))
266 N = self.fasta_length(self.source)
267 if self.paired:
268 s = itertools.chain(
269 [[i, i + 1] for i in sorted(random.sample(range(1, N, 2), int(self.sample_size / 2)))])
271 sample = itertools.chain(
272 [item for sublist in s for item in sublist])
274 else:
275 sample = itertools.chain(
276 sorted(random.sample(range(1, N + 1), self.sample_size)))
277 logger.debug("sampling unpaired reads")
278 else:
279 # no sampling - use all
280 sample = itertools.count(1)
282 # define counter:
283 if self.rename:
284 if self.paired:
285 counter = itertools.count(1, 0.5)
286 else:
287 counter = itertools.count(1, 1)
288 else:
289 counter = itertools.cycle([""])
290 # define pairs:
291 if self.paired:
292 pair = itertools.cycle(['f', 'r'])
293 else:
294 pair = itertools.cycle([''])
295 position = 0
297 seq2write = next(sample)
299 for i in Sequence.read_fasta(self.source):
300 position += 1
301 if position == seq2write:
303 # create name:
304 prefix = i.name[0:self.prefix_length] # could be empty ""
305 if self.rename:
306 i.name = "{0}{1}{2}".format(
307 prefix,
308 int(next(counter)),
309 next(pair)
310 )
311 if self.prefix_length:
312 if prefix in self.prefix_codes:
313 self.prefix_codes[prefix] += 1
314 else:
315 self.prefix_codes[prefix] = 1
316 self[i.name] = i.seq
317 if self.fasta:
318 i.write2fasta(f)
319 try:
320 seq2write = next(sample)
321 except StopIteration:
322 # no more sequences to sample
323 break
326 self.conn.commit() # save changes
327 if self.fasta:
328 f.close()
329 self._commit_immediately = True
331 def __getitem__(self, item):
332 # item cannot be empty!!!
333 c = self.conn.cursor()
334 c.execute("select * from sequences where name= '{}'".format(item))
335 # try:
336 name, sequence = c.fetchone() # should be only one
337 # except TypeError:
338 # return None
339 return sequence
341 def __setitem__(self, item, value):
342 c = self.conn.cursor()
343 c.execute(
344 "insert into sequences values ( '{0}', '{1}')".format(item, value))
345 if self._commit_immediately:
346 self.conn.commit() # save changes
348 # def __iter__(self):
349 # self.c.execute("select name from sequences")
350 # for i in self.c:
351 # yield i[0]
353 def __iter__(self):
354 c = self.conn.cursor()
355 c.execute("select name from sequences")
356 for i in c:
357 yield i[0]
360 # def __iter__(self):
361 # for i in range(1,len(self)):
362 # yield i
363 def items(self):
364 c = self.conn.cursor()
365 c.execute("select name, sequence from sequences")
366 for name, seq in c:
367 yield Sequence(seq, name)
369 def toDict(self):
370 c = self.conn.cursor()
371 c.execute("select name, sequence from sequences")
372 d = {}
373 for name, seq in c:
374 d[name]=seq
375 return(d)
377 def __len__(self):
378 c = self.conn.cursor()
379 c.execute("select count(*) from sequences")
380 length = c.fetchone()[0]
381 return(length)
383 def __str__(self):
384 out = ""
385 c = self.conn.cursor()
386 c.execute("select * from sequences limit {0}".format(MAX_PRINT))
387 for n, s in c:
388 out = out + "{0} : {1}\n".format(n, s)
389 out = out + "...\n"
390 out = out + str(len(self)) + " sequence total\n"
391 return out
393 def insert(self, sequence, commit=True):
394 self._commit_immediately = commit
395 self[sequence.name] = sequence.seq
396 self._commit_immediately = True
398 def keys(self):
399 '''
400 this will get names of sequences
401 '''
402 c = self.conn.cursor()
403 c.execute('select name from sequences order by rowid')
404 names = []
405 for i in c.fetchall():
406 names.append(i[0])
407 return names
409 def sample(self, size, seed=123):
410 '''
411 generator - reproducible sampling sequences
412 '''
413 max_index = len(self)
414 # sample = seqtools.SequenceSet(source=info.input_sequences, )
415 if self.paired:
416 size = int(size / 2)
417 step = 2
418 else:
419 step = 1
420 random.seed(seed)
421 c = self.conn.cursor()
422 for i in sorted(random.sample(range(1, max_index, step), size)):
423 c.execute(
424 "select name, sequence from sequences where rowid={}".format(i))
425 name, sequence = c.fetchone()
426 yield Sequence(sequence, name)
427 if self.paired:
428 c.execute(
429 "select name, sequence from sequences where rowid={}".format(i + 1))
430 name, sequence = c.fetchone()
431 yield Sequence(sequence, name)
433 def sample2db(self, db_file=None, fasta_file_name=None, size=100, seed=123):
434 if (not db_file) and (not fasta_file_name):
435 raise IOError("no db_file nor fasta_file_name were defined")
436 # open files
437 if db_file:
438 db = SequenceSet(filename=db_file, source=None, new=True)
439 if fasta_file_name:
440 f = open(fasta_file_name, 'w')
441 # write in files
442 for i in self.sample(size, seed):
443 if db_file:
444 db.insert(i, commit=False)
445 if fasta_file_name:
446 i.write2fasta(f)
447 # close files
448 if fasta_file_name:
449 f.close()
450 if db_file:
451 db.conn.commit()
452 return db
454 def save2fasta(self, fasta_file_name, keep=True, subset=None):
455 if subset:
456 with open(fasta_file_name, 'w') as f:
457 c = self.conn.cursor()
458 c.execute("select name, sequence from sequences where name in ('{}')".format(
459 "','".join(subset)))
460 for name, sequence in c:
461 s = Sequence(sequence, name)
462 s.write2fasta(f)
463 else:
464 with open(fasta_file_name, 'w') as f:
465 for i in self.items():
466 i.write2fasta(f)
467 if keep:
468 self.fasta = fasta_file_name
470 def save_annotation(self, annotation_name, subset, dir):
471 annotation_file = "{}/{}_annotation.csv".format(dir, annotation_name)
472 with open(annotation_file, "w") as f:
473 c = self.conn.cursor()
474 c.execute(
475 "select * from {} where name in ('{}')".format(
476 annotation_name, "','".join(subset)
477 )
478 )
479 header = [description[0] for description in c.description]
480 f.write("\t".join(str(j) for j in header) + "\n")
481 for i in c:
482 f.write("\t".join(str(j) for j in i) + "\n")
483 return annotation_file
485 def make_chunks(self, file_name=None, chunk_size=5000):
486 '''
487 split SequneceSet to chunks and save to files,
488 return list of files names
489 '''
490 logger.debug("creating chunks from fasta file: ")
491 if not file_name and self.fasta:
492 file_name = self.fasta
493 else:
494 raise Exception('file name for chunks is not defined!')
496 seqs = iter(self.items())
497 fn_chunk_names = []
498 for i in itertools.count():
499 try:
500 fn = "{0}.{1}".format(file_name, i)
501 logger.info("saving chunk {}".format(fn))
502 for n in range(chunk_size):
503 s = next(seqs)
504 if not n:
505 f = open(fn, 'w')
506 s.write2fasta(f)
507 f.close()
508 fn_chunk_names.append(fn)
509 except StopIteration:
510 if not f.closed:
511 f.close()
512 fn_chunk_names.append(fn)
513 break
514 self.chunks = fn_chunk_names
515 logger.debug(self.chunks)
516 return(fn_chunk_names)
518 def makeblastdb(self, blastdb=None, dbtype='nucl', legacy=False, lastdb = False):
519 '''
520 dbtype eithe 'nucl' of 'prot'
521 '''
522 logger.info("creating blast database")
523 # check if fasta file on disk is present
524 if not self.fasta:
525 try:
526 self.save2fasta(blastdb)
527 except TypeError:
528 print("\nEither blastdb or fasta must be ",
529 "defined when making blast database!!!\n")
530 raise
531 else:
532 blastdb = blastdb if blastdb else self.fasta
534 cmd = "makeblastdb -in {0} -out {1} -dbtype {2}".format(self.fasta,
535 blastdb,
536 dbtype)
537 args = shlex.split(cmd)
538 if 0 == subprocess.check_call(args, stderr=sys.stdout):
539 self.blastdb = blastdb
540 else:
541 Warning("makeblastdb failed")
543 if legacy:
544 logger.info("creating legacy blast database")
545 prot = {'nucl': 'F', 'prot': 'T'}[dbtype]
546 cmd = "formatdb -i {0} -p {1} -n {2}.legacy".format(self.fasta,
547 prot,
548 blastdb
549 )
550 args = shlex.split(cmd)
551 if subprocess.check_call(args) == 0:
552 self.blastdb_legacy = "{}.legacy".format(blastdb)
553 else:
554 Warning("formatdb failed")
555 if lastdb:
556 logger.info("creating lastdb database")
557 cmd = "lastdb -u NEAR {fasta} {fasta}".format(fasta=self.fasta)
558 args = shlex.split(cmd)
559 if subprocess.check_call(args) == 0:
560 self.lastdb = self.fasta
561 else:
562 Warning("formatdb failed")
565 def create_hitsort(self, options, output=None):
566 '''
567 query is name of files to search against blastdb of SequenceSet object
568 query could be multiple files
569 this is inteded to be used for tabular output only
570 output file must be specified,
571 if more than one query is used, multiple files are created
572 worker - function to execute blast search
573 '''
575 logger.info('running all to all blast')
576 query = self.chunks
577 if not output:
578 output = "{}.blast".format(self.fasta)
579 database = getattr(self, options.database)
580 if len(query) > 1:
581 output_parts = [output + "." + str(i) for i in range(len(query))]
582 else:
583 output_parts = [output]
584 query = [query]
586 parallel(_hitsort_worker, query, [database],
587 output_parts, [options])
588 logger.info('all to all blast finished')
589 logger.info('removing duplicates from all to all blast results')
590 self._clean_hitsort(
591 hitsort_files=output_parts, filtered_hitsort=output)
592 # remove temp files:
593 for f in output_parts:
594 os.unlink(f)
595 self.hitsort = output
596 return output
598 def _clean_hitsort(self, hitsort_files, filtered_hitsort):
599 ''' alterantive hitsort duplicate removal '''
601 ids = self.keys()
602 Nfiles = 5 + int(sum([os.path.getsize(i)
603 for i in hitsort_files]) / 5e9)
604 hitsort_parts = [
605 "{0}.{1}.parts".format(filtered_hitsort, i) for i in range(Nfiles)]
606 # open files
607 fs = []
608 for i in hitsort_parts:
609 fs.append(open(i, 'w'))
611 ids_destination = {}
612 fs_iter = itertools.cycle(fs)
613 for i in ids:
614 #ids_destination[i] = fs_iter.next()
615 ids_destination[i] = next(fs_iter)
616 lines_out = lines_total = 0
617 temp_open = False
619 for i in hitsort_files:
620 f = open(i, 'r')
621 while True:
622 line = f.readline()
623 if not line:
624 break
625 lines_total += 1
626 line_items = line.split()
627 # ids_destination[line_items[0]].write(line)
628 # do not assume that it is sorted!
629 ids_destination[min(line_items[0:2])].write(line)
630 # close all files
631 for i in fs:
632 i.close()
634 # load one by one and exclude duplicates:
635 hit_out = open(filtered_hitsort, 'w')
636 for i in hitsort_parts:
637 f = open(i, 'r')
638 hitsort_shelve = {}
639 while True:
640 line = f.readline()
641 if not line:
642 break
643 line_items = line.split()
644 key = "\t".join(sorted(line_items[0:2]))
645 value = line_items
646 bitscore = float(line_items[2])
647 if key in hitsort_shelve:
648 if float(hitsort_shelve[key][2]) > bitscore:
649 hitsort_shelve[key] = value
650 hit_out.write("\t".join(hitsort_shelve[key]) + "\n")
651 del hitsort_shelve[key]
652 lines_out += 1
653 else:
654 hitsort_shelve[key] = value
655 f.close()
656 for key in hitsort_shelve:
657 hit_out.write("\t".join(hitsort_shelve[key]) + "\n")
658 lines_out += 1
659 # clean up
660 for i in hitsort_parts:
661 os.unlink(i)
662 hit_out.close()
663 return lines_total, lines_out
665 def _check_database(self, database):
666 '''
667 database is path to fastafile, database with same prefix should exist
668 it creates database if does not exist as temporal file!
669 '''
670 pass
671 # TODO
673 def remove_sequences_using_filter(self, fasta_file_filter, keep_proportion,
674 omitted_sequences_file,
675 kept_sequences_file):
676 '''
677 Run blast with fasta_file_filter against legaci blastdb of SequenceSet to detect sequences
678 which are to be removed from SequencesSet. Complete pairsare removed if paired sequences
679 are used.
680 '''
681 ids = blast_with_filter(fasta_file_filter, self.blastdb_legacy)
682 # check against database and remove sequences
684 c = self.conn.cursor()
685 if self.paired:
686 c.execute("SELECT rowid FROM sequences WHERE name IN {}".format(
687 format_query(ids)
688 ))
689 odd_rowids = set()
690 for i in c.fetchall():
691 if i[0] % 2 == 0:
692 odd_rowids.add(i[0] - 1)
693 else:
694 odd_rowids.add(i[0])
695 odd_rowids_keep = random.sample(odd_rowids, round(keep_proportion * len(odd_rowids)))
696 c.execute("SELECT name FROM sequences WHERE rowid IN {} ORDER BY rowid".format(
697 format_query(odd_rowids_keep + [i+1 for i in odd_rowids_keep])
698 ))
699 self.ids_kept = [i[0] for i in c.fetchall()]
700 odd_rowids_omit = list(odd_rowids.difference(odd_rowids_keep))
701 c.execute("SELECT name FROM sequences WHERE rowid IN {} ORDER BY rowid".format(
702 format_query(odd_rowids_omit + [i+1 for i in odd_rowids_omit])
703 ))
704 ids_omit = [i[0] for i in c.fetchall()]
706 else:
707 self.ids_kept = random.sample(ids, round(keep_proportion * len(ids)))
708 ids_omit = list(ids.difference(self.ids_kept))
709 self.save2fasta(omitted_sequences_file, keep=False, subset=ids_omit)
710 self.save2fasta(kept_sequences_file, keep=False, subset=self.ids_kept)
711 self.drop_sequences(ids_omit)
712 # save info about omited sequences
713 c.execute("CREATE TABLE ids_kept (name TEXT)")
714 c.executemany("INSERT INTO ids_kept (name) values (?)", [(i,) for i in self.ids_kept])
715 return len(ids_omit)
717 def drop_sequences(self, ids_to_drop):
718 '''
719 remove sequences from sequence set based on the ID
720 '''
721 self._original_length = len(self)
722 c = self.conn.cursor()
723 idslist = format_query(ids_to_drop)
724 c.execute("CREATE TABLE omited_sequences AS SELECT * FROM sequences WHERE name IN {}".format(idslist))
725 c.execute("DELETE FROM sequences WHERE name IN {}".format(
726 idslist
727 ))
728 # new databases must be created - with ommited sequences!
729 self.save2fasta(fasta_file_name=self.fasta) ## this will replace origninal file!
730 self._length = len(self)
731 self.makeblastdb(legacy=True)
732 self._update_database()
734 def annotate(self, database, annotation_name, directory="", params=""):
735 '''
736 database : path to blast formated database
737 method: type of search to use for annotation. it must be
738 'blastn' of 'blastx'
739 Annotation is save in directory also into the table with name stored in
740 annotation_name variable
741 '''
742 logger.info("annotating reads with {} ".format(annotation_name))
743 self._check_database(database)
744 if "parallelize" in params:
745 parallelize = params['parallelize']
746 else:
747 parallelize = True
748 if parallelize:
749 if not self.chunks:
750 self.make_chunks()
751 output = [
752 "{}/{}_{}".format(directory, annotation_name,os.path.basename(i)) for i in self.chunks]
753 parallel(blast_worker, self.chunks, [database], output, [params])
754 else:
755 # no explicit parallelization using parallel
756 single_output = "{}/{}_hits".format(directory, annotation_name)
757 params['args'] = params['args'].format(max_proc=get_max_proc())
758 blast_worker(self.fasta, database, single_output,params)
759 output = [single_output]
761 # put into as new attribute and sqlite table
762 c = self.conn.cursor()
763 c.execute("create table {} (name text, db_id text, length real, bitscore real , pid real)".format(
764 annotation_name))
765 for blast_results in output:
766 with open(blast_results, 'r') as f:
767 for i in f:
768 items = i.split()
769 types = [str, str, float, float, float, float, float]
770 qseqid, sseqid, qlen, slen, length, pident, bitscore = [
771 f(i) for i, f in zip(items, types)]
772 c.execute('insert into {} values ("{}", "{}", "{}", "{}", "{}")'.format(
773 annotation_name, qseqid, sseqid, length, bitscore, pident))
774 self.conn.commit()
775 self.annotations.append(annotation_name)
777 @staticmethod
778 def fasta_length(fasta_file_name):
779 '''
780 count number of sequences,
781 '''
782 with open(fasta_file_name, 'r') as f:
783 N = 0
784 for i in f:
785 if i.strip()[0] == ">":
786 N += 1
787 return N
790 # test
791 if __name__ == "__main__":
792 # get root directory
794 MAIN_DIR = os.path.realpath(
795 os.path.dirname(os.path.realpath(__file__)) + "/..")
796 print("MAIN_DIR:")
797 print(MAIN_DIR)
798 TMP_DIR = "{}/tmp".format(MAIN_DIR)
799 TEST_DATA_DIR = "{}/test_data".format(MAIN_DIR)
801 # some data:
802 fn = "{}/pair_reads.fasta".format(TEST_DATA_DIR)
803 os.chdir(TMP_DIR)
804 # s = SequenceSet(filename='sqlite.db')
805 print("number of sequences in fasta file:")
806 print(SequenceSet.fasta_length(fn))
808 print("loading sequences...")
809 s = SequenceSet(source=fn, paired=True, rename=False,
810 filename='sqlite.db', prefix_length=4, fasta='sample2.fasta')
811 print(s)
813 print("creating blast database")
814 s.makeblastdb()
816 print("creating chunks from fasta file: ")
817 file_list = s.make_chunks("fasta_chunks", chunk_size=500)
818 print(file_list)
820 print('creating hitsort')
821 s.create_hitsort(file_list, "hitsort")
823 print("saving to fasta file")
824 s.save2fasta('sampleX.fasta', keep=False)