Mercurial > repos > cstrittmatter > test_eurl_vtec_wgs_pt
comparison scripts/ReMatCh/modules/download.py @ 3:0cbed1c0a762 draft default tip
planemo upload commit 15239f1674081ab51ab8dd75a9a40cf1bfaa93e8
author | cstrittmatter |
---|---|
date | Tue, 28 Jan 2020 10:42:31 -0500 |
parents | 965517909457 |
children |
comparison
equal
deleted
inserted
replaced
2:6837f733b4aa | 3:0cbed1c0a762 |
---|---|
1 import utils | |
2 import os.path | 1 import os.path |
3 import multiprocessing | 2 import multiprocessing |
4 import sys | 3 import sys |
5 import functools | 4 import functools |
6 import time | 5 import time |
7 | 6 import subprocess |
8 | 7 |
9 def getReadRunInfo(ena_id): | 8 try: |
10 import urllib | 9 import modules.utils as utils |
10 except ImportError: | |
11 from ReMatCh.modules import utils as utils | |
12 | |
13 | |
def get_read_run_info(ena_id):
    """Fetch the ENA 'read_run' file report for *ena_id*.

    Queries the EBI/ENA warehouse filereport endpoint and returns the report
    as a list of text lines (header line first, then one line per run), or
    None when the request fails or only the header comes back.
    """
    import urllib.request

    url = 'http://www.ebi.ac.uk/ena/data/warehouse/filereport?accession=' + ena_id + '&result=read_run'

    read_run_info = None
    try:
        # Use a context manager so the HTTP response is always closed; the
        # previous code rebound the variable to the response object and
        # never closed it (resource leak).
        with urllib.request.urlopen(url) as response:
            read_run_info = response.read().decode("utf8").splitlines()
        if len(read_run_info) <= 1:
            # Only the header line returned -> no run information found.
            read_run_info = None
    except Exception as error:
        # Best-effort: report and return None so callers can fall back.
        print(error)

    return read_run_info
24 | 29 |
25 | 30 |
def get_download_information(read_run_info):
    """Parse aspera/ftp download links out of an ENA read_run report.

    read_run_info: list of tab-separated lines (header first, data second).
    Columns named '<kind>_<transfer>' (e.g. 'fastq_ftp', 'submitted_aspera')
    carry ';'-separated file paths.

    Returns {'fastq': ..., 'submitted': ..., 'cram_index': ...} where each
    value is None or a dict mapping 'aspera'/'ftp' to a list of paths.
    """
    header_line = read_run_info[0].split('\t')
    info_line = read_run_info[1].split('\t')

    download_information = {'fastq': None, 'submitted': None, 'cram_index': None}
    download_types = ('aspera', 'ftp')

    # zip also guards against a data line shorter than the header
    # (the old range(len(header_line)) indexing would raise IndexError).
    for header_field, info_field in zip(header_line, info_line):
        header = header_field.lower().rsplit('_', 1)
        # len(header) == 2 guard: a column name without '_' would otherwise
        # crash on header[1].
        if len(header) == 2 and header[0] in download_information and header[1] in download_types:
            if len(info_field) > 0:
                files_path = info_field.split(';')
                if len(files_path) > 2:
                    print('WARNING: Were found more files than expected in'
                          ' {download_information}-{download_types} download'
                          ' links!'.format(download_information=header[0], download_types=header[1]))
                if download_information[header[0]] is None:
                    download_information[header[0]] = {}
                download_information[header[0]][header[1]] = files_path

    return download_information
def get_sequencing_information(read_run_info):
    """Extract run/instrument/library metadata from an ENA read_run report.

    Returns a dict with the known metadata keys (None when a column is
    missing or empty), plus 'date_download' set to today's date and
    'extra_run_accession' holding any additional run accessions from
    rows beyond the first data line (comma-joined).
    """
    header_line = read_run_info[0].split('\t')
    info_line = read_run_info[1].split('\t')

    sequencing_information = {'run_accession': None, 'instrument_platform': None, 'instrument_model': None,
                              'library_layout': None, 'library_source': None, 'extra_run_accession': None,
                              'nominal_length': None, 'read_count': None, 'base_count': None,
                              'date_download': time.strftime("%Y-%m-%d")}

    # Fill in every known field present (and non-empty) in the first data row.
    for column_index, column_name in enumerate(header_line):
        field = column_name.lower()
        if field in sequencing_information and len(info_line[column_index]) > 0:
            sequencing_information[field] = info_line[column_index]

    # Any further data rows contribute extra run accessions.
    if len(read_run_info) > 2:
        extra_run_accession = []
        for extra_line in read_run_info[2:]:
            info = extra_line.split('\t')
            for column_index, column_name in enumerate(header_line):
                if column_name.lower() == 'run_accession' and len(info[column_index]) > 0:
                    extra_run_accession.append(info[column_index])
        if len(extra_run_accession) >= 1:
            sequencing_information['extra_run_accession'] = ','.join(extra_run_accession)

    return sequencing_information
