Mercurial > repos > youngkim > ezbamqc
view ezBAMQC/src/htslib/cram/cram_io.c @ 8:82bb8c455761
Uploaded
author | cshl-bsr |
---|---|
date | Wed, 30 Mar 2016 12:03:10 -0400 |
parents | dfa3745e5fd8 |
children |
line wrap: on
line source
/* Copyright (c) 2012-2014 Genome Research Ltd. Author: James Bonfield <jkb@sanger.ac.uk> Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. Neither the names Genome Research Ltd and Wellcome Trust Sanger Institute nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY GENOME RESEARCH LTD AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GENOME RESEARCH LTD OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ /* * CRAM I/O primitives. * * - ITF8 encoding and decoding. * - Block based I/O * - Zlib inflating and deflating (memory) * - CRAM basic data structure reading and writing * - File opening / closing * - Reference sequence handling */ /* * TODO: BLOCK_GROW, BLOCK_RESIZE, BLOCK_APPEND and itf8_put_blk all need * a way to return errors for when malloc fails. */ #ifdef HAVE_CONFIG_H #include "io_lib_config.h" #endif #include <stdio.h> #include <errno.h> #include <assert.h> #include <stdlib.h> #include <string.h> #include <zlib.h> #ifdef HAVE_LIBBZ2 #include <bzlib.h> #endif #ifdef HAVE_LIBLZMA #include <lzma.h> #endif #include <sys/types.h> #include <sys/stat.h> #include <math.h> #include <ctype.h> #include "cram/cram.h" #include "cram/os.h" #include "cram/md5.h" #include "cram/open_trace_file.h" #include "cram/rANS_static.h" //#define REF_DEBUG #ifdef REF_DEBUG #include <sys/syscall.h> #define gettid() (int)syscall(SYS_gettid) #define RP(...) fprintf (stderr, __VA_ARGS__) #else #define RP(...) #endif #include "htslib/hfile.h" #include "htslib/bgzf.h" #include "htslib/faidx.h" #define TRIAL_SPAN 50 #define NTRIALS 3 /* ---------------------------------------------------------------------- * ITF8 encoding and decoding. * * Also see the itf8_get and itf8_put macros in cram_io.h */ /* * Reads an integer in ITF-8 encoding from 'cp' and stores it in * *val. * * Returns the number of bytes read on success * -1 on failure */ int itf8_decode(cram_fd *fd, int32_t *val_p) { static int nbytes[16] = { 0,0,0,0, 0,0,0,0, // 0000xxxx - 0111xxxx 1,1,1,1, // 1000xxxx - 1011xxxx 2,2, // 1100xxxx - 1101xxxx 3, // 1110xxxx 4, // 1111xxxx }; static int nbits[16] = { 0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f, // 0000xxxx - 0111xxxx 0x3f, 0x3f, 0x3f, 0x3f, // 1000xxxx - 1011xxxx 0x1f, 0x1f, // 1100xxxx - 1101xxxx 0x0f, // 1110xxxx 0x0f, // 1111xxxx }; int32_t val = hgetc(fd->fp); if (val == -1) return -1; int i = nbytes[val>>4]; val &= nbits[val>>4]; switch(i) { case 0: *val_p = val; return 1; case 1: val = (val<<8) | (unsigned char)hgetc(fd->fp); *val_p = val; return 2; case 2: val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); *val_p = val; return 3; case 3: val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); *val_p = val; return 4; case 4: // really 3.5 more, why make it different? val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<4) | (((unsigned char)hgetc(fd->fp)) & 0x0f); *val_p = val; } return 5; } /* * Encodes and writes a single integer in ITF-8 format. * Returns 0 on success * -1 on failure */ int itf8_encode(cram_fd *fd, int32_t val) { char buf[5]; int len = itf8_put(buf, val); return hwrite(fd->fp, buf, len) == len ? 0 : -1; } #ifndef ITF8_MACROS /* * As above, but decoding from memory */ int itf8_get(char *cp, int32_t *val_p) { unsigned char *up = (unsigned char *)cp; if (up[0] < 0x80) { *val_p = up[0]; return 1; } else if (up[0] < 0xc0) { *val_p = ((up[0] <<8) | up[1]) & 0x3fff; return 2; } else if (up[0] < 0xe0) { *val_p = ((up[0]<<16) | (up[1]<< 8) | up[2]) & 0x1fffff; return 3; } else if (up[0] < 0xf0) { *val_p = ((up[0]<<24) | (up[1]<<16) | (up[2]<<8) | up[3]) & 0x0fffffff; return 4; } else { *val_p = ((up[0] & 0x0f)<<28) | (up[1]<<20) | (up[2]<<12) | (up[3]<<4) | (up[4] & 0x0f); return 5; } } /* * Stores a value to memory in ITF-8 format. * * Returns the number of bytes required to store the number. * This is a maximum of 5 bytes. */ int itf8_put(char *cp, int32_t val) { if (!(val & ~0x00000007f)) { // 1 byte *cp = val; return 1; } else if (!(val & ~0x00003fff)) { // 2 byte *cp++ = (val >> 8 ) | 0x80; *cp = val & 0xff; return 2; } else if (!(val & ~0x01fffff)) { // 3 byte *cp++ = (val >> 16) | 0xc0; *cp++ = (val >> 8 ) & 0xff; *cp = val & 0xff; return 3; } else if (!(val & ~0x0fffffff)) { // 4 byte *cp++ = (val >> 24) | 0xe0; *cp++ = (val >> 16) & 0xff; *cp++ = (val >> 8 ) & 0xff; *cp = val & 0xff; return 4; } else { // 5 byte *cp++ = 0xf0 | ((val>>28) & 0xff); *cp++ = (val >> 20) & 0xff; *cp++ = (val >> 12) & 0xff; *cp++ = (val >> 4 ) & 0xff; *cp = val & 0x0f; return 5; } } #endif /* 64-bit itf8 variant */ int ltf8_put(char *cp, int64_t val) { if (!(val & ~((1LL<<7)-1))) { *cp = val; return 1; } else if (!(val & ~((1LL<<(6+8))-1))) { *cp++ = (val >> 8 ) | 0x80; *cp = val & 0xff; return 2; } else if (!(val & ~((1LL<<(5+2*8))-1))) { *cp++ = (val >> 16) | 0xc0; *cp++ = (val >> 8 ) & 0xff; *cp = val & 0xff; return 3; } else if (!(val & ~((1LL<<(4+3*8))-1))) { *cp++ = (val >> 24) | 0xe0; *cp++ = (val >> 16) & 0xff; *cp++ = (val >> 8 ) & 0xff; *cp = val & 0xff; return 4; } else if (!(val & ~((1LL<<(3+4*8))-1))) { *cp++ = (val >> 32) | 0xf0; *cp++ = (val >> 24) & 0xff; *cp++ = (val >> 16) & 0xff; *cp++ = (val >> 8 ) & 0xff; *cp = val & 0xff; return 5; } else if (!(val & ~((1LL<<(2+5*8))-1))) { *cp++ = (val >> 40) | 0xf8; *cp++ = (val >> 32) & 0xff; *cp++ = (val >> 24) & 0xff; *cp++ = (val >> 16) & 0xff; *cp++ = (val >> 8 ) & 0xff; *cp = val & 0xff; return 6; } else if (!(val & ~((1LL<<(1+6*8))-1))) { *cp++ = (val >> 48) | 0xfc; *cp++ = (val >> 40) & 0xff; *cp++ = (val >> 32) & 0xff; *cp++ = (val >> 24) & 0xff; *cp++ = (val >> 16) & 0xff; *cp++ = (val >> 8 ) & 0xff; *cp = val & 0xff; return 7; } else if (!(val & ~((1LL<<(7*8))-1))) { *cp++ = (val >> 56) | 0xfe; *cp++ = (val >> 48) & 0xff; *cp++ = (val >> 40) & 0xff; *cp++ = (val >> 32) & 0xff; *cp++ = (val >> 24) & 0xff; *cp++ = (val >> 16) & 0xff; *cp++ = (val >> 8 ) & 0xff; *cp = val & 0xff; return 8; } else { *cp++ = 0xff; *cp++ = (val >> 56) & 0xff; *cp++ = (val >> 48) & 0xff; *cp++ = (val >> 40) & 0xff; *cp++ = (val >> 32) & 0xff; *cp++ = (val >> 24) & 0xff; *cp++ = (val >> 16) & 0xff; *cp++ = (val >> 8 ) & 0xff; *cp = val & 0xff; return 9; } } int ltf8_get(char *cp, int64_t *val_p) { unsigned char *up = (unsigned char *)cp; if (up[0] < 0x80) { *val_p = up[0]; return 1; } else if (up[0] < 0xc0) { *val_p = (((uint64_t)up[0]<< 8) | (uint64_t)up[1]) & (((1LL<<(6+8)))-1); return 2; } else if (up[0] < 0xe0) { *val_p = (((uint64_t)up[0]<<16) | ((uint64_t)up[1]<< 8) | (uint64_t)up[2]) & ((1LL<<(5+2*8))-1); return 3; } else if (up[0] < 0xf0) { *val_p = (((uint64_t)up[0]<<24) | ((uint64_t)up[1]<<16) | ((uint64_t)up[2]<< 8) | (uint64_t)up[3]) & ((1LL<<(4+3*8))-1); return 4; } else if (up[0] < 0xf8) { *val_p = (((uint64_t)up[0]<<32) | ((uint64_t)up[1]<<24) | ((uint64_t)up[2]<<16) | ((uint64_t)up[3]<< 8) | (uint64_t)up[4]) & ((1LL<<(3+4*8))-1); return 5; } else if (up[0] < 0xfc) { *val_p = (((uint64_t)up[0]<<40) | ((uint64_t)up[1]<<32) | ((uint64_t)up[2]<<24) | ((uint64_t)up[3]<<16) | ((uint64_t)up[4]<< 8) | (uint64_t)up[5]) & ((1LL<<(2+5*8))-1); return 6; } else if (up[0] < 0xfe) { *val_p = (((uint64_t)up[0]<<48) | ((uint64_t)up[1]<<40) | ((uint64_t)up[2]<<32) | ((uint64_t)up[3]<<24) | ((uint64_t)up[4]<<16) | ((uint64_t)up[5]<< 8) | (uint64_t)up[6]) & ((1LL<<(1+6*8))-1); return 7; } else if (up[0] < 0xff) { *val_p = (((uint64_t)up[1]<<48) | ((uint64_t)up[2]<<40) | ((uint64_t)up[3]<<32) | ((uint64_t)up[4]<<24) | ((uint64_t)up[5]<<16) | ((uint64_t)up[6]<< 8) | (uint64_t)up[7]) & ((1LL<<(7*8))-1); return 8; } else { *val_p = (((uint64_t)up[1]<<56) | ((uint64_t)up[2]<<48) | ((uint64_t)up[3]<<40) | ((uint64_t)up[4]<<32) | ((uint64_t)up[5]<<24) | ((uint64_t)up[6]<<16) | ((uint64_t)up[7]<< 8) | (uint64_t)up[8]); return 9; } } int ltf8_decode(cram_fd *fd, int64_t *val_p) { int c = hgetc(fd->fp); int64_t val = (unsigned char)c; if (c == -1) return -1; if (val < 0x80) { *val_p = val; return 1; } else if (val < 0xc0) { val = (val<<8) | (unsigned char)hgetc(fd->fp); *val_p = val & (((1LL<<(6+8)))-1); return 2; } else if (val < 0xe0) { val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); *val_p = val & ((1LL<<(5+2*8))-1); return 3; } else if (val < 0xf0) { val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); *val_p = val & ((1LL<<(4+3*8))-1); return 4; } else if (val < 0xf8) { val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); *val_p = val & ((1LL<<(3+4*8))-1); return 5; } else if (val < 0xfc) { val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); *val_p = val & ((1LL<<(2+5*8))-1); return 6; } else if (val < 0xfe) { val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); *val_p = val & ((1LL<<(1+6*8))-1); return 7; } else if (val < 0xff) { val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); *val_p = val & ((1LL<<(7*8))-1); return 8; } else { val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); val = (val<<8) | (unsigned char)hgetc(fd->fp); *val_p = val; } return 9; } /* * Pushes a value in ITF8 format onto the end of a block. * This shouldn't be used for high-volume data as it is not the fastest * method. * * Returns the number of bytes written */ int itf8_put_blk(cram_block *blk, int val) { char buf[5]; int sz; sz = itf8_put(buf, val); BLOCK_APPEND(blk, buf, sz); return sz; } /* * Decodes a 32-bit little endian value from fd and stores in val. * * Returns the number of bytes read on success * -1 on failure */ int int32_decode(cram_fd *fd, int32_t *val) { int32_t i; if (4 != hread(fd->fp, &i, 4)) return -1; *val = le_int4(i); return 4; } /* * Encodes a 32-bit little endian value 'val' and writes to fd. * * Returns the number of bytes written on success * -1 on failure */ int int32_encode(cram_fd *fd, int32_t val) { val = le_int4(val); if (4 != hwrite(fd->fp, &val, 4)) return -1; return 4; } /* As int32_decoded/encode, but from/to blocks instead of cram_fd */ int int32_get(cram_block *b, int32_t *val) { if (b->uncomp_size - BLOCK_SIZE(b) < 4) return -1; *val = b->data[b->byte ] | (b->data[b->byte+1] << 8) | (b->data[b->byte+2] << 16) | (b->data[b->byte+3] << 24); BLOCK_SIZE(b) += 4; return 4; } /* As int32_decoded/encode, but from/to blocks instead of cram_fd */ int int32_put(cram_block *b, int32_t val) { unsigned char cp[4]; cp[0] = ( val & 0xff); cp[1] = ((val>>8) & 0xff); cp[2] = ((val>>16) & 0xff); cp[3] = ((val>>24) & 0xff); BLOCK_APPEND(b, cp, 4); return b->data ? 0 : -1; } /* ---------------------------------------------------------------------- * zlib compression code - from Gap5's tg_iface_g.c * They're static here as they're only used within the cram_compress_block * and cram_uncompress_block functions, which are the external interface. */ char *zlib_mem_inflate(char *cdata, size_t csize, size_t *size) { z_stream s; unsigned char *data = NULL; /* Uncompressed output */ int data_alloc = 0; int err; /* Starting point at uncompressed size, and scale after that */ data = malloc(data_alloc = csize*1.2+100); if (!data) return NULL; /* Initialise zlib stream */ s.zalloc = Z_NULL; /* use default allocation functions */ s.zfree = Z_NULL; s.opaque = Z_NULL; s.next_in = (unsigned char *)cdata; s.avail_in = csize; s.total_in = 0; s.next_out = data; s.avail_out = data_alloc; s.total_out = 0; //err = inflateInit(&s); err = inflateInit2(&s, 15 + 32); if (err != Z_OK) { fprintf(stderr, "zlib inflateInit error: %s\n", s.msg); free(data); return NULL; } /* Decode to 'data' array */ for (;s.avail_in;) { unsigned char *data_tmp; int alloc_inc; s.next_out = &data[s.total_out]; err = inflate(&s, Z_NO_FLUSH); if (err == Z_STREAM_END) break; if (err != Z_OK) { fprintf(stderr, "zlib inflate error: %s\n", s.msg); break; } /* More to come, so realloc based on growth so far */ alloc_inc = (double)s.avail_in/s.total_in * s.total_out + 100; data = realloc((data_tmp = data), data_alloc += alloc_inc); if (!data) { free(data_tmp); return NULL; } s.avail_out += alloc_inc; } inflateEnd(&s); *size = s.total_out; return (char *)data; } static char *zlib_mem_deflate(char *data, size_t size, size_t *cdata_size, int level, int strat) { z_stream s; unsigned char *cdata = NULL; /* Compressed output */ int cdata_alloc = 0; int cdata_pos = 0; int err; cdata = malloc(cdata_alloc = size*1.05+100); if (!cdata) return NULL; cdata_pos = 0; /* Initialise zlib stream */ s.zalloc = Z_NULL; /* use default allocation functions */ s.zfree = Z_NULL; s.opaque = Z_NULL; s.next_in = (unsigned char *)data; s.avail_in = size; s.total_in = 0; s.next_out = cdata; s.avail_out = cdata_alloc; s.total_out = 0; s.data_type = Z_BINARY; err = deflateInit2(&s, level, Z_DEFLATED, 15|16, 9, strat); if (err != Z_OK) { fprintf(stderr, "zlib deflateInit2 error: %s\n", s.msg); return NULL; } /* Encode to 'cdata' array */ for (;s.avail_in;) { s.next_out = &cdata[cdata_pos]; s.avail_out = cdata_alloc - cdata_pos; if (cdata_alloc - cdata_pos <= 0) { fprintf(stderr, "Deflate produced larger output than expected. Abort\n"); return NULL; } err = deflate(&s, Z_NO_FLUSH); cdata_pos = cdata_alloc - s.avail_out; if (err != Z_OK) { fprintf(stderr, "zlib deflate error: %s\n", s.msg); break; } } if (deflate(&s, Z_FINISH) != Z_STREAM_END) { fprintf(stderr, "zlib deflate error: %s\n", s.msg); } *cdata_size = s.total_out; if (deflateEnd(&s) != Z_OK) { fprintf(stderr, "zlib deflate error: %s\n", s.msg); } return (char *)cdata; } #ifdef HAVE_LIBLZMA /* ------------------------------------------------------------------------ */ /* * Data compression routines using liblzma (xz) * * On a test set this shrunk the main db from 136157104 bytes to 114796168, but * caused tg_index to grow from 2m43.707s to 15m3.961s. Exporting as bfastq * went from 18.3s to 36.3s. So decompression suffers too, but not as bad * as compression times. * * For now we disable this functionality. If it's to be reenabled make sure you * improve the mem_inflate implementation as it's just a test hack at the * moment. */ static char *lzma_mem_deflate(char *data, size_t size, size_t *cdata_size, int level) { char *out; size_t out_size = lzma_stream_buffer_bound(size); *cdata_size = 0; out = malloc(out_size); /* Single call compression */ if (LZMA_OK != lzma_easy_buffer_encode(level, LZMA_CHECK_CRC32, NULL, (uint8_t *)data, size, (uint8_t *)out, cdata_size, out_size)) return NULL; return out; } static char *lzma_mem_inflate(char *cdata, size_t csize, size_t *size) { lzma_stream strm = LZMA_STREAM_INIT; size_t out_size = 0, out_pos = 0; char *out = NULL; int r; /* Initiate the decoder */ if (LZMA_OK != lzma_stream_decoder(&strm, 50000000, 0)) return NULL; /* Decode loop */ strm.avail_in = csize; strm.next_in = (uint8_t *)cdata; for (;strm.avail_in;) { if (strm.avail_in > out_size - out_pos) { out_size += strm.avail_in * 4 + 32768; out = realloc(out, out_size); } strm.avail_out = out_size - out_pos; strm.next_out = (uint8_t *)&out[out_pos]; r = lzma_code(&strm, LZMA_RUN); if (LZMA_OK != r && LZMA_STREAM_END != r) { fprintf(stderr, "r=%d\n", r); fprintf(stderr, "mem=%"PRId64"d\n", (int64_t)lzma_memusage(&strm)); return NULL; } out_pos = strm.total_out; if (r == LZMA_STREAM_END) break; } /* finish up any unflushed data; necessary? */ r = lzma_code(&strm, LZMA_FINISH); if (r != LZMA_OK && r != LZMA_STREAM_END) { fprintf(stderr, "r=%d\n", r); return NULL; } out = realloc(out, strm.total_out); *size = strm.total_out; lzma_end(&strm); return out; } #endif /* ---------------------------------------------------------------------- * CRAM blocks - the dynamically growable data block. We have code to * create, update, (un)compress and read/write. * * These are derived from the deflate_interlaced.c blocks, but with the * CRAM extension of content types and IDs. */ /* * Allocates a new cram_block structure with a specified content_type and * id. * * Returns block pointer on success * NULL on failure */ cram_block *cram_new_block(enum cram_content_type content_type, int content_id) { cram_block *b = malloc(sizeof(*b)); if (!b) return NULL; b->method = b->orig_method = RAW; b->content_type = content_type; b->content_id = content_id; b->comp_size = 0; b->uncomp_size = 0; b->data = NULL; b->alloc = 0; b->byte = 0; b->bit = 7; // MSB return b; } /* * Reads a block from a cram file. * Returns cram_block pointer on success. * NULL on failure */ cram_block *cram_read_block(cram_fd *fd) { cram_block *b = malloc(sizeof(*b)); if (!b) return NULL; //fprintf(stderr, "Block at %d\n", (int)ftell(fd->fp)); if (-1 == (b->method = hgetc(fd->fp))) { free(b); return NULL; } if (-1 == (b->content_type= hgetc(fd->fp))) { free(b); return NULL; } if (-1 == itf8_decode(fd, &b->content_id)) { free(b); return NULL; } if (-1 == itf8_decode(fd, &b->comp_size)) { free(b); return NULL; } if (-1 == itf8_decode(fd, &b->uncomp_size)) { free(b); return NULL; } // fprintf(stderr, " method %d, ctype %d, cid %d, csize %d, ucsize %d\n", // b->method, b->content_type, b->content_id, b->comp_size, b->uncomp_size); if (b->method == RAW) { b->alloc = b->uncomp_size; if (!(b->data = malloc(b->uncomp_size))){ free(b); return NULL; } if (b->uncomp_size != hread(fd->fp, b->data, b->uncomp_size)) { free(b->data); free(b); return NULL; } } else { b->alloc = b->comp_size; if (!(b->data = malloc(b->comp_size))) { free(b); return NULL; } if (b->comp_size != hread(fd->fp, b->data, b->comp_size)) { free(b->data); free(b); return NULL; } } if (CRAM_MAJOR_VERS(fd->version) >= 3) { unsigned char dat[100], *cp = dat;; uint32_t crc; if (-1 == int32_decode(fd, (int32_t *)&b->crc32)) { free(b); return NULL; } *cp++ = b->method; *cp++ = b->content_type; cp += itf8_put(cp, b->content_id); cp += itf8_put(cp, b->comp_size); cp += itf8_put(cp, b->uncomp_size); crc = crc32(0L, dat, cp-dat); crc = crc32(crc, b->data ? b->data : (uc *)"", b->alloc); if (crc != b->crc32) { fprintf(stderr, "Block CRC32 failure\n"); free(b->data); free(b); return NULL; } } b->orig_method = b->method; b->idx = 0; b->byte = 0; b->bit = 7; // MSB return b; } /* * Writes a CRAM block. * Returns 0 on success * -1 on failure */ int cram_write_block(cram_fd *fd, cram_block *b) { assert(b->method != RAW || (b->comp_size == b->uncomp_size)); if (hputc(b->method, fd->fp) == EOF) return -1; if (hputc(b->content_type, fd->fp) == EOF) return -1; if (itf8_encode(fd, b->content_id) == -1) return -1; if (itf8_encode(fd, b->comp_size) == -1) return -1; if (itf8_encode(fd, b->uncomp_size) == -1) return -1; if (b->method == RAW) { if (b->uncomp_size != hwrite(fd->fp, b->data, b->uncomp_size)) return -1; } else { if (b->comp_size != hwrite(fd->fp, b->data, b->comp_size)) return -1; } if (CRAM_MAJOR_VERS(fd->version) >= 3) { unsigned char dat[100], *cp = dat;; uint32_t crc; *cp++ = b->method; *cp++ = b->content_type; cp += itf8_put(cp, b->content_id); cp += itf8_put(cp, b->comp_size); cp += itf8_put(cp, b->uncomp_size); crc = crc32(0L, dat, cp-dat); if (b->method == RAW) { b->crc32 = crc32(crc, b->data ? b->data : (uc*)"", b->uncomp_size); } else { b->crc32 = crc32(crc, b->data ? b->data : (uc*)"", b->comp_size); } if (-1 == int32_encode(fd, b->crc32)) return -1; } return 0; } /* * Frees a CRAM block, deallocating internal data too. */ void cram_free_block(cram_block *b) { if (!b) return; if (b->data) free(b->data); free(b); } /* * Uncompresses a CRAM block, if compressed. */ int cram_uncompress_block(cram_block *b) { char *uncomp; size_t uncomp_size = 0; if (b->uncomp_size == 0) { // blank block b->method = RAW; return 0; } switch (b->method) { case RAW: return 0; case GZIP: uncomp = zlib_mem_inflate((char *)b->data, b->comp_size, &uncomp_size); if (!uncomp) return -1; if ((int)uncomp_size != b->uncomp_size) { free(uncomp); return -1; } free(b->data); b->data = (unsigned char *)uncomp; b->alloc = uncomp_size; b->method = RAW; break; #ifdef HAVE_LIBBZ2 case BZIP2: { unsigned int usize = b->uncomp_size; if (!(uncomp = malloc(usize))) return -1; if (BZ_OK != BZ2_bzBuffToBuffDecompress(uncomp, &usize, (char *)b->data, b->comp_size, 0, 0)) { free(uncomp); return -1; } free(b->data); b->data = (unsigned char *)uncomp; b->alloc = usize; b->method = RAW; b->uncomp_size = usize; // Just incase it differs break; } #else case BZIP2: fprintf(stderr, "Bzip2 compression is not compiled into this " "version.\nPlease rebuild and try again.\n"); return -1; #endif #ifdef HAVE_LIBLZMA case LZMA: uncomp = lzma_mem_inflate((char *)b->data, b->comp_size, &uncomp_size); if (!uncomp) return -1; if ((int)uncomp_size != b->uncomp_size) return -1; free(b->data); b->data = (unsigned char *)uncomp; b->alloc = uncomp_size; b->method = RAW; break; #else case LZMA: fprintf(stderr, "Lzma compression is not compiled into this " "version.\nPlease rebuild and try again.\n"); return -1; break; #endif case RANS: { unsigned int usize = b->uncomp_size, usize2; uncomp = (char *)rans_uncompress(b->data, b->comp_size, &usize2); assert(usize == usize2); free(b->data); b->data = (unsigned char *)uncomp; b->alloc = usize2; b->method = RAW; b->uncomp_size = usize2; // Just incase it differs //fprintf(stderr, "Expanded %d to %d\n", b->comp_size, b->uncomp_size); break; } default: return -1; } return 0; } static char *cram_compress_by_method(char *in, size_t in_size, size_t *out_size, enum cram_block_method method, int level, int strat) { switch (method) { case GZIP: return zlib_mem_deflate(in, in_size, out_size, level, strat); case BZIP2: { #ifdef HAVE_LIBBZ2 unsigned int comp_size = in_size*1.01 + 600; char *comp = malloc(comp_size); if (!comp) return NULL; if (BZ_OK != BZ2_bzBuffToBuffCompress(comp, &comp_size, in, in_size, level, 0, 30)) { free(comp); return NULL; } *out_size = comp_size; return comp; #else return NULL; #endif } case LZMA: #ifdef HAVE_LIBLZMA return lzma_mem_deflate(in, in_size, out_size, level); #else return NULL; #endif case RANS0: { unsigned int out_size_i; unsigned char *cp; cp = rans_compress((unsigned char *)in, in_size, &out_size_i, 0); *out_size = out_size_i; return (char *)cp; } case RANS1: { unsigned int out_size_i; unsigned char *cp; cp = rans_compress((unsigned char *)in, in_size, &out_size_i, 1); *out_size = out_size_i; return (char *)cp; } case RAW: break; default: return NULL; } return NULL; } /* * Compresses a block using one of two different zlib strategies. If we only * want one choice set strat2 to be -1. * * The logic here is that sometimes Z_RLE does a better job than Z_FILTERED * or Z_DEFAULT_STRATEGY on quality data. If so, we'd rather use it as it is * significantly faster. */ int cram_compress_block(cram_fd *fd, cram_block *b, cram_metrics *metrics, int method, int level) { char *comp = NULL; size_t comp_size = 0; int strat; //fprintf(stderr, "IN: block %d, sz %d\n", b->content_id, b->uncomp_size); if (method == RAW || level == 0 || b->uncomp_size == 0) { b->method = RAW; b->comp_size = b->uncomp_size; //fprintf(stderr, "Skip block id %d\n", b->content_id); return 0; } if (metrics) { pthread_mutex_lock(&fd->metrics_lock); if (metrics->trial > 0 || --metrics->next_trial <= 0) { size_t sz_best = INT_MAX; size_t sz_gz_rle = 0; size_t sz_gz_def = 0; size_t sz_rans0 = 0; size_t sz_rans1 = 0; size_t sz_bzip2 = 0; size_t sz_lzma = 0; int method_best = 0; char *c_best = NULL, *c = NULL; if (metrics->revised_method) method = metrics->revised_method; else metrics->revised_method = method; if (metrics->next_trial == 0) { metrics->next_trial = TRIAL_SPAN; metrics->trial = NTRIALS; metrics->sz_gz_rle /= 2; metrics->sz_gz_def /= 2; metrics->sz_rans0 /= 2; metrics->sz_rans1 /= 2; metrics->sz_bzip2 /= 2; metrics->sz_lzma /= 2; } pthread_mutex_unlock(&fd->metrics_lock); if (method & (1<<GZIP_RLE)) { c = cram_compress_by_method((char *)b->data, b->uncomp_size, &sz_gz_rle, GZIP, 1, Z_RLE); if (c && sz_best > sz_gz_rle) { sz_best = sz_gz_rle; method_best = GZIP_RLE; if (c_best) free(c_best); c_best = c; } else if (c) { free(c); } else { sz_gz_rle = b->uncomp_size*2+1000; } //fprintf(stderr, "Block %d; %d->%d\n", b->content_id, b->uncomp_size, sz_gz_rle); } if (method & (1<<GZIP)) { c = cram_compress_by_method((char *)b->data, b->uncomp_size, &sz_gz_def, GZIP, level, Z_FILTERED); if (c && sz_best > sz_gz_def) { sz_best = sz_gz_def; method_best = GZIP; if (c_best) free(c_best); c_best = c; } else if (c) { free(c); } else { sz_gz_def = b->uncomp_size*2+1000; } //fprintf(stderr, "Block %d; %d->%d\n", b->content_id, b->uncomp_size, sz_gz_def); } if (method & (1<<RANS0)) { c = cram_compress_by_method((char *)b->data, b->uncomp_size, &sz_rans0, RANS0, 0, 0); if (c && sz_best > sz_rans0) { sz_best = sz_rans0; method_best = RANS0; if (c_best) free(c_best); c_best = c; } else if (c) { free(c); } else { sz_rans0 = b->uncomp_size*2+1000; } } if (method & (1<<RANS1)) { c = cram_compress_by_method((char *)b->data, b->uncomp_size, &sz_rans1, RANS1, 0, 0); if (c && sz_best > sz_rans1) { sz_best = sz_rans1; method_best = RANS1; if (c_best) free(c_best); c_best = c; } else if (c) { free(c); } else { sz_rans1 = b->uncomp_size*2+1000; } } if (method & (1<<BZIP2)) { c = cram_compress_by_method((char *)b->data, b->uncomp_size, &sz_bzip2, BZIP2, level, 0); if (c && sz_best > sz_bzip2) { sz_best = sz_bzip2; method_best = BZIP2; if (c_best) free(c_best); c_best = c; } else if (c) { free(c); } else { sz_bzip2 = b->uncomp_size*2+1000; } } if (method & (1<<LZMA)) { c = cram_compress_by_method((char *)b->data, b->uncomp_size, &sz_lzma, LZMA, level, 0); if (c && sz_best > sz_lzma) { sz_best = sz_lzma; method_best = LZMA; if (c_best) free(c_best); c_best = c; } else if (c) { free(c); } else { sz_lzma = b->uncomp_size*2+1000; } } //fprintf(stderr, "sz_best = %d\n", sz_best); free(b->data); b->data = (unsigned char *)c_best; //printf("method_best = %s\n", cram_block_method2str(method_best)); b->method = method_best == GZIP_RLE ? GZIP : method_best; b->comp_size = sz_best; pthread_mutex_lock(&fd->metrics_lock); metrics->sz_gz_rle += sz_gz_rle; metrics->sz_gz_def += sz_gz_def; metrics->sz_rans0 += sz_rans0; metrics->sz_rans1 += sz_rans1; metrics->sz_bzip2 += sz_bzip2; metrics->sz_lzma += sz_lzma; if (--metrics->trial == 0) { int best_method = RAW; int best_sz = INT_MAX; // Scale methods by cost if (fd->level <= 3) { metrics->sz_rans1 *= 1.02; metrics->sz_gz_def *= 1.04; metrics->sz_bzip2 *= 1.08; metrics->sz_lzma *= 1.10; } else if (fd->level <= 6) { metrics->sz_rans1 *= 1.01; metrics->sz_gz_def *= 1.02; metrics->sz_bzip2 *= 1.03; metrics->sz_lzma *= 1.05; } if (method & (1<<GZIP_RLE) && best_sz > metrics->sz_gz_rle) best_sz = metrics->sz_gz_rle, best_method = GZIP_RLE; if (method & (1<<GZIP) && best_sz > metrics->sz_gz_def) best_sz = metrics->sz_gz_def, best_method = GZIP; if (method & (1<<RANS0) && best_sz > metrics->sz_rans0) best_sz = metrics->sz_rans0, best_method = RANS0; if (method & (1<<RANS1) && best_sz > metrics->sz_rans1) best_sz = metrics->sz_rans1, best_method = RANS1; if (method & (1<<BZIP2) && best_sz > metrics->sz_bzip2) best_sz = metrics->sz_bzip2, best_method = BZIP2; if (method & (1<<LZMA) && best_sz > metrics->sz_lzma) best_sz = metrics->sz_lzma, best_method = LZMA; if (best_method == GZIP_RLE) { metrics->method = GZIP; metrics->strat = Z_RLE; } else { metrics->method = best_method; metrics->strat = Z_FILTERED; } // If we see at least MAXFAIL trials in a row for a specific // compression method with more than MAXDELTA aggregate // size then we drop this from the list of methods used // for this block type. #define MAXDELTA 0.20 #define MAXFAILS 4 if (best_method == GZIP_RLE) { metrics->gz_rle_cnt = 0; metrics->gz_rle_extra = 0; } else if (best_sz < metrics->sz_gz_rle) { double r = (double)metrics->sz_gz_rle / best_sz - 1; if (++metrics->gz_rle_cnt >= MAXFAILS && (metrics->gz_rle_extra += r) >= MAXDELTA) method &= ~(1<<GZIP_RLE); } if (best_method == GZIP) { metrics->gz_def_cnt = 0; metrics->gz_def_extra = 0; } else if (best_sz < metrics->sz_gz_def) { double r = (double)metrics->sz_gz_def / best_sz - 1; if (++metrics->gz_def_cnt >= MAXFAILS && (metrics->gz_def_extra += r) >= MAXDELTA) method &= ~(1<<GZIP); } if (best_method == RANS0) { metrics->rans0_cnt = 0; metrics->rans0_extra = 0; } else if (best_sz < metrics->sz_rans0) { double r = (double)metrics->sz_rans0 / best_sz - 1; if (++metrics->rans0_cnt >= MAXFAILS && (metrics->rans0_extra += r) >= MAXDELTA) method &= ~(1<<RANS0); } if (best_method == RANS1) { metrics->rans1_cnt = 0; metrics->rans1_extra = 0; } else if (best_sz < metrics->sz_rans1) { double r = (double)metrics->sz_rans1 / best_sz - 1; if (++metrics->rans1_cnt >= MAXFAILS && (metrics->rans1_extra += r) >= MAXDELTA) method &= ~(1<<RANS1); } if (best_method == BZIP2) { metrics->bzip2_cnt = 0; metrics->bzip2_extra = 0; } else if (best_sz < metrics->sz_bzip2) { double r = (double)metrics->sz_bzip2 / best_sz - 1; if (++metrics->bzip2_cnt >= MAXFAILS && (metrics->bzip2_extra += r) >= MAXDELTA) method &= ~(1<<BZIP2); } if (best_method == LZMA) { metrics->lzma_cnt = 0; metrics->lzma_extra = 0; } else if (best_sz < metrics->sz_lzma) { double r = (double)metrics->sz_lzma / best_sz - 1; if (++metrics->lzma_cnt >= MAXFAILS && (metrics->lzma_extra += r) >= MAXDELTA) method &= ~(1<<LZMA); } //if (method != metrics->revised_method) // fprintf(stderr, "%d: method from %x to %x\n", // b->content_id, metrics->revised_method, method); metrics->revised_method = method; } pthread_mutex_unlock(&fd->metrics_lock); } else { strat = metrics->strat; method = metrics->method; pthread_mutex_unlock(&fd->metrics_lock); comp = cram_compress_by_method((char *)b->data, b->uncomp_size, &comp_size, method, level, strat); if (!comp) return -1; free(b->data); b->data = (unsigned char *)comp; b->comp_size = comp_size; b->method = method; } } else { // no cached metrics, so just do zlib? comp = cram_compress_by_method((char *)b->data, b->uncomp_size, &comp_size, GZIP, level, Z_FILTERED); if (!comp) { fprintf(stderr, "Compression failed!\n"); return -1; } free(b->data); b->data = (unsigned char *)comp; b->comp_size = comp_size; b->method = GZIP; } if (fd->verbose) fprintf(stderr, "Compressed block ID %d from %d to %d by method %s\n", b->content_id, b->uncomp_size, b->comp_size, cram_block_method2str(b->method)); if (b->method == RANS1) b->method = RANS0; // Spec just has RANS (not 0/1) with auto-sensing return 0; } cram_metrics *cram_new_metrics(void) { cram_metrics *m = calloc(1, sizeof(*m)); if (!m) return NULL; m->trial = NTRIALS-1; m->next_trial = TRIAL_SPAN; m->method = RAW; m->strat = 0; m->revised_method = 0; return m; } char *cram_block_method2str(enum cram_block_method m) { switch(m) { case RAW: return "RAW"; case GZIP: return "GZIP"; case BZIP2: return "BZIP2"; case LZMA: return "LZMA"; case RANS0: return "RANS0"; case RANS1: return "RANS1"; case GZIP_RLE: return "GZIP_RLE"; case ERROR: break; } return "?"; } char *cram_content_type2str(enum cram_content_type t) { switch (t) { case FILE_HEADER: return "FILE_HEADER"; case COMPRESSION_HEADER: return "COMPRESSION_HEADER"; case MAPPED_SLICE: return "MAPPED_SLICE"; case UNMAPPED_SLICE: return "UNMAPPED_SLICE"; case EXTERNAL: return "EXTERNAL"; case CORE: return "CORE"; case CT_ERROR: break; } return "?"; } /* * Extra error checking on fclose to really ensure data is written. * Care needs to be taken to handle pipes vs real files. * * Returns 0 on success * -1 on failure. */ int paranoid_fclose(FILE *fp) { if (-1 == fflush(fp) && errno != EBADF) { fclose(fp); return -1; } errno = 0; if (-1 == fsync(fileno(fp))) { if (errno != EINVAL) { // eg pipe fclose(fp); return -1; } } return fclose(fp); } /* ---------------------------------------------------------------------- * Reference sequence handling * * These revolve around the refs_t structure, which may potentially be * shared between multiple cram_fd. * * We start with refs_create() to allocate an empty refs_t and then * populate it with @SQ line data using refs_from_header(). This is done on * cram_open(). Also at start up we can call cram_load_reference() which * is used with "scramble -r foo.fa". This replaces the fd->refs with the * new one specified. In either case refs2id() is then called which * maps ref_entry names to @SQ ids (refs_t->ref_id[]). * * Later, possibly within a thread, we will want to know the actual ref * seq itself, obtained by calling cram_get_ref(). This may use the * UR: or M5: fields or the filename specified in the original * cram_load_reference() call. * * Given the potential for multi-threaded reference usage, we have * reference counting (sorry for the confusing double use of "ref") to * track the number of callers interested in any specific reference. */ void refs_free(refs_t *r) { RP("refs_free()\n"); if (--r->count > 0) return; if (!r) return; if (r->pool) string_pool_destroy(r->pool); if (r->h_meta) { khint_t k; for (k = kh_begin(r->h_meta); k != kh_end(r->h_meta); k++) { ref_entry *e; if (!kh_exist(r->h_meta, k)) continue; if (!(e = kh_val(r->h_meta, k))) continue; if (e->seq) free(e->seq); free(e); } kh_destroy(refs, r->h_meta); } if (r->ref_id) free(r->ref_id); if (r->fp) bgzf_close(r->fp); pthread_mutex_destroy(&r->lock); free(r); } static refs_t *refs_create(void) { refs_t *r = calloc(1, sizeof(*r)); RP("refs_create()\n"); if (!r) return NULL; if (!(r->pool = string_pool_create(8192))) goto err; r->ref_id = NULL; // see refs2id() to populate. r->count = 1; r->last = NULL; r->last_id = -1; if (!(r->h_meta = kh_init(refs))) goto err; pthread_mutex_init(&r->lock, NULL); return r; err: refs_free(r); return NULL; } /* * Opens a reference fasta file as a BGZF stream, allowing for * compressed files. It automatically builds a .fai file if * required and if compressed a .gzi bgzf index too. * * Returns a BGZF handle on success; * NULL on failure. */ static BGZF *bgzf_open_ref(char *fn, char *mode) { BGZF *fp; char fai_file[PATH_MAX]; snprintf(fai_file, PATH_MAX, "%s.fai", fn); if (access(fai_file, R_OK) != 0) if (fai_build(fn) != 0) return NULL; if (!(fp = bgzf_open(fn, mode))) { perror(fn); return NULL; } if (fp->is_compressed == 1 && bgzf_index_load(fp, fn, ".gzi") < 0) { fprintf(stderr, "Unable to load .gzi index '%s.gzi'\n", fn); bgzf_close(fp); return NULL; } return fp; } /* * Loads a FAI file for a reference.fasta. * "is_err" indicates whether failure to load is worthy of emitting an * error message. In some cases (eg with embedded references) we * speculatively load, just incase, and silently ignore errors. * * Returns the refs_t struct on success (maybe newly allocated); * NULL on failure */ static refs_t *refs_load_fai(refs_t *r_orig, char *fn, int is_err) { struct stat sb; FILE *fp = NULL; char fai_fn[PATH_MAX]; char line[8192]; refs_t *r = r_orig; size_t fn_l = strlen(fn); int id = 0, id_alloc = 0; RP("refs_load_fai %s\n", fn); if (!r) if (!(r = refs_create())) goto err; /* Open reference, for later use */ if (stat(fn, &sb) != 0) { if (is_err) perror(fn); goto err; } if (r->fp) if (bgzf_close(r->fp) != 0) goto err; r->fp = NULL; if (!(r->fn = string_dup(r->pool, fn))) goto err; if (fn_l > 4 && strcmp(&fn[fn_l-4], ".fai") == 0) r->fn[fn_l-4] = 0; if (!(r->fp = bgzf_open_ref(r->fn, "r"))) goto err; /* Parse .fai file and load meta-data */ sprintf(fai_fn, "%.*s.fai", PATH_MAX-5, r->fn); if (stat(fai_fn, &sb) != 0) { if (is_err) perror(fai_fn); goto err; } if (!(fp = fopen(fai_fn, "r"))) { if (is_err) perror(fai_fn); goto err; } while (fgets(line, 8192, fp) != NULL) { ref_entry *e = malloc(sizeof(*e)); char *cp; int n; khint_t k; if (!e) return NULL; // id for (cp = line; *cp && !isspace(*cp); cp++) ; *cp++ = 0; e->name = string_dup(r->pool, line); // length while (*cp && isspace(*cp)) cp++; e->length = strtoll(cp, &cp, 10); // offset while (*cp && isspace(*cp)) cp++; e->offset = strtoll(cp, &cp, 10); // bases per line while (*cp && isspace(*cp)) cp++; e->bases_per_line = strtol(cp, &cp, 10); // line length while (*cp && isspace(*cp)) cp++; e->line_length = strtol(cp, &cp, 10); // filename e->fn = r->fn; e->count = 0; e->seq = NULL; k = kh_put(refs, r->h_meta, e->name, &n); if (-1 == n) { free(e); return NULL; } if (n) { kh_val(r->h_meta, k) = e; } else { ref_entry *re = kh_val(r->h_meta, k); if (re && (re->count != 0 || re->length != 0)) { /* Keep old */ free(e); } else { /* Replace old */ if (re) free(re); kh_val(r->h_meta, k) = e; } } if (id >= id_alloc) { int x; id_alloc = id_alloc ?id_alloc*2 : 16; r->ref_id = realloc(r->ref_id, id_alloc * sizeof(*r->ref_id)); for (x = id; x < id_alloc; x++) r->ref_id[x] = NULL; } r->ref_id[id] = e; r->nref = ++id; } return r; err: if (fp) fclose(fp); if (!r_orig) refs_free(r); return NULL; } /* * Indexes references by the order they appear in a BAM file. This may not * necessarily be the same order they appear in the fasta reference file. * * Returns 0 on success * -1 on failure */ int refs2id(refs_t *r, SAM_hdr *h) { int i; if (r->ref_id) free(r->ref_id); if (r->last) r->last = NULL; r->ref_id = calloc(h->nref, sizeof(*r->ref_id)); if (!r->ref_id) return -1; r->nref = h->nref; for (i = 0; i < h->nref; i++) { khint_t k = kh_get(refs, r->h_meta, h->ref[i].name); if (k != kh_end(r->h_meta)) { r->ref_id[i] = kh_val(r->h_meta, k); } else { fprintf(stderr, "Unable to find ref name '%s'\n", h->ref[i].name); } } return 0; } /* * Generates refs_t entries based on @SQ lines in the header. * Returns 0 on success * -1 on failure */ static int refs_from_header(refs_t *r, cram_fd *fd, SAM_hdr *h) { int i, j; if (!h || h->nref == 0) return 0; //fprintf(stderr, "refs_from_header for %p mode %c\n", fd, fd->mode); /* Existing refs are fine, as long as they're compatible with the hdr. */ if (!(r->ref_id = realloc(r->ref_id, (r->nref + h->nref) * sizeof(*r->ref_id)))) return -1; /* Copy info from h->ref[i] over to r */ for (i = 0, j = r->nref; i < h->nref; i++) { SAM_hdr_type *ty; SAM_hdr_tag *tag; khint_t k; int n; k = kh_get(refs, r->h_meta, h->ref[i].name); if (k != kh_end(r->h_meta)) // Ref already known about continue; if (!(r->ref_id[j] = calloc(1, sizeof(ref_entry)))) return -1; if (!h->ref[j].name) return -1; r->ref_id[j]->name = string_dup(r->pool, h->ref[i].name); r->ref_id[j]->length = 0; // marker for not yet loaded /* Initialise likely filename if known */ if ((ty = sam_hdr_find(h, "SQ", "SN", h->ref[i].name))) { if ((tag = sam_hdr_find_key(h, ty, "M5", NULL))) { r->ref_id[j]->fn = string_dup(r->pool, tag->str+3); //fprintf(stderr, "Tagging @SQ %s / %s\n", r->ref_id[h]->name, r->ref_id[h]->fn); } } k = kh_put(refs, r->h_meta, r->ref_id[j]->name, &n); if (n <= 0) // already exists or error return -1; kh_val(r->h_meta, k) = r->ref_id[j]; j++; } r->nref = j; return 0; } /* * Attaches a header to a cram_fd. * * This should be used when creating a new cram_fd for writing where * we have an SAM_hdr already constructed (eg from a file we've read * in). */ int cram_set_header(cram_fd *fd, SAM_hdr *hdr) { if (fd->header) sam_hdr_free(fd->header); fd->header = hdr; return refs_from_header(fd->refs, fd, hdr); } /* * Converts a directory and a filename into an expanded path, replacing %s * in directory with the filename and %[0-9]+s with portions of the filename * Any remaining parts of filename are added to the end with /%s. */ void expand_cache_path(char *path, char *dir, char *fn) { char *cp; while ((cp = strchr(dir, '%'))) { strncpy(path, dir, cp-dir); path += cp-dir; if (*++cp == 's') { strcpy(path, fn); path += strlen(fn); fn += strlen(fn); cp++; } else if (*cp >= '0' && *cp <= '9') { char *endp; long l; l = strtol(cp, &endp, 10); l = MIN(l, strlen(fn)); if (*endp == 's') { strncpy(path, fn, l); path += l; fn += l; *path = 0; cp = endp+1; } else { *path++ = '%'; *path++ = *cp++; } } else { *path++ = '%'; *path++ = *cp++; } dir = cp; } strcpy(path, dir); path += strlen(dir); if (*fn && path[-1] != '/') *path++ = '/'; strcpy(path, fn); } /* * Make the directory containing path and any prefix directories. */ void mkdir_prefix(char *path, int mode) { char *cp = strrchr(path, '/'); if (!cp) return; *cp = 0; if (is_directory(path)) { *cp = '/'; return; } if (mkdir(path, mode) == 0) { chmod(path, mode); *cp = '/'; return; } mkdir_prefix(path, mode); mkdir(path, mode); chmod(path, mode); *cp = '/'; } /* * Return the cache directory to use, based on the first of these * environment variables to be set to a non-empty value. */ static const char *get_cache_basedir(const char **extra) { char *base; *extra = ""; base = getenv("XDG_CACHE_HOME"); if (base && *base) return base; base = getenv("HOME"); if (base && *base) { *extra = "/.cache"; return base; } base = getenv("TMPDIR"); if (base && *base) return base; base = getenv("TEMP"); if (base && *base) return base; return "/tmp"; } /* * Queries the M5 string from the header and attempts to populate the * reference from this using the REF_PATH environment. * * Returns 0 on sucess * -1 on failure */ static int cram_populate_ref(cram_fd *fd, int id, ref_entry *r) { char *ref_path = getenv("REF_PATH"); SAM_hdr_type *ty; SAM_hdr_tag *tag; char path[PATH_MAX], path_tmp[PATH_MAX], cache[PATH_MAX]; char *local_cache = getenv("REF_CACHE"); mFILE *mf; if (fd->verbose) fprintf(stderr, "cram_populate_ref on fd %p, id %d\n", fd, id); if (!ref_path || *ref_path == '\0') { /* * If we have no ref path, we use the EBI server. * However to avoid spamming it we require a local ref cache too. */ ref_path = "http://www.ebi.ac.uk:80/ena/cram/md5/%s"; if (!local_cache || *local_cache == '\0') { const char *extra; const char *base = get_cache_basedir(&extra); snprintf(cache,PATH_MAX, "%s%s/hts-ref/%%2s/%%2s/%%s", base, extra); local_cache = cache; if (fd->verbose) fprintf(stderr, "Populating local cache: %s\n", local_cache); } } if (!r->name) return -1; if (!(ty = sam_hdr_find(fd->header, "SQ", "SN", r->name))) return -1; if (!(tag = sam_hdr_find_key(fd->header, ty, "M5", NULL))) goto no_M5; if (fd->verbose) fprintf(stderr, "Querying ref %s\n", tag->str+3); /* Use cache if available */ if (local_cache && *local_cache) { struct stat sb; BGZF *fp; expand_cache_path(path, local_cache, tag->str+3); if (0 == stat(path, &sb) && (fp = bgzf_open(path, "r"))) { r->length = sb.st_size; r->offset = r->line_length = r->bases_per_line = 0; r->fn = string_dup(fd->refs->pool, path); if (fd->refs->fp) if (bgzf_close(fd->refs->fp) != 0) return -1; fd->refs->fp = fp; fd->refs->fn = r->fn; // Fall back to cram_get_ref() where it'll do the actual // reading of the file. return 0; } } /* Otherwise search */ if ((mf = open_path_mfile(tag->str+3, ref_path, NULL))) { size_t sz; r->seq = mfsteal(mf, &sz); r->length = sz; } else { refs_t *refs; char *fn; no_M5: /* Failed to find in search path or M5 cache, see if @SQ UR: tag? */ if (!(tag = sam_hdr_find_key(fd->header, ty, "UR", NULL))) return -1; fn = (strncmp(tag->str+3, "file:", 5) == 0) ? tag->str+8 : tag->str+3; if (fd->refs->fp) { if (bgzf_close(fd->refs->fp) != 0) return -1; fd->refs->fp = NULL; } if (!(refs = refs_load_fai(fd->refs, fn, 0))) return -1; fd->refs = refs; if (fd->refs->fp) { if (bgzf_close(fd->refs->fp) != 0) return -1; fd->refs->fp = NULL; } if (!fd->refs->fn) return -1; if (-1 == refs2id(fd->refs, fd->header)) return -1; if (!fd->refs->ref_id || !fd->refs->ref_id[id]) return -1; // Local copy already, so fall back to cram_get_ref(). return 0; } /* Populate the local disk cache if required */ if (local_cache && *local_cache) { FILE *fp; int i; expand_cache_path(path, local_cache, tag->str+3); if (fd->verbose) fprintf(stderr, "Path='%s'\n", path); mkdir_prefix(path, 01777); i = 0; do { sprintf(path_tmp, "%s.tmp_%d", path, /*getpid(),*/ i); i++; fp = fopen(path_tmp, "wx"); } while (fp == NULL && errno == EEXIST); if (!fp) { perror(path_tmp); // Not fatal - we have the data already so keep going. return 0; } if (r->length != fwrite(r->seq, 1, r->length, fp)) { perror(path); } if (-1 == paranoid_fclose(fp)) { unlink(path_tmp); } else { if (0 == chmod(path_tmp, 0444)) rename(path_tmp, path); else unlink(path_tmp); } } return 0; } static void cram_ref_incr_locked(refs_t *r, int id) { RP("%d INC REF %d, %d %p\n", gettid(), id, (int)(id>=0?r->ref_id[id]->count+1:-999), id>=0?r->ref_id[id]->seq:(char *)1); if (id < 0 || !r->ref_id[id]->seq) return; if (r->last_id == id) r->last_id = -1; ++r->ref_id[id]->count; } void cram_ref_incr(refs_t *r, int id) { pthread_mutex_lock(&r->lock); cram_ref_incr_locked(r, id); pthread_mutex_unlock(&r->lock); } static void cram_ref_decr_locked(refs_t *r, int id) { RP("%d DEC REF %d, %d %p\n", gettid(), id, (int)(id>=0?r->ref_id[id]->count-1:-999), id>=0?r->ref_id[id]->seq:(char *)1); if (id < 0 || !r->ref_id[id]->seq) { assert(r->ref_id[id]->count >= 0); return; } if (--r->ref_id[id]->count <= 0) { assert(r->ref_id[id]->count == 0); if (r->last_id >= 0) { if (r->ref_id[r->last_id]->count <= 0 && r->ref_id[r->last_id]->seq) { RP("%d FREE REF %d (%p)\n", gettid(), r->last_id, r->ref_id[r->last_id]->seq); free(r->ref_id[r->last_id]->seq); r->ref_id[r->last_id]->seq = NULL; r->ref_id[r->last_id]->length = 0; } } r->last_id = id; } } void cram_ref_decr(refs_t *r, int id) { pthread_mutex_lock(&r->lock); cram_ref_decr_locked(r, id); pthread_mutex_unlock(&r->lock); } /* * Used by cram_ref_load and cram_ref_get. The file handle will have * already been opened, so we can catch it. The ref_entry *e informs us * of whether this is a multi-line fasta file or a raw MD5 style file. * Either way we create a single contiguous sequence. * * Returns all or part of a reference sequence on success (malloced); * NULL on failure. */ static char *load_ref_portion(BGZF *fp, ref_entry *e, int start, int end) { off_t offset, len; char *seq; if (end < start) end = start; /* * Compute locations in file. This is trivial for the MD5 files, but * is still necessary for the fasta variants. */ offset = e->line_length ? e->offset + (start-1)/e->bases_per_line * e->line_length + (start-1) % e->bases_per_line : start-1; len = (e->line_length ? e->offset + (end-1)/e->bases_per_line * e->line_length + (end-1) % e->bases_per_line : end-1) - offset + 1; if (bgzf_useek(fp, offset, SEEK_SET) < 0) { perror("bgzf_useek() on reference file"); return NULL; } if (len == 0 || !(seq = malloc(len))) { return NULL; } if (len != bgzf_read(fp, seq, len)) { perror("bgzf_read() on reference file"); free(seq); return NULL; } /* Strip white-space if required. */ if (len != end-start+1) { int i, j; char *cp = seq; char *cp_to; for (i = j = 0; i < len; i++) { if (cp[i] >= '!' && cp[i] <= '~') cp[j++] = cp[i] & ~0x20; } cp_to = cp+j; if (cp_to - seq != end-start+1) { fprintf(stderr, "Malformed reference file?\n"); free(seq); return NULL; } } else { int i; for (i = 0; i < len; i++) { seq[i] = seq[i] & ~0x20; // uppercase in ASCII } } return seq; } /* * Load the entire reference 'id'. * This also increments the reference count by 1. * * Returns ref_entry on success; * NULL on failure */ ref_entry *cram_ref_load(refs_t *r, int id) { ref_entry *e = r->ref_id[id]; int start = 1, end = e->length; char *seq; if (e->seq) { return e; } assert(e->count == 0); if (r->last) { #ifdef REF_DEBUG int idx = 0; for (idx = 0; idx < r->nref; idx++) if (r->last == r->ref_id[idx]) break; RP("%d cram_ref_load DECR %d\n", gettid(), idx); #endif assert(r->last->count > 0); if (--r->last->count <= 0) { RP("%d FREE REF %d (%p)\n", gettid(), id, r->ref_id[id]->seq); if (r->last->seq) { free(r->last->seq); r->last->seq = NULL; } } } /* Open file if it's not already the current open reference */ if (strcmp(r->fn, e->fn) || r->fp == NULL) { if (r->fp) if (bgzf_close(r->fp) != 0) return NULL; r->fn = e->fn; if (!(r->fp = bgzf_open_ref(r->fn, "r"))) return NULL; } RP("%d Loading ref %d (%d..%d)\n", gettid(), id, start, end); if (!(seq = load_ref_portion(r->fp, e, start, end))) { return NULL; } RP("%d Loaded ref %d (%d..%d) = %p\n", gettid(), id, start, end, seq); RP("%d INC REF %d, %d\n", gettid(), id, (int)(e->count+1)); e->seq = seq; e->count++; /* * Also keep track of last used ref so incr/decr loops on the same * sequence don't cause load/free loops. */ RP("%d cram_ref_load INCR %d => %d\n", gettid(), id, e->count+1); r->last = e; e->count++; return e; } /* * Returns a portion of a reference sequence from start to end inclusive. * The returned pointer is owned by either the cram_file fd or by the * internal refs_t structure and should not be freed by the caller. * * The difference is whether or not this refs_t is in use by just the one * cram_fd or by multiples, or whether we have multiple threads accessing * references. In either case fd->shared will be true and we start using * reference counting to track the number of users of a specific reference * sequence. * * Otherwise the ref seq returned is allocated as part of cram_fd itself * and will be freed up on the next call to cram_get_ref or cram_close. * * To return the entire reference sequence, specify start as 1 and end * as 0. * * To cease using a reference, call cram_ref_decr(). * * Returns reference on success, * NULL on failure */ char *cram_get_ref(cram_fd *fd, int id, int start, int end) { ref_entry *r; char *seq; int ostart = start; if (id == -1) return NULL; /* FIXME: axiomatic query of r->seq being true? * Or shortcut for unsorted data where we load once and never free? */ //fd->shared_ref = 1; // hard code for now to simplify things pthread_mutex_lock(&fd->ref_lock); RP("%d cram_get_ref on fd %p, id %d, range %d..%d\n", gettid(), fd, id, start, end); /* * Unsorted data implies we want to fetch an entire reference at a time. * We just deal with this at the moment by claiming we're sharing * references instead, which has the same requirement. */ if (fd->unsorted) fd->shared_ref = 1; /* Sanity checking: does this ID exist? */ if (id >= fd->refs->nref) { fprintf(stderr, "No reference found for id %d\n", id); pthread_mutex_unlock(&fd->ref_lock); return NULL; } if (!fd->refs || !fd->refs->ref_id[id]) { fprintf(stderr, "No reference found for id %d\n", id); pthread_mutex_unlock(&fd->ref_lock); return NULL; } if (!(r = fd->refs->ref_id[id])) { fprintf(stderr, "No reference found for id %d\n", id); pthread_mutex_unlock(&fd->ref_lock); return NULL; } /* * It has an entry, but may not have been populated yet. * Any manually loaded .fai files have their lengths known. * A ref entry computed from @SQ lines (M5 or UR field) will have * r->length == 0 unless it's been loaded once and verified that we have * an on-disk filename for it. * * 19 Sep 2013: Moved the lock here as the cram_populate_ref code calls * open_path_mfile and libcurl, which isn't multi-thread safe unless I * rewrite my code to have one curl handle per thread. */ pthread_mutex_lock(&fd->refs->lock); if (r->length == 0) { if (cram_populate_ref(fd, id, r) == -1) { fprintf(stderr, "Failed to populate reference for id %d\n", id); pthread_mutex_unlock(&fd->refs->lock); pthread_mutex_unlock(&fd->ref_lock); return NULL; } r = fd->refs->ref_id[id]; if (fd->unsorted) cram_ref_incr_locked(fd->refs, id); } /* * We now know that we the filename containing the reference, so check * for limits. If it's over half the reference we'll load all of it in * memory as this will speed up subsequent calls. */ if (end < 1) end = r->length; if (end >= r->length) end = r->length; assert(start >= 1); if (end - start >= 0.5*r->length || fd->shared_ref) { start = 1; end = r->length; } /* * Maybe we have it cached already? If so use it. * * Alternatively if we don't have the sequence but we're sharing * references and/or are asking for the entire length of it, then * load the full reference into the refs structure and return * a pointer to that one instead. */ if (fd->shared_ref || r->seq || (start == 1 && end == r->length)) { char *cp; if (id >= 0) { if (r->seq) { cram_ref_incr_locked(fd->refs, id); } else { ref_entry *e; if (!(e = cram_ref_load(fd->refs, id))) { pthread_mutex_unlock(&fd->refs->lock); pthread_mutex_unlock(&fd->ref_lock); return NULL; } /* unsorted data implies cache ref indefinitely, to avoid * continually loading and unloading. */ if (fd->unsorted) cram_ref_incr_locked(fd->refs, id); } fd->ref = NULL; /* We never access it directly */ fd->ref_start = 1; fd->ref_end = r->length; fd->ref_id = id; cp = fd->refs->ref_id[id]->seq + ostart-1; } else { fd->ref = NULL; cp = NULL; } RP("%d cram_get_ref returning for id %d, count %d\n", gettid(), id, (int)r->count); pthread_mutex_unlock(&fd->refs->lock); pthread_mutex_unlock(&fd->ref_lock); return cp; } /* * Otherwise we're not sharing, we don't have a copy of it already and * we're only asking for a small portion of it. * * In this case load up just that segment ourselves, freeing any old * small segments in the process. */ /* Unmapped ref ID */ if (id < 0) { if (fd->ref_free) { free(fd->ref_free); fd->ref_free = NULL; } fd->ref = NULL; fd->ref_id = id; pthread_mutex_unlock(&fd->refs->lock); pthread_mutex_unlock(&fd->ref_lock); return NULL; } /* Open file if it's not already the current open reference */ if (strcmp(fd->refs->fn, r->fn) || fd->refs->fp == NULL) { if (fd->refs->fp) if (bgzf_close(fd->refs->fp) != 0) return NULL; fd->refs->fn = r->fn; if (!(fd->refs->fp = bgzf_open_ref(fd->refs->fn, "r"))) { pthread_mutex_unlock(&fd->refs->lock); pthread_mutex_unlock(&fd->ref_lock); return NULL; } } if (!(fd->ref = load_ref_portion(fd->refs->fp, r, start, end))) { pthread_mutex_unlock(&fd->refs->lock); pthread_mutex_unlock(&fd->ref_lock); return NULL; } if (fd->ref_free) free(fd->ref_free); fd->ref_id = id; fd->ref_start = start; fd->ref_end = end; fd->ref_free = fd->ref; seq = fd->ref; pthread_mutex_unlock(&fd->refs->lock); pthread_mutex_unlock(&fd->ref_lock); return seq + ostart - start; } /* * If fd has been opened for reading, it may be permitted to specify 'fn' * as NULL and let the code auto-detect the reference by parsing the * SAM header @SQ lines. */ int cram_load_reference(cram_fd *fd, char *fn) { if (fn) { fd->refs = refs_load_fai(fd->refs, fn, !(fd->embed_ref && fd->mode == 'r')); fn = fd->refs ? fd->refs->fn : NULL; } fd->ref_fn = fn; if ((!fd->refs || (fd->refs->nref == 0 && !fn)) && fd->header) { if (fd->refs) refs_free(fd->refs); if (!(fd->refs = refs_create())) return -1; if (-1 == refs_from_header(fd->refs, fd, fd->header)) return -1; } if (fd->header) if (-1 == refs2id(fd->refs, fd->header)) return -1; return fn ? 0 : -1; } /* ---------------------------------------------------------------------- * Containers */ /* * Creates a new container, specifying the maximum number of slices * and records permitted. * * Returns cram_container ptr on success * NULL on failure */ cram_container *cram_new_container(int nrec, int nslice) { cram_container *c = calloc(1, sizeof(*c)); enum cram_DS_ID id; if (!c) return NULL; c->curr_ref = -2; c->max_c_rec = nrec * nslice; c->curr_c_rec = 0; c->max_rec = nrec; c->record_counter = 0; c->num_bases = 0; c->max_slice = nslice; c->curr_slice = 0; c->pos_sorted = 1; c->max_apos = 0; c->multi_seq = 0; c->bams = NULL; if (!(c->slices = (cram_slice **)calloc(nslice, sizeof(cram_slice *)))) goto err; c->slice = NULL; if (!(c->comp_hdr = cram_new_compression_header())) goto err; c->comp_hdr_block = NULL; for (id = DS_RN; id < DS_TN; id++) if (!(c->stats[id] = cram_stats_create())) goto err; //c->aux_B_stats = cram_stats_create(); if (!(c->tags_used = kh_init(s_i2i))) goto err; c->refs_used = 0; return c; err: if (c) { if (c->slices) free(c->slices); free(c); } return NULL; } void cram_free_container(cram_container *c) { enum cram_DS_ID id; int i; if (!c) return; if (c->refs_used) free(c->refs_used); if (c->landmark) free(c->landmark); if (c->comp_hdr) cram_free_compression_header(c->comp_hdr); if (c->comp_hdr_block) cram_free_block(c->comp_hdr_block); if (c->slices) { for (i = 0; i < c->max_slice; i++) if (c->slices[i]) cram_free_slice(c->slices[i]); free(c->slices); } for (id = DS_RN; id < DS_TN; id++) if (c->stats[id]) cram_stats_free(c->stats[id]); //if (c->aux_B_stats) cram_stats_free(c->aux_B_stats); if (c->tags_used) kh_destroy(s_i2i, c->tags_used); free(c); } /* * Reads a container header. * * Returns cram_container on success * NULL on failure or no container left (fd->err == 0). */ cram_container *cram_read_container(cram_fd *fd) { cram_container c2, *c; int i, s; size_t rd = 0; fd->err = 0; fd->eof = 0; memset(&c2, 0, sizeof(c2)); if (CRAM_MAJOR_VERS(fd->version) == 1) { if ((s = itf8_decode(fd, &c2.length)) == -1) { fd->eof = fd->empty_container ? 1 : 2; return NULL; } else { rd+=s; } } else { if ((s = int32_decode(fd, &c2.length)) == -1) { if (CRAM_MAJOR_VERS(fd->version) == 2 && CRAM_MINOR_VERS(fd->version) == 0) fd->eof = 1; // EOF blocks arrived in v2.1 else fd->eof = fd->empty_container ? 1 : 2; return NULL; } else { rd+=s; } } if ((s = itf8_decode(fd, &c2.ref_seq_id)) == -1) return NULL; else rd+=s; if ((s = itf8_decode(fd, &c2.ref_seq_start))== -1) return NULL; else rd+=s; if ((s = itf8_decode(fd, &c2.ref_seq_span)) == -1) return NULL; else rd+=s; if ((s = itf8_decode(fd, &c2.num_records)) == -1) return NULL; else rd+=s; if (CRAM_MAJOR_VERS(fd->version) == 1) { c2.record_counter = 0; c2.num_bases = 0; } else { if (CRAM_MAJOR_VERS(fd->version) >= 3) { if ((s = ltf8_decode(fd, &c2.record_counter)) == -1) return NULL; else rd += s; } else { int32_t i32; if ((s = itf8_decode(fd, &i32)) == -1) return NULL; else rd += s; c2.record_counter = i32; } if ((s = ltf8_decode(fd, &c2.num_bases))== -1) return NULL; else rd += s; } if ((s = itf8_decode(fd, &c2.num_blocks)) == -1) return NULL; else rd+=s; if ((s = itf8_decode(fd, &c2.num_landmarks))== -1) return NULL; else rd+=s; if (!(c = calloc(1, sizeof(*c)))) return NULL; *c = c2; if (!(c->landmark = malloc(c->num_landmarks * sizeof(int32_t))) && c->num_landmarks) { fd->err = errno; cram_free_container(c); return NULL; } for (i = 0; i < c->num_landmarks; i++) { if ((s = itf8_decode(fd, &c->landmark[i])) == -1) { cram_free_container(c); return NULL; } else { rd += s; } } if (CRAM_MAJOR_VERS(fd->version) >= 3) { uint32_t crc, i; unsigned char *dat = malloc(50 + 5*(c->num_landmarks)), *cp = dat; if (!dat) { cram_free_container(c); return NULL; } if (-1 == int32_decode(fd, (int32_t *)&c->crc32)) return NULL; else rd+=4; /* Reencode first as we can't easily access the original byte stream. * * FIXME: Technically this means this may not be fool proof. We could * create a CRAM file using a 2 byte ITF8 value that can fit in a * 1 byte field, meaning the encoding is different to the original * form and so has a different CRC. * * The correct implementation would be to have an alternative form * of itf8_decode which also squirrels away the raw byte stream * during decoding so we can then CRC that. */ *(unsigned int *)cp = le_int4(c->length); cp += 4; cp += itf8_put(cp, c->ref_seq_id); cp += itf8_put(cp, c->ref_seq_start); cp += itf8_put(cp, c->ref_seq_span); cp += itf8_put(cp, c->num_records); cp += ltf8_put((char *)cp, c->record_counter); cp += itf8_put(cp, c->num_bases); cp += itf8_put(cp, c->num_blocks); cp += itf8_put(cp, c->num_landmarks); for (i = 0; i < c->num_landmarks; i++) { cp += itf8_put(cp, c->landmark[i]); } crc = crc32(0L, dat, cp-dat); if (crc != c->crc32) { fprintf(stderr, "Container header CRC32 failure\n"); cram_free_container(c); return NULL; } } c->offset = rd; c->slices = NULL; c->curr_slice = 0; c->max_slice = c->num_landmarks; c->slice_rec = 0; c->curr_rec = 0; c->max_rec = 0; if (c->ref_seq_id == -2) { c->multi_seq = 1; fd->multi_seq = 1; } fd->empty_container = (c->num_records == 0 && c->ref_seq_id == -1 && c->ref_seq_start == 0x454f46 /* EOF */) ? 1 : 0; return c; } /* * Writes a container structure. * * Returns 0 on success * -1 on failure */ int cram_write_container(cram_fd *fd, cram_container *c) { char buf_a[1024], *buf = buf_a, *cp; int i; if (55 + c->num_landmarks * 5 >= 1024) buf = malloc(55 + c->num_landmarks * 5); cp = buf; if (CRAM_MAJOR_VERS(fd->version) == 1) { cp += itf8_put(cp, c->length); } else { *(int32_t *)cp = le_int4(c->length); cp += 4; } if (c->multi_seq) { cp += itf8_put(cp, -2); cp += itf8_put(cp, 0); cp += itf8_put(cp, 0); } else { cp += itf8_put(cp, c->ref_seq_id); cp += itf8_put(cp, c->ref_seq_start); cp += itf8_put(cp, c->ref_seq_span); } cp += itf8_put(cp, c->num_records); if (CRAM_MAJOR_VERS(fd->version) == 2) { cp += itf8_put(cp, c->record_counter); cp += ltf8_put(cp, c->num_bases); } else if (CRAM_MAJOR_VERS(fd->version) >= 3) { cp += ltf8_put(cp, c->record_counter); cp += ltf8_put(cp, c->num_bases); } cp += itf8_put(cp, c->num_blocks); cp += itf8_put(cp, c->num_landmarks); for (i = 0; i < c->num_landmarks; i++) cp += itf8_put(cp, c->landmark[i]); if (CRAM_MAJOR_VERS(fd->version) >= 3) { c->crc32 = crc32(0L, (uc *)buf, cp-buf); cp[0] = c->crc32 & 0xff; cp[1] = (c->crc32 >> 8) & 0xff; cp[2] = (c->crc32 >> 16) & 0xff; cp[3] = (c->crc32 >> 24) & 0xff; cp += 4; } if (cp-buf != hwrite(fd->fp, buf, cp-buf)) { if (buf != buf_a) free(buf); return -1; } if (buf != buf_a) free(buf); return 0; } // common component shared by cram_flush_container{,_mt} static int cram_flush_container2(cram_fd *fd, cram_container *c) { int i, j; //fprintf(stderr, "Writing container %d, sum %u\n", c->record_counter, sum); /* Write the container struct itself */ if (0 != cram_write_container(fd, c)) return -1; /* And the compression header */ if (0 != cram_write_block(fd, c->comp_hdr_block)) return -1; /* Followed by the slice blocks */ for (i = 0; i < c->curr_slice; i++) { cram_slice *s = c->slices[i]; if (0 != cram_write_block(fd, s->hdr_block)) return -1; for (j = 0; j < s->hdr->num_blocks; j++) { if (0 != cram_write_block(fd, s->block[j])) return -1; } } return hflush(fd->fp) == 0 ? 0 : -1; } /* * Flushes a completely or partially full container to disk, writing * container structure, header and blocks. This also calls the encoder * functions. * * Returns 0 on success * -1 on failure */ int cram_flush_container(cram_fd *fd, cram_container *c) { /* Encode the container blocks and generate compression header */ if (0 != cram_encode_container(fd, c)) return -1; return cram_flush_container2(fd, c); } typedef struct { cram_fd *fd; cram_container *c; } cram_job; void *cram_flush_thread(void *arg) { cram_job *j = (cram_job *)arg; /* Encode the container blocks and generate compression header */ if (0 != cram_encode_container(j->fd, j->c)) { fprintf(stderr, "cram_encode_container failed\n"); return NULL; } return arg; } static int cram_flush_result(cram_fd *fd) { int i, ret = 0; t_pool_result *r; while ((r = t_pool_next_result(fd->rqueue))) { cram_job *j = (cram_job *)r->data; cram_container *c; if (!j) { t_pool_delete_result(r, 0); return -1; } fd = j->fd; c = j->c; if (0 != cram_flush_container2(fd, c)) return -1; /* Free the container */ for (i = 0; i < c->max_slice; i++) { cram_free_slice(c->slices[i]); c->slices[i] = NULL; } c->slice = NULL; c->curr_slice = 0; cram_free_container(c); ret |= hflush(fd->fp) == 0 ? 0 : -1; t_pool_delete_result(r, 1); } return ret; } int cram_flush_container_mt(cram_fd *fd, cram_container *c) { cram_job *j; if (!fd->pool) return cram_flush_container(fd, c); if (!(j = malloc(sizeof(*j)))) return -1; j->fd = fd; j->c = c; t_pool_dispatch(fd->pool, fd->rqueue, cram_flush_thread, j); return cram_flush_result(fd); } /* ---------------------------------------------------------------------- * Compression headers; the first part of the container */ /* * Creates a new blank container compression header * * Returns header ptr on success * NULL on failure */ cram_block_compression_hdr *cram_new_compression_header(void) { cram_block_compression_hdr *hdr = calloc(1, sizeof(*hdr)); if (!hdr) return NULL; if (!(hdr->TD_blk = cram_new_block(CORE, 0))) { free(hdr); return NULL; } if (!(hdr->TD_hash = kh_init(m_s2i))) { cram_free_block(hdr->TD_blk); free(hdr); return NULL; } if (!(hdr->TD_keys = string_pool_create(8192))) { kh_destroy(m_s2i, hdr->TD_hash); cram_free_block(hdr->TD_blk); free(hdr); return NULL; } return hdr; } void cram_free_compression_header(cram_block_compression_hdr *hdr) { int i; if (hdr->landmark) free(hdr->landmark); if (hdr->preservation_map) kh_destroy(map, hdr->preservation_map); for (i = 0; i < CRAM_MAP_HASH; i++) { cram_map *m, *m2; for (m = hdr->rec_encoding_map[i]; m; m = m2) { m2 = m->next; if (m->codec) m->codec->free(m->codec); free(m); } } for (i = 0; i < CRAM_MAP_HASH; i++) { cram_map *m, *m2; for (m = hdr->tag_encoding_map[i]; m; m = m2) { m2 = m->next; if (m->codec) m->codec->free(m->codec); free(m); } } for (i = 0; i < DS_END; i++) { if (hdr->codecs[i]) hdr->codecs[i]->free(hdr->codecs[i]); } if (hdr->TL) free(hdr->TL); if (hdr->TD_blk) cram_free_block(hdr->TD_blk); if (hdr->TD_hash) kh_destroy(m_s2i, hdr->TD_hash); if (hdr->TD_keys) string_pool_destroy(hdr->TD_keys); free(hdr); } /* ---------------------------------------------------------------------- * Slices and slice headers */ void cram_free_slice_header(cram_block_slice_hdr *hdr) { if (!hdr) return; if (hdr->block_content_ids) free(hdr->block_content_ids); free(hdr); return; } void cram_free_slice(cram_slice *s) { if (!s) return; if (s->hdr_block) cram_free_block(s->hdr_block); if (s->block) { int i; if (s->hdr) { for (i = 0; i < s->hdr->num_blocks; i++) { cram_free_block(s->block[i]); } } free(s->block); } if (s->block_by_id) free(s->block_by_id); if (s->hdr) cram_free_slice_header(s->hdr); if (s->seqs_blk) cram_free_block(s->seqs_blk); if (s->qual_blk) cram_free_block(s->qual_blk); if (s->name_blk) cram_free_block(s->name_blk); if (s->aux_blk) cram_free_block(s->aux_blk); if (s->aux_OQ_blk) cram_free_block(s->aux_OQ_blk); if (s->aux_BQ_blk) cram_free_block(s->aux_BQ_blk); if (s->aux_FZ_blk) cram_free_block(s->aux_FZ_blk); if (s->aux_oq_blk) cram_free_block(s->aux_oq_blk); if (s->aux_os_blk) cram_free_block(s->aux_os_blk); if (s->aux_oz_blk) cram_free_block(s->aux_oz_blk); if (s->base_blk) cram_free_block(s->base_blk); if (s->soft_blk) cram_free_block(s->soft_blk); if (s->cigar) free(s->cigar); if (s->crecs) free(s->crecs); if (s->features) free(s->features); if (s->TN) free(s->TN); if (s->pair_keys) string_pool_destroy(s->pair_keys); if (s->pair[0]) kh_destroy(m_s2i, s->pair[0]); if (s->pair[1]) kh_destroy(m_s2i, s->pair[1]); free(s); } /* * Creates a new empty slice in memory, for subsequent writing to * disk. * * Returns cram_slice ptr on success * NULL on failure */ cram_slice *cram_new_slice(enum cram_content_type type, int nrecs) { cram_slice *s = calloc(1, sizeof(*s)); if (!s) return NULL; if (!(s->hdr = (cram_block_slice_hdr *)calloc(1, sizeof(*s->hdr)))) goto err; s->hdr->content_type = type; s->hdr_block = NULL; s->block = NULL; s->block_by_id = NULL; s->last_apos = 0; if (!(s->crecs = malloc(nrecs * sizeof(cram_record)))) goto err; s->cigar = NULL; s->cigar_alloc = 0; s->ncigar = 0; if (!(s->seqs_blk = cram_new_block(EXTERNAL, 0))) goto err; if (!(s->qual_blk = cram_new_block(EXTERNAL, DS_QS))) goto err; if (!(s->name_blk = cram_new_block(EXTERNAL, DS_RN))) goto err; if (!(s->aux_blk = cram_new_block(EXTERNAL, DS_aux))) goto err; if (!(s->base_blk = cram_new_block(EXTERNAL, DS_IN))) goto err; if (!(s->soft_blk = cram_new_block(EXTERNAL, DS_SC))) goto err; s->features = NULL; s->nfeatures = s->afeatures = 0; #ifndef TN_external s->TN = NULL; s->nTN = s->aTN = 0; #endif // Volatile keys as we do realloc in dstring if (!(s->pair_keys = string_pool_create(8192))) goto err; if (!(s->pair[0] = kh_init(m_s2i))) goto err; if (!(s->pair[1] = kh_init(m_s2i))) goto err; #ifdef BA_external s->BA_len = 0; #endif return s; err: if (s) cram_free_slice(s); return NULL; } /* * Loads an entire slice. * FIXME: In 1.0 the native unit of slices within CRAM is broken * as slices contain references to objects in other slices. * To work around this while keeping the slice oriented outer loop * we read all slices and stitch them together into a fake large * slice instead. * * Returns cram_slice ptr on success * NULL on failure */ cram_slice *cram_read_slice(cram_fd *fd) { cram_block *b = cram_read_block(fd); cram_slice *s = calloc(1, sizeof(*s)); int i, n, max_id, min_id; if (!b || !s) goto err; s->hdr_block = b; switch (b->content_type) { case MAPPED_SLICE: case UNMAPPED_SLICE: if (!(s->hdr = cram_decode_slice_header(fd, b))) goto err; break; default: fprintf(stderr, "Unexpected block of type %s\n", cram_content_type2str(b->content_type)); goto err; } s->block = calloc(n = s->hdr->num_blocks, sizeof(*s->block)); if (!s->block) goto err; for (max_id = i = 0, min_id = INT_MAX; i < n; i++) { if (!(s->block[i] = cram_read_block(fd))) goto err; if (s->block[i]->content_type == EXTERNAL) { if (max_id < s->block[i]->content_id) max_id = s->block[i]->content_id; if (min_id > s->block[i]->content_id) min_id = s->block[i]->content_id; } } if (min_id >= 0 && max_id < 1024) { if (!(s->block_by_id = calloc(1024, sizeof(s->block[0])))) goto err; for (i = 0; i < n; i++) { if (s->block[i]->content_type != EXTERNAL) continue; s->block_by_id[s->block[i]->content_id] = s->block[i]; } } /* Initialise encoding/decoding tables */ s->cigar = NULL; s->cigar_alloc = 0; s->ncigar = 0; if (!(s->seqs_blk = cram_new_block(EXTERNAL, 0))) goto err; if (!(s->qual_blk = cram_new_block(EXTERNAL, DS_QS))) goto err; if (!(s->name_blk = cram_new_block(EXTERNAL, DS_RN))) goto err; if (!(s->aux_blk = cram_new_block(EXTERNAL, DS_aux))) goto err; if (!(s->base_blk = cram_new_block(EXTERNAL, DS_IN))) goto err; if (!(s->soft_blk = cram_new_block(EXTERNAL, DS_SC))) goto err; s->crecs = NULL; s->last_apos = s->hdr->ref_seq_start; return s; err: if (b) cram_free_block(b); if (s) { s->hdr_block = NULL; cram_free_slice(s); } return NULL; } /* ---------------------------------------------------------------------- * CRAM file definition (header) */ /* * Reads a CRAM file definition structure. * Returns file_def ptr on success * NULL on failure */ cram_file_def *cram_read_file_def(cram_fd *fd) { cram_file_def *def = malloc(sizeof(*def)); if (!def) return NULL; if (26 != hread(fd->fp, &def->magic[0], 26)) { free(def); return NULL; } if (memcmp(def->magic, "CRAM", 4) != 0) { free(def); return NULL; } if (def->major_version > 3) { fprintf(stderr, "CRAM version number mismatch\n" "Expected 1.x, 2.x or 3.x, got %d.%d\n", def->major_version, def->minor_version); free(def); return NULL; } fd->first_container += 26; fd->last_slice = 0; return def; } /* * Writes a cram_file_def structure to cram_fd. * Returns 0 on success * -1 on failure */ int cram_write_file_def(cram_fd *fd, cram_file_def *def) { return (hwrite(fd->fp, &def->magic[0], 26) == 26) ? 0 : -1; } void cram_free_file_def(cram_file_def *def) { if (def) free(def); } /* ---------------------------------------------------------------------- * SAM header I/O */ /* * Reads the SAM header from the first CRAM data block. * Also performs minimal parsing to extract read-group * and sample information. * Returns SAM hdr ptr on success * NULL on failure */ SAM_hdr *cram_read_SAM_hdr(cram_fd *fd) { int32_t header_len; char *header; SAM_hdr *hdr; /* 1.1 onwards stores the header in the first block of a container */ if (CRAM_MAJOR_VERS(fd->version) == 1) { /* Length */ if (-1 == int32_decode(fd, &header_len)) return NULL; /* Alloc and read */ if (NULL == (header = malloc(header_len+1))) return NULL; *header = 0; if (header_len != hread(fd->fp, header, header_len)) return NULL; fd->first_container += 4 + header_len; } else { cram_container *c = cram_read_container(fd); cram_block *b; int i, len; if (!c) return NULL; if (c->num_blocks < 1) { cram_free_container(c); return NULL; } if (!(b = cram_read_block(fd))) { cram_free_container(c); return NULL; } cram_uncompress_block(b); len = b->comp_size + 2 + 4*(CRAM_MAJOR_VERS(fd->version) >= 3) + itf8_size(b->content_id) + itf8_size(b->uncomp_size) + itf8_size(b->comp_size); /* Extract header from 1st block */ if (-1 == int32_get(b, &header_len) || b->uncomp_size - 4 < header_len) { cram_free_container(c); cram_free_block(b); return NULL; } if (NULL == (header = malloc(header_len+1))) { cram_free_container(c); cram_free_block(b); return NULL; } memcpy(header, BLOCK_END(b), header_len); header[header_len]='\0'; cram_free_block(b); /* Consume any remaining blocks */ for (i = 1; i < c->num_blocks; i++) { if (!(b = cram_read_block(fd))) { cram_free_container(c); return NULL; } len += b->comp_size + 2 + 4*(CRAM_MAJOR_VERS(fd->version) >= 3) + itf8_size(b->content_id) + itf8_size(b->uncomp_size) + itf8_size(b->comp_size); cram_free_block(b); } if (c->length && c->length > len) { // Consume padding char *pads = malloc(c->length - len); if (!pads) { cram_free_container(c); return NULL; } if (c->length - len != hread(fd->fp, pads, c->length - len)) { cram_free_container(c); return NULL; } free(pads); } cram_free_container(c); } /* Parse */ hdr = sam_hdr_parse_(header, header_len); free(header); return hdr; } /* * Converts 'in' to a full pathname to store in out. * Out must be at least PATH_MAX bytes long. */ static void full_path(char *out, char *in) { if (*in == '/') { strncpy(out, in, PATH_MAX); out[PATH_MAX-1] = 0; } else { int len; // unable to get dir or out+in is too long if (!getcwd(out, PATH_MAX) || (len = strlen(out))+1+strlen(in) >= PATH_MAX) { strncpy(out, in, PATH_MAX); out[PATH_MAX-1] = 0; return; } sprintf(out+len, "/%.*s", PATH_MAX - len, in); // FIXME: cope with `pwd`/../../../foo.fa ? } } /* * Writes a CRAM SAM header. * Returns 0 on success * -1 on failure */ int cram_write_SAM_hdr(cram_fd *fd, SAM_hdr *hdr) { int header_len; int blank_block = (CRAM_MAJOR_VERS(fd->version) >= 3); /* Write CRAM MAGIC if not yet written. */ if (fd->file_def->major_version == 0) { fd->file_def->major_version = CRAM_MAJOR_VERS(fd->version); fd->file_def->minor_version = CRAM_MINOR_VERS(fd->version); if (0 != cram_write_file_def(fd, fd->file_def)) return -1; } /* 1.0 requires and UNKNOWN read-group */ if (CRAM_MAJOR_VERS(fd->version) == 1) { if (!sam_hdr_find_rg(hdr, "UNKNOWN")) if (sam_hdr_add(hdr, "RG", "ID", "UNKNOWN", "SM", "UNKNOWN", NULL)) return -1; } /* Fix M5 strings */ if (fd->refs && !fd->no_ref) { int i; for (i = 0; i < hdr->nref; i++) { SAM_hdr_type *ty; char *ref; if (!(ty = sam_hdr_find(hdr, "SQ", "SN", hdr->ref[i].name))) return -1; if (!sam_hdr_find_key(hdr, ty, "M5", NULL)) { char unsigned buf[16], buf2[33]; int j, rlen; MD5_CTX md5; if (!fd->refs || !fd->refs->ref_id || !fd->refs->ref_id[i]) { return -1; } rlen = fd->refs->ref_id[i]->length; MD5_Init(&md5); ref = cram_get_ref(fd, i, 1, rlen); if (NULL == ref) return -1; rlen = fd->refs->ref_id[i]->length; /* In case it just loaded */ MD5_Update(&md5, ref, rlen); MD5_Final(buf, &md5); cram_ref_decr(fd->refs, i); for (j = 0; j < 16; j++) { buf2[j*2+0] = "0123456789abcdef"[buf[j]>>4]; buf2[j*2+1] = "0123456789abcdef"[buf[j]&15]; } buf2[32] = 0; if (sam_hdr_update(hdr, ty, "M5", buf2, NULL)) return -1; } if (fd->ref_fn) { char ref_fn[PATH_MAX]; full_path(ref_fn, fd->ref_fn); if (sam_hdr_update(hdr, ty, "UR", ref_fn, NULL)) return -1; } } } if (sam_hdr_rebuild(hdr)) return -1; /* Length */ header_len = sam_hdr_length(hdr); if (CRAM_MAJOR_VERS(fd->version) == 1) { if (-1 == int32_encode(fd, header_len)) return -1; /* Text data */ if (header_len != hwrite(fd->fp, sam_hdr_str(hdr), header_len)) return -1; } else { /* Create block(s) inside a container */ cram_block *b = cram_new_block(FILE_HEADER, 0); cram_container *c = cram_new_container(0, 0); int padded_length; char *pads; int is_cram_3 = (CRAM_MAJOR_VERS(fd->version) >= 3); if (!b || !c) { if (b) cram_free_block(b); if (c) cram_free_container(c); return -1; } int32_put(b, header_len); BLOCK_APPEND(b, sam_hdr_str(hdr), header_len); BLOCK_UPLEN(b); // Compress header block if V3.0 and above if (CRAM_MAJOR_VERS(fd->version) >= 3 && fd->level > 0) { int method = 1<<GZIP; if (fd->use_bz2) method |= 1<<BZIP2; if (fd->use_lzma) method |= 1<<LZMA; cram_compress_block(fd, b, NULL, method, fd->level); } if (blank_block) { c->length = b->comp_size + 2 + 4*is_cram_3 + itf8_size(b->content_id) + itf8_size(b->uncomp_size) + itf8_size(b->comp_size); c->num_blocks = 2; c->num_landmarks = 2; if (!(c->landmark = malloc(2*sizeof(*c->landmark)))) { cram_free_block(b); cram_free_container(c); return -1; } c->landmark[0] = 0; c->landmark[1] = c->length; // Plus extra storage for uncompressed secondary blank block padded_length = MIN(c->length*.5, 10000); c->length += padded_length + 2 + 4*is_cram_3 + itf8_size(b->content_id) + itf8_size(padded_length)*2; } else { // Pad the block instead. c->num_blocks = 1; c->num_landmarks = 1; if (!(c->landmark = malloc(sizeof(*c->landmark)))) return -1; c->landmark[0] = 0; padded_length = MAX(c->length*1.5, 10000) - c->length; c->length = b->comp_size + padded_length + 2 + 4*is_cram_3 + itf8_size(b->content_id) + itf8_size(b->uncomp_size) + itf8_size(b->comp_size); if (NULL == (pads = calloc(1, padded_length))) { cram_free_block(b); cram_free_container(c); return -1; } BLOCK_APPEND(b, pads, padded_length); BLOCK_UPLEN(b); free(pads); } if (-1 == cram_write_container(fd, c)) { cram_free_block(b); cram_free_container(c); return -1; } if (-1 == cram_write_block(fd, b)) { cram_free_block(b); cram_free_container(c); return -1; } if (blank_block) { BLOCK_RESIZE(b, padded_length); memset(BLOCK_DATA(b), 0, padded_length); BLOCK_SIZE(b) = padded_length; BLOCK_UPLEN(b); b->method = RAW; if (-1 == cram_write_block(fd, b)) { cram_free_block(b); cram_free_container(c); return -1; } } cram_free_block(b); cram_free_container(c); } if (-1 == refs_from_header(fd->refs, fd, fd->header)) return -1; if (-1 == refs2id(fd->refs, fd->header)) return -1; if (0 != hflush(fd->fp)) return -1; RP("=== Finishing saving header ===\n"); return 0; } /* ---------------------------------------------------------------------- * The top-level cram opening, closing and option handling */ /* * Initialises the lookup tables. These could be global statics, but they're * clumsy to setup in a multi-threaded environment unless we generate * verbatim code and include that. */ static void cram_init_tables(cram_fd *fd) { int i; memset(fd->L1, 4, 256); fd->L1['A'] = 0; fd->L1['a'] = 0; fd->L1['C'] = 1; fd->L1['c'] = 1; fd->L1['G'] = 2; fd->L1['g'] = 2; fd->L1['T'] = 3; fd->L1['t'] = 3; memset(fd->L2, 5, 256); fd->L2['A'] = 0; fd->L2['a'] = 0; fd->L2['C'] = 1; fd->L2['c'] = 1; fd->L2['G'] = 2; fd->L2['g'] = 2; fd->L2['T'] = 3; fd->L2['t'] = 3; fd->L2['N'] = 4; fd->L2['n'] = 4; if (CRAM_MAJOR_VERS(fd->version) == 1) { for (i = 0; i < 0x200; i++) { int f = 0; if (i & CRAM_FPAIRED) f |= BAM_FPAIRED; if (i & CRAM_FPROPER_PAIR) f |= BAM_FPROPER_PAIR; if (i & CRAM_FUNMAP) f |= BAM_FUNMAP; if (i & CRAM_FREVERSE) f |= BAM_FREVERSE; if (i & CRAM_FREAD1) f |= BAM_FREAD1; if (i & CRAM_FREAD2) f |= BAM_FREAD2; if (i & CRAM_FSECONDARY) f |= BAM_FSECONDARY; if (i & CRAM_FQCFAIL) f |= BAM_FQCFAIL; if (i & CRAM_FDUP) f |= BAM_FDUP; fd->bam_flag_swap[i] = f; } for (i = 0; i < 0x1000; i++) { int g = 0; if (i & BAM_FPAIRED) g |= CRAM_FPAIRED; if (i & BAM_FPROPER_PAIR) g |= CRAM_FPROPER_PAIR; if (i & BAM_FUNMAP) g |= CRAM_FUNMAP; if (i & BAM_FREVERSE) g |= CRAM_FREVERSE; if (i & BAM_FREAD1) g |= CRAM_FREAD1; if (i & BAM_FREAD2) g |= CRAM_FREAD2; if (i & BAM_FSECONDARY) g |= CRAM_FSECONDARY; if (i & BAM_FQCFAIL) g |= CRAM_FQCFAIL; if (i & BAM_FDUP) g |= CRAM_FDUP; fd->cram_flag_swap[i] = g; } } else { /* NOP */ for (i = 0; i < 0x1000; i++) fd->bam_flag_swap[i] = i; for (i = 0; i < 0x1000; i++) fd->cram_flag_swap[i] = i; } memset(fd->cram_sub_matrix, 4, 32*32); for (i = 0; i < 32; i++) { fd->cram_sub_matrix[i]['A'&0x1f]=0; fd->cram_sub_matrix[i]['C'&0x1f]=1; fd->cram_sub_matrix[i]['G'&0x1f]=2; fd->cram_sub_matrix[i]['T'&0x1f]=3; fd->cram_sub_matrix[i]['N'&0x1f]=4; } for (i = 0; i < 20; i+=4) { int j; for (j = 0; j < 20; j++) { fd->cram_sub_matrix["ACGTN"[i>>2]&0x1f][j]=3; fd->cram_sub_matrix["ACGTN"[i>>2]&0x1f][j]=3; fd->cram_sub_matrix["ACGTN"[i>>2]&0x1f][j]=3; fd->cram_sub_matrix["ACGTN"[i>>2]&0x1f][j]=3; } fd->cram_sub_matrix["ACGTN"[i>>2]&0x1f][CRAM_SUBST_MATRIX[i+0]&0x1f]=0; fd->cram_sub_matrix["ACGTN"[i>>2]&0x1f][CRAM_SUBST_MATRIX[i+1]&0x1f]=1; fd->cram_sub_matrix["ACGTN"[i>>2]&0x1f][CRAM_SUBST_MATRIX[i+2]&0x1f]=2; fd->cram_sub_matrix["ACGTN"[i>>2]&0x1f][CRAM_SUBST_MATRIX[i+3]&0x1f]=3; } } // Default version numbers for CRAM static int major_version = 2; static int minor_version = 1; /* * Opens a CRAM file for read (mode "rb") or write ("wb"). * The filename may be "-" to indicate stdin or stdout. * * Returns file handle on success * NULL on failure. */ cram_fd *cram_open(const char *filename, const char *mode) { hFILE *fp; cram_fd *fd; char fmode[3]= { mode[0], '\0', '\0' }; if (strlen(mode) > 1 && (mode[1] == 'b' || mode[1] == 'c')) { fmode[1] = 'b'; } fp = hopen(filename, fmode); if (!fp) return NULL; fd = cram_dopen(fp, filename, mode); if (!fd) hclose_abruptly(fp); return fd; } /* Opens an existing stream for reading or writing. * * Returns file handle on success; * NULL on failure. */ cram_fd *cram_dopen(hFILE *fp, const char *filename, const char *mode) { int i; char *cp; cram_fd *fd = calloc(1, sizeof(*fd)); if (!fd) return NULL; fd->level = 5; for (i = 0; mode[i]; i++) { if (mode[i] >= '0' && mode[i] <= '9') { fd->level = mode[i] - '0'; break; } } fd->fp = fp; fd->mode = *mode; fd->first_container = 0; if (fd->mode == 'r') { /* Reader */ if (!(fd->file_def = cram_read_file_def(fd))) goto err; fd->version = fd->file_def->major_version * 256 + fd->file_def->minor_version; if (!(fd->header = cram_read_SAM_hdr(fd))) goto err; } else { /* Writer */ cram_file_def *def = calloc(1, sizeof(*def)); if (!def) return NULL; fd->file_def = def; def->magic[0] = 'C'; def->magic[1] = 'R'; def->magic[2] = 'A'; def->magic[3] = 'M'; def->major_version = 0; // Indicator to write file def later. def->minor_version = 0; memset(def->file_id, 0, 20); strncpy(def->file_id, filename, 20); fd->version = major_version * 256 + minor_version; /* SAM header written later along with this file_def */ } cram_init_tables(fd); fd->prefix = strdup((cp = strrchr(filename, '/')) ? cp+1 : filename); if (!fd->prefix) goto err; fd->first_base = fd->last_base = -1; fd->record_counter = 0; fd->ctr = NULL; fd->refs = refs_create(); if (!fd->refs) goto err; fd->ref_id = -2; fd->ref = NULL; fd->decode_md = 0; fd->verbose = 0; fd->seqs_per_slice = SEQS_PER_SLICE; fd->slices_per_container = SLICE_PER_CNT; fd->embed_ref = 0; fd->no_ref = 0; fd->ignore_md5 = 0; fd->use_bz2 = 0; fd->use_rans = (CRAM_MAJOR_VERS(fd->version) >= 3); fd->use_lzma = 0; fd->multi_seq = -1; fd->unsorted = 0; fd->shared_ref = 0; fd->index = NULL; fd->own_pool = 0; fd->pool = NULL; fd->rqueue = NULL; fd->job_pending = NULL; fd->ooc = 0; fd->required_fields = INT_MAX; for (i = 0; i < DS_END; i++) fd->m[i] = cram_new_metrics(); fd->range.refid = -2; // no ref. fd->eof = 1; // See samtools issue #150 fd->ref_fn = NULL; fd->bl = NULL; /* Initialise dummy refs from the @SQ headers */ if (-1 == refs_from_header(fd->refs, fd, fd->header)) goto err; return fd; err: if (fd) free(fd); return NULL; } /* * Seek within a CRAM file. * * Returns 0 on success * -1 on failure */ int cram_seek(cram_fd *fd, off_t offset, int whence) { char buf[65536]; fd->ooc = 0; if (hseek(fd->fp, offset, whence) >= 0) return 0; if (!(whence == SEEK_CUR && offset >= 0)) return -1; /* Couldn't fseek, but we're in SEEK_CUR mode so read instead */ while (offset > 0) { int len = MIN(65536, offset); if (len != hread(fd->fp, buf, len)) return -1; offset -= len; } return 0; } /* * Flushes a CRAM file. * Useful for when writing to stdout without wishing to close the stream. * * Returns 0 on success * -1 on failure */ int cram_flush(cram_fd *fd) { if (!fd) return -1; if (fd->mode == 'w' && fd->ctr) { if(fd->ctr->slice) fd->ctr->curr_slice++; if (-1 == cram_flush_container_mt(fd, fd->ctr)) return -1; } return 0; } /* * Closes a CRAM file. * Returns 0 on success * -1 on failure */ int cram_close(cram_fd *fd) { spare_bams *bl, *next; int i; if (!fd) return -1; if (fd->mode == 'w' && fd->ctr) { if(fd->ctr->slice) fd->ctr->curr_slice++; if (-1 == cram_flush_container_mt(fd, fd->ctr)) return -1; } if (fd->pool) { t_pool_flush(fd->pool); if (0 != cram_flush_result(fd)) return -1; pthread_mutex_destroy(&fd->metrics_lock); pthread_mutex_destroy(&fd->ref_lock); pthread_mutex_destroy(&fd->bam_list_lock); fd->ctr = NULL; // prevent double freeing //fprintf(stderr, "CRAM: destroy queue %p\n", fd->rqueue); t_results_queue_destroy(fd->rqueue); } if (fd->mode == 'w') { /* Write EOF block */ if (CRAM_MAJOR_VERS(fd->version) == 3) { if (38 != hwrite(fd->fp, "\x0f\x00\x00\x00\xff\xff\xff\xff" // Cont HDR "\x0f\xe0\x45\x4f\x46\x00\x00\x00" // Cont HDR "\x00\x01\x00" // Cont HDR "\x05\xbd\xd9\x4f" // CRC32 "\x00\x01\x00\x06\x06" // Comp.HDR blk "\x01\x00\x01\x00\x01\x00" // Comp.HDR blk "\xee\x63\x01\x4b", // CRC32 38)) return -1; } else { if (30 != hwrite(fd->fp, "\x0b\x00\x00\x00\xff\xff\xff\xff" "\x0f\xe0\x45\x4f\x46\x00\x00\x00" "\x00\x01\x00\x00\x01\x00\x06\x06" "\x01\x00\x01\x00\x01\x00", 30)) return -1; } } for (bl = fd->bl; bl; bl = next) { int i, max_rec = fd->seqs_per_slice * fd->slices_per_container; next = bl->next; for (i = 0; i < max_rec; i++) { if (bl->bams[i]) bam_free(bl->bams[i]); } free(bl->bams); free(bl); } if (hclose(fd->fp) != 0) return -1; if (fd->file_def) cram_free_file_def(fd->file_def); if (fd->header) sam_hdr_free(fd->header); free(fd->prefix); if (fd->ctr) cram_free_container(fd->ctr); if (fd->refs) refs_free(fd->refs); if (fd->ref_free) free(fd->ref_free); for (i = 0; i < DS_END; i++) if (fd->m[i]) free(fd->m[i]); if (fd->index) cram_index_free(fd); if (fd->own_pool && fd->pool) t_pool_destroy(fd->pool, 0); free(fd); return 0; } /* * Returns 1 if we hit an EOF while reading. */ int cram_eof(cram_fd *fd) { return fd->eof; } /* * Sets options on the cram_fd. See CRAM_OPT_* definitions in cram_structs.h. * Use this immediately after opening. * * Returns 0 on success * -1 on failure */ int cram_set_option(cram_fd *fd, enum cram_option opt, ...) { int r; va_list args; va_start(args, opt); r = cram_set_voption(fd, opt, args); va_end(args); return r; } /* * Sets options on the cram_fd. See CRAM_OPT_* definitions in cram_structs.h. * Use this immediately after opening. * * Returns 0 on success * -1 on failure */ int cram_set_voption(cram_fd *fd, enum cram_option opt, va_list args) { refs_t *refs; if (!fd) return -1; switch (opt) { case CRAM_OPT_DECODE_MD: fd->decode_md = va_arg(args, int); break; case CRAM_OPT_PREFIX: if (fd->prefix) free(fd->prefix); if (!(fd->prefix = strdup(va_arg(args, char *)))) return -1; break; case CRAM_OPT_VERBOSITY: fd->verbose = va_arg(args, int); break; case CRAM_OPT_SEQS_PER_SLICE: fd->seqs_per_slice = va_arg(args, int); break; case CRAM_OPT_SLICES_PER_CONTAINER: fd->slices_per_container = va_arg(args, int); break; case CRAM_OPT_EMBED_REF: fd->embed_ref = va_arg(args, int); break; case CRAM_OPT_NO_REF: fd->no_ref = va_arg(args, int); break; case CRAM_OPT_IGNORE_MD5: fd->ignore_md5 = va_arg(args, int); break; case CRAM_OPT_USE_BZIP2: fd->use_bz2 = va_arg(args, int); break; case CRAM_OPT_USE_RANS: fd->use_rans = va_arg(args, int); break; case CRAM_OPT_USE_LZMA: fd->use_lzma = va_arg(args, int); break; case CRAM_OPT_SHARED_REF: fd->shared_ref = 1; refs = va_arg(args, refs_t *); if (refs != fd->refs) { if (fd->refs) refs_free(fd->refs); fd->refs = refs; fd->refs->count++; } break; case CRAM_OPT_RANGE: fd->range = *va_arg(args, cram_range *); return cram_seek_to_refpos(fd, &fd->range); case CRAM_OPT_REFERENCE: return cram_load_reference(fd, va_arg(args, char *)); case CRAM_OPT_VERSION: { int major, minor; char *s = va_arg(args, char *); if (2 != sscanf(s, "%d.%d", &major, &minor)) { fprintf(stderr, "Malformed version string %s\n", s); return -1; } if (!((major == 1 && minor == 0) || (major == 2 && (minor == 0 || minor == 1)) || (major == 3 && minor == 0))) { fprintf(stderr, "Unknown version string; " "use 1.0, 2.0, 2.1 or 3.0\n"); return -1; } fd->version = major*256 + minor; if (CRAM_MAJOR_VERS(fd->version) >= 3) fd->use_rans = 1; break; } case CRAM_OPT_MULTI_SEQ_PER_SLICE: fd->multi_seq = va_arg(args, int); break; case CRAM_OPT_NTHREADS: { int nthreads = va_arg(args, int); if (nthreads > 1) { if (!(fd->pool = t_pool_init(nthreads*2, nthreads))) return -1; fd->rqueue = t_results_queue_init(); pthread_mutex_init(&fd->metrics_lock, NULL); pthread_mutex_init(&fd->ref_lock, NULL); pthread_mutex_init(&fd->bam_list_lock, NULL); fd->shared_ref = 1; fd->own_pool = 1; } break; } case CRAM_OPT_THREAD_POOL: fd->pool = va_arg(args, t_pool *); if (fd->pool) { fd->rqueue = t_results_queue_init(); pthread_mutex_init(&fd->metrics_lock, NULL); pthread_mutex_init(&fd->ref_lock, NULL); pthread_mutex_init(&fd->bam_list_lock, NULL); } fd->shared_ref = 1; // Needed to avoid clobbering ref between threads fd->own_pool = 0; //fd->qsize = 1; //fd->decoded = calloc(fd->qsize, sizeof(cram_container *)); //t_pool_dispatch(fd->pool, cram_decoder_thread, fd); break; case CRAM_OPT_REQUIRED_FIELDS: fd->required_fields = va_arg(args, int); break; default: fprintf(stderr, "Unknown CRAM option code %d\n", opt); return -1; } return 0; }