comparison scripts/ReMatCh/modules/download.py @ 0:965517909457 draft

planemo upload commit 15239f1674081ab51ab8dd75a9a40cf1bfaa93e8
author cstrittmatter
date Wed, 22 Jan 2020 08:41:44 -0500
parents
children 0cbed1c0a762
comparison
equal deleted inserted replaced
-1:000000000000 0:965517909457
1 import utils
2 import os.path
3 import multiprocessing
4 import sys
5 import functools
6 import time
7
8
def getReadRunInfo(ena_id):
    """
    Fetch the ENA 'read_run' file report for an accession.

    Parameters
    ----------
    ena_id : str
        ENA run/sample/study accession (e.g. 'ERR000001').

    Returns
    -------
    list of str or None
        The report split into lines (header plus one line per run), or
        None when the request failed or only the header line came back.
    """
    # Python 2's urllib.urlopen moved to urllib.request.urlopen in Python 3.
    try:
        from urllib import urlopen
    except ImportError:
        from urllib.request import urlopen

    url = 'http://www.ebi.ac.uk/ena/data/warehouse/filereport?accession=' + ena_id + '&result=read_run'

    readRunInfo = None
    try:
        # Keep the response in its own variable (it previously clobbered
        # 'url') and make sure the connection is released.
        response = urlopen(url)
        try:
            data = response.read()
        finally:
            response.close()
        if not isinstance(data, str):
            # Python 3 returns bytes; downstream code expects text lines.
            data = data.decode('utf-8')
        readRunInfo = data.splitlines()
        # A report with only the header row means no runs were found.
        if len(readRunInfo) <= 1:
            readRunInfo = None
    except Exception as error:
        print(error)

    return readRunInfo
24
25
def getDownloadInformation(readRunInfo):
    """
    Parse the download links out of an ENA read_run file report.

    Parameters
    ----------
    readRunInfo : list of str
        Tab-separated report lines as returned by getReadRunInfo();
        line 0 is the header, line 1 the run of interest.

    Returns
    -------
    dict
        {'fastq': ..., 'submitted': ..., 'cram_index': ...} where each
        value is None or a dict mapping 'aspera'/'ftp' to a list of links.
        Note: only the link types present in the report are stored.
    """
    header_line = readRunInfo[0].split('\t')
    info_line = readRunInfo[1].split('\t')

    downloadInformation = {'fastq': None, 'submitted': None, 'cram_index': None}
    download_types = ['aspera', 'ftp']

    for i in range(0, len(header_line)):
        # Relevant headers look like 'fastq_ftp', 'submitted_aspera',
        # 'cram_index_ftp': split the transfer type off the right end.
        header = header_line[i].lower().rsplit('_', 1)
        # The len(header) == 2 guard avoids an IndexError on headers that
        # contain no underscore at all.
        if len(header) == 2 and header[0] in downloadInformation:
            if header[1] in download_types:
                if len(info_line[i]) > 0:
                    # Multiple files (e.g. paired-end mates) are ';'-separated.
                    files_path = info_line[i].split(';')
                    if len(files_path) > 2:
                        print('WARNING: Were found more files than expected in ' + header[1] + ' download links!')
                    if downloadInformation[header[0]] is None:
                        downloadInformation[header[0]] = {}
                    downloadInformation[header[0]][header[1]] = files_path

    return downloadInformation
