"""Migrate legacy bash-based configuration into the database.

Reads the ``confvars`` file and the ``client-config`` directory located in the
project root (the parent of this script's directory) and copies their contents
into the system settings, PKI settings and user profile records.
"""
import os
import re
import sys
from datetime import datetime

# Add project root to sys.path so the project modules imported below resolve
# when this file is executed directly as a script.
_PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(_PROJECT_ROOT)

from database import SessionLocal, engine, Base  # noqa: E402
from models import SystemSettings, PKISetting, UserProfile  # noqa: E402,F401
from services import config as config_service  # noqa: E402

# Strings accepted as "true" for boolean EasyRSA fields (compared lowercased).
_TRUTHY = frozenset({"1", "yes", "true", "on"})


def _assignment_prefix(var_name):
    """Return a regex fragment matching ``var_name=`` as a whole identifier.

    The name is escaped, and a negative lookbehind prevents a match on the
    tail of a longer variable name (e.g. ``TPORT`` inside ``XTPORT=``).
    """
    return r"(?<![A-Za-z0-9_])" + re.escape(var_name) + "="


def _parse_int(raw, var_name):
    """Convert *raw* to ``int``; warn and return ``None`` if it is not numeric."""
    try:
        return int(raw)
    except ValueError:
        print(f"Skipping {var_name}: {raw!r} is not an integer")
        return None


def parse_bash_array(content, var_name):
    """Extract the double-quoted items of a bash array assignment.

    Dumb parser for bash arrays of the form ``VAR=( "val1" "val2" )``. This is
    fragile but fits the simple format used in confvars.

    Args:
        content: Full text of the confvars file.
        var_name: Name of the bash array variable.

    Returns:
        The list of quoted items, or an empty list when the variable is absent.
    """
    pattern = _assignment_prefix(var_name) + r"\((.*?)\)"
    match = re.search(pattern, content, re.DOTALL)
    if match is None:
        return []
    return re.findall(r'"([^"]*)"', match.group(1))


def parse_bash_var(content, var_name):
    """Extract the value of a scalar bash assignment ``VAR=value``.

    Surrounding double quotes are optional; the value ends at the first
    double quote or newline.

    Args:
        content: Full text of the confvars file.
        var_name: Name of the bash variable.

    Returns:
        The value as a string (possibly empty), or ``None`` when the variable
        is not assigned anywhere in *content*.
    """
    pattern = _assignment_prefix(var_name) + r'"?([^"\n]*)"?'
    match = re.search(pattern, content)
    return match.group(1) if match else None


def _migrate_system_settings(db, content):
    """Copy the system-level confvars variables onto the system settings row."""
    print("Migrating System Settings...")
    sys_settings = config_service.get_system_settings(db)

    # Map variables
    protocol = parse_bash_var(content, "TPROTO")
    if protocol:
        sys_settings.protocol = protocol

    port = parse_bash_var(content, "TPORT")
    if port:
        port_number = _parse_int(port, "TPORT")
        if port_number is not None:
            sys_settings.port = port_number

    vpn_network = parse_bash_var(content, "TSERNET")
    if vpn_network:
        sys_settings.vpn_network = vpn_network.strip('"')

    vpn_netmask = parse_bash_var(content, "TSERMASK")
    if vpn_netmask:
        sys_settings.vpn_netmask = vpn_netmask.strip('"')

    tunnel_type = parse_bash_var(content, "TTUNTYPE")
    if tunnel_type:
        sys_settings.tunnel_type = tunnel_type

    # Flags are always written: anything other than the literal "YES"
    # (including a missing variable) is stored as False.
    sys_settings.duplicate_cn = parse_bash_var(content, "TDCN") == "YES"
    sys_settings.client_to_client = parse_bash_var(content, "TC2C") == "YES"
    sys_settings.crl_verify = parse_bash_var(content, "TREVO") == "YES"

    # Arrays
    split_routes = parse_bash_array(content, "TTUNNETS")
    if split_routes:
        sys_settings.split_routes = split_routes

    dns_servers = parse_bash_array(content, "TDNS")
    if dns_servers:
        sys_settings.dns_servers = dns_servers
        sys_settings.user_defined_dns = True

    # Scripts
    conn_scripts = parse_bash_var(content, "T_CONNSCRIPTS")
    sys_settings.user_defined_cdscripts = conn_scripts == "YES"

    conn_script = parse_bash_var(content, "T_CONNSCRIPT_STRING")
    if conn_script:
        sys_settings.connect_script = conn_script.strip('"')

    disconn_script = parse_bash_var(content, "T_DISCONNSCRIPT_STRING")
    if disconn_script:
        sys_settings.disconnect_script = disconn_script.strip('"')

    # Mgmt
    sys_settings.management_interface = (
        parse_bash_var(content, "T_MGMT") == "YES"
    )

    mgmt_addr = parse_bash_var(content, "T_MGMT_ADDR")
    if mgmt_addr:
        sys_settings.management_interface_address = mgmt_addr.strip('"')

    mgmt_port = parse_bash_var(content, "T_MGMT_PORT")
    if mgmt_port:
        mgmt_port_number = _parse_int(mgmt_port, "T_MGMT_PORT")
        if mgmt_port_number is not None:
            sys_settings.management_port = mgmt_port_number

    db.commit()
    print("System Settings Migrated.")


