How to send combined channel to a process || facing cardinality issue

Hi developers,

I’m using WES and RNA data.
There are several information: batch, timepoint, tissue, sequencing type associated with input FASTQ files.

I use the following code to read the CSV (attached) to aggregate different data types:

Channel.fromPath("long_format_data.csv")
        .splitCsv(header: true).map { it ->
            [
                it.subMap("batch", "timepoint", "tissue", "sequencing_type"),
                [
                    file(it.fastq_1),
                    file(it.fastq_2)
                ]
            ]
        }
        .branch { meta, fastq ->
            rna: meta.tissue == "rna" && meta.sequencing_type == "rna"
            germline: meta.tissue == "normal" && meta.sequencing_type == "wes"
            tumor: meta.tissue == "tumor" && meta.sequencing_type == "wes"
            other: true
        }
        .set { input_ch }

input_ch.germline
        // Mix all samples using combine
        .combine(input_ch.tumor)
        // Filter to only the ones where batch and timepoint are the same
        .filter { germline_meta, germline_fastq, tumor_meta, tumor_fastq ->
            ( germline_meta.batch == tumor_meta.batch ) && ( germline_meta.timepoint == tumor_meta.timepoint )
        }

It’s fine, however, I do not know how to send/accept this in a process.
I get errors for carinality.

Please see below process:


process FASTP {
	conda '/data1/software/miniconda/envs/MMRADAR/'
	maxForks 5
	debug true
	errorStrategy 'retry'
    maxRetries 2
 label 'low_mem'

	publishDir path: "${params.outdir}/${batch}/${timepoint}/WES/primary/fastp/normal/", mode: 'copy', pattern: '*_N*'
    publishDir path: "${params.outdir}/${batch}/${timepoint}/WES/primary/fastp/tumor/", mode: 'copy', pattern: '*_T*'

    input:	
    //tuple val(batch),val(timepoint),val(tissue),val(seq_type),path(tumor_reads,stageAs:'fastp_reads/*')
	tuple val(batch),val(timepoint),val(tissue),val(seq_type),path(tumor_read1,stageAs:'fastp_reads/*'),path(tumor_read2,stageAs:'fastp_reads/*')
	//tuple val(batch),val(timepoint),val(tissue),val(seq_type),path(normal_reads,stageAs:'fastp_reads/*')
	tuple val(batch),val(timepoint),val(tissue),val(seq_type),path(normal_read1,stageAs:'fastp_reads/*'),path(normal_read2,stageAs:'fastp_reads/*')

	output:
	tuple val(batch),val(patient_id_tumor), val(timepoint), path("${patient_id_tumor}_trim_{1,2}.fq.gz"), emit: reads_tumor
	path("${patient_id_tumor}.fastp.json"), emit: json_tumor
	path("${patient_id_tumor}.fastp.html"), emit: html_tumor
	
	tuple val(batch),val(patient_id_normal), val(timepoint),path("${patient_id_normal}_trim_{1,2}.fq.gz"), emit: reads_normal
	path("${patient_id_normal}.fastp.json"), emit: json_normal
	path("${patient_id_normal}.fastp.html"), emit: html_normal
	
    script:
	patient_id_normal=timepoint+"_N"
	patient_id_tumor=timepoint+"_T"

    """


fastp  --in1 "${tumor_read1}" --in2 "${tumor_read2}" -q 20  -u 20 -l 40 --detect_adapter_for_pe --out1 "${patient_id_tumor}_trim_1.fq.gz" \
--out2 "${patient_id_tumor}_trim_2.fq.gz" --json "${patient_id_tumor}.fastp.json" \
--html "${patient_id_tumor}.fastp.html" --thread 10

fastp  --in1 "${normal_read1}" --in2 "${normal_read2}" -q 20  -u 20 -l 40 --detect_adapter_for_pe --out1 "${patient_id_normal}_trim_1.fq.gz" \
--out2 "${patient_id_normal}_trim_2.fq.gz" --json "${patient_id_normal}.fastp.json" \
--html "${patient_id_normal}.fastp.html" --thread 10 

   """
}


Can you please help me with this? I’ve tried with collect/flatMap but nothing solves the issue.

WARN: Input tuple does not match input set cardinality declared by process wes:FASTP – offending value: [[batch:SEMA-MM-001, timepoint:MM-0473-T-02, tissue:tumor, sequencing_type:wes], [/data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-0473-DNA-T-02-01_L001_R1_001.fastq.gz, /data1/raw_data/WES/sema4/SEMA-MM-001DNA/MM-0473-DNA-T-02-01_L001_R2_001.fastq.gz]]
ERROR ~ Error executing process > ‘wes:FASTP (7)’

Caused by:
Path value cannot be null

Please see attached file:
long_format_data.csv (11.8 KB)

