Can process tasks read/write a shared path directly with Fusion?

Hello. We are in the process of migrating a workflow from a SLURM environment to our K8s + Fusion based Nextflow cluster, and it’s largely going well, except for one thing.

TLDR: Process tasks running in parallel need some basic way to communicate so that each knows when the required total has been reached.

The workflow must generate a predetermined number of artifacts (e.g. 1000), but significant computation is required, so the work is split over dozens of tasks running in parallel. However, generation is non-deterministic: despite the tasks having the same initial parameters, some are far more productive than others. Each task will attempt to generate up to 1000 artifacts, but only 1000 are required in total. Furthermore, the figure of 1000 is intrinsic to the model; telling each task to generate 100, say, would produce a different outcome. So a mechanism is needed to alert each running task when the total has been reached, so that they all stop generating.

In SLURM, this can be achieved simply by writing a counter to a shared filesystem. With Fusion, however, this does not appear to be an option: when directing all processes to write to a common Fusion path, nothing is synchronised across the task containers during execution, even with scratch = false.
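For reference, the pattern that worked under SLURM is essentially the following, run inside each task (paths and the generate_one_artifact step are illustrative, not our actual code):

COUNTER=/shared/run/counter
LOCK=/shared/run/counter.lock
TARGET=1000

while :; do
    # atomically claim the next slot by incrementing the shared counter
    total=$(flock "$LOCK" sh -c "echo \$(( \$(cat '$COUNTER' 2>/dev/null || echo 0) + 1 )) | tee '$COUNTER'")
    # stop once all slots are claimed; every task sees the same running total
    [ "$total" -gt "$TARGET" ] && break
    generate_one_artifact   # hypothetical: produce a single artifact
done

On a POSIX shared filesystem this is trivial; under Fusion the counter file is never visible across the task containers.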

I can think of a few side-channel solutions to this problem, some better than others, but they all add a degree of complexity and other concerns. Have I missed something native to Nextflow that may help here? Can Fusion be made to serve this purpose?

Cheers
Rob

Fascinating question, @racosgrove!

My experience with shared filesystems is that they can rarely be relied upon, particularly under load. If you need inter-task communication, there are purpose-built synchronization primitives (MPI, for example).

Inter-process communication breaks Nextflow’s model of independent, idempotent tasks, but if you can break the work down into smaller pieces, we can create a looping channel and spawn tasks until the required number of files has been generated. This solution uses the topic channel, which is an incubating Nextflow feature, and the nf-boost plugin from @bentsherman for its “scan” operator.

nextflow.preview.topic = true

include { scan } from 'plugin/nf-boost'

workflow {
    concurrency = Channel.of(1..12)
    artifacts = Channel.topic('artifacts')

    artifacts
        .map { artifact_files -> artifact_files.size() }  // number of files produced per task
        .scan { acc, size -> acc + size }                 // running total of artifacts
        .until { it >= 1000 }                             // stop looping once the target is reached
        .mix(concurrency)                                 // "Seed" the channel with 12 values
        .set { limiter }

    GenerateArtifacts(limiter)

    artifacts
        .collect()
        .map { it.take(1000) }   // trim surplus artifacts from tasks still in flight
        .view { "Found ${it.size()} artifacts" }
}


process GenerateArtifacts {
    input:
    val(limit)

    output:
    path("*.artifact"), topic: artifacts

    script:
    """
    # simulate variable productivity: each task yields between 2 and 9 artifacts
    COUNT=\$((RANDOM % 8 + 2))
    for i in \$(seq 1 \$COUNT); do
        touch "file_\$i.artifact"
    done
    """
}

Note that because we are not doing any inter-task communication, some tasks will still be running when the 1000-artifact threshold is reached. By keeping tasks small (in this example, each generates at most 9 artifacts), we reduce the number of redundant artifacts produced.
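One optional tweak (my suggestion, untested here): if it matters downstream which 1000 artifacts are kept, sort before trimming so the final selection doesn’t depend on task completion order:

artifacts
    .collect()
    .map { files -> files.sort { it.name }.take(1000) }  // deterministic selection
    .view { "Kept ${it.size()} artifacts" }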


Thanks for your quick reply… Very interesting. Your example looks pretty reasonable, but I’ll need to wrap my head around it a bit more.

Point taken with shared filesystems. I did consider MPI but it’s another layer of complexity in itself. Ideally I would like a solution that fits with the Nextflow model, so I will give this a go.

Cheers
Rob

Fantastic. I’ll concede that this uses two new features (topic channels and Ben’s scan operator).
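To unpack scan a little: it works like reduce, but emits every intermediate accumulation, which is what turns the per-task artifact counts into a running total. A toy example with made-up values:

include { scan } from 'plugin/nf-boost'

workflow {
    Channel.of(3, 5, 4)
        .scan { acc, n -> acc + n }
        .view()   // emits the running totals: 3, 8, 12
}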

If it’s not making sense, don’t hesitate to ping me here and I can step through it in more detail.

Hi again.
I have applied this example and so far it is working quite well. I had some concerns about container init overheads when I first saw it, but I’m pleased to say I don’t think it’s a problem, and I’m confident I can optimise the container further. However, I do have one problem, which could be described as an edge case with topics.

The workflow originally presented is actually just a sub workflow representing one ‘generation’ of the whole. It can occur up to 10 times in series (the output of each generation is sent to a collate process, which then becomes the input for the next).

I had been invoking the sub workflow for each generation using module aliasing, and this is where it now goes wrong (it didn’t prior to topics). If I chain two of these workflows, the pipeline hangs, with the final process of the sub workflow (collate) never starting.
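The shape is roughly this (simplified, names illustrative):

include { GENERATION as GEN_1 } from './generation.nf'
include { GENERATION as GEN_2 } from './generation.nf'

workflow {
    out1 = GEN_1( Channel.of('seed') )  // collate inside GEN_1 waits for the topic to close...
    out2 = GEN_2( out1 )                // ...but GEN_2's tasks emit to the same global topic, so it never does
}

Presumably the reason is this statement in the docs: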

“Any process that consumes a topic channel (directly or indirectly) should not send any outputs to that topic, or else the pipeline will hang forever.”

I was hoping topic would be scoped to a workflow block, but it seems it’s global. I guess that is reasonable, but it’s problematic when using module aliasing. As a workaround I attempted to dynamically name the topic to isolate it, which seems fine when defining one:

Channel.topic("particles_${gen}")

But it doesn’t appear to be supported when specifying an output, e.g.

    input:
    val(gen)
    tuple val(taskId), path('In')

    output:
    path('Out/gen_*.RData'), topic: "particles_${gen}"   // ERROR ~ No such variable: gen

Acknowledging that topics are a preview feature, so understandably they may not do everything yet. But if this were supported, it would provide a nice workaround. If you think it’s reasonable, I will raise an issue on GitHub. There may be other ways forward that I am not seeing. Any advice is appreciated, and thanks for your help so far.

Kind regards
Rob

Good points here. The discussion on whether topics should be scoped to workflows would be best had on the nextflow-io/nextflow GitHub repository.

I can’t think of a neat way to do this, but the Nextflow engineering team may well have some good input, so I’d raise an issue there.