Making processes that collect a large number of input files resumable

Hi all,

I have a pipeline I’ve been developing where one process may need to run north of one million times. The next process uses the .collect() operator, along with a wildcard in its input declaration, to collect and concatenate the previous process’s many outputs.

Here’s the relevant section of the workflow declaration (I lied: the collecting of files actually runs in two stages to make resuming faster):

        COLLECT_INTERMEDIATE_BATCHES(
            MAP_CLUSTERS_TO_CDNA.out.buffer(size: 1000, remainder: true)
        )

        COLLECT_FINAL_BATCHES(
            COLLECT_INTERMEDIATE_BATCHES.out.collect()
        )

And here’s the COLLECT_INTERMEDIATE_BATCHES process (assume that COLLECT_FINAL_BATCHES is pretty much the same thing):

process COLLECT_INTERMEDIATE_BATCHES {
    errorStrategy { task.attempt < 3 ? 'retry' : 'ignore' }
    maxRetries 2

    input:
    path all_files_list, stageAs: "unmerged/*"

    output:
    path "merged.aln"

    script:
    """
    echo "Staged ${all_files_list.size()} files..."
    cat unmerged/*.aln > merged.aln
    """
}

This works well, but COLLECT_INTERMEDIATE_BATCHES never seems to be resumable: on every rerun, the pipeline starts again from that process. This is inconvenient because another downstream step is very slow, and I’d prefer to rerun it as little as possible.

Can anyone spot where the resumability issue is popping up? And is there some other approach you might recommend that would guarantee resumability?

Thanks in advance!

The issue is that channels are asynchronous, so there is no guarantee that items arrive in any particular order. Since each task’s cache hash is computed from its inputs, a buffer that groups the files differently on each run produces a different task hash, and the cache misses.

See this example where the start of the list changes when you run with -resume:

workflow {
    // print the first element of each buffered batch:
    // it changes from run to run because batch composition is not deterministic
    TASK(Channel.fromList(1..2100))
        .buffer( size: 1000, remainder: true )
        .map{ list -> list.head() }
        .view()
}

process TASK {
    input:
    val some_number

    output:
    stdout

    script:
    """
    echo $some_number
    """
}
gitpod /workspace/Nextflow_sandbox (main) $ nextflow run main.nf -resume

 N E X T F L O W   ~  version 24.10.3

Launching `main.nf` [agitated_venter] DSL2 - revision: d8983ec4b6

[ee/1e57a1] process > TASK (2099) [100%] 2100 of 2100, cached: 2100 ✔
16

1004

2002


gitpod /workspace/Nextflow_sandbox (main) $ nextflow run main.nf -resume

 N E X T F L O W   ~  version 24.10.3

Launching `main.nf` [cheesy_noyce] DSL2 - revision: d8983ec4b6

[8b/c9f097] process > TASK (2098) [100%] 2100 of 2100, cached: 2100 ✔
14

986

2004


Have you tried directly concatenating the contents using collectFile? It concatenates content sorted by hash (the default), so the ordering is the same on every run, meaning it should be resumable.

workflow {
    TASK(Channel.fromList(1..2100))
        .collectFile(name:'concat.txt')
        .view()
}

process TASK {
    input:
    val some_number

    output:
    path "*.txt"

    script:
    """
    echo $some_number > file${some_number}.txt
    """
}

I had no idea that .collectFile() concatenates by hash, but that sounds like exactly what I need. The main reason I haven’t used it in the past is that, if I’m understanding the docs correctly, .collectFile() accepts a channel of in-memory values as opposed to files. If I used a closure in .collectFile() to bring each of the 1,000,000+ files into memory, would it do that for all of them before writing them out, or would it do something closer to streaming?

You would have to try it out. I’m not sure of the implementation details, but I don’t recall all of it being in memory.
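
If you want a rough way to check, something like the sketch below might do it (everything here is made up for the test: the MAKE_CHUNK process, the file count, and the chunk size are arbitrary). It generates a couple of gigabytes of small files and collects them; launching Nextflow with a constrained heap, e.g. NXF_OPTS='-Xmx512m', should reveal whether the whole thing is held in memory or not.

// Sketch only: generate ~2,000 files of ~1.3 MB each, then collect them.
// Run with a small heap to see whether collection stays on disk, e.g.:
//   NXF_OPTS='-Xmx512m' nextflow run memtest.nf
workflow {
    MAKE_CHUNK(Channel.of(1..2_000))
        .collectFile(name: 'all.txt')
        .view { file -> "Collected into ${file}" }
}

process MAKE_CHUNK {
    input:
    val n

    output:
    path "chunk_${n}.txt"

    script:
    """
    head -c 1000000 /dev/urandom | base64 > chunk_${n}.txt
    """
}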

Looking further into the .collectFile() docs, it looks like it definitely does not bring everything into memory; instead, it relies on temporary caching to disk.
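
For reference, this is roughly what the relevant options look like as I read them. It’s only a sketch, not my real pipeline: the glob and folder names are placeholders, and I’m assuming sort defaults to 'hash', that cache follows the same semantics as the cache directive, and that tempDir/storeDir control where the on-disk caching and the final file end up.

// A sketch of the collectFile options I looked at (names and defaults per my reading of the docs)
workflow {
    Channel
        .fromPath('unmerged/*.aln')   // placeholder for the real upstream outputs
        .collectFile(
            name:     'merged.aln',   // single concatenated output file
            sort:     'hash',         // the default; deterministic ordering is what makes it resumable
            cache:    true,           // the default; same semantics as the process cache directive
            tempDir:  'collect-tmp',  // where content is cached on disk while collecting
            storeDir: 'results'       // where the final merged.aln ends up
        )
        .view()
}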

After implementing it, I can confirm that it is now resumable! Thanks @mahesh.binzerpanchal. For anyone else interested in this, here’s a dummy version of my implementation:

        COLLECT_FILES(
            BIG_PROCESS.out
                .collectFile(name: "collected.txt") { file -> file.text }
        )
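
One follow-up note: the closure above pulls each file’s text into memory (one file at a time). If I’m reading the earlier example right, the files can also be passed straight through without a closure and .collectFile() will append their contents itself, along the lines of:

        COLLECT_FILES(
            BIG_PROCESS.out
                .collectFile(name: "collected.txt")
        )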