OK I feel very silly.
The problem is the `mix` operator. `mix` essentially merges the items of the second channel into the first, creating a new channel in the process. Then we use `groupTuple` to reduce the channel to the per-sample items. The problem is that channel order isn’t guaranteed: by its nature, Nextflow is asynchronous. Sometimes we are grouping channel `[a, b, c]` and sometimes we are grouping `[a, c, b]`.
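So what’s the solution? What we are trying to do is essentially a database-style join on channel items, matching them on their shared key (the meta map) rather than on their position in the channel. Luckily for us, Nextflow comes with a `join` operator (by default it only emits items whose key is present in both channels). Here’s an example of using it with your channel above: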
workflow {
    // Minimal example: three channels of [meta, file] pairs are combined per
    // sample with `join`, so the pairing no longer depends on emission order.

    // [meta, mapped BAM]
    REMOVE_UNMAPPED_READS_out_bam = Channel.of(
        [[id:'test1_replicate1_signal', sample:'test1', replicate:'replicate1', type:'signal', single_end:false], 'test1_replicate1_signal_mapped.bam'],
        [[id:'test1_replicate1_background', sample:'test1', replicate:'replicate1', type:'background', single_end:false], 'test1_replicate1_background_mapped.bam']
    )

    // [meta, peak-cluster BED]; listed in a different order than the BAM channel
    CLIPPER_out_bed = Channel.of(
        [[id:'test1_replicate1_background', sample:'test1', replicate:'replicate1', type:'background', single_end:false], 'test1_replicate1_background.clip.peakClusters.bed'],
        [[id:'test1_replicate1_signal', sample:'test1', replicate:'replicate1', type:'signal', single_end:false], 'test1_replicate1_signal.clip.peakClusters.bed']
    )

    // [meta, read-count text file]
    CREATEREADNUM_out_readnum = Channel.of(
        [[id:'test1_replicate1_signal', sample:'test1', replicate:'replicate1', type:'signal', single_end:false], 'test1_replicate1_signal.readnum.txt'],
        [[id:'test1_replicate1_background', sample:'test1', replicate:'replicate1', type:'background', single_end:false], 'test1_replicate1_background.readnum.txt']
    )

    // `join` matches items on their first element (here the meta map), so each
    // emitted tuple is [meta, bam, bed, readnum] for a single sample id.
    REMOVE_UNMAPPED_READS_out_bam
        .join(CLIPPER_out_bed)
        .join(CREATEREADNUM_out_readnum)
        .view()
}
The result looks like this:
[[id:test1_replicate1_signal, sample:test1, replicate:replicate1, type:signal, single_end:false], test1_replicate1_signal_mapped.bam, test1_replicate1_signal.clip.peakClusters.bed, test1_replicate1_signal.readnum.txt]
[[id:test1_replicate1_background, sample:test1, replicate:replicate1, type:background, single_end:false], test1_replicate1_background_mapped.bam, test1_replicate1_background.clip.peakClusters.bed, test1_replicate1_background.readnum.txt]
I think in the original example you wanted to group on sample and replicate. Once we’ve performed the join, it becomes relatively simple to do a `groupTuple` afterwards. I’ve added an additional replicate for sample `test1` and a second sample (`test2`) here to make it more realistic; sorry for the wall of code:
workflow {
    // Extended example: join the three per-id channels first, then group the
    // joined tuples by (sample, replicate).

    // [meta, mapped BAM]
    REMOVE_UNMAPPED_READS_out_bam = Channel.of(
        [[id:'test1_replicate1_signal', sample:'test1', replicate:'replicate1', type:'signal', single_end:false], 'test1_replicate1_signal_mapped.bam'],
        [[id:'test1_replicate1_background', sample:'test1', replicate:'replicate1', type:'background', single_end:false], 'test1_replicate1_background_mapped.bam'],
        [[id:'test1_replicate2_signal', sample:'test1', replicate:'replicate2', type:'signal', single_end:false], 'test1_replicate2_signal_mapped.bam'],
        [[id:'test1_replicate2_background', sample:'test1', replicate:'replicate2', type:'background', single_end:false], 'test1_replicate2_background_mapped.bam'],
        [[id:'test2_replicate1_signal', sample:'test2', replicate:'replicate1', type:'signal', single_end:false], 'test2_replicate1_signal_mapped.bam'],
        [[id:'test2_replicate1_background', sample:'test2', replicate:'replicate1', type:'background', single_end:false], 'test2_replicate1_background_mapped.bam']
    )

    // [meta, peak-cluster BED]; listed in a different order than the BAM channel
    CLIPPER_out_bed = Channel.of(
        [[id:'test1_replicate1_background', sample:'test1', replicate:'replicate1', type:'background', single_end:false], 'test1_replicate1_background.clip.peakClusters.bed'],
        [[id:'test1_replicate1_signal', sample:'test1', replicate:'replicate1', type:'signal', single_end:false], 'test1_replicate1_signal.clip.peakClusters.bed'],
        [[id:'test1_replicate2_background', sample:'test1', replicate:'replicate2', type:'background', single_end:false], 'test1_replicate2_background.clip.peakClusters.bed'],
        [[id:'test1_replicate2_signal', sample:'test1', replicate:'replicate2', type:'signal', single_end:false], 'test1_replicate2_signal.clip.peakClusters.bed'],
        [[id:'test2_replicate1_background', sample:'test2', replicate:'replicate1', type:'background', single_end:false], 'test2_replicate1_background.clip.peakClusters.bed'],
        [[id:'test2_replicate1_signal', sample:'test2', replicate:'replicate1', type:'signal', single_end:false], 'test2_replicate1_signal.clip.peakClusters.bed']
    )

    // [meta, read-count text file]
    CREATEREADNUM_out_readnum = Channel.of(
        [[id:'test1_replicate1_signal', sample:'test1', replicate:'replicate1', type:'signal', single_end:false], 'test1_replicate1_signal.readnum.txt'],
        [[id:'test1_replicate1_background', sample:'test1', replicate:'replicate1', type:'background', single_end:false], 'test1_replicate1_background.readnum.txt'],
        [[id:'test1_replicate2_signal', sample:'test1', replicate:'replicate2', type:'signal', single_end:false], 'test1_replicate2_signal.readnum.txt'],
        [[id:'test1_replicate2_background', sample:'test1', replicate:'replicate2', type:'background', single_end:false], 'test1_replicate2_background.readnum.txt'],
        [[id:'test2_replicate1_signal', sample:'test2', replicate:'replicate1', type:'signal', single_end:false], 'test2_replicate1_signal.readnum.txt'],
        [[id:'test2_replicate1_background', sample:'test2', replicate:'replicate1', type:'background', single_end:false], 'test2_replicate1_background.readnum.txt']
    )

    // Step 1: pair the files of each id via the meta map (first tuple element),
    // giving [meta, bam, bed, readnum] per id.
    joined_ch = REMOVE_UNMAPPED_READS_out_bam
        .join(CLIPPER_out_bed)
        .join(CREATEREADNUM_out_readnum)

    joined_ch
        // Step 2: prepend a grouping key holding only sample + replicate, so the
        // signal and background entries of one replicate share the same key.
        .map { meta, bam, bed, readnum ->
            tuple(
                meta.subMap('sample', 'replicate'),
                meta,
                bam,
                bed,
                readnum
            )
        }
        // Step 3: group on that key (first element). No `size` is given, so the
        // `remainder` option, which only concerns incomplete fixed-size groups,
        // is not needed.
        .groupTuple()
        // Step 4: drop the grouping key; each remaining element is a list with
        // one entry per grouped item: [metas, bams, beds, readnums].
        .map { _group_key, meta, bam, bed, readnum ->
            tuple(
                meta,
                bam,
                bed,
                readnum
            )
        }
        .view()
}
The output should be:
[ maps, bams, beds, readnums ]
[[[id:test1_replicate1_background, sample:test1, replicate:replicate1, type:background, single_end:false], [id:test1_replicate1_signal, sample:test1, replicate:replicate1, type:signal, single_end:false]], [test1_replicate1_background_mapped.bam, test1_replicate1_signal_mapped.bam], [test1_replicate1_background.clip.peakClusters.bed, test1_replicate1_signal.clip.peakClusters.bed], [test1_replicate1_background.readnum.txt, test1_replicate1_signal.readnum.txt]]
[[[id:test1_replicate2_background, sample:test1, replicate:replicate2, type:background, single_end:false], [id:test1_replicate2_signal, sample:test1, replicate:replicate2, type:signal, single_end:false]], [test1_replicate2_background_mapped.bam, test1_replicate2_signal_mapped.bam], [test1_replicate2_background.clip.peakClusters.bed, test1_replicate2_signal.clip.peakClusters.bed], [test1_replicate2_background.readnum.txt, test1_replicate2_signal.readnum.txt]]
[[[id:test2_replicate1_background, sample:test2, replicate:replicate1, type:background, single_end:false], [id:test2_replicate1_signal, sample:test2, replicate:replicate1, type:signal, single_end:false]], [test2_replicate1_background_mapped.bam, test2_replicate1_signal_mapped.bam], [test2_replicate1_background.clip.peakClusters.bed, test2_replicate1_signal.clip.peakClusters.bed], [test2_replicate1_background.readnum.txt, test2_replicate1_signal.readnum.txt]]