diff search_model_validation.py @ 31:af0523c606a7 draft

"planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/tools/sklearn commit 5b2ac730ec6d3b762faa9034eddd19ad1b347476"
author bgruening
date Mon, 16 Dec 2019 05:42:39 -0500
parents 47d4baa183b2
children 19d6c2745d34
line wrap: on
line diff
--- a/search_model_validation.py	Thu Nov 07 05:45:03 2019 -0500
+++ b/search_model_validation.py	Mon Dec 16 05:42:39 2019 -0500
@@ -4,41 +4,35 @@
 import joblib
 import json
 import numpy as np
+import os
 import pandas as pd
 import pickle
 import skrebate
-import sklearn
 import sys
-import xgboost
 import warnings
-from imblearn import under_sampling, over_sampling, combine
 from scipy.io import mmread
-from mlxtend import classifier, regressor
-from sklearn.base import clone
-from sklearn import (cluster, compose, decomposition, ensemble,
-                     feature_extraction, feature_selection,
-                     gaussian_process, kernel_approximation, metrics,
-                     model_selection, naive_bayes, neighbors,
-                     pipeline, preprocessing, svm, linear_model,
-                     tree, discriminant_analysis)
+from sklearn import (cluster, decomposition, feature_selection,
+                     kernel_approximation, model_selection, preprocessing)
 from sklearn.exceptions import FitFailedWarning
 from sklearn.model_selection._validation import _score, cross_validate
 from sklearn.model_selection import _search, _validation
+from sklearn.pipeline import Pipeline
 
 from galaxy_ml.utils import (SafeEval, get_cv, get_scoring, load_model,
-                             read_columns, try_get_attr, get_module)
+                             read_columns, try_get_attr, get_module,
+                             clean_params, get_main_estimator)
 
 
 _fit_and_score = try_get_attr('galaxy_ml.model_validations', '_fit_and_score')
 setattr(_search, '_fit_and_score', _fit_and_score)
 setattr(_validation, '_fit_and_score', _fit_and_score)
 
-N_JOBS = int(__import__('os').environ.get('GALAXY_SLOTS', 1))
-CACHE_DIR = './cached'
+N_JOBS = int(os.environ.get('GALAXY_SLOTS', 1))
+# handle  disk cache
+CACHE_DIR = os.path.join(os.getcwd(), 'cached')
+del os
 NON_SEARCHABLE = ('n_jobs', 'pre_dispatch', 'memory', '_path',
                   'nthread', 'callbacks')
-ALLOWED_CALLBACKS = ('EarlyStopping', 'TerminateOnNaN', 'ReduceLROnPlateau',
-                     'CSVLogger', 'None')
 
 
 def _eval_search_params(params_builder):
@@ -164,74 +158,40 @@
     return search_params
 
 
-def main(inputs, infile_estimator, infile1, infile2,
-         outfile_result, outfile_object=None,
-         outfile_weights=None, groups=None,
-         ref_seq=None, intervals=None, targets=None,
-         fasta_path=None):
-    """
-    Parameter
-    ---------
-    inputs : str
-        File path to galaxy tool parameter
+def _handle_X_y(estimator, params, infile1, infile2, loaded_df={},
+                ref_seq=None, intervals=None, targets=None,
+                fasta_path=None):
+    """read inputs
 
-    infile_estimator : str
-        File path to estimator
-
+    Params
+    -------
+    estimator : estimator object
+    params : dict
+        Galaxy tool parameter inputs
     infile1 : str
         File path to dataset containing features
-
     infile2 : str
         File path to dataset containing target values
-
-    outfile_result : str
-        File path to save the results, either cv_results or test result
-
-    outfile_object : str, optional
-        File path to save searchCV object
-
-    outfile_weights : str, optional
-        File path to save model weights
-
-    groups : str
-        File path to dataset containing groups labels
-
+    loaded_df : dict
+        Contains loaded DataFrame objects with file path as keys
     ref_seq : str
         File path to dataset containing genome sequence file
-
-    intervals : str
+    interval : str
         File path to dataset containing interval file
-
     targets : str
         File path to dataset compressed target bed file
-
     fasta_path : str
         File path to dataset containing fasta file
