Mercurial > repos > cstrittmatter > test_eurl_vtec_wgs_pt
comparison scripts/ReMatCh/modules/download.py @ 0:965517909457 draft
planemo upload commit 15239f1674081ab51ab8dd75a9a40cf1bfaa93e8
author | cstrittmatter |
---|---|
date | Wed, 22 Jan 2020 08:41:44 -0500 |
parents | |
children | 0cbed1c0a762 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:965517909457 |
---|---|
1 import utils | |
2 import os.path | |
3 import multiprocessing | |
4 import sys | |
5 import functools | |
6 import time | |
7 | |
8 | |
def getReadRunInfo(ena_id):
    """
    Fetch the ENA 'read_run' file report for the given accession.

    ena_id: ENA run/sample/study accession string.

    Returns the report as a list of tab-separated lines (header first), or
    None when the request fails or the report contains no data rows.
    """
    # urllib.urlopen was removed in Python 3; import compatibly so the module
    # runs on both interpreters.
    try:
        from urllib.request import urlopen  # Python 3
    except ImportError:
        from urllib import urlopen  # Python 2

    url = 'http://www.ebi.ac.uk/ena/data/warehouse/filereport?accession=' + ena_id + '&result=read_run'

    readRunInfo = None
    try:
        response = urlopen(url)
        data = response.read()
        # Python 3 returns bytes; downstream code splits on '\t' strings.
        if isinstance(data, bytes):
            data = data.decode('utf-8')
        readRunInfo = data.splitlines()
        # A header-only report means the accession yielded no runs.
        if len(readRunInfo) <= 1:
            readRunInfo = None
    except Exception as error:
        print(error)

    return readRunInfo
24 | |
25 | |
def getDownloadInformation(readRunInfo):
    """
    Parse download links out of an ENA read_run report.

    readRunInfo: list of tab-separated lines (header first) as returned by
    getReadRunInfo; only the first data row (second line) is inspected.

    Returns {'fastq': ..., 'submitted': ..., 'cram_index': ...} where each
    value is None or a dict mapping 'aspera'/'ftp' to a list of links
    (';'-separated in the report).
    """
    header_line = readRunInfo[0].split('\t')
    info_line = readRunInfo[1].split('\t')

    downloadInformation = {'fastq': None, 'submitted': None, 'cram_index': None}
    download_types = ['aspera', 'ftp']

    # zip tolerates ragged report lines (fewer values than headers), which
    # previously raised IndexError on info_line[i].
    for header_field, value in zip(header_line, info_line):
        header = header_field.lower().rsplit('_', 1)
        # len(header) == 2 guards fields without '_' where header[1] would
        # raise IndexError.
        if len(header) == 2 and header[0] in downloadInformation and header[1] in download_types:
            if len(value) > 0:
                files_path = value.split(';')
                if len(files_path) > 2:
                    print('WARNING: Were found more files than expected in ' + header[1] + ' download links!')
                if downloadInformation[header[0]] is None:
                    downloadInformation[header[0]] = {}
                downloadInformation[header[0]][header[1]] = files_path

    return downloadInformation
46 | |
47 | |
def getSequencingInformation(readRunInfo):
    """
    Parse run metadata out of an ENA read_run report.

    readRunInfo: list of tab-separated lines (header first); the first data
    row describes the primary run, any further rows are extra runs.

    Returns a dict of run metadata fields (None when absent/empty);
    'extra_run_accession' holds a comma-joined list of the accessions found
    on rows beyond the first data row.
    """
    header_line = readRunInfo[0].split('\t')
    info_line = readRunInfo[1].split('\t')

    sequencingInformation = {'run_accession': None, 'instrument_platform': None, 'instrument_model': None, 'library_layout': None, 'library_source': None, 'extra_run_accession': None, 'nominal_length': None, 'read_count': None, 'base_count': None}

    # zip tolerates ragged lines (fewer values than headers), which
    # previously raised IndexError.
    for header_field, value in zip(header_line, info_line):
        header = header_field.lower()
        if header in sequencingInformation and len(value) > 0:
            sequencingInformation[header] = value

    # Rows beyond the first data row are additional runs; collect their
    # accessions.
    if len(readRunInfo) > 2:
        extra_run_accession = []
        for extra_line in readRunInfo[2:]:
            for header_field, value in zip(header_line, extra_line.split('\t')):
                if header_field.lower() == 'run_accession' and len(value) > 0:
                    extra_run_accession.append(value)
        if len(extra_run_accession) >= 1:
            sequencingInformation['extra_run_accession'] = ','.join(extra_run_accession)

    return sequencingInformation
