| 0 | 1 import argparse | 
|  | 2 import logging | 
|  | 3 import os | 
|  | 4 import re | 
|  | 5 import shutil | 
|  | 6 import sys | 
|  | 7 import tarfile | 
|  | 8 import tempfile | 
|  | 9 import zipfile | 
|  | 10 | 
|  | 11 import magic | 
|  | 12 | 
# Matches file names that start with one or more digits, an underscore and
# then any character other than "P" (e.g. "1_Summary.txt").
# NOTE(review): the "P" exclusion presumably skips the IMGT parameters file,
# which is not a per-sequence table - confirm against a real archive.
# Raw string: "\d" in a normal string literal is an invalid escape sequence.
imgt_file_regex = re.compile(r"^\d+_[^P]")
|  | 14 | 
|  | 15 | 
def sniff_imgt_type(input_file):
    """Return the file type description of ``input_file``.

    The description is whatever ``magic.from_file`` reports for the path; it
    is logged at debug level and returned unchanged.
    """
    detected_type = magic.from_file(input_file)
    message = "File type of {0} is {1}".format(input_file, detected_type)
    logging.debug(message)
    return detected_type
| 0 | 20 | 
|  | 21 | 
def unpack_imgt_zip(input_file, output_dir):
    """Extract an IMGT archive into ``output_dir``.

    The archive type is taken from ``sniff_imgt_type``: a description
    starting with "Zip" is opened with ``zipfile``, one starting with "XZ"
    with ``tarfile``. If the extraction leaves exactly one entry in
    ``output_dir`` and that entry is a directory, its contents are moved up
    into ``output_dir`` and the now-empty directory is removed.

    Args:
        input_file: Path of the archive to extract.
        output_dir: Existing directory that receives the extracted files.

    Raises:
        IOError: If the sniffed type starts with neither "Zip" nor "XZ".
    """
    imgt_type = sniff_imgt_type(input_file)
    if imgt_type.startswith("Zip"):
        with zipfile.ZipFile(input_file) as archive:
            archive.extractall(output_dir)
    elif imgt_type.startswith("XZ"):
        with tarfile.open(input_file) as archive:
            # The archive is user supplied: when the interpreter supports
            # extraction filters, refuse members that would escape
            # output_dir (absolute paths, "..", outside links, devices).
            if hasattr(tarfile, "data_filter"):
                archive.extractall(output_dir, filter="data")
            else:
                archive.extractall(output_dir)
    else:
        raise IOError("Unsuppported file type: {0}".format(imgt_type))
    logging.debug("Extracted {0} to {1}".format(input_file, output_dir))

    entries = os.listdir(output_dir)
    if len(entries) != 1:
        return
    nested_dir = os.path.join(output_dir, entries[0])
    if not os.path.isdir(nested_dir):
        return

    # Everything sits in a single wrapping directory; flatten it so the
    # files end up directly inside output_dir.
    logging.info("{0} is an older IMGT zip, removing extra dir".format(input_file))
    for name in os.listdir(nested_dir):
        shutil.move(os.path.join(nested_dir, name), os.path.join(output_dir, name))
    shutil.rmtree(nested_dir)
|  | 44 | 
|  | 45 | 
def filter_tabular_file(old_file, new_file, column, regex):
    """Copy the rows of a tab-separated file whose ``column`` matches ``regex``.

    The first line of ``old_file`` is treated as the header: it locates
    ``column`` and is always written to ``new_file``. Every following line is
    written only when it has a value in that column and ``regex.search``
    finds a match in it. Rows too short to contain the column are counted
    but dropped.

    Args:
        old_file: Path of the tab-separated file to read.
        new_file: Path of the file to write (overwritten).
        column: Exact header name of the column to test.
        regex: Compiled regular expression applied with ``search``.

    Returns:
        A ``(total, remain)`` tuple: the number of data rows read and the
        number of data rows written.

    Raises:
        ValueError: If ``column`` is not present in the header line.
    """
    logging.debug("Filtering {0} with {1}".format(old_file, regex.pattern))
    total = 0
    remain = 0
    with open(old_file, 'r') as of, open(new_file, 'w') as nf:
        header = of.readline()
        if not header:
            # Empty input: nothing to locate, nothing to copy.
            return total, remain
        # Drop the line terminator before splitting, otherwise the last
        # header name carries a trailing newline and can never be found.
        column_index = header.rstrip("\r\n").split("\t").index(column)
        nf.write(header)
        for line in of:
            total += 1
            fields = line.rstrip("\r\n").split("\t")
            # Strictly greater: a row with exactly column_index fields has
            # no element at that index.
            if len(fields) > column_index and regex.search(fields[column_index]):
                remain += 1
                nf.write(line)
    return total, remain
|  | 65 | 
|  | 66 | 
def all_same_in_list(l):
    """Return True when every element of ``l`` equals its first element.

    An empty or single-element list has nothing to disagree with, so the
    result is True in both cases.
    """
    remaining = iter(l)
    try:
        reference = next(remaining)
    except StopIteration:
        return True
    for candidate in remaining:
        if not (reference == candidate):
            return False
    return True
