Nextflow can't find file from combined Channel when running on awsbatch executor

I have the following process defined below that takes a tuple of sample_id, bam, bam_index:

process trgt {
    publishDir "${params.output_dir}/TRGT_results", mode: 'copy'
    tag "$sample_id"

    input:
    tuple  val(sample_id), path(bam), path(bam_index)
    path reference
    path reference_index
    path tandem_repeat_bed
    val(karyotype)
    val(cpus)

    output:
    tuple  val(sample_id), path("${sample_id}.trgt.spanning.sorted.bam"), path("${sample_id}.trgt.spanning.sorted.bam.bai"), emit: spanning_reads
    tuple  val(sample_id), path("${sample_id}.trgt.sorted.vcf.gz"), path("${sample_id}.trgt.sorted.vcf.gz.tbi"), emit: repeat_vcf

    script:
    """
    set -euo pipefail


    trgt --version
    trgt genotype \\
        --threads ${cpus} \\
        --karyotype ${karyotype} \\
        --genome ${reference} \\
        --repeats ${tandem_repeat_bed} \\
        --reads ${bam} \\
        --output-prefix ${sample_id}.trgt

    bcftools --version
    bcftools sort \\
        --output-type z \\
        --output ${sample_id}.trgt.sorted.vcf.gz \\
        ${sample_id}.trgt.vcf.gz

    bcftools index \\
        --threads ${cpus} \\
        --tbi \\
        ${sample_id}.trgt.sorted.vcf.gz

    samtools --version
    samtools sort \\
        -@ ${cpus} \\
        -o ${sample_id}.trgt.spanning.sorted.bam \\
        ${sample_id}.trgt.spanning.bam

    samtools index \\
        -@ ${cpus} \\
        ${sample_id}.trgt.spanning.sorted.bam
    """

    
}

I’m combining two channels for the bam file and the index and the workflow runs fine locally:

workflow {
    // Create channel from input BAM files
    bam_ch = Channel.fromPath(params.input_reads)
        .map { bam -> 
            def sample_id = bam.baseName.toString().replaceFirst(/\.aligned$/, '')
            return tuple(sample_id, bam)
        }
    
    // Create channel for BAI files
    bai_ch = bam_ch.map { sample_id, bam -> 
        def bai = file("${bam}.bai")
       
        if (!bai.exists()) {
            error "Index file not found for ${bam}"
        }
        return tuple(sample_id, bai)
    }

    // Combine BAM and BAI channels
    bam_bai_ch = bam_ch.join(bai_ch)
        .map { sample_id, bam, bai -> tuple(sample_id, bam, bai) }

    reference = file(params.reference)
    reference_index = file(params.reference_index)
    trgt_bed = file(params.repeats)
    
    // Print the list of input files
    bam_bai_ch.view { sample_id, bam, bai -> "Input: $sample_id, BAM: $bam, BAI: $bai" }
    
    trgt(bam_bai_ch, reference, reference_index, trgt_bed, params.karyotype, params.sort_threads)
}

It runs fine locally, but when I run it with awsbatch as the executor, Nextflow exits with the error:

Index file not found for /2x-ngs/AlignedBams/hifi_10k.aligned.bam

I'm a little concerned that it doesn't understand the file is on S3, and I'm not sure whether I'm doing something wrong in my config. The trgt program expects the .bai to be in the same directory as the bam, so I have to stage the index file along with the bam file in the workflow before calling my process. Here is my config:

process {
    executor = 'awsbatch'
    queue = '2X-NGS-jobs'
    container = '339712742158.dkr.ecr.us-west-2.amazonaws.com/aindap_2x/ngs:latest'
    memory='30GB'
    cpus=16
}

aws {
    region = 'us-west-2'
    accessKey = 'xxx'
    secretKey = 'xxx'

    batch {
        cliPath = '/usr/local/aws-cli/v2/current/bin/aws'
    }
}

params {
    ref_dir = "s3://2x-ngs/genomes"
    reference = "${ref_dir}/Homo_sapiens-GCA_009914755.4-softmasked.fa"
    reference_index = "${reference}.fai"
    
    input_dir  = 's3://2x-ngs/AlignedBams'
    input_reads= "${input_dir}/*.aligned.bam"
    output_dir = 's3://2x-ngs/TRGT_results'

    
    repeats = 's3://2x-ngs/T2T_resources/Homo_sapiens-GCA_009914755.trgt_liftOver_sorted.v0.3.4.bed'
    karyotype = 'XY'
    
    threads = 12
    sort_threads = 4
}
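
A note on the error message: the path in it has lost its s3:// prefix, which suggests the problem is in the `file("${bam}.bai")` line rather than in the config. Interpolating an S3 path into a string appears to yield only `/bucket/key`, so `file()` then looks for a local file that doesn't exist. Below is a minimal sketch of one way to keep the scheme, assuming `toUriString()` behaves as documented for Nextflow paths. It also builds the tuple in a single `map`, so no `join` is needed. This is untested on the poster's bucket and is not the fix the poster ended up using.

```groovy
workflow {
    bam_bai_ch = Channel.fromPath(params.input_reads)
        .map { bam ->
            def sample_id = bam.baseName.replaceFirst(/\.aligned$/, '')
            // toUriString() keeps the s3:// scheme; "${bam}" alone drops it
            def bai = file("${bam.toUriString()}.bai")
            if (!bai.exists()) {
                error "Index file not found for ${bam.toUriString()}"
            }
            tuple(sample_id, bam, bai)
        }

    // Check that every BAM was paired with an index on S3
    bam_bai_ch.view { sample_id, bam, bai -> "Input: $sample_id, BAM: $bam, BAI: $bai" }
}
```

Keeping the explicit `exists()` check means a missing index still stops the run with a clear message.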

Ok, I think this was a much more straightforward approach:

bam_bai_ch = Channel.fromFilePairs(params.input_reads + '{,.bai}', size: 2, flat: true)
    .map { sample_id, bam, bai ->
        def corrected_sample_id = sample_id.replaceFirst(/\.aligned$/, '')
        tuple(corrected_sample_id, bam, bai)
    }

That fixed my original problem!
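
For anyone landing here later, here is a minimal sketch of how that channel could slot into the workflow from the question. It assumes the `trgt` process and the params shown above are defined in the same script; the `view` line is only there to check the pairing before jobs are submitted. One caveat, as far as the `fromFilePairs` documentation suggests: with `size: 2`, a BAM without a matching `.bai` is left out of the channel instead of raising an error, so it's worth comparing the printed list against the bucket contents.

```groovy
workflow {
    // '{,.bai}' makes the glob match both X.aligned.bam and X.aligned.bam.bai
    bam_bai_ch = Channel.fromFilePairs(params.input_reads + '{,.bai}', size: 2, flat: true)
        .map { sample_id, bam, bai ->
            tuple(sample_id.replaceFirst(/\.aligned$/, ''), bam, bai)
        }

    // Print the pairs that were found
    bam_bai_ch.view { sample_id, bam, bai -> "Input: $sample_id, BAM: $bam, BAI: $bai" }

    trgt(
        bam_bai_ch,
        file(params.reference),
        file(params.reference_index),
        file(params.repeats),
        params.karyotype,
        params.sort_threads
    )
}
```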
