Handle NA and use branch operator - then send to process

I posted a thread earlier (link at the end), but it has become messy, so I’m creating a new one.

The input data has multiple comma-separated columns:

batch timepoint normal_WES_R1 normal_WES_R2 tumor_WES_R1 tumor_WES_R2 RNA_R1 RNA_R2
batch1 sample11 /path/wes/wes_normal_sample11_R1.fastq.gz /path/wes/wes_normal_sample11_R2.fastq.gz /path/wes/wes_tumor_sample11_R1.fastq.gz /path/wes/wes_tumor_sample11_R2.fastq.gz /path/rna/rna_tumor_sample11_R1.fastq.gz /path/rna/rna_tumor_sample11_R2.fastq.gz
batch1 sample1 /path/wes/wes_normal_sample1_R1.fastq.gz /path/wes/wes_normal_sample1_R2.fastq.gz /path/wes/wes_tumor_sample1_R1.fastq.gz /path/wes/wes_tumor_sample1_R2.fastq.gz /path/rna/rna_tumor_sample1_R1.fastq.gz /path/rna/rna_tumor_sample1_R2.fastq.gz
batch2 sample2 /path/wes/wes_normal_sample2_R1.fastq.gz /path/wes/wes_normal_sample2_R2.fastq.gz /path/wes/wes_tumor_sample2_R1.fastq.gz /path/wes/wes_tumor_sample2_R2.fastq.gz /path/rna/rna_tumor_sample2_R1.fastq.gz /path/rna/rna_tumor_sample2_R2.fastq.gz
batch3 sample3 /path/wes/wes_normal_sample3_R1.fastq.gz /path/wes/wes_normal_sample3_R2.fastq.gz /path/wes/wes_tumor_sample3_R1.fastq.gz /path/wes/wes_tumor_sample3_R2.fastq.gz NA NA
batch3 sample4 /path/wes/wes_normal_sample4_R1.fastq.gz /path/wes/wes_normal_sample2_R4.fastq.gz /path/wes/wes_tumor_sample4_R1.fastq.gz /path/wes/wes_tumor_sample4_R2.fastq.gz /path/rna/rna_tumor_sample4_R1.fastq.gz /path/rna/rna_tumor_sample4_R2.fastq.gz

That is, the header is: batch,timepoint,normal_WES_R1,normal_WES_R2,tumor_WES_R1,tumor_WES_R2,RNA_R1,RNA_R2

The seventh and eighth columns can be missing (NA), because not every sample has RNA sequenced.

There are two workflows: RNA and WES based on data type.

There are two scenarios:

  1. Process WES (normal and tumor) regardless of whether RNA is present.

  2. Process RNA when RNA data are available. In the given example, batch3,sample3 will be skipped for RNA, but I still need to process the WES (tumor and normal) files for this sample.

I’ve tried the following code, which uses map and branch:


def createTupleOrString(fileString) {
  if (fileString == "NA") {
      return "NA"
  } else {
      return file(fileString)
  }
}

    Channel.fromPath(file("temp_timestamp_NA.csv"))  
        .splitCsv(sep: ',').map{ row -> 
            // Extract relevant information
            def batch_info = row[0]
            def time_point=row[1]
            def normal_reads = tuple((row[2]),(row[3]))
            def tumor_reads = tuple((row[4]), (row[5]))
            def rna_reads = tuple(createTupleOrString(row[6]), createTupleOrString(row[7]))           
            
                         [
            [type: "tumor", data: tumor_reads],
            [type: "normal", data: normal_reads],
            [type: "rna", data: rna_reads]
        ]
             
        }.branch { type, reads ->
             tumor: type == "tumor"
        normal: type == "normal"
        rna: type == "rna" && reads.data[0] != 'NA'
    }.set {hello}

   // hello.tumor.view { "$it is a tumor" }

   hello.rna | view { "Tumor: $it"}

I get this error:

ERROR ~ Invalid method invocation doCall with arguments:

I cannot view the hello variable that was created.

I also do not know how the batch_info and time_point metadata are stored and passed along, because I’m unable to see the content of hello.

Sorry for a redundant post.

Link to the older post: How to handle NA in file/path in map and process?

Question 1: How do I check a null value in Nextflow?

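Nextflow (through Groovy) uses null to mean ‘does not exist’. Under Groovy truth, null, 0, an empty string and false all evaluate as false, so any of them can stand in for a missing value in most use cases. In your case, you can test whether the string equals NA and return null if it does, like so: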

def createTupleOrString(fileString) {
    return fileString != "NA" ? fileString : null
}

The if-else statement here is written with the ternary operator; see the Operators page of the Apache Groovy documentation.
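To make the null check concrete, here is a minimal sketch in plain Groovy. The helper name naToNull and the fallback text are illustrative assumptions, not part of the original code. Note that the string "NA" is itself truthy, which is why it has to be replaced before it can be tested this way.

```groovy
// Hypothetical helper (the name is illustrative): turn the "NA" placeholder into null
def naToNull(value) {
    return value != "NA" ? value : null
}

def missing = naToNull("NA")
def present = naToNull("/path/rna/rna_tumor_sample1_R1.fastq.gz")

// Groovy truth: null is falsy, a non-empty string is truthy
assert !missing
assert present

// An empty string, 0 and false are falsy as well
assert !"" && !0 && !false

// The Elvis operator (?:) supplies a fallback when the left side is falsy
println(missing ?: "no RNA for this sample")
```

Because the result is either a usable value or null, it can be used directly as a condition, for example in a branch or filter closure.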

Question 2: How do I parse the samplesheet so I can process RNA if it exists but ignore it if not?

Using the above, test for whether an RNA FASTQ exists:

workflow {
    Channel.fromPath("original.csv")
        .splitCsv(header: true)
        // Replace NA with null. 
        // Adding maps together replaces the values
        .map { it -> 
            it + [
                RNA_R1: it.RNA_R1 != "NA" ? it.RNA_R1 : null,
                RNA_R2: it.RNA_R2 != "NA" ? it.RNA_R2 : null,
            ]
        }
        // Branch based on whether the first read of the RNA exists
        .branch { it ->
            rna: it.RNA_R1
            other: !it.RNA_R1
        }
        .set { input_ch }
    
    input_ch.rna.view { "${it.batch}:${it.timepoint} has an RNA sample!" }
    input_ch.other.view { "${it.batch}:${it.timepoint} does not have an RNA sample!" }
}

You now have two channels to handle: one for samples with RNA and one for samples without.

Question 3: How do we make this all a bit easier?

However, your initial data format is quite awkward, which makes the input tricky to handle. Instead of a wide format (one row per sample, with one column per file), I would use a long format (one row per read pair), like so:

batch timepoint tissue sequencing_type fastq_1 fastq_2
batch1 sample1 germline wes /path/wes/wes_normal_sample1_R1.fastq.gz /path/wes/wes_normal_sample1_R2.fastq.gz
batch1 sample1 tumor wes /path/wes/wes_normal_sample1_R1.fastq.gz /path/wes/wes_normal_sample1_R2.fastq.gz
batch1 sample1 rna rna /path/rna/rna_tumor_sample1_R1.fastq.gz /path/rna/rna_tumor_sample1_R2.fastq.gz
batch3 sample3 germline wes /path/wes/wes_normal_sample3_R1.fastq.gz /path/wes/wes_normal_sample3_R2.fastq.gz
batch3 sample3 tumor wes /path/wes/wes_tumor_sample3_R1.fastq.gz /path/wes/wes_tumor_sample3_R2.fastq.gz

I’ve only included two samples in this example to keep it simple. Now our code looks way simpler to handle, because all we need to do is branch on tissue and sequencing type:

workflow {
    Channel.fromPath("input.csv")
        .splitCsv(header: true)
        .branch { it ->
            rna: it.tissue == "rna" && it.sequencing_type == "rna"
            germline: it.tissue == "germline" && it.sequencing_type == "wes"
            tumor: it.tissue == "tumor" && it.sequencing_type == "wes"
            other: true
        }
        .set { input_ch }

    input_ch.rna.view { "${it.batch}:${it.timepoint} has an RNA sample!" }
    input_ch.germline.view { "${it.batch}:${it.timepoint} has a germline sample!" }
    input_ch.tumor.view { "${it.batch}:${it.timepoint} has a tumor sample!" }
    input_ch.other.view { "${it.batch}:${it.timepoint} is something else!" }
}

Now, the germline, tumor and RNA-Seq samples can be analysed separately.

If we want to analyse samples together, we can rejoin them using join or combine followed by filter.

workflow {
    Channel.fromPath("input.csv")
        .splitCsv(header: true)
        // Let's create a map as the first value and put the fastqs later in the tuple:
        .map { it ->
            [
                it.subMap("batch", "timepoint", "tissue", "sequencing_type"),
                [
                    file(it.fastq_1),
                    file(it.fastq_2)
                ]
            ]
        }
        .branch { meta, fastq ->
            rna: meta.tissue == "rna" && meta.sequencing_type == "rna"
            germline: meta.tissue == "germline" && meta.sequencing_type == "wes"
            tumor: meta.tissue == "tumor" && meta.sequencing_type == "wes"
            other: true
        }
        .set { input_ch }

    // Combine germline and tumor samples based on ID
    input_ch.germline
        // Pair every germline sample with every tumor sample using combine
        .combine(input_ch.tumor)
        // Filter to only the ones where batch and timepoint are the same
        .filter { germline_meta, germline_fastq, tumor_meta, tumor_fastq ->
            germline_meta.batch == tumor_meta.batch && germline_meta.timepoint == tumor_meta.timepoint
        }
        .view() // Should see a channel of [germline_info, germline_fastqs, tumor_info, tumor_fastqs]
}
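The same pairing could also be sketched with join instead of combine followed by filter. This is only a sketch under some assumptions: each batch/timepoint pair occurs at most once per tissue, and withSampleKey is a hypothetical helper name. By default, join matches items on their first element and drops items that have no partner in the other channel.

```groovy
// Hypothetical helper: prepend a [batch, timepoint] key so samples can be matched on it
def withSampleKey(meta, fastq) {
    return [[meta.batch, meta.timepoint], meta, fastq]
}

workflow {
    Channel.fromPath("input.csv")
        .splitCsv(header: true)
        .map { row ->
            [
                row.subMap("batch", "timepoint", "tissue", "sequencing_type"),
                [file(row.fastq_1), file(row.fastq_2)]
            ]
        }
        .branch { meta, fastq ->
            germline: meta.tissue == "germline" && meta.sequencing_type == "wes"
            tumor: meta.tissue == "tumor" && meta.sequencing_type == "wes"
            other: true
        }
        .set { input_ch }

    germline_keyed = input_ch.germline.map { meta, fastq -> withSampleKey(meta, fastq) }
    tumor_keyed = input_ch.tumor.map { meta, fastq -> withSampleKey(meta, fastq) }

    // join matches items on their first element (the key) by default
    germline_keyed
        .join(tumor_keyed)
        // Drop the key again: [germline_meta, germline_fastq, tumor_meta, tumor_fastq]
        .map { key, germline_meta, germline_fastq, tumor_meta, tumor_fastq ->
            [germline_meta, germline_fastq, tumor_meta, tumor_fastq]
        }
        .view()
}
```

Compared with combine plus filter, this avoids building every germline/tumor combination first, which may matter for larger sample sheets.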

I’ve attached the CSV files I’ve used here.
input.csv (589 Bytes)
original.csv (1.3 KB)

@Adam_Talbot
Gosh, this is difficult. Thank you for your detailed reply.

A few things:

  1. I used your code up to the branch step (without the combine). Please see below how I use it.

My concern: when I send the channels to a process, I have to accept all the values, including tissue and sequencing type in this instance, which I think are extraneous. How can I skip tissue and seq_type in the input?
If I do not accept them, I get a cardinality error.

process wes_test {

    input:
    tuple val(normal_batch),val(normal_timepoint),val(normal_tissue),val(normal_seq_type),val(normal_read1),val(normal_read2)
    tuple val(tumor_batch),val(tumor_timepoint),val(tumor_tissue),val(tumor_seq_type),val(tumor_read1),val(tumor_read2)
    output: stdout
    script:
    """
echo ${normal_batch}, ${normal_timepoint}, ${normal_read1}, ${normal_read2},${normal_seq_type},${normal_tissue}
echo ${tumor_batch}, ${tumor_timepoint}, ${tumor_read1}, ${tumor_read2},${tumor_seq_type},${tumor_tissue}
    """ 
}

process test {

    input:
    tuple val(batch),val(timepoint),val(tissue),val(seq_type),val(read1),val(read2)
    output: stdout
    script:
    """
echo ${batch}, ${timepoint}, ${read1}, ${read2},${seq_type},${tissue}
    """ 
}

workflow {

Channel.fromPath("long_format_data.csv")
        .splitCsv(header: true)
        .branch { it ->
            rna: it.tissue == "rna" && it.sequencing_type == "rna"
            normal: it.tissue == "normal" && it.sequencing_type == "wes"
            tumor: it.tissue == "tumor" && it.sequencing_type == "wes"
        }
        .set { input_ch }
    
        test(input_ch.rna).view()
    wes_test(input_ch.normal,input_ch.tumor).view()        
}

  2. Can you please help me fix the error in my code (original post)? I’d still like to print the values.

Thank you.

My example 2 above should have fixed your error.
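For reference, a likely cause of the doCall error in the original post: the map closure returns one list holding three maps, so every channel item has three elements, while the branch closure declares two parameters (type, reads). The batch and timepoint values are read but never placed in the returned list, which is why they cannot be seen downstream. Below is a minimal sketch of one way around both points. It assumes the CSV has the header row shown in the original post, and expandRow is a hypothetical helper name.

```groovy
// Hypothetical helper: turn one wide row into three [meta, reads] items
def expandRow(row) {
    def meta = [batch: row.batch, timepoint: row.timepoint]
    return [
        [meta + [type: "tumor"],  [row.tumor_WES_R1,  row.tumor_WES_R2]],
        [meta + [type: "normal"], [row.normal_WES_R1, row.normal_WES_R2]],
        [meta + [type: "rna"],    [row.RNA_R1,        row.RNA_R2]]
    ]
}

workflow {
    Channel.fromPath("temp_timestamp_NA.csv")
        .splitCsv(sep: ',', header: true)
        // flatMap emits each [meta, reads] pair as its own channel item
        .flatMap { row -> expandRow(row) }
        // Every item has two elements, so a two-parameter closure matches
        .branch { meta, reads ->
            tumor:  meta.type == "tumor"
            normal: meta.type == "normal"
            rna:    meta.type == "rna" && reads[0] != "NA"
        }
        .set { hello }

    hello.rna.view { item -> "RNA: ${item}" }
    hello.tumor.view { item -> "Tumor: ${item}" }
}
```

Rows whose RNA columns are NA match none of the branch conditions for their rna item, so that item is dropped, while the tumor and normal items for the same sample still go through.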

Regarding tissue and sequencing type: these are just examples, and you can put anything in that CSV. If you don’t want that information, I would just remove the columns from the input table.

If you wanted to keep them in the channel but ignore them in the process, you could leave them in the input tuple and simply not use them in the script block. They aren’t causing any harm in there.
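If the extra columns should not reach the process at all, another option is to reshape each row with map before the call, so that the tuple matches the process input exactly. This is a minimal sketch that assumes the long-format input.csv from the example above; toRnaInput and RNA_TEST are hypothetical names, and the reads are passed as plain values so the sketch runs without the files being present.

```groovy
// Hypothetical helper: keep only the fields the process declares
def toRnaInput(row) {
    return [row.batch, row.timepoint, row.fastq_1, row.fastq_2]
}

process RNA_TEST {
    input:
    tuple val(batch), val(timepoint), val(read1), val(read2)

    output:
    stdout

    script:
    """
    echo ${batch}, ${timepoint}, ${read1}, ${read2}
    """
}

workflow {
    Channel.fromPath("input.csv")
        .splitCsv(header: true)
        .filter { row -> row.tissue == "rna" && row.sequencing_type == "rna" }
        // Four elements out, four elements declared in the process input
        .map { row -> toRnaInput(row) }
        .set { rna_ch }

    RNA_TEST(rna_ch).view()
}
```

For real files, the reads would typically be wrapped in file() inside the helper and declared with path(...) in the process input.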

It’s still not clear what you are trying to achieve here. Do you want to analyse the normal and tumour samples together and RNA-Seq data separately? Are you planning on combining the results?

@Adam_Talbot Thank you for your response.
I have trios: normal, tumor and RNA.
RNA is not always available. I don’t have normal for tumor, just the tumor.

I run normal and tumor together for WES, and RNA separately.

I don’t know how WES and RNA are combined later, but they are.

So the first step is to get WES: normal-tumor going, and RNA going.

This is exactly what I do in my second example. In this code I do the following:

  • Combine the germline and tumour sample channels (all-by-all)
  • Filter to only the ones where the sample matches
  • We now have a channel of germline and tumour data, ready for analysis

    // Combine germline and tumor samples based on ID
    input_ch.germline
        // Pair every germline sample with every tumor sample using combine
        .combine(input_ch.tumor)
        // Filter to only the ones where batch and timepoint are the same
        .filter { germline_meta, germline_fastq, tumor_meta, tumor_fastq ->
            ( germline_meta.batch == tumor_meta.batch ) && ( germline_meta.timepoint == tumor_meta.timepoint )
        }
        .dump(tag: 'analysis', pretty: true)

Each item will now be a tuple of germline_meta, germline_fastq, tumour_meta, tumour_fastq. Note that dump only prints when the pipeline is run with the -dump-channels option. The items look like this:

[
    {
        "batch": "batch1",
        "timepoint": "sample1",
        "tissue": "germline",
        "sequencing_type": "wes"
    },
    [
        "/path/wes/wes_normal_sample1_R1.fastq.gz",
        "/path/wes/wes_normal_sample1_R2.fastq.gz"
    ],
    {
        "batch": "batch1",
        "timepoint": "sample1",
        "tissue": "tumor",
        "sequencing_type": "wes"
    },
    [
        "/path/wes/wes_normal_sample1_R1.fastq.gz",
        "/path/wes/wes_normal_sample1_R2.fastq.gz"
    ]
]
[
    {
        "batch": "batch3",
        "timepoint": "sample3",
        "tissue": "germline",
        "sequencing_type": "wes"
    },
    [
        "/path/wes/wes_normal_sample3_R1.fastq.gz",
        "/path/wes/wes_normal_sample3_R2.fastq.gz"
    ],
    {
        "batch": "batch3",
        "timepoint": "sample3",
        "tissue": "tumor",
        "sequencing_type": "wes"
    },
    [
        "/path/wes/wes_tumor_sample3_R1.fastq.gz",
        "/path/wes/wes_tumor_sample3_R2.fastq.gz"
    ]
]

You can use this as an input in a process:

input:
    tuple val(germline_meta), path(germline_fastq), val(tumour_meta), path(tumour_fastq)
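
As a fuller sketch of how that input line might sit in a pipeline: the process name WES_PAIR, the helper sameSample and the stageAs directories are assumptions for illustration, and the FASTQ files listed in input.csv must exist for the task to run.

```groovy
// Hypothetical helper: true when both metadata maps describe the same sample
def sameSample(a, b) {
    return a.batch == b.batch && a.timepoint == b.timepoint
}

process WES_PAIR {
    input:
    // Each read pair is staged in its own directory to avoid file name clashes
    tuple val(germline_meta), path(germline_fastq, stageAs: 'germline/*'), val(tumour_meta), path(tumour_fastq, stageAs: 'tumour/*')

    output:
    stdout

    script:
    """
    echo "sample: ${germline_meta.batch} ${germline_meta.timepoint}"
    echo "germline reads: ${germline_fastq}"
    echo "tumour reads: ${tumour_fastq}"
    """
}

workflow {
    Channel.fromPath("input.csv")
        .splitCsv(header: true)
        .map { row ->
            [row.subMap("batch", "timepoint", "tissue", "sequencing_type"), [file(row.fastq_1), file(row.fastq_2)]]
        }
        .branch { meta, fastq ->
            germline: meta.tissue == "germline" && meta.sequencing_type == "wes"
            tumor: meta.tissue == "tumor" && meta.sequencing_type == "wes"
        }
        .set { input_ch }

    input_ch.germline
        .combine(input_ch.tumor)
        .filter { germline_meta, germline_fastq, tumour_meta, tumour_fastq ->
            sameSample(germline_meta, tumour_meta)
        }
        .set { paired_ch }

    // One channel in, one matched tuple per task
    WES_PAIR(paired_ch).view()
}
```

Passing a single paired channel matters here: when a process is given two separate channels, items are paired by the order in which they arrive rather than by sample, so matching them up front keeps germline and tumour reads from different samples apart.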

@Adam_Talbot
Thank you for your replies.

I think I need to use your combine/filter strategy; it seems that when I resume, files get crossed between samples at random.

I cannot get the following working:

   input_ch.normal
        // Mix all samples using combine
        .combine(input_ch.tumor)
        // Filter to only the ones where batch and timepoint are the same
        .filter { normal_meta, normal_fastq, tumor_meta, tumor_fastq ->
            (normal_meta.batch == tumor_meta.batch) && (normal_meta.timepoint == tumor_meta.timepoint)
        }
        .dump(tag: 'analysis', pretty: true)

I get error:

ERROR ~ Invalid method invocation call with arguments: [[batch:SEMA-MM-001, timepoint:MM-3309-T-01, tissue:normal, sequencing_type:wes, fastq_1:/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-3309-DNA-N-01-01_L001_R1_001.fastq.gz, fastq_2:/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-3309-DNA-N-01-01_L001_R2_001.fastq.gz], [batch:SEMA-MM-001, timepoint:MM-3309-T-01, tissue:tumor, sequencing_type:wes, fastq_1:/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-3309-DNA-T-01-01_L001_R1_001.fastq.gz, fastq_2:/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-3309-DNA-T-01-01_L001_R2_001.fastq.gz]] (java.util.LinkedList) on _closure9 type

-- Check '.nextflow.log' file for details

I am on N E X T F L O W ~ version 23.10.0
Let me know if any other details are needed.
Thank you again~

There appears to be an error in your code. The message shows each item as two plain maps that still contain fastq_1 and fastq_2, which suggests the map step that builds [meta, fastq] was left out, so the four-parameter filter closure does not match. I’m afraid there’s only so far I can go with this unless you supply the full code base.

The answer to the initial question is that you must separate your samples into channels. Filter, separate and branch until each sample type is in its own channel, then pass that channel into the relevant process. This can be a tricky concept for people unfamiliar with functional programming, but once you get it, Nextflow becomes fluid and easy.

@Adam_Talbot
Please see below the full code base:

Channel.fromPath("long_format_data.csv")
        .splitCsv(header: true).map { it ->
            [
                it.subMap("batch", "timepoint", "tissue", "sequencing_type"),
                [
                    file(it.fastq_1),
                    file(it.fastq_2)
                ]
            ]
        }
        .branch { meta, fastq ->
            rna: meta.tissue == "rna" && meta.sequencing_type == "rna"
            germline: meta.tissue == "normal" && meta.sequencing_type == "wes"
            tumor: meta.tissue == "tumor" && meta.sequencing_type == "wes"
            other: true
        }
        .set { input_ch }

input_ch.germline
        // Mix all samples using combine
        .combine(input_ch.tumor)
        // Filter to only the ones where batch and timepoint are the same
        .filter { germline_meta, germline_fastq, tumor_meta, tumor_fastq ->
            ( germline_meta.batch == tumor_meta.batch ) && ( germline_meta.timepoint == tumor_meta.timepoint )
        }
        .dump(tag: 'analysis', pretty: true)

I’m using attached file as an input.
long_format_data.csv (11.8 KB)

It works without errors at this time.
However, I cannot see/print anything despite using dump; view works.

Second question: how do I send them to a process after the combine/filter step? I get a cardinality error:

FASTP(input_ch.tumor, input_ch.germline)


process FASTP {
	conda '/data1/software/miniconda/envs/MMRADAR/'
	maxForks 5
	debug true
	errorStrategy 'retry'
    maxRetries 2
 label 'low_mem'

	publishDir path: "${params.outdir}/${batch}/${timepoint}/WES/primary/fastp/normal/", mode: 'copy', pattern: '*_N*'
    publishDir path: "${params.outdir}/${batch}/${timepoint}/WES/primary/fastp/tumor/", mode: 'copy', pattern: '*_T*'

    input:	
    tuple val(batch),val(timepoint),val(tissue),val(seq_type),path(tumor_reads,stageAs:'fastp_reads/*')
	//tuple val(batch),val(timepoint),val(tissue),val(seq_type),path(tumor_read1,stageAs:'fastp_reads/*'),path(tumor_read2,stageAs:'fastp_reads/*')
	tuple val(batch),val(timepoint),val(tissue),val(seq_type),path(normal_reads,stageAs:'fastp_reads/*')
	//tuple val(batch),val(timepoint),val(tissue),val(seq_type),path(normal_read1,stageAs:'fastp_reads/*'),path(normal_read2,stageAs:'fastp_reads/*')

	output:
	tuple val(batch),val(patient_id_tumor), val(timepoint), path("${patient_id_tumor}_trim_{1,2}.fq.gz"), emit: reads_tumor
	path("${patient_id_tumor}.fastp.json"), emit: json_tumor
	path("${patient_id_tumor}.fastp.html"), emit: html_tumor
	
	tuple val(batch),val(patient_id_normal), val(timepoint),path("${patient_id_normal}_trim_{1,2}.fq.gz"), emit: reads_normal
	path("${patient_id_normal}.fastp.json"), emit: json_normal
	path("${patient_id_normal}.fastp.html"), emit: html_normal
	
    script:
	patient_id_normal=timepoint+"_N"
	patient_id_tumor=timepoint+"_T"
	def(r1_normal, r2_normal)=normal_reads
	def(r1_tumor,r2_tumor)=tumor_reads

"""

fastp  --in1 "${r1_tumor}" --in2 "${r2_tumor}" -q 20  -u 20 -l 40 --detect_adapter_for_pe --out1 "${patient_id_tumor}_trim_1.fq.gz" \
--out2 "${patient_id_tumor}_trim_2.fq.gz" --json "${patient_id_tumor}.fastp.json" \
--html "${patient_id_tumor}.fastp.html" --thread 10

fastp  --in1 "${r1_normal}" --in2 "${r2_normal}" -q 20  -u 20 -l 40 --detect_adapter_for_pe --out1 "${patient_id_normal}_trim_1.fq.gz" \
--out2 "${patient_id_normal}_trim_2.fq.gz" --json "${patient_id_normal}.fastp.json" \
--html "${patient_id_normal}.fastp.html" --thread 10 

   """
}

workflow.onComplete { 
	log.info ( workflow.success ? "completed fastp primary WES!" : "Oops .. something went wrong in fastp primary WES")
}

Error:


WARN: Input tuple does not match input set cardinality declared by process `wes:FASTP` -- offending value: [[batch:SEMA-MM-001, timepoint:MM-0473-T-02, tissue:tumor, sequencing_type:wes], [/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-0473-DNA-T-02-01_L001_R1_001.fastq.gz, /data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-0473-DNA-T-02-01_L001_R2_001.fastq.gz]]
ERROR ~ Error executing process > 'wes:FASTP (7)'

Caused by:
  Path value cannot be null