view tools/effectiveT3/effectiveT3.py @ 10:a46d7861c32c draft

"Update all the pico_galaxy tools on main Tool Shed"
author peterjc
date Fri, 16 Apr 2021 22:34:56 +0000
parents 512530020360
children ed8c1babc166
line wrap: on
line source

#!/usr/bin/env python
"""Wrapper for EffectiveT3 v1.0.1 for use in Galaxy.

This script takes exactly five command line arguments:
 * model name (e.g. TTSS_STD-1.0.1.jar)
 * threshold (selective or sensitive)
 * an input protein FASTA filename
 * output tabular filename

It then calls the standalone Effective T3 v1.0.1 program (not the
webservice), and reformats the semi-colon separated output into
tab separated output for use in Galaxy.
"""
import os

# We want to be able to use shutil.which, but need Python 3.3+
# import shutil
import subprocess
import sys

# The Galaxy auto-install via tool_dependencies.xml will set the
# environment variable $EFFECTIVET3 pointing at the folder with
# the JAR file.
#
# The BioConda recipe will put a wrapper script on the $PATH,
# which we can use to find the JAR file.
#
# We fall back on /opt/EffectiveT3/
#
effective_t3_jarname = "TTSS_GUI-1.0.1.jar"

if "-v" in sys.argv or "--version" in sys.argv:
    # TODO - Get version of the JAR file dynamically?
    print("Wrapper v0.0.20, for %s" % effective_t3_jarname)
    sys.exit(0)

if len(sys.argv) != 5:
    sys.exit(
        "Require four arguments: model, threshold, input protein "
        "FASTA file & output tabular file"
    )

model, threshold, fasta_file, tabular_file = sys.argv[1:]

if not os.path.isfile(fasta_file):
    sys.exit("Input FASTA file not found: %s" % fasta_file)

if threshold not in ["selective", "sensitive"] and not threshold.startswith("cutoff="):
    sys.exit(
        "Threshold should be selective, sensitive, or cutoff=..., not %r" % threshold
    )


def clean_tabular(raw_handle, out_handle):
    """Clean up Effective T3 output to make it tabular."""
    count = 0
    positive = 0
    errors = 0
    for line in raw_handle:
        if (
            not line
            or line.startswith("#")
            or line.startswith("Id; Description; Score;")
        ):
            continue
        assert line.count(";") >= 3, repr(line)
        # Normally there will just be three semi-colons, however the
        # original FASTA file's ID or description might have had
        # semi-colons in it as well, hence the following hackery:
        try:
            id_descr, score, effective = line.rstrip("\r\n").rsplit(";", 2)
            # Cope when there was no FASTA description
            if "; " not in id_descr and id_descr.endswith(";"):
                id = id_descr[:-1]
                descr = ""
            else:
                id, descr = id_descr.split("; ", 1)
        except ValueError:
            sys.exit("Problem parsing line:\n%s\n" % line)
        parts = [s.strip() for s in [id, descr, score, effective]]
        out_handle.write("\t".join(parts) + "\n")
        count += 1
        if float(score) < 0:
            errors += 1
        if effective.lower() == "true":
            positive += 1
    return count, positive, errors


