0
|
1 #!/usr/bin/env python3
|
|
2 import logging
|
|
3 logger = logging.getLogger(__name__)
|
|
4 import itertools
|
|
5 import os
|
|
6 import sys
|
|
7 import random
|
|
8 import sqlite3
|
|
9 import subprocess
|
|
10 import shlex # for command line arguments split
|
|
11 from collections import namedtuple
|
|
12 from lib.utils import format_query
|
|
13 from lib.parallel.parallel import parallel2 as parallel
|
|
14 from lib.parallel.parallel import get_max_proc
|
|
15 REQUIRED_VERSION = (3, 4)
|
|
16 MAX_PRINT = 10
|
|
17
|
|
18 if sys.version_info < REQUIRED_VERSION:
|
|
19 raise Exception("\n\npython 3.4 or higher is required!\n")
|
|
20
|
|
21 # additional functions
|
|
22
|
|
23
|
|
24
|
|
25 def _hitsort_worker(query, blastdb, output, options):
|
|
26
|
|
27 # output from blast is parsed from stdout
|
|
28 cmd = options.all2all_search_params.format(query = query, blastdb = blastdb)
|
|
29 print(cmd)
|
|
30 min_lcov, min_pid, min_ovl, min_scov, evalue_max = options.filtering_threshold
|
|
31 pair2insert = ''
|
|
32 signs ={'plus':'+', 'minus':'-'}
|
|
33 with open(output, 'w') as f:
|
|
34 with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p:
|
|
35 for i in p.stdout:
|
|
36 items = i.decode('utf-8').split()
|
|
37 if options.filter_self_hits:
|
|
38 if items[4] == items[0]:
|
|
39 continue
|
|
40 evalue = float(items[10])
|
|
41 ovl_q = abs(float(items[2]) - float(items[3])) + 1
|
|
42 ovl_h = abs(float(items[6]) - float(items[7])) + 1
|
|
43 if (ovl_q >= min_ovl or ovl_h >= min_ovl) and float(items[8]) >= min_pid:
|
|
44 if float(items[1]) >= float(items[5]):
|
|
45 lcov = ovl_q * 100.0 / float(items[1])
|
|
46 scov = ovl_h * 100.0 / float(items[5])
|
|
47 else:
|
|
48 lcov = ovl_q * 100.0 / float(items[5])
|
|
49 scov = ovl_h * 100.0 / float(items[1])
|
|
50 # positive line:
|
|
51 if lcov >= min_lcov and scov >= min_scov and evalue_max > evalue:
|
|
52 pr = [items[0], items[4]]
|
|
53 # previous HSP
|
|
54 if pair2insert != "{0}\t{1}".format(pr[0], pr[1]):
|
|
55 pair2insert = "{0}\t{1}".format(pr[0], pr[1])
|
|
56 if items[11] in ['plus', 'minus']:
|
|
57 items[11] = signs[items[11]]
|
|
58 f.write("{0}\t{1}\t{2}\n".format(pair2insert, items[9], "\t".join(
|
|
59 items[k] for k in [1, 2, 3, 5, 6, 7, 8, 10, 11])))
|
|
60
|
|
61 def blast_with_filter(fasta_file_filter, blastdb):
|
|
62 '''
|
|
63 Return list of sequences query id which has similarity to filter
|
|
64 It uses mgblast for search
|
|
65 '''
|
|
66 params = '-p 85 -W18 -UT -X40 -KT -JF -F "m D" -v100000000 -b100000000 -D4 -C 30 -H 30'
|
|
67 positive_hits = set()
|
|
68 min_pid = 90
|
|
69 min_ovl_perc = 90
|
|
70 cmd = " ".join(['mgblast',
|
|
71 '-i', fasta_file_filter,
|
|
72 '-d', blastdb,
|
|
73 params
|
|
74 ])
|
|
75 with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p:
|
|
76 for i in p.stdout:
|
|
77 items = i.decode('utf-8').split()
|
|
78 ovl_perc = (abs(float(items[6]) - float(items[7])) + 1) / float(items[5]) * 100
|
|
79 pid = float(items[8])
|
|
80 if pid > min_pid and ovl_perc > min_ovl_perc:
|
|
81 positive_hits.add(items[4])
|
|
82 return(positive_hits)
|
|
83
|
|
84
|
|
85
|
|
86 # TODO test task
|
|
87 # predefine blast params
|
|
88 def blast_worker(query, blastdb, output, params):
|
|
89 if params['program'] in ['blastx', 'blastn']:
|
|
90 default_columns = ' -outfmt "6 {}"'.format(params['output_columns'])
|
|
91 cmd = "{} -query {} -db {} {} {}".format(
|
|
92 params['program'], query, blastdb, params['args'], default_columns)
|
|
93 elif params['program']=='diamond blastx':
|
|
94 # diomand have slightly different format than blastx
|
|
95 default_columns = ' --outfmt 6 {}'.format(params['output_columns'])
|
|
96 cmd = "{} -q {} -d {} {} {}".format(
|
|
97 params['program'], query, blastdb, params['args'], default_columns)
|
|
98 print(cmd)
|
|
99 preceeding_pair = ['', '']
|
|
100 BlastValues = namedtuple("BlastValues", params['output_columns'])
|
|
101 with open(output, 'wb') as f:
|
|
102 with subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE) as p:
|
|
103 for i in p.stdout:
|
|
104 items = i.decode('utf-8').split()
|
|
105 blast_values = BlastValues(*[f(i) for i, f in zip(items, params['column_types'])])
|
|
106 #qseqid, sseqid, qlen, slen, length, ppos, bitscore = [
|
|
107 # f(i) for i, f in zip(items, params['column_types'])]
|
|
108 if params['filter_function'](blast_values):
|
|
109 if preceeding_pair != [blast_values.qseqid, blast_values.sseqid]:
|
|
110 f.write(i)
|
|
111 preceeding_pair = [blast_values.qseqid, blast_values.sseqid]
|
|
112
|
|
113
|
|
114 class Sequence:
|
|
115
|
|
116 def __init__(self, seq, name, paired=False):
|
|
117 # the mode os seq storage can be changed later to make it more
|
|
118 # memory efficient
|
|
119 self._seq = bytes(str(seq), "ascii")
|
|
120 self.name = str(name)
|
|
121
|
|
122 @property
|
|
123 def seq(self):
|
|
124 return self._seq.decode("utf-8")
|
|
125
|
|
126 @seq.setter
|
|
127 def seq(self, value):
|
|
128 self._seq = bytes(str(value), "ascii")
|
|
129
|
|
130 def __str__(self):
|
|
131 return "{0} : {1}".format(self.name, self.seq)
|
|
132
|
|
133 @staticmethod
|
|
134 def read_fasta(fasta_file_name):
|
|
135 '''
|
|
136 generator - reads sequences from fasta file
|
|
137 return sequence one by one
|
|
138 '''
|
|
139 with open(fasta_file_name, 'r') as f:
|
|
140 header = None
|
|
141 seqstr = None
|
|
142 for rawline in f:
|
|
143 line = rawline.strip()
|
|
144 if line == "":
|
|
145 continue
|
|
146 if line[0] == ">":
|
|
147 if header and seqstr:
|
|
148 yield Sequence(seqstr, header)
|
|
149 # reset
|
|
150 seqstr = None
|
|
151 header = line[1:]
|
|
152 elif seqstr:
|
|
153 Warning("sequence was not preceeded by header")
|
|
154 else:
|
|
155 header = line[1:]
|
|
156 else:
|
|
157 seqstr = line if not seqstr else seqstr + line
|
|
158 # skip empty lines:
|
|
159 if header and seqstr:
|
|
160 yield Sequence(seqstr, header)
|
|
161 return
|
|
162
|
|
163 def write2fasta(self, file_object):
|
|
164 file_object.write(">{0}\n{1}\n".format(self.name, self.seq))
|
|
165
|
|
166
|
|
167 class SequenceSet:
|
|
168
|
|
169 def __init__(self, filename=None, source=None, sample_size=0, new=False, paired=False, prefix_length=0, rename=False, seed=123, fasta=None):
|
|
170 '''
|
|
171 filename: path to sqlite database, if none database is stored in memory
|
|
172 source: path to fasta file, if none empty database is created
|
|
173 sample_size: use only sample of sample_size from fasta file, if 0 all is used
|
|
174 new: should be old database created?
|
|
175 paired - paired reads
|
|
176 prefix_length -int
|
|
177 rename -- use int as names
|
|
178
|
|
179 creates SequenceSet, either empty or from fasta file,
|
|
180 it can be stored as dictionary or in shelve in file
|
|
181 only sample of all sequences can be loaded. if sample_size = 0
|
|
182 all sequences are loaded
|
|
183 if source is give, new database is created - if filename exist - it will be deleted!
|
|
184 '''
|
|
185 #======================================================================
|
|
186 # TODO name checking
|
|
187 # TODO detect unique prefixes
|
|
188 #======================================================================
|
|
189 # type of storage
|
|
190 random.seed(seed)
|
|
191
|
|
192 self.source = source
|
|
193 self.sample_size = sample_size
|
|
194 self.paired = paired
|
|
195 self.filename = filename
|
|
196 self.prefix_length = prefix_length
|
|
197 self.prefix_codes = {}
|
|
198 self.rename = rename
|
|
199 self.fasta = fasta
|
|
200 self._length = None
|
|
201 self._original_length = None
|
|
202 self.blastdb = None
|
|
203 self.blastdb_legacy = None
|
|
204 self.chunks = None
|
|
205 self.ids_kept = None
|
|
206 self.annotations = []
|
|
207 self.hitsort = None
|
|
208 if filename:
|
|
209 logger.debug("Creating database in file")
|
|
210 if os.path.isfile(filename) and (new or source):
|
|
211 # remove old database if exists
|
|
212 os.remove(filename)
|
|
213 # sqlite database in file
|
|
214 self.conn = sqlite3.connect(filename)
|
|
215 else:
|
|
216 # sqlite database in memory
|
|
217 logger.debug("Creating database in memory")
|
|
218 self.conn = sqlite3.connect(":memory:")
|
|
219 c = self.conn.cursor()
|
|
220 c.execute("create table sequences (name text, sequence text)")
|
|
221
|
|
222 if source:
|
|
223 self._read_from_fasta()
|
|
224 c.execute("CREATE TABLE prefix_codes (prefix TEXT, N INTEGER)")
|
|
225 self.conn.executemany("INSERT INTO prefix_codes (prefix, N) values (?,?)", self.prefix_codes.items())
|
|
226 self.conn.commit()
|
|
227 self._commit_immediately = True
|
|
228 self.makeblastdb(legacy=True)
|
|
229 # store some attribudes in database
|
|
230 self._update_database()
|
|
231 logger.info("sequences loaded")
|
|
232 logger.info(self)
|
|
233
|
|
234 def _update_database(self):
|
|
235 # pass all atributes - which are either float, ind and str to extra table
|
|
236 c = self.conn.cursor()
|
|
237 stored_attributes = [
|
|
238 ("sample_size", "integer"),
|
|
239 ("_length", "integer"),
|
|
240 ("_original_length", "integer"),
|
|
241 ("paired", "integer"),
|
|
242 ]
|
|
243 columns = ", ".join(i[0] + " " + i[1] for i in stored_attributes)
|
|
244 try:
|
|
245 c.execute((
|
|
246 "CREATE TABLE seqinfo ( {} )".format(columns)
|
|
247 ))
|
|
248 except sqlite3.OperationalError:
|
|
249 pass # table already exists
|
|
250 # get current values
|
|
251 values = tuple(getattr(self, i[0]) for i in stored_attributes)
|
|
252 placeholder = ", ".join(["?"] * len(values))
|
|
253 c.execute("DELETE FROM seqinfo")
|
|
254 c.execute("INSERT INTO seqinfo VALUES ({})".format(placeholder), values)
|
|
255
|
|
256
|
|
257 def _read_from_fasta(self):
|
|
258 # changed will be commited after whole file is loaded - this is faster
|
|
259 self._commit_immediately = False
|
|
260 if self.fasta:
|
|
261 f = open(self.fasta, mode='w')
|
|
262 # define sampling
|
|
263 if self.sample_size:
|
|
264 logger.debug(
|
|
265 'sampling sequences: {} sample size'.format(self.sample_size))
|
|
266 N = self.fasta_length(self.source)
|
|
267 if self.paired:
|
|
268 s = itertools.chain(
|
|
269 [[i, i + 1] for i in sorted(random.sample(range(1, N, 2), int(self.sample_size / 2)))])
|
|
270
|
|
271 sample = itertools.chain(
|
|
272 [item for sublist in s for item in sublist])
|
|
273
|
|
274 else:
|
|
275 sample = itertools.chain(
|
|
276 sorted(random.sample(range(1, N + 1), self.sample_size)))
|
|
277 logger.debug("sampling unpaired reads")
|
|
278 else:
|
|
279 # no sampling - use all
|
|
280 sample = itertools.count(1)
|
|
281
|
|
282 # define counter:
|
|
283 if self.rename:
|
|
284 if self.paired:
|
|
285 counter = itertools.count(1, 0.5)
|
|
286 else:
|
|
287 counter = itertools.count(1, 1)
|
|
288 else:
|
|
289 counter = itertools.cycle([""])
|
|
290 # define pairs:
|
|
291 if self.paired:
|
|
292 pair = itertools.cycle(['f', 'r'])
|
|
293 else:
|
|
294 pair = itertools.cycle([''])
|
|
295 position = 0
|
|
296
|
|
297 seq2write = next(sample)
|
|
298
|
|
299 for i in Sequence.read_fasta(self.source):
|
|
300 position += 1
|
|
301 if position == seq2write:
|
|
302
|
|
303 # create name:
|
|
304 prefix = i.name[0:self.prefix_length] # could be empty ""
|
|
305 if self.rename:
|
|
306 i.name = "{0}{1}{2}".format(
|
|
307 prefix,
|
|
308 int(next(counter)),
|
|
309 next(pair)
|
|
310 )
|
|
311 if self.prefix_length:
|
|
312 if prefix in self.prefix_codes:
|
|
313 self.prefix_codes[prefix] += 1
|
|
314 else:
|
|
315 self.prefix_codes[prefix] = 1
|
|
316 self[i.name] = i.seq
|
|
317 if self.fasta:
|
|
318 i.write2fasta(f)
|
|
319 try:
|
|
320 seq2write = next(sample)
|
|
321 except StopIteration:
|
|
322 # no more sequences to sample
|
|
323 break
|
|
324
|
|
325
|
|
326 self.conn.commit() # save changes
|
|
327 if self.fasta:
|
|
328 f.close()
|
|
329 self._commit_immediately = True
|
|
330
|
|
331 def __getitem__(self, item):
|
|
332 # item cannot be empty!!!
|
|
333 c = self.conn.cursor()
|
|
334 c.execute("select * from sequences where name= '{}'".format(item))
|
|
335 # try:
|
|
336 name, sequence = c.fetchone() # should be only one
|
|
337 # except TypeError:
|
|
338 # return None
|
|
339 return sequence
|
|
340
|
|
341 def __setitem__(self, item, value):
|
|
342 c = self.conn.cursor()
|
|
343 c.execute(
|
|
344 "insert into sequences values ( '{0}', '{1}')".format(item, value))
|
|
345 if self._commit_immediately:
|
|
346 self.conn.commit() # save changes
|
|
347
|
|
348 # def __iter__(self):
|
|
349 # self.c.execute("select name from sequences")
|
|
350 # for i in self.c:
|
|
351 # yield i[0]
|
|
352
|
|
353 def __iter__(self):
|
|
354 c = self.conn.cursor()
|
|
355 c.execute("select name from sequences")
|
|
356 for i in c:
|
|
357 yield i[0]
|
|
358
|
|
359
|
|
360 # def __iter__(self):
|
|
361 # for i in range(1,len(self)):
|
|
362 # yield i
|
|
363 def items(self):
|
|
364 c = self.conn.cursor()
|
|
365 c.execute("select name, sequence from sequences")
|
|
366 for name, seq in c:
|
|
367 yield Sequence(seq, name)
|
|
368
|
|
369 def toDict(self):
|
|
370 c = self.conn.cursor()
|
|
371 c.execute("select name, sequence from sequences")
|
|
372 d = {}
|
|
373 for name, seq in c:
|
|
374 d[name]=seq
|
|
375 return(d)
|
|
376
|
|
377 def __len__(self):
|
|
378 c = self.conn.cursor()
|
|
379 c.execute("select count(*) from sequences")
|
|
380 length = c.fetchone()[0]
|
|
381 return(length)
|
|
382
|
|
383 def __str__(self):
|
|
384 out = ""
|
|
385 c = self.conn.cursor()
|
|
386 c.execute("select * from sequences limit {0}".format(MAX_PRINT))
|
|
387 for n, s in c:
|
|
388 out = out + "{0} : {1}\n".format(n, s)
|
|
389 out = out + "...\n"
|
|
390 out = out + str(len(self)) + " sequence total\n"
|
|
391 return out
|
|
392
|
|
393 def insert(self, sequence, commit=True):
|
|
394 self._commit_immediately = commit
|
|
395 self[sequence.name] = sequence.seq
|
|
396 self._commit_immediately = True
|
|
397
|
|
398 def keys(self):
|
|
399 '''
|
|
400 this will get names of sequences
|
|
401 '''
|
|
402 c = self.conn.cursor()
|
|
403 c.execute('select name from sequences order by rowid')
|
|
404 names = []
|
|
405 for i in c.fetchall():
|
|
406 names.append(i[0])
|
|
407 return names
|
|
408
|
|
409 def sample(self, size, seed=123):
|
|
410 '''
|
|
411 generator - reproducible sampling sequences
|
|
412 '''
|
|
413 max_index = len(self)
|
|
414 # sample = seqtools.SequenceSet(source=info.input_sequences, )
|
|
415 if self.paired:
|
|
416 size = int(size / 2)
|
|
417 step = 2
|
|
418 else:
|
|
419 step = 1
|
|
420 random.seed(seed)
|
|
421 c = self.conn.cursor()
|
|
422 for i in sorted(random.sample(range(1, max_index, step), size)):
|
|
423 c.execute(
|
|
424 "select name, sequence from sequences where rowid={}".format(i))
|
|
425 name, sequence = c.fetchone()
|
|
426 yield Sequence(sequence, name)
|
|
427 if self.paired:
|
|
428 c.execute(
|
|
429 "select name, sequence from sequences where rowid={}".format(i + 1))
|
|
430 name, sequence = c.fetchone()
|
|
431 yield Sequence(sequence, name)
|
|
432
|
|
433 def sample2db(self, db_file=None, fasta_file_name=None, size=100, seed=123):
|
|
434 if (not db_file) and (not fasta_file_name):
|
|
435 raise IOError("no db_file nor fasta_file_name were defined")
|
|
436 # open files
|
|
437 if db_file:
|
|
438 db = SequenceSet(filename=db_file, source=None, new=True)
|
|
439 if fasta_file_name:
|
|
440 f = open(fasta_file_name, 'w')
|
|
441 # write in files
|
|
442 for i in self.sample(size, seed):
|
|
443 if db_file:
|
|
444 db.insert(i, commit=False)
|
|
445 if fasta_file_name:
|
|
446 i.write2fasta(f)
|
|
447 # close files
|
|
448 if fasta_file_name:
|
|
449 f.close()
|
|
450 if db_file:
|
|
451 db.conn.commit()
|
|
452 return db
|
|
453
|
|
454 def save2fasta(self, fasta_file_name, keep=True, subset=None):
|
|
455 if subset:
|
|
456 with open(fasta_file_name, 'w') as f:
|
|
457 c = self.conn.cursor()
|
|
458 c.execute("select name, sequence from sequences where name in ('{}')".format(
|
|
459 "','".join(subset)))
|
|
460 for name, sequence in c:
|
|
461 s = Sequence(sequence, name)
|
|
462 s.write2fasta(f)
|
|
463 else:
|
|
464 with open(fasta_file_name, 'w') as f:
|
|
465 for i in self.items():
|
|
466 i.write2fasta(f)
|
|
467 if keep:
|
|
468 self.fasta = fasta_file_name
|
|
469
|
|
470 def save_annotation(self, annotation_name, subset, dir):
|
|
471 annotation_file = "{}/{}_annotation.csv".format(dir, annotation_name)
|
|
472 with open(annotation_file, "w") as f:
|
|
473 c = self.conn.cursor()
|
|
474 c.execute(
|
|
475 "select * from {} where name in ('{}')".format(
|
|
476 annotation_name, "','".join(subset)
|
|
477 )
|
|
478 )
|
|
479 header = [description[0] for description in c.description]
|
|
480 f.write("\t".join(str(j) for j in header) + "\n")
|
|
481 for i in c:
|
|
482 f.write("\t".join(str(j) for j in i) + "\n")
|
|
483 return annotation_file
|
|
484
|
|
485 def make_chunks(self, file_name=None, chunk_size=5000):
|
|
486 '''
|
|
487 split SequneceSet to chunks and save to files,
|
|
488 return list of files names
|
|
489 '''
|
|
490 logger.debug("creating chunks from fasta file: ")
|
|
491 if not file_name and self.fasta:
|
|
492 file_name = self.fasta
|
|
493 else:
|
|
494 raise Exception('file name for chunks is not defined!')
|
|
495
|
|
496 seqs = iter(self.items())
|
|
497 fn_chunk_names = []
|
|
498 for i in itertools.count():
|
|
499 try:
|
|
500 fn = "{0}.{1}".format(file_name, i)
|
|
501 logger.info("saving chunk {}".format(fn))
|
|
502 for n in range(chunk_size):
|
|
503 s = next(seqs)
|
|
504 if not n:
|
|
505 f = open(fn, 'w')
|
|
506 s.write2fasta(f)
|
|
507 f.close()
|
|
508 fn_chunk_names.append(fn)
|
|
509 except StopIteration:
|
|
510 if not f.closed:
|
|
511 f.close()
|
|
512 fn_chunk_names.append(fn)
|
|
513 break
|
|
514 self.chunks = fn_chunk_names
|
|
515 logger.debug(self.chunks)
|
|
516 return(fn_chunk_names)
|
|
517
|
|
518 def makeblastdb(self, blastdb=None, dbtype='nucl', legacy=False, lastdb = False):
|
|
519 '''
|
|
520 dbtype eithe 'nucl' of 'prot'
|
|
521 '''
|
|
522 logger.info("creating blast database")
|
|
523 # check if fasta file on disk is present
|
|
524 if not self.fasta:
|
|
525 try:
|
|
526 self.save2fasta(blastdb)
|
|
527 except TypeError:
|
|
528 print("\nEither blastdb or fasta must be ",
|
|
529 "defined when making blast database!!!\n")
|
|
530 raise
|
|
531 else:
|
|
532 blastdb = blastdb if blastdb else self.fasta
|
|
533
|
|
534 cmd = "makeblastdb -in {0} -out {1} -dbtype {2}".format(self.fasta,
|
|
535 blastdb,
|
|
536 dbtype)
|
|
537 args = shlex.split(cmd)
|
|
538 if 0 == subprocess.check_call(args, stderr=sys.stdout):
|
|
539 self.blastdb = blastdb
|
|
540 else:
|
|
541 Warning("makeblastdb failed")
|
|
542
|
|
543 if legacy:
|
|
544 logger.info("creating legacy blast database")
|
|
545 prot = {'nucl': 'F', 'prot': 'T'}[dbtype]
|
|
546 cmd = "formatdb -i {0} -p {1} -n {2}.legacy".format(self.fasta,
|
|
547 prot,
|
|
548 blastdb
|
|
549 )
|
|
550 args = shlex.split(cmd)
|
|
551 if subprocess.check_call(args) == 0:
|
|
552 self.blastdb_legacy = "{}.legacy".format(blastdb)
|
|
553 else:
|
|
554 Warning("formatdb failed")
|
|
555 if lastdb:
|
|
556 logger.info("creating lastdb database")
|
|
557 cmd = "lastdb -u NEAR {fasta} {fasta}".format(fasta=self.fasta)
|
|
558 args = shlex.split(cmd)
|
|
559 if subprocess.check_call(args) == 0:
|
|
560 self.lastdb = self.fasta
|
|
561 else:
|
|
562 Warning("formatdb failed")
|
|
563
|
|
564
|
|
565 def create_hitsort(self, options, output=None):
|
|
566 '''
|
|
567 query is name of files to search against blastdb of SequenceSet object
|
|
568 query could be multiple files
|
|
569 this is inteded to be used for tabular output only
|
|
570 output file must be specified,
|
|
571 if more than one query is used, multiple files are created
|
|
572 worker - function to execute blast search
|
|
573 '''
|
|
574
|
|
575 logger.info('running all to all blast')
|
|
576 query = self.chunks
|
|
577 if not output:
|
|
578 output = "{}.blast".format(self.fasta)
|
|
579 database = getattr(self, options.database)
|
|
580 if len(query) > 1:
|
|
581 output_parts = [output + "." + str(i) for i in range(len(query))]
|
|
582 else:
|
|
583 output_parts = [output]
|
|
584 query = [query]
|
|
585
|
|
586 parallel(_hitsort_worker, query, [database],
|
|
587 output_parts, [options])
|
|
588 logger.info('all to all blast finished')
|
|
589 logger.info('removing duplicates from all to all blast results')
|
|
590 self._clean_hitsort(
|
|
591 hitsort_files=output_parts, filtered_hitsort=output)
|
|
592 # remove temp files:
|
|
593 for f in output_parts:
|
|
594 os.unlink(f)
|
|
595 self.hitsort = output
|
|
596 return output
|
|
597
|
|
598 def _clean_hitsort(self, hitsort_files, filtered_hitsort):
|
|
599 ''' alterantive hitsort duplicate removal '''
|
|
600
|
|
601 ids = self.keys()
|
|
602 Nfiles = 5 + int(sum([os.path.getsize(i)
|
|
603 for i in hitsort_files]) / 5e9)
|
|
604 hitsort_parts = [
|
|
605 "{0}.{1}.parts".format(filtered_hitsort, i) for i in range(Nfiles)]
|
|
606 # open files
|
|
607 fs = []
|
|
608 for i in hitsort_parts:
|
|
609 fs.append(open(i, 'w'))
|
|
610
|
|
611 ids_destination = {}
|
|
612 fs_iter = itertools.cycle(fs)
|
|
613 for i in ids:
|
|
614 #ids_destination[i] = fs_iter.next()
|
|
615 ids_destination[i] = next(fs_iter)
|
|
616 lines_out = lines_total = 0
|
|
617 temp_open = False
|
|
618
|
|
619 for i in hitsort_files:
|
|
620 f = open(i, 'r')
|
|
621 while True:
|
|
622 line = f.readline()
|
|
623 if not line:
|
|
624 break
|
|
625 lines_total += 1
|
|
626 line_items = line.split()
|
|
627 # ids_destination[line_items[0]].write(line)
|
|
628 # do not assume that it is sorted!
|
|
629 ids_destination[min(line_items[0:2])].write(line)
|
|
630 # close all files
|
|
631 for i in fs:
|
|
632 i.close()
|
|
633
|
|
634 # load one by one and exclude duplicates:
|
|
635 hit_out = open(filtered_hitsort, 'w')
|
|
636 for i in hitsort_parts:
|
|
637 f = open(i, 'r')
|
|
638 hitsort_shelve = {}
|
|
639 while True:
|
|
640 line = f.readline()
|
|
641 if not line:
|
|
642 break
|
|
643 line_items = line.split()
|
|
644 key = "\t".join(sorted(line_items[0:2]))
|
|
645 value = line_items
|
|
646 bitscore = float(line_items[2])
|
|
647 if key in hitsort_shelve:
|
|
648 if float(hitsort_shelve[key][2]) > bitscore:
|
|
649 hitsort_shelve[key] = value
|
|
650 hit_out.write("\t".join(hitsort_shelve[key]) + "\n")
|
|
651 del hitsort_shelve[key]
|
|
652 lines_out += 1
|
|
653 else:
|
|
654 hitsort_shelve[key] = value
|
|
655 f.close()
|
|
656 for key in hitsort_shelve:
|
|
657 hit_out.write("\t".join(hitsort_shelve[key]) + "\n")
|
|
658 lines_out += 1
|
|
659 # clean up
|
|
660 for i in hitsort_parts:
|
|
661 os.unlink(i)
|
|
662 hit_out.close()
|
|
663 return lines_total, lines_out
|
|
664
|
|
665 def _check_database(self, database):
|
|
666 '''
|
|
667 database is path to fastafile, database with same prefix should exist
|
|
668 it creates database if does not exist as temporal file!
|
|
669 '''
|
|
670 pass
|
|
671 # TODO
|
|
672
|
|
673 def remove_sequences_using_filter(self, fasta_file_filter, keep_proportion,
|
|
674 omitted_sequences_file,
|
|
675 kept_sequences_file):
|
|
676 '''
|
|
677 Run blast with fasta_file_filter against legaci blastdb of SequenceSet to detect sequences
|
|
678 which are to be removed from SequencesSet. Complete pairsare removed if paired sequences
|
|
679 are used.
|
|
680 '''
|
|
681 ids = blast_with_filter(fasta_file_filter, self.blastdb_legacy)
|
|
682 # check against database and remove sequences
|
|
683
|
|
684 c = self.conn.cursor()
|
|
685 if self.paired:
|
|
686 c.execute("SELECT rowid FROM sequences WHERE name IN {}".format(
|
|
687 format_query(ids)
|
|
688 ))
|
|
689 odd_rowids = set()
|
|
690 for i in c.fetchall():
|
|
691 if i[0] % 2 == 0:
|
|
692 odd_rowids.add(i[0] - 1)
|
|
693 else:
|
|
694 odd_rowids.add(i[0])
|
|
695 odd_rowids_keep = random.sample(odd_rowids, round(keep_proportion * len(odd_rowids)))
|
|
696 c.execute("SELECT name FROM sequences WHERE rowid IN {} ORDER BY rowid".format(
|
|
697 format_query(odd_rowids_keep + [i+1 for i in odd_rowids_keep])
|
|
698 ))
|
|
699 self.ids_kept = [i[0] for i in c.fetchall()]
|
|
700 odd_rowids_omit = list(odd_rowids.difference(odd_rowids_keep))
|
|
701 c.execute("SELECT name FROM sequences WHERE rowid IN {} ORDER BY rowid".format(
|
|
702 format_query(odd_rowids_omit + [i+1 for i in odd_rowids_omit])
|
|
703 ))
|
|
704 ids_omit = [i[0] for i in c.fetchall()]
|
|
705
|
|
706 else:
|
|
707 self.ids_kept = random.sample(ids, round(keep_proportion * len(ids)))
|
|
708 ids_omit = list(ids.difference(self.ids_kept))
|
|
709 self.save2fasta(omitted_sequences_file, keep=False, subset=ids_omit)
|
|
710 self.save2fasta(kept_sequences_file, keep=False, subset=self.ids_kept)
|
|
711 self.drop_sequences(ids_omit)
|
|
712 # save info about omited sequences
|
|
713 c.execute("CREATE TABLE ids_kept (name TEXT)")
|
|
714 c.executemany("INSERT INTO ids_kept (name) values (?)", [(i,) for i in self.ids_kept])
|
|
715 return len(ids_omit)
|
|
716
|
|
717 def drop_sequences(self, ids_to_drop):
|
|
718 '''
|
|
719 remove sequences from sequence set based on the ID
|
|
720 '''
|
|
721 self._original_length = len(self)
|
|
722 c = self.conn.cursor()
|
|
723 idslist = format_query(ids_to_drop)
|
|
724 c.execute("CREATE TABLE omited_sequences AS SELECT * FROM sequences WHERE name IN {}".format(idslist))
|
|
725 c.execute("DELETE FROM sequences WHERE name IN {}".format(
|
|
726 idslist
|
|
727 ))
|
|
728 # new databases must be created - with ommited sequences!
|
|
729 self.save2fasta(fasta_file_name=self.fasta) ## this will replace origninal file!
|
|
730 self._length = len(self)
|
|
731 self.makeblastdb(legacy=True)
|
|
732 self._update_database()
|
|
733
|
|
734 def annotate(self, database, annotation_name, directory="", params=""):
|
|
735 '''
|
|
736 database : path to blast formated database
|
|
737 method: type of search to use for annotation. it must be
|
|
738 'blastn' of 'blastx'
|
|
739 Annotation is save in directory also into the table with name stored in
|
|
740 annotation_name variable
|
|
741 '''
|
|
742 logger.info("annotating reads with {} ".format(annotation_name))
|
|
743 self._check_database(database)
|
|
744 if "parallelize" in params:
|
|
745 parallelize = params['parallelize']
|
|
746 else:
|
|
747 parallelize = True
|
|
748 if parallelize:
|
|
749 if not self.chunks:
|
|
750 self.make_chunks()
|
|
751 output = [
|
|
752 "{}/{}_{}".format(directory, annotation_name,os.path.basename(i)) for i in self.chunks]
|
|
753 parallel(blast_worker, self.chunks, [database], output, [params])
|
|
754 else:
|
|
755 # no explicit parallelization using parallel
|
|
756 single_output = "{}/{}_hits".format(directory, annotation_name)
|
|
757 params['args'] = params['args'].format(max_proc=get_max_proc())
|
|
758 blast_worker(self.fasta, database, single_output,params)
|
|
759 output = [single_output]
|
|
760
|
|
761 # put into as new attribute and sqlite table
|
|
762 c = self.conn.cursor()
|
|
763 c.execute("create table {} (name text, db_id text, length real, bitscore real , pid real)".format(
|
|
764 annotation_name))
|
|
765 for blast_results in output:
|
|
766 with open(blast_results, 'r') as f:
|
|
767 for i in f:
|
|
768 items = i.split()
|
|
769 types = [str, str, float, float, float, float, float]
|
|
770 qseqid, sseqid, qlen, slen, length, pident, bitscore = [
|
|
771 f(i) for i, f in zip(items, types)]
|
|
772 c.execute('insert into {} values ("{}", "{}", "{}", "{}", "{}")'.format(
|
|
773 annotation_name, qseqid, sseqid, length, bitscore, pident))
|
|
774 self.conn.commit()
|
|
775 self.annotations.append(annotation_name)
|
|
776
|
|
777 @staticmethod
|
|
778 def fasta_length(fasta_file_name):
|
|
779 '''
|
|
780 count number of sequences,
|
|
781 '''
|
|
782 with open(fasta_file_name, 'r') as f:
|
|
783 N = 0
|
|
784 for i in f:
|
|
785 if i.strip()[0] == ">":
|
|
786 N += 1
|
|
787 return N
|
|
788
|
|
789
|
|
790 # test
|
|
791 if __name__ == "__main__":
|
|
792 # get root directory
|
|
793
|
|
794 MAIN_DIR = os.path.realpath(
|
|
795 os.path.dirname(os.path.realpath(__file__)) + "/..")
|
|
796 print("MAIN_DIR:")
|
|
797 print(MAIN_DIR)
|
|
798 TMP_DIR = "{}/tmp".format(MAIN_DIR)
|
|
799 TEST_DATA_DIR = "{}/test_data".format(MAIN_DIR)
|
|
800
|
|
801 # some data:
|
|
802 fn = "{}/pair_reads.fasta".format(TEST_DATA_DIR)
|
|
803 os.chdir(TMP_DIR)
|
|
804 # s = SequenceSet(filename='sqlite.db')
|
|
805 print("number of sequences in fasta file:")
|
|
806 print(SequenceSet.fasta_length(fn))
|
|
807
|
|
808 print("loading sequences...")
|
|
809 s = SequenceSet(source=fn, paired=True, rename=False,
|
|
810 filename='sqlite.db', prefix_length=4, fasta='sample2.fasta')
|
|
811 print(s)
|
|
812
|
|
813 print("creating blast database")
|
|
814 s.makeblastdb()
|
|
815
|
|
816 print("creating chunks from fasta file: ")
|
|
817 file_list = s.make_chunks("fasta_chunks", chunk_size=500)
|
|
818 print(file_list)
|
|
819
|
|
820 print('creating hitsort')
|
|
821 s.create_hitsort(file_list, "hitsort")
|
|
822
|
|
823 print("saving to fasta file")
|
|
824 s.save2fasta('sampleX.fasta', keep=False)
|