From 1cf712cfac58a1e3beef8e108c0f2bd76487539e Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 20 Aug 2023 16:22:46 -0400
Subject: [PATCH 1/5] Add `Arbiter.delete_sockaddr()` to remove addrs

Since stale addrs can be leaked where the actor transport server task
crashes but doesn't (successfully) unregister from the registrar, we
need a remote way to remove such entries; hence this new (registrar)
method.

To implement this, make use of the `bidict` lib for the `._registry`
table, thus making it super simple to do reverse uuid lookups from an
input socket-address.
---
 setup.py            |  3 +++
 tractor/_runtime.py | 17 +++++++++++++++--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/setup.py b/setup.py
index d26deb9b..4cefe676 100755
--- a/setup.py
+++ b/setup.py
@@ -41,6 +41,9 @@
     ],
     install_requires=[

+        # discovery subsys
+        'bidict',
+
         # trio related
         # proper range spec:
         # https://packaging.python.org/en/latest/discussions/install-requires-vs-requirements/#id5
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index c9e4bfe1..67bb0c58 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -40,6 +40,7 @@
 import warnings

 from async_generator import aclosing
+from bidict import bidict
 from exceptiongroup import BaseExceptionGroup
 import trio  # type: ignore
 from trio_typing import TaskStatus
@@ -1774,10 +1775,10 @@ class Arbiter(Actor):

     def __init__(self, *args, **kwargs) -> None:

-        self._registry: dict[
+        self._registry: bidict[
             tuple[str, str],
             tuple[str, int],
-        ] = {}
+        ] = bidict({})
         self._waiters: dict[
             str,
             # either an event to sync to receiving an actor uid (which
@@ -1871,3 +1872,15 @@ async def unregister_actor(
         entry: tuple = self._registry.pop(uid, None)
         if entry is None:
             log.warning(f'Request to de-register {uid} failed?')
+
+
+    async def delete_sockaddr(
+        self,
+        sockaddr: tuple[str, int],
+
+    ) -> tuple[str, str]:
+        uid: tuple = self._registry.inverse.pop(sockaddr)
+        log.warning(
+            f'Deleting registry entry for {sockaddr}@{uid}!'
+        )
+        return uid
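
As a minimal sketch of the reverse lookup this patch enables (the uid
and socket-address values below are made up for illustration):

    from bidict import bidict

    # uid -> sockaddr, mirroring the new `Arbiter._registry` type
    registry: bidict = bidict({
        ('emsd', 'c67a7af5'): ('127.0.0.1', 6116),
    })

    # `.inverse` exposes the sockaddr -> uid view, so a stale
    # address can be popped without knowing the uid up front;
    # the entry is removed from both directions at once.
    uid = registry.inverse.pop(('127.0.0.1', 6116))
    assert uid == ('emsd', 'c67a7af5')
    assert not registry
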
From d83d991f21d33df641d880232ef1625724caa2eb Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 28 Aug 2023 11:26:36 -0400
Subject: [PATCH 2/5] Handle stale registrar entries; detect and delete

In cases where an actor's transport server task (by default handling
new TCP connections) terminates early but does not de-register from
the pertaining registry (aka the registrar) actor's address table, the
trying-to-connect client actor will get a connection error on that
address.

In the case where the client handles a (local) `OSError` exception
(meaning the target actor address is likely being contacted over
`localhost`), make a further call to the registrar to delete the stale
entry and `yield None`, gracefully indicating to calling code that no
`Portal` can be delivered for the target address.

This issue was originally discovered in `piker` where the `emsd`
(clearing engine) actor would sometimes crash on rapid client
re-connects and then leave a stale `pikerd` entry. With this fix, new
clients will attempt to connect via an endpoint which re-spawns the
`emsd` when a `None` portal is delivered (via `maybe_spawn_em()`).
---
 tractor/__init__.py   |  2 ++
 tractor/_discovery.py | 68 +++++++++++++++++++++++++++++++++----------
 2 files changed, 55 insertions(+), 15 deletions(-)

diff --git a/tractor/__init__.py b/tractor/__init__.py
index c653ec05..eba0b454 100644
--- a/tractor/__init__.py
+++ b/tractor/__init__.py
@@ -31,6 +31,7 @@
 )
 from ._discovery import (
     get_arbiter,
+    get_registrar,
     find_actor,
     wait_for_actor,
     query_actor,
@@ -77,6 +78,7 @@
     'find_actor',
     'query_actor',
     'get_arbiter',
+    'get_registrar',
     'is_root_process',
     'msg',
     'open_actor_cluster',
diff --git a/tractor/_discovery.py b/tractor/_discovery.py
index 03775ac2..d23118d5 100644
--- a/tractor/_discovery.py
+++ b/tractor/_discovery.py
@@ -35,7 +35,7 @@


 @acm
-async def get_arbiter(
+async def get_registrar(

     host: str,
     port: int,
@@ -56,11 +56,14 @@ async def get_arbiter(
         # (likely a re-entrant call from the arbiter actor)
         yield LocalPortal(actor, Channel((host, port)))
     else:
-        async with _connect_chan(host, port) as chan:
+        async with (
+            _connect_chan(host, port) as chan,
+            open_portal(chan) as arb_portal,
+        ):
+            yield arb_portal

-            async with open_portal(chan) as arb_portal:
-                yield arb_portal

+get_arbiter = get_registrar


 @acm
@@ -101,7 +104,10 @@ async def query_actor(

     # TODO: return portals to all available actors - for now just
     # the last one that registered
-    if name == 'arbiter' and actor.is_arbiter:
+    if (
+        name == 'arbiter'
+        and actor.is_arbiter
+    ):
         raise RuntimeError("The current actor is the arbiter")

     yield sockaddr if sockaddr else None
@@ -112,7 +118,7 @@ async def find_actor(
     name: str,
     arbiter_sockaddr: tuple[str, int] | None = None

-) -> AsyncGenerator[Optional[Portal], None]:
+) -> AsyncGenerator[Portal | None, None]:
     '''
     Ask the arbiter to find actor(s) by name.

@@ -120,17 +126,49 @@ async def find_actor(
     known to the arbiter.

    '''
-    async with query_actor(
-        name=name,
-        arbiter_sockaddr=arbiter_sockaddr,
-    ) as sockaddr:
+    actor = current_actor()
+    async with get_arbiter(
+        *arbiter_sockaddr or actor._arb_addr
+    ) as arb_portal:
+
+        sockaddr = await arb_portal.run_from_ns(
+            'self',
+            'find_actor',
+            name=name,
+        )
+
+        # TODO: return portals to all available actors - for now just
+        # the last one that registered
+        if (
+            name == 'arbiter'
+            and actor.is_arbiter
+        ):
+            raise RuntimeError("The current actor is the arbiter")

         if sockaddr:
-            async with _connect_chan(*sockaddr) as chan:
-                async with open_portal(chan) as portal:
-                    yield portal
-        else:
-            yield None
+            try:
+                async with _connect_chan(*sockaddr) as chan:
+                    async with open_portal(chan) as portal:
+                        yield portal
+                        return
+
+            # most likely we were unable to connect over the
+            # transport and there is likely a stale entry in
+            # the registry actor's table, thus we need to
+            # instruct it to clear that stale entry and then
+            # silently indicate (to calling code, as though
+            # no reason was given) that the target actor
+            # can't be contacted at that addr.
+            except OSError:
+                # NOTE: ensure we delete the stale entry from the
+                # registrar actor.
+                uid: tuple[str, str] = await arb_portal.run_from_ns(
+                    'self',
+                    'delete_sockaddr',
+                    sockaddr=sockaddr,
+                )
+
+        yield None
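
From the caller's side the new behaviour can be sketched roughly as
below; `find_actor()` and `run_from_ns()` are the real APIs while the
wrapper function and target service name are hypothetical:

    import tractor

    async def query_emsd() -> bool:
        # a stale registrar entry now results in a `None` portal
        # (after the entry is deleted registrar-side) instead of
        # a bubbled-up `OSError`.
        async with tractor.find_actor('emsd') as portal:
            if portal is None:
                # stale (or no) entry: calling code can fall back
                # to (re-)spawning the service, as `piker`'s
                # `maybe_spawn_em()` does.
                return False

            # connected fine; interact via the portal as usual.
            await portal.run_from_ns('self', 'get_registry')
            return True
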
From 687852f368257dc5b4870ad8112bb9a7fd3bff5c Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 28 Aug 2023 12:20:12 -0400
Subject: [PATCH 3/5] Add a 'stale entry deleted from registrar' test

By spawning an actor task that immediately shuts down the transport
server and then sleeps, verify that attempting to connect via the
`._discovery.find_actor()` helper delivers `None` for the `Portal`
value.

Relates to #184 and #216
---
 tests/conftest.py       |   3 +-
 tests/test_discovery.py | 136 +++++++++++++++++++++++++++++-----------
 2 files changed, 100 insertions(+), 39 deletions(-)

diff --git a/tests/conftest.py b/tests/conftest.py
index 3363cf56..c14cd0d6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -219,7 +219,8 @@ def daemon(
     arb_addr: tuple[str, int],
 ):
     '''
-    Run a daemon actor as a "remote arbiter".
+    Run a daemon actor as a "remote registrar" and/or plain ol'
+    separate actor (service) tree.

     '''
     if loglevel in ('trace', 'debug'):
diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index 8ba4ebee..95cd985d 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -1,6 +1,7 @@
-"""
-Actor "discovery" testing
-"""
+'''
+Discovery subsystem scenarios via a "registrar" actor.
+
+'''
 import os
 import signal
 import platform
@@ -127,7 +128,10 @@ async def unpack_reg(actor_or_portal):
     else:
         msg = await actor_or_portal.run_from_ns('self', 'get_registry')

-    return {tuple(key.split('.')): val for key, val in msg.items()}
+    return {
+        tuple(key.split('.')): val
+        for key, val in msg.items()
+    }


 async def spawn_and_check_registry(
@@ -283,37 +287,41 @@ async def close_chans_before_nursery(

         async with tractor.open_nursery() as tn:
             portal1 = await tn.start_actor(
-                name='consumer1', enable_modules=[__name__])
+                name='consumer1',
+                enable_modules=[__name__],
+            )
             portal2 = await tn.start_actor(
-                'consumer2', enable_modules=[__name__])
-
-            # TODO: compact this back as was in last commit once
-            # 3.9+, see https://github.com/goodboy/tractor/issues/207
-            async with portal1.open_stream_from(
-                stream_forever
-            ) as agen1:
-                async with portal2.open_stream_from(
+                'consumer2',
+                enable_modules=[__name__],
+            )
+
+            async with (
+                portal1.open_stream_from(
                     stream_forever
-                ) as agen2:
-                    async with trio.open_nursery() as n:
-                        n.start_soon(streamer, agen1)
-                        n.start_soon(cancel, use_signal, .5)
-                        try:
-                            await streamer(agen2)
-                        finally:
-                            # Kill the root nursery thus resulting in
-                            # normal arbiter channel ops to fail during
-                            # teardown. It doesn't seem like this is
-                            # reliably triggered by an external SIGINT.
-                            # tractor.current_actor()._root_nursery.cancel_scope.cancel()
-
-                            # XXX: THIS IS THE KEY THING that
-                            # happens **before** exiting the
-                            # actor nursery block
-
-                            # also kill off channels cuz why not
-                            await agen1.aclose()
-                            await agen2.aclose()
+                ) as agen1,
+                portal2.open_stream_from(
+                    stream_forever
+                ) as agen2,
+            ):
+                async with trio.open_nursery() as n:
+                    n.start_soon(streamer, agen1)
+                    n.start_soon(cancel, use_signal, .5)
+                    try:
+                        await streamer(agen2)
+                    finally:
+                        # Kill the root nursery thus resulting in
+                        # normal arbiter channel ops to fail during
+                        # teardown. It doesn't seem like this is
+                        # reliably triggered by an external SIGINT.
+                        # tractor.current_actor()._root_nursery.cancel_scope.cancel()
+
+                        # XXX: THIS IS THE KEY THING that
+                        # happens **before** exiting the
+                        # actor nursery block
+
+                        # also kill off channels cuz why not
+                        await agen1.aclose()
+                        await agen2.aclose()
     finally:
         with trio.CancelScope(shield=True):
             await trio.sleep(1)
@@ -331,10 +339,12 @@ def test_close_channel_explicit(
     use_signal,
     arb_addr,
 ):
-    """Verify that closing a stream explicitly and killing the actor's
+    '''
+    Verify that closing a stream explicitly and killing the actor's
     "root nursery" **before** the containing nursery tears down also
     results in subactor(s) deregistering from the arbiter.
-    """
+
+    '''
     with pytest.raises(KeyboardInterrupt):
         trio.run(
             partial(
@@ -347,16 +357,18 @@


 @pytest.mark.parametrize('use_signal', [False, True])
-def test_close_channel_explicit_remote_arbiter(
+def test_close_channel_explicit_remote_registrar(
     daemon,
     start_method,
     use_signal,
     arb_addr,
 ):
-    """Verify that closing a stream explicitly and killing the actor's
+    '''
+    Verify that closing a stream explicitly and killing the actor's
     "root nursery" **before** the containing nursery tears down also
     results in subactor(s) deregistering from the arbiter.
-    """
+
+    '''
     with pytest.raises(KeyboardInterrupt):
         trio.run(
             partial(
@@ -366,3 +378,51 @@
                 remote_arbiter=True,
             ),
         )
+
+
+@tractor.context
+async def kill_transport(
+    ctx: tractor.Context,
+) -> None:
+
+    await ctx.started()
+    actor: tractor.Actor = tractor.current_actor()
+    actor.cancel_server()
+    await trio.sleep_forever()
+
+
+# @pytest.mark.parametrize('use_signal', [False, True])
+def test_stale_entry_is_deleted(
+    daemon,
+    start_method,
+    arb_addr,
+):
+    '''
+    Ensure that when a stale entry is detected in the registrar's
+    table, the `find_actor()` API takes care of deleting the stale
+    entry and not delivering a bad portal.
+
+    '''
+    async def main():
+
+        name: str = 'transport_fails_actor'
+        regport: tractor.Portal
+        tn: tractor.ActorNursery
+        async with (
+            tractor.open_nursery() as tn,
+            tractor.get_registrar(*arb_addr) as regport,
+        ):
+            ptl: tractor.Portal = await tn.start_actor(
+                name,
+                enable_modules=[__name__],
+            )
+            async with ptl.open_context(
+                kill_transport,
+            ) as (first, ctx):
+                async with tractor.find_actor(name) as maybe_portal:
+                    assert maybe_portal is None
+
+                await ptl.cancel_actor()
+
+    trio.run(main)
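
Assuming the standard test layout shown above, the new scenario can be
run in isolation with something like:

    pytest tests/test_discovery.py -k test_stale_entry_is_deleted
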
- """ + + ''' with pytest.raises(KeyboardInterrupt): trio.run( partial( @@ -347,16 +357,18 @@ def test_close_channel_explicit( @pytest.mark.parametrize('use_signal', [False, True]) -def test_close_channel_explicit_remote_arbiter( +def test_close_channel_explicit_remote_registrar( daemon, start_method, use_signal, arb_addr, ): - """Verify that closing a stream explicitly and killing the actor's + ''' + Verify that closing a stream explicitly and killing the actor's "root nursery" **before** the containing nursery tears down also results in subactor(s) deregistering from the arbiter. - """ + + ''' with pytest.raises(KeyboardInterrupt): trio.run( partial( @@ -366,3 +378,51 @@ def test_close_channel_explicit_remote_arbiter( remote_arbiter=True, ), ) + + +@tractor.context +async def kill_transport( + ctx: tractor.Context, +) -> None: + + await ctx.started() + actor: tractor.Actor = tractor.current_actor() + actor.cancel_server() + await trio.sleep_forever() + + + +# @pytest.mark.parametrize('use_signal', [False, True]) +def test_stale_entry_is_deleted( + daemon, + start_method, + arb_addr, +): + ''' + Ensure that when a stale entry is detected in the registrar's table + that the `find_actor()` API takes care of deleting the stale entry + and not delivering a bad portal. + + ''' + async def main(): + + name: str = 'transport_fails_actor' + regport: tractor.Portal + tn: tractor.ActorNursery + async with ( + tractor.open_nursery() as tn, + tractor.get_registrar(*arb_addr) as regport, + ): + ptl: tractor.Portal = await tn.start_actor( + name, + enable_modules=[__name__], + ) + async with ptl.open_context( + kill_transport, + ) as (first, ctx): + async with tractor.find_actor(name) as maybe_portal: + assert maybe_portal is None + + await ptl.cancel_actor() + + trio.run(main) From 3dc57e384efa826162cd28d3f7362f80aa66cde5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Sep 2023 14:20:12 -0400 Subject: [PATCH 4/5] Always no-raise try-to-pop registry addrs --- tractor/_runtime.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 67bb0c58..143772d1 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -1879,8 +1879,16 @@ async def delete_sockaddr( sockaddr: tuple[str, int], ) -> tuple[str, str]: - uid: tuple = self._registry.inverse.pop(sockaddr) - log.warning( - f'Deleting registry entry for {sockaddr}@{uid}!' + uid: tuple | None = self._registry.inverse.pop( + sockaddr, + None, ) + if uid: + log.warning( + f'Deleting registry entry for {sockaddr}@{uid}!' + ) + else: + log.warning( + f'No registry entry for {sockaddr}@{uid}!' 
From 3a31c9d3388b28277bc9af7f3aca22217431072a Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 26 Sep 2023 10:32:01 -0400
Subject: [PATCH 5/5] to_asyncio: mask error logging, not sure it adds that
 much
---
 tractor/to_asyncio.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tractor/to_asyncio.py b/tractor/to_asyncio.py
index 788181e6..cd10ee6d 100644
--- a/tractor/to_asyncio.py
+++ b/tractor/to_asyncio.py
@@ -216,7 +216,7 @@ async def wait_on_coro_final_result(
     try:
         result = await coro
     except BaseException as aio_err:
-        log.exception('asyncio task errored')
+        # log.exception('asyncio task errored:')
         chan._aio_err = aio_err
         raise

@@ -300,7 +300,7 @@ def cancel_trio(task: asyncio.Task) -> None:
     elif task_err is None:
         assert aio_err
         aio_err.with_traceback(aio_err.__traceback__)
-        log.error('infected task errorred')
+        # log.error('infected task errorred')

     # XXX: alway cancel the scope on error
     # in case the trio task is blocking
@@ -356,7 +356,7 @@ def maybe_raise_aio_err(
         # relay cancel through to called ``asyncio`` task
         assert chan._aio_task
         chan._aio_task.cancel(
-            msg=f'the `trio` caller task was cancelled: {trio_task.name}'
+            msg=f'`trio`-side caller task cancelled: {trio_task.name}'
         )
         raise

@@ -366,7 +366,7 @@
         trio.ClosedResourceError,
         # trio.BrokenResourceError,
     ):
-        aio_err = chan._aio_err
+        aio_err: BaseException = chan._aio_err
         if (
             task.cancelled()
             and type(aio_err) is CancelledError