75 lines
2.9 KiB
Python
75 lines
2.9 KiB
Python
import jwt
|
|
import configparser
|
|
import os
|
|
from fastapi import Header, HTTPException, status
|
|
from pathlib import Path
|
|
|
|
from .config import get_config_value
|
|
|
|
def get_secret_key():
|
|
# Use consistent OVPMON_API_SECRET_KEY as primary source
|
|
key = get_config_value('api', 'secret_key', fallback='ovpmon-secret-change-me')
|
|
|
|
if key == 'ovpmon-secret-change-me':
|
|
print("[AUTH] WARNING: Using default fallback SECRET_KEY")
|
|
else:
|
|
# Check if it was from env (get_config_value prioritizes env)
|
|
import os
|
|
if os.getenv('OVPMON_API_SECRET_KEY'):
|
|
print("[AUTH] Using SECRET_KEY from OVPMON_API_SECRET_KEY environment variable")
|
|
elif os.getenv('OVPMON_SECRET_KEY'):
|
|
print("[AUTH] Using SECRET_KEY from OVPMON_SECRET_KEY environment variable")
|
|
else:
|
|
print("[AUTH] SECRET_KEY loaded (config.ini or fallback)")
|
|
|
|
return key
|
|
|
|
SECRET_KEY = get_secret_key()
|
|
|
|
|
|
async def verify_token(authorization: str = Header(None)):
|
|
if not authorization or not authorization.startswith("Bearer "):
|
|
print(f"[AUTH] Missing or invalid Authorization header: {authorization[:20] if authorization else 'None'}")
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Token is missing or invalid",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
token = authorization.split(" ")[1]
|
|
|
|
try:
|
|
# Debug: Log a few chars of the key and token (safely)
|
|
# print(f"[AUTH] Decoding token with SECRET_KEY starting with: {SECRET_KEY[:3]}...")
|
|
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
|
|
return payload
|
|
except Exception as e:
|
|
error_type = type(e).__name__
|
|
error_detail = str(e)
|
|
print(f"[AUTH] JWT Decode Failed. Type: {error_type}, Detail: {error_detail}")
|
|
|
|
# Handling exceptions dynamically to avoid AttributeError
|
|
if error_type == "ExpiredSignatureError":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Token has expired",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
elif error_type in ["InvalidTokenError", "DecodeError"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Token is invalid",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
else:
|
|
# Check if it's a TypeError (e.g. wrong arguments for decode)
|
|
if error_type == "TypeError":
|
|
print("[AUTH] Critical: jwt.decode failed with TypeError. This likely means 'jwt' package is installed instead of 'PyJWT'.")
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail=f"Authentication error: {error_type}",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|