Add a @pub kwarg to allow specifying a "startup response message" #189

Closed
wants to merge 4 commits into from
2 changes: 1 addition & 1 deletion setup.py
@@ -24,7 +24,7 @@

setup(
name="tractor",
version='0.1.0a0', # first ever alpha
version='0.1.0a1', # first ever alpha
description='structured concurrrent "actors"',
long_description=readme,
license='GPLv3',
20 changes: 16 additions & 4 deletions tractor/_actor.py
@@ -178,6 +178,10 @@ def _get_mod_abspath(module):
return os.path.abspath(module.__file__)


# process-global stack closed at end on actor runtime teardown
_lifetime_stack: ExitStack = ExitStack()


class Actor:
"""The fundamental concurrency primitive.

@@ -192,7 +196,6 @@ class Actor:
_root_n: Optional[trio.Nursery] = None
_service_n: Optional[trio.Nursery] = None
_server_n: Optional[trio.Nursery] = None
_lifetime_stack: ExitStack = ExitStack()

# Information about `__main__` from parent
_parent_main_data: Dict[str, str]
@@ -545,8 +548,9 @@ async def _process_messages(
# deadlock and other weird behaviour)
if func != self.cancel:
if isinstance(cs, Exception):
log.warning(f"Task for RPC func {func} failed with"
f"{cs}")
log.warning(
f"Task for RPC func {func} failed with"
f"{cs}")
else:
# mark that we have ongoing rpc tasks
self._ongoing_rpc_tasks = trio.Event()
@@ -784,7 +788,7 @@ async def _async_main(
# tear down all lifetime contexts if not in guest mode
# XXX: should this just be in the entrypoint?
log.warning("Closing all actor lifetime contexts")
self._lifetime_stack.close()
_lifetime_stack.close()

# Unregister actor from the arbiter
if registered_with_arbiter and (
@@ -858,6 +862,14 @@ async def _serve_forever(
# signal the server is down since nursery above terminated
self._server_down.set()

def cancel_soon(self) -> None:
"""Cancel this actor asap; can be called from a sync context.

Schedules `.cancel()` to be run immediately just like when
cancelled by the parent.
"""
self._service_n.start_soon(self.cancel)

async def cancel(self) -> bool:
"""Cancel this actor.

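For orientation, a rough sketch of how the relocated process-global lifetime stack and the new `cancel_soon()` helper might be used; `tractor.current_actor()` is assumed to be the usual accessor for the running `Actor`, and the callback names below are purely illustrative:

```python
import tractor
from tractor import _actor  # the lifetime stack now lives at module scope

def close_db_connection() -> None:
    ...  # hypothetical synchronous resource cleanup

# hypothetical: register a sync teardown callback on the process-global
# stack, which `_async_main()` now closes at actor runtime teardown
_actor._lifetime_stack.callback(close_db_connection)

def emergency_stop() -> None:
    # hypothetical sync-context caller (e.g. a signal handler): it cannot
    # `await`, so `cancel_soon()` schedules the async `.cancel()` onto the
    # actor's service nursery instead
    tractor.current_actor().cancel_soon()
```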
1 change: 1 addition & 0 deletions tractor/_debug.py
@@ -248,6 +248,7 @@ async def _bp():
# may have the tty locked prior
if _debug_lock.locked(): # root process already has it; ignore
return

await _debug_lock.acquire()
_pdb_release_hook = _debug_lock.release

8 changes: 7 additions & 1 deletion tractor/msg.py
@@ -97,6 +97,7 @@ def pub(
wrapped: typing.Callable = None,
*,
tasks: Set[str] = set(),
send_on_connect: Any = None,
):
"""Publisher async generator decorator.

@@ -182,7 +183,7 @@ async def pub_service(get_topics):

# handle the decorator not called with () case
if wrapped is None:
return partial(pub, tasks=tasks)
return partial(pub, tasks=tasks, send_on_connect=send_on_connect)

task2lock: Dict[str, trio.StrictFIFOLock] = {}

@@ -225,6 +226,11 @@ async def _execute(

try:
modify_subs(topics2ctxs, topics, ctx)

# if specified send the startup message back to consumer
if send_on_connect is not None:
await ctx.send_yield(send_on_connect)

# block and let existing feed task deliver
# stream data until it is cancelled in which case
# the next waiting task will take over and spawn it again
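Taken together, a loose usage sketch of the new `send_on_connect` kwarg (the publisher, its topics, and the data source below are illustrative; the `tasks` bookkeeping and the consumer side are omitted): the decorated publisher can now hand each newly connected consumer a startup message before any feed data arrives.

```python
import random
from tractor.msg import pub

async def read_sensor() -> float:
    # stand-in data source for the sketch
    return random.random()

@pub(send_on_connect={'status': 'connected'})
async def temperature_feed(get_topics):
    # illustrative publisher: yield {topic: payload} dicts which the
    # @pub machinery fans out to subscribed consumers
    while True:
        reading = await read_sensor()
        yield {topic: reading for topic in get_topics()}
```

Per the change in `_execute()`, the startup payload is delivered via `ctx.send_yield()` right after a consumer's subscription is registered, so the consumer can confirm the stream is live without waiting for the first real sample.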