73 | |
74 | |
@utils.trace_unhandled_exceptions
def downloadWithAspera(aspera_file_path, asperaKey, outdir, pickle_prefix):
    """
    Download a single file from ENA via aspera (ascp) into outdir, then
    pickle the success flag as '<pickle_prefix>.<file name>' for the parent
    process to collect.
    """
    file_name = aspera_file_path.rsplit('/', 1)[1]
    remote_source = str('era-fasp@' + aspera_file_path)
    # -QT: fair transfer policy, disable encryption; -l 300m: bandwidth cap;
    # -P33001: aspera TCP port; timeout handled by the subprocess helper (3600 s).
    ascp_command = ['ascp', '-QT', '-l', '300m', '-P33001', '-i', asperaKey, remote_source, outdir]
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(ascp_command, False, 3600, True)

    utils.saveVariableToPickle(run_successfully, outdir, str(pickle_prefix + '.' + file_name))
81 | |
82 | |
@utils.trace_unhandled_exceptions
def downloadWithWget(ftp_file_path, outdir, pickle_prefix):
    """
    Download a single FTP link with wget into outdir, then pickle the
    success flag as '<pickle_prefix>.<file name>' for the parent process
    to collect.
    """
    file_download = ftp_file_path.rsplit('/', 1)[1]
    destination = os.path.join(outdir, file_download)
    # --tries=2: one retry; timeout handled by the subprocess helper (3600 s).
    wget_command = ['wget', '--tries=2', ftp_file_path, '-O', destination]
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(wget_command, False, 3600, True)

    utils.saveVariableToPickle(run_successfully, outdir, str(pickle_prefix + '.' + file_download))
90 | |
91 | |
@utils.trace_unhandled_exceptions
def downloadWithCurl(ftp_file_path, outdir, pickle_prefix):
    """
    Download a single FTP link with curl into outdir, then pickle the
    success flag as '<pickle_prefix>.<file name>' for the parent process
    to collect.

    Bug fix: curl's '-O' flag takes no argument (it writes to the remote
    file name in the current working directory), so the intended output
    path was passed to curl as a bogus extra URL and the file never landed
    in outdir. '-o <path>' is the correct option. '--fail' makes curl exit
    non-zero on HTTP/FTP errors so run_successfully is not reported True
    for an error page.
    """
    file_download = ftp_file_path.rsplit('/', 1)[1]
    command = ['curl', '--retry', '2', '--fail', ftp_file_path, '-o', os.path.join(outdir, file_download)]
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, 3600, True)

    utils.saveVariableToPickle(run_successfully, outdir, str(pickle_prefix + '.' + file_download))
99 | |
100 | |
def getPickleRunSuccessfully(directory, pickle_prefix):
    """
    Gather the per-download pickled success flags written by the worker
    processes and combine them into one overall result.

    Every matching pickle is deleted after inspection. Once one flag is
    falsy, later pickles are removed without being read. Returns False when
    no pickle was read at all.
    """
    all_succeeded = True
    read_any = False

    pickle_files = findFiles(directory, pickle_prefix, '.pkl')
    if pickle_files is not None:
        for pickle_path in pickle_files:
            if all_succeeded:
                all_succeeded = utils.extractVariableFromPickle(pickle_path)
                read_any = True

            os.remove(pickle_path)

    return all_succeeded if read_any else False
118 | |
119 | |
def curl_installed():
    """Return whether 'curl' is available on PATH (exit status of 'which')."""
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(['which', 'curl'], False, None, False)
    return run_successfully
