0
|
1 """
|
|
2 SnpEff datatypes
|
|
3 """
|
|
4 import os,os.path,re,sys,gzip,logging
|
|
5 import galaxy.datatypes.data
|
|
6 from galaxy.datatypes.data import Text
|
|
7 from galaxy.datatypes.metadata import MetadataElement
|
|
8
|
|
9 log = logging.getLogger(__name__)
|
|
10
|
|
class SnpEffDb(Text):
    """Class describing a SnpEff genome build.

    ``set_meta`` scans the dataset's extra files directory for SnpEff
    database files and records what it finds in the metadata elements
    declared below.
    """
    file_ext = "snpeffdb"
    MetadataElement(name="genome_version", default=None, desc="Genome Version", readonly=True, visible=True, no_value=None)
    MetadataElement(name="snpeff_version", default="SnpEff4.0", desc="SnpEff Version", readonly=True, visible=True, no_value=None)
    MetadataElement(name="regulation", default=[], desc="Regulation Names", readonly=True, visible=True, no_value=[], optional=True)
    MetadataElement(name="annotation", default=[], desc="Annotation Names", readonly=True, visible=True, no_value=[], optional=True)

    def __init__(self, **kwd):
        Text.__init__(self, **kwd)

    def getSnpeffVersionFromFile(self, path):
        """Return the SnpEff version named on the first line of a gzipped file.

        The SnpEff version line was added in SnpEff version 4.1, so files
        written by earlier versions yield ``None``.

        :param path: path of a gzip-compressed file.
        :returns: ``"SnpEff"`` joined with the ``major.minor`` number found on
            the first line (for example ``"SnpEff4.1"``), or ``None`` when the
            file cannot be read or the first line does not match.
        """
        snpeff_version = None
        try:
            fh = gzip.open(path, 'rb')
            try:
                # The version line, when present, fits well within 100 bytes.
                buf = fh.read(100)
            finally:
                fh.close()
            lines = buf.splitlines()
            if lines:
                # gzip yields bytes, so match with a bytes pattern.
                m = re.match(br'^(SnpEff)\s+(\d+\.\d+).*$', lines[0].strip())
                if m:
                    version = m.group(1) + m.group(2)
                    if not isinstance(version, str):
                        # Python 3: convert the matched bytes to a native str.
                        version = version.decode('ascii')
                    snpeff_version = version
        except Exception as e:
            # Best effort: an unreadable or non-gzip file simply has no version.
            log.debug("Could not read SnpEff version from %s: %s", path, e)
        return snpeff_version

    def set_meta(self, dataset, **kwd):
        """Populate SnpEff metadata from the dataset's extra files directory.

        Walks ``dataset.extra_files_path`` and:

        * for a file whose name starts with ``snpEffectPredictor``, stores the
          name of its containing directory as ``genome_version`` and, when the
          file carries a version line, stores it as ``snpeff_version``;
        * collects the names of ``regulation_<name>.bin`` files;
        * collects the known annotation files (``nextProt.bin``, ``motif.bin``)
          under their flag names without the leading dash.

        A short text summary of the findings is then written to
        ``dataset.file_name``. Nothing beyond the parent ``set_meta`` call
        happens when the extra files directory does not exist.

        :param dataset: the dataset whose metadata is being set.
        :param kwd: passed through to ``Text.set_meta``.
        """
        Text.set_meta(self, dataset, **kwd)
        data_dir = dataset.extra_files_path
        # search data_dir/genome_version for files
        regulation_pattern = r'regulation_(.+)\.bin'
        # annotation files that are included in snpEff by a flag
        annotations_dict = {'nextProt.bin': '-nextprot', 'motif.bin': '-motif'}
        regulations = []
        annotations = []
        genome_version = None
        snpeff_version = None
        # NOTE(review): the original indentation was lost; the metadata
        # assignment and summary write below are assumed to belong inside the
        # directory check - confirm against the intended behavior.
        if data_dir and os.path.isdir(data_dir):
            for root, dirs, files in os.walk(data_dir):
                for fname in files:
                    if fname.startswith('snpEffectPredictor'):
                        # if snpEffectPredictor.bin download succeeded
                        genome_version = os.path.basename(root)
                        dataset.metadata.genome_version = genome_version
                        # The predictor file may start with a version line;
                        # keep the metadata default when it does not.
                        found_version = self.getSnpeffVersionFromFile(os.path.join(root, fname))
                        if found_version:
                            snpeff_version = found_version
                            dataset.metadata.snpeff_version = snpeff_version
                    else:
                        m = re.match(regulation_pattern, fname)
                        if m:
                            regulations.append(m.group(1))
                        elif fname in annotations_dict:
                            annotations.append(annotations_dict[fname].lstrip('-'))
            dataset.metadata.regulation = regulations
            dataset.metadata.annotation = annotations
            try:
                with open(dataset.file_name, 'w') as fh:
                    # Parenthesised so the newline is applied to the fallback
                    # text as well as to the real value.
                    fh.write("%s\n" % (genome_version or 'Genome unknown'))
                    fh.write("%s\n" % (snpeff_version or 'SnpEff version unknown'))
                    if annotations:
                        fh.write("annotations: %s\n" % ','.join(annotations))
                    if regulations:
                        fh.write("regulations: %s\n" % ','.join(regulations))
            except Exception:
                # Writing the summary is best effort; metadata is already set.
                log.exception("Could not write SnpEff database summary")
|
|
77
|