Proper way to utilize >1 GPU in a single machine?

For nanopore basecalling, I typically set maxForks to 1 because GPU memory usage fluctuates, and it generally causes failures when another sample attempts to start basecalling only to find that the amount of VRAM it thought was available is no longer free. Let's say I have a machine with two GPUs in it: what's the proper way to have sample 1 go to GPU 1 and sample 2 go to GPU 2? I was envisioning some system that labels samples as even or odd, with all odd samples sent to GPU 1 and all even samples to GPU 2, but I wonder if there's an official way to do this?

The ideal scenario would be if your GPU-enabled application can simply detect and use all available GPUs (e.g. TensorFlow can do this). Then you can just send one task at a time to each machine and let it use all of the GPUs.

Failing that, you can use the accelerator directive with AWS Batch, Google Batch, and K8s, and they should be able to schedule individual GPUs for you. On HPC systems you might be able to use clusterOptions to schedule GPUs, as long as the scheduler knows how to use cgroups.
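For example, something along these lines (just a sketch; the process and tool names are made up, and the SLURM --gres flag assumes your cluster has GPU generic resources configured):

process basecall {
    // picked up by the GPU-aware executors (AWS Batch, Google Batch, K8s)
    accelerator 1
    // picked up by grid executors such as SLURM
    clusterOptions '--gres=gpu:1'

    input:
    path reads

    script:
    // the scheduler exposes only the GPU it granted, so the task can always use device 0
    """
    my_basecaller --device cuda:0 ${reads}
    """
}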

Failing that, it is very difficult to schedule GPUs properly with the local executor. It is generally much better to put your GPUs behind one of these GPU-aware executors and save yourself the headache.

I see. My organization isn't particularly computer-savvy, so all I have are some desktop computers, each of which could have 2 GPUs in it. So I think I'm stuck with the local executor. I guess maybe I could make my own Frankencluster out of some desktops, SLURM, and a laptop as the login node.

I have a similar issue, running nextflow with the local executor and docker on a single machine with 4 GPUs.

I can specify all GPUs using docker.runOptions, but if 3 jobs are running, they all use GPU 0 rather than using 1 GPU per job. This is expected, as the local executor is not aware of GPUs.
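For reference, "all GPUs" here means something like this in nextflow.config:

docker {
    enabled    = true
    runOptions = '--gpus all'   // every task can see every GPU, so they all pile onto GPU 0
}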

I found I can also select GPUs by ID at the process level with the containerOptions directive:

containerOptions "--gpus '\"device=${gpu_id}\"' "

(Resource constraints | Docker Docs).

My proposed solution for dynamically allocating 1 GPU per process is to create a Channel of the GPU IDs, then have each process take an ID from the Channel and then somehow return the ID it used to the Channel afterwards. There’s no operator to insert a value into a channel AFAIK.

The only way I can see to do this is using Channel.watchPath('gpus/id_*') or similar, to allow the process to take an ID value from the Channel and then write/touch a file in the watched directory to add the ID back into the channel.
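Roughly what I have in mind (completely untested; gpu_pool is just an illustrative directory under the launch dir, my_gpu_tool stands in for whatever actually uses the GPU, and I'm assuming all GPUs are visible inside the container as above):

process my_gpu_task {
    // hand the ID back by (re)creating a marker file; afterScript runs on the host,
    // outside the container, so it can see launchDir
    afterScript "mkdir -p ${launchDir}/gpu_pool; touch ${launchDir}/gpu_pool/id_${gpu_id}"

    input:
    path(infile)
    val(gpu_id)

    script:
    """
    my_gpu_tool --device cuda:${gpu_id} ${infile}
    """
}

workflow {
    // make sure the watched directory exists before watching it
    file("${launchDir}/gpu_pool").mkdirs()

    // seed the pool with the real GPU IDs, then append every ID that gets written back
    gpu_ids = Channel.of(0, 1, 2, 3)
        .concat(
            Channel.watchPath("${launchDir}/gpu_pool/id_*", 'create,modify')
                   .map { it.name.tokenize('_')[-1] as int }
        )

    // one input file and one free GPU ID are consumed per task
    my_gpu_task(Channel.fromPath("${params.input}/*.txt"), gpu_ids)
}

One obvious wrinkle is that watchPath never closes the channel, so the run may need to be stopped manually once all samples are done (or the ID channel bounded with something like take()).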

Maybe there is a native Groovy/Java atomic list I could use instead that might be simpler than using a Channel here.

Any suggestions appreciated.


For the local executor we could probably have some custom logic that keeps track of which GPUs are in use and assigns them to GPU tasks by setting the NVIDIA_VISIBLE_DEVICES environment variable. That would at least work for NVIDIA GPUs, which are by far the most common.

You could also put this logic into a plugin as a custom “local gpu” executor. That might be a better way to experiment with this functionality.

In any case, the GPU scheduling should be handled by the scheduler rather than the pipeline code.


We have a similar situation and are trying to deal with it just as you mentioned. However, the issue arises when we want multiple processes to share a global queue channel (using watchPath): this leads to 3 tasks (one from each of three processes) starting at once on one GPU.

Do you have some ideas how we can tackle this?

@thealanjason my current solution for 4 GPUs with Docker is as follows. The idea is to assign a GPU ID to each job and have a separate process for each GPU that runs its assigned jobs:

// Define 4 "identical" processes, one for each GPU ID
process run_my_process_0 {
    containerOptions = "--cpus=${params.cpus} --gpus '\"device=0\"' "
    maxForks = 1
    ...
}

process run_my_process_1 {
    containerOptions = "--cpus=${params.cpus} --gpus '\"device=1\"' "
    maxForks = 1
    ...
}

process run_my_process_2 {
    containerOptions = "--cpus=${params.cpus} --gpus '\"device=2\"' "
    maxForks = 1
    ...
}

process run_my_process_3 {
    containerOptions = "--cpus=${params.cpus} --gpus '\"device=3\"' "
    maxForks = 1
    ...
}

workflow run_my_process_channels {
    take:
    input_set

    main:
    // Branch each job depending on GPU assignment
    input_set.branch { v ->
        id0: v[2] == 0
        id1: v[2] == 1
        id2: v[2] == 2
        id3: v[2] == 3
    }
    .set { jobs }

    // Run inputs from each branch on the corresponding GPU-ID process
    run_my_process_0(jobs.id0.map{it[0]})
    run_my_process_1(jobs.id1.map{it[0]})
    run_my_process_2(jobs.id2.map{it[0]})
    run_my_process_3(jobs.id3.map{it[0]})
}

workflow {
    // Assign a job ID, use the modulus to generate a GPU assignment
    def job_id = 0
    new_inputs = Channel
        .fromPath("${params.input}/*.txt")
        .map{ v -> [v, ++job_id]}
        .map{ v -> [v[0], v[1], v[1] % 4]}

    run_my_process_channels(new_inputs)
}

There’s a relevant issue here: Ability to dedicate accelerator directive (GPU) over range · Issue #5570 · nextflow-io/nextflow · GitHub

I’ve achieved it in essentially the same way as @warrenson, using modulo per number of GPUs and assigning each one to a different GPU manually. Not pretty, but perfectly functional.

This same functionality might be possible by using a channel as a GPU ID pool; the main workflow can then call the process with the input files channel and the GPU ID pool channel as input args. I don't think they would need to be strictly paired as tuples.

The process will need to output its GPU ID so it can be added back (with tap?) to the GPU pool channel to be reused.

This should prevent wastage due to wall-time variations between jobs, which was a problem with my original solution.

I don't think tap can be used on an existing channel, but it might be possible to create a channel like this:

process my_process {
    containerOptions = "--cpus=${params.cpus} --gpus '\"device=${gpuid}\"' "
    maxForks = 4 // pool length
    
    input:
    path(file)
    val(gpuid)

    output:
    val(gpuid) , emit: gpuid
    ...
}

pool = Channel.fromList([0, 1, 2, 3]).concat(my_process.out.gpuid)

my_process(input_data, pool)

However, I suspect this will create a circular referencing issue.

Never mind, this is not possible since containerOptions cannot use input variables directly.

This works though,

    containerOptions = "--runtime nvidia -e NVIDIA_VISIBLE_DEVICES"
    beforeScript "export NVIDIA_VISIBLE_DEVICES=${gpuid}"

Did you try using a closure?

process my_process {
    containerOptions { "--cpus=${task.cpus} --gpus '\"device=${gpuid}\"' " }
    maxForks 4 // pool length
    
    input:
    path(file)
    val(gpuid)

    output:
    val(gpuid) , emit: gpuid
    ...
}

pool = Channel.fromList([0, 1, 2, 3]).concat(my_process.out.gpuid)

my_process(input_data, pool)

You could also use task.index and modulo arithmetic to select the ID:

    containerOptions { "--cpus=${task.cpus} --gpus '\"device=${task.index % max_gpus}\"' " }

    input:
    val max_gpus