This is likely to be really simple, as I’m very new to Nextflow. I am trying to write a fairly straightforward workflow that runs a Python program on 3 files and then writes the 3 output filenames to a single file for further processing. However, I only ever get 1 filename in my output file. Here is the nf code. Thanks for any help.
process runFilterDomains {
    publishDir '.', mode: 'copy'

    input:
    path chopping_file

    output:
    path "filtered_${chopping_file.name}"

    script:
    """
    python ${workflow.projectDir}/filter_domains.py ${chopping_file} -o filtered_${chopping_file.name}
    """
}

/*
 * Collect the 3 FILENAMES output from filter_domains.py and save them into a text file as a single line separated by a space
 */
process collectChoppingNames {
    publishDir '.', mode: 'copy'

    input:
    path filtered_file

    output:
    path "collected_output.txt"

    script:
    """
    echo ${filtered_file.name} >> 'collected_output.txt'
    """
}

workflow {
    // Create a channel for all chopping files
    def chopped_files = Channel.fromPath("chopping_*.txt")
    // Run runFilterDomains on the input chopping files
    runFilterDomains(chopped_files)
    // Collect the filtered output filenames
    def filtered_files = Channel.fromPath('filtered_chopping*.txt')
    // Run runFilterDomains on the input chopping files
    collectChoppingNames(filtered_files)
}
You don’t need to create a channel for intermediate steps. The filtered chopping files are already in a channel named runFilterDomains.out, as they’re the output of this process.
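There is another issue with your second Channel.fromPath call. It looks for these files in the launch directory, while the intermediate files are actually written to the task directories (within work/). That channel is also not connected to runFilterDomains, so nothing makes it wait for the process to finish before the glob is evaluated.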
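To make both points concrete, here is a minimal sketch, separate from your pipeline. The process name makeFile and the values 'a', 'b', 'c' are hypothetical and only there for illustration. It shows that a process output is already a channel you can use directly, and that the files it carries live in task directories under work/ rather than in the launch directory.

```nextflow
// Hypothetical toy process: writes one small file per input value.
process makeFile {
    input:
    val name

    output:
    path "${name}.txt"

    script:
    """
    echo ${name} > ${name}.txt
    """
}

workflow {
    // One task runs per value emitted by the channel.
    makeFile(Channel.of('a', 'b', 'c'))

    // The process output is itself a channel; view() prints each item.
    // Each item is an absolute path inside a task directory under work/.
    makeFile.out.view()
}
```

Running this should print three paths, one per task, each pointing into a different subdirectory of work/.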
When you want to process all the outputs together in a single task, you’re probably looking for the collect channel operator, which gathers every item emitted by a channel into one list. That’s what I did in my snippet below:
main.nf
process runFilterDomains {
    publishDir '.', mode: 'copy'

    input:
    path chopping_file

    output:
    path "filtered_${chopping_file.name}"

    script:
    """
    # python ${workflow.projectDir}/filter_domains.py ${chopping_file} -o filtered_${chopping_file.name}
    # Stand-in for the real command so the example runs without filter_domains.py
    head -n1 ${chopping_file} > filtered_${chopping_file.name}
    """
}

/*
 * Concatenate the contents of all filtered files into a single text file
 */
process collectChoppingNames {
    publishDir '.', mode: 'copy'

    input:
    path filtered_files

    output:
    path "collected_output.txt"

    script:
    """
    # filtered_files expands to the space-separated names of all staged files
    for file in ${filtered_files}; do
        cat \$file >> 'collected_output.txt'
    done
    """
}

workflow {
    // Create a channel for all chopping files
    def chopped_files = Channel.fromPath("chopping_*.txt")
    // Run runFilterDomains on the input chopping files
    runFilterDomains(chopped_files)
    // Gather all filtered files into one list so collectChoppingNames runs only once
    collectChoppingNames(runFilterDomains.out.collect())
}
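Note that the loop above concatenates the contents of the filtered files. If what you want is the filenames themselves on a single space-separated line, as your original comment describes, a variant along these lines should work. This is a sketch that assumes the same process name and output file as above, and it is meant as a drop-in replacement for collectChoppingNames, with the workflow block left unchanged.

```nextflow
/*
 * Write the names of all filtered files to a text file,
 * as a single line separated by spaces
 */
process collectChoppingNames {
    publishDir '.', mode: 'copy'

    input:
    path filtered_files

    output:
    path "collected_output.txt"

    script:
    """
    # A list of staged files is rendered as its space-separated file names
    echo ${filtered_files} > collected_output.txt
    """
}
```

Because the input is the collected list, this process runs once and the echo receives all the names in one go, so there is no need for >> to append across tasks.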