diff data_manager/path_name_value_key_manager.py @ 1:8495c49cd056 draft default tip

planemo upload for repository https://github.com/LUMC/lumc-galaxy-tools/tree/master/data_manager_select_index_by_path commit 9061997af3bc94f49653ffd42f10b973578e371d
author rhpvorderman
date Mon, 16 Jul 2018 10:58:36 -0400
parents 5f8d9309058b
children
--- a/data_manager/path_name_value_key_manager.py	Mon Sep 25 03:35:26 2017 -0400
+++ b/data_manager/path_name_value_key_manager.py	Mon Jul 16 10:58:36 2018 -0400
@@ -1,104 +1,222 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
+"""Script to create data manager jsons"""
 
+import argparse
 import json
-import argparse
-import os
+from pathlib import Path
+
 import yaml
+from schema import Schema, Optional
 
-def _add_data_table_entry( data_manager_dict, data_table_name, data_table_entry ):
-    data_manager_dict['data_tables'] = data_manager_dict.get( 'data_tables', {} )
-    data_manager_dict['data_tables'][ data_table_name ] = data_manager_dict['data_tables'].get( data_table_name, [] )
-    data_manager_dict['data_tables'][ data_table_name ].append( data_table_entry )
-    return data_manager_dict
+
+def indexes_schema():
+    return Schema(
+        {'name': str,
+         Optional('prefix'): bool,
+         Optional('extensions'): [str],
+         Optional('prefix_strip_extension'): bool,
+         Optional('extra_columns'): [str],
+         Optional('folder'): [str]})
 
 
-def check_param(name, value, default=None,  check_tab=True):
-    if value in [ None, '', '?' ]:
-        if default:
-            print "Using {0} for {1} as no value provided".format( default, name )
-            value = default
-        else:
-            raise Exception( '{0} is not a valid {1}. You must specify a valid {1}.'.format( value, name ) )
-    if check_tab and "\t" in value:
-        raise Exception( '{0} is not a valid {1}. It may not contain a tab because these are used as seperators by galaxy .'.format( value, name ) )
-    return value
+def argument_parser():
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--value', type=str, help='value')
+    parser.add_argument('--dbkey', type=str, help='dbkey')
+    parser.add_argument('--name', type=str, help='name')
+    parser.add_argument('--path', type=Path, help='path',
+                        required=True)
+    parser.add_argument('--data_table_name', action='store', type=str,
+                        help='Name of the data table',
+                        required=True)
+    parser.add_argument('--json_output_file', action='store', type=Path,
+                        help='Json output file',
+                        required=True)
+    parser.add_argument("--extra-columns", type=str,
+                        help='Yaml formatted string with extra columns '
+                             'and their values. For example '
+                             '\'{"with-gtf":"0"}\' for STAR indexes')
+    return parser
 
-def prefix_exists(directory, prefix):
-    '''checks if files exist with prefix in a directory. Returns Boolean'''
-    matched_files = []
-    directory_files = os.listdir(directory)
-    for directory_file in directory_files:
-        if directory_file.startswith(prefix):
-            matched_files.append(directory_file)
-    # Empty list should return False
-    return bool(matched_files)
+
+def check_tab(name: str, value: str):
+    if '\t' in value:
+        raise ValueError(
+            "'{0}' is not a valid '{1}'. It may not contain a tab because "
+            "these are used as seperators by galaxy .".format(
+                value, name))
 
-def prefix_plus_extension_exists(directory, prefix, extension):
-    '''checks if files exist with prefix in a directory. Returns Boolean'''
-    matched_files = []
-    directory_files = os.listdir(directory)
-    for directory_file in directory_files:
-        if directory_file.startswith(prefix) and directory_file.endswith(extension):
-            matched_files.append(directory_file)
+
+def prefix_plus_extension_exists(directory: Path, prefix: str, extension: str):
+    """checks if files exist with prefix in a directory. Returns Boolean"""
+    matched_files = [directory_file
+                     for directory_file in directory.iterdir()
+                     if directory_file.name.startswith(prefix)
+                     and directory_file.suffix == extension]
     # Empty list should return False
     return bool(matched_files)
 
-def main():
+
+class DataTable(object):
 
-    #value = "test_value"
-    #name = "test_name"
-    #print '{0} other {1} more{0}'.format(value, name )
-    #print '{0} is not a valid {1}. It may not contain a tab.'.format( value, name )
+    def __init__(self,
+                 index_path: Path,
+                 data_table_name: str,
+                 indexes_properties_file: Path,
+                 name: str = None,
+                 dbkey: str = None,
+                 value: str = None,
+                 extra_columns: dict = None
+                 ):
+        self.index_path = index_path
+        self.data_table_name = data_table_name
+        self.name = name if name else str(self.index_path.with_suffix(
+            '').name)
+        self.value = value if value else self.name
+        self.dbkey = dbkey if dbkey else self.value
+        self.extra_columns = extra_columns if extra_columns is not None else {}
+        self.indexes_properties_file = indexes_properties_file
+
+        self.check_params()
+
+        self.index_properties = self.get_index_properties()
+
+        self.check_index_file_presence()
+
+    def check_params(self):
+
+        check_tab('name', self.name)
+        check_tab('index_path', str(self.index_path.absolute().name))
+        check_tab('value', self.value)
+        check_tab('dbkey', self.dbkey)
+        self.check_extra_columns()
 
-    #Parse Command Line
-    parser = argparse.ArgumentParser()
-    parser.add_argument( '--value', action='store', type=str, default=None, help='value' )
-    parser.add_argument( '--dbkey', action='store', type=str, default=None, help='dbkey' )
-    parser.add_argument( '--name',  action='store', type=str, default=None, help='name' )
-    parser.add_argument( '--path', action='store', type=str, default=None, help='path' )
-    parser.add_argument( '--data_table_name', action='store', type=str, default=None, help='path' )
-    parser.add_argument( '--json_output_file', action='store', type=str, default=None, help='path' )
-    options = parser.parse_args()
+    def check_extra_columns(self):
+        index_properties = self.get_index_properties()
+        index_extra_columns = set(index_properties.get("extra_columns", []))
+        given_extra_columns = self.extra_columns.keys()
+        if index_extra_columns != given_extra_columns:
+            if len(index_extra_columns) > 0:
+                raise ValueError(
+                    "Values for the following columns should be "
+                    "supplied: {0}.".format(
+                        str(index_extra_columns).strip("{}")))
+            if len(index_extra_columns) == 0:
+                raise ValueError(
+                    "The table '{0}' does not have extra columns".format(
+                        self.data_table_name))
+        for key, value in self.extra_columns.items():
+            check_tab(key, value)
 
-    path = check_param("path", options.path)
-    basename = os.path.basename(path)
-    filename = os.path.splitext(basename)[0]
-    name = check_param("name", options.name, default=filename)
-    value = check_param("value", options.value, default=name)
-    dbkey = check_param("dbkey", options.dbkey, default=value)
-    data_table_name = check_param("data_table_name", options.data_table_name)
-    json_output_file = check_param("json_output_file", options.json_output_file, check_tab=False)
+    def get_index_properties(self) -> dict:
+        with self.indexes_properties_file.open('r') as properties_file:
+            indexes = yaml.safe_load(properties_file)
+        index_properties = indexes.get(self.data_table_name)
+        if index_properties is None:
+            raise ValueError(
+                "'{0}' not a supported table name".format(
+                    self.data_table_name))
+        return indexes_schema().validate(index_properties)
+
+    def check_index_file_presence(self):
+        index_name = self.index_properties.get('name')
+        if index_name is None:
+            raise NotImplementedError(
+                "Property 'name' not defined for '{0}',"
+                " please contact the developers to correct the mistake.")
+        index_extensions = self.index_properties.get('extensions', [''])
+
+        # Sometimes an index path is a prefix.
+        # For example, with BWA. 'reference.fa' is the index.
+        # But the actual index files are
+        # 'reference.fa.amb', 'reference.fa.ann' etc.
 
-    # Check if file or prefix exists
-    indexes = yaml.load(file(os.path.join(os.path.dirname(__file__), 'indexes.yml')))
-    index_dict = indexes.get(data_table_name,{})
-    index_name = index_dict.get('name','index')
-    index_extensions = index_dict.get('extensions', [''])
-    no_prefix = index_dict.get('no_prefix', False)
-    if not no_prefix:
-        dirname = os.path.dirname(path)
-        prefix = basename
-        for extension in index_extensions:
-            if not prefix_plus_extension_exists(dirname,prefix,extension):
-                raise Exception( 'Unable to find files with prefix "{0}" and extension "{1}" in {2}. Is this a valid {3}?'.format( prefix, extension, dirname, index_name ) )
+        # If the index is not a prefix,
+        # the index file is taken to be the path itself.
+        index_is_a_prefix = self.index_properties.get('prefix', True)
+        prefix_strip_extension = self.index_properties.get(
+            'prefix_strip_extension', False)
+        if index_is_a_prefix:
+            if prefix_strip_extension:
+                prefix = str(self.index_path.with_suffix("").name)
+            else:
+                prefix = str(self.index_path.name)
+            for extension in index_extensions:
+                if not prefix_plus_extension_exists(self.index_path.parent,
+                                                    prefix, extension):
+                    raise FileNotFoundError(
+                        "Unable to find files with prefix '{0}' "
+                        "and extension '{1}' in {2}. Is this a valid {3}?"
+                        .format(
+                            prefix,
+                            extension,
+                            str(self.index_path.parent),
+                            index_name))
+        elif self.index_properties.get('folder') is not None:
+            for file in self.index_properties.get('folder'):
+                if not (self.index_path / Path(file)).exists():
+                    raise FileNotFoundError(
+                        "A file named '{0}' was not found in '{1}'".format(
+                            file, str(self.index_path)))
+        elif not self.index_path.exists() and not self.index_path.is_dir():
+            raise FileNotFoundError(
+                'Unable to find path {0}.'.format(self.index_path))
+        elif self.index_path.is_dir() and self.index_properties.get(
+                'folder') is None:
+            raise IsADirectoryError(
+                '{0} is a directory, not a file.'.format(self.index_path))
+        elif self.index_path.exists():
+            pass
+        else:
+            raise NotImplementedError("This condition was not expected "
+                                      "and should not be reached. Please "
+                                      "contact the developers.")
+
+    @property
+    def data_manager_dict(self) -> dict:
+        data_table_entry = dict(value=self.value, dbkey=self.dbkey,
+                                name=self.name,
+                                path=str(self.index_path),
+                                **self.extra_columns)
+        data_manager_dict = dict(data_tables=dict())
+        data_manager_dict["data_tables"][
+            self.data_table_name] = [data_table_entry]
+        return data_manager_dict
+
+    @property
+    def data_manager_json(self) -> str:
+        return json.dumps(self.data_manager_dict)
+
+
+def main():
+    options = argument_parser().parse_args()
+
+    if options.json_output_file.exists():
+        pass  # Do not raise error.
+
+    if options.extra_columns is None:
+        extra_columns = dict()
     else:
-        if not os.path.exists(path):
-            raise Exception( 'Unable to find path {0}.'.format( path ) )
-
-    if os.path.exists(json_output_file):
-        params = json.loads( open( json_output_file ).read() )
-        print "params", params
-    else:
-        params = {}
+        try:
+            extra_columns = yaml.safe_load(options.extra_columns)
+        except yaml.parser.ParserError as e:
+            raise yaml.parser.ParserError(
+                "Invalid yaml string for --extra_indexes. \nError {0}".format(
+                    e))
 
-    data_manager_dict = {}
-    data_table_entry = dict( value=value, dbkey=dbkey, name=name, path=path )
-    _add_data_table_entry( data_manager_dict, data_table_name, data_table_entry )
+    index_properties_file = Path(__file__).parent / Path("indexes.yml")
+    data_table = DataTable(index_path=options.path,
+                           data_table_name=options.data_table_name,
+                           name=options.name,
+                           value=options.value,
+                           dbkey=options.dbkey,
+                           indexes_properties_file=index_properties_file,
+                           extra_columns=extra_columns)
 
-    #save info to json file
-    with open( json_output_file, 'wb' ) as output_file:
-        output_file.write( json.dumps( data_manager_dict ) )
-        output_file.write( "\n" )
+    # save info to json file
+    with options.json_output_file.open('w') as output_file:
+        output_file.write(data_table.data_manager_json)
+
 
 if __name__ == "__main__":
     main()
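
For reference, a minimal sketch of how an indexes.yml entry and the resulting
data manager JSON fit together. The table name 'bwa_mem_indexes', the YAML
entry and the paths below are illustrative assumptions only: they mirror
indexes_schema() and the argparse options from the patched script, but are not
taken from the repository's indexes.yml.

"""Sketch only: validate a hypothetical indexes.yml entry with the same
schema the patched script uses, then print the JSON shape the script would
write to --json_output_file."""

import json

import yaml
from schema import Optional, Schema

# Mirrors indexes_schema() from path_name_value_key_manager.py.
index_schema = Schema(
    {'name': str,
     Optional('prefix'): bool,
     Optional('extensions'): [str],
     Optional('prefix_strip_extension'): bool,
     Optional('extra_columns'): [str],
     Optional('folder'): [str]})

EXAMPLE_YAML = """
bwa_mem_indexes:  # hypothetical table name; real names live in indexes.yml
  name: BWA index
  prefix: true  # 'hg38.fa' is a prefix of 'hg38.fa.amb', 'hg38.fa.ann', ...
  extensions: ['.amb', '.ann', '.bwt', '.pac', '.sa']
"""

entry = yaml.safe_load(EXAMPLE_YAML)['bwa_mem_indexes']
print(index_schema.validate(entry))

# A run such as
#   path_name_value_key_manager.py --path /data/hg38/bwa/hg38.fa \
#       --data_table_name bwa_mem_indexes --json_output_file out.json
# derives name, value and dbkey from the path ('hg38') and writes roughly:
print(json.dumps({"data_tables": {"bwa_mem_indexes": [
    {"value": "hg38", "dbkey": "hg38", "name": "hg38",
     "path": "/data/hg38/bwa/hg38.fa"}]}}))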