46
47
def getSequencingInformation(readRunInfo):
    """
    Extract run metadata (platform, model, layout, counts, ...) from an
    ENA read_run file report.

    Returns a dict with a fixed key set; fields absent from the report
    stay None. When the report lists more than one run, the accessions of
    the additional runs are comma-joined into 'extra_run_accession'.
    """
    header_line = readRunInfo[0].split('\t')
    info_line = readRunInfo[1].split('\t')

    sequencingInformation = {'run_accession': None, 'instrument_platform': None, 'instrument_model': None, 'library_layout': None, 'library_source': None, 'extra_run_accession': None, 'nominal_length': None, 'read_count': None, 'base_count': None}

    for idx, raw_header in enumerate(header_line):
        field = raw_header.lower()
        # Only store the fields we track, and never overwrite with ''.
        if field in sequencingInformation and len(info_line[idx]) > 0:
            sequencingInformation[field] = info_line[idx]

    # Lines beyond the second describe additional runs of the same sample.
    if len(readRunInfo) > 2:
        extra_run_accession = []
        for extra_line in readRunInfo[2:]:
            extra_info = extra_line.split('\t')
            for idx, raw_header in enumerate(header_line):
                if raw_header.lower() == 'run_accession' and len(extra_info[idx]) > 0:
                    extra_run_accession.append(extra_info[idx])
        if len(extra_run_accession) >= 1:
            sequencingInformation['extra_run_accession'] = ','.join(extra_run_accession)

    return sequencingInformation
73
74
@utils.trace_unhandled_exceptions
def downloadWithAspera(aspera_file_path, asperaKey, outdir, pickle_prefix):
    """
    Download one file with Aspera's ascp client (one-hour timeout) and
    persist the success flag to a pickle named after the file, for the
    parent process to collect.
    """
    ascp_command = ['ascp', '-QT', '-l', '300m', '-P33001', '-i', asperaKey,
                    'era-fasp@' + aspera_file_path, outdir]
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(ascp_command, False, 3600, True)

    file_name = aspera_file_path.rsplit('/', 1)[1]
    utils.saveVariableToPickle(run_successfully, outdir, pickle_prefix + '.' + file_name)
81
82
@utils.trace_unhandled_exceptions
def downloadWithWget(ftp_file_path, outdir, pickle_prefix):
    """
    Download one file with wget (two tries, one-hour timeout) and persist
    the success flag to a pickle named after the file, for the parent
    process to collect.
    """
    file_download = ftp_file_path.rsplit('/', 1)[1]
    destination = os.path.join(outdir, file_download)
    wget_command = ['wget', '--tries=2', ftp_file_path, '-O', destination]
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(wget_command, False, 3600, True)

    utils.saveVariableToPickle(run_successfully, outdir, pickle_prefix + '.' + file_download)
90
91
@utils.trace_unhandled_exceptions
def downloadWithCurl(ftp_file_path, outdir, pickle_prefix):
    """
    Download one file with curl (two retries, one-hour timeout) and
    persist the success flag to a pickle named after the file, for the
    parent process to collect.
    """
    file_download = ftp_file_path.rsplit('/', 1)[1]
    # Bug fix: curl's '-O' takes no argument (it saves under the remote
    # name in the current directory), so the output path that followed it
    # was being parsed as an extra URL. '-o <file>' writes to an explicit
    # path.
    command = ['curl', '--retry', '2', ftp_file_path, '-o', os.path.join(outdir, file_download)]
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, 3600, True)

    utils.saveVariableToPickle(run_successfully, outdir, pickle_prefix + '.' + file_download)
99
100
def getPickleRunSuccessfully(directory, pickle_prefix):
    """
    Collapse the per-file success pickles written by the download/compress
    workers into a single boolean, deleting each pickle as it is consumed.

    Returns True only when at least one pickle was found and every pickle
    read (up to the first failure) held True; False otherwise.
    """
    run_successfully = True
    read_pickle = False

    pickle_files = findFiles(directory, pickle_prefix, '.pkl')
    if pickle_files is not None:
        for pickle_file in pickle_files:
            # Stop reading after the first failure, but keep deleting the
            # remaining pickles so they do not leak into the next round.
            if run_successfully:
                run_successfully = utils.extractVariableFromPickle(pickle_file)
                read_pickle = True
            os.remove(pickle_file)

    # No pickles at all means the workers never ran (or crashed early).
    if not read_pickle:
        run_successfully = False

    return run_successfully
