Hello,
I’m trying to set up a series of subworkflows, each with its own process: 1) download files if they aren’t already present in my working directory, and 2) run a script on the downloaded files (such as a multisample Seurat R script). I have subworkflow1, which relies on channel1 (a channel built from the entries in the csv samplesheet file), calling the DOWNLOAD process, and subworkflow2, which only relies on the samplesheet file itself, calling the RUN_SCRIPT process.
My goal is to wait for all instances of subworkflow1 to finish before proceeding to subworkflow2.
I understand how to accomplish this state dependency with plain processes, as demonstrated here; however, I’m currently stuck on how to translate that into the subworkflow setting. When I run the pipeline, I normally get the following error in the Nextflow log:
Missing process or function or([[DataflowBroadcast around DataflowStream[?]]])
I’m sure it’s something simple, but I can’t find a clear explanation or an obvious place in the docs for a solution.
Below is an example of the pipeline I’m trying to accomplish this with.
include { samplesheetToList } from 'plugin/nf-schema'

process DOWNLOAD {
    input:
    tuple val(sample), val(url)

    output:
    val true

    script:
    """
    #!/bin/bash
    # Download the data for the sample if it doesn't already exist
    if [ -e "${sample}" ]; then
        echo "Sample ${sample} already exists, skipping download"
        exit 0
    fi
    wget --no-check-certificate --content-disposition --continue ${url}
    """
}
process RUN_SCRIPT {
    input:
    path samplesheet

    script:
    """
    Rscript ${projectDir}/seurat_example.R -i ${samplesheet}
    """
}
workflow subworkflow1 {
    take:
    samples

    main:
    DOWNLOAD(samples)

    emit:
    state = DOWNLOAD.out.collect()
}

workflow subworkflow2 {
    take:
    samplesheet

    main:
    RUN_SCRIPT(samplesheet)
}
ch_samples = Channel
    .fromList(samplesheetToList(params.samplesheet, "${projectDir}/assets/schema_input.json"))
    .multiMap { sample, url ->
        samples: tuple(sample, url)
    }

workflow {
    subworkflow1(ch_samples) // I want to wait for this subworkflow to finish for all instances
    subworkflow2(params.samplesheet)
}
Please let me know if you need further clarification and thank you for any help you can provide.
So you still need to have that state dependency between the upstream and downstream process. On top of that, you need to collect the channel from the upstream workflow and provide it as the state dependency to the downstream process.
You’re already halfway there: you collected the subworkflow1 output into the state output. Now you just need to pass that channel as an additional input into the RUN_SCRIPT process.
I would also map the state dependency into a dummy value so that it doesn’t cause the downstream process to stage any extra files:
emit:
state = DOWNLOAD.out.collect().map { true }
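Concretely, the re-wiring could look something like this (an untested sketch based on your example; the extra ready input and take are placeholders I've added, and their value is never used):

process RUN_SCRIPT {
    input:
    path samplesheet
    val ready // dummy state dependency, value is ignored

    script:
    """
    Rscript ${projectDir}/seurat_example.R -i ${samplesheet}
    """
}

workflow subworkflow2 {
    take:
    samplesheet
    ready

    main:
    RUN_SCRIPT(samplesheet, ready)
}

workflow {
    subworkflow1(ch_samples)
    // subworkflow2 can't start until subworkflow1 has collected all DOWNLOAD outputs
    subworkflow2(file(params.samplesheet), subworkflow1.out.state)
}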
It seems to me like you’re trying to achieve this in a somewhat roundabout way. I’m assuming your challenge is this:
- You have a samplesheet where each row has either sample data or a URL to sample data, but not both (see the example sheet just after this list)
- If you have the data, you can use it directly
- If you have the URL, you must download it then use it
- You would like to analyse all samples in the same manner
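For illustration, I’m imagining a samplesheet roughly like this (made-up file names and URL, and I don’t know what your schema_input.json actually enforces):
sample,url
/data/sample_A.fastq.gz,
,https://example.com/sample_B.fastq.gz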
Currently, you are trying to:
- Take all the data, if data is already present skip that step
- If data is not present, download from the URL
- After all data has been downloaded, analyse together
The problem seems to be that there is no dependency between steps 2 and 3, so your pipeline will eagerly start step 3 before steps 1 and 2 have completed. What we need to do is tell Nextflow that step 3 depends on steps 1 and 2 completing.
As Ben says, you can re-wire the pipeline to wait for steps 1 and 2 to complete, essentially providing a dummy output between the two steps. Simply provide a dummy input to process 3 which originates in process 2, and the pipeline will wait until it’s ready.
However, I think there’s a better way of actually wiring up your pipeline to use the DAG properly.
In the following example, I have made some changes to your pipeline:
- The DOWNLOAD process now actually exports the data as a path output, so Nextflow can handle the output file properly.
- The output of RUN_SCRIPT is captured as well. In this case I use stdout, since I don’t know what the output format is, but I will assume something gets printed to the logs. Ideally you would capture any files created here (see the note after the code below).
- Moved everything into the main workflow.
- Used branch to separate the samples that already have data from the samples that only have a URL.
- DOWNLOAD the samples that have a URL.
- Ignore the samples that do not have a URL (their data is already present).
- mix the samples that were downloaded by DOWNLOAD with the samples whose data already arrived.
- collect them so all sample data gets analysed together in RUN_SCRIPT.
The important parts here are:
- Use channels to handle inputs/outputs
- Let Nextflow handle the paths and files within processes
- Use inputs/outputs to determine the order of processes.
include { samplesheetToList } from 'plugin/nf-schema'

process DOWNLOAD {
    input:
    val url

    output:
    path "*" // picks up any file created in the working dir

    script:
    """
    wget --no-check-certificate --content-disposition --continue ${url}
    """
}

process RUN_SCRIPT {
    input:
    path samples

    output:
    stdout

    script:
    """
    Rscript ${projectDir}/seurat_example.R -i ${samples}
    """
}
workflow subworkflow1 {
    take:
    samples

    main:
    DOWNLOAD(samples)

    emit:
    state = DOWNLOAD.out
}

workflow subworkflow2 {
    take:
    samples

    main:
    RUN_SCRIPT(samples)
}

workflow {
    ch_samples = Channel
        .fromList(samplesheetToList(params.samplesheet, "${projectDir}/assets/schema_input.json"))
        .branch { sample, url ->
            with_data: sample
                return file(sample)
            with_url: url
                return url
        }

    subworkflow1(ch_samples.with_url)

    ch_all_sample_data = ch_samples.with_data.mix(subworkflow1.out.state)

    subworkflow2(ch_all_sample_data.collect())
}
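One more note on the stdout point above: if your seurat_example.R writes result files rather than just printing to the logs, it would be better to capture those files instead. For example, assuming (hypothetically) that the script writes everything into a results directory, the RUN_SCRIPT output block could become:

output:
path "results/*"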