view env/lib/python3.7/site-packages/planemo/io.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line source

from __future__ import absolute_import
from __future__ import print_function

import contextlib
import errno
import fnmatch
import os
import shutil
import subprocess
import sys
import tempfile
import time
from sys import platform as _platform
from xml.sax.saxutils import escape

import click
from galaxy.tool_util.deps import commands
from galaxy.tool_util.deps.commands import download_command
from six import (
    string_types,
    StringIO
)

from .exit_codes import (
    EXIT_CODE_NO_SUCH_TARGET,
    EXIT_CODE_OK,
)


IS_OS_X = _platform == "darwin"


def args_to_str(args):
    if args is None or isinstance(args, string_types):
        return args
    else:
        return commands.argv_to_str(args)


def communicate(cmds, **kwds):
    cmd_string = args_to_str(cmds)
    info(cmd_string)
    p = commands.shell_process(cmds, **kwds)
    if kwds.get("stdout", None) is None and commands.redirecting_io(sys=sys):
        output = commands.redirect_aware_commmunicate(p)
    else:
        output = p.communicate()

    if p.returncode != 0:
        template = "Problem executing commands {0} - ({1}, {2})"
        msg = template.format(cmd_string, output[0], output[1])
        raise RuntimeError(msg)
    return output


def shell(cmds, **kwds):
    cmd_string = args_to_str(cmds)
    info(cmd_string)
    return commands.shell(cmds, **kwds)


def info(message, *args):
    if args:
        message = message % args
    click.echo(click.style(message, bold=True, fg='green'))


def can_write_to_path(path, **kwds):
    if not kwds["force"] and os.path.exists(path):
        error("%s already exists, exiting." % path)
        return False
    return True


def error(message, *args):
    if args:
        message = message % args
    click.echo(click.style(message, bold=True, fg='red'), err=True)


def warn(message, *args):
    if args:
        message = message % args
    click.echo(click.style(message, fg='red'), err=True)


def shell_join(*args):
    """Join potentially empty commands together with '&&'."""
    return " && ".join(args_to_str(_) for _ in args if _)


def write_file(path, content, force=True):
    if os.path.exists(path) and not force:
        return

    with open(path, "w") as f:
        f.write(content)


def untar_to(url, tar_args=None, path=None, dest_dir=None):
    if tar_args:
        assert not (path and dest_dir)
        if dest_dir:
            if not os.path.exists(dest_dir):
                os.makedirs(dest_dir)
            tar_args[0:0] = ['-C', dest_dir]
        if path:
            tar_args.insert(0, '-O')

        download_cmd = download_command(url)
        download_p = commands.shell_process(download_cmd, stdout=subprocess.PIPE)
        untar_cmd = ['tar'] + tar_args
        if path:
            with open(path, 'wb') as fh:
                shell(untar_cmd, stdin=download_p.stdout, stdout=fh)
        else:
            shell(untar_cmd, stdin=download_p.stdout)
        download_p.wait()
    else:
        cmd = download_command(url, to=path)
        shell(cmd)


def find_matching_directories(path, pattern, recursive):
    """Find directories below supplied path with file matching pattern.

    Returns an empty list if no matches are found, and if recursive is False
    only the top directory specified by path will be considered.
    """
    dirs = []
    if recursive:
        if not os.path.isdir(path):
            template = "--recursive specified with non-directory path [%s]"
            message = template % (path)
            raise Exception(message)

        for base_path, dirnames, filenames in os.walk(path):
            dirnames.sort()
            for filename in fnmatch.filter(filenames, pattern):
                dirs.append(base_path)
    else:
        if os.path.exists(os.path.join(path, pattern)):
            dirs.append(path)
        elif os.path.basename(path) == pattern:
            dirs.append(os.path.dirname(path))
    return dirs


@contextlib.contextmanager
def real_io():
    """Ensure stdout and stderr have supported ``fileno()`` method.

    nosetests replaces these streams with :class:`StringIO` objects
    that may not work the same in every situtation - :func:`subprocess.Popen`
    calls in particular.
    """
    original_stdout = sys.stdout
    original_stderr = sys.stderr
    try:
        if commands.redirecting_io(sys=sys):
            sys.stdout = sys.__stdout__
            sys.stderr = sys.__stderr__
        yield
    finally:
        sys.stdout = original_stdout
        sys.stderr = original_stderr


@contextlib.contextmanager
def temp_directory(prefix="planemo_tmp_", dir=None, **kwds):
    if dir is not None:
        try:
            os.makedirs(dir)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
    temp_dir = tempfile.mkdtemp(prefix=prefix, dir=dir, **kwds)
    try:
        yield temp_dir
    finally:
        shutil.rmtree(temp_dir)


def ps1_for_path(path, base="PS1"):
    """ Used by environment commands to build a PS1 shell
    variables for tool or directory of tools.
    """
    file_name = os.path.basename(path)
    base_name = os.path.splitext(file_name)[0]
    ps1 = "(%s)${%s}" % (base_name, base)
    return ps1


def kill_pid_file(pid_file):
    try:
        os.stat(pid_file)
    except OSError as e:
        if e.errno == errno.ENOENT:
            return False

    with open(pid_file, "r") as fh:
        pid = int(fh.read())
    kill_posix(pid)
    try:
        os.unlink(pid_file)
    except Exception:
        pass


def kill_posix(pid):
    def _check_pid():
        try:
            os.kill(pid, 0)
            return True
        except OSError:
            return False

    if _check_pid():
        for sig in [15, 9]:
            try:
                os.kill(pid, sig)
            except OSError:
                return
            time.sleep(1)
            if not _check_pid():
                return


