How to iterate over groupTuple data for processes?

Hello there,

I have data like the one below (the header row is shown for clarity; the actual file has none):

Patient_ID Sample name DNA_N_R1 DNA_N_R2 DNA_T_R1 DNA_T_R2 RNA_T_R1 RNA_T_R2
patient1 patient1-3 DNA1_N_R1 DNA1_N_R2 DNA_T_T01_R1 DNA_T_T01_R2 RNA_T_T01_R1 RNA_T_T01_R2
patient1 patient1-4 DNA1_N_R1 DNA1_N_R2 DNA_T_T02_R1 DNA_T_T02_R2 RNA_T_T02_R1 RNA_T_T02_R2
patient1 patient1-5 DNA1_N_R1 DNA1_N_R2 DNA_T_T03_R1 DNA_T_T03_R2 RNA_T_T03_R1 RNA_T_T03_R3
patient2 patient2-5 DNA2_N_R1 DNA2_N_R2 DNA2_T_T03_R1 DNA2_T_T03_R2 RNA2_T_T03_R1 RNA2_T_T03_R3

Each column from the third onward is a compressed raw DNA/RNA FASTQ (sequenced) file. For brevity I’ve omitted the .fastq.gz extension; the idea is to have paired files for eventual data processing.

Goal: I’d like to run per-patient, per-row processes.

Steps/code:

    Channel.fromPath("input_timestamp.csv")
        .splitCsv(sep: ',')
        .groupTuple()
        .map { row ->
            // Extract relevant information
            def patient_info = row[0]
            def sample_info  = row[1]
            def normal_reads = tuple(row[2], row[3])
            def tumor_reads  = tuple(row[4], row[5])
            def rna_reads    = tuple(row[6], row[7])

            // Return a map with the processed information
            return [patient: patient_info, sample: sample_info, normal: normal_reads, tumor: tumor_reads, rna: rna_reads]
        }
        .set { samples_grouped }

Now, how do I proceed? How do I iterate over each row?

  1. When I print these, I have doubts about the tuple/pair structure. For example, when I print the above samples_grouped with samples_grouped.view { "$it" }, I get output like:

[patient:patient1, sample:[sample1-1, sample1-2], normal:[[DNA_N_R1, DNA_N_R1], [DNA_N_R2, DNA_N_R2]], tumor:[[DNA_T_T01_R1, DNA_T_T02_R1], [DNA_T_R2, DNA_T_T02_R2]], rna:[[RNA_T_01_R1, RNA_T_02_R1], [RNA_T_01_R2, RNA_T_02_R2]]]

[patient:patient2, sample:[sample2], normal:[[DNA_N_R1], [DNA1_N_R2]], tumor:[[DNA_T_T03_R1], [DNA_T_T03_R2]], rna:[[RNA1_T_01_R1], [RNA_T_01_R2]]]

If I look at row 1 for patient1, the normal reads are [DNA_N_R1, DNA_N_R1].
Is this correct? How do I access the normal pair DNA_N_R1 together with DNA_N_R2?

Thank you in advance.

The short answer is that you don’t need the groupTuple. You can keep the patient information in the channel element so that you always know which patient a sample comes from. The only difference between my input CSV file and yours is that I added the full path to each file (/foo/bar/DNA_N_R2.fastq.gz instead of DNA_N_R2). My Nextflow script file (without groupTuple):

process PROCESS_SAMPLES {
  debug true

  input:
  tuple val(patient_id),
        val(sample_id),
        path(normal_reads),
        path(tumor_reads),
        path(rna_reads)

  output:
  stdout

  script:
  """
  echo ---------------------------------------------------
  echo Doing something on ${sample_id} from ${patient_id}
  echo Normal reads: 1- ${normal_reads[0]} 2- ${normal_reads[1]}
  echo Tumor reads: 1- ${tumor_reads[0]} 2- ${tumor_reads[1]}
  echo RNA reads: 1- ${rna_reads[0]} 2- ${rna_reads[1]}
  """
}

workflow {
  Channel
    .fromPath(file("input.csv"))
    .splitCsv(sep: ',')
    .map { row ->
      // Extract relevant information
      def patient_info = row[0]
      def sample_info=row[1]
      def normal_reads = tuple((row[2]),(row[3]))
      def tumor_reads = tuple((row[4]), (row[5]))
      def rna_reads = tuple((row[6]), (row[7]))

      // Return a map with the processed information
      return [patient: patient_info, sample:sample_info, normal: normal_reads, tumor: tumor_reads, rna: rna_reads]
    }
    .set { samples }
  PROCESS_SAMPLES(samples)
}

The output:
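For each row, the process echoes something roughly like the following (a sketch; the exact values depend on your CSV, the three tasks run in parallel so the order may vary, and staged files appear by basename):

---------------------------------------------------
Doing something on patient1-3 from patient1
Normal reads: 1- DNA1_N_R1.fastq.gz 2- DNA1_N_R2.fastq.gz
Tumor reads: 1- DNA_T_T01_R1.fastq.gz 2- DNA_T_T01_R2.fastq.gz
RNA reads: 1- RNA_T_T01_R1.fastq.gz 2- RNA_T_T01_R2.fastq.gz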

Two sample working directories, to show you the files are being correctly staged:


@mribeirodantas

Thank you for your reply.

How do I have Nextflow accept a path even if the file doesn’t exist? I get an error like:

Caused by:
Not a valid path value type: nextflow.util.ArrayBag ([/data1/daphni2/clinicalfq/WES/sema4/clinical/CLN-22087270-DNA-N-0_HJFJTDSX3_S22_L002_R2.fastq.gz]

or,

Caused by:
Not a valid path value type: nextflow.util.ArrayBag ([NA])

There will be times when only a patient’s RNA or only their WES data is available.

For example, there will be analyses where there is no DNA (normal and tumor) but only RNA, so those columns will be NA. I get an error when files are not present and the input is declared with path.

My code is structured as:
1- The user provides a CSV file.
2- The user selects which analysis to run: WES or RNA. The user can select both as well; in that case I run both the DNA and RNA workflows.
Based on the selection, I used to pass samples.rna, or samples.normal and samples.tumor.

How do I achieve with your code/snippet?

How do I make Nextflow accept a path for files that are not present?
Or, how do I pass only patient, sample, normal reads, tumor reads, and/or RNA reads based on the user’s choice?

The path did not exist in my example. I have no /foo/bar on my machine :sweat_smile: (and I got no errors).

Can you share a minimal reproducible example so that I can try to reproduce your errors here?

Also, if there are specific situations (such as NA) in your problem, you have to handle them. Feel free to open a new topic if you think it strays too much from this one.
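One way to sketch that handling (assuming NA is the literal string used for missing files, and DSL2, where the same channel can feed several operators) is to filter the channel before calling each workflow:

// Hypothetical pre-filtering: keep only records whose files are actually present
def dna_samples = samples.filter { rec -> rec.normal[0] != 'NA' }  // feeds the WES workflow
def rna_samples = samples.filter { rec -> rec.rna[0] != 'NA' }     // feeds the RNA workflow

Records with both data types pass both filters, so they are processed by both workflows.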

@mribeirodantas

I’m using: nextflow version 23.10.0


process DNA {

    input:
    tuple val(patient_id), val(sample_id), path(normal_reads), path(tumor_reads), path(rna_reads)

    output:
    stdout

    script:
    """
    echo ---------------------------------------------------
    echo Doing something on ${sample_id} from ${patient_id}
    echo Normal reads: 1- ${normal_reads[0]} 2- ${normal_reads[1]}
    echo Tumor reads: 1- ${tumor_reads[0]} 2- ${tumor_reads[1]}
    echo RNA reads: 1- ${rna_reads[0]} 2- ${rna_reads[1]}
    """
}

workflow {

    Channel.fromPath("input_timestamp.csv")
        .splitCsv(sep: ',')
        .map { row ->
            // Extract relevant information
            def patient_info = row[0]
            def sample_info  = row[1]
            def normal_reads = tuple(row[2], row[3])
            def tumor_reads  = tuple(row[4], row[5])
            def rna_reads    = tuple(row[6], row[7])

            // Return a map with the processed information
            return [patient: patient_info, sample: sample_info, normal: normal_reads, tumor: tumor_reads, rna: rna_reads]
        }
        .set { samples }

    DNA(samples).view()
}

Attached is the error screenshot.

I used following data in a CSV file:

patient1,sample1-1,DNA_N_R1,DNA_N_R2,DNA_T_T01_R1,DNA_T_R2,RNA_T_01_R1,RNA_T_01_R2
patient1,sample1-2,DNA_N_R1,DNA_N_R2,DNA_T_T02_R1,DNA_T_T02_R2,RNA_T_02_R1,RNA_T_02_R2
patient2,sample2,DNA1_N_R1,DNA1_N_R2,DNA_T_T03_R1,DNA_T_T03_R2,RNA1_T_01_R1,RNA_T_01_R2

You must provide a path. You’re telling Nextflow it’s a path, but you’re providing only a filename string.
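One way to fix that is to wrap each CSV field with file() inside the map closure (a sketch; it assumes the CSV contains full paths such as /foo/bar/DNA_N_R1.fastq.gz):

def normal_reads = tuple(file(row[2]), file(row[3]))
def tumor_reads  = tuple(file(row[4]), file(row[5]))
def rna_reads    = tuple(file(row[6]), file(row[7]))

With this, the path inputs of the process receive real path objects instead of plain strings.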

@mribeirodantas
I see. Thank you for identifying that.

My follow-up questions:
1) How do I decide when to use multiMap vs. map?
2) Instead of unpacking the map in a process, would you suggest using some for/each construct to iterate over the map?

Thank you again.

Let’s say you have a channel, want to apply a function to every element of this channel, and have a single channel as output/result. That’s a very common scenario and a common use case for the map channel operator.

Channel
  .of(1, 2, 3, 4)
  .map { it -> it*it }
  .view()

Output:

1
4
9
16

multiMap, on the other hand, is a channel operator that lets you apply several mapping rules at once, producing multiple named output channels from a single source channel. See the example below.

Channel
  .of(1, 2, 3, 4)
  .multiMap {
    squared: it * it
    one_more: it + 1
  }
  .one_more
  .view()

As you can see close to the end of the script, I picked .one_more to view, but I could have picked .squared. It’s important to be aware of this structure: a process expects a single channel, so you must select one of the named outputs rather than passing the whole multiMap result.

As a reference, you can find more info about channel operators here. The foundational training material also covers channel operators here.

In Nextflow, it’s more natural to follow a functional paradigm using channel operators such as map. I have seen very few useful cases of for, for example. It’s usually not what you’re looking for.
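For instance, rather than looping over records with for, you let each record flow through an operator closure once (a minimal sketch with made-up record maps):

Channel
    .of([patient: 'patient1', sample: 'sample1-1'],
        [patient: 'patient2', sample: 'sample2'])
    .map { rec -> "processing ${rec.sample} from ${rec.patient}" }
    .view()

The operator does the iteration for you: each element of the channel passes through the closure exactly once.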
