Skip to content

Commit

Permalink
Merge pull request #6 from Azureblade3808/v0.1.5
Browse files Browse the repository at this point in the history
  • Loading branch information
Azureblade3808 authored Jul 25, 2024
2 parents ff107f9 + db9956f commit 0d920d3
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 225 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "runtime-keypath"
version = "0.1.4"
version = "0.1.5"
authors = [{ name = "Chris Fu", email = "17433201@qq.com" }]
description = "Supports runtime key-path recording/accessing for Python."
classifiers = [
Expand All @@ -17,6 +17,7 @@ classifiers = [
readme = "README.md"
license = { file = "LICENSE" }
requires-python = ">=3.8"
dependencies = ["typing_extensions >= 4.9"]

[project.urls]
Homepage = "https://github.com/Azureblade3808/py-runtime-keypath"
Expand Down
186 changes: 101 additions & 85 deletions runtime_keypath/_core.py
Original file line number Diff line number Diff line change
@@ -1,55 +1,82 @@
# pyright: reportImplicitOverride=false


from __future__ import annotations

__all__ = [
"KeyPath",
"KeyPathSupporting",
]

# region[Keywords]

from typing import TYPE_CHECKING, Final, Generic, Protocol, TypeVar, cast, final
import threading
from collections.abc import Sequence

# endregion[Keywords]
from typing_extensions import (
TYPE_CHECKING,
Any,
Final,
Generic,
Protocol,
TypeVar,
cast,
deprecated,
final,
override,
)

# region[Types]
_Value_co = TypeVar("_Value_co", covariant=True)
_Value_0 = TypeVar("_Value_0")

if TYPE_CHECKING:
from typing import Any, Sequence
_MISSING = cast("Any", object())

# endregion[Types]

import threading
from dataclasses import dataclass, field
from typing import NamedTuple
class KeyPathSupporting:
"""
A base class that indicates an object can be used as a chain in
`KeyPath.of(...)` call.
"""

_Value_t = TypeVar("_Value_t")
# ! This method is intentially not named as `__getattribute__`. See below for
# ! reason.
def _(self, key: str, /) -> Any:
try:
recorder = _thread_local.recorder
except AttributeError:
# There is no recorder, which means that `KeyPath.of` is not being called.
# So we don't need to record this key.
return super().__getattribute__(key)

if recorder.busy:
# The recorder is busy, which means that another member is being accessed,
# typically because the computation of that member is dependent on this one.
# So we don't need to record this key.
return super().__getattribute__(key)

class _ThreadLocalProtocol(Protocol):
recorder: _KeyPathRecorder
"""
The active key-path recorder for this thread. May not exist.
"""
recorder.busy = True

if recorder.start is not _MISSING and recorder.end is not self:
raise RuntimeError(
" ".join(
[
"Key-path is broken. Check if there is something that does NOT",
"support key-paths in the member chain.",
]
)
)

_thread_local = cast("_ThreadLocalProtocol", threading.local())
value = super().__getattribute__(key)

recorder.busy = False
if recorder.start is _MISSING:
recorder.start = self
recorder.end = value
recorder.key_list.append(key)

@final
class _Terminals(NamedTuple):
start: Any
end: Any
return value

# ! `__getattribute__(...)` is declared against `TYPE_CHECKING`, so that unknown
# ! attributes on conforming classes won't be regarded as known by type-checkers.
if not TYPE_CHECKING:
__getattribute__ = _

@final
@dataclass
class _KeyPathRecorder:
terminals: _Terminals | None = None
key_list: list[str] = field(default_factory=list)
busy: bool = False
del _


# ! A metaclass is made for class `KeyPath`, and `KeyPath.of` is provided as a property
Expand All @@ -59,6 +86,7 @@ class _KeyPathRecorder:
class _KeyPathMeta(type):
@property
def of(self, /) -> _KeyPathOfFunction:
# ! Docstring here is for Pylance hint.
"""
Returns the key-path for accessing a certain value from a target
object with a key sequence such as `a.b.c`.
Expand Down Expand Up @@ -128,6 +156,7 @@ def of(self, /) -> _KeyPathOfFunction:
# ! exception occurred during the key-path access, there would still be a chance to
# ! perform some finalization.
class _KeyPathOfFunction:
# ! Docstring here is for runtime help.
"""
Returns the key-path for accessing a certain value from a target
object with a key sequence such as `a.b.c`.
Expand Down Expand Up @@ -174,7 +203,7 @@ class _KeyPathOfFunction:

__invoked: bool = False

def __call__(self, value: _Value_t, /) -> KeyPath[_Value_t]:
def __call__(self, value: _Value_0, /) -> KeyPath[_Value_0]:
self.__invoked = True

try:
Expand All @@ -193,16 +222,16 @@ def __call__(self, value: _Value_t, /) -> KeyPath[_Value_t]:

assert not recorder.busy

terminals = recorder.terminals
start = recorder.start
key_list = recorder.key_list
if terminals is None:
if start is _MISSING:
assert len(key_list) == 0

raise RuntimeError("No key has been recorded.")
else:
assert len(key_list) > 0

if terminals.end is not value:
if recorder.end is not value:
raise RuntimeError(
" ".join(
[
Expand All @@ -212,7 +241,7 @@ def __call__(self, value: _Value_t, /) -> KeyPath[_Value_t]:
)
)

key_path = KeyPath(terminals.start, key_list)
key_path = KeyPath(start, key_list)
return key_path

def __del__(self, /) -> None:
Expand All @@ -224,7 +253,11 @@ def __del__(self, /) -> None:


@final
class KeyPath(Generic[_Value_t], metaclass=_KeyPathMeta):
class KeyPath(Generic[_Value_co], metaclass=_KeyPathMeta):
"""
An object that stands for a member chain from a base object.
"""

__target: Final[Any]
__keys: Final[Sequence[str]]

Expand All @@ -245,82 +278,65 @@ def target(self, /) -> Any:
def keys(self, /) -> Sequence[str]:
return self.__keys

def get(self, /) -> _Value_t:
def get(self, /) -> _Value_co:
value = self.__target
for key in self.__keys:
value = getattr(value, key)
return value

def set(self, value: _Value_t, /) -> None:
def unsafe_set(self: KeyPath[_Value_0], value: _Value_0, /) -> None:
target = self.__target
keys = self.__keys
n = len(keys) - 1
for i in range(n):
i_last_key = len(keys) - 1
for i in range(i_last_key):
target = getattr(target, keys[i])
setattr(target, keys[n], value)
setattr(target, keys[i_last_key], value)

@deprecated("`KeyPath.set` is deprecated. Use `KeyPath.unsafe_set` instead.")
def set(self: KeyPath[_Value_0], value: _Value_0, /) -> None:
return self.unsafe_set(value)

@override
def __hash__(self, /) -> int:
return hash((self.target, self.keys))

@override
def __eq__(self, other: object, /) -> bool:
return (
isinstance(other, KeyPath)
and self.target is other.target
and self.keys == other.keys
)

@override
def __repr__(self, /) -> str:
return f"{KeyPath.__name__}(target={self.target!r}, keys={self.keys!r})"

def __call__(self, /) -> _Value_t:
def __call__(self, /) -> _Value_co:
return self.get()


class KeyPathSupporting:
# ! This method is purposedly not named as `__getattribute__`. See below for reason.
def __getattribute_0__(self, key: str, /) -> Any:
try:
recorder = _thread_local.recorder
except AttributeError:
# There is no recorder, which means that `KeyPath.of` is not being called.
# So we don't need to record this key.
return super().__getattribute__(key)

if recorder.busy:
# The recorder is busy, which means that another member is being accessed,
# typically because the computation of that member is dependent on this one.
# So we don't need to record this key.
return super().__getattribute__(key)

recorder.busy = True

terminals = recorder.terminals
if terminals is not None and terminals.end is not self:
raise RuntimeError(
" ".join(
[
"Key-path is broken. Check if there is something that does NOT",
"support key-paths in the member chain.",
]
)
)
class _ThreadLocalProtocol(Protocol):
recorder: _KeyPathRecorder
"""
The active key-path recorder for this thread. May not exist.
"""

value = super().__getattribute__(key)

if terminals is None:
terminals = _Terminals(self, value)
else:
terminals = terminals._replace(end=value)
_thread_local = cast("_ThreadLocalProtocol", threading.local())

recorder.terminals = terminals
recorder.key_list.append(key)
recorder.busy = False

return value
@final
class _KeyPathRecorder:
__slots__ = ("busy", "start", "end", "key_list")

# ! `__getattribute__(...)` is declared against `TYPE_CHECKING`, so that unknown
# ! attributes on conforming classes won't be regarded as known by type-checkers.
if not TYPE_CHECKING:
__getattribute__ = __getattribute_0__
busy: bool
start: Any
end: Any
key_list: list[str]

del __getattribute_0__
def __init__(self, /) -> None:
self.busy = False
self.start = _MISSING
self.end = _MISSING
self.key_list = []
Loading

0 comments on commit 0d920d3

Please sign in to comment.