118
119
def curl_installed():
    """Return True when a 'curl' executable can be resolved via 'which'."""
    which_command = ['which', 'curl']
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(which_command, False, None, False)
    return run_successfully
124
125
def _run_download_pool(worker, files, extra_args, outdir, pickle_prefix):
    """Run one download worker per file (two at a time) and collapse the
    per-file success pickles into a single boolean."""
    pool = multiprocessing.Pool(processes=2)
    for file_download in files:
        pool.apply_async(worker, args=(file_download,) + extra_args + (outdir, pickle_prefix,))
    pool.close()
    pool.join()
    return getPickleRunSuccessfully(outdir, pickle_prefix)


def download(downloadInformation_type, asperaKey, outdir):
    """
    Download one set of files (e.g. a run's 'fastq' links), trying Aspera
    first (when a key was given), then curl, then wget.

    Parameters
    ----------
    downloadInformation_type : dict
        {'aspera': list or None, 'ftp': list or None} link lists.
    asperaKey : str or None
        Path to the Aspera private key; None disables the Aspera attempt.
    outdir : str
        Directory receiving the files and the temporary success pickles.

    Returns
    -------
    bool
        True when one of the transfer methods fetched every file.
    """
    pickle_prefix = 'download'

    run_successfully = False

    if asperaKey is not None and downloadInformation_type['aspera'] is not None:
        run_successfully = _run_download_pool(downloadWithAspera, downloadInformation_type['aspera'],
                                              (asperaKey,), outdir, pickle_prefix)

    if downloadInformation_type['ftp'] is not None and not run_successfully:
        # Prefer curl when available; fall back to wget when curl is
        # missing or failed. (Previously wget was only ever attempted when
        # curl WAS installed, so systems without curl downloaded nothing.)
        if curl_installed():
            run_successfully = _run_download_pool(downloadWithCurl, downloadInformation_type['ftp'],
                                                  (), outdir, pickle_prefix)
        if not run_successfully:
            run_successfully = _run_download_pool(downloadWithWget, downloadInformation_type['ftp'],
                                                  (), outdir, pickle_prefix)

    return run_successfully
157
158
def downloadFiles(downloadInformation, asperaKey, outdir, download_cram_bam_True):
    """
    Download a run's reads: first the ENA-generated fastq links, then
    (only when those failed) the submitter-provided files.

    Submitted cram/bam files are skipped unless download_cram_bam_True is
    set; in that case the cram index is fetched as well when present.

    Returns (run_successfully, cram_index_run_successfully).
    """
    run_successfully = False
    cram_index_run_successfully = False

    if downloadInformation['fastq'] is not None:
        run_successfully = download(downloadInformation['fastq'], asperaKey, outdir)

    if not run_successfully and downloadInformation['submitted'] is not None:
        if download_cram_bam_True:
            run_successfully = download(downloadInformation['submitted'], asperaKey, outdir)
            if run_successfully and downloadInformation['cram_index'] is not None:
                cram_index_run_successfully = download(downloadInformation['cram_index'], asperaKey, outdir)
        else:
            # Refuse the submitted files when any link set points at an
            # alignment archive (cram/bam).
            has_cram_bam = False
            for link_type in downloadInformation['submitted']:
                if downloadInformation['submitted'][link_type][0].endswith(('.cram', '.bam')):
                    has_cram_bam = True
                    break
            if not has_cram_bam:
                run_successfully = download(downloadInformation['submitted'], asperaKey, outdir)

    return run_successfully, cram_index_run_successfully