-    """
-    warnings.simplefilter('ignore')
 
-    with open(inputs, 'r') as param_handler:
-        params = json.load(param_handler)
-
-    # conflict param checker
-    if params['outer_split']['split_mode'] == 'nested_cv' \
-            and params['save'] != 'nope':
-        raise ValueError("Save best estimator is not possible for nested CV!")
 
-    if not (params['search_schemes']['options']['refit']) \
-            and params['save'] != 'nope':
-        raise ValueError("Save best estimator is not possible when refit "
-                         "is False!")
-
-    params_builder = params['search_schemes']['search_params_builder']
-
-    with open(infile_estimator, 'rb') as estimator_handler:
-        estimator = load_model(estimator_handler)
+    Returns
+    -------
+    estimator : estimator object after setting new attributes
+    X : numpy array
+    y : numpy array
+    """
     estimator_params = estimator.get_params()
 
-    # store read dataframe object
-    loaded_df = {}
-
     input_type = params['input_options']['selected_input']
     # tabular input
     if input_type == 'tabular':
@@ -245,6 +205,10 @@
             c = None
 
         df_key = infile1 + repr(header)
+
+        if df_key in loaded_df:
+            infile1 = loaded_df[df_key]
+
         df = pd.read_csv(infile1, sep='\t', header=header,
                          parse_dates=True)
         loaded_df[df_key] = df
@@ -317,6 +281,196 @@
         y = None
     # end y
 
+    return estimator, X, y
+
+
+def _do_outer_cv(searcher, X, y, outer_cv, scoring, error_score='raise',
+                 outfile=None):
+    """Do outer cross-validation for nested CV
+
+    Parameters
+    ----------
+    searcher : object
+        SearchCV object
+    X : numpy array
+        Containing features
+    y : numpy array
+        Target values or labels
+    outer_cv : int or CV splitter
+        Control the cv splitting
+    scoring : object
+        Scorer
+    error_score: str, float or numpy float
+        Whether to raise fit error or return an value
+    outfile : str
+        File path to store the restuls
+    """
+    if error_score == 'raise':
+        rval = cross_validate(
+            searcher, X, y, scoring=scoring,
+            cv=outer_cv, n_jobs=N_JOBS, verbose=0,
+            error_score=error_score)
+    else:
+        warnings.simplefilter('always', FitFailedWarning)
+        with warnings.catch_warnings(record=True) as w:
+            try:
+                rval = cross_validate(
+                    searcher, X, y,
+                    scoring=scoring,
+                    cv=outer_cv, n_jobs=N_JOBS,
+                    verbose=0,
+                    error_score=error_score)
+            except ValueError:
+                pass
+            for warning in w:
+                print(repr(warning.message))
+
+    keys = list(rval.keys())
+    for k in keys:
+        if k.startswith('test'):
+            rval['mean_' + k] = np.mean(rval[k])
+            rval['std_' + k] = np.std(rval[k])
+        if k.endswith('time'):
+            rval.pop(k)
+    rval = pd.DataFrame(rval)
+    rval = rval[sorted(rval.columns)]
+    rval.to_csv(path_or_buf=outfile, sep='\t', header=True, index=False)
+
+
+def _do_train_test_split_val(searcher, X, y, params, error_score='raise',
+                             primary_scoring=None, groups=None,
+                             outfile=None):
+    """ do train test split, searchCV validates on the train and then use
+    the best_estimator_ to evaluate on the test
+
+    Returns
+    --------
+    Fitted SearchCV object
+    """
+    train_test_split = try_get_attr(
+        'galaxy_ml.model_validations', 'train_test_split')
+    split_options = params['outer_split']
+
+    # splits
+    if split_options['shuffle'] == 'stratified':
+        split_options['labels'] = y
+        X, X_test, y, y_test = train_test_split(X, y, **split_options)
+    elif split_options['shuffle'] == 'group':
+        if groups is None:
+            raise ValueError("No group based CV option was choosen for "
+                             "group shuffle!")
+        split_options['labels'] = groups
+        if y is None:
+            X, X_test, groups, _ =\
+                train_test_split(X, groups, **split_options)
+        else:
+            X, X_test, y, y_test, groups, _ =\
+                train_test_split(X, y, groups, **split_options)
+    else:
+        if split_options['shuffle'] == 'None':
+            split_options['shuffle'] = None
+        X, X_test, y, y_test =\
+            train_test_split(X, y, **split_options)
+
+    if error_score == 'raise':
+        searcher.fit(X, y, groups=groups)
+    else:
+        warnings.simplefilter('always', FitFailedWarning)
+        with warnings.catch_warnings(record=True) as w:
+            try:
+                searcher.fit(X, y, groups=groups)
+            except ValueError:
+                pass
+            for warning in w:
+                print(repr(warning.message))
+
+    scorer_ = searcher.scorer_
+    if isinstance(scorer_, collections.Mapping):
+        is_multimetric = True
+    else:
+        is_multimetric = False
+
+    best_estimator_ = getattr(searcher, 'best_estimator_')
+
+    # TODO Solve deep learning models in pipeline
+    if best_estimator_.__class__.__name__ == 'KerasGBatchClassifier':
+        test_score = best_estimator_.evaluate(
+            X_test, scorer=scorer_, is_multimetric=is_multimetric)
+    else:
+        test_score = _score(best_estimator_, X_test,
+                            y_test, scorer_,
+                            is_multimetric=is_multimetric)
+
+    if not is_multimetric:
+        test_score = {primary_scoring: test_score}
+    for key, value in test_score.items():
+        test_score[key] = [value]
+    result_df = pd.DataFrame(test_score)
+    result_df.to_csv(path_or_buf=outfile, sep='\t', header=True,
+                     index=False)
+
+    return searcher
+
+
+def main(inputs, infile_estimator, infile1, infile2,
+         outfile_result, outfile_object=None,
+         outfile_weights=None, groups=None,
+         ref_seq=None, intervals=None, targets=None,
+         fasta_path=None):
+    """
+    Parameter
+    ---------
+    inputs : str
+        File path to galaxy tool parameter
+
+    infile_estimator : str
+        File path to estimator
+
+    infile1 : str
+        File path to dataset containing features
+
+    infile2 : str
+        File path to dataset containing target values
+
+    outfile_result : str
+        File path to save the results, either cv_results or test result
+
+    outfile_object : str, optional
+        File path to save searchCV object
+
+    outfile_weights : str, optional
+        File path to save model weights
+
+    groups : str
+        File path to dataset containing groups labels
+
+    ref_seq : str
+        File path to dataset containing genome sequence file
+
+    intervals : str
+        File path to dataset containing interval file
+
+    targets : str
+        File path to dataset compressed target bed file
+
+    fasta_path : str
+        File path to dataset containing fasta file
+    """
+    warnings.simplefilter('ignore')
+
+    # store read dataframe object
+    loaded_df = {}
+
+    with open(inputs, 'r') as param_handler:
+        params = json.load(param_handler)
+
+    # Override the refit parameter
+    params['search_schemes']['options']['refit'] = True \
+        if params['save'] != 'nope' else False
+
+    with open(infile_estimator, 'rb') as estimator_handler:
+        estimator = load_model(estimator_handler)
+
     optimizer = params['search_schemes']['selected_search_scheme']
     optimizer = getattr(model_selection, optimizer)
 