73 | 83 |
74 | 84 |
@utils.trace_unhandled_exceptions
def download_with_aspera(aspera_file_path, aspera_key, outdir, pickle_prefix, sra, ena_id):
    """Download one file with Aspera `ascp` and pickle the success flag.

    When sra is falsy the ENA fasp path is used (port 33001); otherwise the
    file is fetched from the NCBI SRA anonftp mirror by accession. The
    boolean outcome is pickled into outdir for get_pickle_run_successfully.
    """
    if sra:
        # NCBI mirror: no explicit port option (slot stays an empty string,
        # matching the placeholder the command has always carried).
        port_option = ''
        source = 'anonftp@ftp.ncbi.nlm.nih.gov:/sra/sra-instant/reads/ByRun/sra/{a}/{b}/{c}/{c}.sra'.format(
            a=ena_id[:3], b=ena_id[:6], c=ena_id)
        pickle_name = pickle_prefix + '.' + ena_id
    else:
        port_option = '-P33001'
        source = str('era-fasp@' + aspera_file_path)
        pickle_name = pickle_prefix + '.' + aspera_file_path.rsplit('/', 1)[1]

    command = ['ascp', '-QT', '-l', '300m', port_option, '-i', aspera_key, source, outdir]
    run_successfully, stdout, stderr = utils.run_command_popen_communicate(command, False, 3600, True)

    utils.save_variable_to_pickle(run_successfully, outdir, pickle_name)
81 | 100 |
82 | 101 |
@utils.trace_unhandled_exceptions
def download_with_wget(ftp_file_path, outdir, pickle_prefix, sra, ena_id):
    """Download one file with `wget` and pickle the success flag.

    When sra is falsy the given FTP link is fetched; otherwise the .sra file
    for ena_id is pulled from the NCBI SRA FTP mirror. The boolean outcome
    is pickled into outdir for get_pickle_run_successfully.
    """
    if sra:
        source = 'ftp://ftp-trace.ncbi.nih.gov/sra/sra-instant/reads/ByRun/sra/{a}/{b}/{c}/{c}.sra'.format(
            a=ena_id[:3], b=ena_id[:6], c=ena_id)
        destination = os.path.join(outdir, ena_id + '.sra')
        pickle_name = pickle_prefix + '.' + ena_id
    else:
        source = ftp_file_path
        file_download = ftp_file_path.rsplit('/', 1)[1]
        destination = os.path.join(outdir, file_download)
        pickle_name = pickle_prefix + '.' + file_download

    command = ['wget', '--tries=1', source, '-O', destination]
    run_successfully, stdout, stderr = utils.run_command_popen_communicate(command, False, 3600, True)

    utils.save_variable_to_pickle(run_successfully, outdir, pickle_name)
90 | 118 |
91 | 119 |
@utils.trace_unhandled_exceptions
def download_with_sra_prefetch(aspera_key, outdir, pickle_prefix, ena_id):
    """Download an .sra file with the SRA toolkit `prefetch` and pickle the outcome.

    When an aspera key is available, prefetch is told to transfer via ascp
    ('-a "<ascp path>|<key>"'). On success the .sra file is moved from
    prefetch's default output directory into outdir.
    """
    # command[1] is a placeholder slot for the optional '-a ascp|key' argument.
    command = ['prefetch', '', ena_id]

    if aspera_key is not None:
        # Locate the ascp binary; prefetch expects 'ascp-path|key-path'.
        _, ascp, _ = utils.run_command_popen_communicate(['which', 'ascp'], False, None, False)
        command[1] = '-a {ascp}|{aspera_key}'.format(ascp=ascp.splitlines()[0], aspera_key=aspera_key)

    run_successfully, stdout, stderr = utils.run_command_popen_communicate(command, False, 3600, True)
    if run_successfully:
        # Resolve prefetch's default download location; run through a shell
        # (second arg True — presumably shell mode, so $HOME expands) —
        # NOTE(review): confirm run_command_popen_communicate's 2nd arg is shell.
        _, prefetch_outdir, _ = utils.run_command_popen_communicate(['echo', '$HOME/ncbi/public/sra'], True, None,
                                                                    False)

        try:
            # Fast path: rename into outdir (same filesystem).
            os.rename(os.path.join(prefetch_outdir.splitlines()[0], ena_id + '.sra'),
                      os.path.join(outdir, ena_id + '.sra'))
        except OSError as e:
            # rename fails across filesystems; fall back to copy + remove.
            print('Found the following error:'
                  '{}'.format(e))

            from shutil import copy as shutil_copy

            shutil_copy(os.path.join(prefetch_outdir.splitlines()[0], ena_id + '.sra'),
                        os.path.join(outdir, ena_id + '.sra'))
            os.remove(os.path.join(prefetch_outdir.splitlines()[0], ena_id + '.sra'))

    utils.save_variable_to_pickle(run_successfully, outdir, pickle_prefix + '.' + ena_id)
