Files
OpenVPN-Monitoring-Simple/APP/openvpn_gatherer_v3.py
Антон c9af0a5bb1 init commit
2026-01-09 01:05:50 +03:00

511 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sqlite3
import time
import os
import configparser
import logging
from datetime import datetime, timedelta
from db import DatabaseManager
# --- DATA AGGREGATION CLASS (TSDB LOGIC) ---
class TimeSeriesAggregator:
    """Fans per-cycle traffic increments out into fixed time-bucket tables."""

    def __init__(self, db_provider):
        # db_provider is a zero-arg factory returning a fresh DB connection.
        self.db_provider = db_provider
        self.logger = logging.getLogger(__name__)

    def _upsert_bucket(self, cursor, table, timestamp, client_id, rx, tx):
        """
        Insert or update one row of an aggregation table.

        Relies on ON CONFLICT(timestamp, client_id) to add to the existing
        counters atomically instead of duplicating the bucket row.
        """
        cursor.execute(f'''
            INSERT INTO {table} (timestamp, client_id, bytes_received, bytes_sent)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(timestamp, client_id) DO UPDATE SET
                bytes_received = bytes_received + excluded.bytes_received,
                bytes_sent = bytes_sent + excluded.bytes_sent
        ''', (timestamp, client_id, rx, tx))

    def aggregate(self, client_updates):
        """
        Distribute traffic increments across the time slots (5m, 15m, 1h, 6h, 1d).

        Each entry of client_updates is a dict carrying 'db_id',
        'bytes_received_inc' and 'bytes_sent_inc'; entries without a db id or
        with zero traffic are skipped.
        """
        if not client_updates:
            return
        conn = self.db_provider()
        cursor = conn.cursor()
        now = datetime.now()
        fmt = '%Y-%m-%d %H:%M:%S'
        # --- TIME SLOT CALCULATION ---
        # Each bucket start is "now" floored to the bucket width
        # (e.g. 15m -> :00/:15/:30/:45, 6h -> 00/06/12/18, 1d -> midnight).
        buckets = (
            ('stats_5min', now.replace(minute=now.minute - now.minute % 5, second=0, microsecond=0)),
            ('stats_15min', now.replace(minute=now.minute - now.minute % 15, second=0, microsecond=0)),
            ('stats_hourly', now.replace(minute=0, second=0, microsecond=0)),
            ('stats_6h', now.replace(hour=now.hour - now.hour % 6, minute=0, second=0, microsecond=0)),
            ('stats_daily', now.replace(hour=0, minute=0, second=0, microsecond=0)),
        )
        slots = [(table, start.strftime(fmt)) for table, start in buckets]
        try:
            updates_count = 0
            for entry in client_updates:
                cid = entry.get('db_id')
                if cid is None:
                    # No DB id -> nothing to attribute this traffic to.
                    continue
                rx = entry.get('bytes_received_inc', 0)
                tx = entry.get('bytes_sent_inc', 0)
                if rx == 0 and tx == 0:
                    # Idle client, no bucket needs touching.
                    continue
                # Write into every aggregation level.
                for table, slot_ts in slots:
                    self._upsert_bucket(cursor, table, slot_ts, cid, rx, tx)
                updates_count += 1
            conn.commit()
            # Log only when something was actually updated.
            if updates_count > 0:
                self.logger.debug(f"TS Aggregation: Updated buckets for {updates_count} clients")
        except Exception as e:
            self.logger.error(f"Error in TimeSeriesAggregator: {e}")
            conn.rollback()
        finally:
            conn.close()
# --- MAIN CLASS ---
class OpenVPNDataGatherer:
    def __init__(self, config_file='config.ini'):
        """
        Build the gatherer: load config, set up logging, init the database
        and the time-series aggregation module.

        :param config_file: path to the INI configuration file (created with
                            defaults if it does not exist).
        """
        self.config = self.load_config(config_file)
        self.setup_logging()
        self.last_check_time = None
        # Seed the last-cleanup date with yesterday so the daily cleanup
        # fires on the first cycle of a fresh start.
        # (Fix: this assignment was accidentally duplicated on two lines.)
        self.last_cleanup_date = (datetime.now() - timedelta(days=1)).date()
        self.db_manager = DatabaseManager(config_file)
        self.db_manager.init_database()
        # Aggregation module: hand it the connection factory so it can open
        # and close its own connections.
        self.ts_aggregator = TimeSeriesAggregator(self.db_manager.get_connection)
