I’m currently migrating my pipeline from Snakemake to Nextflow. One feature I’m struggling to replicate is partial workflow execution. My goal is to resume a pipeline in which some upstream tasks failed or were skipped, without re-triggering the entire graph.
The scenario (simplified, see code below): I have the workflow A → B → C.
Process B is long-running and some of its tasks may fail.
When I re-run the pipeline, I want to skip all successfully completed B tasks and proceed to C for those specific samples.
If I try to manually “short-circuit” the flow by feeding publishDir outputs back into the workflow via if statements, the cache is invalidated: Nextflow treats these as new inputs and re-runs B, even though the data is identical to the previous run’s work/ directory.
In Snakemake, the file-presence check combined with named rules handles this naturally.
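For concreteness, the kind of manual short-circuit I mean looks roughly like this (a sketch only; `params.start_from` is a made-up flag, not part of my real pipeline):

```nextflow
// Sketch only: feed previously published A outputs back in when "resuming" from B.
// Nextflow sees these staged files as brand-new inputs, so B's cache is invalidated.
workflow {
    if (params.start_from == 'B') {
        Afiles = channel.fromPath('results/A/*.txt')
            .map { file -> [file.baseName, file] }
    }
    else {
        Afiles = A().flatten()
            .map { file -> [file.baseName, file] }
    }
    Cfiles = C(B(Afiles))
}
```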
Hope someone can help
Thanks,
Christoffer
process A {
    output:
    path("A/*")

    script:
    """
    mkdir -p A
    touch A/{1..100}.txt
    sleep 5
    """
}

process B {
    errorStrategy 'ignore'
    tag "${id}"

    input:
    tuple val(id), path('input.txt')

    output:
    tuple val(id), path("B/${id}.txt")

    script:
    """
    mkdir -p B
    cp input.txt B/${id}.txt
    sleep 5
    """
}

process C {
    errorStrategy 'ignore'
    tag "${id}"

    input:
    tuple val(id), path('input.txt')

    output:
    tuple val(id), path("C/${id}.txt")

    script:
    """
    mkdir -p C
    cp input.txt C/${id}.txt
    sleep 5
    """
}

workflow {
    Afiles = A().flatten().map { file -> [file.baseName, file] }
    Bfiles = B(Afiles)
    Cfiles = C(Bfiles)
}
I think that what you’re looking for is -resume. See the docs:
No need for any special workflow logic: just run the same nextflow run command again, tack on -resume, and Nextflow should automatically pick up the cached results from all successfully completed tasks in the previous run.
If you forget to use the flag on a run, you can use nextflow log to list all previous executions, then pass -resume with that run’s session ID to resume a specific pipeline execution.
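For example (the session ID here is illustrative):

```shell
# List previous executions with their run names and session IDs
nextflow log

# Resume a specific earlier execution by its session ID
nextflow run . -resume 4dc656d2-c410-44c8-bc32-7dd0ea87bebf
```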
I am already using -resume, but it does not quite solve the use case I am looking at.
For instance, say B represents a long calculation with a 50% success rate, and C is a large post-processing step that takes a processing script as one of its inputs. If I make a change to my post-processing tool and want to re-run the post-processing, then even with -resume the pipeline will try to calculate all of my failed B results again.
If I start wrapping steps in conditionals it gets even worse, since the published results are not recognized as the results of previous steps. So even if A → B finished, if I then use conditionals to run B → C from the published A results, they are not recognized as the same input and all B jobs rerun.
Yes, Nextflow cares about tasks and work directories: once files are published via publishDir, they are effectively out of the DAG and no longer tracked (you can trick it if you’re careful, but that way madness lies).
So to recap: It sounds like Nextflow is doing what I’d expect it to do (reuse results from successfully completed tasks), but you would like it to skip over the failed tasks from B instead of attempting them again, is that right?
We’re getting a bit into the weeds here and there are many possible solutions. But one idea: could you capture the errors and basically report them as successful tasks, then have workflow logic that only proceeds downstream based on output filenames etc.? I guess it depends a little on what the processes are doing.
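Very roughly, something like this (a sketch; `long_calculation` is a stand-in for whatever B actually runs):

```nextflow
// Sketch: B always exits 0 and reports failure via its output filename,
// so every task is recorded as successful and cached by -resume.
process B {
    tag "${id}"

    input:
    tuple val(id), path('input.txt')

    output:
    tuple val(id), path("B/${id}.*")

    script:
    """
    mkdir -p B
    if long_calculation input.txt > B/${id}.txt; then
        :
    else
        rm -f B/${id}.txt
        touch B/${id}.failed   # marker file instead of a task failure
    fi
    """
}

workflow {
    // ... build Afiles as before ...
    Bfiles = B(Afiles)
    // Only genuine results go on to C; .failed markers are filtered out
    Cfiles = C(Bfiles.filter { id, f -> f.name.endsWith('.txt') })
}
```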
What you’re coming up against is a fairly fundamental difference between Nextflow and Snakemake though. Nextflow pipelines are really good at being super reproducible and trustworthy as a whole unit, so they really resist doing the kind of middle-of-workflow-execution meddling that you’re trying to do. That’s an intentional design decision.
I think I managed to find a 90% solution for my use case.
Instead of creating a channel from the results in the publishDir, I filter the channel based on the files that are there.
In my workflow I have added params.base, which I then check in the workflow. If I want to run only from B, I filter based on what has already completed successfully through B:
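Roughly like this (a sketch; assumes B publishes its outputs to results/B/):

```nextflow
// Sketch of the params.base filtering: keep only samples whose B output
// was already published, so failed B tasks are not retried.
workflow {
    Afiles = A().flatten().map { file -> [file.baseName, file] }

    if (params.base == 'B') {
        doneIds = channel.fromPath('results/B/*.txt')
            .map { file -> file.baseName }
            .collect()
            .map { ids -> [ids] }   // wrap so combine() doesn't spread the list

        Afiles = Afiles.combine(doneIds)
            .filter { id, f, done -> id in done }
            .map { id, f, done -> [id, f] }
    }

    Cfiles = C(B(Afiles))
}
```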
Conceptually it is quite different to what I had in Snakemake, but from my initial testing the workflow is more reproducible and evaluates a lot faster than what I had. So I am happy.
OK, I managed it. Like Phil says, we’re in anti-pattern territory here, but it’s possible.
The strategy:
- Run the pipeline with errorStrategy 'ignore' so it runs to completion
- Write results from B() to the results/B directory
- Continue to C()…
- On run 2, check the results/B directory for files and make a list of file IDs
- Compare against the output of A() and either:
  - select the file if a matching result is already in results/B (i.e. B succeeded on run 1), so failed B tasks are not retried, or
  - select the file if results/B is empty (i.e. this is the first run).
- Pass this channel to B()
Code:
process A {
    output:
    path("A/*")

    script:
    """
    mkdir -p A
    touch A/{1..5}.txt
    sleep 1
    """
}

process B {
    errorStrategy 'ignore'
    tag "${id}"

    input:
    tuple val(id), path('input.txt')

    output:
    tuple val(id), path("B/${id}.txt")

    script:
    // Odd IDs exit 1 (simulated failure), even IDs exit 0
    """
    mkdir -p B
    cp input.txt B/${id}.txt
    sleep 1
    exit ${id.toInteger() % 2}
    """
}

process C {
    errorStrategy 'ignore'
    tag "${id}"

    input:
    tuple val(id), path('input.txt')

    output:
    tuple val(id), path("C/${id}.txt")

    script:
    """
    mkdir -p C
    cp input.txt C/${id}.txt
    sleep 1
    """
}

workflow {
    Afiles = A().flatten().map { file -> [file.baseName, file] }

    // Get IDs of files that already have successful B results
    successful_ids = channel.fromPath("results/B/*.txt", checkIfExists: false)
        .map { file -> file.baseName }
        .collect()
        .map { it -> [it] }  // Wrap in a list so combine() doesn't spread it
        .ifEmpty([[]])

    // First run: process all A files. Subsequent runs: only files with existing results
    B_input = Afiles.combine(successful_ids)
        .filter { id, a_file, existing_ids ->
            existing_ids.isEmpty() || existing_ids.any { it == id }
        }
        .map { id, a_file, existing_ids -> [id, a_file] }

    Bfiles = B(B_input)
    Cfiles = C(Bfiles)
    outfiles = Bfiles.mix(Cfiles)

    publish:
    outfiles = outfiles
}

output {
    outfiles {
        path "."
    }
}
Run 1:
(remember to rm -rf results first, otherwise the demonstration doesn’t make sense)
> rm -rf results && nextflow run .
Nextflow 25.10.3 is available - Please consider updating your version to it
N E X T F L O W ~ version 25.10.2
Launching `./main.nf` [peaceful_wiles] DSL2 - revision: 309b52844a
[c7/599fad] Submitted process > A
[1b/bfe1d1] Submitted process > B (4)
[35/a3ef87] Submitted process > B (3)
[7f/7ad079] Submitted process > B (5)
[f9/552ab6] Submitted process > B (2)
[78/6e7ae2] Submitted process > B (1)
[7f/7ad079] NOTE: Process `B (5)` terminated with an error exit status (1) -- Error is ignored
[78/6e7ae2] NOTE: Process `B (1)` terminated with an error exit status (1) -- Error is ignored
[35/a3ef87] NOTE: Process `B (3)` terminated with an error exit status (1) -- Error is ignored
[f3/f5be09] Submitted process > C (2)
[0f/214cba] Submitted process > C (4)
Run 2:
> nextflow run .
Nextflow 25.10.3 is available - Please consider updating your version to it
N E X T F L O W ~ version 25.10.2
Launching `./main.nf` [magical_shannon] DSL2 - revision: 309b52844a
[71/f57c23] Submitted process > A
[5e/ce5f23] Submitted process > B (2)
[f2/67b937] Submitted process > B (4)
[c1/45f3c0] Submitted process > C (2)
[34/1abbd8] Submitted process > C (4)