Mercurial > repos > rhpvorderman > data_manager_select_index_by_path
diff data_manager/path_name_value_key_manager.py @ 1:8495c49cd056 draft default tip
planemo upload for repository https://github.com/LUMC/lumc-galaxy-tools/tree/master/data_manager_select_index_by_path commit 9061997af3bc94f49653ffd42f10b973578e371d
author | rhpvorderman |
---|---|
date | Mon, 16 Jul 2018 10:58:36 -0400 |
parents | 5f8d9309058b |
children |
line wrap: on
line diff
--- a/data_manager/path_name_value_key_manager.py Mon Sep 25 03:35:26 2017 -0400 +++ b/data_manager/path_name_value_key_manager.py Mon Jul 16 10:58:36 2018 -0400 @@ -1,104 +1,222 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 +"""Script to create data manager jsons""" +import argparse import json -import argparse -import os +from pathlib import Path + import yaml +from schema import Schema, Optional -def _add_data_table_entry( data_manager_dict, data_table_name, data_table_entry ): - data_manager_dict['data_tables'] = data_manager_dict.get( 'data_tables', {} ) - data_manager_dict['data_tables'][ data_table_name ] = data_manager_dict['data_tables'].get( data_table_name, [] ) - data_manager_dict['data_tables'][ data_table_name ].append( data_table_entry ) - return data_manager_dict + +def indexes_schema(): + return Schema( + {'name': str, + Optional('prefix'): bool, + Optional('extensions'): [str], + Optional('prefix_strip_extension'): bool, + Optional('extra_columns'): [str], + Optional('folder'): [str]}) -def check_param(name, value, default=None, check_tab=True): - if value in [ None, '', '?' ]: - if default: - print "Using {0} for {1} as no value provided".format( default, name ) - value = default - else: - raise Exception( '{0} is not a valid {1}. You must specify a valid {1}.'.format( value, name ) ) - if check_tab and "\t" in value: - raise Exception( '{0} is not a valid {1}. It may not contain a tab because these are used as seperators by galaxy .'.format( value, name ) ) - return value +def argument_parser(): + parser = argparse.ArgumentParser() + parser.add_argument('--value', type=str, help='value') + parser.add_argument('--dbkey', type=str, help='dbkey') + parser.add_argument('--name', type=str, help='name') + parser.add_argument('--path', type=Path, help='path', + required=True) + parser.add_argument('--data_table_name', action='store', type=str, + help='Name of the data table', + required=True) + parser.add_argument('--json_output_file', action='store', type=Path, + help='Json output file', + required=True) + parser.add_argument("--extra-columns", type=str, + help='Yaml formatted string with extra columns ' + 'and their values. For example ' + '\'{"with-gtf":"0"}\' for STAR indexes') + return parser -def prefix_exists(directory, prefix): - '''checks if files exist with prefix in a directory. Returns Boolean''' - matched_files = [] - directory_files = os.listdir(directory) - for directory_file in directory_files: - if directory_file.startswith(prefix): - matched_files.append(directory_file) - # Empty list should return False - return bool(matched_files) + +def check_tab(name: str, value: str): + if '\t' in value: + raise ValueError( + "'{0}' is not a valid '{1}'. It may not contain a tab because " + "these are used as seperators by galaxy .".format( + value, name)) -def prefix_plus_extension_exists(directory, prefix, extension): - '''checks if files exist with prefix in a directory. Returns Boolean''' - matched_files = [] - directory_files = os.listdir(directory) - for directory_file in directory_files: - if directory_file.startswith(prefix) and directory_file.endswith(extension): - matched_files.append(directory_file) + +def prefix_plus_extension_exists(directory: Path, prefix: str, extension: str): + """checks if files exist with prefix in a directory. Returns Boolean""" + matched_files = [directory_file for directory_file in directory.iterdir() + if + directory_file.name.startswith( + prefix) and directory_file.suffix == extension] # Empty list should return False return bool(matched_files) -def main(): + +class DataTable(object): - #value = "test_value" - #name = "test_name" - #print '{0} other {1} more{0}'.format(value, name ) - #print '{0} is not a valid {1}. It may not contain a tab.'.format( value, name ) + def __init__(self, + index_path: Path, + data_table_name: str, + indexes_properties_file: Path, + name: str = None, + dbkey: str = None, + value: str = None, + extra_columns: dict = None + ): + self.index_path = index_path + self.data_table_name = data_table_name + self.name = name if name else str(self.index_path.with_suffix( + '').name) + self.value = value if value else self.name + self.dbkey = dbkey if dbkey else self.value + self.extra_columns = extra_columns if extra_columns is not None else {} + self.indexes_properties_file = indexes_properties_file + + self.check_params() + + self.index_properties = self.get_index_properties() + + self.check_index_file_presence() + + def check_params(self): + + check_tab('name', self.name) + check_tab('index_path', str(self.index_path.absolute().name)) + check_tab('value', self.value) + check_tab('dbkey', self.dbkey) + self.check_extra_columns() - #Parse Command Line - parser = argparse.ArgumentParser() - parser.add_argument( '--value', action='store', type=str, default=None, help='value' ) - parser.add_argument( '--dbkey', action='store', type=str, default=None, help='dbkey' ) - parser.add_argument( '--name', action='store', type=str, default=None, help='name' ) - parser.add_argument( '--path', action='store', type=str, default=None, help='path' ) - parser.add_argument( '--data_table_name', action='store', type=str, default=None, help='path' ) - parser.add_argument( '--json_output_file', action='store', type=str, default=None, help='path' ) - options = parser.parse_args() + def check_extra_columns(self): + index_properties = self.get_index_properties() + index_extra_columns = set(index_properties.get("extra_columns", [])) + given_extra_columns = self.extra_columns.keys() + if index_extra_columns != given_extra_columns: + if len(index_extra_columns) > 0: + raise ValueError( + "Values for the following columns should be " + "supplied: {0}.".format( + str(index_extra_columns).strip("{}"))) + if len(index_extra_columns) == 0: + raise ValueError( + "The table '{0}' does not have extra columns".format( + self.data_table_name)) + for key, value in self.extra_columns.items(): + check_tab(key, value) - path = check_param("path", options.path) - basename = os.path.basename(path) - filename = os.path.splitext(basename)[0] - name = check_param("name", options.name, default=filename) - value = check_param("value", options.value, default=name) - dbkey = check_param("dbkey", options.dbkey, default=value) - data_table_name = check_param("data_table_name", options.data_table_name) - json_output_file = check_param("json_output_file", options.json_output_file, check_tab=False) + def get_index_properties(self) -> dict: + with self.indexes_properties_file.open('r') as properties_file: + indexes = yaml.safe_load(properties_file) + index_properties = indexes.get(self.data_table_name) + if index_properties is None: + raise ValueError( + "'{0}' not a supported table name".format( + self.data_table_name)) + return indexes_schema().validate(index_properties) + + def check_index_file_presence(self): + index_name = self.index_properties.get('name') + if index_name is None: + raise NotImplementedError( + "Property 'name' not defined for '{0}'," + " please contact the developers to correct the mistake.") + index_extensions = self.index_properties.get('extensions', ['']) + + # Sometimes an index path is a prefix. + # For example, with BWA. 'reference.fa' is the index. + # But the actual index files are + # 'reference.fa.amb', 'reference.fa.ann' etc. - # Check if file or prefix exists - indexes = yaml.load(file(os.path.join(os.path.dirname(__file__), 'indexes.yml'))) - index_dict = indexes.get(data_table_name,{}) - index_name = index_dict.get('name','index') - index_extensions = index_dict.get('extensions', ['']) - no_prefix = index_dict.get('no_prefix', False) - if not no_prefix: - dirname = os.path.dirname(path) - prefix = basename - for extension in index_extensions: - if not prefix_plus_extension_exists(dirname,prefix,extension): - raise Exception( 'Unable to find files with prefix "{0}" and extension "{1}" in {2}. Is this a valid {3}?'.format( prefix, extension, dirname, index_name ) ) + # If the index is not a prefix, + # the index file is taken to be the path itself. + index_is_a_prefix = self.index_properties.get('prefix', True) + prefix_strip_extension = self.index_properties.get( + 'prefix_strip_extension', False) + if index_is_a_prefix: + if prefix_strip_extension: + prefix = str(self.index_path.with_suffix("").name) + else: + prefix = str(self.index_path.name) + for extension in index_extensions: + if not prefix_plus_extension_exists(self.index_path.parent, + prefix, extension): + raise FileNotFoundError( + "Unable to find files with prefix '{0}' " + "and extension '{1}' in {2}. Is this a valid {3}?" + .format( + prefix, + extension, + str(self.index_path.parent), + index_name)) + elif self.index_properties.get('folder') is not None: + for file in self.index_properties.get('folder'): + if not (self.index_path / Path(file)).exists(): + raise FileNotFoundError( + "A file named '{0}' was not found in '{1}'".format( + file, str(self.index_path))) + elif not self.index_path.exists() and not self.index_path.is_dir(): + raise FileNotFoundError( + 'Unable to find path {0}.'.format(self.index_path)) + elif self.index_path.is_dir() and self.index_properties.get( + 'folder') is None: + raise IsADirectoryError( + '{0} is a directory not a file'.format(self.index_path)) + elif self.index_path.exists(): + pass + else: + raise NotImplementedError("This condition was not expected " + "and should not be reached. Please " + "contact the developers.") + + @property + def data_manager_dict(self) -> dict: + data_table_entry = dict(value=self.value, dbkey=self.dbkey, + name=self.name, + path=str(self.index_path), + **self.extra_columns) + data_manager_dict = dict(data_tables=dict()) + data_manager_dict["data_tables"][ + self.data_table_name] = [data_table_entry] + return data_manager_dict + + @property + def data_manager_json(self) -> str: + return json.dumps(self.data_manager_dict) + + +def main(): + options = argument_parser().parse_args() + + if options.json_output_file.exists(): + pass # Do not raise error. + + if options.extra_columns is None: + extra_columns = dict() else: - if not os.path.exists(path): - raise Exception( 'Unable to find path {0}.'.format( path ) ) - - if os.path.exists(json_output_file): - params = json.loads( open( json_output_file ).read() ) - print "params", params - else: - params = {} + try: + extra_columns = yaml.safe_load(options.extra_columns) + except yaml.parser.ParserError as e: + raise yaml.parser.ParserError( + "Invalid yaml string for --extra_indexes. \nError {0}".format( + e)) - data_manager_dict = {} - data_table_entry = dict( value=value, dbkey=dbkey, name=name, path=path ) - _add_data_table_entry( data_manager_dict, data_table_name, data_table_entry ) + index_properties_file = Path(__file__).parent / Path("indexes.yml") + data_table = DataTable(index_path=options.path, + data_table_name=options.data_table_name, + name=options.name, + value=options.value, + dbkey=options.dbkey, + indexes_properties_file=index_properties_file, + extra_columns=extra_columns) - #save info to json file - with open( json_output_file, 'wb' ) as output_file: - output_file.write( json.dumps( data_manager_dict ) ) - output_file.write( "\n" ) + # save info to json file + with options.json_output_file.open('w') as output_file: + output_file.write(data_table.data_manager_json) + if __name__ == "__main__": main()