When clause deprecated. What's a good alternative?

The Nextflow language has deprecated the ‘when’ clause in processes. The recommendation is to use an ‘if’ condition, presumably in the workflow block. For this control flow I need to retrieve a boolean from a JSON file. See the JSON, main.nf and output below.

What is a good way to achieve this?

$ cat one.json
{
    "RUN_POST_STAGE": false,
    "RUN_VEP_ANNOTATION": true

}

My first attempt is

% cat main.nf 
#!/usr/bin/env nextflow
nextflow.enable.dsl=2

def runPost(fireflyConfig){
    def jsonSlurper = new groovy.json.JsonSlurper()
    def sampleParams = jsonSlurper.parse(fireflyConfig)
    return sampleParams['RUN_POST_STAGE']
}

process replicate {
    label "Replicate"

    input:
            path(configfile)
    output:
            path(configfile), emit: configoutput
    script:
    """
        echo "configfile:   ${configfile}"
    """
}

workflow {
    configfile = file('/Users/jkern/wip/whh-341/one.json')
    replicate(configfile)
    fireflyConfig = replicate.out.configoutput.flatten()

    
    run_post_stage = fireflyConfig
        .map { runPost(it) } 
        .dump{params -> "runPost: $params"}
        .collect()  // converts from a queue channel to a value channel
        .map { it[0] } // Get the first (and only) element

    run_post_stage.subscribe { runPost ->
        if (runPost) {
            println 'run_post_stage is true'
        } else {
            println 'run_post_stage is false'
        }
    }
}

It works. Here is the output:

% nextflow run  main.nf 

 N E X T F L O W   ~  version 24.10.2

Launching `main.nf` [evil_brenner] DSL2 - revision: 164e551660

executor >  local (1)
[08/e0905b] replicate [100%] 1 of 1 ✔
run_post_stage is false

But it seems awkward. At this point, I’m inclined to stay with the ‘when’ clause in the process. Is there a straightforward way to write this? What about a compound decision in this control flow that also depends on RUN_VEP_ANNOTATION?

-jk

You should use either the .branch or .filter channel operators for implementing conditional logic in workflows.

workflow {
    ch_odds_and_evens = TASK( Channel.of(1..2) )
        .branch { nums ->
            even: nums.toInteger() % 2 == 0
            odd:  nums.toInteger() % 2 == 1
        }
    ch_odds_and_evens.odd.view()
}

process TASK {
    input:
    val num

    script:
    """
    echo $num
    """

    output:
    stdout
}
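
For completeness, here is a minimal sketch of the .filter variant, which the reply mentions but does not show. It is an illustration under simple assumptions (a channel of integers, no process involved), not code from the thread: only the items for which the closure returns true continue downstream.

```groovy
workflow {
    Channel.of(1..4)
        // keep only the even numbers; odd values are dropped from the channel
        .filter { num -> num % 2 == 0 }
        .view()
}
```

Use .filter when items that fail the test can simply be discarded, and .branch when each group needs its own downstream handling.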

Hello Mahesh,

Thanks for taking the time to respond to my inquiry.

Things get interesting not in retrieving the value but in using it. Going back to the original, I had to call it with subscribe, as in my first post.

This works but seems awkward. How is one expected to use it in a principled way? What happens if there is a second boolean? I haven’t run this, but are we expected to write it like…

run_vep_stage.subscribe { runVep -> 
    run_post_stage.subscribe { runPost ->
        if (runPost && runVep) {
            println 'run_post_stage is true'
        } else {
            println 'run_post_stage is false'
        }
    }
}

-jk

So you have a process output which should control whether a downstream process will execute? In that case I would do something like this:

workflow {
    filtered_inputs = fireflyConfig.filter { json -> runPost(json) && runVep(json) }
    DOWNSTREAM_PROC( filtered_inputs )
}
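
If the two flags need to route work to different places rather than just gate a single process, the same idea can be sketched with .branch. This is a hedged illustration, not something posted in the thread: readFlags is a hypothetical helper that parses the JSON once, and the branch names are made up for the example. It assumes one.json from the original question is in the launch directory.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper (not from the thread): parse the config once
// and return all flags as a map.
def readFlags(configFile) {
    return new JsonSlurper().parse(configFile)
}

workflow {
    // Assumption: one.json from the original question is in the launch directory.
    ch_config = Channel.fromPath('one.json')

    ch_routed = ch_config
        // pair each config file with its parsed flags
        .map { json -> [json, readFlags(json)] }
        // each item goes to the first branch whose condition is true
        .branch { json, flags ->
            post_and_vep: flags.RUN_POST_STAGE && flags.RUN_VEP_ANNOTATION
            post_only: flags.RUN_POST_STAGE
            vep_only: flags.RUN_VEP_ANNOTATION
            skipped: true
        }

    ch_routed.post_and_vep.view { json, flags -> "post + VEP: ${json.name}" }
    ch_routed.vep_only.view { json, flags -> "VEP only: ${json.name}" }
}
```

Each branch output is an ordinary channel, so it can be passed to a process in place of the view calls. A process that receives an empty channel is never run.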

Thanks Ben! That is exactly the question.

For those who may be reading this at a later time, I have modified my test case to highlight Ben’s solution.

runVep() is added to main.nf along with a Nextflow process called DOWNSTREAM_PROC.

#!/usr/bin/env nextflow
nextflow.enable.dsl=2

def runPost(fireflyConfig){
    def jsonSlurper = new groovy.json.JsonSlurper()
    def sampleParams = jsonSlurper.parse(fireflyConfig)
    return sampleParams['RUN_POST_STAGE']
}


def runVep(fireflyConfig){
    def jsonSlurper = new groovy.json.JsonSlurper()
    def sampleParams = jsonSlurper.parse(fireflyConfig)
    return sampleParams['RUN_VEP_ANNOTATION']
}

process replicate {
    label "Replicate"

    input:
            path(configfile)
    output:
            path(configfile), emit: configoutput
    script:
    """
        echo "configfile:   ${configfile}"
    """
}

process DOWNSTREAM_PROC {
   label "Downstream_process"

   input:
       val filtered_inputs

   script:
   """
   echo "filtered: ${filtered_inputs}"
   """
}

workflow {
    configfile = file(params.configfilename)
    replicate(configfile)
    fireflyConfig = replicate.out.configoutput.flatten()

    filtered_inputs = fireflyConfig.filter { json -> runPost(json) && runVep(json) }
    DOWNSTREAM_PROC( filtered_inputs )
}

I defined a parameter in the Nextflow config file:

params {
   configfilename = "true.json"
}

The JSON files are:

true.json

{
    "RUN_POST_STAGE": true,
    "RUN_VEP_ANNOTATION": true

}

false.json

{
    "RUN_POST_STAGE": false,
    "RUN_VEP_ANNOTATION": false

}

Then we can run it:

% nextflow run main.nf  --configfilename="true.json"

 N E X T F L O W   ~  version 24.10.2

Launching `main.nf` [big_watson] DSL2 - revision: 57db1af215

executor >  local (2)
[65/d8fb62] replicate           [100%] 1 of 1 ✔
[8a/da0eb9] DOWNSTREAM_PROC (1) [100%] 1 of 1 ✔

% nextflow run main.nf  --configfilename="false.json"

 N E X T F L O W   ~  version 24.10.2

Launching `main.nf` [desperate_shannon] DSL2 - revision: 57db1af215

executor >  local (1)
[b2/f7221d] replicate       [100%] 1 of 1 ✔
[-        ] DOWNSTREAM_PROC -
