Mercurial > repos > abims-sbr > gffalign
view GFFalign.py @ 0:294f5ba28746 draft
"planemo upload for repository https://github.com/abims-sbr/tools-abims/tools/gffalign commit d8aa0e49353e78e5fd772498a1fcf591e2744f99"
author | abims-sbr |
---|---|
date | Mon, 30 Nov 2020 18:20:46 +0000 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/python3 import argparse import os import gffutils from Bio import SeqIO class GeneComp: # This cass aims to discover the position of the genes in the alignment # and to extract informations on the genes characteristics def __init__(self, ccgene, qstart, qend, otgene, dstart, dend, extract, tol, gattr, output=None): # d* / ot* are the db result, q* / cc the query # gene are the gene from maf-tab file # start, end are the coordinates ''' self.otbeg = int(otgene.start) - dstart self.otend = int(otgene.end) - dstart self.ccbeg = int(ccgene.start) - qstart self.ccend = int(ccgene.end) - qstart self.tol = tol # how much tolerance in nucleotides self.ccname = ccgene.chrom self.otname = otgene.chrom self.q_outstart = qstart+self.otbeg self.q_outend = qstart+self.otend self.genelist = [] self.qstart = qstart if extract: self.fastafile = extract[0] self.fastaout = extract[1] self.output = output self.out = "" self.gattr = f"New_annotation='{gattr}'" def is_equal(self): if (self.otbeg - self.tol) <= self.ccbeg <= (self.otbeg + self.tol) and (self.otend - self.tol) <= self.ccend <= (self.otend + self.tol): self.out = f"{self.ccname}\tprediction\tgene\t{self.q_outstart}\t{self.q_outend}\t.\t{self.qstart}\t.\tID={self.ccname};{self.gattr};Note=confirmed" return self.out def is_shorter(self): if (self.otbeg + self.tol) < self.ccbeg and (self.otend - self.tol) > self.ccend: self.out = f"{self.ccname}\tprediction\tgene\t{self.q_outstart}\t{self.q_outend}\t.\t{self.qstart}\t.\tID={self.ccname};{self.gattr};Note=shorter" return self.out elif (self.otbeg - self.tol) <= self.ccbeg <= (self.otbeg + self.tol) and (self.otend - self.tol) > self.ccend: self.out = f"{self.ccname}\tprediction\tgene\t{self.q_outstart}\t{self.q_outend}\t.\t{self.qstart}\t.\tID={self.ccname};{self.gattr};Note=shorter_right" return self.out elif (self.otbeg + self.tol) < self.ccbeg and (self.otend - self.tol) <= self.ccend <= (self.otend + self.tol): self.out = f"{self.ccname}\tprediction\tgene\t{self.q_outstart}\t{self.q_outend}\t.\t{self.qstart}\t.\tID={self.ccname};{self.gattr};Note=shorter_left" return self.out def is_longer(self): if (self.otbeg - self.tol) > self.ccbeg and (self.otend + self.tol) < self.ccend: self.out = f"{self.ccname}\tprediction\tgene\t{self.q_outstart}\t{self.q_outend}\t.\t{self.qstart}\t.\tID={self.ccname};{self.gattr};Note=longer" return self.out elif (self.otbeg - self.tol) <= self.ccbeg <= (self.otbeg + self.tol) and (self.otend + self.tol) < self.ccend: self.out = f"{self.ccname}\tprediction\tgene\t{self.q_outstart}\t{self.q_outend}\t.\t{self.qstart}\t.\tID={self.ccname};{self.gattr};Note=longer_right" return self.out elif (self.otbeg - self.tol) > self.ccbeg and (self.otend - self.tol) <= self.ccend <= (self.otend + self.tol): self.out = f"{self.ccname}\tprediction\tgene\t{self.q_outstart}\t{self.q_outend}\t.\t{self.qstart}\t.\tID={self.ccname};{self.gattr};Note=longer_left" return self.out def is_offset(self): if (self.otbeg + self.tol) < self.ccbeg < (self.otend - self.tol) and (self.otend + self.tol) < self.ccend: self.out = f"{self.ccname}\tprediction\tgene\t{self.q_outstart}\t{self.q_outend}\t.\t{self.qstart}\t.\tID={self.ccname};{self.gattr};Note=offset_right" return self.out if (self.otbeg - self.tol) > self.ccbeg and (self.otbeg - self.tol) < self.ccend < self.otend + self.tol: self.out = f"{self.ccname}\tprediction\tgene\t{self.q_outstart}\t{self.q_outend}\t.\t{self.qstart}\t.\tID={self.ccname};{self.gattr};Note=offset_left" return self.out def is_different(self): if self.otbeg - self.tol > self.ccend or self.otend + self.tol < self.otbeg: self.out = f"{self.ccname}\tprediction\tgene\t{self.q_outstart}\t{self.q_outend}\t.\t{self.qstart}\t.\tID={self.ccname};{self.gattr};Note=new" return self.out def extract_fasta(self): with open(self.fastaout,"a") as fileout: with open(self.fastafile) as filefasta: for record in SeqIO.parse(filefasta,"fasta"): if record.id == self.ccname: fileout.write(f">{self.ccname}_{self.q_outstart}:{self.q_outend}\n{record.seq[self.q_outstart:self.q_outend]}\n") # def extract_stout(self): # with open(self.fastafile) as filefasta: # for record in SeqIO.parse(filefasta,"fasta"): # if record.id == self.ccname: # print(f">{self.ccname}_{self.q_outstart}:{self.q_outend}\n{record.seq[self.q_outstart:self.q_outend]}") #def __str__(self): # return f"{self.ccname}\t{self.q_outstart}\t{self.q_outend}" def fout(self): try: if self.__class__.fout.called: with open(self.output,"a") as fileout: fileout.write(self.out+"\n") except AttributeError: try: if os.path.isfile(self.output): os.remove(self.output) with open(self.output,"a") as fileout: #fileout.write("##gff-version 3\n") fileout.write(f"##gff-version 3\n{self.out}\n") self.__class__.fout.called = True self.__class__.fout(self) except TypeError: pass def diff_gene(query_genes, target_genes, dstart, dend, qstart, qend, query_db): # are the two genes the same? if args.extract: extract = args.extract else: extract = None if args.tollerance: tol = args.tollerance else: tol = 30 for otgene in target_genes: # gattr is a variable created to store the the annotation of the target gene. # It will be use to suggest a functional annotation gattr = str(otgene).split("\t")[-1] for ccgene in query_genes: algene = GeneComp(ccgene, qstart, qend, otgene, dstart, dend, extract, tol, gattr) if "new" in args.verbosity or "all" in args.verbosity: if algene.is_different(): algene_out = algene.is_different() algene_name = algene_out.split("\t")[0] algene_start = int(algene_out.split("\t")[3]) algene_end = int(algene_out.split("\t")[4]) #print(al_name, al_start, al_end) #print(list(target_db.region(region=(al_name, al_start, al_end), completely_within=True))) if not list(query_db.region(region=(algene_name, algene_start, algene_end), completely_within=False)): if args.output: algene.output=args.output algene.fout() else: print(algene.is_different()) if args.extract: algene.extract_fasta() if "shorter" in args.verbosity or "all" in args.verbosity: if algene.is_shorter(): if args.output: algene.output=args.output algene.fout() else: print(algene.is_shorter()) if args.extract: algene.extract_fasta() if "longer" in args.verbosity or "all" in args.verbosity: if algene.is_longer(): if args.output: algene.output=args.output algene.fout() else: print(algene.is_longer()) if args.extract: algene.extract_fasta() if "offset" in args.verbosity or "all" in args.verbosity: if algene.is_offset(): if args.output: algene.output=args.output algene.fout() else: print(algene.is_offset()) if args.extract: algene.extract_fasta() if "confirmed" in args.verbosity: if algene.is_equal(): if args.output: algene.output=args.output algene.fout() else: print(algene.is_equal()) if args.extract: algene.extract_fasta() def gff_gene_check(GffName, db_name, memory=0): # The IDs on the GFFs must be unique. Moreover, because only the gene # information are needed, all the other information must be removed # from the GFFs. tempgff="" for line in open(GffName): if line[0] != "#": if line.split()[2] == "gene": tempgff+=line else: tempgff+=line if memory: # Write the db in memory and return it as variable so it can be used # as subclass of _DBCreator dbout = gffutils.create_db(tempgff, ":memory:", from_string=True) return dbout else: gffutils.create_db(tempgff, db_name, from_string=True) def check_strand(start,leng,strand): if strand == "+": end = start+leng else: end = start-leng start,end = end,start return(start,end) def check_position(line, query_db,target_db): # check if there is a gene in the aligned area # lets' start with the coordinatescharacteristics elsp = line.split('\t') dname = elsp[1] dstart = int(elsp[2]) dlen = int(elsp[3]) dstrand = elsp[4] qname = elsp[6] qstart = int(elsp[7]) qlen = int(elsp[8]) qstrand = elsp[9] # check the strand, if - reverse the start and the end qstart,qend = check_strand(qstart,qlen,qstrand) dstart,dend = check_strand(dstart,dlen,dstrand) #counter d_counter = 0 # lists of gene within the coordinates target_genes = [ gene for gene in list(target_db.region(region=(dname, dstart, dend), completely_within=True))] query_genes = [ gene for gene in list(query_db.region(region=(qname, qstart, qend), completely_within=True))] if len(target_genes): # and len(query_genes): ###### # PER PROVARE HO TOLTO QUESTO, DA CONTROLLARE ###### #if len(target_genes) >= len(query_genes): # the number of genes in the aligned area must be bigger in the target (?) diff_gene(query_genes, target_genes, dstart, dend, qstart, qend, query_db) if d_counter>=1: return(d_counter) else: return(0) def main(args): fcounter = 0 # import the GFF library and create (has to be done only once) # the GFF databases target_db = query_db = None db_query_name=args.queryGff + "_db" db_target_name=args.targetGff + "_db" if args.force_database: # remove the database if required by user os.remove(db_query_name) os.remove(db_target_name) if args.use_query_database: # use the db passed by the user query_db = gffutils.FeatureDB(args.use_query_database) else: #create the db #gffutils.create_db(args.queryGff, db_query_name) qdbout = gff_gene_check(args.queryGff, db_query_name, args.memory) if args.memory: query_db = gffutils.FeatureDB(qdbout.dbfn) else: query_db = gffutils.FeatureDB(db_query_name) # except ValueError: # print("Please check your GFF file") # except: # raise Exception(f"It seems you already have a db called {db_query_name}. Use the -qu if you want to use it or delete the") if args.use_target_database: target_db = gffutils.FeatureDB(args.use_target_database) else: tdbout = gff_gene_check(args.targetGff, db_target_name, args.memory) if args.memory: target_db = gffutils.FeatureDB(tdbout.dbfn) else: target_db = gffutils.FeatureDB(db_target_name) # except ValueError: # print("Please check your GFF file")if "all" in args.verbosity: # except: # raise Exception(f"It seems you already have a db called {db_target_name}. Use the -du if you want to use it or delete the") # put the content of the DB in objects with open(args.aln) as maftab: for line in maftab: if line[0] != "#": fcounter += check_position(line, query_db, target_db) if __name__ == '__main__': parser = argparse.ArgumentParser(description = '''Tool to extract genes coordinates from a whole genome alignent. This script needs an alignement in TAB format and two gff files''') parser.add_argument('aln', help='alignment file in TAB format. The suggested way to obtain it is to run Last and\ than convert the file from MAF to TAB with maf-convert') parser.add_argument('queryGff', help='''Gff file of the query organism. The gene IDs in the GFF must be unique.''') parser.add_argument('targetGff', help='''Gff file of the "target" organism. The gene IDs in the GFF must be unique.''') parser.add_argument("-uq", "--use-query-database", help='''Use this parament if you already have a query gffutils formatted database or if it\'s not the first time you run this script''', type=str) parser.add_argument("-ut", "--use-target-database", help='''Use this parament if you already have a target gffutils formatted database or if it\'s not the first time you run this script''', type=str) parser.add_argument("-fd", "--force-database", help="delete old gffutils databases and create new ones", action='store_true') parser.add_argument("-m", "--memory", help='''create an in-memory database. This option can't be used with the other DB options. probably usefull in Galaxy integration''', action='store_true' ) parser.add_argument("-e", "--extract", help='''Extract the fasta sequence of the new suggested gene. It takes two argument: the fasta file of the genome and the name of the output file. This will slow down the process A LOT.''', nargs=2, type=str) #parser.add_argument("-es", "--extract_stout", help='Like -e but it will print the result in the standard output. FASTER than -e. It need the fasta file', type=str) parser.add_argument("-o", "--output", help='Name of the output file. Default output is stout', type=str) parser.add_argument("-t", "--tollerance", help='Interval, in nucleotide, within a gene is considered in the same position. Default 30', default=30, type=int) parser.add_argument("-v", "--verbosity", help='''Output options. If not specify the software shows only the genes that are in the exact position of the genes in the target. It\'s possible to show annotated genes that are in aligned regions but that have different lengths or in slightly different positions. It's possible to select multiple, space separated, values.''', choices=["all","shorter", "longer", "offset", "new", "confirmed"], nargs='*', default='new', type=str) #metavar=('all','shorter','longer','shifted','new'), #action='append', #choices=["all","shorter", "longer", "shifted", "new"], default="new") parser.add_argument('--version', action='version', version='0.1.0') args = parser.parse_args() main(args)