import sqlite3 import time import os import configparser import logging from datetime import datetime, timedelta from db import DatabaseManager # --- КЛАСС АГРЕГАЦИИ ДАННЫХ (TSDB LOGIC) --- class TimeSeriesAggregator: def __init__(self, db_provider): self.db_provider = db_provider self.logger = logging.getLogger(__name__) def _upsert_bucket(self, cursor, table, timestamp, client_id, rx, tx): """ Вставляет или обновляет запись в таблицу агрегации. Использует ON CONFLICT для атомарного обновления счетчиков. """ cursor.execute(f''' INSERT INTO {table} (timestamp, client_id, bytes_received, bytes_sent) VALUES (?, ?, ?, ?) ON CONFLICT(timestamp, client_id) DO UPDATE SET bytes_received = bytes_received + excluded.bytes_received, bytes_sent = bytes_sent + excluded.bytes_sent ''', (timestamp, client_id, rx, tx)) def aggregate(self, client_updates): """ Распределяет инкременты трафика по временным слотам (5m, 15m, 1h, 6h, 1d). """ if not client_updates: return conn = self.db_provider() cursor = conn.cursor() now = datetime.utcnow() # --- РАСЧЕТ ВРЕМЕННЫХ КВАНТОВ --- # 1. Сутки (00:00:00) ts_1d = now.replace(hour=0, minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') # 2. 6 часов (00, 06, 12, 18) hour_6h = now.hour - (now.hour % 6) ts_6h = now.replace(hour=hour_6h, minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') # 3. 1 час (XX:00:00) ts_1h = now.replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') # 4. 15 минут (00, 15, 30, 45) min_15m = now.minute - (now.minute % 15) ts_15m = now.replace(minute=min_15m, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') # 5. 5 минут (00, 05, 10...) min_5m = now.minute - (now.minute % 5) ts_5m = now.replace(minute=min_5m, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') try: updates_count = 0 for client in client_updates: client_id = client.get('db_id') # Пропускаем, если ID не определен if client_id is None: continue rx = client.get('bytes_received_inc', 0) tx = client.get('bytes_sent_inc', 0) # Пропускаем, если нет трафика if rx == 0 and tx == 0: continue # Запись во все уровни агрегации self._upsert_bucket(cursor, 'stats_5min', ts_5m, client_id, rx, tx) self._upsert_bucket(cursor, 'stats_15min', ts_15m, client_id, rx, tx) self._upsert_bucket(cursor, 'stats_hourly', ts_1h, client_id, rx, tx) self._upsert_bucket(cursor, 'stats_6h', ts_6h, client_id, rx, tx) self._upsert_bucket(cursor, 'stats_daily', ts_1d, client_id, rx, tx) updates_count += 1 conn.commit() # Логируем только если были обновления if updates_count > 0: self.logger.debug(f"TS Aggregation: Updated buckets for {updates_count} clients") except Exception as e: self.logger.error(f"Error in TimeSeriesAggregator: {e}") conn.rollback() finally: conn.close() # --- ОСНОВНОЙ КЛАСС --- class OpenVPNDataGatherer: def __init__(self, config_file='config.ini'): self.config = self.load_config(config_file) self.setup_logging() self.last_check_time = None # Инициализируем дату последней очистки вчерашним днем для корректного старта self.last_cleanup_date = (datetime.now() - timedelta(days=1)).date() self.last_cleanup_date = (datetime.now() - timedelta(days=1)).date() self.db_manager = DatabaseManager(config_file) self.db_manager.init_database() # Инициализация модуля агрегации # Передаем ссылку на метод подключения к БД self.ts_aggregator = TimeSeriesAggregator(self.db_manager.get_connection) # In-Memory Cache для отслеживания сессий (CN, RealAddress) -> {last_bytes...} # Используется для корректного расчета инкрементов при множественных сессиях одного CN self.session_cache = {} def load_config(self, config_file): """Загрузка конфигурации или создание дефолтной со сложной структурой""" config = configparser.ConfigParser() # Полная структура конфига согласно требованиям defaults = { 'api': { 'host': '0.0.0.0', 'port': '5000', 'debug': 'false' }, 'openvpn_monitor': { 'log_path': '/var/log/openvpn/openvpn-status.log', 'db_path': 'openvpn_monitor.db', 'check_interval': '10', # Интервал 10 секунд }, 'logging': { 'level': 'INFO', 'log_file': 'openvpn_gatherer.log' }, 'retention': { 'raw_retention_days': '7', # 1 неделя 'agg_5m_retention_days': '14', # 2 недели 'agg_15m_retention_days': '28', # 4 недели 'agg_1h_retention_days': '90', # 3 месяца 'agg_6h_retention_days': '180', # 6 месяцев 'agg_1d_retention_days': '365' # 12 месяцев }, 'visualization': {}, 'certificates': {} } try: if os.path.exists(config_file): config.read(config_file) # Проверка: если каких-то новых секций нет в старом файле, добавляем их updated = False for section, options in defaults.items(): if not config.has_section(section): config.add_section(section) updated = True for key, val in options.items(): if not config.has_option(section, key): config.set(section, key, val) updated = True if updated: with open(config_file, 'w') as f: config.write(f) print(f"Updated configuration file: {config_file}") else: # Создаем файл с нуля for section, options in defaults.items(): config[section] = options with open(config_file, 'w') as f: config.write(f) print(f"Created default configuration file: {config_file}") except Exception as e: print(f"Error loading config: {e}") # Fallback в памяти for section, options in defaults.items(): if not config.has_section(section): config.add_section(section) for key, val in options.items(): config.set(section, key, val) return config def setup_logging(self): try: log_level = self.config.get('logging', 'level', fallback='INFO') log_file = self.config.get('logging', 'log_file', fallback='openvpn_gatherer.log') # Создаем директорию для логов если нужно log_dir = os.path.dirname(log_file) if log_dir and not os.path.exists(log_dir): os.makedirs(log_dir) logging.basicConfig( level=getattr(logging, log_level.upper()), format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) self.logger = logging.getLogger(__name__) except Exception as e: print(f"Logging setup failed: {e}") logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) def get_config_value(self, section, key, default=None): try: # Priority: ENV > Config File > Fallback # Format: OVPMON_SECTION_KEY (all uppercase, underscores for spaces/dashes) env_key = f"OVPMON_{section.upper()}_{key.upper()}".replace('-', '_').replace(' ', '_') env_val = os.getenv(env_key) if env_val is not None: return env_val return self.config.get(section, key, fallback=default) except: return default # get_db_connection and init_database removed def cleanup_old_data(self): """Очистка данных согласно retention policies в config.ini""" self.logger.info("Starting data cleanup procedure...") conn = self.db_manager.get_connection() cursor = conn.cursor() # Маппинг: Таблица -> Ключ конфига -> Дефолт (дни) retention_rules = [ ('usage_history', 'raw_retention_days', 7), ('stats_5min', 'agg_5m_retention_days', 14), ('stats_15min', 'agg_15m_retention_days', 28), ('stats_hourly', 'agg_1h_retention_days', 90), ('stats_6h', 'agg_6h_retention_days', 180), ('stats_daily', 'agg_1d_retention_days', 365), ] try: total_deleted = 0 for table, config_key, default_days in retention_rules: days = int(self.get_config_value('retention', config_key, default_days)) if days > 0: cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S') cursor.execute(f'DELETE FROM {table} WHERE timestamp < ?', (cutoff_date,)) deleted = cursor.rowcount if deleted > 0: self.logger.info(f"Cleaned {table}: removed {deleted} records older than {days} days") total_deleted += deleted conn.commit() if total_deleted == 0: self.logger.info("Cleanup finished: nothing to delete") except Exception as e: self.logger.error(f"Cleanup Error: {e}") conn.rollback() finally: conn.close() def parse_log_file(self): """ Парсинг лога версии 2 (CSV формат). Ожидает формат: CLIENT_LIST,Common Name,Real Address,...,Bytes Received,Bytes Sent,... """ log_path = self.get_config_value('openvpn_monitor', 'log_path', '/var/log/openvpn/openvpn-status.log') clients = [] try: if not os.path.exists(log_path): self.logger.warning(f"Log file not found: {log_path}") return clients with open(log_path, 'r') as file: for line in file: line = line.strip() # Фильтруем только строки с данными клиентов if not line.startswith('CLIENT_LIST'): continue parts = line.split(',') # V2 Index Map: # 1: Common Name # 2: Real Address # 5: Bytes Received # 6: Bytes Sent if len(parts) >= 8 and parts[1] != 'Common Name': # SKIPPING 'UNDEF' CLIENTS if parts[1].strip() == 'UNDEF': continue try: client = { 'common_name': parts[1].strip(), 'real_address': parts[2].strip(), 'bytes_received': int(parts[5].strip()), 'bytes_sent': int(parts[6].strip()), 'status': 'Active' } clients.append(client) except (ValueError, IndexError) as e: self.logger.warning(f"Error parsing client line: {e}") self.logger.debug(f"Parsed {len(clients)} active clients") except Exception as e: self.logger.error(f"Error parsing log file: {e}") return clients def update_client_status_and_bytes(self, active_clients): """ Обновление статусов и расчет инкрементов трафика. Использует In-Memory Cache (self.session_cache) для корректной обработки множественных сессий одного пользователя (Ping-Pong effect fix). """ conn = self.db_manager.get_connection() cursor = conn.cursor() try: # 1. Загружаем текущее состояние CNs из БД для обновления статусов cursor.execute('SELECT id, common_name, status, total_bytes_received, total_bytes_sent FROM clients') db_clients = {row[1]: {'id': row[0], 'status': row[2]} for row in cursor.fetchall()} # Структура для агрегации инкрементов по Common Name перед записью в БД # cn -> { 'inc_rx': 0, 'inc_tx': 0, 'curr_rx': 0, 'curr_tx': 0, 'real_address': '...'} cn_updates = {} # Множество активных ключей сессий (CN, RealAddr) для очистки кэша active_session_keys = set() active_cns = set() # 2. Обрабатываем каждую активную сессию for client in active_clients: name = client['common_name'] real_addr = client['real_address'] curr_recv = client['bytes_received'] curr_sent = client['bytes_sent'] # Уникальный ключ сессии session_key = (name, real_addr) active_session_keys.add(session_key) active_cns.add(name) # --- ЛОГИКА РАСЧЕТА ДЕЛЬТЫ (In-Memory) --- if session_key in self.session_cache: prev_state = self.session_cache[session_key] prev_recv = prev_state['bytes_received'] prev_sent = prev_state['bytes_sent'] # Расчет RX if curr_recv < prev_recv: # Рестарт сессии (счетчик сбросился) inc_recv = curr_recv self.logger.info(f"Session reset detected for {session_key} (Recv)") else: inc_recv = curr_recv - prev_recv # Расчет TX if curr_sent < prev_sent: inc_sent = curr_sent self.logger.info(f"Session reset detected for {session_key} (Sent)") else: inc_sent = curr_sent - prev_sent else: # Новая сессия (или после рестарта сервиса) # Если сервиса только запустился, мы не знаем предыдущего состояния. # Чтобы избежать спайков, считаем инкремент = 0 для первого появления, # если это похоже на продолжающуюся сессию (большие числа). # Если числа маленькие (<10MB), считаем как новую. # 10 MB threshold threshold = 10 * 1024 * 1024 if curr_recv < threshold and curr_sent < threshold: inc_recv = curr_recv inc_sent = curr_sent else: # Скорее всего рестарт сервиса, пропускаем первый тик inc_recv = 0 inc_sent = 0 self.logger.debug(f"New session tracking started for {session_key}. Initializing baseline.") # Обновляем кэш if session_key not in self.session_cache: # New session self.session_cache[session_key] = { 'bytes_received': curr_recv, 'bytes_sent': curr_sent, 'last_seen': datetime.now(), 'connected_since': datetime.now() # Track start time } else: # Update existing self.session_cache[session_key]['bytes_received'] = curr_recv self.session_cache[session_key]['bytes_sent'] = curr_sent self.session_cache[session_key]['last_seen'] = datetime.now() # Добавляем в клиентский объект (для истории/графиков) # Важно: это инкремент конкретной сессии client['bytes_received_inc'] = inc_recv client['bytes_sent_inc'] = inc_sent # Ensure db_id is available for active_sessions later (populated in step 4 or from cache) # We defer writing to active_sessions until we have DB IDs client['session_key'] = session_key # --- АГРЕГАЦИЯ ДЛЯ БД (по Common Name) --- if name not in cn_updates: cn_updates[name] = { 'inc_recv': 0, 'inc_tx': 0, 'max_rx': 0, 'max_tx': 0, # Для last_bytes в БД сохраним текущие счетчики самой большой сессии (примерно) 'real_address': real_addr # Берем последний адрес } cn_updates[name]['inc_recv'] += inc_recv cn_updates[name]['inc_tx'] += inc_sent # Сохраняем "текущее" значение как максимальное из сессий, чтобы в БД last_bytes было хоть что-то осмысленное # (хотя при in-memory подходе поле last_bytes в БД теряет смысл для логики, но нужно для UI) cn_updates[name]['max_rx'] = max(cn_updates[name]['max_rx'], curr_recv) cn_updates[name]['max_tx'] = max(cn_updates[name]['max_tx'], curr_sent) cn_updates[name]['real_address'] = real_addr # 3. Очистка кэша от мертвых сессий # Создаем список ключей для удаления, чтобы не менять словарь во время итерации dead_sessions = [k for k in self.session_cache if k not in active_session_keys] for k in dead_sessions: del self.session_cache[k] self.logger.debug(f"Removed inactive session from cache: {k}") # 4. Обновление БД (Upsert Clients) for name, data in cn_updates.items(): if name in db_clients: # UPDATE db_id = db_clients[name]['id'] cursor.execute(''' UPDATE clients SET status = 'Active', real_address = ?, total_bytes_received = total_bytes_received + ?, total_bytes_sent = total_bytes_sent + ?, last_bytes_received = ?, last_bytes_sent = ?, updated_at = CURRENT_TIMESTAMP, last_activity = CURRENT_TIMESTAMP WHERE id = ? ''', ( data['real_address'], data['inc_recv'], data['inc_tx'], data['max_rx'], data['max_tx'], db_id )) # Прокидываем DB ID обратно в объекты клиентов (для TSDB) # Так как active_clients - это список сессий, ищем все сессии этого юзера for client in active_clients: if client['common_name'] == name: client['db_id'] = db_id else: # INSERT (New Client) cursor.execute(''' INSERT INTO clients (common_name, real_address, status, total_bytes_received, total_bytes_sent, last_bytes_received, last_bytes_sent) VALUES (?, ?, 'Active', 0, 0, ?, ?) ''', ( name, data['real_address'], data['max_rx'], data['max_tx'] )) new_id = cursor.lastrowid # Прокидываем ID for client in active_clients: if client['common_name'] == name: client['db_id'] = new_id # 5. Помечаем отключенных for name, db_info in db_clients.items(): if name not in active_cns and db_info['status'] == 'Active': cursor.execute(''' UPDATE clients SET status = 'Disconnected', updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (db_info['id'],)) self.logger.info(f"Client disconnected: {name}") # 6. SYNC ACTIVE SESSIONS TO DB (Snapshot) # Clear old state cursor.execute('DELETE FROM active_sessions') # Insert current state for client in active_clients: # client['db_id'] should be populated by now (from step 4) if 'db_id' in client and 'session_key' in client: sess_data = self.session_cache.get(client['session_key']) if sess_data: cursor.execute(''' INSERT INTO active_sessions (client_id, common_name, real_address, bytes_received, bytes_sent, connected_since, last_seen) VALUES (?, ?, ?, ?, ?, ?, ?) ''', ( client['db_id'], client['common_name'], client['real_address'], client['bytes_received'], client['bytes_sent'], sess_data.get('connected_since', datetime.now()), sess_data.get('last_seen', datetime.now()) )) conn.commit() except Exception as e: self.logger.error(f"Error updating client status: {e}") conn.rollback() finally: conn.close() return active_clients def calculate_rates(self, clients, time_diff): """Расчет скорости в Mbps""" if time_diff <= 0: time_diff = 1.0 # Защита от деления на 0 # Коэффициент: (байты * 8 бит) / (секунды * 1 млн) factor = 8 / (time_diff * 1_000_000) for client in clients: client['bytes_received_rate_mbps'] = client.get('bytes_received_inc', 0) * factor client['bytes_sent_rate_mbps'] = client.get('bytes_sent_inc', 0) * factor return clients def store_usage_history(self, clients): """Сохранение высокодетализированной (Raw) истории""" if not clients: return conn = self.db_manager.get_connection() cursor = conn.cursor() try: for client in clients: if client.get('db_id'): cursor.execute(''' INSERT INTO usage_history (client_id, bytes_received, bytes_sent, bytes_received_rate_mbps, bytes_sent_rate_mbps) VALUES (?, ?, ?, ?, ?) ''', ( client['db_id'], client.get('bytes_received_inc', 0), client.get('bytes_sent_inc', 0), client.get('bytes_received_rate_mbps', 0), client.get('bytes_sent_rate_mbps', 0) )) conn.commit() except Exception as e: self.logger.error(f"Error storing raw history: {e}") conn.rollback() finally: conn.close() def run_monitoring_cycle(self): """Один цикл мониторинга""" current_time = datetime.now() # 1. Получаем активных клиентов active_clients = self.parse_log_file() # 2. Обновляем статусы и считаем дельту трафика clients_with_updates = self.update_client_status_and_bytes(active_clients) if clients_with_updates: # 3. Считаем интервал времени time_diff = 10.0 # Номинал if self.last_check_time: time_diff = (current_time - self.last_check_time).total_seconds() # 4. Считаем скорости clients_rated = self.calculate_rates(clients_with_updates, time_diff) # 5. Сохраняем RAW историю (для графиков реального времени) self.store_usage_history(clients_rated) # 6. Агрегируем в TSDB (5m, 15m, 1h, 6h, 1d) self.ts_aggregator.aggregate(clients_rated) self.last_check_time = current_time # 7. Проверка необходимости очистки (раз в сутки) if current_time.date() > self.last_cleanup_date: self.logger.info("New day detected. Initiating cleanup.") self.cleanup_old_data() self.last_cleanup_date = current_time.date() def start_monitoring(self): """Запуск цикла""" interval = int(self.get_config_value('openvpn_monitor', 'check_interval', 10)) self.logger.info(f"Starting OpenVPN Monitoring. Interval: {interval}s") self.logger.info("Press Ctrl+C to stop") try: while True: self.run_monitoring_cycle() time.sleep(interval) except KeyboardInterrupt: self.logger.info("Monitoring stopped by user") except Exception as e: self.logger.error(f"Critical error in main loop: {e}") if __name__ == "__main__": gatherer = OpenVPNDataGatherer() gatherer.start_monitoring()