Conditional workflow execution

Hi, I have a Nextflow pipeline where the contents of one of the output files in the first process determines what subsequent processes run. The first process is checking something about its inputs, and its output is a file that says either “passed” or “failed”; the idea is that if this check doesn’t succeed, there’s no point in running any subsequent time-consuming analyses. I wrote code that looks at the output file contents and sets process execution with an if statement, but the Boolean check doesn’t seem to be working. In the code below, the line with check_text.contains("passed") always returns true regardless of input. I’d appreciate any help to understand why I’m not getting the intended result.

main:
# Run a process
process1(...)

# Retrieve contents of process1 output file
check_text = process1.out.check.map{ it -> file(it).text }

if (check_text.contains("passed")) { // <-- This always returns "true"
    // Run this set of processes
} else {
    // Run that set of processes
}

You should use the .branch channel operator for this kind of thing.

This doesn’t work because you’re using a function that shouldn’t work on a channel (I’m rather confused as why it’s working. It should error)

This example does what you want. Here branch is used to check the value of status

workflow {
    QC_TASK()
    qced_samples = QC_TASK.out.csv
        .splitCsv( header: ['id', 'status'], sep: ',' )
        .branch { sample ->
            pass: sample.status == 'pass'
            fail: sample.status == 'fail'
        }
    qced_samples.pass
        .view()
}

process QC_TASK {
    script:
    """
    cat <<-EOF > pass_fail.csv
    sample1,pass
    sample2,fail
    sample3,pass
    EOF
    """

    output:
    path "pass_fail.csv", emit: csv
}

Mahesh’s answer is correct and contains a good suggestion for a possible fix. Here’s some additional context that might be helpful in understanding that why your code doesn’t work.

When you are working with channels within a subworkflow, it helps to keep in mind that you are not executing on a specific sample, but are setting up routes for execution.

In your example the check_text object is not a specific file contents, but it is a channel of file contents. Along with that check_text.contains("passed") is not a specific boolean, but it is a channel of booleans. When your if statements checks the truthiness of check_text.contains("passed") it always returns true because the channel exists. (Honestly, this probably should at least return a warning instead.)

Here’s an adaptation of Mahesh’s suggestion for a single file with PASS or fail"

workflow {
    QC_TASK()
    qced_samples = QC_TASK.out.check
 
        .branch { id, qc_check ->
            pass: qc_check.text.contains("passed")
            fail: qc_check.text.contains("failed")
        }
    qced_samples.pass
        .view()
}

process QC_TASK {

    input:
    tuple(val, sample_id, path reads)

    output:
    tuple(val sample_id, path qc_results)

    script:
    """
    cat <<-EOF > check_qc.txt
    passed
    EOF
    """

    output:
    path "check_qc.txt", emit: check
}

Resurrecting an old thread here, but I like a pattern where you filter based on the contents rather than use branch.

process1.out.check
    .filter { it -> file(it).text.contains("passed") }
    .set { pass_samples }

process1.out.check
    .filter { it -> file(it).text.contains("failed") }
    .set { fail_samples }

// Proceed with your workflow...
process2(pass_samples)
process3(fail_samples)