Call methods in Groovy classes don't seem to work

In Groovy, you can create classes with call() methods:

class Demo {
    def call() {}
}

demo = new Demo()
demo() // Runs call() method
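
For reference, here is a minimal plain-Groovy sketch of the call operator, run outside Nextflow. The Doubler class is hypothetical and exists only for illustration. Note that the variable is declared with def, so it is a local variable. The thread does not establish whether the same shorthand resolves inside a Nextflow workflow block.

```groovy
// Hypothetical class used only to illustrate Groovy's call operator.
class Doubler {
    int call(int x) { 2 * x }
}

def doubler = new Doubler()   // declared with def, so this is a local variable

assert doubler.call(2) == 4   // explicit method call
assert doubler(2) == 4        // call operator: shorthand for doubler.call(2)
```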

This does not appear to work in Nextflow. The following code fails with the error Missing process or function demo():

class Demo {
    def call() {}
}

workflow {
    demo = new Demo()
    demo() // Runs call() method
}

Is there a way to make classes callable?

You have to call (heh) the method:

class Demo {
    def call() {
        println "HELLO"
    }
}

workflow {
    demo = new Demo()
    demo.call()
}

Could you explain why you are doing this? Adding classes and Java features to Nextflow is normally a sign you are going in the wrong direction and I’m worried you have an XY problem.


My impression is that call() methods in groovy are supposed to work using the syntax I described in my OP, otherwise you just have a method called “call” which isn’t quite what I was trying to achieve.

I have a couple of goals motivating use of classes. What ties them together is a desire to stay DRY and adhere to SOLID principles. If there are equivalent ways to get this done I am all ears! Since you asked…

  1. Create the ability to insert pause points into the run.

I have a class WorkflowSequencePermit which has params and a description of the workflow steps loaded in, and is passed between subworkflows. Its allows() method lets the subworkflows determine if they should be run.

This assigns responsibility for parsing this aspect of nextflow params to the WorkflowSequencePermit and allows a description of the expected linear sequence of processing steps to be articulated in the base workflow only.

  2. Wrapping importable common closures to facilitate operating on elements structured as a single LinkedHashMap item rather than as a list of items.

I use a variety of closures to enable more powerful manipulation of channel contents than Nextflow affords. Since Nextflow doesn’t provide functionality to import Groovy classes and methods using the include statement, and I don’t want to compile them, I wrap the closures as statics in a class called ExOp. This is imported in a common.groovy file using the GroovyClassLoader, which can then be included in my .nf modules via evaluate(new File("common.groovy")).

The reason I use these closures is that I want the ability to conveniently identify channel item elements with keys, so that instead of a list of elements, we have a LinkedHashMap of elements. The closures I wrote facilitate extracting and ordering channel items as process inputs and receiving outputs back into the LinkedHashMap format. They also facilitate doing joins with the join operator by splintering the appropriately-ordered LinkedHashMap into individual key:value pairs, doing the join, and then reassembling the complete LinkedHashMap.

This functionality gives several advantages:

  • Each subworkflow is shielded from having to know the order in which outputs are provided/needed by previous/subsequent subworkflows.
  • We can select desired elements from subworkflow inputs using legible key strings rather than inscrutable integer offsets.
  • We can much more easily organize parameters provided by users in a .csv specifying per-sample processing params and trivially update them with sensible defaults.
  • Channels become more inspectable, since the view operator will display key labels along with values.
  3. I am still experimenting with this, but I will probably create classes specialized for parsing specific aspects of nextflow params.

I want to decouple parsing parameters from implementing the behavior those params are meant to control. The natural way to do this is to create a class that parses params and provides more stable methods that workflows can poll to determine how they ought to behave. The WorkflowSequencePermit I described above is an example.

My overriding goal here is to make it so that workflows don’t have any responsibility for keeping track of the UI (i.e. the mapping from nextflow params to desired behaviors). This is because they have the responsibility for implementing the control logic that governs which processes are run and how. Something has to map params to desired behaviors, though, and the best answer I’ve come up with is a class.

OK so let’s break it down.

  1. Create the ability to insert pause points into the run.

I have a class WorkflowSequencePermit which has params and a description of the workflow steps loaded in, and is passed between subworkflows. Its allows() method lets the subworkflows determine if they should be run.

This assigns responsibility for parsing this aspect of nextflow params to the WorkflowSequencePermit and allows a description of the expected linear sequence of processing steps to be articulated in the base workflow only.

I’m not 100% sure what this achieves beyond the normal execution of Nextflow. I can’t see anything here that can’t be achieved with channels. Could you provide a concrete example?

  2. Wrapping importable common closures to facilitate operating on elements structured as a single LinkedHashMap item rather than as a list of items.

I use a variety of closures to enable more powerful manipulation of channel contents than Nextflow affords. Since Nextflow doesn’t provide functionality to import Groovy classes and methods using the include statement, and I don’t want to compile them, I wrap the closures as statics in a class called ExOp. This is imported in a common.groovy file using the GroovyClassLoader, which can then be included in my .nf modules via evaluate(new File("common.groovy")).

The reason I use these closures is that I want the ability to conveniently identify channel item elements with keys, so that instead of a list of elements, we have a LinkedHashMap of elements. The closures I wrote facilitate extracting and ordering channel items as process inputs and receiving outputs back into the LinkedHashMap format. They also facilitate doing joins with the join operator by splintering the appropriately-ordered LinkedHashMap into individual key:value pairs, doing the join, and then reassembling the complete LinkedHashMap.

This functionality gives several advantages:

  • Each subworkflow is shielded from having to know the order in which outputs are provided/needed by previous/subsequent subworkflows.
  • We can select desired elements from subworkflow inputs using legible key strings rather than inscrutable integer offsets.
  • We can much more easily organize parameters provided by users in a .csv specifying per-sample processing params and trivially update them with sensible defaults.
  • Channels become more inspectable, since the view operator will display key labels along with values.

Although it may not solve all of your issues, you can use a map (LinkedHashMap) as an input to a Nextflow process and access its elements by name. This is a very common pattern in nf-core and is widely used across Nextflow pipelines. Here’s a simple example:

process CAT_FILE {

    input:
        val input

    output:
        // Note how we preserve the map as a through value for joining etc.
        tuple val(input), path("${input.name}.txt")

    script:
    """
    cat ${input.file} > ${input.name}.txt
    """
}

workflow {
    // Channel of 2 maps.
    ch_input = Channel.of(
        [name: "test_copy"        , file: file("test.txt", checkIfExists: true)],
        [name: "test_another_copy", file: file("test.txt", checkIfExists: true)]
    )

    CAT_FILE(ch_input).view()
}

However, this is hardly complete. It sounds like what you are after is a full typing system, with the ability to parse arbitrary objects as inputs and outputs. This is exactly what @bentsherman has been working on, and he has some prototypes already. I don’t think we can give any firm timelines yet, but a more robust input and output system than tuples is a clear improvement for the Nextflow language and a key objective. I’m sure he’d appreciate input on design decisions or code if you have any thoughts.

  3. I am still experimenting with this, but I will probably create classes specialized for parsing specific aspects of nextflow params.

I want to decouple parsing parameters from implementing the behavior those params are meant to control. The natural way to do this is to create a class that parses params and provides more stable methods that workflows can poll to determine how they ought to behave. The WorkflowSequencePermit I described above is an example.

My overriding goal here is to make it so that workflows don’t have any responsibility for keeping track of the UI (i.e. the mapping from nextflow params to desired behaviors). This is because they have the responsibility for implementing the control logic that governs which processes are run and how. Something has to map params to desired behaviors, though, and the best answer I’ve come up with is a class.

So a simple use case of yours might be:

workflow {
    ch_qc_files
         // Skip QC boolean based on a number of parameters
         | filter { !WorkflowSequencePermit.skipQc() }
         | QC
}

Here, WorkflowSequencePermit.skipQc checks a number of parameters to decide whether to run QC.

Of course, this example doesn’t have to use a class:

def skip_qc(channel, params) {
    // extremely complicated logic here
    return true
}

workflow {
    ch_qc_files 
        | filter { !skip_qc(it, params) }
        | QC
}

Naturally, you could do this with the filter operator (as above), if statements, or the when statement, in conjunction with functions and/or closures. You can inspect the contents of a channel with these tools to make fairly complicated evaluations; however, if your logic is exceedingly complex, you may want a method. You can do this using a class in the lib/ directory, although I note that nf-core has moved away from this recently because they found it wasn’t necessary: GitHub - nf-core/sarek at dev.

In summary, I think you can achieve a good chunk of this without adding classes, and the remaining parts should be language-level features, some of which we may already be working on. I think you should be able to write your pipeline with Nextflow in its current form, but additional features would help, and I think these would make excellent examples of the direction the language could take. We always appreciate feedback and ideas, so don’t hesitate to post them here or on GitHub.

Wanted to echo Adam here. It sounds like you just want to use map/record types and you are subverting the normal workflow execution model to try and make it work.

I am working on support for typed process inputs/outputs, including record types, here: Static types for process inputs/outputs by bentsherman · Pull Request #4553 · nextflow-io/nextflow · GitHub

Would love to get your feedback to make sure it meets your needs. The PR includes updates to the docs which you can read to see what the syntax looks like. You shouldn’t need to go to all of this effort just to use record types.

I’ll respond in a few comments for different threads.

I still use channels to define the complete workflow as normal. However, my workflow produces intermediate QC and statistics at each processing step, which guide the user’s decision on which parameters to select for processing at the subsequent step. It may be that later processing steps reveal that an earlier step must be redone and processed differently. So we want the ability to run part of the workflow, stop, inspect the QC/stats, make a decision on what params to set for the next workflow step, and then run the pipeline with -resume.

The way I implement this is by having subworkflows that normally run process N query the WorkflowSequencePermit class on whether process N is allowed to run (which is in turn determined by the user’s param settings). If the process runs, it generates an output channel which is emitted by the subworkflow. If not, then the process is not run and no output is emitted, which brings the run to a close.
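
A minimal plain-Groovy sketch of what such a permit class might look like is given below. Everything apart from the class name and the allows() method is an assumption: the stop_after parameter, the field names, and the rule that a step is allowed when it comes no later than the last permitted step are illustrative only, not the actual implementation.

```groovy
// Hypothetical sketch; field names and the 'stop_after' parameter are assumptions.
class WorkflowSequencePermit {
    final List<String> steps     // expected linear order of processing steps
    final String lastAllowed     // last step the user permits to run

    WorkflowSequencePermit(List<String> steps, Map params) {
        this.steps = steps
        // Default to the final step, i.e. run the whole workflow.
        this.lastAllowed = params.stop_after ?: steps.last()
        if (!(lastAllowed in steps)) {
            throw new IllegalArgumentException('Unknown step: ' + lastAllowed)
        }
    }

    // A step may run if it is known and comes no later than the last allowed step.
    boolean allows(String step) {
        int idx = steps.indexOf(step)
        return idx >= 0 && idx <= steps.indexOf(lastAllowed)
    }
}

def permit = new WorkflowSequencePermit(['qc', 'align', 'call'], [stop_after: 'align'])
assert permit.allows('qc')
assert permit.allows('align')
assert !permit.allows('call')
```

A subworkflow could then guard a process with an ordinary if statement on permit.allows(...), leaving the step order described in one place.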

The example you provided may be a more elegant way to do what I’m attempting, so thank you for that! I have been taking a LinkedHashMap containing all the params that need to be associated with a particular sample and extracting and reordering the specific values for a process, specified as individual arguments. This will let me just pass it in directly and then join the new outputs from the process into the LinkedHashMap for emission from the calling subworkflow.

I’m not too concerned about a full-on typing system for my own work - I just wanted to be able to refer to the parameters I’m interested in by name.

Edit - I tried it out and it did in fact let me accomplish my goal of using keys to access specific values with much more elegance. Thanks again!
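
For the per-sample defaults mentioned earlier, plain Groovy map addition may be enough once each channel item is a map. The sketch below is illustrative: the default names and values are hypothetical, and it assumes that empty .csv cells arrive as empty strings.

```groovy
// Hypothetical defaults and CSV row; none of these names come from a real pipeline.
def defaults = [aligner: 'bwa', min_quality: '20']
def row = [sample_id: 's1', aligner: '', min_quality: '30']

// Drop empty cells so they do not overwrite a default, then merge.
// With Map + Map, entries from the right-hand map win on key clashes.
def provided = row.findAll { key, value -> value != null && value != '' }
def merged = defaults + provided

assert merged == [aligner: 'bwa', min_quality: '30', sample_id: 's1']
```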

You are right, except that in Groovy, you can import a class without compiling it using GroovyClassLoader, while I haven’t found an equivalent way to import a closure or function unless it’s compiled. So I’ve been wrapping groups of related methods as static class methods and importing those, using them as if the class were a namespace.

Thanks for the link! For this project, I don’t have an immediate need for static types. I just wanted to be able to access parameter values via named keys, and @Adam_Talbot’s example looks like it will work well, hopefully getting rid of all the extra complicated things I’m currently doing except the more fine-grained workflow control.

I think the fine-grained workflow control class is ultimately necessary. I need my entire workflow to be based on a single Nextflow workflow so that I can use the -resume feature, while being able to run only a part of it so I can use early outputs to guide decisions on how to run later processes. So processes are only run if two conditions are met: their input channel has content (i.e. based on the output of previous processes, as normal) and if the user has permitted that process to run (based on params set in nextflow.config or at the command line).

I’ve only been working with nextflow for a few weeks so again, please let me know if I’m missing something!

Thanks to you and Adam for your help.

Actually @Adam_Talbot @bentsherman, there is one other thing that I still need complicated extra functions for to fully use LinkedHashMaps as my channel values.

My users will be using .csv files with headers to specify their data structure. The headers may be in any order. The data is partially normalized in the manner of a relational database, so that there is a sample_id column in both a per-sample .csv and in a sample group .csv. I need to join the data from the two .csvs together. Since the user may specify sample_id at any column index in both .csv files, I need a way to join on that index.

My current best way is with a function that takes a LinkedHashMap and a subset of its keys, and puts that key subset in the specified order as the first keys in the LinkedHashMap. Example:

lhm = [key1:"value1", key2:"value2", sample_id:"sid"]
result = order(lhm, ["sample_id"])
//result == [sample_id:"sid", key1:"value1", key2:"value2"]

This would be done in the context of a map{} operator on the two channels to be joined prior to the join, to ensure that their sample_id columns are first.
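
One possible way to write such an order function in plain Groovy is sketched below. This is an assumption about its behaviour based on the example above, not the original implementation: requested keys come first in the given order, keys that are absent from the map are ignored, and the remaining entries keep their original order.

```groovy
// Hypothetical reimplementation of order(); behaviour inferred from the example above.
def order(Map lhm, List keys) {
    // Entries for the requested keys, in the requested order (absent keys skipped).
    def head = keys.findAll { lhm.containsKey(it) }
                   .collectEntries { key -> [(key): lhm[key]] }
    // Remaining entries, in their original order.
    def rest = lhm.findAll { key, value -> !(key in keys) }
    return head + rest
}

def lhm = [key1: 'value1', key2: 'value2', sample_id: 'sid']
def result = order(lhm, ['sample_id'])

assert (result.keySet() as List) == ['sample_id', 'key1', 'key2']
assert result == lhm   // same contents; only the iteration order differs
```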

My users will be using .csv files with headers to specify their data structure. The headers may be in any order. The data is partially normalized in the manner of a relational database, so that there is a sample_id column in both a per-sample .csv and in a sample group .csv. I need to join the data from the two .csvs together. Since the user may specify sample_id at any column index in both .csv files, I need a way to join on that index.

This one is actually fairly straightforward. The principle is, coerce your channel so you have a matching key (sampleId), then use the join operator. Note, your matching key can be a map and therefore order does not matter, just contents. After you have joined the channels, coerce the channel again back into the shape you need for downstream processes. Here’s an example.

Given data.csv:

sample,fastq1,fastq2
sample1,fastq1_1.fastq,fastq1_2.fastq
sample2,fastq2_1.fastq,fastq2_2.fastq
sample3,fastq3_1.fastq,fastq3_2.fastq

and samples.csv (note missing sample!):

sample,type
sample1,germline
sample2,tumour

main.nf:

workflow {
    sample_info = Channel.fromPath("samples.csv", checkIfExists: true)
        .splitCsv(header: true)
        // Break out the joining key into tuple element 0
        .map { row ->
            tuple( row.subMap("sample"), row )
        }
    
    sequencing_data = Channel.fromPath("data.csv", checkIfExists: true)
        .splitCsv(header: true)
        // Break out the joining key into tuple element 0
        // Also turn FASTQs into file objects to make it more realistic
        .map { row ->
            tuple( row.subMap("sample"), file(row.fastq1), file(row.fastq2) )
        }

    // Join the two data sources by the matching key element (index 0)
    // Using remainder: true preserves all samples, even the ones with missing data
    sequencing_data.join(sample_info, by: 0, remainder: true)
        // Now we need to coerce the channel into the right shape.
        // Use a map to combine both sources of sample info and move FASTQs to end of tuple
        .map { sample, fastq1, fastq2, sample_info ->
            // sample_info is null for samples missing from samples.csv (remainder: true)
            tuple( sample + (sample_info ?: [:]), fastq1, fastq2 )
        }
        // Use view to dump the channels
        .view()
}

In this example, we read the two CSV files and create a channel for the contents of each. We then move the sample ID to the first element in the tuple using map, which gives us a joining key. We use join to merge the two channels, then use a map again to tidy up the resulting channel.
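
The earlier remark that a map can serve as the joining key because only its contents matter can be checked in plain Groovy. The sketch below uses made-up rows. It shows that two maps with the same entries in a different order are equal and share a hash code, and that subMap extracts just the key columns.

```groovy
// Made-up rows, as if read from two CSV files with different column orders.
def rowA = [sample: 'sample1', type: 'germline']
def rowB = [type: 'germline', sample: 'sample1']

// Map equality and hash codes depend on the entries, not on insertion order.
assert rowA == rowB
assert rowA.hashCode() == rowB.hashCode()

// subMap keeps only the requested keys, giving a small map to join on.
def keyA = rowA.subMap(['sample'])
def keyB = rowB.subMap(['sample'])
assert keyA == [sample: 'sample1']
assert keyA == keyB
```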

Some common use cases of this are to attach metadata to samples or create profiles for each sample type (e.g. tumour vs germline) to modify parameters.

The best example of this is Sarek. It does this by periodically writing a samplesheet throughout the pipeline and can ingest the samplesheet at multiple start points. This allows a user to run the pipeline once, then take the results from the middle and run them again. This requires a lot of work on the part of the developer to implement but is actually simple, just verbose. I’ve done something similar where I use branch to split a single samplesheet into multiple channels.

e.g. samplesheet.csv

id,fastq1,fastq2,bam,vcf
sample1,/path/to/sample1_R1.fastq.gz,/path/to/sample1_R2.fastq.gz,,
sample2,,,/path/to/sample2.bam,
sample3,,,,/path/to/sample3.vcf.gz

main.nf:

workflow {
    sample_info = Channel.fromPath("samplesheet.csv", checkIfExists: true)
        .splitCsv(header: true)
        .branch { row ->
            fastq: row.fastq1 && row.fastq2
            bam: row.bam
            vcf: row.vcf
        }

    sample_info.fastq.view()
    sample_info.bam.view()
    sample_info.vcf.view()
}

There is also the -resume feature. It is designed for restarting after failures, which is slightly different, but it would still enable you to ‘jump forward’ in the pipeline.

Of course the ideal situation is a fully idempotent job scheduling system, where processes can be shared across pipeline runs based on the content. That’s a big challenge and probably not going to arrive soon!

I am using the -resume feature for my workflow control. It’s not clear to me what advantage comes from a custom solution for recording and reloading the workflow state as in Sarek. Any thoughts on that? I understand -resume is designed for resuming after a failure, but it seems like it also works fine for resuming after an intentional pause.

I was already using the join operator - the extra operations were for preparing the data for the join. Your method using the subMap method seems better since it lets me delete additional custom code! Thanks for the great examples.

Honestly, it’s absolutely fine for that. In fact, using Nextflow like a notebook and iteratively building on your analysis is a great use case. Nextflow is very defensive with its caching, which ensures great reproducibility, but it is also very sensitive and will rerun an analysis if anything changes in the provenance of files. Adding an additional entry point to the workflow gives you a way of manually jumping ahead when the analysis has changed. Let’s say you have been supplied some aligned samples, but you don’t have the resources to realign all the sequencing data to your genome. Using BAMs as an entry point to the workflow enables you to perform downstream steps immediately without requiring a realignment. Of course, you lose the provenance and reproducibility, but that is a conscious decision by the user.

It’s all about balance and compromise. Only you can come up with the best solution for you!

That’s great, this is a major reason why I’m basing my tool on Nextflow. I love a platform that lets me add features to my software for free!

Thanks a lot for your many thoughtful replies here, Adam.
