Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bootc: Keep /usr read-only after transient transactions ("re-locking") #2195

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions dnf/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,37 +218,45 @@ def do_transaction(self, display=()):
logger.info(_("{prog} will only download packages, install gpg keys, and check the "
"transaction.").format(prog=dnf.util.MAIN_PROG_UPPER))

is_bootc_transaction = dnf.util._is_bootc_host() and \
is_bootc_transaction = dnf.util._BootcSystem.is_bootc_system() and \
os.path.realpath(self.conf.installroot) == "/" and \
not self.conf.downloadonly

# Handle bootc transactions. `--transient` must be specified if
# /usr is not already writeable.
bootc_system = None
if is_bootc_transaction:
if self.conf.persistence == "persist":
logger.info(_("Persistent transactions aren't supported on bootc systems."))
raise CliError(_("Operation aborted."))
assert self.conf.persistence in ("auto", "transient")
if not dnf.util._is_bootc_unlocked():

bootc_system = dnf.util._BootcSystem()

if not bootc_system.is_writable():
if self.conf.persistence == "auto":
logger.info(_("This bootc system is configured to be read-only. Pass --transient to "
"perform this and subsequent transactions in a transient overlay which "
"will reset when the system reboots."))
raise CliError(_("Operation aborted."))
assert self.conf.persistence == "transient"
logger.info(_("A transient overlay will be created on /usr that will be discarded on reboot. "
"Keep in mind that changes to /etc and /var will still persist, and packages "
"commonly modify these directories."))
bootc_unlock_requested = True
elif self.conf.persistence == "transient":
raise CliError(_("Transient transactions are only supported on bootc systems."))
if not bootc_system.is_unlocked_transient():
# Only tell the user about the transient overlay if
# it's not already in place
logger.info(_("A transient overlay will be created on /usr that will be discarded on reboot. "
"Keep in mind that changes to /etc and /var will still persist, and packages "
"commonly modify these directories."))
else:
# Not a bootc transaction.
if self.conf.persistence == "transient":
raise CliError(_("Transient transactions are only supported on bootc systems."))

if self._promptWanted():
if self.conf.assumeno or not self.output.userconfirm():
raise CliError(_("Operation aborted."))

if bootc_unlock_requested:
dnf.util._bootc_unlock()
if bootc_system:
bootc_system.make_writable()
else:
logger.info(_('Nothing to do.'))
return
Expand Down
134 changes: 106 additions & 28 deletions dnf/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .pycomp import PY3, basestring
from dnf.i18n import _, ucd
import argparse
import ctypes
import dnf
import dnf.callback
import dnf.const
Expand Down Expand Up @@ -642,33 +643,110 @@ def _is_file_pattern_present(specs):
return False


def _is_bootc_host():
"""Returns true is the system is managed as an immutable container, false
otherwise."""
ostree_booted = "/run/ostree-booted"
return os.path.isfile(ostree_booted)


def _is_bootc_unlocked():
"""Check whether /usr is writeable, e.g. if we are in a normal mutable
system or if we are in a bootc after `bootc usr-overlay` or `ostree admin
unlock` was run."""
class _BootcSystem:
usr = "/usr"
return os.access(usr, os.W_OK)


def _bootc_unlock():
"""Set up a writeable overlay on bootc systems."""
CLONE_NEWNS = 0x00020000 # defined in linux/include/uapi/linux/sched.h

if _is_bootc_unlocked():
return

unlock_command = ["bootc", "usr-overlay"]

try:
completed_process = subprocess.run(unlock_command, text=True)
completed_process.check_returncode()
except FileNotFoundError:
raise dnf.exceptions.Error(_("bootc command not found. Is this a bootc system?"))
except subprocess.CalledProcessError:
raise dnf.exceptions.Error(_("Failed to unlock system: %s", completed_process.stderr))
def __init__(self):
if not self.is_bootc_system():
raise RuntimeError(_("Not running on a bootc system."))

