Mercurial > repos > dereeper > ragoo
comparison RaGOO/sam2delta.py @ 13:b9a3aeb162ab draft default tip
Uploaded
author | dereeper |
---|---|
date | Mon, 26 Jul 2021 18:22:37 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
12:68a9ec9ce51e | 13:b9a3aeb162ab |
---|---|
1 #!/usr/bin/env python | |
2 import argparse | |
3 from collections import defaultdict | |
4 | |
5 """ | |
6 This utility converts a SAM file to a nucmer delta file. | |
7 | |
8 SAM files must contain an NM tag, which is default in minimap2 alignments. | |
9 """ | |
10 | |
11 | |
12 class SAMAlignment: | |
13 | |
14 def __init__(self, in_ref_header, in_query_header, in_ref_start, in_cigar, in_flag, in_seq_len, in_num_mismatches): | |
15 self.num_mismatches = in_num_mismatches | |
16 self.seq_len = in_seq_len | |
17 self.flag = int(in_flag) | |
18 self.is_rc = False | |
19 | |
20 # Check if the query was reverse complemented | |
21 if self.flag & 16: | |
22 self.is_rc = True | |
23 | |
24 self.cigar = in_cigar | |
25 self.parsed_cigar = [] | |
26 self._parse_cigar() | |
27 | |
28 if self.is_rc: | |
29 self.parsed_cigar = self.parsed_cigar[::-1] | |
30 | |
31 self.ref_header = in_ref_header | |
32 self.query_header = in_query_header | |
33 self.ref_start = int(in_ref_start) | |
34 self.ref_end = None | |
35 self.query_start = None | |
36 self.query_end = None | |
37 self.query_aln_len = 0 | |
38 self.ref_aln_len = 0 | |
39 | |
40 num_hard_clipped = 0 | |
41 for i in self.parsed_cigar: | |
42 if i[-1] == 'H': | |
43 num_hard_clipped += int(i[:-1]) | |
44 | |
45 if self.seq_len == 1: | |
46 self.seq_len = 0 | |
47 for i in self.parsed_cigar: | |
48 if i[-1] == 'M' or i[-1] == 'I' or i[-1] == 'S' or i[-1] == 'X' or i[-1] == '=': | |
49 self.seq_len += int(i[:-1]) | |
50 | |
51 self.query_len = self.seq_len + num_hard_clipped | |
52 | |
53 self._get_query_start() | |
54 self._get_query_aln_len() | |
55 self._get_ref_aln_len() | |
56 self._get_query_end() | |
57 self._get_ref_end() | |
58 | |
59 if self.is_rc: | |
60 self.query_start, self.query_end = self.query_end, self.query_start | |
61 # Lazily taking care of off-by-one errors | |
62 self.query_end += 1 | |
63 self.query_start -= 1 | |
64 | |
65 self.query_end -= 1 | |
66 self.ref_end -= 1 | |
67 | |
68 def __str__(self): | |
69 return ' '.join([self.ref_header, self.query_header]) | |
70 | |
71 def __repr__(self): | |
72 return '<' + ' '.join([self.ref_header, self.query_header]) + '>' | |
73 | |
74 def _get_query_start(self): | |
75 """ | |
76 An alignment will either start with soft clipping, hard clipping, or with one of the alignment | |
77 codes (e.g. Match/Mismatch (M) or InDel (I/D)). If hard or soft clipped, the start of the alignment is | |
78 the very next base after clipping. If the alignment has commenced, then the start of the alignment is 1. | |
79 | |
80 Notes: | |
81 At least for now, I am only specifying this for minimap2 output, which I believe only has the following | |
82 characters in cigar strings: | |
83 {'I', 'S', 'M', 'D', 'H'} | |
84 so I will only handle these cases. | |
85 """ | |
86 if self.parsed_cigar[0][-1] == 'I' or self.parsed_cigar[0][-1] == 'D' or self.parsed_cigar[0][-1] == 'M': | |
87 self.query_start = 1 | |
88 else: | |
89 # The alignment has started with either soft or hard clipping | |
90 # Need to double check if a 1 needs to be added here | |
91 self.query_start = int(self.parsed_cigar[0][:-1]) + 1 | |
92 | |
93 def _get_query_aln_len(self): | |
94 """ Just the addition of all cigar codes which "consume" the query (M, I)""" | |
95 for i in self.parsed_cigar: | |
96 if i[-1] == 'M' or i[-1] == 'I': | |
97 self.query_aln_len += int(i[:-1]) | |
98 | |
99 def _get_ref_aln_len(self): | |
100 for i in self.parsed_cigar: | |
101 if i[-1] == 'M' or i[-1] == 'D': | |
102 self.ref_aln_len += int(i[:-1]) | |
103 | |
104 def _get_query_end(self): | |
105 """ This is the length of the alignment + query start """ | |
106 self.query_end = self.query_start + self.query_aln_len | |
107 | |
108 def _get_ref_end(self): | |
109 """ This is the length of the alignment + query start """ | |
110 self.ref_end = self.ref_start + self.ref_aln_len | |
111 | |
112 def _parse_cigar(self): | |
113 cigar_chars = { | |
114 'M', | |
115 'I', | |
116 'D', | |
117 'N', | |
118 'S', | |
119 'H', | |
120 'P', | |
121 '=', | |
122 'X' | |
123 } | |
124 | |
125 this_field = '' | |
126 for char in self.cigar: | |
127 this_field += char | |
128 if char in cigar_chars: | |
129 self.parsed_cigar.append(this_field) | |
130 this_field = '' | |
131 | |
132 | |
133 def write_delta(in_alns, in_file_name): | |
134 with open(in_file_name, 'w') as f: | |
135 f.write('file1 file2\n') | |
136 f.write('NUCMER\n') | |
137 for aln in in_alns.keys(): | |
138 query_len = in_alns[aln][0].query_len | |
139 #print (query_len) | |
140 f.write('>%s %s %r %r\n' % (aln[0], aln[1], ref_chr_lens[aln[0]], query_len)) | |
141 for i in in_alns[aln]: | |
142 f.write('%r %r %r %r %r %r %r\n' % ( | |
143 i.ref_start, | |
144 i.ref_end, | |
145 i.query_start, | |
146 i.query_end, | |
147 i.num_mismatches, | |
148 i.num_mismatches, | |
149 0 | |
150 )) | |
151 # Continue with the cigar string | |
152 offsets = [] | |
153 cigar = i.parsed_cigar | |
154 if cigar[0][-1] == 'S' or cigar[0][-1] == 'H': | |
155 cigar = cigar[1:-1] | |
156 else: | |
157 cigar = cigar[:-1] | |
158 | |
159 counter = 1 | |
160 for code in cigar: | |
161 if code[-1] == 'M': | |
162 counter += int(code[:-1]) | |
163 elif code[-1] == 'D': | |
164 offsets.append(counter) | |
165 num_I = int(code[:-1]) | |
166 for i in range(1, num_I): | |
167 offsets.append(1) | |
168 counter = 1 | |
169 elif code[-1] == 'I': | |
170 offsets.append(-1*counter) | |
171 num_I = int(code[:-1]) | |
172 for i in range(1, num_I): | |
173 offsets.append(-1) | |
174 counter = 1 | |
175 else: | |
176 raise ValueError('Unexpected CIGAR code') | |
177 offsets.append(0) | |
178 offsets = [str(i) for i in offsets] | |
179 f.write('\n'.join(offsets) + '\n') | |
180 | |
181 | |
182 parser = argparse.ArgumentParser(description='Convert a SAM file to a nucmer delta file.') | |
183 parser.add_argument("sam_file", metavar="<alns.sam>", type=str, help="SAM file to convert") | |
184 | |
185 args = parser.parse_args() | |
186 sam_file = args.sam_file | |
187 | |
188 # Make a dictionary storing all alignments between and query and a reference sequence. | |
189 # key = (reference_header, query_header) | |
190 # value = list of alignments between those sequences. only store info needed for the delta file | |
191 | |
192 alns = defaultdict(list) | |
193 | |
194 # Read through the sam file | |
195 ref_chr_lens = dict() | |
196 with open(sam_file) as f: | |
197 for line in f: | |
198 # Skip SAM headers | |
199 if line.startswith('@SQ'): | |
200 header_list = line.split('\t') | |
201 chrom = header_list[1].replace('SN:', '') | |
202 chr_len = int(header_list[2].replace('LN:', '')) | |
203 ref_chr_lens[chrom] = chr_len | |
204 continue | |
205 | |
206 if line.startswith('@'): | |
207 continue | |
208 | |
209 fields = line.split('\t') | |
210 ref_header = fields[2] | |
211 query_header = fields[0] | |
212 ref_start = fields[3] | |
213 cigar = fields[5] | |
214 flag = fields[1] | |
215 seq_len = len(fields[9]) | |
216 | |
217 if not cigar == '*': | |
218 # Get the NM flag | |
219 nm = None | |
220 for i in fields: | |
221 if i.startswith('NM:i'): | |
222 nm = int(i[5:]) | |
223 continue | |
224 | |
225 if nm is None: | |
226 raise ValueError('SAM file must include NM tag.') | |
227 | |
228 x = SAMAlignment( | |
229 ref_header, | |
230 query_header, | |
231 ref_start, | |
232 cigar, | |
233 flag, | |
234 seq_len, | |
235 nm | |
236 ) | |
237 alns[(ref_header, query_header)].append(x) | |
238 | |
239 write_delta(alns, sam_file + '.delta') |