147 | |
148 | |
@utils.trace_unhandled_exceptions
def download_with_curl(ftp_file_path, outdir, pickle_prefix, sra, ena_id):
    """Download one file with `curl` and pickle the success flag.

    When sra is falsy the given FTP link is fetched; otherwise the .sra file
    for ena_id is pulled from the NCBI SRA FTP mirror. The boolean outcome
    is pickled into outdir for get_pickle_run_successfully.
    """
    if sra:
        source = 'ftp://ftp-trace.ncbi.nih.gov/sra/sra-instant/reads/ByRun/sra/{a}/{b}/{c}/{c}.sra'.format(
            a=ena_id[:3], b=ena_id[:6], c=ena_id)
        destination = os.path.join(outdir, ena_id + '.sra')
        pickle_name = pickle_prefix + '.' + ena_id
    else:
        source = ftp_file_path
        file_download = ftp_file_path.rsplit('/', 1)[1]
        destination = os.path.join(outdir, file_download)
        pickle_name = pickle_prefix + '.' + file_download

    command = ['curl', '--retry', '1', source, '-o', destination]
    run_successfully, stdout, stderr = utils.run_command_popen_communicate(command, False, 3600, True)

    utils.save_variable_to_pickle(run_successfully, outdir, pickle_name)
165 | |
166 | |
167 def get_pickle_run_successfully(directory, pickle_prefix): | |
102 run_successfully = True | 168 run_successfully = True |
103 read_pickle = False | 169 read_pickle = False |
104 | 170 |
105 files = findFiles(directory, pickle_prefix, '.pkl') | 171 files = find_files(directory, pickle_prefix, '.pkl') |
106 if files is not None: | 172 if files is not None: |
107 for file_found in files: | 173 for file_found in files: |
108 if run_successfully: | 174 if run_successfully: |
109 run_successfully = utils.extractVariableFromPickle(file_found) | 175 run_successfully = utils.extract_variable_from_pickle(file_found) |
110 read_pickle = True | 176 read_pickle = True |
111 | 177 |
112 os.remove(file_found) | 178 os.remove(file_found) |
113 | 179 |
114 if not read_pickle: | 180 if not read_pickle: |
117 return run_successfully | 183 return run_successfully |
118 | 184 |
119 | 185 |
def curl_installed():
    """Return True when `curl` is available on PATH (checked with `which curl`)."""
    run_successfully, _, _ = utils.run_command_popen_communicate(['which', 'curl'], False, None, False)
    return run_successfully
124 | 190 |
125 | 191 |
def download(download_information_type, aspera_key, outdir, sra, sra_opt, ena_id):
    """Try to download one set of files, falling back across downloaders.

    Order of attempts when sra is falsy: ENA aspera links, then ENA ftp links
    via curl (if installed), then via wget. If everything failed and sra or
    sra_opt is set, retry against NCBI SRA: aspera, `prefetch`, curl, wget.
    Each worker records its boolean outcome as a pickle in outdir, read back
    through get_pickle_run_successfully.

    Returns (run_successfully, download_sra); download_sra is True only when
    the data came from NCBI SRA (an .sra file) rather than ENA links.
    """
    pickle_prefix = 'download'

    run_successfully = False
    download_sra = False

    if not sra:
        # --- ENA route ---
        if aspera_key is not None and download_information_type['aspera'] is not None:
            # Up to two files (paired-end mates) in parallel.
            pool = multiprocessing.Pool(processes=2)
            for file_download in download_information_type['aspera']:
                pool.apply_async(download_with_aspera, args=(file_download, aspera_key, outdir, pickle_prefix, sra,
                                                             ena_id,))
            pool.close()
            pool.join()
            run_successfully = get_pickle_run_successfully(outdir, pickle_prefix)
        if not run_successfully and download_information_type['ftp'] is not None:
            if curl_installed():
                pool = multiprocessing.Pool(processes=2)
                for file_download in download_information_type['ftp']:
                    pool.apply_async(download_with_curl, args=(file_download, outdir, pickle_prefix, sra, ena_id,))
                pool.close()
                pool.join()
                run_successfully = get_pickle_run_successfully(outdir, pickle_prefix)
            if not run_successfully:
                # Last ENA resort: wget over the same ftp links.
                pool = multiprocessing.Pool(processes=2)
                for file_download in download_information_type['ftp']:
                    pool.apply_async(download_with_wget, args=(file_download, outdir, pickle_prefix, sra, ena_id,))
                pool.close()
                pool.join()
                run_successfully = get_pickle_run_successfully(outdir, pickle_prefix)

    if not run_successfully and (sra or sra_opt):
        # --- NCBI SRA route (single .sra file, no pool needed) ---
        if aspera_key is not None:
            # aspera_file_path is unused in SRA mode, hence None.
            download_with_aspera(None, aspera_key, outdir, pickle_prefix, sra or sra_opt, ena_id)
            run_successfully = get_pickle_run_successfully(outdir, pickle_prefix)
        if not run_successfully:
            download_with_sra_prefetch(aspera_key, outdir, pickle_prefix, ena_id)
            run_successfully = get_pickle_run_successfully(outdir, pickle_prefix)
        if not run_successfully:
            if curl_installed():
                download_with_curl(None, outdir, pickle_prefix, sra or sra_opt, ena_id)
                run_successfully = get_pickle_run_successfully(outdir, pickle_prefix)
            if not run_successfully:
                download_with_wget(None, outdir, pickle_prefix, sra or sra_opt, ena_id)
                run_successfully = get_pickle_run_successfully(outdir, pickle_prefix)

        if run_successfully:
            download_sra = True

    return run_successfully, download_sra
