Mercurial > repos > fubar > toolfactory_gtn
diff toolfactory/ToolFactory.py @ 6:efefe43f23c8 draft default tip
Uploaded
author | fubar |
---|---|
date | Fri, 30 Apr 2021 02:10:32 +0000 (2021-04-30) |
parents | e2c8c2fa192d |
children |
line wrap: on
line diff
--- a/toolfactory/ToolFactory.py Tue Apr 27 23:33:49 2021 +0000 +++ b/toolfactory/ToolFactory.py Fri Apr 30 02:10:32 2021 +0000 @@ -28,6 +28,7 @@ import tarfile import tempfile import time +import urllib from bioblend import ConnectionError from bioblend import galaxy @@ -70,13 +71,14 @@ citation_tuples.append(("bibtex", citation[len("bibtex") :].strip())) return citation_tuples -class ToolConfUpdater(): + +class Tool_Conf_Updater(): # update config/tool_conf.xml with a new tool unpacked in /tools # requires highly insecure docker settings - like write to tool_conf.xml and to tools ! # if in a container possibly not so courageous. # Fine on your own laptop but security red flag for most production instances - def __init__(self, args, tool_conf_path, new_tool_archive_path, new_tool_name, tool_dir): + def __init__(self, args, tool_conf_path, new_tool_archive_path, new_tool_name, tool_dir): self.args = args self.tool_conf_path = os.path.join(args.galaxy_root,tool_conf_path) self.tool_dir = os.path.join(args.galaxy_root, tool_dir) @@ -138,7 +140,7 @@ if False and self.args.packages and self.args.packages > '': self.install_deps() -class ScriptRunner: +class Tool_Factory: """Wrapper for an arbitrary script uses galaxyxml @@ -194,9 +196,7 @@ ) self.args = args self.cleanuppar() - self.lastclredirect = None self.lastxclredirect = None - self.cl = [] self.xmlcl = [] self.is_positional = self.args.parampass == "positional" if self.args.sysexe: @@ -209,7 +209,6 @@ self.executeme = [self.args.packages.split(",")[0].split(":")[0].strip(), ] else: self.executeme = None - aCL = self.cl.append aXCL = self.xmlcl.append assert args.parampass in [ "0", @@ -252,13 +251,10 @@ self.test_override = None if self.args.script_path: for ex in self.executeme: - aCL(ex) aXCL(ex) - aCL(self.sfile) aXCL("$runme") else: for ex in self.executeme: - aCL(ex) aXCL(ex) if self.args.parampass == "0": @@ -273,35 +269,23 @@ def clsimple(self): """no parameters or repeats - uses < and > for i/o""" - aCL = self.cl.append aXCL = self.xmlcl.append if len(self.infiles) > 0: - aCL("<") - aCL(self.infiles[0]["infilename"]) aXCL("<") aXCL("$%s" % self.infiles[0]["infilename"]) if len(self.outfiles) > 0: - aCL(">") - aCL(self.outfiles[0]["name"]) aXCL(">") aXCL("$%s" % self.outfiles[0]["name"]) if self.args.cl_user_suffix: # DIY CL end clp = shlex.split(self.args.cl_user_suffix) for c in clp: - aCL(c) aXCL(c) def prepargp(self): - clsuffix = [] xclsuffix = [] for i, p in enumerate(self.infiles): nam = p["infilename"] if p["origCL"].strip().upper() == "STDIN": - appendme = [ - nam, - nam, - "< %s" % nam, - ] xappendme = [ nam, nam, @@ -312,16 +296,12 @@ over = "" if rep: over = f'#for $rep in $R_{nam}:\n--{nam} "$rep.{nam}"\n#end for' - appendme = [p["CL"], p["CL"], ""] xappendme = [p["CL"], "$%s" % p["CL"], over] - clsuffix.append(appendme) xclsuffix.append(xappendme) for i, p in enumerate(self.outfiles): if p["origCL"].strip().upper() == "STDOUT": - self.lastclredirect = [">", p["name"]] self.lastxclredirect = [">", "$%s" % p["name"]] else: - clsuffix.append([p["name"], p["name"], ""]) xclsuffix.append([p["name"], "$%s" % p["name"], ""]) for p in self.addpar: nam = p["name"] @@ -330,40 +310,27 @@ over = f'#for $rep in $R_{nam}:\n--{nam} "$rep.{nam}"\n#end for' else: over = p["override"] - clsuffix.append([p["CL"], nam, over]) xclsuffix.append([p["CL"], '"$%s"' % nam, over]) for p in self.selpar: - clsuffix.append([p["CL"], p["name"], p["override"]]) xclsuffix.append([p["CL"], '"$%s"' % p["name"], p["override"]]) self.xclsuffix = xclsuffix - self.clsuffix = clsuffix def prepclpos(self): - clsuffix = [] xclsuffix = [] for i, p in enumerate(self.infiles): if p["origCL"].strip().upper() == "STDIN": - appendme = [ - "999", - p["infilename"], - "< $%s" % p["infilename"], - ] xappendme = [ "999", p["infilename"], "< $%s" % p["infilename"], ] else: - appendme = [p["CL"], p["infilename"], ""] xappendme = [p["CL"], "$%s" % p["infilename"], ""] - clsuffix.append(appendme) xclsuffix.append(xappendme) for i, p in enumerate(self.outfiles): if p["origCL"].strip().upper() == "STDOUT": - self.lastclredirect = [">", p["name"]] self.lastxclredirect = [">", "$%s" % p["name"]] else: - clsuffix.append([p["CL"], p["name"], ""]) xclsuffix.append([p["CL"], "$%s" % p["name"], ""]) for p in self.addpar: nam = p["name"] @@ -371,15 +338,11 @@ if rep: print(f'### warning. Repeats for {nam} ignored - not permitted in positional parameter command lines!') over = p["override"] - clsuffix.append([p["CL"], nam, over]) xclsuffix.append([p["CL"], '"$%s"' % nam, over]) for p in self.selpar: - clsuffix.append([p["CL"], p["name"], p["override"]]) xclsuffix.append([p["CL"], '"$%s"' % p["name"], p["override"]]) - clsuffix.sort() xclsuffix.sort() self.xclsuffix = xclsuffix - self.clsuffix = clsuffix def prepScript(self): rx = open(self.args.script_path, "r").readlines() @@ -443,12 +406,6 @@ def clpositional(self): # inputs in order then params - aCL = self.cl.append - for (k, v, koverride) in self.clsuffix: - if " " in v: - aCL("%s" % v) - else: - aCL(v) aXCL = self.xmlcl.append for (k, v, koverride) in self.xclsuffix: aXCL(v) @@ -458,13 +415,11 @@ if self.args.cl_user_suffix: # DIY CL end clp = shlex.split(self.args.cl_user_suffix) for c in clp: - aCL(c) aXCL(c) def clargparse(self): """argparse style""" - aCL = self.cl.append aXCL = self.xmlcl.append # inputs then params in argparse named form @@ -479,22 +434,12 @@ k = "--%s" % k aXCL(k) aXCL(v) - for (k, v, koverride) in self.clsuffix: - if koverride > "": - k = koverride - elif len(k.strip()) == 1: - k = "-%s" % k - else: - k = "--%s" % k - aCL(k) - aCL(v) if self.lastxclredirect: aXCL(self.lastxclredirect[0]) aXCL(self.lastxclredirect[1]) if self.args.cl_user_suffix: # DIY CL end clp = shlex.split(self.args.cl_user_suffix) for c in clp: - aCL(c) aXCL(c) def getNdash(self, newname): @@ -857,80 +802,6 @@ xf.close() # ready for the tarball - def run(self): #noqa - """ - generate test outputs by running a command line - won't work if command or test override in play - planemo is the - easiest way to generate test outputs for that case so is - automagically selected - """ - scl = " ".join(self.cl) - err = None - logname = f"{self.tool_name}_runner_log" - if self.args.parampass != "0": - if self.lastclredirect: - logf = open(self.lastclredirect[1], "wb") # is name of an output file - else: - logf = open(logname,'w') - logf.write("No dependencies so sending CL = '%s' to the fast direct runner instead of planemo to generate tests" % scl) - subp = subprocess.run( - self.cl, shell=False, stdout=logf, stderr=logf - ) - logf.close() - retval = subp.returncode - else: # work around special case - stdin and write to stdout - if len(self.infiles) > 0: - sti = open(self.infiles[0]["name"], "rb") - else: - sti = sys.stdin - if len(self.outfiles) > 0: - sto = open(self.outfiles[0]["name"], "wb") - else: - sto = sys.stdout - subp = subprocess.run( - self.cl, shell=False, stdout=sto, stdin=sti - ) - retval = subp.returncode - sto.close() - sti.close() - if retval != 0 and err: # problem - sys.stderr.write(err) - for p in self.outfiles: - oname = p["name"] - tdest = os.path.join(self.testdir, "%s_sample" % oname) - if not os.path.isfile(tdest): - if os.path.isfile(oname): - shutil.copyfile(oname, tdest) - dest = os.path.join(self.repdir, "%s.sample.%s" % (oname,p['format'])) - shutil.copyfile(oname, dest) - else: - if report_fail: - tout.write( - "###Tool may have failed - output file %s not found in testdir after planemo run %s." - % (oname, self.testdir) - ) - for p in self.infiles: - pth = p["name"] - dest = os.path.join(self.testdir, "%s_sample" % p["infilename"]) - shutil.copyfile(pth, dest) - dest = os.path.join(self.repdir, "%s_sample.%s" % (p["infilename"],p["format"])) - shutil.copyfile(pth, dest) - with os.scandir('.') as outs: - for entry in outs: - newname = entry.name - if not entry.is_file() or entry.name.endswith('_sample'): - continue - if not (entry.name.endswith('.html') or entry.name.endswith('.gz') or entry.name.endswith(".tgz")): - fname, ext = os.path.splitext(entry.name) - if len(ext) > 1: - newname = f"{fname}_{ext[1:]}.txt" - else: - newname = f"{fname}.txt" - dest = os.path.join(self.repdir, newname) - src = entry.name - shutil.copyfile(src, dest) - return retval - def writeShedyml(self): """for planemo""" yuser = self.args.user_email.split("@")[0] @@ -1068,6 +939,7 @@ a("--galaxy_venv", default="/galaxy_venv") a("--collection", action="append", default=[]) a("--include_tests", default=False, action="store_true") + a("--admin_only", default=False, action="store_true") a("--install", default=False, action="store_true") a("--run_test", default=False, action="store_true") a("--local_tools", default='tools') # relative to $__root_dir__ @@ -1079,30 +951,20 @@ a("--toolshed_api_key", default="fakekey") a("--galaxy_api_key", default="8993d65865e6d6d1773c2c34a1cc207d") args = parser.parse_args() - assert not args.bad_user, ( - 'UNAUTHORISED: %s is NOT authorized to use this tool until Galaxy \ + if args.admin_only: + assert not args.bad_user, ( + 'UNAUTHORISED: %s is NOT authorized to use this tool until Galaxy \ admin adds %s to "admin_users" in the galaxy.yml Galaxy configuration file' - % (args.bad_user, args.bad_user) + % (args.bad_user, args.bad_user) ) assert args.tool_name, "## Tool Factory expects a tool name - eg --tool_name=DESeq" - assert ( - args.sysexe or args.packages - ), "## Tool Factory wrapper expects an interpreter \ -or an executable package in --sysexe or --packages" - print('Hello from',os.getcwd()) - r = ScriptRunner(args) + r = Tool_Factory(args) r.writeShedyml() r.makeTool() r.makeToolTar() - if args.run_test: - if not args.packages or args.packages.strip() == "bash": - r.run() - r.makeToolTar() - else: - tt = ToolTester(report_dir=r.repdir, in_tool_archive=r.newtarpath, new_tool_archive=r.args.new_tool, galaxy_root=args.galaxy_root, include_tests=False) if args.install: #try: - tcu = ToolConfUpdater(args=args, tool_dir=args.local_tools, + tcu = Tool_Conf_Updater(args=args, tool_dir=args.local_tools, new_tool_archive_path=r.newtarpath, tool_conf_path=args.tool_conf_path, new_tool_name=r.tool_name) #except Exception: