Source code for deeprm.utils.memory

"""
Memory management utilities for DeepRM.
"""

import logging
import os
import signal
import threading
import time

import psutil

from deeprm.utils.logging import get_logger


def _check_once(limit_gb: float, logger: logging.Logger) -> bool:
    """
    Check if the resident set size (RSS) exceeds the given limit in GiB.

    Args:
        limit_gb (float): Memory limit in GiB.
        logger (logging.Logger): Logger to log messages.

    Returns:
        bool: True if RSS exceeds limit, False otherwise.
    """
    rss_gb = psutil.Process().memory_info().rss / (1024**3)
    if rss_gb > limit_gb:
        logger.error(f"RSS {rss_gb:.2f} GiB > limit {limit_gb:.2f} GiB – exiting.")
        return True
    return False


[docs] def start_mem_watchdog(limit_gb: float = None, interval_s: int = 10) -> threading.Thread: """ Start a daemon thread that exits the *current process* if RSS exceeds limit. Args: limit_gb (float): Memory limit in GiB. Defaults to 95% of total RAM. (optional) interval_s (int): Check interval in seconds. Defaults to 10. (optional) Returns: threading.Thread: The watchdog thread. """ if limit_gb is None: limit_gb = psutil.virtual_memory().total / (1024**3) * 0.95 # Default to 95% of total RAM def _run(): log = get_logger(__name__) while True: if _check_once(limit_gb, log): os.kill(os.getpid(), signal.SIGTERM) time.sleep(interval_s) t = threading.Thread(target=_run, daemon=True) t.start() return t