comparison gff_to_bed_converter.py @ 0:696e702ebf74 draft

"planemo upload commit 0f6eca49bafc3c946189d793161a7f81d595e1a1-dirty"
author petr-novak
date Mon, 09 May 2022 08:26:30 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:696e702ebf74
1 #!/usr/bin/env python
2 from __future__ import print_function
3
4 import sys
5
6 from galaxy.datatypes.util.gff_util import parse_gff_attributes
7
8
9 def get_bed_line(chrom, name, strand, blocks):
10 """Returns a BED line for given data."""
11
12 if len(blocks) == 1:
13 # Use simple BED format if there is only a single block:
14 # chrom, chromStart, chromEnd, name, score, strand
15 #
16 start, end = blocks[0]
17 return "%s\t%i\t%i\t%s\t0\t%s\n" % (chrom, start, end, name, strand)
18
19 #
20 # Build lists for transcript blocks' starts, sizes.
21 #
22
23 # Get transcript start, end.
24 t_start = sys.maxsize
25 t_end = -1
26 for block_start, block_end in blocks:
27 if block_start < t_start:
28 t_start = block_start
29 if block_end > t_end:
30 t_end = block_end
31
32 # Get block starts, sizes.
33 block_starts = []
34 block_sizes = []
35 for block_start, block_end in blocks:
36 block_starts.append(str(block_start - t_start))
37 block_sizes.append(str(block_end - block_start))
38
39 #
40 # Create BED entry.
41 # Bed format: chrom, chromStart, chromEnd, name, score, strand, \
42 # thickStart, thickEnd, itemRgb, blockCount, blockSizes, blockStarts
43 #
44 # Render complete feature with thick blocks. There's no clear way to do this unless
45 # we analyze the block names, but making everything thick makes more sense than
46 # making everything thin.
47 #
48 return "%s\t%i\t%i\t%s\t0\t%s\t%i\t%i\t0\t%i\t%s\t%s\n" % (
49 chrom,
50 t_start,
51 t_end,
52 name,
53 strand,
54 t_start,
55 t_end,
56 len(block_starts),
57 ",".join(block_sizes),
58 ",".join(block_starts),
59 )
60
61
62 def __main__():
63 input_name = sys.argv[1]
64 output_name = sys.argv[2]
65 skipped_lines = 0
66 first_skipped_line = 0
67 i = 0
68 cur_transcript_chrome = None
69 cur_transcript_id = None
70 cur_transcript_strand = None
71 cur_transcripts_blocks = [] # (start, end) for each block.
72 with open(output_name, "w") as out, open(input_name) as in_fh:
73 for i, line in enumerate(in_fh):
74 line = line.rstrip("\r\n")
75 if line and not line.startswith("#"):
76 try:
77 # GFF format: chrom source, name, chromStart, chromEnd, score, strand, attributes
78 elems = line.split("\t")
79 start = str(int(elems[3]) - 1)
80 coords = [int(start), int(elems[4])]
81 strand = elems[6]
82 if strand not in ["+", "-"]:
83 strand = "+"
84 attributes = parse_gff_attributes(elems[8])
85 t_id = attributes.get("transcript_id", None)
86
87 if not t_id:
88 #
89 # No transcript ID, so write last transcript and write current line as its own line.
90 #
91
92 # Write previous transcript.
93 if cur_transcript_id:
94 # Write BED entry.
95 out.write(
96 get_bed_line(
97 cur_transcript_chrome,
98 cur_transcript_id,
99 cur_transcript_strand,
100 cur_transcripts_blocks,
101 )
102 )
103
104 # Replace any spaces in the name with underscores so UCSC will not complain.
105 name = elems[2].replace(" ", "_")
106 out.write(get_bed_line(elems[0], name, strand, [coords]))
107 continue
108
109 # There is a transcript ID, so process line at transcript level.
110 if t_id == cur_transcript_id:
111 # Line is element of transcript and will be a block in the BED entry.
112 cur_transcripts_blocks.append(coords)
113 continue
114
115 #
116 # Line is part of new transcript; write previous transcript and start
117 # new transcript.
118 #
119
120 # Write previous transcript.
121 if cur_transcript_id:
122 # Write BED entry.
123 out.write(
124 get_bed_line(
125 cur_transcript_chrome, cur_transcript_id, cur_transcript_strand, cur_transcripts_blocks
126 )
127 )
128
129 # Start new transcript.
130 cur_transcript_chrome = elems[0]
131 cur_transcript_id = t_id
132 cur_transcript_strand = strand
133 cur_transcripts_blocks = []
134 cur_transcripts_blocks.append(coords)
135 except Exception:
136 skipped_lines += 1
137 if not first_skipped_line:
138 first_skipped_line = i + 1
139 else:
140 skipped_lines += 1
141 if not first_skipped_line:
142 first_skipped_line = i + 1
143
144 # Write last transcript.
145 if cur_transcript_id:
146 # Write BED entry.
147 out.write(
148 get_bed_line(cur_transcript_chrome, cur_transcript_id, cur_transcript_strand, cur_transcripts_blocks)
149 )
150 info_msg = "%i lines converted to BED. " % (i + 1 - skipped_lines)
151 if skipped_lines > 0:
152 info_msg += "Skipped %d blank/comment/invalid lines starting with line #%d." % (
153 skipped_lines,
154 first_skipped_line,
155 )
156 print(info_msg)
157
158
159 if __name__ == "__main__":
160 __main__()