Skip to content

Commit

Permalink
Sequence fix
Browse files Browse the repository at this point in the history
  • Loading branch information
acockburn committed Jan 31, 2025
1 parent 93ff6e4 commit 6662afb
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 46 deletions.
40 changes: 18 additions & 22 deletions appdaemon/sequences.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async def add_sequences(self, sequences):
if sequence_namespace is not None:
attributes.update({"namespace": sequence_namespace})

if not self.AD.state.entity_exists("rules", entity):
if not await self.AD.state.entity_exists("rules", entity):
# it doesn't exist so add it
await self.AD.state.add_entity(
"rules",
Expand Down Expand Up @@ -83,7 +83,7 @@ async def remove_sequences(self, sequences):
await self.cancel_sequence(sequence)
await self.AD.state.remove_entity("rules", "sequence.{}".format(sequence))

async def run_sequence(self, _name: str, namespace: str, sequence: str | list[str]):
async def run_sequence(self, _name, namespace, sequence):
if isinstance(sequence, str):
if "." in sequence:
# the entity given
Expand Down Expand Up @@ -128,13 +128,13 @@ async def cancel_sequence(self, sequence):
self.AD.futures.cancel_futures(name)
await self.AD.state.set_state("_sequences", "rules", entity_id, state="idle")

async def prep_sequence(self, _name: str, namespace: str, sequence: str | list[str]):
async def prep_sequence(self, _name, namespace, sequence):
ephemeral_entity = False
loop = False

if isinstance(sequence, str):
entity_id = sequence
if self.AD.state.entity_exists("rules", entity_id) is False:
if await self.AD.state.entity_exists("rules", entity_id) is False:
self.logger.warning(
'Unknown sequence "%s" in run_sequence()', sequence)
return None
Expand All @@ -148,33 +148,20 @@ async def prep_sequence(self, _name: str, namespace: str, sequence: str | list[s
#
# Assume it's a list with the actual commands in it
#
assert isinstance(sequence, list) and all(
isinstance(s, str) for s in sequence)
entity_id = "sequence.{}".format(uuid.uuid4().hex)
# Create an ephemeral entity for it
ephemeral_entity = True

await self.AD.state.add_entity(
namespace="rules",
entity=entity_id,
state="idle",
attributes={"steps": sequence}
)
await self.AD.state.add_entity("rules", entity_id, "idle", attributes={"steps": sequence})

seq = sequence
ns = namespace

coro = await self.do_steps(ns, entity_id, seq, ephemeral_entity, loop)
return coro

async def do_steps(self,
namespace: str,
entity_id: str,
seq: str | list[str],
ephemeral_entity: bool = False,
loop: bool = False):
async def do_steps(self, namespace, entity_id, seq, ephemeral_entity, loop):
await self.AD.state.set_state("_sequences", "rules", entity_id, state="active")

try:
while True:
steps = copy.deepcopy(seq)
Expand Down Expand Up @@ -216,7 +203,7 @@ async def do_steps(self,
# now we create the wait entity object
entity_object = Entity(
self.logger, self.AD, name, ns, wait_entity)
if not entity_object.exists():
if not await entity_object.exists():
self.logger.warning(
f"Waiting for an entity {wait_entity}, in sequence {
entity_name}, that doesn't exist"
Expand All @@ -232,6 +219,7 @@ async def do_steps(self,

else:
domain, service = str.split(command, "/")
# parameters["__name"] = entity_id
loop_step = parameters.pop("loop_step", None)
params = copy.deepcopy(parameters)
await self.AD.services.call_service(ns, domain, service, entity_id, params)
Expand All @@ -242,12 +230,14 @@ async def do_steps(self,

if loop is not True:
break

except Exception:
self.logger.error("-" * 60)
self.logger.error("Unexpected error when attempting do_steps()")
self.logger.error("Unexpected error in do_steps()")
self.logger.error("-" * 60)
self.logger.error(traceback.format_exc())
self.logger.error("-" * 60)

finally:
await self.AD.state.set_state("_sequences", "rules", entity_id, state="idle")

Expand All @@ -272,7 +262,13 @@ async def loop_step(self, namespace: str, command: str, parameters: dict, loop_s

except Exception:
self.logger.error("-" * 60)
self.logger.error("Unexpected error when attempting to loop step")
self.logger.error("Unexpected error in loop_step()")
self.logger.error("-" * 60)
self.logger.error(traceback.format_exc())
self.logger.error("-" * 60)

#
# Placeholder for constraints
#
def list_constraints(self):
return []
69 changes: 45 additions & 24 deletions appdaemon/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ def register_service(self, namespace: str, domain: str, service: str, callback:
name = kwargs.get("__name")
# first we confirm if the namespace exists
if name and namespace not in self.AD.state.state:
raise NamespaceException(f"Namespace {namespace}, doesn't exist")
raise NamespaceException(
f"Namespace {namespace}, doesn't exist")

elif not callable(callback):
raise ValueError(f"The given callback {callback} is not a callable function")
raise ValueError(f"The given callback {
callback} is not a callable function")

if namespace not in self.services:
self.services[namespace] = {}
Expand All @@ -99,27 +101,32 @@ def register_service(self, namespace: str, domain: str, service: str, callback:
if service in self.services[namespace][domain]:
# there was a service already registered before
# so if a different app, we ask to deregister first
service_app = self.services[namespace][domain][service].get("__name")
service_app = self.services[namespace][domain][service].get(
"__name")
if service_app and service_app != name:
self.logger.warning(
f"This service '{domain}/{service}' already registered to a different app '{service_app}', and so cannot be registered to {name}. Do deregister from app first"
f"This service '{domain}/{service}' already registered to a different app '{
service_app}', and so cannot be registered to {name}. Do deregister from app first"
)
return

self.services[namespace][domain][service] = {"callback": callback, "__name": name, **kwargs}
self.services[namespace][domain][service] = {
"callback": callback, "__name": name, **kwargs}

if __silent is False:
data = {
"event_type": "service_registered",
"data": {"namespace": namespace, "domain": domain, "service": service},
}
self.AD.loop.create_task(self.AD.events.process_event(namespace, data))
self.AD.loop.create_task(
self.AD.events.process_event(namespace, data))

if name:
if name not in self.app_registered_services:
self.app_registered_services[name] = set()

self.app_registered_services[name].add(f"{namespace}:{domain}:{service}")
self.app_registered_services[name].add(
f"{namespace}:{domain}:{service}")

def deregister_service(self, namespace: str, domain: str, service: str, __name: str) -> bool:
"""Used to unregister a service"""
Expand All @@ -133,12 +140,14 @@ def deregister_service(self, namespace: str, domain: str, service: str, __name:
)

if __name not in self.app_registered_services:
raise ValueError(f"The given App {__name} has no services registered")
raise ValueError(f"The given App {
__name} has no services registered")

app_service = f"{namespace}:{domain}:{service}"

if app_service not in self.app_registered_services[__name]:
raise ValueError(f"The given App {__name} doesn't have the given service registered it")
raise ValueError(f"The given App {
__name} doesn't have the given service registered it")

# if it gets here, then time to deregister
with self.services_lock:
Expand All @@ -149,7 +158,8 @@ def deregister_service(self, namespace: str, domain: str, service: str, __name:
"event_type": "service_deregistered",
"data": {"namespace": namespace, "domain": domain, "service": service, "app": __name},
}
self.AD.loop.create_task(self.AD.events.process_event(namespace, data))
self.AD.loop.create_task(
self.AD.events.process_event(namespace, data))

# now check if that domain is empty
# if it is, remove it also
Expand Down Expand Up @@ -192,29 +202,37 @@ def list_services(self, ns: str = "global") -> list[dict[str, str]]:
]

async def call_service(
self,
namespace: str,
domain: str,
service: str,
name: str | None = None,
data: dict[str, Any] | None = None, # Don't expand with **data
) -> Any:
self,
namespace: str,
domain: str,
service: str,
name: str | None = None,
data: dict[str, Any] | None = None, # Don't expand with **data
) -> Any:
self.logger.debug(
"call_service: namespace=%s domain=%s service=%s data=%s",
namespace,
domain,
service,
data,
)

# data can be None, later on we assume it is not!
if data is None:
data = {}

with self.services_lock:
if namespace not in self.services:
raise NamespaceException(f"Unknown namespace {namespace} in call_service from {name}")
raise NamespaceException(f"Unknown namespace {
namespace} in call_service from {name}")

if domain not in self.services[namespace]:
raise DomainException(f"Unknown domain ({namespace}/{domain}) in call_service from {name}")
raise DomainException(
f"Unknown domain ({namespace}/{domain}) in call_service from {name}")

if service not in self.services[namespace][domain]:
raise ServiceException(f"Unknown service ({namespace}/{domain}/{service}) in call_service from {name}")
raise ServiceException(
f"Unknown service ({namespace}/{domain}/{service}) in call_service from {name}")

# If we have namespace in data it's an override for the domain of the eventual service call, as distinct
# from the namespace the call itself is executed from. e.g. set_state() is in the AppDaemon namespace but
Expand All @@ -230,9 +248,10 @@ async def call_service(
match isasync := service_def.pop("__async", 'auto'):
case 'auto':
# Remove any wrappers from the funcref before determining if it's async or not
isasync = asyncio.iscoroutinefunction(utils.unwrapped(funcref))
isasync = asyncio.iscoroutinefunction(
utils.unwrapped(funcref))
case bool():
pass # isasync already set as a bool from above
pass # isasync already set as a bool from above
case _:
raise TypeError(f'Invalid __async type: {isasync}')

Expand All @@ -247,9 +266,11 @@ async def call_service(
else:
# It's not a coroutine, run it in an executor
if use_dictionary_unpacking:
coro = utils.run_in_executor(self, funcref, ns, domain, service, **data)
coro = utils.run_in_executor(
self, funcref, ns, domain, service, **data)
else:
coro = utils.run_in_executor(self, funcref, ns, domain, service, data)
coro = utils.run_in_executor(
self, funcref, ns, domain, service, data)

@utils.warning_decorator(error_text=f"Unexpected error calling service {ns}/{domain}/{service}")
async def safe_service(self: 'Services'):
Expand Down

0 comments on commit 6662afb

Please sign in to comment.