Dask

Dask is a popular Python library for parallel computing. Via dask-jobqueue, it can launch its worker tasks as HTCondor jobs.

This document is intended to give some hints on how to use dask-jobqueue with the CERN HTCondor setup, along with some current limitations.

Setting up Dask

It's recommended to refer to the upstream documentation, but some basic setup tips are given here so that the later sections of this document make a little more sense.

Conda can be used to create a Python installation with the packages needed for Dask. In my case, my $HOME is in AFS but I wanted my installation in EOS, so my ~/.condarc looked like this:

channels:
  - conda-forge
  - bioconda
  - defaults

pkgs_dirs:
  - /eos/user/b/bejones/conda
You can create conda environments as required.
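
For example, something like the following should create a new environment under an EOS prefix (using the same path as in the listing below; adjust to your own area):

[bejones@lxplus778 ~]$ conda create --prefix /eos/user/b/bejones/dask python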

[bejones@lxplus778 ~]$ conda-env list
# conda environments:
#
                         /eos/user/b/bejones/dask
base                  *  /usr

In my case I activate the environment via its path:

[bejones@lxplus778 ~]$ conda activate /eos/user/b/bejones/dask
(/eos/user/b/bejones/dask) [bejones@lxplus778 ~]$

At a minimum, the following packages are needed:

$ conda install dask dask-jobqueue

Running jobs

The way dask-jobqueue works is that it submits worker jobs via HTCondor, and those workers then call back to a scheduler run by the initiating code. It currently requires a shared filesystem to communicate. What this means for us is that the Python code running on lxplus needs to open a known port, and that no firewall may block that port when the workers call back to perform work.

Here therefore is a toy example to demonstrate the mechanics:

#!/usr/bin/env python

from distributed import Client
from dask_jobqueue import HTCondorCluster
import socket

def hname():
    # Runs on the worker: import socket locally, since the function is
    # serialised and executed on the remote worker node.
    import socket
    return socket.gethostname()

def main():
    n_port = 8786  # port the Dask scheduler listens on; workers call back to it
    with HTCondorCluster(
            cores=1,
            memory='2000MB',
            disk='1000MB',
            death_timeout='60',
            nanny=False,
            scheduler_options={
                # The scheduler must be reachable from the worker nodes,
                # so bind it to this host's name and a known port.
                'port': n_port,
                'host': socket.gethostname()
                },
            job_extra={
                # Extra directives passed through to the HTCondor submit file.
                'log': 'dask_job_output.log',
                'output': 'dask_job_output.out',
                'error': 'dask_job_output.err',
                'should_transfer_files': 'Yes',
                'when_to_transfer_output': 'ON_EXIT',
                '+JobFlavour': '"tomorrow"',
                },
            # Restrict the workers to the port range opened for this purpose.
            extra=['--worker-port 10000:10100']
            ) as cluster:
        print(cluster.job_script())
        with Client(cluster) as client:
            futures = []
            cluster.scale(4)  # ask for 4 worker jobs
            for i in range(4):
                # pure=False so that each call becomes a separate task instead
                # of being deduplicated into a single result.
                f = client.submit(hname, pure=False)
                futures.append(f)
            print('Result is {}'.format(client.gather(futures)))

if __name__ == '__main__':
    main()

It's important to note a few things. Firstly, the Python interpreter in the shebang only works if you're running from an activated conda environment. Otherwise you would need the full path to the environment's interpreter, i.e. /eos/user/b/bejones/dask/bin/python in my case.
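
For example, using a hypothetical file name dask_example.py for the script above, either of the following would work:

(/eos/user/b/bejones/dask) [bejones@lxplus778 ~]$ python dask_example.py
[bejones@lxplus778 ~]$ /eos/user/b/bejones/dask/bin/python dask_example.py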

Secondly, although at CERN the resource requirements are normalised so that 1 core corresponds to 2 GiB of RAM and 20 GiB of disk, Dask does not know this, and requires you to specify cores, memory and disk explicitly so that it can turn them into an HTCondor submit file.
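
For illustration, a minimal sketch assuming you want two cores, with memory and disk scaled to match those per-core ratios:

from dask_jobqueue import HTCondorCluster

# Illustrative sketch only: 2 cores, with memory and disk scaled to the
# per-core ratios mentioned above (2 GiB RAM and 20 GiB disk per core).
cluster = HTCondorCluster(
    cores=2,
    memory='4000MB',
    disk='40GB',
)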

Thirdly, it's important here to include the extra submit-file directives such as +JobFlavour via job_extra; the cluster.job_script() call in the example above can be used to inspect the submit description that Dask generates.

Finally, and importantly, the current setup uses the fixed port 8786 for the scheduler, so only one Dask client is possible per lxplus node. We should be able to improve that, but for now, if there's a port collision, try a different node. For the workers, a port range (10000:10100) has been defined, which should help avoid collisions when several jobs are scheduled on the same worker node.
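
One possible workaround, sketched here only and not part of the recipe above, is to let the operating system pick a free scheduler port instead of hard-coding 8786:

import socket

def find_free_port():
    # Bind to port 0 so the kernel chooses an unused port, then release it.
    # There is a small race (the port could be taken again before the Dask
    # scheduler binds to it), but in practice this avoids most collisions.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('', 0))
        return s.getsockname()[1]

n_port = find_free_port()  # then pass it via scheduler_options={'port': n_port, ...}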