Mercurial > repos > bebatut > format_cd_hit_output
diff format_cd_hit_output.py @ 1:64da677bcee2 draft default tip
"planemo upload for repository https://github.com/bgruening/galaxytools/tree/master/tools/format_cd_hit_output/ commit eea46077010e699403ce6995d7d4aac77b2e0b43"
author | bgruening |
---|---|
date | Wed, 19 Oct 2022 14:42:33 +0000 |
parents | 4015e9d6d277 |
children |
line wrap: on
line diff
--- a/format_cd_hit_output.py Tue Apr 26 08:55:33 2016 -0400 +++ b/format_cd_hit_output.py Wed Oct 19 14:42:33 2022 +0000 @@ -1,43 +1,42 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- -import sys -import os import argparse -import copy -import operator -from sets import Set + def extract_mapping_info(input_mapping_filepath): mapping_info = {} - categories = Set([]) + categories = set([]) - with open(input_mapping_filepath,'r') as mapping_file: + with open(input_mapping_filepath, 'r') as mapping_file: for line in mapping_file.readlines(): split_line = line[:-1].split('\t') - mapping_info.setdefault(split_line[0],split_line[1]) + mapping_info.setdefault(split_line[0], split_line[1]) categories.add(split_line[1]) return mapping_info, categories -def init_category_distribution(categories = None): - cluster_category_distribution = {} - if categories != None: + +def init_category_distribution(categories=None): + cluster_categ_distri = {} + if categories is not None: for category in categories: - cluster_category_distribution[category] = 0 - return cluster_category_distribution + cluster_categ_distri[category] = 0 + return cluster_categ_distri + -def flush_cluster_info(cluster_name, cluster_ref_seq, ref_seq_cluster, - cluster_category_distribution, categories, output_category_distribution_file, - cluster_seq_number): +def flush_cluster_info(cluster_name, cluster_ref_seq, ref_seq_cluster, + cluster_categ_distri, categories, + output_category_distribution_file, cluster_seq_number): if cluster_name != '': - if categories != None: - output_category_distribution_file.write(cluster_name) - output_category_distribution_file.write('\t' + str(cluster_seq_number)) + if categories is not None: + string = cluster_name + string += '\t' + str(cluster_seq_number) for category in categories: - output_category_distribution_file.write('\t') - output_category_distribution_file.write(str(cluster_category_distribution[category])) - output_category_distribution_file.write('\n') + string += '\t' + string += str(cluster_categ_distri[category]) + string += '\n' + output_category_distribution_file.write(string) if cluster_ref_seq == '': string = "No reference sequence found for " @@ -46,11 +45,12 @@ ref_seq_cluster.setdefault(cluster_ref_seq, cluster_name) -def extract_cluster_info(args, mapping_info = None, categories = None): + +def extract_cluster_info(args, mapping_info=None, categories=None): ref_seq_cluster = {} - if args.output_category_distribution != None: - if mapping_info == None or categories == None: + if args.output_category_distribution is not None: + if mapping_info is None or categories is None: string = "A file with category distribution is expected but " string += "no mapping information are available" raise ValueError(string) @@ -63,19 +63,24 @@ else: output_cat_distri_file = None - with open(args.input_cluster_info,'r') as cluster_info_file: + with open(args.input_cluster_info, 'r') as cluster_info_file: cluster_name = '' - cluster_category_distribution = init_category_distribution(categories) + cluster_categ_distri = init_category_distribution(categories) cluster_ref_seq = '' cluster_seq_number = 0 for line in cluster_info_file.readlines(): if line[0] == '>': - flush_cluster_info(cluster_name, cluster_ref_seq, ref_seq_cluster, - cluster_category_distribution, categories, - output_cat_distri_file, cluster_seq_number) + flush_cluster_info( + cluster_name, + cluster_ref_seq, + ref_seq_cluster, + cluster_categ_distri, + categories, + output_cat_distri_file, + cluster_seq_number) cluster_name = line[1:-1] - cluster_name = cluster_name.replace(' ','_') - cluster_category_distribution = init_category_distribution(categories) + cluster_name = cluster_name.replace(' ', '_') + cluster_categ_distri = init_category_distribution(categories) cluster_ref_seq = '' cluster_seq_number = 0 else: @@ -83,50 +88,58 @@ seq_name = seq_info[1][1:-3] cluster_seq_number += 1 - if categories != None: + if categories is not None: seq_count = 1 - if args.number_sum != None: + if args.number_sum is not None: if seq_name.find('size') != -1: substring = seq_name[seq_name.find('size'):-1] seq_count = int(substring.split('=')[1]) - if not mapping_info.has_key(seq_name): + if seq_name not in mapping_info: string = seq_name + " not found in mapping" raise ValueError(string) category = mapping_info[seq_name] - cluster_category_distribution[category] += seq_count - + cluster_categ_distri[category] += seq_count + if seq_info[-1] == '*': if cluster_ref_seq != '': string = "A reference sequence (" + cluster_ref_seq - string += ") already found for cluster " + cluster_name + string += ") already found for cluster " + cluster_name string += " (" + seq_name + ")" raise ValueError(string) cluster_ref_seq = seq_name - flush_cluster_info(cluster_name, cluster_ref_seq, ref_seq_cluster, - cluster_category_distribution, categories, output_cat_distri_file, + flush_cluster_info( + cluster_name, + cluster_ref_seq, + ref_seq_cluster, + cluster_categ_distri, + categories, + output_cat_distri_file, cluster_seq_number) - if args.output_category_distribution != None: + if args.output_category_distribution is not None: output_cat_distri_file.close() return ref_seq_cluster + def rename_representative_sequences(args, ref_seq_cluster): - with open(args.input_representative_sequences,'r') as input_sequences: - with open(args.output_representative_sequences,'w') as output_sequences: + with open(args.input_representative_sequences, 'r') as input_sequences: + with open(args.output_representative_sequences, 'w') as output_seq: for line in input_sequences.readlines(): if line[0] == '>': seq_name = line[1:-1] - if not ref_seq_cluster.has_key(seq_name): + if seq_name not in ref_seq_cluster: string = seq_name + " not found as reference sequence" raise ValueError(string) - output_sequences.write('>' + ref_seq_cluster[seq_name] + '\n') + string = '>' + ref_seq_cluster[seq_name] + '\n' + output_seq.write(string) else: - output_sequences.write(line) + output_seq.write(line) + def format_cd_hit_outputs(args): - if args.input_mapping != None: + if args.input_mapping is not None: mapping_info, categories = extract_mapping_info(args.input_mapping) else: mapping_info = None @@ -134,9 +147,10 @@ ref_seq_cluster = extract_cluster_info(args, mapping_info, categories) - if args.input_representative_sequences != None: + if args.input_representative_sequences is not None: rename_representative_sequences(args, ref_seq_cluster) + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--input_cluster_info', required=True) @@ -147,4 +161,4 @@ parser.add_argument('--number_sum') args = parser.parse_args() - format_cd_hit_outputs(args) \ No newline at end of file + format_cd_hit_outputs(args)