From 7507dfaf6ad64360d4b405dc195ef63d65c5dfa3 Mon Sep 17 00:00:00 2001 From: Rasmus Borup Hansen Date: Sun, 29 Jan 2017 20:01:18 +0100 Subject: [PATCH] Allow larger chunks of reads to be processed, and estimate the insert size based on all pairs in "bwa mem", once all reads have been processed. --- bwa.c | 6 +-- bwa.h | 2 +- bwamem.c | 8 ++-- bwamem.h | 8 ++-- bwamem_pair.c | 103 ++++++++++++++++++++++++++++++-------------------- fastmap.c | 32 ++++++++++++---- 6 files changed, 98 insertions(+), 61 deletions(-) diff --git a/bwa.c b/bwa.c index c78ebb67..ab5b1fc1 100644 --- a/bwa.c +++ b/bwa.c @@ -39,10 +39,10 @@ static inline void kseq2bseq1(const kseq_t *ks, bseq1_t *s) s->l_seq = strlen(s->seq); } -bseq1_t *bseq_read(int chunk_size, int *n_, void *ks1_, void *ks2_) +bseq1_t *bseq_read(int64_t chunk_size, int *n_, void *ks1_, void *ks2_) { kseq_t *ks = (kseq_t*)ks1_, *ks2 = (kseq_t*)ks2_; - int size = 0, m, n; + int64_t size = 0, m, n; bseq1_t *seqs; m = n = 0; seqs = 0; while (kseq_read(ks) >= 0) { @@ -64,7 +64,7 @@ bseq1_t *bseq_read(int chunk_size, int *n_, void *ks1_, void *ks2_) seqs[n].id = n; size += seqs[n++].l_seq; } - if (size >= chunk_size && (n&1) == 0) break; + if (chunk_size && size >= chunk_size && (n&1) == 0) break; } if (size == 0) { // test if the 2nd file is finished if (ks2 && kseq_read(ks2) >= 0) diff --git a/bwa.h b/bwa.h index aa21725e..04702882 100644 --- a/bwa.h +++ b/bwa.h @@ -39,7 +39,7 @@ extern char bwa_rg_id[256]; extern "C" { #endif - bseq1_t *bseq_read(int chunk_size, int *n_, void *ks1_, void *ks2_); + bseq1_t *bseq_read(int64_t chunk_size, int *n_, void *ks1_, void *ks2_); void bseq_classify(int n, bseq1_t *seqs, int m[2], bseq1_t *sep[2]); void bwa_fill_scmat(int a, int b, int8_t mat[25]); diff --git a/bwamem.c b/bwamem.c index 535606b6..3f38396e 100644 --- a/bwamem.c +++ b/bwamem.c @@ -851,12 +851,12 @@ void mem_aln2sam(const mem_opt_t *opt, const bntseq_t *bns, kstring_t *str, bseq if (p->rid == m->rid) kputc('=', str); else kputs(bns->anns[m->rid].name, str); kputc('\t', str); - kputl(m->pos + 1, str); kputc('\t', str); + kputl(m->pos + 1, str); kputc('\t', str); // MPOS if (p->rid == m->rid) { int64_t p0 = p->pos + (p->is_rev? get_rlen(p->n_cigar, p->cigar) - 1 : 0); int64_t p1 = m->pos + (m->is_rev? get_rlen(m->n_cigar, m->cigar) - 1 : 0); if (m->n_cigar == 0 || p->n_cigar == 0) kputc('0', str); - else kputl(-(p0 - p1 + (p0 > p1? 1 : p0 < p1? -1 : 0)), str); + else kputl(-(p0 - p1 + (p0 > p1? 1 : p0 < p1? -1 : 0)), str); // TLEN } else kputc('0', str); } else kputsn("*\t0\t0", 5, str); kputc('\t', str); @@ -1194,7 +1194,7 @@ static void worker2(void *data, int i, int tid) } } -void mem_process_seqs(const mem_opt_t *opt, const bwt_t *bwt, const bntseq_t *bns, const uint8_t *pac, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0) +void mem_process_seqs(const mem_opt_t *opt, const bwt_t *bwt, const bntseq_t *bns, const uint8_t *pac, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0, int **global_ins_size_dist) { extern void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n); worker_t w; @@ -1217,7 +1217,7 @@ void mem_process_seqs(const mem_opt_t *opt, const bwt_t *bwt, const bntseq_t *bn free(w.aux); if (opt->flag&MEM_F_PE) { // infer insert sizes if not provided if (pes0) memcpy(pes, pes0, 4 * sizeof(mem_pestat_t)); // if pes0 != NULL, set the insert-size distribution as pes0 - else mem_pestat(opt, bns->l_pac, n, w.regs, pes); // otherwise, infer the insert size distribution from data + else mem_pestat_store(opt, bns->l_pac, n, w.regs, pes, global_ins_size_dist); // otherwise, infer the insert size distribution from data and store the distribution in global_ins_size_dist } kt_for(opt->n_threads, worker2, &w, (opt->flag&MEM_F_PE)? n>>1 : n); // generate alignment free(w.regs); diff --git a/bwamem.h b/bwamem.h index 4e2b1d8c..30a66844 100644 --- a/bwamem.h +++ b/bwamem.h @@ -42,7 +42,7 @@ typedef struct { int max_occ; // skip a seed if its occurence is larger than this value int max_chain_gap; // do not chain seed if it is max_chain_gap-bp away from the closest seed int n_threads; // number of threads - int chunk_size; // process chunk_size-bp sequences in a batch + int64_t chunk_size; // process chunk_size-bp sequences in a batch float mask_level; // regard a hit as redundant if the overlap with another better hit is over mask_level times the min length of the two hits float drop_ratio; // drop a chain if its seed coverage is below drop_ratio times the seed coverage of a better chain overlapping with the small chain float XA_drop_ratio; // when counting hits for the XA tag, ignore alignments with score < XA_drop_ratio * max_score; only effective for the XA tag @@ -130,7 +130,7 @@ extern "C" { * @param pes0 insert-size info; if NULL, infer from data; if not NULL, it should be an array with 4 elements, * corresponding to each FF, FR, RF and RR orientation. See mem_pestat() for more info. */ - void mem_process_seqs(const mem_opt_t *opt, const bwt_t *bwt, const bntseq_t *bns, const uint8_t *pac, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0); + void mem_process_seqs(const mem_opt_t *opt, const bwt_t *bwt, const bntseq_t *bns, const uint8_t *pac, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0, int **global_ins_size_dist); /** * Find the aligned regions for one query sequence @@ -176,8 +176,8 @@ extern "C" { * @param regs region array of size $n; 2i-th and (2i+1)-th elements constitute a pair * @param pes inferred insert size distribution (output) */ - void mem_pestat(const mem_opt_t *opt, int64_t l_pac, int n, const mem_alnreg_v *regs, mem_pestat_t pes[4]); - + void mem_pestat(const mem_opt_t *opt, int **ins_size_dist, mem_pestat_t pes[4]); + void mem_pestat_store(const mem_opt_t *opt, int64_t l_pac, int n, const mem_alnreg_v *regs, mem_pestat_t pes[4], int **global_ins_size_dist); #ifdef __cplusplus } #endif diff --git a/bwamem_pair.c b/bwamem_pair.c index 9beb84b6..e395d103 100644 --- a/bwamem_pair.c +++ b/bwamem_pair.c @@ -43,52 +43,51 @@ static int cal_sub(const mem_opt_t *opt, mem_alnreg_v *r) return j < r->n? r->a[j].score : opt->min_seed_len * opt->a; } -void mem_pestat(const mem_opt_t *opt, int64_t l_pac, int n, const mem_alnreg_v *regs, mem_pestat_t pes[4]) -{ - int i, d, max; - uint64_v isize[4]; - memset(pes, 0, 4 * sizeof(mem_pestat_t)); - memset(isize, 0, sizeof(kvec_t(int)) * 4); - for (i = 0; i < n>>1; ++i) { - int dir; - int64_t is; - mem_alnreg_v *r[2]; - r[0] = (mem_alnreg_v*)®s[i<<1|0]; - r[1] = (mem_alnreg_v*)®s[i<<1|1]; - if (r[0]->n == 0 || r[1]->n == 0) continue; - if (cal_sub(opt, r[0]) > MIN_RATIO * r[0]->a[0].score) continue; - if (cal_sub(opt, r[1]) > MIN_RATIO * r[1]->a[0].score) continue; - if (r[0]->a[0].rid != r[1]->a[0].rid) continue; // not on the same chr - dir = mem_infer_dir(l_pac, r[0]->a[0].rb, r[1]->a[0].rb, &is); - if (is && is <= opt->max_ins) kv_push(uint64_t, isize[dir], is); +void mem_pestat(const mem_opt_t *opt, int **ins_size_dist, mem_pestat_t pes[4]) { + int i, d, k, n[4], x, p25, p50, p75, max; + for (d = 0, max = 0; d < 4; ++d) { + for (i = 0, n[d] = 0; i < opt->max_ins; ++i) + n[d] += ins_size_dist[d][i]; + if (n[d] > max) max=n[d]; } - if (bwa_verbose >= 3) fprintf(stderr, "[M::%s] # candidate unique pairs for (FF, FR, RF, RR): (%ld, %ld, %ld, %ld)\n", __func__, isize[0].n, isize[1].n, isize[2].n, isize[3].n); - for (d = 0; d < 4; ++d) { // TODO: this block is nearly identical to the one in bwtsw2_pair.c. It would be better to merge these two. + if (bwa_verbose >= 3) fprintf(stderr, "[M::%s] # candidate unique pairs for (FF, FR, RF, RR): (%d, %d, %d, %d)\n", __func__, n[0], n[1], n[2], n[3]); + for (d = 0; d < 4; ++d) { mem_pestat_t *r = &pes[d]; - uint64_v *q = &isize[d]; - int p25, p50, p75, x; - if (q->n < MIN_DIR_CNT) { + if (n[d] < MIN_DIR_CNT) { fprintf(stderr, "[M::%s] skip orientation %c%c as there are not enough pairs\n", __func__, "FR"[d>>1&1], "FR"[d&1]); r->failed = 1; - free(q->a); continue; } else fprintf(stderr, "[M::%s] analyzing insert size distribution for orientation %c%c...\n", __func__, "FR"[d>>1&1], "FR"[d&1]); - ks_introsort_64(q->n, q->a); - p25 = q->a[(int)(.25 * q->n + .499)]; - p50 = q->a[(int)(.50 * q->n + .499)]; - p75 = q->a[(int)(.75 * q->n + .499)]; - r->low = (int)(p25 - OUTLIER_BOUND * (p75 - p25) + .499); + // TODO: Code in bwtsw2_pair.c also computes percentiles. It would be better to merge these two. + p25 = 0; + k = 0; + x = (int)(.25 * n[d] + .499); + while (k <= x) + k += ins_size_dist[d][p25++]; + p50 = p25; + x = (int)(.50 * n[d] + .499); + while (k <= x) + k += ins_size_dist[d][p50++]; + p75 = p50; + x = (int)(.75 * n[d] + .499); // Will be strictly less than n[d], unless MIN_DIR_CNT is set to 1 and n[d] equals 1. + while (k <= x) + k += ins_size_dist[d][p75++]; + r->low = (int)(p25 - OUTLIER_BOUND * (p75 - p25) + .499); if (r->low < 1) r->low = 1; r->high = (int)(p75 + OUTLIER_BOUND * (p75 - p25) + .499); + if (r->high > opt->max_ins) r->high = opt->max_ins; fprintf(stderr, "[M::%s] (25, 50, 75) percentile: (%d, %d, %d)\n", __func__, p25, p50, p75); fprintf(stderr, "[M::%s] low and high boundaries for computing mean and std.dev: (%d, %d)\n", __func__, r->low, r->high); - for (i = x = 0, r->avg = 0; i < q->n; ++i) - if (q->a[i] >= r->low && q->a[i] <= r->high) - r->avg += q->a[i], ++x; + for (i = r->low-1, r->avg = 0, x = 0; i < r->high; ++i) { + k = ins_size_dist[d][i]; + x += k; + r->avg += (i+1)*k; + } r->avg /= x; - for (i = 0, r->std = 0; i < q->n; ++i) - if (q->a[i] >= r->low && q->a[i] <= r->high) - r->std += (q->a[i] - r->avg) * (q->a[i] - r->avg); + for (i = r->low-1, r->std = 0; i < r->high; ++i) { + k = ins_size_dist[d][i]; + r->std += k*(i+1-r->avg)*(i+1-r->avg); + } r->std = sqrt(r->std / x); fprintf(stderr, "[M::%s] mean and std.dev: (%.2f, %.2f)\n", __func__, r->avg, r->std); r->low = (int)(p25 - MAPPING_BOUND * (p75 - p25) + .499); @@ -97,17 +96,39 @@ void mem_pestat(const mem_opt_t *opt, int64_t l_pac, int n, const mem_alnreg_v * if (r->high < r->avg + MAX_STDDEV * r->std) r->high = (int)(r->avg + MAX_STDDEV * r->std + .499); if (r->low < 1) r->low = 1; fprintf(stderr, "[M::%s] low and high boundaries for proper pairs: (%d, %d)\n", __func__, r->low, r->high); - free(q->a); } - for (d = 0, max = 0; d < 4; ++d) - max = max > isize[d].n? max : isize[d].n; for (d = 0; d < 4; ++d) - if (pes[d].failed == 0 && isize[d].n < max * MIN_DIR_RATIO) { + if (pes[d].failed == 0 && n[d] < max * MIN_DIR_RATIO) { pes[d].failed = 1; fprintf(stderr, "[M::%s] skip orientation %c%c\n", __func__, "FR"[d>>1&1], "FR"[d&1]); } } +void mem_pestat_store(const mem_opt_t *opt, int64_t l_pac, int n, const mem_alnreg_v *regs, mem_pestat_t pes[4], int **global_ins_size_dist) { + int i, d, *ins_size_dist[4]; + for (d = 0; d < 4; ++d) ins_size_dist[d] = calloc(opt->max_ins, sizeof(int)); + memset(pes, 0, 4 * sizeof(mem_pestat_t)); + for (i = 0; i < n>>1; ++i) { + int dir; + int64_t is; + mem_alnreg_v *r[2]; + r[0] = (mem_alnreg_v*)®s[i<<1|0]; + r[1] = (mem_alnreg_v*)®s[i<<1|1]; + if (r[0]->n == 0 || r[1]->n == 0) continue; + if (cal_sub(opt, r[0]) > MIN_RATIO * r[0]->a[0].score) continue; + if (cal_sub(opt, r[1]) > MIN_RATIO * r[1]->a[0].score) continue; + if (r[0]->a[0].rid != r[1]->a[0].rid) continue; // not on the same chr + dir = mem_infer_dir(l_pac, r[0]->a[0].rb, r[1]->a[0].rb, &is); + if (is && is <= opt->max_ins) ins_size_dist[dir][is-1]++; + } + mem_pestat(opt, ins_size_dist, pes); + for (d = 0; d < 4; ++d) { + for (i = 0; i < opt->max_ins; ++i) + global_ins_size_dist[d][i] += ins_size_dist[d][i]; + free(ins_size_dist[d]); + } +} + int mem_matesw(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], const mem_alnreg_t *a, int l_ms, const uint8_t *ms, mem_alnreg_v *ma) { extern int mem_sort_dedup_patch(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, uint8_t *query, int n, mem_alnreg_t *a); @@ -153,8 +174,8 @@ int mem_matesw(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, co if (aln.score >= opt->min_seed_len && aln.qb >= 0) { // something goes wrong if aln.qb < 0 b.rid = a->rid; b.is_alt = a->is_alt; - b.qb = is_rev? l_ms - (aln.qe + 1) : aln.qb; - b.qe = is_rev? l_ms - aln.qb : aln.qe + 1; + b.qb = is_rev? l_ms - (aln.qe + 1) : aln.qb; + b.qe = is_rev? l_ms - aln.qb : aln.qe + 1; b.rb = is_rev? (l_pac<<1) - (rb + aln.te + 1) : rb + aln.tb; b.re = is_rev? (l_pac<<1) - (rb + aln.tb) : rb + aln.te + 1; b.score = aln.score; diff --git a/fastmap.c b/fastmap.c index 7b7145d9..d6efb3b7 100644 --- a/fastmap.c +++ b/fastmap.c @@ -24,9 +24,10 @@ typedef struct { kseq_t *ks, *ks2; mem_opt_t *opt; mem_pestat_t *pes0; - int64_t n_processed; - int copy_comment, actual_chunk_size; + int64_t n_processed, actual_chunk_size; + int copy_comment; bwaidx_t *idx; + int **global_ins_size_dist; } ktp_aux_t; typedef struct { @@ -70,18 +71,18 @@ static void *process(void *shared, int step, void *_data) fprintf(stderr, "[M::%s] %d single-end sequences; %d paired-end sequences\n", __func__, n_sep[0], n_sep[1]); if (n_sep[0]) { tmp_opt.flag &= ~MEM_F_PE; - mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, aux->n_processed, n_sep[0], sep[0], 0); + mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, aux->n_processed, n_sep[0], sep[0], 0, 0); for (i = 0; i < n_sep[0]; ++i) data->seqs[sep[0][i].id].sam = sep[0][i].sam; } if (n_sep[1]) { tmp_opt.flag |= MEM_F_PE; - mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, aux->n_processed + n_sep[0], n_sep[1], sep[1], aux->pes0); + mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, aux->n_processed + n_sep[0], n_sep[1], sep[1], aux->pes0, aux->global_ins_size_dist); for (i = 0; i < n_sep[1]; ++i) data->seqs[sep[1][i].id].sam = sep[1][i].sam; } free(sep[0]); free(sep[1]); - } else mem_process_seqs(opt, idx->bwt, idx->bns, idx->pac, aux->n_processed, data->n_seqs, data->seqs, aux->pes0); + } else mem_process_seqs(opt, idx->bwt, idx->bns, idx->pac, aux->n_processed, data->n_seqs, data->seqs, aux->pes0, aux->global_ins_size_dist); aux->n_processed += data->n_seqs; return data; } else if (step == 2) { @@ -116,7 +117,7 @@ int main_mem(int argc, char *argv[]) { mem_opt_t *opt, opt0; int fd, fd2, i, c, ignore_alt = 0, no_mt_io = 0; - int fixed_chunk_size = -1; + int64_t fixed_chunk_size = -1; gzFile fp, fp2 = 0; char *p, *rg_line = 0, *hdr_line = 0; const char *mode = 0; @@ -161,7 +162,7 @@ int main_mem(int argc, char *argv[]) else if (c == 'W') opt->min_chain_weight = atoi(optarg), opt0.min_chain_weight = 1; else if (c == 'y') opt->max_mem_intv = atol(optarg), opt0.max_mem_intv = 1; else if (c == 'C') aux.copy_comment = 1; - else if (c == 'K') fixed_chunk_size = atoi(optarg); + else if (c == 'K') fixed_chunk_size = atol(optarg); else if (c == 'X') opt->mask_level = atof(optarg); else if (c == 'h') { opt0.max_XA_hits = opt0.max_XA_hits_alt = 1; @@ -227,6 +228,11 @@ int main_mem(int argc, char *argv[]) else return 1; } + if (!aux.pes0) { + aux.global_ins_size_dist = malloc(sizeof(int *)*4); + for (i = 0; i < 4; ++i) aux.global_ins_size_dist[i] = calloc(opt->max_ins, sizeof(int)); + } + if (rg_line) { hdr_line = bwa_insert_header(rg_line, hdr_line); free(rg_line); @@ -351,10 +357,20 @@ int main_mem(int argc, char *argv[]) } } bwa_print_sam_hdr(aux.idx->bns, hdr_line); - aux.actual_chunk_size = fixed_chunk_size > 0? fixed_chunk_size : opt->chunk_size * opt->n_threads; + aux.actual_chunk_size = fixed_chunk_size >= 0? fixed_chunk_size : opt->chunk_size * opt->n_threads; kt_pipeline(no_mt_io? 1 : 2, process, &aux, 3); free(hdr_line); free(opt); + if (aux.global_ins_size_dist) { + for (i = 0; i < 4; ++i) pes[i].failed = 0; + mem_pestat(opt, aux.global_ins_size_dist, pes); + if (pes[1].failed) + fprintf(stderr, "[E:%s] failed to estimate insert size for direction FR from all reads.\n", __func__); + else + fprintf(stderr, "[E:%s] option to specify insert size based on estimate for direction FR from all reads: -I %.2f,%.2f,%d,%d\n", __func__, pes[1].avg, pes[1].std, pes[1].high, pes[1].low); + for (i = 0; i < 4 ; ++i) free(aux.global_ins_size_dist[i]); + free(aux.global_ins_size_dist); + } bwa_idx_destroy(aux.idx); kseq_destroy(aux.ks); err_gzclose(fp); kclose(ko);