Will map process inputs synchronously, maintaining order?

Hi, I have this bit of code, generated by AI this morning, and I am trying to double-check whether it is correct. I am also looking for some documentation I can link here for when my future self tries to blame this on me.

    ch_average_inputs = bigwigs_ch
        .combine(windows_ch)
        .map { meta, bigwig, _windows_meta, windows_bed ->
            tuple(meta, windows_bed, bigwig)
        }

    ch_bed_inputs = ch_average_inputs.map { meta, windows_bed, _bigwig ->
        tuple(meta, windows_bed)
    }
    ch_bigwig_inputs = ch_average_inputs.map { _meta, _windows_bed, bigwig ->
        bigwig
    }

    UCSC_BIGWIGAVERAGEOVERBED(ch_bed_inputs, ch_bigwig_inputs)

I know that channels are somewhat asynchronous in Nextflow, right? Meaning the order of a channel is not guaranteed. That's why I am a bit worried about this implementation. I am assuming that map is, however, synchronous and will maintain the order, considering that the input of the two maps is the same channel. Is this true?

Note that this particular “hack” is needed because I can’t (or rather don’t want to) change the implementation of the nf-core module itself. Normally I would declare a single input channel with multiple inputs to maintain the order.

.map isn’t guaranteed, although I believe in this case it would work fine.

For this specific case, you can use multiMap, whose output channels are guaranteed to come out in the same order:

    ch_average_inputs = bigwigs_ch
        .combine(windows_ch)
        .multiMap { meta, bigwig, _windows_meta, windows_bed ->
            bed: tuple(meta, windows_bed)
            bigwig: tuple(meta, bigwig)
        }

    UCSC_BIGWIGAVERAGEOVERBED(ch_average_inputs.bed, ch_average_inputs.bigwig)

You can see further discussion on this here: Is channel output from multiMap guaranteed to be in the same order it comes in? · nextflow-io/nextflow · Discussion #2682 · GitHub

The long-term solution is to combine the UCSC_BIGWIGAVERAGEOVERBED inputs into a single input channel that could support any combination of inputs. This is the direction the language is going with features such as record types.

    process UCSC_BIGWIGAVERAGEOVERBED {
        input:
            tuple val(meta), path(bigwig), path(bed), path(windows)

        // etc
    }

    // in the workflow...
    ch_average_inputs = bigwigs_ch
        .combine(windows_ch)
        .map { meta, bigwig, _windows_meta, windows_bed ->
            tuple(meta, bigwig, windows_bed)
        }

    UCSC_BIGWIGAVERAGEOVERBED(ch_average_inputs)

As Adam said, we normally use multiMap, mostly to keep the code together. map does work too, but the source channels need to be the same. This is because channels are also queues (first-in, first-out). The asynchronicity comes from when objects are put into channels, so when map receives something from an upstream channel, it processes items in the same order as that upstream channel.
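A quick way to convince yourself of this ordering behaviour (a minimal sketch with made-up values, not from the original pipeline): two map operators derived from the same source channel each receive items in the source's emission order, so their outputs stay aligned with each other.

    // Hypothetical example: both derived channels consume ch_source
    // in the same first-in, first-out order ('a', 'b', 'c').
    ch_source = Channel.of('a', 'b', 'c')

    ch_upper = ch_source.map { it.toUpperCase() }  // emits A, B, C
    ch_pair  = ch_source.map { tuple(it, it) }     // emits [a,a], [b,b], [c,c]

    ch_upper.view()
    ch_pair.view()

The key point is that the two map operators share the same source; pairing channels derived from *different* sources this way is where ordering is not guaranteed.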


Awesome! Thanks for the quick reply! I’ll add multiMap for now; it indeed looks like a cleaner result. Record types look interesting, I’ll have to take another look at them. Thanks again for sharing.
