13
|
1 #!/usr/bin/env python
|
|
2 import argparse
|
|
3 from collections import defaultdict
|
|
4
|
|
5 """
|
|
6 This utility converts a SAM file to a nucmer delta file.
|
|
7
|
|
8 SAM files must contain an NM tag, which is default in minimap2 alignments.
|
|
9 """
|
|
10
|
|
11
|
|
12 class SAMAlignment:
|
|
13
|
|
14 def __init__(self, in_ref_header, in_query_header, in_ref_start, in_cigar, in_flag, in_seq_len, in_num_mismatches):
|
|
15 self.num_mismatches = in_num_mismatches
|
|
16 self.seq_len = in_seq_len
|
|
17 self.flag = int(in_flag)
|
|
18 self.is_rc = False
|
|
19
|
|
20 # Check if the query was reverse complemented
|
|
21 if self.flag & 16:
|
|
22 self.is_rc = True
|
|
23
|
|
24 self.cigar = in_cigar
|
|
25 self.parsed_cigar = []
|
|
26 self._parse_cigar()
|
|
27
|
|
28 if self.is_rc:
|
|
29 self.parsed_cigar = self.parsed_cigar[::-1]
|
|
30
|
|
31 self.ref_header = in_ref_header
|
|
32 self.query_header = in_query_header
|
|
33 self.ref_start = int(in_ref_start)
|
|
34 self.ref_end = None
|
|
35 self.query_start = None
|
|
36 self.query_end = None
|
|
37 self.query_aln_len = 0
|
|
38 self.ref_aln_len = 0
|
|
39
|
|
40 num_hard_clipped = 0
|
|
41 for i in self.parsed_cigar:
|
|
42 if i[-1] == 'H':
|
|
43 num_hard_clipped += int(i[:-1])
|
|
44
|
|
45 if self.seq_len == 1:
|
|
46 self.seq_len = 0
|
|
47 for i in self.parsed_cigar:
|
|
48 if i[-1] == 'M' or i[-1] == 'I' or i[-1] == 'S' or i[-1] == 'X' or i[-1] == '=':
|
|
49 self.seq_len += int(i[:-1])
|
|
50
|
|
51 self.query_len = self.seq_len + num_hard_clipped
|
|
52
|
|
53 self._get_query_start()
|
|
54 self._get_query_aln_len()
|
|
55 self._get_ref_aln_len()
|
|
56 self._get_query_end()
|
|
57 self._get_ref_end()
|
|
58
|
|
59 if self.is_rc:
|
|
60 self.query_start, self.query_end = self.query_end, self.query_start
|
|
61 # Lazily taking care of off-by-one errors
|
|
62 self.query_end += 1
|
|
63 self.query_start -= 1
|
|
64
|
|
65 self.query_end -= 1
|
|
66 self.ref_end -= 1
|
|
67
|
|
68 def __str__(self):
|
|
69 return ' '.join([self.ref_header, self.query_header])
|
|
70
|
|
71 def __repr__(self):
|
|
72 return '<' + ' '.join([self.ref_header, self.query_header]) + '>'
|
|
73
|
|
74 def _get_query_start(self):
|
|
75 """
|
|
76 An alignment will either start with soft clipping, hard clipping, or with one of the alignment
|
|
77 codes (e.g. Match/Mismatch (M) or InDel (I/D)). If hard or soft clipped, the start of the alignment is
|
|
78 the very next base after clipping. If the alignment has commenced, then the start of the alignment is 1.
|
|
79
|
|
80 Notes:
|
|
81 At least for now, I am only specifying this for minimap2 output, which I believe only has the following
|
|
82 characters in cigar strings:
|
|
83 {'I', 'S', 'M', 'D', 'H'}
|
|
84 so I will only handle these cases.
|
|
85 """
|
|
86 if self.parsed_cigar[0][-1] == 'I' or self.parsed_cigar[0][-1] == 'D' or self.parsed_cigar[0][-1] == 'M':
|
|
87 self.query_start = 1
|
|
88 else:
|
|
89 # The alignment has started with either soft or hard clipping
|
|
90 # Need to double check if a 1 needs to be added here
|
|
91 self.query_start = int(self.parsed_cigar[0][:-1]) + 1
|
|
92
|
|
93 def _get_query_aln_len(self):
|
|
94 """ Just the addition of all cigar codes which "consume" the query (M, I)"""
|
|
95 for i in self.parsed_cigar:
|
|
96 if i[-1] == 'M' or i[-1] == 'I':
|
|
97 self.query_aln_len += int(i[:-1])
|
|
98
|
|
99 def _get_ref_aln_len(self):
|
|
100 for i in self.parsed_cigar:
|
|
101 if i[-1] == 'M' or i[-1] == 'D':
|
|
102 self.ref_aln_len += int(i[:-1])
|
|
103
|
|
104 def _get_query_end(self):
|
|
105 """ This is the length of the alignment + query start """
|
|
106 self.query_end = self.query_start + self.query_aln_len
|
|
107
|
|
108 def _get_ref_end(self):
|
|
109 """ This is the length of the alignment + query start """
|
|
110 self.ref_end = self.ref_start + self.ref_aln_len
|
|
111
|
|
112 def _parse_cigar(self):
|
|
113 cigar_chars = {
|
|
114 'M',
|
|
115 'I',
|
|
116 'D',
|
|
117 'N',
|
|
118 'S',
|
|
119 'H',
|
|
120 'P',
|
|
121 '=',
|
|
122 'X'
|
|
123 }
|
|
124
|
|
125 this_field = ''
|
|
126 for char in self.cigar:
|
|
127 this_field += char
|
|
128 if char in cigar_chars:
|
|
129 self.parsed_cigar.append(this_field)
|
|
130 this_field = ''
|
|
131
|
|
132
|
|
133 def write_delta(in_alns, in_file_name):
|
|
134 with open(in_file_name, 'w') as f:
|
|
135 f.write('file1 file2\n')
|
|
136 f.write('NUCMER\n')
|
|
137 for aln in in_alns.keys():
|
|
138 query_len = in_alns[aln][0].query_len
|
|
139 #print (query_len)
|
|
140 f.write('>%s %s %r %r\n' % (aln[0], aln[1], ref_chr_lens[aln[0]], query_len))
|
|
141 for i in in_alns[aln]:
|
|
142 f.write('%r %r %r %r %r %r %r\n' % (
|
|
143 i.ref_start,
|
|
144 i.ref_end,
|
|
145 i.query_start,
|
|
146 i.query_end,
|
|
147 i.num_mismatches,
|
|
148 i.num_mismatches,
|
|
149 0
|
|
150 ))
|
|
151 # Continue with the cigar string
|
|
152 offsets = []
|
|
153 cigar = i.parsed_cigar
|
|
154 if cigar[0][-1] == 'S' or cigar[0][-1] == 'H':
|
|
155 cigar = cigar[1:-1]
|
|
156 else:
|
|
157 cigar = cigar[:-1]
|
|
158
|
|
159 counter = 1
|
|
160 for code in cigar:
|
|
161 if code[-1] == 'M':
|
|
162 counter += int(code[:-1])
|
|
163 elif code[-1] == 'D':
|
|
164 offsets.append(counter)
|
|
165 num_I = int(code[:-1])
|
|
166 for i in range(1, num_I):
|
|
167 offsets.append(1)
|
|
168 counter = 1
|
|
169 elif code[-1] == 'I':
|
|
170 offsets.append(-1*counter)
|
|
171 num_I = int(code[:-1])
|
|
172 for i in range(1, num_I):
|
|
173 offsets.append(-1)
|
|
174 counter = 1
|
|
175 else:
|
|
176 raise ValueError('Unexpected CIGAR code')
|
|
177 offsets.append(0)
|
|
178 offsets = [str(i) for i in offsets]
|
|
179 f.write('\n'.join(offsets) + '\n')
|
|
180
|
|
181
|
|
182 parser = argparse.ArgumentParser(description='Convert a SAM file to a nucmer delta file.')
|
|
183 parser.add_argument("sam_file", metavar="<alns.sam>", type=str, help="SAM file to convert")
|
|
184
|
|
185 args = parser.parse_args()
|
|
186 sam_file = args.sam_file
|
|
187
|
|
188 # Make a dictionary storing all alignments between and query and a reference sequence.
|
|
189 # key = (reference_header, query_header)
|
|
190 # value = list of alignments between those sequences. only store info needed for the delta file
|
|
191
|
|
192 alns = defaultdict(list)
|
|
193
|
|
194 # Read through the sam file
|
|
195 ref_chr_lens = dict()
|
|
196 with open(sam_file) as f:
|
|
197 for line in f:
|
|
198 # Skip SAM headers
|
|
199 if line.startswith('@SQ'):
|
|
200 header_list = line.split('\t')
|
|
201 chrom = header_list[1].replace('SN:', '')
|
|
202 chr_len = int(header_list[2].replace('LN:', ''))
|
|
203 ref_chr_lens[chrom] = chr_len
|
|
204 continue
|
|
205
|
|
206 if line.startswith('@'):
|
|
207 continue
|
|
208
|
|
209 fields = line.split('\t')
|
|
210 ref_header = fields[2]
|
|
211 query_header = fields[0]
|
|
212 ref_start = fields[3]
|
|
213 cigar = fields[5]
|
|
214 flag = fields[1]
|
|
215 seq_len = len(fields[9])
|
|
216
|
|
217 if not cigar == '*':
|
|
218 # Get the NM flag
|
|
219 nm = None
|
|
220 for i in fields:
|
|
221 if i.startswith('NM:i'):
|
|
222 nm = int(i[5:])
|
|
223 continue
|
|
224
|
|
225 if nm is None:
|
|
226 raise ValueError('SAM file must include NM tag.')
|
|
227
|
|
228 x = SAMAlignment(
|
|
229 ref_header,
|
|
230 query_header,
|
|
231 ref_start,
|
|
232 cigar,
|
|
233 flag,
|
|
234 seq_len,
|
|
235 nm
|
|
236 )
|
|
237 alns[(ref_header, query_header)].append(x)
|
|
238
|
|
239 write_delta(alns, sam_file + '.delta')
|