242 | |
243 | |
def download_files(download_information, aspera_key, outdir, download_cram_bam_true, sra, sra_opt, ena_id):
    """Download reads for a run: fastq links first, then submitted files, then SRA.

    download_information: dict from get_download_information ('fastq',
    'submitted', 'cram_index' entries). When download_cram_bam_true is False,
    submitted cram/bam files are skipped; when True they are downloaded and
    the cram index (if any) is fetched as well.

    Returns (run_successfully, cram_index_run_successfully, download_sra).
    """
    run_successfully = False
    cram_index_run_successfully = False
    download_sra = False

    if download_information['fastq'] is not None:
        run_successfully, download_sra = download(download_information['fastq'], aspera_key, outdir, sra, sra_opt,
                                                  ena_id)

    if not run_successfully:
        if download_information['submitted'] is not None:
            if not download_cram_bam_true:
                # Only use submitted files when none of them is a cram/bam.
                cram_bam = False
                for i in download_information['submitted']:
                    if download_information['submitted'][i][0].endswith(('.cram', '.bam')):
                        cram_bam = True
                        break
                if not cram_bam:
                    run_successfully, download_sra = download(download_information['submitted'], aspera_key, outdir,
                                                              False, False, ena_id)
            else:
                run_successfully, download_sra = download(download_information['submitted'], aspera_key, outdir, False,
                                                          False, ena_id)
                if run_successfully and download_information['cram_index'] is not None:
                    # BUG FIX: download() returns a (success, download_sra) tuple;
                    # the old code stored the whole tuple, which is always truthy
                    # (even (False, False)), so a failed cram-index download was
                    # reported as success. Unpack the success flag only.
                    cram_index_run_successfully, _ = download(download_information['cram_index'], aspera_key, outdir,
                                                              False, False, ena_id)

    if not run_successfully and (sra or sra_opt):
        # Final fallback: force the NCBI SRA route (sra=True); the fastq link
        # dict is not consulted in that mode, so None is fine.
        run_successfully, download_sra = download(download_information['fastq'], aspera_key, outdir, True, sra_opt,
                                                  ena_id)

    return run_successfully, cram_index_run_successfully, download_sra
277 | |
278 | |
def sort_alignment(alignment_file, output_file, sort_by_name_true, threads):
    """Sort an alignment file with `samtools sort`.

    The output format is taken from output_file's extension; when
    sort_by_name_true is set the '-n' (name-sort) flag is added.
    Returns (run_successfully, output_file) with output_file set to None
    on failure.
    """
    out_format_string = os.path.splitext(output_file)[1][1:].lower()
    # Empty-string slot matches the historical placeholder when not name-sorting.
    name_sort_flag = '-n' if sort_by_name_true else ''
    command = ['samtools', 'sort', '-o', output_file, '-O', out_format_string, name_sort_flag, '-@', str(threads),
               alignment_file]
    run_successfully, stdout, stderr = utils.run_command_popen_communicate(command, False, None, True)

    if not run_successfully:
        output_file = None

    return run_successfully, output_file
