Why are my samples being mixed up in the work dirs?

I have the following few processes, my goal being to take some reads through fastp, assemble them, then align the reads back to the assembly using hisat2. I know you’re supposed to use general names for inputs and outputs, but I started making them name themselves with the “${sampleID}” bit so I could figure out what is going wrong.

process FASTP {
    input:
    tuple val(sampleID), path(read1), path(read2)

    output:
    tuple val("${sampleID}"), path("${sampleID}_R1.fq.gz"), path("${sampleID}_R2.fq.gz"), emit: fastp_reads
    path "fastp.json"
}

process SHOVILL {
    input:
    tuple val(sampleID), path(R1), path(R2)

    output:
    tuple val("${sampleID}"), path("${sampleID}_shovill_res"), emit: assembly
    path "shovill_log.txt"
}

process MAKEINDICES {
    input:
    tuple val(sampleID), path(shovill_res)

    // in the work dir, a folder that contains the indices is made and passed on
    output:
    path "${sampleID}_indexfolder", emit: "indexfolder"
    path "${sampleID}_indexfolder/*.ht2"
}

process SELFALIGN {
    input:
    tuple val(sampleID), path(R1), path(R2)
    path "indexfolder"

    output:
    tuple val("${sampleID}"), path("${sampleID}.bam"), path("${sampleID}.bam.bai"), emit: "aln"
    path "hisat2.txt"
}

workflow {
    reads_ch = channel
        .fromFilePairs('../../seqdata/testing/*_L001_R{1,2}_001.fastq.gz', checkIfExists: true, flat: true)

    FASTP(reads_ch)

    SHOVILL(FASTP.out.fastp_reads)

    MAKEINDICES(SHOVILL.out.assembly)

    SELFALIGN(FASTP.out.fastp_reads, MAKEINDICES.out.indexfolder)
}

The way this is configured, the name of the index is the name of the sample, so they should match. When I run with one pair of reads, it works. When I run with 3 pairs of reads, the SELFALIGN process ends up with the wrong index most of the time and rarely works. This makes me think that the processes are not linked correctly. I thought I made it so that SELFALIGN waits for all processes for a single sample to be done, but it seems it’s just taking whichever ones get done first.

Basically, I think something is wrong with the workflow block but I don’t know what. Should I be making channels out of these?

The question is: why would you expect them to match in the first place? Have a look at your process invocation here:
SELFALIGN(FASTP.out.fastp_reads, MAKEINDICES.out.indexfolder)
The SELFALIGN process takes two queue channels, which are the output channels of previous processes. The entries in these channels appear in whatever order the upstream process emits them. There is no reason to assume that the FASTP process emits its entries in exactly the same order as the MAKEINDICES process. You should assume the order is effectively random and handle the channels accordingly. That means you have to join corresponding entries in these channels on a common key before you pass them to the SELFALIGN process.
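
A quick way to see this for yourself is to print both channels before they reach SELFALIGN. This is purely a debugging sketch, using the channel names from your workflow:

FASTP.out.fastp_reads.view       { "fastp_reads -> ${it[0]}" }   // prints the sample ID of each emission
MAKEINDICES.out.indexfolder.view { "indexfolder -> ${it}" }      // prints each index folder as it arrives

Run it a few times with your three samples and the two channels will usually show different orders.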

I think I understand this, and that joining on a common key is the fix, but I'm not sure how it would be implemented. Could you provide an example? Even if not, thank you for your response.

You have to get a unique key to join the entries in your channels. In your case, this can be the sampleID. So first you want to modify the output block of MAKEINDICES so that it emits a channel in the form tuple val(sampleID), path(indexfolder) (a sketch of that change follows the join example below). Now you have keys available for the join operator:

FASTP.out.fastp_reads
    .join(MAKEINDICES.out.indexfolder, failOnDuplicate: true, failOnMismatch: true)
    .multiMap { sampleID, R1, R2, index ->
        reads: [ sampleID, R1, R2 ]
        index: index
    }
    .set { ch_to_align }

SELFALIGN(ch_to_align.reads, ch_to_align.index)
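
For completeness, a minimal sketch of the corresponding change to the MAKEINDICES output block (keeping the folder naming and the second output from your original process):

process MAKEINDICES {
    input:
    tuple val(sampleID), path(shovill_res)

    output:
    // carry the sample ID along with the index folder so it can act as the join key
    tuple val(sampleID), path("${sampleID}_indexfolder"), emit: indexfolder
    path "${sampleID}_indexfolder/*.ht2"
}

With this change the join produces tuples of the form [ sampleID, R1, R2, indexfolder ], which is exactly what the multiMap closure above unpacks.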

Ah, I see. I also found this Stack Overflow question: Nextflow DSL2: how to combine outputs (channels) from multiple processes into input of another process by part of filename? It appears I should do some more reading and testing. Thank you for your input. I was under the impression that things were just intrinsically matched somehow, but it appears I was wrong!

@Alexander_Nater provided you with a solution (please mark his reply as the solution to this question), but I would like to expand on the first point.

I want to make sure you understand why this is happening. I'll use an exaggerated case, because that makes it easy to see what's going on. Let's say you have a text file and you want to copy it to a certain number of locations. The input to your process is the text file plus a list of paths to copy it into. Nextflow guarantees that it creates the tasks following their order in the input channel.

Let's say the first element in your channel is the text file with 200 locations, and the second element is the same text file with 2 locations. The first one spins up a task first, followed by the second one, so it is almost certain that the first one starts running first. Depending on where you run your pipeline, other variables may come into play, but let's say the first task does start first. However, what is quicker: copying a file to 200 locations or to 2? Even though the second task started later, it finishes first, and because of that it becomes the first element in the output channel of this process. If another process (that does something else) emitted its first input element as its first output element, you would end up with two different samples being treated as the same sample in your pipeline, because you assumed the order would always be respected.
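
If you want to see this effect in isolation, here is a tiny, self-contained toy pipeline (nothing to do with your actual data) in which the task that takes longer is emitted later, even though it entered the channel first:

process SLEEPER {
    input:
    tuple val(name), val(seconds)

    output:
    val name

    script:
    """
    sleep ${seconds}
    """
}

workflow {
    samples_ch = Channel.of( ['slow_sample', 10], ['fast_sample', 1] )
    SLEEPER(samples_ch)
    // 'fast_sample' almost always comes out first, even though 'slow_sample'
    // was the first element of the input channel
    SLEEPER.out.view()
}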

The correct way to handle this is the one @Alexander_Nater pointed out. However, I'd like to mention that there is a process directive created specifically for the behaviour you originally expected: the fair directive. You can read more about it in the official Nextflow documentation. You will lose some performance, but Nextflow will make sure the process outputs are emitted in the same order as the inputs: the first task's results first, then the second's, and so on.
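
For reference, applying it is a one-line change inside the process definition. Using the toy SLEEPER process from the sketch above:

process SLEEPER {
    fair true   // emit outputs in the same order the inputs arrived

    input:
    tuple val(name), val(seconds)

    output:
    val name

    script:
    """
    sleep ${seconds}
    """
}

With fair true, the output channel prints slow_sample first, matching the input order, but the result of the fast task is held back until the slow one has completed.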

Thank you for the additional clarification. I see the fair directive, but as you said, I think @Alexander_Nater's solution is the way to go.

