Mercurial > repos > fubar > tool_factory_2
diff toolfactory/rgToolFactory2.py @ 30:6f48315c32c1 draft
Uploaded
author | fubar |
---|---|
date | Fri, 07 Aug 2020 07:54:23 -0400 |
parents | |
children | 4d578c8c1613 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/toolfactory/rgToolFactory2.py Fri Aug 07 07:54:23 2020 -0400 @@ -0,0 +1,610 @@ +# rgToolFactory.py +# see https://github.com/fubar2/toolfactory +# +# copyright ross lazarus (ross stop lazarus at gmail stop com) May 2012 +# +# all rights reserved +# Licensed under the LGPL +# suggestions for improvement and bug fixes welcome at https://github.com/fubar2/toolfactory +# +# July 2020: BCC was fun and I feel like rip van winkle after 5 years. +# Decided to +# 1. Fix the toolfactory so it works - done for simplest case +# 2. Fix planemo so the toolfactory function works +# 3. Rewrite bits using galaxyxml functions where that makes sense - done +# +# removed all the old complications including making the new tool use this same script +# galaxyxml now generates the tool xml https://github.com/hexylena/galaxyxml +# No support for automatic HTML file creation from arbitrary outputs +# TODO: add option to run that code as a post execution hook +# TODO: add additional history input parameters - currently only one + +import sys +import subprocess +import shutil +import os +import time +import tempfile +import argparse +import tarfile +import re +import galaxyxml.tool as gxt +import galaxyxml.tool.parameters as gxtp +import logging + + +progname = os.path.split(sys.argv[0])[1] +myversion = 'V2.1 July 2020' +verbose = True +debug = True +toolFactoryURL = 'https://github.com/fubar2/toolfactory' +ourdelim = '~~~' + +# --input_files="$input_files~~~$CL~~~$input_formats~~~$input_label~~~$input_help" +IPATHPOS = 0 +ICLPOS = 1 +IFMTPOS = 2 +ILABPOS = 3 +IHELPOS = 4 +IOCLPOS = 5 +# --output_files "$otab.history_name~~~$otab.history_format~~~$otab.CL +ONAMEPOS = 0 +OFMTPOS = 1 +OCLPOS = 2 +OOCLPOS = 3 + +#--additional_parameters="$i.param_name~~~$i.param_value~~~$i.param_label~~~$i.param_help~~~$i.param_type~~~$i.CL" +ANAMEPOS = 0 +AVALPOS = 1 +ALABPOS = 2 +AHELPPOS = 3 +ATYPEPOS = 4 +ACLPOS = 5 +AOCLPOS = 6 + +def timenow(): + """return current time as a string + """ + return time.strftime('%d/%m/%Y %H:%M:%S', time.localtime(time.time())) + + +def quote_non_numeric(s): + """return a prequoted string for non-numerics + useful for perl and Rscript parameter passing? + """ + try: + _ = float(s) + return s + except ValueError: + return '"%s"' % s + + +html_escape_table = { + "&": "&", + ">": ">", + "<": "<", + "$": r"\$" +} + + +def html_escape(text): + """Produce entities within text.""" + return "".join(html_escape_table.get(c, c) for c in text) + + +def html_unescape(text): + """Revert entities within text. Multiple character targets so use replace""" + t = text.replace('&', '&') + t = t.replace('>', '>') + t = t.replace('<', '<') + t = t.replace('\\$', '$') + return t + + +def parse_citations(citations_text): + """ + """ + citations = [c for c in citations_text.split("**ENTRY**") if c.strip()] + citation_tuples = [] + for citation in citations: + if citation.startswith("doi"): + citation_tuples.append(("doi", citation[len("doi"):].strip())) + else: + citation_tuples.append( + ("bibtex", citation[len("bibtex"):].strip())) + return citation_tuples + + +class ScriptRunner: + """Wrapper for an arbitrary script + uses galaxyxml + + """ + + + def __init__(self, args=None): + """ + prepare command line cl for running the tool here + and prepare elements needed for galaxyxml tool generation + """ + + self.infiles = [x.split(ourdelim) for x in args.input_files] + self.outfiles = [x.split(ourdelim) for x in args.output_files] + self.addpar = [x.split(ourdelim) for x in args.additional_parameters] + self.args = args + self.cleanuppar() + self.lastclredirect = None + self.lastxclredirect = None + self.cl = [] + self.xmlcl = [] + aCL = self.cl.append + assert args.parampass in ['0','argparse','positional'],'Parameter passing in args.parampass must be "0","positional" or "argparse"' + self.tool_name = re.sub('[^a-zA-Z0-9_]+', '', args.tool_name) + self.tool_id = self.tool_name + self.xmlfile = '%s.xml' % self.tool_name + if self.args.runmode == "Executable" or self.args.runmode == "system": # binary - no need + aCL(self.args.exe_package) # this little CL will just run + else: + rx = open(self.args.script_path, 'r').readlines() + rx = [x.rstrip() for x in rx ] + rxcheck = [x.strip() for x in rx if x.strip() > ''] + assert len(rxcheck) > 0,"Supplied script is empty. Cannot run" + self.script = '\n'.join(rx) + fhandle, self.sfile = tempfile.mkstemp( + prefix=self.tool_name, suffix=".%s" % (args.interpreter_name)) + tscript = open(self.sfile, 'w') + tscript.write(self.script) + tscript.close() + self.indentedScript = " %s" % '\n'.join( + [' %s' % html_escape(x) for x in rx]) + self.escapedScript = "%s" % '\n'.join( + [' %s' % html_escape(x) for x in rx]) + art = '%s.%s' % (self.tool_name, args.interpreter_name) + artifact = open(art, 'wb') + artifact.write(bytes(self.script, "utf8")) + artifact.close() + aCL(self.args.interpreter_name) + aCL(self.sfile) + self.elog = "%s_error_log.txt" % self.tool_name + self.tlog = "%s_runner_log.txt" % self.tool_name + + if self.args.parampass == '0': + self.clsimple() + else: + clsuffix = [] + xclsuffix = [] + for i, p in enumerate(self.infiles): + appendme = [p[IOCLPOS], p[ICLPOS], p[IPATHPOS]] + clsuffix.append(appendme) + xclsuffix.append([p[IOCLPOS],p[ICLPOS],'$%s' % p[ICLPOS]]) + #print('##infile i=%d, appendme=%s' % (i,appendme)) + for i, p in enumerate(self.outfiles): + if p[OOCLPOS] == "STDOUT": + self.lastclredirect = ['>',p[ONAMEPOS]] + self.lastxclredirect = ['>','$%s' % p[OCLPOS]] + #print('##outfiles i=%d lastclredirect = %s' % (i,self.lastclredirect)) + else: + appendme = [p[OOCLPOS], p[OCLPOS],p[ONAMEPOS]] + clsuffix.append(appendme) + xclsuffix.append([p[OOCLPOS], p[OCLPOS],'$%s' % p[ONAMEPOS]]) + #print('##outfiles i=%d' % i,'appendme',appendme) + for p in self.addpar: + appendme = [p[AOCLPOS], p[ACLPOS], p[AVALPOS]] + clsuffix.append(appendme) + xclsuffix.append([p[AOCLPOS], p[ACLPOS], '"$%s"' % p[ANAMEPOS]]) + #print('##adpar %d' % i,'appendme=',appendme) + clsuffix.sort() + xclsuffix.sort() + self.xclsuffix = xclsuffix + self.clsuffix = clsuffix + if self.args.parampass == 'positional': + self.clpositional() + else: + self.clargparse() + + def cleanuppar(self): + """ positional parameters are complicated by their numeric ordinal""" + for i,p in enumerate(self.infiles): + if self.args.parampass == 'positional': + assert p[ICLPOS].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % (p[ICLPOS],p[ILABPOS]) + p.append(p[ICLPOS]) + if p[ICLPOS].isdigit() or self.args.parampass == "0": + scl = 'input%d' % (i+1) + p[ICLPOS] = scl + self.infiles[i] = p + for i,p in enumerate(self.outfiles): # trying to automagically gather using extensions + if self.args.parampass == 'positional' and p[OCLPOS] != "STDOUT": + assert p[OCLPOS].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % (p[OCLPOS],p[ONAMEPOS]) + p.append(p[OCLPOS]) + if p[OCLPOS].isdigit() or p[OCLPOS] == "STDOUT": + scl = p[ONAMEPOS] + p[OCLPOS] = scl + self.outfiles[i] = p + for i,p in enumerate(self.addpar): + if self.args.parampass == 'positional': + assert p[ACLPOS].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % (p[ACLPOS],p[ANAMEPOS]) + p.append(p[ACLPOS]) + if p[ACLPOS].isdigit(): + scl = 'input%s' % p[ACLPOS] + p[ACLPOS] = scl + self.addpar[i] = p + + + + def clsimple(self): + """ no parameters - uses < and > for i/o + """ + aCL = self.cl.append + aCL('<') + aCL(self.infiles[0][IPATHPOS]) + aCL('>') + aCL(self.outfiles[0][OCLPOS]) + aXCL = self.xmlcl.append + aXCL('<') + aXCL('$%s' % self.infiles[0][ICLPOS]) + aXCL('>') + aXCL('$%s' % self.outfiles[0][ONAMEPOS]) + + + def clpositional(self): + # inputs in order then params + aCL = self.cl.append + for (o_v,k, v) in self.clsuffix: + if " " in v: + aCL("%s" % v) + else: + aCL(v) + aXCL = self.xmlcl.append + for (o_v,k, v) in self.xclsuffix: + aXCL(v) + if self.lastxclredirect: + aXCL(self.lastxclredirect[0]) + aXCL(self.lastxclredirect[1]) + + + + def clargparse(self): + """ argparse style + """ + aCL = self.cl.append + aXCL = self.xmlcl.append + # inputs then params in argparse named form + for (o_v,k, v) in self.xclsuffix: + aXCL(k) + aXCL(v) + for (o_v,k, v) in self.clsuffix: + if len(k.strip()) == 1: + k = '-%s' % k + else: + k = '--%s' % k + aCL(k) + aCL(v) + + + + + def makeXML(self): + """ + Create a Galaxy xml tool wrapper for the new script + Uses galaxyhtml + Hmmm. How to get the command line into correct order... + """ + + if self.args.interpreter_name: + exe = "$runMe" + interp = self.args.interpreter_name + else: + interp = None + exe = self.args.exe_package + assert exe is not None, 'No interpeter or executable passed in to makeXML' + tool = gxt.Tool(self.args.tool_name, self.tool_id, + self.args.tool_version, self.args.tool_desc, exe) + tool.command_line_override = self.xmlcl + print('#### tool cl override=',self.xmlcl) + if interp: + tool.interpreter = interp + if self.args.help_text: + helptext = open(self.args.help_text, 'r').readlines() + helptext = [html_escape(x) for x in helptext] + tool.help = ''.join([x for x in helptext]) + else: + tool.help = 'Please ask the tool author (%s) for help \ + as none was supplied at tool generation\n' % (self.args.user_email) + tool.version_command = None # do not want + tinputs = gxtp.Inputs() + toutputs = gxtp.Outputs() + requirements = gxtp.Requirements() + testparam = [] + is_positional = (self.args.parampass == 'positional') + if self.args.interpreter_name: + if self.args.interpreter_name == 'python': + requirements.append(gxtp.Requirement( + 'package', 'python', self.args.interpreter_version)) + elif self.args.interpreter_name not in ['bash', 'sh']: + requirements.append(gxtp.Requirement( + 'package', self.args.interpreter_name, self.args.interpreter_version)) + else: + if self.args.exe_package and self.args.parampass != "system": + requirements.append(gxtp.Requirement( + 'package', self.args.exe_package, self.args.exe_package_version)) + tool.requirements = requirements + if self.args.parampass == '0': + alab = self.infiles[0][ILABPOS] + if len(alab) == 0: + alab = self.infiles[0][ICLPOS] + max1s = 'Maximum one input if parampass is 0 - more than one input files supplied - %s' % str(self.infiles) + assert len(self.infiles) == 1,max1s + newname = self.infiles[0][ICLPOS] + aninput = gxtp.DataParam(newname, optional=False, label=alab, help=self.infiles[0][IHELPOS], + format=self.infiles[0][IFMTPOS], multiple=False, num_dashes=0) + aninput.command_line_override = '< $%s' % newname + aninput.positional = is_positional + tinputs.append(aninput) + tp = gxtp.TestParam(name=newname, value='%s_sample' % newname) + testparam.append(tp) + newname = self.outfiles[0][OCLPOS] + newfmt = self.outfiles[0][OFMTPOS] + anout = gxtp.OutputData(newname, format=newfmt, num_dashes=0) + anout.command_line_override = '> $%s' % newname + anout.positional = is_positional + toutputs.append(anout) + tp = gxtp.TestOutput(name=newname, value='%s_sample' % newname,format=newfmt) + testparam.append(tp) + else: + for p in self.outfiles: + newname,newfmt,newcl,oldcl = p + if is_positional: + ndash = 0 + else: + ndash = 2 + if len(newcl) < 2: + ndash = 1 + aparm = gxtp.OutputData(newcl, format=newfmt, num_dashes=ndash) + aparm.positional = is_positional + if is_positional: + if oldcl == "STDOUT": + aparm.positional = 9999999 + aparm.command_line_override = "> $%s" % newcl + else: + aparm.positional = int(oldcl) + aparm.command_line_override = '$%s' % newcl + toutputs.append(aparm) + tp = gxtp.TestOutput(name=newcl, value='%s_sample' % newcl ,format=newfmt) + testparam.append(tp) + for p in self.infiles: + newname = p[ICLPOS] + newfmt = p[IFMTPOS] + if is_positional: + ndash = 0 + else: + if len(newname) > 1: + ndash = 2 + else: + ndash = 1 + if not len(p[ILABPOS]) > 0: + alab = p[ICLPOS] + else: + alab = p[ILABPOS] + aninput = gxtp.DataParam(newname, optional=False, label=alab, help=p[IHELPOS], + format=newfmt, multiple=False, num_dashes=ndash) + aninput.positional = is_positional + if is_positional: + aninput.positional = is_positional + tinputs.append(aninput) + tparm = gxtp.TestParam(name=newname, value='%s_sample' % newname ) + testparam.append(tparm) + for p in self.addpar: + newname, newval, newlabel, newhelp, newtype, newcl, oldcl = p + if not len(newlabel) > 0: + newlabel = newname + if is_positional: + ndash = 0 + else: + if len(newname) > 1: + ndash = 2 + else: + ndash = 1 + if newtype == "text": + aparm = gxtp.TextParam( + newname, label=newlabel, help=newhelp, value=newval, num_dashes=ndash) + elif newtype == "integer": + aparm = gxtp.IntegerParam( + newname, label=newname, help=newhelp, value=newval, num_dashes=ndash) + elif newtype == "float": + aparm = gxtp.FloatParam( + newname, label=newname, help=newhelp, value=newval, num_dashes=ndash) + else: + raise ValueError('Unrecognised parameter type "%s" for\ + additional parameter %s in makeXML' % (newtype, newname)) + aparm.positional = is_positional + if is_positional: + aninput.positional = int(oldcl) + tinputs.append(aparm) + tparm = gxtp.TestParam(newname, value=newval) + testparam.append(tparm) + tool.outputs = toutputs + tool.inputs = tinputs + if not self.args.runmode in ['Executable','system']: + configfiles = gxtp.Configfiles() + configfiles.append(gxtp.Configfile(name="runMe", text=self.script)) + tool.configfiles = configfiles + tests = gxtp.Tests() + test_a = gxtp.Test() + for tp in testparam: + test_a.append(tp) + tests.append(test_a) + tool.tests = tests + tool.add_comment('Created by %s at %s using the Galaxy Tool Factory.' % ( + self.args.user_email, timenow())) + tool.add_comment('Source in git at: %s' % (toolFactoryURL)) + tool.add_comment( + 'Cite: Creating re-usable tools from scripts doi: 10.1093/bioinformatics/bts573') + exml = tool.export() + xf = open(self.xmlfile, 'w') + xf.write(exml) + xf.write('\n') + xf.close() + # ready for the tarball + + def makeTooltar(self): + """ + a tool is a gz tarball with eg + /toolname/tool.xml /toolname/tool.py /toolname/test-data/test1_in.foo ... + NOTE names for test inputs and outputs are munged here so must + correspond to actual input and output names used on the generated cl + """ + retval = self.run() + if retval: + sys.stderr.write( + '## Run failed. Cannot build yet. Please fix and retry') + sys.exit(1) + tdir = 'tfout' + if not os.path.exists(tdir): + os.mkdir(tdir) + self.makeXML() + testdir = os.path.join(tdir,'test-data') + if not os.path.exists(testdir): + os.mkdir(testdir) # make tests directory + for p in self.infiles: + pth = p[IPATHPOS] + dest = os.path.join(testdir, '%s_sample' % + p[ICLPOS]) + shutil.copyfile(pth, dest) + for p in self.outfiles: + pth = p[OCLPOS] + if p[OOCLPOS] == 'STDOUT' or self.args.parampass == "0": + pth = p[ONAMEPOS] + dest = os.path.join(testdir,'%s_sample' % p[ONAMEPOS]) + shutil.copyfile(pth, dest) + dest = os.path.join(tdir, p[ONAMEPOS]) + shutil.copyfile(pth, dest) + else: + pth = p[OCLPOS] + dest = os.path.join(testdir,'%s_sample' % p[OCLPOS]) + shutil.copyfile(pth, dest) + dest = os.path.join(tdir, p[OCLPOS]) + shutil.copyfile(pth, dest) + + if os.path.exists(self.tlog) and os.stat(self.tlog).st_size > 0: + shutil.copyfile(self.tlog, os.path.join( + testdir, 'test1_log.txt')) + if not self.args.runmode in ['Executable','system']: + stname = os.path.join(tdir, '%s' % (self.sfile)) + if not os.path.exists(stname): + shutil.copyfile(self.sfile, stname) + xtname = os.path.join(tdir,self.xmlfile) + if not os.path.exists(xtname): + shutil.copyfile(self.xmlfile, xtname) + tarpath = 'toolfactory_%s.tgz' % self.tool_name + tf = tarfile.open(tarpath,"w:gz") + tf.add(name=tdir,arcname=self.tool_name) + tf.close() + shutil.copyfile(tarpath, self.args.new_tool) + return retval + + def run(self): + """ + Some devteam tools have this defensive stderr read so I'm keeping with the faith + Feel free to update. + """ + s = 'run cl=%s' % str(self.cl) + + logging.debug(s) + scl = ' '.join(self.cl) + err = None + if self.args.parampass != '0': + ste = open(self.elog, 'wb') + if self.lastclredirect: + sto = open(self.lastclredirect[1],'wb') # is name of an output file + else: + sto = open(self.tlog, 'wb') + sto.write( + bytes('## Executing Toolfactory generated command line = %s\n' % scl, "utf8")) + sto.flush() + p = subprocess.run(self.cl, shell=False, stdout=sto, + stderr=ste) + sto.close() + ste.close() + tmp_stderr = open(self.elog, 'rb') + err = '' + buffsize = 1048576 + try: + while True: + err += str(tmp_stderr.read(buffsize)) + if not err or len(err) % buffsize != 0: + break + except OverflowError: + pass + tmp_stderr.close() + retval = p.returncode + else: # work around special case of simple scripts that take stdin and write to stdout + sti = open(self.infiles[0][IPATHPOS], 'rb') + sto = open(self.outfiles[0][ONAMEPOS], 'wb') + # must use shell to redirect + p = subprocess.run(self.cl, shell=False, stdout=sto, stdin=sti) + retval = p.returncode + sto.close() + sti.close() + if os.path.isfile(self.tlog) and os.stat(self.tlog).st_size == 0: + os.unlink(self.tlog) + if os.path.isfile(self.elog) and os.stat(self.elog).st_size == 0: + os.unlink(self.elog) + if p.returncode != 0 and err: # problem + sys.stderr.write(err) + logging.debug('run done') + return retval + + +def main(): + """ + This is a Galaxy wrapper. It expects to be called by a special purpose tool.xml as: + <command interpreter="python">rgBaseScriptWrapper.py --script_path "$scriptPath" --tool_name "foo" --interpreter "Rscript" + </command> + """ + parser = argparse.ArgumentParser() + a = parser.add_argument + a('--script_path', default='') + a('--tool_name', default=None) + a('--interpreter_name', default=None) + a('--interpreter_version', default=None) + a('--exe_package', default=None) + a('--exe_package_version', default=None) + a('--input_files', default=[], action="append") + a('--output_files', default=[], action="append") + a('--user_email', default='Unknown') + a('--bad_user', default=None) + a('--make_Tool', default=None) + a('--help_text', default=None) + a('--tool_desc', default=None) + a('--tool_version', default=None) + a('--citations', default=None) + a('--additional_parameters', action='append', default=[]) + a('--edit_additional_parameters', action="store_true", default=False) + a('--parampass', default="positional") + a('--tfout', default="./tfout") + a('--new_tool',default="new_tool") + a('--runmode',default=None) + args = parser.parse_args() + assert not args.bad_user, 'UNAUTHORISED: %s is NOT authorized to use this tool until Galaxy admin adds %s to "admin_users" in the Galaxy configuration file' % ( + args.bad_user, args.bad_user) + assert args.tool_name, '## Tool Factory expects a tool name - eg --tool_name=DESeq' + assert (args.interpreter_name or args.exe_package), '## Tool Factory wrapper expects an interpreter - eg --interpreter_name=Rscript or an executable package findable by the dependency management package' + assert args.exe_package or (len(args.script_path) > 0 and os.path.isfile( + args.script_path)), '## Tool Factory wrapper expects a script path - eg --script_path=foo.R if no executable' + args.input_files = [x.replace('"', '').replace("'", '') + for x in args.input_files] + # remove quotes we need to deal with spaces in CL params + for i, x in enumerate(args.additional_parameters): + args.additional_parameters[i] = args.additional_parameters[i].replace( + '"', '') + r = ScriptRunner(args) + if args.make_Tool: + retcode = r.makeTooltar() + else: + retcode = r.run() + if retcode: + sys.exit(retcode) # indicate failure to job runner + + +if __name__ == "__main__": + main()