How to wait until a process is complete?

Hello there,

I’d like to wait until a process is complete. Once the process is complete I’d like to initiate a next process that takes no inputs.

How do I achieve that?

As of now, I use collect to wait, store into a variable and pass into a process. However, it doesn’t give me desired behaviour.

workflow {
        featurecounts(markduplicates.out.markduplicate_bam)

        featurecounts.out.foldchange | collect(flat: false)  | flatMap()|set { out_foldchange }

        featurecounts.out.foldchange_summary | collect (flat: false) |flatMap()| set { out_foldchange_summary }

        merge_feature(out_foldchange,out_foldchange_summary)
}

I do not want to send any files/channel to merge feature. However, I’d like all samples/data to be through with the featurecounts step.

Please let me know if any other details are needed.

We call state dependency what you’re referring to. There is no data dependency between processes, but for some reason, you still want to create one. We can do that through a state dependency, which is similar to what you described.

See the snippet bellow:

process foo {
    output: 
    val true
    script:
    """
    echo your_command_here
    """
}

process bar {
    input: 
    val ready
    path fq
    script:
    """
    echo other_command_here --reads $fq
    """
}

workflow {
    reads_ch = Channel.fromPath("$baseDir/data/reads/11010*.fq.gz", checkIfExists:true)

    foo()
    bar(foo.out, reads_ch)
}

You can read more about it here.

@mribeirodantas

Thank you for your response.
Sorry, this is not what I’m looking for.

The next step does depend on the output of the previous steps. It’s a hard-coded script that loops around the folders generated in the earlier script, eventually applies batch correction and merges feature counts (RNA-seq data).

feature counts nextflow is as below:

process featurecounts {

tag "$meta.timepoint"
    maxForks 5
        //debug true
        errorStrategy 'retry'
    maxRetries 2
publishDir path: "${params.outdir}/${meta.batch}/${meta.timepoint}/RNA/primary/featurecounts/", mode: 'copy' 

input:
tuple val(meta), path(markdupli_bam, stageAs: 'temp_featurecount/*')

output:
tuple val(meta), path("subreadout.fc.txt"), emit: foldchange
tuple val(meta), path("subreadout.fc.txt.summary"), emit: foldchange_summary

    script:
    """
    /data1/software/subread-2.0.6-Linux-x86_64/bin/featureCounts -a $params.gtf_annotation_file -T 24 -o "subreadout.fc.txt" -p ${markdupli_bam}
    """
}

With current merge feature nextflow code, it runs for each sample. For e.g., if there are 19 samples the merge feature is executed 19 times. However, it’s to be run only once.

merge_feature.nf is as below

process merge_feature {
    
    tag "$meta.timepoint"
        //debug true
        errorStrategy 'retry'
    maxRetries 2
publishDir path: "${params.outdir}/secondary_RNA/merged_featurecounts/", mode: 'copy' 

input:
tuple val(meta), path( foldchange, stageAs: 'temp_mergefeature/*')
tuple val(meta), path( foldchange_summary, stageAs: 'temp_mergefeature/*')
 
output:
tuple val(meta), path("RNAdata_latest.RData"), emit: rdata


path("normCorrCounts_vst_*.tsv"  )
path("rawCounts_*.tsv")  

tuple val(meta),path("zscores_latest.tsv"), emit: latest_scores

path("zscores_*.tsv")

script:

"""
Rscript /data1/software/Rscripts/Daphni2_scripts/RNA_Merge_NextFlow.R $params.outdir ./

"""

}

Even though the merge_feature step depends on previous steps, I do not want to pass the collected channel as an input.

Can that be achieved?

I don’t think the code you shared is what would be applicable in my scenario.

You’re right. I misread what you wrote, sorry for that. In your scenario, you don’t need to create dependency, as it’s already there. collect alone should be enough, though.

I believe so it can be achieved. That’s how MultiQC works in nf-core pipelines, for example. It waits for processes to be over so that it starts. I think the issue in your script is probably related to the input/output channels, not to the collect part, but it’s hard to say unless you give me a minimal reproducible example.

I’m sorry for that, but it’s very counterproductive to help you with this custom scripts. Please, try to bring a minimal reproducible example so that we can work together on a solution.

By the way, it’s good practice to make your pipeline portable which means, among other things, having scripts in the bin folder. You have it hard coded in your script block, which is not recommended.

Check the snippet below. It’s not the best example, but I believe it does what you want:

process FOO {
  maxForks 1

  input:
  tuple val(letter_id), val(letter)

  output:
  tuple val(letter_id), path("${letter_id}_${letter}.txt")

  script:
  """
  #sleep 1
  echo ${letter} > ${letter_id}_${letter}.txt
  """
}

process BAR {
  debug true

  input:
  tuple val(letter_ids), path(letter_files)

  output:
  stdout

  script:
  """
  ls *.txt
  """
}

