Mercurial > repos > petr-novak > repeatrxplorer
view seqclust @ 0:1d1b9e1b2e2f draft
Uploaded
author | petr-novak |
---|---|
date | Thu, 19 Dec 2019 10:24:45 -0500 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python3 ''' TAndem REpeat ANalyzer ''' import os import sys import shutil import subprocess import argparse from argparse import RawTextHelpFormatter import logging import shlex import multiprocessing # config must be loaded before seqtools,... import config import re from lib import seqtools, graphtools, utils, assembly_tools from lib import r2py REQUIRED_VERSION = (3, 4) if sys.version_info < REQUIRED_VERSION: raise Exception("\n\npython 3.4 or higher is required!\n") # append path to louvain clustering and other binaries os.environ['PATH'] = "{}:{}:{}".format(config.BINARIES, config.LOUVAIN, os.environ['PATH']) LOGGER = logging.getLogger(__name__) def get_version(path, tarean_mode): try: branch = subprocess.check_output("git rev-parse --abbrev-ref HEAD", shell=True, cwd=path).decode('ascii').strip() shorthash = subprocess.check_output( "git log --pretty=format:'%h' -n 1 ", shell=True, cwd=path).decode('ascii').strip() revcount = len(subprocess.check_output( "git log --oneline", shell=True, cwd=path).decode('ascii').split()) try: tag = subprocess.check_output("git describe --tags --abbrev=0", cwd=path, shell=True).decode('ascii').strip() except subprocess.CalledProcessError: tag = " " version_string = ( "-------------------------------------" "-------------------------------------\n" "PIPELINE VERSION : " "{branch}-{tag}-{revcount}({shorthash})\n\n" "PROTEIN DATABASE VERSION : {PD}\n" " md5 checksum : {PDmd5}\n\n" "DNA DATABASE VERSION : {DD}\n" " md5 checksum : {DDmd5}\n" "-------------------------------------" "-------------------------------------\n").format( branch=branch, shorthash=shorthash, revcount=revcount, tag=tag, PD=os.path.basename(config.PROTEIN_DATABASE), PDmd5=utils.md5checksum(config.PROTEIN_DATABASE + ".psq", fail_if_missing = not tarean_mode), DD=os.path.basename(config.DNA_DATABASE), DDmd5=utils.md5checksum(config.DNA_DATABASE + ".nsq")) except subprocess.CalledProcessError: version_string = "version of pipeline not available!" LOGGER.info(version_string) return version_string def valid_database(database_file): with open(database_file, 'r', encoding='ascii') as f: for i in f: if i[0] == ">": if not re.match(">.+#.+/*", i): # TODO - make edits to correct fomating of custom database??? return False return True def add_databases(databases, custom_databases_dir, dbtype='nucl'): '''custom databases are copied to directory tree and blast database is created using makeblastdb ''' databases_ok = [] print(databases) for db_path, db_name in databases: db_destination = "{}/{}".format(custom_databases_dir, db_name) shutil.copyfile(db_path, db_destination) if not valid_database(db_destination): raise ValueError(( "\n------------------------------------------------------------\n" "Custom database is not valid!\n" "Custom database of repeats are DNA sequences in fasta format.\n" "The required format for IDs in a custom library is : \n" " '>reapeatname#class/subclass'\n" "Reformat the database and try again!\n" "-------------------------------------------------------------\n\n" )) cmd = "makeblastdb -in {0} -out {0} -dbtype {1}".format(db_destination, dbtype) print(cmd) args = shlex.split(cmd) print(args) if subprocess.check_call(args, stderr=sys.stdout): Warning("makeblastdb on {} failed".format(db_name)) else: databases_ok.append([db_destination, "custom_db_" + db_name]) if len(databases_ok) == 0: return None else: return databases_ok def meminfo(): ''' detect physical memory and memory usage''' info = {} required_fields = [ 'MemTotal:', 'MemFree:', 'Cached:', 'SwapCached:', 'Buffers:' ] with open('/proc/meminfo', 'r') as f: for i in f: a = i.split() if a[0] in required_fields: info[a[0]] = int(a[1]) return info def dict2lists(d): ''' convert dict to nested list use the funsction to pass dictionary to R function ''' values = list(d.values()) keys = list(d.keys()) return [values, keys] def show_object(obj): ''' helper function for printing all public atributes, does not print callebme atributes e.i. methods.. ''' s = "Configuration--------------->\n" for i in dir(obj): # do not show private if i[:2] != "__": value = getattr(obj, i) if not callable(value): s += "{} : {}\n".format(i, value) s += "<---------------configuration\n" return s class DataInfo(): ''' stores information state of clustering and data ''' def __init__(self, args, paths): LOGGER.info("getting information about input sequences") self.args = args self.working_directory = args.output_dir self.input_sequences = args.sequences.name self.number_of_input_sequences = seqtools.SequenceSet.fasta_length( self.input_sequences) self.paired = args.paired self.prefix_length = args.prefix_length self.physical_memory = meminfo()['MemTotal:'] self.edges_max = config.EMAX # set max memory if args.max_memory: self.max_memory = args.max_memory else: self.max_memory = meminfo()["MemTotal:"] # modify initial setup if number of sequences is low if args.automatic_filtering: config.NUMBER_OF_SEQUENCES_FOR_PRERUN = config.NUMBER_OF_SEQUENCES_FOR_PRERUN_WITH_FILTERING if self.number_of_input_sequences < config.NUMBER_OF_SEQUENCES_FOR_PRERUN: config.NUMBER_OF_SEQUENCES_FOR_PRERUN = self.number_of_input_sequences # is number of input sequences sufficient if self.number_of_input_sequences < config.MINIMUM_NUMBER_OF_INPUT_SEQUENCES: raise WrongInputDataError( "provide more sequences for clustering, minumum {} is .required".format( config.MINIMUM_NUMBER_OF_INPUT_SEQUENCES)) # these atribudes will be set later after clustering is done self.max_annotated_clusters = None self.max_annotated_superclusters = None # the atributes will be set after prerun is performed self.prerun_ecount = None self.prerun_ecount_corrected = None self.sample_size = None self.max_number_reads_for_clustering = None self.mincln = None self.number_of_omitted_reads = 0 LOGGER.info("sampling sequences for prerun analysis") sample = seqtools.SequenceSet( source=self.input_sequences, sample_size=config.NUMBER_OF_SEQUENCES_FOR_PRERUN, paired=self.paired, filename=paths.sample_db, fasta=paths.sample_fasta, rename=True) sample.makeblastdb(legacy=args.options.legacy_database, lastdb=args.options.lastdb) # preliminary clustering self.prerun_vcount = len(sample) # line count self._prerun(sample, paths) # adjust size of chunks: if self.number_of_reads_for_clustering < config.CHUNK_SIZE * 30: config.CHUNK_SIZE = round(self.number_of_reads_for_clustering / 40) def _prerun(self, sample, paths): '''Preliminary characterization sequences using clustering on small dataset - stored as sample ''' sample.make_chunks(chunk_size=1000) sample.create_hitsort(options=self.args.options) sample_hitsort = graphtools.Graph(source=sample.hitsort, paired=self.paired, seqids=sample.keys()) sample_hitsort.save_indexed_graph() sample_hitsort.louvain_clustering(merge_threshold=0.2) sample_hitsort.export_cls(path=paths.prerun_cls_file) sample.annotate( config.DNA_DATABASE, annotation_name="dna_database", directory=paths.prerun, params=self.args.options.annotation_search_params.blastn) selected_tarean_contigs = [] ecount_corrected = sample_hitsort.ecount vcount_corrected = sample_hitsort.vcount if self.args.automatic_filtering: prerun_cluster_info = sample_hitsort.export_clusters_files_multiple( min_size=10, directory=paths.prerun_clusters, sequences=sample, tRNA_database_path=config.TRNA_DATABASE, satellite_model_path=config.SATELLITE_MODEL) # check of prerun contain clusters with large number of edges # these sequences can be used for filtering for cl in prerun_cluster_info: print(cl.ecount, cl.vcount, sample_hitsort.ecount, cl.tandem_rank) if (cl.tandem_rank in config.TANDEM_RANKS[0:2] and cl.ecount / sample_hitsort.ecount > config.FILTER_MIN_PROP_THRESHOLD and cl.vcount > config.FILTER_MIN_SIZE_THRESHOLD): selected_tarean_contigs.append(cl.tarean_contig_file) ecount_corrected -= cl.ecount vcount_corrected -= cl.vcount if selected_tarean_contigs: with open(paths.filter_sequences_file, 'w') as out: for fname in selected_tarean_contigs: with open(fname, 'r') as f: out.write(f.read()) self.sequence_fiter = paths.filter_sequences_file else: self.sequence_fiter = None self.prerun_ecount = sample_hitsort.ecount self.prerun_ecount_corrected = ecount_corrected self.prerun_vcount_corrected = vcount_corrected self.max_number_reads_for_clustering = round(( ((self.edges_max * self.max_memory) / self.prerun_ecount_corrected * self.prerun_vcount**2)**(0.5)) / 2) if self.max_number_reads_for_clustering >= self.number_of_input_sequences: self.sample_size = 0 else: self.sample_size = self.max_number_reads_for_clustering n1 = self.sample_size if self.sample_size != 0 else self.number_of_input_sequences n2 = self.args.sample if self.args.sample != 0 else self.number_of_input_sequences self.number_of_reads_for_clustering = min(n1, n2) # minlcn is set either based on mincl or value specified in config, # whatever is higher self.mincln = int(self.number_of_reads_for_clustering * self.args.mincl / 100) if self.mincln < config.MINIMUM_NUMBER_OF_READS_IN_CLUSTER: self.mincln = config.MINIMUM_NUMBER_OF_READS_IN_CLUSTER def __str__(self): s = "Data info------------------->\n" for i in dir(self): # do not show private if i[:2] != "__": value = getattr(self, i) if not callable(value): s += "{} : {}\n".format(i, value) s += "<----------------------Data info\n" return s class DataFiles(object): ''' stores location of data files and create directories ... atributes are: - individual directories - individual files - list of files or directories directories are created if does not exist ''' def __init__(self, working_dir, subdirs, files): LOGGER.info("creating directory structure") self.working_dir = working_dir # add and create directories paths for i in subdirs: d = os.path.join(self.working_dir, subdirs[i]) os.makedirs(d, exist_ok=True) setattr(self, i, d) setattr(self, i + "__relative", subdirs[i]) # add file paths for i in files: d = os.path.join(self.working_dir, files[i]) setattr(self, i, d) setattr(self, i + "__relative", files[i]) def __str__(self): s = "" for i in dir(self): # do not show private if i[:2] != "__": value = getattr(self, i) if not callable(value): s += "{} : {}\n".format(i, value) return s def as_list(self): ''' convert attr and vaues to list - suitable for passing values to R functions ''' values = list() keys = list() for i in dir(self): # do not show private if i[:2] != "__": value = getattr(self, i) if not callable(value): values.append(value) keys.append(i) return [values, keys] def cleanup(self, paths): ''' will remove unnecessary files from working directory ''' for i in paths: fn = getattr(self, i) if os.path.exists(fn): if os.path.isdir(fn): shutil.rmtree(fn, ignore_errors=False) else: os.remove(fn) class WrongInputDataError(Exception): '''Custom exception for wrong input ''' def __init__(self, arg): super(WrongInputDataError, self).__init__(arg) self.msg = arg class Range(): ''' This class is used to check float range in argparse ''' def __init__(self, start, end): self.start = start self.end = end def __eq__(self, other): return self.start <= other <= self.end def __str__(self): return "float range {}..{}".format(self.start, self.end) def __repr__(self): return "float range {}..{}".format(self.start, self.end) class DirectoryType(object): ''' this class is similar to argparse.FileType for mode 'w' creates and check the access to the directory for mode 'r' check the presence of the dictory and accesibility ''' def __init__(self, mode='r'): self._mode = mode def __call__(self, string): if self._mode == 'w': try: os.makedirs(string, exist_ok=True) except FileExistsError: raise argparse.ArgumentTypeError( "Cannot create directory, '{}' is a file".format(string)) if os.access(string, os.W_OK): return string else: raise argparse.ArgumentTypeError( "Directory '{}' is not writable".format(string)) if self._mode == 'r': if not os.path.isdir(string): raise argparse.ArgumentTypeError( "'{}' is not a directory".format(string)) if os.access(string, os.R_OK): return string else: raise argparse.ArgumentTypeError( "Directory '{}' is not readable".format(string)) def get_cmdline_args(): '''seqclust command line parser''' description = """RepeatExplorer: Repetitive sequence discovery and clasification from NGS data """ # arguments parsing parser = argparse.ArgumentParser(description=description, formatter_class=RawTextHelpFormatter) parser.add_argument('-p', '--paired', action='store_true', default=False) parser.add_argument('-A', '--automatic_filtering', action='store_true', default=False) parser.add_argument( '-t', '--tarean_mode', action='store_true', default=False, help="analyze only tandem reapeats without additional classification") parser.add_argument('sequences', type=argparse.FileType('r')) parser.add_argument('-l', '--logfile', type=argparse.FileType('w'), default=None, help='log file, logging goes to stdout if not defines') parser.add_argument('-m', '--mincl', type=float, choices=[Range(0.0, 100.0)], default=0.01) parser.add_argument( '-M', '--merge_threshold', type=float, choices=[0, Range(0.1, 1)], default=0, help= "threshold for mate-pair based cluster merging, default 0 - no merging") parser.add_argument( '-o', '--min_lcov', type=float, choices=[Range(30.0, 80.0)], default=55, help= "minimal overlap coverage - relative to longer sequence length, default 55") parser.add_argument('-c', '--cpu', type=int, default=int(os.environ.get('TAREAN_CPU', 0)), help="number of cpu to use, if 0 use max available") parser.add_argument( '-s', '--sample', type=int, default=0, help="use only sample of input data[by default max reads is used") parser.add_argument( '-P', '--prefix_length', type=int, default=0, help=("If you wish to keep part of the sequences name,\n" " enter the number of characters which should be \n" "kept (1-10) instead of zero. Use this setting if\n" " you are doing comparative analysis")) parser.add_argument('-v', '--output_dir', type=DirectoryType('w'), default="clustering_results") parser.add_argument( '-r', '--max_memory', type=int, default=int(os.environ.get('TAREAN_MAX_MEM', 0)), help=("Maximal amount of available RAM in kB if not set\n" "clustering tries to use whole available RAM")) parser.add_argument( '-d', '--database', default=None, help="fasta file with database for annotation and name of database", nargs=2, action='append') parser.add_argument( "-C", "--cleanup", default=False, action="store_true", help="remove unncessary large files from working directory") parser.add_argument( "-k", "--keep_names", default=False, action="store_true", help="keep sequence names, by default sequences are renamed") parser.add_argument( '-a', '--assembly_min', default=5, type=int, choices=[2,3,4,5], help=('Assembly is performed on individual clusters, by default \n' 'clusters with size less then 5 are not assembled. If you \n' 'want need assembly of smaller cluster set *assmbly_min* \n' 'accordingly\n') ) parser.add_argument('-tax', '--taxon', default=config.PROTEIN_DATABASE_DEFAULT, choices=list(config.PROTEIN_DATABASE_OPTIONS.keys()), help="Select taxon and protein database version" ) parser.add_argument( '-opt', '--options', default="ILLUMINA", choices=['ILLUMINA','ILLUMINA_DUST_OFF', 'ILLUMINA_SHORT', 'OXFORD_NANOPORE']) parser.add_argument( '-D', '--domain_search', default="BLASTX_W3", choices=['BLASTX_W2', 'BLASTX_W3', 'DIAMOND'], help= ('Detection of protein domains can be performed by either blastx or\n' ' diamond" program. options are:\n' ' BLASTX_W2 - blastx with word size 2 (slowest, the most sesitive)\n' ' BLASTX_W3 - blastx with word size 3 (default)\n' ' DIAMOND - diamond program (significantly faster, less sensitive)\n' 'To use this option diamond program must be installed in your PATH')) args = parser.parse_args() # covert option string to namedtuple of options args.options = getattr(config, args.options) # set protein database args.options = args.options._replace( annotation_search_params= args.options.annotation_search_params._replace(blastx=getattr( config, args.domain_search))) return args def main(): ''' Perform graph based clustering ''' # argument parsing: args = get_cmdline_args() config.ARGS = args logfile = args.logfile.name if args.logfile else None logging.basicConfig( filename=logfile, format='\n%(asctime)s - %(name)s - %(levelname)s -\n%(message)s\n', level=logging.INFO) config.PROTEIN_DATABASE, config.CLASSIFICATION_HIERARCHY = config.PROTEIN_DATABASE_OPTIONS[ args.taxon] # number of CPU to use pipeline_version_info = get_version(config.MAIN_DIR, tarean_mode = args.tarean_mode) config.PROC = args.cpu if args.cpu != 0 else multiprocessing.cpu_count() # TODO add kmer range specification to config - based on the technology r2py.create_connection() try: reporting = r2py.R(config.RSOURCE_reporting, verbose=True) create_annotation = r2py.R(config.RSOURCE_create_annotation, verbose=True) LOGGER.info(args) paths = DataFiles(working_dir=args.output_dir, subdirs=config.DIRECTORY_TREE, files=config.FILES) # files to be included in output for src, dest in config.INCLUDE: shutil.copy(src, os.path.join(paths.working_dir, dest)) # geting information about data run_info = DataInfo(args, paths) LOGGER.info(run_info) LOGGER.info(show_object(config)) # load all sequences or sample sequences = seqtools.SequenceSet( source=run_info.input_sequences, sample_size=run_info.number_of_reads_for_clustering, paired=run_info.paired, filename=paths.sequences_db, fasta=paths.sequences_fasta, prefix_length=run_info.prefix_length, rename=not run_info.args.keep_names) if run_info.sequence_fiter: n = sequences.remove_sequences_using_filter( run_info.sequence_fiter, keep_proportion=config.FILTER_PROPORTION_OF_KEPT, omitted_sequences_file=paths.filter_omitted, kept_sequences_file=paths.filter_kept ) run_info.number_of_omitted_reads = n # add custom databases if provided if args.database: config.CUSTOM_DNA_DATABASE = add_databases( args.database, custom_databases_dir=paths.custom_databases) sequences.makeblastdb(legacy=args.options.legacy_database, lastdb=args.options.lastdb) LOGGER.info("chunksize: {}".format(config.CHUNK_SIZE)) sequences.make_chunks(chunk_size=config.CHUNK_SIZE) sequences.create_hitsort(output=paths.hitsort, options=args.options) hitsort = graphtools.Graph(filename=paths.hitsort_db, source=paths.hitsort, paired=run_info.paired, seqids=sequences.keys()) LOGGER.info('hitsort with {} reads and {} edges loaded.'.format( hitsort.vcount, hitsort.ecount)) hitsort.save_indexed_graph() LOGGER.info('hitsort index created.') hitsort.louvain_clustering(merge_threshold=args.merge_threshold, cleanup=args.cleanup) hitsort.export_cls(path=paths.cls_file) hitsort.adjust_cluster_size(config.FILTER_PROPORTION_OF_KEPT, sequences.ids_kept) sequences.annotate(config.DNA_DATABASE, annotation_name="dna_database", directory=paths.blastn, params=args.options.annotation_search_params.blastn) if config.CUSTOM_DNA_DATABASE: LOGGER.info('annotating with custom database') for db, db_name in config.CUSTOM_DNA_DATABASE: sequences.annotate( db, annotation_name=db_name, directory=paths.blastn, params=args.options.annotation_search_params.blastn) if not args.tarean_mode: # additional analyses - full RE run # this must be finished befor creating clusters_info sequences.annotate( config.PROTEIN_DATABASE, annotation_name="protein_database", directory=paths.blastx, params=args.options.annotation_search_params.blastx) ## annotating using customa databasesreplace LOGGER.info('creating cluster graphs') clusters_info = hitsort.export_clusters_files_multiple( min_size=run_info.mincln, directory=paths.clusters, sequences=sequences, tRNA_database_path=config.TRNA_DATABASE, satellite_model_path=config.SATELLITE_MODEL) if not args.tarean_mode: LOGGER.info("assembling..") assembly_tools.assembly(sequences, hitsort, clusters_info, assembly_dir=paths.assembly, contigs_file=paths.contigs, min_size_of_cluster_for_assembly=args.assembly_min) LOGGER.info("detecting LTR in assembly..") for i in clusters_info: i.detect_ltr(config.TRNA_DATABASE) run_info.max_annotated_clusters = max([i.index for i in clusters_info]) run_info.max_annotated_superclusters = max([i.supercluster for i in clusters_info]) # make reports cluster_listing = [i.listing() for i in clusters_info] # make path relative to paths.cluster_info utils.save_as_table(cluster_listing, paths.clusters_info) # creates table cluster_info in hitsort database graphtools.Cluster.add_cluster_table_to_database(cluster_listing, paths.hitsort_db) # export files for consensus sequences, one for each ranks consensus_files = [] for i in config.TANDEM_RANKS: consensus_files.append(utils.export_tandem_consensus( clusters_info, path=paths.TR_consensus_fasta.format(i), rank=i)) if not args.tarean_mode: LOGGER.info("Creating report for superclusters") create_annotation.create_all_superclusters_report( max_supercluster=run_info.max_annotated_superclusters, paths=paths.as_list(), libdir=paths.libdir, superclusters_dir=paths.superclusters, seqdb=paths.sequences_db, hitsortdb=paths.hitsort_db, classification_hierarchy_file=config.CLASSIFICATION_HIERARCHY, HTML_LINKS=dict2lists(config.HTML_LINKS)) LOGGER.info("Creating report for individual clusters") for cluster in clusters_info: create_annotation.create_cluster_report( cluster.index, seqdb=paths.sequences_db, hitsortdb=paths.hitsort_db, classification_hierarchy_file= config.CLASSIFICATION_HIERARCHY, HTML_LINKS=dict2lists(config.HTML_LINKS)) LOGGER.info("Creating main html report") reporting.create_main_reports( paths=paths.as_list(), N_clustering=run_info.number_of_reads_for_clustering, N_input=run_info.number_of_input_sequences, N_omit=run_info.number_of_omitted_reads, merge_threshold=args.merge_threshold, paired=run_info.paired, consensus_files=consensus_files, custom_db=bool(config.CUSTOM_DNA_DATABASE), tarean_mode=args.tarean_mode, HTML_LINKS=dict2lists(config.HTML_LINKS), pipeline_version_info=pipeline_version_info, max_memory=run_info.max_memory, max_number_reads_for_clustering=run_info.max_number_reads_for_clustering, mincln=run_info.mincln ) LOGGER.info("Html report reports created") except: r2py.shutdown(config.RSERVE_PORT) raise finally: if args.cleanup: paths.cleanup(config.FILES_TO_DISCARD_AT_CLEANUP) else: LOGGER.info("copy databases to working directory") shutil.copy(paths.sequences_db, paths.working_dir) shutil.copy(paths.hitsort_db, paths.working_dir) # copy log file inside working directory if logfile: shutil.copyfile(logfile, paths.logfile) if __name__ == "__main__": main() # some error handling here: