Mercurial > repos > cpt > cpt_xmfa
diff xmfa_process.py @ 1:bfe0d989004d draft
planemo upload commit 94b0cd1fff0826c6db3e7dc0c91c0c5a8be8bb0c
| author   | cpt                             |
|----------|---------------------------------|
| date     | Mon, 05 Jun 2023 02:54:26 +0000 |
| parents  |                                 |
| children |                                 |
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/xmfa_process.py Mon Jun 05 02:54:26 2023 +0000
@@ -0,0 +1,115 @@
+#!/usr/bin/env python
+from Bio import SeqIO
+import argparse
+import json
+import os
+from CPT_GFFParser import gffParse, gffWrite
+
+
+def parse_xmfa(xmfa):
+    """Simple XMFA parser until https://github.com/biopython/biopython/pull/544"""
+    current_lcb = []
+    current_seq = {}
+    for line in xmfa.readlines():
+        if line.startswith("#"):
+            continue
+
+        if line.strip() == "=":
+            if "id" in current_seq:
+                current_lcb.append(current_seq)
+                current_seq = {}
+            yield current_lcb
+            current_lcb = []
+        else:
+            line = line.strip()
+            if line.startswith(">"):
+                if "id" in current_seq:
+                    current_lcb.append(current_seq)
+                    current_seq = {}
+                data = line.strip().split()
+                # 0 1           2 3      4 5
+                # > 1:5986-6406 + CbK.fa # CbK_gp011
+                id, loc = data[1].split(":")
+                start, end = loc.split("-")
+                current_seq = {
+                    "rid": "_".join(data[1:]),
+                    "id": id,
+                    "start": int(start),
+                    "end": int(end),
+                    "strand": 1 if data[2] == "+" else -1,
+                    "seq": "",
+                    "comment": "",
+                }
+                if len(data) > 5:
+                    current_seq["comment"] = " ".join(data[5:])
+            # else:
+            #     current_seq['seq'] += line.strip()
+
+
+def percent_identity(a, b):
+    """Calculate % identity, ignoring gaps in the host sequence"""
+    match = 0
+    mismatch = 0
+    for char_a, char_b in zip(list(a), list(b)):
+        if char_a == "-":
+            continue
+        if char_a == char_b:
+            match += 1
+        else:
+            mismatch += 1
+
+    if match + mismatch == 0:
+        return 0.0
+    return 100 * float(match) / (match + mismatch)
+
+
+def get_fasta_ids(sequences):
+    """Returns a list of fasta records in the order they appear"""
+    ids = []
+    for seq in SeqIO.parse(sequences, "fasta"):
+        ids.append(seq.id)
+    return ids
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="parse xmfa file")
+    parser.add_argument("gff3", type=argparse.FileType("r"), help="Multi-GFF3 File")
+    parser.add_argument("fasta", type=argparse.FileType("r"), help="Multi-FA file")
+    parser.add_argument("xmfa", type=argparse.FileType("r"), help="XMFA File")
+    parser.add_argument("output_dir", type=str, help="output directory")
+    args = parser.parse_args()
+
+    fasta_list = get_fasta_ids(args.fasta)
+    lcbs = parse_xmfa(args.xmfa)
+
+    if not os.path.exists(args.output_dir):
+        os.makedirs(args.output_dir)
+
+    output = {"fasta": [], "gff3": [], "xmfa": None}
+
+    processed_xmfa = os.path.join(args.output_dir, "regions.json")
+    with open(processed_xmfa, "w") as handle:
+        json.dump([lcb for lcb in lcbs if len(lcb) > 1], handle, sort_keys=True)
+
+    output["xmfa"] = processed_xmfa
+
+    # Have to seek because we already read args.fasta once in get_fasta_ids
+    args.fasta.seek(0)
+    # Load up sequence(s) for GFF3 data
+    seq_dict = SeqIO.to_dict(SeqIO.parse(args.fasta, "fasta"))
+    # Parse GFF3 records
+    gffs = gffParse(args.gff3, base_dict=seq_dict)
+    for record in sorted(gffs, key=lambda rec: fasta_list.index(rec.id)):
+        gff_output = os.path.join(args.output_dir, record.id + ".gff")
+        with open(gff_output, "w") as handle:
+            gffWrite([record], handle)
+        output["gff3"].append(gff_output)
+
+        fa_output = os.path.join(args.output_dir, record.id + ".txt")
+        with open(fa_output, "w") as handle:
+            handle.write(str(record.seq))
+        output["fasta"].append(
+            {"path": fa_output, "length": len(record.seq), "name": record.id}
+        )
+
+    print(json.dumps(output, sort_keys=True))
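
For reference, parse_xmfa above reads Mauve-style XMFA: each sequence in a locally collinear block (LCB) is introduced by a ">" header of the form shown in the code comment, sequence lines follow, and a lone "=" closes the block. An illustrative fragment is shown below; only the first header is taken from the code comment, while the second header and the sequence text are invented placeholders:

    > 1:5986-6406 + CbK.fa # CbK_gp011
    ATGACCAAAGTAGGCTTAGCC
    > 2:10440-10860 - host.fa
    TTGCAATCCGGATTACGGTAA
    =

Only blocks with more than one member end up in regions.json, since the main section filters on len(lcb) > 1.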
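A typical invocation would look like the sketch below; the file names (annotations.gff3, genomes.fa, alignment.xmfa, out_dir) are placeholders that only illustrate the four positional arguments:

    python xmfa_process.py annotations.gff3 genomes.fa alignment.xmfa out_dir > summary.json

As written, the script creates out_dir/regions.json plus one <record id>.gff and one <record id>.txt file per FASTA record, and prints a JSON summary to stdout with the keys "fasta", "gff3" and "xmfa".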