Divide-and-conquer strategy to call problematic regions

Sometime some regions of the genome are hard to call to get VCF files.

The following Nextflow workflow use a divide-and-conquer strategy: if the region is hard to call - that is the process fails (here as an example in CALL_VARIANTS the region must have a length lower than 10bp) - the error is ignored (errorStrategy=ignore) , the region is divided into 10 smaller regions , and again, and again , until the caller is able complete the job.

Thanks Mahesh Binzer-Panchal for the help. Slack

Usage

nextflow run \
     -c .//workflow.cfg \
     -resume main.nf \
     --bams bams.list \
     --bed intervals.bed

main.nf


include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER1} from "./sub.nf"
include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER2} from "./sub.nf"
include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER3} from "./sub.nf"
include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER4} from "./sub.nf"
include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER5} from "./sub.nf"
include {DIVIDE_AND_CONQUER as DIVIDE_AND_CONQUER6} from "./sub.nf"


workflow {
	bam_ch = Channel.fromPath(params.bams).splitText().map{it.trim()}.map{[file(it),file(it+".bai")]}
	intervals1_ch = Channel.fromPath(params.bed).splitCsv(header:false,sep:'\t').map{it[0]+":"+((it[1] as int)+1)+"-"+it[2]}
  interval_and_bam1_ch = intervals1_ch.combine(bam_ch)
	
	merge_ch = Channel.empty()
	
	level1 = DIVIDE_AND_CONQUER1(interval_and_bam1_ch)
	merge_ch = merge_ch.mix(level1.ok)

	level2 = DIVIDE_AND_CONQUER2(level1.failed)
	merge_ch = merge_ch.mix(level2.ok)

	level3 = DIVIDE_AND_CONQUER3(level2.failed)
	merge_ch = merge_ch.mix(level3.ok)

	level4 = DIVIDE_AND_CONQUER4(level3.failed)
	merge_ch = merge_ch.mix(level4.ok)

	level5 = DIVIDE_AND_CONQUER5(level4.failed)
	merge_ch = merge_ch.mix(level5.ok)

	level6 = DIVIDE_AND_CONQUER6(level5.failed)
	merge_ch = merge_ch.mix(level6.ok)


	MERGE(merge_ch.map{it[3]}.collect())

	level6.failed.view{"${it} cannot be called"}

}

process MERGE {
input:
	path("VCF/*")
script:
"""
find VCF -name "*.vcf" > concat.list
"""
}

sub.nf

include {CALL_VARIANTS} from "./part.nf"

def splitIntervals(interval,bam,bai) {
	int colon =  interval.indexOf(":");
	int hyphen = interval.indexOf('-',colon+1);
	String chrom =  interval.substring(0,colon);
	int start =  (interval.substring(colon+1,hyphen) as int)-1;
	int end =    (interval.substring(hyphen+1) as int);
	int length = end-start;
	int l2 = Math.max(1,(int)(length/10));
	def L = [];
	if(l2>=length) {System.err.println("Skipping ${interval}") ; return L;}
	while(start<end) {
		L.add([chrom+":"+(start+1)+"-"+Math.min(start+l2,end),bam,bai]);
		start+=l2;
		}
	return L;
	}


workflow DIVIDE_AND_CONQUER {
	take:
		interval_and_bam_ch // tuple [ interval, bam, bai ]
	main:
		
		sentinel_ch = interval_and_bam_ch.map{[ [it[0],it[1].toRealPath(),it[2].toRealPath()] ,file("NO_VCF")]}
        	call_ch = CALL_VARIANTS(interval_and_bam_ch)

        	call_ch.output.map{[ [it[0],it[1].toRealPath(), it[2].toRealPath()], it[3]]}.
			mix(sentinel_ch).
			groupTuple().
        	        branch {
                	        failed : it[1].size()==1
                        	ok: true
	                }.set{result_ch}

		ok_ch = result_ch.ok.map{[it[0][0], it[0][1], it[0][2], it[1].find{T->!T.endsWith("NO_VCF")} ]}
		failed1_ch = result_ch.failed.map{[it[0][0], it[0][1], it[0][2]]} 

		failed2_ch = failed1_ch.flatMap{splitIntervals(it[0],it[1],it[2])}
	emit:
		ok = ok_ch
		failed= failed2_ch
	} 

part.nf

process CALL_VARIANTS {
tag "${interval} ${bam.name} ${bai.name}"
executor "local"
input:
	tuple val(interval),path(bam),path(bai)
output:
	tuple val(interval),path(bam),path(bai),path("*.vcf"),emit:output
script:

	def colon =  interval.indexOf(":");
	def hyphen = interval.indexOf('-',colon+1);
	def chrom =  interval.substring(0,colon);
	def start =  (interval.substring(colon+1,hyphen) as int)-1;
	def end =    (interval.substring(hyphen+1) as int);
	def len = end-start;

"""


echo "${len}" | awk '(int(\$1)<=10) {print "OK";}'| grep OK && echo '${chrom}\t${start}\t${end}' > ${chrom}_${start+1}_${end}.${bam.simpleName}.vcf
"""
}

workflow.cfg

params {
	bams = "NO_FILE"
  bed = "NO_FILE"
	}


process {

withName : "CALL_VARIANTS" {
	errorStrategy  = "ignore"
        }

}

workflow.png

Pierre Lindenbaum

2 Likes