diff env/lib/python3.7/site-packages/planemo/io.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400
parents 79f47841a781
children
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/planemo/io.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,402 +0,0 @@
-from __future__ import absolute_import
-from __future__ import print_function
-
-import contextlib
-import errno
-import fnmatch
-import os
-import shutil
-import subprocess
-import sys
-import tempfile
-import time
-from sys import platform as _platform
-from xml.sax.saxutils import escape
-
-import click
-from galaxy.tool_util.deps import commands
-from galaxy.tool_util.deps.commands import download_command
-from six import (
-    string_types,
-    StringIO
-)
-
-from .exit_codes import (
-    EXIT_CODE_NO_SUCH_TARGET,
-    EXIT_CODE_OK,
-)
-
-
-IS_OS_X = _platform == "darwin"
-
-
-def args_to_str(args):
-    if args is None or isinstance(args, string_types):
-        return args
-    else:
-        return commands.argv_to_str(args)
-
-
-def communicate(cmds, **kwds):
-    cmd_string = args_to_str(cmds)
-    info(cmd_string)
-    p = commands.shell_process(cmds, **kwds)
-    if kwds.get("stdout", None) is None and commands.redirecting_io(sys=sys):
-        output = commands.redirect_aware_commmunicate(p)
-    else:
-        output = p.communicate()
-
-    if p.returncode != 0:
-        template = "Problem executing commands {0} - ({1}, {2})"
-        msg = template.format(cmd_string, output[0], output[1])
-        raise RuntimeError(msg)
-    return output
-
-
-def shell(cmds, **kwds):
-    cmd_string = args_to_str(cmds)
-    info(cmd_string)
-    return commands.shell(cmds, **kwds)
-
-
-def info(message, *args):
-    if args:
-        message = message % args
-    click.echo(click.style(message, bold=True, fg='green'))
-
-
-def can_write_to_path(path, **kwds):
-    if not kwds["force"] and os.path.exists(path):
-        error("%s already exists, exiting." % path)
-        return False
-    return True
-
-
-def error(message, *args):
-    if args:
-        message = message % args
-    click.echo(click.style(message, bold=True, fg='red'), err=True)
-
-
-def warn(message, *args):
-    if args:
-        message = message % args
-    click.echo(click.style(message, fg='red'), err=True)
-
-
-def shell_join(*args):
-    """Join potentially empty commands together with '&&'."""
-    return " && ".join(args_to_str(_) for _ in args if _)
-
-
-def write_file(path, content, force=True):
-    if os.path.exists(path) and not force:
-        return
-
-    with open(path, "w") as f:
-        f.write(content)
-
-
-def untar_to(url, tar_args=None, path=None, dest_dir=None):
-    if tar_args:
-        assert not (path and dest_dir)
-        if dest_dir:
-            if not os.path.exists(dest_dir):
-                os.makedirs(dest_dir)
-            tar_args[0:0] = ['-C', dest_dir]
-        if path:
-            tar_args.insert(0, '-O')
-
-        download_cmd = download_command(url)
-        download_p = commands.shell_process(download_cmd, stdout=subprocess.PIPE)
-        untar_cmd = ['tar'] + tar_args
-        if path:
-            with open(path, 'wb') as fh:
-                shell(untar_cmd, stdin=download_p.stdout, stdout=fh)
-        else:
-            shell(untar_cmd, stdin=download_p.stdout)
-        download_p.wait()
-    else:
-        cmd = download_command(url, to=path)
-        shell(cmd)
-
-
-def find_matching_directories(path, pattern, recursive):
-    """Find directories below supplied path with file matching pattern.
-
-    Returns an empty list if no matches are found, and if recursive is False
-    only the top directory specified by path will be considered.
-    """
-    dirs = []
-    if recursive:
-        if not os.path.isdir(path):
-            template = "--recursive specified with non-directory path [%s]"
-            message = template % (path)
-            raise Exception(message)
-
-        for base_path, dirnames, filenames in os.walk(path):
-            dirnames.sort()
-            for filename in fnmatch.filter(filenames, pattern):
-                dirs.append(base_path)
-    else:
-        if os.path.exists(os.path.join(path, pattern)):
-            dirs.append(path)
-        elif os.path.basename(path) == pattern:
-            dirs.append(os.path.dirname(path))
-    return dirs
-
-
-@contextlib.contextmanager
-def real_io():
-    """Ensure stdout and stderr have supported ``fileno()`` method.
-
-    nosetests replaces these streams with :class:`StringIO` objects
-    that may not work the same in every situtation - :func:`subprocess.Popen`
-    calls in particular.
-    """
-    original_stdout = sys.stdout
-    original_stderr = sys.stderr
-    try:
-        if commands.redirecting_io(sys=sys):
-            sys.stdout = sys.__stdout__
-            sys.stderr = sys.__stderr__
-        yield
-    finally:
-        sys.stdout = original_stdout
-        sys.stderr = original_stderr
-
-
-@contextlib.contextmanager
-def temp_directory(prefix="planemo_tmp_", dir=None, **kwds):
-    if dir is not None:
-        try:
-            os.makedirs(dir)
-        except OSError as e:
-            if e.errno != errno.EEXIST:
-                raise
-    temp_dir = tempfile.mkdtemp(prefix=prefix, dir=dir, **kwds)
-    try:
-        yield temp_dir
-    finally:
-        shutil.rmtree(temp_dir)
-
-
-def ps1_for_path(path, base="PS1"):
-    """ Used by environment commands to build a PS1 shell
-    variables for tool or directory of tools.
-    """
-    file_name = os.path.basename(path)
-    base_name = os.path.splitext(file_name)[0]
-    ps1 = "(%s)${%s}" % (base_name, base)
-    return ps1
-
-
-def kill_pid_file(pid_file):
-    try:
-        os.stat(pid_file)
-    except OSError as e:
-        if e.errno == errno.ENOENT:
-            return False
-
-    with open(pid_file, "r") as fh:
-        pid = int(fh.read())
-    kill_posix(pid)
-    try:
-        os.unlink(pid_file)
-    except Exception:
-        pass
-
-
-def kill_posix(pid):
-    def _check_pid():
-        try:
-            os.kill(pid, 0)
-            return True
-        except OSError:
-            return False
-
-    if _check_pid():
-        for sig in [15, 9]:
-            try:
-                os.kill(pid, sig)
-            except OSError:
-                return
-            time.sleep(1)
-            if not _check_pid():
-                return
-
-
-@contextlib.contextmanager
-def conditionally_captured_io(capture, tee=False):
-    captured_std = []
-    if capture:
-        with Capturing() as captured_std:
-            yield captured_std
-        if tee:
-            tee_captured_output(captured_std)
-    else:
-        yield
-
-
-@contextlib.contextmanager
-def captured_io_for_xunit(kwds, captured_io):
-    captured_std = []
-    with_xunit = kwds.get('report_xunit', False)
-    with conditionally_captured_io(with_xunit, tee=True):
-        time1 = time.time()
-        yield
-        time2 = time.time()
-
-    if with_xunit:
-        stdout = [escape(m['data']) for m in captured_std
-                  if m['logger'] == 'stdout']
-        stderr = [escape(m['data']) for m in captured_std
-                  if m['logger'] == 'stderr']
-        captured_io["stdout"] = stdout
-        captured_io["stderr"] = stderr
-        captured_io["time"] = (time2 - time1)
-    else:
-        captured_io["stdout"] = None
-        captured_io["stderr"] = None
-        captured_io["time"] = None
-
-
-class Capturing(list):
-    """Function context which captures stdout/stderr
-
-    This keeps planemo's codebase clean without requiring planemo to hold onto
-    messages, or pass user-facing messages back at all. This could probably be
-    solved by swapping planemo entirely to a logger and reading from/writing
-    to that, but this is easier.
-
-    This swaps sys.std{out,err} with StringIOs and then makes that output
-    available.
-    """
-    # http://stackoverflow.com/a/16571630
-
-    def __enter__(self):
-        self._stdout = sys.stdout
-        self._stderr = sys.stderr
-        sys.stdout = self._stringio_stdout = StringIO()
-        sys.stderr = self._stringio_stderr = StringIO()
-        return self
-
-    def __exit__(self, *args):
-        self.extend([{'logger': 'stdout', 'data': x} for x in
-                     self._stringio_stdout.getvalue().splitlines()])
-        self.extend([{'logger': 'stderr', 'data': x} for x in
-                     self._stringio_stderr.getvalue().splitlines()])
-
-        sys.stdout = self._stdout
-        sys.stderr = self._stderr
-
-
-def tee_captured_output(output):
-    """For messages captured with Capturing, send them to their correct
-    locations so as to not interfere with normal user experience.
-    """
-    for message in output:
-        # Append '\n' due to `splitlines()` above
-        if message['logger'] == 'stdout':
-            sys.stdout.write(message['data'] + '\n')
-        if message['logger'] == 'stderr':
-            sys.stderr.write(message['data'] + '\n')
-
-
-def wait_on(function, desc, timeout=5, polling_backoff=0):
-    """Wait on given function's readiness. Grow the polling
-    interval incrementally by the polling_backoff."""
-    delta = .25
-    timing = 0
-    while True:
-        if timing > timeout:
-            message = "Timed out waiting on %s." % desc
-            raise Exception(message)
-        timing += delta
-        delta += polling_backoff
-        value = function()
-        if value is not None:
-            return value
-        time.sleep(delta)
-
-
-@contextlib.contextmanager
-def open_file_or_standard_output(path, *args, **kwds):
-    if path == "-":
-        yield sys.stdout
-    else:
-        yield open(path, *args, **kwds)
-
-
-def filter_paths(paths, cwd=None, **kwds):
-    if cwd is None:
-        cwd = os.getcwd()
-
-    def norm(path):
-        if not os.path.isabs(path):
-            path = os.path.join(cwd, path)
-        return os.path.normpath(path)
-
-    def exclude_func(exclude_path):
-        def path_startswith(p):
-            """Check that p starts with exclude_path and that the first
-            character of p not included in exclude_path (if any) is the
-            directory separator.
-            """
-            norm_p = norm(p)
-            norm_exclude_path = norm(exclude_path)
-            if norm_p.startswith(norm_exclude_path):
-                return norm_p[len(norm_exclude_path):len(norm_exclude_path) + 1] in ['', os.sep]
-            return False
-        return path_startswith
-
-    filters_as_funcs = []
-    filters_as_funcs.extend(map(exclude_func, kwds.get("exclude", [])))
-
-    for exclude_paths_ins in kwds.get("exclude_from", []):
-        with open(exclude_paths_ins, "r") as f:
-            for line in f.readlines():
-                line = line.strip()
-                if not line or line.startswith("#"):
-                    continue
-                filters_as_funcs.append(exclude_func(line))
-
-    return [p for p in paths if not any(f(p) for f in filters_as_funcs)]
-
-
-def coalesce_return_codes(ret_codes, assert_at_least_one=False):
-    # Return 0 if everything is fine, otherwise pick the least
-    # specific non-0 return code - preferring to report errors
-    # to other non-0 exit codes.
-    if assert_at_least_one and len(ret_codes) == 0:
-        return EXIT_CODE_NO_SUCH_TARGET
-
-    coalesced_return_code = EXIT_CODE_OK
-    for ret_code in ret_codes:
-        # None is equivalent to 0 in these methods.
-        ret_code = 0 if ret_code is None else ret_code
-        if ret_code == 0:
-            # Everything is fine, keep moving...
-            pass
-        elif coalesced_return_code == 0:
-            coalesced_return_code = ret_code
-        # At this point in logic both ret_code and coalesced_return_code are
-        # are non-zero
-        elif ret_code < 0:
-            # Error state, this should override eveything else.
-            coalesced_return_code = ret_code
-        elif ret_code > 0 and coalesced_return_code < 0:
-            # Keep error state recorded.
-            pass
-        elif ret_code > 0:
-            # Lets somewhat arbitrarily call the smaller exit code
-            # the less specific.
-            coalesced_return_code = min(ret_code, coalesced_return_code)
-
-    if coalesced_return_code < 0:
-        # Map -1 => 254, -2 => 253, etc...
-        # Not sure it is helpful to have negative error codes
-        # this was a design and API mistake in planemo.
-        coalesced_return_code = 255 + coalesced_return_code
-
-    return coalesced_return_code