Mercurial > repos > yufei-luo > s_mart
view commons/core/parsing/BamParser.py @ 41:e57682cd6997
Uploaded
author | m-zytnicki |
---|---|
date | Thu, 30 May 2013 03:04:47 -0400 |
parents | 769e306b7933 |
children |
line wrap: on
line source
# # Copyright INRA-URGI 2009-2012 # # This software is governed by the CeCILL license under French law and # abiding by the rules of distribution of free software. You can use, # modify and/ or redistribute the software under the terms of the CeCILL # license as circulated by CEA, CNRS and INRIA at the following URL # "http://www.cecill.info". # # As a counterpart to the access to the source code and rights to copy, # modify and redistribute granted by the license, users are provided only # with a limited warranty and the software's author, the holder of the # economic rights, and the successive licensors have only limited # liability. # # In this respect, the user's attention is drawn to the risks associated # with loading, using, modifying and/or developing or reproducing the # software by the user in light of its specific status of free software, # that may mean that it is complicated to manipulate, and that also # therefore means that it is reserved for developers and experienced # professionals having in-depth computer knowledge. Users are therefore # encouraged to load and test the software's suitability as regards their # requirements in conditions enabling the security of their systems and/or # data to be ensured and, more generally, to use and operate it in the # same conditions as regards security. # # The fact that you are presently reading this means that you have had # knowledge of the CeCILL license and that you accept its terms. # import re, sys, gzip, struct from commons.core.parsing.MapperParser import MapperParser from SMART.Java.Python.structure.Mapping import Mapping from SMART.Java.Python.structure.SubMapping import SubMapping from SMART.Java.Python.structure.Interval import Interval BAM_DNA_LOOKUP = "=ACMGRSVTWYHKDBN" BAM_CIGAR_LOOKUP = "MIDNSHP=X" BAM_CIGAR_SHIFT = 4 BAM_CIGAR_MASK = ((1 << BAM_CIGAR_SHIFT) - 1) def pack_int32(x): return struct.pack('<i', x) def pack_uint32(x): return struct.pack('<I', x) def unpack_int8(x): return struct.unpack('<b', x)[0] def unpack_int16(x): return struct.unpack('<h', x)[0] def unpack_int32(x): return struct.unpack('<i', x)[0] def unpack_int64(x): return struct.unpack('<q', x)[0] def unpack_uint8(x): return struct.unpack('<B', x)[0] def unpack_uint16(x): return struct.unpack('<H', x)[0] def unpack_uint32(x): return struct.unpack('<I', x)[0] def unpack_uint64(x): return struct.unpack('<Q', x)[0] def unpack_float(x): return struct.unpack('<f', x)[0] def unpack_string(x): length = len(x) format_string = "<{0}s".format(length) string = struct.unpack(format_string, x)[0] if string[-1] == '\0': return string[:-1] else: return string BAM_TAG_CODE = {"c": unpack_int8, \ "C": unpack_uint8, \ "s": unpack_int16, \ "S": unpack_uint16, \ "i": unpack_int32, \ "I": unpack_uint32, \ "f": unpack_float, \ #"A": unpack_int8, \ "A": lambda x: x, \ "Z": unpack_int8, \ "H": unpack_int8} BAM_TAG_VALUE = {"c": int, \ "C": int, \ "s": int, \ "S": int, \ "i": int, \ "I": int, \ "f": float, \ "A": lambda x: x} BAM_TAG_SIZE = {"c": 1, \ "C": 1, \ "s": 2, \ "S": 2, \ "i": 4, \ "I": 4, \ "f": 4, \ "A": 1} class CigarOp(object): def __init__(self, data): self._length = data >> BAM_CIGAR_SHIFT self._type = BAM_CIGAR_LOOKUP[ data & BAM_CIGAR_MASK ] class CigarData(object): def __init__(self, data, num_ops): self._ops = [] for i in range(num_ops): cigar_data = unpack_uint32(data[i*4: (i+1)*4]) self._ops.append(CigarOp(cigar_data)) def getCigarData(self): return self._ops def __str__(self): return "".join(["%d%s" % (op._length, op._type) for op in self._ops]) class TagsData(object): def __init__(self): self._tags = {} def add(self, tag): self._tags[tag._tag] = tag def getTags(self): return self._tags def __str__(self): return " ".join([self._tags[tag] for tag in sorted(self._tags.keys())]) class TagData(object): def __init__(self, tag, type, value): self._tag = tag self._type = type self._value = value def __str__(self): if self._type in "AZHB": return "%s:%s:%s" % (self._tag, self._type, self._value) if self._type in "cCsSiI": return "%s:%s:%d" % (self._tag, self._type, self._value) return "%s:%s:%f" % (self._tag, self._type, self._value) class TagParser(object): def __init__(self, data): self._data = data self._tags = TagsData() self._parse() def _parse(self): while self._data: tag = "%s%s" % (chr(unpack_int8(self._data[0])), chr(unpack_int8(self._data[1]))) type = chr(unpack_int8(self._data[2])) self._data = self._data[3:] if type in BAM_TAG_VALUE: value = self._parseUnique(type) elif type == "Z": value = self._parseString() elif type == "H": size = unpack_int8(self._data[0]) self._data = self._data[1:] value = self._parseSeveral("C", size) elif type == "B": secondType = unpack_int8(self._data[0]) size = unpack_int8(self._data[1]) + unpack_int8(self._data[2]) * 16 + unpack_int8(self._data[3]) * 16 * 16 + unpack_int8(self._data[4]) * 16 * 16 * 16 self._data = self._data[5:] value = self._parseSeveral(secondType, size) else: raise Exception("Cannot parse type '%s'." % (type)) fullTag = TagData(tag, type, value) self._tags.add(fullTag) def _parseUnique(self, type): value = BAM_TAG_CODE[type](self._data[:BAM_TAG_SIZE[type]]) self._data = self._data[BAM_TAG_SIZE[type]:] return value def _parseSeveral(self, type, size): value = [] for i in range(size): value.append(self._parseUnique(type)) return value def _parseString(self): value = "" char = self._data[0] self._data = self._data[1:] while unpack_int8(char) != 0: value += char char = self._data[0] self._data = self._data[1:] return value def getTags(self): return self._tags.getTags() def __str__(self): return self._tags class AlignedRead(object): def __init__(self, data, refs): self._data = data self._refs = refs def parse(self): self._parse_common() self._parse_flag_nc() self._parse_bin_mq_nl() self._parse_name() self._parse_cigar() self._parse_sequence() self._parse_quality() self._parse_tags() def _parse_common(self): ref_id = unpack_int32(self._data[0:4]) self._chromosome = self._refs[ref_id] self._pos = unpack_int32(self._data[4:8]) + 1 mate_ref_id = unpack_int32(self._data[20:24]) if mate_ref_id == -1: self._rnext = "*" else: self._rnext = self._refs[mate_ref_id] if self._rnext == self._chromosome: self._rnext = "=" self._pnext = unpack_int32(self._data[24:28]) + 1 self._tlen = unpack_int32(self._data[28:32]) def _parse_bin_mq_nl(self): bin_mq_nl = unpack_uint32(self._data[8:12]) self._bin = bin_mq_nl >> 16 self._mappingQuality = bin_mq_nl >> 8 & 0xff self._query_name_length = bin_mq_nl & 0xff def _parse_flag_nc(self): flag_nc = unpack_uint32(self._data[12:16]) self._flag = flag_nc >> 16 self._num_cigar_ops = flag_nc & 0xffff def _parse_name(self): start = 32 stop = start + self._query_name_length self._name = unpack_string(self._data[start:stop]) def _parse_cigar(self): start = 32 + self._query_name_length stop = start + (self._num_cigar_ops * 4) _buffer = self._data[start:stop] cigar = CigarData(_buffer, self._num_cigar_ops) self._cigar = cigar.getCigarData() def _parse_sequence(self): seq_length = unpack_int32(self._data[16:20]) start = 32 + self._query_name_length + (self._num_cigar_ops * 4) stop = start + (seq_length + 1) / 2 _buffer = self._data[start:stop] self._sequence = "" for i in range(seq_length): x = unpack_uint8(_buffer[(i / 2)]) index = (x >> (4 * (1 - (i % 2)))) & 0xf base = BAM_DNA_LOOKUP[index] self._sequence += base def _parse_quality(self): seq_length = unpack_int32(self._data[16:20]) start = 32 + self._query_name_length + (self._num_cigar_ops * 4) + (seq_length + 1) / 2 stop = start + seq_length _buffer = self._data[start:stop] self._quality = "".join(["%s" % (chr(unpack_int8(x) + 33)) for x in _buffer]) def _parse_tags(self): seq_length = unpack_int32(self._data[16:20]) start = 32 + self._query_name_length + (self._num_cigar_ops * 4) + (seq_length + 1) / 2 + (seq_length + 1) - 1 stop = start + seq_length _buffer = self._data[start:] tagParser = TagParser(_buffer) self._tags = tagParser.getTags() class FileReader(object): def __init__(self, handle): self._handle = handle self._readHeader() def _readHeader(self): magic = unpack_string(self._handle.read(4)) if magic != "BAM\1": raise Exception("File should start with 'BAM\1', starting with '%s' instead." % (magic)) tlen = unpack_int32(self._handle.read(4)) text = unpack_string(self._handle.read(tlen)) nrefs = unpack_int32(self._handle.read(4)) self._refs = [] for i in range(nrefs): sizeName = unpack_int32(self._handle.read(4)) name = unpack_string(self._handle.read(sizeName)) size = unpack_int32(self._handle.read(4)) self._refs.append(name) self._startPos = self._handle.tell() def reset(self): self._handle.seek(self._startPos) def getNextAlignment(self): try: blockSize = unpack_int32(self._handle.read(4)) except struct.error: return False block = self._handle.read(blockSize) currentRead = AlignedRead(block, self._refs) return currentRead def parseAlignedRead(read): if (read._flag & 0x4) == 0x4: return None mapping = Mapping() direction = 1 if (read._flag & 0x10) == 0x0 else -1 genomeStart = read._pos nbOccurrences = 1 nbMismatches = 0 nbMatches = 0 nbGaps = 0 subMapping = None queryOffset = 0 targetOffset = 0 readStart = None for tag, value in read._tags.iteritems(): if tag == "X0": nbOccurrences = value._value elif tag == "X1": nbOccurrences += value._value elif tag == "XM": nbMismatches = value._value mapping.setTagValue("nbOccurrences", nbOccurrences) mapping.setTagValue("quality", read._mappingQuality) for operation in read._cigar: if operation._type == "M": if readStart == None: readStart = queryOffset if subMapping == None: subMapping = SubMapping() subMapping.setSize(operation._length) subMapping.setDirection(direction) subMapping.queryInterval.setName(read._name) subMapping.queryInterval.setStart(queryOffset) subMapping.queryInterval.setDirection(direction) subMapping.targetInterval.setChromosome(read._chromosome) subMapping.targetInterval.setStart(genomeStart + targetOffset) subMapping.targetInterval.setDirection(1) nbMatches += operation._length targetOffset += operation._length queryOffset += operation._length currentNumber = 0 continue if operation._type == "I": nbGaps += 1 queryOffset += operation._length currentNumber = 0 continue if operation._type == "D": if subMapping != None: subMapping.queryInterval.setEnd(queryOffset - 1) subMapping.targetInterval.setEnd(genomeStart + targetOffset - 1) mapping.addSubMapping(subMapping) subMapping = None nbGaps += 1 targetOffset += operation._length currentNumber = 0 continue if operation._type == "N": if subMapping != None: subMapping.queryInterval.setEnd(queryOffset - 1) subMapping.targetInterval.setEnd(genomeStart + targetOffset - 1) mapping.addSubMapping(subMapping) subMapping = None targetOffset += operation._length currentNumber = 0 continue if operation._type == "S": nbMismatches += operation._length targetOffset += operation._length queryOffset += operation._length currentNumber = 0 continue if operation._type == "H": targetOffset += operation._length queryOffset += operation._length currentNumber = 0 continue if operation._type == "P": continue raise Exception("Do not understand parameter '%s'" % (operation._type)) if subMapping != None: subMapping.queryInterval.setEnd(queryOffset - 1) subMapping.targetInterval.setEnd(genomeStart + targetOffset - 1) mapping.addSubMapping(subMapping) mapping.queryInterval.setStart(readStart) mapping.queryInterval.setEnd(queryOffset - 1) mapping.targetInterval.setEnd(genomeStart + targetOffset - 1) mapping.setNbMismatches(nbMismatches) mapping.setNbGaps(nbGaps) mapping.queryInterval.setName(read._name) mapping.queryInterval.setDirection(direction) mapping.targetInterval.setChromosome(read._chromosome) mapping.targetInterval.setStart(genomeStart) mapping.targetInterval.setDirection(direction) mapping.setSize(len(read._sequence)) mapping.setDirection(direction) return mapping class BamParser(MapperParser): """A class that parses BAM format""" def __init__(self, fileName, verbosity = 0): self.verbosity = verbosity self.handle = gzip.open(fileName, "rb") self.reader = FileReader(self.handle) self.nbMappings = None self.fileName = fileName def __del__(self): self.handle.close() def getFileFormats(): return ["bam"] getFileFormats = staticmethod(getFileFormats) def reset(self): self.reader.reset() def getNextMapping(self): self.currentMapping = None while self.currentMapping == None: read = self.reader.getNextAlignment() if not read: self.currentMapping = False return False read.parse() self.currentMapping = parseAlignedRead(read) return self.currentMapping def setDefaultTagValue(self, name, value): pass def skipFirstLines(self): pass