183
184
def sortAlignment(alignment_file, output_file, sortByName_True, threads):
    """
    Sort a sam/bam/cram file with 'samtools sort'.

    The output format is taken from output_file's extension. When
    sortByName_True is set the alignment is sorted by read name (-n)
    instead of by coordinate.

    Returns (run_successfully, output_file); output_file is None when the
    sort failed.
    """
    outFormat_string = os.path.splitext(output_file)[1][1:].lower()
    command = ['samtools', 'sort', '-o', output_file, '-O', outFormat_string, '-@', str(threads), alignment_file]
    if sortByName_True:
        # Insert -n before the -@ option. Previously an '' placeholder sat
        # in this slot and leaked an empty argument in the coordinate-sort
        # case.
        command.insert(6, '-n')
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, None, True)

    if not run_successfully:
        output_file = None

    return run_successfully, output_file
196
197
def alignmentToFastq(alignment_file, outdir, threads, pair_end_type):
    """
    Convert a bam/cram alignment into fastq file(s) via a name-sorted
    temporary bam.

    pair_end_type selects the 'samtools fastq' outputs: 'paired' writes
    <base>_1.fq / <base>_2.fq, 'single' writes <base>.fq. The comparison
    is case-insensitive (ENA reports upper-case library layouts).
    Note: outputs land next to alignment_file; outdir is currently unused
    but kept for interface stability.

    Returns (run_successfully, outfiles) with outfiles None on failure.
    """
    fastq_basename = os.path.splitext(alignment_file)[0]
    outfiles = None
    bamFile = fastq_basename + '.temp.bam'
    # 'samtools fastq' requires name-sorted input.
    run_successfully, bamFile = sortAlignment(alignment_file, bamFile, True, threads)
    if run_successfully:
        command = ['samtools', 'fastq', '', bamFile]
        if pair_end_type.lower() == 'paired':
            command[2] = '-1 ' + str(fastq_basename + '_1.fq') + ' -2 ' + str(fastq_basename + '_2.fq')
        elif pair_end_type.lower() == 'single':
            # Bug fix: this branch previously compared without .lower(),
            # so ENA's 'SINGLE' layout never got an output option.
            command[2] = '-0 ' + str(fastq_basename + '.fq')

        run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, None, True)
        if run_successfully:
            if pair_end_type.lower() == 'paired':
                outfiles = [str(fastq_basename + '_1.fq'), str(fastq_basename + '_2.fq')]
            elif pair_end_type.lower() == 'single':
                outfiles = [str(fastq_basename + '.fq')]

    # sortAlignment returns None for the file on failure; guard before the
    # filesystem check (os.path.isfile(None) raises TypeError).
    if bamFile is not None and os.path.isfile(bamFile):
        os.remove(bamFile)

    return run_successfully, outfiles
