diff scripts/ReMatCh/modules/download.py @ 0:965517909457 draft

planemo upload commit 15239f1674081ab51ab8dd75a9a40cf1bfaa93e8
author cstrittmatter
date Wed, 22 Jan 2020 08:41:44 -0500
parents
children 0cbed1c0a762
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scripts/ReMatCh/modules/download.py	Wed Jan 22 08:41:44 2020 -0500
@@ -0,0 +1,432 @@
+import utils
+import os.path
+import multiprocessing
+import sys
+import functools
+import time
+
+
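+# Fetch the ENA 'read_run' file report for the given accession. Returns the
+# report as a list of tab-separated lines (a header plus one line per run), or
+# None if the request fails or no runs are reported.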
+def getReadRunInfo(ena_id):
+    import urllib
+
+    url = 'http://www.ebi.ac.uk/ena/data/warehouse/filereport?accession=' + ena_id + '&result=read_run'
+
+    readRunInfo = None
+    try:
+        response = urllib.urlopen(url)
+        readRunInfo = response.read().splitlines()
+        if len(readRunInfo) <= 1:
+            readRunInfo = None
+    except Exception as error:
+        print error
+
+    return readRunInfo
+
+
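+# Parse the report header for 'fastq', 'submitted' and 'cram_index' columns and
+# collect the corresponding Aspera and FTP links (ENA separates multiple links
+# in one field with ';'), e.g. {'fastq': {'ftp': [...], 'aspera': [...]}, ...}.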
+def getDownloadInformation(readRunInfo):
+    header_line = readRunInfo[0].split('\t')
+    info_line = readRunInfo[1].split('\t')
+
+    downloadInformation = {'fastq': None, 'submitted': None, 'cram_index': None}
+    download_types = ['aspera', 'ftp']
+
+    for i in range(0, len(header_line)):
+        header = header_line[i].lower().rsplit('_', 1)
+        if header[0] in downloadInformation.keys():
+            if header[1] in download_types:
+                if len(info_line[i]) > 0:
+                    files_path = info_line[i].split(';')
+                    if len(files_path) > 2:
+                        print 'WARNING: more files than expected were found among the ' + header[1] + ' download links!'
+                    if downloadInformation[header[0]] is None:
+                        downloadInformation[header[0]] = {}
+                    downloadInformation[header[0]][header[1]] = files_path
+
+    return downloadInformation
+
+
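+# Extract run metadata (platform, model, library layout and source, read and
+# base counts, ...) from the first report line. Run accessions found on any
+# additional lines are stored comma-separated under 'extra_run_accession'.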
+def getSequencingInformation(readRunInfo):
+    header_line = readRunInfo[0].split('\t')
+    info_line = readRunInfo[1].split('\t')
+
+    sequencingInformation = {'run_accession': None, 'instrument_platform': None, 'instrument_model': None,
+                             'library_layout': None, 'library_source': None, 'extra_run_accession': None,
+                             'nominal_length': None, 'read_count': None, 'base_count': None}
+
+    for i in range(0, len(header_line)):
+        header = header_line[i].lower()
+        if header in sequencingInformation.keys():
+            if len(info_line[i]) > 0:
+                sequencingInformation[header] = info_line[i]
+
+    if len(readRunInfo) > 2:
+        extra_run_accession = []
+        for i in range(2, len(readRunInfo)):
+            info = readRunInfo[i].split('\t')
+            for j in range(0, len(header_line)):
+                header = header_line[j].lower()
+                if header == 'run_accession':
+                    if len(info[j]) > 0:
+                        extra_run_accession.append(info[j])
+        if len(extra_run_accession) >= 1:
+            sequencingInformation['extra_run_accession'] = ','.join(extra_run_accession)
+
+    return sequencingInformation
+
+
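+# Download one file with Aspera's ascp client and pickle the boolean outcome for
+# the parent process (multiprocessing workers cannot return values directly
+# here). ascp flags: -Q adaptive bandwidth control, -T no in-transit
+# encryption, -l 300m transfer rate cap, -P33001 the SSH port used by era-fasp.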
+@utils.trace_unhandled_exceptions
+def downloadWithAspera(aspera_file_path, asperaKey, outdir, pickle_prefix):
+    command = ['ascp', '-QT', '-l', '300m', '-P33001', '-i', asperaKey, str('era-fasp@' + aspera_file_path), outdir]
+    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, 3600, True)
+
+    utils.saveVariableToPickle(run_successfully, outdir, str(pickle_prefix + '.' + aspera_file_path.rsplit('/', 1)[1]))
+
+
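+# Download one FTP link with wget (two attempts) and pickle the outcome, as in
+# downloadWithAspera.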
+@utils.trace_unhandled_exceptions
+def downloadWithWget(ftp_file_path, outdir, pickle_prefix):
+    file_download = ftp_file_path.rsplit('/', 1)[1]
+    command = ['wget', '--tries=2', ftp_file_path, '-O', os.path.join(outdir, file_download)]
+    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, 3600, True)
+
+    utils.saveVariableToPickle(run_successfully, outdir, str(pickle_prefix + '.' + file_download))
+
+
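+# Same as downloadWithWget, but using curl (tried first when it is installed).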
+@utils.trace_unhandled_exceptions
+def downloadWithCurl(ftp_file_path, outdir, pickle_prefix):
+    file_download = ftp_file_path.rsplit('/', 1)[1]
+    command = ['curl', '--retry', '2', ftp_file_path, '-o', os.path.join(outdir, file_download)]
+    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, 3600, True)
+
+    utils.saveVariableToPickle(run_successfully, outdir, str(pickle_prefix + '.' + file_download))
+
+
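+# Combine the per-file outcome pickles written by the download/compress
+# workers: True only if at least one pickle was read and none reported failure.
+# The pickle files are removed as they are read.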
+def getPickleRunSuccessfully(directory, pickle_prefix):
+    run_successfully = True
+    read_pickle = False
+
+    files = findFiles(directory, pickle_prefix, '.pkl')
+    if files is not None:
+        for file_found in files:
+            if run_successfully:
+                run_successfully = utils.extractVariableFromPickle(file_found)
+                read_pickle = True
+
+            os.remove(file_found)
+
+    if not read_pickle:
+        run_successfully = False
+
+    return run_successfully
+
+
+def curl_installed():
+    command = ['which', 'curl']
+    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, None, False)
+    return run_successfully
+
+
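+# Download all links of one type ('fastq', 'submitted' or 'cram_index'): Aspera
+# is tried first (when a key was provided), then curl, then wget. The pool of
+# two workers fetches a typical read pair in parallel.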
+def download(downloadInformation_type, asperaKey, outdir):
+    pickle_prefix = 'download'
+
+    run_successfully = False
+
+    if asperaKey is not None and downloadInformation_type.get('aspera') is not None:
+        pool = multiprocessing.Pool(processes=2)
+        for file_download in downloadInformation_type['aspera']:
+            pool.apply_async(downloadWithAspera, args=(file_download, asperaKey, outdir, pickle_prefix,))
+        pool.close()
+        pool.join()
+
+        run_successfully = getPickleRunSuccessfully(outdir, pickle_prefix)
+
+    if downloadInformation_type.get('ftp') is not None and not run_successfully:
+        if curl_installed():
+            pool = multiprocessing.Pool(processes=2)
+            for file_download in downloadInformation_type['ftp']:
+                pool.apply_async(downloadWithCurl, args=(file_download, outdir, pickle_prefix,))
+            pool.close()
+            pool.join()
+            run_successfully = getPickleRunSuccessfully(outdir, pickle_prefix)
+        if not run_successfully:
+            pool = multiprocessing.Pool(processes=2)
+            for file_download in downloadInformation_type['ftp']:
+                pool.apply_async(downloadWithWget, args=(file_download, outdir, pickle_prefix,))
+            pool.close()
+            pool.join()
+            run_successfully = getPickleRunSuccessfully(outdir, pickle_prefix)
+
+    return run_successfully
+
+
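+# Prefer the ENA-generated fastq files; if that fails, fall back to the
+# submitter-provided files. Submitted cram/bam files are skipped unless
+# download_cram_bam_True is set, in which case the cram index is fetched too.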
+def downloadFiles(downloadInformation, asperaKey, outdir, download_cram_bam_True):
+    run_successfully = False
+    cram_index_run_successfully = False
+
+    if downloadInformation['fastq'] is not None:
+        run_successfully = download(downloadInformation['fastq'], asperaKey, outdir)
+
+    if not run_successfully:
+        if downloadInformation['submitted'] is not None:
+            if not download_cram_bam_True:
+                cram_bam = False
+                for i in downloadInformation['submitted']:
+                    if downloadInformation['submitted'][i][0].endswith(('.cram', '.bam')):
+                        cram_bam = True
+                        break
+                if not cram_bam:
+                    run_successfully = download(downloadInformation['submitted'], asperaKey, outdir)
+
+            else:
+                run_successfully = download(downloadInformation['submitted'], asperaKey, outdir)
+                if run_successfully and downloadInformation['cram_index'] is not None:
+                    cram_index_run_successfully = download(downloadInformation['cram_index'], asperaKey, outdir)
+
+    return run_successfully, cram_index_run_successfully
+
+
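+# Sort a bam/cram file with samtools, inferring the output format from the
+# output file extension. The '' placeholder at index 6 becomes '-n' when
+# sorting by read name; utils.runCommandPopenCommunicate appears to join the
+# list into a single command string, so a leftover '' is harmless.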
+def sortAlignment(alignment_file, output_file, sortByName_True, threads):
+    outFormat_string = os.path.splitext(output_file)[1][1:].lower()
+    command = ['samtools', 'sort', '-o', output_file, '-O', outFormat_string, '', '-@', str(threads), alignment_file]
+    if sortByName_True:
+        command[6] = '-n'
+    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, None, True)
+
+    if not run_successfully:
+        output_file = None
+
+    return run_successfully, output_file
+
+
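+# Convert a bam/cram file to fastq: name-sort it so mates are adjacent, then
+# run samtools fastq, producing *_1.fq/*_2.fq for paired data or a single *.fq
+# for single-end data. The temporary bam is removed afterwards.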
+def alignmentToFastq(alignment_file, outdir, threads, pair_end_type):
+    fastq_basename = os.path.splitext(alignment_file)[0]
+    outfiles = None
+    bamFile = fastq_basename + '.temp.bam'
+    # name-sort the cram/bam so that mates are adjacent for samtools fastq
+    run_successfully, bamFile = sortAlignment(alignment_file, bamFile, True, threads)
+    if run_successfully:
+        command = ['samtools', 'fastq', '', bamFile]
+        if pair_end_type.lower() == 'paired':
+            command[2] = '-1 ' + str(fastq_basename + '_1.fq') + ' -2 ' + str(fastq_basename + '_2.fq')
+        elif pair_end_type.lower() == 'single':
+            command[2] = '-0 ' + str(fastq_basename + '.fq')
+
+        run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, False, None, True)
+        if run_successfully:
+            if pair_end_type.lower() == 'paired':
+                outfiles = [str(fastq_basename + '_1.fq'), str(fastq_basename + '_2.fq')]
+            elif pair_end_type.lower() == 'single':
+                outfiles = [str(fastq_basename + '.fq')]
+
+    if os.path.isfile(bamFile):
+        os.remove(bamFile)
+
+    return run_successfully, outfiles
+
+
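+# Re-pair the headers of two mate fastq files: the read names must be identical
+# in both files and receive '/1' and '/2' suffixes. Exits if the files get out
+# of sync. Returns the read count and the two '.temp' output files.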
+def formartFastqHeaders(in_fastq_1, in_fastq_2):
+    import itertools
+
+    out_fastq_1 = in_fastq_1 + '.temp'
+    out_fastq_2 = in_fastq_2 + '.temp'
+    writer_in_fastq_1 = open(out_fastq_1, 'wt')
+    writer_in_fastq_2 = open(out_fastq_2, 'wt')
+    outfiles = [out_fastq_1, out_fastq_2]
+    with open(in_fastq_1, 'rtU') as reader_in_fastq_1, open(in_fastq_2, 'rtU') as reader_in_fastq_2:
+        plus_line = True
+        quality_line = True
+        number_reads = 0
+        for in_1, in_2 in itertools.izip(reader_in_fastq_1, reader_in_fastq_2):
+            if len(in_1) > 0:
+                in_1 = in_1.splitlines()[0]
+                in_2 = in_2.splitlines()[0]
+                if in_1.startswith('@') and plus_line and quality_line:
+                    if in_1 != in_2:
+                        sys.exit('The paired-end fastq files are not properly paired!')
+                    in_1 += '/1' + '\n'
+                    in_2 += '/2' + '\n'
+                    writer_in_fastq_1.write(in_1)
+                    writer_in_fastq_2.write(in_2)
+                    plus_line = False
+                    quality_line = False
+                elif in_1.startswith('+') and not plus_line:
+                    in_1 += '\n'
+                    writer_in_fastq_1.write(in_1)
+                    writer_in_fastq_2.write(in_1)
+                    plus_line = True
+                elif plus_line and not quality_line:
+                    in_1 += '\n'
+                    in_2 += '\n'
+                    writer_in_fastq_1.write(in_1)
+                    writer_in_fastq_2.write(in_2)
+                    writer_in_fastq_1.flush()
+                    writer_in_fastq_2.flush()
+                    number_reads += 1
+                    quality_line = True
+                else:
+                    in_1 += '\n'
+                    in_2 += '\n'
+                    writer_in_fastq_1.write(in_1)
+                    writer_in_fastq_2.write(in_2)
+    writer_in_fastq_1.close()
+    writer_in_fastq_2.close()
+    return number_reads, outfiles
+
+
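+# Compress one fastq file with gzip, stripping a trailing '.temp' from the
+# output name. The '>' redirection only works because the command is run
+# through a shell (apparently the second argument to
+# runCommandPopenCommunicate); the outcome is pickled as in the download
+# workers.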
+@utils.trace_unhandled_exceptions
+def gzipFiles(file_2_compress, pickle_prefix, outdir):
+    out_file = None
+    if file_2_compress.endswith('.temp'):
+        out_file = os.path.splitext(file_2_compress)[0]
+    else:
+        out_file = file_2_compress
+
+    command = ['gzip', '--stdout', '--best', file_2_compress, '>', str(out_file + '.gz')]
+    run_successfully, stdout, stderr = utils.runCommandPopenCommunicate(command, True, None, True)
+    if run_successfully:
+        os.remove(file_2_compress)
+
+    utils.saveVariableToPickle(run_successfully, outdir, str(pickle_prefix + '.' + os.path.basename(file_2_compress)))
+
+
+def findFiles(directory, prefix, suffix):
+    list_files_found = []
+    files = [f for f in os.listdir(directory) if not f.startswith('.') and os.path.isfile(os.path.join(directory, f))]
+    for file_found in files:
+        if file_found.startswith(prefix) and file_found.endswith(suffix):
+            file_path = os.path.join(directory, file_found)
+            list_files_found.append(file_path)
+
+    if len(list_files_found) == 0:
+        list_files_found = None
+
+    return list_files_found
+
+
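+# Gzip all given fastq files in parallel and return the resulting '.gz' files.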
+def compressFiles(fastq_files, outdir, threads):
+    pickle_prefix = 'compress'
+    compressed_fastq_files = None
+
+    pool = multiprocessing.Pool(processes=threads)
+    for fastq in fastq_files:
+        pool.apply_async(gzipFiles, args=(fastq, pickle_prefix, outdir,))
+    pool.close()
+    pool.join()
+
+    run_successfully = getPickleRunSuccessfully(outdir, pickle_prefix)
+    if run_successfully:
+        compressed_fastq_files = findFiles(outdir, '', '.gz')
+
+    return run_successfully, compressed_fastq_files
+
+
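+# Full bam/cram-to-fastq conversion: extract the reads, fix the paired-end
+# headers if needed, then compress the result.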
+def bamCram_2_fastq(alignment_file, outdir, threads, pair_end_type):
+    run_successfully, fastq_files = alignmentToFastq(alignment_file, outdir, threads, pair_end_type)
+    if run_successfully:
+        if pair_end_type.lower() == 'paired':
+            number_reads, fastq_files = formartFastqHeaders(fastq_files[0], fastq_files[1])
+
+        run_successfully, fastq_files = compressFiles(fastq_files, outdir, threads)
+
+    return run_successfully, fastq_files
+
+
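+# Normalise the links returned by ENA: ascp needs a ':' between host and path
+# in Aspera links, and '#' characters must be percent-encoded for FTP clients.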
+def check_correct_links(downloadInformation):
+    for i in downloadInformation:
+        if downloadInformation[i] is not None:
+            if downloadInformation[i].get('aspera') is not None:
+                for j in range(0, len(downloadInformation[i]['aspera'])):
+                    if downloadInformation[i]['aspera'][j].startswith('fasp.sra.ebi.ac.uk/'):
+                        downloadInformation[i]['aspera'][j] = downloadInformation[i]['aspera'][j].replace('fasp.sra.ebi.ac.uk/', 'fasp.sra.ebi.ac.uk:/', 1)
+            if downloadInformation[i].get('ftp') is not None:
+                for j in range(0, len(downloadInformation[i]['ftp'])):
+                    if '#' in downloadInformation[i]['ftp'][j]:
+                        downloadInformation[i]['ftp'][j] = downloadInformation[i]['ftp'][j].replace('#', '%23')
+    return downloadInformation
+
+
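+# Locate the downloaded files; if a cram file (plus index) was fetched, convert
+# it to fastq first.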
+def get_fastq_files(download_dir, cram_index_run_successfully, threads, download_paired_type):
+    run_successfully = False
+    downloaded_files = findFiles(download_dir, '', '')
+    if cram_index_run_successfully:
+        cram_file = None
+        for i in downloaded_files:
+            if i.endswith('.cram'):
+                cram_file = i
+        run_successfully, downloaded_files = bamCram_2_fastq(cram_file, download_dir, threads, download_paired_type)
+    else:
+        if downloaded_files is not None and len(downloaded_files) > 0:
+            run_successfully = True
+
+    return run_successfully, downloaded_files
+
+
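+# Rename the downloaded fastq files to <new_name>[_1|_2].fq.gz in outdir and
+# delete any leftovers. Succeeds only with exactly two files for paired data
+# or exactly one for single-end data.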
+def rename_move_files(list_files, new_name, outdir, download_paired_type):
+    list_new_files = {}
+    run_successfully = False
+
+    for i in range(0, len(list_files)):
+        temp_name = utils.rchop(os.path.basename(list_files[i]), 'astq.gz')
+        if len(temp_name) == len(os.path.basename(list_files[i])):
+            temp_name = utils.rchop(os.path.basename(list_files[i]), 'q.gz')
+        if download_paired_type.lower() == 'paired':
+            if temp_name.endswith(('_R1_001.f', '_1.f')):
+                list_new_files[i] = os.path.join(outdir, new_name + '_1.fq.gz')
+            elif temp_name.endswith(('_R2_001.f', '_2.f')):
+                list_new_files[i] = os.path.join(outdir, new_name + '_2.fq.gz')
+        else:
+            if not temp_name.endswith(('_R1_001.f', '_R2_001.f')):
+                list_new_files[i] = os.path.join(outdir, new_name + '.fq.gz')
+                if temp_name.endswith(('_1.f', '_2.f')):
+                    print 'WARNING: possible single-end file conflict with paired-end (' + list_files[i] + ')!'
+
+    if len(list_new_files) == 2 and download_paired_type.lower() == 'paired':
+        run_successfully = True
+    elif len(list_new_files) == 1 and download_paired_type.lower() == 'single':
+        run_successfully = True
+
+    if run_successfully:
+        try:
+            for i in range(0, len(list_files)):
+                if i not in list_new_files:
+                    if os.path.isfile(list_files[i]):
+                        os.remove(list_files[i])
+                else:
+                    os.rename(list_files[i], list_new_files[i])
+            list_new_files = list_new_files.values()
+        except Exception as e:
+            print e
+            run_successfully = False
+
+    if not run_successfully:
+        list_new_files = None
+
+    return run_successfully, list_new_files
+
+
+download_timer = functools.partial(utils.timer, name='Download module')
+
+
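+# Module entry point: query ENA for the run, check that its platform and
+# library layout match what was requested, download the reads into
+# outdir/download/, convert, compress and rename them, and clean up. Returns
+# the success flag, the final fastq files and the run's sequencing metadata.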
+@download_timer
+def runDownload(ena_id, download_paired_type, asperaKey, outdir, download_cram_bam_True, threads, instrument_platform):
+    download_dir = os.path.join(outdir, 'download', '')
+    utils.removeDirectory(download_dir)
+    os.mkdir(download_dir)
+
+    run_successfully = False
+    downloaded_files = None
+    sequencingInformation = {'run_accession': None, 'instrument_platform': None, 'instrument_model': None,
+                             'library_layout': None, 'library_source': None, 'extra_run_accession': None,
+                             'nominal_length': None, 'read_count': None, 'base_count': None,
+                             'date_download': None}
+
+    readRunInfo = getReadRunInfo(ena_id)
+    if readRunInfo is not None:
+        downloadInformation = getDownloadInformation(readRunInfo)
+        downloadInformation = check_correct_links(downloadInformation)
+        sequencingInformation = getSequencingInformation(readRunInfo)
+        sequencingInformation['date_download'] = time.strftime("%Y-%m-%d")
+
+        if instrument_platform.lower() == 'all' or \
+                (sequencingInformation['instrument_platform'] is not None and
+                 sequencingInformation['instrument_platform'].lower() == instrument_platform.lower()):
+            if download_paired_type.lower() == 'both' or \
+                    (sequencingInformation['library_layout'] is not None and
+                     sequencingInformation['library_layout'].lower() == download_paired_type.lower()):
+                run_successfully, cram_index_run_successfully = downloadFiles(downloadInformation, asperaKey,
+                                                                              download_dir, download_cram_bam_True)
+                if run_successfully:
+                    run_successfully, downloaded_files = get_fastq_files(download_dir, cram_index_run_successfully,
+                                                                         threads,
+                                                                         sequencingInformation['library_layout'])
+                if run_successfully and downloaded_files is not None:
+                    run_successfully, downloaded_files = rename_move_files(downloaded_files,
+                                                                           sequencingInformation['run_accession'],
+                                                                           outdir,
+                                                                           sequencingInformation['library_layout'])
+
+    utils.removeDirectory(download_dir)
+
+    return run_successfully, downloaded_files, sequencingInformation
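+
+
+# Example invocation (a minimal sketch; 'SRR0000000' is a placeholder accession
+# and the sibling utils module must be importable):
+#     run_ok, fastqs, info = runDownload('SRR0000000', 'paired', None,
+#                                        '/tmp/rematch_download', False, 2, 'all')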