Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

magicbot: Add tunable update listener method #136

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion magicbot/magic_tunable.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import functools
import inspect
import warnings
from typing import Generic, Optional, TypeVar, overload
from typing import Callable, Generic, Optional, TypeVar, overload

from networktables import NetworkTables, Value

Expand Down Expand Up @@ -66,6 +66,7 @@ def execute(self):
# "__doc__",
"_mkv",
"_nt",
"_update_cb",
)

def __init__(
Expand All @@ -85,6 +86,7 @@ def __init__(
d = Value.makeValue(default)
self._mkv = Value.getFactoryByType(d.type())
# self.__doc__ = doc
self._update_cb = None

@overload
def __get__(self, instance: None, owner=None) -> "tunable":
Expand All @@ -102,6 +104,31 @@ def __get__(self, instance, owner=None):
def __set__(self, instance, value: V) -> None:
instance._tunables[self].setValue(self._mkv(value))

def set_callback(self, callback: Callable[[V], None]) -> None:
"""
Set a method to be called when the tunable is updated over NetworkTables.
auscompgeek marked this conversation as resolved.
Show resolved Hide resolved

This can be useful, for example, for changing PID gains on a
motor controller on the fly::

class Component:
pid: ...

kP = tunable(0.01)

@kP.set_callback
def set_kP(self, value: float) -> None:
self.pid.setP(value)

.. note::
The callback will be called on the NetworkTables I/O thread
(not the main robot thread).

.. warning::
This only supports instance methods on the same object as the tunable.
"""
self._update_cb = callback


def setup_tunables(component, cname: str, prefix: Optional[str] = "components") -> None:
"""
Expand Down Expand Up @@ -142,6 +169,13 @@ def setup_tunables(component, cname: str, prefix: Optional[str] = "components")
)
tunables[prop] = ntvalue

if prop._update_cb:
prop._nt._api.addEntryListenerById(
ntvalue._local_id,
lambda notif, cb=prop._update_cb: cb(component, notif.value.value),
NetworkTables.NotifyFlags.UPDATE,
)

component._tunables = tunables


Expand Down