Handling Partial Pipeline Execution - How to skip successful upstream tasks without breaking cache

Hi everyone,

I’m currently migrating my pipeline from Snakemake to Nextflow. One feature I’m struggling to replicate is partial workflow execution. My goal is to resume a pipeline where some upstream tasks failed or were skipped, without re-triggering the entire graph.

The scenario (simplified, see code below): I have the workflow: A → B → C

  1. Process B is long-running and some tasks may fail

  2. If I re-run the pipeline, I want to skip all successfully completed B tasks and proceed to C for those specific samples.

  3. If I try to manually “short-circuit” the flow by feeding publishDir outputs back into the workflow via if statements, the cache is invalidated. Nextflow treats these as new inputs and re-runs B even though the data is identical to the previous run’s work/ directory.

In Snakemake, the file-presence check combined with named rules handles this naturally.

Hope someone can help :slight_smile:

Thanks,

Christoffer

process A {
    output:
    path("A/*")

    script:
    """
    mkdir -p A
    touch A/{1..100}.txt
    sleep 5
    """
}

process B {
    errorStrategy 'ignore'
    tag "${id}"
    input:
    tuple val(id), path('input.txt')

    output:
    tuple val(id), path("B/${id}.txt")

    script:
    """
    mkdir -p B
    cp input.txt B/${id}.txt
    sleep 5
    """
}

process C {
    errorStrategy 'ignore'
    tag "${id}"
    input:
    tuple val(id), path('input.txt')

    output:
    tuple val(id), path("C/${id}.txt")

    script:
    """
    mkdir -p C
    cp input.txt C/${id}.txt
    sleep 5
    """
}

workflow {
    Afiles = A().flatten().map {
        file ->
        return [file.baseName, file]
    }
    Bfiles = B(Afiles)
    Cfiles = C(Bfiles)
}

Hi @COASD, welcome to the community :slight_smile:

I think that what you’re looking for is -resume. See the docs:

No need for any special workflow logic, just run the same nextflow run command again and tack on -resume and Nextflow should automatically pick up the cached results from all successfully completed tasks in the previous run.

If you forget to use the flag (so that the most recent run is no longer the one you want to resume from), you can use nextflow log to list all executions, then -resume <run_name> to resume a specific pipeline execution.
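A quick sketch of that recovery flow (the run name here is just for illustration):

```shell
# List previous executions; the RUN NAME column is what -resume accepts
nextflow log

# Resume a specific earlier run by name instead of the most recent one
nextflow run main.nf -resume peaceful_wiles
```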

Hope that helps!

Phil

Thanks for the quick reply,

I am already using -resume, but it does not quite solve the use case I am looking at.

For instance, say B represents a long calculation with a 50% success rate, and C is a large post-processing step that takes a processing script as an input. If I make a change in my post-processing tool and want to re-run the post-processing, then even with -resume, the pipeline will try to calculate all of my failed B results again.

If I start wrapping steps in conditionals it gets even worse, since the published results are not recognized as the outputs of previous steps. So even if A → B finished, if I then use conditionals to run B → C from the published A results, they are not recognized as the same inputs and all B jobs will rerun.

This is what i would like to prevent.

Yes, Nextflow cares about tasks and work directories - once files are published via publishDir then they are effectively out of the DAG and no longer tracked (you can trick it if you’re careful, but that way madness lies).

So to recap: It sounds like Nextflow is doing what I’d expect it to do (reuse results from successfully completed tasks), but you would like it to skip over the failed tasks from B instead of attempting them again, is that right?

We’re getting a bit into the weeds here and there are many possible solutions. But one idea: could you capture the errors and report those tasks as successful, then add workflow logic to only proceed downstream based on output filenames etc.? I guess it depends a little on what the processes are doing.
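One way that “report failures as successes” idea could look is a sketch like this — long_calculation is a hypothetical stand-in for the real tool, and the optional: true output declaration is what lets failed samples silently drop out of the channel:

```groovy
process B {
    tag "${id}"

    input:
    tuple val(id), path('input.txt')

    // optional: true means a task that produces no output file still counts as
    // successful, so -resume caches it and it simply emits nothing downstream
    output:
    tuple val(id), path("B/${id}.txt"), optional: true

    script:
    """
    mkdir -p B
    long_calculation input.txt B/${id}.txt || true  # hypothetical tool; swallow the error
    """
}
```

Since every task exits zero, -resume treats them all as completed, and only the samples that actually produced an output file flow on to C.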

What you’re coming up against is a fairly fundamental difference between Nextflow and Snakemake though. Nextflow pipelines are really good at being super reproducible and trustworthy as a whole unit, so they really resist doing the kind of middle-of-workflow-execution meddling that you’re trying to do. That’s an intentional design decision.

Others might have more experience & better ideas in how to do what you’re trying to do though, most things are possible :smiling_face: Maybe @robsyme or @mahesh.binzerpanchal or @Adam_Talbot or @mribeirodantas or someone?

Phil

I think I managed to find a 90% solution for my use case.

Instead of creating a channel from the results in the publishDir, I filter the channel based on the files that are there.

In my workflow I have added params.base, which I then check. If I want to run only from B, I filter based on what has already completed successfully through B:

//"A" Channel contains tuples of [id, path]
if(params.base=="B"){
    A = A.filter {
        elem -> file("run/B/${elem[0]}.txt").exists()
    }
}else if(params.base=="C"){
    A = A.filter {
        elem -> file("run/C/${elem[0]}.txt").exists()
    }
}
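Incidentally, since the two branches differ only in the directory name, they could be collapsed into a single filter (a sketch only, assuming the same run/<step> layout and params.base values as above):

```groovy
// Hypothetical refactor: derive the results directory straight from params.base
if (params.base in ['B', 'C']) {
    A = A.filter { elem -> file("run/${params.base}/${elem[0]}.txt").exists() }
}
```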

Conceptually it is quite different from what I had in Snakemake, but from my initial testing the workflow is more reproducible and evaluates a lot faster than what I had. So I am happy :slight_smile:

Thanks,

Christoffer


Great! Glad you got it working, thanks for posting your solution :slight_smile:

OK I managed it. Like Phil says, we’re in anti-pattern territory here, but it’s possible.

The strategy:

  1. Run the pipeline with errorStrategy 'ignore' so it goes to completion
  2. Write results from B() to results/B directory
  3. Continue to C()
  4. On run 2, check the results/B directory for files and make a list of existing IDs (file base names)
  5. Compare the output of A() and either:
    1. select the file if its ID is in the results/B list (i.e. B succeeded on run 1), or
    2. select the file if results/B is empty (i.e. this is the first run).
  6. Pass this channel to B()

Code:

process A {
    output:
    path("A/*")

    script:
    """
    mkdir -p A
    touch A/{1..5}.txt
    sleep 1
    """
}

process B {
    errorStrategy 'ignore'
    tag "${id}"

    input:
    tuple val(id), path('input.txt')

    output:
    tuple val(id), path("B/${id}.txt")

    script:
    """
    mkdir -p B
    cp input.txt B/${id}.txt
    sleep 1
    exit ${id.toInteger() % 2}
    """
}

process C {
    errorStrategy 'ignore'
    tag "${id}"

    input:
    tuple val(id), path('input.txt')

    output:
    tuple val(id), path("C/${id}.txt")

    script:
    """
    mkdir -p C
    cp input.txt C/${id}.txt
    sleep 1
    """
}

workflow {
    Afiles = A().flatten().map {
        file ->
        return [file.baseName, file]
    }

    // Get IDs of files that already have successful B results
    successful_ids = channel.fromPath("results/B/*.txt", checkIfExists: false)
        .map { file -> file.baseName }
        .collect()
        .map { it -> [it] }  // Wrap in list to prevent spreading
        .ifEmpty([[]])

    // First run: process all A files. Subsequent runs: only process files with existing results
    B_input = Afiles.combine(successful_ids)
        .filter { id, a_file, existing_ids ->
            existing_ids.isEmpty() || existing_ids.any { it == id }
        }
        .map { id, a_file, existing_ids -> [id, a_file] }

    Bfiles = B(B_input)
    Cfiles = C(Bfiles)

    outfiles = Bfiles.mix(Cfiles)

    publish:
    outfiles = outfiles

}

output {
    outfiles {
        path "."
    }
}

Run 1:

Remember to rm -rf results first, otherwise this doesn’t make sense.

>  rm -rf results && nextflow run .                                                                     
Nextflow 25.10.3 is available - Please consider updating your version to it
N E X T F L O W  ~  version 25.10.2
Launching `./main.nf` [peaceful_wiles] DSL2 - revision: 309b52844a
[c7/599fad] Submitted process > A
[1b/bfe1d1] Submitted process > B (4)
[35/a3ef87] Submitted process > B (3)
[7f/7ad079] Submitted process > B (5)
[f9/552ab6] Submitted process > B (2)
[78/6e7ae2] Submitted process > B (1)
[7f/7ad079] NOTE: Process `B (5)` terminated with an error exit status (1) -- Error is ignored
[78/6e7ae2] NOTE: Process `B (1)` terminated with an error exit status (1) -- Error is ignored
[35/a3ef87] NOTE: Process `B (3)` terminated with an error exit status (1) -- Error is ignored
[f3/f5be09] Submitted process > C (2)
[0f/214cba] Submitted process > C (4)

Run 2:

>  nextflow run .                  
Nextflow 25.10.3 is available - Please consider updating your version to it
N E X T F L O W  ~  version 25.10.2
Launching `./main.nf` [magical_shannon] DSL2 - revision: 309b52844a
[71/f57c23] Submitted process > A
[5e/ce5f23] Submitted process > B (2)
[f2/67b937] Submitted process > B (4)
[c1/45f3c0] Submitted process > C (2)
[34/1abbd8] Submitted process > C (4)

I wrote this a while back: How to skip specific/failed samples on next `-resume`. I think this is what you’re after.


This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.