import sqlite3 import configparser from datetime import datetime, timedelta, timezone from flask import Flask, jsonify, request, send_file from flask_cors import CORS import logging import subprocess import os from pathlib import Path import re import jwt import pyotp import bcrypt from functools import wraps from db import DatabaseManager import io # Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) app = Flask(__name__) # Enable CORS for all routes with specific headers support CORS(app, resources={r"/api/*": {"origins": "*"}}, supports_credentials=True) class OpenVPNAPI: def __init__(self, config_file='config.ini'): self.db_manager = DatabaseManager(config_file) self.db_manager.init_database() self.config = configparser.ConfigParser() self.config.read(config_file) # Paths self.certificates_path = self.config.get('certificates', 'certificates_path', fallback='/etc/openvpn/certs') self.easyrsa_path = self.config.get('pki', 'easyrsa_path', fallback='/etc/openvpn/easy-rsa') self.pki_path = self.config.get('pki', 'pki_path', fallback='/etc/openvpn/pki') # Fixed default to match Settings self.templates_path = self.config.get('api', 'templates_path', fallback='templates') self.server_config_dir = self.config.get('server', 'config_dir', fallback='/etc/openvpn') self.server_config_path = self.config.get('server', 'config_path', fallback=os.path.join(self.server_config_dir, 'server.conf')) # Specific file self.public_ip = self.config.get('openvpn_monitor', 'public_ip', fallback='') self.cert_extensions = self.config.get('certificates', 'certificate_extensions', fallback='crt,pem,key').split(',') self._cert_cache = {} # Security # Priority 1: Environment Variable # Priority 2: Config file self.secret_key = os.getenv('OVPMON_SECRET_KEY') or self.config.get('api', 'secret_key', fallback='ovpmon-secret-change-me') app.config['SECRET_KEY'] = self.secret_key # Ensure at least one user exists self.ensure_default_admin() # Managers def get_db_connection(self): """Get a database connection""" return self.db_manager.get_connection() def ensure_default_admin(self): """Create a default admin user if no users exist""" conn = self.get_db_connection() cursor = conn.cursor() try: cursor.execute("SELECT COUNT(*) FROM users") if cursor.fetchone()[0] == 0: # Default: admin / password password_hash = bcrypt.hashpw('password'.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') cursor.execute("INSERT INTO users (username, password_hash) VALUES (?, ?)", ('admin', password_hash)) conn.commit() logger.info("Default admin user created (admin/password)") except Exception as e: logger.error(f"Error ensuring default admin: {e}") finally: conn.close() def check_rate_limit(self, ip): """Verify login attempts for an IP (max 5 attempts, 15m lockout)""" conn = self.get_db_connection() cursor = conn.cursor() try: cursor.execute("SELECT attempts, last_attempt FROM login_attempts WHERE ip_address = ?", (ip,)) row = cursor.fetchone() if row: attempts, last_attempt = row last_dt = datetime.strptime(last_attempt, '%Y-%m-%d %H:%M:%S') if attempts >= 5 and datetime.utcnow() - last_dt < timedelta(minutes=15): return False # If lockout expired, reset if datetime.utcnow() - last_dt >= timedelta(minutes=15): cursor.execute("UPDATE login_attempts SET attempts = 0 WHERE ip_address = ?", (ip,)) conn.commit() return True except Exception as e: logger.error(f"Rate limit error: {e}") return True # Allow login if rate limiting fails finally: conn.close() def record_login_attempt(self, ip, success): """Update login attempts for an IP""" conn = self.get_db_connection() cursor = conn.cursor() try: if success: cursor.execute("DELETE FROM login_attempts WHERE ip_address = ?", (ip,)) else: cursor.execute(''' INSERT INTO login_attempts (ip_address, attempts, last_attempt) VALUES (?, 1, datetime('now')) ON CONFLICT(ip_address) DO UPDATE SET attempts = attempts + 1, last_attempt = datetime('now') ''', (ip,)) conn.commit() except Exception as e: logger.error(f"Error recording login attempt: {e}") finally: conn.close() # --- БЛОК РАБОТЫ С СЕРТИФИКАТАМИ (Оставлен без изменений) --- def parse_openssl_date(self, date_str): try: parts = date_str.split() if len(parts[1]) == 1: parts[1] = f' {parts[1]}' normalized_date = ' '.join(parts) return datetime.strptime(normalized_date, '%b %d %H:%M:%S %Y GMT') except ValueError: try: return datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z') except ValueError: logger.warning(f"Could not parse date: {date_str}") return datetime.min def calculate_days_remaining(self, not_after_str): if not_after_str == 'N/A': return 'N/A' try: expiration_date = self.parse_openssl_date(not_after_str) if expiration_date == datetime.min: return 'N/A' days_remaining = (expiration_date - datetime.now()).days if days_remaining < 0: return f"Expired ({abs(days_remaining)} days ago)" else: return f"{days_remaining} days" except Exception: return 'N/A' def extract_cert_info(self, cert_file): try: result = subprocess.run(['openssl', 'x509', '-in', cert_file, '-noout', '-text'], capture_output=True, text=True, check=True) output = result.stdout data = {'file': os.path.basename(cert_file), 'file_path': cert_file, 'subject': 'N/A', 'issuer': 'N/A', 'not_after': 'N/A', 'not_before': 'N/A', 'serial': 'N/A', 'type': 'Unknown'} is_ca = False extended_usage = "" for line in output.split('\n'): line = line.strip() if line.startswith('Subject:'): data['subject'] = line.split('Subject:', 1)[1].strip() cn_match = re.search(r'CN\s*=\s*([^,]+)', data['subject']) if cn_match: data['common_name'] = cn_match.group(1).strip() elif 'Not After' in line: data['not_after'] = line.split(':', 1)[1].strip() elif 'Not Before' in line: data['not_before'] = line.split(':', 1)[1].strip() elif 'Serial Number:' in line: data['serial'] = line.split(':', 1)[1].strip() elif 'CA:TRUE' in line: is_ca = True elif 'TLS Web Server Authentication' in line: extended_usage += "Server " elif 'TLS Web Client Authentication' in line: extended_usage += "Client " # Determine Type if is_ca: data['type'] = 'CA' elif 'Server' in extended_usage: data['type'] = 'Server' elif 'Client' in extended_usage: data['type'] = 'Client' elif 'server' in data.get('common_name', '').lower(): data['type'] = 'Server' else: data['type'] = 'Client' # Default to client if ambiguous if data['not_after'] != 'N/A': data['sort_date'] = self.parse_openssl_date(data['not_after']).isoformat() else: data['sort_date'] = datetime.min.isoformat() # Parse dates for UI if data['not_after'] != 'N/A': dt = self.parse_openssl_date(data['not_after']) data['expires_iso'] = dt.isoformat() if data['not_before'] != 'N/A': dt = self.parse_openssl_date(data['not_before']) data['issued_iso'] = dt.isoformat() data['days_remaining'] = self.calculate_days_remaining(data['not_after']) data['is_expired'] = 'Expired' in data['days_remaining'] # State for UI if data['is_expired']: data['state'] = 'Expired' else: data['state'] = 'Valid' return data except Exception as e: logger.error(f"Error processing {cert_file}: {e}") return None def get_certificates_info(self): cert_path = Path(self.certificates_path) if not cert_path.exists(): return [] cert_files = [] for ext in self.cert_extensions: cert_files.extend(cert_path.rglob(f'*.{ext.strip()}')) current_valid_files = set() cert_data = [] for cert_file_path in cert_files: cert_file = str(cert_file_path) current_valid_files.add(cert_file) try: mtime = os.path.getmtime(cert_file) # Check cache cached = self._cert_cache.get(cert_file) if cached and cached['mtime'] == mtime: cert_data.append(cached['data']) else: # Parse and update cache parsed_data = self.extract_cert_info(cert_file) if parsed_data: self._cert_cache[cert_file] = { 'mtime': mtime, 'data': parsed_data } cert_data.append(parsed_data) except OSError: continue # Prune cache for deleted files for cached_file in list(self._cert_cache.keys()): if cached_file not in current_valid_files: del self._cert_cache[cached_file] return cert_data # ----------------------------------------------------------- def get_current_stats(self): """Get current statistics for all clients""" conn = self.get_db_connection() cursor = conn.cursor() try: # ИЗМЕНЕНИЕ: # Вместо "ORDER BY timestamp DESC LIMIT 1" (мгновенное значение), # мы берем "MAX(rate)" за последние 2 минуты. # Это фильтрует "нули", возникающие из-за рассинхрона записи логов, # и показывает реальную пропускную способность канала. cursor.execute(''' SELECT c.common_name, c.real_address, c.status, CASE WHEN c.status = 'Active' THEN 'N/A' ELSE strftime('%Y-%m-%d %H:%M:%S', c.last_activity) END as last_activity, c.total_bytes_received, c.total_bytes_sent, -- Пиковая скорость Download за последние 2 минуты (SELECT MAX(uh.bytes_received_rate_mbps) FROM usage_history uh WHERE uh.client_id = c.id AND uh.timestamp >= datetime('now', '-30 seconds')) as current_recv_rate, -- Пиковая скорость Upload за последние 2 минуты (SELECT MAX(uh.bytes_sent_rate_mbps) FROM usage_history uh WHERE uh.client_id = c.id AND uh.timestamp >= datetime('now', '-30 seconds')) as current_sent_rate, strftime('%Y-%m-%d %H:%M:%S', c.updated_at) as last_updated FROM clients c ORDER BY c.status DESC, c.common_name ''') columns = [column[0] for column in cursor.description] data = [] for row in cursor.fetchall(): data.append(dict(zip(columns, row))) return data except Exception as e: logger.error(f"Error fetching data: {e}") return [] finally: conn.close() def get_client_history(self, common_name, start_date=None, end_date=None, resolution='auto'): """ Получение истории с поддержкой агрегации (TSDB). Автоматически выбирает таблицу (Raw, Hourly, Daily) в зависимости от периода. """ conn = self.get_db_connection() cursor = conn.cursor() # 1. Установка временных рамок if not end_date: end_date = datetime.utcnow() if not start_date: start_date = end_date - timedelta(hours=24) # Дефолт - сутки # Убедимся, что даты - это объекты datetime if isinstance(start_date, str): try: start_date = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S') except: pass if isinstance(end_date, str): try: end_date = datetime.strptime(end_date, '%Y-%m-%d %H:%M:%S') except: pass duration_hours = (end_date - start_date).total_seconds() / 3600 # 2. Маппинг разрешений на таблицы table_map = { 'raw': 'usage_history', '5min': 'stats_5min', '15min': 'stats_15min', 'hourly': 'stats_hourly', '6h': 'stats_6h', 'daily': 'stats_daily' } target_table = 'usage_history' # 3. Логика выбора таблицы if resolution == 'auto': if duration_hours <= 24: target_table = 'usage_history' # Сырые данные (график за день) elif duration_hours <= 168: # до 7 дней target_table = 'stats_hourly' # По часам elif duration_hours <= 2160: # до 3 месяцев target_table = 'stats_6h' # Каждые 6 часов else: target_table = 'stats_daily' # По дням elif resolution in table_map: target_table = table_map[resolution] # Проверка существования таблицы (fallback, если миграции не было) try: cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{target_table}'") if not cursor.fetchone(): logger.warning(f"Table {target_table} missing, fallback to usage_history") target_table = 'usage_history' except: pass try: # 4. Request Formation is_aggregated = target_table != 'usage_history' # High resolution handling for 1h and 3h ranges is_high_res = False interval = 0 points_count = 0 if target_table == 'usage_history': if duration_hours <= 1.1: is_high_res = True interval = 30 points_count = 120 elif duration_hours <= 3.1: is_high_res = True interval = 60 points_count = 180 elif duration_hours <= 6.1: is_high_res = True interval = 120 # 2 minutes points_count = 180 elif duration_hours <= 12.1: is_high_res = True interval = 300 # 5 minutes points_count = 144 elif duration_hours <= 24.1: is_high_res = True interval = 900 # 15 minutes points_count = 96 if is_high_res: query = f''' SELECT datetime((strftime('%s', uh.timestamp) / {interval}) * {interval}, 'unixepoch') as timestamp, SUM(uh.bytes_received) as bytes_received, SUM(uh.bytes_sent) as bytes_sent, MAX(uh.bytes_received_rate_mbps) as bytes_received_rate_mbps, MAX(uh.bytes_sent_rate_mbps) as bytes_sent_rate_mbps FROM usage_history uh JOIN clients c ON uh.client_id = c.id WHERE c.common_name = ? AND uh.timestamp BETWEEN ? AND ? GROUP BY datetime((strftime('%s', uh.timestamp) / {interval}) * {interval}, 'unixepoch') ORDER BY timestamp ASC ''' elif is_aggregated: query = f''' SELECT t.timestamp, t.bytes_received, t.bytes_sent, 0 as bytes_received_rate_mbps, 0 as bytes_sent_rate_mbps FROM {target_table} t JOIN clients c ON t.client_id = c.id WHERE c.common_name = ? AND t.timestamp BETWEEN ? AND ? ORDER BY t.timestamp ASC ''' else: query = f''' SELECT uh.timestamp, uh.bytes_received, uh.bytes_sent, uh.bytes_received_rate_mbps, uh.bytes_sent_rate_mbps FROM usage_history uh JOIN clients c ON uh.client_id = c.id WHERE c.common_name = ? AND uh.timestamp BETWEEN ? AND ? ORDER BY uh.timestamp ASC ''' s_str = start_date.strftime('%Y-%m-%d %H:%M:%S') e_str = end_date.strftime('%Y-%m-%d %H:%M:%S') cursor.execute(query, (common_name, s_str, e_str)) columns = [column[0] for column in cursor.description] db_data_list = [dict(zip(columns, row)) for row in cursor.fetchall()] final_data = db_data_list if is_high_res: # Zero-filling final_data = [] db_data_map = {row['timestamp']: row for row in db_data_list} # Align to nearest interval ts_end = end_date.timestamp() ts_aligned = ts_end - (ts_end % interval) aligned_end = datetime.utcfromtimestamp(ts_aligned) # Generate points start_generated = aligned_end - timedelta(seconds=points_count * interval) current = start_generated for _ in range(points_count): current += timedelta(seconds=interval) ts_str = current.strftime('%Y-%m-%d %H:%M:%S') ts_iso = ts_str.replace(' ', 'T') + 'Z' if ts_str in db_data_map: item = db_data_map[ts_str].copy() item['timestamp'] = ts_iso final_data.append(item) else: final_data.append({ 'timestamp': ts_iso, 'bytes_received': 0, 'bytes_sent': 0, 'bytes_received_rate_mbps': 0, 'bytes_sent_rate_mbps': 0 }) else: for item in final_data: if 'timestamp' in item and isinstance(item['timestamp'], str): item['timestamp'] = item['timestamp'].replace(' ', 'T') + 'Z' return { 'data': final_data, 'meta': { 'resolution_used': target_table + ('_hires' if is_high_res else ''), 'record_count': len(final_data), 'start': s_str, 'end': e_str } } except Exception as e: logger.error(f"Error fetching history: {e}") return {'data': [], 'error': str(e)} finally: conn.close() def get_system_stats(self): """Общая статистика по системе""" conn = self.get_db_connection() cursor = conn.cursor() try: cursor.execute(''' SELECT COUNT(*) as total_clients, SUM(CASE WHEN status = 'Active' THEN 1 ELSE 0 END) as active_clients, COALESCE(SUM(total_bytes_received), 0) as total_bytes_received, COALESCE(SUM(total_bytes_sent), 0) as total_bytes_sent FROM clients ''') result = cursor.fetchone() columns = [column[0] for column in cursor.description] if result: stats = dict(zip(columns, result)) # Добавляем человекочитаемые форматы stats['total_received_gb'] = round(stats['total_bytes_received'] / (1024**3), 2) stats['total_sent_gb'] = round(stats['total_bytes_sent'] / (1024**3), 2) return stats return {} except Exception as e: logger.error(f"Error system stats: {e}") return {} finally: conn.close() def get_analytics_data(self, range_arg='24h'): """ Get aggregated analytics with dynamic resolution. range_arg: '24h', '7d', '30d' """ conn = self.get_db_connection() cursor = conn.cursor() analytics = { 'max_concurrent_24h': 0, 'top_clients_24h': [], 'global_history_24h': [], 'traffic_distribution': {'rx': 0, 'tx': 0} } # 1. Configuration hours = 24 interval_seconds = 900 # 15 min default target_table = 'usage_history' if range_arg == '7d': hours = 168 interval_seconds = 6300 # 105 min -> 96 points target_table = 'stats_hourly' elif range_arg == '30d': hours = 720 interval_seconds = 27000 # 450 min -> 96 points target_table = 'stats_hourly' # Fallback to hourly/raw as needed # Fallback logic for table existence try: cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{target_table}'") if not cursor.fetchone(): target_table = 'usage_history' # Fallback to raw if aggregated missing except: target_table = 'usage_history' try: # 2. Global History (Chart) if target_table == 'usage_history': rate_cols = "SUM(bytes_received_rate_mbps) as total_rx_rate, SUM(bytes_sent_rate_mbps) as total_tx_rate," else: rate_cols = "0 as total_rx_rate, 0 as total_tx_rate," # Aggregation Query # Group by interval_seconds query_hist = f''' SELECT datetime((strftime('%s', timestamp) / {interval_seconds}) * {interval_seconds}, 'unixepoch') as timestamp, SUM(total_rx) as total_rx, SUM(total_tx) as total_tx, MAX(total_rx_rate) as total_rx_rate, MAX(total_tx_rate) as total_tx_rate, MAX(active_count) as active_count FROM ( SELECT timestamp, SUM(bytes_received) as total_rx, SUM(bytes_sent) as total_tx, {rate_cols} COUNT(DISTINCT client_id) as active_count FROM {target_table} WHERE timestamp >= datetime('now', '-{hours} hours') GROUP BY timestamp ) sub GROUP BY datetime((strftime('%s', timestamp) / {interval_seconds}) * {interval_seconds}, 'unixepoch') ORDER BY timestamp ASC ''' cursor.execute(query_hist) rows = cursor.fetchall() columns = [col[0] for col in cursor.description] db_data = {row[0]: dict(zip(columns, row)) for row in rows} # Post-processing: Zero Fill analytics['global_history_24h'] = [] now = datetime.now(timezone.utc) # Round down to nearest interval ts_now = now.timestamp() ts_aligned = ts_now - (ts_now % interval_seconds) now_aligned = datetime.utcfromtimestamp(ts_aligned) # We want exactly 96 points ending at now_aligned # Start time = now_aligned - (96 * interval) start_time = now_aligned - timedelta(seconds=96 * interval_seconds) current = start_time # Generate exactly 96 points for _ in range(96): current += timedelta(seconds=interval_seconds) ts_str = current.strftime('%Y-%m-%d %H:%M:%S') ts_iso = ts_str.replace(' ', 'T') + 'Z' if ts_str in db_data: item = db_data[ts_str].copy() item['timestamp'] = ts_iso analytics['global_history_24h'].append(item) else: analytics['global_history_24h'].append({ 'timestamp': ts_iso, 'total_rx': 0, 'total_tx': 0, 'total_rx_rate': 0, 'total_tx_rate': 0, 'active_count': 0 }) # Max Clients metric max_clients = 0 for row in analytics['global_history_24h']: if row.get('active_count', 0) > max_clients: max_clients = row['active_count'] analytics['max_concurrent_24h'] = max_clients # 3. Top Clients & 4. Traffic Distribution (Keep existing logic) # Use same target table query_top = f''' SELECT c.common_name, SUM(t.bytes_received) as rx, SUM(t.bytes_sent) as tx, (SUM(t.bytes_received) + SUM(t.bytes_sent)) as total_traffic FROM {target_table} t JOIN clients c ON t.client_id = c.id WHERE t.timestamp >= datetime('now', '-{hours} hours') GROUP BY c.id ORDER BY total_traffic DESC LIMIT 10 ''' cursor.execute(query_top) top_cols = [col[0] for col in cursor.description] analytics['top_clients_24h'] = [dict(zip(top_cols, row)) for row in cursor.fetchall()] query_dist = f''' SELECT SUM(bytes_received) as rx, SUM(bytes_sent) as tx FROM {target_table} WHERE timestamp >= datetime('now', '-{hours} hours') ''' cursor.execute(query_dist) dist_res = cursor.fetchone() if dist_res: analytics['traffic_distribution'] = {'rx': dist_res[0] or 0, 'tx': dist_res[1] or 0} return analytics except Exception as e: logger.error(f"Analytics error: {e}") return analytics finally: conn.close() def get_active_sessions(self): """Get list of currently active sessions from temporary table""" conn = self.get_db_connection() conn.row_factory = sqlite3.Row cursor = conn.cursor() try: # Check if table exists first (graceful degradation) cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='active_sessions'") if not cursor.fetchone(): return [] cursor.execute(''' SELECT client_id, common_name, real_address, bytes_received, bytes_sent, connected_since, last_seen FROM active_sessions ORDER BY connected_since DESC ''') rows = cursor.fetchall() result = [] for row in rows: result.append({ 'client_id': row['client_id'], 'common_name': row['common_name'], 'real_address': row['real_address'], 'bytes_received': row['bytes_received'], 'bytes_sent': row['bytes_sent'], 'connected_since': row['connected_since'], 'last_seen': row['last_seen'], # Calculated fields for convenience 'received_mb': round((row['bytes_received'] or 0) / (1024*1024), 2), 'sent_mb': round((row['bytes_sent'] or 0) / (1024*1024), 2) }) return result except Exception as e: logger.error(f"Error fetching active sessions: {e}") return [] finally: conn.close() # Initialize API instance api = OpenVPNAPI() # --- SECURITY DECORATORS --- def token_required(f): @wraps(f) def decorated(*args, **kwargs): token = None if 'Authorization' in request.headers: auth_header = request.headers['Authorization'] if auth_header.startswith('Bearer '): token = auth_header.split(' ')[1] if not token: return jsonify({'success': False, 'error': 'Token is missing'}), 401 try: data = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"]) # In a real app, you might want to verify user still exists in DB except jwt.ExpiredSignatureError: return jsonify({'success': False, 'error': 'Token has expired'}), 401 except Exception: return jsonify({'success': False, 'error': 'Token is invalid'}), 401 return f(*args, **kwargs) return decorated # --- AUTH ROUTES --- @app.route('/api/auth/login', methods=['POST']) def login(): data = request.get_json() if not data or not data.get('username') or not data.get('password'): return jsonify({'success': False, 'error': 'Missing credentials'}), 400 ip = request.remote_addr if not api.check_rate_limit(ip): return jsonify({'success': False, 'error': 'Too many login attempts. Try again in 15 minutes.'}), 429 conn = api.get_db_connection() cursor = conn.cursor() try: cursor.execute("SELECT id, password_hash, totp_secret, is_2fa_enabled FROM users WHERE username = ?", (data['username'],)) user = cursor.fetchone() if user and bcrypt.checkpw(data['password'].encode('utf-8'), user[1].encode('utf-8')): api.record_login_attempt(ip, True) # If 2FA enabled, don't issue final token yet if user[3]: # is_2fa_enabled # Issue a short-lived temporary token for 2FA verification temp_token = jwt.encode({ 'user_id': user[0], 'is_2fa_pending': True, 'exp': datetime.utcnow() + timedelta(minutes=5) }, app.config['SECRET_KEY'], algorithm="HS256") return jsonify({'success': True, 'requires_2fa': True, 'temp_token': temp_token}) # Standard login token = jwt.encode({ 'user_id': user[0], 'exp': datetime.utcnow() + timedelta(hours=8) }, app.config['SECRET_KEY'], algorithm="HS256") return jsonify({'success': True, 'token': token, 'username': data['username']}) api.record_login_attempt(ip, False) return jsonify({'success': False, 'error': 'Invalid username or password'}), 401 except Exception as e: logger.error(f"Login error: {e}") return jsonify({'success': False, 'error': 'Internal server error'}), 500 finally: conn.close() @app.route('/api/auth/verify-2fa', methods=['POST']) def verify_2fa(): data = request.get_json() token = data.get('temp_token') otp = data.get('otp') if not token or not otp: return jsonify({'success': False, 'error': 'Missing data'}), 400 try: decoded = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"]) if not decoded.get('is_2fa_pending'): raise ValueError("Invalid token type") user_id = decoded['user_id'] conn = api.get_db_connection() cursor = conn.cursor() cursor.execute("SELECT username, totp_secret FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() conn.close() if not user: return jsonify({'success': False, 'error': 'User not found'}), 404 totp = pyotp.TOTP(user[1]) if totp.verify(otp): final_token = jwt.encode({ 'user_id': user_id, 'exp': datetime.utcnow() + timedelta(hours=8) }, app.config['SECRET_KEY'], algorithm="HS256") return jsonify({'success': True, 'token': final_token, 'username': user[0]}) return jsonify({'success': False, 'error': 'Invalid 2FA code'}), 401 except Exception as e: return jsonify({'success': False, 'error': 'Session expired or invalid'}), 401 @app.route('/api/auth/setup-2fa', methods=['POST']) @token_required def setup_2fa(): # This route is called to generate a new secret token = request.headers['Authorization'].split(' ')[1] decoded = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"]) user_id = decoded['user_id'] secret = pyotp.random_base32() totp = pyotp.TOTP(secret) provisioning_uri = totp.provisioning_uri(name="admin", issuer_name="OpenVPN-Monitor") return jsonify({ 'success': True, 'secret': secret, 'uri': provisioning_uri }) @app.route('/api/auth/enable-2fa', methods=['POST']) @token_required def enable_2fa(): try: data = request.get_json() secret = data.get('secret') otp = data.get('otp') if not secret or not otp: return jsonify({'success': False, 'error': 'Missing data'}), 400 logger.info(f"Attempting 2FA activation. User OTP: {otp}, Secret: {secret}") totp = pyotp.TOTP(secret) # Adding valid_window=1 to allow ±30 seconds clock drift if totp.verify(otp, valid_window=1): token = request.headers['Authorization'].split(' ')[1] decoded = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"]) user_id = decoded['user_id'] conn = api.get_db_connection() cursor = conn.cursor() cursor.execute("UPDATE users SET totp_secret = ?, is_2fa_enabled = 1 WHERE id = ?", (secret, user_id)) conn.commit() conn.close() logger.info(f"2FA enabled successfully for user ID: {user_id}") return jsonify({'success': True, 'message': '2FA enabled successfully'}) # TIME DRIFT DIAGNOSTIC current_utc = datetime.now(timezone.utc) logger.warning(f"Invalid 2FA code provided. Server time (UTC): {current_utc.strftime('%Y-%m-%d %H:%M:%S')}") # Check if code matches a different hour (Common timezone issue) found_drift = False for h in range(-12, 13): if h == 0: continue if totp.verify(otp, for_time=(current_utc + timedelta(hours=h))): logger.error(f"CRITICAL TIME MISMATCH: The provided OTP matches server time WITH A {h} HOUR OFFSET. " f"Please ensure the phone is set to 'Automatic Date and Time' and the correct Timezone.") found_drift = True break if not found_drift: logger.info("No simple hour-offset matches found. The code might be for a different secret or time is completely desynced.") return jsonify({'success': False, 'error': 'Invalid 2FA code. Check your phone clock synchronization.'}), 400 except Exception as e: logger.error(f"Error in enable_2fa: {e}") return jsonify({'success': False, 'error': 'Internal server error'}), 500 @app.route('/api/auth/disable-2fa', methods=['POST']) @token_required def disable_2fa(): try: token = request.headers['Authorization'].split(' ')[1] decoded = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"]) user_id = decoded['user_id'] conn = api.get_db_connection() cursor = conn.cursor() cursor.execute("UPDATE users SET totp_secret = NULL, is_2fa_enabled = 0 WHERE id = ?", (user_id,)) conn.commit() conn.close() logger.info(f"2FA disabled for user ID: {user_id}") return jsonify({'success': True, 'message': '2FA disabled successfully'}) except Exception as e: logger.error(f"Error in disable_2fa: {e}") return jsonify({'success': False, 'error': 'Internal server error'}), 500 @app.route('/api/auth/change-password', methods=['POST']) @token_required def change_password(): data = request.get_json() current_password = data.get('current_password') new_password = data.get('new_password') if not current_password or not new_password: return jsonify({'success': False, 'error': 'Missing password data'}), 400 token = request.headers['Authorization'].split(' ')[1] decoded = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"]) user_id = decoded['user_id'] conn = api.get_db_connection() cursor = conn.cursor() try: cursor.execute("SELECT password_hash FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or not bcrypt.checkpw(current_password.encode('utf-8'), user[0].encode('utf-8')): return jsonify({'success': False, 'error': 'Invalid current password'}), 401 new_hash = bcrypt.hashpw(new_password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') cursor.execute("UPDATE users SET password_hash = ? WHERE id = ?", (new_hash, user_id)) conn.commit() logger.info(f"Password changed for user ID: {user_id}") return jsonify({'success': True, 'message': 'Password changed successfully'}) except Exception as e: logger.error(f"Error changing password: {e}") return jsonify({'success': False, 'error': 'Internal server error'}), 500 finally: conn.close() # --- USER ROUTES --- @app.route('/api/v1/user/me', methods=['GET']) @token_required def get_me(): try: token = request.headers['Authorization'].split(' ')[1] decoded = jwt.decode(token, app.config['SECRET_KEY'], algorithms=["HS256"]) user_id = decoded['user_id'] conn = api.get_db_connection() cursor = conn.cursor() cursor.execute("SELECT username, is_2fa_enabled FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() conn.close() if not user: return jsonify({'success': False, 'error': 'User not found'}), 404 return jsonify({ 'success': True, 'username': user[0], 'is_2fa_enabled': bool(user[1]) }) except Exception as e: logger.error(f"Error in get_me: {e}") return jsonify({'success': False, 'error': 'Internal server error'}), 500 # --- PROTECTED ROUTES --- @app.route('/api/v1/stats', methods=['GET']) @token_required def get_stats(): """Get current statistics for all clients""" try: data = api.get_current_stats() # Форматирование данных formatted_data = [] for client in data: client['total_received_mb'] = round((client['total_bytes_received'] or 0) / (1024*1024), 2) client['total_sent_mb'] = round((client['total_bytes_sent'] or 0) / (1024*1024), 2) client['current_recv_rate_mbps'] = client['current_recv_rate'] or 0 client['current_sent_rate_mbps'] = client['current_sent_rate'] or 0 formatted_data.append(client) return jsonify({ 'success': True, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'data': formatted_data, 'count': len(formatted_data) }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/v1/stats/system', methods=['GET']) @token_required def get_system_stats(): """Get system-wide statistics""" try: stats = api.get_system_stats() return jsonify({ 'success': True, 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'data': stats }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/v1/stats/', methods=['GET']) @token_required def get_client_stats(common_name): """ Get detailed stats for a client. Query Params: - range: '24h' (default), '7d', '30d', '1y' OR custom dates - resolution: 'auto' (default), 'raw', '5min', 'hourly', 'daily' """ try: # Чтение параметров запроса range_arg = request.args.get('range', default='24h') resolution = request.args.get('resolution', default='auto') # --- ИСПРАВЛЕНИЕ ТУТ --- # Используем UTC, так как SQLite хранит данные в UTC end_date = datetime.now(timezone.utc) start_date = end_date - timedelta(hours=24) # Парсинг диапазона if range_arg.endswith('h'): start_date = end_date - timedelta(hours=int(range_arg[:-1])) elif range_arg.endswith('d'): start_date = end_date - timedelta(days=int(range_arg[:-1])) elif range_arg.endswith('y'): start_date = end_date - timedelta(days=int(range_arg[:-1]) * 365) # Получаем текущее состояние all_stats = api.get_current_stats() client_data = next((c for c in all_stats if c['common_name'] == common_name), None) if not client_data: return jsonify({'success': False, 'error': 'Client not found'}), 404 # Получаем исторические данные history_result = api.get_client_history( common_name, start_date=start_date, end_date=end_date, resolution=resolution ) response = { 'common_name': client_data['common_name'], 'real_address': client_data['real_address'], 'status': client_data['status'], 'totals': { 'received_mb': round((client_data['total_bytes_received'] or 0) / (1024*1024), 2), 'sent_mb': round((client_data['total_bytes_sent'] or 0) / (1024*1024), 2) }, 'current_rates': { 'recv_mbps': client_data['current_recv_rate'] or 0, 'sent_mbps': client_data['current_sent_rate'] or 0 }, 'last_activity': client_data['last_activity'], 'history': history_result.get('data', []), 'meta': history_result.get('meta', {}) } # Для timestamp ответа API лучше тоже использовать UTC или явно указывать смещение, # но для совместимости с JS new Date() UTC строка идеальна. return jsonify({ 'success': True, 'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), 'data': response }) except Exception as e: logger.error(f"API Error: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/v1/certificates', methods=['GET']) @token_required def get_certificates(): try: data = api.get_certificates_info() return jsonify({'success': True, 'data': data}) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/v1/clients', methods=['GET']) @token_required def get_clients_list(): try: data = api.get_current_stats() simple_list = [{'common_name': c['common_name'], 'status': c['status']} for c in data] return jsonify({'success': True, 'data': simple_list}) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/v1/health', methods=['GET']) def health_check(): try: conn = api.get_db_connection() conn.close() return jsonify({'success': True, 'status': 'healthy'}) except Exception as e: return jsonify({'success': False, 'status': 'unhealthy', 'error': str(e)}), 500 @app.route('/api/v1/analytics', methods=['GET']) @token_required def get_analytics(): """Get dashboard analytics data""" try: range_arg = request.args.get('range', default='24h') # Маппинг для безопасности valid_ranges = {'24h': '24h', '7d': '7d', '30d': '30d'} selected_range = valid_ranges.get(range_arg, '24h') data = api.get_analytics_data(selected_range) return jsonify({ 'success': True, 'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), 'data': data, 'range': selected_range }) except Exception as e: logger.error(f"Error in analytics endpoint: {e}") return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/v1/sessions', methods=['GET']) @token_required def get_sessions(): """Get all currently active sessions (real-time)""" try: data = api.get_active_sessions() return jsonify({ 'success': True, 'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'), 'data': data, 'count': len(data) }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 500 if __name__ == "__main__": host = api.config.get('api', 'host', fallback='0.0.0.0') port = 5001 # Используем 5001, чтобы не конфликтовать, если что-то уже есть на 5000 debug = api.config.getboolean('api', 'debug', fallback=False) logger.info(f"Starting API on {host}:{port}") app.run(host=host, port=port, debug=debug)