Inability to parallelize sequential processes

I am trying to put together a simple fastq down-sampling pipeline. The first process uses seqtk to down-sample the fastqs, and its output is passed to a second process that gzips the files. I am also passing in multiple down-sampling levels and seeds. Here are the processes:

process DOWNSAMPLE {
    tag "Downsample on ${sample_id} with sub level ${sub_level} using seed ${seed}"
    
    input:
    tuple val(sample_id), path(file), val(sub_level), val(seed)

    output:
    tuple val(sample_id), path("${file.simpleName}.fastq"), val(sub_level), val(seed)

    script:
    """
    seqtk sample -s${seed} ${file} ${sub_level} > ${file.simpleName}.fastq
    """
}

process GZIP {
    tag "Gzip files for ${sample_id}"

    publishDir(
        path: "${params.outdir}/seed_${seed}/sub_${sub_level}/Sample_${sample_id}",
        mode: "COPY"
    )

    input:
    tuple val(sample_id), path(file), val(sub_level), val(seed)

    output:
    tuple val(sample_id), path("${file.baseName}.fastq.gz"), val(sub_level), val(seed)

    script:
    """
    gzip -c ${file} > ${file.baseName}.fastq.gz
    """
}

and a simplified workflow:

workflow {
    // Collect the files and flatten to allow for individual file processing
    files = Channel
            .fromFilePairs("${params.reads}//Project/*/*R{1,2}*.fastq.gz") { file -> 
                def matcher = file.name =~ /(RD_\d+_\d+)/
                matcher ? matcher[0][1] : null }
            .flatMap { sample_id, files ->
                // Emit each file separately, paired with the sample ID
                files.collect { file -> [sample_id, file] }
            }

    // Begin pairing the files with the sub levels and seeds
    downsample_files = files
        .flatMap { sample_id, file -> 
            params.sub.collect { sub_level -> tuple(sample_id, file, sub_level) } }
        .flatMap { sample_id, file, sub_level -> 
            params.seed.collect { seed -> tuple(sample_id, file, sub_level, seed) } }

    // Create a new channel to hold the downsampled files
    downsampled_ch = DOWNSAMPLE(downsample_files)
    
    // Run GZIP in parallel with DOWNSAMPLE
    GZIP(downsampled_ch)
}

While the DOWNSAMPLE process runs fine, once files are passed to the GZIP process, those tasks never seem to start until all of the remaining DOWNSAMPLE tasks have completed. For example:

executor >  local (76)
[e8/5b6547] process > DOWNSAMPLE (Downsample on RD_24_250047 with sub level 8000000 using seed 10) [  3%] 75 of 2304
[-        ] process > GZIP                                                                         [  0%] 0 of 75

For context, I am running this on a remote EC2 instance with moderate compute (16 vCPUs, 64 GB RAM).

What am I doing wrong?

Got about halfway through writing a couple of ideas to test this as a “bug” :laughing:

[e8/5b6547] process > DOWNSAMPLE (Downsample on RD_24_250047 with sub level 8000000 using seed 10) [  3%] 75 of 2304
[-        ] process > GZIP                                                                         [  0%] 0 of 75

The 0 of 75 is saying that Nextflow has submitted those jobs; they're just at the end of the queue behind the 2000+ DOWNSAMPLE jobs. They'll get there :laughing:

I don’t think maxForks does what you’re looking for here but you could try it.
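For reference, maxForks is a per-process cap on concurrent tasks. A minimal sketch, assuming you set it from nextflow.config (the limit of 8 is just a placeholder):

// nextflow.config (sketch): caps how many GZIP tasks run at once
process {
    withName: 'GZIP' {
        maxForks = 8
    }
}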

I think you want executor.queueSize

It's n/a by default on the local executor. Try setting it to 50 in your nextflow.config and you should see more “parallel” execution.
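Something like this in your nextflow.config (a sketch; 50 is just an example value):

// nextflow.config (sketch): queueSize limits how many tasks the executor handles in parallel
executor {
    queueSize = 50
}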


You were right; I just wasn’t being patient enough. I will play with executor.queueSize and see if that changes the behavior. Thanks!!


You are running on the local executor, so queueSize won’t do anything. The question is why you would expect the GZIP tasks to start before the downsampling has finished: there are still plenty of DOWNSAMPLE tasks to do and you have very limited resources available (assuming the tasks need 1 CPU each, only 16 tasks can run in parallel).
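To make that concrete, here is a sketch of the settings that bound parallelism on the local executor (the numbers mirror the instance described above and are assumptions, not the actual config):

// nextflow.config (sketch): how local-executor parallelism is bounded
executor {
    cpus = 16      // CPUs the local executor may use (defaults to what the machine reports)
}

process {
    cpus = 1       // each task requests 1 CPU, so roughly 16 tasks can run concurrently
}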
