Mercurial > repos > fubar2 > toolfactory_gtn
comparison toolfactory/ToolFactory_tester.py @ 0:f288fab71d8b draft default tip
Uploaded
author | fubar2 |
---|---|
date | Mon, 26 Apr 2021 04:18:54 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:f288fab71d8b |
---|---|
1 # see https://github.com/fubar2/toolfactory | |
2 # | |
3 # copyright ross lazarus (ross stop lazarus at gmail stop com) May 2012 | |
4 # | |
5 # all rights reserved | |
6 # Licensed under the LGPL | |
7 # suggestions for improvement and bug fixes welcome at | |
8 # https://github.com/fubar2/toolfactory | |
9 # | |
10 # July 2020: BCC was fun and I feel like rip van winkle after 5 years. | |
11 # Decided to | |
12 # 1. Fix the toolfactory so it works - done for simplest case | |
13 # 2. Fix planemo so the toolfactory function works | |
14 # 3. Rewrite bits using galaxyxml functions where that makes sense - done | |
15 | |
16 import argparse | |
17 import copy | |
18 import os | |
19 import subprocess | |
20 import shutil | |
21 import sys | |
22 import tarfile | |
23 import tempfile | |
24 import time | |
25 | |
26 myversion = "V2.2 April 2021" | |
27 verbose = True | |
28 debug = True | |
29 toolFactoryURL = "https://github.com/fubar2/toolfactory" | |
30 | |
31 def timenow(): | |
32 """return current time as a string""" | |
33 return time.strftime("%d/%m/%Y %H:%M:%S", time.localtime(time.time())) | |
34 | |
35 class ToolTester(): | |
36 # requires highly insecure docker settings - like write to tool_conf.xml and to tools ! | |
37 # if in a container possibly not so courageous. | |
38 # Fine on your own laptop but security red flag for most production instances | |
39 # uncompress passed tar, run planemo and rebuild a new tarball with tests | |
40 | |
41 def __init__(self, args=None, in_tool_archive='/galaxy-central/tools/newtool/newtool_toolshed.gz', new_tool_archive=None): | |
42 self.args = args | |
43 self.new_tool_archive = new_tool_archive | |
44 assert tarfile.is_tarfile(in_tool_archive) | |
45 # this is not going to go well with arbitrary names. TODO introspect tool xml! | |
46 self.tooloutdir = "./tfout" | |
47 self.repdir = "./TF_run_report" | |
48 self.testdir = os.path.join(self.tooloutdir, "test-data") | |
49 if not os.path.exists(self.tooloutdir): | |
50 os.mkdir(self.tooloutdir) | |
51 if not os.path.exists(self.testdir): | |
52 os.mkdir(self.testdir) | |
53 if not os.path.exists(self.repdir): | |
54 os.mkdir(self.repdir) | |
55 tff = tarfile.open(in_tool_archive, "r:*") | |
56 flist = tff.getnames() | |
57 ourdir = os.path.commonpath(flist) # eg pyrevpos | |
58 self.tool_name = ourdir | |
59 ourxmls = [x for x in flist if x.lower().endswith('.xml') and os.path.split(x)[0] == ourdir] | |
60 assert len(ourxmls) > 0 | |
61 self.ourxmls = ourxmls # [os.path.join(tool_path,x) for x in ourxmls] | |
62 res = tff.extractall() | |
63 tff.close() | |
64 self.update_tests(ourdir) | |
65 self.makeTool() | |
66 self.moveRunOutputs() | |
67 self.makeToolTar() | |
68 | |
69 def call_planemo(self,xmlpath,ourdir): | |
70 penv = os.environ | |
71 penv['HOME'] = '/home/ross/galaxy-release_21.01' | |
72 toolfile = os.path.split(xmlpath)[1] | |
73 tool_name = self.tool_name | |
74 tool_test_output = f"{tool_name}_planemo_test_report.html" | |
75 cll = [ | |
76 "planemo", | |
77 "test", | |
78 "--test_output", | |
79 os.path.abspath(tool_test_output), | |
80 "--galaxy_root", | |
81 self.args.galaxy_root, | |
82 "--update_test_data", | |
83 os.path.abspath(xmlpath), | |
84 ] | |
85 print(cll) | |
86 p = subprocess.run( | |
87 cll, | |
88 capture_output=True, | |
89 encoding='utf8', | |
90 env = penv, | |
91 shell=False, | |
92 ) | |
93 return p | |
94 | |
95 def makeTool(self): | |
96 """write xmls and input samples into place""" | |
97 for xreal in self.ourxmls: | |
98 x = os.path.split(xreal)[1] | |
99 xout = os.path.join(self.tooloutdir,x) | |
100 shutil.copyfile(xreal, xout) | |
101 # for p in self.infiles: | |
102 # pth = p["name"] | |
103 # dest = os.path.join(self.testdir, "%s_sample" % p["infilename"]) | |
104 # shutil.copyfile(pth, dest) | |
105 # dest = os.path.join(self.repdir, "%s_sample" % p["infilename"]) | |
106 # shutil.copyfile(pth, dest) | |
107 | |
108 def makeToolTar(self): | |
109 """move outputs into test-data and prepare the tarball""" | |
110 excludeme = "_planemo_test_report.html" | |
111 | |
112 def exclude_function(tarinfo): | |
113 filename = tarinfo.name | |
114 return None if filename.endswith(excludeme) else tarinfo | |
115 | |
116 newtar = 'new_%s_toolshed.gz' % self.tool_name | |
117 ttf = tarfile.open(newtar, "w:gz") | |
118 ttf.add(name=self.tooloutdir, | |
119 arcname=self.tool_name, | |
120 filter=exclude_function) | |
121 ttf.close() | |
122 shutil.copyfile(newtar, self.new_tool_archive) | |
123 | |
124 def moveRunOutputs(self): | |
125 """need to move planemo or run outputs into toolfactory collection""" | |
126 with os.scandir(self.tooloutdir) as outs: | |
127 for entry in outs: | |
128 if not entry.is_file(): | |
129 continue | |
130 if "." in entry.name: | |
131 _, ext = os.path.splitext(entry.name) | |
132 if ext in [".tgz", ".json"]: | |
133 continue | |
134 if ext in [".yml", ".xml", ".yaml"]: | |
135 newname = f"{entry.name.replace('.','_')}.txt" | |
136 else: | |
137 newname = entry.name | |
138 else: | |
139 newname = f"{entry.name}.txt" | |
140 dest = os.path.join(self.repdir, newname) | |
141 src = os.path.join(self.tooloutdir, entry.name) | |
142 shutil.copyfile(src, dest) | |
143 with os.scandir('.') as outs: | |
144 for entry in outs: | |
145 if not entry.is_file(): | |
146 continue | |
147 if "." in entry.name: | |
148 _, ext = os.path.splitext(entry.name) | |
149 if ext in [".yml", ".xml", ".yaml"]: | |
150 newname = f"{entry.name.replace('.','_')}.txt" | |
151 else: | |
152 newname = entry.name | |
153 else: | |
154 newname = f"{entry.name}.txt" | |
155 dest = os.path.join(self.repdir, newname) | |
156 src =entry.name | |
157 shutil.copyfile(src, dest) | |
158 if True or self.args.include_tests: | |
159 with os.scandir(self.testdir) as outs: | |
160 for entry in outs: | |
161 if (not entry.is_file()) or entry.name.endswith( | |
162 "_planemo_test_report.html" | |
163 ): | |
164 continue | |
165 if "." in entry.name: | |
166 _, ext = os.path.splitext(entry.name) | |
167 if ext in [".tgz", ".json"]: | |
168 continue | |
169 if ext in [".yml", ".xml", ".yaml"]: | |
170 newname = f"{entry.name.replace('.','_')}.txt" | |
171 else: | |
172 newname = entry.name | |
173 else: | |
174 newname = f"{entry.name}.txt" | |
175 dest = os.path.join(self.repdir, newname) | |
176 src = os.path.join(self.testdir, entry.name) | |
177 shutil.copyfile(src, dest) | |
178 | |
179 | |
180 def update_tests(self,ourdir): | |
181 for xmlf in self.ourxmls: | |
182 capture = self.call_planemo(xmlf,ourdir) | |
183 #sys.stderr.write('%s, stdout=%s, stderr=%s' % (xmlf, capture.stdout, capture.stdout)) | |
184 print('%s, stdout=%s, stderr=%s' % (capture.stdout, capture.stdout,xmlf)) | |
185 | |
186 def main(): | |
187 """ | |
188 This is a Galaxy wrapper. | |
189 It expects to be called by a special purpose tool.xml | |
190 | |
191 """ | |
192 parser = argparse.ArgumentParser() | |
193 a = parser.add_argument | |
194 a("--in_tool_archive", default=None) | |
195 a("--new_tested_tool_archive", default=None) | |
196 a("--galaxy_root", default="/home/ross/galaxy-release_21.01/") | |
197 args = parser.parse_args() | |
198 print('Hello from',os.getcwd()) | |
199 tt = ToolTester(args=args, in_tool_archive=args.in_tool_archive, new_tool_archive=args.new_tested_tool_archive) | |
200 | |
201 if __name__ == "__main__": | |
202 main() |