Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fuse unmount #112

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 93 additions & 28 deletions src/higlass/fuse/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
from __future__ import annotations

import atexit
import logging
import multiprocessing as mp
import pathlib
import platform
import subprocess
import time
import warnings
from urllib.parse import urlparse

logger = logging.getLogger("hg.fuse")
logger = logging.getLogger("higlass.fuse")

__all__ = [
"fuse",
"FuseProcess",
]

OS_NAME = platform.system()


class FuseProcess:
_mnt_name = "schemas"
Expand All @@ -30,44 +36,55 @@ def start(self, tmp_dir: str | pathlib.Path):
'Install "fusepy" and "simple-httpfs" to enable FUSE.'
) from e

# no need to restart
tmp_dir = pathlib.Path(tmp_dir).absolute()
if self._fuse_process and tmp_dir == self._tmp_dir:
if not tmp_dir.is_dir():
raise NotADirectoryError(f"Mount dir doesn't exist: {tmp_dir}")

mount_point = tmp_dir / self._mnt_name
disk_cache_dir = tmp_dir / self._dircache_name

# No need to restart if already running and mounted in same directory
if self._fuse_process and tmp_dir == self._tmp_dir and mount_point.is_mount():
logger.debug("Skipping start. FUSE running in same directory %s", tmp_dir)
return

self.stop()

assert tmp_dir.is_dir(), f"mount dir doesn't exist: {tmp_dir}"

mount_point = tmp_dir / self._mnt_name
disk_cache_dir = tmp_dir / self._dircache_name

if not mount_point.exists():
mount_point.mkdir()

if not disk_cache_dir.exists():
disk_cache_dir.mkdir()

logger.info("Starting FUSE mount at %s", mount_point)

args = (str(mount_point) + "/", str(disk_cache_dir) + "/")
self._fuse_process = mp.Process(target=run, args=args, daemon=True)
self._fuse_process.start()

max_iters = 10
for i in range(max_iters):
# wait until http is mounted
if (mount_point / "http").exists():
break

if i == max_iters - 1:
self.stop()
raise RuntimeError("Failed to setup FUSE")

time.sleep(0.5)

self._tmp_dir = tmp_dir
if mount_point.is_mount():
logger.info(f"FUSE already mounted at {mount_point}")

warnings.warn(
f"Skipping FUSE mount: {mount_point} already mounted. "
"If you wish to remount call `fuse.unmount()` and "
"`fuse.start(...)` again. Call `fuse.start(...)` with "
"a different `tmp_dir` to mount in a different location."
)
self._tmp_dir = tmp_dir
else:
logger.info(f"Starting FUSE mount at {mount_point}")

args = (str(mount_point) + "/", str(disk_cache_dir) + "/")
self._fuse_process = mp.Process(target=run, args=args, daemon=True)
self._fuse_process.start()
max_iters = 10
for i in range(max_iters):
# wait until http is mounted
if (mount_point / "http").exists():
break

if i == max_iters - 1:
self.stop()
raise RuntimeError("Failed to setup FUSE")

time.sleep(0.5)
self._tmp_dir = tmp_dir
atexit.register(self.stop)

def stop(self):
if self._fuse_process is None:
Expand All @@ -82,9 +99,57 @@ def stop(self):
# TODO: remove cache and mount dirs?
# make sure stuff is no longer mounted

@property
def is_mounted(self):
if self._tmp_dir is None:
return False
return (self._tmp_dir / self._mnt_name).is_mount()

def unmount(self):
if self._tmp_dir is None:
raise RuntimeError("FUSE not started")

if not self.is_mounted:
raise RuntimeError("FUSE not mounted")

mount_point = self._tmp_dir / self._mnt_name

# Stop the FUSE process if it was started by us
fuse.stop()

# FUSE might have been started externally, so we need to unmount it
# manually
if mount_point.is_mount():
if OS_NAME == "Darwin":
p = subprocess.run(["umount", str(mount_point)], capture_output=True)
if p.returncode != 0:
p = subprocess.run(
["diskutil", "unmount", str(mount_point)], capture_output=True
)
else:
p = subprocess.run(
["fusermount", "-uz", str(mount_point)], capture_output=True
)

if not mount_point.is_mount():
return