I’ve come from another thread to this situation:

@mribeirodantas @Adam_Talbot
Can you please help, look here?

@ewels Can you please help here?

Hey, @complexgenome. Sorry for taking a while to reply to this post.

I recommend you to view() your output channels in order to understand how your process input block should be built. Cardinality issues refer to when you have more or less items in your channel element than what’s defined in the input block of the process receiving this channel. If you have a null value for a path qualifier, you’ll also get the error you’re getting.

Each element in your channel has TWO items: a map and a list. But in your input declaration you have a lot of items :sweat_smile: In the snippet below, as an example, I converted these two items into something similar to what you have in your input block:

process FASTP {
  conda '/data1/software/miniconda/envs/MMRADAR/'
  maxForks 5
  debug true
  errorStrategy 'retry'
  maxRetries 2
  label 'low_mem'
  publishDir path: "${params.outdir}/${batch}/${timepoint}/WES/primary/fastp/normal/", mode: 'copy', pattern: '*_N*'
  publishDir path: "${params.outdir}/${batch}/${timepoint}/WES/primary/fastp/tumor/", mode: 'copy', pattern: '*_T*'

  input:
  tuple val(batch), val(timepoint), val(tissue), val(seq_type), path(tumor_read1), path(tumor_read2)
  tuple val(batch), val(timepoint), val(tissue), val(seq_type), path(normal_read1), path(normal_read2)

  output:
  tuple val(batch),val(patient_id_tumor), val(timepoint), path("${patient_id_tumor}_trim_{1,2}.fq.gz"), emit: reads_tumor
  path("${patient_id_tumor}.fastp.json"), emit: json_tumor
  path("${patient_id_tumor}.fastp.html"), emit: html_tumor

  tuple val(batch),val(patient_id_normal), val(timepoint),path("${patient_id_normal}_trim_{1,2}.fq.gz"), emit: reads_normal
  path("${patient_id_normal}.fastp.json"), emit: json_normal
  path("${patient_id_normal}.fastp.html"), emit: html_normal

  script:
  patient_id_normal=timepoint+"_N"
  patient_id_tumor=timepoint+"_T"
  """
  fastp  --in1 "${tumor_read1}" --in2 "${tumor_read2}" -q 20  -u 20 -l 40 --detect_adapter_for_pe --out1 "${patient_id_tumor}_trim_1.fq.gz" \
         --out2 "${patient_id_tumor}_trim_2.fq.gz" --json "${patient_id_tumor}.fastp.json" \
         --html "${patient_id_tumor}.fastp.html" --thread 10

  fastp  --in1 "${normal_read1}" --in2 "${normal_read2}" -q 20  -u 20 -l 40 --detect_adapter_for_pe --out1 "${patient_id_normal}_trim_1.fq.gz" \
         --out2 "${patient_id_normal}_trim_2.fq.gz" --json "${patient_id_normal}.fastp.json" \
         --html "${patient_id_normal}.fastp.html" --thread 10
   """
}

workflow {
  Channel
    .fromPath("long_format_data.csv")
    .splitCsv(header: true).map { it ->
      [
      it.subMap("batch", "timepoint", "tissue", "sequencing_type"),
        [
          file(it.fastq_1),
          file(it.fastq_2)
        ]
      ]
    }
    .branch { meta, fastq ->
      rna: meta.tissue == "rna" && meta.sequencing_type == "rna"
      germline: meta.tissue == "normal" && meta.sequencing_type == "wes"
      tumor: meta.tissue == "tumor" && meta.sequencing_type == "wes"
      other: true
    }
    .set { input_ch }

  input_ch.germline
    // Mix all samples using combine
    .combine(input_ch.tumor)
    // Filter to only the ones where batch and timepoint are the same
    .filter { germline_meta, germline_fastq, tumor_meta, tumor_fastq ->
      ( germline_meta.batch == tumor_meta.batch ) && ( germline_meta.timepoint == tumor_meta.timepoint )
    }
  input_ch.tumor
    .map {
      tumor_map, tumor_list -> [tumor_map['batch'], tumor_map['timepoint'], tumor_map['tissue'], tumor_map['sequencing_type'],
                                tumor_list[0], tumor_list[1]]
    }
    .set { input_ch_tumor }
  input_ch.germline
    .map {
      tumor_map, tumor_list -> [tumor_map['batch'], tumor_map['timepoint'], tumor_map['tissue'], tumor_map['sequencing_type'],
                                tumor_list[0], tumor_list[1]]
    }
  .set { input_ch_germline }
  FASTP(input_ch_tumor, input_ch_germline)
}

Alternatively, you could have something like:

...
  input:
  tuple val(meta), path(reads)
...

  do_something_with ${meta['batch']} ${meta['tissue']} -r1 ${reads[0]} -r2 ${reads[1]}

...