First, my pipeline works fine on my local server. Now I want to run it in AWS Batch, using S3 for input and output.
Everything seems OK: the first process, FIND_RAW_FILES, finishes. Its input is a “folder” (s3://lairc-data-test/sample), and its job is to locate the raw files for the next process and save their paths in a txt file. The output looks like:
/fusion/s3/lairc-data-test/sample/834-014/V1_02152025/OD/OCT/ANX00018.E2E
/fusion/s3/lairc-data-test/sample/834-014/V1_02152025/OS/OCT/ANX00018.E2E
/fusion/s3/lairc-data-test/sample/834-015/V1_02152025/OD/OCT/ANX00018.E2E
/fusion/s3/lairc-data-test/sample/834-015/V1_02152025/OS/OCT/ANX00018.E2E
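For context, FIND_RAW_FILES is essentially a recursive listing of the input directory. This is not the exact process, just a simplified sketch of the idea (the real one also honours the pattern input, which is omitted here; names are illustrative, but the output channel name matches what the workflow below consumes):

    process FIND_RAW_FILES {
        input:
        path input_dir
        val pattern

        output:
        path 'found_files.txt', emit: found_files

        script:
        """
        # Resolve the staged directory to its real location (under Fusion this
        # becomes /fusion/s3/<bucket>/...) and list every .E2E file,
        # one absolute path per line
        find \$(readlink -f ${input_dir}) -type f -name '*.E2E' > found_files.txt
        """
    }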
Then the second process, EXTRACT_IMAGES, fails with:

Error executing process > 'EXTRACT_IMAGES (1)'

Caused by:
    Can't stage file /fusion/s3/lairc-data-test/sample/834-014/V1_02152025/OS/OCT/ANX00018.E2E -- file does not exist
My current nextflow.config is:
tower {
    enabled     = true
    accessToken = System.getenv('TOWER_ACCESS_TOKEN')
    workspaceId = '111111111'
}

docker {
    enabled    = true
    runOptions = '--gpus all -u $(id -u):$(id -g)'
}

params {
    input   = null
    pattern = ''
    segs    = 'fluid,aug'
}

process {
    executor  = 'local'
    container = 'ghcr.io/phenopolis/image-analysis:latest'
    withName: RUN_SEG {
        maxForks = 4
    }
}

profiles {
    aws {
        workDir = 's3://lairc-data-test/seqera/work'
        params {
            input   = 's3://lairc-data-test/sample'
            pattern = ''
            segs    = 'fluid'
        }
        process {
            executor = 'awsbatch'
            queue    = 'TowerForge-xxxxxxxxx'
            scratch  = false
        }
        wave {
            enabled  = true
            strategy = 'container'
        }
        fusion {
            enabled = true
        }
        aws {
            region = 'eu-west-2'
            batch {
                volumes = '/scratch/fusion:/tmp'
            }
        }
    }
}
The command issued: nextflow run segmentation.nf -name test_aws -profile aws
Here’s my workflow (from segmentation.nf):
workflow {
    input = file(params.input)

    // Create the initial channel based on input type
    if (input.isFile()) {
        // If input is a file, read its content as a list of paths
        raw_files_ch = Channel.fromPath(input).splitText().map { it -> file(it.trim()) }
    }
    else {
        // If input is a directory, use the FIND_RAW_FILES process
        FIND_RAW_FILES(input, params.pattern)
        raw_files_ch = FIND_RAW_FILES.out.found_files.splitText().map { it -> file(it.trim()) }
    }

    // Find the common path - modified to handle single file case
    common_path = raw_files_ch
        .collect()
        .map { files ->
            if (files.size() == 1) {
                // For single file, use its parent directory as common path
                return files[0].parent.toString()
            }
            def paths = files.collect { it -> it.toString().split('/') as List }
            def minLength = paths.min { it -> it.size() }.size()
            def commonParts = (0..<minLength)
                .takeWhile { index ->
                    paths.every { it -> it[index] == paths[0][index] }
                }
                .collect { it -> paths[0][it] }
            return commonParts.join('/')
        }

    // Create the input channel for EXTRACT_IMAGES with unique labels
    extract_images_input_ch = raw_files_ch
        .combine(common_path)
        .map { afile, common ->
            def relativePath = file(common).relativize(afile).toString()
            def unique_label = relativePath.replace('/', '_')
            if (unique_label == afile.name) {
                unique_label = "a${unique_label}"
            }
            [unique_label, afile]
        }

    // extract_images_input_ch.view()
    common_path.view { msg -> "Common path: ${msg}" }

    // Check if no files were found
    extract_images_input_ch.count().view { count -> "Number of files found: ${count}" }

    // Process the found files with EXTRACT_IMAGES
    extract_images_output_ch = EXTRACT_IMAGES(extract_images_input_ch, input)
}
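One workaround I have been considering, but have not yet tested, is to map the Fusion mount paths back to s3:// URIs when building raw_files_ch, on the assumption that /fusion/s3/<bucket>/<key> corresponds one-to-one to s3://<bucket>/<key>:

    // Untested sketch: normalise Fusion mount paths back to S3 URIs before
    // handing them to file(), assuming /fusion/s3/<bucket>/<key> maps onto
    // s3://<bucket>/<key>
    raw_files_ch = FIND_RAW_FILES.out.found_files
        .splitText()
        .map { line ->
            def p = line.trim()
            def uri = p.startsWith('/fusion/s3/') ? 's3://' + p.substring('/fusion/s3/'.length()) : p
            file(uri)
        }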
Now, the funny thing is: if I start a Studio VS Code session, install my pipeline from my GitHub repo, and issue the same command:
    nextflow run segmentation.nf -name test_aws_from_studio -profile aws    # (using the very same CE)
it works beautifully!
So what could I possibly be missing? Reading the forum, I noticed a mention that the “nodes (EC2 node?) need fusion or be compatible with fusion”.