def run(cmd):
    """Run the command line string via subprocess."""
    # Avoid using shell=True when we call subprocess to ensure if the Python
    # script is killed, so too is the child process.
    try:
        child = subprocess.Popen(
            cmd, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
    except Exception as err:
        sys.exit("Error invoking command:\n%s\n\n%s\n" % (" ".join(cmd), err))
    # Use .communicate as can get deadlocks with .wait(),
    stdout, stderr = child.communicate()
    return_code = child.returncode
    if return_code or stderr.startswith("Exception in thread"):
        cmd_str = " ".join(cmd)  # doesn't quote spaces etc
        if stderr and stdout:
            sys.exit(
                "Return code %i from command:\n%s\n\n%s\n\n%s"
                % (return_code, cmd_str, stdout, stderr)
            )
        else:
            sys.exit(
                "Return code %i from command:\n%s\n%s" % (return_code, cmd_str, stderr)
            )


try:
    from shutil import which
except ImportError:
    # Likely running on Python 2, use backport:
    def which(cmd, mode=os.F_OK | os.X_OK, path=None):
        """Python implementation of command line tool which.

        Given a command, mode, and a PATH string, return the path which
        conforms to the given mode on the PATH, or None if there is no such
        file.

        `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result
        of os.environ.get("PATH"), or can be overridden with a custom search
        path.
        """
        # Check that a given file can be accessed with the correct mode.
        # Additionally check that `file` is not a directory, as on Windows
        # directories pass the os.access check.
        def _access_check(fn, mode):
            return os.path.exists(fn) and os.access(fn, mode) and not os.path.isdir(fn)

        # Short circuit. If we're given a full path which matches the mode
        # and it exists, we're done here.
        if _access_check(cmd, mode):
            return cmd

        path = (path or os.environ.get("PATH", os.defpath)).split(os.pathsep)

        if sys.platform == "win32":
            # The current directory takes precedence on Windows.
            if os.curdir not in path:
                path.insert(0, os.curdir)

            # PATHEXT is necessary to check on Windows.
            pathext = os.environ.get("PATHEXT", "").split(os.pathsep)
            # See if the given file matches any of the expected path extensions.
            # This will allow us to short circuit when given "python.exe".
            matches = [cmd for ext in pathext if cmd.lower().endswith(ext.lower())]
            # If it does match, only test that one, otherwise we have to try
            # others.
            files = [cmd] if matches else [cmd + ext.lower() for ext in pathext]
        else:
            # On other platforms you don't have things like PATHEXT to tell you
            # what file suffixes are executable, so just pass on cmd as-is.
            files = [cmd]

        seen = set()
        for dir in path:
            dir = os.path.normcase(dir)
            if dir not in seen:
                seen.add(dir)
                for thefile in files:
                    name = os.path.join(dir, thefile)
                    if _access_check(name, mode):
                        return name
        return None


# Try in order the following to find the JAR file:
# - Location of any wrapper script, e.g. from BioConda installation
# - The $EFFECTIVET3 env var, e.g. old-style Galaxy tool installation
# - The /opt/EffectiveT3/ folder.
effective_t3_jar = None
effective_t3_dir = None
dirs = ["/opt/EffectiveT3/"]
if "EFFECTIVET3" in os.environ:
    dirs.insert(0, os.environ.get("EFFECTIVET3"))
if which("effectivet3"):
    # Assuming this is a BioConda installed wrapper for effective T3,
    # this will get the directory of the wrapper script which is where
    # the JAR file will be:
    dirs.insert(0, os.path.split(os.path.realpath(which("effectivet3")))[0])
for effective_t3_dir in dirs:
    effective_t3_jar = os.path.join(effective_t3_dir, effective_t3_jarname)
    if os.path.isfile(effective_t3_jar):
        # Good
        break
    effective_t3_jar = None
if not effective_t3_dir or not effective_t3_jar:
    sys.exit("Effective T3 JAR file %r not found in %r" % (effective_t3_jarname, dirs))

if not os.path.isdir(os.path.join(effective_t3_dir, "module")):
    sys.exit(
        "Effective T3 module folder not found: %r"
        % os.path.join(effective_t3_dir, "module")
    )

effective_t3_model = os.path.join(effective_t3_dir, "module", model)
if not os.path.isfile(effective_t3_model):
    sys.stderr.write(
        "Contents of %r is %s\n"
        % (
            os.path.join(effective_t3_dir, "module"),
            ", ".join(
                repr(p) for p in os.listdir(os.path.join(effective_t3_dir, "module"))
            ),
        )
    )
    sys.stderr.write("Main JAR was found: %r\n" % effective_t3_jar)
    sys.exit("Effective T3 model JAR file not found: %r" % effective_t3_model)

# We will have write access wherever the output should be,
if tabular_file == "/dev/stdout":
    temp_file = os.path.abspath("effectivet3_tabular_output.tmp")
else:
    temp_file = os.path.abspath(tabular_file + ".tmp")

# Use absolute paths since will change current directory...
tabular_file = os.path.abspath(tabular_file)
fasta_file = os.path.abspath(fasta_file)

cmd = [
    "java",
    "-jar",
    effective_t3_jar,
    "-f",
    fasta_file,
    "-m",
    model,
    "-t",
    threshold,
    "-o",
    temp_file,
    "-q",
]

try:
    # Must run from directory above the module subfolder:
    os.chdir(effective_t3_dir)
except Exception:
    sys.exit("Could not change to Effective T3 folder: %s" % effective_t3_dir)

run(cmd)

if not os.path.isfile(temp_file):
    sys.exit("ERROR - No output file from Effective T3")

out_handle = open(tabular_file, "w")
out_handle.write("#ID\tDescription\tScore\tEffective\n")
data_handle = open(temp_file)
count, positive, errors = clean_tabular(data_handle, out_handle)
data_handle.close()
out_handle.close()

os.remove(temp_file)

if errors:
    print("%i sequences, %i positive, %i errors" % (count, positive, errors))
else:
    print("%i/%i sequences positive" % (positive, count))

if count and count == errors:
    # Galaxy will still  allow them to see the output file
    sys.exit("All your sequences gave an error code")