124 | |
125 | |
def download(downloadInformation_type, asperaKey, outdir):
    """
    Download one set of links (e.g. the 'fastq' or 'submitted' entry of the
    download information) into outdir.

    Tries aspera first (when a key is provided and aspera links exist), then
    falls back to FTP via curl, then wget. Returns True when every file of
    the attempted transfer succeeded.

    Bug fix: the links dict is built incrementally by getDownloadInformation
    and may contain only one of 'aspera'/'ftp'; direct indexing raised
    KeyError, so .get() is used instead.
    """
    pickle_prefix = 'download'

    run_successfully = False

    def _pooled_download(worker, files, worker_args):
        # Fan the files out over at most 2 concurrent worker processes, then
        # collect the per-file pickled success flags.
        pool = multiprocessing.Pool(processes=2)
        for file_download in files:
            pool.apply_async(worker, args=(file_download,) + worker_args)
        pool.close()
        pool.join()
        return getPickleRunSuccessfully(outdir, pickle_prefix)

    aspera_files = downloadInformation_type.get('aspera')
    ftp_files = downloadInformation_type.get('ftp')

    if asperaKey is not None and aspera_files is not None:
        run_successfully = _pooled_download(downloadWithAspera, aspera_files, (asperaKey, outdir, pickle_prefix))

    if ftp_files is not None and not run_successfully:
        if curl_installed():
            run_successfully = _pooled_download(downloadWithCurl, ftp_files, (outdir, pickle_prefix))
        if not run_successfully:
            run_successfully = _pooled_download(downloadWithWget, ftp_files, (outdir, pickle_prefix))

    return run_successfully
157 | |
158 | |
def downloadFiles(downloadInformation, asperaKey, outdir, download_cram_bam_True):
    """
    Download the run's files, preferring the ENA-generated fastq links and
    falling back to the submitter-provided files.

    When download_cram_bam_True is False, submitted files are only fetched
    if none of them is a CRAM/BAM. When it is True, submitted CRAM/BAM are
    accepted and the CRAM index is fetched as well (when available).

    Returns (run_successfully, cram_index_run_successfully).
    """
    run_successfully = False
    cram_index_run_successfully = False

    fastq_links = downloadInformation['fastq']
    if fastq_links is not None:
        run_successfully = download(fastq_links, asperaKey, outdir)

    submitted_links = downloadInformation['submitted']
    if not run_successfully and submitted_links is not None:
        if not download_cram_bam_True:
            # Refuse submitted files when any of them is an alignment file.
            has_cram_bam = any(submitted_links[link_type][0].endswith(('.cram', '.bam')) for link_type in submitted_links)
            if not has_cram_bam:
                run_successfully = download(submitted_links, asperaKey, outdir)
        else:
            run_successfully = download(submitted_links, asperaKey, outdir)
            if run_successfully and downloadInformation['cram_index'] is not None:
                cram_index_run_successfully = download(downloadInformation['cram_index'], asperaKey, outdir)

    return run_successfully, cram_index_run_successfully
183 | |
184 | |
def sortAlignment(alignment_file, output_file, sortByName_True, threads):
    """
    Sort a SAM/BAM/CRAM file with 'samtools sort'.

    The output format is inferred from output_file's extension; when
    sortByName_True, reads are sorted by name ('-n') instead of coordinate.

    Returns (run_successfully, output_file) where output_file is None on
    failure.

    NOTE(review): when not sorting by name an empty-string argument is kept
    in the command list — this mirrors the placeholder pattern used with
    utils.runCommandPopenCommunicate throughout this module; presumably the
    helper joins/cleans the command. TODO confirm.
    """
    out_format = os.path.splitext(output_file)[1][1:].lower()
    name_sort_flag = '-n' if sortByName_True else ''
    command = ['samtools', 'sort', '-o', output_file, '-O', out_format, name_sort_flag, '-@', str(threads), alignment_file]
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, None, True)

    if not run_successfully:
        output_file = None

    return run_successfully, output_file
