Inability to parallelize sequential processes

I am trying to put together a simple FASTQ down-sampling pipeline. The first step uses seqtk to down-sample the FASTQs, which are then passed to a second process that gzips all the files. Additionally, I am passing in multiple down-sampling levels and seeds. Here are the processes:

process DOWNSAMPLE {
    tag "Downsample on ${sample_id} with sub level ${sub_level} using seed ${seed}"
    
    input:
    tuple val(sample_id), path(file), val(sub_level), val(seed)

    output:
    tuple val(sample_id), path("${file.simpleName}.fastq"), val(sub_level), val(seed)

    script:
    """
    seqtk sample -s${seed} ${file} ${sub_level} > ${file.simpleName}.fastq
    """
}

process GZIP {
    tag "Gzip files for ${sample_id}"

    publishDir(
        path: "${params.outdir}/seed_${seed}/sub_${sub_level}/Sample_${sample_id}",
        mode: "COPY"
    )

    input:
    tuple val(sample_id), path(file), val(sub_level), val(seed)

    output:
    tuple val(sample_id), path("${file.baseName}.fastq.gz"), val(sub_level), val(seed)

    script:
    """
    gzip -c ${file} > ${file.baseName}.fastq.gz
    """
}

and a simplified workflow:

workflow {
    // Collect the files and flatten to allow for individual file processing
    files = Channel
            .fromFilePairs("${params.reads}/Project/*/*R{1,2}*.fastq.gz") { file ->
                def matcher = file.name =~ /(RD_\d+_\d+)/
                matcher ? matcher[0][1] : null }
            .flatMap { sample_id, files ->
                // Emit each file separately, paired with the sample ID
                files.collect { file -> [sample_id, file] }
            }

    // Begin pairing the files with the sub levels and seeds
    downsample_files = files
        .flatMap { sample_id, file -> 
            params.sub.collect { sub_level -> tuple(sample_id, file, sub_level) } }
        .flatMap { sample_id, file, sub_level -> 
            params.seed.collect { seed -> tuple(sample_id, file, sub_level, seed) } }

    // Create a new channel to hold the downsampled files
    downsampled_ch = DOWNSAMPLE(downsample_files)
    
    // Run GZIP in parallel with DOWNSAMPLE
    GZIP(downsampled_ch)
}

While the DOWNSAMPLE process runs fine, once files are passed to the GZIP process, GZIP never starts executing (until all of the remaining DOWNSAMPLE tasks complete). For example:

executor >  local (76)
[e8/5b6547] process > DOWNSAMPLE (Downsample on RD_24_250047 with sub level 8000000 using seed 10) [  3%] 75 of 2304
[-        ] process > GZIP                                                                         [  0%] 0 of 75

For context, I am running this on a remote EC2 instance with moderate compute (16 vCPUs, 64 GB RAM).

What am I doing wrong?

I got about halfway through writing up a couple of ideas for testing this as a “bug” :laughing:

[e8/5b6547] process > DOWNSAMPLE (Downsample on RD_24_250047 with sub level 8000000 using seed 10) [  3%] 75 of 2304
[-        ] process > GZIP                                                                         [  0%] 0 of 75

The "0 of 75" is saying that Nextflow has already submitted those jobs; they're just at the end of the queue behind the 2000+ DOWNSAMPLE jobs. They'll get there :laughing:

I don’t think maxForks does what you’re looking for here, but you could try it.
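For reference, maxForks is a per-process directive that limits how many tasks of that single process run at the same time, e.g. adding a line like this at the top of a process body (8 is just an arbitrary example value):

maxForks 8    // allow at most 8 concurrent tasks of this process

It only throttles that one process, rather than the executor as a whole.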

I think you want executor.queueSize

It is n/a by default for the local executor. Try setting it to 50 in your nextflow.config and you should see more “parallel” execution.
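A minimal sketch of what that could look like in nextflow.config (50 is just the example value above):

// nextflow.config
executor {
    queueSize = 50    // maximum number of tasks the executor handles in parallel
}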


You were right; I just wasn't being patient enough. I will play around with executor.queueSize and see if that changes the behavior. Thanks!!