196 | 290 |
197 | 291 |
def alignment_to_fastq(alignment_file, threads, pair_end_type):
    """Convert a BAM/CRAM alignment to fastq file(s) via `samtools fastq`.

    Returns (run_successfully, outfiles) where outfiles is a list with two
    fastq paths for paired data, one for single-end, or None on failure.
    """
    fastq_basename = os.path.splitext(alignment_file)[0]
    outfiles = None
    bam_file = fastq_basename + '.temp.bam'
    # Name-sort first so mates are adjacent for samtools fastq.
    run_successfully, bam_file = sort_alignment(alignment_file, bam_file, True, threads)
    if run_successfully:
        command = ['samtools', 'fastq', '', bam_file]
        if pair_end_type.lower() == 'paired':
            command[2] = '-1 ' + str(fastq_basename + '_1.fq') + ' -2 ' + str(fastq_basename + '_2.fq')
        elif pair_end_type == 'single':
            # NOTE(review): this comparison is case-sensitive (no .lower()),
            # unlike the other checks — kept as-is to preserve behavior.
            command[2] = '-0 ' + str(fastq_basename + '.fq')

        run_successfully, stdout, stderr = utils.run_command_popen_communicate(command, False, None, True)
        if run_successfully:
            lowered = pair_end_type.lower()
            if lowered == 'paired':
                outfiles = [str(fastq_basename + '_1.fq'), str(fastq_basename + '_2.fq')]
            elif lowered == 'single':
                outfiles = [str(fastq_basename + '.fq')]

    # Clean up the temporary name-sorted bam (sort_alignment may return None).
    if bam_file is not None and os.path.isfile(bam_file):
        os.remove(bam_file)

    return run_successfully, outfiles
222 | 316 |
223 | 317 |
224 def formartFastqHeaders(in_fastq_1, in_fastq_2): | 318 def formart_fastq_headers(in_fastq_1, in_fastq_2): |
225 import itertools | |
226 | 319 |
227 out_fastq_1 = in_fastq_1 + '.temp' | 320 out_fastq_1 = in_fastq_1 + '.temp' |
228 out_fastq_2 = in_fastq_2 + '.temp' | 321 out_fastq_2 = in_fastq_2 + '.temp' |
229 writer_in_fastq_1 = open(out_fastq_1, 'wt') | 322 writer_in_fastq_1 = open(out_fastq_1, 'wt') |
230 writer_in_fastq_2 = open(out_fastq_2, 'wt') | 323 writer_in_fastq_2 = open(out_fastq_2, 'wt') |
231 outfiles = [out_fastq_1, out_fastq_2] | 324 outfiles = [out_fastq_1, out_fastq_2] |
232 with open(in_fastq_1, 'rtU') as reader_in_fastq_1, open(in_fastq_2, 'rtU') as reader_in_fastq_2: | 325 with open(in_fastq_1, 'rtU') as reader_in_fastq_1, open(in_fastq_2, 'rtU') as reader_in_fastq_2: |
233 plus_line = True | 326 plus_line = True |
234 quality_line = True | 327 quality_line = True |
235 number_reads = 0 | 328 number_reads = 0 |
236 for in_1, in_2 in itertools.izip(reader_in_fastq_1, reader_in_fastq_2): | 329 for in_1, in_2 in zip(reader_in_fastq_1, reader_in_fastq_2): |
237 if len(in_1) > 0: | 330 if len(in_1) > 0: |
238 in_1 = in_1.splitlines()[0] | 331 in_1 = in_1.splitlines()[0] |
239 in_2 = in_2.splitlines()[0] | 332 in_2 = in_2.splitlines()[0] |
240 if in_1.startswith('@') and plus_line and quality_line: | 333 if in_1.startswith('@') and plus_line and quality_line: |
241 if in_1 != in_2: | 334 if in_1 != in_2: |
267 writer_in_fastq_2.write(in_2) | 360 writer_in_fastq_2.write(in_2) |
268 return number_reads, outfiles | 361 return number_reads, outfiles |
269 | 362 |
270 | 363 |
@utils.trace_unhandled_exceptions
def gzip_files(file_2_compress, pickle_prefix, outdir):
    """Gzip one file and pickle the success flag.

    A trailing '.temp' is stripped from the output name; the source file is
    removed once compression succeeds. The boolean outcome is pickled into
    outdir for get_pickle_run_successfully.
    """
    if file_2_compress.endswith('.temp'):
        out_file = os.path.splitext(file_2_compress)[0]
    else:
        out_file = file_2_compress

    # Shell redirection ('>') — command is run through a shell by
    # run_command_popen_communicate (second argument True).
    command = ['gzip', '--stdout', '--best', file_2_compress, '>', str(out_file + '.gz')]
    run_successfully, stdout, stderr = utils.run_command_popen_communicate(command, True, None, True)
    if run_successfully:
        os.remove(file_2_compress)

    utils.save_variable_to_pickle(run_successfully, outdir,
                                  str(pickle_prefix + '.' + os.path.basename(file_2_compress)))
