comparison ezBAMQC/src/htslib/bgzf.c @ 0:dfa3745e5fd8

Uploaded
author youngkim
date Thu, 24 Mar 2016 17:12:52 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:dfa3745e5fd8
1 /* The MIT License
2
3 Copyright (c) 2008 Broad Institute / Massachusetts Institute of Technology
4 2011, 2012 Attractive Chaos <attractor@live.co.uk>
5 Copyright (C) 2009, 2013, 2014 Genome Research Ltd
6
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23 THE SOFTWARE.
24 */
25
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <unistd.h>
31 #include <assert.h>
32 #include <pthread.h>
33 #include <sys/types.h>
34 #include <inttypes.h>
35
36 #include "htslib/hts.h"
37 #include "htslib/bgzf.h"
38 #include "htslib/hfile.h"
39
40 #define BGZF_CACHE
41 #define BGZF_MT
42
43 #define BLOCK_HEADER_LENGTH 18
44 #define BLOCK_FOOTER_LENGTH 8
45
46
47 /* BGZF/GZIP header (speciallized from RFC 1952; little endian):
48 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
49 | 31|139| 8| 4| 0| 0|255| 6| 66| 67| 2|BLK_LEN|
50 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
51 BGZF extension:
52 ^ ^ ^ ^
53 | | | |
54 FLG.EXTRA XLEN B C
55
56 BGZF format is compatible with GZIP. It limits the size of each compressed
57 block to 2^16 bytes and adds and an extra "BC" field in the gzip header which
58 records the size.
59
60 */
61 static const uint8_t g_magic[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
62
63 #ifdef BGZF_CACHE
64 typedef struct {
65 int size;
66 uint8_t *block;
67 int64_t end_offset;
68 } cache_t;
69 #include "htslib/khash.h"
70 KHASH_MAP_INIT_INT64(cache, cache_t)
71 #endif
72
73 typedef struct
74 {
75 uint64_t uaddr; // offset w.r.t. uncompressed data
76 uint64_t caddr; // offset w.r.t. compressed data
77 }
78 bgzidx1_t;
79
80 struct __bgzidx_t
81 {
82 int noffs, moffs; // the size of the index, n:used, m:allocated
83 bgzidx1_t *offs; // offsets
84 uint64_t ublock_addr; // offset of the current block (uncompressed data)
85 };
86
87 void bgzf_index_destroy(BGZF *fp);
88 int bgzf_index_add_block(BGZF *fp);
89
90 static inline void packInt16(uint8_t *buffer, uint16_t value)
91 {
92 buffer[0] = value;
93 buffer[1] = value >> 8;
94 }
95
96 static inline int unpackInt16(const uint8_t *buffer)
97 {
98 return buffer[0] | buffer[1] << 8;
99 }
100
101 static inline void packInt32(uint8_t *buffer, uint32_t value)
102 {
103 buffer[0] = value;
104 buffer[1] = value >> 8;
105 buffer[2] = value >> 16;
106 buffer[3] = value >> 24;
107 }
108
109 static BGZF *bgzf_read_init(hFILE *hfpr)
110 {
111 BGZF *fp;
112 uint8_t magic[18];
113 ssize_t n = hpeek(hfpr, magic, 18);
114 if (n < 0) return NULL;
115
116 fp = (BGZF*)calloc(1, sizeof(BGZF));
117 if (fp == NULL) return NULL;
118
119 fp->is_write = 0;
120 fp->is_compressed = (n==2 && magic[0]==0x1f && magic[1]==0x8b);
121 fp->uncompressed_block = malloc(BGZF_MAX_BLOCK_SIZE);
122 fp->compressed_block = malloc(BGZF_MAX_BLOCK_SIZE);
123 fp->is_compressed = (n==18 && magic[0]==0x1f && magic[1]==0x8b) ? 1 : 0;
124 fp->is_gzip = ( !fp->is_compressed || ((magic[3]&4) && memcmp(&magic[12], "BC\2\0",4)==0) ) ? 0 : 1;
125 #ifdef BGZF_CACHE
126 fp->cache = kh_init(cache);
127 #endif
128 return fp;
129 }
130
131 // get the compress level from the mode string: compress_level==-1 for the default level, -2 plain uncompressed
132 static int mode2level(const char *__restrict mode)
133 {
134 int i, compress_level = -1;
135 for (i = 0; mode[i]; ++i)
136 if (mode[i] >= '0' && mode[i] <= '9') break;
137 if (mode[i]) compress_level = (int)mode[i] - '0';
138 if (strchr(mode, 'u')) compress_level = -2;
139 return compress_level;
140 }
141 static BGZF *bgzf_write_init(const char *mode)
142 {
143 BGZF *fp;
144 fp = (BGZF*)calloc(1, sizeof(BGZF));
145 fp->is_write = 1;
146 int compress_level = mode2level(mode);
147 if ( compress_level==-2 )
148 {
149 fp->is_compressed = 0;
150 return fp;
151 }
152 fp->is_compressed = 1;
153 fp->uncompressed_block = malloc(BGZF_MAX_BLOCK_SIZE);
154 fp->compressed_block = malloc(BGZF_MAX_BLOCK_SIZE);
155 fp->compress_level = compress_level < 0? Z_DEFAULT_COMPRESSION : compress_level; // Z_DEFAULT_COMPRESSION==-1
156 if (fp->compress_level > 9) fp->compress_level = Z_DEFAULT_COMPRESSION;
157 if ( strchr(mode,'g') )
158 {
159 // gzip output
160 fp->is_gzip = 1;
161 fp->gz_stream = (z_stream*)calloc(1,sizeof(z_stream));
162 fp->gz_stream->zalloc = NULL;
163 fp->gz_stream->zfree = NULL;
164 if ( deflateInit2(fp->gz_stream, fp->compress_level, Z_DEFLATED, 15|16, 8, Z_DEFAULT_STRATEGY)!=Z_OK ) return NULL;
165 }
166 return fp;
167 }
168
169 BGZF *bgzf_open(const char *path, const char *mode)
170 {
171 BGZF *fp = 0;
172 assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
173 if (strchr(mode, 'r')) {
174 hFILE *fpr;
175 if ((fpr = hopen(path, mode)) == 0) return 0;
176 fp = bgzf_read_init(fpr);
177 if (fp == 0) { hclose_abruptly(fpr); return NULL; }
178 fp->fp = fpr;
179 } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
180 hFILE *fpw;
181 if ((fpw = hopen(path, mode)) == 0) return 0;
182 fp = bgzf_write_init(mode);
183 fp->fp = fpw;
184 }
185 else { errno = EINVAL; return 0; }
186
187 fp->is_be = ed_is_big();
188 return fp;
189 }
190
191 BGZF *bgzf_dopen(int fd, const char *mode)
192 {
193 BGZF *fp = 0;
194 assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
195 if (strchr(mode, 'r')) {
196 hFILE *fpr;
197 if ((fpr = hdopen(fd, mode)) == 0) return 0;
198 fp = bgzf_read_init(fpr);
199 if (fp == 0) { hclose_abruptly(fpr); return NULL; } // FIXME this closes fd
200 fp->fp = fpr;
201 } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
202 hFILE *fpw;
203 if ((fpw = hdopen(fd, mode)) == 0) return 0;
204 fp = bgzf_write_init(mode);
205 fp->fp = fpw;
206 }
207 else { errno = EINVAL; return 0; }
208
209 fp->is_be = ed_is_big();
210 return fp;
211 }
212
213 BGZF *bgzf_hopen(hFILE *hfp, const char *mode)
214 {
215 BGZF *fp = NULL;
216 assert(compressBound(BGZF_BLOCK_SIZE) < BGZF_MAX_BLOCK_SIZE);
217 if (strchr(mode, 'r')) {
218 fp = bgzf_read_init(hfp);
219 if (fp == NULL) return NULL;
220 } else if (strchr(mode, 'w') || strchr(mode, 'a')) {
221 fp = bgzf_write_init(mode);
222 }
223 else { errno = EINVAL; return 0; }
224
225 fp->fp = hfp;
226 fp->is_be = ed_is_big();
227 return fp;
228 }
229
230 static int bgzf_compress(void *_dst, int *dlen, void *src, int slen, int level)
231 {
232 uint32_t crc;
233 z_stream zs;
234 uint8_t *dst = (uint8_t*)_dst;
235
236 // compress the body
237 zs.zalloc = NULL; zs.zfree = NULL;
238 zs.next_in = (Bytef*)src;
239 zs.avail_in = slen;
240 zs.next_out = dst + BLOCK_HEADER_LENGTH;
241 zs.avail_out = *dlen - BLOCK_HEADER_LENGTH - BLOCK_FOOTER_LENGTH;
242 if (deflateInit2(&zs, level, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY) != Z_OK) return -1; // -15 to disable zlib header/footer
243 if (deflate(&zs, Z_FINISH) != Z_STREAM_END) return -1;
244 if (deflateEnd(&zs) != Z_OK) return -1;
245 *dlen = zs.total_out + BLOCK_HEADER_LENGTH + BLOCK_FOOTER_LENGTH;
246 // write the header
247 memcpy(dst, g_magic, BLOCK_HEADER_LENGTH); // the last two bytes are a place holder for the length of the block
248 packInt16(&dst[16], *dlen - 1); // write the compressed length; -1 to fit 2 bytes
249 // write the footer
250 crc = crc32(crc32(0L, NULL, 0L), (Bytef*)src, slen);
251 packInt32((uint8_t*)&dst[*dlen - 8], crc);
252 packInt32((uint8_t*)&dst[*dlen - 4], slen);
253 return 0;
254 }
255
256 static int bgzf_gzip_compress(BGZF *fp, void *_dst, int *dlen, void *src, int slen, int level)
257 {
258 uint8_t *dst = (uint8_t*)_dst;
259 z_stream *zs = fp->gz_stream;
260 int flush = slen ? Z_NO_FLUSH : Z_FINISH;
261 zs->next_in = (Bytef*)src;
262 zs->avail_in = slen;
263 zs->next_out = dst;
264 zs->avail_out = *dlen;
265 if ( deflate(zs, flush) == Z_STREAM_ERROR ) return -1;
266 *dlen = *dlen - zs->avail_out;
267 return 0;
268 }
269
270 // Deflate the block in fp->uncompressed_block into fp->compressed_block. Also adds an extra field that stores the compressed block length.
271 static int deflate_block(BGZF *fp, int block_length)
272 {
273 int comp_size = BGZF_MAX_BLOCK_SIZE;
274 int ret;
275 if ( !fp->is_gzip )
276 ret = bgzf_compress(fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
277 else
278 ret = bgzf_gzip_compress(fp, fp->compressed_block, &comp_size, fp->uncompressed_block, block_length, fp->compress_level);
279
280 if ( ret != 0 )
281 {
282 fp->errcode |= BGZF_ERR_ZLIB;
283 return -1;
284 }
285 fp->block_offset = 0;
286 return comp_size;
287 }
288
289 // Inflate the block in fp->compressed_block into fp->uncompressed_block
290 static int inflate_block(BGZF* fp, int block_length)
291 {
292 z_stream zs;
293 zs.zalloc = NULL;
294 zs.zfree = NULL;
295 zs.next_in = (Bytef*)fp->compressed_block + 18;
296 zs.avail_in = block_length - 16;
297 zs.next_out = (Bytef*)fp->uncompressed_block;
298 zs.avail_out = BGZF_MAX_BLOCK_SIZE;
299
300 if (inflateInit2(&zs, -15) != Z_OK) {
301 fp->errcode |= BGZF_ERR_ZLIB;
302 return -1;
303 }
304 if (inflate(&zs, Z_FINISH) != Z_STREAM_END) {
305 inflateEnd(&zs);
306 fp->errcode |= BGZF_ERR_ZLIB;
307 return -1;
308 }
309 if (inflateEnd(&zs) != Z_OK) {
310 fp->errcode |= BGZF_ERR_ZLIB;
311 return -1;
312 }
313 return zs.total_out;
314 }
315
316 static int inflate_gzip_block(BGZF *fp, int cached)
317 {
318 int ret = Z_OK;
319 do
320 {
321 if ( !cached && fp->gz_stream->avail_out!=0 )
322 {
323 fp->gz_stream->avail_in = hread(fp->fp, fp->compressed_block, BGZF_BLOCK_SIZE);
324 if ( fp->gz_stream->avail_in<=0 ) return fp->gz_stream->avail_in;
325 if ( fp->gz_stream->avail_in==0 ) break;
326 fp->gz_stream->next_in = fp->compressed_block;
327 }
328 else cached = 0;
329 do
330 {
331 fp->gz_stream->next_out = (Bytef*)fp->uncompressed_block + fp->block_offset;
332 fp->gz_stream->avail_out = BGZF_MAX_BLOCK_SIZE - fp->block_offset;
333 ret = inflate(fp->gz_stream, Z_NO_FLUSH);
334 if ( ret==Z_BUF_ERROR ) continue; // non-critical error
335 if ( ret<0 ) return -1;
336 unsigned int have = BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
337 if ( have ) return have;
338 }
339 while ( fp->gz_stream->avail_out == 0 );
340 }
341 while (ret != Z_STREAM_END);
342 return BGZF_MAX_BLOCK_SIZE - fp->gz_stream->avail_out;
343 }
344
345 // Returns: 0 on success (BGZF header); -1 on non-BGZF GZIP header; -2 on error
346 static int check_header(const uint8_t *header)
347 {
348 if ( header[0] != 31 || header[1] != 139 || header[2] != 8 ) return -2;
349 return ((header[3] & 4) != 0
350 && unpackInt16((uint8_t*)&header[10]) == 6
351 && header[12] == 'B' && header[13] == 'C'
352 && unpackInt16((uint8_t*)&header[14]) == 2) ? 0 : -1;
353 }
354
355 #ifdef BGZF_CACHE
356 static void free_cache(BGZF *fp)
357 {
358 khint_t k;
359 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
360 if (fp->is_write) return;
361 for (k = kh_begin(h); k < kh_end(h); ++k)
362 if (kh_exist(h, k)) free(kh_val(h, k).block);
363 kh_destroy(cache, h);
364 }
365
366 static int load_block_from_cache(BGZF *fp, int64_t block_address)
367 {
368 khint_t k;
369 cache_t *p;
370 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
371 k = kh_get(cache, h, block_address);
372 if (k == kh_end(h)) return 0;
373 p = &kh_val(h, k);
374 if (fp->block_length != 0) fp->block_offset = 0;
375 fp->block_address = block_address;
376 fp->block_length = p->size;
377 memcpy(fp->uncompressed_block, p->block, BGZF_MAX_BLOCK_SIZE);
378 if ( hseek(fp->fp, p->end_offset, SEEK_SET) < 0 )
379 {
380 // todo: move the error up
381 fprintf(stderr,"Could not hseek to %"PRId64"\n", p->end_offset);
382 exit(1);
383 }
384 return p->size;
385 }
386
387 static void cache_block(BGZF *fp, int size)
388 {
389 int ret;
390 khint_t k;
391 cache_t *p;
392 khash_t(cache) *h = (khash_t(cache)*)fp->cache;
393 if (BGZF_MAX_BLOCK_SIZE >= fp->cache_size) return;
394 if ((kh_size(h) + 1) * BGZF_MAX_BLOCK_SIZE > (uint32_t)fp->cache_size) {
395 /* A better way would be to remove the oldest block in the
396 * cache, but here we remove a random one for simplicity. This
397 * should not have a big impact on performance. */
398 for (k = kh_begin(h); k < kh_end(h); ++k)
399 if (kh_exist(h, k)) break;
400 if (k < kh_end(h)) {
401 free(kh_val(h, k).block);
402 kh_del(cache, h, k);
403 }
404 }
405 k = kh_put(cache, h, fp->block_address, &ret);
406 if (ret == 0) return; // if this happens, a bug!
407 p = &kh_val(h, k);
408 p->size = fp->block_length;
409 p->end_offset = fp->block_address + size;
410 p->block = (uint8_t*)malloc(BGZF_MAX_BLOCK_SIZE);
411 memcpy(kh_val(h, k).block, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
412 }
413 #else
414 static void free_cache(BGZF *fp) {}
415 static int load_block_from_cache(BGZF *fp, int64_t block_address) {return 0;}
416 static void cache_block(BGZF *fp, int size) {}
417 #endif
418
419 int bgzf_read_block(BGZF *fp)
420 {
421 uint8_t header[BLOCK_HEADER_LENGTH], *compressed_block;
422 int count, size = 0, block_length, remaining;
423
424 // Reading an uncompressed file
425 if ( !fp->is_compressed )
426 {
427 count = hread(fp->fp, fp->uncompressed_block, BGZF_MAX_BLOCK_SIZE);
428 if ( count==0 )
429 {
430 fp->block_length = 0;
431 return 0;
432 }
433 if (fp->block_length != 0) fp->block_offset = 0;
434 fp->block_address += count;
435 fp->block_length = count;
436 return 0;
437 }
438
439 // Reading compressed file
440 int64_t block_address;
441 block_address = htell(fp->fp);
442 if ( fp->is_gzip && fp->gz_stream ) // is this is a initialized gzip stream?
443 {
444 count = inflate_gzip_block(fp, 0);
445 if ( count<0 )
446 {
447 fp->errcode |= BGZF_ERR_ZLIB;
448 return -1;
449 }
450 fp->block_length = count;
451 fp->block_address = block_address;
452 return 0;
453 }
454 if (fp->cache_size && load_block_from_cache(fp, block_address)) return 0;
455 count = hread(fp->fp, header, sizeof(header));
456 if (count == 0) { // no data read
457 fp->block_length = 0;
458 return 0;
459 }
460 int ret;
461 if ( count != sizeof(header) || (ret=check_header(header))==-2 )
462 {
463 fp->errcode |= BGZF_ERR_HEADER;
464 return -1;
465 }
466 if ( ret==-1 )
467 {
468 // GZIP, not BGZF
469 uint8_t *cblock = (uint8_t*)fp->compressed_block;
470 memcpy(cblock, header, sizeof(header));
471 count = hread(fp->fp, cblock+sizeof(header), BGZF_BLOCK_SIZE - sizeof(header)) + sizeof(header);
472 int nskip = 10;
473
474 // Check optional fields to skip: FLG.FNAME,FLG.FCOMMENT,FLG.FHCRC,FLG.FEXTRA
475 // Note: Some of these fields are untested, I did not have appropriate data available
476 if ( header[3] & 0x4 ) // FLG.FEXTRA
477 {
478 nskip += unpackInt16(&cblock[nskip]) + 2;
479 }
480 if ( header[3] & 0x8 ) // FLG.FNAME
481 {
482 while ( nskip<BGZF_BLOCK_SIZE && cblock[nskip] ) nskip++;
483 if ( nskip==BGZF_BLOCK_SIZE )
484 {
485 fp->errcode |= BGZF_ERR_HEADER;
486 return -1;
487 }
488 nskip++;
489 }
490 if ( header[3] & 0x10 ) // FLG.FCOMMENT
491 {
492 while ( nskip<BGZF_BLOCK_SIZE && cblock[nskip] ) nskip++;
493 if ( nskip==BGZF_BLOCK_SIZE )
494 {
495 fp->errcode |= BGZF_ERR_HEADER;
496 return -1;
497 }
498 nskip++;
499 }
500 if ( header[3] & 0x2 ) nskip += 2; // FLG.FHCRC
501
502 fp->is_gzip = 1;
503 fp->gz_stream = (z_stream*) calloc(1,sizeof(z_stream));
504 int ret = inflateInit2(fp->gz_stream, -15);
505 if (ret != Z_OK)
506 {
507 fp->errcode |= BGZF_ERR_ZLIB;
508 return -1;
509 }
510 fp->gz_stream->avail_in = count - nskip;
511 fp->gz_stream->next_in = cblock + nskip;
512 count = inflate_gzip_block(fp, 1);
513 if ( count<0 )
514 {
515 fp->errcode |= BGZF_ERR_ZLIB;
516 return -1;
517 }
518 fp->block_length = count;
519 fp->block_address = block_address;
520 if ( fp->idx_build_otf ) return -1; // cannot build index for gzip
521 return 0;
522 }
523 size = count;
524 block_length = unpackInt16((uint8_t*)&header[16]) + 1; // +1 because when writing this number, we used "-1"
525 compressed_block = (uint8_t*)fp->compressed_block;
526 memcpy(compressed_block, header, BLOCK_HEADER_LENGTH);
527 remaining = block_length - BLOCK_HEADER_LENGTH;
528 count = hread(fp->fp, &compressed_block[BLOCK_HEADER_LENGTH], remaining);
529 if (count != remaining) {
530 fp->errcode |= BGZF_ERR_IO;
531 return -1;
532 }
533 size += count;
534 if ((count = inflate_block(fp, block_length)) < 0) return -1;
535 if (fp->block_length != 0) fp->block_offset = 0; // Do not reset offset if this read follows a seek.
536 fp->block_address = block_address;
537 fp->block_length = count;
538 if ( fp->idx_build_otf )
539 {
540 bgzf_index_add_block(fp);
541 fp->idx->ublock_addr += count;
542 }
543 cache_block(fp, size);
544 return 0;
545 }
546
547 ssize_t bgzf_read(BGZF *fp, void *data, size_t length)
548 {
549 ssize_t bytes_read = 0;
550 uint8_t *output = (uint8_t*)data;
551 if (length <= 0) return 0;
552 assert(fp->is_write == 0);
553 while (bytes_read < length) {
554 int copy_length, available = fp->block_length - fp->block_offset;
555 uint8_t *buffer;
556 if (available <= 0) {
557 if (bgzf_read_block(fp) != 0) return -1;
558 available = fp->block_length - fp->block_offset;
559 if (available <= 0) break;
560 }
561 copy_length = length - bytes_read < available? length - bytes_read : available;
562 buffer = (uint8_t*)fp->uncompressed_block;
563 memcpy(output, buffer + fp->block_offset, copy_length);
564 fp->block_offset += copy_length;
565 output += copy_length;
566 bytes_read += copy_length;
567 }
568 if (fp->block_offset == fp->block_length) {
569 fp->block_address = htell(fp->fp);
570 fp->block_offset = fp->block_length = 0;
571 }
572 fp->uncompressed_address += bytes_read;
573 return bytes_read;
574 }
575
576 ssize_t bgzf_raw_read(BGZF *fp, void *data, size_t length)
577 {
578 return hread(fp->fp, data, length);
579 }
580
581 #ifdef BGZF_MT
582
583 typedef struct {
584 struct bgzf_mtaux_t *mt;
585 void *buf;
586 int i, errcode, toproc, compress_level;
587 } worker_t;
588
589 typedef struct bgzf_mtaux_t {
590 int n_threads, n_blks, curr, done;
591 volatile int proc_cnt;
592 void **blk;
593 int *len;
594 worker_t *w;
595 pthread_t *tid;
596 pthread_mutex_t lock;
597 pthread_cond_t cv;
598 } mtaux_t;
599
600 static int worker_aux(worker_t *w)
601 {
602 int i, stop = 0;
603 // wait for condition: to process or all done
604 pthread_mutex_lock(&w->mt->lock);
605 while (!w->toproc && !w->mt->done)
606 pthread_cond_wait(&w->mt->cv, &w->mt->lock);
607 if (w->mt->done) stop = 1;
608 w->toproc = 0;
609 pthread_mutex_unlock(&w->mt->lock);
610 if (stop) return 1; // to quit the thread
611 w->errcode = 0;
612 for (i = w->i; i < w->mt->curr; i += w->mt->n_threads) {
613 int clen = BGZF_MAX_BLOCK_SIZE;
614 if (bgzf_compress(w->buf, &clen, w->mt->blk[i], w->mt->len[i], w->compress_level) != 0)
615 w->errcode |= BGZF_ERR_ZLIB;
616 memcpy(w->mt->blk[i], w->buf, clen);
617 w->mt->len[i] = clen;
618 }
619 __sync_fetch_and_add(&w->mt->proc_cnt, 1);
620 return 0;
621 }
622
623 static void *mt_worker(void *data)
624 {
625 while (worker_aux((worker_t*)data) == 0);
626 return 0;
627 }
628
629 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
630 {
631 int i;
632 mtaux_t *mt;
633 pthread_attr_t attr;
634 if (!fp->is_write || fp->mt || n_threads <= 1) return -1;
635 mt = (mtaux_t*)calloc(1, sizeof(mtaux_t));
636 mt->n_threads = n_threads;
637 mt->n_blks = n_threads * n_sub_blks;
638 mt->len = (int*)calloc(mt->n_blks, sizeof(int));
639 mt->blk = (void**)calloc(mt->n_blks, sizeof(void*));
640 for (i = 0; i < mt->n_blks; ++i)
641 mt->blk[i] = malloc(BGZF_MAX_BLOCK_SIZE);
642 mt->tid = (pthread_t*)calloc(mt->n_threads, sizeof(pthread_t)); // tid[0] is not used, as the worker 0 is launched by the master
643 mt->w = (worker_t*)calloc(mt->n_threads, sizeof(worker_t));
644 for (i = 0; i < mt->n_threads; ++i) {
645 mt->w[i].i = i;
646 mt->w[i].mt = mt;
647 mt->w[i].compress_level = fp->compress_level;
648 mt->w[i].buf = malloc(BGZF_MAX_BLOCK_SIZE);
649 }
650 pthread_attr_init(&attr);
651 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
652 pthread_mutex_init(&mt->lock, 0);
653 pthread_cond_init(&mt->cv, 0);
654 for (i = 1; i < mt->n_threads; ++i) // worker 0 is effectively launched by the master thread
655 pthread_create(&mt->tid[i], &attr, mt_worker, &mt->w[i]);
656 fp->mt = mt;
657 return 0;
658 }
659
660 static void mt_destroy(mtaux_t *mt)
661 {
662 int i;
663 // signal all workers to quit
664 pthread_mutex_lock(&mt->lock);
665 mt->done = 1; mt->proc_cnt = 0;
666 pthread_cond_broadcast(&mt->cv);
667 pthread_mutex_unlock(&mt->lock);
668 for (i = 1; i < mt->n_threads; ++i) pthread_join(mt->tid[i], 0); // worker 0 is effectively launched by the master thread
669 // free other data allocated on heap
670 for (i = 0; i < mt->n_blks; ++i) free(mt->blk[i]);
671 for (i = 0; i < mt->n_threads; ++i) free(mt->w[i].buf);
672 free(mt->blk); free(mt->len); free(mt->w); free(mt->tid);
673 pthread_cond_destroy(&mt->cv);
674 pthread_mutex_destroy(&mt->lock);
675 free(mt);
676 }
677
678 static void mt_queue(BGZF *fp)
679 {
680 mtaux_t *mt = fp->mt;
681 assert(mt->curr < mt->n_blks); // guaranteed by the caller
682 memcpy(mt->blk[mt->curr], fp->uncompressed_block, fp->block_offset);
683 mt->len[mt->curr] = fp->block_offset;
684 fp->block_offset = 0;
685 ++mt->curr;
686 }
687
688 static int mt_flush_queue(BGZF *fp)
689 {
690 int i;
691 mtaux_t *mt = fp->mt;
692 // signal all the workers to compress
693 pthread_mutex_lock(&mt->lock);
694 for (i = 0; i < mt->n_threads; ++i) mt->w[i].toproc = 1;
695 mt->proc_cnt = 0;
696 pthread_cond_broadcast(&mt->cv);
697 pthread_mutex_unlock(&mt->lock);
698 // worker 0 is doing things here
699 worker_aux(&mt->w[0]);
700 // wait for all the threads to complete
701 while (mt->proc_cnt < mt->n_threads);
702 // dump data to disk
703 for (i = 0; i < mt->n_threads; ++i) fp->errcode |= mt->w[i].errcode;
704 for (i = 0; i < mt->curr; ++i)
705 if (hwrite(fp->fp, mt->blk[i], mt->len[i]) != mt->len[i]) {
706 fp->errcode |= BGZF_ERR_IO;
707 break;
708 }
709 mt->curr = 0;
710 return (fp->errcode == 0)? 0 : -1;
711 }
712
713 static int lazy_flush(BGZF *fp)
714 {
715 if (fp->mt) {
716 if (fp->block_offset) mt_queue(fp);
717 return (fp->mt->curr < fp->mt->n_blks)? 0 : mt_flush_queue(fp);
718 }
719 else return bgzf_flush(fp);
720 }
721
722 #else // ~ #ifdef BGZF_MT
723
724 int bgzf_mt(BGZF *fp, int n_threads, int n_sub_blks)
725 {
726 return 0;
727 }
728
729 static inline int lazy_flush(BGZF *fp)
730 {
731 return bgzf_flush(fp);
732 }
733
734 #endif // ~ #ifdef BGZF_MT
735
736 int bgzf_flush(BGZF *fp)
737 {
738 if (!fp->is_write) return 0;
739 #ifdef BGZF_MT
740 if (fp->mt) {
741 if (fp->block_offset) mt_queue(fp); // guaranteed that assertion does not fail
742 return mt_flush_queue(fp);
743 }
744 #endif
745 while (fp->block_offset > 0) {
746 if ( fp->idx_build_otf )
747 {
748 bgzf_index_add_block(fp);
749 fp->idx->ublock_addr += fp->block_offset;
750 }
751 int block_length = deflate_block(fp, fp->block_offset);
752 if (block_length < 0) return -1;
753 if (hwrite(fp->fp, fp->compressed_block, block_length) != block_length) {
754 fp->errcode |= BGZF_ERR_IO; // possibly truncated file
755 return -1;
756 }
757 fp->block_address += block_length;
758 }
759 return 0;
760 }
761
762 int bgzf_flush_try(BGZF *fp, ssize_t size)
763 {
764 if (fp->block_offset + size > BGZF_BLOCK_SIZE) return lazy_flush(fp);
765 return 0;
766 }
767
768 ssize_t bgzf_write(BGZF *fp, const void *data, size_t length)
769 {
770 if ( !fp->is_compressed )
771 return hwrite(fp->fp, data, length);
772
773 const uint8_t *input = (const uint8_t*)data;
774 ssize_t remaining = length;
775 assert(fp->is_write);
776 while (remaining > 0) {
777 uint8_t* buffer = (uint8_t*)fp->uncompressed_block;
778 int copy_length = BGZF_BLOCK_SIZE - fp->block_offset;
779 if (copy_length > remaining) copy_length = remaining;
780 memcpy(buffer + fp->block_offset, input, copy_length);
781 fp->block_offset += copy_length;
782 input += copy_length;
783 remaining -= copy_length;
784 if (fp->block_offset == BGZF_BLOCK_SIZE) {
785 if (lazy_flush(fp) != 0) return -1;
786 }
787 }
788 return length - remaining;
789 }
790
791 ssize_t bgzf_raw_write(BGZF *fp, const void *data, size_t length)
792 {
793 return hwrite(fp->fp, data, length);
794 }
795
796 int bgzf_close(BGZF* fp)
797 {
798 int ret, block_length;
799 if (fp == 0) return -1;
800 if (fp->is_write && fp->is_compressed) {
801 if (bgzf_flush(fp) != 0) return -1;
802 fp->compress_level = -1;
803 block_length = deflate_block(fp, 0); // write an empty block
804 if (hwrite(fp->fp, fp->compressed_block, block_length) < 0
805 || hflush(fp->fp) != 0) {
806 fp->errcode |= BGZF_ERR_IO;
807 return -1;
808 }
809 #ifdef BGZF_MT
810 if (fp->mt) mt_destroy(fp->mt);
811 #endif
812 }
813 if ( fp->is_gzip )
814 {
815 if (!fp->is_write) (void)inflateEnd(fp->gz_stream);
816 else (void)deflateEnd(fp->gz_stream);
817 free(fp->gz_stream);
818 }
819 ret = hclose(fp->fp);
820 if (ret != 0) return -1;
821 bgzf_index_destroy(fp);
822 free(fp->uncompressed_block);
823 free(fp->compressed_block);
824 free_cache(fp);
825 free(fp);
826 return 0;
827 }
828
829 void bgzf_set_cache_size(BGZF *fp, int cache_size)
830 {
831 if (fp) fp->cache_size = cache_size;
832 }
833
834 int bgzf_check_EOF(BGZF *fp)
835 {
836 uint8_t buf[28];
837 off_t offset = htell(fp->fp);
838 if (hseek(fp->fp, -28, SEEK_END) < 0) {
839 if (errno == ESPIPE) { hclearerr(fp->fp); return 2; }
840 else return -1;
841 }
842 if ( hread(fp->fp, buf, 28) != 28 ) return -1;
843 if ( hseek(fp->fp, offset, SEEK_SET) < 0 ) return -1;
844 return (memcmp("\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0", buf, 28) == 0)? 1 : 0;
845 }
846
847 int64_t bgzf_seek(BGZF* fp, int64_t pos, int where)
848 {
849 int block_offset;
850 int64_t block_address;
851
852 if (fp->is_write || where != SEEK_SET) {
853 fp->errcode |= BGZF_ERR_MISUSE;
854 return -1;
855 }
856 block_offset = pos & 0xFFFF;
857 block_address = pos >> 16;
858 if (hseek(fp->fp, block_address, SEEK_SET) < 0) {
859 fp->errcode |= BGZF_ERR_IO;
860 return -1;
861 }
862 fp->block_length = 0; // indicates current block has not been loaded
863 fp->block_address = block_address;
864 fp->block_offset = block_offset;
865 return 0;
866 }
867
868 int bgzf_is_bgzf(const char *fn)
869 {
870 uint8_t buf[16];
871 int n;
872 hFILE *fp;
873 if ((fp = hopen(fn, "r")) == 0) return 0;
874 n = hread(fp, buf, 16);
875 if ( hclose(fp) < 0 ) return -1;
876 if (n != 16) return 0;
877 return memcmp(g_magic, buf, 16) == 0? 1 : 0;
878 }
879
880 int bgzf_getc(BGZF *fp)
881 {
882 int c;
883 if (fp->block_offset >= fp->block_length) {
884 if (bgzf_read_block(fp) != 0) return -2; /* error */
885 if (fp->block_length == 0) return -1; /* end-of-file */
886 }
887 c = ((unsigned char*)fp->uncompressed_block)[fp->block_offset++];
888 if (fp->block_offset == fp->block_length) {
889 fp->block_address = htell(fp->fp);
890 fp->block_offset = 0;
891 fp->block_length = 0;
892 }
893 fp->uncompressed_address++;
894 return c;
895 }
896
897 #ifndef kroundup32
898 #define kroundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
899 #endif
900
901 int bgzf_getline(BGZF *fp, int delim, kstring_t *str)
902 {
903 int l, state = 0;
904 unsigned char *buf = (unsigned char*)fp->uncompressed_block;
905 str->l = 0;
906 do {
907 if (fp->block_offset >= fp->block_length) {
908 if (bgzf_read_block(fp) != 0) { state = -2; break; }
909 if (fp->block_length == 0) { state = -1; break; }
910 }
911 for (l = fp->block_offset; l < fp->block_length && buf[l] != delim; ++l);
912 if (l < fp->block_length) state = 1;
913 l -= fp->block_offset;
914 if (str->l + l + 1 >= str->m) {
915 str->m = str->l + l + 2;
916 kroundup32(str->m);
917 str->s = (char*)realloc(str->s, str->m);
918 }
919 memcpy(str->s + str->l, buf + fp->block_offset, l);
920 str->l += l;
921 fp->block_offset += l + 1;
922 if (fp->block_offset >= fp->block_length) {
923 fp->block_address = htell(fp->fp);
924 fp->block_offset = 0;
925 fp->block_length = 0;
926 }
927 } while (state == 0);
928 if (str->l == 0 && state < 0) return state;
929 fp->uncompressed_address += str->l;
930 if ( delim=='\n' && str->l>0 && str->s[str->l-1]=='\r' ) str->l--;
931 str->s[str->l] = 0;
932 return str->l;
933 }
934
935 void bgzf_index_destroy(BGZF *fp)
936 {
937 if ( !fp->idx ) return;
938 free(fp->idx->offs);
939 free(fp->idx);
940 fp->idx = NULL;
941 fp->idx_build_otf = 0;
942 }
943
944 int bgzf_index_build_init(BGZF *fp)
945 {
946 bgzf_index_destroy(fp);
947 fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
948 if ( !fp->idx ) return -1;
949 fp->idx_build_otf = 1; // build index on the fly
950 return 0;
951 }
952
953 int bgzf_index_add_block(BGZF *fp)
954 {
955 fp->idx->noffs++;
956 if ( fp->idx->noffs > fp->idx->moffs )
957 {
958 fp->idx->moffs = fp->idx->noffs;
959 kroundup32(fp->idx->moffs);
960 fp->idx->offs = (bgzidx1_t*) realloc(fp->idx->offs, fp->idx->moffs*sizeof(bgzidx1_t));
961 if ( !fp->idx->offs ) return -1;
962 }
963 fp->idx->offs[ fp->idx->noffs-1 ].uaddr = fp->idx->ublock_addr;
964 fp->idx->offs[ fp->idx->noffs-1 ].caddr = fp->block_address;
965 return 0;
966 }
967
968 int bgzf_index_dump(BGZF *fp, const char *bname, const char *suffix)
969 {
970 if (bgzf_flush(fp) != 0) return -1;
971
972 assert(fp->idx);
973 char *tmp = NULL;
974 if ( suffix )
975 {
976 int blen = strlen(bname);
977 int slen = strlen(suffix);
978 tmp = (char*) malloc(blen + slen + 1);
979 if ( !tmp ) return -1;
980 memcpy(tmp,bname,blen);
981 memcpy(tmp+blen,suffix,slen+1);
982 }
983
984 FILE *idx = fopen(tmp?tmp:bname,"wb");
985 if ( tmp ) free(tmp);
986 if ( !idx ) return -1;
987
988 // Note that the index contains one extra record when indexing files opened
989 // for reading. The terminating record is not present when opened for writing.
990 // This is not a bug.
991
992 int i;
993 if ( fp->is_be )
994 {
995 uint64_t x = fp->idx->noffs - 1;
996 fwrite(ed_swap_8p(&x), 1, sizeof(x), idx);
997 for (i=1; i<fp->idx->noffs; i++)
998 {
999 x = fp->idx->offs[i].caddr; fwrite(ed_swap_8p(&x), 1, sizeof(x), idx);
1000 x = fp->idx->offs[i].uaddr; fwrite(ed_swap_8p(&x), 1, sizeof(x), idx);
1001 }
1002 }
1003 else
1004 {
1005 uint64_t x = fp->idx->noffs - 1;
1006 fwrite(&x, 1, sizeof(x), idx);
1007 for (i=1; i<fp->idx->noffs; i++)
1008 {
1009 fwrite(&fp->idx->offs[i].caddr, 1, sizeof(fp->idx->offs[i].caddr), idx);
1010 fwrite(&fp->idx->offs[i].uaddr, 1, sizeof(fp->idx->offs[i].uaddr), idx);
1011 }
1012 }
1013 fclose(idx);
1014 return 0;
1015 }
1016
1017
1018 int bgzf_index_load(BGZF *fp, const char *bname, const char *suffix)
1019 {
1020 char *tmp = NULL;
1021 if ( suffix )
1022 {
1023 int blen = strlen(bname);
1024 int slen = strlen(suffix);
1025 tmp = (char*) malloc(blen + slen + 1);
1026 if ( !tmp ) return -1;
1027 memcpy(tmp,bname,blen);
1028 memcpy(tmp+blen,suffix,slen+1);
1029 }
1030
1031 FILE *idx = fopen(tmp?tmp:bname,"rb");
1032 if ( tmp ) free(tmp);
1033 if ( !idx ) return -1;
1034
1035 fp->idx = (bgzidx_t*) calloc(1,sizeof(bgzidx_t));
1036 uint64_t x;
1037 if ( fread(&x, 1, sizeof(x), idx) != sizeof(x) ) return -1;
1038
1039 fp->idx->noffs = fp->idx->moffs = 1 + (fp->is_be ? ed_swap_8(x) : x);
1040 fp->idx->offs = (bgzidx1_t*) malloc(fp->idx->moffs*sizeof(bgzidx1_t));
1041 fp->idx->offs[0].caddr = fp->idx->offs[0].uaddr = 0;
1042
1043 int i;
1044 if ( fp->is_be )
1045 {
1046 int ret = 0;
1047 for (i=1; i<fp->idx->noffs; i++)
1048 {
1049 ret += fread(&x, 1, sizeof(x), idx); fp->idx->offs[i].caddr = ed_swap_8(x);
1050 ret += fread(&x, 1, sizeof(x), idx); fp->idx->offs[i].uaddr = ed_swap_8(x);
1051 }
1052 if ( ret != sizeof(x)*2*(fp->idx->noffs-1) ) return -1;
1053 }
1054 else
1055 {
1056 int ret = 0;
1057 for (i=1; i<fp->idx->noffs; i++)
1058 {
1059 ret += fread(&x, 1, sizeof(x), idx); fp->idx->offs[i].caddr = x;
1060 ret += fread(&x, 1, sizeof(x), idx); fp->idx->offs[i].uaddr = x;
1061 }
1062 if ( ret != sizeof(x)*2*(fp->idx->noffs-1) ) return -1;
1063 }
1064 fclose(idx);
1065 return 0;
1066
1067 }
1068
1069 int bgzf_useek(BGZF *fp, long uoffset, int where)
1070 {
1071 if ( !fp->is_compressed )
1072 {
1073 if (hseek(fp->fp, uoffset, SEEK_SET) < 0)
1074 {
1075 fp->errcode |= BGZF_ERR_IO;
1076 return -1;
1077 }
1078 fp->block_length = 0; // indicates current block has not been loaded
1079 fp->block_address = uoffset;
1080 fp->block_offset = 0;
1081 bgzf_read_block(fp);
1082 fp->uncompressed_address = uoffset;
1083 return 0;
1084 }
1085
1086 if ( !fp->idx )
1087 {
1088 fp->errcode |= BGZF_ERR_IO;
1089 return -1;
1090 }
1091
1092 // binary search
1093 int ilo = 0, ihi = fp->idx->noffs - 1;
1094 while ( ilo<=ihi )
1095 {
1096 int i = (ilo+ihi)*0.5;
1097 if ( uoffset < fp->idx->offs[i].uaddr ) ihi = i - 1;
1098 else if ( uoffset >= fp->idx->offs[i].uaddr ) ilo = i + 1;
1099 else break;
1100 }
1101 int i = ilo-1;
1102 if (hseek(fp->fp, fp->idx->offs[i].caddr, SEEK_SET) < 0)
1103 {
1104 fp->errcode |= BGZF_ERR_IO;
1105 return -1;
1106 }
1107 fp->block_length = 0; // indicates current block has not been loaded
1108 fp->block_address = fp->idx->offs[i].caddr;
1109 fp->block_offset = 0;
1110 if ( bgzf_read_block(fp) < 0 ) return -1;
1111 if ( uoffset - fp->idx->offs[i].uaddr > 0 )
1112 {
1113 fp->block_offset = uoffset - fp->idx->offs[i].uaddr;
1114 assert( fp->block_offset <= fp->block_length ); // todo: skipped, unindexed, blocks
1115 }
1116 fp->uncompressed_address = uoffset;
1117 return 0;
1118 }
1119
1120 long bgzf_utell(BGZF *fp)
1121 {
1122 return fp->uncompressed_address; // currently maintained only when reading
1123 }
1124