196 | |
197 | |
def alignmentToFastq(alignment_file, outdir, threads, pair_end_type):
    """
    Convert a BAM/CRAM alignment to fastq via name-sorting ('samtools sort
    -n') followed by 'samtools fastq'.

    pair_end_type: 'paired' (produces *_1.fq / *_2.fq) or 'single'
    (produces *.fq); compared case-insensitively.

    Returns (run_successfully, outfiles) where outfiles is a list of fastq
    paths or None. The temporary name-sorted BAM is always removed.
    """
    fastq_basename = os.path.splitext(alignment_file)[0]
    outfiles = None
    bamFile = fastq_basename + '.temp.bam'
    # Sort by read name so mates are adjacent for samtools fastq.
    run_successfully, bamFile = sortAlignment(alignment_file, bamFile, True, threads)
    if run_successfully:
        # The output flags live in one space-joined string in a single list
        # slot, matching the placeholder pattern used with
        # utils.runCommandPopenCommunicate elsewhere in this module.
        command = ['samtools', 'fastq', '', bamFile]
        if pair_end_type.lower() == 'paired':
            command[2] = '-1 ' + str(fastq_basename + '_1.fq') + ' -2 ' + str(fastq_basename + '_2.fq')
        elif pair_end_type.lower() == 'single':
            # Bug fix: this comparison previously lacked .lower(), unlike
            # every other pair_end_type check, so 'SINGLE' produced an empty
            # output flag while outfiles was still reported below.
            command[2] = '-0 ' + str(fastq_basename + '.fq')

        run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, None, True)
        if run_successfully:
            if pair_end_type.lower() == 'paired':
                outfiles = [str(fastq_basename + '_1.fq'), str(fastq_basename + '_2.fq')]
            elif pair_end_type.lower() == 'single':
                outfiles = [str(fastq_basename + '.fq')]

    # Bug fix: sortAlignment returns bamFile=None on failure and
    # os.path.isfile(None) raises TypeError.
    if bamFile is not None and os.path.isfile(bamFile):
        os.remove(bamFile)

    return run_successfully, outfiles
222 | |
223 | |
def formartFastqHeaders(in_fastq_1, in_fastq_2):
    """
    Append '/1' and '/2' to paired-end fastq headers.

    Both files are read in lockstep and rewritten to '<input>.temp' with the
    mate suffix on each '@' header line; header lines must be identical in
    both files (sys.exit otherwise). Note: the '+' separator line written to
    BOTH outputs is taken from the first file.

    Returns (number_reads, [out_fastq_1, out_fastq_2]).

    Fixes: output handles were never closed (leak); itertools.izip is
    Python 2 only; the 'U' open-mode flag was removed in Python 3.11.
    """
    # izip is the lazy pairing iterator on Python 2; builtin zip is already
    # lazy on Python 3.
    try:
        from itertools import izip as pair_reader
    except ImportError:
        pair_reader = zip

    out_fastq_1 = in_fastq_1 + '.temp'
    out_fastq_2 = in_fastq_2 + '.temp'
    writer_in_fastq_1 = open(out_fastq_1, 'wt')
    writer_in_fastq_2 = open(out_fastq_2, 'wt')
    outfiles = [out_fastq_1, out_fastq_2]
    try:
        with open(in_fastq_1, 'rt') as reader_in_fastq_1, open(in_fastq_2, 'rt') as reader_in_fastq_2:
            # The flags track position inside each 4-line fastq record:
            # header -> sequence -> '+' -> quality.
            plus_line = True
            quality_line = True
            number_reads = 0
            for in_1, in_2 in pair_reader(reader_in_fastq_1, reader_in_fastq_2):
                if len(in_1) > 0:
                    in_1 = in_1.splitlines()[0]
                    in_2 = in_2.splitlines()[0]
                    if in_1.startswith('@') and plus_line and quality_line:
                        if in_1 != in_2:
                            sys.exit('The PE fastq files are not aligned properly!')
                        in_1 += '/1' + '\n'
                        in_2 += '/2' + '\n'
                        writer_in_fastq_1.write(in_1)
                        writer_in_fastq_2.write(in_2)
                        plus_line = False
                        quality_line = False
                    elif in_1.startswith('+') and not plus_line:
                        in_1 += '\n'
                        # Intentionally mirrors file 1's '+' line into file 2.
                        writer_in_fastq_1.write(in_1)
                        writer_in_fastq_2.write(in_1)
                        plus_line = True
                    elif plus_line and not quality_line:
                        in_1 += '\n'
                        in_2 += '\n'
                        writer_in_fastq_1.write(in_1)
                        writer_in_fastq_2.write(in_2)
                        writer_in_fastq_1.flush()
                        writer_in_fastq_2.flush()
                        number_reads += 1
                        quality_line = True
                    else:
                        in_1 += '\n'
                        in_2 += '\n'
                        writer_in_fastq_1.write(in_1)
                        writer_in_fastq_2.write(in_2)
    finally:
        writer_in_fastq_1.close()
        writer_in_fastq_2.close()
    return number_reads, outfiles
