I have the following process defined below that takes a tuple of sample_id, bam, bam_index:
process trgt {
publishDir "${params.output_dir}/TRGT_results", mode: 'copy'
tag "$sample_id"
input:
tuple val(sample_id), path(bam), path(bam_index)
path reference
path reference_index
path tandem_repeat_bed
val(karyotype)
val(cpus)
output:
tuple val(sample_id), path("${sample_id}.trgt.spanning.sorted.bam"), path("${sample_id}.trgt.spanning.sorted.bam.bai"), emit: spanning_reads
tuple val(sample_id), path("${sample_id}.trgt.sorted.vcf.gz"), path("${sample_id}.trgt.sorted.vcf.gz.tbi"), emit: repeat_vcf
"""
set -euo pipefail
trgt --version
trgt genotype \\
--threads ${cpus} \\
--karyotype ${karyotype} \\
--genome ${reference} \\
--repeats ${tandem_repeat_bed} \\
--reads ${bam} \\
--output-prefix ${sample_id}.trgt
bcftools --version
bcftools sort \\
--output-type z \\
--output ${sample_id}.trgt.sorted.vcf.gz \\
${sample_id}.trgt.vcf.gz
bcftools index \\
--threads ${cpus} \\
--tbi \\
${sample_id}.trgt.sorted.vcf.gz
samtools --version
samtools sort \\
-@ ${cpus} \\
-o ${sample_id}.trgt.spanning.sorted.bam \\
${sample_id}.trgt.spanning.bam
samtools index \\
-@ ${cpus} \\
${sample_id}.trgt.spanning.sorted.bam
"""
}
I’m combining two channels for the bam file and the index and the workflow runs fine locally:
workflow {
// Create channel from input BAM files
bam_ch = Channel.fromPath(params.input_reads)
.map { bam ->
def sample_id = bam.baseName.toString().replaceFirst(/\.aligned$/, '')
return tuple(sample_id, bam)
}
// Create channel for BAI files
bai_ch = bam_ch.map { sample_id, bam ->
def bai = file("${bam}.bai")
if (!bai.exists()) {
error "Index file not found for ${bam}"
}
return tuple(sample_id, bai)
}
// Combine BAM and BAI channels
bam_bai_ch = bam_ch.join(bai_ch)
.map { sample_id, bam, bai -> tuple(sample_id, bam, bai) }
reference = file(params.reference)
reference_index = file(params.reference_index)
trgt_bed = file(params.repeats)
// Print the list of input files
bam_bai_ch.view { sample_id, bam, bai -> "Input: $sample_id, BAM: $bam, BAI: $bai" }
trgt(bam_bai_ch, reference, reference_index, trgt_bed, params.karyotype, params.sort_threads)
It runs fine locally, but when I when I run as awsbatch as executor I get the error:
Index file not found for /2x-ngs/AlignedBams/hifi_10k.aligned.bam
and nextflow exits.
I"m a little concerned it’s not understanding that the file is on s3. I’m not sure if it’s something I’m doing wrong in my config. The trgt program expects the .bai to be in the same directory as the bam, so I have to stage the index file along with the bam file when making the nextflow workflow before calling my process.
process {
executor = 'awsbatch'
queue = '2X-NGS-jobs'
container = '339712742158.dkr.ecr.us-west-2.amazonaws.com/aindap_2x/ngs:latest'
memory='30GB'
cpus=16
}
aws {
region = 'us-west-2'
accessKey = 'xxx'
secretKey = 'xxx'
batch {
cliPath = '/usr/local/aws-cli/v2/current/bin/aws'
}
}
params {
ref_dir = "s3://2x-ngs/genomes"
reference = "${ref_dir}/Homo_sapiens-GCA_009914755.4-softmasked.fa"
reference_index = "${reference}.fai"
input_dir = 's3://2x-ngs/AlignedBams'
input_reads= "${input_dir}/*.aligned.bam"
output_dir = 's3://2x-ngs/TRGT_results'
repeats = 's3://2x-ngs/T2T_resources/Homo_sapiens-GCA_009914755.trgt_liftOver_sorted.v0.3.4.bed'
karyotype = 'XY'
sort_threads=12
threads = 12
sort_threads = 4
}