222
223
def formartFastqHeaders(in_fastq_1, in_fastq_2):
    """
    Rewrite a pair of fastq files so that mate headers end in '/1' and
    '/2' respectively.

    Both files are read in lockstep; the header lines must be identical
    between the two inputs (the process exits otherwise). Output is
    written to '<input>.temp' files.

    Returns (number_reads, [out_fastq_1, out_fastq_2]).
    """
    # itertools.izip is Python-2-only; zip is the lazy equivalent on 3.
    try:
        from itertools import izip as zip_mates
    except ImportError:
        zip_mates = zip

    out_fastq_1 = in_fastq_1 + '.temp'
    out_fastq_2 = in_fastq_2 + '.temp'
    outfiles = [out_fastq_1, out_fastq_2]
    number_reads = 0
    # 'with' guarantees the writers are flushed and closed (they were
    # previously left open, relying on interpreter shutdown). The readers
    # use 'rt' instead of the legacy 'rtU' ('U' was removed in Py3.11;
    # universal newlines is the default text-mode behaviour there).
    with open(out_fastq_1, 'wt') as writer_in_fastq_1, open(out_fastq_2, 'wt') as writer_in_fastq_2:
        with open(in_fastq_1, 'rt') as reader_in_fastq_1, open(in_fastq_2, 'rt') as reader_in_fastq_2:
            # The two flags track position inside the 4-line fastq record.
            plus_line = True
            quality_line = True
            for in_1, in_2 in zip_mates(reader_in_fastq_1, reader_in_fastq_2):
                if len(in_1) > 0:
                    in_1 = in_1.splitlines()[0]
                    in_2 = in_2.splitlines()[0]
                    if in_1.startswith('@') and plus_line and quality_line:
                        # Header line: both mates must agree before tagging.
                        if in_1 != in_2:
                            sys.exit('The PE fastq files are not aligned properly!')
                        in_1 += '/1' + '\n'
                        in_2 += '/2' + '\n'
                        writer_in_fastq_1.write(in_1)
                        writer_in_fastq_2.write(in_2)
                        plus_line = False
                        quality_line = False
                    elif in_1.startswith('+') and not plus_line:
                        # '+' separator: written identically to both mates.
                        in_1 += '\n'
                        writer_in_fastq_1.write(in_1)
                        writer_in_fastq_2.write(in_1)
                        plus_line = True
                    elif plus_line and not quality_line:
                        # Quality line: completes one read pair.
                        in_1 += '\n'
                        in_2 += '\n'
                        writer_in_fastq_1.write(in_1)
                        writer_in_fastq_2.write(in_2)
                        writer_in_fastq_1.flush()
                        writer_in_fastq_2.flush()
                        number_reads += 1
                        quality_line = True
                    else:
                        # Sequence line.
                        in_1 += '\n'
                        in_2 += '\n'
                        writer_in_fastq_1.write(in_1)
                        writer_in_fastq_2.write(in_2)
    return number_reads, outfiles
269
270
@utils.trace_unhandled_exceptions
def gzipFiles(file_2_compress, pickle_prefix, outdir):
    """
    gzip one file to '<name>.gz' (a trailing '.temp' is dropped from the
    name first), delete the source on success, and record the success flag
    in a pickle for the parent process to collect.
    """
    if file_2_compress.endswith('.temp'):
        # '<x>.fq.temp' compresses to '<x>.fq.gz'.
        out_file = os.path.splitext(file_2_compress)[0]
    else:
        out_file = file_2_compress

    # The '>' redirection requires the shell invocation mode.
    command = ['gzip', '--stdout', '--best', file_2_compress, '>', out_file + '.gz']
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, True, None, True)
    if run_successfully:
        os.remove(file_2_compress)

    utils.saveVariableToPickle(run_successfully, outdir, pickle_prefix + '.' + os.path.basename(file_2_compress))
285
286
def findFiles(directory, prefix, suffix):
    """
    List the non-hidden regular files in directory whose names start with
    prefix and end with suffix.

    Returns the matching paths (joined with directory) or None when
    nothing matched.
    """
    matches = []
    for entry in os.listdir(directory):
        # Skip dotfiles and anything that is not a regular file.
        if entry.startswith('.'):
            continue
        entry_path = os.path.join(directory, entry)
        if not os.path.isfile(entry_path):
            continue
        if entry.startswith(prefix) and entry.endswith(suffix):
            matches.append(entry_path)

    return matches if len(matches) > 0 else None
299
300
def compressFiles(fastq_files, outdir, threads):
    """
    gzip the given fastq files in parallel.

    Returns (run_successfully, compressed_fastq_files); the second item
    lists every '.gz' file found in outdir afterwards, or is None on
    failure.
    """
    pickle_prefix = 'compress'
    compressed_fastq_files = None

    workers = multiprocessing.Pool(processes=threads)
    for fastq in fastq_files:
        workers.apply_async(gzipFiles, args=(fastq, pickle_prefix, outdir,))
    workers.close()
    workers.join()

    run_successfully = getPickleRunSuccessfully(outdir, pickle_prefix)
    if run_successfully:
        compressed_fastq_files = findFiles(outdir, '', '.gz')

    return run_successfully, compressed_fastq_files