269 | |
270 | |
@utils.trace_unhandled_exceptions
def gzipFiles(file_2_compress, pickle_prefix, outdir):
    """
    gzip one file (shell redirection into '<name>.gz'), remove the original
    on success, and pickle the success flag for the parent process.

    A trailing '.temp' marker (from formartFastqHeaders) is stripped when
    choosing the compressed file's name.
    """
    if file_2_compress.endswith('.temp'):
        out_file = os.path.splitext(file_2_compress)[0]
    else:
        out_file = file_2_compress

    # Run through the shell (second argument True) because of the '>'
    # redirection.
    gzip_command = ['gzip', '--stdout', '--best', file_2_compress, '>', str(out_file + '.gz')]
    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(gzip_command, True, None, True)
    if run_successfully:
        os.remove(file_2_compress)

    utils.saveVariableToPickle(run_successfully, outdir, str(pickle_prefix + '.' + os.path.basename(file_2_compress)))
285 | |
286 | |
def findFiles(directory, prefix, suffix):
    """
    List the non-hidden regular files in directory whose names start with
    prefix and end with suffix.

    Returns a list of full paths, or None when nothing matches (callers rely
    on the None sentinel rather than an empty list).
    """
    visible_files = (entry for entry in os.listdir(directory)
                     if not entry.startswith('.') and os.path.isfile(os.path.join(directory, entry)))
    list_files_found = [os.path.join(directory, entry) for entry in visible_files
                        if entry.startswith(prefix) and entry.endswith(suffix)]

    return list_files_found if list_files_found else None
299 | |
300 | |
def compressFiles(fastq_files, outdir, threads):
    """
    gzip every fastq file in parallel (one worker per file, up to 'threads'
    concurrent processes).

    Returns (run_successfully, compressed_fastq_files) where the file list
    is every '.gz' found in outdir on success, or None on failure.
    """
    pickle_prefix = 'compress'
    compressed_fastq_files = None

    worker_pool = multiprocessing.Pool(processes=threads)
    for fastq_path in fastq_files:
        worker_pool.apply_async(gzipFiles, args=(fastq_path, pickle_prefix, outdir,))
    worker_pool.close()
    worker_pool.join()

    # Workers report their result through pickles; combine them.
    run_successfully = getPickleRunSuccessfully(outdir, pickle_prefix)
    if run_successfully:
        compressed_fastq_files = findFiles(outdir, '', '.gz')

    return run_successfully, compressed_fastq_files
316 | |
317 | |
def bamCram_2_fastq(alignment_file, outdir, threads, pair_end_type):
    """
    Turn a downloaded BAM/CRAM into compressed fastq files: convert the
    alignment, fix paired-end headers when applicable, then gzip.

    Returns (run_successfully, fastq_files).
    """
    run_successfully, fastq_files = alignmentToFastq(alignment_file, outdir, threads, pair_end_type)
    if not run_successfully:
        return run_successfully, fastq_files

    if pair_end_type.lower() == 'paired':
        # Rewrites headers with /1 and /2 suffixes into '.temp' files.
        number_reads, fastq_files = formartFastqHeaders(fastq_files[0], fastq_files[1])

    run_successfully, fastq_files = compressFiles(fastq_files, outdir, threads)
    return run_successfully, fastq_files
