Cluster shutdown hangs in batch mode on Linux Python >3.8 #87

Open
kmpaul opened this issue Apr 19, 2022 · 9 comments

@kmpaul
Collaborator

kmpaul commented Apr 19, 2022

What happened:

When Dask-MPI is used in batch mode (i.e., using initialize()) on Linux with Python >3.8, it does not properly shut down the scheduler and worker processes when the client script completes. It hangs during shutdown. This means that the Python 3.9 and Python 3.10 tests in dask_mpi/tests/test_core.py and dask_mpi/tests/test_no_exit.py hang and never finish on CI.

Note that this only occurs on Linux. MacOS executes without hanging.

What you expected to happen:

When the client script completes, the scheduler and worker processes should be shut down without error or hanging.

Minimal Complete Verifiable Example:

Manually executing the dask_mpi/tests/core_basic.py script, with Python 3.9+ on Linux, like so:

mpirun -l -np 4 python dask_mpi/tests/core_basic.py

results in:

Full Logs
[0] 2022-04-20 19:45:00,550 - distributed.scheduler - INFO - State start
[0] 2022-04-20 19:45:00,556 - distributed.scheduler - INFO - Clear task state
[0] 2022-04-20 19:45:00,557 - distributed.scheduler - INFO -   Scheduler at:    tcp://172.17.0.2:36407
[0] 2022-04-20 19:45:00,557 - distributed.scheduler - INFO -   dashboard at:                     :8787
[2] 2022-04-20 19:45:00,573 - distributed.worker - INFO -       Start worker at:     tcp://172.17.0.2:45639
[2] 2022-04-20 19:45:00,573 - distributed.worker - INFO -          Listening to:     tcp://172.17.0.2:45639
[2] 2022-04-20 19:45:00,574 - distributed.worker - INFO -          dashboard at:           172.17.0.2:37653
[2] 2022-04-20 19:45:00,574 - distributed.worker - INFO - Waiting to connect to:     tcp://172.17.0.2:36407
[2] 2022-04-20 19:45:00,574 - distributed.worker - INFO - -------------------------------------------------
[2] 2022-04-20 19:45:00,574 - distributed.worker - INFO -               Threads:                          1
[2] 2022-04-20 19:45:00,575 - distributed.worker - INFO -                Memory:                   0.96 GiB
[2] 2022-04-20 19:45:00,576 - distributed.worker - INFO -       Local Directory: /root/dask-mpi/dask_mpi/tests/dask-worker-space/worker-sev4vqjo
[3] 2022-04-20 19:45:00,579 - distributed.worker - INFO -       Start worker at:     tcp://172.17.0.2:38821
[3] 2022-04-20 19:45:00,580 - distributed.worker - INFO -          Listening to:     tcp://172.17.0.2:38821
[3] 2022-04-20 19:45:00,580 - distributed.worker - INFO -          dashboard at:           172.17.0.2:45157
[3] 2022-04-20 19:45:00,580 - distributed.worker - INFO - Waiting to connect to:     tcp://172.17.0.2:36407
[3] 2022-04-20 19:45:00,581 - distributed.worker - INFO - -------------------------------------------------
[3] 2022-04-20 19:45:00,581 - distributed.worker - INFO -               Threads:                          1
[3] 2022-04-20 19:45:00,581 - distributed.worker - INFO -                Memory:                   0.96 GiB
[3] 2022-04-20 19:45:00,581 - distributed.worker - INFO -       Local Directory: /root/dask-mpi/dask_mpi/tests/dask-worker-space/worker-08kqqntu
[3] 2022-04-20 19:45:00,582 - distributed.worker - INFO - -------------------------------------------------
[2] 2022-04-20 19:45:00,585 - distributed.worker - INFO - -------------------------------------------------
[0] 2022-04-20 19:45:00,998 - distributed.scheduler - INFO - Receive client connection: Client-5d823051-c0e2-11ec-8020-0242ac110002
[0] 2022-04-20 19:45:01,009 - distributed.core - INFO - Starting established connection
[0] 2022-04-20 19:45:01,053 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.17.0.2:45639', name: 2, status: undefined, memory: 0, processing: 0>
[0] 2022-04-20 19:45:01,054 - distributed.scheduler - INFO - Starting worker compute stream, tcp://172.17.0.2:45639
[0] 2022-04-20 19:45:01,054 - distributed.core - INFO - Starting established connection
[2] 2022-04-20 19:45:01,055 - distributed.worker - INFO -         Registered to:     tcp://172.17.0.2:36407
[2] 2022-04-20 19:45:01,056 - distributed.worker - INFO - -------------------------------------------------
[0] 2022-04-20 19:45:01,057 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://172.17.0.2:38821', name: 3, status: undefined, memory: 0, processing: 0>
[2] 2022-04-20 19:45:01,059 - distributed.core - INFO - Starting established connection
[0] 2022-04-20 19:45:01,060 - distributed.scheduler - INFO - Starting worker compute stream, tcp://172.17.0.2:38821
[0] 2022-04-20 19:45:01,060 - distributed.core - INFO - Starting established connection
[3] 2022-04-20 19:45:01,060 - distributed.worker - INFO -         Registered to:     tcp://172.17.0.2:36407
[3] 2022-04-20 19:45:01,061 - distributed.worker - INFO - -------------------------------------------------
[3] 2022-04-20 19:45:01,063 - distributed.core - INFO - Starting established connection
[0] 2022-04-20 19:45:01,325 - distributed.scheduler - INFO - Remove client Client-5d823051-c0e2-11ec-8020-0242ac110002
[0] 2022-04-20 19:45:01,325 - distributed.scheduler - INFO - Remove client Client-5d823051-c0e2-11ec-8020-0242ac110002
[0] 2022-04-20 19:45:01,326 - distributed.scheduler - INFO - Close client connection: Client-5d823051-c0e2-11ec-8020-0242ac110002
[1] Error in atexit._run_exitfuncs:
[1] Traceback (most recent call last):
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/utils.py", line 349, in f
[1]     result = yield future
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
[1]     value = future.result()
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/client.py", line 1193, in _start
[1]     await self._ensure_connected(timeout=timeout)
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/client.py", line 1256, in _ensure_connected
[1]     comm = await connect(
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/comm/core.py", line 289, in connect
[1]     comm = await asyncio.wait_for(
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
[1]     return fut.result()
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/comm/tcp.py", line 439, in connect
[1]     stream = await self.client.connect(
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/tornado/tcpclient.py", line 265, in connect
[1]     addrinfo = await self.resolver.resolve(host, port, af)
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/site-packages/distributed/comm/tcp.py", line 424, in resolve
[1]     for fam, _, _, _, address in await asyncio.get_running_loop().getaddrinfo(
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/asyncio/base_events.py", line 861, in getaddrinfo
[1]     return await self.run_in_executor(
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/asyncio/base_events.py", line 819, in run_in_executor
[1]     executor.submit(func, *args), loop=self)
[1]   File "/root/miniconda/envs/py-3.9/lib/python3.9/concurrent/futures/thread.py", line 169, in submit
[1]     raise RuntimeError('cannot schedule new futures after '
[1] RuntimeError: cannot schedule new futures after interpreter shutdown

HANGS HERE!!! Requires CTRL-C to exit.

Anything else we need to know?:

I believe this is due to changes in asyncio that arrived with Python 3.9. In particular, it seems that asyncio.wait_for now blocks when it cancels a task due to a timeout, waiting until the task has finished cancelling (see the Python 3.9 release notes). The hang appears to arise because the dask_mpi.initialize() shutdown procedure depends on an asyncio call made inside an atexit handler. By the time the atexit handler runs, the asyncio loop seems to have been closed already, which produces the RuntimeError: cannot schedule new futures after interpreter shutdown and the subsequent hang.
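The RuntimeError at the bottom of the traceback can be probed in isolation, without Dask or MPI. The sketch below is only an illustration under assumptions: it assumes the error comes from the standard-library thread pool refusing new work once interpreter shutdown has begun, and the names probe_atexit_submit and CHILD_SCRIPT are hypothetical. It runs a child interpreter whose atexit handler tries to submit work to a ThreadPoolExecutor and reports what the handler saw. It does not reproduce the hang itself.

```python
import subprocess
import sys
import textwrap

# Child program: its atexit handler tries to schedule work on a thread pool,
# loosely mirroring the getaddrinfo -> run_in_executor call in the traceback.
CHILD_SCRIPT = textwrap.dedent(
    """
    import atexit
    from concurrent.futures import ThreadPoolExecutor

    def on_exit():
        # Runs while the interpreter is shutting down.
        pool = ThreadPoolExecutor(max_workers=1)
        try:
            pool.submit(int)
            print("submitted")
        except RuntimeError as exc:
            print(f"RuntimeError: {exc}")

    atexit.register(on_exit)
    """
)


def probe_atexit_submit() -> str:
    """Run the child with the current interpreter and return what its handler printed."""
    result = subprocess.run(
        [sys.executable, "-c", CHILD_SCRIPT],
        capture_output=True,
        text=True,
        timeout=60,  # guard so the probe itself cannot hang forever
    )
    return result.stdout.strip()


if __name__ == "__main__":
    print(sys.version_info[:2], "->", probe_atexit_submit())
```

Running this under both a Python 3.8 and a Python 3.9+ environment should show whether the handler can still schedule work at exit. If the 3.9+ run reports the RuntimeError while 3.8 does not, that would point at the timing of the atexit handler rather than at Dask-MPI specifically.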

Environment:

  • Dask version: 2022.4.1
  • Python version: 3.9.12
  • Operating System: Linux
  • Install method (conda, pip, source): conda
@kmpaul kmpaul changed the title from "Cluster shutdown fails in batch mode on Python >3.8" to "Cluster shutdown hangs in batch mode on Linux Python >3.8" on Apr 20, 2022
@kmpaul
Collaborator Author

kmpaul commented Apr 20, 2022

Note that even with the merge of #89, these errors still occur.

@kmpaul
Collaborator Author

kmpaul commented Apr 20, 2022

One potential solution is to stop using an atexit handler to shut down the cluster and instead require that the user shut down the cluster manually at the end of the script. This would effectively mean that the canonical use case of dask_mpi.initialize() would change from:

initialize()

with Client() as c:
    # dask stuff

with the Client.shutdown() method called by the atexit handler, to

initialize()

with Client() as c:
    # dask stuff
    c.shutdown()

with the c.shutdown() call above being required or the process will hang. We could even encapsulate the c.shutdown() in a function that bookends the initialize() method like so:

def finalize(...):
    with Client() as c:
        c.shutdown()

so that the dask-mpi client script might look like:

initialize()

with Client() as c:
    # dask stuff

# maybe more stuff

finalize()

However, this definitely breaks backwards compatibility. It does fix the above hanging, though. Is this acceptable?

CC @mrocklin @jacobtomlinson

@kmpaul
Collaborator Author

kmpaul commented Apr 20, 2022

Possible API changes

If we have to break backwards compatibility, then I want to suggest some other changes that might work with the suggested changes above.

With @joezuntz's changes to allow users to pass an existing MPI communicator object into Dask-MPI and to turn off the explicit "shutdown" of the Dask cluster, there is now a foreseeable mechanism for stopping and restarting a Dask-MPI cluster from within a client batch process. As I understand it, this requires 3 components:

  1. a Dask-MPI initialize() function that starts the scheduler and worker processes and subsequently blocks on all MPI ranks except the client rank (e.g., rank==1),
  2. a Dask-MPI is_client() function that returns True when the MPI rank is equal to the client rank, and
  3. a Dask-MPI finalize() function that stops the scheduler and worker processes.

The general outline of how to use these three functions together would be like so:

from dask_mpi import initialize, is_client, finalize

initialize()  # Blocks scheduler and worker MPI ranks HERE!!!

if is_client():
    # Do your client dask operations here
    # When the scheduler and worker MPI ranks unblock due to the finalize call below,
    # this section will be skipped by the scheduler and worker MPI ranks

finalize() # Everything after this can act like a normal mpi4py script

...This whole thing is starting to look to me like a context manager, no?

@jacobtomlinson
Member

I wonder if using a context manager would feel more natural here.

def initialize(...):
    with Client() as c:
        ...

@kmpaul
Collaborator Author

kmpaul commented Apr 25, 2022

@jacobtomlinson: Yeah. I was thinking of something like:

with MPICluster(...) as cluster, Client(cluster) as c:
    if cluster.is_client():
        ...

where the MPICluster(...) object would essentially be the initialize(...) method, except turned into a context manager with finalize() being called in __exit__().

...but I don't like the nested if pattern. It's fiddly and prone to errors, I think.
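To make the shape of that idea concrete, here is a minimal sketch that uses no MPI and no Dask. Everything in it is hypothetical: SketchCluster stands in for the proposed MPICluster, the rank is passed in by hand instead of coming from a communicator, and "initialize" and "finalize" are only recorded as events. It does not model the blocking of scheduler and worker ranks. It only shows that __exit__ gives finalize() a guaranteed place to run, including when the client section raises.

```python
from typing import List


class SketchCluster:
    """Hypothetical stand-in for the proposed MPICluster context manager."""

    def __init__(self, rank: int, client_rank: int = 1) -> None:
        self.rank = rank
        self.client_rank = client_rank
        self.events: List[str] = []

    def is_client(self) -> bool:
        # True only on the rank that runs the user's client code.
        return self.rank == self.client_rank

    def __enter__(self) -> "SketchCluster":
        # A real implementation would start the scheduler and workers here.
        self.events.append("initialize")
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # A real implementation would shut the cluster down here.
        self.events.append("finalize")
        return False  # do not swallow exceptions from the with-body


def run_rank(rank: int) -> List[str]:
    """Run the with-block as one simulated rank and return the recorded events."""
    with SketchCluster(rank) as cluster:
        if cluster.is_client():
            cluster.events.append("client work")
    return cluster.events


if __name__ == "__main__":
    for rank in range(3):
        print(rank, run_rank(rank))
```

The nested if is still present in run_rank, which is the part described above as fiddly. The sketch only addresses where the shutdown call lives, not how to hide the rank check.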

@jacobtomlinson
Member

This is kind of where I was going with the MPIRunner class in dask/distributed#4710 and #69. The Cluster class doesn't quite line up with the use case here, given that the job and resources already exist and we are trying to populate them, rather than create them in the first place. This is why I tried to add Runner as a sibling to Cluster for this use case.

However @mrocklin suggested that Cluster would be a fine way to implement it and adding a new base class only increases complexity. So perhaps it would be good to resurrect #69 but base it on Cluster instead.

@zhiyuli

zhiyuli commented May 14, 2022

same issue here. I will try python3.8 for now

@evgri243

evgri243 commented May 26, 2022

Experiencing the same, python 3.8.

Can we think of some temporary workaround here? Maybe closing the MPI cluster manually after the c.shutdown or "request killing the workers/check worker count/repeat again if needed/close scheduler"? I do already implement exiting logic manually, so that I don't mind adding a few more lines...
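The "request killing the workers / check worker count / repeat / close scheduler" loop could look roughly like the sketch below. It is deliberately library-agnostic: shut_down_with_retries and its three callables are hypothetical names, and how they would map onto real Dask client calls is left open here. The demo at the bottom uses fake callables only.

```python
import time
from typing import Callable


def shut_down_with_retries(
    request_worker_shutdown: Callable[[], None],
    count_workers: Callable[[], int],
    close_scheduler: Callable[[], None],
    max_attempts: int = 5,
    delay: float = 0.5,
) -> bool:
    """Ask workers to stop until none remain or attempts run out, then close the scheduler.

    Returns True if no workers were left when the scheduler was closed.
    """
    for _ in range(max_attempts):
        if count_workers() == 0:
            break
        request_worker_shutdown()
        time.sleep(delay)  # give workers time to deregister before re-checking
    workers_gone = count_workers() == 0
    close_scheduler()
    return workers_gone


if __name__ == "__main__":
    # Fake cluster state: each shutdown request removes up to two workers.
    state = {"workers": 3}

    def fake_request() -> None:
        state["workers"] = max(0, state["workers"] - 2)

    ok = shut_down_with_retries(
        fake_request,
        lambda: state["workers"],
        lambda: print("scheduler closed"),
        delay=0.0,
    )
    print("all workers gone:", ok)
```

Closing the scheduler even when workers remain is a design choice of this sketch. A stricter variant could raise instead, so that a batch job fails loudly rather than leaving stray processes behind.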

@kmpaul
Collaborator Author

kmpaul commented May 26, 2022

Yeah. If you have time for a PR, that would be great, @evgri243!
