Mercurial > repos > fubar > jbrowse2
comparison GFFOutput.py @ 17:4c201a3d4755 draft
planemo upload for repository https://github.com/usegalaxy-eu/temporary-tools/tree/master/jbrowse2 commit a37bfdfc108501b11c7b2aa15efb1bd16f0c4b66
author | fubar |
---|---|
date | Sun, 28 Jan 2024 06:48:52 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
16:1fe91657bfd6 | 17:4c201a3d4755 |
---|---|
1 """Output Biopython SeqRecords and SeqFeatures to GFF3 format. | |
2 | |
3 The target format is GFF3, the current GFF standard: | |
4 http://www.sequenceontology.org/gff3.shtml | |
5 """ | |
6 from six.moves import urllib | |
7 | |
8 from Bio import SeqIO | |
9 | |
10 | |
11 class _IdHandler: | |
12 """Generate IDs for GFF3 Parent/Child | |
13 relationships where they don't exist.""" | |
14 | |
15 def __init__(self): | |
16 self._prefix = "biopygen" | |
17 self._counter = 1 | |
18 self._seen_ids = [] | |
19 | |
20 def _generate_id(self, quals): | |
21 """Generate a unique ID not present in our existing IDs.""" | |
22 gen_id = self._get_standard_id(quals) | |
23 if gen_id is None: | |
24 while 1: | |
25 gen_id = "%s%s" % (self._prefix, self._counter) | |
26 if gen_id not in self._seen_ids: | |
27 break | |
28 self._counter += 1 | |
29 return gen_id | |
30 | |
31 def _get_standard_id(self, quals): | |
32 """Retrieve standardized IDs from other sources like NCBI GenBank. | |
33 | |
34 This tries to find IDs from known key/values when stored differently | |
35 than GFF3 specifications. | |
36 """ | |
37 possible_keys = ["transcript_id", "protein_id"] | |
38 for test_key in possible_keys: | |
39 if test_key in quals: | |
40 cur_id = quals[test_key] | |
41 if isinstance(cur_id, tuple) or isinstance(cur_id, list): | |
42 return cur_id[0] | |
43 else: | |
44 return cur_id | |
45 return None | |
46 | |
47 def update_quals(self, quals, has_children): | |
48 """Update a set of qualifiers, adding an ID if necessary.""" | |
49 cur_id = quals.get("ID", None) | |
50 # if we have an ID, record it | |
51 if cur_id: | |
52 if not isinstance(cur_id, list) and not isinstance(cur_id, tuple): | |
53 cur_id = [cur_id] | |
54 for add_id in cur_id: | |
55 self._seen_ids.append(add_id) | |
56 # if we need one and don't have it, create a new one | |
57 elif has_children: | |
58 new_id = self._generate_id(quals) | |
59 self._seen_ids.append(new_id) | |
60 quals["ID"] = [new_id] | |
61 return quals | |
62 | |
63 | |
64 class GFF3Writer: | |
65 """Write GFF3 files starting with standard Biopython objects.""" | |
66 | |
67 def __init__(self): | |
68 pass | |
69 | |
70 def write(self, recs, out_handle, include_fasta=False): | |
71 """Write the provided records to the given handle in GFF3 format.""" | |
72 id_handler = _IdHandler() | |
73 self._write_header(out_handle) | |
74 fasta_recs = [] | |
75 try: | |
76 recs = iter(recs) | |
77 except TypeError: | |
78 recs = [recs] | |
79 for rec in recs: | |
80 self._write_rec(rec, out_handle) | |
81 self._write_annotations(rec.annotations, rec.id, len(rec.seq), out_handle) | |
82 for sf in rec.features: | |
83 sf = self._clean_feature(sf) | |
84 id_handler = self._write_feature(sf, rec.id, out_handle, id_handler) | |
85 if include_fasta and len(rec.seq) > 0: | |
86 fasta_recs.append(rec) | |
87 if len(fasta_recs) > 0: | |
88 self._write_fasta(fasta_recs, out_handle) | |
89 | |
90 def _clean_feature(self, feature): | |
91 quals = {} | |
92 for key, val in feature.qualifiers.items(): | |
93 if not isinstance(val, (list, tuple)): | |
94 val = [val] | |
95 val = [str(x) for x in val] | |
96 quals[key] = val | |
97 feature.qualifiers = quals | |
98 # Support for Biopython 1.68 and above, which removed sub_features | |
99 if not hasattr(feature, "sub_features"): | |
100 feature.sub_features = [] | |
101 clean_sub = [self._clean_feature(f) for f in feature.sub_features] | |
102 feature.sub_features = clean_sub | |
103 return feature | |
104 | |
105 def _write_rec(self, rec, out_handle): | |
106 # if we have a SeqRecord, write out optional directive | |
107 if len(rec.seq) > 0: | |
108 out_handle.write("##sequence-region %s 1 %s\n" % (rec.id, len(rec.seq))) | |
109 | |
110 def _get_phase(self, feature): | |
111 if "phase" in feature.qualifiers: | |
112 phase = feature.qualifiers["phase"][0] | |
113 elif feature.type == "CDS": | |
114 phase = int(feature.qualifiers.get("codon_start", [1])[0]) - 1 | |
115 else: | |
116 phase = "." | |
117 return str(phase) | |
118 | |
119 def _write_feature(self, feature, rec_id, out_handle, id_handler, parent_id=None): | |
120 """Write a feature with location information.""" | |
121 if feature.location.strand == 1: | |
122 strand = "+" | |
123 elif feature.location.strand == -1: | |
124 strand = "-" | |
125 else: | |
126 strand = "." | |
127 # remove any standard features from the qualifiers | |
128 quals = feature.qualifiers.copy() | |
129 for std_qual in ["source", "score", "phase"]: | |
130 if std_qual in quals and len(quals[std_qual]) == 1: | |
131 del quals[std_qual] | |
132 # add a link to a parent identifier if it exists | |
133 if parent_id: | |
134 if "Parent" not in quals: | |
135 quals["Parent"] = [] | |
136 quals["Parent"].append(parent_id) | |
137 quals = id_handler.update_quals(quals, len(feature.sub_features) > 0) | |
138 if feature.type: | |
139 ftype = feature.type | |
140 else: | |
141 ftype = "sequence_feature" | |
142 parts = [ | |
143 str(rec_id), | |
144 feature.qualifiers.get("source", ["feature"])[0], | |
145 ftype, | |
146 str(feature.location.start + 1), # 1-based indexing | |
147 str(feature.location.end), | |
148 feature.qualifiers.get("score", ["."])[0], | |
149 strand, | |
150 self._get_phase(feature), | |
151 self._format_keyvals(quals), | |
152 ] | |
153 out_handle.write("\t".join(parts) + "\n") | |
154 for sub_feature in feature.sub_features: | |
155 id_handler = self._write_feature( | |
156 sub_feature, | |
157 rec_id, | |
158 out_handle, | |
159 id_handler, | |
160 quals["ID"][0], | |
161 ) | |
162 return id_handler | |
163 | |
164 def _format_keyvals(self, keyvals): | |
165 format_kvs = [] | |
166 for key in sorted(keyvals.keys()): | |
167 values = keyvals[key] | |
168 key = key.strip() | |
169 format_vals = [] | |
170 if not isinstance(values, list) or isinstance(values, tuple): | |
171 values = [values] | |
172 for val in values: | |
173 val = urllib.parse.quote(str(val).strip(), safe=":/ ") | |
174 if (key and val) and val not in format_vals: | |
175 format_vals.append(val) | |
176 format_kvs.append("%s=%s" % (key, ",".join(format_vals))) | |
177 return ";".join(format_kvs) | |
178 | |
179 def _write_annotations(self, anns, rec_id, size, out_handle): | |
180 """Add annotations which refer to an entire sequence.""" | |
181 format_anns = self._format_keyvals(anns) | |
182 if format_anns: | |
183 parts = [ | |
184 rec_id, | |
185 "annotation", | |
186 "remark", | |
187 "1", | |
188 str(size if size > 1 else 1), | |
189 ".", | |
190 ".", | |
191 ".", | |
192 format_anns, | |
193 ] | |
194 out_handle.write("\t".join(parts) + "\n") | |
195 | |
196 def _write_header(self, out_handle): | |
197 """Write out standard header directives.""" | |
198 out_handle.write("##gff-version 3\n") | |
199 | |
200 def _write_fasta(self, recs, out_handle): | |
201 """Write sequence records using the ##FASTA directive.""" | |
202 out_handle.write("##FASTA\n") | |
203 SeqIO.write(recs, out_handle, "fasta") | |
204 | |
205 | |
206 def write(recs, out_handle, include_fasta=False): | |
207 """High level interface to write GFF3 files from SeqRecords and SeqFeatures. | |
208 | |
209 If include_fasta is True, the GFF3 file will include sequence information | |
210 using the ##FASTA directive. | |
211 """ | |
212 writer = GFF3Writer() | |
213 return writer.write(recs, out_handle, include_fasta) |