def load_config(self, config_file):
    """Load the configuration, creating or back-filling it with the full default structure."""
    config = configparser.ConfigParser()
    # Complete config layout per the requirements.
    defaults = {
        'api': {
            'host': '0.0.0.0',
            'port': '5000',
            'debug': 'false'
        },
        'openvpn_monitor': {
            'log_path': '/var/log/openvpn/openvpn-status.log',
            'db_path': 'openvpn_monitor.db',
            'check_interval': '10',  # 10-second polling interval
        },
        'logging': {
            'level': 'INFO',
            'log_file': 'openvpn_gatherer.log'
        },
        'retention': {
            'raw_retention_days': '7',        # 1 week
            'agg_5m_retention_days': '14',    # 2 weeks
            'agg_15m_retention_days': '28',   # 4 weeks
            'agg_1h_retention_days': '90',    # 3 months
            'agg_6h_retention_days': '180',   # 6 months
            'agg_1d_retention_days': '365'    # 12 months
        },
        'visualization': {
            'refresh_interval': '5',
            'max_display_rows': '50'
        },
        'certificates': {
            'certificates_path': '/opt/ovpn/pki/issued',
            'certificate_extensions': 'crt'
        }
    }
    try:
        if os.path.exists(config_file):
            config.read(config_file)
            # Back-fill any sections/options an older file is missing.
            dirty = False
            for section, options in defaults.items():
                if not config.has_section(section):
                    config.add_section(section)
                    dirty = True
                for key, val in options.items():
                    if config.has_option(section, key):
                        continue
                    config.set(section, key, val)
                    dirty = True
            if dirty:
                with open(config_file, 'w') as fh:
                    config.write(fh)
                print(f"Updated configuration file: {config_file}")
        else:
            # No file yet: write the defaults from scratch.
            for section, options in defaults.items():
                config[section] = options
            with open(config_file, 'w') as fh:
                config.write(fh)
            print(f"Created default configuration file: {config_file}")
    except Exception as e:
        print(f"Error loading config: {e}")
        # In-memory fallback so the gatherer can still start.
        for section, options in defaults.items():
            if not config.has_section(section):
                config.add_section(section)
            for key, val in options.items():
                config.set(section, key, val)
    return config
def setup_logging(self):
    """Configure file + console logging from the [logging] config section."""
    try:
        level_name = self.config.get('logging', 'level', fallback='INFO')
        log_file = self.config.get('logging', 'log_file', fallback='openvpn_gatherer.log')
        # Make sure the log directory exists before FileHandler opens the file.
        parent = os.path.dirname(log_file)
        if parent and not os.path.exists(parent):
            os.makedirs(parent)
        logging.basicConfig(
            level=getattr(logging, level_name.upper()),
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler(),
            ],
        )
    except Exception as e:
        # Fall back to a bare console logger rather than failing startup.
        print(f"Logging setup failed: {e}")
        logging.basicConfig(level=logging.INFO)
    self.logger = logging.getLogger(__name__)
def get_config_value(self, section, key, default=None):
    """
    Read one option from self.config, returning *default* on any failure.

    Fix: the original used a bare `except:`, which also swallows
    KeyboardInterrupt/SystemExit; `except Exception` is broad enough for
    configparser and attribute errors while letting shutdown signals through.
    """
    try:
        return self.config.get(section, key, fallback=default)
    except Exception:
        return default
# get_db_connection and init_database removed
def cleanup_old_data(self):
    """Purge aged rows from each table according to the [retention] policies in config.ini."""
    self.logger.info("Starting data cleanup procedure...")
    conn = self.db_manager.get_connection()
    cursor = conn.cursor()
    # (table, config option, default retention in days) triples.
    retention_rules = [
        ('usage_history', 'raw_retention_days', 7),
        ('stats_5min', 'agg_5m_retention_days', 14),
        ('stats_15min', 'agg_15m_retention_days', 28),
        ('stats_hourly', 'agg_1h_retention_days', 90),
        ('stats_6h', 'agg_6h_retention_days', 180),
        ('stats_daily', 'agg_1d_retention_days', 365),
    ]
    try:
        total_deleted = 0
        for table, option, fallback_days in retention_rules:
            days = int(self.get_config_value('retention', option, fallback_days))
            if days <= 0:
                # Non-positive value means retention is disabled for this table.
                continue
            cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S')
            cursor.execute(f'DELETE FROM {table} WHERE timestamp < ?', (cutoff,))
            removed = cursor.rowcount
            if removed > 0:
                self.logger.info(f"Cleaned {table}: removed {removed} records older than {days} days")
            total_deleted += removed
        conn.commit()
        if total_deleted == 0:
            self.logger.info("Cleanup finished: nothing to delete")
    except Exception as e:
        self.logger.error(f"Cleanup Error: {e}")
        conn.rollback()
    finally:
        conn.close()
