Five files into a process, but only 1 comes out for the next?

So I have this in my workflow:

// Merge paired end reads using Fastp
Merge( DownloadFromS3.out.download_to_merge_ch )
//Merge.out.merged.view()
    
MakeCounts( 
  Merge.out.merged, 
  Channel.fromPath(file('barcodes.csv')), 
  Channel.fromPath(file('spacers.csv')) , 
  Channel.fromPath(file("PAMDA.yaml"))
  )

When I view the output of the Merge process, I get 5 output files (each inside a tuple, just like I expected) from

process Merge {

    publishDir 'output/fastp', mode: 'copy'
    
    container 'quay.io/biocontainers/fastp:0.23.3--h5f740d0_0' 
    
    cpus 8
    
    input:
    tuple val(study), val(sample), val(dir), path(fq1), path(fq2), val(timepoint)

    output:
    tuple val(study), val(sample), val(dir), path("${sample}.fastq.gz"), val(timepoint),  emit: merged
    path("*_unpaired1.fastq")   , emit: unpaired1
    path("*_unpaired2.fastq")   , emit: unpaired2
    path("*_failed.fastq")      , emit: failed
    path("*.json")              , emit: json_report
    path("*.html")              , emit: fastp_html

    script:
    """
    fastp \
    -i ${fq1} \
    -I ${fq2} \
    --merge \
    --merged_out="${sample}.fastq.gz" \
    --unpaired1="${sample}_unpaired1.fastq" \
    --unpaired2="${sample}_unpaired2.fastq" \
    --failed_out="${sample}_failed.fastq" \
    --json="${sample}.json" \
    --html="${sample}.html" \
    --report_title="Fastp Report for ${sample}" \
    --thread=8
    """
}

But when I run MakeCounts - only one of the tuples makes it through.

Here is MakeCounts:

process MakeCounts {

    publishDir 'output', mode: 'copy'
    
    container '668591248114.dkr.ecr.us-east-1.amazonaws.com/pamda:1.0'

    cpus 8

    input:
    tuple val(study), val(sample),  val(dir), path(merged_fastq), val(timepoint)
    path(barcode_file)
    path(spacer_file)
    path(yaml_file)
    
    output:
    tuple val(sample), val(timepoint), path("${sample}_counts.csv"), emit: counts
    
    script:
    """
    python /usr/src/app/make_counts.py \
    --merged_fastq ${merged_fastq} \
    --yaml_file ${yaml_file} \
    --barcode_file ${barcode_file} \
    --spacer_file ${spacer_file} \
    --sample ${sample} \
    --timepoint ${timepoint}
    """
}

It's weird that I get the next one of the five when I run the pipeline again with -resume.

Not sure what to do - maybe add collect() to Merge.out.merged?


Hello, @James_Beck! Welcome to the forum 🙂

Could you please share a minimal reproducible example? It makes it much easier for us to try to reproduce what you’re running into and find a solution. It’s also helpful to share what compute environment you’re using. Based on your container directive, I’d guess it’s AWS Batch, but it’s better to clarify that.

Emulating containers, such as running a container image built for Intel on Apple Silicon, tends to hang, among other unexpected issues. It's not rare for people to see their pipeline stuck on a sample, kill it, rerun it (with -resume), and see things working; that's a side effect of the container emulation. I don't think that's your situation, since you seem to be running the pipeline in the cloud, but I thought it was worth mentioning 😉

My guess is that the merged tuples are going in one at a time, but the barcode_file etc. channels are queue channels that get consumed by the first task, so the process only runs once. The order of the merged tuples is random, so when you re-run with -resume it can just pick up the next one by chance.

The fix is to use toList() on barcode_file, spacer_file and yaml_file, as is done in the nf-core template, for example. This converts those queue channels into value channels, which can be consumed an unlimited number of times (see the Nextflow docs).
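
For example, a minimal sketch against your workflow as posted (untested):

// toList() turns each single-element queue channel into a value channel,
// so it can be read once per merged tuple instead of being used up by
// the first task
MakeCounts(
    Merge.out.merged,
    Channel.fromPath('barcodes.csv').toList(),
    Channel.fromPath('spacers.csv').toList(),
    Channel.fromPath('PAMDA.yaml').toList()
)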

Hope that works / makes sense!


Good to know - since these files are used several times by other processes in my pipeline, can I convert them to lists up front and then pass those around, like:

barcodes_list = Channel.fromPath(file('barcodes.csv')).toList()

and then use barcodes_list wherever it is required?

Also, one of those files is a YAML file - does toList work with that as well?


Yes, you can, and yes, any file will work fine, as toList doesn't care what type the elements in the channel are.

Actually, you don't even need the channel factory. The file method is enough, and as a single value it will automatically be converted to a value channel when provided as input to a process.

barcodes_list = file('barcodes.csv')
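
So, in your case, something like this should work (a sketch, untested):

// plain file objects are implicitly wrapped in value channels when passed
// to a process, so every task can reuse them
barcodes = file('barcodes.csv')
spacers  = file('spacers.csv')
yaml     = file('PAMDA.yaml')

MakeCounts( Merge.out.merged, barcodes, spacers, yaml )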

Cool.

But I have something like this:

metadata_ch = Channel.fromPath(file('metadata.csv'))

metadata_ch
    .splitCsv(header: true, sep: ',')
    .map { row -> 
        def study = row.study
        def sample = row.sample
        def dir = row.directory
        def fq1 = row.fastq_1
        def fq2 = row.fastq_2
        def timepoint = row.timepoint
        def s3_path1 = "${dir}/${fq1}"
        def s3_path2 = "${dir}/${fq2}"
        return [study, sample, dir, fq1, fq2, timepoint, s3_path1, s3_path2]
    }
    .set { read_s3_data }

And I don't seem to be able to run that just on file('metadata.csv'). So do I need to put it into a channel to manipulate it?

And for the inputs to processes, would I need to change anything if I just used file()?

input:
    path(barcode_file)
    path(spacer_file)
    path(yaml_file)

Yes, you're totally right. It must be in a channel if you want to manipulate it with channel operators. If you want the whole raw file as input to a process, you can still use path as the input qualifier. If you want to read more about input qualifiers, I recommend the input qualifiers section of the official documentation.
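
To illustrate with your own files (a sketch, untested; metadata_rows is just a placeholder name):

// metadata.csv is manipulated with channel operators, so it must live
// in a channel
Channel
    .fromPath('metadata.csv')
    .splitCsv(header: true, sep: ',')
    .set { metadata_rows }

// barcodes.csv is consumed whole by the process, so a plain file object
// is enough; the path input qualifier stages it as-is
barcode_file = file('barcodes.csv')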

So, if barcodes.csv is just a .csv file that I’m not manipulating with channel operations, then

Channel.fromPath(file('barcodes.csv')).toList()

is functionally equivalent to

barcodes = file('barcodes.csv')

when passing it to a process, like:

MakeCounts(
    Merge.out.merged,
    barcodes,
    spacers,
    yaml
)

Yes, but not only that. You don’t need the file method in the first case.

file('barcodes.csv') is not the same thing as Channel.fromPath('barcodes.csv').toList(), but when you call a process and pass file('barcodes.csv') as input, Nextflow converts that file into a value channel containing it, which is also what channel operators that return a single value, such as toList or collect, produce. To be precise:

FOO(file('barcodes.csv')) is equivalent to FOO(Channel.fromPath('barcodes.csv').toList()), given that FOO is a Nextflow process.

Also, to make sure this is very clear: you need a value channel in these situations because a process will only generate a task when there is an element to consume in every one of its input channels. There's a very nice example discussing this here, and the toy sketch below shows the rule in action.
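
A toy sketch of that rule (BAR is a dummy process, and ref.txt is assumed to exist locally):

process BAR {
    input:
    val sample
    path ref

    output:
    stdout

    script:
    """
    echo "processing ${sample} with ${ref}"
    """
}

workflow {
    samples = Channel.of('a', 'b', 'c')  // queue channel with 3 elements
    ref     = file('ref.txt')            // becomes a value channel, reusable
    BAR(samples, ref).view()             // 3 tasks; if ref were a queue
                                         // channel, only 1 task would run
}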
