Wait and retry if not ready

Hi all, I have a process that runs on many files and needs to be restarted after some time in case of failure. The use case is the following: my data is archived in the cloud, and once I start the rehydration I have to wait for the data to be ready. If it is not ready, I send a failure message and restart after one hour. I’m using sleep(60 * 60 * 1000 as long); return 'retry' as the errorStrategy. Unfortunately, it looks like the wait is applied globally to all tasks of the same process, instead of to each single task. This can be tested with this simple pipeline:

#!/usr/bin/env nextflow

def wait_and_retry = {sleep(30 *1000 as long) ; return 'retry'}

process FAIL_AND_WAIT {

    errorStrategy { task.exitStatus <= 2 ? wait_and_retry() : 'terminate' }
    maxRetries 5

    input:
    val(hello)

    script:
    """
    echo "${hello},Attempt ${task.attempt}, \$(date)"
    exit ${task.attempt}
    """
}

workflow {
    ch = channel.of( "Task 1", "Task 3", "Task 2" )
    FAIL_AND_WAIT(ch)
}

The result is the following:

cat work/*/*/.command.out | sort -t ',' -k3
Task 1,Attempt 1, Wed Oct 30 13:51:51 CET 2024
Task 2,Attempt 1, Wed Oct 30 13:51:51 CET 2024
Task 3,Attempt 1, Wed Oct 30 13:51:51 CET 2024
Task 3,Attempt 2, Wed Oct 30 13:52:21 CET 2024
Task 2,Attempt 2, Wed Oct 30 13:52:51 CET 2024
Task 1,Attempt 2, Wed Oct 30 13:53:21 CET 2024
Task 2,Attempt 3, Wed Oct 30 13:53:51 CET 2024
Task 3,Attempt 3, Wed Oct 30 13:54:21 CET 2024

I would expect the second attempt of all tasks to start at 13:52:21 (after 30 seconds); instead, there is a waiting time of 30 seconds between every single task. Can anyone suggest how to solve this?

As suggested by Thomas Danhorn on Slack:

The reason is that the errorStrategy needs to be resolved by the main Nextflow process before it can decide what to do with a given worker process (restart or not). The sleep() therefore does not happen in the worker process, but in the main Nextflow process while it is resolving the errorStrategy.

I found the following workaround: I define a global variable that I increment before each “wait” to be sure that the waiting time is triggered only once per “retry cycle”:

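// Highest attempt number for which a wait has already been triggered
retryIndex = 0

// Sleep once per retry cycle, then retry
def wait_and_retry = { retryIndex = retryIndex + 1; println "WAIT " + new Date(); sleep(20 * 1000 as long); return 'retry' }

process FAIL_AND_WAIT {
    errorStrategy { task.attempt > retryIndex ? wait_and_retry() : 'retry' }
    // maxRetries, input and script unchanged from the pipeline above
}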
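Another option that follows from the explanation above, though it was not tested in this thread, is to move the delay out of the errorStrategy and into the task script, so that each task waits in its own worker process instead of blocking the main Nextflow process. Below is a minimal sketch under that assumption; the `delay` variable and the 30-second value are illustrative choices, not something suggested by the posters:

```nextflow
process FAIL_AND_WAIT {
    // Only decide here whether to retry; no sleep() in the main Nextflow process
    errorStrategy { task.exitStatus <= 2 ? 'retry' : 'terminate' }
    maxRetries 5

    input:
    val(hello)

    script:
    // Assumption for illustration: no wait on the first attempt, 30 seconds on every retry
    def delay = task.attempt > 1 ? 30 : 0
    """
    sleep ${delay}
    echo "${hello},Attempt ${task.attempt}, \$(date)"
    exit ${task.attempt}
    """
}
```

The trade-off is that a sleeping task keeps its executor slot (and any reserved resources) for the whole waiting time, which may matter with a one-hour delay on a cluster or in the cloud.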

Glad you found a solution. Thanks for posting it here in the forum too!
