"""JWT bearer-token authentication helpers.

The signing secret is resolved once at import time (environment variable
first, then a list of candidate ``config.ini`` files, then a built-in
fallback) and stored in ``SECRET_KEY``.  ``verify_token`` is a FastAPI
dependency that validates ``Authorization: Bearer <jwt>`` headers against
that secret.
"""

import configparser
import os
from pathlib import Path

import jwt
from fastapi import Header, HTTPException, status

# Load config from the main APP directory.
# NOTE: not referenced inside this module; kept as a public module attribute
# because code outside this file may import it.
CONFIG_FILE = Path(__file__).parent.parent.parent / 'APP' / 'config.ini'

# Last-resort signing key used when neither the environment nor any config
# file provides one.
# SECURITY NOTE(review): this value is public in the source code, so any
# deployment that falls through to it accepts tokens anyone can forge.
_DEFAULT_SECRET_KEY = 'ovpmon-secret-change-me'


def _config_candidates():
    """Return the config file locations to probe, in priority order."""
    # Resolve the absolute path so the relative candidates do not depend on
    # the current working directory.
    base_path = Path(__file__).resolve().parent.parent
    return [
        base_path.parent / 'APP' / 'config.ini',  # Sibling directory (Local/Gitea structure)
        base_path / 'APP' / 'config.ini',  # Child directory
        base_path / 'config.ini',  # Same directory
        Path('/opt/ovpmon/APP/config.ini'),  # Common production path 1
        Path('/opt/ovpmon/config.ini'),  # Common production path 2
        Path('/etc/ovpmon/config.ini'),  # Standard linux config path
        Path('/opt/ovpn_python_profiler/APP/config.ini'),  # Path based on traceback
    ]


def _read_secret_from(path):
    """Return the ``[api] secret_key`` value stored in *path*, or ``None``.

    A fresh parser is created for every file so that sections or defaults
    left behind by a previously read (possibly malformed) file can never be
    attributed to this one.  Parsing and interpolation errors propagate to
    the caller.
    """
    parser = configparser.ConfigParser()
    parser.read(path)
    # ``fallback`` covers both a missing section and a missing option.
    return parser.get('api', 'secret_key', fallback=None)


def get_secret_key() -> str:
    """Resolve the secret used to verify JWT signatures.

    Lookup order:

    1. The ``OVPMON_SECRET_KEY`` environment variable, if set and non-empty.
    2. The first existing candidate config file that holds a non-empty
       ``secret_key`` option in its ``[api]`` section.
    3. The hard-coded fallback key (a warning is printed).

    Returns:
        The secret key string.  This function never raises for unreadable or
        malformed config files; such files are reported and skipped.
    """
    # Priority 1: Environment Variable
    env_secret = os.getenv('OVPMON_SECRET_KEY')
    if env_secret:
        print("[AUTH] Using SECRET_KEY from environment variable")
        return env_secret

    # Priority 2: Config file (multiple possible locations)
    for loc in _config_candidates():
        if not loc.exists():
            continue
        try:
            key = _read_secret_from(loc)
        except Exception as e:
            # Deliberately broad: key discovery is best-effort at import
            # time, so one broken file must not stop the remaining
            # candidates from being tried.
            print(f"[AUTH] Error reading config at {loc}: {e}")
            continue
        if key:
            print(f"[AUTH] Successfully loaded SECRET_KEY from {loc}")
            return key

    # Priority 3: built-in fallback
    print("[AUTH] WARNING: No config found, using default fallback SECRET_KEY")
    return _DEFAULT_SECRET_KEY


SECRET_KEY = get_secret_key()


def _unauthorized(detail):
    """Build the 401 response shared by every authentication failure."""
    return HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail=detail,
        headers={"WWW-Authenticate": "Bearer"},
    )


async def verify_token(authorization: str = Header(None)):
    """Validate a bearer JWT taken from the ``Authorization`` header.

    Args:
        authorization: Raw ``Authorization`` header value, expected in the
            form ``Bearer <token>``.  ``None`` when the header is absent.

    Returns:
        The decoded token payload as returned by ``jwt.decode``.

    Raises:
        HTTPException: Always with status 401 and a ``WWW-Authenticate:
            Bearer`` header, when the header is missing or malformed, or the
            token is expired, invalid, or cannot be decoded for any other
            reason.
    """
    if not authorization or not authorization.startswith("Bearer "):
        # Log only the (truncated) scheme word, never what follows it: for
        # schemes such as "Basic" the remainder is credential material.
        scheme = authorization.split(" ", 1)[0][:20] if authorization else 'None'
        print(f"[AUTH] Missing or invalid Authorization header: {scheme}")
        raise _unauthorized("Token is missing or invalid")

    # Whitespace-agnostic split tolerates repeated spaces after the scheme.
    # An empty token is passed on to jwt.decode, which rejects it as invalid.
    parts = authorization.split()
    token = parts[1] if len(parts) > 1 else ""

    try:
        return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except Exception as e:
        error_type = type(e).__name__
        print(f"[AUTH] JWT Decode Failed. Type: {error_type}, Detail: {e}")

        # Classify by class *name* across the whole inheritance chain rather
        # than by referencing jwt.<ExceptionClass>: this cannot raise
        # AttributeError when an incompatible 'jwt' package is installed,
        # and it also recognises subclasses such as InvalidSignatureError.
        ancestry = {cls.__name__ for cls in type(e).__mro__}

        # Checked first because the expiry error is itself an invalid-token
        # error in PyJWT's hierarchy.
        if "ExpiredSignatureError" in ancestry:
            raise _unauthorized("Token has expired") from e
        if ancestry & {"InvalidTokenError", "DecodeError"}:
            raise _unauthorized("Token is invalid") from e

        # A TypeError usually means decode() was called with arguments the
        # installed package does not understand.
        if isinstance(e, TypeError):
            print("[AUTH] Critical: jwt.decode failed with TypeError. This likely means 'jwt' package is installed instead of 'PyJWT'.")
        raise _unauthorized(f"Authentication error: {error_type}") from e