Mercurial > repos > fubar > weblogo3
comparison weblogo3-91fa0a8ce39d/rgweblogo/rgWebLogo3.py @ 1:a76ecde597e1 draft
Uploaded
author | fubar |
---|---|
date | Wed, 04 Dec 2013 19:58:06 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
0:cec2527697f6 | 1:a76ecde597e1 |
---|---|
1 """ | |
2 # modified june 2 ross lazarus to add units option at Assaf Gordon's suggestion | |
3 # rgWebLogo3.py | |
4 # wrapper to check that all fasta files are same length | |
5 | |
6 """ | |
7 import optparse, os, sys, subprocess, tempfile | |
8 | |
9 WEBLOGO = 'weblogo' # executable name for weblogo3 - confusing isn't it? | |
10 | |
class WL3:
    """
    Simple wrapper around the weblogo3 command line tool.

    Checks that all sequences in the fasta input have identical lengths,
    then builds and runs the weblogo command line from the parsed options.
    """
    FASTASTARTSYM = '>'
    badseq = '## error - sequences in file %s are not all the same length - cannot proceed. Please read the tool documentation carefully'

    def __init__(self, opts=None):
        """
        opts: parsed options object. This class reads opts.input,
        opts.output, opts.logoname, opts.outformat, opts.size, opts.lower,
        opts.upper, opts.colours and opts.units.
        """
        assert opts is not None, 'WL3 class needs opts passed in - got None'
        self.opts = opts
        # open() works on both Python 2 and 3; file() is Python 2 only
        self.fastaf = open(self.opts.input, 'r')
        self.clparams = {}

    def _fail(self, *lines):
        """ write each message line to stderr and exit with status 1
        """
        for line in lines:
            sys.stderr.write('%s\n' % line)
        sys.exit(1)

    def whereis(self, program):
        """ return the first non-directory match for program on PATH, or None
        """
        # os.pathsep rather than a hard-coded ':' so this is not posix-only
        for path in os.environ.get('PATH', '').split(os.pathsep):
            candidate = os.path.join(path, program)
            if os.path.exists(candidate) and not os.path.isdir(candidate):
                return candidate
        return None

    def runCL(self):
        """ construct and run a command line from self.clparams

        Returns a one-record log string describing the command, its exit
        status and anything it wrote to stdout/stderr. Exits with status 1
        if the weblogo executable cannot be found, cannot be started, or
        returns a non-zero status.
        """
        wl = self.whereis(WEBLOGO)
        if not wl:
            self._fail(
                '## rgWebLogo3.py error - cannot locate the weblogo binary %s on the current path' % WEBLOGO,
                '## Please ensure it is installed and working from http://code.google.com/p/weblogo')
        # Argument list + shell=False: option values (eg the logo title) reach
        # weblogo verbatim, with no shell quoting or injection problems.
        cll = [wl]
        for flag, value in self.clparams.items():
            cll += [flag, str(value)]
        cl = ' '.join(cll)  # for log messages only
        try:
            # a pipe replaces the old temporary log file, whose mkstemp
            # descriptor was never closed
            process = subprocess.Popen(cll, shell=False, stdout=subprocess.PIPE,
                                       stderr=subprocess.STDOUT, universal_newlines=True)
            tlogs, _ = process.communicate()
            rval = process.returncode
        except OSError as e:
            self._fail(
                '## rgWebLogo3.py error - unable to execute %s: %s' % (cl, e),
                '## Please ensure %s is correctly installed and working on the command line -see http://code.google.com/p/weblogo' % WEBLOGO)
        if len(tlogs) > 1:
            s = '## executing %s returned status %d and log (stdout/stderr) records: \n%s\n' % (cl, rval, tlogs)
        else:
            s = '## executing %s returned status %d. Nothing appeared on stderr/stdout\n' % (cl, rval)
        if rval != 0:
            errs = [
                '## rgWebLogo3.py error - executing %s returned error code %d' % (cl, rval),
                '## This may be a data problem or a tool dependency (%s) installation problem' % WEBLOGO,
                '## Please ensure %s is correctly installed and working on the command line -see http://code.google.com/p/weblogo' % WEBLOGO]
            if len(tlogs) > 1:
                # show what weblogo itself reported instead of discarding it
                errs.append('## log (stdout/stderr) records: \n%s' % tlogs)
            self._fail(*errs)
        return s

    def iter_fasta(self):
        """
        generator yielding (sequence name, sequence length) for each fasta
        record read from self.fastaf

        Exits with status 1 if the first row is not a header row.
        """
        seqname = None
        seqlen = 0
        for row in self.fastaf:
            if row.startswith(self.FASTASTARTSYM):
                if seqname is not None:  # finish the previous sequence
                    yield (seqname, seqlen)
                seqname = row[1:].strip()
                seqlen = 0
            elif seqname is None:  # sequence row before any header row
                self._fail('Invalid fasta file %s - does not start with %s - please read the tool documentation carefully' % (self.opts.input, self.FASTASTARTSYM))
            else:  # sequence row
                # only the length is needed, so do not keep the residues
                seqlen += len(row.strip())
        if seqname is not None:  # last one
            yield (seqname, seqlen)

    def fcheck(self):
        """ are all fasta sequence same length?
        might be mongo big

        Returns a one line summary string. Exits with status 1 if the
        lengths differ or the file holds no sequences at all.
        """
        flen = None
        nseqs = 0
        for seqname, seqlen in self.iter_fasta():
            if flen is None:
                flen = seqlen
            elif seqlen != flen:
                self._fail(self.badseq % self.opts.input)
            nseqs += 1
        if nseqs == 0:
            self._fail('Invalid fasta file %s - contains no sequences - please read the tool documentation carefully' % self.opts.input)
        # nseqs is a true count; the last enumerate index would be one short
        return '# weblogo input %s has %d sequences all of length %d' % (self.opts.input, nseqs, flen)

    def run(self):
        """ check the input, fill in self.clparams from self.opts and run weblogo

        Returns (check summary string, command log string).
        """
        try:
            check = self.fcheck()
        finally:
            self.fastaf.close()  # input handle is not needed once checked
        self.clparams['-f'] = self.opts.input
        self.clparams['-o'] = self.opts.output
        # no extra quoting needed - runCL passes each value as its own argument
        self.clparams['-t'] = self.opts.logoname
        self.clparams['-F'] = self.opts.outformat
        optional = (
            ('-s', self.opts.size),
            ('-l', self.opts.lower),
            ('-u', self.opts.upper),
            ('-c', self.opts.colours),
            ('-U', self.opts.units),
        )
        for flag, value in optional:
            if value is not None:
                self.clparams[flag] = value
        s = self.runCL()
        return check, s