316
317
def bamCram_2_fastq(alignment_file, outdir, threads, pair_end_type):
    """
    Turn a downloaded bam/cram into gzipped fastq files: convert to
    fastq, normalise paired-end headers, then compress.

    Returns (run_successfully, fastq_files).
    """
    run_successfully, fastq_files = alignmentToFastq(alignment_file, outdir, threads, pair_end_type)
    if run_successfully:
        if pair_end_type.lower() == 'paired':
            # Header normalisation emits '<name>.temp' files; the returned
            # read count is not needed here.
            _, fastq_files = formartFastqHeaders(fastq_files[0], fastq_files[1])

        run_successfully, fastq_files = compressFiles(fastq_files, outdir, threads)

    return run_successfully, fastq_files
327
328
def check_correct_links(downloadInformation):
    """
    Normalise the ENA download links in place.

    Aspera links must use 'host:/path' (the report gives 'host/path'),
    and '#' characters in ftp links must be percent-encoded.

    Returns the same dict, mutated.
    """
    for file_type in downloadInformation:
        links = downloadInformation[file_type]
        if links is not None:
            # .get(): a link set may contain only one of 'aspera'/'ftp'
            # (getDownloadInformation stores only the columns present in
            # the report), which previously raised KeyError here.
            aspera_links = links.get('aspera')
            if aspera_links is not None:
                for j in range(0, len(aspera_links)):
                    if aspera_links[j].startswith('fasp.sra.ebi.ac.uk/'):
                        aspera_links[j] = aspera_links[j].replace('fasp.sra.ebi.ac.uk/', 'fasp.sra.ebi.ac.uk:/', 1)
            ftp_links = links.get('ftp')
            if ftp_links is not None:
                for j in range(0, len(ftp_links)):
                    if '#' in ftp_links[j]:
                        ftp_links[j] = ftp_links[j].replace('#', '%23')
    return downloadInformation
341
342
def get_fastq_files(download_dir, cram_index_run_successfully, threads, download_paired_type):
    """
    Locate the files that landed in download_dir and, when a cram plus
    its index were downloaded, convert the cram to gzipped fastq files.

    Returns (run_successfully, downloaded_files).
    """
    run_successfully = False
    downloaded_files = findFiles(download_dir, '', '')
    if cram_index_run_successfully:
        cram_file = None
        if downloaded_files is not None:
            for downloaded in downloaded_files:
                if downloaded.endswith('.cram'):
                    cram_file = downloaded
        # Guard: only convert when a cram file was actually found
        # (previously None would have been passed straight through).
        if cram_file is not None:
            run_successfully, downloaded_files = bamCram_2_fastq(cram_file, download_dir, threads, download_paired_type)
    else:
        # findFiles returns None (not []) when nothing was downloaded;
        # len(None) used to raise TypeError here.
        if downloaded_files is not None and len(downloaded_files) > 0:
            run_successfully = True

    return run_successfully, downloaded_files
