I have a pipeline I’ve been developing where one process may need to run north of one million times. The next process uses the .collect() operator along with a wildcard in the process input directive to collect and concatenate the previous process’s many outputs.
Here’s the relevant section of the workflow declaration (I lied: the file collection actually runs in two stages to make resuming faster). In sketch form, with placeholder names apart from COLLECT_INTERMEDIATE_BATCHES:
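workflow {
    // PER_ITEM_TASK and items_ch are placeholders for the real process and input channel
    per_item_ch = PER_ITEM_TASK( items_ch )     // runs north of one million times

    // stage 1: concatenate in batches so a resume has less to redo
    // (the exact batching is simplified here)
    batches_ch = COLLECT_INTERMEDIATE_BATCHES(
        per_item_ch.buffer( size: 1000, remainder: true )
    )

    // stage 2: concatenate the batch files into the final output
    COLLECT_ALL( batches_ch.collect() )
}

process COLLECT_INTERMEDIATE_BATCHES {
    input:
    path 'chunk_*'      // wildcard staging of the batched files

    output:
    path 'batch.txt'

    script:
    """
    cat chunk_* > batch.txt
    """
}

// COLLECT_ALL is the same idea applied once more, over all the batch files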
This works well, but COLLECT_INTERMEDIATE_BATCHES never seems to be resumable; on every rerun, the pipeline starts again from there. This is inconvenient because another downstream step is very slow, and I’d prefer to rerun it as little as possible.
Can anyone spot where the resumability issue is popping up? And is there some other approach you might recommend that would guarantee resumability?
The issue is that channels are asynchronous, so there is no guarantee that inputs arrive in any particular order.
See this example where the start of the list changes when you run with -resume:
workflow {
    TASK( Channel.fromList( 1..2100 ) )
        .buffer( size: 1000, remainder: true )
        .map { list -> list.head() }
        .view()
}

process TASK {
    input:
    val some_number

    output:
    stdout

    script:
    """
    echo $some_number
    """
}
gitpod /workspace/Nextflow_sandbox (main) $ nextflow run main.nf -resume
N E X T F L O W ~ version 24.10.3
Launching `main.nf` [agitated_venter] DSL2 - revision: d8983ec4b6
[ee/1e57a1] process > TASK (2099) [100%] 2100 of 2100, cached: 2100 ✔
16
1004
2002
gitpod /workspace/Nextflow_sandbox (main) $ nextflow run main.nf -resume
N E X T F L O W ~ version 24.10.3
Launching `main.nf` [cheesy_noyce] DSL2 - revision: d8983ec4b6
[8b/c9f097] process > TASK (2098) [100%] 2100 of 2100, cached: 2100 ✔
14
986
2004
Have you tried directly concatenating the contents using collectFile? It concatenates contents ordered by hash, so the ordering is deterministic across runs, meaning it should be resumable.
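Something along these lines (just a sketch; my_files_ch stands in for your channel of per-task output files, and sort: 'hash' only makes the ordering explicit):

my_files_ch
    .collectFile( name: 'merged.txt', storeDir: 'results', sort: 'hash' )
    .view()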
I had no idea that .collectFile() concatenates by hash, but that sounds like exactly what I need. The main reason I haven’t used it in the past is that, if I’m understanding the docs correctly, .collectFile() accepts a channel of in-memory values as opposed to files. If I used a closure in .collectFile() to bring each of the 1,000,000+ files into memory, would it load all of them before writing them out, or would it do something closer to streaming?
Looking further into the .collectFile() docs, it looks like it does not bring everything into memory, and instead relies on temporary caching to disk:
After implementing it, I can confirm that it is now resumable! Thanks @mahesh.binzerpanchal. For anyone else interested in this, here’s a dummy version of my implementation (process names and paths are placeholders):
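workflow {
    // PER_ITEM_TASK stands in for the process that runs >1M times
    per_item_ch = PER_ITEM_TASK( items_ch )

    // concatenate all outputs into one file; hash ordering keeps the result
    // stable across runs, so downstream tasks stay cached on -resume
    merged_ch = per_item_ch
        .collectFile( name: 'all_results.txt', storeDir: params.outdir, sort: 'hash' )

    // SLOW_DOWNSTREAM_STEP stands in for the slow step I want to avoid rerunning
    SLOW_DOWNSTREAM_STEP( merged_ch )
}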