diff utils.py @ 3:5b3c08710e47 draft
"planemo upload for repository https://github.com/bgruening/galaxytools/tree/recommendation_training/tools/tool_recommendation_model commit c635df659fe1835679438589ded43136b0e515c6"
| author | bgruening |
| --- | --- |
| date | Sat, 09 May 2020 05:38:23 -0400 |
| parents | 76251d1ccdcc |
| children | afec8c595124 |
```diff
--- a/utils.py  Fri Oct 11 18:24:54 2019 -0400
+++ b/utils.py  Sat May 09 05:38:23 2020 -0400
@@ -2,6 +2,7 @@
 import numpy as np
 import json
 import h5py
+import random
 
 from keras import backend as K
 
@@ -15,23 +16,6 @@
     return file_content
 
-
-def write_file(file_path, content):
-    """
-    Write a file
-    """
-    remove_file(file_path)
-    with open(file_path, "w") as json_file:
-        json_file.write(json.dumps(content))
-
-
-def save_processed_workflows(file_path, unique_paths):
-    workflow_paths_unique = ""
-    for path in unique_paths:
-        workflow_paths_unique += path + "\n"
-    with open(file_path, "w") as workflows_file:
-        workflows_file.write(workflow_paths_unique)
-
 
 def format_tool_id(tool_link):
     """
     Extract tool id from tool link
@@ -63,17 +47,13 @@
     hf_file.close()
 
-
-def remove_file(file_path):
-    if os.path.exists(file_path):
-        os.remove(file_path)
-
 
 def weighted_loss(class_weights):
     """
     Create a weighted loss function. Penalise the misclassification
     of classes more with the higher usage
     """
     weight_values = list(class_weights.values())
+    weight_values.extend(weight_values)
 
     def weighted_binary_crossentropy(y_true, y_pred):
         # add another dimension to compute dot product
@@ -82,46 +62,101 @@
     return weighted_binary_crossentropy
 
 
-def compute_precision(model, x, y, reverse_data_dictionary, next_compatible_tools, usage_scores, actual_classes_pos, topk):
+def balanced_sample_generator(train_data, train_labels, batch_size, l_tool_tr_samples):
+    while True:
+        dimension = train_data.shape[1]
+        n_classes = train_labels.shape[1]
+        tool_ids = list(l_tool_tr_samples.keys())
+        generator_batch_data = np.zeros([batch_size, dimension])
+        generator_batch_labels = np.zeros([batch_size, n_classes])
+        for i in range(batch_size):
+            random_toolid_index = random.sample(range(0, len(tool_ids)), 1)[0]
+            random_toolid = tool_ids[random_toolid_index]
+            sample_indices = l_tool_tr_samples[str(random_toolid)]
+            random_index = random.sample(range(0, len(sample_indices)), 1)[0]
+            random_tr_index = sample_indices[random_index]
+            generator_batch_data[i] = train_data[random_tr_index]
+            generator_batch_labels[i] = train_labels[random_tr_index]
+        yield generator_batch_data, generator_batch_labels
+
+
+def compute_precision(model, x, y, reverse_data_dictionary, usage_scores, actual_classes_pos, topk, standard_conn, last_tool_id, lowest_tool_ids):
     """
     Compute absolute and compatible precision
     """
-    absolute_precision = 0.0
+    pred_t_name = ""
+    top_precision = 0.0
+    mean_usage = 0.0
+    usage_wt_score = list()
+    pub_precision = 0.0
+    lowest_pub_prec = 0.0
+    lowest_norm_prec = 0.0
+    pub_tools = list()
+    actual_next_tool_names = list()
     test_sample = np.reshape(x, (1, len(x)))
 
     # predict next tools for a test path
     prediction = model.predict(test_sample, verbose=0)
 
+    # divide the predicted vector into two halves - one for published and
+    # another for normal workflows
     nw_dimension = prediction.shape[1]
-
-    # remove the 0th position as there is no tool at this index
-    prediction = np.reshape(prediction, (nw_dimension,))
+    half_len = int(nw_dimension / 2)
 
-    prediction_pos = np.argsort(prediction, axis=-1)
-    topk_prediction_pos = prediction_pos[-topk:]
+    # predict tools
+    prediction = np.reshape(prediction, (nw_dimension,))
+    # get predictions of tools from published workflows
+    standard_pred = prediction[:half_len]
+    # get predictions of tools from normal workflows
+    normal_pred = prediction[half_len:]
 
-    # remove the wrong tool position from the predicted list of tool positions
-    topk_prediction_pos = [x for x in topk_prediction_pos if x > 0]
+    standard_prediction_pos = np.argsort(standard_pred, axis=-1)
+    standard_topk_prediction_pos = standard_prediction_pos[-topk]
+
+    normal_prediction_pos = np.argsort(normal_pred, axis=-1)
+    normal_topk_prediction_pos = normal_prediction_pos[-topk]
 
-    # read tool names using reverse dictionary
-    actual_next_tool_names = [reverse_data_dictionary[int(tool_pos)] for tool_pos in actual_classes_pos]
-    top_predicted_next_tool_names = [reverse_data_dictionary[int(tool_pos)] for tool_pos in topk_prediction_pos]
-
-    # compute the class weights of predicted tools
-    mean_usg_score = 0
-    usg_wt_scores = list()
-    for t_id in topk_prediction_pos:
-        t_name = reverse_data_dictionary[int(t_id)]
-        if t_id in usage_scores and t_name in actual_next_tool_names:
-            usg_wt_scores.append(np.log(usage_scores[t_id] + 1.0))
-    if len(usg_wt_scores) > 0:
-        mean_usg_score = np.sum(usg_wt_scores) / float(topk)
-    false_positives = [tool_name for tool_name in top_predicted_next_tool_names if tool_name not in actual_next_tool_names]
-    absolute_precision = 1 - (len(false_positives) / float(topk))
-    return mean_usg_score, absolute_precision
+    # get true tools names
+    for a_t_pos in actual_classes_pos:
+        if a_t_pos > half_len:
+            t_name = reverse_data_dictionary[int(a_t_pos - half_len)]
+        else:
+            t_name = reverse_data_dictionary[int(a_t_pos)]
+        actual_next_tool_names.append(t_name)
+    last_tool_name = reverse_data_dictionary[x[-1]]
+    # compute scores for published recommendations
+    if standard_topk_prediction_pos in reverse_data_dictionary:
+        pred_t_name = reverse_data_dictionary[int(standard_topk_prediction_pos)]
+        if last_tool_name in standard_conn:
+            pub_tools = standard_conn[last_tool_name]
+            if pred_t_name in pub_tools:
+                pub_precision = 1.0
+                if last_tool_id in lowest_tool_ids:
+                    lowest_pub_prec = 1.0
+                if standard_topk_prediction_pos in usage_scores:
+                    usage_wt_score.append(np.log(usage_scores[standard_topk_prediction_pos] + 1.0))
+    # compute scores for normal recommendations
+    if normal_topk_prediction_pos in reverse_data_dictionary:
+        pred_t_name = reverse_data_dictionary[int(normal_topk_prediction_pos)]
+        if pred_t_name in actual_next_tool_names:
+            if normal_topk_prediction_pos in usage_scores:
+                usage_wt_score.append(np.log(usage_scores[normal_topk_prediction_pos] + 1.0))
+            top_precision = 1.0
+            if last_tool_id in lowest_tool_ids:
+                lowest_norm_prec = 1.0
+    if len(usage_wt_score) > 0:
+        mean_usage = np.mean(usage_wt_score)
+    return mean_usage, top_precision, pub_precision, lowest_pub_prec, lowest_norm_prec
 
 
-def verify_model(model, x, y, reverse_data_dictionary, next_compatible_tools, usage_scores, topk_list=[1, 2, 3]):
+def get_lowest_tools(l_tool_freq, fraction=0.25):
+    l_tool_freq = dict(sorted(l_tool_freq.items(), key=lambda kv: kv[1], reverse=True))
+    tool_ids = list(l_tool_freq.keys())
+    lowest_ids = tool_ids[-int(len(tool_ids) * fraction):]
+    return lowest_ids
+
+
+def verify_model(model, x, y, reverse_data_dictionary, usage_scores, standard_conn, lowest_tool_ids, topk_list=[1, 2, 3]):
     """
     Verify the model on test data
     """
@@ -130,31 +165,49 @@
     size = y.shape[0]
     precision = np.zeros([len(y), len(topk_list)])
     usage_weights = np.zeros([len(y), len(topk_list)])
+    epo_pub_prec = np.zeros([len(y), len(topk_list)])
+    epo_lowest_tools_pub_prec = list()
+    epo_lowest_tools_norm_prec = list()
+    # loop over all the test samples and find prediction precision
     for i in range(size):
+        lowest_pub_topk = list()
+        lowest_norm_topk = list()
         actual_classes_pos = np.where(y[i] > 0)[0]
+        test_sample = x[i, :]
+        last_tool_id = str(int(test_sample[-1]))
        for index, abs_topk in enumerate(topk_list):
-            abs_mean_usg_score, absolute_precision = compute_precision(model, x[i, :], y, reverse_data_dictionary, next_compatible_tools, usage_scores, actual_classes_pos, abs_topk)
+            usg_wt_score, absolute_precision, pub_prec, lowest_p_prec, lowest_n_prec = compute_precision(model, test_sample, y, reverse_data_dictionary, usage_scores, actual_classes_pos, abs_topk, standard_conn, last_tool_id, lowest_tool_ids)
             precision[i][index] = absolute_precision
-            usage_weights[i][index] = abs_mean_usg_score
+            usage_weights[i][index] = usg_wt_score
+            epo_pub_prec[i][index] = pub_prec
+            if last_tool_id in lowest_tool_ids:
+                lowest_pub_topk.append(lowest_p_prec)
+                lowest_norm_topk.append(lowest_n_prec)
+        if last_tool_id in lowest_tool_ids:
+            epo_lowest_tools_pub_prec.append(lowest_pub_topk)
+            epo_lowest_tools_norm_prec.append(lowest_norm_topk)
     mean_precision = np.mean(precision, axis=0)
     mean_usage = np.mean(usage_weights, axis=0)
-    return mean_precision, mean_usage
+    mean_pub_prec = np.mean(epo_pub_prec, axis=0)
+    mean_lowest_pub_prec = np.mean(epo_lowest_tools_pub_prec, axis=0)
+    mean_lowest_norm_prec = np.mean(epo_lowest_tools_norm_prec, axis=0)
+    return mean_usage, mean_precision, mean_pub_prec, mean_lowest_pub_prec, mean_lowest_norm_prec, len(epo_lowest_tools_pub_prec)
 
 
-def save_model(results, data_dictionary, compatible_next_tools, trained_model_path, class_weights):
+def save_model(results, data_dictionary, compatible_next_tools, trained_model_path, class_weights, standard_connections):
     # save files
     trained_model = results["model"]
     best_model_parameters = results["best_parameters"]
     model_config = trained_model.to_json()
     model_weights = trained_model.get_weights()
-
     model_values = {
         'data_dictionary': data_dictionary,
         'model_config': model_config,
         'best_parameters': best_model_parameters,
         'model_weights': model_weights,
         "compatible_tools": compatible_next_tools,
-        "class_weights": class_weights
+        "class_weights": class_weights,
+        "standard_connections": standard_connections
     }
     set_trained_model(trained_model_path, model_values)
```
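For context, this changeset replaces uniform batching with class-balanced sampling (`balanced_sample_generator`) and adds separate precision tracking for rarely used tools (`get_lowest_tools`, the new `lowest_*` values returned by `compute_precision` and `verify_model`). The sketch below only exercises the two new standalone helpers; the toy arrays, the 3-tool vocabulary, and the `l_tool_freq` counts are made-up placeholders rather than data from this repository, and importing from `utils` assumes Keras is installed, since the module imports `keras.backend` at load time.

```python
import numpy as np

from utils import balanced_sample_generator, get_lowest_tools

# toy data: 6 samples, 4 features, 3 tool classes (placeholder values only)
train_data = np.random.rand(6, 4)
train_labels = np.zeros((6, 3))
for row in range(6):
    train_labels[row, row % 3] = 1.0  # every class gets at least two samples

# map each tool id (stringified, as the generator expects) to the indices
# of the training samples labelled with that tool
l_tool_tr_samples = {
    str(c): list(np.where(train_labels[:, c] > 0)[0]) for c in range(3)
}

# draw one class-balanced batch; the generator loops forever, so it can be
# passed directly to Keras during training
gen = balanced_sample_generator(train_data, train_labels, batch_size=4,
                                l_tool_tr_samples=l_tool_tr_samples)
batch_x, batch_y = next(gen)
print(batch_x.shape, batch_y.shape)  # (4, 4) (4, 3)

# hypothetical usage counts: the least-used 25% of tools are the ones that
# verify_model scores separately via lowest_tool_ids
l_tool_freq = {"1": 500, "2": 120, "3": 35, "4": 4}
print(get_lowest_tools(l_tool_freq, fraction=0.25))  # ['4']
```

Because the generator samples a tool id uniformly before picking a training row, rare tools appear in batches as often as common ones, which appears to be the motivation for the extra "lowest tools" precision values returned by `verify_model`.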