Reuse channel result in subworkflows

I am using the branch operator to handle different scenarios. For each scenario that meets its condition, a specific file path is emitted, which then serves as the input for its respective process. However, I've run into the issue that once a channel is read, it is consumed and cannot be reused directly. My goal is to reuse the same file, output from a branch condition, multiple times in various processes within a subworkflow. My current workaround is to run the first process in the subworkflow, make a copy of the input file, and emit it in the output so it can be reused as needed.

Is there a more efficient method to achieve this without needing to duplicate files? Any suggestions or examples would be greatly appreciated!

You should be able to reuse a channel simply by using the variable again. E.g.:

workflow {
    process_1_out = PROCESS_1(ch_inputs)
    PROCESS_2(process_1_out)
    PROCESS_3(process_1_out)
}

Could you share an example of your code and the problems you are facing?

Thank you for the reply. I am still confused about reusing channels: it seems the output of a process can be reused as a channel multiple times in DSL2, but after processing with operators, it can't be reused anymore.

Here is part of the main workflow:

  // link barcode with species
    SpeciesID.out
        .splitCsv(header: true, sep: "\t", strip: true)
        .map { row ->
            tuple(row.file.split("\\.")[0], row.species)
        }
        .set { species_anchor }

    //Conditional execution with proper mapping after branch
    species_anchor
        .join(De_novo_assembly_polish.out.consensus, by: 0)
        .map { row -> 
            tuple(row[1], row[2])  // Creating a new tuple with (species, path)
        }
        .branch {
            saureus: it[0] == "Staphylococcus aureus"
                return it[1]
            klebsiella: it[0] == "Klebsiella pneumoniae" || it[0] == "Klebsiella quasipneumoniae" || it[0] == "Klebsiella variicola"
                return it[1]
            others: true
                return it[1]
        }
        .set { scenario }

    // Execute based on the scenario
    Default(scenario.others)
    S_aureus(scenario.saureus)
    Klebsiella(scenario.klebsiella)

This is one of the subworkflows:

process MLST {
    label "process_low"
    publishDir "${params.outdir}/typing", mode: "copy"
    conda "/root/miniconda3/envs/mlst"

    input:
    path consensus

    output:
    path "${consensus.getSimpleName()}_mlst.csv", emit: mlst
    path "clone/${consensus}", emit: clone

    script:
    """
    mlst ${consensus} --csv > ${consensus.getSimpleName()}_mlst.csv
    mkdir clone
    cp ${consensus} clone/${consensus}
    """
}

process AMR_prediction {
    label "process_low"
    publishDir "${params.outdir}/amr", mode: "copy"
    conda "/root/miniconda3/envs/ABRicate"

    input:
    path consensus

    output:
    path "${consensus.getSimpleName()}_amr_vfdb.tsv", emit: vfdb
    path "${consensus.getSimpleName()}_amr_ncbi.tsv", emit: ncbi

    script:
    """
    abricate ${consensus} --db vfdb > ${consensus.getSimpleName()}_amr_vfdb.tsv
    abricate ${consensus} --db ncbi > ${consensus.getSimpleName()}_amr_ncbi.tsv
    """
}

workflow Default {
    take:
        defaultConsensus

    main:
        MLST(defaultConsensus)
        AMR_prediction(MLST.out.clone)

    emit:
        mlst = MLST.out.mlst
        clone = MLST.out.clone
        vfdb = AMR_prediction.out.vfdb
        ncbi = AMR_prediction.out.ncbi
}

In the subworkflow, if I use defaultConsensus for both the MLST and AMR_prediction processes, the subworkflow won't run. My workaround, as shown in the code, is to run the first process in the subworkflow, make a copy of the input file, and emit it in the output to be reused as needed.
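For reference, a minimal sketch of what direct reuse would look like, using the same process names as above (in DSL2 a channel can feed multiple processes, so the clone output should not be necessary in principle):

```nextflow
workflow Default {
    take:
        defaultConsensus

    main:
        // The same input channel feeds both processes directly;
        // each task stages its own copy of the input file.
        MLST(defaultConsensus)
        AMR_prediction(defaultConsensus)

    emit:
        mlst = MLST.out.mlst
        vfdb = AMR_prediction.out.vfdb
        ncbi = AMR_prediction.out.ncbi
}
```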

Here, it worked just fine. Check the code below.

process FIRST {
  input:
  val x

  output:
  val y

  exec:
  y = x + 1
}

process SECOND {
  input:
  val x

  output:
  val y

  exec:
  y = x - 1
}

process THIRD {
  input:
  val x

  output:
  val y

  exec:
  y = x * 100
}

workflow mysub {
  take:
  a_ch

  main:
    SECOND(a_ch)
    THIRD(a_ch)

  emit:
    SECOND_out = SECOND.out
    THIRD_out = THIRD.out
}

workflow {
  Channel
    .of(1..10)
    | FIRST
    | set { my_ch }

  mysub(my_ch)
  mysub.out.THIRD_out.view()
}

Do you have a minimal reproducible example that we can have a look at?

I removed the clone and reused the conditional channels, and it worked as Adam_Talbot and mribeirodantas suggested. I guess the workflow didn't work before not because of reusing channels, but because of other bugs, which I fixed without realizing how. Sorry for the trouble, and thank you both for the help.


.clone() is a method in Groovy; I wonder if you were hitting some bug where you accidentally used it instead of assigning a variable?
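For illustration, in plain Groovy .clone() returns a shallow copy of a collection, which behaves quite differently from plain assignment (a small standalone sketch, not taken from the workflow above):

```groovy
def a = [1, 2, 3]
def b = a          // plain assignment: same underlying list
def c = a.clone()  // .clone(): a shallow copy, independent of a

a << 4             // mutate the original list

assert b == [1, 2, 3, 4]  // b sees the mutation
assert c == [1, 2, 3]     // the clone does not
```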

I’m happy your problem is resolved!

It’s great to hear you succeeded, @Hanjiewu. Please, pick one of the replies and mark it as a solution to your question. It can be @Adam_Talbot’s first message, for example :wink:


This topic was automatically closed 7 days after the last reply. New replies are no longer allowed.