import sqlite3 import time import os import configparser import logging from datetime import datetime, timedelta from db import DatabaseManager # --- КЛАСС АГРЕГАЦИИ ДАННЫХ (TSDB LOGIC) --- class TimeSeriesAggregator: def __init__(self, db_provider): self.db_provider = db_provider self.logger = logging.getLogger(__name__) def _upsert_bucket(self, cursor, table, timestamp, client_id, rx, tx): """ Вставляет или обновляет запись в таблицу агрегации. Использует ON CONFLICT для атомарного обновления счетчиков. """ cursor.execute(f''' INSERT INTO {table} (timestamp, client_id, bytes_received, bytes_sent) VALUES (?, ?, ?, ?) ON CONFLICT(timestamp, client_id) DO UPDATE SET bytes_received = bytes_received + excluded.bytes_received, bytes_sent = bytes_sent + excluded.bytes_sent ''', (timestamp, client_id, rx, tx)) def aggregate(self, client_updates): """ Распределяет инкременты трафика по временным слотам (5m, 15m, 1h, 6h, 1d). """ if not client_updates: return conn = self.db_provider() cursor = conn.cursor() now = datetime.now() # --- РАСЧЕТ ВРЕМЕННЫХ КВАНТОВ --- # 1. Сутки (00:00:00) ts_1d = now.replace(hour=0, minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') # 2. 6 часов (00, 06, 12, 18) hour_6h = now.hour - (now.hour % 6) ts_6h = now.replace(hour=hour_6h, minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') # 3. 1 час (XX:00:00) ts_1h = now.replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') # 4. 15 минут (00, 15, 30, 45) min_15m = now.minute - (now.minute % 15) ts_15m = now.replace(minute=min_15m, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') # 5. 5 минут (00, 05, 10...) min_5m = now.minute - (now.minute % 5) ts_5m = now.replace(minute=min_5m, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') try: updates_count = 0 for client in client_updates: client_id = client.get('db_id') # Пропускаем, если ID не определен if client_id is None: continue rx = client.get('bytes_received_inc', 0) tx = client.get('bytes_sent_inc', 0) # Пропускаем, если нет трафика if rx == 0 and tx == 0: continue # Запись во все уровни агрегации self._upsert_bucket(cursor, 'stats_5min', ts_5m, client_id, rx, tx) self._upsert_bucket(cursor, 'stats_15min', ts_15m, client_id, rx, tx) self._upsert_bucket(cursor, 'stats_hourly', ts_1h, client_id, rx, tx) self._upsert_bucket(cursor, 'stats_6h', ts_6h, client_id, rx, tx) self._upsert_bucket(cursor, 'stats_daily', ts_1d, client_id, rx, tx) updates_count += 1 conn.commit() # Логируем только если были обновления if updates_count > 0: self.logger.debug(f"TS Aggregation: Updated buckets for {updates_count} clients") except Exception as e: self.logger.error(f"Error in TimeSeriesAggregator: {e}") conn.rollback() finally: conn.close() # --- ОСНОВНОЙ КЛАСС --- class OpenVPNDataGatherer: def __init__(self, config_file='config.ini'): self.config = self.load_config(config_file) self.setup_logging() self.last_check_time = None # Инициализируем дату последней очистки вчерашним днем для корректного старта self.last_cleanup_date = (datetime.now() - timedelta(days=1)).date() self.last_cleanup_date = (datetime.now() - timedelta(days=1)).date() self.db_manager = DatabaseManager(config_file) self.db_manager.init_database() # Инициализация модуля агрегации # Передаем ссылку на метод подключения к БД self.ts_aggregator = TimeSeriesAggregator(self.db_manager.get_connection) def load_config(self, config_file): """Загрузка конфигурации или создание дефолтной со сложной структурой""" config = configparser.ConfigParser() # Полная структура конфига согласно требованиям defaults = { 'api': { 'host': '0.0.0.0', 'port': '5000', 'debug': 'false' }, 'openvpn_monitor': { 'log_path': '/var/log/openvpn/openvpn-status.log', 'db_path': 'openvpn_monitor.db', 'check_interval': '10', # Интервал 10 секунд }, 'logging': { 'level': 'INFO', 'log_file': 'openvpn_gatherer.log' }, 'retention': { 'raw_retention_days': '7', # 1 неделя 'agg_5m_retention_days': '14', # 2 недели 'agg_15m_retention_days': '28', # 4 недели 'agg_1h_retention_days': '90', # 3 месяца 'agg_6h_retention_days': '180', # 6 месяцев 'agg_1d_retention_days': '365' # 12 месяцев }, 'visualization': { 'refresh_interval': '5', 'max_display_rows': '50' }, 'certificates': { 'certificates_path': '/opt/ovpn/pki/issued', 'certificate_extensions': 'crt' } } try: if os.path.exists(config_file): config.read(config_file) # Проверка: если каких-то новых секций нет в старом файле, добавляем их updated = False for section, options in defaults.items(): if not config.has_section(section): config.add_section(section) updated = True for key, val in options.items(): if not config.has_option(section, key): config.set(section, key, val) updated = True if updated: with open(config_file, 'w') as f: config.write(f) print(f"Updated configuration file: {config_file}") else: # Создаем файл с нуля for section, options in defaults.items(): config[section] = options with open(config_file, 'w') as f: config.write(f) print(f"Created default configuration file: {config_file}") except Exception as e: print(f"Error loading config: {e}") # Fallback в памяти for section, options in defaults.items(): if not config.has_section(section): config.add_section(section) for key, val in options.items(): config.set(section, key, val) return config def setup_logging(self): try: log_level = self.config.get('logging', 'level', fallback='INFO') log_file = self.config.get('logging', 'log_file', fallback='openvpn_gatherer.log') # Создаем директорию для логов если нужно log_dir = os.path.dirname(log_file) if log_dir and not os.path.exists(log_dir): os.makedirs(log_dir) logging.basicConfig( level=getattr(logging, log_level.upper()), format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file), logging.StreamHandler() ] ) self.logger = logging.getLogger(__name__) except Exception as e: print(f"Logging setup failed: {e}") logging.basicConfig(level=logging.INFO) self.logger = logging.getLogger(__name__) def get_config_value(self, section, key, default=None): try: return self.config.get(section, key, fallback=default) except: return default # get_db_connection and init_database removed def cleanup_old_data(self): """Очистка данных согласно retention policies в config.ini""" self.logger.info("Starting data cleanup procedure...") conn = self.db_manager.get_connection() cursor = conn.cursor() # Маппинг: Таблица -> Ключ конфига -> Дефолт (дни) retention_rules = [ ('usage_history', 'raw_retention_days', 7), ('stats_5min', 'agg_5m_retention_days', 14), ('stats_15min', 'agg_15m_retention_days', 28), ('stats_hourly', 'agg_1h_retention_days', 90), ('stats_6h', 'agg_6h_retention_days', 180), ('stats_daily', 'agg_1d_retention_days', 365), ] try: total_deleted = 0 for table, config_key, default_days in retention_rules: days = int(self.get_config_value('retention', config_key, default_days)) if days > 0: cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S') cursor.execute(f'DELETE FROM {table} WHERE timestamp < ?', (cutoff_date,)) deleted = cursor.rowcount if deleted > 0: self.logger.info(f"Cleaned {table}: removed {deleted} records older than {days} days") total_deleted += deleted conn.commit() if total_deleted == 0: self.logger.info("Cleanup finished: nothing to delete") except Exception as e: self.logger.error(f"Cleanup Error: {e}") conn.rollback() finally: conn.close() def parse_log_file(self): """ Парсинг лога версии 2 (CSV формат). Ожидает формат: CLIENT_LIST,Common Name,Real Address,...,Bytes Received,Bytes Sent,... """ log_path = self.get_config_value('openvpn_monitor', 'log_path', '/var/log/openvpn/openvpn-status.log') clients = [] try: if not os.path.exists(log_path): self.logger.warning(f"Log file not found: {log_path}") return clients with open(log_path, 'r') as file: for line in file: line = line.strip() # Фильтруем только строки с данными клиентов if not line.startswith('CLIENT_LIST'): continue parts = line.split(',') # V2 Index Map: # 1: Common Name # 2: Real Address # 5: Bytes Received # 6: Bytes Sent if len(parts) >= 8 and parts[1] != 'Common Name': try: client = { 'common_name': parts[1].strip(), 'real_address': parts[2].strip(), 'bytes_received': int(parts[5].strip()), 'bytes_sent': int(parts[6].strip()), 'status': 'Active' } clients.append(client) except (ValueError, IndexError) as e: self.logger.warning(f"Error parsing client line: {e}") self.logger.debug(f"Parsed {len(clients)} active clients") except Exception as e: self.logger.error(f"Error parsing log file: {e}") return clients def update_client_status_and_bytes(self, active_clients): """Обновление статусов и расчет инкрементов трафика""" conn = self.db_manager.get_connection() cursor = conn.cursor() try: # Загружаем текущее состояние всех клиентов cursor.execute('SELECT id, common_name, status, last_bytes_received, last_bytes_sent FROM clients') db_clients = {} for row in cursor.fetchall(): db_clients[row[1]] = { 'id': row[0], 'status': row[2], 'last_bytes_received': row[3], 'last_bytes_sent': row[4] } active_names = set() for client in active_clients: name = client['common_name'] active_names.add(name) curr_recv = client['bytes_received'] curr_sent = client['bytes_sent'] if name in db_clients: # Клиент существует в базе db_client = db_clients[name] client['db_id'] = db_client['id'] # ID для агрегатора и истории # Проверка на рестарт сервера/сессии (сброс счетчиков) # Если текущее значение меньше сохраненного, значит был сброс -> берем всё текущее значение как дельту if curr_recv < db_client['last_bytes_received']: inc_recv = curr_recv self.logger.info(f"Counter reset detected for {name} (Recv)") else: inc_recv = curr_recv - db_client['last_bytes_received'] if curr_sent < db_client['last_bytes_sent']: inc_sent = curr_sent self.logger.info(f"Counter reset detected for {name} (Sent)") else: inc_sent = curr_sent - db_client['last_bytes_sent'] # Обновляем клиента cursor.execute(''' UPDATE clients SET status = 'Active', real_address = ?, total_bytes_received = total_bytes_received + ?, total_bytes_sent = total_bytes_sent + ?, last_bytes_received = ?, last_bytes_sent = ?, updated_at = CURRENT_TIMESTAMP, last_activity = CURRENT_TIMESTAMP WHERE id = ? ''', ( client['real_address'], inc_recv, inc_sent, curr_recv, curr_sent, db_client['id'] )) client['bytes_received_inc'] = inc_recv client['bytes_sent_inc'] = inc_sent else: # Новый клиент cursor.execute(''' INSERT INTO clients (common_name, real_address, status, total_bytes_received, total_bytes_sent, last_bytes_received, last_bytes_sent) VALUES (?, ?, 'Active', 0, 0, ?, ?) ''', ( name, client['real_address'], curr_recv, curr_sent )) new_id = cursor.lastrowid client['db_id'] = new_id # Для первой записи считаем инкремент 0 (или можно считать весь трафик) client['bytes_received_inc'] = 0 client['bytes_sent_inc'] = 0 self.logger.info(f"New client added: {name}") # Помечаем отключенных for name, db_client in db_clients.items(): if name not in active_names and db_client['status'] == 'Active': cursor.execute(''' UPDATE clients SET status = 'Disconnected', updated_at = CURRENT_TIMESTAMP WHERE id = ? ''', (db_client['id'],)) self.logger.info(f"Client disconnected: {name}") conn.commit() except Exception as e: self.logger.error(f"Error updating client status: {e}") conn.rollback() finally: conn.close() return active_clients def calculate_rates(self, clients, time_diff): """Расчет скорости в Mbps""" if time_diff <= 0: time_diff = 1.0 # Защита от деления на 0 # Коэффициент: (байты * 8 бит) / (секунды * 1 млн) factor = 8 / (time_diff * 1_000_000) for client in clients: client['bytes_received_rate_mbps'] = client.get('bytes_received_inc', 0) * factor client['bytes_sent_rate_mbps'] = client.get('bytes_sent_inc', 0) * factor return clients def store_usage_history(self, clients): """Сохранение высокодетализированной (Raw) истории""" if not clients: return conn = self.db_manager.get_connection() cursor = conn.cursor() try: for client in clients: if client.get('db_id'): cursor.execute(''' INSERT INTO usage_history (client_id, bytes_received, bytes_sent, bytes_received_rate_mbps, bytes_sent_rate_mbps) VALUES (?, ?, ?, ?, ?) ''', ( client['db_id'], client.get('bytes_received_inc', 0), client.get('bytes_sent_inc', 0), client.get('bytes_received_rate_mbps', 0), client.get('bytes_sent_rate_mbps', 0) )) conn.commit() except Exception as e: self.logger.error(f"Error storing raw history: {e}") conn.rollback() finally: conn.close() def run_monitoring_cycle(self): """Один цикл мониторинга""" current_time = datetime.now() # 1. Получаем активных клиентов active_clients = self.parse_log_file() # 2. Обновляем статусы и считаем дельту трафика clients_with_updates = self.update_client_status_and_bytes(active_clients) if clients_with_updates: # 3. Считаем интервал времени time_diff = 10.0 # Номинал if self.last_check_time: time_diff = (current_time - self.last_check_time).total_seconds() # 4. Считаем скорости clients_rated = self.calculate_rates(clients_with_updates, time_diff) # 5. Сохраняем RAW историю (для графиков реального времени) self.store_usage_history(clients_rated) # 6. Агрегируем в TSDB (5m, 15m, 1h, 6h, 1d) self.ts_aggregator.aggregate(clients_rated) self.last_check_time = current_time # 7. Проверка необходимости очистки (раз в сутки) if current_time.date() > self.last_cleanup_date: self.logger.info("New day detected. Initiating cleanup.") self.cleanup_old_data() self.last_cleanup_date = current_time.date() def start_monitoring(self): """Запуск цикла""" interval = int(self.get_config_value('openvpn_monitor', 'check_interval', 10)) self.logger.info(f"Starting OpenVPN Monitoring. Interval: {interval}s") self.logger.info("Press Ctrl+C to stop") try: while True: self.run_monitoring_cycle() time.sleep(interval) except KeyboardInterrupt: self.logger.info("Monitoring stopped by user") except Exception as e: self.logger.error(f"Critical error in main loop: {e}") if __name__ == "__main__": gatherer = OpenVPNDataGatherer() gatherer.start_monitoring()