Mercurial > repos > galaxyp > proteore_venn_diagram
comparison venn_diagram.py @ 0:57f01ca855cd draft default tip
"planemo upload commit 47d779aa1de5153673ac8bb1e37c9730210cbb5d"
author | galaxyp |
---|---|
date | Sat, 12 Jun 2021 18:06:28 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:57f01ca855cd |
---|---|
1 #!/usr/bin/env python | |
2 | |
import argparse
import csv
import json
import os
import re
from itertools import combinations, zip_longest
9 | |
10 | |
# Absolute path of the directory that contains this script.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
12 | |
13 ######################################################################## | |
14 # FUNCTIONS | |
15 ######################################################################## | |
16 | |
17 | |
def isnumber(format, n):
    """
    Check if an element is integer or float

    Parameters:
        format: "int" or "float"; any other value makes the check fail.
        n: the string to test.

    Returns:
        True when ``n`` is written as a number of the requested kind,
        False otherwise.

    An integer is an optional minus sign followed by digits without a
    leading zero, so "0" and "007" are rejected. A float is an optional
    minus sign, an integer part ("0" or digits without a leading zero) and
    an optional fractional part introduced by a literal dot.
    """
    patterns = {
        "int": r"^-?[1-9][0-9]*$",
        # The dot is escaped so that it only matches a decimal separator,
        # and the integer part may be a lone "0" (e.g. "0.5").
        "float": r"^-?(?:0|[1-9][0-9]*)(?:\.[0-9]+)?$",
    }
    pattern = patterns.get(format)
    if pattern is None:
        return False
    return re.match(pattern, n) is not None
33 | |
34 | |
def input_to_dict(inputs):
    """
    Parse input and return a dictionary of name and data of each lists/files

    Parameters:
        inputs: a sequence of entries, one per list to compare. Each entry
            is indexable as ``[source, name, input_type]``; when
            ``input_type`` is "file" it also carries ``header`` ("true" when
            the first row must be skipped) and ``ncol`` (1-based column
            number, optionally prefixed with "c", e.g. "c1") at positions 3
            and 4. For a file, ``source`` is the path of a tab-separated
            file; otherwise ``source`` is the whitespace-separated text of
            the list itself.

    Returns:
        A ``(comp_dict, title_dict)`` tuple. ``comp_dict`` maps a one-letter
        label ("A" to "F", in input order) to the set of identifiers of the
        corresponding list; ``title_dict`` maps the same label to the
        user-supplied name.

    Raises:
        ValueError: when more than six lists are given, or when the column
            number of a file input is not a positive integer.

    Cells may hold several identifiers separated by ";". Empty items and
    the placeholder "NA" are ignored.
    """
    labels = ["A", "B", "C", "D", "E", "F"]
    if len(inputs) > len(labels):
        raise ValueError(
            "At most %d lists can be compared, got %d"
            % (len(labels), len(inputs))
        )

    comp_dict = {}
    title_dict = {}
    for label, entry in zip(labels, inputs):
        source, name, input_type = entry[0], entry[1], entry[2]
        title_dict[label] = name

        if input_type == "file":
            header = entry[3]
            ncol = entry[4].replace("c", "")
            # A negative number would silently count columns from the end
            # of the row, so only strictly positive numbers are accepted.
            if not isnumber("int", ncol) or int(ncol) < 1:
                raise ValueError(
                    "Please fill in the right format of column number"
                )
            col = int(ncol) - 1
            with open(source, "r", newline="") as handle:
                rows = csv.reader(handle, delimiter="\t")
                if header == "true":
                    next(rows, None)  # skip the header row, if any
                # Blank or truncated rows have no value in the requested
                # column; they are skipped instead of aborting the run.
                cells = [row[col] for row in rows if len(row) > col]
        else:
            cells = source.split()

        ids = set()
        for cell in cells:
            for item in cell.split(";"):
                # Strip before testing so whitespace-only items are dropped
                item = item.strip()
                if item and item != "NA":
                    ids.add(item)
        comp_dict[label] = ids

    return comp_dict, title_dict
79 | |
80 | |
def intersect(comp_dict):
    """
    Calculate the intersections of input

    Parameters:
        comp_dict: dictionary mapping a list label to the set of its
            elements.

    Yields:
        For every non-empty combination of labels, by increasing size and
        in sorted label order, a ``(group, intersected, difference)`` tuple:
        ``group`` is the tuple of labels, ``intersected`` the list of
        elements present in every list of the group, and ``difference`` the
        list of those elements that appear in no list outside the group.
        For the combination made of all labels there is no outside list, so
        ``difference`` equals ``intersected``.
    """
    names = sorted(comp_dict)
    for size in range(1, len(names) + 1):
        for group in combinations(names, size):
            others = [name for name in names if name not in group]
            intersected = set.intersection(*(comp_dict[k] for k in group))
            # difference() accepts zero or more sets, so the "no other list"
            # case needs no special handling.
            difference = intersected.difference(
                *(comp_dict[k] for k in others)
            )
            yield group, list(intersected), list(difference)
