Files
OpenVPN-Monitoring-Simple/APP_PROFILER/scripts/migrate_from_bash.py

180 lines
6.4 KiB
Python
Raw Normal View History

2026-01-28 22:37:47 +03:00
import sys
import os
import re
from datetime import datetime
# Add project root to sys.path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from database import SessionLocal, engine, Base
from models import SystemSettings, PKISetting, UserProfile
from services import config as config_service
def parse_bash_array(content, var_name):
# Dumb parser for bash arrays: VAR=( "val1" "val2" )
# This is fragile but fits the simple format used in confvars
pattern = float = fr'{var_name}=\((.*?)\)'
match = re.search(pattern, content, re.DOTALL)
if match:
items = re.findall(r'"([^"]*)"', match.group(1))
return items
return []
def parse_bash_var(content, var_name):
pattern = fr'{var_name}="?([^"\n]*)"?'
match = re.search(pattern, content)
if match:
return match.group(1)
return None
def migrate_confvars(db):
confvars_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "confvars")
if not os.path.exists(confvars_path):
print(f"No confvars found at {confvars_path}")
return
with open(confvars_path, "r") as f:
content = f.read()
print("Migrating System Settings...")
sys_settings = config_service.get_system_settings(db)
# Map variables
protocol = parse_bash_var(content, "TPROTO")
if protocol: sys_settings.protocol = protocol
port = parse_bash_var(content, "TPORT")
if port: sys_settings.port = int(port)
vpn_network = parse_bash_var(content, "TSERNET")
if vpn_network: sys_settings.vpn_network = vpn_network.strip('"')
vpn_netmask = parse_bash_var(content, "TSERMASK")
if vpn_netmask: sys_settings.vpn_netmask = vpn_netmask.strip('"')
tunnel_type = parse_bash_var(content, "TTUNTYPE")
if tunnel_type: sys_settings.tunnel_type = tunnel_type
duplicate_cn = parse_bash_var(content, "TDCN")
sys_settings.duplicate_cn = (duplicate_cn == "YES")
client_to_client = parse_bash_var(content, "TC2C")
sys_settings.client_to_client = (client_to_client == "YES")
crl_verify = parse_bash_var(content, "TREVO")
sys_settings.crl_verify = (crl_verify == "YES")
# Arrays
split_routes = parse_bash_array(content, "TTUNNETS")
if split_routes: sys_settings.split_routes = split_routes
dns_servers = parse_bash_array(content, "TDNS")
if dns_servers:
sys_settings.dns_servers = dns_servers
sys_settings.user_defined_dns = True
# Scripts
conn_scripts = parse_bash_var(content, "T_CONNSCRIPTS")
sys_settings.user_defined_cdscripts = (conn_scripts == "YES")
conn_script = parse_bash_var(content, "T_CONNSCRIPT_STRING")
if conn_script: sys_settings.connect_script = conn_script.strip('"')
disconn_script = parse_bash_var(content, "T_DISCONNSCRIPT_STRING")
if disconn_script: sys_settings.disconnect_script = disconn_script.strip('"')
# Mgmt
mgmt = parse_bash_var(content, "T_MGMT")
sys_settings.management_interface = (mgmt == "YES")
mgmt_addr = parse_bash_var(content, "T_MGMT_ADDR")
if mgmt_addr: sys_settings.management_interface_address = mgmt_addr.strip('"')
mgmt_port = parse_bash_var(content, "T_MGMT_PORT")
if mgmt_port: sys_settings.management_port = int(mgmt_port)
db.commit()
print("System Settings Migrated.")
print("Migrating PKI Settings...")
pki_settings = config_service.get_pki_settings(db)
fqdn_server = parse_bash_var(content, "FQDN_SERVER")
if fqdn_server: pki_settings.fqdn_server = fqdn_server
fqdn_ca = parse_bash_var(content, "FQDN_CA")
if fqdn_ca: pki_settings.fqdn_ca = fqdn_ca
# EasyRSA vars
for line in content.splitlines():
if line.startswith("export EASYRSA_"):
parts = line.split("=")
if len(parts) == 2:
key = parts[0].replace("export ", "").strip()
val = parts[1].strip().strip('"')
# Map to model fields (lowercase)
if hasattr(pki_settings, key.lower()):
# Simple type conversion
field_type = type(getattr(pki_settings, key.lower()))
if field_type == int:
setattr(pki_settings, key.lower(), int(val))
elif field_type == bool:
# Handle varied boolean strings
if val.lower() in ["1", "yes", "true", "on"]:
setattr(pki_settings, key.lower(), True)
else:
setattr(pki_settings, key.lower(), False)
else:
setattr(pki_settings, key.lower(), val)
db.commit()
print("PKI Settings Migrated.")
def migrate_users(db):
client_config_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "client-config")
if not os.path.exists(client_config_dir):
print("No client-config directory found.")
return
print("Migrating Users...")
for filename in os.listdir(client_config_dir):
if filename.endswith(".ovpn"):
username = filename[:-5] # remove .ovpn
# Check overlap
existing = db.query(UserProfile).filter(UserProfile.username == username).first()
if not existing:
# Basic import, we don't have createdAt date easily unless we stat the file
file_path = os.path.join(client_config_dir, filename)
stat = os.stat(file_path)
created_at = datetime.fromtimestamp(stat.st_ctime)
# Try to parse ID from filename if it matches format "ID-Name" (common in this script)
# But the bash script logic was "ID-Name" -> client_name
# The UserProfile username should probably be the CommonName
profile = UserProfile(
username=username,
status="active",
created_at=created_at,
file_path=file_path
)
db.add(profile)
print(f"Imported user: {username}")
db.commit()
print("Users Migrated.")
if __name__ == "__main__":
# Ensure tables exist
Base.metadata.create_all(bind=engine)
db = SessionLocal()
try:
migrate_confvars(db)
migrate_users(db)
finally:
db.close()