Nextflow configuration for sequential and parallel execution across multiple pods

Please help.
I have three processes, a, b and c, which run sequentially through a Nextflow workflow, and each process runs in parallel for each argument passed. This runs as a Kubernetes Job that uses a single AWS EKS Fargate pod. The Nextflow code works, but process b fails with a memory-exhaustion issue. To fix this I applied the nextflow.config below. Now process b does not start after process a completes. I'm also unsure whether this config will meet my requirement of sequential and parallel execution without exhausting the pod.
Could you please tell me what is wrong in this config?

cloud {
    driver = 'k8s'
    workDir = '/bbb/work'
}
k8s {
    namespace = 'bbb-a'
}
profiles {
    dev {
        params {
            ceh_bucket_path = 's3://dev'
        }
    }
    prod {
        params {
            ceh_bucket_path = 's3://prod'
        }
    }
}
process {
    executor = 'k8s'
    workDir = '/bbb/work'
    memory = '4G'
    cpus = '1'
    container = 'imageid'
    pod {
        label = 'app.lrl.nino.com/compute'
        value = 'serverless'
    }
    withName: 'a' {
        memory = '4G'
        cpus = '1'
        // Add your container image if required
        //container = 'features_engineering'
    }
    withName: 'b' {
        memory = '120G'
        cpus = '12'
        //container = 'models'
    }
    withName: 'c' {
        memory = '8G'
        cpus = '1'
        // container = 'revenues'
    }
}
k8s {
    storageClaimName = 'bbb'
    storageMountPath = '/bbb/work'
}
docker {
    enabled = true
    fixOwnership = true
    runOptions = '-u $(id -u):$(id -g)'
}

Below is my nf file

#!/usr/bin/env nextflow
nextflow.enable.dsl=2
directory = "${workflow.launchDir}"
logFile = "${workflow.launchDir}/results.log" // Define a log file for all outputs
ceh_bucket_path = "s3://samplebucket"
process a {
    errorStrategy 'ignore'
    input:
    val arg
    output:
    path "result_a_${arg}.txt"
    script:
    """
    chmod +x "${workflow.launchDir}/docker/a.sh"
    if (cd "${workflow.launchDir}" && bash ./docker/a.sh "\$PARAMETERS" >> "${logFile}" 2>&1); then
        touch result_a_${arg}.txt
        aws s3 cp result_a_${arg}.txt ${ceh_bucket_path}/nextflow/results/ || { echo "Failed to copy result file"; exit 1; }
    else
        echo "Rscript failed" >> ${logFile}
        exit 1
    fi
    aws s3 cp ${logFile} ${ceh_bucket_path}/nextflow/mlops_logs/ || { echo "Failed to copy log file"; exit 1; }
    """
}
process b {
    errorStrategy 'ignore'
    input:
    path prev_output
    val arg
    output:
    path "result_b_${arg}.txt"
    script:
    """
    if (cd "${workflow.launchDir}" && bash ./docker/b.sh "\$PARAMETERS" >> "${logFile}" 2>&1); then
        touch b${arg}.txt
        aws s3 cp b${arg}.txt ${ceh_bucket_path}/nextflow/results/ || { echo "Failed to copy result file"; exit 1; }
    else
        echo "Rscript failed" >> ${logFile}
        exit 1
    """
}
process c {
    errorStrategy 'ignore'
    input:
    path prev_output
    val arg
    output:
    path "c_${arg}.txt"
    script:
    """
    """
}
workflow {
    allargs = []
    // Channel.from() creates a channel which emits the given arguments
    // The 'a' process will be executed concurrently for each argument emitted by the channel
    params.each { key, value ->
        allargs.add(value)
    }
    argChannel = Channel.from(allargs)
    inputArgCount = allargs.size()
    // Define the first chain of jobs
    a(argChannel)
    bOut = b(a.out, argChannel).collect()
    cOut = c(bOut, argChannel)
    cOut.view { "Final output: $it" }
}

Is it possible for this setup to work effectively in a single Kubernetes Job and a single container?

I’ve edited your post to use code formatting so it’s easier to read.

Is it possible for this setup to work effectively in a single Kubernetes Job and a single container?

Yes, this should work fine. Could you share the error that occurs when you run the pipeline?

I notice processes a and b use some odd logic. You shouldn't wrap the shell scripts like this; instead:

  • Put a.sh and b.sh in the bin/ directory of the repo; they will then be available on the PATH of every task.
  • Don't use aws s3 cp to copy files in and out; use the publishDir directive to put these files in the output directory.
  • Don't catch the error yourself; just let Nextflow handle it.
  • Process c doesn't appear to do anything?

After you move the shell scripts, processes a and b could look like this:

process a {

    publishDir "${ceh_bucket_path}/nextflow/results/"

    input:
        val arg
    output:
        path "result_a_${arg}.txt"
    script:
    """
    a.sh
    """
}
process b {

    publishDir "${ceh_bucket_path}/nextflow/results/"

    input:
        path prev_output
        val arg
    output:
        path "result_b_${arg}.txt"

    script:
    """
    b.sh
    """
}
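
For the bin/ approach to work, the scripts just need to live alongside the pipeline and be marked executable (chmod +x bin/a.sh bin/b.sh); Nextflow automatically adds the project's bin/ directory to the PATH of every task. A minimal layout, assuming the whole pipeline sits in one repo (file names are illustrative), would look roughly like:

main.nf
nextflow.config
bin/
    a.sh
    b.sh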

Thanks!

The new pods created via nextflow.config are unable to connect to the database. I passed the parameters below, but I can see they are not being added as environment variables of the pods in the console, and the DB connection still fails.

env {
    rs_uid = 'valueFrom:secretKeyRef:name:bbb-redshift-db-credentials:key:username'
    rs_pwd = 'valueFrom:secretKeyRef:name:bbb-redshift-db-credentials:key:password'
    host = 'valueFrom:secretKeyRef:name:bbb-redshift-db-credentials:key:host'
    db_name = 'valueFrom:secretKeyRef:name:bbb-redshift-db-credentials:key:db'
}

Sorry, without the error, logs or database details it's very hard for me to understand what's going wrong. Nextflow isn't doing anything special here, though; it's just creating ordinary pods.
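
One thing worth checking: the env scope in nextflow.config only sets literal string values, so a value like 'valueFrom:secretKeyRef:...' is passed to the pod as that exact string rather than being resolved by Kubernetes. If the credentials live in a Kubernetes secret, the pod directive of the k8s executor can map secret keys to environment variables. A rough sketch, reusing the secret and key names from your snippet (adjust them to your actual secret):

process {
    // each entry maps a key of the named Kubernetes secret to an env variable in the task pods
    pod = [
        [env: 'rs_uid',  secret: 'bbb-redshift-db-credentials/username'],
        [env: 'rs_pwd',  secret: 'bbb-redshift-db-credentials/password'],
        [env: 'host',    secret: 'bbb-redshift-db-credentials/host'],
        [env: 'db_name', secret: 'bbb-redshift-db-credentials/db']
    ]
}

Note that if you keep your existing pod label setting, it has to go into the same list, since the pod option accepts a list of maps.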

I am facing an issue with process orchestration. I have processes a, b and c which should execute sequentially, and each process should run in parallel for arg1, arg2 and arg3. However, I observe that process b for arg2 starts before process a for arg2 finishes. It looks like process b is triggered as soon as any argument of process a completes: if a(arg1) or a(arg2) finishes, it triggers b(arg2). Even if a(arg2) fails, b(arg2) and b(arg1) are still triggered. How does Nextflow know for which arguments the previous process succeeded, so that it continues the next process only for those arguments as each one completes?
How can this issue be addressed? Could you please review my nf file or config file and help me out here.
Below is my nf file:

#!/usr/bin/env nextflow
nextflow.enable.dsl=2
directory = "${workflow.launchDir}"
logFile = "${workflow.launchDir}/results.log" // Define a log file for all outputs
ceh_bucket_path = "s3://samplebucket"
process a {
    errorStrategy 'ignore'
    input:
    val arg
    output:
    path "result_a_${arg}.txt"
    script:
    """
    chmod +x "${workflow.launchDir}/docker/a.sh"
    if (cd "${workflow.launchDir}" && bash ./docker/a.sh "\$PARAMETERS" >> "${logFile}" 2>&1); then
        touch result_a_${arg}.txt
        aws s3 cp result_a_${arg}.txt ${ceh_bucket_path}/nextflow/results/ || { echo "Failed to copy result file"; exit 1; }
    else
        echo "Rscript failed" >> ${logFile}
        exit 1
    fi
    aws s3 cp ${logFile} ${ceh_bucket_path}/nextflow/mlops_logs/ || { echo "Failed to copy log file"; exit 1; }
    """
}
process b {
    errorStrategy 'ignore'
    input:
    path prev_output
    val arg
    output:
    path "result_b_${arg}.txt"
    script:
    """
    if (cd "${workflow.launchDir}" && bash ./docker/b.sh "\$PARAMETERS" >> "${logFile}" 2>&1); then
        touch b${arg}.txt
        aws s3 cp b${arg}.txt ${ceh_bucket_path}/nextflow/results/ || { echo "Failed to copy result file"; exit 1; }
    else
        echo "Rscript failed" >> ${logFile}
        exit 1
    """
}
process c {
    errorStrategy 'ignore'
    input:
    path prev_output
    val arg
    output:
    path "c_${arg}.txt"
    script:
    """
    """
}
workflow {
    allargs = []
    // Channel.from() creates a channel which emits the given arguments
    // The 'a' process will be executed concurrently for each argument emitted by the channel
    params.each { key, value ->
    allargs.add(value)
    }
    argChannel = Channel.from(allargs)
    inputArgCount = allargs.size()
    // Define the first chain of jobs
    a(argChannel)
    bOut = b(a.out, argChannel).collect()
    cOut = c(bOut, argChannel)
    cOut.view { "Final output: $it" }
}

I think you need to go back and read through the basic tutorial; you're missing some basic concepts here and making things much more complicated than they need to be. Take a look at the Nextflow training material at training.nextflow.io to get an understanding of channels, processes and file handling. My general advice remains the same as above.
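
To make the pairing of arguments and results concrete, here is a rough sketch building on the simplified processes above. It assumes a.sh and b.sh take the argument on the command line and write to stdout, and the argument values are placeholders. The key idea is that a emits a tuple of the argument and its output file, so b only ever receives matched pairs; with errorStrategy 'ignore', a failed a(arg) emits nothing, so b is never triggered for that argument.

process a {
    errorStrategy 'ignore'
    input:
    val arg
    output:
    tuple val(arg), path("result_a_${arg}.txt")
    script:
    """
    a.sh "${arg}" > result_a_${arg}.txt
    """
}

process b {
    errorStrategy 'ignore'
    input:
    tuple val(arg), path(prev_output)
    output:
    tuple val(arg), path("result_b_${arg}.txt")
    script:
    """
    b.sh "${arg}" "${prev_output}" > result_b_${arg}.txt
    """
}

workflow {
    args = Channel.of('arg1', 'arg2', 'arg3')   // placeholder arguments
    a_out = a(args)      // emits (arg, result_a_<arg>.txt) only for tasks that succeed
    b_out = b(a_out)     // runs once per surviving (arg, file) pair
    b_out.view { "b produced: $it" }
}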

I have some additional questions you will need to answer to make a reproducible version:

  • What are the contents of a.sh and b.sh?
  • What are the contents of the environment variable PARAMETERS in process a and process b?
  • What is the purpose of process c, which does not appear to do anything?
  • What is the purpose of this pipeline? What does the Rscript do?
  • Why do you parse the parameters with a list: params.each { key, value -> allargs.add(value)}?