286 | 378 |
287 def findFiles(directory, prefix, suffix): | 379 |
380 def find_files(directory, prefix, suffix): | |
288 list_files_found = [] | 381 list_files_found = [] |
289 files = [f for f in os.listdir(directory) if not f.startswith('.') and os.path.isfile(os.path.join(directory, f))] | 382 files = [f for f in os.listdir(directory) if not f.startswith('.') and os.path.isfile(os.path.join(directory, f))] |
290 for file_found in files: | 383 for file_found in files: |
291 if file_found.startswith(prefix) and file_found.endswith(suffix): | 384 if file_found.startswith(prefix) and file_found.endswith(suffix): |
292 file_path = os.path.join(directory, file_found) | 385 file_path = os.path.join(directory, file_found) |
296 list_files_found = None | 389 list_files_found = None |
297 | 390 |
298 return list_files_found | 391 return list_files_found |
299 | 392 |
300 | 393 |
def compress_files(fastq_files, outdir, threads):
    """Gzip a list of fastq files in parallel.

    Returns (run_successfully, compressed_fastq_files) where the second item
    is the list of '.gz' files found in outdir, or None on failure.
    """
    pickle_prefix = 'compress'

    worker_pool = multiprocessing.Pool(processes=threads)
    for fastq in fastq_files:
        worker_pool.apply_async(gzip_files, args=(fastq, pickle_prefix, outdir,))
    worker_pool.close()
    worker_pool.join()

    run_successfully = get_pickle_run_successfully(outdir, pickle_prefix)
    compressed_fastq_files = find_files(outdir, '', '.gz') if run_successfully else None

    return run_successfully, compressed_fastq_files
316 | 409 |
317 | 410 |
def bam_cram_2_fastq(alignment_file, outdir, threads, pair_end_type):
    """Convert a BAM/CRAM file to gzipped fastq file(s).

    Runs alignment_to_fastq, fixes paired-read headers when needed, then
    compresses the result. Returns (run_successfully, fastq_files).
    """
    run_successfully, fastq_files = alignment_to_fastq(alignment_file, threads, pair_end_type)
    if run_successfully:
        if pair_end_type.lower() == 'paired':
            # Harmonise mate headers across the two files before compression.
            number_reads, fastq_files = formart_fastq_headers(fastq_files[0], fastq_files[1])

        run_successfully, fastq_files = compress_files(fastq_files, outdir, threads)

    return run_successfully, fastq_files
327 | 420 |
328 | 421 |
def check_correct_links(download_information):
    """Normalise ENA download links in place and return the mapping.

    - Aspera links must be in 'host:/path' form, so a missing colon after
      'fasp.sra.ebi.ac.uk' is inserted.
    - '#' in FTP links is percent-encoded as '%23' so downloaders do not
      treat the rest of the path as a fragment.
    """
    for links in download_information.values():
        if links is None:
            continue
        if links['aspera'] is not None:
            for j, link in enumerate(links['aspera']):
                if link.startswith('fasp.sra.ebi.ac.uk/'):
                    links['aspera'][j] = link.replace('fasp.sra.ebi.ac.uk/', 'fasp.sra.ebi.ac.uk:/', 1)
        if links['ftp'] is not None:
            for j, link in enumerate(links['ftp']):
                if '#' in link:
                    links['ftp'][j] = link.replace('#', '%23')
    return download_information
341 | 435 |
342 | 436 |
def get_fastq_files(download_dir, cram_index_run_successfully, threads, download_paired_type):
    """Collect downloaded fastq files, converting a cram first when present.

    When cram_index_run_successfully is set, the last '.cram' file found in
    download_dir is converted to fastq; otherwise any downloaded files are
    accepted as-is. Returns (run_successfully, downloaded_files).
    """
    run_successfully = False
    downloaded_files = find_files(download_dir, '', '')
    if cram_index_run_successfully:
        cram_file = None
        for candidate in downloaded_files:
            if candidate.endswith('.cram'):
                cram_file = candidate
        run_successfully, downloaded_files = bam_cram_2_fastq(cram_file, download_dir, threads, download_paired_type)
    elif downloaded_files is not None and len(downloaded_files) > 0:
        run_successfully = True

    return run_successfully, downloaded_files
