Mercurial > repos > jjohnson > ensembl_variant_report
comparison ensemblref.py @ 0:9f4ea174ce3d draft
planemo upload for repository https://github.com/jj-umn/galaxytools/tree/master/ensembl_variant_report commit e6aa05bbbee3cc7d98f16354fc41c674f439ff1b-dirty
author | jjohnson |
---|---|
date | Thu, 14 Jun 2018 17:51:39 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9f4ea174ce3d |
---|---|
1 """ | |
2 with gtf and twobit | |
3 get_bed(transcript_id) | |
4 """ | |
5 from gtf_to_genes import gene, gene_utilities | |
6 from twobitreader import TwoBitFile | |
7 from Bio.Seq import reverse_complement, translate | |
8 import logging | |
9 logger = logging.getLogger("test") | |
10 | |
11 class EnsemblRef(object): | |
12 | |
13 def __init__(self,gtf_file,twobitfile,read_now=True): | |
14 self.gtf_file = gtf_file | |
15 self.twobitfile = twobitfile | |
16 self.twobit = TwoBitFile(self.twobitfile) | |
17 self.gene_dict = None | |
18 self.transcript_idx = None | |
19 self.name_idx = None | |
20 if read_now: | |
21 self.get_transcript_idx() | |
22 | |
23 | |
24 def get_gene_dict(self): | |
25 if self.gene_dict is None: | |
26 gene_structures = gene.t_parse_gtf('test') | |
27 self.gene_dict = gene_structures.get_genes(self.gtf_file,logger=logger) | |
28 return self.gene_dict | |
29 | |
30 | |
31 def get_transcript_idx(self): | |
32 if self.transcript_idx is None: | |
33 self.transcript_idx = gene_utilities.index_transcripts(self.get_gene_dict(),by_prot_id=False) | |
34 return self.transcript_idx | |
35 | |
36 | |
37 def get_name_idx(self): | |
38 if self.name_idx is None: | |
39 self.name_idx = dict() | |
40 for i,t in self.get_transcript_idx().items(): | |
41 for name in t.gene.names: | |
42 self.name_idx[name] = t.gene | |
43 for name in t.names: | |
44 self.name_idx[name] = t | |
45 if t.prot_id: | |
46 self.name_idx[t.prot_id] = t | |
47 return self.name_idx | |
48 | |
49 | |
50 def get_gtf_transcript(self,name): | |
51 idx = self.get_transcript_idx() | |
52 if name in idx: | |
53 return idx[name] | |
54 else: | |
55 nidx = self.get_name_idx() | |
56 if name in nidx: | |
57 return nidx[name] | |
58 return None | |
59 | |
60 | |
61 def transcript_is_coding(self,transcript_id): | |
62 tx = self.get_transcript_idx()[transcript_id] | |
63 return len(tx.start_codons) > 0 | |
64 | |
65 | |
66 def get_transcript_start_codon(self,transcript_id): | |
67 tx = self.get_transcript_idx()[transcript_id] | |
68 return tx.start_codons[0] if len(tx.start_codons) > 0 else None | |
69 | |
70 | |
71 def get_bed_line(self,transcript_id,score=0,itemRgb='0,0,0',coding=False): | |
72 tx = self.get_transcript_idx()[transcript_id] | |
73 chrom = tx.gene.contig | |
74 chromStart = tx.coding_beg if coding else tx.beg | |
75 chromEnd = tx.coding_end if coding else tx.end | |
76 name = transcript_id | |
77 strand = '+' if tx.gene.strand else '-' | |
78 thickStart = tx.coding_beg if tx.coding_beg else chromStart | |
79 thickEnd = tx.coding_end if tx.coding_end else chromEnd | |
80 exons = tx.get_coding_exons() if coding else tx.get_exons() | |
81 blockCount = len(exons) | |
82 if tx.gene.strand: | |
83 strand = '+' | |
84 blockSizes = [abs(e-s) for s,e in exons] | |
85 blockStarts = [s - chromStart for s,e in exons] | |
86 else: | |
87 strand = '-' | |
88 blockSizes = [abs(e-s) for s,e in reversed(exons)] | |
89 blockStarts = [s - chromStart for s,e in reversed(exons)] | |
90 blockSizes = ','.join([str(x) for x in blockSizes]) | |
91 blockStarts = ','.join([str(x) for x in blockStarts]) | |
92 return '%s\t%d\t%d\t%s\t%s\t%s\t%d\t%d\t%s\t%d\t%s\t%s' % (chrom,chromStart,chromEnd,name,score,strand,thickStart,thickEnd,itemRgb,blockCount,blockSizes,blockStarts) | |
93 | |
94 | |
95 def transcripts_in_range(self,chrom,startpos,endpos,strand=None): | |
96 spos = min(startpos,endpos) if endpos else startpos | |
97 epos = max(startpos,endpos) if endpos else startpos | |
98 transcripts = [] | |
99 for i,t in self.get_transcript_idx().items(): | |
100 if t.gene.contig == chrom and t.beg <= epos and spos <= t.end: | |
101 if strand and t.gene.strand != strand: | |
102 continue | |
103 transcripts.append(t) | |
104 return transcripts | |
105 | |
106 | |
107 def genes_in_range(self,chrom,startpos,endpos,strand=None,gene_types=None): | |
108 spos = min(startpos,endpos) if endpos else startpos | |
109 epos = max(startpos,endpos) if endpos else startpos | |
110 gene_dict = self.get_gene_dict() | |
111 gtypes = set(gene_types) & set(gene_dict.keys()) if gene_types else set(gene_dict.keys()) | |
112 genes = [] | |
113 for gt in gtypes: | |
114 for gene in gene_dict[gt]: | |
115 if gene.contig == chrom and gene.beg <= epos and spos <= gene.end: | |
116 if strand and gene.strand != strand: | |
117 continue | |
118 genes.append(gene) | |
119 return genes | |
120 | |
121 | |
122 def get_sequence(self,chrom,start,end): | |
123 if self.twobit: | |
124 if chrom in self.twobit: | |
125 return self.twobit[chrom][start:end] | |
126 contig = chrom[3:] if chrom.startswith('chr') else 'chr%s' % chrom | |
127 if contig in self.twobit: | |
128 return self.twobit[contig][start:end] | |
129 return None | |
130 | |
131 | |
132 def sequence_sizes(self): | |
133 return self.twobit.sequence_sizes() | |
134 | |
135 | |
136 def get_transcript_seq(self,transcript_id,coding=False): | |
137 tx = self.get_transcript_idx()[transcript_id] | |
138 chrom = tx.gene.contig | |
139 exonbnds = tx.get_coding_exons() if coding else tx.get_exons() | |
140 if tx.gene.strand: | |
141 seqs = [self.get_sequence(chrom,s,e) for s,e in exonbnds] | |
142 else: | |
143 seqs = [reverse_complement(self.get_sequence(chrom,s,e)) for s,e in exonbnds] | |
144 return ''.join(seqs) | |
145 | |
146 | |
147 def get_cdna(self,transcript_id): | |
148 return self.get_transcript_seq(transcript_id,coding=False) | |
149 | |
150 | |
151 def get_cds(self,transcript_id): | |
152 return self.get_transcript_seq(transcript_id,coding=True) | |
153 | |
154 | |
155 def genome_to_transcript_pos(self,transcript_id,genome_pos,coding=False): | |
156 tx = self.get_transcript_idx()[transcript_id] | |
157 if not tx.beg <= genome_pos < tx.end: | |
158 return None | |
159 exonbnds = tx.get_coding_exons() if coding else tx.get_exons() | |
160 cdna_pos = 0 | |
161 if tx.gene.strand: | |
162 for s,e in exonbnds: | |
163 if s <= genome_pos < e: | |
164 cdna_pos += genome_pos - s | |
165 break | |
166 else: | |
167 cdna_pos += e - s | |
168 else: | |
169 for s,e in exonbnds: | |
170 if s <= genome_pos < e: | |
171 cdna_pos += e - genome_pos - 1 | |
172 break | |
173 else: | |
174 cdna_pos += e - s | |
175 return cdna_pos | |
176 | |
177 | |
178 def genome_to_cdna_pos(self,transcript_id,genome_pos): | |
179 return self.genome_to_transcript_pos(transcript_id,genome_pos,coding=False) | |
180 | |
181 | |
182 def genome_to_cds_pos(self,transcript_id,genome_pos): | |
183 return self.genome_to_transcript_pos(transcript_id,genome_pos,coding=True) | |
184 | |
185 |