Recursion in workflow control

I’ve created a mini-example of the workflow design issue I’m having, because the full example is probably too complicated to start with.

In this example, the user should be able to specify a starting list of any length N = 2^A, such as 2, 4, 8, 16, etc. The idea is to collate the list into 2-item pairs, call foo on each pair, collect the outputs, and then repeat iteratively until we are left with a single-element channel. The order in which the input list items are represented in the final output doesn’t matter. For example, 1_2_3_4_5_6_7_8 is fine and so is 1_2_5_6_3_4_7_8.

The problem I’m having is that I can’t figure out how to impose a halt condition on the recursion below, so that collate_foo_collect stops calling itself once we have produced a single-element channel. Is there a way to get the results I’m looking for with Nextflow?

process foo {
    input:
    tuple val(a), val(b)

    output:
    path("${a}_${b}.txt")

    shell:
    "touch !{a}_!{b}.txt"
}

workflow collate_foo_collect {
    take:
    start

    main:
    start | collate(2) | foo | collect | set{result}

    result | view
    result | collate_foo_collect

    emit:
    result
}

workflow {
    channel.of(1, 2, 3, 4, 5, 6, 7, 8) | set{start}
    start | collate_foo_collect
}

Could you please check this pattern and let me know if it helps?

https://nextflow-io.github.io/patterns/feedback-loop/

@mribeirodantas I don’t think that will work unfortunately. The real application here is more complex: merging samples and performing additional processing on each merge prior to feeding those results into a new merge.

My needs include:

  • Recursing a workflow rather than a process
  • Grouping together outputs from the process and using the groups as inputs to the next process call.
  • Calling additional processes (not always the same) on the outputs of the recursive process prior to feeding them back in.

In the toy example I gave, it would be necessary to recurse the collate_foo_collect subworkflow rather than the foo process.

Actually @mribeirodantas, it looks like the recurse feature can be used on workflows, so hypothetically it might be usable for my use case. I’ll play around with it and see if I can get it working and get back to you.

Edit: It does not work because it doesn’t accept queue channels.


@mribeirodantas Looping in @bentsherman, who seems to have participated quite a bit in the conversation around this experimental feature.

On closer inspection, it looks like the immediate barrier to using the recurse feature for my purposes is that it doesn’t accept queue channels at this time, as stated in the documentation for the feedback loop pattern.

You can get around the queue channel restriction by collecting the inputs beforehand: instead of passing a queue channel, collect it into a single value holding a list of pairs.
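As a sketch of what I mean, using the toy input from your example (`collect(flat: false)` keeps the pairs nested rather than flattening them into one list):

```nextflow
// Sketch: convert a queue channel of items into a single value
// holding a list of pairs, which the recursion feature can accept.
channel.of(1, 2, 3, 4, 5, 6, 7, 8)
    | collate(2)            // emit pairs: [1,2], [3,4], [5,6], [7,8]
    | collect(flat: false)  // gather into one item: [[1,2],[3,4],[5,6],[7,8]]
    | view
```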

You will likely need to refactor the foo process to handle N pairs instead of a single pair. This reduces parallelism, but you can recover it by requesting more CPUs per task and processing each pair in parallel via bash’s `&` and `wait`.
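For illustration, the `&`/`wait` trick inside a single task’s script could look like this (the pair list and the `touch` command are stand-ins for the real inputs and per-pair work):

```shell
# Launch one background job per pair, then block until all jobs finish.
pairs="1_2 3_4 5_6 7_8"
for pair in $pairs; do
    touch "${pair}.txt" &   # replace 'touch' with the real per-pair work
done
wait                        # all pairs are done once this returns
```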

I don’t know if the idea will hold when you include all the other details from the actual pipeline. Iterative workflows in general are quite limited at the moment; you can usually make it work but it requires lots of gymnastics.

Alternatively, if you know the maximum possible value of A in 2^A, you can hardcode a fixed number of workflow iterations. I have a code sample for this somewhere; I will try to dig it up.
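A sketch of the hardcoded-iterations idea, assuming the subworkflow lives in a separate script (the `./merge.nf` path is hypothetical; Nextflow requires aliased includes to call the same workflow more than once):

```nextflow
// Hardcode three merge rounds for a known maximum of 2^3 = 8 inputs.
include { collate_foo_collect as merge1 } from './merge.nf'
include { collate_foo_collect as merge2 } from './merge.nf'
include { collate_foo_collect as merge3 } from './merge.nf'

workflow {
    channel.of(1, 2, 3, 4, 5, 6, 7, 8) | merge1 | merge2 | merge3 | view
}
```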

Here is a code sample based on the example you gave:

nextflow.preview.recursion = true

process foo {
    input:
    tuple val(a), val(b)

    output:
    val ab

    exec:
    assert a instanceof Collection
    assert b instanceof Collection
    assert a.size() == b.size()
    ab = (0 ..< a.size()).collect { i -> a[i] + b[i] }
}

workflow iterate {
    take:
    start

    main:
    start
    | map { vals -> vals.collate(2).transpose() }  // e.g. [1..8] -> [[1,3,5,7], [2,4,6,8]]
    | view
    | foo
    | view
    | set { result }

    emit:
    result
}

workflow {
    channel.fromList( 1..8 )
    | collect
    | set { start }

    iterate.recurse(start).until { v -> v.size() == 1 }  // stop once a single value remains
}
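To make the data flow concrete, here is my hand trace of the values at each iteration (assuming the `1..8` input above; not captured from an actual run):

```
start:       [1, 2, 3, 4, 5, 6, 7, 8]
iteration 1: collate(2).transpose() -> a = [1, 3, 5, 7], b = [2, 4, 6, 8]
             foo -> [3, 7, 11, 15]
iteration 2: a = [3, 11], b = [7, 15]  -> foo -> [10, 26]
iteration 3: a = [10],    b = [26]     -> foo -> [36]
until: [36].size() == 1, so the recursion stops
```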

@bentsherman Thanks for the thoughtful response as usual. My goal is to let users define merges of samples, or of the outputs of previous merges, in a CSV file. This is difficult to implement without an iteration-based architecture.

For the time being, it sounds like the best options are either to limit users to a finite series of hardcoded merges, each with their own process, or to create a separate workflow for the merge steps that the user will run repeatedly.

I think my rewritten example can be adapted to meet your needs. The input can be anything, including a CSV file. You just have to handle all pairs in a given iteration in a single process. As this is relatively uncharted territory, I’d be happy to help you through the adaptation. It’s a good way to battle test the recursion feature.

Thanks for offering to guide me through the adaptation process! I need to give a bit more thought to this particular feature to better figure out the user interface and whether it seems tractable, since the real problem is more complex than the toy example I’m giving here. I’ll reach out for further discussion if that route seems like a good idea after I’ve had a chance to think about it more.