129 | |
130 | |
if __name__ == '__main__':
    # Command line entry point: parse the options, validate the fasta input
    # path, run the length check and weblogo, then report the check summary.
    #
    # called as
    # <command interpreter="python">
    #     rgWebLogo3.py --outformat $outformat -s $size -i $input -o $output -t "$logoname" -c "$colours"
    # #if $range.mode == 'part'
    #     -l "$range.seqstart" -u "$range.seqend"
    # #end if
    # </command>
    op = optparse.OptionParser()
    op.add_option('-i', '--input', default=None)
    op.add_option('-F', '--outformat', default='png')
    op.add_option('-s', '--size', default=None)
    op.add_option('-o', '--output', default='rgWebLogo3')
    op.add_option('-t', '--logoname', default='rgWebLogo3')
    op.add_option('-c', '--colours', default=None)
    op.add_option('-l', '--lower', default=None)
    op.add_option('-u', '--upper', default=None)
    op.add_option('-U', '--units', default=None)
    opts, args = op.parse_args()
    # op.error() rather than assert: asserts vanish under python -O and give a
    # traceback instead of a usage message
    if opts.input is None:
        op.error('weblogo3 needs a -i parameter with a fasta input file - cannot open')
    if not os.path.isfile(opts.input):
        op.error('weblogo3 needs a valid fasta input file - cannot open %s' % opts.input)
    w = WL3(opts)
    checks, s = w.run()
    sys.stdout.write('%s\n' % checks)  # for info