Is it bad practice to try and 'pluck' files from an element in a channel that is a directory with channel manipulation?

Let's say you have a process that untars a tar archive and emits the resulting directory as a single element (i.e., the directory itself, not its contents). Please assume this is fixed and I can't change the output structure of the untarring process.

The emitted directory, hello/, looks like this:

hello/
├── hello.csv
└── hello.zip

However, your downstream process needs two specific files that are inside the emitted directory, as two separate input channels (one for the zip, one for the csv).

Our problem therefore is: how to pick out these two files, and put them in two separate channels - but from within a channel, and where the element in the channel is a directory.

I wanted to ask if this is possible but I also want to ask if this is good practice…

For example, the following appears to work for finding the files in the directory:

ch_input = Channel
    .fromPath('hello/') // pretend this is an output from the untar process
    .map{
        dir ->
            // 'def' keeps these variables local to the closure
            def meta = [ id: dir.name ]
            def zip = file(dir + '/**' + '.zip')
            def csv = file(dir + '/**' + '.csv')

        [ meta, zip, csv ]
    }
    .transpose()
    .view()

The transpose shows that the two files can now be treated as separate objects, so they could go into a multiMap or branch to provide the channels for the downstream process.

However, I wonder whether doing this would risk breaking some internal logic where Nextflow tracks what each file is and where it comes from (e.g., would these two new files not really be recognised by Nextflow, as they are appearing out of 'nowhere', and break things like resuming?).

In general it looks fine; the only problem is the file operation with some string fudging. This line coerces the directory to a string and then back to a file object:

file(dir + '/**' + '.zip')

You should be wary of this and instead use the native file operations available in Nextflow. Check the docs here: Scripts — Nextflow 23.10.0 documentation.

For this use case, you might want listFiles followed by a filter such as findAll, or eachFileMatch.

ch_input = Channel
    .fromPath('hello/') // pretend this is an output from the untar process
    .map { input ->
        // keep only the top-level entries whose file name ends in .zip
        def foo = input.listFiles().findAll { it.name.endsWith('.zip') }
        [foo]
    }
    .view()

.eachFileMatch is slightly uglier but more flexible:

ch_input = Channel
    .fromPath('hello/', type: 'dir', checkIfExists: true) // pretend this is an output from the untar process
    .map { input ->
        def foo = []
        // collect every top-level file whose name matches the regex
        input.eachFileMatch( ~/.*\.zip$/ ) { foo << it }
        foo
    }
    .view()
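
To get back to the original goal of two separate channels, here is a minimal sketch that combines listFiles with multiMap. It assumes hello/ contains exactly one .zip and one .csv at the top level (find returns null if nothing matches), and the names ch_untarred, zip and csv are just illustrative:

```groovy
Channel
    .fromPath('hello/', type: 'dir', checkIfExists: true) // stand-in for the untar output
    .map { dir ->
        def files = dir.listFiles()
        // pick the first top-level file with each extension
        def zip = files.find { it.name.endsWith('.zip') }
        def csv = files.find { it.name.endsWith('.csv') }
        [ [ id: dir.name ], zip, csv ]
    }
    .multiMap { meta, zip, csv ->
        zip: [ meta, zip ]
        csv: [ meta, csv ]
    }
    .set { ch_untarred }

ch_untarred.zip.view { "zip: $it" }
ch_untarred.csv.view { "csv: $it" }
```

ch_untarred.zip and ch_untarred.csv can then be passed as the two inputs of the downstream process.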

Relevant Slack thread: Slack
