How to connect workflow variables to input

Hi, I have just started looking at Nextflow to make a more reproducible version of my genome assembly pipeline, which currently consists of a series of bash scripts. I'm quite confused by the input section of a process and how it links to the workflow and the params. For example, here I wanted to provide individual paths for the HiFi and HiC read directories and then have some sort of Python script which spits out the associated reads based on the prefix… From what I understood, the input has to somehow be associated with a channel which takes in some form of data (here I wanted that to be the params), but I don't understand how the input knows exactly which paths to take. Or can the input not directly take variables from the workflow, and can they only be used in the script section?

params.hifi_dir = '/path/to/hifi_reads'
params.hic_dir = '/path/to/HiC_reads'

process collectsamples {

  input:
  path hifi
  path hic

  script:
  """
  """
}

workflow {
  hifi_ch = Channel.fromPath("${params.hifi_dir}/*.fastq.gz")
  hic_ch  = Channel.fromPath("${params.hic_dir}/*.fq.gz")
}

I hope that made some sense, Thank you!!

It’s not quite clear what you want to achieve, but generally, you call a process from the workflow using channel inputs that match the structure in your process input specifier. In your case, you would be calling the process like this:

collectsamples (
    hifi_ch,
    hic_ch
)
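
Putting that together with your snippet, the wiring could look roughly like this (just a sketch of the idea):

workflow {
    hifi_ch = Channel.fromPath("${params.hifi_dir}/*.fastq.gz")
    hic_ch  = Channel.fromPath("${params.hic_dir}/*.fq.gz")

    collectsamples(hifi_ch, hic_ch)
}

The params are only used to build the channels; each task of collectsamples then receives one item from hifi_ch as hifi and one item from hic_ch as hic, so the process itself never needs to know about the directories.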

Most likely, this would be a bad idea though, since you don’t know exactly how the elements of hifi_ch and hic_ch will be paired for each task of the process. Ideally, your channels should therefore contain tuples with some form of unique key that allows you to join the channels. You probably also want to just run a single process task with all the elements of the two channels together. In this case, you need to collect the channel entries first:

hifi_ch = Channel.fromPath("${params.hifi_dir}/*.fastq.gz").collect()
hic_ch  = Channel.fromPath("${params.hic_dir}/*.fq.gz").collect()
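
If you do want one task per sample instead, a keyed version could look something like this (just a sketch; it assumes the sample prefix can be derived from the file names, so adjust to your naming scheme):

hifi_ch = Channel.fromPath("${params.hifi_dir}/*.fastq.gz")
    .map { file -> tuple(file.simpleName, file) }                   // [prefix, hifi]
hic_ch  = Channel.fromFilePairs("${params.hic_dir}/*_{1,2}.fq.gz")  // [prefix, [hic1, hic2]]

samples_ch = hifi_ch.join(hic_ch) // [prefix, hifi, [hic1, hic2]]

The process input would then need a matching tuple, e.g. tuple val(prefix), path(hifi), path(hic_files).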

Hi @Rhianah , welcome to Nextflow! You might find it useful to run through our beginners' tutorial, which explains in detail how to connect inputs using CLI parameters, load them into channels, and so on. I'd recommend starting with either Hello Nextflow (which has full walkthrough videos available) or the shorter version, Nextflow Run.

The process inputs/outputs can be confusing when you're new to Nextflow. It is something I hope to improve in the future.

When you call a process, you should supply the arguments as channels. But when you define the process inputs, each input should correspond to a single value in the corresponding channel.

The following example is not real Nextflow code, but hopefully it explains what happens under the hood:

hifi_ch.merge(hic_ch).map { hifi, hic ->
  collectsamples( hifi, hic )
}

In other words, the channels are “merged” into a single tuple channel, and the process is invoked for each value in the merged channel.

As @Alexander_Nater pointed out, this would be dangerous since hifi_ch and hic_ch might not have the same ordering. But from your process name it sounds like you want to invoke the process once on all samples, in which case you simply need to collect each channel before calling the process.
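
Concretely, that could look something like this (a sketch, using the same glob patterns as above):

collectsamples(
    Channel.fromPath("${params.hifi_dir}/*.fastq.gz").collect(),
    Channel.fromPath("${params.hic_dir}/*.fq.gz").collect()
)

With this, the process inputs can stay as path hifi and path hic, but each one now receives the full list of files in a single task.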

Hi, thank you all for your responses! :slight_smile: Yes, unfortunately I watched the walkthrough videos and went through the documentation a few times, but I still couldn't quite figure out what to do here. Sorry if it wasn't clear what I wanted to achieve! Essentially, I want to put quite a large number of samples through a genome assembly pipeline starting with hifiasm. I have a directory with all the HiFi reads and one with all the HiC reads. I wanted a script which can take corresponding HiFi and HiC reads and put them through the pipeline in parallel, and create a new directory based on the prefix name where all the outputs can be copied to.

I see what you mean about the ordering! So instead, I added this to the workflow, hoping that the join operator would select the correct samples, and I defined prefix so I can tell it to create a directory for the associated reads later:

workflow {

    hic_ch = Channel.fromFilePairs("${params.hic_dir}/*_{1,2}.fq.gz")

    hifi_ch = Channel.fromPath("${params.hifi_dir}/*.fastq.gz")
        .map { file ->
            def prefix = file.getBaseName().replaceAll(/\.fastq$/, "")
            tuple(prefix, file)
        }

    // Join HiFi + HiC by matching key (prefix)
    samples_ch = hifi_ch.join(hic_ch)
    samples_ch.view()

    // Run prep
    prepared = prepare_samples(samples_ch)

I had originally made a prepare_samples.nf module which specifically creates all the directories for each of my samples, but I scrapped it because it only worked when my output was stdout (I thought I could output the paths for each read set here and feed them into hifiasm, but that didn't work), and the hifiasm module was expecting a tuple. I thought it would be better to just create the directory after running hifiasm and not use prepare_samples.nf.

Just for reference, this was the prepare_samples process:

process prepare_samples {

    publishDir "${params.outdir}/${prefix}", mode: 'copy' // symlink to know which files used?

    input:
    tuple val(prefix), path(hifi), path(hic_files)

    output:
    stdout

    script:
    """
    mkdir -p ${params.outdir}/${prefix}

    ln -s ${params.hifi_dir}/$hifi ${params.outdir}/${prefix}/$hifi
    ln -s ${params.hic_dir}/${hic_files[0]} ${params.outdir}/${prefix}/${hic_files[0]}
    ln -s ${params.hic_dir}/${hic_files[1]} ${params.outdir}/${prefix}/${hic_files[1]}
    """
}

Because I decided to put it all through the hifiasm part first, I thought it would be easier to define HiC1, HiC2 individually… so I added this step to the workflow:

samples_ch_flat = samples_ch.map { tuple ->
    def prefix = tuple[0]
    def hifi = tuple[1]
    def hic_list = tuple[2].flatten() // flatten the HiC pair

    // recombine everything into a single tuple: [prefix, hifi, hic1, hic2]
    [prefix, hifi] + hic_list
}

samples_ch_flat.view()

hifiasm(samples_ch_flat)

This is my hifiasm.nf file:

process hifiasm {

    conda './Rhianah/pipeline_yaml/hifiasm.yaml'

    publishDir "${params.outdir}/${prefix}", mode: 'copy'

    input:
    tuple val(prefix), path(hifi), path(hic1), path(hic2)

    output:
    path "${prefix}*"

    script:
    """
    #SBATCH -A
    #SBATCH -J hifiasm
    #SBATCH -t 30:00:00
    #SBATCH -p memory
    #SBATCH --nodes=1
    #SBATCH --ntasks=1
    #SBATCH --cpus-per-task=32
    #SBATCH -o ./OandE/%x.out
    #SBATCH -e ./%x.err

    hifiasm -o $prefix --h1 $hic1 --h2 $hic2 $hifi
    """
}

It created the .command.sh file but didn't submit it because it "violates accounting policy"; weirdly, when I manually submit the job, it gets queued. But I had a few questions about this entire process:

  1. I had to add the SLURM directives header into the script section for it to appear in the .command.sh file. Is this the normal thing to do? Because even though I put this in the .config file, it only wrote this in the .command.sh script: #!/bin/bash -ue

process {
    withName: hifiasm { // then just hifiasm gets this much
        executor = 'slurm'
        cpus = 32
        queue = 'memory'
    }
}
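
For reference, this is roughly what I guessed the rest of the config would need to look like to also cover the time limit and the log locations, but I haven't confirmed that this is the right way to do it:

process {
    withName: hifiasm {
        executor = 'slurm'
        queue = 'memory'
        cpus = 32
        time = '30h'
        clusterOptions = '-o ./OandE/%x.out -e ./%x.err'
    }
}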

  2. I realised that the hifiasm command written in the script only had the file names and not the actual paths, e.g. hifiasm -o CAES001 --h1 CAES001_1.fq.gz --h2 CAES001_2.fq.gz CAES001.fastq.gz. I ideally wanted to run these without having to copy the files into another directory. I tried to change the workflow so it incorporates the full paths, but I always end up getting an error that the file is outside the scope of the process work directory. Is this because Nextflow can only work with files in its work directory (sandboxing)? I'm guessing there's not really a way to bypass this? Even if I do copy the files to the work directory, I don't understand how it tells the submitted script where to find the files unless I specify that while writing the script section.

  3. I also thought, if I wanted to keep all the paths etc. instead of defining them as variables, would it be easier to just use one big bash script which handles collecting the correct reads based on the prefix and then continues with hifiasm? But I wasn't sure how that would work for downstream tools.

  4. How do I know that Nextflow is using the conda environment I specified? On the first run it showed that it was creating the environment from the YAML file I gave it, but I don't know how it uses it for the run itself, because it doesn't activate the conda environment in the .command.sh script.

  5. Will this setup run the samples in parallel? Seeing just one .command.sh script made me think that it wouldn't, but because it didn't really submit the job, I wonder if it would submit other samples in parallel if this one had gone through.

So sorry for such a long explanation! Thank you for any advice!!

While I don't want to stop you learning Nextflow, I would like to point out that there are many Nextflow genome assembly pipelines out there, if you're interested.