workflow {
  Channel
    .of(1..26)
    .set { numbers_ch }

  Channel
    .of('a'..'z')
    .set { letters_ch }

  numbers_ch
    .merge(letters_ch)
    .set { input_ch }

  FOO(input_ch)
  FOO
    .out
    .collect(flat: false)
    .transpose()
    .collect(flat: false)
    .set { final_ch }

  BAR(final_ch)
  }

Output:

1 Like

@mribeirodantas

Thank you for your inputs and example.
Sorry, it’s difficult to create example of such complicated stuff. I’d keep in mind.

I’d like to extend on the example you provided and would try to show what I’d like as an output.

Please be patient, I’ve multiple codes/examples below.

  1. In your example for BAR instead of the ls, I’d like to concat the files.
    I tried below code:
process BAR_cat {

    publishDir path: "/data1/users/nextflow/learn_nextflow/merge_feature", mode: 'copy' 

  debug true

tag "$letter_ids"

  input:
  tuple val(letter_ids), path(letter_files)

  output:
  path("merged.txt")

  script:
  """
  cat ${letter_files} >> "merged.txt"
  """
}

Here it generated all number_letter.txt files in addition to the “merged.txt”. I don’t understand why.

How do I resolve it?

  1. When I run my code of merge_feature, it runs for all the samples, that is if the collected channel had 19 variables in it, it runs 19 times instead of one.

Below is example of output of the feature_count step:

