new calculation approach with unique sessions, new API endpoint to get list of active sessions, fix for UNDEF user, UI and Back to support certificate management still under development

This commit is contained in:
Антон
2026-01-12 11:44:50 +03:00
parent 839dd4994f
commit 6df0f5e180
10 changed files with 1175 additions and 59 deletions

149
APP/pki_manager.py Normal file
View File

@@ -0,0 +1,149 @@
import os
import subprocess
from pathlib import Path
import shutil
class PKIManager:
def __init__(self, easyrsa_path, pki_path):
self.easyrsa_dir = Path(easyrsa_path)
self.pki_path = Path(pki_path)
self.easyrsa_bin = self.easyrsa_dir / 'easyrsa'
# Ensure easyrsa script is executable
if self.easyrsa_bin.exists():
os.chmod(self.easyrsa_bin, 0o755)
def run_easyrsa(self, args):
"""Run easyrsa command"""
cmd = [str(self.easyrsa_bin)] + args
env = os.environ.copy()
# Ensure we point to the correct PKI dir if flexible
# But EasyRSA usually expects to be run inside the dir or have env var?
# Standard: run in easyrsa_dir, but PKI might be elsewhere.
# usually invoke like: easyrsa --pki-dir=/path/to/pki cmd
# We'll use the --pki-dir arg if supported or just chdir if needed.
# EasyRSA 3 supports --pki-dir key.
final_cmd = [str(self.easyrsa_bin), f'--pki-dir={self.pki_path}'] + args
try:
# We run from easyrsa dir so it finds openssl-easyrsa.cnf etc if needed
result = subprocess.run(
final_cmd,
cwd=self.easyrsa_dir,
capture_output=True,
text=True,
check=True
)
return True, result.stdout
except subprocess.CalledProcessError as e:
return False, e.stderr + "\n" + e.stdout
def validate_pki_path(self, path_str):
"""Check if a path contains a valid initialized PKI or EasyRSA structure"""
path = Path(path_str)
if not path.exists():
return False, "Path does not exist"
# Check for essential items: pki dir or easyrsa script inside
# Or if it IS the pki dir (contains ca.crt, issued, private)
is_pki_root = (path / "ca.crt").exists() and (path / "private").exists()
has_pki_subdir = (path / "pki" / "ca.crt").exists()
if is_pki_root or has_pki_subdir:
return True, "Valid PKI structure found"
return False, "No PKI structure found (missing ca.crt or private key dir)"
def init_pki(self, force=False):
"""Initialize PKI"""
if force and self.pki_path.exists():
shutil.rmtree(self.pki_path)
if not self.pki_path.exists():
return self.run_easyrsa(['init-pki'])
if (self.pki_path / "private").exists():
return True, "PKI already initialized"
return self.run_easyrsa(['init-pki'])
def update_vars(self, vars_dict):
"""Update vars file with provided dictionary"""
vars_path = self.easyrsa_dir / 'vars'
# Ensure vars file is created in the EasyRSA directory that we run commands from
# Note: If we use --pki-dir, easyrsa might look for vars in the pki dir or the basedir.
# Usually it looks in the directory we invoke it from (cwd).
# Base content
content = [
"# Easy-RSA 3 vars file",
"set_var EASYRSA_DN \"org\"",
"set_var EASYRSA_BATCH \"1\""
]
# Map of keys to allow
allowed_keys = [
'EASYRSA_REQ_COUNTRY', 'EASYRSA_REQ_PROVINCE', 'EASYRSA_REQ_CITY',
'EASYRSA_REQ_ORG', 'EASYRSA_REQ_EMAIL', 'EASYRSA_REQ_OU',
'EASYRSA_KEY_SIZE', 'EASYRSA_CA_EXPIRE', 'EASYRSA_CERT_EXPIRE',
'EASYRSA_CRL_DAYS', 'EASYRSA_REQ_CN'
]
for key, val in vars_dict.items():
if key in allowed_keys and val:
content.append(f"set_var {key} \"{val}\"")
try:
with open(vars_path, 'w') as f:
f.write('\n'.join(content))
return True
except Exception as e:
return False
def build_ca(self, cn="OpenVPN-CA"):
"""Build CA"""
# EasyRSA 3 uses 'build-ca nopass' and takes CN from vars or interactive.
# With batch mode, we rely on vars. But CN is special.
# We can pass --req-cn=NAME (if supported) or rely on vars having EASYRSA_REQ_CN?
# Actually in batch mode `build-ca nopass` uses the common name from vars/env.
# If we updated vars with EASYRSA_REQ_CN, then just run it.
# But to be safe, we can try to set it via env var too.
# args: build-ca nopass
return self.run_easyrsa(['build-ca', 'nopass'])
def build_server(self, name="server"):
"""Build Server Cert"""
return self.run_easyrsa(['build-server-full', name, 'nopass'])
def build_client(self, name):
"""Build Client Cert"""
return self.run_easyrsa(['build-client-full', name, 'nopass'])
def gen_dh(self):
"""Generate Diffie-Hellman"""
return self.run_easyrsa(['gen-dh'])
def gen_crl(self):
"""Generate CRL"""
return self.run_easyrsa(['gen-crl'])
def revoke_client(self, name):
"""Revoke Client"""
# 1. Revoke
succ, out = self.run_easyrsa(['revoke', name])
if not succ: return False, out
# 2. Update CRL
return self.gen_crl()
def gen_ta_key(self, path):
"""Generate TA Key using openvpn directly"""
try:
# openvpn --genkey --secret path
subprocess.run(['openvpn', '--genkey', '--secret', str(path)], check=True)
return True, "TA key generated"
except Exception as e:
return False, str(e)