Looking for a way to trigger a Nextflow cleanup process (deleting input files) only after the entire workflow completes successfully—any ideas or best practices to handle this efficiently?

Hi Community,
I am working on a Nextflow workflow that processes a series of paired input files (e.g., [[bamfile1, bamfile2], [samfile1, samfile2]]). The workflow runs multiple processes, and as a final step I want to clean up the input files from the input path, but only after the entire workflow has completed successfully.
Here is my current implementation of the cleanup process:
groovy:

process cleanUpProcess {
    cpus 2
    memory "4 GB"
    container params.cli

    input:
    val fileList   // [[bamfile1,bamfile2],[samfile1,samfile2]]

    output:
    stdout

    script:
    def awsS3Commands = fileList.collect { files ->
        def pairOne = files[0]
        def pairTwo = files[1]
        """
        aws s3 rm ${pairOne}
        aws s3 rm ${pairTwo}
        """
    }.join('\n')

    """
    ${awsS3Commands}
    """
}

workflow MethylDackel {
    main:
    // Multiple processes are being executed here
}

workflow {
    def myParams = params
    def myWorkflow = workflow

    MethylDackel()

    myWorkflow.onComplete {
        if (myWorkflow.success) {
            cleanUpProcess()
        } else {
            log.info "Failure!"
        }
    }
}

Challenge:
I understand that the onComplete block cannot directly invoke the cleanUpProcess. However, I want the cleanup process to only trigger after the workflow has successfully completed. This ensures that the input files are not removed prematurely or during an unsuccessful execution.

Question to the Community
Are there any workarounds or alternative approaches to trigger the cleanup process after successful workflow completion?
I would love to hear your suggestions or ideas on how to handle this use case efficiently while adhering to best practices. Thank you in advance for your guidance! :pray: @mribeirodantas @mahesh.binzerpanchal

Why do you have to wait until the entire workflow has ended successfully to delete pipeline input files?

I mean, if all the downstream processes only work on the output of, say, the second process, then once that second process has finished successfully you don’t need the pipeline input files anymore. You can delete them right away. If something fails, you can always resume and it will still work, because the pipeline will start again from the last failed task.

Most of the time people are worried about cleaning up the entire work directory when the pipeline run is over (cleanup = true) or during the run, as soon as possible (nf-boost), but if you want to delete only the pipeline input files, that’s much simpler.
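For reference, the built-in variant is just a one-line config setting (a minimal sketch):

// nextflow.config
cleanup = true   // delete the run's work directory after a successful completion

nf-boost does the equivalent eagerly, while the run is still going, through its own plugin configuration.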

Hi @mribeirodantas, thank you for your response.
The reason for this is that the FASTQ files are stored in an S3 bucket, and as soon as the pipeline execution is successfully completed, they need to be deleted. The pipeline is deployed on AWS HealthOmics, which does not support resuming due to the dynamic nature of the pipeline, as HealthOmics only caches static outputs. Additionally, the pipeline runs in a private VPC with no public internet connection. To minimize effort, we decided to implement this cleanup strategy, triggered only upon successful completion of the pipeline. That’s why I want to wait until all processes have completed successfully.

So here is my question:
How can I verify that all processes have completed successfully, so that the cleanup process is invoked only on successful completion of all of them, and not otherwise?

AFAIK, AWS HealthOmics currently supports task caching.

If there’s a last process that is always run, you can add another process after it, taking its output as input, so the cleanup only starts once that last process has succeeded.
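For example, a minimal DSL2 sketch of that pattern (the process and channel names here are hypothetical, not from the pipeline above):

workflow {
    reads_ch = Channel.fromPath(params.input)   // hypothetical input channel

    ALIGN(reads_ch)
    CALL_METHYLATION(ALIGN.out)

    // collect() gathers every emission into a single list, so CLEANUP only
    // starts once CALL_METHYLATION has finished successfully for all samples
    CLEANUP(reads_ch.collect(), CALL_METHYLATION.out.collect())
}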

I would say go with what Marcel suggested, but an alternative is to use some native Groovy code in the onComplete block.

You can use a ProcessBuilder object to run the command.
nf-cascade is an example that uses it: nf-cascade/modules/local/nextflow/run/main.nf at 4b83584254fc88fbeb37ebfa421f1ef190da194a · mahesh-panchal/nf-cascade · GitHub

You would basically construct the command like:

def aws_cmd = [
    'aws',
    's3',
    'rm'
]
def builder = new ProcessBuilder( aws_cmd + fileList.flatten() )
def process = builder.start()
assert process.waitFor() == 0 : process.text

If the rm command cannot take a list of files, then iterate over the list of files using each.
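Putting that together, here is a sketch of what the onComplete block could look like, assuming fileList is a plain Groovy list of S3 URIs available at that point (e.g. built from params, not a channel), and deleting one object per call since aws s3 rm takes a single path:

workflow.onComplete {
    if (workflow.success) {
        fileList.flatten().each { uri ->
            def proc = new ProcessBuilder(['aws', 's3', 'rm', uri.toString()]).start()
            // fail loudly if any delete does not return exit code 0
            assert proc.waitFor() == 0 : proc.errorStream.text
        }
    }
}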

@Kanna_Dhasan were you able to get this working?

I have a similar objective and currently, this appears to be the only question about automatically cleaning the Nextflow work folder in AWS in the Seqera forum.

As described here, I have a slightly different use case though. I would like to automatically clean up the Nextflow work folders when runs finish successfully while keeping all (typically small) Nextflow metadata files. I saw that the nextflow clean command appears to support this with the -keep-logs option but I was wondering if there’s a way to do it with the cleanup = true config setting (or any other way) so that I don’t have to run a separate Nextflow command outside of the pipeline.
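For reference, the manual command I was hoping to avoid looks roughly like this (the run name is a placeholder):

nextflow clean -f -keep-logs <run_name>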

Note, as mentioned here and here, I am aware that currently, the Nextflow clean options don’t work on S3 objects. Given that, I’m hoping that they will add support for S3 objects soon or that you’ve found an alternative approach for doing this.

I’ll also mention another approach I came across that doesn’t really work for me (but might work for other people who read this).

For any given bucket in AWS, you can create Lifecycle Rules to automatically manage or delete files. If you only use a single work folder for Nextflow processes, you could specify a hard-coded Prefix so that the rule only applies to the work folder. You can also set how many days to wait after object creation before deleting the file. So, assuming that you’ll be able to correct any pipeline failures within a few days, you could create a lifecycle rule that auto-deletes the work files say 10 days after they are created. It’s not the most efficient option, but S3 storage is generally pretty cheap.
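For illustration, a rule like that corresponds roughly to the following S3 lifecycle configuration (the prefix and the 10-day window are just examples):

{
  "Rules": [
    {
      "ID": "expire-nextflow-work-after-10-days",
      "Filter": { "Prefix": "work/" },
      "Status": "Enabled",
      "Expiration": { "Days": 10 }
    }
  ]
}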

Unfortunately, this solution doesn’t work for me because I want to create distinct work folders for each project. As described in this article, it’s possible to create lifecycle rules that delete objects with specific Object tags, which would be perfect for my use case. Plus, Nextflow already automatically tags the metadata files with nextflow.io/metadata = true.

Unfortunately (and this seems like an oversight), Nextflow fails to tag the non-metadata files (i.e. the process input files) it creates in the work folder. If this were fixed/updated, I’d recommend tagging those files differently so that they could be handled distinctly from the metadata files by lifecycle rules (as I want to do).

EDIT: I am currently using Nextflow v24.04.4 with Fusion v2.3.8-e3aab5d.

In case this can help someone, I finally found a solution that works for me.

First, Fusion has a fusion.tags option that can be set up to tag files that match specified patterns. By default, it is set to tag all metadata files with nextflow.io/metadata=true and all other files with nextflow.io/temporary=true.

With only that option set, the published results files were also tagged with nextflow.io/temporary=true, which wasn’t helpful for distinguishing between temporary staged files and final results files with AWS Lifecycle Rules. However, it turns out that the publishDir directive also has a tags option which allows one to override the tags set by Fusion.

By leaving the default fusion tags and then tagging the published results files, I am able to distinguish between Nextflow metadata files, temporary staged files, and final results files. It’s then simple to create lifecycle rules to just delete the temporary files soon after they’ve been created.
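To make that concrete, here is a minimal sketch of the publishDir part (the bucket path and the tag key/value are placeholders; fusion.tags is left at its default):

process MAKE_REPORT {
    // re-tag published results so lifecycle rules can tell them apart from
    // the nextflow.io/temporary=true files Fusion tags in the work folder
    publishDir 's3://my-bucket/results', mode: 'copy', tags: [filetype: 'result']

    output:
    path 'report.txt'

    script:
    """
    echo "done" > report.txt
    """
}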


Update: unfortunately, as described in this issue, files in published subdirectories aren’t being tagged properly per the publishDir tags option, so there’s still no reliable way to remove Nextflow files in an automatic way in S3.

Doing it manually is a real pain so I’m hoping Nextflow will prioritize a solution to this soon!

Recently, I decided to revisit this issue and came up with the following approach, which works well for me. I typed up what I did in case it can help anyone else. Let me know if I missed something or made any typos! :folded_hands:


Configuring Nextflow and AWS to Automatically Delete Old Nextflow Work Files

When a Nextflow pipeline is run, it uses a work folder where it stages and processes the files needed by each process defined in the pipeline. Over time, these work folders can become quite large as they contain full copies of the files used by the pipeline. Nextflow offers two options to deal with this. First, as described here, one can set cleanup = true in the config file so that all files associated with a run in the work directory will be deleted when the run completes successfully. Second, as described here, one can manually run nextflow clean to remove work files from previous executions. Unfortunately, neither of these options is currently supported when using Nextflow with Fusion, as the work folder is located in AWS S3.

One natural way to deal with this is to configure a Lifecycle rule in AWS that will periodically delete objects with a given tag. By default, as described here, Fusion is configured to tag all non-metadata work files with “nextflow.io/temporary=true”. When processed files are transferred from the Nextflow work folder to a results folder using the publishDir directive (described here), they keep their tags as-is. This would prevent the Lifecycle rule from being able to exclusively target Nextflow work files for removal, but thankfully, the publishDir directive has a “tags” option that one can use to update the tags when objects are published to a results folder. Unfortunately, there are two outstanding bugs that prevent the above approach from working. First, Fusion often fails to tag files in the work folder, preventing them from being targetable by Lifecycle rules. Second, when tags are specified with the publishDir directive, they aren’t applied to files in subfolders, so it’s not possible to reliably replace tags applied by Fusion. Given these issues, another approach was identified to selectively delete Nextflow work files in AWS.

Adjusting Your Nextflow Config to Make Use of the Following Setup

The following configuration is designed to tag *untagged* S3 objects that contain “nextflow_work” in their name/path. This requires two changes to the Nextflow defaults:

  1. To make it easier to exclusively tag Nextflow work objects, change the Nextflow work folder from simply “work” to “nextflow_work” by adding workDir = 'nextflow_work' to the nextflow.config file.

  2. As mentioned above, Fusion is configured to tag all non-metadata work files in the Nextflow work folder with “nextflow.io/temporary=true”. To avoid replacing the tags on the metadata files, the Lambda function we configure below is designed to only tag untagged S3 objects in nextflow_work folders. To stop Fusion from tagging objects with “nextflow.io/temporary=true”, add fusion.tags = '[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true)' to the nextflow.config file (see the config sketch after this list).
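Putting both changes together, the relevant part of nextflow.config looks something like this (the bucket name is a placeholder; I’m assuming Fusion and Wave are already enabled in your setup):

// nextflow.config
workDir = 's3://my-bucket/nextflow_work'   // work objects now contain "nextflow_work" in their key

wave.enabled   = true
fusion.enabled = true
// only tag Nextflow metadata files; leave everything else untagged so the
// Lambda function below can find and tag the old work objects itself
fusion.tags    = '[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true)'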

Configuring a Lambda Function in AWS to Tag Old Nextflow Work Files

AWS Lambda is a way to run custom code without having to manage the underlying infrastructure. Here, we will configure a Lambda function to tag old untagged Nextflow work files so that they can be targeted for deletion by Lifecycle rules.

Creating a Custom Role for the Lambda Function

First, it’s necessary to create a role that can be adopted by the Lambda function with the necessary permissions for it to tag objects in S3. Here are the steps to do this:

  1. Navigate to the IAM portal in AWS and select “Roles” in the left-hand menu.

  2. Select “Create role” at the top right of the page.

  3. Under “Trusted entity type” select “AWS service”, and under “Service or use case” select “Lambda”. Select “Next”.

  4. Under “Permissions policies”, search for “AWSLambdaBasicExecutionRole” and add it. This policy will allow the Lambda function to write to the CloudWatch Logs, which is helpful for tracking activity. Select “Next”.

  5. Under “Role name,” enter a clear name for the role and select “Create role” in the bottom right corner of the page.

  6. Search for the role you just created and select it.

  7. Under “Permissions policies”, select “Create inline policy” from the “Add permissions” dropdown menu.

  8. Under “Policy editor”, select “JSON” and replace the text with:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3TagList",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        },
        {
            "Sid": "S3TagReadWrite",
            "Effect": "Allow",
            "Action": [
                "s3:GetObjectTagging",
                "s3:PutObjectTagging"
            ],
            "Resource": [
                "arn:aws:s3:::*/*"
            ]
        }
    ]
}

This will allow the Lambda Function to tag objects in S3. Select “Next”.

  9. Under “Policy name,” add a meaningful name and select “Create policy”.

Create a Lambda Function

  1. Navigate to the Lambda portal in AWS and select “Functions” in the left-hand menu.

  2. Select “Create function” in the top right corner of the page.

  3. Under “Create function”, leave the default (Author from scratch) selected.

  4. Under “Function name”, add a meaningful name.

  5. Under “Runtime”, select “Python 3.12”.

  6. Under “Architecture”, leave the default (x86_64) selected.

  7. Expand “Change default execution role” and select “Use an existing role”. Under “Existing role”, search for the role you created above and select it.

  8. Select “Create function” in the bottom right corner of the page.

  9. Replace the code in the “lambda_function.py” script with the following:

import os
from datetime import datetime, timezone, timedelta
import boto3
from botocore.config import Config


# ---------- GLOBAL DEFAULTS ----------
DEFAULTS = {
   "root_prefix": "",                 # e.g., "projects/" or "" for whole bucket
   "search_substr": "nextflow_work",  # substring to look for anywhere in the key
   "age_days": 7,                     # tag only if >= N days old
   "max_ops_per_bucket": 20000,       # safety cap per bucket per run
}


# Hard-coded targets (edit to suit). Any missing field inherits from DEFAULTS.
BUCKETS = [
   {"bucket": "<your_bucket_name>"},
   {"bucket": "<your_second_bucket_name>", "root_prefix": "2025/"},
   # {"bucket": "third-bucket", "search_substr": "nextflow_work", "age_days": 14},
]


# Tuning knobs
PAGE_SIZE  = int(os.environ.get("PAGE_SIZE", "1000"))  # S3 list page size (<=1000)
SAFETY_MS  = int(os.environ.get("SAFETY_MS", "3000"))  # stop this many ms before timeout
TAG_KEY    = os.environ.get("TAG_KEY", "Lifecycle")
TAG_VALUE  = os.environ.get("TAG_VALUE", "purge-candidate")


s3 = boto3.client(
   "s3",
   config=Config(
       retries={"max_attempts": 8, "mode": "standard"},
       connect_timeout=3,
       read_timeout=10,
   ),
)


def _key_matches(key: str, needle: str) -> bool:
   return needle in key


def _is_old_enough(last_modified: datetime, age_days: int) -> bool:
   return (datetime.now(timezone.utc) - last_modified) >= timedelta(days=age_days)


def _has_any_tags(bucket: str, key: str) -> bool:
   resp = s3.get_object_tagging(Bucket=bucket, Key=key)
   return len(resp.get("TagSet", [])) > 0


def _put_single_tag(bucket: str, key: str):
   s3.put_object_tagging(
       Bucket=bucket,
       Key=key,
       Tagging={"TagSet": [{"Key": TAG_KEY, "Value": TAG_VALUE}]},
   )


def process_bucket(cfg: dict, context):
   bucket       = cfg["bucket"]
   root_prefix  = cfg["root_prefix"]
   search_substr= cfg["search_substr"]
   age_days     = int(cfg["age_days"])
   max_ops      = int(cfg["max_ops_per_bucket"])


   processed = tagged = skipped_young = skipped_tagged = skipped_nomatch = 0


   paginator = s3.get_paginator("list_objects_v2")
   try:
       page_iter = paginator.paginate(
           Bucket=bucket,
           Prefix=root_prefix,
           PaginationConfig={"PageSize": PAGE_SIZE},
       )
   except Exception as e:
       return {"bucket": bucket, "error": f"list_failed: {e.__class__.__name__}: {e}"}


   for page in page_iter:
       if context.get_remaining_time_in_millis() <= SAFETY_MS:
           break
       contents = page.get("Contents", [])
       if not contents:
           continue


       for obj in contents:
           if context.get_remaining_time_in_millis() <= SAFETY_MS:
               break


           key = obj["Key"]
           processed += 1


           if not _key_matches(key, search_substr):
               skipped_nomatch += 1
               continue


           if not _is_old_enough(obj["LastModified"], age_days):
               skipped_young += 1
               continue


           try:
               if _has_any_tags(bucket, key):
                   skipped_tagged += 1
                   continue
               _put_single_tag(bucket, key)
               tagged += 1
           except Exception as e:
               # Log the failure and continue
               print({"bucket": bucket, "key": key, "tag_error": str(e)})


           if tagged % 50 == 0:
               print({"bucket": bucket, "progress": {"processed": processed, "tagged": tagged}})


           if tagged >= max_ops:
               break


       if tagged >= max_ops:
           break


   result = {
       "bucket": bucket,
       "prefix": root_prefix,
       "needle": search_substr,
       "processed": processed,
       "tagged": tagged,
       "skipped": {
           "not_old_enough": skipped_young,
           "already_tagged": skipped_tagged,
           "no_match": skipped_nomatch,
       },
   }
   print({"summary": result})
   return result


def lambda_handler(event, context):
   # Merge each entry with defaults
   targets = [{**DEFAULTS, **b} for b in BUCKETS]
   summaries = []


   for bcfg in targets:
       if context.get_remaining_time_in_millis() <= SAFETY_MS:
           print({"status": "stopping_early_time_budget"})
           break
       try:
           summaries.append(process_bucket(bcfg, context))
       except Exception as e:
           summaries.append({"bucket": bcfg["bucket"], "error": f"unhandled: {e}"})


   return {"per_bucket": summaries}

In summary, this script will search for keys (i.e., S3 object keys) that contain the substring “nextflow_work” in all buckets in the BUCKETS list and tag them with Lifecycle = purge-candidate if they are untagged and at least age_days (here, 7) days old. The root_prefix option allows you to restrict which folders will be searched for keys containing “nextflow_work”.
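If you want to smoke-test the handler locally before deploying (assuming your AWS credentials are configured and the bucket names are filled in), a fake context with the single method the handler uses is enough:

# Append to lambda_function.py for a quick local run; not needed in Lambda itself
class FakeContext:
    def get_remaining_time_in_millis(self):
        return 300_000  # pretend 5 minutes remain

if __name__ == "__main__":
    print(lambda_handler({}, FakeContext()))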

  10. Select the “Configuration” tab and select “General configuration” in the resulting menu.

  11. Select “Edit” and under “Timeout” set 15 min and 0 sec.

  12. Select “Save”.

  13. Return to the “Code” tab and select “Deploy”.

Configuring an EventBridge Schedule to Run the Lambda Function

On its own, the above Lambda function won’t do anything and needs to be triggered periodically. This is where EventBridge comes in. Among other things, EventBridge allows one to configure schedules that run Lambda functions on a custom interval.

Creating a Custom Role for EventBridge

First, it’s necessary to create a role that can be adopted by the EventBridge schedule that will allow it to invoke the above Lambda function. Here are the steps to do this:

  1. Navigate to the IAM portal in AWS and select “Roles” in the left-hand menu.

  2. Select “Create role” at the top right of the page.

  3. Under “Trusted entity type,” select “Custom trust policy” and replace the “Custom trust policy” with:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "scheduler.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}

Select “Next”.

  4. Don’t add any permissions and select “Next” again.

  5. Under “Role name,” enter a meaningful name and select “Create role” in the bottom right corner of the page.

  6. Search for the role you just created and select it.

  7. Under “Permissions policies”, select “Create inline policy” from the “Add permissions” dropdown menu.

  8. Under “Policy editor”, select “JSON” and replace the text with:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "InvokeAnyLambdaAnywhere",
            "Effect": "Allow",
            "Action": "lambda:InvokeFunction",
            "Resource": "arn:aws:lambda:*:*:function:*"
        }
    ]
}

This will allow the EventBridge scheduler to invoke any Lambda function. Select “Next”.

  9. Under “Policy name” enter a meaningful name and select “Create policy” in the bottom right corner of the page.

Create an EventBridge Schedule

  1. Navigate to the Amazon EventBridge portal in AWS and select “Schedules” in the left-hand menu.

  2. Under “Schedule name,” enter a clear name for the schedule.

  3. Leave “default” for “Schedule group”.

  4. Under “Schedule pattern”, choose a schedule that meets your needs. I chose a “Recurring schedule” with a “Cron-based schedule” that runs every day at 6 AM.
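For reference, an EventBridge cron expression for a daily 6 AM run (in the schedule’s time zone) looks like this:

cron(0 6 * * ? *)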

  5. Under “Flexible time window,” select “Off” and select Next.

  6. Under “Select target,” choose “AWS Lambda”.

  7. Under “Lambda function,” select the Lambda function that was created above and select “Next”.

  8. Under “Action after schedule completion,” select “NONE”.

  9. Under “Execution role,” select “Use existing role” and choose the EventBridge role you created above. Select “Next”.

  10. Select “Create schedule” in the bottom right corner of the page.

Configuring Lifecycle Rules to Delete Nextflow Work Files

AWS S3 offers Lifecycle rules, which can define actions you want Amazon S3 to take during an object’s lifetime, such as transitioning objects to another storage class, archiving them, or deleting them after a specified period of time. Here, we will set up Lifecycle Rules that will delete objects that were tagged by the Lambda function.

Configuring a Lifecycle Rule

  1. Navigate to the S3 portal in AWS and select the bucket where you are running Nextflow.

  2. Select the “Management” tab.

  3. Under “Lifecycle rules,” select “Create lifecycle rule”.

  4. Under “Lifecycle rule name,” enter a clear name.

  5. (Optionally) enter a Prefix to limit the scope of the rule.

  6. Under “Object tags,” select “Add tag”.

  7. If you used the Python code without modifying the TAG_KEY and TAG_VALUE variables in the Lambda function, under “Key” enter “Lifecycle” and under “Value - optional” enter “purge-candidate”.

  8. Under “Lifecycle rule actions,” check “Expire current versions of objects”. Note, if you use versioning in your bucket, you’ll likely also want to select “Permanently delete noncurrent versions of objects”.

  9. Under “Days after object creation,” enter 1 and select “Create rule” in the bottom right corner of the page.
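For what it’s worth, the rule created by these steps is roughly equivalent to the following lifecycle configuration JSON (usable with aws s3api put-bucket-lifecycle-configuration if you prefer setting it up from the CLI):

{
  "Rules": [
    {
      "ID": "delete-tagged-nextflow-work-files",
      "Filter": {
        "Tag": { "Key": "Lifecycle", "Value": "purge-candidate" }
      },
      "Status": "Enabled",
      "Expiration": { "Days": 1 }
    }
  ]
}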