How to run the GNU parallel command in the script block of a Nextflow process?

Hi, I have this process in Nextflow DSL2:

process permuting_scores {

  input:
    tuple val(PERMS), path(BINS), path(SCORES), path(HHOTNET)

  output:
    path "scores_${PERMS}.tsv", emit: scores

  script:
    """
      python ${HHOTNET}/src/permute_scores.py \
        -i ${SCORES} \
        -bf ${BINS} \
        -s "${PERMS}" \
        -o scores_${PERMS}.tsv
    """

}

This works fine and the pipeline runs in a reasonable time (~6 hours). However, I need to increase the number of permutations (PERMS) from its current value of 5 to 100, and when I launched the pipeline with that setting it was still running after 10 days. One idea is to use the GNU parallel command, but I haven't managed to get it working. I tried this:

process permuting_scores {

  input:
    tuple val(PERMS), path(BINS), path(SCORES), path(HHOTNET)

  output:
    path "scores_*.tsv", emit: scores

  script:
    """
      parallel -j ${task.cpus} --bar \
        python ${HHOTNET}/src/permute_scores.py \
          -i ${SCORES} \
          -bf ${BINS} \
          -s {} \
          -o scores_{}.tsv \
        ::: \$(seq ${PERMS})
    """

}

But I got this error message multiple times:

executor >  local (40)
  [bf/f66d92] data_formatting        | 1 of 1 ✔
  [25/d11c16] similarity_matrix      | 1 of 1 ✔
  [db/c4aa4d] permuting_network (2)  | 0 of 4
  [e6/d2825e] find_permutation_bins  | 1 of 1 ✔
  [22/32a6bc] permuting_scores (36)  | 21 of 100
  [-        ] construct_hierarchies  | 0 of 56
  [-        ] processing_hierarchies -
  [-        ] performing_consensus   -
  ERROR ~ Invalid method invocation `call` with arguments: [work/a0/c50bf04ce2a2e75d6faf42a07064b2/scores_1.tsv, work/a0/c50bf04ce2a2e75d6faf42a07064b2/scores_2.tsv, work/a0/c50bf04ce2a2e75d6faf42a07064b2/scores_3.tsv, work/25/d11c16a64dc28763b949e97548e1f9/similarity_matrix.h5, work/bf/f66d9224caca980a3068d57a4f7882/index_gene.tsv, 0] (java.util.LinkedList) on _closure30 type

Likewise, I have this process, which runs right after permuting_scores and works perfectly without parallel:

process construct_hierarchies {

  input:
    tuple val(PERMS), path(SIMMATRIX), path(IDXGENE), path(SCORES), path(HHOTNET)

  output:
    tuple path("hierarchy_edge_list_${PERMS}.tsv"), path("hierarchy_index_gene_${PERMS}.tsv")

  script:
    """
      python ${HHOTNET}/src/construct_hierarchy.py \
        -smf ${SIMMATRIX} \
        -igf ${IDXGENE} \
        -gsf ${SCORES} \
        -helf hierarchy_edge_list_${PERMS}.tsv \
        -higf hierarchy_index_gene_${PERMS}.tsv
    """

}

And its parallel version is:

process construct_hierarchies {

  input:
    tuple val(PERMS), path(SIMMATRIX), path(IDXGENE), path(SCORES), path(HHOTNET)

  output:
    tuple path("hierarchy_edge_list_*.tsv"), path("hierarchy_index_gene_*.tsv")

  script:
    """
      parallel -j ${task.cpus} --bar \
        python ${HHOTNET}/src/construct_hierarchy.py \
          -smf ${SIMMATRIX} \
          -igf ${IDXGENE} \
          -gsf ${SCORES}_{} \
          -helf hierarchy_edge_list_{}.tsv \
          -higf hierarchy_index_gene_{}.tsv \
        ::: \$(seq 0 ${PERMS})
    """

}

How can I incorporate parallel into my code?

It looks like Nextflow is having trouble parsing the SCORES paths. From the log, your list of scores seems to have a zero at the end; perhaps that's the issue? I can't see why this would differ between the version with parallel and the one without, though.
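If it helps to confirm what the process is actually receiving, one option is to view the tuples on their way into the process. A minimal sketch, where scores_perms stands in for whatever channel you actually pass:

permuting_scores(
  scores_perms.view { tup -> "permuting_scores input: ${tup}" }   // prints each tuple before the task runs
)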

If you want, you can parallelise in Nextflow using some channel operations. Add a range from 1 to perms to your tuple, then transpose to flatten it out:

params.perms = 5

workflow {
    scores = Channel.fromPath("*.txt")
    scores
        .map { scores ->
            [scores, 1..params.perms]  // Add a range from 1 to perms
        }
        .transpose()                   // Flatten to 1 per perm
        .view { x, y -> "${x}: ${y}" }
}

This will basically make your pipeline concurrent instead of parallelised.
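For illustration, here is a rough sketch of how the flattened channel could drive one task per permutation. permute_one and the bare permute_scores.py invocation are placeholders, not your actual process:

params.perms = 5

process permute_one {

  input:
    tuple path(scores), val(perm)

  output:
    path "scores_${perm}.tsv"

  script:
    """
      python permute_scores.py -i ${scores} -s ${perm} -o scores_${perm}.tsv
    """

}

workflow {

  per_perm = Channel.fromPath("*.txt")
    .map { scores -> [scores, 1..params.perms] }  // attach the range of permutation indices
    .transpose()                                  // one [scores, perm] pair per item

  permute_one(per_perm)

}

Each permutation then becomes its own task, so Nextflow can schedule as many of them at once as your executor allows.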

[edit] After you’ve done this, you might want to set maxForks to rate limit your pipeline.
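Building on the sketch above, that could live in nextflow.config (the process name and the limit are just illustrative):

process {
  withName: 'permute_one' {
    maxForks = 10   // at most 10 of these tasks run at once; tune to your machine or cluster
  }
}

The same thing can be written as a maxForks 10 directive at the top of the process definition.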

Thank you, Adam, for your help!

I have modified the code to try to make it work, but still without success. These are the two processes that use parallel (I've included the preceding process, find_permutation_bins, just for context):

process find_permutation_bins {

  input:
    path NETWORK
    path SCORES
    path IDXGENE
    path HHOTNET

  output:
    path "score_bins.tsv", emit: score_bins

  script:
    """
      python ${HHOTNET}/src/find_permutation_bins.py \
        -gsf ${SCORES} \
        -igf ${IDXGENE} \
        -elf ${NETWORK} \
        -ms 5 \
        -o score_bins.tsv
    """

}

process permuting_scores {

  input:
    tuple val(PERMS), path(BINS), path(SCORES), path(HHOTNET)

  output:
    path "scores_*.tsv", emit: scores

  script:
    """
      parallel -j ${task.cpus} --bar \
        python ${HHOTNET}/src/permute_scores.py \
          -i ${SCORES} \
          -bf ${BINS} \
          -s {} \
          -o scores_{}.tsv \
        ::: \$(seq ${PERMS})
    """

}

process construct_hierarchies {

  input:
    tuple val(PERMS), path(SIMMATRIX), path(IDXGENE), path(SCORES), path(HHOTNET)

  output:
    tuple path("hierarchy_edge_list_*.tsv"), path("hierarchy_index_gene_*.tsv")

  script:
    def scores_base = SCORES.getParent()
    """
      parallel -j ${task.cpus} --bar \
        python ${HHOTNET}/src/construct_hierarchy.py \
          -smf ${SIMMATRIX} \
          -igf ${IDXGENE} \
          -gsf ${scores_base}/scores_{}.tsv \
          -helf hierarchy_edge_list_{}.tsv \
          -higf hierarchy_index_gene_{}.tsv \
        ::: \$(seq 0 ${PERMS})
    """

}

In construct_hierarchies I'm changing one of the input variables, SCORES, since (I think) it should change according to the parallel sequence, so it may not correspond to the one provided by the channel. And this is how they are used in the workflow:

// permutation bins

  find_permutation_bins(
    data_formatting.out.edge_list,
    data_formatting.out.gene_score,
    data_formatting.out.index_gene,
    HHOTNET)

  // permutation scores

  chl1_nperms = Channel.of(1..network_permutations)

  scores_perms = find_permutation_bins.out.score_bins
    .combine(data_formatting.out.gene_score)
    .combine(chl1_nperms)
    .map { score_bins, gene_score, nperm -> [nperm, score_bins, gene_score, HHOTNET] }

  permuting_scores(scores_perms)

  // hierarchies construction

  chl0_nperms = Channel.of(0..network_permutations)

  hierarchies_cns = permuting_scores.out.scores
    .map { scores_ -> [ scores_ ] }
    .combine(similarity_matrix.out.sim_mtx)
    .combine(data_formatting.out.index_gene)
    .combine(chl0_nperms)
    .transpose()
    .map { nscores, sim_mtx, index_gene, nperm -> [nperm, sim_mtx, index_gene, nscores, HHOTNET] }

  cnstd_hierarchies = construct_hierarchies(hierarchies_cns)

I transposed the scores thinking it would flatten them, but I got an error:

FileNotFoundError: [Errno 2] No such file or directory: 'null/scores_17.tsv'

so I'm clearly not coding it properly. Do you have any idea how to fix this? More importantly, is this the right way to speed up my pipeline? Is your approach applicable in this case, given that I'm already creating a channel of 1..network_permutations? Thanks again!

Somewhere, you have a string that builds "${variable}/scores_17.tsv" and that variable isn't set, but I can't see where from your code examples.
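If the null does come from rebuilding the path (for example via SCORES.getParent()), one way to sidestep it entirely is to never rebuild paths in the script at all: flatten the glob output of permuting_scores and recover each permutation index from the filename, so construct_hierarchies can keep its original single-permutation form and only reference staged inputs. A rough, untested sketch reusing the channel names from your workflow snippet:

  scores_each = permuting_scores.out.scores
    .flatten()                                                 // scores_*.tsv -> one path per item
    .map { f -> [ (f.baseName - 'scores_') as Integer, f ] }   // -> [perm, scores_perm.tsv]

  hierarchies_cns = scores_each
    .combine(similarity_matrix.out.sim_mtx)
    .combine(data_formatting.out.index_gene)
    .map { perm, scores, sim_mtx, index_gene -> [perm, sim_mtx, index_gene, scores, HHOTNET] }

  construct_hierarchies(hierarchies_cns)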

A good question; the answer is… it depends. I've never had much success using parallel to speed up my processes, but your experience might be different.

There’s a balance between lots of small tasks vs fewer larger ones.

Does your pipeline have lots of small, discrete tasks with low startup time? In that case, splitting the work over lots and lots of jobs spreads it across your infrastructure and makes it faster. Think about packing 72 x 1-CPU tasks versus a few 24-CPU tasks onto a few 32-core nodes, and how each choice will block up your university cluster.

On the other hand, bigger tasks take longer but can share overheads in terms of startup and shutdown time.
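As a side note on sizing: if you do keep GNU parallel inside the process, task.cpus (and therefore your parallel -j ${task.cpus}) is simply whatever the process requests, which you can set in nextflow.config; the value below is only illustrative:

process {
  withName: 'permuting_scores' {
    cpus = 8   // each task reserves 8 CPUs, so parallel -j launches 8 jobs per task
  }
}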

Generally, reading and writing from disk is the slowest part of your pipeline, so consider that when chunking up your processes, or even refactoring what your processes do.