Help with a process that merges the results of previous processes

Hello,

I am trying to build a pipeline that uses snippy (https://github.com/tseemann/snippy) and Gubbins (https://github.com/nickjcroucher/gubbins) to generate alignments and build a tree. The pipeline should run the snippy process once per sample to produce one alignment each, then run snippy-core once on all of the alignments. I am having trouble setting up my snippy_core process to take in the alignments from all samples: right now the pipeline runs snippy-core once per sample instead of once on the collected results.

I tried adapting snippy Nextflow processes I found (https://github.com/nf-modules-hub/snippy/blob/9483c8b5085ad690009e3d90cef8e4000ef0a62e/main.nf and https://github.com/nf-core/modules/tree/master/modules/nf-core/snippy), but I couldn't figure out the current best practice from them.

The code for my pipeline is below. Any help figuring this out would be great!

#!/usr/bin/env nextflow

params.sample_sheet = './samples.csv'
params.output_dir = './results'

params.reference_genome = 'GCF_000195955.2_ASM19595v2_genomic.fna'

process snippy {
    publishDir (
        path: "${params.output_dir}/snippy/${sample_id}",
    )

    conda 'snippy'

    input:
    tuple val(sample_id), path(read1), path(read2)

    output:
    path "snippy/${sample_id}/snps.aligned.fa", emit: aln
    path "snippy/${sample_id}/snps.vcf", emit: vcf

    script:
    """
    snippy --reference ${params.reference_genome} --R1 ${read1} --R2 ${read2} --outdir snippy/${sample_id}
    """
}

process snippy_core {
    conda 'snippy'
    
    publishDir(
        path: "${params.output_dir}/snippy_core"
    )

    input:
    path(vcf)
    path(aligned_fa)

    output:
    path 'snippy_core/core.aln'

    script:
    """
    snippy-core --prefix snippy/core --ref ${params.reference_genome} snippy/*
    """
}

process gubbins {
    conda 'gubbins'

    publishDir(
        path: "${params.output_dir}/gubbins"
    )

    input:
    path 'snippy/core/core.aln'

    output:
    path 'gubbins/*'

    script:
    """
    run_gubbins.py --prefix gubbins/ snippy/core/core.aln
    """
}

workflow {
    Channel.fromPath(params.sample_sheet) \
        | splitCsv(header:true) \
        | map { row-> tuple(row.sample_id, file(row.read1), file(row.read2)) } \
        | snippy | snippy_core | gubbins
}

Hi @Michael_Shaffer. Welcome to the community forum 🙂

Your workflow block tells Nextflow to pass each output from snippy straight into snippy_core, so yes, it is being given one sample at a time. One solution is the collect operator (see the operator docs), which waits for the upstream channel to complete and emits all of its items as a single element, so snippy_core runs exactly once on everything.

You can find a worked example of this pattern in the Nextflow documentation.
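To make the behaviour concrete, here is a minimal, self-contained sketch (hypothetical file names, not part of your pipeline) of what collect does to a channel:

workflow {
    // Three separate emissions go in; collect bundles them into a single
    // list, so anything downstream fires exactly once.
    Channel.of('a.aln', 'b.aln', 'c.aln')
        .collect()
        .view()   // prints once: [a.aln, b.aln, c.aln]
}

Without the collect(), view() would print three times, once per item, which is exactly the per-sample behaviour you are seeing with snippy_core.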

You'll probably need to rewrite this using dot syntax rather than the pipe syntax, since you need to apply .collect() to two output channels, and the pipe syntax is not flexible enough for that.

workflow {
    ch_snippy_in = Channel.fromPath(params.sample_sheet)
        .splitCsv(header: true)
        .map { row -> tuple(row.sample_id, file(row.read1), file(row.read2)) }

    snippy( ch_snippy_in )

    // The argument order must match the input declarations in snippy_core:
    // path(vcf) first, then path(aligned_fa)
    snippy_core(
        snippy.out.vcf.collect(),
        snippy.out.aln.collect()
    )

    gubbins( snippy_core.out )
}
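One caveat to check once the wiring is in place (this is a sketch under assumptions, not something I've run against your data): inside the snippy_core task, the collected files are staged into the task's own work directory, so the snippy/* glob in your script won't match anything; and since every sample's files are named snps.vcf and snps.aligned.fa, staging them side by side will collide. snippy-core also normally takes the per-sample snippy output directories as positional arguments. One way to address all three points is to have snippy emit its whole output directory (path "snippy/${sample_id}", emit: outdir) and collect those instead:

process snippy_core {
    conda 'snippy'

    publishDir "${params.output_dir}/snippy_core"

    input:
    path snippy_dirs   // all per-sample snippy output dirs, staged side by side

    output:
    path 'core.aln'

    script:
    """
    snippy-core --prefix core --ref ${params.reference_genome} ${snippy_dirs}
    """
}

with the workflow calling snippy_core( snippy.out.outdir.collect() ). The directory names (your sample IDs) then become the sample names in core.aln. You may also want to pass the reference in as a path input rather than interpolating params.reference_genome, so Nextflow stages it into the work directory for you.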