Sometime some regions of the genome are hard to call to get VCF files.
The following Nextflow workflow use a divide-and-conquer strategy: if the region is hard to call - that is the process fails (here as an example in CALL_VARIANTS
the region must have a length lower than 10bp) - the error is ignored (errorStrategy=ignore
) , the region is divided into 10 smaller regions , and again, and again , until the caller is able complete the job.
Thanks Mahesh Binzer-Panchal for the help. Slack
Usage
nextflow run \
-c .//workflow.cfg \
-resume main.nf \
--bams bams.list \
--bed intervals.bed
main.nf
include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER1} from "./sub.nf"
include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER2} from "./sub.nf"
include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER3} from "./sub.nf"
include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER4} from "./sub.nf"
include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER5} from "./sub.nf"
include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER6} from "./sub.nf"
workflow {
bam_ch = Channel.fromPath(params.bams).splitText().map{it.trim()}.map{[file(it),file(it+".bai")]}
intervals1_ch = Channel.fromPath(params.bed).splitCsv(header:false,sep:'\t').map{it[0]+":"+((it[1] as int)+1)+"-"+it[2]}
interval_and_bam1_ch = intervals1_ch.combine(bam_ch)
merge_ch = Channel.empty()
level1 = DIVIDE_AND_CONQUER1(interval_and_bam1_ch)
merge_ch = merge_ch.mix(level1.ok)
level2 = DIVIDE_AND_CONQUER2(level1.failed)
merge_ch = merge_ch.mix(level2.ok)
level3 = DIVIDE_AND_CONQUER3(level2.failed)
merge_ch = merge_ch.mix(level3.ok)
level4 = DIVIDE_AND_CONQUER4(level3.failed)
merge_ch = merge_ch.mix(level4.ok)
level5 = DIVIDE_AND_CONQUER5(level4.failed)
merge_ch = merge_ch.mix(level5.ok)
level6 = DIVIDE_AND_CONQUER6(level5.failed)
merge_ch = merge_ch.mix(level6.ok)
MERGE(merge_ch.map{it[3]}.collect())
level6.failed.view{"${it} cannot be called"}
}
process MERGE {
input:
path("VCF/*")
script:
"""
find VCF -name "*.vcf" > concat.list
"""
}
sub.nf
include {CALL_VARIANTS} from "./part.nf"
def splitIntervals(interval,bam,bai) {
int colon = interval.indexOf(":");
int hyphen = interval.indexOf('-',colon+1);
String chrom = interval.substring(0,colon);
int start = (interval.substring(colon+1,hyphen) as int)-1;
int end = (interval.substring(hyphen+1) as int);
int length = end-start;
int l2 = Math.max(1,(int)(length/10));
def L = [];
if(l2>=length) {System.err.println("Skipping ${interval}") ; return L;}
while(start<end) {
L.add([chrom+":"+(start+1)+"-"+Math.min(start+l2,end),bam,bai]);
start+=l2;
}
return L;
}
workflow DIVIDE_AND_CONQUER {
take:
interval_and_bam_ch // tuple [ interval, bam, bai ]
main:
sentinel_ch = interval_and_bam_ch.map{[ [it[0],it[1].toRealPath(),it[2].toRealPath()] ,file("NO_VCF")]}
call_ch = CALL_VARIANTS(interval_and_bam_ch)
call_ch.output.map{[ [it[0],it[1].toRealPath(), it[2].toRealPath()], it[3]]}.
mix(sentinel_ch).
groupTuple().
branch {
failed : it[1].size()==1
ok: true
}.set{result_ch}
ok_ch = result_ch.ok.map{[it[0][0], it[0][1], it[0][2], it[1].find{T->!T.endsWith("NO_VCF")} ]}
failed1_ch = result_ch.failed.map{[it[0][0], it[0][1], it[0][2]]}
failed2_ch = failed1_ch.flatMap{splitIntervals(it[0],it[1],it[2])}
emit:
ok = ok_ch
failed= failed2_ch
}
part.nf
process CALL_VARIANTS {
tag "${interval} ${bam.name} ${bai.name}"
executor "local"
input:
tuple val(interval),path(bam),path(bai)
output:
tuple val(interval),path(bam),path(bai),path("*.vcf"),emit:output
script:
def colon = interval.indexOf(":");
def hyphen = interval.indexOf('-',colon+1);
def chrom = interval.substring(0,colon);
def start = (interval.substring(colon+1,hyphen) as int)-1;
def end = (interval.substring(hyphen+1) as int);
def len = end-start;
"""
echo "${len}" | awk '(int(\$1)<=10) {print "OK";}'| grep OK && echo '${chrom}\t${start}\t${end}' > ${chrom}_${start+1}_${end}.${bam.simpleName}.vcf
"""
}
workflow.cfg
params {
bams = "NO_FILE"
bed = "NO_FILE"
}
process {
withName : "CALL_VARIANTS" {
errorStrategy = "ignore"
}
}
workflow.png
Pierre Lindenbaum