357
358
def rename_move_files(list_files, new_name, outdir, download_paired_type):
    """
    Rename the downloaded fastq files into outdir as
    '<new_name>_1.fq.gz'/'<new_name>_2.fq.gz' (paired) or
    '<new_name>.fq.gz' (single), deleting any unmatched extra files.

    Returns (run_successfully, list_new_files); list_new_files is None on
    failure (wrong number of recognised files, or a filesystem error).
    """
    list_new_files = {}
    run_successfully = False

    for i in range(0, len(list_files)):
        # Strip a '.fastq.gz' or '.fq.gz' ending while keeping the final
        # '.f', so the mate suffix ('_1.f' / '_2.f') can be inspected.
        temp_name = utils.rchop(os.path.basename(list_files[i]), 'astq.gz')
        if len(temp_name) == len(os.path.basename(list_files[i])):
            temp_name = utils.rchop(os.path.basename(list_files[i]), 'q.gz')
        if download_paired_type.lower() == 'paired':
            if temp_name.endswith(('_R1_001.f', '_1.f')):
                list_new_files[i] = os.path.join(outdir, new_name + '_1.fq.gz')
            elif temp_name.endswith(('_R2_001.f', '_2.f')):
                list_new_files[i] = os.path.join(outdir, new_name + '_2.fq.gz')
        else:
            if not temp_name.endswith(('_R1_001.f', '_R2_001.f')):
                list_new_files[i] = os.path.join(outdir, new_name + '.fq.gz')
            if temp_name.endswith(('_1.f', '_2.f')):
                print('WARNING: possible single-end file conflict with pair-end (' + list_files[i] + ')!')

    # Exactly two recognised files for paired data, exactly one for single.
    if len(list_new_files) == 2 and download_paired_type.lower() == 'paired':
        run_successfully = True
    elif len(list_new_files) == 1 and download_paired_type.lower() == 'single':
        run_successfully = True

    if run_successfully:
        try:
            for i in range(0, len(list_files)):
                if i not in list_new_files:
                    # Unrecognised files are discarded.
                    if os.path.isfile(list_files[i]):
                        os.remove(list_files[i])
                else:
                    os.rename(list_files[i], list_new_files[i])
            # list() keeps the return a real list on Python 3, where
            # dict.values() is a view.
            list_new_files = list(list_new_files.values())
        except Exception as e:
            print(e)
            run_successfully = False

    if not run_successfully:
        list_new_files = None

    return run_successfully, list_new_files
400
401
402 download_timer = functools.partial(utils.timer, name='Download module')
403
404
@download_timer
def runDownload(ena_id, download_paired_type, asperaKey, outdir, download_cram_bam_True, threads, instrument_platform):
    """
    Entry point of the download module: fetch an ENA run's reads into
    outdir, renamed after the run accession.

    Works inside a scratch 'download' sub-directory that is recreated at
    the start and removed at the end.

    Returns (run_successfully, downloaded_files, sequencingInformation);
    sequencingInformation always carries the full key set (values None
    when the ENA report was unavailable) plus the download date.
    """
    download_dir = os.path.join(outdir, 'download', '')
    utils.removeDirectory(download_dir)
    os.mkdir(download_dir)

    run_successfully = False
    downloaded_files = None
    sequencingInformation = {'run_accession': None, 'instrument_platform': None, 'instrument_model': None, 'library_layout': None, 'library_source': None, 'extra_run_accession': None, 'nominal_length': None, 'read_count': None, 'base_count': None, 'date_download': None}

    readRunInfo = getReadRunInfo(ena_id)
    if readRunInfo is not None:
        downloadInformation = check_correct_links(getDownloadInformation(readRunInfo))
        sequencingInformation = getSequencingInformation(readRunInfo)
        sequencingInformation['date_download'] = time.strftime("%Y-%m-%d")

        # 'all' / 'both' disable the platform / library-layout filters.
        platform_ok = instrument_platform.lower() == 'all' or (sequencingInformation['instrument_platform'] is not None and sequencingInformation['instrument_platform'].lower() == instrument_platform.lower())
        layout_ok = download_paired_type.lower() == 'both' or (sequencingInformation['library_layout'] is not None and sequencingInformation['library_layout'].lower() == download_paired_type.lower())

        if platform_ok and layout_ok:
            run_successfully, cram_index_run_successfully = downloadFiles(downloadInformation, asperaKey, download_dir, download_cram_bam_True)
            if run_successfully:
                run_successfully, downloaded_files = get_fastq_files(download_dir, cram_index_run_successfully, threads, sequencingInformation['library_layout'])
                if run_successfully and downloaded_files is not None:
                    run_successfully, downloaded_files = rename_move_files(downloaded_files, sequencingInformation['run_accession'], outdir, sequencingInformation['library_layout'])

    utils.removeDirectory(download_dir)

    return run_successfully, downloaded_files, sequencingInformation