I am utilizing the branch operator to configure different scenarios. For each scenario that meets its condition, a specific file path is received which then serves as the input for its respective process. However, I’ve encountered an issue where once a channel is read, it is consumed and cannot be directly reused. My goal is to reuse the same file, which is output from a branch condition, multiple times in various processes within a subworkflow. Currently, my workaround involves running the first process in the subworkflow, making a copy of the input file, and emitting it in the output to be reused as needed.
Is there a more efficient method to achieve this without needing to duplicate files? Any suggestions or examples would be greatly appreciated!
Thank you for the reply. I am still confused about reusing channels. It seems that in DSL2 the output of a process can be reused as a channel multiple times if needed; however, after it has been processed with operators, it can’t be reused anymore.
Here is part of the main workflow:
// link barcode with species
SpeciesID.out
    .splitCsv(header: true, sep: "\t", strip: true)
    .map { row ->
        tuple(row.file.split("\\.")[0], row.species)
    }
    .set { species_anchor }

// Conditional execution with proper mapping after branch
species_anchor
    .join(De_novo_assembly_polish.out.consensus, by: 0)
    .map { row ->
        tuple(row[1], row[2]) // Creating a new tuple with (species, path)
    }
    .branch {
        saureus: it[0] == "Staphylococcus aureus"
            return it[1]
        klebsiella: it[0] == "Klebsiella pneumoniae" || it[0] == "Klebsiella quasipneumoniae" || it[0] == "Klebsiella variicola"
            return it[1]
        others: true
            return it[1]
    }
    .set { scenario }

// Execute based on the scenario
Default(scenario.others)
S_aureus(scenario.saureus)
Klebsiella(scenario.klebsiella)
In the subworkflow, if I use defaultConsensus for both the MLST and AMR_prediction processes, the subworkflow won’t run. My workaround, as shown in the code, is to run the first process in the subworkflow, make a copy of the input file, and emit it in the output to be reused as needed.
process FIRST {
    input:
    val x

    output:
    val y

    exec:
    y = x + 1
}

process SECOND {
    input:
    val x

    output:
    val y

    exec:
    y = x - 1
}

process THIRD {
    input:
    val x

    output:
    val y

    exec:
    y = x * 100
}

workflow mysub {
    take:
    a_ch

    main:
    SECOND(a_ch)
    THIRD(a_ch)

    emit:
    SECOND_out = SECOND.out
    THIRD_out = THIRD.out
}

workflow {
    Channel
        .of(1..10)
        | FIRST
        | set { my_ch }

    mysub(my_ch)
    mysub.out.THIRD_out.view()
}
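Applied to the branched channels from the question, the same pattern might look like the sketch below. Only the names Default, MLST, AMR_prediction, and defaultConsensus come from the thread. The process bodies, the input name `consensus`, the output file names, and the emit names are placeholders assumed for illustration, not the original pipeline.

```nextflow
// Hypothetical stand-ins for the real processes; only the process names come from the question.
process MLST {
    input:
    path consensus

    output:
    path "mlst.txt"

    script:
    """
    echo "placeholder MLST step for ${consensus}" > mlst.txt
    """
}

process AMR_prediction {
    input:
    path consensus

    output:
    path "amr.txt"

    script:
    """
    echo "placeholder AMR step for ${consensus}" > amr.txt
    """
}

workflow Default {
    take:
    defaultConsensus

    main:
    // The same channel is passed to both processes; in DSL2 no copy of the file is needed.
    MLST(defaultConsensus)
    AMR_prediction(defaultConsensus)

    emit:
    mlst = MLST.out
    amr  = AMR_prediction.out
}
```

Called as Default(scenario.others), both processes would receive every consensus file routed to the others branch, so the copy-and-re-emit step should not be necessary.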
I removed the clone and reused the conditional channels, and it worked as Adam_Talbot and mribeirodantas suggested. I guess the workflow didn’t work before not because of reusing channels, but due to other bugs, and I fixed it without realizing how. Sorry for the trouble, and thank you both for the help.
It’s great to hear you succeeded, @Hanjiewu. Please pick one of the replies and mark it as the solution to your question. It can be @Adam_Talbot’s first message, for example.