diff env/lib/python3.7/site-packages/ephemeris/run_data_managers.py @ 5:9b1c78e6ba9c draft default tip

"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author shellac
date Mon, 01 Jun 2020 08:59:25 -0400 (2020-06-01)
parents 79f47841a781
children
--- a/env/lib/python3.7/site-packages/ephemeris/run_data_managers.py	Thu May 14 16:47:39 2020 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,318 +0,0 @@
-#!/usr/bin/env python
-"""Run-data-managers is a tool for provisioning data on a galaxy instance.
-
-Run-data-managers can run multiple data managers that are interdependent.
-For example, when a reference genome is needed for bwa-mem, Run-data-managers
-can first run a data manager that fetches the fasta file and then run
-another data manager that indexes the fasta file for bwa-mem.
-This functionality requires the "watch_tool_data_dir" setting in galaxy.ini to be set to True.
-Also, if a new data manager is installed, Galaxy needs to be restarted in order for its tool_data_dir to be watched.
-
-Run-data-managers needs a YAML file that specifies which data managers are run and with which settings.
-Example files can be found `here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.sample>`_,
-`here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.sample.advanced>`_,
-and `here <https://github.com/galaxyproject/ephemeris/blob/master/tests/run_data_managers.yaml.test>`_.
-
-By default run-data-managers skips entries in the yaml file that have already been run.
-It checks this in the following way:
-
-  * If the data manager has input variables "name" or "sequence_name" it will check if the "name" column in the data table already has this entry.
-    "name" will take precedence over "sequence_name".
-  * If the data manager has input variables "value", "sequence_id" or 'dbkey' it will check if the "value"
-    column in the data table already has this entry.
-    Value takes precedence over sequence_id which takes precedence over dbkey.
-  * If none of the above input variables are specified the data manager will always run.
-"""
-import argparse
-import json
-import logging
-import time
-from collections import namedtuple
-
-from bioblend.galaxy.tool_data import ToolDataClient
-from bioblend.galaxy.tools import ToolClient
-from jinja2 import Template
-
-from . import get_galaxy_connection
-from . import load_yaml_file
-from .common_parser import get_common_args
-from .ephemeris_log import disable_external_library_logging, setup_global_logger
-
-DEFAULT_URL = "http://localhost"
-DEFAULT_SOURCE_TABLES = ["all_fasta"]
-
-
-def wait(gi, job_list, log):
-    """
-        Waits until all jobs in a list are finished or failed.
-        It will check the state of the created datasets every 30s.
-        It will return a tuple: (successful_jobs, failed_jobs).
-    """
-
-    failed_jobs = []
-    successful_jobs = []
-
-    # Empty list returns false and stops the loop.
-    while bool(job_list):
-        finished_jobs = []
-        for job in job_list:
-            job_hid = job['outputs'][0]['hid']
-            # check if the output of the running job is either in 'ok' or 'error' state
-            state = gi.datasets.show_dataset(job['outputs'][0]['id'])['state']
-            if state == 'ok':
-                log.info('Job %i finished with state %s.' % (job_hid, state))
-                successful_jobs.append(job)
-                finished_jobs.append(job)
-            elif state == 'error':
-                log.error('Job %i finished with state %s.' % (job_hid, state))
-                job_id = job['jobs'][0]['id']
-                job_details = gi.jobs.show_job(job_id, full_details=True)
-                log.error(
-                    "Job {job_hid}: Tool '{tool_id}' finished with exit code: {exit_code}. Stderr: {stderr}".format(
-                        job_hid=job_hid,
-                        **job_details
-                    ))
-                log.debug("Job {job_hid}: Tool '{tool_id}' stdout: {stdout}".format(
-                    job_hid=job_hid,
-                    **job_details
-                ))
-                failed_jobs.append(job)
-                finished_jobs.append(job)
-            else:
-                log.debug('Job %i still running.' % job_hid)
-        # Remove finished jobs from job_list.
-        for finished_job in finished_jobs:
-            job_list.remove(finished_job)
-        # only sleep if job_list is not empty yet.
-        if bool(job_list):
-            time.sleep(30)
-    return successful_jobs, failed_jobs
-
-
-def get_first_valid_entry(input_dict, key_list):
-    """Iterates over key_list and returns the value of the first key that exists in the dictionary. Or returns None"""
-    for key in key_list:
-        if key in input_dict:
-            return input_dict.get(key)
-    return None
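For example, with the precedence lists used by DataManagers below ("name" before "sequence_name"; "value" before "sequence_id" before "dbkey"):

# Precedence illustration: the first key from key_list found in the dict wins.
inputs = {"dbkey": "dm3", "value": "dm3_custom", "sequence_name": "Fly (dm3)"}
get_first_valid_entry(inputs, ["value", "sequence_id", "dbkey"])  # -> "dm3_custom"
get_first_valid_entry(inputs, ["name", "sequence_name"])          # -> "Fly (dm3)"
get_first_valid_entry(inputs, ["spam"])                           # -> None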
-
-
-class DataManagers:
-    def __init__(self, galaxy_instance, configuration):
-        """
-        :param galaxy_instance: A GalaxyInstance object (import from bioblend.galaxy)
-        :param configuration: A dictionary. Examples in the ephemeris documentation.
-        """
-        self.gi = galaxy_instance
-        self.config = configuration
-        self.tool_data_client = ToolDataClient(self.gi)
-        self.tool_client = ToolClient(self.gi)
-        self.possible_name_keys = ['name', 'sequence_name']  # In order of importance!
-        self.possible_value_keys = ['value', 'sequence_id', 'dbkey']  # In order of importance!
-        self.data_managers = self.config.get('data_managers')
-        self.genomes = self.config.get('genomes', '')
-        self.source_tables = DEFAULT_SOURCE_TABLES
-        self.fetch_jobs = []
-        self.skipped_fetch_jobs = []
-        self.index_jobs = []
-        self.skipped_index_jobs = []
-
-    def initiate_job_lists(self):
-        """
-        Determines which data managers should be run to populate the data tables.
-        Distinguishes between fetch jobs (download files) and index jobs.
-        :return: None. Populates self.fetch_jobs, self.skipped_fetch_jobs, self.index_jobs and self.skipped_index_jobs
-        """
-        self.fetch_jobs = []
-        self.skipped_fetch_jobs = []
-        self.index_jobs = []
-        self.skipped_index_jobs = []
-        for dm in self.data_managers:
-            jobs, skipped_jobs = self.get_dm_jobs(dm)
-            if self.dm_is_fetcher(dm):
-                self.fetch_jobs.extend(jobs)
-                self.skipped_fetch_jobs.extend(skipped_jobs)
-            else:
-                self.index_jobs.extend(jobs)
-                self.skipped_index_jobs.extend(skipped_jobs)
-
-    def get_dm_jobs(self, dm):
-        """Gets the job entries for a single dm. Puts entries that already present in skipped_job_list.
-        :returns job_list, skipped_job_list"""
-        job_list = []
-        skipped_job_list = []
-        items = self.parse_items(dm.get('items', ['']))
-        for item in items:
-            dm_id = dm['id']
-            params = dm['params']
-            inputs = dict()
-            # Iterate over all parameters, replace occurrences of {{item}} with the item currently being processed
-            # and create the tool_inputs dict for running the data manager job
-            for param in params:
-                key, value = list(param.items())[0]
-                value_template = Template(value)
-                value = value_template.render(item=item)
-                inputs.update({key: value})
-
-            job = dict(tool_id=dm_id, inputs=inputs)
-
-            data_tables = dm.get('data_table_reload', [])
-            if self.input_entries_exist_in_data_tables(data_tables, inputs):
-                skipped_job_list.append(job)
-            else:
-                job_list.append(job)
-        return job_list, skipped_job_list
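As a self-contained illustration of the templating step in the loop above, a param value containing a {{ item }} placeholder is rendered once per item:

from jinja2 import Template

# Render the same param template for two items, as the loop above does.
value_template = Template("{{ item.dbkey }}")
for item in [{"dbkey": "dm3"}, {"dbkey": "hg38"}]:
    print(value_template.render(item=item))  # prints "dm3", then "hg38"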
-
-    def dm_is_fetcher(self, dm):
-        """Checks whether the data manager fetches a sequence instead of indexing.
-        This is determined by whether the data manager reloads one of the source tables.
-        :returns True if dm is a fetcher, False if it is not."""
-        data_tables = dm.get('data_table_reload', [])
-        for data_table in data_tables:
-            if data_table in self.source_tables:
-                return True
-        return False
-
-    def data_table_entry_exists(self, data_table_name, entry, column='value'):
-        """Checks whether an entry exists in the a specified column in the data_table."""
-        try:
-            data_table_content = self.tool_data_client.show_data_table(data_table_name)
-        except Exception:
-            raise Exception('Table "%s" does not exist' % data_table_name)
-
-        try:
-            column_index = data_table_content.get('columns').index(column)
-        except ValueError:
-            raise ValueError('Column "%s" does not exist in %s' % (column, data_table_name))
-
-        for field in data_table_content.get('fields'):
-            if field[column_index] == entry:
-                return True
-        return False
-
-    def input_entries_exist_in_data_tables(self, data_tables, input_dict):
-        """Checks whether name and value entries from the input are already present in the data tables.
-        If an entry is missing in one of the tables, this function returns False."""
-        value_entry = get_first_valid_entry(input_dict, self.possible_value_keys)
-        name_entry = get_first_valid_entry(input_dict, self.possible_name_keys)
-
-        # Return False if name and value entries are both None
-        if not value_entry and not name_entry:
-            return False
-
-        # Check every data table for existence of name and value
-        # Return False as soon as entry is not present
-        for data_table in data_tables:
-            if value_entry:
-                if not self.data_table_entry_exists(data_table, value_entry, column='value'):
-                    return False
-            if name_entry:
-                if not self.data_table_entry_exists(data_table, name_entry, column='name'):
-                    return False
-        # If all checks pass, the entries are present in the data tables.
-        return True
-
-    def parse_items(self, items):
-        """
-        Parses items with jinja2.
-        :param items: the items to be parsed
-        :return: the parsed items
-        """
-        if bool(self.genomes):
-            items_template = Template(json.dumps(items))
-            rendered_items = items_template.render(genomes=json.dumps(self.genomes))
-            # Remove trailing " if present
-            rendered_items = rendered_items.strip('"')
-            items = json.loads(rendered_items)
-        return items
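A minimal sketch of what this rendering does when items is the literal string "{{ genomes }}", as in the sample configurations; it also shows why the surrounding quotes are stripped above:

import json

from jinja2 import Template

genomes = [{"dbkey": "dm3", "name": "Fly (dm3)"}]
items = "{{ genomes }}"  # as it would appear in the YAML file
rendered = Template(json.dumps(items)).render(genomes=json.dumps(genomes))
print(json.loads(rendered.strip('"')))  # -> [{'dbkey': 'dm3', 'name': 'Fly (dm3)'}]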
-
-    def run(self, log=None, ignore_errors=False, overwrite=False):
-        """
-        Runs the data managers.
-        :param log: The log to be used.
-        :param ignore_errors: Do not raise an error when data manager jobs fail; continue regardless.
-        :param overwrite: Overwrite existing entries in data tables
-        """
-        self.initiate_job_lists()
-        all_successful_jobs = []
-        all_failed_jobs = []
-        all_skipped_jobs = []
-
-        if not log:
-            log = logging.getLogger()
-
-        def run_jobs(jobs, skipped_jobs):
-            job_list = []
-            for skipped_job in skipped_jobs:
-                if overwrite:
-                    log.info('%s already run for %s. Entry will be overwritten.' %
-                             (skipped_job["tool_id"], skipped_job["inputs"]))
-                    jobs.append(skipped_job)
-                else:
-                    log.info('%s already run for %s. Skipping.' % (skipped_job["tool_id"], skipped_job["inputs"]))
-                    all_skipped_jobs.append(skipped_job)
-            for job in jobs:
-                started_job = self.tool_client.run_tool(history_id=None, tool_id=job["tool_id"],
-                                                        tool_inputs=job["inputs"])
-                log.info('Dispatched job %i. Running DM: "%s" with parameters: %s' %
-                         (started_job['outputs'][0]['hid'], job["tool_id"], job["inputs"]))
-                job_list.append(started_job)
-
-            successful_jobs, failed_jobs = wait(self.gi, job_list, log)
-            if failed_jobs:
-                if not ignore_errors:
-                    log.error('Not all jobs successful! aborting...')
-                    raise RuntimeError('Not all jobs successful! aborting...')
-                else:
-                    log.warning('Not all jobs successful! ignoring...')
-            all_successful_jobs.extend(successful_jobs)
-            all_failed_jobs.extend(failed_jobs)
-
-        log.info("Running data managers that populate the following source data tables: %s" % self.source_tables)
-        run_jobs(self.fetch_jobs, self.skipped_fetch_jobs)
-        log.info("Running data managers that index sequences.")
-        run_jobs(self.index_jobs, self.skipped_index_jobs)
-
-        log.info('Finished running data managers. Results:')
-        log.info('Successful jobs: %i ' % len(all_successful_jobs))
-        log.info('Skipped jobs: %i ' % len(all_skipped_jobs))
-        log.info('Failed jobs: %i ' % len(all_failed_jobs))
-        InstallResults = namedtuple("InstallResults", ["successful_jobs", "failed_jobs", "skipped_jobs"])
-        return InstallResults(successful_jobs=all_successful_jobs, failed_jobs=all_failed_jobs,
-                              skipped_jobs=all_skipped_jobs)
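Besides the command line interface defined below, the class can be driven directly from Python against a running Galaxy server. The URL, API key and config path in this sketch are placeholders:

from bioblend.galaxy import GalaxyInstance

from ephemeris import load_yaml_file
from ephemeris.run_data_managers import DataManagers

# Placeholders: point these at a real Galaxy server (with an admin API key)
# and an existing run-data-managers YAML configuration file.
gi = GalaxyInstance(url="http://localhost:8080", key="<admin-api-key>")
config = load_yaml_file("run_data_managers.yaml")
results = DataManagers(gi, config).run(ignore_errors=True)
print(len(results.successful_jobs), len(results.failed_jobs), len(results.skipped_jobs))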
-
-
-def _parser():
-    """returns the parser object."""
-    parent = get_common_args(log_file=True)
-
-    parser = argparse.ArgumentParser(
-        parents=[parent],
-        description='Running Galaxy data managers in a defined order with defined parameters. '
-                    "'watch_tool_data_dir' in galaxy config should be set to true.")
-    parser.add_argument("--config", required=True,
-                        help="Path to the YAML config file with the list of data managers and data to install.")
-    parser.add_argument("--overwrite", action="store_true",
-                        help="Disables checking whether the item already exists in the tool data table.")
-    parser.add_argument("--ignore_errors", action="store_true",
-                        help="Do not stop running when jobs have failed.")
-    return parser
-
-
-def main():
-    disable_external_library_logging()
-    parser = _parser()
-    args = parser.parse_args()
-    log = setup_global_logger(name=__name__, log_file=args.log_file)
-    if args.verbose:
-        log.setLevel(logging.DEBUG)
-    else:
-        log.setLevel(logging.INFO)
-    gi = get_galaxy_connection(args, file=args.config, log=log, login_required=True)
-    config = load_yaml_file(args.config)
-    data_managers = DataManagers(gi, config)
-    data_managers.run(log, args.ignore_errors, args.overwrite)
-
-
-if __name__ == '__main__':
-    main()