Data are getting mixed // when resume is used

Hi there,

I’ve an input file as attached. I read it in, and send it to the process as shown below:

Channel.fromPath(csvFile)
                .splitCsv(header: true).map { it ->
            [
                it.subMap("batch", "timepoint", "tissue", "sequencing_type"),
                [
                    file(it.fastq_1),
                    file(it.fastq_2)
                ]
            ]
        }
        .branch { meta, fastq ->
            rna: meta.tissue == "rna" && meta.sequencing_type == "rna"
            germline: meta.tissue == "normal" && meta.sequencing_type == "wes"
            tumor: meta.tissue == "tumor" && meta.sequencing_type == "wes"
            other: true
        }
        .set { samples }

samples.germline
        // Mix all samples using combine
        .combine(samples.tumor)
        // Filter to only the ones where batch and timepoint are the same
        .filter { germline_meta, germline_fastq, tumor_meta, tumor_fastq ->
            ( germline_meta.batch == tumor_meta.batch ) && ( germline_meta.timepoint == tumor_meta.timepoint )
        } | FASTP

FASTP code:


process FASTP {
	conda '/data1/software/miniconda/envs/MMRADAR/'
	tag "$germline_meta.timepoint"
	maxForks 5
	debug true
	errorStrategy 'retry'
    maxRetries 2
 label 'low_mem'

	publishDir path: "${params.outdir}/${germline_meta.batch}/${germline_meta.timepoint}/WES/primary/fastp/normal/", mode: 'copy', pattern: '*_N*'
    publishDir path: "${params.outdir}/${germline_meta.batch}/${germline_meta.timepoint}/WES/primary/fastp/tumor/", mode: 'copy', pattern: '*_T*'

    input:	
	 tuple val(germline_meta), path(germline_fastq), val(tumour_meta), path(tumour_fastq)
	
output:
	
	tuple val(germline_meta.batch),val(patient_id_tumor), val(germline_meta.timepoint), path("${patient_id_tumor}_trim_{1,2}.fq.gz"), emit: reads_tumor
	path("${patient_id_tumor}.fastp.json"), emit: json_tumor
	path("${patient_id_tumor}.fastp.html"), emit: html_tumor
	
	tuple val(germline_meta.batch),val(patient_id_normal), val(germline_meta.timepoint),path("${patient_id_normal}_trim_{1,2}.fq.gz"), emit: reads_normal
	path("${patient_id_normal}.fastp.json"), emit: json_normal
	path("${patient_id_normal}.fastp.html"), emit: html_normal
	
    script:
	patient_id_normal=germline_meta.timepoint+"_N"
	patient_id_tumor=germline_meta.timepoint+"_T"
	def(r1_normal, r2_normal)=germline_fastq
	def(r1_tumor,r2_tumor)=tumour_fastq

    """

echo "$germline_meta"
echo "$tumour_meta.timepoint"
echo "$patient_id_normal"
echo "$patient_id_tumor"

fastp  --in1 "${r1_tumor}" --in2 "${r2_tumor}" -q 20  -u 20 -l 40 --detect_adapter_for_pe --out1 "${patient_id_tumor}_trim_1.fq.gz" \
--out2 "${patient_id_tumor}_trim_2.fq.gz" --json "${patient_id_tumor}.fastp.json" \
--html "${patient_id_tumor}.fastp.html" --thread 10

fastp  --in1 "${r1_normal}" --in2 "${r2_normal}" -q 20  -u 20 -l 40 --detect_adapter_for_pe --out1 "${patient_id_normal}_trim_1.fq.gz" \
--out2 "${patient_id_normal}_trim_2.fq.gz" --json "${patient_id_normal}.fastp.json" \
--html "${patient_id_normal}.fastp.html" --thread 10 

   """
}

workflow.onComplete { 
	log.info ( workflow.success ? "completed fastp primary WES!" : "Oops .. something went wrong in fastp primary WES")
}

However, as and when I use resume the data (output files) gets mixed from different samples. For e.g.

/mnt/data1/software/gatk-4.4.0.0/gatk ApplyBQSR -R /home/sanjeev/data1/resources/hg38/bwa/GRCh38.primary_assembly.genome.fa  -I tumor_applybqsr_bam/MM-0256-T-02_T.dedup.sorted.bam \
        --bqsr-recal-file tumor_applybqsr_recal/MM-3309-T-01_T_table.recal -O MM-3309-T-01_T.sorted.markdup.recal.bam \
  --create-output-bam-index true --java-options -Xmx180g

/mnt/data1/software/gatk-4.4.0.0/gatk ApplyBQSR -R /home/sanjeev/data1/resources/hg38/bwa/GRCh38.primary_assembly.genome.fa -I normal_applybqsr_bam/MM-0256-T-02_N.dedup.sorted.bam \
        --bqsr-recal-file normal_applybqsr_recal/MM-3309-T-01_N_table.recal \
    -O MM-3309-T-01_N.sorted.markdup.recal.bam \
        --create-output-bam-index true  --java-options -Xmx180g

This is from a .command.sh file.
As you can see MM-0256-T-02_N is mixed up with MM-3309-T-01_N

applybqsr code is as follows:

