Mercurial > repos > fubar > jbrowse2
diff GFFOutput.py @ 17:4c201a3d4755 draft
planemo upload for repository https://github.com/usegalaxy-eu/temporary-tools/tree/master/jbrowse2 commit a37bfdfc108501b11c7b2aa15efb1bd16f0c4b66
author | fubar |
---|---|
date | Sun, 28 Jan 2024 06:48:52 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/GFFOutput.py Sun Jan 28 06:48:52 2024 +0000 @@ -0,0 +1,213 @@ +"""Output Biopython SeqRecords and SeqFeatures to GFF3 format. + +The target format is GFF3, the current GFF standard: + http://www.sequenceontology.org/gff3.shtml +""" +from six.moves import urllib + +from Bio import SeqIO + + +class _IdHandler: + """Generate IDs for GFF3 Parent/Child + relationships where they don't exist.""" + + def __init__(self): + self._prefix = "biopygen" + self._counter = 1 + self._seen_ids = [] + + def _generate_id(self, quals): + """Generate a unique ID not present in our existing IDs.""" + gen_id = self._get_standard_id(quals) + if gen_id is None: + while 1: + gen_id = "%s%s" % (self._prefix, self._counter) + if gen_id not in self._seen_ids: + break + self._counter += 1 + return gen_id + + def _get_standard_id(self, quals): + """Retrieve standardized IDs from other sources like NCBI GenBank. + + This tries to find IDs from known key/values when stored differently + than GFF3 specifications. + """ + possible_keys = ["transcript_id", "protein_id"] + for test_key in possible_keys: + if test_key in quals: + cur_id = quals[test_key] + if isinstance(cur_id, tuple) or isinstance(cur_id, list): + return cur_id[0] + else: + return cur_id + return None + + def update_quals(self, quals, has_children): + """Update a set of qualifiers, adding an ID if necessary.""" + cur_id = quals.get("ID", None) + # if we have an ID, record it + if cur_id: + if not isinstance(cur_id, list) and not isinstance(cur_id, tuple): + cur_id = [cur_id] + for add_id in cur_id: + self._seen_ids.append(add_id) + # if we need one and don't have it, create a new one + elif has_children: + new_id = self._generate_id(quals) + self._seen_ids.append(new_id) + quals["ID"] = [new_id] + return quals + + +class GFF3Writer: + """Write GFF3 files starting with standard Biopython objects.""" + + def __init__(self): + pass + + def write(self, recs, out_handle, include_fasta=False): + """Write the provided records to the given handle in GFF3 format.""" + id_handler = _IdHandler() + self._write_header(out_handle) + fasta_recs = [] + try: + recs = iter(recs) + except TypeError: + recs = [recs] + for rec in recs: + self._write_rec(rec, out_handle) + self._write_annotations(rec.annotations, rec.id, len(rec.seq), out_handle) + for sf in rec.features: + sf = self._clean_feature(sf) + id_handler = self._write_feature(sf, rec.id, out_handle, id_handler) + if include_fasta and len(rec.seq) > 0: + fasta_recs.append(rec) + if len(fasta_recs) > 0: + self._write_fasta(fasta_recs, out_handle) + + def _clean_feature(self, feature): + quals = {} + for key, val in feature.qualifiers.items(): + if not isinstance(val, (list, tuple)): + val = [val] + val = [str(x) for x in val] + quals[key] = val + feature.qualifiers = quals + # Support for Biopython 1.68 and above, which removed sub_features + if not hasattr(feature, "sub_features"): + feature.sub_features = [] + clean_sub = [self._clean_feature(f) for f in feature.sub_features] + feature.sub_features = clean_sub + return feature + + def _write_rec(self, rec, out_handle): + # if we have a SeqRecord, write out optional directive + if len(rec.seq) > 0: + out_handle.write("##sequence-region %s 1 %s\n" % (rec.id, len(rec.seq))) + + def _get_phase(self, feature): + if "phase" in feature.qualifiers: + phase = feature.qualifiers["phase"][0] + elif feature.type == "CDS": + phase = int(feature.qualifiers.get("codon_start", [1])[0]) - 1 + else: + phase = "." + return str(phase) + + def _write_feature(self, feature, rec_id, out_handle, id_handler, parent_id=None): + """Write a feature with location information.""" + if feature.location.strand == 1: + strand = "+" + elif feature.location.strand == -1: + strand = "-" + else: + strand = "." + # remove any standard features from the qualifiers + quals = feature.qualifiers.copy() + for std_qual in ["source", "score", "phase"]: + if std_qual in quals and len(quals[std_qual]) == 1: + del quals[std_qual] + # add a link to a parent identifier if it exists + if parent_id: + if "Parent" not in quals: + quals["Parent"] = [] + quals["Parent"].append(parent_id) + quals = id_handler.update_quals(quals, len(feature.sub_features) > 0) + if feature.type: + ftype = feature.type + else: + ftype = "sequence_feature" + parts = [ + str(rec_id), + feature.qualifiers.get("source", ["feature"])[0], + ftype, + str(feature.location.start + 1), # 1-based indexing + str(feature.location.end), + feature.qualifiers.get("score", ["."])[0], + strand, + self._get_phase(feature), + self._format_keyvals(quals), + ] + out_handle.write("\t".join(parts) + "\n") + for sub_feature in feature.sub_features: + id_handler = self._write_feature( + sub_feature, + rec_id, + out_handle, + id_handler, + quals["ID"][0], + ) + return id_handler + + def _format_keyvals(self, keyvals): + format_kvs = [] + for key in sorted(keyvals.keys()): + values = keyvals[key] + key = key.strip() + format_vals = [] + if not isinstance(values, list) or isinstance(values, tuple): + values = [values] + for val in values: + val = urllib.parse.quote(str(val).strip(), safe=":/ ") + if (key and val) and val not in format_vals: + format_vals.append(val) + format_kvs.append("%s=%s" % (key, ",".join(format_vals))) + return ";".join(format_kvs) + + def _write_annotations(self, anns, rec_id, size, out_handle): + """Add annotations which refer to an entire sequence.""" + format_anns = self._format_keyvals(anns) + if format_anns: + parts = [ + rec_id, + "annotation", + "remark", + "1", + str(size if size > 1 else 1), + ".", + ".", + ".", + format_anns, + ] + out_handle.write("\t".join(parts) + "\n") + + def _write_header(self, out_handle): + """Write out standard header directives.""" + out_handle.write("##gff-version 3\n") + + def _write_fasta(self, recs, out_handle): + """Write sequence records using the ##FASTA directive.""" + out_handle.write("##FASTA\n") + SeqIO.write(recs, out_handle, "fasta") + + +def write(recs, out_handle, include_fasta=False): + """High level interface to write GFF3 files from SeqRecords and SeqFeatures. + + If include_fasta is True, the GFF3 file will include sequence information + using the ##FASTA directive. + """ + writer = GFF3Writer() + return writer.write(recs, out_handle, include_fasta)