diff env/lib/python3.7/site-packages/planemo/cli.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/planemo/cli.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,332 @@
+"""The module describes a CLI framework extending ``click``."""
+import functools
+import logging.config
+import os
+import shutil
+import sys
+import traceback
+
+import click
+from six.moves.urllib.request import urlopen
+
+from planemo import __version__
+from planemo.exit_codes import ExitCodeException
+from planemo.galaxy import profiles
+from .config import (
+    OptionSource,
+    read_global_config,
+)
+from .io import error
+
+
+CONTEXT_SETTINGS = dict(auto_envvar_prefix='PLANEMO')
+COMMAND_ALIASES = {
+    "l": "lint",
+    "o": "open",
+    "t": "test",
+    "s": "serve",
+}
+
+
+class Context(object):
+    """Describe context of Planemo computation.
+
+    Handles cross cutting concerns for Planemo such as verbose log
+    tracking, the definition of the Planemo workspace (``~/.planemo``),
+    and the global configuraton defined in ``~/.planemo.yml``.
+    """
+
+    def __init__(self):
+        """Construct a Context object using execution environment."""
+        self.home = os.getcwd()
+        self._global_config = None
+        # Will be set by planemo CLI driver
+        self.verbose = False
+        self.planemo_config = None
+        self.planemo_directory = None
+        self.option_source = {}
+
+    def set_option_source(self, param_name, option_source, force=False):
+        """Specify how an option was set."""
+        if not force:
+            assert param_name not in self.option_source, "No option source for [%s]" % param_name
+        self.option_source[param_name] = option_source
+
+    def get_option_source(self, param_name):
+        """Return OptionSource value indicating how the option was set."""
+        assert param_name in self.option_source, "No option source for [%s]" % param_name
+        return self.option_source[param_name]
+
+    @property
+    def global_config(self):
+        """Read Planemo's global configuration.
+
+        As defined most simply by ~/.planemo.yml.
+        """
+        if self._global_config is None:
+            self._global_config = read_global_config(self.planemo_config)
+        return self._global_config or {}
+
+    def log(self, msg, *args):
+        """Log a message to stderr."""
+        if args:
+            msg %= args
+        click.echo(msg, file=sys.stderr)
+
+    def vlog(self, msg, *args, **kwds):
+        """Log a message to stderr only if verbose is enabled."""
+        if self.verbose:
+            self.log(msg, *args)
+            if kwds.get("exception", False):
+                traceback.print_exc(file=sys.stderr)
+
+    @property
+    def workspace(self):
+        """Create and return Planemo's workspace.
+
+        By default this will be ``~/.planemo``.
+        """
+        if not self.planemo_directory:
+            raise Exception("No planemo workspace defined.")
+        workspace = self.planemo_directory
+        return self._ensure_directory(workspace, "workspace")
+
+    @property
+    def galaxy_profiles_directory(self):
+        """Create a return a directory for storing Galaxy profiles."""
+        path = os.path.join(self.workspace, "profiles")
+        return self._ensure_directory(path, "Galaxy profiles")
+
+    def _ensure_directory(self, path, name):
+        if not os.path.exists(path):
+            os.makedirs(path)
+        if not os.path.isdir(path):
+            template = "Planemo %s directory [%s] unavailable."
+            message = template % (name, path)
+            raise Exception(message)
+        return path
+
+    def exit(self, exit_code):
+        """Exit planemo with the supplied exit code."""
+        self.vlog("Exiting planemo with exit code [%d]" % exit_code)
+        raise ExitCodeException(exit_code)
+
+    def cache_download(self, url, destination):
+        cache = os.path.join(self.workspace, "cache")
+        if not os.path.exists(cache):
+            os.makedirs(cache)
+        filename = os.path.basename(url)
+        cache_destination = os.path.join(cache, filename)
+        if not os.path.exists(cache_destination):
+            with urlopen(url) as fh:
+                content = fh.read()
+            if len(content) == 0:
+                raise Exception("Failed to download [%s]." % url)
+            with open(cache_destination, "wb") as f:
+                f.write(content)
+
+        shutil.copy(cache_destination, destination)
+
+
+pass_context = click.make_pass_decorator(Context, ensure=True)
+cmd_folder = os.path.abspath(os.path.join(os.path.dirname(__file__),
+                                          'commands'))
+
+
+def list_cmds():
+    """List planemo commands from commands folder."""
+    rv = []
+    for filename in os.listdir(cmd_folder):
+        if filename.endswith('.py') and \
+           filename.startswith('cmd_'):
+            rv.append(filename[len("cmd_"):-len(".py")])
+    rv.sort()
+    return rv
+
+
+def name_to_command(name):
+    """Convert a subcommand name to the cli function for that command.
+
+    Command <X> is defined by the method 'planemo.commands.cmd_<x>:cli',
+    this method uses `__import__` to load and return that method.
+    """
+    try:
+        if sys.version_info[0] == 2:
+            name = name.encode('ascii', 'replace')
+        mod_name = 'planemo.commands.cmd_' + name
+        mod = __import__(mod_name, None, None, ['cli'])
+    except ImportError as e:
+        error("Problem loading command %s, exception %s" % (name, e))
+        return
+    return mod.cli
+
+
+class PlanemoCLI(click.MultiCommand):
+
+    def list_commands(self, ctx):
+        return list_cmds()
+
+    def get_command(self, ctx, name):
+        if name in COMMAND_ALIASES:
+            name = COMMAND_ALIASES[name]
+        return name_to_command(name)
+
+
+def command_function(f):
+    """Extension point for processing kwds after click callbacks."""
+    @functools.wraps(f)
+    def handle_blended_options(*args, **kwds):
+        profile = kwds.get("profile", None)
+        if profile:
+            ctx = args[0]
+            profile_defaults = profiles.ensure_profile(
+                ctx, profile, **kwds
+            )
+            _setup_profile_options(ctx, profile_defaults, kwds)
+
+        _setup_galaxy_source_options(args[0], kwds)
+
+        try:
+            return f(*args, **kwds)
+        except ExitCodeException as e:
+            sys.exit(e.exit_code)
+
+    return pass_context(handle_blended_options)
+
+
+EXCLUSIVE_OPTIONS_LIST = [
+]
+
+
+def _setup_galaxy_source_options(ctx, kwds):
+    for exclusive_options in EXCLUSIVE_OPTIONS_LIST:
+        option_source = {}
+        for option in exclusive_options:
+            if option in kwds:
+                option_source[option] = ctx.get_option_source(option)
+            else:
+                option_source[option] = None
+
+        most_authoratative_source = None
+        most_authoratative_source_options = []
+        for key, value in option_source.items():
+            if value is None:
+                continue
+            if most_authoratative_source is None or value.value < most_authoratative_source.value:
+                most_authoratative_source = value
+                most_authoratative_source_options = [key]
+            elif value == most_authoratative_source:
+                most_authoratative_source_options.append(key)
+
+        if most_authoratative_source != OptionSource.default and len(most_authoratative_source_options) > 1:
+            raise click.UsageError("Cannot specify multiple of %s" % most_authoratative_source_options)
+
+        for option in exclusive_options:
+            if option in kwds and option not in most_authoratative_source_options:
+                del kwds[option]
+
+
+def _setup_profile_options(ctx, profile_defaults, kwds):
+    for key, value in profile_defaults.items():
+        option_present = key in kwds
+        option_cli_specified = option_present and (ctx.get_option_source(key) == OptionSource.cli)
+        use_profile_option = not option_present or not option_cli_specified
+        if use_profile_option:
+            kwds[key] = value
+            ctx.set_option_source(
+                key, OptionSource.profile, force=True
+            )
+
+
+@click.command(cls=PlanemoCLI, context_settings=CONTEXT_SETTINGS)
+@click.version_option(__version__)
+@click.option('-v', '--verbose', is_flag=True,
+              help='Enables verbose mode.')
+@click.option('--config',
+              default="~/.planemo.yml",
+              envvar="PLANEMO_GLOBAL_CONFIG_PATH",
+              help="Planemo configuration YAML file.")
+@click.option('--directory',
+              default="~/.planemo",
+              envvar="PLANEMO_GLOBAL_WORKSPACE",
+              help="Workspace for planemo.")
+@pass_context
+def planemo(ctx, config, directory, verbose, configure_logging=True):
+    """A command-line toolkit for building tools and workflows for Galaxy.
+
+    Check out the full documentation for Planemo online
+    http://planemo.readthedocs.org or open with ``planemo docs``.
+
+    All the individual planemo commands support the ``--help`` option, for
+    example use ``planemo lint --help`` for more details on checking tools.
+    """
+    ctx.verbose = verbose
+    if configure_logging:
+        logging_config = {
+            'version': 1,
+            'disable_existing_loggers': False,
+            'formatters': {
+                'verbose': {
+                    'format': '%(name)s %(levelname)s %(asctime)s: %(message)s'
+                },
+                'simple': {
+                    'format': '%(name)s %(levelname)s: %(message)s'
+                },
+            },
+            'handlers': {
+                'console': {
+                    'level': 'DEBUG',
+                    'class': 'logging.StreamHandler',
+                    'formatter': 'simple' if not verbose else 'verbose'
+                },
+            },
+            'loggers': {
+                # Suppress CWL is beta warning, for Planemo purposes - it is absolutely not.
+                'galaxy.tools.parser.factory': {
+                    'handlers': ['console'],
+                    'propagate': False,
+                    'level': 'ERROR' if not verbose else "DEBUG",
+                },
+                'galaxy.tools.deps.commands': {
+                    'handlers': ['console'],
+                    'propagate': False,
+                    'level': 'ERROR' if not verbose else "DEBUG",
+                },
+                'galaxy': {
+                    'handlers': ['console'],
+                    'propagate': False,
+                    'level': 'INFO' if not verbose else "DEBUG",
+                },
+                # @jmchilton
+                # I'm fixing up Planemo's lint functionality for CWL and I keep seeing this for the
+                # schema metadata stuff (e.g. in the workflows repo). "rdflib.term WARNING:
+                # http://schema.org/docs/!DOCTYPE html does not look like a valid URI, trying to
+                # serialize this will break.". I'm going to suppress this warning I think, or are the
+                # examples wrong and should declare their namespaces differently in some way?
+                # @mr-c
+                # That particular warning is worth suppressing. A PR to silence it permanently would be very welcome!
+                # https://github.com/RDFLib/rdflib/blob/master/rdflib/term.py#L225
+                'rdflib.term': {
+                    'handlers': ['console'],
+                    'propagate': False,
+                    'level': 'ERROR' if not verbose else "DEBUG",
+                }
+            },
+            'root': {
+                'handlers': ['console'],
+                'propagate': False,
+                'level': 'WARNING' if not verbose else "DEBUG",
+            }
+        }
+        logging.config.dictConfig(logging_config)
+    ctx.planemo_config = os.path.expanduser(config)
+    ctx.planemo_directory = os.path.expanduser(directory)
+
+
+__all__ = (
+    "command_function",
+    "Context",
+    "list_cmds",
+    "name_to_command",
+    "planemo",
+)