def _migrate_pki_settings(db, content):
    """Copy the FQDN and ``export EASYRSA_*`` variables onto the PKI settings row."""
    print("Migrating PKI Settings...")
    pki_settings = config_service.get_pki_settings(db)

    fqdn_server = parse_bash_var(content, "FQDN_SERVER")
    if fqdn_server:
        pki_settings.fqdn_server = fqdn_server

    fqdn_ca = parse_bash_var(content, "FQDN_CA")
    if fqdn_ca:
        pki_settings.fqdn_ca = fqdn_ca

    # EasyRSA vars
    for line in content.splitlines():
        if not line.startswith("export EASYRSA_"):
            continue

        # Split on the first "=" only so values that themselves contain "="
        # are kept instead of being silently dropped.
        key_part, separator, raw_value = line.partition("=")
        if not separator:
            continue

        # Map to model fields (lowercase)
        field = key_part.replace("export ", "").strip().lower()
        value = raw_value.strip().strip('"')
        if not hasattr(pki_settings, field):
            continue

        # Simple type conversion driven by the type of the current value.
        # bool is tested first because bool is a subclass of int.
        current = getattr(pki_settings, field)
        if isinstance(current, bool):
            # Handle varied boolean strings
            converted = value.lower() in _TRUTHY
        elif isinstance(current, int):
            converted = _parse_int(value, field.upper())
            if converted is None:
                continue
        else:
            converted = value
        setattr(pki_settings, field, converted)

    db.commit()
    print("PKI Settings Migrated.")


def migrate_confvars(db):
    """Import system and PKI settings from the project-root ``confvars`` file.

    Prints a notice and returns without touching the database when the file
    does not exist. System settings and PKI settings are committed separately.

    Args:
        db: Database session used for reading and committing the settings.
    """
    confvars_path = os.path.join(_PROJECT_ROOT, "confvars")
    if not os.path.exists(confvars_path):
        print(f"No confvars found at {confvars_path}")
        return

    with open(confvars_path, "r") as f:
        content = f.read()

    _migrate_system_settings(db, content)
    _migrate_pki_settings(db, content)


def migrate_users(db):
    """Create a ``UserProfile`` for every ``.ovpn`` file in ``client-config``.

    Files whose username (the filename without ``.ovpn``) already exists in
    the database are skipped. All new profiles are committed together.

    Args:
        db: Database session used for querying and adding profiles.
    """
    client_config_dir = os.path.join(_PROJECT_ROOT, "client-config")
    # isdir rather than exists: os.listdir below fails on a regular file.
    if not os.path.isdir(client_config_dir):
        print("No client-config directory found.")
        return

    print("Migrating Users...")
    for filename in os.listdir(client_config_dir):
        if not filename.endswith(".ovpn"):
            continue

        username = filename[:-5]  # remove .ovpn

        # Check overlap
        existing = (
            db.query(UserProfile)
            .filter(UserProfile.username == username)
            .first()
        )
        if existing:
            continue

        # Basic import: no creation date is recorded anywhere, so the file's
        # st_ctime is used. NOTE: on Unix this is the last metadata change
        # time, not the creation time, so it is only an approximation.
        file_path = os.path.join(client_config_dir, filename)
        created_at = datetime.fromtimestamp(os.stat(file_path).st_ctime)

        # The bash script named files "ID-Name"; the whole stem is kept as
        # the username here rather than trying to split the ID off.
        profile = UserProfile(
            username=username,
            status="active",
            created_at=created_at,
            file_path=file_path,
        )
        db.add(profile)
        print(f"Imported user: {username}")

    db.commit()
    print("Users Migrated.")


if __name__ == "__main__":
    # Ensure tables exist
    Base.metadata.create_all(bind=engine)

    db = SessionLocal()
    try:
        migrate_confvars(db)
        migrate_users(db)
    finally:
        db.close()