@@ -337,8 +491,10 @@
             c = None
 
         df_key = groups + repr(header)
-        if df_key in loaded_df:
-            groups = loaded_df[df_key]
+
+        groups = pd.read_csv(groups, sep='\t', header=header,
+                             parse_dates=True)
+        loaded_df[df_key] = groups
 
         groups = read_columns(
                 groups,
@@ -352,7 +508,6 @@
 
     splitter, groups = get_cv(options.pop('cv_selector'))
     options['cv'] = splitter
-    options['n_jobs'] = N_JOBS
     primary_scoring = options['scoring']['primary_scoring']
     options['scoring'] = get_scoring(options['scoring'])
     if options['error_score']:
@@ -364,55 +519,56 @@
     if 'pre_dispatch' in options and options['pre_dispatch'] == '':
         options['pre_dispatch'] = None
 
-    # del loaded_df
-    del loaded_df
+    params_builder = params['search_schemes']['search_params_builder']
+    param_grid = _eval_search_params(params_builder)
+
+    estimator = clean_params(estimator)
 
-    # handle memory
-    memory = joblib.Memory(location=CACHE_DIR, verbose=0)
+    # save the SearchCV object without fit
+    if params['save'] == 'save_no_fit':
+        searcher = optimizer(estimator, param_grid, **options)
+        print(searcher)
+        with open(outfile_object, 'wb') as output_handler:
+            pickle.dump(searcher, output_handler,
+                        pickle.HIGHEST_PROTOCOL)
+        return 0
+
+    # read inputs and loads new attributes, like paths
+    estimator, X, y = _handle_X_y(estimator, params, infile1, infile2,
+                                  loaded_df=loaded_df, ref_seq=ref_seq,
+                                  intervals=intervals, targets=targets,
+                                  fasta_path=fasta_path)
+
     # cache iraps_core fits could increase search speed significantly
-    if estimator.__class__.__name__ == 'IRAPSClassifier':
-        estimator.set_params(memory=memory)
-    else:
-        # For iraps buried in pipeline
-        for p, v in estimator_params.items():
-            if p.endswith('memory'):
-                # for case of `__irapsclassifier__memory`
-                if len(p) > 8 and p[:-8].endswith('irapsclassifier'):
-                    # cache iraps_core fits could increase search
-                    # speed significantly
-                    new_params = {p: memory}
-                    estimator.set_params(**new_params)
-                # security reason, we don't want memory being
-                # modified unexpectedly
-                elif v:
-                    new_params = {p, None}
-                    estimator.set_params(**new_params)
-            # For now, 1 CPU is suggested for iprasclassifier
-            elif p.endswith('n_jobs'):
-                new_params = {p: 1}
-                estimator.set_params(**new_params)
-            # for security reason, types of callbacks are limited
-            elif p.endswith('callbacks'):
-                for cb in v:
-                    cb_type = cb['callback_selection']['callback_type']
-                    if cb_type not in ALLOWED_CALLBACKS:
-                        raise ValueError(
-                            "Prohibited callback type: %s!" % cb_type)
+    memory = joblib.Memory(location=CACHE_DIR, verbose=0)
+    main_est = get_main_estimator(estimator)
+    if main_est.__class__.__name__ == 'IRAPSClassifier':
+        main_est.set_params(memory=memory)
 
-    param_grid = _eval_search_params(params_builder)
     searcher = optimizer(estimator, param_grid, **options)
 
-    # do nested split
     split_mode = params['outer_split'].pop('split_mode')
-    # nested CV, outer cv using cross_validate
+
     if split_mode == 'nested_cv':
+        # make sure refit is choosen
+        # this could be True for sklearn models, but not the case for
+        # deep learning models
+        if not options['refit'] and \
+                not all(hasattr(estimator, attr)
+                        for attr in ('config', 'model_type')):
+            warnings.warn("Refit is change to `True` for nested validation!")
+            setattr(searcher, 'refit', True)
+
         outer_cv, _ = get_cv(params['outer_split']['cv_selector'])
-
+        # nested CV, outer cv using cross_validate
         if options['error_score'] == 'raise':
             rval = cross_validate(
                 searcher, X, y, scoring=options['scoring'],
-                cv=outer_cv, n_jobs=N_JOBS, verbose=0,
-                error_score=options['error_score'])
+                cv=outer_cv, n_jobs=N_JOBS,
+                verbose=options['verbose'],
+                return_estimator=(params['save'] == 'save_estimator'),
+                error_score=options['error_score'],
+                return_train_score=True)
         else:
             warnings.simplefilter('always', FitFailedWarning)
             with warnings.catch_warnings(record=True) as w:
