Mercurial > repos > fubar > tool_factory_2
diff toolfactory/rgToolFactory2.py @ 36:ce2b1f8ea68d draft
passes flake8 tests finally :)
author | fubar |
---|---|
date | Mon, 10 Aug 2020 23:24:41 -0400 |
parents | 5052ac89c036 |
children | a30536c100bf |
line wrap: on
line diff
--- a/toolfactory/rgToolFactory2.py Sat Aug 08 19:55:55 2020 -0400 +++ b/toolfactory/rgToolFactory2.py Mon Aug 10 23:24:41 2020 -0400 @@ -19,27 +19,30 @@ # TODO: add option to run that code as a post execution hook # TODO: add additional history input parameters - currently only one -import sys + +import argparse +import logging +import os +import re +import shutil import subprocess -import shutil -import os +import sys +import tarfile +import tempfile import time -import tempfile -import argparse -import tarfile -import re + import galaxyxml.tool as gxt import galaxyxml.tool.parameters as gxtp -import logging +import lxml - +foo = lxml.__name__ # fug you, flake8. Say my name! Please accept the PR, Helena! progname = os.path.split(sys.argv[0])[1] -myversion = 'V2.1 July 2020' +myversion = "V2.1 July 2020" verbose = True debug = True -toolFactoryURL = 'https://github.com/fubar2/toolfactory' -ourdelim = '~~~' +toolFactoryURL = "https://github.com/fubar2/toolfactory" +ourdelim = "~~~" # --input_files="$input_files~~~$CL~~~$input_formats~~~$input_label~~~$input_help" IPATHPOS = 0 @@ -54,7 +57,7 @@ OCLPOS = 2 OOCLPOS = 3 -#--additional_parameters="$i.param_name~~~$i.param_value~~~$i.param_label~~~$i.param_help~~~$i.param_type~~~$i.CL" +# --additional_parameters="$i.param_name~~~$i.param_value~~~$i.param_label~~~$i.param_help~~~$i.param_type~~~$i.CL" ANAMEPOS = 0 AVALPOS = 1 ALABPOS = 2 @@ -63,10 +66,11 @@ ACLPOS = 5 AOCLPOS = 6 + def timenow(): """return current time as a string """ - return time.strftime('%d/%m/%Y %H:%M:%S', time.localtime(time.time())) + return time.strftime("%d/%m/%Y %H:%M:%S", time.localtime(time.time())) def quote_non_numeric(s): @@ -80,12 +84,7 @@ return '"%s"' % s -html_escape_table = { - "&": "&", - ">": ">", - "<": "<", - "$": r"\$" -} +html_escape_table = {"&": "&", ">": ">", "<": "<", "$": r"\$"} def html_escape(text): @@ -95,10 +94,10 @@ def html_unescape(text): """Revert entities within text. Multiple character targets so use replace""" - t = text.replace('&', '&') - t = t.replace('>', '>') - t = t.replace('<', '<') - t = t.replace('\\$', '$') + t = text.replace("&", "&") + t = t.replace(">", ">") + t = t.replace("<", "<") + t = t.replace("\\$", "$") return t @@ -111,8 +110,7 @@ if citation.startswith("doi"): citation_tuples.append(("doi", citation[len("doi"):].strip())) else: - citation_tuples.append( - ("bibtex", citation[len("bibtex"):].strip())) + citation_tuples.append(("bibtex", citation[len("bibtex"):].strip())) return citation_tuples @@ -121,8 +119,7 @@ uses galaxyxml """ - - + def __init__(self, args=None): """ prepare command line cl for running the tool here @@ -138,38 +135,43 @@ self.lastxclredirect = None self.cl = [] self.xmlcl = [] + self.is_positional = self.args.parampass == "positional" aCL = self.cl.append - assert args.parampass in ['0','argparse','positional'],'Parameter passing in args.parampass must be "0","positional" or "argparse"' - self.tool_name = re.sub('[^a-zA-Z0-9_]+', '', args.tool_name) + assert args.parampass in [ + "0", + "argparse", + "positional", + ], 'Parameter passing in args.parampass must be "0","positional" or "argparse"' + self.tool_name = re.sub("[^a-zA-Z0-9_]+", "", args.tool_name) self.tool_id = self.tool_name - self.xmlfile = '%s.xml' % self.tool_name - if self.args.runmode == "Executable" or self.args.runmode == "system": # binary - no need + self.xmlfile = "%s.xml" % self.tool_name + if self.args.interpreter_name: + exe = "$runMe" + else: + exe = self.args.exe_package + assert ( + exe is not None + ), "No interpeter or executable passed in - nothing to run so cannot build" + self.tool = gxt.Tool( + self.args.tool_name, + self.tool_id, + self.args.tool_version, + self.args.tool_desc, + exe, + ) + self.tinputs = gxtp.Inputs() + self.toutputs = gxtp.Outputs() + self.testparam = [] + if ( + self.args.runmode == "Executable" or self.args.runmode == "system" + ): # binary - no need aCL(self.args.exe_package) # this little CL will just run - else: - rx = open(self.args.script_path, 'r').readlines() - rx = [x.rstrip() for x in rx ] - rxcheck = [x.strip() for x in rx if x.strip() > ''] - assert len(rxcheck) > 0,"Supplied script is empty. Cannot run" - self.script = '\n'.join(rx) - fhandle, self.sfile = tempfile.mkstemp( - prefix=self.tool_name, suffix=".%s" % (args.interpreter_name)) - tscript = open(self.sfile, 'w') - tscript.write(self.script) - tscript.close() - self.indentedScript = " %s" % '\n'.join( - [' %s' % html_escape(x) for x in rx]) - self.escapedScript = "%s" % '\n'.join( - [' %s' % html_escape(x) for x in rx]) - art = '%s.%s' % (self.tool_name, args.interpreter_name) - artifact = open(art, 'wb') - artifact.write(bytes(self.script, "utf8")) - artifact.close() - aCL(self.args.interpreter_name) - aCL(self.sfile) + else: + self.prepScript() self.elog = "%s_error_log.txt" % self.tool_name self.tlog = "%s_runner_log.txt" % self.tool_name - if self.args.parampass == '0': + if self.args.parampass == "0": self.clsimple() else: clsuffix = [] @@ -177,92 +179,120 @@ for i, p in enumerate(self.infiles): appendme = [p[IOCLPOS], p[ICLPOS], p[IPATHPOS]] clsuffix.append(appendme) - xclsuffix.append([p[IOCLPOS],p[ICLPOS],'$%s' % p[ICLPOS]]) - #print('##infile i=%d, appendme=%s' % (i,appendme)) + xclsuffix.append([p[IOCLPOS], p[ICLPOS], "$%s" % p[ICLPOS]]) + # print('##infile i=%d, appendme=%s' % (i,appendme)) for i, p in enumerate(self.outfiles): if p[OOCLPOS] == "STDOUT": - self.lastclredirect = ['>',p[ONAMEPOS]] - self.lastxclredirect = ['>','$%s' % p[OCLPOS]] - #print('##outfiles i=%d lastclredirect = %s' % (i,self.lastclredirect)) + self.lastclredirect = [">", p[ONAMEPOS]] + self.lastxclredirect = [">", "$%s" % p[OCLPOS]] + # print('##outfiles i=%d lastclredirect = %s' % (i,self.lastclredirect)) else: - appendme = [p[OOCLPOS], p[OCLPOS],p[ONAMEPOS]] - clsuffix.append(appendme) - xclsuffix.append([p[OOCLPOS], p[OCLPOS],'$%s' % p[ONAMEPOS]]) - #print('##outfiles i=%d' % i,'appendme',appendme) - for p in self.addpar: + appendme = [p[OOCLPOS], p[OCLPOS], p[ONAMEPOS]] + clsuffix.append(appendme) + xclsuffix.append([p[OOCLPOS], p[OCLPOS], "$%s" % p[ONAMEPOS]]) + # print('##outfiles i=%d' % i,'appendme',appendme) + for p in self.addpar: appendme = [p[AOCLPOS], p[ACLPOS], p[AVALPOS]] clsuffix.append(appendme) xclsuffix.append([p[AOCLPOS], p[ACLPOS], '"$%s"' % p[ANAMEPOS]]) - #print('##adpar %d' % i,'appendme=',appendme) + # print('##adpar %d' % i,'appendme=',appendme) clsuffix.sort() xclsuffix.sort() self.xclsuffix = xclsuffix self.clsuffix = clsuffix - if self.args.parampass == 'positional': + if self.args.parampass == "positional": self.clpositional() else: self.clargparse() - + + def prepScript(self): + aCL = self.cl.append + rx = open(self.args.script_path, "r").readlines() + rx = [x.rstrip() for x in rx] + rxcheck = [x.strip() for x in rx if x.strip() > ""] + assert len(rxcheck) > 0, "Supplied script is empty. Cannot run" + self.script = "\n".join(rx) + fhandle, self.sfile = tempfile.mkstemp( + prefix=self.tool_name, suffix=".%s" % (self.args.interpreter_name) + ) + tscript = open(self.sfile, "w") + tscript.write(self.script) + tscript.close() + self.indentedScript = " %s" % "\n".join([" %s" % html_escape(x) for x in rx]) + self.escapedScript = "%s" % "\n".join([" %s" % html_escape(x) for x in rx]) + art = "%s.%s" % (self.tool_name, self.args.interpreter_name) + artifact = open(art, "wb") + artifact.write(bytes(self.script, "utf8")) + artifact.close() + aCL(self.args.interpreter_name) + aCL(self.sfile) + def cleanuppar(self): """ positional parameters are complicated by their numeric ordinal""" - for i,p in enumerate(self.infiles): - if self.args.parampass == 'positional': - assert p[ICLPOS].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % (p[ICLPOS],p[ILABPOS]) + for i, p in enumerate(self.infiles): + if self.args.parampass == "positional": + assert p[ICLPOS].isdigit(), ( + "Positional parameters must be ordinal integers - got %s for %s" + % (p[ICLPOS], p[ILABPOS]) + ) p.append(p[ICLPOS]) if p[ICLPOS].isdigit() or self.args.parampass == "0": - scl = 'input%d' % (i+1) + scl = "input%d" % (i + 1) p[ICLPOS] = scl self.infiles[i] = p - for i,p in enumerate(self.outfiles): # trying to automagically gather using extensions - if self.args.parampass == 'positional' and p[OCLPOS] != "STDOUT": - assert p[OCLPOS].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % (p[OCLPOS],p[ONAMEPOS]) + for i, p in enumerate( + self.outfiles + ): # trying to automagically gather using extensions + if self.args.parampass == "positional" and p[OCLPOS] != "STDOUT": + assert p[OCLPOS].isdigit(), ( + "Positional parameters must be ordinal integers - got %s for %s" + % (p[OCLPOS], p[ONAMEPOS]) + ) p.append(p[OCLPOS]) if p[OCLPOS].isdigit() or p[OCLPOS] == "STDOUT": scl = p[ONAMEPOS] p[OCLPOS] = scl self.outfiles[i] = p - for i,p in enumerate(self.addpar): - if self.args.parampass == 'positional': - assert p[ACLPOS].isdigit(), "Positional parameters must be ordinal integers - got %s for %s" % (p[ACLPOS],p[ANAMEPOS]) + for i, p in enumerate(self.addpar): + if self.args.parampass == "positional": + assert p[ACLPOS].isdigit(), ( + "Positional parameters must be ordinal integers - got %s for %s" + % (p[ACLPOS], p[ANAMEPOS]) + ) p.append(p[ACLPOS]) if p[ACLPOS].isdigit(): - scl = 'input%s' % p[ACLPOS] + scl = "input%s" % p[ACLPOS] p[ACLPOS] = scl self.addpar[i] = p - - - + def clsimple(self): """ no parameters - uses < and > for i/o """ aCL = self.cl.append - aCL('<') + aCL("<") aCL(self.infiles[0][IPATHPOS]) - aCL('>') + aCL(">") aCL(self.outfiles[0][OCLPOS]) aXCL = self.xmlcl.append - aXCL('<') - aXCL('$%s' % self.infiles[0][ICLPOS]) - aXCL('>') - aXCL('$%s' % self.outfiles[0][ONAMEPOS]) - + aXCL("<") + aXCL("$%s" % self.infiles[0][ICLPOS]) + aXCL(">") + aXCL("$%s" % self.outfiles[0][ONAMEPOS]) def clpositional(self): # inputs in order then params aCL = self.cl.append - for (o_v,k, v) in self.clsuffix: + for (o_v, k, v) in self.clsuffix: if " " in v: aCL("%s" % v) else: aCL(v) aXCL = self.xmlcl.append - for (o_v,k, v) in self.xclsuffix: + for (o_v, k, v) in self.xclsuffix: aXCL(v) if self.lastxclredirect: aXCL(self.lastxclredirect[0]) aXCL(self.lastxclredirect[1]) - - def clargparse(self): """ argparse style @@ -270,23 +300,142 @@ aCL = self.cl.append aXCL = self.xmlcl.append # inputs then params in argparse named form - for (o_v,k, v) in self.xclsuffix: + for (o_v, k, v) in self.xclsuffix: if len(k.strip()) == 1: - k = '-%s' % k + k = "-%s" % k else: - k = '--%s' % k + k = "--%s" % k aXCL(k) aXCL(v) - for (o_v,k, v) in self.clsuffix: + for (o_v, k, v) in self.clsuffix: if len(k.strip()) == 1: - k = '-%s' % k + k = "-%s" % k else: - k = '--%s' % k + k = "--%s" % k aCL(k) aCL(v) - + def getNdash(self, newname): + if self.is_positional: + ndash = 0 + else: + ndash = 2 + if len(newname) < 2: + ndash = 1 + return ndash + def doXMLparam(self): + """flake8 made me do this...""" + for p in self.outfiles: + newname, newfmt, newcl, oldcl = p + ndash = self.getNdash(newcl) + aparm = gxtp.OutputData(newcl, format=newfmt, num_dashes=ndash) + aparm.positional = self.is_positional + if self.is_positional: + if oldcl == "STDOUT": + aparm.positional = 9999999 + aparm.command_line_override = "> $%s" % newcl + else: + aparm.positional = int(oldcl) + aparm.command_line_override = "$%s" % newcl + self.toutputs.append(aparm) + tp = gxtp.TestOutput(name=newcl, value="%s_sample" % newcl, format=newfmt) + self.testparam.append(tp) + for p in self.infiles: + newname = p[ICLPOS] + newfmt = p[IFMTPOS] + ndash = self.getNdash(newname) + if not len(p[ILABPOS]) > 0: + alab = p[ICLPOS] + else: + alab = p[ILABPOS] + aninput = gxtp.DataParam( + newname, + optional=False, + label=alab, + help=p[IHELPOS], + format=newfmt, + multiple=False, + num_dashes=ndash, + ) + aninput.positional = self.is_positional + self.tinputs.append(aninput) + tparm = gxtp.TestParam(name=newname, value="%s_sample" % newname) + self.testparam.append(tparm) + for p in self.addpar: + newname, newval, newlabel, newhelp, newtype, newcl, oldcl = p + if not len(newlabel) > 0: + newlabel = newname + ndash = self.getNdash(newname) + if newtype == "text": + aparm = gxtp.TextParam( + newname, + label=newlabel, + help=newhelp, + value=newval, + num_dashes=ndash, + ) + elif newtype == "integer": + aparm = gxtp.IntegerParam( + newname, + label=newname, + help=newhelp, + value=newval, + num_dashes=ndash, + ) + elif newtype == "float": + aparm = gxtp.FloatParam( + newname, + label=newname, + help=newhelp, + value=newval, + num_dashes=ndash, + ) + else: + raise ValueError( + 'Unrecognised parameter type "%s" for\ + additional parameter %s in makeXML' + % (newtype, newname) + ) + aparm.positional = self.is_positional + if self.is_positional: + aninput.positional = int(oldcl) + self.tinputs.append(aparm) + self.tparm = gxtp.TestParam(newname, value=newval) + self.testparam.append(tparm) + + def doNoXMLparam(self): + alab = self.infiles[0][ILABPOS] + if len(alab) == 0: + alab = self.infiles[0][ICLPOS] + max1s = ( + "Maximum one input if parampass is 0 - more than one input files supplied - %s" + % str(self.infiles) + ) + assert len(self.infiles) == 1, max1s + newname = self.infiles[0][ICLPOS] + aninput = gxtp.DataParam( + newname, + optional=False, + label=alab, + help=self.infiles[0][IHELPOS], + format=self.infiles[0][IFMTPOS], + multiple=False, + num_dashes=0, + ) + aninput.command_line_override = "< $%s" % newname + aninput.positional = self.is_positional + self.tinputs.append(aninput) + tp = gxtp.TestParam(name=newname, value="%s_sample" % newname) + self.testparam.append(tp) + newname = self.outfiles[0][OCLPOS] + newfmt = self.outfiles[0][OFMTPOS] + anout = gxtp.OutputData(newname, format=newfmt, num_dashes=0) + anout.command_line_override = "> $%s" % newname + anout.positional = self.is_positional + self.toutputs.append(anout) + tp = gxtp.TestOutput(name=newname, value="%s_sample" % newname, format=newfmt) + self.testparam.append(tp) def makeXML(self): """ @@ -294,159 +443,71 @@ Uses galaxyhtml Hmmm. How to get the command line into correct order... """ - + self.tool.command_line_override = self.xmlcl if self.args.interpreter_name: - exe = "$runMe" - interp = self.args.interpreter_name - else: - interp = None - exe = self.args.exe_package - assert exe is not None, 'No interpeter or executable passed in to makeXML' - tool = gxt.Tool(self.args.tool_name, self.tool_id, - self.args.tool_version, self.args.tool_desc, exe) - tool.command_line_override = self.xmlcl - if interp: - tool.interpreter = interp + self.tool.interpreter = self.interp if self.args.help_text: - helptext = open(self.args.help_text, 'r').readlines() + helptext = open(self.args.help_text, "r").readlines() helptext = [html_escape(x) for x in helptext] - tool.help = ''.join([x for x in helptext]) + self.tool.help = "".join([x for x in helptext]) else: - tool.help = 'Please ask the tool author (%s) for help \ - as none was supplied at tool generation\n' % (self.args.user_email) - tool.version_command = None # do not want - tinputs = gxtp.Inputs() - toutputs = gxtp.Outputs() + self.tool.help = ( + "Please ask the tool author (%s) for help \ + as none was supplied at tool generation\n" + % (self.args.user_email) + ) + self.tool.version_command = None # do not want requirements = gxtp.Requirements() - testparam = [] - is_positional = (self.args.parampass == 'positional') + if self.args.interpreter_name: - if self.args.interpreter_name == 'python': - requirements.append(gxtp.Requirement( - 'package', 'python', self.args.interpreter_version)) - elif self.args.interpreter_name not in ['bash', 'sh']: - requirements.append(gxtp.Requirement( - 'package', self.args.interpreter_name, self.args.interpreter_version)) + if self.args.interpreter_name == "python": + requirements.append( + gxtp.Requirement("package", "python", self.args.interpreter_version) + ) + elif self.args.interpreter_name not in ["bash", "sh"]: + requirements.append( + gxtp.Requirement( + "package", + self.args.interpreter_name, + self.args.interpreter_version, + ) + ) else: if self.args.exe_package and self.args.parampass != "system": - requirements.append(gxtp.Requirement( - 'package', self.args.exe_package, self.args.exe_package_version)) - tool.requirements = requirements - if self.args.parampass == '0': - alab = self.infiles[0][ILABPOS] - if len(alab) == 0: - alab = self.infiles[0][ICLPOS] - max1s = 'Maximum one input if parampass is 0 - more than one input files supplied - %s' % str(self.infiles) - assert len(self.infiles) == 1,max1s - newname = self.infiles[0][ICLPOS] - aninput = gxtp.DataParam(newname, optional=False, label=alab, help=self.infiles[0][IHELPOS], - format=self.infiles[0][IFMTPOS], multiple=False, num_dashes=0) - aninput.command_line_override = '< $%s' % newname - aninput.positional = is_positional - tinputs.append(aninput) - tp = gxtp.TestParam(name=newname, value='%s_sample' % newname) - testparam.append(tp) - newname = self.outfiles[0][OCLPOS] - newfmt = self.outfiles[0][OFMTPOS] - anout = gxtp.OutputData(newname, format=newfmt, num_dashes=0) - anout.command_line_override = '> $%s' % newname - anout.positional = is_positional - toutputs.append(anout) - tp = gxtp.TestOutput(name=newname, value='%s_sample' % newname,format=newfmt) - testparam.append(tp) + requirements.append( + gxtp.Requirement( + "package", self.args.exe_package, self.args.exe_package_version + ) + ) + self.tool.requirements = requirements + if self.args.parampass == "0": + self.doXMLNoparam() else: - for p in self.outfiles: - newname,newfmt,newcl,oldcl = p - if is_positional: - ndash = 0 - else: - ndash = 2 - if len(newcl) < 2: - ndash = 1 - aparm = gxtp.OutputData(newcl, format=newfmt, num_dashes=ndash) - aparm.positional = is_positional - if is_positional: - if oldcl == "STDOUT": - aparm.positional = 9999999 - aparm.command_line_override = "> $%s" % newcl - else: - aparm.positional = int(oldcl) - aparm.command_line_override = '$%s' % newcl - toutputs.append(aparm) - tp = gxtp.TestOutput(name=newcl, value='%s_sample' % newcl ,format=newfmt) - testparam.append(tp) - for p in self.infiles: - newname = p[ICLPOS] - newfmt = p[IFMTPOS] - if is_positional: - ndash = 0 - else: - if len(newname) > 1: - ndash = 2 - else: - ndash = 1 - if not len(p[ILABPOS]) > 0: - alab = p[ICLPOS] - else: - alab = p[ILABPOS] - aninput = gxtp.DataParam(newname, optional=False, label=alab, help=p[IHELPOS], - format=newfmt, multiple=False, num_dashes=ndash) - aninput.positional = is_positional - if is_positional: - aninput.positional = is_positional - tinputs.append(aninput) - tparm = gxtp.TestParam(name=newname, value='%s_sample' % newname ) - testparam.append(tparm) - for p in self.addpar: - newname, newval, newlabel, newhelp, newtype, newcl, oldcl = p - if not len(newlabel) > 0: - newlabel = newname - if is_positional: - ndash = 0 - else: - if len(newname) > 1: - ndash = 2 - else: - ndash = 1 - if newtype == "text": - aparm = gxtp.TextParam( - newname, label=newlabel, help=newhelp, value=newval, num_dashes=ndash) - elif newtype == "integer": - aparm = gxtp.IntegerParam( - newname, label=newname, help=newhelp, value=newval, num_dashes=ndash) - elif newtype == "float": - aparm = gxtp.FloatParam( - newname, label=newname, help=newhelp, value=newval, num_dashes=ndash) - else: - raise ValueError('Unrecognised parameter type "%s" for\ - additional parameter %s in makeXML' % (newtype, newname)) - aparm.positional = is_positional - if is_positional: - aninput.positional = int(oldcl) - tinputs.append(aparm) - tparm = gxtp.TestParam(newname, value=newval) - testparam.append(tparm) - tool.outputs = toutputs - tool.inputs = tinputs - if not self.args.runmode in ['Executable','system']: + self.doXMLParam() + self.tool.outputs = self.toutputs + self.tool.inputs = self.tinputs + if self.args.runmode not in ["Executable", "system"]: configfiles = gxtp.Configfiles() configfiles.append(gxtp.Configfile(name="runMe", text=self.script)) - tool.configfiles = configfiles + self.tool.configfiles = configfiles tests = gxtp.Tests() test_a = gxtp.Test() - for tp in testparam: + for tp in self.testparam: test_a.append(tp) tests.append(test_a) - tool.tests = tests - tool.add_comment('Created by %s at %s using the Galaxy Tool Factory.' % ( - self.args.user_email, timenow())) - tool.add_comment('Source in git at: %s' % (toolFactoryURL)) - tool.add_comment( - 'Cite: Creating re-usable tools from scripts doi: 10.1093/bioinformatics/bts573') - exml = tool.export() - xf = open(self.xmlfile, 'w') + self.tool.tests = tests + self.tool.add_comment( + "Created by %s at %s using the Galaxy Tool Factory." + % (self.args.user_email, timenow()) + ) + self.tool.add_comment("Source in git at: %s" % (toolFactoryURL)) + self.tool.add_comment( + "Cite: Creating re-usable tools from scripts doi: 10.1093/bioinformatics/bts573" + ) + exml = self.tool.export() + xf = open(self.xmlfile, "w") xf.write(exml) - xf.write('\n') + xf.write("\n") xf.close() # ready for the tarball @@ -459,49 +520,46 @@ """ retval = self.run() if retval: - sys.stderr.write( - '## Run failed. Cannot build yet. Please fix and retry') + sys.stderr.write("## Run failed. Cannot build yet. Please fix and retry") sys.exit(1) - tdir = 'tfout' + tdir = "tfout" if not os.path.exists(tdir): os.mkdir(tdir) self.makeXML() - testdir = os.path.join(tdir,'test-data') + testdir = os.path.join(tdir, "test-data") if not os.path.exists(testdir): os.mkdir(testdir) # make tests directory for p in self.infiles: pth = p[IPATHPOS] - dest = os.path.join(testdir, '%s_sample' % - p[ICLPOS]) + dest = os.path.join(testdir, "%s_sample" % p[ICLPOS]) shutil.copyfile(pth, dest) for p in self.outfiles: pth = p[OCLPOS] - if p[OOCLPOS] == 'STDOUT' or self.args.parampass == "0": + if p[OOCLPOS] == "STDOUT" or self.args.parampass == "0": pth = p[ONAMEPOS] - dest = os.path.join(testdir,'%s_sample' % p[ONAMEPOS]) + dest = os.path.join(testdir, "%s_sample" % p[ONAMEPOS]) shutil.copyfile(pth, dest) dest = os.path.join(tdir, p[ONAMEPOS]) - shutil.copyfile(pth, dest) + shutil.copyfile(pth, dest) else: pth = p[OCLPOS] - dest = os.path.join(testdir,'%s_sample' % p[OCLPOS]) + dest = os.path.join(testdir, "%s_sample" % p[OCLPOS]) shutil.copyfile(pth, dest) dest = os.path.join(tdir, p[OCLPOS]) - shutil.copyfile(pth, dest) + shutil.copyfile(pth, dest) if os.path.exists(self.tlog) and os.stat(self.tlog).st_size > 0: - shutil.copyfile(self.tlog, os.path.join( - testdir, 'test1_log.txt')) - if not self.args.runmode in ['Executable','system']: - stname = os.path.join(tdir, '%s' % (self.sfile)) + shutil.copyfile(self.tlog, os.path.join(testdir, "test1_log.txt")) + if self.args.runmode not in ["Executable", "system"]: + stname = os.path.join(tdir, "%s" % (self.sfile)) if not os.path.exists(stname): shutil.copyfile(self.sfile, stname) - xtname = os.path.join(tdir,self.xmlfile) + xtname = os.path.join(tdir, self.xmlfile) if not os.path.exists(xtname): shutil.copyfile(self.xmlfile, xtname) - tarpath = 'toolfactory_%s.tgz' % self.tool_name - tf = tarfile.open(tarpath,"w:gz") - tf.add(name=tdir,arcname=self.tool_name) + tarpath = "toolfactory_%s.tgz" % self.tool_name + tf = tarfile.open(tarpath, "w:gz") + tf.add(name=tdir, arcname=self.tool_name) tf.close() shutil.copyfile(tarpath, self.args.new_tool) return retval @@ -511,26 +569,29 @@ Some devteam tools have this defensive stderr read so I'm keeping with the faith Feel free to update. """ - s = 'run cl=%s' % str(self.cl) - + s = "run cl=%s" % str(self.cl) + logging.debug(s) - scl = ' '.join(self.cl) + scl = " ".join(self.cl) err = None - if self.args.parampass != '0': - ste = open(self.elog, 'wb') + if self.args.parampass != "0": + ste = open(self.elog, "wb") if self.lastclredirect: - sto = open(self.lastclredirect[1],'wb') # is name of an output file + sto = open(self.lastclredirect[1], "wb") # is name of an output file else: - sto = open(self.tlog, 'wb') + sto = open(self.tlog, "wb") sto.write( - bytes('## Executing Toolfactory generated command line = %s\n' % scl, "utf8")) + bytes( + "## Executing Toolfactory generated command line = %s\n" % scl, + "utf8", + ) + ) sto.flush() - p = subprocess.run(self.cl, shell=False, stdout=sto, - stderr=ste) + p = subprocess.run(self.cl, shell=False, stdout=sto, stderr=ste) sto.close() ste.close() - tmp_stderr = open(self.elog, 'rb') - err = '' + tmp_stderr = open(self.elog, "rb") + err = "" buffsize = 1048576 try: while True: @@ -542,8 +603,8 @@ tmp_stderr.close() retval = p.returncode else: # work around special case of simple scripts that take stdin and write to stdout - sti = open(self.infiles[0][IPATHPOS], 'rb') - sto = open(self.outfiles[0][ONAMEPOS], 'wb') + sti = open(self.infiles[0][IPATHPOS], "rb") + sto = open(self.outfiles[0][ONAMEPOS], "wb") # must use shell to redirect p = subprocess.run(self.cl, shell=False, stdout=sto, stdin=sti) retval = p.returncode @@ -555,7 +616,7 @@ os.unlink(self.elog) if p.returncode != 0 and err: # problem sys.stderr.write(err) - logging.debug('run done') + logging.debug("run done") return retval @@ -567,40 +628,43 @@ """ parser = argparse.ArgumentParser() a = parser.add_argument - a('--script_path', default='') - a('--tool_name', default=None) - a('--interpreter_name', default=None) - a('--interpreter_version', default=None) - a('--exe_package', default=None) - a('--exe_package_version', default=None) - a('--input_files', default=[], action="append") - a('--output_files', default=[], action="append") - a('--user_email', default='Unknown') - a('--bad_user', default=None) - a('--make_Tool', default=None) - a('--help_text', default=None) - a('--tool_desc', default=None) - a('--tool_version', default=None) - a('--citations', default=None) - a('--additional_parameters', action='append', default=[]) - a('--edit_additional_parameters', action="store_true", default=False) - a('--parampass', default="positional") - a('--tfout', default="./tfout") - a('--new_tool',default="new_tool") - a('--runmode',default=None) + a("--script_path", default="") + a("--tool_name", default=None) + a("--interpreter_name", default=None) + a("--interpreter_version", default=None) + a("--exe_package", default=None) + a("--exe_package_version", default=None) + a("--input_files", default=[], action="append") + a("--output_files", default=[], action="append") + a("--user_email", default="Unknown") + a("--bad_user", default=None) + a("--make_Tool", default=None) + a("--help_text", default=None) + a("--tool_desc", default=None) + a("--tool_version", default=None) + a("--citations", default=None) + a("--additional_parameters", action="append", default=[]) + a("--edit_additional_parameters", action="store_true", default=False) + a("--parampass", default="positional") + a("--tfout", default="./tfout") + a("--new_tool", default="new_tool") + a("--runmode", default=None) args = parser.parse_args() - assert not args.bad_user, 'UNAUTHORISED: %s is NOT authorized to use this tool until Galaxy admin adds %s to "admin_users" in the Galaxy configuration file' % ( - args.bad_user, args.bad_user) - assert args.tool_name, '## Tool Factory expects a tool name - eg --tool_name=DESeq' - assert (args.interpreter_name or args.exe_package), '## Tool Factory wrapper expects an interpreter - eg --interpreter_name=Rscript or an executable package findable by the dependency management package' - assert args.exe_package or (len(args.script_path) > 0 and os.path.isfile( - args.script_path)), '## Tool Factory wrapper expects a script path - eg --script_path=foo.R if no executable' - args.input_files = [x.replace('"', '').replace("'", '') - for x in args.input_files] + assert not args.bad_user, ( + 'UNAUTHORISED: %s is NOT authorized to use this tool until Galaxy admin adds %s to "admin_users" in the Galaxy configuration file' + % (args.bad_user, args.bad_user) + ) + assert args.tool_name, "## Tool Factory expects a tool name - eg --tool_name=DESeq" + assert ( + args.interpreter_name or args.exe_package + ), "## Tool Factory wrapper expects an interpreter or an executable package" + assert args.exe_package or ( + len(args.script_path) > 0 and os.path.isfile(args.script_path) + ), "## Tool Factory wrapper expects a script path - eg --script_path=foo.R if no executable" + args.input_files = [x.replace('"', "").replace("'", "") for x in args.input_files] # remove quotes we need to deal with spaces in CL params for i, x in enumerate(args.additional_parameters): - args.additional_parameters[i] = args.additional_parameters[i].replace( - '"', '') + args.additional_parameters[i] = args.additional_parameters[i].replace('"', "") r = ScriptRunner(args) if args.make_Tool: retcode = r.makeTooltar()