def parse_log_file(self):
    """
    Parse a version-2 (CSV) OpenVPN status log.

    Expected row shape:
    CLIENT_LIST,Common Name,Real Address,...,Bytes Received,Bytes Sent,...

    Returns a list of client dicts; empty on a missing or unreadable file.
    """
    log_path = self.get_config_value('openvpn_monitor', 'log_path', '/var/log/openvpn/openvpn-status.log')
    clients = []
    try:
        if not os.path.exists(log_path):
            self.logger.warning(f"Log file not found: {log_path}")
            return clients
        with open(log_path, 'r') as handle:
            for raw in handle:
                raw = raw.strip()
                # Only CLIENT_LIST rows carry per-client data.
                if not raw.startswith('CLIENT_LIST'):
                    continue
                fields = raw.split(',')
                # V2 column map: 1 = Common Name, 2 = Real Address,
                # 5 = Bytes Received, 6 = Bytes Sent.
                # Skip short rows and the header row itself.
                if len(fields) < 8 or fields[1] == 'Common Name':
                    continue
                try:
                    clients.append({
                        'common_name': fields[1].strip(),
                        'real_address': fields[2].strip(),
                        'bytes_received': int(fields[5].strip()),
                        'bytes_sent': int(fields[6].strip()),
                        'status': 'Active',
                    })
                except (ValueError, IndexError) as e:
                    self.logger.warning(f"Error parsing client line: {e}")
        self.logger.debug(f"Parsed {len(clients)} active clients")
    except Exception as e:
        self.logger.error(f"Error parsing log file: {e}")
    return clients
def update_client_status_and_bytes(self, active_clients):
    """
    Sync the `clients` table with the current snapshot and compute traffic deltas.

    For each active client this updates (or inserts) its DB row, adds the
    per-cycle delta to the running totals, and annotates the client dict with:
      - 'db_id': the clients.id primary key (used by aggregator/history)
      - 'bytes_received_inc' / 'bytes_sent_inc': bytes since the last cycle
    Clients known to the DB but missing from the snapshot are marked
    'Disconnected'. Returns the (mutated) active_clients list.
    """
    conn = self.db_manager.get_connection()
    cursor = conn.cursor()
    try:
        # Load the current state of every known client, keyed by common name.
        cursor.execute('SELECT id, common_name, status, last_bytes_received, last_bytes_sent FROM clients')
        db_clients = {}
        for row in cursor.fetchall():
            db_clients[row[1]] = {
                'id': row[0],
                'status': row[2],
                'last_bytes_received': row[3],
                'last_bytes_sent': row[4]
            }
        active_names = set()
        for client in active_clients:
            name = client['common_name']
            active_names.add(name)
            curr_recv = client['bytes_received']
            curr_sent = client['bytes_sent']
            if name in db_clients:
                # Client already exists in the database.
                db_client = db_clients[name]
                client['db_id'] = db_client['id']  # id for the aggregator and raw history
                # Server/session restart check (counter reset): if the current
                # counter is below the stored one, the counter was reset, so
                # the whole current value is taken as the delta.
                if curr_recv < db_client['last_bytes_received']:
                    inc_recv = curr_recv
                    self.logger.info(f"Counter reset detected for {name} (Recv)")
                else:
                    inc_recv = curr_recv - db_client['last_bytes_received']
                if curr_sent < db_client['last_bytes_sent']:
                    inc_sent = curr_sent
                    self.logger.info(f"Counter reset detected for {name} (Sent)")
                else:
                    inc_sent = curr_sent - db_client['last_bytes_sent']
                # Update the client row: add deltas to the totals and store the
                # raw counters for the next cycle's delta computation.
                cursor.execute('''
                    UPDATE clients
                    SET status = 'Active',
                        real_address = ?,
                        total_bytes_received = total_bytes_received + ?,
                        total_bytes_sent = total_bytes_sent + ?,
                        last_bytes_received = ?,
                        last_bytes_sent = ?,
                        updated_at = CURRENT_TIMESTAMP,
                        last_activity = CURRENT_TIMESTAMP
                    WHERE id = ?
                ''', (
                    client['real_address'],
                    inc_recv,
                    inc_sent,
                    curr_recv,
                    curr_sent,
                    db_client['id']
                ))
                client['bytes_received_inc'] = inc_recv
                client['bytes_sent_inc'] = inc_sent
            else:
                # First sighting of this client: insert with zeroed totals.
                cursor.execute('''
                    INSERT INTO clients
                    (common_name, real_address, status, total_bytes_received, total_bytes_sent, last_bytes_received, last_bytes_sent)
                    VALUES (?, ?, 'Active', 0, 0, ?, ?)
                ''', (
                    name,
                    client['real_address'],
                    curr_recv,
                    curr_sent
                ))
                new_id = cursor.lastrowid
                client['db_id'] = new_id
                # For the first sample the increment is reported as 0
                # (alternatively the whole counter could be credited here).
                client['bytes_received_inc'] = 0
                client['bytes_sent_inc'] = 0
                self.logger.info(f"New client added: {name}")
        # Mark clients that disappeared from the snapshot as disconnected.
        for name, db_client in db_clients.items():
            if name not in active_names and db_client['status'] == 'Active':
                cursor.execute('''
                    UPDATE clients
                    SET status = 'Disconnected', updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                ''', (db_client['id'],))
                self.logger.info(f"Client disconnected: {name}")
        conn.commit()
    except Exception as e:
        self.logger.error(f"Error updating client status: {e}")
        conn.rollback()
    finally:
        conn.close()
    return active_clients
