import os import subprocess import logging from .config import get_pki_settings from sqlalchemy.orm import Session from datetime import datetime logger = logging.getLogger(__name__) EASY_RSA_DIR = os.path.join(os.getcwd(), "easy-rsa") PKI_DIR = os.path.join(EASY_RSA_DIR, "pki") INDEX_PATH = os.path.join(PKI_DIR, "index.txt") def get_pki_index_data(db: Session = None): """ Parses easy-rsa/pki/index.txt to get certificate expiration dates. Returns a dict: { "common_name": datetime_expiration } """ if not os.path.exists(INDEX_PATH): logger.warning(f"PKI index file not found at {INDEX_PATH}") return {} pki_data = {} try: with open(INDEX_PATH, "r") as f: for line in f: parts = line.strip().split('\t') # OpenSSL index.txt format: # 0: Flag (V=Valid, R=Revoked, E=Expired) # 1: Expiration Date (YYMMDDHHMMSSZ) # 2: Revocation Date (Can be empty) # 3: Serial # 4: Filename (unknown, often empty) # 5: Distinguished Name (DN) -> /CN=username if len(parts) < 6: continue flag = parts[0] exp_date_str = parts[1] dn = parts[5] if flag != 'V': # Only interested in valid certs expiration? Or all? User wants 'expired_at'. # Even if revoked/expired, 'expired_at' (valid_until) is still a property of the cert. pass # Extract CN # dn formats: /CN=anton or /C=RU/ST=Moscow.../CN=anton cn = None for segment in dn.split('/'): if segment.startswith("CN="): cn = segment.split("=", 1)[1] break if cn: # Parse Date: YYMMDDHHMMSSZ -> 250116102805Z # Note: OpenSSL uses 2-digit year. stored as 20YY or 19YY. # python strptime %y handles 2-digit years (00-68 -> 2000-2068, 69-99 -> 1969-1999) try: exp_dt = datetime.strptime(exp_date_str, "%y%m%d%H%M%SZ") pki_data[cn] = exp_dt except ValueError: logger.error(f"Failed to parse date: {exp_date_str}") except Exception as e: logger.error(f"Error reading PKI index: {e}") return pki_data def _run_easyrsa(command: list, env: dict): env_vars = os.environ.copy() env_vars.update(env) # Ensure EASYRSA_PKI is set env_vars["EASYRSA_PKI"] = PKI_DIR cmd = [os.path.join(EASY_RSA_DIR, "easyrsa")] + command try: result = subprocess.run( cmd, capture_output=True, text=True, env=env_vars, cwd=EASY_RSA_DIR ) if result.returncode != 0: logger.error(f"EasyRSA command failed: {cmd}") logger.error(f"Stderr: {result.stderr}") raise Exception(f"EasyRSA command failed: {result.stderr}") return result.stdout except Exception as e: logger.exception("Error running EasyRSA") raise e def _get_easyrsa_env(db: Session): pki = get_pki_settings(db) env = { "EASYRSA_DN": pki.easyrsa_dn, "EASYRSA_REQ_COUNTRY": pki.easyrsa_req_country, "EASYRSA_REQ_PROVINCE": pki.easyrsa_req_province, "EASYRSA_REQ_CITY": pki.easyrsa_req_city, "EASYRSA_REQ_ORG": pki.easyrsa_req_org, "EASYRSA_REQ_EMAIL": pki.easyrsa_req_email, "EASYRSA_REQ_OU": pki.easyrsa_req_ou, "EASYRSA_KEY_SIZE": str(pki.easyrsa_key_size), "EASYRSA_CA_EXPIRE": str(pki.easyrsa_ca_expire), "EASYRSA_CERT_EXPIRE": str(pki.easyrsa_cert_expire), "EASYRSA_CERT_RENEW": str(pki.easyrsa_cert_renew), "EASYRSA_CRL_DAYS": str(pki.easyrsa_crl_days), "EASYRSA_BATCH": "1" if pki.easyrsa_batch else "0" } return env def init_pki(db: Session): env = _get_easyrsa_env(db) pki_settings = get_pki_settings(db) if os.path.exists(os.path.join(PKI_DIR, "ca.crt")): logger.warning("PKI already initialized") return "PKI already initialized" # Init PKI _run_easyrsa(["init-pki"], env) # Build CA _run_easyrsa(["--req-cn=" + pki_settings.fqdn_ca, "build-ca", "nopass"], env) # Build Server Cert _run_easyrsa(["build-server-full", pki_settings.fqdn_server, "nopass"], env) # Gen DH _run_easyrsa(["gen-dh"], env) # Gen TLS Auth Key (requires openvpn, not easyrsa) ta_key_path = os.path.join(PKI_DIR, "ta.key") subprocess.run(["openvpn", "--genkey", "secret", ta_key_path], check=True) # Gen CRL _run_easyrsa(["gen-crl"], env) return "PKI Initialized" def clear_pki(db: Session): # 1. Clear Database Users from models import UserProfile try: db.query(UserProfile).delete() db.commit() except Exception as e: logger.error(f"Failed to clear user profiles from DB: {e}") db.rollback() # 2. Clear Client Configs client_conf_dir = os.path.join(os.getcwd(), "client-config") if os.path.exists(client_conf_dir): import shutil try: shutil.rmtree(client_conf_dir) # Recreate empty dir os.makedirs(client_conf_dir, exist_ok=True) except Exception as e: logger.error(f"Failed to clear client configs: {e}") # 3. Clear PKI if os.path.exists(PKI_DIR): import shutil shutil.rmtree(PKI_DIR) return "System cleared: PKI environment, User DB, and Client profiles wiped." return "PKI directory did not exist, but User DB and Client profiles were wiped." def build_client(username: str, db: Session): env = _get_easyrsa_env(db) _run_easyrsa(["build-client-full", username, "nopass"], env) return True def revoke_client(username: str, db: Session): env = _get_easyrsa_env(db) _run_easyrsa(["revoke", username], env) _run_easyrsa(["gen-crl"], env) return True