Can I use `publishDir` as an input for another process?

I won’t be surprised if what I’m asking is very heterodox, but if it works, it would simplify my logic a lot.

Basically, I have several processes, all publishing some of the desired results into a folder called results_${input.name}. Then I have one last process, which should run only when all the other processes are done, that would use results_${input.name} as an input to create a CSV file.

My tentative process is:

process SEG_ANALYSIS {
    debug true
    publishDir "results_${input.name}", mode: 'copy'

    input:
    path input
    path res_dir

    output:
    path "scans_metrics.csv", emit: metrics_csv

    script:
    """
    seg-analysis -p ${res_dir} -l IRF SRF SHRM PED RPEL PRD HTR RORA -o scans_metrics.csv
    """
}

And, obviously, it’s not working.

The thing is, my Python app seg-analysis crawls the input folder (-p ${res_dir}) and does its magic.
But if I can’t use this approach, then I think I will need to change the way seg-analysis works with its inputs.
I’m open to suggestions, of course.

Ideally, you shouldn’t use the output published by publishDir, because there’s no guarantee of its correctness between runs. For example, you could run some samples and get a failure part way through. You then decide to remove the failed samples and resume. The workflow completes, but you create a CSV from the tasks that ran before the failure. The publishDir output will still contain the output of the failed samples because they haven’t been manually removed.

The way I might implement this is to pass the path string as a value input: it can be used as-is in publishDir, and is also sent into an output: channel, which a downstream process can then use to generate the file.
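A minimal sketch of that idea (the process and file names are made up, not your actual pipeline):

process SEG {
    // The publish directory is just the string passed in as `outdir`
    publishDir "${outdir}", mode: 'copy'

    input:
    path scan
    val outdir                    // e.g. "results_sampleA", built in the workflow

    output:
    path "metrics.csv"
    val outdir, emit: outdir      // forward the same string for downstream use

    script:
    """
    touch metrics.csv
    """
}

A downstream process can then take the directory string from SEG.out.outdir, so the location is known inside the pipeline rather than reconstructed from publishDir.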

Ideally, though, use the relative path from the process, as Adam describes.

publishDir occurs asynchronously and is not considered part of the pipeline. Consider it as coming off the side of the pipeline.

The pattern you describe is very common:

Basically, I have several processes, all publishing some of the desired results into a folder called results_${input.name}. Then I have one last process, which should run only when all the other processes are done, that would use results_${input.name} as an input to create a CSV file.

Let’s assume you mean:

  1. The input files are staged into the working directory
  2. You need an input.csv which is roughly a samplesheet where each row points to an input file
  3. You run seg-analysis on that input.csv, where each path is relative to the -p directory.

The best way to do this is:

  1. Create a samplesheet using collectFile
  2. (Optional) If you need to, put the inputs in a directory using stageAs (a small sketch follows)
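For point 2, a minimal sketch of stageAs (the process name and directory name are placeholders):

process ANALYSE_DIR {
    input:
    // Stage every input file under a single results_dir/ folder
    path scans, stageAs: 'results_dir/*'

    output:
    path "scans_metrics.csv"

    script:
    """
    seg-analysis -p results_dir -o scans_metrics.csv
    """
}

That way seg-analysis can keep crawling a directory, but the directory is built from the task's own inputs rather than from the published results.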

Here’s an example using some bash in place of your Python script:

process CREATE_FILE {
    input:
        val n

    output:
        path "${n}.txt"
    
    script:
    """
    echo ${n} > ${n}.txt
    """
}

process MERGE_FILES {
    input:
        path csv
        path inFiles
    
    output:
        path "output.txt"

    script:
    """
    # Process the CSV file and concatenate contents of each referenced file
    touch output.txt
    while IFS=, read -r file_path || [[ -n "\$file_path" ]]; do
        if [[ -f "\$file_path" ]]; then
            cat "\$file_path" >> output.txt
        fi
    done < "$csv"
    """
}

workflow {
    input_channel = Channel.of(1..10)
    
    in_files = CREATE_FILE(input_channel)

    // Build samples.csv with one file name per line
    csv_file = in_files.collectFile(name: 'samples.csv', newLine: true) { infile ->
        infile.name
    }

    out_files = MERGE_FILES(csv_file, in_files.collect())
    out_files.view()
}

Thank you both. I understand the issues pointed out, but for now I just went with a simple solution that seems to work fine for my current problem. Later I will work on something better.

I did:

In the workflow:

...
    // Wait for all RUN_SEG outputs before running SEG_ANALYSIS
    run_seg_output_ch.segmented_files
        .collect()
        .map { input }
        .set { seg_analysis_input_ch }

    // Run SEG_ANALYSIS after all other processes are complete
    SEG_ANALYSIS(seg_analysis_input_ch).metrics_csv
        .map { csv ->
            def rowCount = csv.readLines().size() - 1
            // Subtract 1 to exclude the header
            "Number of rows in metrics CSV: ${rowCount}"
        }
        .view()

And then, for the final process:

process SEG_ANALYSIS {
    debug false
    publishDir "results_${input.name}", mode: 'copy'

    input:
    path input

    output:
    path "scans_metrics.csv", emit: metrics_csv

    script:
    def res_dir = file("results_${input.name}")
    """
    seg-analysis -p ${res_dir} -l IRF SRF SHRM PED RPEL PRD HTR RORA -o scans_metrics.csv
    """
}