import os import subprocess import logging import time import psutil logger = logging.getLogger(__name__) def is_container(): """ Checks if the application is running inside a Docker container. """ if os.path.exists('/.dockerenv'): return True try: with open('/proc/self/cgroup', 'rt') as f: if 'docker' in f.read(): return True except: pass return False def control_service(action: str): """ Action: start, stop, restart """ if action not in ["start", "stop", "restart"]: raise ValueError("Invalid action") CONFIG_PATH = "/etc/openvpn/server.conf" PID_FILE = "/run/openvpn.pid" # In Container: Use direct execution to avoid OpenRC/cgroups issues if is_container(): logger.info(f"[PROCESS] Container detected, using direct execution for {action}") def start_vpn_direct(): if not os.path.exists(CONFIG_PATH): # Check for alternative location in dev/non-root environments if os.path.exists("staging/server.conf"): alt_path = os.path.abspath("staging/server.conf") logger.info(f"[PROCESS] Using alternative config: {alt_path}") config = alt_path else: return {"status": "error", "message": f"Configuration not found at {CONFIG_PATH}. Please generate it first."} else: config = CONFIG_PATH # Check if already running for proc in psutil.process_iter(['name']): if proc.info['name'] == 'openvpn': return {"status": "success", "message": "OpenVPN is already running"} cmd = ["openvpn", "--config", config, "--daemon", "--writepid", PID_FILE] try: subprocess.run(cmd, check=True) return {"status": "success", "message": "OpenVPN started successfully (direct)"} except subprocess.CalledProcessError as e: return {"status": "error", "message": f"Failed to start OpenVPN: {str(e)}"} def stop_vpn_direct(): procs_to_stop = [] for proc in psutil.process_iter(['name']): if proc.info['name'] == 'openvpn': try: proc.terminate() procs_to_stop.append(proc) except (psutil.NoSuchProcess, psutil.AccessDenied): pass if procs_to_stop: # Wait for processes to actually exit logger.info(f"[PROCESS] Waiting for {len(procs_to_stop)} OpenVPN process(es) to terminate...") gone, alive = psutil.wait_procs(procs_to_stop, timeout=5) for p in alive: try: logger.warning(f"[PROCESS] Process {p.pid} did not terminate, killing...") p.kill() except: pass if os.path.exists(PID_FILE): try: os.remove(PID_FILE) except: pass if procs_to_stop: return {"status": "success", "message": "OpenVPN stopped successfully"} else: return {"status": "success", "message": "OpenVPN was not running"} if action == "start": return start_vpn_direct() elif action == "stop": return stop_vpn_direct() elif action == "restart": stop_vpn_direct() return start_vpn_direct() # On Host OS: Use system service manager os_type = get_os_type() logger.info(f"[PROCESS] Host OS detected ({os_type}), using service manager for {action}") cmd = [] if os_type == "alpine": cmd = ["rc-service", "openvpn", action] else: cmd = ["systemctl", action, "openvpn"] try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) return { "status": "success", "message": f"Service {action} executed successfully via {cmd[0]}", "stdout": result.stdout } except subprocess.CalledProcessError as e: logger.error(f"Service control failed: {e.stderr}") return { "status": "error", "message": f"Failed to {action} service via {cmd[0]}", "stderr": e.stderr } except FileNotFoundError: return { "status": "error", "message": f"Command {cmd[0]} not found found for OS type {os_type}" } def get_process_stats(): """ Returns dict with pid, cpu_percent, memory_mb, uptime. Uses psutil for robust telemetry. """ pid = None process = None # Find the process try: # Iterate over all running processes for proc in psutil.process_iter(['pid', 'name']): try: if proc.info['name'] == 'openvpn': pid = proc.info['pid'] process = proc break except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass except Exception as e: logger.error(f"Failed to find process via psutil: {e}") if not pid or not process: return { "status": "stopped", "pid": None, "cpu_percent": 0.0, "memory_mb": 0.0, "uptime": None } # Get Stats try: # CPU Percent # Increasing interval to 1.0s to ensure we capture enough ticks for accurate reading # especially on systems with low clock resolution (e.g. 100Hz = 10ms ticks) cpu = process.cpu_percent(interval=1.0) # Memory (RSS) mem_info = process.memory_info() rss_mb = round(mem_info.rss / 1024 / 1024, 2) # Uptime create_time = process.create_time() uptime_seconds = time.time() - create_time uptime_str = format_seconds(uptime_seconds) return { "status": "running", "pid": pid, "cpu_percent": cpu, "memory_mb": rss_mb, "uptime": uptime_str } except (psutil.NoSuchProcess, psutil.AccessDenied): # Process might have died between discovery and stats return { "status": "stopped", "pid": None, "cpu_percent": 0.0, "memory_mb": 0.0, "uptime": None } except Exception as e: logger.error(f"Failed to get process stats: {e}") return { "status": "running", "pid": pid, "cpu_percent": 0.0, "memory_mb": 0.0, "uptime": None } def format_seconds(seconds: float) -> str: seconds = int(seconds) days, seconds = divmod(seconds, 86400) hours, seconds = divmod(seconds, 3600) minutes, seconds = divmod(seconds, 60) parts = [] if days > 0: parts.append(f"{days}d") if hours > 0: parts.append(f"{hours}h") if minutes > 0: parts.append(f"{minutes}m") parts.append(f"{seconds}s") return " ".join(parts)