@contextlib.contextmanager
def conditionally_captured_io(capture, tee=False):
    captured_std = []
    if capture:
        with Capturing() as captured_std:
            yield captured_std
        if tee:
            tee_captured_output(captured_std)
    else:
        yield


@contextlib.contextmanager
def captured_io_for_xunit(kwds, captured_io):
    captured_std = []
    with_xunit = kwds.get('report_xunit', False)
    with conditionally_captured_io(with_xunit, tee=True):
        time1 = time.time()
        yield
        time2 = time.time()

    if with_xunit:
        stdout = [escape(m['data']) for m in captured_std
                  if m['logger'] == 'stdout']
        stderr = [escape(m['data']) for m in captured_std
                  if m['logger'] == 'stderr']
        captured_io["stdout"] = stdout
        captured_io["stderr"] = stderr
        captured_io["time"] = (time2 - time1)
    else:
        captured_io["stdout"] = None
        captured_io["stderr"] = None
        captured_io["time"] = None


class Capturing(list):
    """Function context which captures stdout/stderr

    This keeps planemo's codebase clean without requiring planemo to hold onto
    messages, or pass user-facing messages back at all. This could probably be
    solved by swapping planemo entirely to a logger and reading from/writing
    to that, but this is easier.

    This swaps sys.std{out,err} with StringIOs and then makes that output
    available.
    """
    # http://stackoverflow.com/a/16571630

    def __enter__(self):
        self._stdout = sys.stdout
        self._stderr = sys.stderr
        sys.stdout = self._stringio_stdout = StringIO()
        sys.stderr = self._stringio_stderr = StringIO()
        return self

    def __exit__(self, *args):
        self.extend([{'logger': 'stdout', 'data': x} for x in
                     self._stringio_stdout.getvalue().splitlines()])
        self.extend([{'logger': 'stderr', 'data': x} for x in
                     self._stringio_stderr.getvalue().splitlines()])

        sys.stdout = self._stdout
        sys.stderr = self._stderr


def tee_captured_output(output):
    """For messages captured with Capturing, send them to their correct
    locations so as to not interfere with normal user experience.
    """
    for message in output:
        # Append '\n' due to `splitlines()` above
        if message['logger'] == 'stdout':
            sys.stdout.write(message['data'] + '\n')
        if message['logger'] == 'stderr':
            sys.stderr.write(message['data'] + '\n')


def wait_on(function, desc, timeout=5, polling_backoff=0):
    """Wait on given function's readiness. Grow the polling
    interval incrementally by the polling_backoff."""
    delta = .25
    timing = 0
    while True:
        if timing > timeout:
            message = "Timed out waiting on %s." % desc
            raise Exception(message)
        timing += delta
        delta += polling_backoff
        value = function()
        if value is not None:
            return value
        time.sleep(delta)


@contextlib.contextmanager
def open_file_or_standard_output(path, *args, **kwds):
    if path == "-":
        yield sys.stdout
    else:
        yield open(path, *args, **kwds)


def filter_paths(paths, cwd=None, **kwds):
    if cwd is None:
        cwd = os.getcwd()

    def norm(path):
        if not os.path.isabs(path):
            path = os.path.join(cwd, path)
        return os.path.normpath(path)

    def exclude_func(exclude_path):
        def path_startswith(p):
            """Check that p starts with exclude_path and that the first
            character of p not included in exclude_path (if any) is the
            directory separator.
            """
            norm_p = norm(p)
            norm_exclude_path = norm(exclude_path)
            if norm_p.startswith(norm_exclude_path):
                return norm_p[len(norm_exclude_path):len(norm_exclude_path) + 1] in ['', os.sep]
            return False
        return path_startswith

    filters_as_funcs = []
    filters_as_funcs.extend(map(exclude_func, kwds.get("exclude", [])))

    for exclude_paths_ins in kwds.get("exclude_from", []):
        with open(exclude_paths_ins, "r") as f:
            for line in f.readlines():
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                filters_as_funcs.append(exclude_func(line))

    return [p for p in paths if not any(f(p) for f in filters_as_funcs)]


def coalesce_return_codes(ret_codes, assert_at_least_one=False):
    # Return 0 if everything is fine, otherwise pick the least
    # specific non-0 return code - preferring to report errors
    # to other non-0 exit codes.
    if assert_at_least_one and len(ret_codes) == 0:
        return EXIT_CODE_NO_SUCH_TARGET

    coalesced_return_code = EXIT_CODE_OK
    for ret_code in ret_codes:
        # None is equivalent to 0 in these methods.
        ret_code = 0 if ret_code is None else ret_code
        if ret_code == 0:
            # Everything is fine, keep moving...
            pass
        elif coalesced_return_code == 0:
            coalesced_return_code = ret_code
        # At this point in logic both ret_code and coalesced_return_code are
        # are non-zero
        elif ret_code < 0:
            # Error state, this should override eveything else.
            coalesced_return_code = ret_code
        elif ret_code > 0 and coalesced_return_code < 0:
            # Keep error state recorded.
            pass
        elif ret_code > 0:
            # Lets somewhat arbitrarily call the smaller exit code
            # the less specific.
            coalesced_return_code = min(ret_code, coalesced_return_code)

    if coalesced_return_code < 0:
        # Map -1 => 254, -2 => 253, etc...
        # Not sure it is helpful to have negative error codes
        # this was a design and API mistake in planemo.
        coalesced_return_code = 255 + coalesced_return_code

    return coalesced_return_code