Mercurial > repos > bgruening > sklearn_ensemble
diff search_model_validation.py @ 31:af0523c606a7 draft
"planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/tools/sklearn commit 5b2ac730ec6d3b762faa9034eddd19ad1b347476"
author | bgruening |
---|---|
date | Mon, 16 Dec 2019 05:42:39 -0500 |
parents | 47d4baa183b2 |
children | 19d6c2745d34 |
line wrap: on
line diff
--- a/search_model_validation.py Thu Nov 07 05:45:03 2019 -0500 +++ b/search_model_validation.py Mon Dec 16 05:42:39 2019 -0500 @@ -4,41 +4,35 @@ import joblib import json import numpy as np +import os import pandas as pd import pickle import skrebate -import sklearn import sys -import xgboost import warnings -from imblearn import under_sampling, over_sampling, combine from scipy.io import mmread -from mlxtend import classifier, regressor -from sklearn.base import clone -from sklearn import (cluster, compose, decomposition, ensemble, - feature_extraction, feature_selection, - gaussian_process, kernel_approximation, metrics, - model_selection, naive_bayes, neighbors, - pipeline, preprocessing, svm, linear_model, - tree, discriminant_analysis) +from sklearn import (cluster, decomposition, feature_selection, + kernel_approximation, model_selection, preprocessing) from sklearn.exceptions import FitFailedWarning from sklearn.model_selection._validation import _score, cross_validate from sklearn.model_selection import _search, _validation +from sklearn.pipeline import Pipeline from galaxy_ml.utils import (SafeEval, get_cv, get_scoring, load_model, - read_columns, try_get_attr, get_module) + read_columns, try_get_attr, get_module, + clean_params, get_main_estimator) _fit_and_score = try_get_attr('galaxy_ml.model_validations', '_fit_and_score') setattr(_search, '_fit_and_score', _fit_and_score) setattr(_validation, '_fit_and_score', _fit_and_score) -N_JOBS = int(__import__('os').environ.get('GALAXY_SLOTS', 1)) -CACHE_DIR = './cached' +N_JOBS = int(os.environ.get('GALAXY_SLOTS', 1)) +# handle disk cache +CACHE_DIR = os.path.join(os.getcwd(), 'cached') +del os NON_SEARCHABLE = ('n_jobs', 'pre_dispatch', 'memory', '_path', 'nthread', 'callbacks') -ALLOWED_CALLBACKS = ('EarlyStopping', 'TerminateOnNaN', 'ReduceLROnPlateau', - 'CSVLogger', 'None') def _eval_search_params(params_builder): @@ -164,74 +158,40 @@ return search_params -def main(inputs, infile_estimator, infile1, infile2, - outfile_result, outfile_object=None, - outfile_weights=None, groups=None, - ref_seq=None, intervals=None, targets=None, - fasta_path=None): - """ - Parameter - --------- - inputs : str - File path to galaxy tool parameter +def _handle_X_y(estimator, params, infile1, infile2, loaded_df={}, + ref_seq=None, intervals=None, targets=None, + fasta_path=None): + """read inputs - infile_estimator : str - File path to estimator - + Params + ------- + estimator : estimator object + params : dict + Galaxy tool parameter inputs infile1 : str File path to dataset containing features - infile2 : str File path to dataset containing target values - - outfile_result : str - File path to save the results, either cv_results or test result - - outfile_object : str, optional - File path to save searchCV object - - outfile_weights : str, optional - File path to save model weights - - groups : str - File path to dataset containing groups labels - + loaded_df : dict + Contains loaded DataFrame objects with file path as keys ref_seq : str File path to dataset containing genome sequence file - - intervals : str + interval : str File path to dataset containing interval file - targets : str File path to dataset compressed target bed file - fasta_path : str File path to dataset containing fasta file - """ - warnings.simplefilter('ignore') - with open(inputs, 'r') as param_handler: - params = json.load(param_handler) - - # conflict param checker - if params['outer_split']['split_mode'] == 'nested_cv' \ - and params['save'] != 'nope': - raise ValueError("Save best estimator is not possible for nested CV!") - if not (params['search_schemes']['options']['refit']) \ - and params['save'] != 'nope': - raise ValueError("Save best estimator is not possible when refit " - "is False!") - - params_builder = params['search_schemes']['search_params_builder'] - - with open(infile_estimator, 'rb') as estimator_handler: - estimator = load_model(estimator_handler) + Returns + ------- + estimator : estimator object after setting new attributes + X : numpy array + y : numpy array + """ estimator_params = estimator.get_params() - # store read dataframe object - loaded_df = {} - input_type = params['input_options']['selected_input'] # tabular input if input_type == 'tabular': @@ -245,6 +205,10 @@ c = None df_key = infile1 + repr(header) + + if df_key in loaded_df: + infile1 = loaded_df[df_key] + df = pd.read_csv(infile1, sep='\t', header=header, parse_dates=True) loaded_df[df_key] = df @@ -317,6 +281,196 @@ y = None # end y + return estimator, X, y + + +def _do_outer_cv(searcher, X, y, outer_cv, scoring, error_score='raise', + outfile=None): + """Do outer cross-validation for nested CV + + Parameters + ---------- + searcher : object + SearchCV object + X : numpy array + Containing features + y : numpy array + Target values or labels + outer_cv : int or CV splitter + Control the cv splitting + scoring : object + Scorer + error_score: str, float or numpy float + Whether to raise fit error or return an value + outfile : str + File path to store the restuls + """ + if error_score == 'raise': + rval = cross_validate( + searcher, X, y, scoring=scoring, + cv=outer_cv, n_jobs=N_JOBS, verbose=0, + error_score=error_score) + else: + warnings.simplefilter('always', FitFailedWarning) + with warnings.catch_warnings(record=True) as w: + try: + rval = cross_validate( + searcher, X, y, + scoring=scoring, + cv=outer_cv, n_jobs=N_JOBS, + verbose=0, + error_score=error_score) + except ValueError: + pass + for warning in w: + print(repr(warning.message)) + + keys = list(rval.keys()) + for k in keys: + if k.startswith('test'): + rval['mean_' + k] = np.mean(rval[k]) + rval['std_' + k] = np.std(rval[k]) + if k.endswith('time'): + rval.pop(k) + rval = pd.DataFrame(rval) + rval = rval[sorted(rval.columns)] + rval.to_csv(path_or_buf=outfile, sep='\t', header=True, index=False) + + +def _do_train_test_split_val(searcher, X, y, params, error_score='raise', + primary_scoring=None, groups=None, + outfile=None): + """ do train test split, searchCV validates on the train and then use + the best_estimator_ to evaluate on the test + + Returns + -------- + Fitted SearchCV object + """ + train_test_split = try_get_attr( + 'galaxy_ml.model_validations', 'train_test_split') + split_options = params['outer_split'] + + # splits + if split_options['shuffle'] == 'stratified': + split_options['labels'] = y + X, X_test, y, y_test = train_test_split(X, y, **split_options) + elif split_options['shuffle'] == 'group': + if groups is None: + raise ValueError("No group based CV option was choosen for " + "group shuffle!") + split_options['labels'] = groups + if y is None: + X, X_test, groups, _ =\ + train_test_split(X, groups, **split_options) + else: + X, X_test, y, y_test, groups, _ =\ + train_test_split(X, y, groups, **split_options) + else: + if split_options['shuffle'] == 'None': + split_options['shuffle'] = None + X, X_test, y, y_test =\ + train_test_split(X, y, **split_options) + + if error_score == 'raise': + searcher.fit(X, y, groups=groups) + else: + warnings.simplefilter('always', FitFailedWarning) + with warnings.catch_warnings(record=True) as w: + try: + searcher.fit(X, y, groups=groups) + except ValueError: + pass + for warning in w: + print(repr(warning.message)) + + scorer_ = searcher.scorer_ + if isinstance(scorer_, collections.Mapping): + is_multimetric = True + else: + is_multimetric = False + + best_estimator_ = getattr(searcher, 'best_estimator_') + + # TODO Solve deep learning models in pipeline + if best_estimator_.__class__.__name__ == 'KerasGBatchClassifier': + test_score = best_estimator_.evaluate( + X_test, scorer=scorer_, is_multimetric=is_multimetric) + else: + test_score = _score(best_estimator_, X_test, + y_test, scorer_, + is_multimetric=is_multimetric) + + if not is_multimetric: + test_score = {primary_scoring: test_score} + for key, value in test_score.items(): + test_score[key] = [value] + result_df = pd.DataFrame(test_score) + result_df.to_csv(path_or_buf=outfile, sep='\t', header=True, + index=False) + + return searcher + + +def main(inputs, infile_estimator, infile1, infile2, + outfile_result, outfile_object=None, + outfile_weights=None, groups=None, + ref_seq=None, intervals=None, targets=None, + fasta_path=None): + """ + Parameter + --------- + inputs : str + File path to galaxy tool parameter + + infile_estimator : str + File path to estimator + + infile1 : str + File path to dataset containing features + + infile2 : str + File path to dataset containing target values + + outfile_result : str + File path to save the results, either cv_results or test result + + outfile_object : str, optional + File path to save searchCV object + + outfile_weights : str, optional + File path to save model weights + + groups : str + File path to dataset containing groups labels + + ref_seq : str + File path to dataset containing genome sequence file + + intervals : str + File path to dataset containing interval file + + targets : str + File path to dataset compressed target bed file + + fasta_path : str + File path to dataset containing fasta file + """ + warnings.simplefilter('ignore') + + # store read dataframe object + loaded_df = {} + + with open(inputs, 'r') as param_handler: + params = json.load(param_handler) + + # Override the refit parameter + params['search_schemes']['options']['refit'] = True \ + if params['save'] != 'nope' else False + + with open(infile_estimator, 'rb') as estimator_handler: + estimator = load_model(estimator_handler) + optimizer = params['search_schemes']['selected_search_scheme'] optimizer = getattr(model_selection, optimizer) @@ -337,8 +491,10 @@ c = None df_key = groups + repr(header) - if df_key in loaded_df: - groups = loaded_df[df_key] + + groups = pd.read_csv(groups, sep='\t', header=header, + parse_dates=True) + loaded_df[df_key] = groups groups = read_columns( groups, @@ -352,7 +508,6 @@ splitter, groups = get_cv(options.pop('cv_selector')) options['cv'] = splitter - options['n_jobs'] = N_JOBS primary_scoring = options['scoring']['primary_scoring'] options['scoring'] = get_scoring(options['scoring']) if options['error_score']: @@ -364,55 +519,56 @@ if 'pre_dispatch' in options and options['pre_dispatch'] == '': options['pre_dispatch'] = None - # del loaded_df - del loaded_df + params_builder = params['search_schemes']['search_params_builder'] + param_grid = _eval_search_params(params_builder) + + estimator = clean_params(estimator) - # handle memory - memory = joblib.Memory(location=CACHE_DIR, verbose=0) + # save the SearchCV object without fit + if params['save'] == 'save_no_fit': + searcher = optimizer(estimator, param_grid, **options) + print(searcher) + with open(outfile_object, 'wb') as output_handler: + pickle.dump(searcher, output_handler, + pickle.HIGHEST_PROTOCOL) + return 0 + + # read inputs and loads new attributes, like paths + estimator, X, y = _handle_X_y(estimator, params, infile1, infile2, + loaded_df=loaded_df, ref_seq=ref_seq, + intervals=intervals, targets=targets, + fasta_path=fasta_path) + # cache iraps_core fits could increase search speed significantly - if estimator.__class__.__name__ == 'IRAPSClassifier': - estimator.set_params(memory=memory) - else: - # For iraps buried in pipeline - for p, v in estimator_params.items(): - if p.endswith('memory'): - # for case of `__irapsclassifier__memory` - if len(p) > 8 and p[:-8].endswith('irapsclassifier'): - # cache iraps_core fits could increase search - # speed significantly - new_params = {p: memory} - estimator.set_params(**new_params) - # security reason, we don't want memory being - # modified unexpectedly - elif v: - new_params = {p, None} - estimator.set_params(**new_params) - # For now, 1 CPU is suggested for iprasclassifier - elif p.endswith('n_jobs'): - new_params = {p: 1} - estimator.set_params(**new_params) - # for security reason, types of callbacks are limited - elif p.endswith('callbacks'): - for cb in v: - cb_type = cb['callback_selection']['callback_type'] - if cb_type not in ALLOWED_CALLBACKS: - raise ValueError( - "Prohibited callback type: %s!" % cb_type) + memory = joblib.Memory(location=CACHE_DIR, verbose=0) + main_est = get_main_estimator(estimator) + if main_est.__class__.__name__ == 'IRAPSClassifier': + main_est.set_params(memory=memory) - param_grid = _eval_search_params(params_builder) searcher = optimizer(estimator, param_grid, **options) - # do nested split split_mode = params['outer_split'].pop('split_mode') - # nested CV, outer cv using cross_validate + if split_mode == 'nested_cv': + # make sure refit is choosen + # this could be True for sklearn models, but not the case for + # deep learning models + if not options['refit'] and \ + not all(hasattr(estimator, attr) + for attr in ('config', 'model_type')): + warnings.warn("Refit is change to `True` for nested validation!") + setattr(searcher, 'refit', True) + outer_cv, _ = get_cv(params['outer_split']['cv_selector']) - + # nested CV, outer cv using cross_validate if options['error_score'] == 'raise': rval = cross_validate( searcher, X, y, scoring=options['scoring'], - cv=outer_cv, n_jobs=N_JOBS, verbose=0, - error_score=options['error_score']) + cv=outer_cv, n_jobs=N_JOBS, + verbose=options['verbose'], + return_estimator=(params['save'] == 'save_estimator'), + error_score=options['error_score'], + return_train_score=True) else: warnings.simplefilter('always', FitFailedWarning) with warnings.catch_warnings(record=True) as w: @@ -421,13 +577,38 @@ searcher, X, y, scoring=options['scoring'], cv=outer_cv, n_jobs=N_JOBS, - verbose=0, - error_score=options['error_score']) + verbose=options['verbose'], + return_estimator=(params['save'] == 'save_estimator'), + error_score=options['error_score'], + return_train_score=True) except ValueError: pass for warning in w: print(repr(warning.message)) + fitted_searchers = rval.pop('estimator', []) + if fitted_searchers: + import os + pwd = os.getcwd() + save_dir = os.path.join(pwd, 'cv_results_in_folds') + try: + os.mkdir(save_dir) + for idx, obj in enumerate(fitted_searchers): + target_name = 'cv_results_' + '_' + 'split%d' % idx + target_path = os.path.join(pwd, save_dir, target_name) + cv_results_ = getattr(obj, 'cv_results_', None) + if not cv_results_: + print("%s is not available" % target_name) + continue + cv_results_ = pd.DataFrame(cv_results_) + cv_results_ = cv_results_[sorted(cv_results_.columns)] + cv_results_.to_csv(target_path, sep='\t', header=True, + index=False) + except Exception as e: + print(e) + finally: + del os + keys = list(rval.keys()) for k in keys: if k.startswith('test'): @@ -437,46 +618,22 @@ rval.pop(k) rval = pd.DataFrame(rval) rval = rval[sorted(rval.columns)] - rval.to_csv(path_or_buf=outfile_result, sep='\t', - header=True, index=False) - else: - if split_mode == 'train_test_split': - train_test_split = try_get_attr( - 'galaxy_ml.model_validations', 'train_test_split') - # make sure refit is choosen - # this could be True for sklearn models, but not the case for - # deep learning models - if not options['refit'] and \ - not all(hasattr(estimator, attr) - for attr in ('config', 'model_type')): - warnings.warn("Refit is change to `True` for nested " - "validation!") - setattr(searcher, 'refit', True) - split_options = params['outer_split'] + rval.to_csv(path_or_buf=outfile_result, sep='\t', header=True, + index=False) + + return 0 - # splits - if split_options['shuffle'] == 'stratified': - split_options['labels'] = y - X, X_test, y, y_test = train_test_split(X, y, **split_options) - elif split_options['shuffle'] == 'group': - if groups is None: - raise ValueError("No group based CV option was " - "choosen for group shuffle!") - split_options['labels'] = groups - if y is None: - X, X_test, groups, _ =\ - train_test_split(X, groups, **split_options) - else: - X, X_test, y, y_test, groups, _ =\ - train_test_split(X, y, groups, **split_options) - else: - if split_options['shuffle'] == 'None': - split_options['shuffle'] = None - X, X_test, y, y_test =\ - train_test_split(X, y, **split_options) - # end train_test_split + # deprecate train test split mode + """searcher = _do_train_test_split_val( + searcher, X, y, params, + primary_scoring=primary_scoring, + error_score=options['error_score'], + groups=groups, + outfile=outfile_result)""" - # shared by both train_test_split and non-split + # no outer split + else: + searcher.set_params(n_jobs=N_JOBS) if options['error_score'] == 'raise': searcher.fit(X, y, groups=groups) else: @@ -489,47 +646,14 @@ for warning in w: print(repr(warning.message)) - # no outer split - if split_mode == 'no': - # save results - cv_results = pd.DataFrame(searcher.cv_results_) - cv_results = cv_results[sorted(cv_results.columns)] - cv_results.to_csv(path_or_buf=outfile_result, sep='\t', - header=True, index=False) - - # train_test_split, output test result using best_estimator_ - # or rebuild the trained estimator using weights if applicable. - else: - scorer_ = searcher.scorer_ - if isinstance(scorer_, collections.Mapping): - is_multimetric = True - else: - is_multimetric = False - - best_estimator_ = getattr(searcher, 'best_estimator_', None) - if not best_estimator_: - raise ValueError("GridSearchCV object has no " - "`best_estimator_` when `refit`=False!") - - if best_estimator_.__class__.__name__ == 'KerasGBatchClassifier' \ - and hasattr(estimator.data_batch_generator, 'target_path'): - test_score = best_estimator_.evaluate( - X_test, scorer=scorer_, is_multimetric=is_multimetric) - else: - test_score = _score(best_estimator_, X_test, - y_test, scorer_, - is_multimetric=is_multimetric) - - if not is_multimetric: - test_score = {primary_scoring: test_score} - for key, value in test_score.items(): - test_score[key] = [value] - result_df = pd.DataFrame(test_score) - result_df.to_csv(path_or_buf=outfile_result, sep='\t', - header=True, index=False) + cv_results = pd.DataFrame(searcher.cv_results_) + cv_results = cv_results[sorted(cv_results.columns)] + cv_results.to_csv(path_or_buf=outfile_result, sep='\t', + header=True, index=False) memory.clear(warn=False) + # output best estimator, and weights if applicable if outfile_object: best_estimator_ = getattr(searcher, 'best_estimator_', None) if not best_estimator_: @@ -538,9 +662,10 @@ "nested gridsearch or `refit` is False!") return - main_est = best_estimator_ - if isinstance(best_estimator_, pipeline.Pipeline): - main_est = best_estimator_.steps[-1][-1] + # clean prams + best_estimator_ = clean_params(best_estimator_) + + main_est = get_main_estimator(best_estimator_) if hasattr(main_est, 'model_') \ and hasattr(main_est, 'save_weights'): @@ -554,6 +679,7 @@ del main_est.data_generator_ with open(outfile_object, 'wb') as output_handler: + print("Best estimator is saved: %s " % repr(best_estimator_)) pickle.dump(best_estimator_, output_handler, pickle.HIGHEST_PROTOCOL)