357 | 451 |
358 | 452 |
371 list_new_files[i] = os.path.join(outdir, new_name + '_2.fq.gz') | 465 list_new_files[i] = os.path.join(outdir, new_name + '_2.fq.gz') |
372 else: | 466 else: |
373 if not temp_name.endswith(('_R1_001.f', '_R2_001.f')): | 467 if not temp_name.endswith(('_R1_001.f', '_R2_001.f')): |
374 list_new_files[i] = os.path.join(outdir, new_name + '.fq.gz') | 468 list_new_files[i] = os.path.join(outdir, new_name + '.fq.gz') |
375 if temp_name.endswith(('_1.f', '_2.f')): | 469 if temp_name.endswith(('_1.f', '_2.f')): |
376 print 'WARNING: possible single-end file conflict with pair-end (' + list_files[i] + ')!' | 470 print('WARNING: possible single-end file conflict with pair-end (' + list_files[i] + ')!') |
377 | 471 |
378 if len(list_new_files) == 2 and download_paired_type.lower() == 'paired': | 472 if len(list_new_files) == 2 and download_paired_type.lower() == 'paired': |
379 run_successfully = True | 473 run_successfully = True |
380 elif len(list_new_files) == 1 and download_paired_type.lower() == 'single': | 474 elif len(list_new_files) == 1 and download_paired_type.lower() == 'single': |
381 run_successfully = True | 475 run_successfully = True |
386 if i not in list_new_files: | 480 if i not in list_new_files: |
387 if os.path.isfile(list_files[i]): | 481 if os.path.isfile(list_files[i]): |
388 os.remove(list_files[i]) | 482 os.remove(list_files[i]) |
389 else: | 483 else: |
390 os.rename(list_files[i], list_new_files[i]) | 484 os.rename(list_files[i], list_new_files[i]) |
391 list_new_files = list_new_files.values() | 485 list_new_files = list(list_new_files.values()) |
392 except Exception as e: | 486 except Exception as e: |
393 print e | 487 print(e) |
394 run_successfully = False | 488 run_successfully = False |
395 | 489 |
396 if not run_successfully: | 490 if not run_successfully: |
397 list_new_files = None | 491 list_new_files = None |
398 | 492 |
399 return run_successfully, list_new_files | 493 return run_successfully, list_new_files |
400 | 494 |
401 | 495 |
# @utils.trace_unhandled_exceptions
def rename_header_sra(fastq):
    """
    Rewrite SRA-style read headers of a fastq file and gzip the result.

    On every header line (every 4th line starting at line 1, NR%4==1) gawk's
    gensub replaces the 2nd '.' with '/', turning 'run.N.M' identifiers into
    the 'run.N/M' mate convention. Output is compressed with 'gzip -1' next
    to the input as '<fastq>.gz' (the input file is left in place).

    Parameters
    ----------
    fastq : str
        Path to the uncompressed fastq file to process.

    Returns
    -------
    run_successfully : bool
        True if the gawk | gzip shell pipeline exited with status 0.
    """
    import shlex

    run_successfully = False
    try:
        # The pipe and redirection require shell=True, so the file paths are
        # shell-quoted to survive spaces and metacharacters.
        command = ['gawk', '\'{if(NR%4==1) $0=gensub(/\\./, "/", 2); print}\'',
                   shlex.quote(fastq), '|', 'gzip', '-1', '>',
                   shlex.quote(str(fastq + '.gz'))]
        print('Running: ' + str(' '.join(command)))
        return_code = subprocess.call(' '.join(command), shell=True)
        if return_code == 0:
            run_successfully = True
        else:
            print('Something went wrong with command: {command}'.format(command=' '.join(command)))
    except Exception as e:
        print(e)

    return run_successfully
def sra_2_fastq(download_dir, ena_id):
    """
    Convert a downloaded .sra file into gzipped fastq files.

    Runs 'fastq-dump --split-files' on '<download_dir>/<ena_id>.sra' (with a
    1 hour timeout via utils.run_command_popen_communicate), then re-headers
    and gzips every produced '*.fastq' in parallel with rename_header_sra.

    Parameters
    ----------
    download_dir : str
        Directory holding the .sra file; also receives the fastq output.
    ena_id : str
        Accession used to name the .sra file.

    Returns
    -------
    run_successfully : bool
        True only if fastq-dump succeeded, at least one fastq file was
        produced and every header rename succeeded.
    """
    # os.path.join instead of plain concatenation so the path is correct even
    # when download_dir lacks a trailing separator.
    sra_file = os.path.join(download_dir, '{ena_id}.sra'.format(ena_id=ena_id))
    command = ['fastq-dump', '-I', '-O', download_dir, '--split-files', sra_file]
    run_successfully, stdout, stderr = utils.run_command_popen_communicate(command, False, 3600, True)
    if run_successfully:
        files = [os.path.join(download_dir, f) for f in os.listdir(download_dir)
                 if not f.startswith('.') and os.path.isfile(os.path.join(download_dir, f)) and f.endswith('.fastq')]

        # --split-files yields at most two fastq files per run, so two
        # workers are enough.
        pool = multiprocessing.Pool(processes=2)
        results = []
        p = pool.map_async(rename_header_sra, files, callback=results.extend)
        p.wait()
        pool.close()
        pool.join()

        # all([]) is True, so explicitly fail when fastq-dump produced no
        # fastq files at all.
        run_successfully = len(files) > 0 and all(results)

    return run_successfully