@@ -421,13 +577,38 @@
                         searcher, X, y,
                         scoring=options['scoring'],
                         cv=outer_cv, n_jobs=N_JOBS,
-                        verbose=0,
-                        error_score=options['error_score'])
+                        verbose=options['verbose'],
+                        return_estimator=(params['save'] == 'save_estimator'),
+                        error_score=options['error_score'],
+                        return_train_score=True)
                 except ValueError:
                     pass
                 for warning in w:
                     print(repr(warning.message))
 
+        fitted_searchers = rval.pop('estimator', [])
+        if fitted_searchers:
+            import os
+            pwd = os.getcwd()
+            save_dir = os.path.join(pwd, 'cv_results_in_folds')
+            try:
+                os.mkdir(save_dir)
+                for idx, obj in enumerate(fitted_searchers):
+                    target_name = 'cv_results_' + '_' + 'split%d' % idx
+                    target_path = os.path.join(pwd, save_dir, target_name)
+                    cv_results_ = getattr(obj, 'cv_results_', None)
+                    if not cv_results_:
+                        print("%s is not available" % target_name)
+                        continue
+                    cv_results_ = pd.DataFrame(cv_results_)
+                    cv_results_ = cv_results_[sorted(cv_results_.columns)]
+                    cv_results_.to_csv(target_path, sep='\t', header=True,
+                                       index=False)
+            except Exception as e:
+                print(e)
+            finally:
+                del os
+
         keys = list(rval.keys())
         for k in keys:
             if k.startswith('test'):
@@ -437,46 +618,22 @@
                 rval.pop(k)
         rval = pd.DataFrame(rval)
         rval = rval[sorted(rval.columns)]
-        rval.to_csv(path_or_buf=outfile_result, sep='\t',
-                    header=True, index=False)
-    else:
-        if split_mode == 'train_test_split':
-            train_test_split = try_get_attr(
-                'galaxy_ml.model_validations', 'train_test_split')
-            # make sure refit is choosen
-            # this could be True for sklearn models, but not the case for
-            # deep learning models
-            if not options['refit'] and \
-                    not all(hasattr(estimator, attr)
-                            for attr in ('config', 'model_type')):
-                warnings.warn("Refit is change to `True` for nested "
-                              "validation!")
-                setattr(searcher, 'refit', True)
-            split_options = params['outer_split']
+        rval.to_csv(path_or_buf=outfile_result, sep='\t', header=True,
+                    index=False)
+
+        return 0
 