def calculate_rates(self, clients, time_diff):
    """Convert per-interval byte increments into Mbps rates, in place; returns the list."""
    # Guard against a zero or negative interval (first cycle, clock jumps).
    interval = time_diff if time_diff > 0 else 1.0
    # Scale factor: (bytes * 8 bits) / (seconds * 1e6) -> megabits per second.
    scale = 8 / (interval * 1_000_000)
    for entry in clients:
        entry['bytes_received_rate_mbps'] = entry.get('bytes_received_inc', 0) * scale
        entry['bytes_sent_rate_mbps'] = entry.get('bytes_sent_inc', 0) * scale
    return clients
def store_usage_history(self, clients):
    """Persist the high-resolution (raw) per-cycle traffic samples."""
    if not clients:
        return
    conn = self.db_manager.get_connection()
    cursor = conn.cursor()
    try:
        # Only clients that received a DB id get a history row.
        rows = [
            (
                c['db_id'],
                c.get('bytes_received_inc', 0),
                c.get('bytes_sent_inc', 0),
                c.get('bytes_received_rate_mbps', 0),
                c.get('bytes_sent_rate_mbps', 0),
            )
            for c in clients if c.get('db_id')
        ]
        cursor.executemany('''
            INSERT INTO usage_history
            (client_id, bytes_received, bytes_sent, bytes_received_rate_mbps, bytes_sent_rate_mbps)
            VALUES (?, ?, ?, ?, ?)
        ''', rows)
        conn.commit()
    except Exception as e:
        self.logger.error(f"Error storing raw history: {e}")
        conn.rollback()
    finally:
        conn.close()
def run_monitoring_cycle(self):
    """Run one gather -> update -> rate -> persist -> aggregate -> cleanup cycle."""
    now_ts = datetime.now()
    # 1. Read the active clients from the status log.
    active = self.parse_log_file()
    # 2. Sync DB state and compute per-client traffic deltas.
    updated = self.update_client_status_and_bytes(active)
    if updated:
        # 3. Elapsed seconds since the previous cycle (10s nominal on first run).
        elapsed = 10.0
        if self.last_check_time:
            elapsed = (now_ts - self.last_check_time).total_seconds()
        # 4. Turn deltas into Mbps rates.
        rated = self.calculate_rates(updated, elapsed)
        # 5. Raw history for the real-time graphs.
        self.store_usage_history(rated)
        # 6. Roll into the TSDB buckets (5m, 15m, 1h, 6h, 1d).
        self.ts_aggregator.aggregate(rated)
    self.last_check_time = now_ts
    # 7. Daily retention cleanup, fired on the first cycle of a new day.
    if now_ts.date() > self.last_cleanup_date:
        self.logger.info("New day detected. Initiating cleanup.")
        self.cleanup_old_data()
        self.last_cleanup_date = now_ts.date()
def start_monitoring(self):
    """Blocking entry point: run monitoring cycles at the configured interval until stopped."""
    interval = int(self.get_config_value('openvpn_monitor', 'check_interval', 10))
    self.logger.info(f"Starting OpenVPN Monitoring. Interval: {interval}s")
    self.logger.info("Press Ctrl+C to stop")
    try:
        while True:
            self.run_monitoring_cycle()
            time.sleep(interval)
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the gatherer.
        self.logger.info("Monitoring stopped by user")
    except Exception as exc:
        self.logger.error(f"Critical error in main loop: {exc}")
if __name__ == "__main__":
    # Script entry point: construct the gatherer (loads/creates config.ini,
    # sets up logging and the database) and enter the blocking monitor loop.
    gatherer = OpenVPNDataGatherer()
    gatherer.start_monitoring()