message = f"Failed to unmount FUSE"
if p.returncode != 0:
stdout = p.stdout.decode() if p.stdout else ""
stderr = p.stderr.decode() if p.stderr else ""
message += f": {' '.join(p.args)} returned {p.returncode}"
if stdout or stderr:
message += f"\n{stdout}\n{stderr}"
raise RuntimeError(message)

def path(self, href: str):
if self._tmp_dir is None:
raise RuntimeError("FUSE processes not started")
raise RuntimeError("FUSE not started")

if not self.is_mounted:
raise RuntimeError("httpfs FUSE filesystem is not mounted")

url = urlparse(href)
return str(
self._tmp_dir / self._mnt_name / f"{url.scheme}/{url.netloc}{url.path}.."
Expand Down
2 changes: 1 addition & 1 deletion src/higlass/fuse/_httpfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

FsName = Literal["http", "https", "ftp"]

logger = logging.getLogger("hg.fuse")
logger = logging.getLogger("higlass.fuse")


class MultiHttpFs(LoggingMixIn, Operations):
Expand Down
97 changes: 97 additions & 0 deletions test/test_fuse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from pathlib import Path
from tempfile import TemporaryDirectory
import time
import pytest

from higlass.fuse import fuse


def test_fuse_setup_and_teardown():
with TemporaryDirectory(ignore_cleanup_errors=True) as tmp_dir:
mount_dir = Path(tmp_dir) / "schemas"
fuse.start(tmp_dir)
assert mount_dir.is_mount()
assert fuse.path("http://www.example.com") == str(
mount_dir / "http" / "www.example.com.."
)
assert fuse.path("https://www.example.com") == str(
mount_dir / "https" / "www.example.com.."
)
assert fuse.path("ftp://example.com") == str(
mount_dir / "ftp" / "example.com.."
)
fuse.stop()
time.sleep(0.1)
assert not mount_dir.is_mount()


def test_fuse_setup_twice_with_same_dir_is_noop():
with TemporaryDirectory(ignore_cleanup_errors=True) as tmp_dir:
mount_dir = Path(tmp_dir) / "schemas"
fuse.start(tmp_dir)
assert mount_dir.is_mount()
fuse.start(tmp_dir)
assert mount_dir.is_mount()
fuse.stop()
time.sleep(0.1)
assert not mount_dir.is_mount()


def test_fuse_setup_and_teardown_twice_with_different_dir():
with TemporaryDirectory(ignore_cleanup_errors=True) as tmp_dir1, TemporaryDirectory(
ignore_cleanup_errors=True
) as tmp_dir2:
mount_dir1 = Path(tmp_dir1) / "schemas"
mount_dir2 = Path(tmp_dir2) / "schemas"
fuse.start(tmp_dir1)
assert mount_dir1.is_mount()
fuse.start(tmp_dir2)
time.sleep(0.1)
assert not mount_dir1.is_mount()
assert mount_dir2.is_mount()
fuse.stop()
time.sleep(0.1)
assert not mount_dir2.is_mount()


def test_unmount_without_setup():
from higlass.fuse import FuseProcess

fuse = FuseProcess()
with pytest.raises(RuntimeError):
fuse.unmount()


def test_unmount_after_teardown():
with TemporaryDirectory(ignore_cleanup_errors=True) as tmp_dir:
fuse.start(tmp_dir)
fuse.stop()
with pytest.raises(RuntimeError):
fuse.unmount()


def test_unmount_then_start():
with TemporaryDirectory(ignore_cleanup_errors=True) as tmp_dir:
mount_dir = Path(tmp_dir) / "schemas"
fuse.start(tmp_dir)
assert mount_dir.is_mount()
fuse.unmount()
assert not mount_dir.is_mount()
fuse.start(tmp_dir)
assert mount_dir.is_mount()
fuse.stop()
time.sleep(0.1)
assert not mount_dir.is_mount()


def test_path_without_setup():
with pytest.raises(RuntimeError):
fuse.path("http://www.example.com")


def test_path_after_teardown():
with TemporaryDirectory(ignore_cleanup_errors=True) as tmp_dir:
fuse.start(tmp_dir)
fuse.stop()
with pytest.raises(RuntimeError):
fuse.path("http://www.example.com")