Running workflow on multiple samples

Hi there, I’m quite new to Nextflow and I was wondering what is the best approach for running the same workflow for multiple samples. Should I run my workflow independently for each sample (for instance using a bash script to orchestrate it), or should I let Nextflow read all file pairs for all my samples to process?
A bit of context… I have Illumina paired-end reads as input files for multiple samples. Initially I created a Channel.fromFilePairs that read the file pairs for each sample and passed them on to different processes (FastQC, FastP, MultiQC…). I obtained coherent results up to the point where I created a new process (let’s call it collectInfo) in which I intended to parse the results of the previous processes and save them to a JSON file.

workflow { 
    read_pair_ch = Channel.fromFilePairs("${rawReadsDir}/*r{1,2}.fq.gz", type: 'file')

    // Run quality checks
    runPreFastQC(read_pair_ch)
    runMultiQC(runPreFastQC.out.pFQC_files.collect(), runPreFastQC.out.sampleID)
    runFastPQT(read_pair_ch)
    runFastPMultiQC(runFastPQT.out.pFP_files.collect(), runFastPQT.out.sampleID)
    unzip_pFQC_files(runPreFastQC.out.sampleID, runPreFastQC.out.pFQC_files)

    // Create summary json file
    collectInfo(params.projectName, unzip_pFQC_files.out.sampleID, unzip_pFQC_files.out.pFQC_files_unzipped, runFastPQT.out.pFP_files.collect())

}

When running this workflow, Nextflow would prompt the typical “no such file or directory” error because, as the previous processes were being run for different samples, output flows were being mixed between samples for the collectInfo process. I solved this problem by checking if the file to be parsed exists, but I don’t feel this is entirely right. I am basically checking if a file exists, knowing it is impossible for it to exist (the file path would be something like this: /home/process1output/sample1/sample2_process1.html, and the correct one like this: /home/process1output/sample1/sample1_process1.html)

Is there a way to control which samples are being run for which process, and if possible, run each process for each sample at the same time? I may have misunderstood the point of all this, as the first and second blocks of processes do not depend on each other.

Anyways, I think this might be more a philosophical question… Thank you in advance!

Yes, people use Nextflow to analyse many thousands of samples at once. Without seeing the exact code it’s very hard to know exactly what’s happening but something is wrong with your code.

When running this workflow, Nextflow would prompt the typical “no such file or directory” error because, as the previous processes were being run for different samples, output flows were being mixed between samples for the collectInfo process.

This shouldn’t happen, pretty much ever.

I’m guessing you’re being tripped up by the *.out.sampleID channel, which is a queue channel but you are just using the first element. You should make sure to convert it to a value channel. Check out the documentation around this here:

https://www.nextflow.io/docs/latest/process.html#process-multiple-input-channels
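
To make the mix-up concrete, here is a minimal sketch of what happens when a process takes two separate queue channels: Nextflow pairs the items by position, not by sample. The process name PAIR and the channel contents are made up for illustration and are not taken from the workflow above.

```nextflow
// Hypothetical toy process: it only reports which two values it received together.
process PAIR {
    input:
    val id
    val file_label

    output:
    stdout

    script:
    """
    echo "${id} was paired with ${file_label}"
    """
}

workflow {
    // Two independent queue channels whose items arrive in different orders,
    // much like the outputs of two unrelated processes would.
    ids    = Channel.of('sample1', 'sample2', 'sample3')
    labels = Channel.of('sample2_fastp', 'sample3_fastp', 'sample1_fastp')

    // Each task takes the next item from each channel, so the first task
    // receives 'sample1' together with 'sample2_fastp'.
    PAIR(ids, labels).view()
}
```

Nothing in this setup tells Nextflow that the two values belong to the same sample, which is why keeping the sample ID and its files together in one tuple is the safer design.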

Perhaps rather than outputting two channels and adding them to a process as separate inputs, you should output a tuple?

e.g., the output of runPreFastQC could be:

output:
    tuple val(sampleID), path("*.zip"), emit: pFQC_files

and the input to runMultiQC could be:

input:
     tuple val(sampleID), path(zips)

Then you can connect them like so:

workflow {
    read_pair_ch = Channel.fromFilePairs("${rawReadsDir}/*r{1,2}.fq.gz", type: 'file')
    runPreFastQC(read_pair_ch)
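    // Pass the (sampleID, zips) tuples straight through;
    // .collect() would flatten them into one list and break the tuple input
    runMultiQC(runPreFastQC.out.pFQC_files)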
}

Thank you for your answer… so… I’m not having trouble with that part specifically. I’ll try to explain with some examples.
I have 3 processes:

  1. runFastQC
  2. runFastP
  3. CollectInfo

The problem I’m facing is that processes 1 and 2 do not depend on one another, so each one runs in parallel on whichever sample happens to be ready. For instance, process 1 may be running for sample1 while process 2 is running for sample2. Then, when both outputs are needed in process 3, they correspond to different samples, and that’s why they get mixed up.

I kind of solved it using:

collectInfo(unzip_pFQC_files.out.sampleID, unzip_pFQC_files.out.pFQC_files_unzipped.mix(runFastPQT.out.pFP_files).collect())

where the files from both processes and all samples are mixed and collected so I can access all files for all samples. I still don’t think this is the right way to do it, as I am still not able to parse the information for all samples at once and write the results to a single summary file.

process collectInfo looks like this:

process collectInfo {
    conda "${params.conda_env}"

    cpus 4

    tag { "${params.projectName}.collectInfo.${sampleID}" }

    // overwrite takes a boolean; the string 'false' would count as true
    publishDir "${outDir}/05-collectInfo", mode: 'copy', overwrite: false

    input:
    val(sampleID)
    path(pFQC_files_unzipped)
    path(pFP_files)

    output:
    path("${params.projectName}_${sampleID}_summary.json"), emit: samp_summary

    script:
    """
    #!/usr/bin/env python
    # The braces around sampleID keep it from being read as part of a longer variable name
    with open("${sampleID}_pFQC_files_unzipped/${sampleID}_fastqc.txt") as fastqc:
        pass  # do something
    with open("${sampleID}_pFP_files_unzipped/${sampleID}_fastp.txt") as fastp:
        pass  # do something else
    """
}

I will try what you said, but I still think the sampleID for pFQC_files will be different from sampleID for pFP_files and I’ll have the same error.

PS: If you happen to have an example where different process outputs are being parsed for all samples in the same process, please share it with me!

The secret sauce is to use tuples. Rather than producing lots of output channels with a single item in them, produce outputs which contain the sample ID and the output in a tuple, like so:

tuple val(sampleId), path("*.txt")

Now, the first element can act as a joining key for an operator like join or combine.

Here’s an example where I:

  • Create a channel of 3 ‘samples’: A, B and C
  • Run two processes on every sample, ECHO_1 and ECHO_2. Both create a single output file.
  • Capture the outputs of both processes as a tuple
  • Rejoin the outputs based on the sample ID.
  • Use the rejoined output as an input for the COMBINE process.

process ECHO_1 {
    input:
        val sample
    output:
        tuple val(sample), path("${sample}_echo_1.txt"), emit: output

    script:
    """
    touch ${sample}_echo_1.txt
    """
}

process ECHO_2 {
    input:
        val sample
    output:
        tuple val(sample), path("${sample}_echo_2.txt"), emit: output

    script:
    """
    touch ${sample}_echo_2.txt
    """
}

process COMBINE {
    input:
        tuple val(sample), path(echo1), path(echo2)
    output:
        tuple val(sample), path("${sample}_combined.txt"), emit: output

    script:
    """
    cat ${echo1} ${echo2} > ${sample}_combined.txt
    """
}

workflow {
    input_ch = Channel.of("A", "B", "C")
    echo_1_output = ECHO_1(input_ch)
    echo_2_output = ECHO_2(input_ch)

    // Rejoin outputs based on sample ID
    combined_input = echo_1_output.join(echo_2_output)
    combined_output = COMBINE(combined_input)
}
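
If the goal is one summary for all samples rather than one per sample, a common follow-up is to drop the sample ID and gather the per-sample files with collect. The sketch below is a self-contained toy with made-up names (PER_SAMPLE, MERGE_ALL, all_samples_summary.txt); treat it as one possible shape, not a drop-in replacement for collectInfo.

```nextflow
// Hypothetical per-sample step: writes one small summary file per sample.
process PER_SAMPLE {
    input:
    val sample

    output:
    tuple val(sample), path("${sample}_summary.txt")

    script:
    """
    echo "summary for ${sample}" > ${sample}_summary.txt
    """
}

// Hypothetical merge step: runs once and sees every per-sample file.
process MERGE_ALL {
    input:
    path summaries

    output:
    path "all_samples_summary.txt"

    script:
    """
    cat ${summaries} > all_samples_summary.txt
    """
}

workflow {
    per_sample = PER_SAMPLE(Channel.of("A", "B", "C"))

    // Keep only the file from each tuple, then gather all files into one list
    // so that MERGE_ALL is invoked a single time.
    all_files = per_sample.map { sample, summary -> summary }.collect()

    MERGE_ALL(all_files)
}
```

The order of the files inside the collected list depends on which tasks finish first, so the merge step should not rely on it; sorting inside the script or encoding the sample name in each file is safer.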

The problem I’m facing is that processes 1 and 2 do not depend on one another, so each one runs in parallel on whichever sample happens to be ready. For instance, process 1 may be running for sample1 while process 2 is running for sample2.

Yes, this is by design. Nextflow is asynchronous and always parallelised.
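
Because of that, the order in which results arrive should never be relied on; matching on a key is what keeps samples together. Here is a small sketch with hand-written channel contents (the file names are placeholders, not real outputs) showing that join pairs items by their first element even when the two channels emit samples in different orders. The failOnMismatch option is added here as an assumption about what you would want: it makes the run stop if a sample is missing from one side.

```nextflow
workflow {
    // Stand-ins for two process outputs that finished in different orders.
    fastqc_ch = Channel.of(
        ['sample1', 'sample1_fastqc.txt'],
        ['sample2', 'sample2_fastqc.txt']
    )
    fastp_ch = Channel.of(
        ['sample2', 'sample2_fastp.txt'],
        ['sample1', 'sample1_fastp.txt']
    )

    // join matches on the first element of each item (the sample ID),
    // so each emitted item holds one sample's ID followed by both of its files.
    fastqc_ch
        .join(fastp_ch, failOnMismatch: true)
        .view()
}
```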