Please Help
I have 3 processes a, b, c which run sequentially through a Nextflow workflow, and each process runs in parallel for each argument passed. This runs as a Kubernetes job using a single AWS EKS Fargate pod. The Nextflow code works; however, process b fails with a memory exhaustion issue. To rectify this I applied the nextflow.config file below. Now process b does not start after process a completes. I'm also doubtful whether this config file will meet my criteria of sequential and parallel execution without exhausting the pod.
Could you please tell me what is wrong in this config?
The new pods created via nextflow.config are unable to connect to the database. I passed the parameters below, but I can see in the console that they are not being added as environment variables on the pods, and the DB connection still fails.
Sorry, without the error, logs or database details it's very hard for me to understand what's going wrong. Nextflow isn't doing anything special here, though; it's just creating ordinary pods.
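If it helps, per-process resource limits and pod environment variables are normally declared in nextflow.config rather than inside the processes. Here is a minimal sketch of what that usually looks like with the k8s executor; the process name, memory value, variable names and secret name are placeholders, not your actual settings:

    process {
        executor = 'k8s'

        // bump memory/cpus only for the process that runs out (placeholder values)
        withName: 'b' {
            memory = '8 GB'
            cpus   = 2
        }

        // environment variables injected into every task pod (placeholder names)
        pod = [
            [env: 'DB_HOST', value: 'my-db.example.com'],
            [env: 'DB_PASSWORD', secret: 'db-credentials/password']
        ]
    }

You can check whether the variables actually arrive by running env inside a task script or by looking at the pod spec with kubectl describe pod.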
I am facing an issue with process orchestration. I have processes a, b, c which should execute sequentially, and each process should run in parallel for arg1, arg2, arg3. However, I observe that process b for arg2 starts before process a for arg2 finishes. It looks like process b is triggered as soon as any argument of process a completes: if process a(arg1) or a(arg2) finishes, it triggers b(arg2), and even if a(arg2) fails, b(arg2) and b(arg1) are still triggered. How does Nextflow know for which arguments the previous process succeeded, so that it continues the next process only for those arguments as each one completes?
How can this issue be addressed? Could you please review my nf file or config file and help me out here?
Below is my nf file:
#!/usr/bin/env nextflow
nextflow.enable.dsl=2
directory = "${workflow.launchDir}"
logFile = "${workflow.launchDir}/results.log" // Define a log file for all outputs
ceh_bucket_path = "s3://samplebucket"
process a {
    errorStrategy 'ignore'

    input:
    val arg

    output:
    path "result_a_${arg}.txt"

    script:
    """
    chmod +x "${workflow.launchDir}/docker/a.sh"
    if (cd "${workflow.launchDir}" && bash ./docker/a.sh "\$PARAMETERS" >> "${logFile}" 2>&1); then
        touch result_a_${arg}.txt
        aws s3 cp result_a_${arg}.txt ${ceh_bucket_path}/nextflow/results/ || { echo "Failed to copy result file"; exit 1; }
    else
        echo "Rscript failed" >> ${logFile}
        exit 1
    fi
    aws s3 cp ${logFile} ${ceh_bucket_path}/nextflow/mlops_logs/ || { echo "Failed to copy log file"; exit 1; }
    """
}
process b {
    errorStrategy 'ignore'

    input:
    path prev_output
    val arg

    output:
    path "result_b_${arg}.txt"

    script:
    """
    if (cd "${workflow.launchDir}" && bash ./docker/b.sh "\$PARAMETERS" >> "${logFile}" 2>&1); then
        touch result_b_${arg}.txt
        aws s3 cp result_b_${arg}.txt ${ceh_bucket_path}/nextflow/results/ || { echo "Failed to copy result file"; exit 1; }
    else
        echo "Rscript failed" >> ${logFile}
        exit 1
    fi
    """
}
process c {
    errorStrategy 'ignore'

    input:
    path prev_output
    val arg

    output:
    path "c_${arg}.txt"

    script:
    """
    """
}
workflow {
    allargs = []
    // Channel.from() creates a channel which emits the given arguments
    // The 'a' process will be executed concurrently for each argument emitted by the channel
    params.each { key, value ->
        allargs.add(value)
    }
    argChannel = Channel.from(allargs)
    inputArgCount = allargs.size()

    // Define the first chain of jobs
    a(argChannel)
    bOut = b(a.out, argChannel).collect()
    cOut = c(bOut, argChannel)
    cOut.view { "Final output: $it" }
}
I think you need to go back and read through the basic tutorial; you're missing some basic concepts here and making things much more complicated than they need to be. Take a look at Welcome - training.nextflow.io to get an understanding of channels, processes and file handling. My general advice remains the same as above.
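To sketch the usual shape: carry the argument through each process as part of a tuple and wire the channels together directly, so b only starts for an argument once a has finished for that same argument. Something like the following; this is a stripped-down sketch only, with the S3 copies, log handling and your real script calls replaced by placeholders:

    #!/usr/bin/env nextflow
    nextflow.enable.dsl=2

    process a {
        input:
        val arg

        output:
        tuple val(arg), path("result_a_${arg}.txt")

        script:
        """
        # your ./docker/a.sh call would go here
        echo "a done for ${arg}" > "result_a_${arg}.txt"
        """
    }

    process b {
        input:
        tuple val(arg), path(prev_output)

        output:
        tuple val(arg), path("result_b_${arg}.txt")

        script:
        """
        # your ./docker/b.sh call would go here
        cat "${prev_output}" > "result_b_${arg}.txt"
        """
    }

    workflow {
        args = Channel.of('arg1', 'arg2', 'arg3')   // or Channel.fromList(params.values() as List)

        a(args)      // one task per argument, all in parallel
        b(a.out)     // b(argN) starts only after a(argN) completes
        // c would chain from b.out in exactly the same way
    }

Because each element of a.out carries its own arg alongside the file, Nextflow keeps them paired as they flow into b; and with your errorStrategy 'ignore', a failed a(arg2) simply emits nothing, so b(arg2) is never triggered while the other arguments carry on.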
I have some additional questions you will need to answer to make a reproducible version:
What are the contents of a.sh and b.sh?
What are the contents of the environment variable PARAMETERS in processA and processB?
What is the purpose of processC which does not appear to do anything?
What is the purpose of this pipeline? What does the Rscript do?
Why do you parse the parameters with a list: params.each { key, value -> allargs.add(value)}?