ch_feature_count=channel.of( [[batch:'SEMA-MM-001', timepoint:'MM-0486-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ],
  [[batch:'SEMA-MM-001', timepoint:'MM-0482-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ],
  [[batch:'SEMA-MM-001', timepoint:'MM-0485-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ],
  [[batch:'SEMA-MM-001', timepoint:'MM-048-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ]
  )

I collect it using following code

ch_feature_count|collect(flat: false)  | flatMap() | view

If I send this channel to the merge_feature feature it will run for each timepoint.

Second, I am unable to pick variables from the transpose

ch_feature_count| collect(flat: false)  | transpose() | set { transpose_ch }

I’ve tried the following but all fail:

transpose_ch[0]
transpose_ch[[1]]

Please let me know if any other details are needed.

@mribeirodantas
Hi,
Can you please look?

1 Like

I completely agree with you, some issues come from complex workflows, but that also highlights the importance of minimal reproducible examples :smile:. By breaking your problem into small pieces, you make it easier to spot what’s wrong and find a solution. I have benefited from that multiple times already, that’s one of the reasons for recommending using them for questions :wink:

I will answer your questions below:

  1. What do you mean by generated all number_letter files?


    This? If that’s what you mean, Nextflow creates symbolic links for the input files, so it’s not generating, but linking so that the task can create the merged.txt file.

  2. The only way to handle channel elements is through channel operators, closures or within Nextflow processes. If you want a channel with the first element of a channel named transpose_ch, for example, you’d do something like:

transpose_ch.take(1).set { my_new_ch }

@mribeirodantas

Thank you for your response.

For 1. I understood. My bad.

For 2. I don’t think I get desired result.


ch_feature_count | collect(flat: false)   | transpose() | set { transpose_ch }

transpose_ch.take(2).view()

I’m only expecting the text files to be extracted.

Lastly, I still don’t understand why my feature_merge process runs 19 times instead of one.

Do you think you could help me with it?

Thank you again.

A process runs 19 times (19 tasks) because an input channel with 19 elements was provided. Check the channel before sending it to this process to make sure it’s really what you want (apparently a single channel element with multiple items inside).

It’s hard to help you without a minimal reproducible example :sweat_smile: I’m left with too much guessing, you need to help me too :laughing:

@mribeirodantas

Ah OK.
Any clue why the following doesn’t work?

transpose_ch.take(2).view()

I believe the following is what you’re saying - single channel with multiple items inside?

[/mnt/data1/users/sanjeev/nextflow/learn_nextflow/merge_feature/subreadout.fc.txt, /mnt/data1/users/nextflow/learn_nextflow/merge_feature/subreadout.fc.txt, /mnt/data1/users/nextflow/learn_nextflow/merge_feature/subreadout.fc.txt, /mnt/data1/users/nextflow/learn_nextflow/merge_feature/subreadout.fc.txt]

This is what I’ve been trying to extract using .take(2) but it fails.

Thanks! This helps.

This seems to be a channel with a single element with multiple items inside. You can use closures to access individual items. See the example below:

Channel
  .of('a'..'z')
  .collect()
  .set { my_sing_elem_ch }

my_sing_elem_ch
  .view { it[4] }

@mribeirodantas Sorry, I cannot follow you.

Let me try again. I’ve following channel:


ch_feature_count=channel.of( [[batch:'SEMA-MM-001', timepoint:'MM-0486-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ],
  [[batch:'SEMA-MM-001', timepoint:'MM-0482-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ],
  [[batch:'SEMA-MM-001', timepoint:'MM-0485-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ],
  [[batch:'SEMA-MM-001', timepoint:'MM-048-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ]
  )

I perform some transformation:

ch_feature_count| collect(flat: false)  | transpose() | set { transpose_ch }

Now, how do I access the single channel with multiple files?

I’ve tried the suggested method earlier:

transpose_ch.take(2).view()

This fails.

I hope my example is OK?

It is, thank you for that! :slight_smile:

transpose_ch is a channel with two elements. Let’s draw a diagram to view the content of these two elements more easily.

// First
[
  [batch:SEMA-MM-001, timepoint:MM-0486-T-01, tissue:rna, sequencing_type:rna],
  [batch:SEMA-MM-001, timepoint:MM-0482-T-01, tissue:rna, sequencing_type:rna],
  [batch:SEMA-MM-001, timepoint:MM-0485-T-01, tissue:rna, sequencing_type:rna],
  [batch:SEMA-MM-001, timepoint:MM-048-T-01, tissue:rna, sequencing_type:rna]
]
// Second
[
  /Users/mribeirodantas/subreadout.fc.txt, /Users/mribeirodantas/subreadout.fc.txt, /Users/mribeirodantas/subreadout.fc.txt, /Users/mribeirodantas/subreadout.fc.txt
]

See? That’s why we get two items when we try to view the first item with:

transpose_ch.view { it[0] }

If you add a collect when defining the channel

ch_feature_count| collect(flat: false)  | transpose() | collect | set { transpose_ch }

Then you can get each of all these items with the closure, e.g.:

transpose_ch.view { it[0] }

Or

transpose_ch.view { it[0]['batch'] }

@mribeirodantas
I see. This is complicated.

Anyway, coming back to the issue where my process runs 19 times.

How do I only get the following from the transposed channel that contains two elements?

[
  /Users/mribeirodantas/subreadout.fc.txt, /Users/mribeirodantas/subreadout.fc.txt, /Users/mribeirodantas/subreadout.fc.txt, /Users/mribeirodantas/subreadout.fc.txt
]

If I pass this list/array/channel to the merge_feature channel then my merge_feature will run only once.

Thank you for your constant help and troubleshooting this.

You can get the second element by skipping the first. Because you’d still have a list of a list, you need to extract it with [0]. See the snippet below:

ch_feature_count=channel.of( [[batch:'SEMA-MM-001', timepoint:'MM-0486-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ],
  [[batch:'SEMA-MM-001', timepoint:'MM-0482-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ],
  [[batch:'SEMA-MM-001', timepoint:'MM-0485-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ],
  [[batch:'SEMA-MM-001', timepoint:'MM-048-T-01', tissue:'rna', sequencing_type:'rna'],
      file('subreadout.fc.txt')
  ]
  )

ch_feature_count| collect(flat: false)  | transpose() | set { transpose_ch }

transpose_ch.buffer(skip: 1).view { it[0] }

Output:

PS: Please, for next questions let’s try to organize questions in posts. The discussions we’re having right now are loosely tied to the title of the post, because we discussed so much :laughing:.

@mribeirodantas

Thank you we’re close.

PS: Please, for next questions let’s try to organize questions in posts. The discussions we’re having right now are loosely tied to the title of the post, because we discussed so much :laughing:.

I agree. Sorry, the post revolved around many things.

Sorry, the code doesn’t give what I want in a variable.

transpose_ch.buffer(skip: 1).view()

There’s a [ ]

How do I save or extract the following in a variable?

transpose_ch.buffer(skip: 1).view { it[0] }

Please let me know if my question isn’t clear.

I’d only like to store/save the following.

[/mnt/data1/users/sanjeev/nextflow/learn_nextflow/merge_feature/subreadout.fc.txt, /mnt/data1/users/nextflow/learn_nextflow/merge_feature/subreadout.fc.txt, /mnt/data1/users/nextflow/learn_nextflow/merge_feature/subreadout.fc.txt, /mnt/data1/users/nextflow/learn_nextflow/merge_feature/subreadout.fc.txt]

Skip does it partially, but it also generates []

Is there a way to store it without using the iterator?

You didn’t run view with { it[0] }. You need this exactly to get rid of the [].

@mribeirodantas

I ran as:

transpose_ch.buffer(skip: 1).view{it[0]

It gave me desired results on print.

How do I set into a variable?

transpose_ch.buffer(skip: 1).view{it[0]} | set { transpose_ch_new }

I need to assign into a variable.

You can save it to a channel named my_ch with the following snippet:

transpose_ch.buffer(skip: 1).map { it[0] }.set { my_ch }

The output of my_ch.view() :wink:

@mribeirodantas
Thank you.

I hope this channel won’t run 19 times when sent into a process.