fix: Improve handling of AnyBodyCon processes on shutdown (#113)
* Ensure we return a defragmented DataFrame

* fix: Improved the way AnyPyTools kills AnyBody on exit and on forced shutdown

AnyPyTools now better handles shutting down running AnyBody processes when the user breaks (Ctrl-C) or the program exits early.

It also ties the anybodycon subprocesses to the main process using Win32 CreateProcess tricks, so if the main Python process is killed, the AnyBodyCon processes are killed automatically as well (see the usage sketch after this list).

* fix formatting

* Move Windows-only import

* Update version number

* use correct version number

* Add changelog entry
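
For context, here is a minimal usage sketch of how the change surfaces to users. It is not part of this commit; it assumes the usual `AnyPyProcess`/`anypytools.macro_commands` API, and `"Model.main.any"` is a hypothetical model file.

```python
# Sketch only: "Model.main.any" and the operation name are placeholders.
from anypytools import AnyPyProcess
from anypytools.macro_commands import Load, OperationRun

app = AnyPyProcess(num_processes=2)
macro = [Load("Model.main.any"), OperationRun("Main.RunApplication")]

# If this Python process is force-killed while the macros run, the spawned
# anybodycon.exe processes are now terminated automatically by the OS job
# object instead of lingering until they are closed by hand.
results = app.start_macro(macro)
```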
melund authored Jun 2, 2024
1 parent 2badedc commit a3e351b
Showing 8 changed files with 2,590 additions and 754 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,15 @@
# AnyPyTools Change Log


## v1.12.0

**Added:**

* Added a way of controlling AnyBodyCon processes, which forces the process to
automatically end when the Python process ends. This prevents the need to
manually close the AnyBodyCon processes if the parent process was force-killed.


## v1.11.5

**Fixed:**
2 changes: 1 addition & 1 deletion anypytools/__init__.py
@@ -36,7 +36,7 @@
"NORMAL_PRIORITY_CLASS",
]

__version__ = "1.11.6"
__version__ = "1.12.0"


def print_versions():
103 changes: 41 additions & 62 deletions anypytools/abcutils.py
@@ -6,47 +6,50 @@
@author: Morten
"""

import atexit
import collections
import copy
import ctypes
import logging
import os
import io
import pathlib
import shelve
import sys
import time
import copy
import types
import ctypes
import shelve
import atexit
import pathlib
import logging
import warnings
import collections
from pathlib import Path
from subprocess import Popen, TimeoutExpired
from contextlib import suppress
from tempfile import NamedTemporaryFile
from threading import Thread, RLock
from pathlib import Path
from queue import Queue

from subprocess import TimeoutExpired
from tempfile import NamedTemporaryFile
from threading import RLock, Thread
from typing import Generator, List

import numpy as np
from tqdm.auto import tqdm

from .macroutils import AnyMacro, MacroCommand
from .tools import (
BELOW_NORMAL_PRIORITY_CLASS,
ON_WINDOWS,
make_hash,
AnyPyProcessOutput,
AnyPyProcessOutputList,
parse_anybodycon_output,
getsubdirs,
case_preserving_replace,
get_anybodycon_path,
BELOW_NORMAL_PRIORITY_CLASS,
AnyPyProcessOutput,
run_from_ipython,
get_ncpu,
winepath,
getsubdirs,
make_hash,
parse_anybodycon_output,
silentremove,
case_preserving_replace,
winepath,
)
from .macroutils import AnyMacro, MacroCommand

if ON_WINDOWS:
from .jobpopen import JobPopen as Popen
from subprocess import CREATE_NEW_PROCESS_GROUP
else:
from subprocess import Popen

logger = logging.getLogger("abt.anypytools")

@@ -60,65 +63,40 @@
class _SubProcessContainer(object):
"""Class to hold a record of process pids from Popen.
Properties
----------
stop_all: boolean
If set to True, all processes held by the object will be automatically
killed
Methods
-------
stop_all():
Kill all processes held by the object
add(pid):
Add a process id to the record of processes
remove(pid):
Remove process id from the record
"""

def __init__(self):
self._pids = set()
self._stop_all = False
self._pids: set = set()

def add(self, pid):
with _thread_lock:
self._pids.add(pid)
if self.stop_all:
self._kill_running_processes()

def remove(self, pid):
with _thread_lock:
try:
self._pids.remove(pid)
except KeyError:
pass
self._pids.discard(pid)

@property
def stop_all(self):
return self._stop_all

@stop_all.setter
def stop_all(self, value):
if value:
self._stop_all = True
self._kill_running_processes()
else:
self._stop_all = False

def _kill_running_processes(self):
"""Clean up and shut down any running processes."""
# Kill any rogue processes that are still running.
with _thread_lock:
killed = []
for pid in self._pids:
with suppress(Exception):
os.kill(pid, _KILLED_BY_ANYPYTOOLS)
killed.append(str(pid))
self._pids.clear()


_subprocess_container = _SubProcessContainer()
atexit.register(_subprocess_container._kill_running_processes)
atexit.register(_subprocess_container.stop_all)


def execute_anybodycon(
@@ -212,6 +190,7 @@ def execute_anybodycon(
ctypes.windll.kernel32.SetErrorMode(SEM_NOGPFAULTERRORBOX)
subprocess_flags = 0x8000000 # win32con.CREATE_NO_WINDOW?
subprocess_flags |= priority
subprocess_flags |= CREATE_NEW_PROCESS_GROUP
extra_kwargs = {"creationflags": subprocess_flags}

anybodycmd = [
@@ -275,6 +254,7 @@ def execute_anybodycon(
cwd=folder,
)

retcode = None
_subprocess_container.add(proc.pid)
try:
proc.wait(timeout=timeout)
@@ -283,13 +263,16 @@
proc.kill()
proc.communicate()
retcode = _TIMEDOUT_BY_ANYPYTOOLS
except KeyboardInterrupt:
except KeyboardInterrupt as e:
proc.terminate()
proc.communicate()
retcode = _KILLED_BY_ANYPYTOOLS
raise
raise e
finally:
_subprocess_container.remove(proc.pid)
if not retcode:
proc.kill()
else:
_subprocess_container.remove(proc.pid)

if retcode == _TIMEDOUT_BY_ANYPYTOOLS:
logfile.write(f"\nERROR: AnyPyTools : Timeout after {int(timeout)} sec.")
@@ -844,20 +827,17 @@ def start_macro(
if hasattr(pbar, "container"):
pbar.container.children[0].bar_style = "danger"
pbar.update()
except KeyboardInterrupt as e:
_subprocess_container.stop_all = True
except KeyboardInterrupt:
tqdm.write("KeyboardInterrupt: User aborted")
time.sleep(1)
finally:
_subprocess_container.stop_all()
if not self.silent:
tqdm.write(tasklist_summery(tasklist))

self.cleanup_logfiles(tasklist)
# Cache the processed tasklist for restarting later
self.cached_tasklist = tasklist
# self.summery.final_summery(process_time, tasklist)
task_output = [task.get_output() for task in tasklist]
return AnyPyProcessOutputList(task_output)
return AnyPyProcessOutputList(t.get_output() for t in tasklist)

def _worker(self, task, task_queue):
"""Handle processing of the tasks."""
@@ -935,7 +915,6 @@ def _worker(self, task, task_queue):
task_queue.put(task)

def _schedule_processes(self, tasklist) -> Generator[_Task, None, None]:
_subprocess_container.stop_all = False
# Make a shallow copy of the task list,
# so we don't mess with the callers list.
tasklist = copy.copy(tasklist)
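
A small Windows-only sketch (not from the diff) of the role of `CREATE_NEW_PROCESS_GROUP`, which is presumably set so the anybodycon child does not receive the console's Ctrl-C directly; the parent then catches `KeyboardInterrupt` and terminates the child deliberately, mirroring `execute_anybodycon` above. The command line is a placeholder.

```python
# Sketch, assuming Windows; "anybodycon.exe" stands in for the real executable.
import subprocess
import sys

flags = 0
if sys.platform == "win32":
    # Ctrl-C is disabled in the new process group, so the parent handles it.
    flags |= subprocess.CREATE_NEW_PROCESS_GROUP

proc = subprocess.Popen(["anybodycon.exe"], creationflags=flags)
try:
    proc.wait(timeout=600)
except subprocess.TimeoutExpired:
    proc.kill()
    proc.communicate()
except KeyboardInterrupt:
    proc.terminate()  # the parent decides how and when the child shuts down
    proc.communicate()
    raise
```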
103 changes: 103 additions & 0 deletions anypytools/jobpopen.py
@@ -0,0 +1,103 @@
# coding: utf-8
"""
Adapted from https://stackoverflow.com/a/56632466
This module provides a JobPopen class that is a subclass of the Popen class from the subprocess module.
"""

import subprocess
from subprocess import Popen

import win32api
import win32job
import win32process


class JobPopen(Popen):
"""Start a process in a new Win32 job object.
This `subprocess.Popen` subclass takes the same arguments as Popen and
behaves the same way. In addition to that, created processes will be
assigned to a new anonymous Win32 job object on startup, which will
guarantee that the processes will be terminated by the OS as soon as
either the Popen object, job object handle or parent Python process are
closed.
"""

class _winapijobhandler(object):
"""Patches the native CreateProcess function in the subprocess module
to assign created threads to the given job"""

def __init__(self, oldapi, job):
self._oldapi = oldapi
self._job = job

def __getattr__(self, key):
if key != "CreateProcess":
return getattr(self._oldapi, key) # Any other function is run as before
else:
return self.CreateProcess # CreateProcess will call the function below

def CreateProcess(self, *args, **kwargs):
hp, ht, pid, tid = self._oldapi.CreateProcess(*args, **kwargs)
win32job.AssignProcessToJobObject(self._job, hp)
win32process.ResumeThread(ht)
return hp, ht, pid, tid

def __init__(self, *args, **kwargs):
"""Start a new process using an anonymous job object. Takes the same arguments as Popen"""

# Create a new job object
self._win32_job = self._create_job_object()

# Temporarily patch the subprocess creation logic to assign created
# processes to the new job, then resume execution normally.
CREATE_SUSPENDED = 0x00000004
kwargs.setdefault("creationflags", 0)
kwargs["creationflags"] |= CREATE_SUSPENDED
_winapi = subprocess._winapi # Python 3
_winapi_key = "_winapi"
try:
setattr(
subprocess,
_winapi_key,
JobPopen._winapijobhandler(_winapi, self._win32_job),
)
super(JobPopen, self).__init__(*args, **kwargs)
finally:
setattr(subprocess, _winapi_key, _winapi)

def _create_job_object(self):
"""Create a new anonymous job object"""
hjob = win32job.CreateJobObject(None, "")
extended_info = win32job.QueryInformationJobObject(
hjob, win32job.JobObjectExtendedLimitInformation
)
extended_info["BasicLimitInformation"][
"LimitFlags"
] = win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
win32job.SetInformationJobObject(
hjob, win32job.JobObjectExtendedLimitInformation, extended_info
)
return hjob

def _close_job_object(self, hjob):
"""Close the handle to a job object, terminating all processes inside it"""
if self._win32_job:
win32api.CloseHandle(self._win32_job)
self._win32_job = None

# This ensures that no remaining subprocesses are found when the process
# exits from a `with JobPopen(...)` block.
def __exit__(self, exc_type, value, traceback):
super(JobPopen, self).__exit__(exc_type, value, traceback)
self._close_job_object(self._win32_job)

# Python does not keep a reference outside of the parent class when the
# interpreter exits, which is why we keep it here.
_Popen = subprocess.Popen

def __del__(self):
self._Popen.__del__(self)
self._close_job_object(self._win32_job)
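
A minimal usage sketch for `JobPopen` (not part of the diff). Since it takes the same arguments as `Popen`, the command line below is only a placeholder; Windows and pywin32 are required.

```python
# Sketch: the child is assigned to a Win32 job object created with
# JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE, so it dies together with the parent.
from anypytools.jobpopen import JobPopen

with JobPopen(["anybodycon.exe"]) as proc:  # placeholder command line
    proc.wait()
# Leaving the with-block closes the job handle; had the parent Python process
# been killed instead, the OS would have terminated the child automatically.
```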
4 changes: 2 additions & 2 deletions anypytools/tools.py
@@ -826,8 +826,8 @@ def to_dataframe(
dfout[time_columns] = dfout[time_columns].interpolate(interp_method)
dfout[constant_columns] = dfout[constant_columns].bfill()

dfout = dfout.loc[interp_val]
dfout.reset_index(inplace=True)
dfout = dfout.loc[interp_val].copy()
dfout = dfout.reset_index()

return dfout

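
The `.copy()` added above is what makes the returned DataFrame defragmented (the first item in the commit message). A short, commit-independent illustration of the pandas behaviour involved:

```python
# Sketch: inserting columns one at a time splits the frame's data across many
# internal blocks; pandas may emit a PerformanceWarning and suggest taking a
# copy, which consolidates the blocks into a defragmented frame.
import numpy as np
import pandas as pd

df = pd.DataFrame(index=range(100))
for i in range(200):
    df[f"col_{i}"] = np.arange(100)

defragmented = df.copy()  # consolidated copy, safe to hand back to callers
```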
