How can I dynamically name collectFile output based on input file

I am using the collectFile() operator to produce a single output file from several “split” input files. I’d like the output filename to use the basename of the input files, and the combined contents to be sorted by a tuple key.

Example input file, named test.txt:

hi
hello
goodbye
farewell

The (non-working) code below uses splitText() to split test.txt into two files, each containing two lines (test.1.txt and test.2.txt). The process convert_to_csv then converts each split file to a CSV (test.1.csv, test.2.csv). I’d like to use collectFile() to sort by the tuple key, combine the contents of both CSV files, and write them to a file called test.csv, dynamically capturing the basename of these files.

#!/usr/bin/env nextflow
nextflow.enable.dsl=2

params.input_file = "test.txt"

process convert_to_csv {
    input:
    tuple val(idx), path(input_file_split)

    output:
    tuple val(idx), path("${input_file_split.baseName}.csv")

    shell:
    """
    paste -d, -s $input_file_split > "${input_file_split.baseName}.csv"
    """
}

workflow {
    def file_idx = 0

    Channel
        .fromPath(params.input_file)
        .splitText(by: 2, file: true)
        .map { file -> tuple(++file_idx, file) }
        .set { file_split }

    convert_to_csv(file_split)

    convert_to_csv.out
        .collectFile(
            sort: { it[0] }
        ) {
            ["${it[1].getBaseName(2)}.csv", it[1] + '\n']    // This section throws an error.
        }
}

This code results in the error:

ERROR ~ No such file or directory: /Users/gnxsf/Documents/Projects/nextflow_tests/work/20/88647e9258d69156b99398c0d9ff54/test.2.csv

even though the file exists in that exact location.

First of all, I must congratulate (and thank) you for a perfectly laid out minimal reproducible example!

When you pass a closure to the collectFile operator, the closure needs to return a collection of two elements where:

  • the first element is the name of the file into which the results are collected, and
  • the second element is either the text to be collected, or a path to a file (in which case the contents of that file are collected).
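
In other words, the general shape is something like this (a minimal sketch; myChannel and the id/data fields are hypothetical):

myChannel.collectFile { item ->
    // First element: the target file name.
    // Second element: a String (collected as-is) or a Path (its contents are collected).
    [ "${item.id}.csv", item.data ]
}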

In your example, it[1] is a Path, onto which you are appending a newline character. Instead of instructing Nextflow to look for the file

test.2.csv

… you are asking Nextflow to look for the file

test.2.csv\n
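
If I understand Nextflow's Path + String extension correctly, the string is appended to the file name, producing a new Path rather than appending any text. A quick sketch you could try in the nextflow console REPL:

def p = file('test.2.csv')
def q = p + '\n'                   // a *new* Path, not the file's contents plus a newline
assert q.name == 'test.2.csv\n'    // the newline becomes part of the looked-up file name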

The fix would be to remove the operation where you append the newline character:

.collectFile( sort: {it[0]} ){ id, infile ->
    ["${infile.getBaseName(2)}.csv", infile]
}

Alternatively, you can use .text (syntactic sugar for the .getText() method) to return a String, to which you can then add the newline character:

.collectFile( sort: {it[0]} ){ id, infile ->
    ["${infile.getBaseName(2)}.csv", infile.text + '\n']
}

In your simple example the files being fed in already end with a newline character, so the first solution is probably the more appropriate and cleaner one. If the input files in your real-world problem don’t include the newline character, you may prefer the second solution.

I would also like to shamelessly plug my nf-boost plugin, which provides a “mergeText” function: essentially collectFile as a regular function instead of an operator.

You can use groupTuple and regular list sorting with mergeText to do what you would otherwise do with collectFile, but it’s more flexible and, I would argue, easier to read and understand.

I hope to add mergeText to core Nextflow, but for now you can use the plugin if you’d like.
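
For example, something along these lines (a rough sketch based on the plugin's mergeText(items, path) signature; SORTED_MERGE and the input shape are illustrative):

include { mergeText } from 'plugin/nf-boost'

process SORTED_MERGE {
    input:
    val items                 // a list of [ key, file ] tuples, collected upstream

    output:
    path 'merged.csv'

    exec:
    // sort by the tuple key, keep only the files, then merge their contents in order
    def sorted = items.sort { it[0] }.collect { it[1] }
    mergeText(sorted, task.workDir.resolve('merged.csv'))
}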

Thank you for your help! Your suggestion solved the issue of writing to a properly named file. However, I notice that the sort: { it[0] } option in collectFile() doesn’t sort by the key in my tuple; instead, the split CSV files are written in the order in which the convert_to_csv process completes them. What am I doing wrong here?

Also, a bit of a tangential follow-up question:
Is there a way to deposit this collected file into an S3 bucket? I’m using publishDir in all of my processes to copy outputs to a user-specified S3 folder, but the output of collectFile() gets deposited on the file system where the head job is running. It would be preferable to have all files written to one output location so the user doesn’t have to collect them from different places.

Pulling from both of your suggestions, here is a working solution that outputs a sorted, combined file to an S3 bucket:

#!/usr/bin/env nextflow
nextflow.enable.dsl=2

params.input_file = "test.txt"

include { mergeText } from 'plugin/nf-boost'

process convert_to_csv {
    input:
    tuple val(idx), path(input_file_split)

    output:
    tuple val(idx), path("${input_file_split.baseName}.csv")

    shell:
    """
    paste -d, -s $input_file_split > "${input_file_split.baseName}.csv"
    """
}

process ITEMS_TO_TXT {
    // publishDir "s3://path/to/folder", mode: "copy"

    input:
    val items

    output:
    path "${items[0].getBaseName(2)}.csv"

    exec:
    def path = task.workDir.resolve("${items[0].getBaseName(2)}.csv")
    mergeText(items, path)
}

workflow {
    def file_idx = 0
    Channel.fromPath(params.input_file)
    | splitText(by: 2, file: true)
    | map { file -> tuple(++file_idx, file) }
    | set { file_split }

    convert_to_csv(file_split)

    convert_to_csv.out
    | toSortedList { a, b -> a[0] <=> b[0] }
    | flatMap
    | map { key, csv -> csv }
    | collect
    | ITEMS_TO_TXT
}
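
Note for anyone copying this: the plugin also needs to be enabled in nextflow.config for the include from 'plugin/nf-boost' to resolve. A minimal sketch (pin a version if you need reproducibility):

// nextflow.config
plugins {
    id 'nf-boost'
}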

Thank you both for your help and quick responses!