|  | 69 | 
|  | 70 | 
def filter_imgt_dir(imgt_dir, loci):
    """Filter every IMGT table in ``imgt_dir`` in place, keeping selected loci.

    Each file whose name matches ``imgt_file_regex`` is passed through
    ``filter_tabular_file`` on the "V-GENE and allele" column and then
    replaced by the filtered copy. The loci are joined with "|" into a single
    regular expression, so a row is kept when any locus is found in the
    column.

    Args:
        imgt_dir: Directory holding the extracted IMGT files.
        loci: Iterable of locus strings; blank entries are ignored.

    Returns:
        ``(total, remain)`` as reported for the first filtered file, or
        ``(0, 0)`` when the directory contains no matching file.

    Raises:
        ValueError: If ``loci`` contains no non-blank entry.
    """
    logging.info("Filtering {0} with {1}".format(imgt_dir, loci))
    # A blank locus would add an empty alternative to the pattern, which
    # matches every row and silently disables the filter.
    wanted = [locus.strip() for locus in loci if locus.strip()]
    if not wanted:
        raise ValueError("No locus selected, nothing to do")
    # NOTE(review): loci are used as regex fragments, not escaped literals.
    loci_regex = re.compile("|".join(wanted))

    imgt_files = [f for f in os.listdir(imgt_dir) if imgt_file_regex.match(f)]
    if not imgt_files:
        logging.warning("No IMGT tabular files found in {0}".format(imgt_dir))
        return 0, 0

    # Scratch file; its name does not match imgt_file_regex, and it is moved
    # over the source file after every pass.
    tmp_file = os.path.join(imgt_dir, "tmp.txt")
    totals = []
    remains = []
    for imgt_file in imgt_files:
        imgt_file = os.path.join(imgt_dir, imgt_file)
        total, remain = filter_tabular_file(imgt_file, tmp_file, "V-GENE and allele", loci_regex)
        totals.append(total)
        remains.append(remain)
        logging.debug("{0} rows, {1} after filtering".format(total, remain))
        shutil.move(tmp_file, imgt_file)
    if not (all_same_in_list(totals) and all_same_in_list(remains)):
        logging.warning("Not all files had the same number of sequences remaining for {0}: {1}".format(imgt_dir, remains))
    return totals[0], remains[0]
|  | 88 | 
|  | 89 | 
def make_new_xz_file(input_dir, output_file):
    """Pack every entry of ``input_dir`` into an xz-compressed tar archive.

    Entries are stored under their bare names, so the archive has no leading
    directory component. ``output_file`` is created or overwritten.
    """
    logging.info("Creating new IMGT zip for {0} at {1}".format(input_dir, output_file))
    entry_names = os.listdir(input_dir)
    with tarfile.open(output_file, 'w:xz') as archive:
        for entry_name in entry_names:
            logging.debug("Writing {0} to new IMGT zip".format(entry_name))
            entry_path = os.path.join(input_dir, entry_name)
            archive.add(entry_path, arcname=os.path.basename(entry_path))
|  | 98 | 
|  | 99 | 
def main():
    """Command-line entry point: filter an IMGT archive down to selected loci.

    Parses ``--input``, ``--loci`` (comma separated) and ``--output``, logs to
    ``./log.html`` and stdout, extracts the input archive into a temporary
    work directory, filters its tables and writes the result to ``--output``
    as an xz-compressed tar. The work directory is removed afterwards, also
    when a step fails.

    Raises:
        Exception: If ``--loci`` contains no non-blank locus.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--input", help="The input IMGT file", required=True)
    parser.add_argument("-l", "--loci", help="The Loci to filter on", required=True)
    parser.add_argument("-o", "--output", help="The output file for the new IMGT zip with just the filtered sequences", required=True)

    logging.basicConfig(filename="./log.html", level=logging.DEBUG, format="%(asctime)s: %(message)s <br />",
                        datefmt='%Y/%m/%d %H:%M:%S')
    logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))
    logging.info("Started IMGT locus split")

    args = parser.parse_args()
    input_file = args.input
    # str.split always yields at least one element, so blank entries have to
    # be dropped explicitly for the "nothing selected" check to ever trigger.
    loci = [locus.strip() for locus in args.loci.split(",") if locus.strip()]
    output_file = args.output

    logging.debug("All Parameters:")
    logging.debug("Input: {0}".format(input_file))
    logging.debug("Loci: {0}".format(loci))
    logging.debug("Output: {0}".format(output_file))

    if not loci:
        raise Exception("No locus selected, nothing to do")

    work_dir = tempfile.mkdtemp()
    try:
        original_files_dir = os.path.join(work_dir, "original")
        os.mkdir(original_files_dir)

        unpack_imgt_zip(input_file, original_files_dir)

        total, remain = filter_imgt_dir(original_files_dir, loci)
        logging.info("{0}\t{1}".format(total, remain))

        make_new_xz_file(original_files_dir, output_file)
    finally:
        # The extracted copy is only scratch data; do not leave it behind.
        shutil.rmtree(work_dir, ignore_errors=True)
| 0 | 134 | 
|  | 135 | 
# Run the locus split only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()