Branching and splitting

So let's say I have a process (Counts) with an output called counts that looks like this:

[/home/ubuntu/work/6d/db934dc132cab4147947e3a9d34b8b/PCR35036-35009_counts.csv, 0]
[/home/ubuntu/work/f4/a54dd91f349afc355871ee768eb85d/PCR35039-35012_counts.csv, 3]
[/home/ubuntu/work/14/14f97f85572b6c68f6cc314b161c93/PCR35037-35010_counts.csv, 1]
[/home/ubuntu/work/74/0b0c49fd1af48daeecbac698eea8c5/PCR35038-35011_counts.csv, 2]
[/home/ubuntu/work/a0/6ae399d518b948d98d6b9b8640d386/PCR35120-35016_counts.csv, control]

I want to split this into three outputs. The first is a list of all the file paths in the first column where the timepoint (in the second column) != control. The second is a list of the timepoints in the second column that are != control. I want to maintain the order relationship between the two lists. In the third output, I want to return just the file path for timepoint == control.

So, I'm thinking that I want to use a branch on the output of counts:

Counts.out.counts
    .branch {
         timepoints: it[1] != control
         control: true
    }
.set { results }

results.timepoints
    .multiMap { it ->
         points: it[1]
         files: it[0]
    }
.set { file_points }

This is what I want:

results.control[0] // the value in the first position of the control output (control file name)

/home/ubuntu/work/a0/6ae399d518b948d98d6b9b8640d386/PCR35120-35016_counts.csv
file_points.points // list of time points

[0 3 1 2]
file_points.files // list of file locations corresponding to the time points above

[/home/ubuntu/work/6d/db934dc132cab4147947e3a9d34b8b/PCR35036-35009_counts.csv /home/ubuntu/work/f4/a54dd91f349afc355871ee768eb85d/PCR35039-35012_counts.csv
/home/ubuntu/work/14/14f97f85572b6c68f6cc314b161c93/PCR35037-35010_counts.csv
/home/ubuntu/work/74/0b0c49fd1af48daeecbac698eea8c5/PCR35038-35011_counts.csv ]

I'm not getting this to work, and I bet there is a more straightforward way. The lists need to be space-separated to work in the process's Python script as currently written.

Updated

This is closer -

Counts.out.counts
        .branch {
            timepoints: it[1] != 'control'
            control: true
        }
        .set {split_controls}

    //split_controls.timepoints.view()
    split_controls.control
        .collect { it[0] }
        .toList()
        .set { control_file }

    control_file.view()
    
    split_controls.timepoints
        .multiMap {it ->
            points: it[1]
            files: it[0]
        }
        .set { file_points }

But when I use the points and file 'lists' in the next process, it only reads the first one in each - so I think maybe it's still not a space-separated list.

More Progress

AggregateCounts( 
        file_points.files.toList(), 
        file_points.points.toList() 
        )

and with a little tweak to my process script, the files seem to be set up as I require:

script:
def files = count_files_str.join(" ")
def times = timepoints_str.join(" ")
"""
python /usr/src/app/aggregate_counts.py \
--file_paths ${files} \
--timepoints ${times}
"""

But now a harder issue. The python script within the docker container invoked by this process can’t find the file.

 FileNotFoundError: [Errno 2] No such file or directory: '/home/ubuntu/work/f9/c3a347ab4717a4712705ecabc991ea/PCR35038-35011_counts.csv'

This file absolutely exists. I'm thinking maybe it's a permissions issue between the host and the container?

Hello @James_Beck. Please edit your post to add updates instead of replying to it. It's more organized this way, and it's easier for us to understand what's going on :sweat_smile:.

In order for Nextflow to stage input files into a container, you need to tell Nextflow that these files are inputs. I believe you're not doing that, and that's why the files can't be found. The path to the file that is not found convinces me that's what's going on. You're probably passing the paths to the process as a string, and so they're treated as just a string. Ideally, I would ask you to share a minimal reproducible example so that I can test. With what you shared, I'm left guessing :laughing:

Apologies for the bad form. :grinning:

As for the files not being found, I have this in my nextflow.config:

docker {
    enabled = true
}

I thought that might take care of it, but I think you're maybe talking about something else.

This is the command call in nextflow:

python /usr/src/app/aggregate_counts.py  \   
--file_paths \ 
/home/ubuntu/Scratch/pamda_test2/work/f9/c3a347ab4717a4712705ecabc991ea/PCR35038-35011_counts.csv \ 
/home/ubuntu/Scratch/pamda_test2/work/e5/7a7c05a5471ef2d0ea43b97794705f/PCR35039-35012_counts.csv \ 
/home/ubuntu/Scratch/pamda_test2/work/8a/b291bcae57884d5fcee45cf6083eb7/PCR35036-35009_counts.csv \ 
/home/ubuntu/Scratch/pamda_test2/work/a9/10afaf5ccb310053e24759def9ff78/PCR35037-35010_counts.csv \
 --timepoints 2 3 0 1

And the file paths are absolutely strings and not paths.

I have this process, which counts occurrences of some set of subsequences from merged fastqs. This is that process, and it works, producing the count outputs correctly.

process MakeCounts {

    publishDir 'output', mode: 'copy'
    
    container '668591248114.dkr.ecr.us-east-1.amazonaws.com/pamda:1.0'

    cpus 8

    input:
    tuple val(study), val(sample),  val(dir), path(merged_fastq), val(timepoint)
    path(barcode_file)
    path(spacer_file)
    path(yaml_file)
    
    output:
    tuple path("${sample}_counts.csv"),val(timepoint), emit: counts
    
    script:
    """
    python /usr/src/app/make_counts.py \
    --merged_fastq ${merged_fastq} \
    --yaml_file ${yaml_file} \
    --barcode_file ${barcode_file} \
    --spacer_file ${spacer_file} \
    --sample ${sample} \
    --timepoint ${timepoint}
    """
}

That output feeds the AggregateCounts process to aggregate counts:

process AggregateCounts {

    publishDir 'output', mode: 'copy'

    container '668591248114.dkr.ecr.us-east-1.amazonaws.com/pamda:1.0'

    cpus 8

    input:
    val(count_files_str) 
    val(timepoints_str)

    output:
        path("aggregated_counts.csv"), emit: aggregated_csv
        path("summary_counts.csv"), emit: summary_csv

    script:
    def files = count_files_str.join(" ")
    def times = timepoints_str.join(" ")
    """
    python /usr/src/app/aggregate_counts.py \
    --file_paths ${files} \
    --timepoints ${times}
    """
}

So all of the code I wrote above is to take the outputs from MakeCounts and input them correctly into AggregateCounts (and to set aside the path to the control file for later use). This is that code:

MakeCounts.out.counts
        .branch {
            timepoints: it[1] != 'control'
            control: true
        }
        .set {split_controls}

    //split_controls.timepoints.view()
    split_controls.control
        .collect { it[0] }
        .toList()
        .set { control_file }
    
    split_controls.timepoints
        .multiMap {it ->
            points: it[1]
            files: it[0]
        }
        .set { file_points }

    // Aggregation and normalization
    AggregateCounts( 
        file_points.files.toList(), 
        file_points.points.toList() 
        )

In the AggregateCounts process, you're telling Nextflow that no files are needed as inputs to this process, just values, but this is wrong: you need the files for the aggregate_counts.py script. If this were done without Docker, it would work, because the task runs where the files are, so a string containing a path to a file is enough. But you're running in a container, and the container and the host operating system have independent file systems. Usually, you'd need to manually mount a volume so that the container can see the files, but Nextflow does this automatically for you, as long as you tell Nextflow that the task requires access to files, which you are not doing :sweat_smile:

I understand that you need a string with spaces in between for the aggregate_counts.py script, but pass the files correctly to the AggregateCounts process and, within it, convert the list of paths to a string with every item separated by a space (with join, for example).

If you provide me a nextflow.config, a main.nf file and some fake data, I can tell you exactly what you need to do. But the way you're doing it, it's a bit difficult :sweat_smile: Maybe it's just a matter of changing val to path, not sure.
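For illustration, that val-to-path change would look roughly like this. This is only a sketch based on the AggregateCounts process shown earlier in the thread (directives like publishDir, container and cpus omitted for brevity), not a tested replacement:

process AggregateCounts {

    input:
    // path (instead of val) tells Nextflow to stage these files into
    // the task work directory and mount them into the container
    path(count_files)
    val(timepoints)

    output:
    path("aggregated_counts.csv"), emit: aggregated_csv
    path("summary_counts.csv"), emit: summary_csv

    script:
    // the staged file names can still be joined into the
    // space-separated strings the Python script expects
    def files = count_files.join(" ")
    def times = timepoints.join(" ")
    """
    python /usr/src/app/aggregate_counts.py \
    --file_paths ${files} \
    --timepoints ${times}
    """
}

With a path input, the script sees the staged file names (symlinks in the task work directory), which are valid inside the container because Nextflow mounts the work directory for you.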

Thanks for the guidance. I see that I output the MakeCounts as a path, but took it into AggregateCounts as a val.

I appreciate the offer. I think I’m close to getting this working and kind of want to struggle through incrementally. I think I’ll learn more that way.

Great. I will attach a snippet below that does roughly what I think you're trying to do, and works. I tried it with the nextflow/nextflow container image and it was able to see the files. Prepare for spoilers :stuck_out_tongue_winking_eye:

// Preparation of dummy data
// Output of makeCounts
Channel
  .of([file('/home/ubuntu/work/6d/db934dc132cab4147947e3a9d34b8b/PCR35036-35009_counts.csv'), 0],
      [file('/home/ubuntu/work/f4/a54dd91f349afc355871ee768eb85d/PCR35039-35012_counts.csv'), 3],
      [file('/home/ubuntu/work/14/14f97f85572b6c68f6cc314b161c93/PCR35037-35010_counts.csv'), 1],
      [file('/home/ubuntu/work/74/0b0c49fd1af48daeecbac698eea8c5/PCR35038-35011_counts.csv'), 2],
      [file('/home/ubuntu/work/a0/6ae399d518b948d98d6b9b8640d386/PCR35120-35016_counts.csv'), 'control']
  )
  .set { makeCount_output }

process AggregateCounts {
  debug true
  container 'nextflow/nextflow'

  input:
    path filepaths
    val types
  output:
    stdout

  script:
  def file = filepaths
  def times = types.join(' ')
  """
    echo python /usr/src/app/aggregate_counts.py \
    --file_paths ${file} \
    --timepoints ${times}
    stat ${file[0]}
  """
}

workflow {
  makeCount_output
    .multiMap {
      paths: it[0]
      types: it[1]
    }
    .set { AggregateCounts_input }
  AggregateCounts(AggregateCounts_input.paths.collect(),
                  AggregateCounts_input.types.collect())
}

Output:

N E X T F L O W  ~  version 23.09.2-edge
Launching `main.nf` [distracted_turing] DSL2 - revision: 56db98ffbf
executor >  local (1)
[3e/b26f26] process > AggregateCounts [100%] 1 of 1 ✔
python /usr/src/app/aggregate_counts.py --file_paths PCR35036-35009_counts.csv PCR35039-35012_counts.csv PCR35037-35010_counts.csv PCR35038-35011_counts.csv PCR35120-35016_counts.csv --timepoints 0 3 1 2 control
  File: ‘PCR35036-35009_counts.csv’ -> ‘/home/ubuntu/work/6d/db934dc132cab4147947e3a9d34b8b/PCR35036-35009_counts.csv’
  Size: 77        	Blocks: 0          IO Block: 4096   symbolic link
Device: 20h/32d	Inode: 21          Links: 1
Access: (0755/lrwxr-xr-x)  Uid: (    0/    root)   Gid: (    0/    root)
Access: 2023-10-06 16:03:54.494488994 +0000
Modify: 2023-10-06 16:03:54.494488994 +0000
Change: 2023-10-06 16:03:54.494488994 +0000
 Birth: -

You can also find below a slightly harder to read but more concise way of doing it using tuples:

// Preparation of dummy data
// Output of makeCounts
Channel
  .of([file('/home/ubuntu/work/6d/db934dc132cab4147947e3a9d34b8b/PCR35036-35009_counts.csv'), 0],
      [file('/home/ubuntu/work/f4/a54dd91f349afc355871ee768eb85d/PCR35039-35012_counts.csv'), 3],
      [file('/home/ubuntu/work/14/14f97f85572b6c68f6cc314b161c93/PCR35037-35010_counts.csv'), 1],
      [file('/home/ubuntu/work/74/0b0c49fd1af48daeecbac698eea8c5/PCR35038-35011_counts.csv'), 2],
      [file('/home/ubuntu/work/a0/6ae399d518b948d98d6b9b8640d386/PCR35120-35016_counts.csv'), 'control']
  )
  .set { makeCount_output }

process AggregateCounts {
  debug true
  container 'nextflow/nextflow'

  input:
    tuple path(filepaths), val(types)
  output:
    stdout

  script:
  def file = filepaths
  def times = types.join(" ")
  """
    echo python /usr/src/app/aggregate_counts.py \
    --file_paths ${file} \
    --timepoints ${times}
    stat ${file[0]}
  """
}

workflow {
  makeCount_output
    .toList()
    .map { it.transpose() }
    .set { AggregateCounts_input }

  AggregateCounts(AggregateCounts_input)
}
