How to optimise the pipeline for resume functionality?

Hello, my pipeline doesn’t support the resume functionality for the second process because of how I built it. I was wondering what kind of change I could make to be able to use resume.

My pipeline uses a different Python script for each step. I produce some files in the first step, and use these files together with the parameters of each one in the second step. To be able to pass all of them at once, I join each input type together with commas, so I can read them in Python, split each by comma, and use them in the script (I couldn’t find a smarter way to do this properly). One of the inputs is the file itself, and since its path changes every run, the second step is not skipped and runs every time.
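If it helps, this is the pattern I mean, reduced to a minimal sketch with made-up names (my real processes are below). Because the joined paths arrive as a val, the task hash is computed from that literal string, and as far as I can tell the string contains paths that change between runs, so the step always re-runs:

process sketch_second_step {

  input:
  val joined_objs   // e.g. "<workdir-A>/f.h5ad,<workdir-B>/f.h5ad", different on every run

  script:
  """
  echo combine --others ${joined_objs}
  """
}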

I’ve always thought that this part of the pipeline could be implemented better, which is why I wanted to ask about it by sharing part of my pipeline below. I am still new to Nextflow and some parts might seem stupid, so any feedback to improve the pipeline is really appreciated!!

process first_process {

  publishDir "${launchDir}/results-${params.project_tag}/objects", pattern: '*.h5ad', mode: 'copy', saveAs: {filename -> "${dims1}i_${keys1}_${dims2}s_${keys2}.h5ad"}
  publishDir "${launchDir}/results-${params.project_tag}/models", pattern: '*.pt', mode: 'copy', saveAs: {filename -> "${dims1}i_${keys1}_${dims2}s_${keys2}.pt"}

  input:
  path(obj)
  val(project_tag)
  val(dims1)
  val(keys1)
  val(dims2)
  val(keys2)

  output:
  tuple path("f.h5ad"), val(dims1), val(keys1), val(dims2), val(keys2), emit: comb
  path("model.pt")

  script:
  """
  python ${baseDir}/bin/first_process.py --object ${obj} --project_tag ${params.project_tag} --dims1 ${dims1} --keys1 ${keys1} --dims2 ${dims2} --keys2 ${keys2}
  """

}

process combine_objects {

  publishDir "${launchDir}/results-${params.project_tag}/", pattern: '*.h5ad', mode: 'copy'

  input:
  path(first_obj)
  val(objs)   // I use this part as val deliberately since I need to combine all the paths and it doesn't work when I try with path
  val(dims1)
  val(keys1)
  val(dims2)
  val(keys2)

  output:
  path("final.h5ad")

  script:
  """
  python ${baseDir}/bin/combine_objects.py --object ${first_obj} --others ${objs} --dims1 ${dims1} --keys1 ${keys1} --dims2 ${dims2} --keys2 ${keys2}
  """
}

workflow {
  Channel.of("${params.dims1}")
         .splitCsv()
         .flatten()
         .combine( Channel.of("${params.dims2}")
                          .splitCsv()
                          .flatten() )
         .combine( Channel.of("${params.keys1}")
                          .splitCsv( sep: '+' )
                          .flatten() )
         .combine( Channel.of("${params.keys2}")
                          .splitCsv( sep: '+' )
                          .flatten() )
         .multiMap { it ->
              dims1: it[0]
              dims2: it[1]
              keys1: it[2]
              keys2: it[3]
              }
         .set {runs}

  first_process(params.object, params.project_tag, runs.dims1, runs.keys1, runs.dims2, runs.keys2)

  first_process.out.comb.multiMap { it -> 
                              obj: it[0]
                              dims1: it[1]
                              keys1: it[2]
                              dims2: it[3]
                              keys2: it[4]
                              }
                      .set {outs}
  
  combine_objects( params.object,
                   outs.obj.collect().map { it.join(',') },
                   outs.dims1.collect().map { it.join(',') },
                   outs.keys1.collect().map { it.join(',') },
                   outs.dims2.collect().map { it.join(',') },
                   outs.keys2.collect().map { it.join(',') } )
}

Initially I was not sure whether calling collect() and map() on each input separately could change the order, so that the wrong parameters end up paired with the wrong file, since I had some ordering issues in another pipeline I wrote. But after some tests I could confirm that the files and their parameters stay in the correct order. I still feel that this part of the pipeline could be improved significantly, but I couldn’t find a better way. Do you have any suggestions on how to resolve the resume issue and how to improve the pipeline? Many thanks!!
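P.S. To make the ordering concern a bit more concrete, this is roughly the alternative I was considering but wasn’t sure about (an untested sketch, using the same comb channel as above): collect the whole tuples once and only then build the comma-joined strings, so a file can never end up paired with the wrong parameters.

  // untested sketch: collect the tuples as a whole, then derive the joined strings
  first_process.out.comb
      .toList()
      .map { rows ->
          tuple( rows.collect { it[0] },                // all output files, kept together
                 rows.collect { it[1] }.join(','),      // dims1, in matching order
                 rows.collect { it[2] }.join(','),      // keys1
                 rows.collect { it[3] }.join(','),      // dims2
                 rows.collect { it[4] }.join(',') )     // keys2
      }

Is something along these lines a reasonable direction, or is there a cleaner operator for this?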

Hey, @batu. Welcome to the Seqera Community Forum :wink:

You should be able to use path as an input qualifier in the second process even if it’s a list of paths. Check my snippet below:

process FIRST {
  input:
  val x

  output:
  path "${x}.txt"

  script:
  """
  touch "${x}.txt"
  """
}

process SECOND {
  debug true

  input:
  path list_of_files

  output:
  stdout

  script:
  lfiles = list_of_files.join(',')
  """
  echo my_command --argument ${lfiles} --foo
  """
}

workflow {
  Channel
    .of('a'..'z')
    | FIRST
    | collect
    | SECOND
}

Output: