Looking for a way to trigger a Nextflow cleanup process (deleting input files) only after the entire workflow completes successfully—any ideas or best practices to handle this efficiently?

Hi Community,
I am working on a Nextflow workflow that processes a series of paired input files (e.g., [[bamfile1, bamfile2], [samfile1, samfile2]]). The workflow runs multiple processes, and as a final step I want to clean up the input files from the input path, but only after the entire workflow has completed successfully.
Here is my current implementation of the cleanup process:
groovy:

process cleanUpProcess {
    cpus 2
    memory "4 GB"
    container params.cli

    input:
    val fileList   // [[bamfile1,bamfile2],[samfile1,samfile2]]

    output:
    stdout

    script:
    def awsS3Commands = fileList.collect { files ->
        def pairOne = files[0]
        def pairTwo = files[1]
        """
        aws s3 rm ${pairOne}
        aws s3 rm ${pairTwo}
        """
    }.join('\n')

    """
    ${awsS3Commands}
    """
}

workflow MethylDackel {
    main:
    // Multiple processes are being executed here
}

workflow {
    def myParams = params
    def myWorkflow = workflow

    MethylDackel()

    myWorkflow.onComplete {
        if (myWorkflow.success) {
            cleanUpProcess()
        } else {
            log.info "Failure!"
        }
    }
}

Challenge:
I understand that the onComplete block cannot directly invoke the cleanUpProcess. However, I want the cleanup process to only trigger after the workflow has successfully completed. This ensures that the input files are not removed prematurely or during an unsuccessful execution.

Question to the Community
Are there any workarounds or alternative approaches to trigger the cleanup process after successful workflow completion?
I would love to hear your suggestions or ideas on how to handle this use case efficiently while adhering to best practices. Thank you in advance for your guidance! :pray: @mribeirodantas @mahesh.binzerpanchal

Why do you have to wait until the entire workflow has ended successfully to delete pipeline input files?

I mean, if, by the time the second process has finished successfully, all the remaining processes only work on the output of that second process, you don’t need the pipeline input files anymore. You can delete them right away. If something fails, you can always resume and it’ll work, because the pipeline will restart from the failed tasks.

Most of the time people are worried about cleaning up the entire work directory when the pipeline run is over (cleanup = true) or during the run, as soon as possible (nf-boost), but if you want to delete only the pipeline input files, that’s much simpler.
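As a rough illustration of that idea (all process and parameter names below are hypothetical stand-ins for your own, e.g. ALIGN for the step that consumes the raw inputs and params.input_pairs for the list of S3 URI pairs), you can key the deletion on each pair individually, so a pair is removed as soon as the task that read it has succeeded:

process CLEAN_PAIR {
    input:
    tuple val(uri1), val(uri2)   // the S3 URIs of one consumed input pair

    script:
    """
    aws s3 rm ${uri1}
    aws s3 rm ${uri2}
    """
}

workflow {
    pairs_ch = Channel.fromList( params.input_pairs )   // e.g. [[bam1, bam2], [sam1, sam2]]

    // hypothetical: ALIGN re-emits the URIs it consumed, i.e. tuple( result, uri1, uri2 )
    aligned_ch = ALIGN( pairs_ch )

    // delete each pair as soon as its ALIGN task has succeeded
    CLEAN_PAIR( aligned_ch.map { result, uri1, uri2 -> tuple(uri1, uri2) } )
}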

Hi @mribeirodantas, thank you for your response.
The reason is that the FASTQ files are stored in an S3 bucket, and they need to be deleted as soon as the pipeline execution completes successfully. The pipeline is deployed on AWS HealthOmics, which does not support resuming due to the dynamic nature of the pipeline, as HealthOmics only caches static outputs. Additionally, the pipeline runs in a private VPC with no public internet connection. To minimize effort, we decided to implement this cleanup strategy, triggered only upon successful completion of the pipeline. That’s why we wait until all processes have completed successfully.

So, to put the question another way:
How can I verify that all processes have completed successfully, so that the cleanup process is invoked only once everything has finished successfully, and not otherwise?

AFAIK, AWS HealthOmics currently supports task caching.

If there’s a last process that is always run, you can add another one that runs after it has succeeded to do the cleaning up.
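A minimal sketch of that chaining (FINAL_STEP, upstream_ch and params.input_pairs are hypothetical placeholders for your own terminal process, its input channel and the list of input URI pairs): collecting the terminal process’s output forces the cleanup to wait for every one of its tasks, and with the default errorStrategy the cleanup never runs if anything upstream fails.

process CLEAN_INPUTS {
    input:
    val done           // collected outputs of the final step, used only to enforce ordering
    val input_files    // flat list of S3 URIs to delete

    output:
    stdout

    script:
    def rmCommands = input_files.collect { "aws s3 rm ${it}" }.join('\n')
    """
    ${rmCommands}
    """
}

workflow {
    // ... all the other processes ...
    report_ch = FINAL_STEP( upstream_ch )   // hypothetical terminal process

    // .collect() waits for every FINAL_STEP task before CLEAN_INPUTS is scheduled
    CLEAN_INPUTS( report_ch.collect(), params.input_pairs.flatten() )
}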

I would say go with what Marcel suggested, but an alternative is to use some native Groovy code in the onComplete block.

You can use a ProcessBuilder object to run the command.
nf-cascade is an example that uses it: nf-cascade/modules/local/nextflow/run/main.nf at 4b83584254fc88fbeb37ebfa421f1ef190da194a · mahesh-panchal/nf-cascade · GitHub

You would basically construct the command like:

def aws_cmd = [
    'aws',
    's3',
    'rm'
]
def builder = new ProcessBuilder( aws_cmd + fileList.flatten() )
def process = builder.start()
assert process.waitFor() == 0 : process.text

If the rm command cannot take a list of files, then iterate over the list of files using each.
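Putting that together, a minimal sketch of the handler might look like the following — assuming the S3 URIs to delete are available as a plain list via a hypothetical params.input_pairs (channel contents can’t be read inside onComplete), and calling aws s3 rm once per file as suggested above:

workflow.onComplete {
    if( workflow.success ) {
        // hypothetical: params.input_pairs = [[bam1, bam2], [sam1, sam2]]
        params.input_pairs.flatten().each { uri ->
            // run `aws s3 rm <uri>` with the AWS CLI available on the head node
            def builder = new ProcessBuilder( ['aws', 's3', 'rm', uri.toString()] )
            def proc    = builder.start()
            assert proc.waitFor() == 0 : proc.text
        }
    }
    else {
        log.info "Pipeline failed - leaving input files in place"
    }
}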

@Kanna_Dhasan were you able to get this working?

I have a similar objective, and currently this appears to be the only question in the Seqera forum about automatically cleaning the Nextflow work folder in AWS.

As described here, I have a slightly different use case though. I would like to automatically clean up the Nextflow work folders when runs finish successfully, while keeping all (typically small) Nextflow metadata files. I saw that the nextflow clean command appears to support this with the -keep-logs option, but I was wondering whether there’s a way to do it with the cleanup = true config setting (or any other way), so that I don’t have to run a separate Nextflow command outside of the pipeline.
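For reference, the config-based route is just a top-level setting in nextflow.config (though, as noted below, it currently doesn’t act on S3 work directories):

// nextflow.config
cleanup = true   // delete the contents of the work directory after a successful run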

Note, as mentioned here and here, I am aware that currently, the Nextflow clean options don’t work on S3 objects. Given that, I’m hoping that they will add support for S3 objects soon or that you’ve found an alternative approach for doing this.

I’ll also mention another approach I came across that doesn’t really work for me (but might work for other people who read this).

For any given bucket in AWS, you can create Lifecycle Rules to automatically manage or delete files. If you only use a single work folder for Nextflow processes, you could specify a hard-coded Prefix so that the rule only applies to the work folder. You can also set how many days to wait after object creation before deleting the file. So, assuming that you’ll be able to correct any pipeline failures within a few days, you could create a lifecycle rule that auto-deletes the work files say 10 days after they are created. It’s not the most efficient option, but S3 storage is generally pretty cheap.

Unfortunately, this solution doesn’t work for me because I want to create distinct work folders for each project. As described in this article, it’s possible to create lifecycle rules that delete objects with specific Object tags, which would be perfect for my use case. Plus, Nextflow already automatically tags the metadata files with nextflow.io/metadata = true.

Unfortunately (and this seems like an oversight), Nextflow fails to tag the non-metadata files (i.e. the process input files) it creates in the work folder. If this were fixed/updated, I’d recommend tagging those files differently so that they could be handled distinctly from the metadata files by lifecycle rules (as I want to do).

EDIT: I am currently using Nextflow v24.04.4 with Fusion v2.3.8-e3aab5d.