new awesome build

This commit is contained in:
Антон
2026-01-28 22:37:47 +03:00
parent 848646003c
commit fcb8f6bac7
119 changed files with 7291 additions and 5575 deletions

View File

@@ -0,0 +1,622 @@
import sqlite3
import time
import os
import configparser
import logging
from datetime import datetime, timedelta
from db import DatabaseManager
# --- КЛАСС АГРЕГАЦИИ ДАННЫХ (TSDB LOGIC) ---
class TimeSeriesAggregator:
def __init__(self, db_provider):
self.db_provider = db_provider
self.logger = logging.getLogger(__name__)
def _upsert_bucket(self, cursor, table, timestamp, client_id, rx, tx):
"""
Вставляет или обновляет запись в таблицу агрегации.
Использует ON CONFLICT для атомарного обновления счетчиков.
"""
cursor.execute(f'''
INSERT INTO {table} (timestamp, client_id, bytes_received, bytes_sent)
VALUES (?, ?, ?, ?)
ON CONFLICT(timestamp, client_id) DO UPDATE SET
bytes_received = bytes_received + excluded.bytes_received,
bytes_sent = bytes_sent + excluded.bytes_sent
''', (timestamp, client_id, rx, tx))
def aggregate(self, client_updates):
"""
Распределяет инкременты трафика по временным слотам (5m, 15m, 1h, 6h, 1d).
"""
if not client_updates:
return
conn = self.db_provider()
cursor = conn.cursor()
now = datetime.utcnow()
# --- РАСЧЕТ ВРЕМЕННЫХ КВАНТОВ ---
# 1. Сутки (00:00:00)
ts_1d = now.replace(hour=0, minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
# 2. 6 часов (00, 06, 12, 18)
hour_6h = now.hour - (now.hour % 6)
ts_6h = now.replace(hour=hour_6h, minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
# 3. 1 час (XX:00:00)
ts_1h = now.replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
# 4. 15 минут (00, 15, 30, 45)
min_15m = now.minute - (now.minute % 15)
ts_15m = now.replace(minute=min_15m, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
# 5. 5 минут (00, 05, 10...)
min_5m = now.minute - (now.minute % 5)
ts_5m = now.replace(minute=min_5m, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
try:
updates_count = 0
for client in client_updates:
client_id = client.get('db_id')
# Пропускаем, если ID не определен
if client_id is None:
continue
rx = client.get('bytes_received_inc', 0)
tx = client.get('bytes_sent_inc', 0)
# Пропускаем, если нет трафика
if rx == 0 and tx == 0:
continue
# Запись во все уровни агрегации
self._upsert_bucket(cursor, 'stats_5min', ts_5m, client_id, rx, tx)
self._upsert_bucket(cursor, 'stats_15min', ts_15m, client_id, rx, tx)
self._upsert_bucket(cursor, 'stats_hourly', ts_1h, client_id, rx, tx)
self._upsert_bucket(cursor, 'stats_6h', ts_6h, client_id, rx, tx)
self._upsert_bucket(cursor, 'stats_daily', ts_1d, client_id, rx, tx)
updates_count += 1
conn.commit()
# Логируем только если были обновления
if updates_count > 0:
self.logger.debug(f"TS Aggregation: Updated buckets for {updates_count} clients")
except Exception as e:
self.logger.error(f"Error in TimeSeriesAggregator: {e}")
conn.rollback()
finally:
conn.close()
# --- ОСНОВНОЙ КЛАСС ---
class OpenVPNDataGatherer:
def __init__(self, config_file='config.ini'):
self.config = self.load_config(config_file)
self.setup_logging()
self.last_check_time = None
# Инициализируем дату последней очистки вчерашним днем для корректного старта
self.last_cleanup_date = (datetime.now() - timedelta(days=1)).date()
self.last_cleanup_date = (datetime.now() - timedelta(days=1)).date()
self.db_manager = DatabaseManager(config_file)
self.db_manager.init_database()
# Инициализация модуля агрегации
# Передаем ссылку на метод подключения к БД
self.ts_aggregator = TimeSeriesAggregator(self.db_manager.get_connection)
# In-Memory Cache для отслеживания сессий (CN, RealAddress) -> {last_bytes...}
# Используется для корректного расчета инкрементов при множественных сессиях одного CN
self.session_cache = {}
def load_config(self, config_file):
"""Загрузка конфигурации или создание дефолтной со сложной структурой"""
config = configparser.ConfigParser()
# Полная структура конфига согласно требованиям
defaults = {
'api': {
'host': '0.0.0.0',
'port': '5000',
'debug': 'false'
},
'openvpn_monitor': {
'log_path': '/var/log/openvpn/openvpn-status.log',
'db_path': 'openvpn_monitor.db',
'check_interval': '10', # Интервал 10 секунд
},
'logging': {
'level': 'INFO',
'log_file': 'openvpn_gatherer.log'
},
'retention': {
'raw_retention_days': '7', # 1 неделя
'agg_5m_retention_days': '14', # 2 недели
'agg_15m_retention_days': '28', # 4 недели
'agg_1h_retention_days': '90', # 3 месяца
'agg_6h_retention_days': '180', # 6 месяцев
'agg_1d_retention_days': '365' # 12 месяцев
},
'visualization': {
'refresh_interval': '5',
'max_display_rows': '50'
},
'certificates': {
'certificates_path': '/opt/ovpn/pki/issued',
'certificate_extensions': 'crt'
}
}
try:
if os.path.exists(config_file):
config.read(config_file)
# Проверка: если каких-то новых секций нет в старом файле, добавляем их
updated = False
for section, options in defaults.items():
if not config.has_section(section):
config.add_section(section)
updated = True
for key, val in options.items():
if not config.has_option(section, key):
config.set(section, key, val)
updated = True
if updated:
with open(config_file, 'w') as f:
config.write(f)
print(f"Updated configuration file: {config_file}")
else:
# Создаем файл с нуля
for section, options in defaults.items():
config[section] = options
with open(config_file, 'w') as f:
config.write(f)
print(f"Created default configuration file: {config_file}")
except Exception as e:
print(f"Error loading config: {e}")
# Fallback в памяти
for section, options in defaults.items():
if not config.has_section(section):
config.add_section(section)
for key, val in options.items():
config.set(section, key, val)
return config
def setup_logging(self):
try:
log_level = self.config.get('logging', 'level', fallback='INFO')
log_file = self.config.get('logging', 'log_file', fallback='openvpn_gatherer.log')
# Создаем директорию для логов если нужно
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir)
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
except Exception as e:
print(f"Logging setup failed: {e}")
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def get_config_value(self, section, key, default=None):
try:
return self.config.get(section, key, fallback=default)
except:
return default
# get_db_connection and init_database removed
def cleanup_old_data(self):
"""Очистка данных согласно retention policies в config.ini"""
self.logger.info("Starting data cleanup procedure...")
conn = self.db_manager.get_connection()
cursor = conn.cursor()
# Маппинг: Таблица -> Ключ конфига -> Дефолт (дни)
retention_rules = [
('usage_history', 'raw_retention_days', 7),
('stats_5min', 'agg_5m_retention_days', 14),
('stats_15min', 'agg_15m_retention_days', 28),
('stats_hourly', 'agg_1h_retention_days', 90),
('stats_6h', 'agg_6h_retention_days', 180),
('stats_daily', 'agg_1d_retention_days', 365),
]
try:
total_deleted = 0
for table, config_key, default_days in retention_rules:
days = int(self.get_config_value('retention', config_key, default_days))
if days > 0:
cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S')
cursor.execute(f'DELETE FROM {table} WHERE timestamp < ?', (cutoff_date,))
deleted = cursor.rowcount
if deleted > 0:
self.logger.info(f"Cleaned {table}: removed {deleted} records older than {days} days")
total_deleted += deleted
conn.commit()
if total_deleted == 0:
self.logger.info("Cleanup finished: nothing to delete")
except Exception as e:
self.logger.error(f"Cleanup Error: {e}")
conn.rollback()
finally:
conn.close()
def parse_log_file(self):
"""
Парсинг лога версии 2 (CSV формат).
Ожидает формат: CLIENT_LIST,Common Name,Real Address,...,Bytes Received,Bytes Sent,...
"""
log_path = self.get_config_value('openvpn_monitor', 'log_path', '/var/log/openvpn/openvpn-status.log')
clients = []
try:
if not os.path.exists(log_path):
self.logger.warning(f"Log file not found: {log_path}")
return clients
with open(log_path, 'r') as file:
for line in file:
line = line.strip()
# Фильтруем только строки с данными клиентов
if not line.startswith('CLIENT_LIST'):
continue
parts = line.split(',')
# V2 Index Map:
# 1: Common Name
# 2: Real Address
# 5: Bytes Received
# 6: Bytes Sent
if len(parts) >= 8 and parts[1] != 'Common Name':
# SKIPPING 'UNDEF' CLIENTS
if parts[1].strip() == 'UNDEF':
continue
try:
client = {
'common_name': parts[1].strip(),
'real_address': parts[2].strip(),
'bytes_received': int(parts[5].strip()),
'bytes_sent': int(parts[6].strip()),
'status': 'Active'
}
clients.append(client)
except (ValueError, IndexError) as e:
self.logger.warning(f"Error parsing client line: {e}")
self.logger.debug(f"Parsed {len(clients)} active clients")
except Exception as e:
self.logger.error(f"Error parsing log file: {e}")
return clients
def update_client_status_and_bytes(self, active_clients):
"""
Обновление статусов и расчет инкрементов трафика.
Использует In-Memory Cache (self.session_cache) для корректной обработки
множественных сессий одного пользователя (Ping-Pong effect fix).
"""
conn = self.db_manager.get_connection()
cursor = conn.cursor()
try:
# 1. Загружаем текущее состояние CNs из БД для обновления статусов
cursor.execute('SELECT id, common_name, status, total_bytes_received, total_bytes_sent FROM clients')
db_clients = {row[1]: {'id': row[0], 'status': row[2]} for row in cursor.fetchall()}
# Структура для агрегации инкрементов по Common Name перед записью в БД
# cn -> { 'inc_rx': 0, 'inc_tx': 0, 'curr_rx': 0, 'curr_tx': 0, 'real_address': '...'}
cn_updates = {}
# Множество активных ключей сессий (CN, RealAddr) для очистки кэша
active_session_keys = set()
active_cns = set()
# 2. Обрабатываем каждую активную сессию
for client in active_clients:
name = client['common_name']
real_addr = client['real_address']
curr_recv = client['bytes_received']
curr_sent = client['bytes_sent']
# Уникальный ключ сессии
session_key = (name, real_addr)
active_session_keys.add(session_key)
active_cns.add(name)
# --- ЛОГИКА РАСЧЕТА ДЕЛЬТЫ (In-Memory) ---
if session_key in self.session_cache:
prev_state = self.session_cache[session_key]
prev_recv = prev_state['bytes_received']
prev_sent = prev_state['bytes_sent']
# Расчет RX
if curr_recv < prev_recv:
# Рестарт сессии (счетчик сбросился)
inc_recv = curr_recv
self.logger.info(f"Session reset detected for {session_key} (Recv)")
else:
inc_recv = curr_recv - prev_recv
# Расчет TX
if curr_sent < prev_sent:
inc_sent = curr_sent
self.logger.info(f"Session reset detected for {session_key} (Sent)")
else:
inc_sent = curr_sent - prev_sent
else:
# Новая сессия (или после рестарта сервиса)
# Если сервиса только запустился, мы не знаем предыдущего состояния.
# Чтобы избежать спайков, считаем инкремент = 0 для первого появления,
# если это похоже на продолжающуюся сессию (большие числа).
# Если числа маленькие (<10MB), считаем как новую.
# 10 MB threshold
threshold = 10 * 1024 * 1024
if curr_recv < threshold and curr_sent < threshold:
inc_recv = curr_recv
inc_sent = curr_sent
else:
# Скорее всего рестарт сервиса, пропускаем первый тик
inc_recv = 0
inc_sent = 0
self.logger.debug(f"New session tracking started for {session_key}. Initializing baseline.")
# Обновляем кэш
if session_key not in self.session_cache:
# New session
self.session_cache[session_key] = {
'bytes_received': curr_recv,
'bytes_sent': curr_sent,
'last_seen': datetime.now(),
'connected_since': datetime.now() # Track start time
}
else:
# Update existing
self.session_cache[session_key]['bytes_received'] = curr_recv
self.session_cache[session_key]['bytes_sent'] = curr_sent
self.session_cache[session_key]['last_seen'] = datetime.now()
# Добавляем в клиентский объект (для истории/графиков)
# Важно: это инкремент конкретной сессии
client['bytes_received_inc'] = inc_recv
client['bytes_sent_inc'] = inc_sent
# Ensure db_id is available for active_sessions later (populated in step 4 or from cache)
# We defer writing to active_sessions until we have DB IDs
client['session_key'] = session_key
# --- АГРЕГАЦИЯ ДЛЯ БД (по Common Name) ---
if name not in cn_updates:
cn_updates[name] = {
'inc_recv': 0, 'inc_tx': 0,
'max_rx': 0, 'max_tx': 0, # Для last_bytes в БД сохраним текущие счетчики самой большой сессии (примерно)
'real_address': real_addr # Берем последний адрес
}
cn_updates[name]['inc_recv'] += inc_recv
cn_updates[name]['inc_tx'] += inc_sent
# Сохраняем "текущее" значение как максимальное из сессий, чтобы в БД last_bytes было хоть что-то осмысленное
# (хотя при in-memory подходе поле last_bytes в БД теряет смысл для логики, но нужно для UI)
cn_updates[name]['max_rx'] = max(cn_updates[name]['max_rx'], curr_recv)
cn_updates[name]['max_tx'] = max(cn_updates[name]['max_tx'], curr_sent)
cn_updates[name]['real_address'] = real_addr
# 3. Очистка кэша от мертвых сессий
# Создаем список ключей для удаления, чтобы не менять словарь во время итерации
dead_sessions = [k for k in self.session_cache if k not in active_session_keys]
for k in dead_sessions:
del self.session_cache[k]
self.logger.debug(f"Removed inactive session from cache: {k}")
# 4. Обновление БД (Upsert Clients)
for name, data in cn_updates.items():
if name in db_clients:
# UPDATE
db_id = db_clients[name]['id']
cursor.execute('''
UPDATE clients
SET status = 'Active',
real_address = ?,
total_bytes_received = total_bytes_received + ?,
total_bytes_sent = total_bytes_sent + ?,
last_bytes_received = ?,
last_bytes_sent = ?,
updated_at = CURRENT_TIMESTAMP,
last_activity = CURRENT_TIMESTAMP
WHERE id = ?
''', (
data['real_address'],
data['inc_recv'],
data['inc_tx'],
data['max_rx'],
data['max_tx'],
db_id
))
# Прокидываем DB ID обратно в объекты клиентов (для TSDB)
# Так как active_clients - это список сессий, ищем все сессии этого юзера
for client in active_clients:
if client['common_name'] == name:
client['db_id'] = db_id
else:
# INSERT (New Client)
cursor.execute('''
INSERT INTO clients
(common_name, real_address, status, total_bytes_received, total_bytes_sent, last_bytes_received, last_bytes_sent)
VALUES (?, ?, 'Active', 0, 0, ?, ?)
''', (
name,
data['real_address'],
data['max_rx'],
data['max_tx']
))
new_id = cursor.lastrowid
# Прокидываем ID
for client in active_clients:
if client['common_name'] == name:
client['db_id'] = new_id
# 5. Помечаем отключенных
for name, db_info in db_clients.items():
if name not in active_cns and db_info['status'] == 'Active':
cursor.execute('''
UPDATE clients
SET status = 'Disconnected', updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (db_info['id'],))
self.logger.info(f"Client disconnected: {name}")
# 6. SYNC ACTIVE SESSIONS TO DB (Snapshot)
# Clear old state
cursor.execute('DELETE FROM active_sessions')
# Insert current state
for client in active_clients:
# client['db_id'] should be populated by now (from step 4)
if 'db_id' in client and 'session_key' in client:
sess_data = self.session_cache.get(client['session_key'])
if sess_data:
cursor.execute('''
INSERT INTO active_sessions
(client_id, common_name, real_address, bytes_received, bytes_sent, connected_since, last_seen)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
client['db_id'],
client['common_name'],
client['real_address'],
client['bytes_received'],
client['bytes_sent'],
sess_data.get('connected_since', datetime.now()),
sess_data.get('last_seen', datetime.now())
))
conn.commit()
except Exception as e:
self.logger.error(f"Error updating client status: {e}")
conn.rollback()
finally:
conn.close()
return active_clients
def calculate_rates(self, clients, time_diff):
"""Расчет скорости в Mbps"""
if time_diff <= 0:
time_diff = 1.0 # Защита от деления на 0
# Коэффициент: (байты * 8 бит) / (секунды * 1 млн)
factor = 8 / (time_diff * 1_000_000)
for client in clients:
client['bytes_received_rate_mbps'] = client.get('bytes_received_inc', 0) * factor
client['bytes_sent_rate_mbps'] = client.get('bytes_sent_inc', 0) * factor
return clients
def store_usage_history(self, clients):
"""Сохранение высокодетализированной (Raw) истории"""
if not clients:
return
conn = self.db_manager.get_connection()
cursor = conn.cursor()
try:
for client in clients:
if client.get('db_id'):
cursor.execute('''
INSERT INTO usage_history
(client_id, bytes_received, bytes_sent, bytes_received_rate_mbps, bytes_sent_rate_mbps)
VALUES (?, ?, ?, ?, ?)
''', (
client['db_id'],
client.get('bytes_received_inc', 0),
client.get('bytes_sent_inc', 0),
client.get('bytes_received_rate_mbps', 0),
client.get('bytes_sent_rate_mbps', 0)
))
conn.commit()
except Exception as e:
self.logger.error(f"Error storing raw history: {e}")
conn.rollback()
finally:
conn.close()
def run_monitoring_cycle(self):
"""Один цикл мониторинга"""
current_time = datetime.now()
# 1. Получаем активных клиентов
active_clients = self.parse_log_file()
# 2. Обновляем статусы и считаем дельту трафика
clients_with_updates = self.update_client_status_and_bytes(active_clients)
if clients_with_updates:
# 3. Считаем интервал времени
time_diff = 10.0 # Номинал
if self.last_check_time:
time_diff = (current_time - self.last_check_time).total_seconds()
# 4. Считаем скорости
clients_rated = self.calculate_rates(clients_with_updates, time_diff)
# 5. Сохраняем RAW историю (для графиков реального времени)
self.store_usage_history(clients_rated)
# 6. Агрегируем в TSDB (5m, 15m, 1h, 6h, 1d)
self.ts_aggregator.aggregate(clients_rated)
self.last_check_time = current_time
# 7. Проверка необходимости очистки (раз в сутки)
if current_time.date() > self.last_cleanup_date:
self.logger.info("New day detected. Initiating cleanup.")
self.cleanup_old_data()
self.last_cleanup_date = current_time.date()
def start_monitoring(self):
"""Запуск цикла"""
interval = int(self.get_config_value('openvpn_monitor', 'check_interval', 10))
self.logger.info(f"Starting OpenVPN Monitoring. Interval: {interval}s")
self.logger.info("Press Ctrl+C to stop")
try:
while True:
self.run_monitoring_cycle()
time.sleep(interval)
except KeyboardInterrupt:
self.logger.info("Monitoring stopped by user")
except Exception as e:
self.logger.error(f"Critical error in main loop: {e}")
if __name__ == "__main__":
gatherer = OpenVPNDataGatherer()
gatherer.start_monitoring()