# Decorator built from utils.timer that reports the elapsed time of the
# wrapped function under the label 'Download module'.
download_timer = functools.partial(utils.timer, name='Download module')
@download_timer
def run_download(ena_id, download_paired_type, aspera_key, outdir, download_cram_bam_true, threads, instrument_platform,
                 sra, sra_opt):
    """
    Download the reads of an ENA/SRA run accession and normalise them to fastq.

    Queries the ENA read_run report for ena_id, downloads the run through the
    fastq/submitted/cram_index links (optionally via SRA when sra/sra_opt is
    set), converts cram/sra downloads to fastq, and renames/moves the final
    files into outdir. A temporary '<outdir>/download' directory is created at
    the start and removed at the end.

    Parameters
    ----------
    ena_id : str
        Run accession to download.
    download_paired_type : str
        'paired', 'single' or 'both'; other library layouts are skipped.
    aspera_key : str or None
        Aspera private-key path handed to download_files.
    outdir : str
        Directory that receives the final fastq files.
    download_cram_bam_true : bool
        Whether cram/bam links may be downloaded and converted.
    threads : int
        Threads used by the cram-to-fastq conversion.
    instrument_platform : str
        'all' or a specific platform; other platforms are skipped.
    sra : bool
        Download from SRA.
    sra_opt : bool
        Fall back to SRA when ENA information is unavailable.

    Returns
    -------
    run_successfully : bool
    downloaded_files : list or None
        Paths of the final fastq files placed in outdir.
    sequencing_information : dict
        Run metadata (accession, platform, model, layout, source, counts,
        nominal length, download date).
    """
    download_dir = os.path.join(outdir, 'download', '')
    utils.remove_directory(download_dir)
    os.mkdir(download_dir)

    run_successfully = False
    downloaded_files = None
    sequencing_information = {'run_accession': None, 'instrument_platform': None, 'instrument_model': None,
                              'library_layout': None, 'library_source': None, 'extra_run_accession': None,
                              'nominal_length': None, 'read_count': None, 'base_count': None,
                              'date_download': time.strftime("%Y-%m-%d")}

    read_run_info = get_read_run_info(ena_id)
    if read_run_info is not None:
        download_information = get_download_information(read_run_info)
        download_information = check_correct_links(download_information)
        sequencing_information = get_sequencing_information(read_run_info)
        # get_sequencing_information rebuilds the dict, so re-stamp the
        # download date that the default dict above carried (this assignment
        # existed before the Python 3 refactor and was dropped by mistake).
        sequencing_information['date_download'] = time.strftime("%Y-%m-%d")

        if instrument_platform.lower() == 'all' or \
                (sequencing_information['instrument_platform'] is not None and
                 sequencing_information['instrument_platform'].lower() == instrument_platform.lower()):
            if download_paired_type.lower() == 'both' or \
                    (sequencing_information['library_layout'] is not None and
                     sequencing_information['library_layout'].lower() == download_paired_type.lower()):
                run_successfully, cram_index_run_successfully, download_sra = \
                    download_files(download_information, aspera_key, download_dir, download_cram_bam_true,
                                   sra, sra_opt, ena_id)
                if download_sra:
                    run_successfully = sra_2_fastq(download_dir, ena_id)
                if run_successfully:
                    run_successfully, downloaded_files = \
                        get_fastq_files(download_dir, cram_index_run_successfully, threads,
                                        sequencing_information['library_layout'])
                if run_successfully and downloaded_files is not None:
                    run_successfully, downloaded_files = \
                        rename_move_files(downloaded_files, sequencing_information['run_accession'], outdir,
                                          sequencing_information['library_layout'])
    else:
        if sra or sra_opt:
            # No ENA metadata available: attempt a blind SRA download with
            # empty link information, then probe for a paired layout first
            # and fall back to single.
            run_successfully, cram_index_run_successfully, download_sra = \
                download_files({'fastq': None, 'submitted': None, 'cram_index': None}, aspera_key, download_dir,
                               download_cram_bam_true, sra, sra_opt, ena_id)
            if download_sra:
                run_successfully = sra_2_fastq(download_dir, ena_id)
            if run_successfully:
                run_successfully, downloaded_files = get_fastq_files(download_dir, cram_index_run_successfully,
                                                                     threads, 'paired')
                if not run_successfully:
                    run_successfully, downloaded_files = get_fastq_files(download_dir, cram_index_run_successfully,
                                                                         threads, 'single')
                if run_successfully and downloaded_files is not None:
                    run_successfully, downloaded_files = rename_move_files(downloaded_files, ena_id, outdir, 'paired')
                    if not run_successfully:
                        run_successfully, downloaded_files = rename_move_files(downloaded_files, ena_id, outdir,
                                                                               'single')

    utils.remove_directory(download_dir)

    return run_successfully, downloaded_files, sequencing_information