diff utils.py @ 6:e94dc7945639 draft default tip
planemo upload for repository https://github.com/bgruening/galaxytools/tree/recommendation_training/tools/tool_recommendation_model commit 24bab7a797f53fe4bcc668b18ee0326625486164
| author | bgruening |
| --- | --- |
| date | Sun, 16 Oct 2022 11:52:10 +0000 |
| parents | 4f7e6612906b |
| children | |
--- a/utils.py	Fri May 06 09:05:18 2022 +0000
+++ b/utils.py	Sun Oct 16 11:52:10 2022 +0000
@@ -1,11 +1,15 @@
 import json
+import os
 import random
 
 import h5py
 import numpy as np
+import pandas as pd
 import tensorflow as tf
-from numpy.random import choice
-from tensorflow.keras import backend
+
+binary_ce = tf.keras.losses.BinaryCrossentropy()
+binary_acc = tf.keras.metrics.BinaryAccuracy()
+categorical_ce = tf.keras.metrics.CategoricalCrossentropy(from_logits=True)
 
 
 def read_file(file_path):
@@ -17,6 +21,43 @@
     return file_content
 
 
+def write_file(file_path, content):
+    """
+    Write a file
+    """
+    remove_file(file_path)
+    with open(file_path, "w") as json_file:
+        json_file.write(json.dumps(content))
+
+
+def save_h5_data(inp, tar, filename):
+    hf_file = h5py.File(filename, 'w')
+    hf_file.create_dataset("input", data=inp)
+    hf_file.create_dataset("target", data=tar)
+    hf_file.close()
+
+
+def get_low_freq_te_samples(te_data, te_target, tr_freq_dict):
+    lowest_tool_te_ids = list()
+    lowest_t_ids = get_lowest_tools(tr_freq_dict)
+    for i, te_labels in enumerate(te_target):
+        tools_pos = np.where(te_labels > 0)[0]
+        tools_pos = [str(int(item)) for item in tools_pos]
+        intersection = list(set(tools_pos).intersection(set(lowest_t_ids)))
+        if len(intersection) > 0:
+            lowest_tool_te_ids.append(i)
+            lowest_t_ids = [item for item in lowest_t_ids if item not in intersection]
+    return lowest_tool_te_ids
+
+
+def save_processed_workflows(file_path, unique_paths):
+    workflow_paths_unique = ""
+    for path in unique_paths:
+        workflow_paths_unique += path + "\n"
+    with open(file_path, "w") as workflows_file:
+        workflows_file.write(workflow_paths_unique)
+
+
 def format_tool_id(tool_link):
     """
     Extract tool id from tool link
@@ -26,158 +67,166 @@
     return tool_id
 
 
-def set_trained_model(dump_file, model_values):
-    """
-    Create an h5 file with the trained weights and associated dicts
-    """
-    hf_file = h5py.File(dump_file, "w")
-    for key in model_values:
-        value = model_values[key]
-        if key == "model_weights":
-            for idx, item in enumerate(value):
-                w_key = "weight_" + str(idx)
-                if w_key in hf_file:
-                    hf_file.modify(w_key, item)
-                else:
-                    hf_file.create_dataset(w_key, data=item)
-        else:
-            if key in hf_file:
-                hf_file.modify(key, json.dumps(value))
-            else:
-                hf_file.create_dataset(key, data=json.dumps(value))
+def save_model_file(model, r_dict, c_wts, c_tools, s_conn, model_file):
+    model.save_weights(model_file, save_format="h5")
+    hf_file = h5py.File(model_file, 'r+')
+    model_values = {
+        "reverse_dict": r_dict,
+        "class_weights": c_wts,
+        "compatible_tools": c_tools,
+        "standard_connections": s_conn
+    }
+    for k in model_values:
+        hf_file.create_dataset(k, data=json.dumps(model_values[k]))
     hf_file.close()
 
 
-def weighted_loss(class_weights):
-    """
-    Create a weighted loss function. Penalise the misclassification
-    of classes more with the higher usage
-    """
-    weight_values = list(class_weights.values())
-    weight_values.extend(weight_values)
+def remove_file(file_path):
+    if os.path.exists(file_path):
+        os.remove(file_path)
+
 
-    def weighted_binary_crossentropy(y_true, y_pred):
-        # add another dimension to compute dot product
-        expanded_weights = tf.expand_dims(weight_values, axis=-1)
-        bce = backend.binary_crossentropy(y_true, y_pred)
-        return backend.dot(bce, expanded_weights)
-
-    return weighted_binary_crossentropy
+def verify_oversampling_freq(oversampled_tr_data, rev_dict):
+    """
+    Compute the frequency of tool sequences after oversampling
+    """
+    freq_dict = dict()
+    freq_dict_names = dict()
+    for tr_data in oversampled_tr_data:
+        t_pos = np.where(tr_data > 0)[0]
+        last_tool_id = str(int(tr_data[t_pos[-1]]))
+        if last_tool_id not in freq_dict:
+            freq_dict[last_tool_id] = 0
+            freq_dict_names[rev_dict[int(last_tool_id)]] = 0
+        freq_dict[last_tool_id] += 1
+        freq_dict_names[rev_dict[int(last_tool_id)]] += 1
+    s_freq = dict(sorted(freq_dict_names.items(), key=lambda kv: kv[1], reverse=True))
+    return s_freq
 
 
-def balanced_sample_generator(
-    train_data, train_labels, batch_size, l_tool_tr_samples, reverse_dictionary
-):
-    while True:
-        dimension = train_data.shape[1]
-        n_classes = train_labels.shape[1]
-        tool_ids = list(l_tool_tr_samples.keys())
-        random.shuffle(tool_ids)
-        generator_batch_data = np.zeros([batch_size, dimension])
-        generator_batch_labels = np.zeros([batch_size, n_classes])
-        generated_tool_ids = choice(tool_ids, batch_size)
-        for i in range(batch_size):
-            random_toolid = generated_tool_ids[i]
-            sample_indices = l_tool_tr_samples[str(random_toolid)]
-            random_index = random.sample(range(0, len(sample_indices)), 1)[0]
-            random_tr_index = sample_indices[random_index]
-            generator_batch_data[i] = train_data[random_tr_index]
-            generator_batch_labels[i] = train_labels[random_tr_index]
-        yield generator_batch_data, generator_batch_labels
+def collect_sampled_tool_freq(collected_dict, c_freq):
+    for t in c_freq:
+        if t not in collected_dict:
+            collected_dict[t] = int(c_freq[t])
+        else:
+            collected_dict[t] += int(c_freq[t])
+    return collected_dict
+
+
+def save_data_as_dict(f_dict, r_dict, inp, tar, save_path):
+    inp_tar = dict()
+    for index, (i, t) in enumerate(zip(inp, tar)):
+        i_pos = np.where(i > 0)[0]
+        i_seq = ",".join([str(int(item)) for item in i[1:i_pos[-1] + 1]])
+        t_pos = np.where(t > 0)[0]
+        t_seq = ",".join([str(int(item)) for item in t[1:t_pos[-1] + 1]])
+        if i_seq not in inp_tar:
+            inp_tar[i_seq] = list()
+        inp_tar[i_seq].append(t_seq)
+    size = 0
+    for item in inp_tar:
+        size += len(inp_tar[item])
+    print("Size saved file: ", size)
+    write_file(save_path, inp_tar)
+
+
+def read_train_test(datapath):
+    file_obj = h5py.File(datapath, 'r')
+    data_input = np.array(file_obj["input"])
+    data_target = np.array(file_obj["target"])
+    return data_input, data_target
 
 
-def compute_precision(
-    model,
-    x,
-    y,
-    reverse_data_dictionary,
-    usage_scores,
-    actual_classes_pos,
-    topk,
-    standard_conn,
-    last_tool_id,
-    lowest_tool_ids,
-):
-    """
-    Compute absolute and compatible precision
-    """
-    pred_t_name = ""
-    top_precision = 0.0
-    mean_usage = 0.0
-    usage_wt_score = list()
-    pub_precision = 0.0
-    lowest_pub_prec = 0.0
-    lowest_norm_prec = 0.0
-    pub_tools = list()
-    actual_next_tool_names = list()
-    test_sample = np.reshape(x, (1, len(x)))
+def sample_balanced_tr_y(x_seqs, y_labels, ulabels_tr_y_dict, b_size, tr_t_freq, prev_sel_tools):
+    batch_y_tools = list(ulabels_tr_y_dict.keys())
+    random.shuffle(batch_y_tools)
+    label_tools = list()
+    rand_batch_indices = list()
+    sel_tools = list()
+
+    unselected_tools = [t for t in batch_y_tools if t not in prev_sel_tools]
+    rand_selected_tools = unselected_tools[:b_size]
 
-    # predict next tools for a test path
-    prediction = model.predict(test_sample, verbose=0)
-
-    # divide the predicted vector into two halves - one for published and
-    # another for normal workflows
-    nw_dimension = prediction.shape[1]
-    half_len = int(nw_dimension / 2)
+    for l_tool in rand_selected_tools:
+        seq_indices = ulabels_tr_y_dict[l_tool]
+        random.shuffle(seq_indices)
+        rand_s_index = np.random.randint(0, len(seq_indices), 1)[0]
+        rand_sample = seq_indices[rand_s_index]
+        sel_tools.append(l_tool)
+        rand_batch_indices.append(rand_sample)
+        label_tools.append(l_tool)
 
-    # predict tools
-    prediction = np.reshape(prediction, (nw_dimension,))
-    # get predictions of tools from published workflows
-    standard_pred = prediction[:half_len]
-    # get predictions of tools from normal workflows
-    normal_pred = prediction[half_len:]
+    x_batch_train = x_seqs[rand_batch_indices]
+    y_batch_train = y_labels[rand_batch_indices]
 
-    standard_prediction_pos = np.argsort(standard_pred, axis=-1)
-    standard_topk_prediction_pos = standard_prediction_pos[-topk]
+    unrolled_x = tf.convert_to_tensor(x_batch_train, dtype=tf.int64)
+    unrolled_y = tf.convert_to_tensor(y_batch_train, dtype=tf.int64)
+    return unrolled_x, unrolled_y, sel_tools
+
 
-    normal_prediction_pos = np.argsort(normal_pred, axis=-1)
-    normal_topk_prediction_pos = normal_prediction_pos[-topk]
+def sample_balanced_te_y(x_seqs, y_labels, ulabels_tr_y_dict, b_size):
+    batch_y_tools = list(ulabels_tr_y_dict.keys())
+    random.shuffle(batch_y_tools)
+    label_tools = list()
+    rand_batch_indices = list()
+    sel_tools = list()
+    for l_tool in batch_y_tools:
+        seq_indices = ulabels_tr_y_dict[l_tool]
+        random.shuffle(seq_indices)
+        rand_s_index = np.random.randint(0, len(seq_indices), 1)[0]
+        rand_sample = seq_indices[rand_s_index]
+        sel_tools.append(l_tool)
+        if rand_sample not in rand_batch_indices:
+            rand_batch_indices.append(rand_sample)
+            label_tools.append(l_tool)
+        if len(rand_batch_indices) == b_size:
+            break
+    x_batch_train = x_seqs[rand_batch_indices]
+    y_batch_train = y_labels[rand_batch_indices]
+
+    unrolled_x = tf.convert_to_tensor(x_batch_train, dtype=tf.int64)
+    unrolled_y = tf.convert_to_tensor(y_batch_train, dtype=tf.int64)
+    return unrolled_x, unrolled_y, sel_tools
+
 
-    # get true tools names
-    for a_t_pos in actual_classes_pos:
-        if a_t_pos > half_len:
-            t_name = reverse_data_dictionary[int(a_t_pos - half_len)]
-        else:
-            t_name = reverse_data_dictionary[int(a_t_pos)]
-        actual_next_tool_names.append(t_name)
-    last_tool_name = reverse_data_dictionary[x[-1]]
-    # compute scores for published recommendations
-    if standard_topk_prediction_pos in reverse_data_dictionary:
-        pred_t_name = reverse_data_dictionary[int(standard_topk_prediction_pos)]
-        if last_tool_name in standard_conn:
-            pub_tools = standard_conn[last_tool_name]
-            if pred_t_name in pub_tools:
-                pub_precision = 1.0
-                # count precision only when there is actually true published tools
-                if last_tool_id in lowest_tool_ids:
-                    lowest_pub_prec = 1.0
-                else:
-                    lowest_pub_prec = np.nan
-                if standard_topk_prediction_pos in usage_scores:
-                    usage_wt_score.append(
-                        np.log(usage_scores[standard_topk_prediction_pos] + 1.0)
-                    )
-        else:
-            # count precision only when there is actually true published tools
-            # else set to np.nan. Set to 0 only when there is wrong prediction
-            pub_precision = np.nan
-            lowest_pub_prec = np.nan
-    # compute scores for normal recommendations
-    if normal_topk_prediction_pos in reverse_data_dictionary:
-        pred_t_name = reverse_data_dictionary[int(normal_topk_prediction_pos)]
-        if pred_t_name in actual_next_tool_names:
-            if normal_topk_prediction_pos in usage_scores:
-                usage_wt_score.append(
-                    np.log(usage_scores[normal_topk_prediction_pos] + 1.0)
-                )
-            top_precision = 1.0
-            if last_tool_id in lowest_tool_ids:
-                lowest_norm_prec = 1.0
-            else:
-                lowest_norm_prec = np.nan
-    if len(usage_wt_score) > 0:
-        mean_usage = np.mean(usage_wt_score)
-    return mean_usage, top_precision, pub_precision, lowest_pub_prec, lowest_norm_prec
+def get_u_tr_labels(y_tr):
+    labels = list()
+    labels_pos_dict = dict()
+    for i, item in enumerate(y_tr):
+        label_pos = np.where(item > 0)[0]
+        labels.extend(label_pos)
+        for label in label_pos:
+            if label not in labels_pos_dict:
+                labels_pos_dict[label] = list()
+            labels_pos_dict[label].append(i)
+    u_labels = list(set(labels))
+    for item in labels_pos_dict:
+        labels_pos_dict[item] = list(set(labels_pos_dict[item]))
+    return u_labels, labels_pos_dict
+
+
+def compute_loss(y_true, y_pred, class_weights=None):
+    y_true = tf.cast(y_true, dtype=tf.float32)
+    loss = binary_ce(y_true, y_pred)
+    categorical_loss = categorical_ce(y_true, y_pred)
+    if class_weights is None:
+        return tf.reduce_mean(loss), categorical_loss
+    return tf.tensordot(loss, class_weights, axes=1), categorical_loss
+
+
+def compute_acc(y_true, y_pred):
+    return binary_acc(y_true, y_pred)
+
+
+def validate_model(te_x, te_y, te_batch_size, model, f_dict, r_dict, ulabels_te_dict, tr_labels, lowest_t_ids):
+    te_x_batch, y_train_batch, _ = sample_balanced_te_y(te_x, te_y, ulabels_te_dict, te_batch_size)
+    print("Total test data size: ", te_x.shape, te_y.shape)
+    print("Batch test data size: ", te_x_batch.shape, y_train_batch.shape)
+    te_pred_batch, _ = model(te_x_batch, training=False)
+    test_err, _ = compute_loss(y_train_batch, te_pred_batch)
+    print("Test loss:")
+    print(test_err.numpy())
+    print("Test finished")
 
 
 def get_lowest_tools(l_tool_freq, fraction=0.25):
@@ -187,98 +236,7 @@
     return lowest_ids
 
 
-def verify_model(
-    model,
-    x,
-    y,
-    reverse_data_dictionary,
-    usage_scores,
-    standard_conn,
-    lowest_tool_ids,
-    topk_list=[1, 2, 3],
-):
-    """
-    Verify the model on test data
-    """
-    print("Evaluating performance on test data...")
-    print("Test data size: %d" % len(y))
-    size = y.shape[0]
-    precision = np.zeros([len(y), len(topk_list)])
-    usage_weights = np.zeros([len(y), len(topk_list)])
-    epo_pub_prec = np.zeros([len(y), len(topk_list)])
-    epo_lowest_tools_pub_prec = list()
-    epo_lowest_tools_norm_prec = list()
-    lowest_counter = 0
-    # loop over all the test samples and find prediction precision
-    for i in range(size):
-        lowest_pub_topk = list()
-        lowest_norm_topk = list()
-        actual_classes_pos = np.where(y[i] > 0)[0]
-        test_sample = x[i, :]
-        last_tool_id = str(int(test_sample[-1]))
-        for index, abs_topk in enumerate(topk_list):
-            (
-                usg_wt_score,
-                absolute_precision,
-                pub_prec,
-                lowest_p_prec,
-                lowest_n_prec,
-            ) = compute_precision(
-                model,
-                test_sample,
-                y,
-                reverse_data_dictionary,
-                usage_scores,
-                actual_classes_pos,
-                abs_topk,
-                standard_conn,
-                last_tool_id,
-                lowest_tool_ids,
-            )
-            precision[i][index] = absolute_precision
-            usage_weights[i][index] = usg_wt_score
-            epo_pub_prec[i][index] = pub_prec
-            lowest_pub_topk.append(lowest_p_prec)
-            lowest_norm_topk.append(lowest_n_prec)
-        epo_lowest_tools_pub_prec.append(lowest_pub_topk)
-        epo_lowest_tools_norm_prec.append(lowest_norm_topk)
-        if last_tool_id in lowest_tool_ids:
-            lowest_counter += 1
-    mean_precision = np.mean(precision, axis=0)
-    mean_usage = np.mean(usage_weights, axis=0)
-    mean_pub_prec = np.nanmean(epo_pub_prec, axis=0)
-    mean_lowest_pub_prec = np.nanmean(epo_lowest_tools_pub_prec, axis=0)
-    mean_lowest_norm_prec = np.nanmean(epo_lowest_tools_norm_prec, axis=0)
-    return (
-        mean_usage,
-        mean_precision,
-        mean_pub_prec,
-        mean_lowest_pub_prec,
-        mean_lowest_norm_prec,
-        lowest_counter,
-    )
-
-
-def save_model(
-    results,
-    data_dictionary,
-    compatible_next_tools,
-    trained_model_path,
-    class_weights,
-    standard_connections,
-):
-    # save files
-    trained_model = results["model"]
-    best_model_parameters = results["best_parameters"]
-    model_config = trained_model.to_json()
-    model_weights = trained_model.get_weights()
-    model_values = {
-        "data_dictionary": data_dictionary,
-        "model_config": model_config,
-        "best_parameters": best_model_parameters,
-        "model_weights": model_weights,
-        "compatible_tools": compatible_next_tools,
-        "class_weights": class_weights,
-        "standard_connections": standard_connections,
-    }
-    set_trained_model(trained_model_path, model_values)
+def remove_pipe(file_path):
+    dataframe = pd.read_csv(file_path, sep="|", header=None)
+    dataframe = dataframe[1:len(dataframe.index) - 1]
+    return dataframe[1:]
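
As a quick, hedged illustration of the new HDF5 helpers introduced by this revision, the sketch below round-trips a toy dataset through `save_h5_data` and `read_train_test`. The array shapes, values, and the `train.h5` file name are invented for the example and are not part of the commit.

```python
import numpy as np

from utils import read_train_test, save_h5_data

# Toy stand-ins for encoded tool sequences; shapes and values are illustrative only.
inp = np.zeros((4, 25))  # 4 input sequences, padded to length 25
tar = np.zeros((4, 10))  # 4 multi-hot target vectors over 10 tool ids
inp[0, -1] = 3.0         # pretend tool id 3 ends the first sequence

save_h5_data(inp, tar, "train.h5")  # writes the "input" and "target" datasets
data_input, data_target = read_train_test("train.h5")
assert data_input.shape == (4, 25) and data_target.shape == (4, 10)
```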
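
Likewise, a minimal sketch of `get_u_tr_labels`, whose label-to-sample-index map feeds the new balanced samplers (`sample_balanced_tr_y`, `sample_balanced_te_y`); the toy label matrix below is hypothetical:

```python
import numpy as np

from utils import get_u_tr_labels

# Toy multi-hot label matrix: 3 training samples over 4 tool classes.
y_tr = np.array([
    [0, 1, 0, 1],  # sample 0 is labelled with tools 1 and 3
    [0, 1, 0, 0],  # sample 1 with tool 1
    [1, 0, 0, 0],  # sample 2 with tool 0
])

u_labels, labels_pos_dict = get_u_tr_labels(y_tr)
# u_labels holds the unique label ids 0, 1 and 3; labels_pos_dict maps each
# label to the sample indices where it occurs, e.g. label 1 -> [0, 1].
```

The samplers draw one sequence index per label from this map, which is what balances each batch across tool classes.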