import gi
self._gi = gi

gi.require_version("OSTree", "1.0")
from gi.repository import OSTree

self._OSTree = OSTree

self._sysroot = self._OSTree.Sysroot.new_default()
assert self._sysroot.load(None)

self._booted_deployment = self._sysroot.require_booted_deployment()
assert self._booted_deployment is not None

@staticmethod
def is_bootc_system():
"""Returns true is the system is managed as an immutable container, false
otherwise."""
ostree_booted = "/run/ostree-booted"
return os.path.isfile(ostree_booted)

@classmethod
def is_writable(cls):
"""Returns true if and only if /usr is writable."""
return os.access(cls.usr, os.W_OK)

def _get_unlocked_state(self):
return self._booted_deployment.get_unlocked()

def is_unlocked_transient(self):
"""Returns true if and only if the bootc system is unlocked in a
transient state, i.e. a overlayfs is mounted as read-only on /usr.
Changes can be made to the overlayfs by remounting /usr as
read/write in a private mount namespace."""
return self._get_unlocked_state() == self._OSTree.DeploymentUnlockedState.TRANSIENT

@classmethod
def _set_up_mountns(cls):
# os.unshare is only available in Python >= 3.12.

# Access symbols in libraries loaded by the Python interpreter,
# which will include libc. See https://bugs.python.org/issue34592.
libc = ctypes.CDLL(None)
if libc.unshare(cls.CLONE_NEWNS) != 0:
raise OSError("Failed to unshare mount namespace")

mount_command = ["mount", "--options-source=disable", "-o", "remount,rw", cls.usr]
try:
completed_process = subprocess.run(mount_command, text=True)
completed_process.check_returncode()
except FileNotFoundError:
raise dnf.exceptions.Error(_("%s: command not found.") % mount_command[0])
except subprocess.CalledProcessError:
raise dnf.exceptions.Error(_("Failed to mount %s as read/write: %s", cls.usr, completed_process.stderr))

@staticmethod
def _unlock():
unlock_command = ["ostree", "admin", "unlock", "--transient"]
try:
completed_process = subprocess.run(unlock_command, text=True)
completed_process.check_returncode()
except FileNotFoundError:
raise dnf.exceptions.Error(_("%s: command not found. Is this a bootc system?") % unlock_command[0])
except subprocess.CalledProcessError:
raise dnf.exceptions.Error(_("Failed to unlock system: %s", completed_process.stderr))

def make_writable(self):
"""Set up a writable overlay on bootc systems."""

bootc_unlocked_state = self._get_unlocked_state()

valid_bootc_unlocked_states = (
self._OSTree.DeploymentUnlockedState.NONE,
self._OSTree.DeploymentUnlockedState.DEVELOPMENT,
self._OSTree.DeploymentUnlockedState.TRANSIENT,
self._OSTree.DeploymentUnlockedState.HOTFIX,
)
if bootc_unlocked_state not in valid_bootc_unlocked_states:
raise ValueError(_("Unhandled bootc unlocked state: %s") % bootc_unlocked_state.value_nick)

writable_unlocked_states = (
self._OSTree.DeploymentUnlockedState.DEVELOPMENT,
self._OSTree.DeploymentUnlockedState.HOTFIX,
)
if bootc_unlocked_state in writable_unlocked_states:
# System is already unlocked in development mode, and usr is
# already mounted read/write.
pass
elif bootc_unlocked_state == self._OSTree.DeploymentUnlockedState.NONE:
# System is not unlocked. Unlock it in transient mode, then set up
# a mount namespace for DNF.
self._unlock()
self._set_up_mountns()
elif bootc_unlocked_state == self._OSTree.DeploymentUnlockedState.TRANSIENT:
# System is unlocked in transient mode, so usr is mounted
# read-only. Set up a mount namespace for DNF.
self._set_up_mountns()

assert os.access(self.usr, os.W_OK)
Loading