How to join with multiple files in emits?

Hi there,

I’ve three processes that perform variant calling.
I’d like to use their output to generate a consensus variant list.

At the moment, when I send the outputs to the consensus process, files from different samples get mixed: for two of the variant callers the file for sample A is sent, but for the third variant caller the file for sample B is sent.

How do I do a join/check to ensure that only files for the same sample are sent to the consensus process?

I run the variant callers as:

mutect2(ch_input.tumor, ch_input.normal)
manta(ch_input.tumor, ch_input.normal)
lancet(ch_input.tumor, ch_input.normal)

I’ve output from three callers as follows:

mutect2

output:
  tuple val(meta), path("${meta.timepoint}.mutect2out.vcf"), emit: mutect_vcf
  tuple val(meta), path("${meta.timepoint}.mutect2out_filtered.vcf"), emit: mutect_vcf_filtered
  tuple val(meta), path("${meta.timepoint}.mutect2out_filtered.vcf.filteringStats.tsv"), emit: filtered_stats
  tuple val(meta), path("${meta.timepoint}.mutect2out.vcf.idx"), emit: mutect2_vcf_idx
  tuple val(meta), path("${meta.timepoint}.mutect2out.vcf.stats"), emit: mutect2_stats
  tuple val(meta), path("${meta.timepoint}.mutect2out_filtered.vcf.idx"), emit: filtered_vcf_idx

I’ve lancet output as:

output:
  tuple val(meta), path("${patient_id}.lancet.vcf"), emit: lancet_file

I’ve manta-strelka output as:

output:
  tuple val(meta), path("manta/results/variants/candidateSmallIndels.vcf.gz"), emit: manta_small_indels_vcf
  tuple val(meta), path("manta/results/variants/candidateSmallIndels.vcf.gz.tbi"), emit: manta_small_indels_vcf_tbi
  tuple val(meta), path("manta/results/variants/candidateSV.vcf.gz"), emit: manta_candidateSV_vcf
  tuple val(meta), path("manta/results/variants/candidateSV.vcf.gz.tbi"), emit: manta_candidateSV_vcf_tbi
  tuple val(meta), path("manta/results/variants/diploidSV.vcf.gz"), emit: manta_diploidSV_vcf
  tuple val(meta), path("manta/results/variants/diploidSV.vcf.gz.tbi"), emit: manta_diploidSV_vcf_tbi
  tuple val(meta), path("manta/results/variants/somaticSV.vcf.gz"), emit: manta_somaticSV_vcf
  tuple val(meta), path("manta/results/variants/somaticSV.vcf.gz.tbi"), emit: manta_somaticSV_vcf_tbi
  tuple val(meta), path("manta/results/stats/alignmentStatsSummary.txt"), emit: manta_stats_align_stats_summary
  tuple val(meta), path("manta/results/stats/svCandidateGenerationStats.tsv"), emit: manta_stats_svCandidates_stats_tsv
  tuple val(meta), path("manta/results/stats/svCandidateGenerationStats.xml"), emit: manta_stats_svCandidates_stats_xml
  tuple val(meta), path("manta/results/stats/svLocusGraphStats.tsv"), emit: manta_stats_svLocus_graph_stats_tsv
  tuple val(meta), path("strelka/results/variants/somatic.indels.vcf.gz"), emit: strelka_somatic_indels_vcf
  tuple val(meta), path("strelka/results/variants/somatic.indels.vcf.gz.tbi"), emit: strelka_somatic_indels_vcf_tbi
  tuple val(meta), path("strelka/results/variants/somatic.snvs.vcf.gz"), emit: strelka_somatic_snvs_vcf
  tuple val(meta), path("strelka/results/variants/somatic.snvs.vcf.gz.tbi"), emit: strelka_somatics_snvs_vcf_tbi
  tuple val(meta), path("strelka/results/stats/runStats.tsv"), emit: strelka_stats_tsv
  tuple val(meta), path("strelka/results/stats/runStats.xml"), emit: strelka_stats_xml

The consensus step’s input at the moment is as the following:

input:
  tuple val(meta), path(strelka_somatic_indels_vcf, stageAs: 'consensus_variants/*')
  tuple val(meta), path(strelka_somatic_snvs_vcf, stageAs: 'consensus_variants/*')
  tuple val(meta), path(mutect_vcf_filtered, stageAs: 'consensus_variants/*')
  tuple val(meta), path(lancet_file, stageAs: 'consensus_variants/*')

What do I need to do to ensure that files from lancet, mutect2 and manta are sent for the same sample?

Thanks.

You have the metas with sample information. Group the output channel elements based on that and then feed the consensus process with this new grouped channel :smile:

How do I do that?
Please share an example.

If you run the following code:

process DO_SOMETHING {
  input:
  tuple val(meta), val(my_value)

  output:
  tuple val(meta), path('DO_SOMETHING_output.txt')

  script:
  """
  echo ${meta['some_field']} > DO_SOMETHING_output.txt
  """
}

process DO_SOMETHING_ELSE {
  input:
  tuple val(meta), val(my_value)

  output:
  tuple val(meta), path('DO_SOMETHING_ELSE_output.txt')

  script:
  """
  echo ${my_value} > DO_SOMETHING_ELSE_output.txt
  """
}

process LAST_STEP {
  debug true
  maxForks 1

  input:
  tuple val(meta1), path(output_f1)
  tuple val(meta2), path(output_f2)

  output:
  stdout

  script:
  """
  echo ${meta1} ${meta2}
  sleep 5
  """
}

workflow {
  Channel
    .of([['meta': 'SAMPLE001', 'some_field': 5], ['something_here']],
        [['meta': 'SAMPLE002', 'some_field': 5], ['something_here']],
        [['meta': 'SAMPLE003', 'some_field': 102], ['something_here']])
    .set { my_ch }
  DO_SOMETHING(my_ch)
  DO_SOMETHING_ELSE(my_ch)

  LAST_STEP(DO_SOMETHING.out, DO_SOMETHING_ELSE.out)
}

You should get an output similar to the one below:

N E X T F L O W  ~  version 23.10.1
Launching `main.nf` [focused_brattain] DSL2 - revision: 7c5056a178
executor >  local (3)
[ca/85734d] process > DO_SOMETHING (1)      [100%] 3 of 3, cached: 3 ✔
[27/8bf65c] process > DO_SOMETHING_ELSE (3) [100%] 3 of 3, cached: 3 ✔
[7a/6d5666] process > LAST_STEP (2)         [100%] 3 of 3 ✔
[meta:SAMPLE003, some_field:102] [meta:SAMPLE001, some_field:5]

[meta:SAMPLE002, some_field:5] [meta:SAMPLE002, some_field:5]

[meta:SAMPLE001, some_field:5] [meta:SAMPLE003, some_field:102]

As you can see, elements from different samples end up together in the same process instance (task), which is not what we want. Now see the modified code below:

process DO_SOMETHING {
  input:
  tuple val(meta), val(my_value)

  output:
  tuple val(meta), path('DO_SOMETHING_output.txt')

  script:
  """
  echo ${meta['some_field']} > DO_SOMETHING_output.txt
  """
}

process DO_SOMETHING_ELSE {
  input:
  tuple val(meta), val(my_value)

  output:
  tuple val(meta), path('DO_SOMETHING_ELSE_output.txt')

  script:
  """
  echo ${my_value} > DO_SOMETHING_ELSE_output.txt
  """
}

process LAST_STEP {
  debug true
  maxForks 1

  input:
  tuple val(meta), path(files)

  output:
  stdout

  script:
  """
  echo ${meta}
  cat ${files[0]}
  cat ${files[1]}
  sleep 2
  """
}

workflow {
  Channel
    .of([['meta': 'SAMPLE001', 'some_field': 3], ['something_here 001']],
        [['meta': 'SAMPLE002', 'some_field': 5], ['something_here 002']],
        [['meta': 'SAMPLE003', 'some_field': 102], ['something_here 003']])
    .set { my_ch }
  DO_SOMETHING(my_ch)
  DO_SOMETHING_ELSE(my_ch)
  DO_SOMETHING.out
    .mix(DO_SOMETHING_ELSE.out)
    .groupTuple()
    .set { my_new_ch }
  LAST_STEP(my_new_ch)
}

You will see the following output (a bit verbose, but at least you see the content of files and can confirm everything is OK):

N E X T F L O W  ~  version 23.10.1
Launching `main.nf` [extravagant_torvalds] DSL2 - revision: 7579411748
executor >  local (5)
[e4/93faa8] process > DO_SOMETHING (1)      [100%] 3 of 3, cached: 2 ✔
[77/6abd1c] process > DO_SOMETHING_ELSE (1) [100%] 3 of 3, cached: 2 ✔
[ce/0597e4] process > LAST_STEP (2)         [100%] 3 of 3 ✔
[meta:SAMPLE001, some_field:3]
3
[something_here 001]

[meta:SAMPLE003, some_field:102]
[something_here 003]
102

[meta:SAMPLE002, some_field:5]
[something_here 002]
5

@mribeirodantas
Thank you.

When I try to run it on my side, I get this error:

Multi-channel output cannot be applied to operator mix for which argument is already provided

Code I used is:

  mutect2.out
    .mix(manta.out)
    .groupTuple()
    .set { my_new_ch }

I’ve multiple files from different processes.

A multi-channel output is what you get when a process or channel operator produces multiple output channels. When you pass it to another operator such as mix, together with another channel, Nextflow needs you to pick one channel from that multi-channel output. That’s what is missing for you to make it work.

Instead of mutect2.out, use mutect2.out.mutect_vcf.
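To make this concrete, here is a minimal sketch of picking one named output from each process before mixing. The process names CALLER_A and CALLER_B, the emit names vcf and stats, and the sample IDs are all made up for illustration; they are not the real mutect2/manta definitions from this thread.

```nextflow
// Hypothetical caller with TWO named outputs, so CALLER_A.out is a multi-channel output
process CALLER_A {
  input:
  val(meta)

  output:
  tuple val(meta), path("${meta.id}.a.vcf"), emit: vcf
  tuple val(meta), path("${meta.id}.a.stats"), emit: stats

  script:
  """
  touch ${meta.id}.a.vcf ${meta.id}.a.stats
  """
}

// Hypothetical second caller with a single named output
process CALLER_B {
  input:
  val(meta)

  output:
  tuple val(meta), path("${meta.id}.b.vcf"), emit: vcf

  script:
  """
  touch ${meta.id}.b.vcf
  """
}

workflow {
  ch_meta = Channel.of([id: 'S1'], [id: 'S2'])
  CALLER_A(ch_meta)
  CALLER_B(ch_meta)

  // Select ONE channel per process by its emit name, then mix and group by meta
  CALLER_A.out.vcf
    .mix(CALLER_B.out.vcf)
    .groupTuple()
    .view()
}
```

Each grouped element should have the shape [meta, [file, file]]. As far as I know, groupTuple does not guarantee the order of the files inside the list, so if the downstream process needs to know which file came from which caller, it has to tell them apart by file name or use a different operator.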

I really think you can benefit a lot from having a look at the fundamentals training here. There’s even an online virtual machine through Gitpod for you to play with the content/exercises, without having to install/configure/run in your own machine :wink:

@mribeirodantas

Do I have to mix with each file separately?

mutect2.out.mutect_vcf
    .mix(manta.out.file1)
    .mix(manta.out.file2)
    .mix(manta.out.file3)
    .groupTuple()
    .set { my_new_ch }

Would this be a correct way?

Thanks for sharing the link. I’ve gone through the training documents/links a few times. Unfortunately, the data types, structures and challenges I’m running into aren’t covered there, which is why I’m reaching out on this forum.

You can pass all the other channels as arguments to a single mix call. See the example below :wink:

Channel
  .of(1..10)
  .set { ch1 }
Channel
  .of(11..20)
  .set { ch2 }
Channel
  .of(21..30)
  .set { ch3 }

ch1.mix(ch2, ch3).view()

The output:

Hey, @complexgenome. Did the solutions above solve the problem described in this post?

@mribeirodantas
Unfortunately, it didn’t help.

I wrote the following code which works:

mutect2.out.mutect_vcf_filtered
    .map { meta, filtered_vcf -> [ "${meta.timepoint}", meta, filtered_vcf ] }
    .join(lancet.out.lancet_file.map { meta, lancet_file -> [ "${meta.timepoint}", meta, lancet_file ] }, failOnMismatch: true, failOnDuplicate: true)
    .join(manta.out.strelka_somatic_indels_vcf.map { meta, strelka_indels -> [ "${meta.timepoint}", meta, strelka_indels ] }, failOnMismatch: true, failOnDuplicate: true)
    .join(manta.out.strelka_somatic_snvs_vcf.map { meta, strelka_snvs -> [ "${meta.timepoint}", meta, strelka_snvs ] }, failOnMismatch: true, failOnDuplicate: true)
    .multiMap { pid, meta1, filtered_vcf, meta2, lancet_file, meta3, strelka_indels, meta4, strelka_snvs ->
        mutect2_vcf: [ meta1, filtered_vcf ]
        lancet_vcf: [ meta2, lancet_file ]
        strelka_indels_vcf: [ meta3, strelka_indels ]
        strelka_snvs_vcf: [ meta4, strelka_snvs ]
    }
    .set { ch_joined_callers }

It would be helpful if you could explain what multiMap is doing with pid and where pid comes from. I got some help with this from the Nextflow Slack channel.

Sadly, the Nextflow documentation doesn’t help much here because of the complexity of my pipeline. https://www.nextflow.io/docs/latest/operator.html#multimap

Thanks for sharing the updates. Based on what I know about join, I would expect pid to be the key through which the operator joined the channel elements. Check the example below, from the docs:

left  = Channel.of(['X', 1], ['Y', 2], ['Z', 3], ['P', 7])
right = Channel.of(['Z', 6], ['Y', 5], ['X', 4])
left.join(right).view()

Output

[Z, 3, 6]
[Y, 2, 5]
[X, 1, 4]

In the example you shared, what’s the key mentioned?
Is there a default parameter?

Thank you for your reply.

By default, the first item is the key. In the example above, Z, Y and X.


From here.
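To tie this back to the pid question, here is a small sketch of the same pattern with two toy channels. The channel names, sample metadata and file names are invented for illustration, and plain strings stand in for real files. The map step puts the key in front of each tuple, join matches on that first item, and the first closure parameter of multiMap (called pid here) is simply that key.

```nextflow
workflow {
    // Hypothetical stand-ins for two caller outputs: [meta, file name]
    ch_a = Channel.of(
        [[id: 'S1', timepoint: 'T1'], 'S1.caller_a.vcf'],
        [[id: 'S2', timepoint: 'T2'], 'S2.caller_a.vcf'])
    ch_b = Channel.of(
        [[id: 'S2', timepoint: 'T2'], 'S2.caller_b.vcf'],
        [[id: 'S1', timepoint: 'T1'], 'S1.caller_b.vcf'])

    // Prepend the join key so that it becomes the first item of each tuple
    keyed_a = ch_a.map { meta, vcf -> [meta.timepoint, meta, vcf] }
    keyed_b = ch_b.map { meta, vcf -> [meta.timepoint, meta, vcf] }

    // After join, each element is [key, meta_a, vcf_a, meta_b, vcf_b];
    // multiMap drops the key (pid) and splits the rest into two synchronized channels
    keyed_a
        .join(keyed_b, failOnMismatch: true, failOnDuplicate: true)
        .multiMap { pid, meta_a, vcf_a, meta_b, vcf_b ->
            caller_a: [meta_a, vcf_a]
            caller_b: [meta_b, vcf_b]
        }
        .set { ch_split }

    ch_split.caller_a.view { v -> "caller_a: $v" }
    ch_split.caller_b.view { v -> "caller_b: $v" }
}
```

Because both multiMap outputs come from the same joined element, passing ch_split.caller_a and ch_split.caller_b as separate inputs to a process should keep the samples aligned. If the key is not the first item, join also accepts a by option with the zero-based index (or list of indices) to match on.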

Great, thank you.

