Mercurial > repos > petr-novak > various_galaxy_tools
diff gff_to_bed_converter.py @ 0:696e702ebf74 draft
"planemo upload commit 0f6eca49bafc3c946189d793161a7f81d595e1a1-dirty"
author | petr-novak |
---|---|
date | Mon, 09 May 2022 08:26:30 +0000 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/gff_to_bed_converter.py Mon May 09 08:26:30 2022 +0000 @@ -0,0 +1,160 @@ +#!/usr/bin/env python +from __future__ import print_function + +import sys + +from galaxy.datatypes.util.gff_util import parse_gff_attributes + + +def get_bed_line(chrom, name, strand, blocks): + """Returns a BED line for given data.""" + + if len(blocks) == 1: + # Use simple BED format if there is only a single block: + # chrom, chromStart, chromEnd, name, score, strand + # + start, end = blocks[0] + return "%s\t%i\t%i\t%s\t0\t%s\n" % (chrom, start, end, name, strand) + + # + # Build lists for transcript blocks' starts, sizes. + # + + # Get transcript start, end. + t_start = sys.maxsize + t_end = -1 + for block_start, block_end in blocks: + if block_start < t_start: + t_start = block_start + if block_end > t_end: + t_end = block_end + + # Get block starts, sizes. + block_starts = [] + block_sizes = [] + for block_start, block_end in blocks: + block_starts.append(str(block_start - t_start)) + block_sizes.append(str(block_end - block_start)) + + # + # Create BED entry. + # Bed format: chrom, chromStart, chromEnd, name, score, strand, \ + # thickStart, thickEnd, itemRgb, blockCount, blockSizes, blockStarts + # + # Render complete feature with thick blocks. There's no clear way to do this unless + # we analyze the block names, but making everything thick makes more sense than + # making everything thin. + # + return "%s\t%i\t%i\t%s\t0\t%s\t%i\t%i\t0\t%i\t%s\t%s\n" % ( + chrom, + t_start, + t_end, + name, + strand, + t_start, + t_end, + len(block_starts), + ",".join(block_sizes), + ",".join(block_starts), + ) + + +def __main__(): + input_name = sys.argv[1] + output_name = sys.argv[2] + skipped_lines = 0 + first_skipped_line = 0 + i = 0 + cur_transcript_chrome = None + cur_transcript_id = None + cur_transcript_strand = None + cur_transcripts_blocks = [] # (start, end) for each block. + with open(output_name, "w") as out, open(input_name) as in_fh: + for i, line in enumerate(in_fh): + line = line.rstrip("\r\n") + if line and not line.startswith("#"): + try: + # GFF format: chrom source, name, chromStart, chromEnd, score, strand, attributes + elems = line.split("\t") + start = str(int(elems[3]) - 1) + coords = [int(start), int(elems[4])] + strand = elems[6] + if strand not in ["+", "-"]: + strand = "+" + attributes = parse_gff_attributes(elems[8]) + t_id = attributes.get("transcript_id", None) + + if not t_id: + # + # No transcript ID, so write last transcript and write current line as its own line. + # + + # Write previous transcript. + if cur_transcript_id: + # Write BED entry. + out.write( + get_bed_line( + cur_transcript_chrome, + cur_transcript_id, + cur_transcript_strand, + cur_transcripts_blocks, + ) + ) + + # Replace any spaces in the name with underscores so UCSC will not complain. + name = elems[2].replace(" ", "_") + out.write(get_bed_line(elems[0], name, strand, [coords])) + continue + + # There is a transcript ID, so process line at transcript level. + if t_id == cur_transcript_id: + # Line is element of transcript and will be a block in the BED entry. + cur_transcripts_blocks.append(coords) + continue + + # + # Line is part of new transcript; write previous transcript and start + # new transcript. + # + + # Write previous transcript. + if cur_transcript_id: + # Write BED entry. + out.write( + get_bed_line( + cur_transcript_chrome, cur_transcript_id, cur_transcript_strand, cur_transcripts_blocks + ) + ) + + # Start new transcript. + cur_transcript_chrome = elems[0] + cur_transcript_id = t_id + cur_transcript_strand = strand + cur_transcripts_blocks = [] + cur_transcripts_blocks.append(coords) + except Exception: + skipped_lines += 1 + if not first_skipped_line: + first_skipped_line = i + 1 + else: + skipped_lines += 1 + if not first_skipped_line: + first_skipped_line = i + 1 + + # Write last transcript. + if cur_transcript_id: + # Write BED entry. + out.write( + get_bed_line(cur_transcript_chrome, cur_transcript_id, cur_transcript_strand, cur_transcripts_blocks) + ) + info_msg = "%i lines converted to BED. " % (i + 1 - skipped_lines) + if skipped_lines > 0: + info_msg += "Skipped %d blank/comment/invalid lines starting with line #%d." % ( + skipped_lines, + first_skipped_line, + ) + print(info_msg) + + +if __name__ == "__main__": + __main__()