327 | |
328 | |
def check_correct_links(downloadInformation):
    """
    Normalize malformed ENA links in place.

    Aspera paths must use 'host:/path' ('fasp.sra.ebi.ac.uk:' prefix) and
    '#' in FTP URLs must be percent-encoded as '%23'.

    Bug fix: the per-type link dicts are built incrementally by
    getDownloadInformation and may contain only one of 'aspera'/'ftp';
    direct indexing raised KeyError, so .get() is used instead.

    Returns the (mutated) downloadInformation dict.
    """
    for file_type in downloadInformation:
        links = downloadInformation[file_type]
        if links is None:
            continue

        aspera_links = links.get('aspera')
        if aspera_links is not None:
            for j, link in enumerate(aspera_links):
                if link.startswith('fasp.sra.ebi.ac.uk/'):
                    aspera_links[j] = link.replace('fasp.sra.ebi.ac.uk/', 'fasp.sra.ebi.ac.uk:/', 1)

        ftp_links = links.get('ftp')
        if ftp_links is not None:
            for j, link in enumerate(ftp_links):
                if '#' in link:
                    ftp_links[j] = link.replace('#', '%23')

    return downloadInformation
341 | |
342 | |
def get_fastq_files(download_dir, cram_index_run_successfully, threads, download_paired_type):
    """
    Locate the downloaded files and, when a CRAM (plus its index) was
    fetched, convert it to compressed fastq.

    Returns (run_successfully, downloaded_files).

    Bug fixes: findFiles returns None for an empty directory, so len(None)
    raised TypeError; and when no '.cram' file was actually present,
    bamCram_2_fastq was previously called with None.
    """
    run_successfully = False
    downloaded_files = findFiles(download_dir, '', '')

    if cram_index_run_successfully:
        cram_file = None
        if downloaded_files is not None:
            for file_path in downloaded_files:
                if file_path.endswith('.cram'):
                    cram_file = file_path
        if cram_file is not None:
            run_successfully, downloaded_files = bamCram_2_fastq(cram_file, download_dir, threads, download_paired_type)
    else:
        if downloaded_files is not None and len(downloaded_files) > 0:
            run_successfully = True

    return run_successfully, downloaded_files
357 | |
358 | |
def rename_move_files(list_files, new_name, outdir, download_paired_type):
    """
    Rename the downloaded fastq files to '<new_name>_1.fq.gz'/'_2.fq.gz'
    (paired) or '<new_name>.fq.gz' (single) and move them into outdir;
    non-matching files are deleted.

    Returns (run_successfully, new_paths) where new_paths is a list or
    None on failure.

    Fixes Python 3 compatibility: print statements -> print() calls, and
    dict.values() (a view on Python 3) is materialized with list().
    """
    list_new_files = {}
    run_successfully = False

    for i in range(0, len(list_files)):
        # utils.rchop presumably strips the given suffix when present; try
        # '...fastq.gz' first, then '...fq.gz' -- TODO confirm rchop semantics.
        temp_name = utils.rchop(os.path.basename(list_files[i]), 'astq.gz')
        if len(temp_name) == len(os.path.basename(list_files[i])):
            temp_name = utils.rchop(os.path.basename(list_files[i]), 'q.gz')
        if download_paired_type.lower() == 'paired':
            if temp_name.endswith(('_R1_001.f', '_1.f')):
                list_new_files[i] = os.path.join(outdir, new_name + '_1.fq.gz')
            elif temp_name.endswith(('_R2_001.f', '_2.f')):
                list_new_files[i] = os.path.join(outdir, new_name + '_2.fq.gz')
        else:
            if not temp_name.endswith(('_R1_001.f', '_R2_001.f')):
                list_new_files[i] = os.path.join(outdir, new_name + '.fq.gz')
            if temp_name.endswith(('_1.f', '_2.f')):
                print('WARNING: possible single-end file conflict with pair-end (' + list_files[i] + ')!')

    # Exactly the expected number of read files must have been identified.
    if len(list_new_files) == 2 and download_paired_type.lower() == 'paired':
        run_successfully = True
    elif len(list_new_files) == 1 and download_paired_type.lower() == 'single':
        run_successfully = True

    if run_successfully:
        try:
            for i in range(0, len(list_files)):
                if i not in list_new_files:
                    # Unmatched leftovers are removed.
                    if os.path.isfile(list_files[i]):
                        os.remove(list_files[i])
                else:
                    os.rename(list_files[i], list_new_files[i])
            list_new_files = list(list_new_files.values())
        except Exception as e:
            print(e)
            run_successfully = False

    if not run_successfully:
        list_new_files = None

    return run_successfully, list_new_files
