Mercurial > repos > bjoern-gruening > glimmer_hmm
diff glimmerHMM/BCBio/GFF/GFFOutput.py @ 0:0a15677c6668 default tip
Uploaded
author | bjoern-gruening |
---|---|
date | Wed, 11 Jan 2012 09:58:35 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/glimmerHMM/BCBio/GFF/GFFOutput.py Wed Jan 11 09:58:35 2012 -0500 @@ -0,0 +1,169 @@ +"""Output Biopython SeqRecords and SeqFeatures to GFF3 format. + +The target format is GFF3, the current GFF standard: + http://www.sequenceontology.org/gff3.shtml +""" +import urllib + +class _IdHandler: + """Generate IDs for GFF3 Parent/Child relationships where they don't exist. + """ + def __init__(self): + self._prefix = "biopygen" + self._counter = 1 + self._seen_ids = [] + + def _generate_id(self, quals): + """Generate a unique ID not present in our existing IDs. + """ + gen_id = self._get_standard_id(quals) + if gen_id is None: + while 1: + gen_id = "%s%s" % (self._prefix, self._counter) + if gen_id not in self._seen_ids: + break + self._counter += 1 + return gen_id + + def _get_standard_id(self, quals): + """Retrieve standardized IDs from other sources like NCBI GenBank. + + This tries to find IDs from known key/values when stored differently + than GFF3 specifications. + """ + possible_keys = ["transcript_id", "protein_id"] + for test_key in possible_keys: + if quals.has_key(test_key): + cur_id = quals[test_key] + if isinstance(cur_id, tuple) or isinstance(cur_id, list): + return cur_id[0] + else: + return cur_id + return None + + def update_quals(self, quals, has_children): + """Update a set of qualifiers, adding an ID if necessary. + """ + cur_id = quals.get("ID", None) + # if we have an ID, record it + if cur_id: + if not isinstance(cur_id, list) and not isinstance(cur_id, tuple): + cur_id = [cur_id] + for add_id in cur_id: + self._seen_ids.append(add_id) + # if we need one and don't have it, create a new one + elif has_children: + new_id = self._generate_id(quals) + self._seen_ids.append(new_id) + quals["ID"] = [new_id] + return quals + +class GFF3Writer: + """Write GFF3 files starting with standard Biopython objects. + """ + def __init__(self): + pass + + def write(self, recs, out_handle): + """Write the provided records to the given handle in GFF3 format. + """ + id_handler = _IdHandler() + self._write_header(out_handle) + for rec in recs: + self._write_rec(rec, out_handle) + self._write_annotations(rec.annotations, rec.id, out_handle) + for sf in rec.features: + sf = self._clean_feature(sf) + id_handler = self._write_feature(sf, rec.id, out_handle, + id_handler) + + def _clean_feature(self, feature): + quals = {} + for key, val in feature.qualifiers.items(): + if not isinstance(val, (list, tuple)): + val = [val] + val = [str(x) for x in val] + quals[key] = val + feature.qualifiers = quals + clean_sub = [self._clean_feature(f) for f in feature.sub_features] + feature.sub_features = clean_sub + return feature + + def _write_rec(self, rec, out_handle): + # if we have a SeqRecord, write out optional directive + if len(rec.seq) > 0: + out_handle.write("##sequence-region %s 1 %s\n" % (rec.id, len(rec.seq))) + + def _write_feature(self, feature, rec_id, out_handle, id_handler, + parent_id=None): + """Write a feature with location information. + """ + if feature.strand == 1: + strand = '+' + elif feature.strand == -1: + strand = '-' + else: + strand = '.' + # remove any standard features from the qualifiers + quals = feature.qualifiers.copy() + for std_qual in ["source", "score", "phase"]: + if quals.has_key(std_qual) and len(quals[std_qual]) == 1: + del quals[std_qual] + # add a link to a parent identifier if it exists + if parent_id: + if not quals.has_key("Parent"): + quals["Parent"] = [] + quals["Parent"].append(parent_id) + quals = id_handler.update_quals(quals, len(feature.sub_features) > 0) + if feature.type: + ftype = feature.type + else: + ftype = "sequence_feature" + parts = [str(rec_id), + feature.qualifiers.get("source", ["feature"])[0], + ftype, + str(feature.location.nofuzzy_start + 1), # 1-based indexing + str(feature.location.nofuzzy_end), + feature.qualifiers.get("score", ["."])[0], + strand, + str(feature.qualifiers.get("phase", ["."])[0]), + self._format_keyvals(quals)] + out_handle.write("\t".join(parts) + "\n") + for sub_feature in feature.sub_features: + id_handler = self._write_feature(sub_feature, rec_id, out_handle, + id_handler, quals["ID"][0]) + return id_handler + + def _format_keyvals(self, keyvals): + format_kvs = [] + for key, values in keyvals.items(): + key = key.strip() + format_vals = [] + if not isinstance(values, list) or isinstance(values, tuple): + values = [values] + for val in values: + val = urllib.quote(str(val).strip()) + if ((key and val) and val not in format_vals): + format_vals.append(val) + format_kvs.append("%s=%s" % (key, ",".join(format_vals))) + return ";".join(format_kvs) + + def _write_annotations(self, anns, rec_id, out_handle): + """Add annotations which refer to an entire sequence. + """ + format_anns = self._format_keyvals(anns) + if format_anns: + parts = [rec_id, "annotation", "remark", ".", ".", ".", ".", ".", + format_anns] + out_handle.write("\t".join(parts) + "\n") + + def _write_header(self, out_handle): + """Write out standard header directives. + """ + out_handle.write("##gff-version 3\n") + +def write(recs, out_handle): + """High level interface to write GFF3 files from SeqRecords and SeqFeatures. + """ + writer = GFF3Writer() + return writer.write(recs, out_handle)