95 | |
96 | |
def diagram(comp_dict, title_dict):
    """
    Create json string for jvenn diagram plot

    Parameters:
        comp_dict: dictionary mapping a list label to the set of its
            elements.
        title_dict: dictionary mapping the same labels to display names.

    Returns:
        A dictionary with three entries: "name" (label -> display name),
        "data" (region -> sorted elements of that region) and "values"
        (region -> number of elements). A region is named by concatenating
        the labels of the lists it belongs to.
    """
    result = {
        "name": {key: title_dict[key] for key in comp_dict},
        "data": {},
        "values": {},
    }

    total = len(comp_dict)
    for group, intersected, difference in intersect(comp_dict):
        # The region shared by every list is the plain intersection. This
        # test comes first so that a single input list is also reported
        # with its full content; all other regions hold the elements
        # exclusive to the group.
        members = intersected if len(group) == total else difference
        region = "".join(group)
        result["data"][region] = sorted(members)
        result["values"][region] = len(members)

    return result
120 | |
def write_text_venn(json_result):
    """
    Write intersections of input to text output file

    Parameters:
        json_result: dictionary with a "name" entry (label -> display name)
            and a "data" entry (region -> list of elements), where a region
            key is the concatenation of one-character labels.

    The output file "venn_diagram_text_output.tsv" is created in the
    current working directory. It holds one column per non-empty region:
    the first row is the region header (display names joined with "_"),
    followed by the elements; shorter columns are padded with empty cells.
    "NA" elements are left out. ``json_result`` is not modified.
    """
    names = json_result["name"]

    # New lists are built here so that neither the removal of "NA" nor the
    # padding of the table alters the caller's data.
    columns = {
        region: [item for item in items if item != "NA"]
        for region, items in json_result["data"].items()
        if items
    }

    table = []
    for region, items in columns.items():
        if region in names:
            header = names[region]
        else:
            # a multi-letter region is named after each of its lists
            header = "_".join(names[label] for label in region)
        table.append([header] + items)

    # Transpose columns into rows, padding short columns with empty cells;
    # with no column at all this yields no row and the file stays empty.
    rows = zip_longest(*table, fillvalue="")

    with open("venn_diagram_text_output.tsv", "w", newline="") as output:
        tsv_output = csv.writer(output, delimiter="\t")
        tsv_output.writerows(rows)
156 | |
157 | |
def write_summary(summary_file, inputs):
    """
    Paste json string into template file

    Parameters:
        summary_file: path of the HTML file to create.
        inputs: list descriptions, passed unchanged to ``input_to_dict``.

    The template "jvenn_template.html" is read from the directory of this
    script and copied to ``summary_file``, every "###JVENN_DATA###" marker
    being replaced by the JSON description of the diagram. The text table
    of the intersections is written as well (see ``write_text_venn``).
    """
    comp_dict, title_dict = input_to_dict(inputs)
    data = diagram(comp_dict, title_dict)

    to_replace = {
        "series": [data],
        "displayStat": "true",
        "displaySwitch": "true",
        "shortNumber": "true",
    }
    # Serialise first so the HTML payload never depends on anything the
    # text writer does with `data`.
    payload = json.dumps(to_replace)

    write_text_venn(data)

    template_path = os.path.join(CURRENT_DIR, "jvenn_template.html")
    # Context managers close both files even if copying fails midway.
    with open(template_path) as FH_summary_tpl, \
            open(summary_file, "w") as FH_summary_out:
        for line in FH_summary_tpl:
            FH_summary_out.write(line.replace("###JVENN_DATA###", payload))
182 | |
183 | |
def process(args):
    """
    Build the outputs requested by the parsed command line arguments

    ``args`` must expose ``summary`` (path of the HTML output) and
    ``input`` (the list descriptions).
    """
    summary_path = args.summary
    input_specs = args.input
    write_summary(summary_path, input_specs)
186 | |
187 | |
188 ##################################################################### | |
189 # MAIN | |
190 ##################################################################### | |
if __name__ == '__main__':
    # Parse parameters
    parser = argparse.ArgumentParser(
        description='Compare up to six lists of identifiers and build '
                    'their Venn diagram'
    )
    group_input = parser.add_argument_group('Inputs')
    # --input is repeated once per list; each occurrence collects its own
    # values, so args.input is a list of lists.
    group_input.add_argument(
        '--input', nargs="+", action="append", required=True,
        help="One list to compare, given as: SOURCE NAME TYPE "
             "[HEADER COLUMN]. When TYPE is 'file', SOURCE is a tabular "
             "file, HEADER is 'true' if its first row must be skipped and "
             "COLUMN is the column number (e.g. c1); otherwise SOURCE is "
             "the list of identifiers itself."
    )
    group_output = parser.add_argument_group('Outputs')
    group_output.add_argument(
        '--summary', default="summary.html",
        help="The HTML file containing the graphs. [Default: %(default)s]"
    )
    args = parser.parse_args()

    # Process
    process(args)