How to use collect on two processes and do a join?

Hi There,

I have an input CSV file with trios, and sometimes duos, of samples. An example is at the end.
At the moment I send the FASTQ data through without any normal/tumor pairing.

There’s a step (applybqsr) that requires inputs from two different previous steps. Because Nextflow processes run asynchronously, this applybqsr step ends up receiving files from different samples, depending on whichever task completes first.


Channel.fromPath(csvFile)
    .splitCsv(header: true)
    .map { row ->
        [
            row.subMap("batch", "timepoint", "tissue", "sequencing_type"),
            [ file(row.fastq_1), file(row.fastq_2) ]
        ]
    }
    .branch { meta, fastq ->
        rna: meta.sequencing_type == "rna"
        wes: meta.sequencing_type == "wes"
        other: true
    }
    .set { samples }

I have a long chain of processes:

FASTP(samples.wes)
align_bwa_mem(FASTP.out.reads)
fixmate(align_bwa_mem.out.sorted_bams)
gatk_markduplicates(fixmate.out.sorted_fixmate_bams)
setupnmdtags(gatk_markduplicates.out.sorted_bam_mark_duplicates)
recalibrate_bam(setupnmdtags.out.setupnmdtags_bam)

Each process takes and emits the meta map, for example in FASTP:

process FASTP {
    conda '/data1/software/miniconda/envs/MMRADAR/'
    tag "$meta.timepoint"
    label 'low_mem'
    maxForks 5
    debug true
    errorStrategy 'retry'
    maxRetries 2

    publishDir path: "${params.outdir}/${meta.batch}/${meta.timepoint}/WES/primary/fastp/${meta.tissue}/", mode: 'copy'

    input:
    tuple val(meta), path(fastq)

    output:
    tuple val(meta), path("${patient_id}_trim_{1,2}.fq.gz"), emit: reads
    path("${patient_id}.fastp.json"), emit: json
    path("${patient_id}.fastp.html"), emit: html

    script:
    // patient_id is assigned without `def` so that the output block can see it
    if (meta.tissue == "normal") {
        patient_id = meta.timepoint + "_N"
    }
    if (meta.tissue == "tumor") {
        patient_id = meta.timepoint + "_T"
    }
    // fastq is the [R1, R2] pair staged by the input tuple
    """
    fastp --in1 "${fastq[0]}" --in2 "${fastq[1]}" -q 20 -u 20 -l 40 --detect_adapter_for_pe \
        --out1 "${patient_id}_trim_1.fq.gz" --out2 "${patient_id}_trim_2.fq.gz" \
        --json "${patient_id}.fastp.json" --html "${patient_id}.fastp.html" --thread 10
    """
}

The GATK recalibrate_bam process is declared similarly:

	input:
	tuple val(meta), path(setupnmd_bam , stageAs: 'recalb_bam/*')

	output:	
	tuple val(meta), path("${patient_id}_table.recal"), emit: recalib_table

I’d like to wait until both the GATK markduplicates and recalibrate processes have finished, because otherwise the join below may not work.

How do I use collect to wait until both processes are complete?

Next steps on the collected files:

gatk_markduplicates.out.sorted_bam.map { meta, bam -> [ meta.patient_id, meta, bam ] }
    .join(recalibrate_bam.out.recalib_table.map { meta, rtable -> [ meta.patient_id, meta, rtable ] }, failOnMismatch:true, failOnDuplicate:true)
    .multiMap { pid, meta1, bam, meta2, rtable ->
        bam:    [ meta1, bam ]
        rtable: [ meta2, rtable ]
    }.set { ch_to_bqsr }
applybqsr(ch_to_bqsr.bam, ch_to_bqsr.rtable)

Example data:

batch,timepoint,tissue,sequencing_type,fastq_1,fastq_2
SEMA-MM-001,MM-3309-T-01,normal,wes,/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-3309-DNA-N-01-01_L001_R1_001.fastq.gz,/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-3309-DNA-N-01-01_L001_R2_001.fastq.gz
SEMA-MM-001,MM-3309-T-01,tumor,wes,/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-3309-DNA-T-01-01_L001_R1_001.fastq.gz,/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-3309-DNA-T-01-01_L001_R2_001.fastq.gz
SEMA-MM-001,MM-3309-T-01,rna,rna,/data1/raw_data/RNA/sema4/SEMA-MM-001RNA/MM-3309-RNA-T-01-01_L002_R1_001.fastq.gz,/data1/raw_data/RNA/sema4/SEMA-MM-001RNA/MM-3309-RNA-T-01-01_L002_R2_001.fastq.gz
SEMA-MM-001,MM-0489-T-01,normal,wes,/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-0489-DNA-N-01-01_L001_R1_001.fastq.gz,/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-0489-DNA-N-01-01_L001_R2_001.fastq.gz
SEMA-MM-001,MM-0489-T-01,tumor,wes,/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-0489-DNA-T-01-01_L001_R1_001.fastq.gz,/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-0489-DNA-T-01-01_L001_R2_001.fastq.gz
SEMA-MM-001,MM-0489-T-01,rna,rna,/data1/raw_data/RNA/sema4/SEMA-MM-001RNA/MM-0489-RNA-T-01-01_L002_R1_001.fastq.gz,/data1/raw_data/RNA/sema4/SEMA-MM-001RNA/MM-0489-RNA-T-01-01_L002_R2_001.fastq.gz

Attached input CSV file.
long_format_data.csv (11.8 KB)

Thanks,

That’s actually a side effect of collect. When applied to a process output channel, the collect operator only emits its “collected” list once the process has finished all its tasks and every element has been queued in that output channel.

See more here.
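
To make the behaviour concrete, here is a minimal sketch of collect on a plain channel rather than a process output (the values are just placeholders). The operator gathers every item and emits them as one list, which it can only do once the source channel has completed:

```nextflow
workflow {
    Channel
        .of(1, 2, 3)   // a queue channel with three items
        .collect()     // emits a single list, only after the source channel is complete
        .view()        // prints [1, 2, 3]
}
```

The same applies when the source is a process output: the list appears only after the last task has emitted.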

@mribeirodantas
Sorry, I don’t quite follow.

I want to wait for two different processes to complete, and I’d like to use collect for that purpose.

Check the snippet below. Even after the second process has finished all its tasks, the third one will only start once both collected channels have emitted, that is, once both upstream processes are completely done.

process FOO {
  maxForks 1

  input:
  val x

  output:
  path 'foo.txt'

  script:
  """
  sleep 1
  echo ${x} > foo.txt
  """
}

process BAR {
  maxForks 1

  input:
  val x

  output:
  path 'bar.txt'

  script:
  """
  sleep 1
  echo ${x} > bar.txt
  """
}

process ZZZ {
  debug true

  input:
  val x
  val y

  output:
  stdout

  script:
  """
  echo ${x} and ${y}
  """
}

workflow {
  Channel
    .of(1..10)
    .set { my_ch1 }
  Channel
    .of('a'..'g')
    .set { my_ch2 }

  foo_collected = FOO(my_ch1).collect()
  bar_collected = BAR(my_ch2).collect()
  ZZZ(foo_collected, bar_collected)
}
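
A note on the pairing problem from the original question: collect turns each output into one big list, so it waits for everything but does not match a sample's BAM with its own recalibration table. For that, join on its own may be enough, since it emits a pair as soon as both sides for a given key have arrived, whatever order the tasks finish in. Also, as far as the posted code shows, the meta map only holds batch, timepoint, tissue and sequencing_type (patient_id is a script variable, not a meta key), so meta.patient_id would be null for every item. Below is a minimal sketch that joins on the whole meta map instead, assuming both processes pass meta through unmodified; the channel names and file names are hypothetical stand-ins for the real process outputs.

```nextflow
workflow {
    // Hypothetical stand-in for gatk_markduplicates output: [ meta, bam ]
    bam_ch = Channel.of(
        [ [timepoint: 'MM-3309-T-01', tissue: 'normal'], 'MM-3309-T-01_N.bam' ],
        [ [timepoint: 'MM-3309-T-01', tissue: 'tumor'],  'MM-3309-T-01_T.bam' ]
    )

    // Hypothetical stand-in for recalibrate_bam output: [ meta, table ],
    // deliberately emitted in a different order
    table_ch = Channel.of(
        [ [timepoint: 'MM-3309-T-01', tissue: 'tumor'],  'MM-3309-T-01_T_table.recal' ],
        [ [timepoint: 'MM-3309-T-01', tissue: 'normal'], 'MM-3309-T-01_N_table.recal' ]
    )

    // join uses the first element (the meta map) as the key,
    // so each BAM is matched with the table that carries the same meta
    bam_ch
        .join(table_ch, failOnMismatch: true, failOnDuplicate: true)
        .view { meta, bam, table -> "${meta.timepoint} ${meta.tissue}: ${bam} + ${table}" }
}
```

Each joined item has the shape [ meta, bam, table ]. It could then be split with multiMap as in the question, or passed to a process that declares a single tuple input.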