How to wait until all files are processed by a task before the next step?

Hello there,

I have a workflow for RNA-seq analysis that contains several processes: fastp, align, fusion, mark duplicates, feature counts, and merge.

I’d like to run merge only after all files have been processed by the feature counts step.

main.nf

include { rna } from './workflows/rna'

if (params.analysis == "rna") {
    rna()
}

rna.nf

include { align } from '../modules/rna/primary/star.nf'
include { arriba } from '../modules/rna/primary/arriba.nf'
include { fastp_rna } from '../modules/rna/primary/fastp_rna.nf'
include { featurecounts } from '../modules/rna/primary/feature.nf'
include { markduplicates } from '../modules/rna/primary/markduplicates.nf'

// Return "NA" unchanged; otherwise convert the path string into a file object
def createTupleOrString(fileString) {
    if (fileString == "NA") {
        return "NA"
    } else {
        return file(fileString)
    }
}

workflow rna {

    def csvFile = params.input_csvFile

    Channel.fromPath(csvFile)
        .splitCsv(header: true)
        .branch { it ->
            rna: it.tissue == "rna" && it.sequencing_type == "rna"
            normal: it.tissue == "normal" && it.sequencing_type == "wes"
            tumor: it.tissue == "tumor" && it.sequencing_type == "wes"
        }
        .set { samples }

    fastp_rna(samples.rna)
    align(fastp_rna.out.reads_tumor)
    markduplicates(align.out.aligned_star)
    featurecounts(markduplicates.out.markduplicate_bam)

    // wait for all files/samples to be complete from featurecounts
    do_merge() // how do I wait until all files are processed by featurecounts?
}

do_merge doesn’t require any output files from featurecounts per se.
do_merge runs a script with two input parameters: the output folder of feature counts and the output directory for the merge script.

Problem statement: I’d like to wait until all files have been processed by feature counts and then run merge.

How do I achieve this?

Thanks.

As you mentioned, these two processes have no data dependency. One solution to make them dependent on each other is what we call state dependency (simulated data dependency), together with a channel operator such as collect, which waits for all tasks of a process to finish before sending anything to the next channel operator or process in the chain. Read more about state dependency here, and about waiting for all tasks to finish here.

PS: Though you say the do_merge process doesn’t depend on the outputs of the previous process per se, I still think that in the Nextflow way of handling this situation it does. You should declare the feature counts file(s) as outputs of that process, so that they are symbolically linked into the task folder of the do_merge task. That way, the path you provide is the current directory (or something similar, depending on how you declare the outputs). Doing it this way means you only have to add a collect and adapt do_merge accordingly.
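
For instance, here is a minimal sketch of that pattern. The do_merge body, the merge_counts.sh script, and the counts channel name are placeholders for your setup, not prescribed names:

process do_merge {
    input:
    path counts_files   // the full set of feature counts outputs, staged into the work dir

    output:
    path "*.csv"

    script:
    """
    merge_counts.sh ${params.outdir} ./
    """
}

workflow {
    // ... fastp_rna | align | markduplicates | featurecounts as before ...

    // collect waits for every featurecounts task to finish and emits a
    // single list, so do_merge runs exactly once, after all of them
    featurecounts.out.counts
        | collect
        | do_merge
}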

@mribeirodantas Yes, thank you for your valuable input.

I tried the following code using collect, which ends in this error:

Missing process or function collect([DataflowStream[?], DataflowStream[?]])

rna.nf

featurecounts(markduplicates.out.markduplicate_bam) \
        | collect \
        | merge_feature

The merge_feature process will eventually read from the output folder, searching for the feature counts outputs. There’s a Python script in it, hard-coded to read the output structure.

merge_feature.nf


process merge_feature {

    debug true
    errorStrategy 'retry'
    maxRetries 2
    publishDir path: "${params.outdir}/secondary_RNA/merged_featurecounts/", mode: 'copy'

    output:
    path("*.csv")

    script:
    """
    Rscript /data1/software/Rscripts/Daphni2_scripts/RNA_Merge_NextFlow.R $params.outdir ./
    """
}

workflow.onComplete {
    log.info( workflow.success ? "Done merge secondary RNA!" : "Oops .. something went wrong in merge secondary RNA" )
}

The featurecounts process looks like:


process featurecounts {

    maxForks 10
    debug true
    errorStrategy 'retry'
    maxRetries 2
    publishDir path: "${params.outdir}/${batch}/${timepoint}/RNA/primary/featurecounts/", mode: 'copy'

    input:
    tuple val(batch), val(patient_id_tumor), val(timepoint), path(markdupli_bam, stageAs: 'arriba/*')

    output:
    tuple val(batch), val(patient_id_tumor), val(timepoint), path("subreadout.fc.txt"), emit: foldchange
    tuple val(batch), val(patient_id_tumor), val(timepoint), path("subreadout.fc.txt.summary"), emit: foldchange_summary

    script:
    """
    /data1/software/subread-2.0.6-Linux-x86_64/bin/featureCounts -a $params.gtf_annotation_file -T 24 -o "subreadout.fc.txt" -p ${markdupli_bam}
    """
}

workflow.onComplete {
    log.info( workflow.success ? "Done rna feature count primary RNA!" : "Oops .. something went wrong in feature count primary RNA" )
}

I’m using Nextflow version 23.10.0.

I do not understand what Nextflow is expecting, as the error message is cryptic. The links you shared use collect, and I’m trying to do the same thing.

Hi @mribeirodantas

Can you please look into this?

I am badly stuck with collect. I’ve tried different variants:

featurecounts(markduplicates.out.markduplicate_bam) | collect | merge_feature()

featurecounts(markduplicates.out.markduplicate_bam).collect().merge_feature()

Yet I get the same error: Missing process or function collect([DataflowStream[?], DataflowStream[?]])

I do not know what this means.

I believe the cause of this is that your process has a multi-channel output, and collect doesn’t know what to do with it. That’s what the error message is saying. You should apply the collect channel operator to each output channel separately and pass the result to the next process.
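
In your case, something along these lines should work. This is a sketch: it assumes merge_feature is adapted to declare a matching path input (your posted version has no input block), and it picks the foldchange output channel from your featurecounts process:

featurecounts(markduplicates.out.markduplicate_bam)

// pick one named output channel, keep only the file from each tuple,
// then collect so merge_feature runs once, after every task has finished
featurecounts.out.foldchange
    | map { batch, patient_id, timepoint, fc_file -> fc_file }
    | collect
    | merge_feature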

Hi, @complexgenome. Let’s continue with the snippet below.

process FOO {
  input:
  val x

  output:
  val y

  script:
  y = x+1
  """
  """
}

workflow {
  FOO(1)
  | collect
  | view
}

The snippet above works fine, as there’s a single output channel from process FOO. If I make the FOO process have two output channels, like in the snippet below, I will run into the error you’re facing.

process FOO {
  input:
  val x

  output:
  val y
  val z

  script:
  y = x+1
  z = y+1
  """
  """
}

workflow {
  FOO(1)
  | collect
  | view
}

As you can see here:

N E X T F L O W  ~  version 23.10.1
Launching `main.nf` [deadly_gilbert] DSL2 - revision: e00f73ca55
Missing process or function collect([DataflowVariable(value=null), DataflowVariable(value=null)])

 -- Check script 'main.nf' at line: 17 or see '.nextflow.log' file for more details

This happens because the collect channel operator is built to work on a single channel, so Nextflow reports a missing process or function named collect that would operate over two channels :wink:

You will find a solution to collect both channel outputs in the snippet below:

process FOO {
  input:
  val x

  output:
  val y
  val z

  script:
  y = x+1
  z = y+1
  """
  """
}

workflow {
  FOO(1)

  // collect each output channel separately
  FOO.out[0]
  | collect
  | set { y_ch }

  FOO.out[1]
  | collect
  | set { z_ch }

  y_ch.view()
  z_ch.view()
}

You can also use the emit keyword to refer to the output channels by name, instead of by index ([0], [1], …).
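
For instance, with hypothetical emit names (plus_one and plus_two are arbitrary labels; same FOO logic as above):

process FOO {
  input:
  val x

  output:
  val y, emit: plus_one
  val z, emit: plus_two

  script:
  y = x+1
  z = y+1
  """
  """
}

workflow {
  FOO(1)

  FOO.out.plus_one | collect | view
  FOO.out.plus_two | collect | view
}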
