import os
import shutil
import subprocess
from pathlib import Path
from typing import Any, List, Mapping, Tuple, Union

_PathLike = Union[str, "os.PathLike[str]"]


class PKIManager:
    """Thin wrapper around the EasyRSA 3 command-line tool.

    Every EasyRSA invocation is run from ``easyrsa_dir`` (so the script can
    find its companion files) and is pointed at ``pki_path`` through the
    ``--pki-dir`` global option.

    Most methods return a ``(success, message)`` tuple where ``message`` is
    the captured output of the command (or an error description).
    """

    # Only these keys from the caller-supplied dict are written to ``vars``.
    _ALLOWED_VARS = (
        'EASYRSA_REQ_COUNTRY',
        'EASYRSA_REQ_PROVINCE',
        'EASYRSA_REQ_CITY',
        'EASYRSA_REQ_ORG',
        'EASYRSA_REQ_EMAIL',
        'EASYRSA_REQ_OU',
        'EASYRSA_KEY_SIZE',
        'EASYRSA_CA_EXPIRE',
        'EASYRSA_CERT_EXPIRE',
        'EASYRSA_CRL_DAYS',
        'EASYRSA_REQ_CN',
    )

    # Characters that keep a special meaning inside a double-quoted shell
    # string.  The vars file is sourced by the EasyRSA shell script, so they
    # must be backslash-escaped to keep values literal.
    _SHELL_DQ_ESCAPES = str.maketrans({
        '\\': '\\\\',
        '"': '\\"',
        '$': '\\$',
        '`': '\\`',
    })

    def __init__(self, easyrsa_path: _PathLike, pki_path: _PathLike) -> None:
        """Remember the EasyRSA and PKI locations.

        Args:
            easyrsa_path: Directory containing the ``easyrsa`` script.
            pki_path: Directory holding (or that will hold) the PKI.

        If the ``easyrsa`` script exists its mode is set to ``0o755`` so it
        can be executed.
        """
        self.easyrsa_dir = Path(easyrsa_path)
        self.pki_path = Path(pki_path)
        self.easyrsa_bin = self.easyrsa_dir / 'easyrsa'

        # Ensure easyrsa script is executable
        if self.easyrsa_bin.exists():
            os.chmod(self.easyrsa_bin, 0o755)

    def run_easyrsa(self, args: List[str]) -> Tuple[bool, str]:
        """Run an easyrsa command against the configured PKI directory.

        Args:
            args: Command and arguments placed after the ``--pki-dir``
                global option, e.g. ``['build-ca', 'nopass']``.

        Returns:
            ``(True, stdout)`` on success.  ``(False, stderr + "\\n" + stdout)``
            when the command exits non-zero, or ``(False, error text)`` when
            the process could not be started at all.
        """
        final_cmd = [str(self.easyrsa_bin), f'--pki-dir={self.pki_path}'] + args

        try:
            # Run from the easyrsa dir so it finds openssl-easyrsa.cnf etc.
            result = subprocess.run(
                final_cmd,
                cwd=self.easyrsa_dir,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as e:
            return False, e.stderr + "\n" + e.stdout
        except OSError as e:
            # Missing / non-executable script or missing working directory:
            # report it the same way as any other failure instead of raising.
            return False, str(e)
        return True, result.stdout

    def validate_pki_path(self, path_str: _PathLike) -> Tuple[bool, str]:
        """Check whether a path contains an initialized PKI.

        The path is accepted when it is itself a PKI root (has ``ca.crt`` and
        ``private``) or when it contains ``pki/ca.crt``.

        Returns:
            ``(is_valid, human-readable reason)``.
        """
        path = Path(path_str)
        if not path.exists():
            return False, "Path does not exist"

        is_pki_root = (path / "ca.crt").exists() and (path / "private").exists()
        has_pki_subdir = (path / "pki" / "ca.crt").exists()

        if is_pki_root or has_pki_subdir:
            return True, "Valid PKI structure found"

        return False, "No PKI structure found (missing ca.crt or private key dir)"

    def init_pki(self, force: bool = False) -> Tuple[bool, str]:
        """Initialize the PKI directory.

        Args:
            force: When true, an existing PKI directory is deleted first.

        Returns:
            ``(True, "PKI already initialized")`` if ``<pki>/private`` already
            exists, otherwise the result of ``easyrsa init-pki``.
        """
        if force and self.pki_path.exists():
            shutil.rmtree(self.pki_path)

        # A missing PKI dir implies a missing "private" dir, so this single
        # check covers both the "absent" and the "present but empty" cases.
        if (self.pki_path / "private").exists():
            return True, "PKI already initialized"

        return self.run_easyrsa(['init-pki'])

    def update_vars(self, vars_dict: Mapping[str, Any]) -> bool:
        """Write the EasyRSA ``vars`` file from the provided dictionary.

        The file is created in ``easyrsa_dir`` (the directory commands are
        run from).  It always sets ``EASYRSA_DN`` to ``org`` and
        ``EASYRSA_BATCH`` to ``1``; keys from ``vars_dict`` are added only if
        they are whitelisted and have a truthy value.

        Values are escaped so that quotes, ``$``, backticks and backslashes
        stay literal when the file is sourced by the shell.

        Returns:
            ``True`` if the file was written, ``False`` on an I/O error.
        """
        vars_path = self.easyrsa_dir / 'vars'

        content = [
            "# Easy-RSA 3 vars file",
            "set_var EASYRSA_DN \"org\"",
            "set_var EASYRSA_BATCH \"1\"",
        ]

        for key, val in vars_dict.items():
            if key in self._ALLOWED_VARS and val:
                safe_val = str(val).translate(self._SHELL_DQ_ESCAPES)
                content.append(f"set_var {key} \"{safe_val}\"")

        try:
            with open(vars_path, 'w', encoding='utf-8') as f:
                f.write('\n'.join(content) + '\n')
        except OSError:
            return False
        return True

    def build_ca(self, cn: str = "OpenVPN-CA") -> Tuple[bool, str]:
        """Build the CA with ``easyrsa build-ca nopass``.

        NOTE: ``cn`` is accepted but currently NOT passed to easyrsa; only
        ``build-ca nopass`` is executed.  In batch mode the common name is
        expected to come from the vars file / environment
        (``EASYRSA_REQ_CN``), e.g. as written by ``update_vars``.
        """
        return self.run_easyrsa(['build-ca', 'nopass'])

    def build_server(self, name: str = "server") -> Tuple[bool, str]:
        """Build a passwordless server certificate/key pair named ``name``."""
        return self.run_easyrsa(['build-server-full', name, 'nopass'])

    def build_client(self, name: str) -> Tuple[bool, str]:
        """Build a passwordless client certificate/key pair named ``name``."""
        return self.run_easyrsa(['build-client-full', name, 'nopass'])

    def gen_dh(self) -> Tuple[bool, str]:
        """Generate Diffie-Hellman parameters (``easyrsa gen-dh``)."""
        return self.run_easyrsa(['gen-dh'])

    def gen_crl(self) -> Tuple[bool, str]:
        """Generate the certificate revocation list (``easyrsa gen-crl``)."""
        return self.run_easyrsa(['gen-crl'])

    def revoke_client(self, name: str) -> Tuple[bool, str]:
        """Revoke the certificate ``name`` and regenerate the CRL.

        Returns:
            The failure output of ``revoke`` if it fails; otherwise the
            result of ``gen_crl``.
        """
        succ, out = self.run_easyrsa(['revoke', name])
        if not succ:
            return False, out

        # The CRL must be regenerated for the revocation to be published.
        return self.gen_crl()

    def gen_ta_key(self, path: _PathLike) -> Tuple[bool, str]:
        """Generate a TA key at ``path`` by calling ``openvpn`` directly.

        Runs ``openvpn --genkey --secret <path>``; the command's output is
        not captured.

        Returns:
            ``(True, "TA key generated")`` on success, otherwise
            ``(False, error text)``.
        """
        try:
            subprocess.run(
                ['openvpn', '--genkey', '--secret', str(path)],
                check=True,
            )
        except (OSError, subprocess.SubprocessError, ValueError) as e:
            return False, str(e)
        return True, "TA key generated"