DASK

Dask is a popular Python library for parallel computing. Via dask-jobqueue, it can launch subtasks in HTCondor. It is also used as part of other frameworks such as Coffea.

Where to find Dask

The Dask libraries are available in the LCG releases on CVMFS, and we recommend using them from there. Dask requires the scheduler side and the worker side to run the same environment. While the batch system is mostly CentOS7 (as is true at the time of writing), Dask is therefore best used from lxplus.cern.ch, which is also CentOS7. We have also ensured that the required ports are open on the lxplus nodes. Dask and associated software such as coffea are available in LCG_102 and in the latest devswan nightlies. To load:

# LCG_102
$ . /cvmfs/sft.cern.ch/lcg/views/LCG_102/x86_64-centos7-gcc11-opt/setup.sh

or:

# nightlies
$ . /cvmfs/sft-nightlies.cern.ch/lcg/views/devswan/latest/x86_64-centos7-gcc11-opt/setup.sh

Where to find dask_lxplus (and why?)

dask-jobqueue makes some assumptions about HTCondor clusters that do not fully match the CERN cluster. A small wrapper called dask_lxplus is therefore strongly recommended. It is available in the nightlies, but with LCG_102 it is currently necessary to install the library yourself on the submit/scheduler side. For example:

$ . /cvmfs/sft.cern.ch/lcg/views/LCG_102/x86_64-centos7-gcc11-opt/setup.sh
$ pip3 install --user dask_lxplus
[...]
Installing collected packages: dask_lxplus
Successfully installed dask_lxplus-0.2.2
$ export PYTHONPATH=~/.local/lib/python3.9/site-packages:$PYTHONPATH
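
After the install above, it can be useful to confirm which copy of a package Python will actually import. The following is a minimal sketch using only the standard library; the helper name module_origin is hypothetical and is not part of dask_lxplus.

```python
import importlib.util


def module_origin(name):
    """Return the file a top-level module would be imported from, or None if not found."""
    spec = importlib.util.find_spec(name)
    return None if spec is None else spec.origin


# With the PYTHONPATH export above, this is expected to point into ~/.local;
# None means the package is not visible to this interpreter.
print(module_origin("dask_lxplus"))
```

If the result is None, check that the environment was set up and PYTHONPATH exported in the same shell.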

Using containers

The dask_lxplus wrapper is also supplied in a Singularity-ready container: /cvmfs/unpacked.cern.ch/gitlab-registry.cern.ch/batch-team/dask-lxplus/lxdask-cc7\:latest. Example usage is given below.

Just use pip / conda in AFS?

This isn't recommended. At scale it is likely to cause problems in AFS that lead to filesystem slowdowns and throttling. Using CVMFS, through either the LCG release or Singularity containers, at least allows for some caching on the workers and avoids knock-on impact on shared filesystems.

Example use

dask-jobqueue works by submitting jobs via condor; those jobs then call back to a scheduler that the initiating code is running. For us this means that Python code running on lxplus needs to open a known port, and that no firewall may block that port when workers call back to perform work. We have opened some defined ports for Dask on lxplus and lxbatch machines.

Here is a toy example to demonstrate the mechanics:

$ . /cvmfs/sft.cern.ch/lcg/views/LCG_102/x86_64-centos7-gcc11-opt/setup.sh

#!/usr/bin/env python3

from distributed import Client
from dask_lxplus import CernCluster
import socket

def hname():
    # Executed on a worker: returns the hostname of the node that ran the task.
    import socket
    return socket.gethostname()

def main():
    n_port = 8786
    with CernCluster(
            cores=1,
            memory='2000MB',
            disk='1000MB',
            death_timeout = '60',
            lcg = True,
            nanny = False,
            container_runtime = "none",
            log_directory = "/eos/user/b/bejones/condor/log",
            scheduler_options={
                'port': n_port,
                'host': socket.gethostname(),
                },
            job_extra={
                '+JobFlavour': '"tomorrow"',
                },
            extra = ['--worker-port 10000:10100']
            ) as cluster:
        print(cluster.job_script())
        with Client(cluster) as client:
            futures = []
            cluster.scale(4)
            for i in range(4):
                f = client.submit(hname)
                futures.append(f)
            print('Result is {}'.format(client.gather(futures)))

if __name__ == '__main__':
    main()

It's important to note a few things. Firstly, the python3 interpreter in the shebang assumes that you have set up your environment correctly, per the LCG recommendation.

Secondly, to use containers instead, the CernCluster() call should look something like this (note that container_runtime is "singularity" and lcg is not set):

with CernCluster(
            cores=1,
            memory='2000MB',
            disk='1000MB',
            death_timeout = '60',
            nanny = False,
            container_runtime = "singularity",
            log_directory = "/eos/user/b/bejones/condor/log",
            scheduler_options={
                'port': n_port,
                'host': socket.gethostname(),
                },
            job_extra={
                '+JobFlavour': '"tomorrow"',
                },
            extra = ['--worker-port 10000:10100']
            ) as cluster:

Thirdly, although at CERN we adjust resource requirements to enforce 1 core == 2 GiB RAM == 20 GiB disk, Dask does not know this. You still have to give it values for cores, memory and disk, which it turns into a condor submit file.
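
As an illustration of that ratio, the sketch below derives memory and disk strings from a core count. The helper name cern_resources and its defaults are hypothetical; they simply restate the 1 core == 2 GiB RAM == 20 GiB disk relation and are not part of dask_lxplus. The toy example above requests smaller values, so treat these numbers as an assumption rather than a requirement.

```python
def cern_resources(cores, mem_gib_per_core=2, disk_gib_per_core=20):
    """Build cores/memory/disk keyword values scaled by the number of cores."""
    return {
        "cores": cores,
        "memory": "{}GiB".format(cores * mem_gib_per_core),
        "disk": "{}GiB".format(cores * disk_gib_per_core),
    }


# Assumed usage: pass the values on as CernCluster(**cern_resources(1), ...)
print(cern_resources(1))
```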

It is also important to include the extra fields, such as +JobFlavour.

Finally, and importantly, the current setup requires port 8786, so only one Dask client is possible per lxplus node. We should be able to improve that, but for now, if there is a port collision, try a different node. For the workers, a port range has been defined, 10000:10100, which should help avoid collisions when several jobs are scheduled on the same worker node.
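
A collision on port 8786 can be detected before starting the cluster. The following is a minimal sketch using only the standard library; the helper name port_is_free is hypothetical, and the check is conservative: it may report a port as busy that the scheduler could in fact still bind.

```python
import socket


def port_is_free(port, host=""):
    """Return True if a TCP socket can currently be bound to `port` on this node."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            # Typically "address already in use": something else holds the port.
            return False
    return True


if not port_is_free(8786):
    print("Port 8786 is in use on this node; try a different lxplus node.")
```

The check is only a snapshot: another process could still take the port between the check and the start of the scheduler.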


Last update: June 22, 2024