Mercurial > repos > cpt > cpt_annotation_table
comparison cpt_annotation_table/phage_annotation_table.py @ 0:6a4d1bd8ac1d draft
Uploaded
author | cpt |
---|---|
date | Fri, 17 Jun 2022 12:19:58 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:6a4d1bd8ac1d |
---|---|
1 #!/usr/bin/env python | |
2 # vim: set fileencoding=utf-8 | |
3 import os | |
4 import argparse | |
5 from gff3 import genes, get_gff3_id, get_rbs_from, feature_test_true, feature_lambda, feature_test_type | |
6 from CPT_GFFParser import gffParse, gffWrite | |
7 from Bio import SeqIO | |
8 from jinja2 import Environment, FileSystemLoader | |
9 import logging | |
10 from math import floor | |
11 | |
12 logging.basicConfig(level=logging.DEBUG) | |
13 log = logging.getLogger(name="pat") | |
14 | |
15 # Path to script, required because of Galaxy. | |
16 SCRIPT_PATH = os.path.dirname(os.path.realpath(__file__)) | |
17 # Path to the HTML template for the report | |
18 | |
19 def genes_all(feature_list, feature_type=["gene"], sort=False): | |
20 """ | |
21 Simple filter to extract gene features from the feature set. | |
22 """ | |
23 | |
24 if not sort: | |
25 for x in feature_lambda( | |
26 feature_list, feature_test_type, {"types": feature_type}, subfeatures=True | |
27 ): | |
28 yield x | |
29 else: | |
30 data = list(genes_all(feature_list, feature_type, sort=False)) | |
31 data = sorted(data, key=lambda feature: feature.location.start) | |
32 for x in data: | |
33 yield x | |
34 | |
35 def checkSubs(feature, qualName): | |
36 subFeats = [] | |
37 res = "" | |
38 subFeats = feature.sub_features | |
39 while (len(subFeats) > 0): | |
40 for feat in subFeats: | |
41 for i in feat.qualifiers.keys(): | |
42 for j in qualName: | |
43 if i == j: | |
44 if res == "": | |
45 res = feat.qualifiers[i][0] | |
46 else: | |
47 res += "; " + feat.qualifiers[i][0] | |
48 if res != "": | |
49 return res | |
50 tempFeats = [] | |
51 for feat in subFeats: # Should be breadth-first results | |
52 for x in feat.sub_features: | |
53 tempFeats.append(x) | |
54 subFeats = tempFeats | |
55 return res | |
56 | |
57 def annotation_table_report(record, types, wanted_cols, gaf_data, searchSubs): | |
58 getTypes = [] | |
59 for x in [y.strip() for y in types.split(",")]: | |
60 getTypes.append(x) | |
61 getTypes.append("gene") | |
62 sorted_features = list(genes_all(record.features, getTypes, sort=True)) | |
63 if wanted_cols is None or len(wanted_cols.strip()) == 0: | |
64 return [], [] | |
65 useSubs = searchSubs | |
66 | |
67 def rid(record, feature): | |
68 """Organism ID | |
69 """ | |
70 return record.id | |
71 | |
72 def id(record, feature): | |
73 """ID | |
74 """ | |
75 return feature.id | |
76 | |
77 def featureType(record, feature): | |
78 """Type | |
79 """ | |
80 return feature.type | |
81 | |
82 def name(record, feature): | |
83 """Name | |
84 """ | |
85 for x in ["Name", "name"]: | |
86 for y in feature.qualifiers.keys(): | |
87 if x == y: | |
88 return feature.qualifiers[x][0] | |
89 if useSubs: | |
90 res = checkSubs(feature, ["Name", "name"]) | |
91 if res != "": | |
92 return res | |
93 return "None" | |
94 def start(record, feature): | |
95 """Boundary | |
96 """ | |
97 return str(feature.location.start + 1) | |
98 | |
99 def end(record, feature): | |
100 """Boundary | |
101 """ | |
102 return str(feature.location.end) | |
103 | |
104 def location(record, feature): | |
105 """Location | |
106 """ | |
107 return str(feature.location.start + 1) + "..{0.end}".format(feature.location) | |
108 | |
109 def length(record, feature): | |
110 """CDS Length (AA) | |
111 """ | |
112 | |
113 if feature.type == "CDS": | |
114 cdss = [feature] | |
115 else: | |
116 cdss = list(genes(feature.sub_features, feature_type="CDS", sort=True)) | |
117 | |
118 if cdss == []: | |
119 return "None" | |
120 res = (sum([len(cds) for cds in cdss]) / 3) - 1 | |
121 if floor(res) == res: | |
122 res = int(res) | |
123 return str(res) | |
124 | |
125 def notes(record, feature): | |
126 """User entered Notes""" | |
127 for x in ["Note", "note", "Notes", "notes"]: | |
128 for y in feature.qualifiers.keys(): | |
129 if x == y: | |
130 return feature.qualifiers[x][0] | |
131 if useSubs: | |
132 res = checkSubs(feature, ["Note", "note", "Notes", "notes"]) | |
133 if res != "": | |
134 return res | |
135 return "None" | |
136 | |
137 def date_created(record, feature): | |
138 """Created""" | |
139 return feature.qualifiers.get("date_creation", ["None"])[0] | |
140 | |
141 def date_last_modified(record, feature): | |
142 """Last Modified""" | |
143 res = feature.qualifiers.get("date_last_modified", ["None"])[0] | |
144 if res != "None": | |
145 return res | |
146 if useSubs: | |
147 res = checkSubs(feature, ["date_last_modified"]) | |
148 if res != "": | |
149 return res | |
150 return "None" | |
151 | |
152 def description(record, feature): | |
153 """Description""" | |
154 res = feature.qualifiers.get("description", ["None"])[0] | |
155 if res != "None": | |
156 return res | |
157 if useSubs: | |
158 res = checkSubs(feature, ["description"]) | |
159 if res != "": | |
160 return res | |
161 return "None" | |
162 | |
163 def owner(record, feature): | |
164 """Owner | |
165 | |
166 User who created the feature. In a 464 scenario this may be one of | |
167 the TAs.""" | |
168 for x in ["Owner", "owner"]: | |
169 for y in feature.qualifiers.keys(): | |
170 if x == y: | |
171 return feature.qualifiers[x][0] | |
172 if useSubs: | |
173 res = checkSubs(feature, ["Owner", "owner"]) | |
174 if res != "": | |
175 return res | |
176 return "None" | |
177 | |
178 def product(record, feature): | |
179 """Product | |
180 | |
181 User entered product qualifier (collects "Product" and "product" | |
182 entries)""" | |
183 """User entered Notes""" | |
184 for x in ["product", "Product"]: | |
185 for y in feature.qualifiers.keys(): | |
186 if x == y: | |
187 return feature.qualifiers[x][0] | |
188 if useSubs: | |
189 res = checkSubs(feature, ["product", "Product"]) | |
190 if res != "": | |
191 return res | |
192 return "None" | |
193 | |
194 def note(record, feature): | |
195 """Note | |
196 | |
197 User entered Note qualifier(s)""" | |
198 return feature.qualifiers.get("Note", []) | |
199 | |
200 def strand(record, feature): | |
201 """Strand | |
202 """ | |
203 return "+" if feature.location.strand > 0 else "-" | |
204 | |
205 def sd_spacing(record, feature): | |
206 """Shine-Dalgarno spacing | |
207 """ | |
208 rbss = get_rbs_from(gene) | |
209 if len(rbss) == 0: | |
210 return "None" | |
211 else: | |
212 resp = [] | |
213 for rbs in rbss: | |
214 cdss = list(genes(feature.sub_features, feature_type="CDS", sort=True)) | |
215 if len(cdss) == 0: | |
216 return "No CDS" | |
217 if rbs.location.strand > 0: | |
218 distance = min( | |
219 cdss, key=lambda x: x.location.start - rbs.location.end | |
220 ) | |
221 distance_val = str(distance.location.start - rbs.location.end) | |
222 resp.append(distance_val) | |
223 else: | |
224 distance = min( | |
225 cdss, key=lambda x: x.location.end - rbs.location.start | |
226 ) | |
227 distance_val = str(rbs.location.start - distance.location.end) | |
228 resp.append(distance_val) | |
229 | |
230 if len(resp) == 1: | |
231 return str(resp[0]) | |
232 return resp | |
233 | |
234 def sd_seq(record, feature): | |
235 """Shine-Dalgarno sequence | |
236 """ | |
237 rbss = get_rbs_from(gene) | |
238 if len(rbss) == 0: | |
239 return "None" | |
240 else: | |
241 resp = [] | |
242 for rbs in rbss: | |
243 resp.append(str(rbs.extract(record).seq)) | |
244 if len(resp) == 1: | |
245 return str(resp[0]) | |
246 else: | |
247 return resp | |
248 | |
249 def start_codon(record, feature): | |
250 """Start Codon | |
251 """ | |
252 if feature.type == "CDS": | |
253 cdss = [feature] | |
254 else: | |
255 cdss = list(genes(feature.sub_features, feature_type="CDS", sort=True)) | |
256 | |
257 data = [x for x in cdss] | |
258 if len(data) == 1: | |
259 return str(data[0].extract(record).seq[0:3]) | |
260 else: | |
261 return [ | |
262 "{0} ({1.location.start}..{1.location.end}:{1.location.strand})".format( | |
263 x.extract(record).seq[0:3], x | |
264 ) | |
265 for x in data | |
266 ] | |
267 | |
268 def stop_codon(record, feature): | |
269 """Stop Codon | |
270 """ | |
271 return str(feature.extract(record).seq[-3:]) | |
272 | |
273 def dbxrefs(record, feature): | |
274 """DBxrefs | |
275 """ | |
276 """User entered Notes""" | |
277 for x in ["Dbxref", "db_xref", "DB_xref", "DBxref", "DB_Xref", "DBXref"]: | |
278 for y in feature.qualifiers.keys(): | |
279 if x == y: | |
280 return feature.qualifiers[x][0] | |
281 return "None" | |
282 | |
283 def upstream_feature(record, feature): | |
284 """Next gene upstream""" | |
285 if feature.strand > 0: | |
286 upstream_features = [ | |
287 x for x in sorted_features if (x.location.start < feature.location.start and x.type == "gene" and x.strand == feature.strand) | |
288 ] | |
289 if len(upstream_features) > 0: | |
290 foundSelf = False | |
291 featCheck = upstream_features[-1].sub_features | |
292 for x in featCheck: | |
293 if x == feature: | |
294 foundSelf = True | |
295 break | |
296 featCheck = featCheck + x.sub_features | |
297 if foundSelf: | |
298 if len(upstream_features) > 1: | |
299 return upstream_features[-2] | |
300 return None | |
301 return upstream_features[-1] | |
302 else: | |
303 return None | |
304 else: | |
305 upstream_features = [ | |
306 x for x in sorted_features if (x.location.end > feature.location.end and x.type == "gene" and x.strand == feature.strand) | |
307 ] | |
308 | |
309 if len(upstream_features) > 0: | |
310 foundSelf = False | |
311 featCheck = upstream_features[0].sub_features | |
312 for x in featCheck: | |
313 if x == feature: | |
314 foundSelf = True | |
315 break | |
316 featCheck = featCheck + x.sub_features | |
317 if foundSelf: | |
318 if len(upstream_features) > 1: | |
319 return upstream_features[1] | |
320 return None | |
321 return upstream_features[0] | |
322 else: | |
323 return None | |
324 | |
325 def upstream_feature__name(record, feature): | |
326 """Next gene upstream""" | |
327 up = upstream_feature(record, feature) | |
328 if up: | |
329 return str(up.id) | |
330 return "None" | |
331 | |
332 def ig_dist(record, feature): | |
333 """Distance to next upstream gene on same strand""" | |
334 up = upstream_feature(record, feature) | |
335 if up: | |
336 dist = None | |
337 if feature.strand > 0: | |
338 dist = feature.location.start - up.location.end | |
339 else: | |
340 dist = up.location.start - feature.location.end | |
341 return str(dist) | |
342 else: | |
343 return "None" | |
344 | |
345 def _main_gaf_func(record, feature, gaf_data, attr): | |
346 if feature.id in gaf_data: | |
347 return [x[attr] for x in gaf_data[feature.id]] | |
348 return [] | |
349 | |
350 def gaf_annotation_extension(record, feature, gaf_data): | |
351 """GAF Annotation Extension | |
352 | |
353 Contains cross references to other ontologies that can be used | |
354 to qualify or enhance the annotation. The cross-reference is | |
355 prefaced by an appropriate GO relationship; references to | |
356 multiple ontologies can be entered. For example, if a gene | |
357 product is localized to the mitochondria of lymphocytes, the GO | |
358 ID (column 5) would be mitochondrion ; GO:0005439, and the | |
359 annotation extension column would contain a cross-reference to | |
360 the term lymphocyte from the Cell Type Ontology. | |
361 """ | |
362 return _main_gaf_func(record, feature, gaf_data, "annotation_extension") | |
363 | |
364 def gaf_aspect(record, feature, gaf_data): | |
365 """GAF Aspect code | |
366 | |
367 E.g. P (biological process), F (molecular function) or C (cellular component) | |
368 """ | |
369 return _main_gaf_func(record, feature, gaf_data, "aspect") | |
370 | |
371 def gaf_assigned_by(record, feature, gaf_data): | |
372 """GAF Creating Organisation | |
373 """ | |
374 return _main_gaf_func(record, feature, gaf_data, "assigned_by") | |
375 | |
376 def gaf_date(record, feature, gaf_data): | |
377 """GAF Creation Date | |
378 """ | |
379 return _main_gaf_func(record, feature, gaf_data, "date") | |
380 | |
381 def gaf_db(record, feature, gaf_data): | |
382 """GAF DB | |
383 """ | |
384 return _main_gaf_func(record, feature, gaf_data, "db") | |
385 | |
386 def gaf_db_reference(record, feature, gaf_data): | |
387 """GAF DB Reference | |
388 """ | |
389 return _main_gaf_func(record, feature, gaf_data, "db_reference") | |
390 | |
391 def gaf_evidence_code(record, feature, gaf_data): | |
392 """GAF Evidence Code | |
393 """ | |
394 return _main_gaf_func(record, feature, gaf_data, "evidence_code") | |
395 | |
396 def gaf_go_id(record, feature, gaf_data): | |
397 """GAF GO ID | |
398 """ | |
399 return _main_gaf_func(record, feature, gaf_data, "go_id") | |
400 | |
401 def gaf_go_term(record, feature, gaf_data): | |
402 """GAF GO Term | |
403 """ | |
404 return _main_gaf_func(record, feature, gaf_data, "go_term") | |
405 | |
406 def gaf_id(record, feature, gaf_data): | |
407 """GAF ID | |
408 """ | |
409 return _main_gaf_func(record, feature, gaf_data, "id") | |
410 | |
411 def gaf_notes(record, feature, gaf_data): | |
412 """GAF Notes | |
413 """ | |
414 return _main_gaf_func(record, feature, gaf_data, "notes") | |
415 | |
416 def gaf_owner(record, feature, gaf_data): | |
417 """GAF Creator | |
418 """ | |
419 return _main_gaf_func(record, feature, gaf_data, "owner") | |
420 | |
421 def gaf_with_or_from(record, feature, gaf_data): | |
422 """GAF With/From | |
423 """ | |
424 return _main_gaf_func(record, feature, gaf_data, "with_or_from") | |
425 | |
426 cols = [] | |
427 data = [] | |
428 funcs = [] | |
429 lcl = locals() | |
430 for x in [y.strip().lower() for y in wanted_cols.split(",")]: | |
431 if not x: | |
432 continue | |
433 if x == "type": | |
434 x = "featureType" | |
435 if x in lcl: | |
436 funcs.append(lcl[x]) | |
437 # Keep track of docs | |
438 func_doc = lcl[x].__doc__.strip().split("\n\n") | |
439 # If there's a double newline, assume following text is the | |
440 # "help" and the first part is the "name". Generate empty help | |
441 # if not provided | |
442 if len(func_doc) == 1: | |
443 func_doc += [""] | |
444 cols.append(func_doc) | |
445 elif "__" in x: | |
446 chosen_funcs = [lcl[y] for y in x.split("__")] | |
447 func_doc = [ | |
448 " of ".join( | |
449 [y.__doc__.strip().split("\n\n")[0] for y in chosen_funcs[::-1]] | |
450 ) | |
451 ] | |
452 cols.append(func_doc) | |
453 funcs.append(chosen_funcs) | |
454 | |
455 for gene in genes_all(record.features, getTypes, sort=True): | |
456 row = [] | |
457 for func in funcs: | |
458 if isinstance(func, list): | |
459 # If we have a list of functions, repeatedly apply them | |
460 value = gene | |
461 for f in func: | |
462 if value is None: | |
463 value = "None" | |
464 break | |
465 | |
466 value = f(record, value) | |
467 else: | |
468 # Otherwise just apply the lone function | |
469 if func.__name__.startswith("gaf_"): | |
470 value = func(record, gene, gaf_data) | |
471 else: | |
472 value = func(record, gene) | |
473 | |
474 if isinstance(value, list): | |
475 collapsed_value = ", ".join(value) | |
476 value = [str(collapsed_value)]#.encode("unicode_escape")] | |
477 else: | |
478 value = str(value)#.encode("unicode_escape") | |
479 | |
480 row.append(value) | |
481 # print row | |
482 data.append(row) | |
483 return data, cols | |
484 | |
485 | |
486 def parseGafData(file): | |
487 cols = [] | |
488 data = {} | |
489 # '10d04a01-5ed8-49c8-b724-d6aa4df5a98d': { | |
490 # 'annotation_extension': '', | |
491 # 'aspect': '', | |
492 # 'assigned_by': 'CPT', | |
493 # 'date': '2017-05-04T16:25:22.161916Z', | |
494 # 'db': 'UniProtKB', | |
495 # 'db_reference': 'GO_REF:0000100', | |
496 # 'evidence_code': 'ISA', | |
497 # 'gene': '0d307196-833d-46e8-90e9-d80f7a041d88', | |
498 # 'go_id': 'GO:0039660', | |
499 # 'go_term': 'structural constituent of virion', | |
500 # 'id': '10d04a01-5ed8-49c8-b724-d6aa4df5a98d', | |
501 # 'notes': 'hit was putative minor structural protein', | |
502 # 'owner': 'amarc1@tamu.edu', | |
503 # 'with_or_from': 'UNIREF90:B2ZYZ7' | |
504 # }, | |
505 for row in file: | |
506 if row.startswith("#"): | |
507 # Header | |
508 cols = ( | |
509 row.strip().replace("# ", "").replace("GO Term", "go_term").split("\t") | |
510 ) | |
511 else: | |
512 line = row.strip().split("\t") | |
513 tmp = dict(zip(cols, line)) | |
514 if "gene" not in tmp.keys(): | |
515 continue | |
516 if tmp["gene"] not in data: | |
517 data[tmp["gene"]] = [] | |
518 | |
519 data[tmp["gene"]].append(tmp) | |
520 return data | |
521 | |
522 | |
523 def evaluate_and_report( | |
524 annotations, | |
525 genome, | |
526 types="gene", | |
527 reportTemplateName="phage_annotation_validator.html", | |
528 annotationTableCols="", | |
529 gafData=None, | |
530 searchSubs = False, | |
531 ): | |
532 """ | |
533 Generate our HTML evaluation of the genome | |
534 """ | |
535 # Get features from GFF file | |
536 seq_dict = SeqIO.to_dict(SeqIO.parse(genome, "fasta")) | |
537 # Get the first GFF3 record | |
538 # TODO: support multiple GFF3 files. | |
539 at_table_data = [] | |
540 gaf = {} | |
541 if gafData: | |
542 gaf = parseGafData(gafData) | |
543 | |
544 for record in gffParse(annotations, base_dict=seq_dict): | |
545 if reportTemplateName.endswith(".html"): | |
546 record.id = record.id.replace(".", "-") | |
547 log.info("Producing an annotation table for %s" % record.id) | |
548 annotation_table_data, annotation_table_col_names = annotation_table_report( | |
549 record, types, annotationTableCols, gaf, searchSubs | |
550 ) | |
551 at_table_data.append((record, annotation_table_data)) | |
552 # break | |
553 | |
554 # This is data that will go into our HTML template | |
555 kwargs = { | |
556 "annotation_table_data": at_table_data, | |
557 "annotation_table_col_names": annotation_table_col_names, | |
558 } | |
559 | |
560 env = Environment( | |
561 loader=FileSystemLoader(SCRIPT_PATH), trim_blocks=True, lstrip_blocks=True | |
562 ) | |
563 if reportTemplateName.endswith(".html"): | |
564 env.filters["nice_id"] = str(get_gff3_id).replace(".", "-") | |
565 else: | |
566 env.filters["nice_id"] = get_gff3_id | |
567 | |
568 def join(listy): | |
569 return "\n".join(listy) | |
570 | |
571 env.filters.update({"join": join}) | |
572 tpl = env.get_template(reportTemplateName) | |
573 return tpl.render(**kwargs).encode("utf-8") | |
574 | |
575 | |
576 if __name__ == "__main__": | |
577 parser = argparse.ArgumentParser( | |
578 description="rebase gff3 features against parent locations", epilog="" | |
579 ) | |
580 parser.add_argument( | |
581 "annotations", type=argparse.FileType("r"), help="Parent GFF3 annotations" | |
582 ) | |
583 parser.add_argument("genome", type=argparse.FileType("r"), help="Genome Sequence") | |
584 | |
585 parser.add_argument( | |
586 "--types", | |
587 help="Select extra types to display in output (Will always include gene)", | |
588 ) | |
589 | |
590 parser.add_argument( | |
591 "--reportTemplateName", | |
592 help="Report template file name", | |
593 default="phageqc_report_full.html", | |
594 ) | |
595 parser.add_argument( | |
596 "--annotationTableCols", | |
597 help="Select columns to report in the annotation table output format", | |
598 ) | |
599 parser.add_argument( | |
600 "--gafData", help="CPT GAF-like table", type=argparse.FileType("r") | |
601 ) | |
602 parser.add_argument( | |
603 "--searchSubs", help="Attempt to populate fields from sub-features if qualifier is empty", action="store_true" | |
604 ) | |
605 | |
606 args = parser.parse_args() | |
607 | |
608 print(evaluate_and_report(**vars(args)).decode("utf-8")) |