Pipeline not working in AWS Batch because of a fusion problem

First, my pipeline works fine on my local server.

Now I want it to run in AWS Batch using S3 for input and output.

Everything seems OK at first: the first process, FIND_RAW_FILES, finishes. Its input is a “folder”, s3://lairc-data-test/sample; it locates the files that the next process will need and saves their paths in a txt file. The output looks like:

/fusion/s3/lairc-data-test/sample/834-014/V1_02152025/OD/OCT/ANX00018.E2E
/fusion/s3/lairc-data-test/sample/834-014/V1_02152025/OS/OCT/ANX00018.E2E
/fusion/s3/lairc-data-test/sample/834-015/V1_02152025/OD/OCT/ANX00018.E2E
/fusion/s3/lairc-data-test/sample/834-015/V1_02152025/OS/OCT/ANX00018.E2E

Then the second process, EXTRACT_IMAGES, fails with:

Error executing process > 'EXTRACT_IMAGES (1)'

Caused by:
  Can't stage file /fusion/s3/lairc-data-test/sample/834-014/V1_02152025/OS/OCT/ANX00018.E2E -- file does not exist

My current nextflow.config is:

tower {
    enabled = true
    accessToken = System.getenv('TOWER_ACCESS_TOKEN')
    workspaceId = '111111111'
}

docker {
    enabled = true
    runOptions = '--gpus all -u $(id -u):$(id -g)'
}

params {
    input = null
    pattern = ''
    segs = 'fluid,aug'
}

process {
    executor = 'local'
    container = 'ghcr.io/phenopolis/image-analysis:latest'
    withName: RUN_SEG {
        maxForks = 4
    }
}

profiles {
    aws {
        workDir = 's3://lairc-data-test/seqera/work'
        params {
            input = 's3://lairc-data-test/sample'
            pattern = ''
            segs = 'fluid'
        }
        process {
            executor = 'awsbatch'
            queue = 'TowerForge-xxxxxxxxx'
            scratch = false
        }

        wave {
            enabled = true
            strategy = 'container'
        }

        fusion {
            enabled = true
        }

        aws {
            region = 'eu-west-2'
            batch {
                volumes = '/scratch/fusion:/tmp'
            }
        }
    }
}

The command issued: nextflow run segmentation.nf -name test_aws -profile aws.

Here’s my workflow (from segmentation.nf):

workflow {
    input = file(params.input)

    // Create the initial channel based on input type
    if (input.isFile()) {
        // If input is a file, read its content as a list of paths
        raw_files_ch = Channel.fromPath(input).splitText().map { it -> file(it.trim()) }
    }
    else {
        // If input is a directory, use the FIND_RAW_FILES process
        FIND_RAW_FILES(input, params.pattern)
        raw_files_ch = FIND_RAW_FILES.out.found_files.splitText().map { it -> file(it.trim()) }
    }

    // Find the common path - modified to handle single file case
    common_path = raw_files_ch
        .collect()
        .map { files ->
            if (files.size() == 1) {
                // For single file, use its parent directory as common path
                return files[0].parent.toString()
            }
            def paths = files.collect { it -> it.toString().split('/') as List }
            def minLength = paths.min { it -> it.size() }.size()
            def commonParts = (0..<minLength)
                .takeWhile { index ->
                    paths.every { it -> it[index] == paths[0][index] }
                }
                .collect { it -> paths[0][it] }

            return commonParts.join('/')
        }

    // Create the input channel for EXTRACT_IMAGES with unique labels
    extract_images_input_ch = raw_files_ch
        .combine(common_path)
        .map { afile, common ->
            def relativePath = file(common).relativize(afile).toString()
            def unique_label = relativePath.replace('/', '_')
            if (unique_label == afile.name) {
                unique_label = "a${unique_label}"
            }
            [unique_label, afile]
        }
    // extract_images_input_ch.view()

    common_path.view { msg -> "Common path: ${msg}" }

    // Check if no files were found
    extract_images_input_ch.count().view { count -> "Number of files found: ${count}" }

    // Process the found files with EXTRACT_IMAGES
    extract_images_output_ch = EXTRACT_IMAGES(extract_images_input_ch, input)

Now, the funny thing is, if I start a Studio vscode session, install my pipeline from my GitHub repo and issue the same command:

nextflow run segmentation.nf -name test_aws_from_studio -profile aws # (using the very same CE)

it does work beautifully!


So what could I be missing? Reading the forum, I saw it mentioned that the “nodes (EC2 node?) need fusion or be compatible with fusion”.

Without seeing the code, I would guess FIND_RAW_FILES is doing something like ls -1 > found_files.txt. This won’t work, because the local directory will be different for every task, so the subsequent tasks will look for a file located at /fusion/...

The goal here is to change your samplesheet so that it refers to the files by their absolute paths, instead of paths relative to the working directory:

s3://lairc-data-test/sample/834-014/V1_02152025/OD/OCT/ANX00018.E2E
s3://lairc-data-test/sample/834-014/V1_02152025/OS/OCT/ANX00018.E2E
s3://lairc-data-test/sample/834-015/V1_02152025/OD/OCT/ANX00018.E2E
s3://lairc-data-test/sample/834-015/V1_02152025/OS/OCT/ANX00018.E2E

Therefore, use the method .list() or .listFiles() on the directory to get the files within:

workflow {
    input = file(params.input)

    // Create the initial channel based on input type
    if (input.isFile()) {
        // If input is a file, emit it as a single path
        raw_files_ch = Channel.fromPath(input)
    }
    else if( input.isDirectory() ) {
        // If input is a directory, list its immediate entries (not recursive)
        raw_files    = input.listFiles()
        raw_files_ch = Channel.of(raw_files)
    }

    raw_files_ch.view()
}

Here’s my example:

> tree
.
├── input
│   ├── input1.txt
│   ├── input2.txt
│   └── input3.txt
├── input.txt
├── main.nf
└── nextflow.config

2 directories, 6 files

> nextflow run . --input input.txt
N E X T F L O W  ~  version 24.10.5
Launching `./main.nf` [ecstatic_raman] DSL2 - revision: d68ff3fb37
/Path/input.txt

> nextflow run . --input input/
N E X T F L O W  ~  version 24.10.5
Launching `./main.nf` [desperate_jennings] DSL2 - revision: d68ff3fb37
/Path/input/input1.txt
/Path/input/input2.txt
/Path/input/input3.txt
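
As an aside, if a list of /fusion/s3/... paths has already been written (as in the found_files.txt output shown in the question), mapping them back to S3 URIs is mechanical. The following is only a minimal sketch: fusionToS3Uri is a hypothetical helper, not part of Nextflow, and it assumes the /fusion/s3/<bucket>/<key> layout seen in that output.

```groovy
// Hypothetical helper (not a Nextflow API): map a Fusion mount path to its S3 URI.
// Assumes the /fusion/s3/<bucket>/<key> layout seen in the found_files.txt output.
def fusionToS3Uri(String path) {
    def prefix = '/fusion/s3/'
    // Anything that is not a Fusion S3 path is returned unchanged
    return path.startsWith(prefix) ? 's3://' + path.substring(prefix.length()) : path
}

// Example with one of the paths from the question
def uri = fusionToS3Uri('/fusion/s3/lairc-data-test/sample/834-014/V1_02152025/OD/OCT/ANX00018.E2E')
assert uri == 's3://lairc-data-test/sample/834-014/V1_02152025/OD/OCT/ANX00018.E2E'
```

In a workflow this could be applied to each line before calling file(), for example .map { line -> file(fusionToS3Uri(line.trim())) }, though listing the files from the workflow itself avoids the need for it.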

Many thanks for your suggestion @Adam_Talbot. I did indeed suspect what you mentioned, and here’s my original code:

process FIND_RAW_FILES {
    debug false
    tag "${input_dir.name}"
    publishDir "results_${input_dir.name}", mode: 'copy'

    input:
    path input_dir
    val pattern

    output:
    path 'found_files.txt', emit: found_files

    script:
    """
    shopt -s globstar
    # Search for matching files
    find -L ${input_dir}/${pattern} -type f \\( -iname "*.e2e" -o -iname "*.fda" -o -iname "*.sdb" \\) -print0 | xargs -0 realpath > found_files.txt

    # Check if any files were found
    if [[ ! -s found_files.txt ]]; then
        echo "Error: No matching files found in ${input_dir}" >&2
        exit 1
    fi
    """
}

The thing is, I looked into the list() and listFiles() methods but, unless I’m wrong, they won’t do what I need: the files I need to process can be anywhere in a tree of subfolders.

For example:

../ANX-BES-3112025
├── 834-014
│   └── V1_02152025
│       ├── OD
│       │   ├── FAF
│       │   │   └── ANX00018.E2E
│       │   └── OCT
│       │       └── ANX00018.E2E
│       └── OS
│           ├── FAF
│           │   └── ANX00018.E2E
│           └── OCT
│               └── ANX00018.E2E
...

Running a workflow with listFiles() like your example will give me:

/mnt/data3/awilter/ANX-BES-3112025/834-014
/mnt/data3/awilter/ANX-BES-3112025/834-015
/mnt/data3/awilter/ANX-BES-3112025/834-016
...

which is not what I need.
Is there any method in Nextflow that would crawl all subfolders and return all the files found?
Otherwise I will have to think of another solution for my problem, or develop such a method myself.

file returns a Java Path object, so you can use any method for recursively listing files you would use with Java.
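
As a rough sketch of that idea, the standard java.nio.file.Files.walk call can do the recursive listing. The function name findFilesRecursively and its extension list are illustrative assumptions, and whether the traversal behaves the same on a remote (S3) path depends on the underlying file system provider, so treat this as something to try on local paths first.

```groovy
// Illustrative helper (hypothetical name): recursively collect regular files under
// `root` whose name ends with one of `extensions`, compared case-insensitively.
def findFilesRecursively(java.nio.file.Path root, List<String> extensions) {
    def stream = java.nio.file.Files.walk(root)
    try {
        return stream
            .filter { p -> java.nio.file.Files.isRegularFile(p) }
            .filter { p ->
                def name = p.fileName.toString().toLowerCase()
                extensions.any { ext -> name.endsWith('.' + ext.toLowerCase()) }
            }
            .sorted()
            .collect(java.util.stream.Collectors.toList())
    }
    finally {
        // Files.walk holds open directory handles until the stream is closed
        stream.close()
    }
}
```

In a workflow, the result could be turned into a channel with something like Channel.fromList(findFilesRecursively(file(params.input), ['e2e', 'fda', 'sdb'])).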

Having said that, you can simplify it by using Channel.fromPath with a recursive glob (**), which will list all relevant files. Here, I’ve changed my previous example to append a **.txt pattern so it will find all files ending in .txt:

workflow {
    input = file(params.input)

    // Create the initial channel based on input type
    if (input.isFile()) {
        // If input is a file, emit it as a single path
        raw_files_ch = Channel.fromPath(input)
    }
    else if( input.isDirectory() ) {
        // If input is a directory, list with recursive glob
        raw_files_ch    = Channel.fromPath(input.resolve("**.txt"))
    }

    raw_files_ch.view()
}

> tree input
input
├── sample1
│   ├── input1.txt
│   └── input2.txt
└── sample2
    ├── ignoreme.csv
    └── input3.txt

> nextflow run main.nf --input input.txt
N E X T F L O W  ~  version 24.10.5
Launching `main.nf` [trusting_ramanujan] DSL2 - revision: df1c0f107e
/Path/input.txt

> nextflow run main.nf --input input/
N E X T F L O W  ~  version 24.10.5
Launching `main.nf` [pensive_laplace] DSL2 - revision: df1c0f107e
/Path/input/sample2/input3.txt
/Path/input/sample1/input1.txt
/Path/input/sample1/input2.txt

Thanks @Adam_Talbot!
I was coming here to post that I had found something along these lines, using something like:

        raw_files_ch = Channel.fromPath(
                [
                    "${params.input}/**/*.e2e",
                    "${params.input}/**/*.fda",
                    "${params.input}/**/*.sdb",
                ]
            )
            .flatten()

And I’m glad to see I’m on the right path now.
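
One thing worth double-checking with these patterns: the original find command used -iname (case-insensitive) and the sample files end in .E2E, whereas glob matching is usually case-sensitive on Linux. The sketch below uses Java's own glob matcher to check a pattern against one of the sample names. The brace pattern is an assumption about what would suit this data set, and it only covers all-lowercase and all-uppercase extensions.

```groovy
// Check a glob against a sample file name using Java's glob matcher.
// The brace group lists each extension in lower and upper case; mixed case
// (e.g. ".E2e") would need further alternatives.
def pattern = '**/*.{e2e,E2E,fda,FDA,sdb,SDB}'
def matcher = java.nio.file.FileSystems.getDefault().getPathMatcher('glob:' + pattern)
def sample  = java.nio.file.Paths.get('sample/834-014/V1_02152025/OD/OCT/ANX00018.E2E')
assert matcher.matches(sample)

// In the workflow, the same pattern could then replace the three lower-case globs:
// raw_files_ch = Channel.fromPath("${params.input}/${pattern}")
```

If the single brace pattern behaves differently on S3, listing the upper-case variants as extra entries in the existing list of globs would be an equivalent fallback.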

The thing is, I’m still thinking with a bash mindset when I need to move to the Nextflow mindset, and this forum and the Slack channel have been priceless resources for this transition.

That’s a common issue! The trick is to consider a small bash step (i.e. a single command invocation), then use Nextflow to coordinate lots of those bash steps.

Getting started with files and file handling is probably the trickiest part when starting a new pipeline.

Now there’s a permanent record of the solution on this forum!
