comparison ezBAMQC/src/htslib/synced_bcf_reader.c @ 0:dfa3745e5fd8

Uploaded
author youngkim
date Thu, 24 Mar 2016 17:12:52 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:dfa3745e5fd8
1 /* synced_bcf_reader.c -- stream through multiple VCF files.
2
3 Copyright (C) 2012-2014 Genome Research Ltd.
4
5 Author: Petr Danecek <pd3@sanger.ac.uk>
6
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13
14 The above copyright notice and this permission notice shall be included in
15 all copies or substantial portions of the Software.
16
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
20 THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 DEALINGS IN THE SOFTWARE. */
24
25 #include <stdio.h>
26 #include <unistd.h>
27 #include <string.h>
28 #include <limits.h>
29 #include <errno.h>
30 #include <ctype.h>
31 #include <sys/stat.h>
32 #include "htslib/synced_bcf_reader.h"
33 #include "htslib/kseq.h"
34 #include "htslib/khash_str2int.h"
35
36 #define MAX_CSI_COOR 0x7fffffff // maximum indexable coordinate of .csi
37
38 typedef struct
39 {
40 uint32_t start, end;
41 }
42 region1_t;
43
44 typedef struct _region_t
45 {
46 region1_t *regs;
47 int nregs, mregs, creg;
48 }
49 region_t;
50
51 static void _regions_add(bcf_sr_regions_t *reg, const char *chr, int start, int end);
52 static bcf_sr_regions_t *_regions_init_string(const char *str);
53 static int _regions_match_alleles(bcf_sr_regions_t *reg, int als_idx, bcf1_t *rec);
54
55 char *bcf_sr_strerror(int errnum)
56 {
57 switch (errnum)
58 {
59 case open_failed:
60 return strerror(errno); break;
61 case not_bgzf:
62 return "not compressed with bgzip"; break;
63 case idx_load_failed:
64 return "could not load index"; break;
65 case file_type_error:
66 return "unknown file type"; break;
67 case api_usage_error:
68 return "API usage error"; break;
69 case header_error:
70 return "could not parse header"; break;
71 default: return "";
72 }
73 }
74
75 static int *init_filters(bcf_hdr_t *hdr, const char *filters, int *nfilters)
76 {
77 kstring_t str = {0,0,0};
78 const char *tmp = filters, *prev = filters;
79 int nout = 0, *out = NULL;
80 while ( 1 )
81 {
82 if ( *tmp==',' || !*tmp )
83 {
84 out = (int*) realloc(out, (nout+1)*sizeof(int));
85 if ( tmp-prev==1 && *prev=='.' )
86 out[nout] = -1;
87 else
88 {
89 str.l = 0;
90 kputsn(prev, tmp-prev, &str);
91 out[nout] = bcf_hdr_id2int(hdr, BCF_DT_ID, str.s);
92 }
93 nout++;
94 if ( !*tmp ) break;
95 prev = tmp+1;
96 }
97 tmp++;
98 }
99 if ( str.m ) free(str.s);
100 *nfilters = nout;
101 return out;
102 }
103
104 int bcf_sr_set_regions(bcf_srs_t *readers, const char *regions, int is_file)
105 {
106 assert( !readers->regions );
107 if ( readers->nreaders )
108 {
109 fprintf(stderr,"[%s:%d %s] Error: bcf_sr_set_regions() must be called before bcf_sr_add_reader()\n", __FILE__,__LINE__,__FUNCTION__);
110 return -1;
111 }
112 readers->regions = bcf_sr_regions_init(regions,is_file,0,1,-2);
113 if ( !readers->regions ) return -1;
114 readers->explicit_regs = 1;
115 readers->require_index = 1;
116 return 0;
117 }
118 int bcf_sr_set_targets(bcf_srs_t *readers, const char *targets, int is_file, int alleles)
119 {
120 assert( !readers->targets );
121 if ( targets[0]=='^' )
122 {
123 readers->targets_exclude = 1;
124 targets++;
125 }
126 readers->targets = bcf_sr_regions_init(targets,is_file,0,1,-2);
127 if ( !readers->targets ) return -1;
128 readers->targets_als = alleles;
129 return 0;
130 }
131
132 int bcf_sr_add_reader(bcf_srs_t *files, const char *fname)
133 {
134 htsFile* file_ptr = hts_open(fname, "r");
135 if ( ! file_ptr ) {
136 files->errnum = open_failed;
137 return 0;
138 }
139
140 files->has_line = (int*) realloc(files->has_line, sizeof(int)*(files->nreaders+1));
141 files->has_line[files->nreaders] = 0;
142 files->readers = (bcf_sr_t*) realloc(files->readers, sizeof(bcf_sr_t)*(files->nreaders+1));
143 bcf_sr_t *reader = &files->readers[files->nreaders++];
144 memset(reader,0,sizeof(bcf_sr_t));
145
146 reader->file = file_ptr;
147
148 files->errnum = 0;
149
150 if ( files->require_index )
151 {
152 if ( reader->file->format.format==vcf )
153 {
154 if ( reader->file->format.compression!=bgzf )
155 {
156 files->errnum = not_bgzf;
157 return 0;
158 }
159
160 reader->tbx_idx = tbx_index_load(fname);
161 if ( !reader->tbx_idx )
162 {
163 files->errnum = idx_load_failed;
164 return 0;
165 }
166
167 reader->header = bcf_hdr_read(reader->file);
168 }
169 else if ( reader->file->format.format==bcf )
170 {
171 if ( reader->file->format.compression!=bgzf )
172 {
173 files->errnum = not_bgzf;
174 return 0;
175 }
176
177 reader->header = bcf_hdr_read(reader->file);
178
179 reader->bcf_idx = bcf_index_load(fname);
180 if ( !reader->bcf_idx )
181 {
182 files->errnum = idx_load_failed;
183 return 0;
184 }
185 }
186 else
187 {
188 files->errnum = file_type_error;
189 return 0;
190 }
191 }
192 else
193 {
194 if ( reader->file->format.format==bcf || reader->file->format.format==vcf )
195 {
196 reader->header = bcf_hdr_read(reader->file);
197 }
198 else
199 {
200 files->errnum = file_type_error;
201 return 0;
202 }
203 files->streaming = 1;
204 }
205 if ( files->streaming && files->nreaders>1 )
206 {
207 files->errnum = api_usage_error;
208 fprintf(stderr,"[%s:%d %s] Error: %d readers, yet require_index not set\n", __FILE__,__LINE__,__FUNCTION__,files->nreaders);
209 return 0;
210 }
211 if ( files->streaming && files->regions )
212 {
213 files->errnum = api_usage_error;
214 fprintf(stderr,"[%s:%d %s] Error: cannot tabix-jump in streaming mode\n", __FILE__,__LINE__,__FUNCTION__);
215 return 0;
216 }
217 if ( !reader->header )
218 {
219 files->errnum = header_error;
220 return 0;
221 }
222
223 reader->fname = fname;
224 if ( files->apply_filters )
225 reader->filter_ids = init_filters(reader->header, files->apply_filters, &reader->nfilter_ids);
226
227 // Update list of chromosomes
228 if ( !files->explicit_regs && !files->streaming )
229 {
230 int n,i;
231 const char **names = reader->tbx_idx ? tbx_seqnames(reader->tbx_idx, &n) : bcf_hdr_seqnames(reader->header, &n);
232 for (i=0; i<n; i++)
233 {
234 if ( !files->regions )
235 files->regions = _regions_init_string(names[i]);
236 else
237 _regions_add(files->regions, names[i], -1, -1);
238 }
239 free(names);
240 }
241
242 return 1;
243 }
244
245 bcf_srs_t *bcf_sr_init(void)
246 {
247 bcf_srs_t *files = (bcf_srs_t*) calloc(1,sizeof(bcf_srs_t));
248 return files;
249 }
250
251 static void bcf_sr_destroy1(bcf_sr_t *reader)
252 {
253 if ( reader->tbx_idx ) tbx_destroy(reader->tbx_idx);
254 if ( reader->bcf_idx ) hts_idx_destroy(reader->bcf_idx);
255 bcf_hdr_destroy(reader->header);
256 hts_close(reader->file);
257 if ( reader->itr ) tbx_itr_destroy(reader->itr);
258 int j;
259 for (j=0; j<reader->mbuffer; j++)
260 bcf_destroy1(reader->buffer[j]);
261 free(reader->buffer);
262 free(reader->samples);
263 free(reader->filter_ids);
264 }
265 void bcf_sr_destroy(bcf_srs_t *files)
266 {
267 int i;
268 for (i=0; i<files->nreaders; i++)
269 bcf_sr_destroy1(&files->readers[i]);
270 free(files->has_line);
271 free(files->readers);
272 for (i=0; i<files->n_smpl; i++) free(files->samples[i]);
273 free(files->samples);
274 if (files->targets) bcf_sr_regions_destroy(files->targets);
275 if (files->regions) bcf_sr_regions_destroy(files->regions);
276 if ( files->tmps.m ) free(files->tmps.s);
277 free(files);
278 }
279
280 void bcf_sr_remove_reader(bcf_srs_t *files, int i)
281 {
282 assert( !files->samples ); // not ready for this yet
283 bcf_sr_destroy1(&files->readers[i]);
284 if ( i+1 < files->nreaders )
285 {
286 memmove(&files->readers[i], &files->readers[i+1], (files->nreaders-i-1)*sizeof(bcf_sr_t));
287 memmove(&files->has_line[i], &files->has_line[i+1], (files->nreaders-i-1)*sizeof(int));
288 }
289 files->nreaders--;
290 }
291
292
293 /*
294 Removes duplicate records from the buffer. The meaning of "duplicate" is
295 controlled by the $collapse variable, which can cause that from multiple
296 <indel|snp|any> lines only the first is considered and the rest is ignored.
297 The removal is done by setting the redundant lines' positions to -1 and
298 moving these lines at the end of the buffer.
299 */
300 static void collapse_buffer(bcf_srs_t *files, bcf_sr_t *reader)
301 {
302 int irec,jrec, has_snp=0, has_indel=0, has_any=0;
303 for (irec=1; irec<=reader->nbuffer; irec++)
304 {
305 bcf1_t *line = reader->buffer[irec];
306 if ( line->pos != reader->buffer[1]->pos ) break;
307 if ( files->collapse&COLLAPSE_ANY )
308 {
309 if ( !has_any ) has_any = 1;
310 else line->pos = -1;
311 }
312 int line_type = bcf_get_variant_types(line);
313 if ( files->collapse&COLLAPSE_SNPS && line_type&(VCF_SNP|VCF_MNP) )
314 {
315 if ( !has_snp ) has_snp = 1;
316 else line->pos = -1;
317 }
318 if ( files->collapse&COLLAPSE_INDELS && line_type&VCF_INDEL )
319 {
320 if ( !has_indel ) has_indel = 1;
321 else line->pos = -1;
322 }
323 }
324 bcf1_t *tmp;
325 irec = jrec = 1;
326 while ( irec<=reader->nbuffer && jrec<=reader->nbuffer )
327 {
328 if ( reader->buffer[irec]->pos != -1 ) { irec++; continue; }
329 if ( jrec<=irec ) jrec = irec+1;
330 while ( jrec<=reader->nbuffer && reader->buffer[jrec]->pos==-1 ) jrec++;
331 if ( jrec<=reader->nbuffer )
332 {
333 tmp = reader->buffer[irec]; reader->buffer[irec] = reader->buffer[jrec]; reader->buffer[jrec] = tmp;
334 }
335 }
336 reader->nbuffer = irec - 1;
337 }
338
339 void debug_buffer(FILE *fp, bcf_sr_t *reader)
340 {
341 int j;
342 for (j=0; j<=reader->nbuffer; j++)
343 {
344 bcf1_t *line = reader->buffer[j];
345 fprintf(fp,"%s%s\t%s:%d\t%s ", reader->fname,j==0?"*":"",reader->header->id[BCF_DT_CTG][line->rid].key,line->pos+1,line->n_allele?line->d.allele[0]:"");
346 int k;
347 for (k=1; k<line->n_allele; k++) fprintf(fp," %s", line->d.allele[k]);
348 fprintf(fp,"\n");
349 }
350 }
351
352 void debug_buffers(FILE *fp, bcf_srs_t *files)
353 {
354 int i;
355 for (i=0; i<files->nreaders; i++)
356 {
357 fprintf(fp, "has_line: %d\t%s\n", bcf_sr_has_line(files,i),files->readers[i].fname);
358 debug_buffer(fp, &files->readers[i]);
359 }
360 fprintf(fp,"\n");
361 }
362
363 static inline int has_filter(bcf_sr_t *reader, bcf1_t *line)
364 {
365 int i, j;
366 if ( !line->d.n_flt )
367 {
368 for (j=0; j<reader->nfilter_ids; j++)
369 if ( reader->filter_ids[j]<0 ) return 1;
370 return 0;
371 }
372 for (i=0; i<line->d.n_flt; i++)
373 {
374 for (j=0; j<reader->nfilter_ids; j++)
375 if ( line->d.flt[i]==reader->filter_ids[j] ) return 1;
376 }
377 return 0;
378 }
379
380 static int _reader_seek(bcf_sr_t *reader, const char *seq, int start, int end)
381 {
382 if ( end>=MAX_CSI_COOR )
383 {
384 fprintf(stderr,"The coordinate is out of csi index limit: %d\n", end+1);
385 exit(1);
386 }
387 if ( reader->itr )
388 {
389 hts_itr_destroy(reader->itr);
390 reader->itr = NULL;
391 }
392 reader->nbuffer = 0;
393 if ( reader->tbx_idx )
394 {
395 int tid = tbx_name2id(reader->tbx_idx, seq);
396 if ( tid==-1 ) return -1; // the sequence not present in this file
397 reader->itr = tbx_itr_queryi(reader->tbx_idx,tid,start,end+1);
398 }
399 else
400 {
401 int tid = bcf_hdr_name2id(reader->header, seq);
402 if ( tid==-1 ) return -1; // the sequence not present in this file
403 reader->itr = bcf_itr_queryi(reader->bcf_idx,tid,start,end+1);
404 }
405 assert(reader->itr);
406 return 0;
407 }
408
409 /*
410 * _readers_next_region() - jumps to next region if necessary
411 * Returns 0 on success or -1 when there are no more regions left
412 */
413 static int _readers_next_region(bcf_srs_t *files)
414 {
415 // Need to open new chromosome? Check number of lines in all readers' buffers
416 int i, eos = 0;
417 for (i=0; i<files->nreaders; i++)
418 if ( !files->readers[i].itr && !files->readers[i].nbuffer ) eos++;
419
420 if ( eos!=files->nreaders )
421 {
422 // Some of the readers still has buffered lines
423 return 0;
424 }
425
426 // No lines in the buffer, need to open new region or quit
427 if ( bcf_sr_regions_next(files->regions)<0 ) return -1;
428
429 for (i=0; i<files->nreaders; i++)
430 _reader_seek(&files->readers[i],files->regions->seq_names[files->regions->iseq],files->regions->start,files->regions->end);
431
432 return 0;
433 }
434
435 /*
436 * _reader_fill_buffer() - buffers all records with the same coordinate
437 */
438 static void _reader_fill_buffer(bcf_srs_t *files, bcf_sr_t *reader)
439 {
440 // Return if the buffer is full: the coordinate of the last buffered record differs
441 if ( reader->nbuffer && reader->buffer[reader->nbuffer]->pos != reader->buffer[1]->pos ) return;
442
443 // No iterator (sequence not present in this file) and not streaming
444 if ( !reader->itr && !files->streaming ) return;
445
446 // Fill the buffer with records starting at the same position
447 int i, ret = 0;
448 while (1)
449 {
450 if ( reader->nbuffer+1 >= reader->mbuffer )
451 {
452 // Increase buffer size
453 reader->mbuffer += 8;
454 reader->buffer = (bcf1_t**) realloc(reader->buffer, sizeof(bcf1_t*)*reader->mbuffer);
455 for (i=8; i>0; i--) // initialize
456 {
457 reader->buffer[reader->mbuffer-i] = bcf_init1();
458 reader->buffer[reader->mbuffer-i]->max_unpack = files->max_unpack;
459 reader->buffer[reader->mbuffer-i]->pos = -1; // for rare cases when VCF starts from 1
460 }
461 }
462 if ( files->streaming )
463 {
464 if ( reader->file->format.format==vcf )
465 {
466 if ( (ret=hts_getline(reader->file, KS_SEP_LINE, &files->tmps)) < 0 ) break; // no more lines
467 int ret = vcf_parse1(&files->tmps, reader->header, reader->buffer[reader->nbuffer+1]);
468 if ( ret<0 ) break;
469 }
470 else if ( reader->file->format.format==bcf )
471 {
472 if ( (ret=bcf_read1(reader->file, reader->header, reader->buffer[reader->nbuffer+1])) < 0 ) break; // no more lines
473 }
474 else
475 {
476 fprintf(stderr,"[%s:%d %s] fixme: not ready for this\n", __FILE__,__LINE__,__FUNCTION__);
477 exit(1);
478 }
479 }
480 else if ( reader->tbx_idx )
481 {
482 if ( (ret=tbx_itr_next(reader->file, reader->tbx_idx, reader->itr, &files->tmps)) < 0 ) break; // no more lines
483 vcf_parse1(&files->tmps, reader->header, reader->buffer[reader->nbuffer+1]);
484 }
485 else
486 {
487 if ( (ret=bcf_itr_next(reader->file, reader->itr, reader->buffer[reader->nbuffer+1])) < 0 ) break; // no more lines
488 bcf_subset_format(reader->header,reader->buffer[reader->nbuffer+1]);
489 }
490
491 // apply filter
492 if ( !reader->nfilter_ids )
493 bcf_unpack(reader->buffer[reader->nbuffer+1], BCF_UN_STR);
494 else
495 {
496 bcf_unpack(reader->buffer[reader->nbuffer+1], BCF_UN_STR|BCF_UN_FLT);
497 if ( !has_filter(reader, reader->buffer[reader->nbuffer+1]) ) continue;
498 }
499 reader->nbuffer++;
500
501 if ( reader->buffer[reader->nbuffer]->pos != reader->buffer[1]->pos ) break; // the buffer is full
502 }
503 if ( ret<0 )
504 {
505 // done for this region
506 tbx_itr_destroy(reader->itr);
507 reader->itr = NULL;
508 }
509 if ( files->collapse && reader->nbuffer>=2 && reader->buffer[1]->pos==reader->buffer[2]->pos )
510 collapse_buffer(files, reader);
511 }
512
513 /*
514 * _readers_shift_buffer() - removes the first line and all subsequent lines with the same position
515 */
516 static void _reader_shift_buffer(bcf_sr_t *reader)
517 {
518 int i;
519 for (i=2; i<=reader->nbuffer; i++)
520 if ( reader->buffer[i]->pos!=reader->buffer[1]->pos ) break;
521 if ( i<=reader->nbuffer )
522 {
523 // A record with a different position follows, swap it. Because of the reader's logic,
524 // only one such line can be present.
525 bcf1_t *tmp = reader->buffer[1]; reader->buffer[1] = reader->buffer[i]; reader->buffer[i] = tmp;
526 reader->nbuffer = 1;
527 }
528 else
529 reader->nbuffer = 0; // no other line
530 }
531
532 /*
533 * _reader_match_alleles() - from multiple buffered lines selects the one which
534 * corresponds best to the template line. The logic is controlled by COLLAPSE_*
535 * Returns 0 on success or -1 when no good matching line is found.
536 */
537 static int _reader_match_alleles(bcf_srs_t *files, bcf_sr_t *reader, bcf1_t *tmpl)
538 {
539 int i, irec = -1;
540
541 // if no template given, use the first available record
542 if ( !tmpl )
543 irec = 1;
544 else
545 {
546 int tmpl_type = bcf_get_variant_types(tmpl);
547 for (i=1; i<=reader->nbuffer; i++)
548 {
549 bcf1_t *line = reader->buffer[i];
550 if ( line->pos != reader->buffer[1]->pos ) break; // done with this reader
551
552 // Easiest case: matching by position only
553 if ( files->collapse&COLLAPSE_ANY ) { irec=i; break; }
554
555 int line_type = bcf_get_variant_types(line);
556
557 // No matter what the alleles are, as long as they are both SNPs
558 if ( files->collapse&COLLAPSE_SNPS && tmpl_type&VCF_SNP && line_type&VCF_SNP ) { irec=i; break; }
559 // ... or indels
560 if ( files->collapse&COLLAPSE_INDELS && tmpl_type&VCF_INDEL && line_type&VCF_INDEL ) { irec=i; break; }
561
562 // More thorough checking: REFs must match
563 if ( tmpl->rlen != line->rlen ) continue; // different length
564 if ( strcmp(tmpl->d.allele[0], line->d.allele[0]) ) continue; // the strings do not match
565
566 int ial,jal;
567 if ( files->collapse==COLLAPSE_NONE )
568 {
569 // Exact match, all alleles must be identical
570 if ( tmpl->n_allele!=line->n_allele ) continue; // different number of alleles, skip
571
572 int nmatch = 1; // REF has been already checked
573 for (ial=1; ial<tmpl->n_allele; ial++)
574 {
575 for (jal=1; jal<line->n_allele; jal++)
576 if ( !strcmp(tmpl->d.allele[ial], line->d.allele[jal]) ) { nmatch++; break; }
577 }
578 if ( nmatch==tmpl->n_allele ) { irec=i; break; } // found: exact match
579 continue;
580 }
581
582 if ( line->n_allele==1 && tmpl->n_allele==1 ) { irec=i; break; } // both sites are non-variant
583
584 // COLLAPSE_SOME: at least some ALTs must match
585 for (ial=1; ial<tmpl->n_allele; ial++)
586 {
587 for (jal=1; jal<line->n_allele; jal++)
588 if ( !strcmp(tmpl->d.allele[ial], line->d.allele[jal]) ) { irec=i; break; }
589 if ( irec>=1 ) break;
590 }
591 if ( irec>=1 ) break;
592 }
593 if ( irec==-1 ) return -1; // no matching line was found
594 }
595
596 // Set the selected line (irec) as active: set it to buffer[0], move the remaining lines forward
597 // and put the old bcf1_t record at the end.
598 bcf1_t *tmp = reader->buffer[0];
599 reader->buffer[0] = reader->buffer[irec];
600 for (i=irec+1; i<=reader->nbuffer; i++) reader->buffer[i-1] = reader->buffer[i];
601 reader->buffer[ reader->nbuffer ] = tmp;
602 reader->nbuffer--;
603
604 return 0;
605 }
606
607 int _reader_next_line(bcf_srs_t *files)
608 {
609 int i, min_pos = INT_MAX;
610
611 // Loop until next suitable line is found or all readers have finished
612 while ( 1 )
613 {
614 // Get all readers ready for the next region.
615 if ( files->regions && _readers_next_region(files)<0 ) break;
616
617 // Fill buffers
618 const char *chr = NULL;
619 for (i=0; i<files->nreaders; i++)
620 {
621 _reader_fill_buffer(files, &files->readers[i]);
622
623 // Update the minimum coordinate
624 if ( !files->readers[i].nbuffer ) continue;
625 if ( min_pos > files->readers[i].buffer[1]->pos )
626 {
627 min_pos = files->readers[i].buffer[1]->pos;
628 chr = bcf_seqname(files->readers[i].header, files->readers[i].buffer[1]);
629 }
630 }
631 if ( min_pos==INT_MAX )
632 {
633 if ( !files->regions ) break;
634 continue;
635 }
636
637 // Skip this position if not present in targets
638 if ( files->targets )
639 {
640 int ret = bcf_sr_regions_overlap(files->targets, chr, min_pos, min_pos);
641 if ( (!files->targets_exclude && ret<0) || (files->targets_exclude && !ret) )
642 {
643 // Remove all lines with this position from the buffer
644 for (i=0; i<files->nreaders; i++)
645 if ( files->readers[i].nbuffer && files->readers[i].buffer[1]->pos==min_pos )
646 _reader_shift_buffer(&files->readers[i]);
647 min_pos = INT_MAX;
648 continue;
649 }
650 }
651
652 break; // done: min_pos is set
653 }
654
655 // There can be records with duplicate positions. Set the active line intelligently so that
656 // the alleles match.
657 int nret = 0; // number of readers sharing the position
658 bcf1_t *first = NULL; // record which will be used for allele matching
659 for (i=0; i<files->nreaders; i++)
660 {
661 files->has_line[i] = 0;
662
663 // Skip readers with no records at this position
664 if ( !files->readers[i].nbuffer || files->readers[i].buffer[1]->pos!=min_pos ) continue;
665
666 // Until now buffer[0] of all reader was empty and the lines started at buffer[1].
667 // Now lines which are ready to be output will be moved to buffer[0].
668 if ( _reader_match_alleles(files, &files->readers[i], first) < 0 ) continue;
669 if ( !first ) first = files->readers[i].buffer[0];
670
671 nret++;
672 files->has_line[i] = 1;
673 }
674 return nret;
675 }
676
677 int bcf_sr_next_line(bcf_srs_t *files)
678 {
679 if ( !files->targets_als )
680 return _reader_next_line(files);
681
682 while (1)
683 {
684 int i, ret = _reader_next_line(files);
685 if ( !ret ) return ret;
686
687 for (i=0; i<files->nreaders; i++)
688 if ( files->has_line[i] ) break;
689
690 if ( _regions_match_alleles(files->targets, files->targets_als-1, files->readers[i].buffer[0]) ) return ret;
691
692 // Check if there are more duplicate lines in the buffers. If not, return this line as if it
693 // matched the targets, even if there is a type mismatch
694 for (i=0; i<files->nreaders; i++)
695 {
696 if ( !files->has_line[i] ) continue;
697 if ( files->readers[i].nbuffer==0 || files->readers[i].buffer[1]->pos!=files->readers[i].buffer[0]->pos ) continue;
698 break;
699 }
700 if ( i==files->nreaders ) return ret; // no more lines left, output even if target alleles are not of the same type
701 }
702 }
703
704 static void bcf_sr_seek_start(bcf_srs_t *readers)
705 {
706 bcf_sr_regions_t *reg = readers->regions;
707 int i;
708 for (i=0; i<reg->nseqs; i++)
709 reg->regs[i].creg = -1;
710 reg->iseq = 0;
711 }
712
713
714 int bcf_sr_seek(bcf_srs_t *readers, const char *seq, int pos)
715 {
716 if ( !seq && !pos )
717 {
718 // seek to start
719 bcf_sr_seek_start(readers);
720 return 0;
721 }
722
723 bcf_sr_regions_overlap(readers->regions, seq, pos, pos);
724 int i, nret = 0;
725 for (i=0; i<readers->nreaders; i++)
726 {
727 nret += _reader_seek(&readers->readers[i],seq,pos,MAX_CSI_COOR-1);
728 }
729 return nret;
730 }
731
732 int bcf_sr_set_samples(bcf_srs_t *files, const char *fname, int is_file)
733 {
734 int i, j, nsmpl, free_smpl = 0;
735 char **smpl = NULL;
736
737 void *exclude = (fname[0]=='^') ? khash_str2int_init() : NULL;
738 if ( exclude || strcmp("-",fname) ) // "-" stands for all samples
739 {
740 smpl = hts_readlist(fname, is_file, &nsmpl);
741 if ( !smpl )
742 {
743 fprintf(stderr,"Could not read the file: \"%s\"\n", fname);
744 return 0;
745 }
746 if ( exclude )
747 {
748 for (i=0; i<nsmpl; i++)
749 khash_str2int_inc(exclude, smpl[i]);
750 }
751 free_smpl = 1;
752 }
753 if ( !smpl )
754 {
755 smpl = files->readers[0].header->samples; // intersection of all samples
756 nsmpl = bcf_hdr_nsamples(files->readers[0].header);
757 }
758
759 files->samples = NULL;
760 files->n_smpl = 0;
761 for (i=0; i<nsmpl; i++)
762 {
763 if ( exclude && khash_str2int_has_key(exclude,smpl[i]) ) continue;
764
765 int n_isec = 0;
766 for (j=0; j<files->nreaders; j++)
767 {
768 if ( bcf_hdr_id2int(files->readers[j].header, BCF_DT_SAMPLE, smpl[i])<0 ) break;
769 n_isec++;
770 }
771 if ( n_isec!=files->nreaders )
772 {
773 fprintf(stderr,"Warning: The sample \"%s\" was not found in %s, skipping\n", smpl[i], files->readers[n_isec].fname);
774 continue;
775 }
776
777 files->samples = (char**) realloc(files->samples, (files->n_smpl+1)*sizeof(const char*));
778 files->samples[files->n_smpl++] = strdup(smpl[i]);
779 }
780
781 if ( exclude ) khash_str2int_destroy(exclude);
782 if ( free_smpl )
783 {
784 for (i=0; i<nsmpl; i++) free(smpl[i]);
785 free(smpl);
786 }
787
788 if ( !files->n_smpl )
789 {
790 if ( files->nreaders>1 )
791 fprintf(stderr,"No samples in common.\n");
792 return 0;
793 }
794 for (i=0; i<files->nreaders; i++)
795 {
796 bcf_sr_t *reader = &files->readers[i];
797 reader->samples = (int*) malloc(sizeof(int)*files->n_smpl);
798 reader->n_smpl = files->n_smpl;
799 for (j=0; j<files->n_smpl; j++)
800 reader->samples[j] = bcf_hdr_id2int(reader->header, BCF_DT_SAMPLE, files->samples[j]);
801 }
802 return 1;
803 }
804
805 // Add a new region into a list sorted by start,end. On input the coordinates
806 // are 1-based, stored 0-based, inclusive.
807 static void _regions_add(bcf_sr_regions_t *reg, const char *chr, int start, int end)
808 {
809 if ( start==-1 && end==-1 )
810 {
811 start = 0; end = MAX_CSI_COOR-1;
812 }
813 else
814 {
815 start--; end--; // store 0-based coordinates
816 }
817
818 if ( !reg->seq_hash )
819 reg->seq_hash = khash_str2int_init();
820
821 int iseq;
822 if ( khash_str2int_get(reg->seq_hash, chr, &iseq)<0 )
823 {
824 // the chromosome block does not exist
825 iseq = reg->nseqs++;
826 reg->seq_names = (char**) realloc(reg->seq_names,sizeof(char*)*reg->nseqs);
827 reg->regs = (region_t*) realloc(reg->regs,sizeof(region_t)*reg->nseqs);
828 memset(&reg->regs[reg->nseqs-1],0,sizeof(region_t));
829 reg->seq_names[iseq] = strdup(chr);
830 reg->regs[iseq].creg = -1;
831 khash_str2int_set(reg->seq_hash,reg->seq_names[iseq],iseq);
832 }
833
834 region_t *creg = &reg->regs[iseq];
835
836 // the regions may not be sorted on input: binary search
837 int i, min = 0, max = creg->nregs - 1;
838 while ( min<=max )
839 {
840 i = (max+min)/2;
841 if ( start < creg->regs[i].start ) max = i - 1;
842 else if ( start > creg->regs[i].start ) min = i + 1;
843 else break;
844 }
845 if ( min>max || creg->regs[i].start!=start || creg->regs[i].end!=end )
846 {
847 // no such region, insert a new one just after max
848 hts_expand(region1_t,creg->nregs+1,creg->mregs,creg->regs);
849 if ( ++max < creg->nregs )
850 memmove(&creg->regs[max+1],&creg->regs[max],(creg->nregs - max)*sizeof(region1_t));
851 creg->regs[max].start = start;
852 creg->regs[max].end = end;
853 creg->nregs++;
854 }
855 }
856
857 // File name or a list of genomic locations. If file name, NULL is returned.
858 static bcf_sr_regions_t *_regions_init_string(const char *str)
859 {
860 bcf_sr_regions_t *reg = (bcf_sr_regions_t *) calloc(1, sizeof(bcf_sr_regions_t));
861 reg->start = reg->end = -1;
862 reg->prev_start = reg->prev_seq = -1;
863
864 kstring_t tmp = {0,0,0};
865 const char *sp = str, *ep = str;
866 int from, to;
867 while ( 1 )
868 {
869 while ( *ep && *ep!=',' && *ep!=':' ) ep++;
870 tmp.l = 0;
871 kputsn(sp,ep-sp,&tmp);
872 if ( *ep==':' )
873 {
874 sp = ep+1;
875 from = strtol(sp,(char**)&ep,10);
876 if ( sp==ep )
877 {
878 fprintf(stderr,"[%s:%d %s] Could not parse the region(s): %s\n", __FILE__,__LINE__,__FUNCTION__,str);
879 free(reg); free(tmp.s); return NULL;
880 }
881 if ( !*ep || *ep==',' )
882 {
883 _regions_add(reg, tmp.s, from, from);
884 sp = ep;
885 continue;
886 }
887 if ( *ep!='-' )
888 {
889 fprintf(stderr,"[%s:%d %s] Could not parse the region(s): %s\n", __FILE__,__LINE__,__FUNCTION__,str);
890 free(reg); free(tmp.s); return NULL;
891 }
892 ep++;
893 sp = ep;
894 to = strtol(sp,(char**)&ep,10);
895 if ( *ep && *ep!=',' )
896 {
897 fprintf(stderr,"[%s:%d %s] Could not parse the region(s): %s\n", __FILE__,__LINE__,__FUNCTION__,str);
898 free(reg); free(tmp.s); return NULL;
899 }
900 if ( sp==ep ) to = MAX_CSI_COOR-1;
901 _regions_add(reg, tmp.s, from, to);
902 if ( !*ep ) break;
903 sp = ep;
904 }
905 else
906 {
907 if ( tmp.l ) _regions_add(reg, tmp.s, -1, -1);
908 if ( !*ep ) break;
909 sp = ++ep;
910 }
911 }
912 free(tmp.s);
913 return reg;
914 }
915
916 // ichr,ifrom,ito are 0-based;
917 // returns -1 on error, 0 if the line is a comment line, 1 on success
918 static int _regions_parse_line(char *line, int ichr,int ifrom,int ito, char **chr,char **chr_end,int *from,int *to)
919 {
920 *chr_end = NULL;
921
922 if ( line[0]=='#' ) return 0;
923
924 int k,l; // index of the start and end column of the tab-delimited file
925 if ( ifrom <= ito )
926 k = ifrom, l = ito;
927 else
928 l = ifrom, k = ito;
929
930 int i;
931 char *se = line, *ss = NULL; // start and end
932 char *tmp;
933 for (i=0; i<=k && *se; i++)
934 {
935 ss = i==0 ? se++ : ++se;
936 while (*se && *se!='\t') se++;
937 }
938 if ( i<=k ) return -1;
939 if ( k==l )
940 {
941 *from = *to = strtol(ss, &tmp, 10);
942 if ( tmp==ss ) return -1;
943 }
944 else
945 {
946 if ( k==ifrom )
947 *from = strtol(ss, &tmp, 10);
948 else
949 *to = strtol(ss, &tmp, 10);
950 if ( ss==tmp ) return -1;
951
952 for (i=k; i<l && *se; i++)
953 {
954 ss = ++se;
955 while (*se && *se!='\t') se++;
956 }
957 if ( i<l ) return -1;
958 if ( k==ifrom )
959 *to = strtol(ss, &tmp, 10);
960 else
961 *from = strtol(ss, &tmp, 10);
962 if ( ss==tmp ) return -1;
963 }
964
965 ss = se = line;
966 for (i=0; i<=ichr && *se; i++)
967 {
968 if ( i>0 ) ss = ++se;
969 while (*se && *se!='\t') se++;
970 }
971 if ( i<=ichr ) return -1;
972 *chr_end = se;
973 *chr = ss;
974 return 1;
975 }
976
977 bcf_sr_regions_t *bcf_sr_regions_init(const char *regions, int is_file, int ichr, int ifrom, int ito)
978 {
979 bcf_sr_regions_t *reg;
980 if ( !is_file ) return _regions_init_string(regions);
981
982 reg = (bcf_sr_regions_t *) calloc(1, sizeof(bcf_sr_regions_t));
983 reg->start = reg->end = -1;
984 reg->prev_start = reg->prev_seq = -1;
985
986 reg->file = hts_open(regions, "rb");
987 if ( !reg->file )
988 {
989 fprintf(stderr,"[%s:%d %s] Could not open file: %s\n", __FILE__,__LINE__,__FUNCTION__,regions);
990 free(reg);
991 return NULL;
992 }
993
994 reg->tbx = tbx_index_load(regions);
995 if ( !reg->tbx )
996 {
997 int len = strlen(regions);
998 int is_bed = strcasecmp(".bed",regions+len-4) ? 0 : 1;
999 if ( !is_bed && !strcasecmp(".bed.gz",regions+len-7) ) is_bed = 1;
1000
1001 if ( reg->file->format.format==vcf ) ito = 1;
1002
1003 // read the whole file, tabix index is not present
1004 while ( hts_getline(reg->file, KS_SEP_LINE, &reg->line) > 0 )
1005 {
1006 char *chr, *chr_end;
1007 int from, to, ret;
1008 ret = _regions_parse_line(reg->line.s, ichr,ifrom,abs(ito), &chr,&chr_end,&from,&to);
1009 if ( ret < 0 )
1010 {
1011 if ( ito<0 )
1012 ret = _regions_parse_line(reg->line.s, ichr,ifrom,ifrom, &chr,&chr_end,&from,&to);
1013 if ( ret<0 )
1014 {
1015 fprintf(stderr,"[%s:%d] Could not parse the file %s, using the columns %d,%d[,%d]\n", __FILE__,__LINE__,regions,ichr+1,ifrom+1,ito+1);
1016 hts_close(reg->file); reg->file = NULL; free(reg);
1017 return NULL;
1018 }
1019 }
1020 if ( !ret ) continue;
1021 if ( is_bed ) from++;
1022 *chr_end = 0;
1023 _regions_add(reg, chr, from, to);
1024 *chr_end = '\t';
1025 }
1026 hts_close(reg->file); reg->file = NULL;
1027 if ( !reg->nseqs ) { free(reg); return NULL; }
1028 return reg;
1029 }
1030
1031 reg->seq_names = (char**) tbx_seqnames(reg->tbx, &reg->nseqs);
1032 if ( !reg->seq_hash )
1033 reg->seq_hash = khash_str2int_init();
1034 int i;
1035 for (i=0; i<reg->nseqs; i++)
1036 {
1037 khash_str2int_set(reg->seq_hash,reg->seq_names[i],i);
1038 }
1039 reg->fname = strdup(regions);
1040 reg->is_bin = 1;
1041 return reg;
1042 }
1043
1044 void bcf_sr_regions_destroy(bcf_sr_regions_t *reg)
1045 {
1046 int i;
1047 free(reg->fname);
1048 if ( reg->itr ) tbx_itr_destroy(reg->itr);
1049 if ( reg->tbx ) tbx_destroy(reg->tbx);
1050 if ( reg->file ) hts_close(reg->file);
1051 if ( reg->als ) free(reg->als);
1052 if ( reg->als_str.s ) free(reg->als_str.s);
1053 free(reg->line.s);
1054 if ( reg->regs )
1055 {
1056 // free only in-memory names, tbx names are const
1057 for (i=0; i<reg->nseqs; i++)
1058 {
1059 free(reg->seq_names[i]);
1060 free(reg->regs[i].regs);
1061 }
1062 }
1063 free(reg->regs);
1064 free(reg->seq_names);
1065 khash_str2int_destroy(reg->seq_hash);
1066 free(reg);
1067 }
1068
1069 int bcf_sr_regions_seek(bcf_sr_regions_t *reg, const char *seq)
1070 {
1071 reg->iseq = reg->start = reg->end = -1;
1072 if ( khash_str2int_get(reg->seq_hash, seq, &reg->iseq) < 0 ) return -1; // sequence seq not in regions
1073
1074 // using in-memory regions
1075 if ( reg->regs )
1076 {
1077 reg->regs[reg->iseq].creg = -1;
1078 return 0;
1079 }
1080
1081 // reading regions from tabix
1082 if ( reg->itr ) tbx_itr_destroy(reg->itr);
1083 reg->itr = tbx_itr_querys(reg->tbx, seq);
1084 if ( reg->itr ) return 0;
1085
1086 return -1;
1087 }
1088
1089 int bcf_sr_regions_next(bcf_sr_regions_t *reg)
1090 {
1091 if ( reg->iseq<0 ) return -1;
1092 reg->start = reg->end = -1;
1093 reg->nals = 0;
1094
1095 // using in-memory regions
1096 if ( reg->regs )
1097 {
1098 while ( reg->iseq < reg->nseqs )
1099 {
1100 reg->regs[reg->iseq].creg++;
1101 if ( reg->regs[reg->iseq].creg < reg->regs[reg->iseq].nregs ) break;
1102 reg->iseq++;
1103 }
1104 if ( reg->iseq >= reg->nseqs ) { reg->iseq = -1; return -1; } // no more regions left
1105 region1_t *creg = &reg->regs[reg->iseq].regs[reg->regs[reg->iseq].creg];
1106 reg->start = creg->start;
1107 reg->end = creg->end;
1108 return 0;
1109 }
1110
1111 // reading from tabix
1112 char *chr, *chr_end;
1113 int ichr = 0, ifrom = 1, ito = 2, is_bed = 0, from, to;
1114 if ( reg->tbx )
1115 {
1116 ichr = reg->tbx->conf.sc-1;
1117 ifrom = reg->tbx->conf.bc-1;
1118 ito = reg->tbx->conf.ec-1;
1119 if ( ito<0 ) ito = ifrom;
1120 is_bed = reg->tbx->conf.preset==TBX_UCSC ? 1 : 0;
1121 }
1122
1123 int ret = 0;
1124 while ( !ret )
1125 {
1126 if ( reg->itr )
1127 {
1128 // tabix index present, reading a chromosome block
1129 ret = tbx_itr_next(reg->file, reg->tbx, reg->itr, &reg->line);
1130 if ( ret<0 ) { reg->iseq = -1; return -1; }
1131 }
1132 else
1133 {
1134 if ( reg->is_bin )
1135 {
1136 // Waited for seek which never came. Reopen in text mode and stream
1137 // through the regions, otherwise hts_getline would fail
1138 hts_close(reg->file);
1139 reg->file = hts_open(reg->fname, "r");
1140 if ( !reg->file )
1141 {
1142 fprintf(stderr,"[%s:%d %s] Could not open file: %s\n", __FILE__,__LINE__,__FUNCTION__,reg->fname);
1143 reg->file = NULL;
1144 bcf_sr_regions_destroy(reg);
1145 return -1;
1146 }
1147 reg->is_bin = 0;
1148 }
1149
1150 // tabix index absent, reading the whole file
1151 ret = hts_getline(reg->file, KS_SEP_LINE, &reg->line);
1152 if ( ret<0 ) { reg->iseq = -1; return -1; }
1153 }
1154 ret = _regions_parse_line(reg->line.s, ichr,ifrom,ito, &chr,&chr_end,&from,&to);
1155 if ( ret<0 )
1156 {
1157 fprintf(stderr,"[%s:%d] Could not parse the file %s, using the columns %d,%d,%d\n", __FILE__,__LINE__,reg->fname,ichr+1,ifrom+1,ito+1);
1158 return -1;
1159 }
1160 }
1161 if ( is_bed ) from++;
1162
1163 *chr_end = 0;
1164 if ( khash_str2int_get(reg->seq_hash, chr, &reg->iseq)<0 )
1165 {
1166 fprintf(stderr,"Broken tabix index? The sequence \"%s\" not in dictionary [%s]\n", chr,reg->line.s);
1167 exit(1);
1168 }
1169 *chr_end = '\t';
1170
1171 reg->start = from - 1;
1172 reg->end = to - 1;
1173 return 0;
1174 }
1175
1176 static int _regions_match_alleles(bcf_sr_regions_t *reg, int als_idx, bcf1_t *rec)
1177 {
1178 int i = 0, max_len = 0;
1179 if ( !reg->nals )
1180 {
1181 char *ss = reg->line.s;
1182 while ( i<als_idx && *ss )
1183 {
1184 if ( *ss=='\t' ) i++;
1185 ss++;
1186 }
1187 char *se = ss;
1188 reg->nals = 1;
1189 while ( *se && *se!='\t' )
1190 {
1191 if ( *se==',' ) reg->nals++;
1192 se++;
1193 }
1194 ks_resize(&reg->als_str, se-ss+1+reg->nals);
1195 reg->als_str.l = 0;
1196 hts_expand(char*,reg->nals,reg->mals,reg->als);
1197 reg->nals = 0;
1198
1199 se = ss;
1200 while ( *(++se) )
1201 {
1202 if ( *se=='\t' ) break;
1203 if ( *se!=',' ) continue;
1204 reg->als[reg->nals] = &reg->als_str.s[reg->als_str.l];
1205 kputsn(ss,se-ss,&reg->als_str);
1206 if ( &reg->als_str.s[reg->als_str.l] - reg->als[reg->nals] > max_len ) max_len = &reg->als_str.s[reg->als_str.l] - reg->als[reg->nals];
1207 reg->als_str.l++;
1208 reg->nals++;
1209 ss = ++se;
1210 }
1211 reg->als[reg->nals] = &reg->als_str.s[reg->als_str.l];
1212 kputsn(ss,se-ss,&reg->als_str);
1213 if ( &reg->als_str.s[reg->als_str.l] - reg->als[reg->nals] > max_len ) max_len = &reg->als_str.s[reg->als_str.l] - reg->als[reg->nals];
1214 reg->nals++;
1215 reg->als_type = max_len > 1 ? VCF_INDEL : VCF_SNP; // this is a simplified check, see vcf.c:bcf_set_variant_types
1216 }
1217 int type = bcf_get_variant_types(rec);
1218 if ( reg->als_type & VCF_INDEL )
1219 return type & VCF_INDEL ? 1 : 0;
1220 return !(type & VCF_INDEL) ? 1 : 0;
1221 }
1222
1223 int bcf_sr_regions_overlap(bcf_sr_regions_t *reg, const char *seq, int start, int end)
1224 {
1225 int iseq;
1226 if ( khash_str2int_get(reg->seq_hash, seq, &iseq)<0 ) return -1; // no such sequence
1227
1228 if ( reg->prev_seq==-1 || iseq!=reg->prev_seq || reg->prev_start > start ) // new chromosome or after a seek
1229 {
1230 // flush regions left on previous chromosome
1231 if ( reg->missed_reg_handler && reg->prev_seq!=-1 && reg->iseq!=-1 )
1232 bcf_sr_regions_flush(reg);
1233
1234 bcf_sr_regions_seek(reg, seq);
1235 reg->start = reg->end = -1;
1236 }
1237 if ( reg->prev_seq==iseq && reg->iseq!=iseq ) return -2; // no more regions on this chromosome
1238 reg->prev_seq = reg->iseq;
1239 reg->prev_start = start;
1240
1241 while ( iseq==reg->iseq && reg->end < start )
1242 {
1243 if ( bcf_sr_regions_next(reg) < 0 ) return -2; // no more regions left
1244 if ( reg->iseq != iseq ) return -1; // does not overlap any regions
1245 if ( reg->missed_reg_handler && reg->end < start ) reg->missed_reg_handler(reg, reg->missed_reg_data);
1246 }
1247 if ( reg->start <= end ) return 0; // region overlap
1248 return -1; // no overlap
1249 }
1250
1251 void bcf_sr_regions_flush(bcf_sr_regions_t *reg)
1252 {
1253 if ( !reg->missed_reg_handler || reg->prev_seq==-1 ) return;
1254 while ( !bcf_sr_regions_next(reg) ) reg->missed_reg_handler(reg, reg->missed_reg_data);
1255 return;
1256 }
1257