diff prepare_data.py @ 0:9bf25dbe00ad draft
"planemo upload for repository https://github.com/bgruening/galaxytools/tree/recommendation_training/tools/tool_recommendation_model commit 7fac577189d01cedd01118a77fc2baaefe7d5cad"
| author | bgruening |
|---|---|
| date | Wed, 28 Aug 2019 07:19:38 -0400 |
| parents | |
| children | 5b3c08710e47 |
The commit adds `prepare_data.py` as a new file; the diff header:

```
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/prepare_data.py Wed Aug 28 07:19:38 2019 -0400
@@ -0,0 +1,251 @@
```

The added file, reconstructed:

```python
"""
Prepare the workflow paths to be used by the downstream
machine learning algorithm. The paths are divided
into the test and training sets.
"""

import os
import collections
import numpy as np
import random

import predict_tool_usage

main_path = os.getcwd()


class PrepareData:

    def __init__(self, max_seq_length, test_data_share):
        """ Init method. """
        self.max_tool_sequence_len = max_seq_length
        self.test_share = test_data_share

    def process_workflow_paths(self, workflow_paths):
        """
        Get all the tools and the complete set of individual paths for each workflow
        """
        tokens = list()
        raw_paths = [x.replace("\n", "") for x in workflow_paths]
        for item in raw_paths:
            split_items = item.split(",")
            for token in split_items:
                if token != "":
                    tokens.append(token)
        tokens = list(set(tokens))
        tokens = np.array(tokens)
        tokens = np.reshape(tokens, [-1, ])
        return tokens, raw_paths

    def create_new_dict(self, new_data_dict):
        """
        Create a new data dictionary
        """
        reverse_dict = dict((v, k) for k, v in new_data_dict.items())
        return new_data_dict, reverse_dict

    def assemble_dictionary(self, new_data_dict, old_data_dictionary={}):
        """
        Create/update tool indices in the forward and backward dictionaries
        """
        new_data_dict, reverse_dict = self.create_new_dict(new_data_dict)
        return new_data_dict, reverse_dict

    def create_data_dictionary(self, words, old_data_dictionary={}):
        """
        Create two dictionaries mapping tool names to their indices and back
        """
        count = collections.Counter(words).most_common()
        dictionary = dict()
        for word, _ in count:
            # index 0 is reserved for padding, so tool indices start at 1
            dictionary[word] = len(dictionary) + 1
        dictionary, reverse_dictionary = self.assemble_dictionary(dictionary, old_data_dictionary)
        return dictionary, reverse_dictionary

    def decompose_paths(self, paths, dictionary):
        """
        Decompose the paths into variable-length sub-paths, keeping the first tool fixed
        """
        sub_paths_pos = list()
        for index, item in enumerate(paths):
            tools = item.split(",")
            len_tools = len(tools)
            if len_tools <= self.max_tool_sequence_len:
                for window in range(1, len_tools):
                    sequence = tools[0: window + 1]
                    tools_pos = [str(dictionary[str(tool_item)]) for tool_item in sequence]
                    if len(tools_pos) > 1:
                        sub_paths_pos.append(",".join(tools_pos))
        sub_paths_pos = list(set(sub_paths_pos))
        return sub_paths_pos
```
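To see what `decompose_paths` yields, here is a minimal sketch; the tool names, indices, and the single-path corpus are invented for illustration, and it assumes `prepare_data.py` (together with its `predict_tool_usage` dependency) is importable:

```python
from prepare_data import PrepareData

# Every path no longer than max_seq_length is expanded into all of its
# prefixes of length >= 2 (first tool fixed), encoded as dictionary indices.
prep = PrepareData(max_seq_length=25, test_data_share=0.2)
dictionary = {"fastqc": 1, "trimmomatic": 2, "bwa": 3}  # hypothetical tools
paths = ["fastqc,trimmomatic,bwa"]
print(sorted(prep.decompose_paths(paths, dictionary)))
# ['1,2', '1,2,3']
```

The listing continues with the labelling, padding, splitting, and weighting methods: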
```python
    def prepare_paths_labels_dictionary(self, dictionary, reverse_dictionary, paths, compatible_next_tools):
        """
        Create a dictionary of sequences with their labels for the training and test paths
        """
        paths_labels = dict()
        random.shuffle(paths)
        for item in paths:
            if item:
                tools = item.split(",")
                label = tools[-1]
                train_tools = tools[:len(tools) - 1]
                last_but_one_name = reverse_dictionary[int(train_tools[-1])]
                try:
                    compatible_tools = compatible_next_tools[last_but_one_name].split(",")
                except Exception:
                    continue
                if len(compatible_tools) > 0:
                    compatible_tools_ids = [str(dictionary[x]) for x in compatible_tools]
                    compatible_tools_ids.append(label)
                    composite_labels = ",".join(compatible_tools_ids)
                train_tools = ",".join(train_tools)
                if train_tools in paths_labels:
                    paths_labels[train_tools] += "," + composite_labels
                else:
                    paths_labels[train_tools] = composite_labels
        # deduplicate the labels collected for each sequence
        for item in paths_labels:
            paths_labels[item] = ",".join(list(set(paths_labels[item].split(","))))
        return paths_labels

    def pad_paths(self, paths_dictionary, num_classes):
        """
        Add padding to the tool sequences and create multi-hot encoded labels
        """
        size_data = len(paths_dictionary)
        data_mat = np.zeros([size_data, self.max_tool_sequence_len])
        label_mat = np.zeros([size_data, num_classes + 1])
        train_counter = 0
        for train_seq, train_label in list(paths_dictionary.items()):
            positions = train_seq.split(",")
            start_pos = self.max_tool_sequence_len - len(positions)
            for id_pos, pos in enumerate(positions):
                data_mat[train_counter][start_pos + id_pos] = int(pos)
            for label_item in train_label.split(","):
                label_mat[train_counter][int(label_item)] = 1.0
            train_counter += 1
        return data_mat, label_mat

    def split_test_train_data(self, multilabels_paths):
        """
        Split into test and train data randomly for each run
        """
        train_dict = dict()
        test_dict = dict()
        all_paths = list(multilabels_paths.keys())
        random.shuffle(all_paths)
        split_number = int(self.test_share * len(all_paths))
        for index, path in enumerate(all_paths):
            if index < split_number:
                test_dict[path] = multilabels_paths[path]
            else:
                train_dict[path] = multilabels_paths[path]
        return train_dict, test_dict

    def verify_overlap(self, train_paths, test_paths):
        """
        Verify the overlap of samples between the train and test data
        """
        intersection = list(set(train_paths).intersection(set(test_paths)))
        print("Overlap in train and test: %d" % len(intersection))

    def get_predicted_usage(self, data_dictionary, predicted_usage):
        """
        Get the predicted usage for tools
        """
        usage = dict()
        epsilon = 0.0
        # index 0 does not belong to any tool
        usage[0] = epsilon
        for k, v in data_dictionary.items():
            try:
                usg = predicted_usage[k]
                if usg < epsilon:
                    usg = epsilon
                usage[v] = usg
            except Exception:
                usage[v] = epsilon
        return usage

    def assign_class_weights(self, n_classes, predicted_usage):
        """
        Compute class weights using usage
        """
        class_weights = dict()
        class_weights[0] = 0.0
        for key in range(1, n_classes):
            u_score = predicted_usage[key]
            if u_score < 1.0:
                u_score += 1.0
            class_weights[key] = np.log(u_score)
        return class_weights

    def get_sample_weights(self, train_data, reverse_dictionary, paths_frequency):
        """
        Compute the frequency of paths in the training data
        """
        path_weights = np.zeros(len(train_data))
        for path_index, path in enumerate(train_data):
            sample_pos = np.where(path > 0)[0]
            sample_tool_pos = path[sample_pos[0]:]
            path_name = ",".join([reverse_dictionary[int(tool_pos)] for tool_pos in sample_tool_pos])
            try:
                path_weights[path_index] = int(paths_frequency[path_name])
            except Exception:
                path_weights[path_index] = 1
        return path_weights
```
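A quick sketch of the `pad_paths` encoding, with invented indices and `num_classes` matching a three-tool dictionary: sequences are left-padded with zeros up to `max_tool_sequence_len`, and labels are multi-hot over `num_classes + 1` positions, index 0 being reserved for padding:

```python
from prepare_data import PrepareData

prep = PrepareData(max_seq_length=5, test_data_share=0.2)
# Hypothetical entry: the sequence "1,2" may be followed by tool 3 or tool 2.
paths_dictionary = {"1,2": "3,2"}
data, labels = prep.pad_paths(paths_dictionary, num_classes=3)
print(data)    # [[0. 0. 0. 1. 2.]]  <- left-padded tool indices
print(labels)  # [[0. 0. 1. 1.]]     <- positions 2 and 3 set to 1.0
```

The listing concludes with `get_data_labels_matrices`, which ties these steps together: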
```python
    def get_data_labels_matrices(self, workflow_paths, tool_usage_path, cutoff_date, compatible_next_tools, old_data_dictionary={}):
        """
        Convert the training and test paths into corresponding numpy matrices
        """
        processed_data, raw_paths = self.process_workflow_paths(workflow_paths)
        dictionary, reverse_dictionary = self.create_data_dictionary(processed_data, old_data_dictionary)
        num_classes = len(dictionary)

        print("Raw paths: %d" % len(raw_paths))
        random.shuffle(raw_paths)

        print("Decomposing paths...")
        all_unique_paths = self.decompose_paths(raw_paths, dictionary)
        random.shuffle(all_unique_paths)

        print("Creating dictionaries...")
        multilabels_paths = self.prepare_paths_labels_dictionary(dictionary, reverse_dictionary, all_unique_paths, compatible_next_tools)

        print("Complete data: %d" % len(multilabels_paths))
        train_paths_dict, test_paths_dict = self.split_test_train_data(multilabels_paths)

        print("Train data: %d" % len(train_paths_dict))
        print("Test data: %d" % len(test_paths_dict))

        test_data, test_labels = self.pad_paths(test_paths_dict, num_classes)
        train_data, train_labels = self.pad_paths(train_paths_dict, num_classes)

        # Predict tools usage
        print("Predicting tools' usage...")
        usage_pred = predict_tool_usage.ToolPopularity()
        usage = usage_pred.extract_tool_usage(tool_usage_path, cutoff_date, dictionary)
        tool_usage_prediction = usage_pred.get_pupularity_prediction(usage)
        tool_predicted_usage = self.get_predicted_usage(dictionary, tool_usage_prediction)

        # get class weights using the predicted usage for each tool
        class_weights = self.assign_class_weights(train_labels.shape[1], tool_predicted_usage)

        return train_data, train_labels, test_data, test_labels, dictionary, reverse_dictionary, class_weights, tool_predicted_usage
```
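Finally, a hedged sketch of how the class might be driven end to end; the file names, cutoff date, and `compatible_next_tools` mapping below are placeholders (the real entry point is the training script elsewhere in this repository), and the call requires the companion `predict_tool_usage` module and real usage data:

```python
from prepare_data import PrepareData

# Placeholder inputs, invented for illustration only.
workflow_paths = open("workflow_paths.txt").readlines()
compatible_next_tools = {"fastqc": "trimmomatic,bwa"}

prep = PrepareData(max_seq_length=25, test_data_share=0.2)
(train_data, train_labels, test_data, test_labels,
 dictionary, reverse_dictionary, class_weights,
 tool_predicted_usage) = prep.get_data_labels_matrices(
    workflow_paths, "tool_usage.tsv", "2019-01-01", compatible_next_tools)
```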