How to check if a channel is empty?

I want to check if the channel output of a process is empty. Here is my script:

process PREPROCESS_FASTQ {

    publishDir "${params.workDir}" , mode: 'copy'
    beforeScript 'chmod o+rw .'

    input:
    tuple val(sample_id), val(order_id), path(fastqs), path(frum_fastqs)

    output:
    tuple val(sample_id), val(order_id), path("${sample_id}_a_all.fastq.gz"), path(frum_fastqs), emit: reuse
    tuple val(sample_id), val(order_id), path("${sample_id}_a_all.fastq.gz"), emit: noreuse
    val(frum_fastqs), emit: check

    script:
    """
    zcat ${fastqs} | gzip > ${sample_id}_a_all.fastq.gz
    """
}
workflow {
    combined_fastq = PREPROCESS_FASTQ(frum_files)
    if(combined_fastq.check.ifEmpty(true))
    {
        combined_fastq.reuse.view() 
    }
    else{
        combined_fastq.noreuse.view()
    }
}

Here is my samplesheet:

sample,samplename,orderid,fastq_dir,reference,frum_fastq_dir
sample01_run1,sample01,ord_01,/sample01/b03/*.fastq.gz,,
sample02_run1,sample02,ord_02,/sample02/b03/*.fastq.gz,,/fastq_pass/b01/*fastq.gz

For the first row in the samplesheet, I want the script to generate “combined_fastq.noreuse.view()”, and for the second row of the samplesheet, generate “combined_fastq.reuse.view()”. How do I do this?

There’s an important concept here: you need to be operating on the contents of a channel, not the channel itself. Let me explain…

When you write if(combined_fastq.check.ifEmpty(true)), you are saying the following:

if the channel combined_fastq.check is empty, view the reuse channel

But this doesn’t really make sense, because a channel is an object. It’s a way of chaining processes together, so when you check it in an if statement you are really asking “hey, does this channel exist?”, and of course it does, because you just created it from the process output. If you check the documentation for the ifEmpty operator, you will see that it returns a new channel, which emits the default value (here true) only if the source channel turns out to be empty. So your example code creates a new channel inside the if statement instead of filtering on a criterion, and because that channel object always exists, the first branch is always taken. Darn!
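A minimal sketch of the pitfall, assuming Nextflow DSL2 (the thread uses 23.10.1). `empty_ch` is a hypothetical name, and `Channel.empty()` stands in for a process output that produced nothing. The `if` is evaluated once, while the workflow is being built, and only ever sees a channel object; the `view` line is what actually reacts to whether any items arrived.

```nextflow
workflow {
    // Stand-in for a process output that ends up emitting nothing
    empty_ch = Channel.empty()

    // ifEmpty(true) returns a new channel object; a non-null object is true
    // in Groovy, so this branch is taken even though empty_ch has no items
    if (empty_ch.ifEmpty(true)) {
        println "if-branch taken"
    }

    // Operating on the contents instead: ifEmpty passes items through, or
    // emits the default value (true) once the source finishes with no items
    empty_ch
        .ifEmpty(true)
        .view { "ifEmpty emitted: ${it}" }
}
```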

What you want to do is check the contents of the channel, to see whether each item contains frum_fastqs. If we rephrase your question slightly…

if items in the channel combined_fastq.reuse contain frum_fastqs, view them

Now it becomes clearer what to do. We should look inside the items in the combined_fastq outputs and see whether they include frum_fastqs.

Based on your samplesheet, we can actually do this without running a process at all! Let’s have a go. I’ve saved your example samplesheet into a file input.csv.

workflow {
    samplesheet_ch = Channel.fromPath("input.csv", checkIfExists: true)
        .splitCsv(header: true) // Split the CSV file into individual items

    // Let's get only the samples that have a valid value for frum_fastq_dir 
    samplesheet_ch
        .filter { it.frum_fastq_dir }
        .view()
}

This should write the following to your terminal:

> nextflow run .
N E X T F L O W  ~  version 23.10.1
Launching `./main.nf` [chaotic_cray] DSL2 - revision: 62eb8c2da7
[sample:sample02_run1, samplename:sample02, orderid:ord_02, fastq_dir:/sample02/b03/*.fastq.gz, reference:, frum_fastq_dir:/fastq_pass/b01/*fastq.gz]

The first part is reading the samplesheet and parsing it using the splitCsv operator.

The second part uses filter to remove any samples from the channel that do not have a value for frum_fastq_dir. This leaves us with a single sample, sample02_run1, which has frum_fastqs.
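If you want to try the filter without creating input.csv, here is a self-contained sketch: the two samplesheet rows are written inline as maps, roughly what splitCsv(header: true) produces for them. `hasFrumFastqs` is a hypothetical helper; unlike a bare `it.frum_fastq_dir`, it also treats a whitespace-only cell as empty.

```nextflow
// Hypothetical helper: true when the row has a non-blank frum_fastq_dir
def hasFrumFastqs(row) {
    return row.frum_fastq_dir?.trim() ? true : false
}

workflow {
    // The two samplesheet rows, written inline instead of read from input.csv
    rows_ch = Channel.of(
        [sample: 'sample01_run1', samplename: 'sample01', orderid: 'ord_01',
         fastq_dir: '/sample01/b03/*.fastq.gz', reference: '', frum_fastq_dir: ''],
        [sample: 'sample02_run1', samplename: 'sample02', orderid: 'ord_02',
         fastq_dir: '/sample02/b03/*.fastq.gz', reference: '', frum_fastq_dir: '/fastq_pass/b01/*fastq.gz']
    )

    // Each filter inspects the contents of every item, not the channel itself
    rows_ch.filter { hasFrumFastqs(it) }.view { "has frum_fastqs: ${it.sample}" }
    rows_ch.filter { !hasFrumFastqs(it) }.view { "no frum_fastqs: ${it.sample}" }
}
```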

So how can we use this? Well, it depends on exactly what you want to do, but let’s imagine you want to run PREPROCESS_FASTQ on samples that do not contain frum_fastqs. We could do this:

process PREPROCESS_FASTQ {

    input:
    tuple val(sample_id), val(order_id), path(fastqs)

    output:
    tuple val(sample_id), val(order_id), path("${sample_id}_a_all.fastq.gz")

    script:
    """
    echo "myfastqdatagoeshere" | gzip > ${sample_id}_a_all.fastq.gz
    """
}
workflow {
    samplesheet_ch = Channel.fromPath("input.csv", checkIfExists: true)
        .splitCsv(header: true) // Split the CSV file into individual items

    samplesheet_ch
        // Keep only the samples that do not have a frum_fastq_dir
        .filter { !it.frum_fastq_dir }
        // Use map to make each item fit the input tuple of the process;
        // the keys are the CSV header names (sample, orderid, fastq_dir)
        .map { meta ->
            tuple(meta.sample, meta.orderid, file(meta.fastq_dir, checkIfExists: true))
        }
        .set { for_preprocessing_ch }

    for_preprocessing_ch.view() // I put this here for debugging. It can be removed.
    PREPROCESS_FASTQ(for_preprocessing_ch)
}

In conclusion, it’s possible to check if a channel is empty using ifEmpty; however, I’m not sure this is what you want to achieve. Instead, think about operating on the contents of your channels and using them to connect your processes together and build your pipeline. I hope this helps!
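For completeness, a minimal sketch of using ifEmpty on the contents of a channel, assuming a hypothetical samplesheet in which no row has a frum_fastq_dir. ifEmpty passes items through untouched and emits the default only if none arrive. The default can also be a closure, for example `ifEmpty { error "..." }`, if you would rather stop the run with a message.

```nextflow
workflow {
    // Hypothetical input: the only row has an empty frum_fastq_dir
    rows_ch = Channel.of(
        [sample: 'sample01_run1', frum_fastq_dir: '']
    )

    // An empty string is false in Groovy, so this filter drops the row
    frum_ch = rows_ch.filter { it.frum_fastq_dir }

    // Nothing reaches ifEmpty, so it emits the default message instead
    frum_ch
        .ifEmpty('no samples with frum_fastq_dir')
        .view()
}
```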

This is very helpful, thank you @Adam_Talbot!!

Hi @Adam_Talbot, I have a follow-up question.

This is essentially the pseudocode of what I am trying to do:

channel_1 = Channel.fromPath(params.samplesheet)
    .splitCsv(header: true)
    .filter { it.frum_fastq_dir }

channel_2 = Channel.fromPath(params.samplesheet)
    .splitCsv(header: true)
    .filter { !it.frum_fastq_dir }

if (channel_1) {
    channel_3 = processA(channel_1)
}
else if (channel_2) {
    channel_3 = processB(channel_2)
}

processC(channel_3)

What I am trying to do here is: a) if the frum_fastq_dir column in the samplesheet has values, create channel_1, which is then used in processA; b) if there are no values in the frum_fastq_dir column, create channel_2, which is used in processB; and c) run a third process, processC, that takes channel_3 as input, where channel_3 is collected from the output of processA or processB.

This implementation doesn’t work because even if channel_1 or channel_2 is empty, it still exists, so the if and else if conditions are both true. So how do I implement this logic in Nextflow?

Remember, you are checking the contents of the channels, not the channels themselves.

But there’s something else you need to consider. If a process's input channel is empty, Nextflow will not run that process! That means we don’t need to use an if statement at all!

In this example, I use the branch operator to split the channel into two (this is basically like two filter operators together). Then I run PROCESS_A and PROCESS_B on the resulting channels. Afterwards, I mix the results of both channels so that they are merged into a single channel. If you play with the input samplesheet, you will notice that the executed processes change based on the channel contents. Your conditional logic is defined by your pipeline structure, with no if statements in sight!

process PROCESS_A {
    input:
        val frum_fastq

    output:
        val frum_fastq

    script:
    """
    echo $frum_fastq
    """
}

process PROCESS_B {
    input:
        val frum_fastq

    output:
        val frum_fastq

    script:
    """
    echo $frum_fastq
    """
}

process PROCESS_C {
    input:
        val frum_fastq

    output:
        val frum_fastq

    script:
    """
    echo $frum_fastq
    """
}

workflow {
    samplesheet_ch = Channel.fromPath("input.csv", checkIfExists: true)
        .splitCsv(header: true) // Split the CSV file into individual items

    samplesheet_ch
        // Split samples into with frum_fastq_dir and without frum_fastq_dir
        .branch { 
            frum_fastq: it.frum_fastq_dir
            no_frum_fastq: !it.frum_fastq_dir 
        }
        .set { for_preprocessing_ch }

    // Execute each process independently
    process_a_out = PROCESS_A(for_preprocessing_ch.frum_fastq)
    process_b_out = PROCESS_B(for_preprocessing_ch.no_frum_fastq)

    // Get the results back together
    results = process_a_out.mix(process_b_out)
    process_c_out = PROCESS_C(results)
    process_c_out.view()
}
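To see the “empty channel, no tasks” behaviour without preparing a samplesheet, here is a self-contained sketch of the same branch/mix pattern. The inline rows, the hypothetical `hasFrumFastqs` helper and the use of `true` as a catch-all condition for the last branch are assumptions for illustration. Delete the sample02_run1 row and PROCESS_A simply launches zero tasks, with no if statement involved.

```nextflow
// Hypothetical helper: true when the row has a non-blank frum_fastq_dir
def hasFrumFastqs(row) {
    return row.frum_fastq_dir?.trim() ? true : false
}

process PROCESS_A {
    input:
    val row

    output:
    val row

    script:
    """
    echo "PROCESS_A: ${row.sample}"
    """
}

process PROCESS_B {
    input:
    val row

    output:
    val row

    script:
    """
    echo "PROCESS_B: ${row.sample}"
    """
}

workflow {
    // Inline stand-in for the samplesheet; delete a row to empty one branch
    rows_ch = Channel.of(
        [sample: 'sample01_run1', frum_fastq_dir: ''],
        [sample: 'sample02_run1', frum_fastq_dir: '/fastq_pass/b01/*fastq.gz']
    )

    // Each item goes to the first branch whose condition is true
    rows_ch
        .branch {
            frum_fastq: hasFrumFastqs(it)
            no_frum_fastq: true    // catch-all for every other row
        }
        .set { split_ch }

    // A process whose input channel is empty launches no tasks
    PROCESS_A(split_ch.frum_fastq)
        .mix(PROCESS_B(split_ch.no_frum_fastq))
        .view { "done: ${it.sample}" }
}
```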

I tried that, and this is the error I get: Multi-channel output cannot be applied to operator mix for which argument is already provided

You need to make sure you apply mix to one of the branches of the split channel, like so:

for_preprocessing_ch.frum_fastq.mix(etc) // THIS
for_preprocessing_ch                     // NOT THIS

Or, if you are dealing with the outputs of a process, make sure you apply it to one named output inside out, not to out itself. For example:

PROCESS_A.out.frum_fastq  // THIS
PROCESS_A.out             // NOT THIS
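A minimal sketch of where a name like `.out.frum_fastq` comes from, assuming a hypothetical PROCESS_A that declares two outputs. `emit:` gives each output channel a name; once a process has more than one output, `PROCESS_A.out` holds several channels, so pick one by name before calling mix.

```nextflow
process PROCESS_A {
    input:
    val row

    output:
    val row, emit: frum_fastq
    path "${row.sample}.txt", emit: report

    script:
    """
    echo "processed ${row.sample}" > ${row.sample}.txt
    """
}

process PROCESS_B {
    input:
    val row

    output:
    val row, emit: frum_fastq

    script:
    """
    echo "processed ${row.sample}"
    """
}

workflow {
    PROCESS_A(Channel.of([sample: 'sample02_run1']))
    PROCESS_B(Channel.of([sample: 'sample01_run1']))

    // PROCESS_A.out.mix(...) would hit the multi-channel error because
    // PROCESS_A has two outputs; select one of them by its emit name first
    PROCESS_A.out.frum_fastq
        .mix(PROCESS_B.out.frum_fastq)
        .view { it.sample }
}
```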