Recently, I decided to revisit this issue and came up with the following approach, which works well for me. I’ve written up what I did in case it helps anyone else. Let me know if I missed something or made any typos!
Configuring Nextflow and AWS to Automatically Delete Old Nextflow Work Files
When a Nextflow pipeline is run, it uses a work folder where it stages and processes the files needed by each process defined in the pipeline. Over time, these work folders can become quite large as they contain full copies of the files used by the pipeline. Nextflow offers two options to deal with this. First, as described here, one can set cleanup = true in the config file so that all files associated with a run in the work directory will be deleted when the run completes successfully. Second, as described here, one can manually run nextflow clean to remove work files from previous executions. Unfortunately, neither of these options is currently supported when using Nextflow with Fusion, as the work folder is located in AWS S3.
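For reference, those two standard cleanup options look roughly like this (a sketch only; <run_name> is a placeholder for one of your previous run names, and per the above, neither currently works with Fusion):
// nextflow.config: delete a run's work files after it completes successfully
cleanup = true
# command line: remove work files left by runs before <run_name>
nextflow clean -f -before <run_name>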
One natural way to deal with this is to configure a Lifecycle rule in AWS that periodically deletes objects with a given tag. By default, as described here, Fusion is configured to tag all non-metadata work files with “nextflow.io/temporary=true”. When processed files are transferred from the Nextflow work folder to a results folder using the publishDir directive (described here), they keep their tags as-is. This would prevent the Lifecycle rule from exclusively targeting Nextflow work files for removal, but thankfully, the publishDir directive has a “tags” option that can be used to update the tags when objects are published to a results folder.
Unfortunately, two outstanding bugs prevent the above approach from working. First, Fusion often fails to tag files in the work folder, so they can’t be targeted by Lifecycle rules. Second, when tags are specified with the publishDir directive, they aren’t applied to files in subfolders, so it’s not possible to reliably replace the tags applied by Fusion. Given these issues, I put together the following approach to selectively delete Nextflow work files in AWS.
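For context, here is a sketch of what that publishDir “tags” option looks like in a process definition. The bucket path, process name, and tag are illustrative, and because of the subfolder bug described above, this alone isn’t a reliable solution:
// main.nf (illustrative only)
process COPY_RESULTS {
    // Re-tag files as they are published to the results folder
    publishDir path: 's3://<your_bucket_name>/results', mode: 'copy', tags: [published: 'true']

    output:
    path 'result.txt'

    script:
    """
    echo 'example' > result.txt
    """
}

workflow {
    COPY_RESULTS()
}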
Adjusting Your Nextflow Config to Make Use of the Following Setup
The following configuration is designed to tag *untagged* S3 objects that contain “nextflow_work” in their name/path. This requires two changes to the Nextflow defaults:
- To make it easier to exclusively tag Nextflow work objects, change the Nextflow work folder from simply “work” to “nextflow_work” by adding workDir = 'nextflow_work' to the nextflow.config file.
- As mentioned above, Fusion is configured to tag all non-metadata work files in the Nextflow work folder with “nextflow.io/temporary=true”. To avoid replacing the tags on the metadata files, the Lambda function we configure below is designed to only tag untagged S3 objects in nextflow_work folders. To stop Fusion from tagging objects with “nextflow.io/temporary=true”, add fusion.tags = '[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true)' to the nextflow.config file (see the combined sketch below).
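A minimal sketch of these additions, assuming the work directory sits directly under your S3 bucket (the bucket name is a placeholder; adjust the path to match your setup):
// nextflow.config additions (sketch; bucket name is a placeholder)
workDir = 's3://<your_bucket_name>/nextflow_work'

// Keep Fusion's metadata tags, but stop tagging everything else as temporary
fusion.tags = '[.command.*|.exitcode|.fusion.*](nextflow.io/metadata=true)'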
Configuring a Lambda Function in AWS to Tag Old Nextflow Work Files
AWS Lambda is a way to run custom code without having to manage the underlying infrastructure. Here, we will configure a Lambda function to tag old untagged Nextflow work files so that they can be targeted for deletion by Lifecycle rules.
Creating a Custom Role for the Lambda Function
First, it’s necessary to create a role that the Lambda function can assume, with the permissions it needs to tag objects in S3. Here are the steps to do this (a rough CLI equivalent is sketched after the steps):
- Navigate to the IAM portal in AWS and select “Roles” in the left-hand menu.
- Select “Create role” at the top right of the page.
- Under “Trusted entity type” select “AWS service”, and under “Service or use case” select “Lambda”. Select “Next”.
- Under “Permissions policies”, search for “AWSLambdaBasicExecutionRole” and add it. This policy will allow the Lambda function to write to CloudWatch Logs, which is helpful for tracking activity. Select “Next”.
- Under “Role name,” enter a clear name for the role and select “Create role” in the bottom right corner of the page.
- Search for the role you just created and select it.
- Under “Permissions policies”, select “Create inline policy” from the “Add permissions” dropdown menu.
- Under “Policy editor”, select “JSON” and replace the text with:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3TagList",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        },
        {
            "Sid": "S3TagReadWrite",
            "Effect": "Allow",
            "Action": [
                "s3:GetObjectTagging",
                "s3:PutObjectTagging"
            ],
            "Resource": [
                "arn:aws:s3:::*/*"
            ]
        }
    ]
}
This will allow the Lambda Function to tag objects in S3. Select “Next”.
- Under “Policy name,” add a meaningful name and select “Create policy”.
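If you prefer scripting this, the console steps above correspond roughly to the following AWS CLI calls. This is a sketch: the role and policy names are placeholders, and s3-tagging-policy.json is assumed to contain the inline policy JSON shown above.
# Create the role with a trust policy for Lambda (role/policy names are placeholders)
aws iam create-role \
    --role-name nextflow-work-tagger-role \
    --assume-role-policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"Service":"lambda.amazonaws.com"},"Action":"sts:AssumeRole"}]}'

# Attach the managed policy for CloudWatch logging
aws iam attach-role-policy \
    --role-name nextflow-work-tagger-role \
    --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

# Add the inline S3 tagging policy (saved as s3-tagging-policy.json)
aws iam put-role-policy \
    --role-name nextflow-work-tagger-role \
    --policy-name s3-tagging \
    --policy-document file://s3-tagging-policy.json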
Creating a Lambda Function
- Navigate to the Lambda portal in AWS and select “Functions” in the left-hand menu.
- Select “Create function” in the top right corner of the page.
- Under “Create function”, leave the default (Author from scratch) selected.
- Under “Function name”, add a meaningful name.
- Under “Runtime”, select “Python 3.12”.
- Under “Architecture”, leave the default (x86_64) selected.
- Expand “Change default execution role” and select “Use an existing role”. Under “Existing role”, search for the role you created above and select it.
- Select “Create function” in the bottom right corner of the page.
- Replace the code in the “lambda_function.py” script with the following:
import os
from datetime import datetime, timezone, timedelta

import boto3
from botocore.config import Config

# ---------- GLOBAL DEFAULTS ----------
DEFAULTS = {
    "root_prefix": "",                 # e.g., "projects/" or "" for whole bucket
    "search_substr": "nextflow_work",  # substring to look for anywhere in the key
    "age_days": 7,                     # tag only if >= N days old
    "max_ops_per_bucket": 20000,       # safety cap per bucket per run
}

# Hard-coded targets (edit to suit). Any missing field inherits from DEFAULTS.
BUCKETS = [
    {"bucket": "<your_bucket_name>"},
    {"bucket": "<your_second_bucket_name>", "root_prefix": "2025/"},
    # {"bucket": "third-bucket", "search_substr": "nextflow_work", "age_days": 14},
]

# Tuning knobs
PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "1000"))  # S3 list page size (<=1000)
SAFETY_MS = int(os.environ.get("SAFETY_MS", "3000"))  # stop this many ms before timeout
TAG_KEY = os.environ.get("TAG_KEY", "Lifecycle")
TAG_VALUE = os.environ.get("TAG_VALUE", "purge-candidate")

s3 = boto3.client(
    "s3",
    config=Config(
        retries={"max_attempts": 8, "mode": "standard"},
        connect_timeout=3,
        read_timeout=10,
    ),
)


def _key_matches(key: str, needle: str) -> bool:
    return needle in key


def _is_old_enough(last_modified: datetime, age_days: int) -> bool:
    return (datetime.now(timezone.utc) - last_modified) >= timedelta(days=age_days)


def _has_any_tags(bucket: str, key: str) -> bool:
    resp = s3.get_object_tagging(Bucket=bucket, Key=key)
    return len(resp.get("TagSet", [])) > 0


def _put_single_tag(bucket: str, key: str):
    s3.put_object_tagging(
        Bucket=bucket,
        Key=key,
        Tagging={"TagSet": [{"Key": TAG_KEY, "Value": TAG_VALUE}]},
    )


def process_bucket(cfg: dict, context):
    bucket = cfg["bucket"]
    root_prefix = cfg["root_prefix"]
    search_substr = cfg["search_substr"]
    age_days = int(cfg["age_days"])
    max_ops = int(cfg["max_ops_per_bucket"])

    processed = tagged = skipped_young = skipped_tagged = skipped_nomatch = 0

    paginator = s3.get_paginator("list_objects_v2")
    try:
        page_iter = paginator.paginate(
            Bucket=bucket,
            Prefix=root_prefix,
            PaginationConfig={"PageSize": PAGE_SIZE},
        )
    except Exception as e:
        return {"bucket": bucket, "error": f"list_failed: {e.__class__.__name__}: {e}"}

    for page in page_iter:
        if context.get_remaining_time_in_millis() <= SAFETY_MS:
            break
        contents = page.get("Contents", [])
        if not contents:
            continue
        for obj in contents:
            if context.get_remaining_time_in_millis() <= SAFETY_MS:
                break
            key = obj["Key"]
            processed += 1
            if not _key_matches(key, search_substr):
                skipped_nomatch += 1
                continue
            if not _is_old_enough(obj["LastModified"], age_days):
                skipped_young += 1
                continue
            try:
                if _has_any_tags(bucket, key):
                    skipped_tagged += 1
                    continue
                _put_single_tag(bucket, key)
                tagged += 1
            except Exception as e:
                # Log the failure and continue
                print({"bucket": bucket, "key": key, "tag_error": str(e)})
            if tagged % 50 == 0:
                print({"bucket": bucket, "progress": {"processed": processed, "tagged": tagged}})
            if tagged >= max_ops:
                break
        if tagged >= max_ops:
            break

    result = {
        "bucket": bucket,
        "prefix": root_prefix,
        "needle": search_substr,
        "processed": processed,
        "tagged": tagged,
        "skipped": {
            "not_old_enough": skipped_young,
            "already_tagged": skipped_tagged,
            "no_match": skipped_nomatch,
        },
    }
    print({"summary": result})
    return result


def lambda_handler(event, context):
    # Merge each entry with defaults
    targets = [{**DEFAULTS, **b} for b in BUCKETS]
    summaries = []
    for bcfg in targets:
        if context.get_remaining_time_in_millis() <= SAFETY_MS:
            print({"status": "stopping_early_time_budget"})
            break
        try:
            summaries.append(process_bucket(bcfg, context))
        except Exception as e:
            summaries.append({"bucket": bcfg["bucket"], "error": f"unhandled: {e}"})
    return {"per_bucket": summaries}
In summary, this script searches all buckets in the BUCKETS list for keys (i.e., S3 object paths) that contain the substring “nextflow_work” and tags the corresponding objects with Lifecycle = purge-candidate if they are untagged and at least age_days (here, 7) days old. The root_prefix option allows you to restrict which folders are searched for keys containing “nextflow_work”.
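As a quick sanity check, you can also invoke the handler locally with a stub context. This is a sketch that assumes the script above is saved as lambda_function.py next to the test file and that your local AWS credentials are allowed to list and tag the target buckets:
# local_test.py - hypothetical local smoke test for the tagging handler
import lambda_function


class FakeContext:
    """Stands in for the Lambda context object; only the time-budget method is needed."""

    def get_remaining_time_in_millis(self) -> int:
        return 900_000  # pretend the full 15-minute budget remains


if __name__ == "__main__":
    # Prints the per-bucket summaries returned by the handler
    print(lambda_function.lambda_handler({}, FakeContext()))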
- Select the “Configuration” tab and select “General configuration” in the resulting menu.
- Select “Edit” and under “Timeout” set 15 min and 0 sec.
- Select “Save”.
- Return to the “Code” tab and select “Deploy”.
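For reference, deploying code changes and setting the timeout can also be done from the CLI, roughly as follows (the function name and zip file are placeholders):
# Package and upload the code, then set the 15-minute timeout (names are placeholders)
zip function.zip lambda_function.py

aws lambda update-function-code \
    --function-name nextflow-work-tagger \
    --zip-file fileb://function.zip

aws lambda update-function-configuration \
    --function-name nextflow-work-tagger \
    --timeout 900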
Configuring an EventBridge Schedule to Run the Lambda Function
On its own, the above Lambda function won’t do anything; it needs to be triggered periodically. This is where EventBridge comes in. Among other things, EventBridge allows one to configure schedules that run Lambda functions on a custom interval.
Creating a Custom Role for EventBridge
First, it’s necessary to create a role that the EventBridge schedule can assume, allowing it to invoke the Lambda function above. Here are the steps to do this:
- Navigate to the IAM portal in AWS and select “Roles” in the left-hand menu.
- Select “Create role” at the top right of the page.
- Under “Trusted entity type,” select “Custom trust policy” and replace the “Custom trust policy” with:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": { "Service": "scheduler.amazonaws.com" },
            "Action": "sts:AssumeRole"
        }
    ]
}
Select “Next”.
- Don’t add any permissions and select “Next” again.
- Under “Role name,” enter a meaningful name and select “Create role” in the bottom right corner of the page.
- Search for the role you just created and select it.
- Under “Permissions policies”, select “Create inline policy” from the “Add permissions” dropdown menu.
- Under “Policy editor”, select “JSON” and replace the text with:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "InvokeAnyLambdaAnywhere",
            "Effect": "Allow",
            "Action": "lambda:InvokeFunction",
            "Resource": "arn:aws:lambda:*:*:function:*"
        }
    ]
}
This will allow the EventBridge scheduler to invoke any Lambda function. Select “Next”.
- Under “Policy name” enter a meaningful name and select “Create policy” in the bottom right corner of the page.
Creating an EventBridge Schedule
- Navigate to the Amazon EventBridge portal in AWS, select “Schedules” in the left-hand menu, and select “Create schedule”.
- Under “Schedule name,” enter a clear name for the schedule.
- Leave “default” for “Schedule group”.
- Under “Schedule pattern”, choose a schedule that meets your needs. I chose a “Recurring schedule” with a “Cron-based schedule” that runs every day at 6 AM (for example, 0 6 * * ? * in EventBridge’s six-field cron format).
- Under “Flexible time window,” select “Off” and select “Next”.
- Under “Select target,” choose “AWS Lambda”.
- Under “Lambda function,” select the Lambda function that was created above and select “Next”.
- Under “Action after schedule completion,” select “NONE”.
- Under “Execution role,” select “Use existing role” and choose the EventBridge role you created above. Select “Next”.
- Select “Create schedule” in the bottom right corner of the page.
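If you prefer scripting the setup, the schedule above corresponds roughly to this single CLI call (a sketch; the schedule name, region, account ID, function name, and role name are placeholders):
# Sketch of a CLI equivalent (names and ARNs are placeholders)
aws scheduler create-schedule \
    --name tag-old-nextflow-work-daily \
    --schedule-expression "cron(0 6 * * ? *)" \
    --flexible-time-window Mode=OFF \
    --target '{"Arn": "arn:aws:lambda:<region>:<account_id>:function:<your_function_name>", "RoleArn": "arn:aws:iam::<account_id>:role/<your_eventbridge_role>"}'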
Configuring Lifecycle Rules to Delete Nextflow Work Files
AWS S3 offers Lifecycle rules, which can define actions you want Amazon S3 to take during an object’s lifetime, such as transitioning objects to another storage class, archiving them, or deleting them after a specified period of time. Here, we will set up Lifecycle Rules that will delete objects that were tagged by the Lambda function.
Configuring a Lifecycle Rule
- Navigate to the S3 portal in AWS and select the bucket where you are running Nextflow.
- Select the “Management” tab.
- Under “Lifecycle rules,” select “Create lifecycle rule”.
- Under “Lifecycle rule name,” enter a clear name.
- Optionally, enter a Prefix to limit the scope of the rule.
- Under “Object tags,” select “Add tag”.
- If you used the Python code without modifying the TAG_KEY and TAG_VALUE variables in the Lambda function, under “Key” enter “Lifecycle” and under “Value - optional” enter “purge-candidate”.
- Under “Lifecycle rule actions,” check “Expire current versions of objects”. Note, if you use versioning in your bucket, you’ll likely also want to select “Permanently delete noncurrent versions of objects”.
- Under “Days after object creation,” enter 1 and select “Create rule” in the bottom right corner of the page.
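For reference, the rule configured above corresponds roughly to the following lifecycle configuration, which can also be applied with the AWS CLI. This is a sketch that assumes the default tag key/value, no prefix filter, and no versioning; the rule ID, file name, and bucket name are placeholders.
{
    "Rules": [
        {
            "ID": "expire-tagged-nextflow-work",
            "Filter": {
                "Tag": { "Key": "Lifecycle", "Value": "purge-candidate" }
            },
            "Status": "Enabled",
            "Expiration": { "Days": 1 }
        }
    ]
}

# Apply the rule from the CLI (bucket and file names are placeholders)
aws s3api put-bucket-lifecycle-configuration \
    --bucket <your_bucket_name> \
    --lifecycle-configuration file://lifecycle.json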