Dynamically assign Labels

Hi Guys,

I’m trying to demonstrate a proof of concept for AWS.
Imagine we have a samplesheet as input. Within this samplesheet there are multiple sample types with differing pipeline configs.

I would have AWS Batch configured, and my .config would point processes with labels like “WGS” to a certain compute environment.

Is it possible to dynamically assign labels to processes based on inputs? Am I going about this the wrong way? Everything else works except the label part. From googling and GPT-ing, it seems that sample_row is not available to label because labels get evaluated before inputs?

process PROCESS_SAMPLE {
    tag "${sample_row.ID} - ${sample_row.PIPELINE}"

    // Dynamically assign labels based on the PIPELINE value
    label sample_row.PIPELINE ?: 'default'

    input:
        val sample_row

    output:
        stdout

    script:
        // Dynamically select inputs based on the PIPELINE value
        def pipeline_config = params.pipeline_configs[sample_row.PIPELINE] ?: params.pipeline_configs.default
        """
        echo 'hello ${sample_row.ID}, you want pipeline ${sample_row.PIPELINE} with BEDFILE ${pipeline_config.bedfile} and REFERENCE ${pipeline_config.reference}'
        """
}

workflow {
    // Read the samplesheet.csv file
    Channel.fromPath('samplesheet.csv')
        .splitCsv(header: true, sep: ',') 
        .set { sample_rows } 

    sample_rows.view()

    // Invoke the process with each row individually
    PROCESS_SAMPLE(sample_rows)
}

Usually one writes a closure, which can be called from the directives that do accept dynamic values.

process {
    withLabel: 'some_label' {
        cpus = {
             def selectCpus = { pipeline ->
                 if ( pipeline == "p1" ){
                     2
                 } else if ( pipeline == "p2" ) {
                     4
                 } else {
                     1
                 }
             }
             selectCpus( sample_row.PIPELINE )
        }
    }
}
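
For the withLabel selector above to match, the process still needs a fixed label; as far as I know, label is one of the few directives (along with executor and maxForks) that cannot take a dynamic value. A minimal sketch of the process side, assuming the samplesheet has ID and PIPELINE columns as in the question and reusing the placeholder label 'some_label':

```groovy
// main.nf: a sketch, assuming samplesheet.csv has ID and PIPELINE columns
process PROCESS_SAMPLE {
    tag "${sample_row.ID} - ${sample_row.PIPELINE}"

    // Static label: this is what the withLabel selector in the config matches on.
    // 'some_label' is a placeholder name taken from the config snippet above.
    label 'some_label'

    input:
    val sample_row

    output:
    stdout

    script:
    """
    echo 'hello ${sample_row.ID}, you want pipeline ${sample_row.PIPELINE}'
    """
}

workflow {
    // Each CSV row becomes a map, so sample_row.PIPELINE is available per task
    Channel.fromPath('samplesheet.csv')
        .splitCsv(header: true, sep: ',')
        .set { sample_rows }

    PROCESS_SAMPLE(sample_rows)
}
```

The per-sample decision then lives entirely in the closure assigned to cpus, which is evaluated for each task with that task's inputs in scope.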

Thank you! That makes sense. Would you know if cpus could instead be queue, or can that only be defined in the configs? I’m assuming that the queue definition could be passed in this way?

I.e. something like this:

main.nf

process {
    withLabel: 'some_label' {
        queue = {
             def selectQueue = { pipeline ->
                 if ( pipeline == "p1" ){
                     'WGS'
                 } else if ( pipeline == "p2" ) {
                     'Somatic'
                 } else {
                     'Default'
                 }
             }
             selectQueue( sample_row.PIPELINE )
        }
    }
}

The way I’ve been shown, queues have to be defined on process names or labels in your conf, like:

process {
    executor = 'awsbatch'
    time = '24:00:00'

    withName: 'Hello' {
        queue = 'Goodbye'
        cpus = 16
        memory = '200 GB'
    }
}

I think the queue directive does accept dynamic values, so your example should work.
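
If the list of pipelines grows, the if/else chain could be swapped for a map lookup. This is only a sketch under a few assumptions: the process is called PROCESS_SAMPLE as in the question, the keys p1 and p2 are the placeholder pipeline names used above, and 'WGS', 'Somatic' and 'Default' stand in for AWS Batch job queues that would already have to exist in your account.

```groovy
// nextflow.config: a sketch; queue names are placeholders, not real AWS Batch queues
process {
    executor = 'awsbatch'

    withName: 'PROCESS_SAMPLE' {
        // The closure is evaluated per task, so the input variable
        // sample_row is in scope here
        queue = {
            def queues = [p1: 'WGS', p2: 'Somatic']
            // Fall back to a default queue for any pipeline not listed
            queues[sample_row.PIPELINE] ?: 'Default'
        }
    }
}
```

With the awsbatch executor the queue directive names the Batch job queue, so each job queue can be attached to whichever compute environment suits that sample type.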
