Scatter of Scatter by leveraging csv files?

I have a series of TIFF files, which I collected into a Channel like so:

Channel.fromPath("${params.seg_dir}/*Mask.tiff")
    .ifEmpty { error "No SegMasks in ${params.seg_dir}" }
    .set { segMasks }

But I want to execute a massively parallel strategy on all of them. Not knowing how to do that within a process, I'm attempting to create the parallel execution compartmentalization by writing the start and end points of an integer series as rows of a CSV file. For example:

Start,End
1,600
601,1200
1201,1800
1801,2400
2401,3000
3001,3600
3601,4200

The interval is configurable.

Then I attempted to fan out the channel to incorporate these new divisions, in a scatter-of-scatter type operation. But regardless of all the tweaks and adjustments I make, I cannot get it to work.

mxByImage = identifyMaxValue(segMasks)

// Use Channel.splitCsv to split each CSV into rows and pair with the corresponding image
scatterChannel = mxByImage
    .flatMap { imageFile, csvFile ->
        Channel.fromPath(csvFile)
            .splitCsv(header: true)
            .map { row ->
                tuple(path(imageFile), row.toString())
            }
    }.flatten()

I end up with output or errors indicating something to do with this:

DataflowBroadcast around DataflowStream[?]

Not a valid path value type: groovyx.gpars.dataflow.DataflowBroadcast (DataflowBroadcast around DataflowStream[?])

I really don’t have any good examples of how to do scatter of scatter operations in nextflow. Any help is appreciated.

There's a confusion here between Channels and the Objects that flow through them. Channels pass Objects (data, e.g. files, lists, maps/dictionaries, etc.) from one process to another. Channel factories like Channel.fromPath create Channels, while channel operators like flatMap operate on the Objects, so inside an operator you need to use the methods available on the Object to manipulate it. The Objects are often Groovy Collections or Paths, so they can confusingly share method names with channel operators while doing different things.

The DataflowStream error means you're referencing a Channel where an Object (for example a List) is expected. Channel operators that take a closure { <code> } should never reference channels inside the closure, only the objects that come through them.

For example, .map { meta, img -> img } tells me the Channel carries List-type Objects, which I infer from the closure parameters { meta, img -> }: the first element is some kind of Object I can reference via the variable meta, and the second is some kind of Object I can reference via img. The closure returns the img part of the list. Closures are like anonymous functions, and the methods available on these Objects depend on what you know you put into the Channel in the first place.
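A minimal sketch of that closure idea (the channel contents here are illustrative, not taken from your pipeline):

```nextflow
workflow {
    // This channel carries a single List object of the form [meta, img]
    ch = Channel.of( [ [id: 'sample1'], 'image1.tiff' ] )

    // Inside the closure we handle the Object, never the Channel itself
    ch.map { meta, img -> img }.view()
}
```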

If I understand your question correctly, you're attempting to generate the Cartesian product (outer product, cross product) of the list of files and the list of intervals. This is done with the combine operator (see Operators in the Nextflow documentation).

workflow {
    img_files = Channel.fromPath('images/image*.tiff')
    intervals = Channel.fromPath('intervals.csv').splitCsv(header: true)
    img_files.combine(intervals).view()
}

Produces these entries in a channel:

[/workspace/Nextflow_sandbox/images/image1.tiff, [Start:1, End:600]]
[/workspace/Nextflow_sandbox/images/image2.tiff, [Start:1, End:600]]
[/workspace/Nextflow_sandbox/images/image3.tiff, [Start:1, End:600]]
[/workspace/Nextflow_sandbox/images/image1.tiff, [Start:601, End:1200]]
[/workspace/Nextflow_sandbox/images/image2.tiff, [Start:601, End:1200]]
[/workspace/Nextflow_sandbox/images/image3.tiff, [Start:601, End:1200]]
[/workspace/Nextflow_sandbox/images/image1.tiff, [Start:1201, End:1800]]
[/workspace/Nextflow_sandbox/images/image2.tiff, [Start:1201, End:1800]]
[/workspace/Nextflow_sandbox/images/image3.tiff, [Start:1201, End:1800]]
[/workspace/Nextflow_sandbox/images/image1.tiff, [Start:1801, End:2400]]
[/workspace/Nextflow_sandbox/images/image2.tiff, [Start:1801, End:2400]]
[/workspace/Nextflow_sandbox/images/image3.tiff, [Start:1801, End:2400]]
[/workspace/Nextflow_sandbox/images/image1.tiff, [Start:2401, End:3000]]
[/workspace/Nextflow_sandbox/images/image2.tiff, [Start:2401, End:3000]]
[/workspace/Nextflow_sandbox/images/image3.tiff, [Start:2401, End:3000]]
[/workspace/Nextflow_sandbox/images/image1.tiff, [Start:3001, End:3600]]
[/workspace/Nextflow_sandbox/images/image2.tiff, [Start:3001, End:3600]]
[/workspace/Nextflow_sandbox/images/image3.tiff, [Start:3001, End:3600]]
[/workspace/Nextflow_sandbox/images/image1.tiff, [Start:3601, End:4200]]
[/workspace/Nextflow_sandbox/images/image2.tiff, [Start:3601, End:4200]]
[/workspace/Nextflow_sandbox/images/image3.tiff, [Start:3601, End:4200]]

Is this what you’re looking for in the output?

I agree with what @mahesh.binzerpanchal said above; however, I interpreted your problem differently, or maybe I jumped ahead to the end. I think you just want the collate or buffer operator.

workflow {
    Channel
        .of(1..100)
        .set { segMasks }

    segMasks.collate(10, true).view()
}

In your case, you would just do .collate(600, true) (the second argument keeps the final, smaller chunk).
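For reference, a small sketch of collate with the remainder flag, using toy values:

```nextflow
workflow {
    Channel.of(1..10)
        .collate(3, true)   // true keeps the final, smaller chunk
        .view()
    // emits [1, 2, 3], [4, 5, 6], [7, 8, 9], [10]
}
```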

Ah, yeah. That could have been another possibility.

I think I did a poor job of explaining my situation. The CSV files are not pre-existing; they are generated from the TIFF files.

I have a process that reads the max value and then creates the buffering parameters for how to parallelize:

process identifyMaxValueNGenerateCsv {
    input:
    path tiffFilePath

    output:
    tuple path(tiffFilePath), path("pixleSplitOut_*.csv")

    script:
    template 'splitout_pixel_series.py'
}

The Python template, splitout_pixel_series.py:

#!/usr/bin/env python3

import csv
import random
import string

import numpy as np
import tifffile

# ${tiffFilePath} is interpolated by the Nextflow template engine
labelMask = "${tiffFilePath}"
interval = 600

# Random suffix so each image gets a uniquely named CSV
random_5_letters = ''.join(random.choice(string.ascii_letters) for _ in range(5))
output_csv = "pixleSplitOut_{}.csv".format(random_5_letters)

slide = tifffile.imread(labelMask)
print(f" Dimensions: {slide.shape}")
topMx = np.max(slide) + 1
print(" > Last Label: " + str(topMx))

# Create intervals from 1 to topMx with the given interval
intervals = [(i, min(i + interval - 1, topMx)) for i in range(1, topMx, interval)]
print(" N Intervals: " + str(len(intervals)))

# Write intervals to CSV file
with open(output_csv, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(['Start', 'End'])
    writer.writerows(intervals)

So by the time I get to this point, I am essentially doing two scatter operations, but I cannot get the channel to operate on the objects directly.

This workflow code:

workflow {
    mxByImage = identifyMaxValue(segMasks)

    scatterChannel = mxByImage
        .map { imageFile, csvFile ->
            // Split the CSV into rows with header and map to the desired format
            Channel
                .fromPath(csvFile) // Convert csvFile to a path object
                .splitCsv(header: true) // Split CSV into rows with headers
                .map { row -> path(imageFile), row.start.toInteger(), row.end.toInteger() }
        }.view()
}

gives me this (only 3 entities):

[a2/749ac3] process > identifyMaxValue (2) [100%] 3 of 3 ✔
DataflowVariable(value=null)
DataflowVariable(value=null)
DataflowVariable(value=null)

Instead of what I want like this (3 x sets of intervals):

(path("/path/to/image1.tiff"), 0, 600)
(path("/path/to/image1.tiff"), 601, 1200)
(path("/path/to/image2.tiff"), 0, 600)
(path("/path/to/image2.tiff"), 601, 1200)

As Mahesh said, you want combine:

workflow {

    images = Channel.of(
        "image1.tiff",
        "image2.tiff",
        "image3.tiff"
    )

    steps = Channel
        .of(1..7)
        .map { n -> 
            def step = n * 600
            return [step - 600, step]
        }
    
    images.combine(steps).view()
}
> nextflow run .
N E X T F L O W  ~  version 24.04.4
Launching `./main.nf` [gigantic_sammet] DSL2 - revision: bb0d6a2597
[image1.tiff, 0, 600]
[image2.tiff, 0, 600]
[image3.tiff, 0, 600]
[image1.tiff, 600, 1200]
[image2.tiff, 600, 1200]
[image3.tiff, 600, 1200]
[image1.tiff, 1200, 1800]
[image2.tiff, 1200, 1800]
[image3.tiff, 1200, 1800]
[image1.tiff, 1800, 2400]
[image2.tiff, 1800, 2400]
[image3.tiff, 1800, 2400]
[image1.tiff, 2400, 3000]
[image2.tiff, 2400, 3000]
[image3.tiff, 2400, 3000]
[image1.tiff, 3000, 3600]
[image2.tiff, 3000, 3600]
[image3.tiff, 3000, 3600]
[image1.tiff, 3600, 4200]
[image2.tiff, 3600, 4200]
[image3.tiff, 3600, 4200]
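If the intervals really do need to come from the per-image CSVs (as in the identifyMaxValueNGenerateCsv process above), one sketch that avoids nesting a Channel inside an operator is to use flatMap with splitCsv called directly on the file object (splitCsv is available as a method on files as well as a channel operator). The process and channel names here follow the code earlier in the thread:

```nextflow
workflow {
    mxByImage = identifyMaxValueNGenerateCsv(segMasks)

    // flatMap emits one tuple per CSV row; csvFile.splitCsv parses the
    // file Object directly, so no nested Channel is ever created
    scatterChannel = mxByImage.flatMap { imageFile, csvFile ->
        csvFile.splitCsv(header: true).collect { row ->
            tuple(imageFile, row.Start.toInteger(), row.End.toInteger())
        }
    }

    scatterChannel.view()
}
```

Each emitted tuple has the shape [image, start, end], matching the desired output described above, while keeping a channel-of-objects rather than a channel-of-channels.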