-            # splits
-            if split_options['shuffle'] == 'stratified':
-                split_options['labels'] = y
-                X, X_test, y, y_test = train_test_split(X, y, **split_options)
-            elif split_options['shuffle'] == 'group':
-                if groups is None:
-                    raise ValueError("No group based CV option was "
-                                     "choosen for group shuffle!")
-                split_options['labels'] = groups
-                if y is None:
-                    X, X_test, groups, _ =\
-                        train_test_split(X, groups, **split_options)
-                else:
-                    X, X_test, y, y_test, groups, _ =\
-                        train_test_split(X, y, groups, **split_options)
-            else:
-                if split_options['shuffle'] == 'None':
-                    split_options['shuffle'] = None
-                X, X_test, y, y_test =\
-                    train_test_split(X, y, **split_options)
-        # end train_test_split
+        # deprecate train test split mode
+        """searcher = _do_train_test_split_val(
+            searcher, X, y, params,
+            primary_scoring=primary_scoring,
+            error_score=options['error_score'],
+            groups=groups,
+            outfile=outfile_result)"""
 
-        # shared by both train_test_split and non-split
+    # no outer split
+    else:
+        searcher.set_params(n_jobs=N_JOBS)
         if options['error_score'] == 'raise':
             searcher.fit(X, y, groups=groups)
         else:
@@ -489,47 +646,14 @@
                 for warning in w:
                     print(repr(warning.message))
 
-        # no outer split
-        if split_mode == 'no':
-            # save results
-            cv_results = pd.DataFrame(searcher.cv_results_)
-            cv_results = cv_results[sorted(cv_results.columns)]
-            cv_results.to_csv(path_or_buf=outfile_result, sep='\t',
-                              header=True, index=False)
-
-        # train_test_split, output test result using best_estimator_
-        # or rebuild the trained estimator using weights if applicable.
-        else:
-            scorer_ = searcher.scorer_
-            if isinstance(scorer_, collections.Mapping):
-                is_multimetric = True
-            else:
-                is_multimetric = False
-
-            best_estimator_ = getattr(searcher, 'best_estimator_', None)
-            if not best_estimator_:
-                raise ValueError("GridSearchCV object has no "
-                                 "`best_estimator_` when `refit`=False!")
-
-            if best_estimator_.__class__.__name__ == 'KerasGBatchClassifier' \
-                    and hasattr(estimator.data_batch_generator, 'target_path'):
-                test_score = best_estimator_.evaluate(
-                    X_test, scorer=scorer_, is_multimetric=is_multimetric)
-            else:
-                test_score = _score(best_estimator_, X_test,
-                                    y_test, scorer_,
-                                    is_multimetric=is_multimetric)
-
-            if not is_multimetric:
-                test_score = {primary_scoring: test_score}
-            for key, value in test_score.items():
-                test_score[key] = [value]
-            result_df = pd.DataFrame(test_score)
-            result_df.to_csv(path_or_buf=outfile_result, sep='\t',
-                             header=True, index=False)
+        cv_results = pd.DataFrame(searcher.cv_results_)
+        cv_results = cv_results[sorted(cv_results.columns)]
+        cv_results.to_csv(path_or_buf=outfile_result, sep='\t',
+                          header=True, index=False)
 
     memory.clear(warn=False)
 
+    # output best estimator, and weights if applicable
     if outfile_object:
         best_estimator_ = getattr(searcher, 'best_estimator_', None)
         if not best_estimator_:
@@ -538,9 +662,10 @@
                           "nested gridsearch or `refit` is False!")
             return
 
-        main_est = best_estimator_
-        if isinstance(best_estimator_, pipeline.Pipeline):
-            main_est = best_estimator_.steps[-1][-1]
+        # clean prams
+        best_estimator_ = clean_params(best_estimator_)
+
+        main_est = get_main_estimator(best_estimator_)
 
         if hasattr(main_est, 'model_') \
                 and hasattr(main_est, 'save_weights'):
@@ -554,6 +679,7 @@
                 del main_est.data_generator_
 
         with open(outfile_object, 'wb') as output_handler:
+            print("Best estimator is saved: %s " % repr(best_estimator_))
             pickle.dump(best_estimator_, output_handler,
                         pickle.HIGHEST_PROTOCOL)