How to wait for a subworkflow to complete before proceeding to another subworkflow?

Hello,

I’m trying to set up a series of subworkflows that each have a different process associated with them: 1) download files if they aren’t already present in my working directory, and 2) run a script on the downloaded files (such as a multi-sample Seurat R script). I have subworkflow1, which relies on channel1 (a channel built from the entries of the CSV samplesheet) and calls the DOWNLOAD process, and subworkflow2, which relies only on the samplesheet file and calls the RUN_SCRIPT process.

My goal is to wait for all instances of subworkflow1 to finish before proceeding to subworkflow2.

I understand how to accomplish the state dependency with just processes, as demonstrated here; however, I am currently stuck on how to transfer this to the subworkflow setting. When I run the pipeline, I normally get the following error in the Nextflow log:

Missing process or function or([[DataflowBroadcast around DataflowStream[?]]])

I’m sure it’s something simple, but I can’t find a clear explanation or an obvious place in the docs for a solution.

Below is an example of the pipeline I’m trying to accomplish this with.

include { samplesheetToList } from 'plugin/nf-schema'

process DOWNLOAD {
    input:
    tuple val(sample), val(url)

    output:
    val true

    script:
    """
    #!/bin/bash

    # Download the data for the sample if it doesn't already exist
    if [ -e "${sample}" ]; then
        echo "Sample ${sample} already exists, skipping download"
        exit 0
    fi

wget --no-check-certificate --content-disposition --continue ${url}

    """
}

process RUN_SCRIPT {
    input:
    path samplesheet

    script:
    """
    Rscript ${projectDir}/seurat_example.R -i ${samplesheet}
    """
}

workflow subworkflow1 {
    take:
    samples

    main:
    DOWNLOAD(samples)

    emit:
    state = DOWNLOAD.out.collect()
}

workflow subworkflow2 {
    take:
    samplesheet

    main:
    RUN_SCRIPT(samplesheet)
}

ch_samples = Channel
        .fromList(samplesheetToList(params.samplesheet, "${projectDir}/assets/schema_input.json"))
        .map { sample, url -> tuple(sample, url) }

workflow {
    subworkflow1(ch_samples)  // I want to wait for this subworkflow to finish for all instances
    subworkflow2(params.samplesheet)
}

Please let me know if you need further clarification and thank you for any help you can provide.

So you still need to have that state dependency between the upstream and downstream process. On top of that you need to collect the channel from the upstream workflow and provide it as the state dependency to the downstream process.

You’re already halfway there – you collected the subworkflow1 output into the state output. Now you just need to pass that channel as an additional input into the RUN_SCRIPT process.

I would also map the state dependency into a dummy value so that it doesn’t cause the downstream process to unstage any extra files:

    emit:
    state = DOWNLOAD.out.collect().map { true }
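
Put together, the wiring could look something like this (a sketch based on your example; the extra ready input on RUN_SCRIPT and the two-argument subworkflow2 are my additions, not the only way to do it):

process RUN_SCRIPT {
    input:
    val ready            // dummy state from subworkflow1; forces RUN_SCRIPT to wait
    path samplesheet

    script:
    """
    Rscript ${projectDir}/seurat_example.R -i ${samplesheet}
    """
}

workflow subworkflow2 {
    take:
    ready
    samplesheet

    main:
    RUN_SCRIPT(ready, samplesheet)
}

workflow {
    subworkflow1(ch_samples)  // ch_samples defined as in your example
    subworkflow2(subworkflow1.out.state, file(params.samplesheet))
}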

It seems to me like you’re going about this the wrong way around. I’m assuming your challenge is this:

  • You have a samplesheet where each row has either sample data, or a URL to sample data but not both
  • If you have the data, you can use it directly
  • If you have the URL, you must download it then use it
  • You would like to analyse all samples in the same manner

Currently, you are trying to:

  1. Take all the samples; if the data is already present, skip that step
  2. If the data is not present, download it from the URL
  3. After all data has been downloaded, analyse everything together

The problem seems to be that there is no dependency between steps 2 and 3, so your pipeline will eagerly start step 3 before steps 1 and 2 have completed. What we need to do is tell Nextflow that step 3 depends on steps 1 and 2 completing.

As Ben says, you can re-wire the pipeline to wait for steps 1 and 2 to complete, essentially providing a dummy output between the two steps. Simply give process 3 a dummy input that originates in process 2, and the pipeline will wait until it’s ready.
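
In its barest form, that pattern with plain processes looks roughly like this (STEP2 and STEP3 are placeholder names, not taken from the pipeline above):

process STEP2 {
    output:
    val true             // dummy "done" signal

    script:
    """
    echo downloading
    """
}

process STEP3 {
    input:
    val ready            // STEP3 cannot start until STEP2's signal arrives

    script:
    """
    echo analysing
    """
}

workflow {
    STEP2()
    STEP3(STEP2.out.collect())
}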

However, I think there’s a better way of actually wiring up your pipeline to use the DAG properly.

In the following example, I have made some changes to your pipeline:

  1. The DOWNLOAD process actually exports the data as a path output, so Nextflow can handle the output file properly.
  2. The output of RUN_SCRIPT is captured as well. In this case I use stdout since I don’t know what the output format is, but I will assume something gets printed to the logs. Ideally you would capture any files created here.
  3. Moved everything into the main workflow
  4. Used branch to separate out samples that have data or a URL.
  5. DOWNLOAD the samples that have a URL
  6. Ignore the samples that do not have a URL
  7. mix the samples downloaded by DOWNLOAD back together with the samples that already had data
  8. collect them so all sample data gets analysed together in RUN_SCRIPT

The important parts here are:

  • Use channels to handle inputs/outputs
  • Let Nextflow handle the paths and files within processes
  • Use inputs/outputs to determine the order of processes.

include { samplesheetToList } from 'plugin/nf-schema'

process DOWNLOAD {
    input:
    val url

    output:
    path "*" // picks up any file created in the working dir

    script:
    """
    wget --no-check-certificate --content-disposition --continue ${url}
    """
}

process RUN_SCRIPT {
    input:
    path samples

    output:
    stdout

    script:
    """
    Rscript ${projectDir}/seurat_example.R -i ${samples}
    """
}

workflow subworkflow1 {
    take:
    samples

    main:
    DOWNLOAD(samples)

    emit:
    state = DOWNLOAD.out
}

workflow subworkflow2 {
    take:
    samples

    main:
    RUN_SCRIPT(samples)
}


workflow {
    ch_samples = Channel
        .fromList(samplesheetToList(params.samplesheet, "${projectDir}/assets/schema_input.json"))
        .branch { sample, url -> 
            with_data: sample
                return file(sample)
            with_url: url
                return url
        }


    subworkflow1(ch_samples.with_url)

    ch_all_sample_data = ch_samples.with_data.mix(subworkflow1.out.state)

    subworkflow2(ch_all_sample_data.collect())
}
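
For reference, a hypothetical samplesheet that would branch as above; the real column names and validation come from your schema_input.json, so treat this purely as an illustration:

sample,url
/data/sampleA.fastq.gz,
,https://example.com/sampleB.fastq.gz

Each row carries either a local path (routed to with_data) or a URL (routed to with_url), never both.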