import sqlite3
import configparser
import os
import logging


class DatabaseManager:
    """Resolve the monitor's SQLite database path and create its schema.

    The database path is resolved once, at construction time, with the
    following priority: the ``OVPMON_OPENVPN_MONITOR_DB_PATH`` environment
    variable, then ``db_path`` in the ``[openvpn_monitor]`` section of the
    config file, then the built-in default ``openvpn_monitor.db``.
    """

    # Environment variable that overrides the configured database path.
    _ENV_DB_PATH = 'OVPMON_OPENVPN_MONITOR_DB_PATH'
    # Config file section / option holding the database path.
    _CONFIG_SECTION = 'openvpn_monitor'
    _CONFIG_OPTION = 'db_path'
    # Path used when neither the environment nor the config file sets one.
    _DEFAULT_DB_PATH = 'openvpn_monitor.db'
    # Aggregated statistics tables; all share the same column layout.
    _STATS_TABLES = (
        'stats_5min',
        'stats_15min',
        'stats_hourly',
        'stats_6h',
        'stats_daily',
    )

    def __init__(self, config_file='config.ini'):
        """Read *config_file* (if present) and resolve ``self.db_path``.

        Args:
            config_file: Path of the INI file to read. A missing file is
                not an error; the fallbacks described on the class apply.
        """
        self.config_file = config_file
        self.config = configparser.ConfigParser()
        self.logger = logging.getLogger(__name__)
        self.load_config()

    def load_config(self):
        """(Re)load the config file and set ``self.db_path``.

        Priority: ENV > Config File > Fallback. Empty values are treated
        as "not set" at every level.
        """
        # ConfigParser.read() silently skips files that cannot be opened,
        # so no separate existence check is needed.
        self.config.read(self.config_file)

        env_db_path = os.getenv(self._ENV_DB_PATH)
        if env_db_path:
            self.db_path = env_db_path
            return

        configured = self.config.get(
            self._CONFIG_SECTION,
            self._CONFIG_OPTION,
            fallback=self._DEFAULT_DB_PATH,
        )
        # An empty "db_path =" entry would make sqlite3 open a private
        # temporary database that vanishes on close; use the default
        # instead, mirroring how an empty environment variable is handled.
        self.db_path = configured or self._DEFAULT_DB_PATH

    def get_connection(self):
        """Get a database connection.

        Returns:
            A new ``sqlite3.Connection`` opened on ``self.db_path``. The
            caller is responsible for closing it.
        """
        return sqlite3.connect(self.db_path)

    def init_database(self):
        """Initialize the database schema.

        Creates the parent directory of the database file (best effort),
        creates every table and index that does not exist yet, and adds
        columns missing from a pre-existing ``users`` table.

        Errors raised while creating the schema are logged, not re-raised.
        A failure to open the connection itself propagates to the caller.
        """
        self._ensure_db_directory()
        self.logger.info("Using database: %s", self.db_path)

        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            self._create_schema(cursor)
            self._apply_migrations(cursor)
            conn.commit()
            self.logger.info(
                "Database initialized with full schema and migrations"
            )
        except Exception as e:
            # Deliberately non-fatal: report the problem (with traceback)
            # and let the caller carry on.
            self.logger.error(
                "Database initialization error: %s", e, exc_info=True
            )
        finally:
            conn.close()

    def _ensure_db_directory(self):
        """Create the directory that will hold the database file, if any."""
        db_dir = os.path.dirname(self.db_path)
        if not db_dir:
            # Bare file name: the database lives in the working directory.
            return
        try:
            os.makedirs(db_dir, exist_ok=True)
        except OSError as exc:
            # Best effort: if the directory is really unusable, opening the
            # database will fail right after with a clearer error.
            self.logger.warning(
                "Could not create database directory %s: %s", db_dir, exc
            )

    def _create_schema(self, cursor):
        """Create all tables and indexes that do not exist yet."""
        # 1. Clients Table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS clients (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                common_name TEXT UNIQUE NOT NULL,
                real_address TEXT,
                status TEXT DEFAULT 'Active',
                total_bytes_received INTEGER DEFAULT 0,
                total_bytes_sent INTEGER DEFAULT 0,
                last_bytes_received INTEGER DEFAULT 0,
                last_bytes_sent INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_activity TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # 2. Raw Usage History
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS usage_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                client_id INTEGER,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                bytes_received INTEGER,
                bytes_sent INTEGER,
                bytes_received_rate_mbps REAL,
                bytes_sent_rate_mbps REAL,
                FOREIGN KEY (client_id) REFERENCES clients (id)
            )
        ''')
        cursor.execute(
            'CREATE INDEX IF NOT EXISTS idx_usage_ts ON usage_history(timestamp)'
        )

        # 2.1 Active Sessions (Temporary state table)
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS active_sessions (
                client_id INTEGER,
                common_name TEXT,
                real_address TEXT,
                bytes_received INTEGER,
                bytes_sent INTEGER,
                connected_since TIMESTAMP,
                last_seen TIMESTAMP,
                FOREIGN KEY (client_id) REFERENCES clients (id)
            )
        ''')

        # 2.2 Users and Auth
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                totp_secret TEXT,
                is_2fa_enabled INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # 2.3 Login Attempts (Brute-force protection)
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS login_attempts (
                ip_address TEXT PRIMARY KEY,
                attempts INTEGER DEFAULT 0,
                last_attempt TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # 3. Aggregated Stats Tables
        # Table names come from the fixed class constant, never from user
        # input, so interpolating them into the SQL text is safe.
        for table in self._STATS_TABLES:
            cursor.execute(f'''
                CREATE TABLE IF NOT EXISTS {table} (
                    timestamp TEXT NOT NULL,
                    client_id INTEGER NOT NULL,
                    bytes_received INTEGER DEFAULT 0,
                    bytes_sent INTEGER DEFAULT 0,
                    PRIMARY KEY (timestamp, client_id),
                    FOREIGN KEY (client_id) REFERENCES clients (id)
                )
            ''')
            cursor.execute(
                f'CREATE INDEX IF NOT EXISTS idx_{table}_ts ON {table}(timestamp)'
            )

    def _apply_migrations(self, cursor):
        """Add columns that older versions of existing tables may lack."""
        # 4. Migrations (Ensure columns in existing tables)
        # The users table is guaranteed to exist here (created above if it
        # was missing); an older copy may predate the 2FA columns.
        self._ensure_column(cursor, 'users', 'totp_secret', 'TEXT')
        self._ensure_column(
            cursor, 'users', 'is_2fa_enabled', 'INTEGER DEFAULT 0'
        )

    @staticmethod
    def _column_exists(cursor, table, column):
        """Return True if *table* already has a column named *column*."""
        # SQL identifiers cannot be bound as parameters; callers pass only
        # hard-coded table names.
        cursor.execute(f"PRAGMA table_info({table})")
        # Each PRAGMA table_info row is (cid, name, type, ...).
        return any(row[1] == column for row in cursor.fetchall())

    def _ensure_column(self, cursor, table, column, type_def):
        """Add *column* (declared as *type_def*) to *table* if missing."""
        if self._column_exists(cursor, table, column):
            return
        self.logger.info(
            "Adding missing column %s to table %s", column, table
        )
        cursor.execute(f"ALTER TABLE {table} ADD COLUMN {column} {type_def}")