400 | |
401 | |
402 download_timer = functools.partial(utils.timer, name='Download module') | |
403 | |
404 | |
@download_timer
def runDownload(ena_id, download_paired_type, asperaKey, outdir, download_cram_bam_True, threads, instrument_platform):
    """
    Download an ENA run and prepare its fastq files in outdir.

    Pipeline: fetch the ENA read_run report, extract download links and run
    metadata, filter by instrument platform and library layout, download the
    files (aspera/curl/wget), convert CRAM to fastq when applicable, and
    rename/move the resulting fastq files into outdir.

    ena_id: accession queried against the ENA filereport endpoint.
    download_paired_type: 'paired', 'single' or 'both' (layout filter).
    asperaKey: path to an aspera private key, or None for FTP only.
    outdir: destination directory for the final fastq files.
    download_cram_bam_True: whether submitted CRAM/BAM files are acceptable.
    threads: worker count for conversion/compression.
    instrument_platform: platform filter; 'all' accepts any platform.

    Returns (run_successfully, downloaded_files, sequencingInformation);
    downloaded_files is None when nothing was retrieved.
    """
    # Work inside a throw-away 'download' subdirectory, recreated per call.
    download_dir = os.path.join(outdir, 'download', '')
    utils.removeDirectory(download_dir)
    os.mkdir(download_dir)

    run_successfully = False
    downloaded_files = None
    # Default metadata returned even when the ENA report is unavailable.
    sequencingInformation = {'run_accession': None, 'instrument_platform': None, 'instrument_model': None, 'library_layout': None, 'library_source': None, 'extra_run_accession': None, 'nominal_length': None, 'read_count': None, 'base_count': None, 'date_download': None}

    readRunInfo = getReadRunInfo(ena_id)
    if readRunInfo is not None:
        downloadInformation = getDownloadInformation(readRunInfo)
        downloadInformation = check_correct_links(downloadInformation)
        sequencingInformation = getSequencingInformation(readRunInfo)
        sequencingInformation['date_download'] = time.strftime("%Y-%m-%d")

        # Only download runs matching the requested platform and layout.
        if instrument_platform.lower() == 'all' or (sequencingInformation['instrument_platform'] is not None and sequencingInformation['instrument_platform'].lower() == instrument_platform.lower()):
            if download_paired_type.lower() == 'both' or (sequencingInformation['library_layout'] is not None and sequencingInformation['library_layout'].lower() == download_paired_type.lower()):
                run_successfully, cram_index_run_successfully = downloadFiles(downloadInformation, asperaKey, download_dir, download_cram_bam_True)
                if run_successfully:
                    run_successfully, downloaded_files = get_fastq_files(download_dir, cram_index_run_successfully, threads, sequencingInformation['library_layout'])
                    if run_successfully and downloaded_files is not None:
                        run_successfully, downloaded_files = rename_move_files(downloaded_files, sequencingInformation['run_accession'], outdir, sequencingInformation['library_layout'])

    # The scratch directory is removed whether or not the download succeeded.
    utils.removeDirectory(download_dir)

    return run_successfully, downloaded_files, sequencingInformation