process applybqsr {
tag "$timepoint"
	maxForks 3
	debug true
	errorStrategy 'retry'
    maxRetries 2
publishDir path: "${params.outdir}/${batch}/${timepoint}/WES/primary/applybqsr/normal", mode: 'copy', pattern: '*_N*'
    publishDir path: "${params.outdir}/${batch}/${timepoint}/WES/primary/applybqsr/tumor", mode: 'copy', pattern: '*_T*'
 label 'big_mem'

	input:
	tuple val(batch),val(patient_id_tumor),val(timepoint),path(markbam_tumor, stageAs: 'tumor_applybqsr_bam/*')
	tuple val(batch),val(patient_id_tumor),val(timepoint),path(recaltable_tumor,  stageAs: 'tumor_applybqsr_recal/*')
	tuple val(batch),val(patient_id_normal),val(timepoint),path(markbam_normal, stageAs: 'normal_applybqsr_bam/*')
	tuple val(batch),val(patient_id_normal),val(timepoint),path(recaltable_normal  ,stageAs: 'normal_applybqsr_recal/*')

	output:
	tuple val(batch),val(patient_id_tumor),val(timepoint),path("${patient_id_tumor}.sorted.markdup.recal.bam"), emit: recal_bam_tumor
 	tuple val(batch),val(patient_id_tumor),val(timepoint),path("${patient_id_tumor}.sorted.markdup.recal.bai"),  emit: recal_bai_tumor
	tuple val(batch),val(patient_id_normal),val(timepoint),path("${patient_id_normal}.sorted.markdup.recal.bam"), emit: recal_bam_normal
 	tuple val(batch),val(patient_id_normal),val(timepoint),path("${patient_id_normal}.sorted.markdup.recal.bai"),  emit: recal_bai_normal
	
	script:
 patient_id=patient_id_normal.split('_N')[0]

	"""
	/mnt/data1/software/samtools-1.18/samtools index ${markbam_tumor}
	/mnt/data1/software/samtools-1.18/samtools index ${markbam_normal}

/mnt/data1/software/gatk-4.4.0.0/gatk ApplyBQSR -R $params.hg38genome  -I ${markbam_tumor} \\
        --bqsr-recal-file ${recaltable_tumor} -O ${patient_id_tumor}.sorted.markdup.recal.bam \\
  --create-output-bam-index true --java-options -Xmx${task.memory.toGiga()}g

/mnt/data1/software/gatk-4.4.0.0/gatk ApplyBQSR -R $params.hg38genome -I ${markbam_normal} \\
        --bqsr-recal-file ${recaltable_normal} \\
    -O ${patient_id_normal}.sorted.markdup.recal.bam \\
	--create-output-bam-index true  --java-options -Xmx${task.memory.toGiga()}g

        """
}

workflow.onComplete { 
	log.info ( workflow.success ? "completed applyBQSR primary WES!" : "Oops .. something went wrong in applyBQSR primary WES")
}

I am unable to understand what causes it.

I found similar issue on biostars:
https://www.biostars.org/p/9545985/

How do I implement map in my code?
long_format_data.csv (11.8 KB)

Any help would be highly appreciated. Thank you in advance.

I am unable to edit the main post, my WES pipeline looks like as the below:


Channel.fromPath(csvFile)
                .splitCsv(header: true).map { it ->
            [
                it.subMap("batch", "timepoint", "tissue", "sequencing_type"),
                [
                    file(it.fastq_1),
                    file(it.fastq_2)
                ]
            ]
        }
        .branch { meta, fastq ->
            rna: meta.tissue == "rna" && meta.sequencing_type == "rna"
            germline: meta.tissue == "normal" && meta.sequencing_type == "wes"
            tumor: meta.tissue == "tumor" && meta.sequencing_type == "wes"
            other: true
        }
        .set { samples }

samples.germline
        // Mix all samples using combine
        .combine(samples.tumor)
        // Filter to only the ones where batch and timepoint are the same
        .filter { germline_meta, germline_fastq, tumor_meta, tumor_fastq ->
            ( germline_meta.batch == tumor_meta.batch ) && ( germline_meta.timepoint == tumor_meta.timepoint )
        } | FASTP 

align_bwa_mem(FASTP.out.reads_tumor,FASTP.out.reads_normal) 	//already_created index

fixmate(align_bwa_mem.out.sorted_bams_tumor,align_bwa_mem.out.sorted_bams_normal)

gatk_markduplicates(fixmate.out.sorted_fixmate_bams_tumor,fixmate.out.sorted_fixmate_bams_normal)

setupnmdtags(gatk_markduplicates.out.sorted_bam_mark_duplicates_tumor,gatk_markduplicates.out.sorted_bam_mark_duplicates_normal)

recalibrate_bam(setupnmdtags.out.setupnmdtags_bam_tumor,setupnmdtags.out.setupnmdtags_bam_normal)

applybqsr(gatk_markduplicates.out.sorted_bam_mark_duplicates_tumor,recalibrate_bam.out.recalib_table_tumor,gatk_markduplicates.out.sorted_bam_mark_duplicates_normal,recalibrate_bam.out.recalib_table_normal)