Skip to content

Commit

Permalink
Use better defaults when using max_parallel_tasks with an unconfigured threaded scheduler (#2626)
Browse files Browse the repository at this point in the history

Co-authored-by: Manuel Schlund <32543114+schlunma@users.noreply.github.com>
  • Loading branch information
bouweandela and schlunma authored Jan 17, 2025
1 parent 04e0cbd commit 0dce90c
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 4 deletions.
12 changes: 10 additions & 2 deletions doc/quickstart/configure.rst
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ For example, Python's ``None`` is YAML's ``null``, Python's ``True`` is YAML's
| | :ref:`running`. | | |
+-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+
| ``max_parallel_tasks`` | Maximum number of parallel processes, | :obj:`int` | ``None`` (number of available CPUs) |
| | see also :ref:`task_priority`. | | |
| | see :ref:`task_priority`. [#f5]_ | | |
+-------------------------------+----------------------------------------+-----------------------------+----------------------------------------+
| ``max_years`` | Maximum number of years to use, see | :obj:`int` | ``None`` (all years from recipe) |
| | :ref:`running`. | | |
Expand Down Expand Up @@ -272,7 +272,15 @@ For example, Python's ``None`` is YAML's ``null``, Python's ``True`` is YAML's
found on ESGF is newer than the local data (if any) or the user specifies a
version of the data that is available only from the ESGF, then that data
will be downloaded; otherwise, local data will be used.
.. [#f5] When using ``max_parallel_tasks`` with a value larger than 1 together with the
Dask threaded scheduler, every task will start ``num_workers`` threads.
To avoid running out of memory or slowing down computations due to competition
for resources, it is recommended to set ``num_workers`` such that
``max_parallel_tasks * num_workers`` approximately equals the number of CPU cores.
The number of available CPU cores can be found by running
``python -c 'import os; print(len(os.sched_getaffinity(0)))'``.
See :ref:`config-dask-threaded-scheduler` for information on how to configure
``num_workers``.
.. _config-dask:

Expand Down
54 changes: 52 additions & 2 deletions esmvalcore/_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from shutil import which
from typing import Optional

import dask
import psutil
import yaml
from distributed import Client
Expand Down Expand Up @@ -836,6 +837,50 @@ def _run_sequential(self) -> None:
for task in sorted(tasks, key=lambda t: t.priority):
task.run()

def _get_dask_config(self, max_parallel_tasks: int) -> dict:
    """Compute Dask settings for running with the threaded scheduler.

    Each of the ``max_parallel_tasks`` worker processes starts its own
    threaded Dask scheduler with ``num_workers`` threads. When the user
    has not configured ``num_workers`` themselves, pick a value such
    that the total amount of parallelism stays reasonable, i.e.
    ``num_workers`` is approximately ``n_cpu_cores / n_processes``.
    """
    # pylint: disable=import-outside-toplevel
    from esmvalcore.preprocessor import PreprocessingTask

    scheduler = dask.config.get("scheduler", "threads")
    if scheduler not in ("threads", "threading"):
        # Only the threaded scheduler needs a `num_workers` default.
        # https://github.com/dask/dask/blob/3504bcc89f7a937b2d48306a17b8eeff57b1e5ae/dask/base.py#L1027-L1050
        return {}
    if dask.config.get("num_workers", None) is not None:
        # Respect an explicit user-provided "num_workers" setting.
        return {}

    preproc_task_count = sum(
        isinstance(task, PreprocessingTask) for task in self.flatten()
    )
    if preproc_task_count == 0:
        # Nothing to tune when no PreprocessingTasks will be run.
        return {}

    cpu_cores = len(os.sched_getaffinity(0))
    # No more schedulers will run concurrently than there are
    # preprocessing tasks or worker processes.
    scheduler_count = min(preproc_task_count, max_parallel_tasks)
    worker_count = max(1, round(cpu_cores / scheduler_count))
    logger.info(
        "Using the threaded Dask scheduler with %s worker threads per "
        "preprocessing task. "
        "See https://docs.esmvaltool.org/projects/ESMValCore/en/"
        "latest/quickstart/configure.html#f5 for more information.",
        worker_count,
    )
    return {"num_workers": worker_count}

def _run_parallel(self, scheduler_address, max_parallel_tasks):
"""Run tasks in parallel."""
scheduled = self.flatten()
Expand All @@ -845,12 +890,14 @@ def _run_parallel(self, scheduler_address, max_parallel_tasks):
n_running = 0

if max_parallel_tasks is None:
max_parallel_tasks = os.cpu_count()
max_parallel_tasks = len(os.sched_getaffinity(0))
max_parallel_tasks = min(max_parallel_tasks, n_tasks)
logger.info(
"Running %s tasks using %s processes", n_tasks, max_parallel_tasks
)

dask_config = self._get_dask_config(max_parallel_tasks)

def done(task):
"""Assume a task is done if it not scheduled or running."""
return not (task in scheduled or task in running)
Expand All @@ -866,7 +913,10 @@ def done(task):
None if scheduler_address is None else manager.Lock()
)

with multiprocessing.Pool(processes=max_parallel_tasks) as pool:
with (
dask.config.set(dask_config),
multiprocessing.Pool(processes=max_parallel_tasks) as pool,
):
while scheduled or running:
# Submit new tasks to pool
for task in sorted(scheduled, key=lambda t: t.priority):
Expand Down
57 changes: 57 additions & 0 deletions tests/unit/task/test_taskset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import dask
import pytest

from esmvalcore import _task
from esmvalcore.preprocessor import PreprocessingTask


@pytest.mark.parametrize(
    "max_parallel_tasks,available_cpu_cores,n_preproc_tasks,scheduler,expected_workers",
    [
        (8, 128, 100, "distributed", None),  # not using threaded scheduler
        (8, 128, 0, "threads", None),  # not running preproc tasks
        (8, 128, 100, "threads", 16),
        (4, 20, 4, "threading", 5),  # alternative name for threaded scheduler
        (2, 4, 3, "threads", 2),
        (4, 4, 5, "threads", 1),
        (4, 4, 2, "threads", 2),
    ],
)
def test_taskset_get_dask_config(
    mocker,
    max_parallel_tasks: int,
    available_cpu_cores: int,
    n_preproc_tasks: int,
    scheduler: str,
    expected_workers: int | None,
) -> None:
    """Check the ``num_workers`` value chosen for the threaded scheduler."""
    # Pretend the process has `available_cpu_cores` CPU cores available.
    mocker.patch.object(
        _task.os,
        "sched_getaffinity",
        return_value=set(range(available_cpu_cores)),
    )

    tasks = _task.TaskSet(
        {
            PreprocessingTask([], name=f"test{i}")
            for i in range(n_preproc_tasks)
        }
    )

    with dask.config.set({"num_workers": None, "scheduler": scheduler}):
        config = tasks._get_dask_config(max_parallel_tasks=max_parallel_tasks)

    if expected_workers is None:
        # No override expected: distributed scheduler or no preproc tasks.
        assert config == {}
    else:
        assert config == {"num_workers": expected_workers}


def test_taskset_get_dask_config_noop(mocker) -> None:
    """No Dask settings are returned when ``num_workers`` is already set."""
    taskset = _task.TaskSet()

    with dask.config.set({"scheduler": "threads", "num_workers": 4}):
        result = taskset._get_dask_config(max_parallel_tasks=2)

    # A user-configured "num_workers" must be left untouched.
    assert result == {}

0 comments on commit 0dce90c

Please sign in to comment.