Files
OpenVPN-Monitoring-Simple/APP/openvpn_api_v3.py

716 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sqlite3
import configparser
from datetime import datetime, timedelta, timezone
from flask import Flask, jsonify, request
from flask_cors import CORS
import logging
import subprocess
import os
from pathlib import Path
import re
from db import DatabaseManager
# Set up module-wide logging: timestamped INFO-level messages to stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Flask application instance; CORS is opened for every route so the
# monitoring frontend can be served from a different origin.
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
class OpenVPNAPI:
    """Backend facade for the monitoring API: wraps database access,
    certificate-directory inspection, and statistics queries."""

    def __init__(self, config_file='config.ini'):
        """Read settings from *config_file* and prepare DB + cert cache.

        Falls back to sensible defaults when the [certificates] section
        is absent. The certificate cache maps file path ->
        {'mtime': float, 'data': dict} so unchanged files are not
        re-parsed on every request.
        """
        self.db_manager = DatabaseManager(config_file)
        self.config = configparser.ConfigParser()
        self.config.read(config_file)
        self.certificates_path = self.config.get('certificates', 'certificates_path', fallback='/etc/openvpn/certs')
        self.cert_extensions = self.config.get('certificates', 'certificate_extensions', fallback='crt,pem,key').split(',')
        self._cert_cache = {}  # {filepath: {'mtime': float, 'data': dict}}
    def get_db_connection(self):
        """Get a database connection"""
        # Thin delegate to the shared DatabaseManager; the caller is
        # responsible for closing the returned connection.
        return self.db_manager.get_connection()
# --- CERTIFICATE HANDLING SECTION (left unchanged) ---
def parse_openssl_date(self, date_str):
try:
parts = date_str.split()
if len(parts[1]) == 1:
parts[1] = f' {parts[1]}'
normalized_date = ' '.join(parts)
return datetime.strptime(normalized_date, '%b %d %H:%M:%S %Y GMT')
except ValueError:
try:
return datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z')
except ValueError:
logger.warning(f"Could not parse date: {date_str}")
return datetime.min
def calculate_days_remaining(self, not_after_str):
if not_after_str == 'N/A': return 'N/A'
try:
expiration_date = self.parse_openssl_date(not_after_str)
if expiration_date == datetime.min: return 'N/A'
days_remaining = (expiration_date - datetime.now()).days
if days_remaining < 0: return f"Expired ({abs(days_remaining)} days ago)"
else: return f"{days_remaining} days"
except Exception: return 'N/A'
def extract_cert_info(self, cert_file):
# Существующая логика парсинга через openssl
try:
result = subprocess.run(['openssl', 'x509', '-in', cert_file, '-noout', '-text'],
capture_output=True, text=True, check=True)
output = result.stdout
data = {'file': os.path.basename(cert_file), 'file_path': cert_file, 'subject': 'N/A',
'issuer': 'N/A', 'not_after': 'N/A'}
for line in output.split('\n'):
line = line.strip()
if line.startswith('Subject:'):
data['subject'] = line.split('Subject:', 1)[1].strip()
cn_match = re.search(r'CN=([^,]+)', data['subject'])
if cn_match: data['common_name'] = cn_match.group(1)
elif 'Not After' in line:
data['not_after'] = line.split(':', 1)[1].strip()
if data['not_after'] != 'N/A':
data['sort_date'] = self.parse_openssl_date(data['not_after']).isoformat()
else:
data['sort_date'] = datetime.min.isoformat()
data['days_remaining'] = self.calculate_days_remaining(data['not_after'])
data['is_expired'] = 'Expired' in data['days_remaining']
return data
except Exception as e:
logger.error(f"Error processing {cert_file}: {e}")
return None
def get_certificates_info(self):
cert_path = Path(self.certificates_path)
if not cert_path.exists(): return []
cert_files = []
for ext in self.cert_extensions:
cert_files.extend(cert_path.rglob(f'*.{ext.strip()}'))
current_valid_files = set()
cert_data = []
for cert_file_path in cert_files:
cert_file = str(cert_file_path)
current_valid_files.add(cert_file)
try:
mtime = os.path.getmtime(cert_file)
# Check cache
cached = self._cert_cache.get(cert_file)
if cached and cached['mtime'] == mtime:
cert_data.append(cached['data'])
else:
# Parse and update cache
parsed_data = self.extract_cert_info(cert_file)
if parsed_data:
self._cert_cache[cert_file] = {
'mtime': mtime,
'data': parsed_data
}
cert_data.append(parsed_data)
except OSError:
continue
# Prune cache for deleted files
for cached_file in list(self._cert_cache.keys()):
if cached_file not in current_valid_files:
del self._cert_cache[cached_file]
return cert_data
# -----------------------------------------------------------
def get_current_stats(self):
"""Get current statistics for all clients"""
conn = self.get_db_connection()
cursor = conn.cursor()
try:
# ИЗМЕНЕНИЕ:
# Вместо "ORDER BY timestamp DESC LIMIT 1" (мгновенное значение),
# мы берем "MAX(rate)" за последние 2 минуты.
# Это фильтрует "нули", возникающие из-за рассинхрона записи логов,
# и показывает реальную пропускную способность канала.
cursor.execute('''
SELECT
c.common_name,
c.real_address,
c.status,
CASE
WHEN c.status = 'Active' THEN 'N/A'
ELSE strftime('%Y-%m-%d %H:%M:%S', c.last_activity)
END as last_activity,
c.total_bytes_received,
c.total_bytes_sent,
-- Пиковая скорость Download за последние 2 минуты
(SELECT MAX(uh.bytes_received_rate_mbps)
FROM usage_history uh
WHERE uh.client_id = c.id
AND uh.timestamp >= datetime('now', '-30 seconds')) as current_recv_rate,
-- Пиковая скорость Upload за последние 2 минуты
(SELECT MAX(uh.bytes_sent_rate_mbps)
FROM usage_history uh
WHERE uh.client_id = c.id
AND uh.timestamp >= datetime('now', '-30 seconds')) as current_sent_rate,
strftime('%Y-%m-%d %H:%M:%S', c.updated_at) as last_updated
FROM clients c
ORDER BY c.status DESC, c.common_name
''')
columns = [column[0] for column in cursor.description]
data = []
for row in cursor.fetchall():
data.append(dict(zip(columns, row)))
return data
except Exception as e:
logger.error(f"Error fetching data: {e}")
return []
finally:
conn.close()
    def get_client_history(self, common_name, start_date=None, end_date=None, resolution='auto'):
        """
        Fetch usage history with TSDB-style aggregation support.

        Automatically selects the backing table (raw, hourly, 6h, daily)
        based on the requested period length, and zero-fills gaps for
        high-resolution windows so charts get evenly spaced points.

        Returns {'data': [...], 'meta': {...}} or {'data': [], 'error': str}.
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()
        # 1. Establish the time window (defaults to the last 24 hours).
        if not end_date:
            end_date = datetime.now()
        if not start_date:
            start_date = end_date - timedelta(hours=24)  # default: one day
        # Coerce string dates into datetime objects. NOTE(review): on
        # parse failure the string is kept and the subtraction below will
        # raise -- caught by the outer handler only indirectly; confirm
        # callers always pass this exact format.
        if isinstance(start_date, str):
            try: start_date = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
            except: pass
        if isinstance(end_date, str):
            try: end_date = datetime.strptime(end_date, '%Y-%m-%d %H:%M:%S')
            except: pass
        duration_hours = (end_date - start_date).total_seconds() / 3600
        # 2. Map resolution names to backing tables.
        table_map = {
            'raw': 'usage_history',
            '5min': 'stats_5min',
            '15min': 'stats_15min',
            'hourly': 'stats_hourly',
            '6h': 'stats_6h',
            'daily': 'stats_daily'
        }
        target_table = 'usage_history'
        # 3. Table selection logic: coarser tables for longer windows.
        if resolution == 'auto':
            if duration_hours <= 24:
                target_table = 'usage_history'  # raw data (one-day chart)
            elif duration_hours <= 168:  # up to 7 days
                target_table = 'stats_hourly'  # hourly buckets
            elif duration_hours <= 2160:  # up to 3 months
                target_table = 'stats_6h'  # 6-hour buckets
            else:
                target_table = 'stats_daily'  # daily buckets
        elif resolution in table_map:
            target_table = table_map[resolution]
        # Verify the table exists (fallback when migrations have not run).
        # target_table comes only from the whitelist above, so the f-string
        # interpolation cannot inject arbitrary SQL.
        try:
            cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{target_table}'")
            if not cursor.fetchone():
                logger.warning(f"Table {target_table} missing, fallback to usage_history")
                target_table = 'usage_history'
        except:
            pass
        try:
            # 4. Query construction.
            is_aggregated = target_table != 'usage_history'
            # High-resolution bucketing for raw-table windows: pick a
            # bucket width (seconds) and a fixed point count per range.
            is_high_res = False
            interval = 0
            points_count = 0
            if target_table == 'usage_history':
                if duration_hours <= 1.1:
                    is_high_res = True
                    interval = 30
                    points_count = 120
                elif duration_hours <= 3.1:
                    is_high_res = True
                    interval = 60
                    points_count = 180
                elif duration_hours <= 6.1:
                    is_high_res = True
                    interval = 120  # 2 minutes
                    points_count = 180
                elif duration_hours <= 12.1:
                    is_high_res = True
                    interval = 300  # 5 minutes
                    points_count = 144
                elif duration_hours <= 24.1:
                    is_high_res = True
                    interval = 900  # 15 minutes
                    points_count = 96
            if is_high_res:
                # Bucket raw samples into `interval`-second slots: sum the
                # byte counters, keep the peak rates per slot.
                query = f'''
                    SELECT
                        datetime((strftime('%s', uh.timestamp) / {interval}) * {interval}, 'unixepoch') as timestamp,
                        SUM(uh.bytes_received) as bytes_received,
                        SUM(uh.bytes_sent) as bytes_sent,
                        MAX(uh.bytes_received_rate_mbps) as bytes_received_rate_mbps,
                        MAX(uh.bytes_sent_rate_mbps) as bytes_sent_rate_mbps
                    FROM usage_history uh
                    JOIN clients c ON uh.client_id = c.id
                    WHERE c.common_name = ? AND uh.timestamp BETWEEN ? AND ?
                    GROUP BY datetime((strftime('%s', uh.timestamp) / {interval}) * {interval}, 'unixepoch')
                    ORDER BY timestamp ASC
                '''
            elif is_aggregated:
                # Aggregated tables carry no per-sample rate columns, so
                # rates are reported as zero.
                query = f'''
                    SELECT
                        t.timestamp,
                        t.bytes_received,
                        t.bytes_sent,
                        0 as bytes_received_rate_mbps,
                        0 as bytes_sent_rate_mbps
                    FROM {target_table} t
                    JOIN clients c ON t.client_id = c.id
                    WHERE c.common_name = ? AND t.timestamp BETWEEN ? AND ?
                    ORDER BY t.timestamp ASC
                '''
            else:
                # Raw samples passed through unbucketed.
                query = f'''
                    SELECT
                        uh.timestamp,
                        uh.bytes_received,
                        uh.bytes_sent,
                        uh.bytes_received_rate_mbps,
                        uh.bytes_sent_rate_mbps
                    FROM usage_history uh
                    JOIN clients c ON uh.client_id = c.id
                    WHERE c.common_name = ? AND uh.timestamp BETWEEN ? AND ?
                    ORDER BY uh.timestamp ASC
                '''
            s_str = start_date.strftime('%Y-%m-%d %H:%M:%S')
            e_str = end_date.strftime('%Y-%m-%d %H:%M:%S')
            cursor.execute(query, (common_name, s_str, e_str))
            columns = [column[0] for column in cursor.description]
            db_data_list = [dict(zip(columns, row)) for row in cursor.fetchall()]
            final_data = db_data_list
            if is_high_res:
                # Zero-filling: emit a point for every slot in the window
                # so the chart has exactly points_count evenly spaced samples.
                final_data = []
                db_data_map = {row['timestamp']: row for row in db_data_list}
                # Align the window end down to the nearest interval boundary.
                ts_end = end_date.timestamp()
                ts_aligned = ts_end - (ts_end % interval)
                # NOTE(review): utcfromtimestamp is deprecated since 3.12
                # and yields a naive datetime; it is used here only to
                # format strings matching SQLite's UTC timestamps.
                aligned_end = datetime.utcfromtimestamp(ts_aligned)
                # Walk forward one interval at a time, filling gaps.
                start_generated = aligned_end - timedelta(seconds=points_count * interval)
                current = start_generated
                for _ in range(points_count):
                    current += timedelta(seconds=interval)
                    ts_str = current.strftime('%Y-%m-%d %H:%M:%S')
                    if ts_str in db_data_map:
                        final_data.append(db_data_map[ts_str])
                    else:
                        final_data.append({
                            'timestamp': ts_str,
                            'bytes_received': 0,
                            'bytes_sent': 0,
                            'bytes_received_rate_mbps': 0,
                            'bytes_sent_rate_mbps': 0
                        })
            return {
                'data': final_data,
                'meta': {
                    'resolution_used': target_table + ('_hires' if is_high_res else ''),
                    'record_count': len(final_data),
                    'start': s_str,
                    'end': e_str
                }
            }
        except Exception as e:
            logger.error(f"Error fetching history: {e}")
            return {'data': [], 'error': str(e)}
        finally:
            conn.close()
def get_system_stats(self):
"""Общая статистика по системе"""
conn = self.get_db_connection()
cursor = conn.cursor()
try:
cursor.execute('''
SELECT
COUNT(*) as total_clients,
SUM(CASE WHEN status = 'Active' THEN 1 ELSE 0 END) as active_clients,
COALESCE(SUM(total_bytes_received), 0) as total_bytes_received,
COALESCE(SUM(total_bytes_sent), 0) as total_bytes_sent
FROM clients
''')
result = cursor.fetchone()
columns = [column[0] for column in cursor.description]
if result:
stats = dict(zip(columns, result))
# Добавляем человекочитаемые форматы
stats['total_received_gb'] = round(stats['total_bytes_received'] / (1024**3), 2)
stats['total_sent_gb'] = round(stats['total_bytes_sent'] / (1024**3), 2)
return stats
return {}
except Exception as e:
logger.error(f"Error system stats: {e}")
return {}
finally:
conn.close()
    def get_analytics_data(self, range_arg='24h'):
        """
        Get aggregated analytics with dynamic resolution.
        range_arg: '24h', '7d', '30d'

        Returns a dict with peak concurrent clients, top-3 clients by
        traffic, a zero-filled 96-point global history series, and an
        rx/tx traffic split. On any query error the partially filled
        (or default) dict is returned rather than raising.
        """
        conn = self.get_db_connection()
        cursor = conn.cursor()
        # Defaults; also serves as the error-path return value.
        analytics = {
            'max_concurrent_24h': 0,
            'top_clients_24h': [],
            'global_history_24h': [],
            'traffic_distribution': {'rx': 0, 'tx': 0}
        }
        # 1. Configuration: window length, bucket width (sized so every
        # range produces ~96 chart points) and backing table.
        hours = 24
        interval_seconds = 900  # 15 min default
        target_table = 'usage_history'
        if range_arg == '7d':
            hours = 168
            interval_seconds = 6300  # 105 min -> 96 points
            target_table = 'stats_hourly'
        elif range_arg == '30d':
            hours = 720
            interval_seconds = 27000  # 450 min -> 96 points
            target_table = 'stats_hourly'  # Fallback to hourly/raw as needed
        # Fallback when the aggregated table does not exist (no migration).
        # target_table is restricted to the literals above, so the f-string
        # interpolation is injection-safe.
        try:
            cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{target_table}'")
            if not cursor.fetchone():
                target_table = 'usage_history'  # Fallback to raw if aggregated missing
        except:
            target_table = 'usage_history'
        try:
            # 2. Global history (chart). Aggregated tables carry no
            # per-sample rate columns, so rates are zeroed there.
            if target_table == 'usage_history':
                rate_cols = "SUM(bytes_received_rate_mbps) as total_rx_rate, SUM(bytes_sent_rate_mbps) as total_tx_rate,"
            else:
                rate_cols = "0 as total_rx_rate, 0 as total_tx_rate,"
            # Two-stage aggregation: the inner SELECT sums across clients
            # per raw timestamp; the outer SELECT folds those rows into
            # interval_seconds-wide buckets.
            query_hist = f'''
                SELECT
                    datetime((strftime('%s', timestamp) / {interval_seconds}) * {interval_seconds}, 'unixepoch') as timestamp,
                    SUM(total_rx) as total_rx,
                    SUM(total_tx) as total_tx,
                    MAX(total_rx_rate) as total_rx_rate,
                    MAX(total_tx_rate) as total_tx_rate,
                    MAX(active_count) as active_count
                FROM (
                    SELECT
                        timestamp,
                        SUM(bytes_received) as total_rx,
                        SUM(bytes_sent) as total_tx,
                        {rate_cols}
                        COUNT(DISTINCT client_id) as active_count
                    FROM {target_table}
                    WHERE timestamp >= datetime('now', '-{hours} hours')
                    GROUP BY timestamp
                ) sub
                GROUP BY datetime((strftime('%s', timestamp) / {interval_seconds}) * {interval_seconds}, 'unixepoch')
                ORDER BY timestamp ASC
            '''
            cursor.execute(query_hist)
            rows = cursor.fetchall()
            columns = [col[0] for col in cursor.description]
            db_data = {row[0]: dict(zip(columns, row)) for row in rows}
            # Post-processing: zero-fill so the chart always has 96 points.
            analytics['global_history_24h'] = []
            # NOTE(review): utcnow()/utcfromtimestamp() are naive-UTC and
            # deprecated since 3.12; kept because the formatted strings
            # must match SQLite's UTC timestamps.
            now = datetime.utcnow()
            # Round the current time down to the nearest bucket boundary.
            ts_now = now.timestamp()
            ts_aligned = ts_now - (ts_now % interval_seconds)
            now_aligned = datetime.utcfromtimestamp(ts_aligned)
            # We want exactly 96 points ending at now_aligned:
            # start time = now_aligned - (96 * interval).
            start_time = now_aligned - timedelta(seconds=96 * interval_seconds)
            current = start_time
            # Generate exactly 96 points, substituting zeros for gaps.
            for _ in range(96):
                current += timedelta(seconds=interval_seconds)
                ts_str = current.strftime('%Y-%m-%d %H:%M:%S')
                if ts_str in db_data:
                    analytics['global_history_24h'].append(db_data[ts_str])
                else:
                    analytics['global_history_24h'].append({
                        'timestamp': ts_str,
                        'total_rx': 0,
                        'total_tx': 0,
                        'total_rx_rate': 0,
                        'total_tx_rate': 0,
                        'active_count': 0
                    })
            # Peak concurrent clients across the (zero-filled) series.
            max_clients = 0
            for row in analytics['global_history_24h']:
                if row.get('active_count', 0) > max_clients:
                    max_clients = row['active_count']
            analytics['max_concurrent_24h'] = max_clients
            # 3. Top clients and 4. traffic distribution, over the same
            # table and window.
            query_top = f'''
                SELECT
                    c.common_name,
                    SUM(t.bytes_received) as rx,
                    SUM(t.bytes_sent) as tx,
                    (SUM(t.bytes_received) + SUM(t.bytes_sent)) as total_traffic
                FROM {target_table} t
                JOIN clients c ON t.client_id = c.id
                WHERE t.timestamp >= datetime('now', '-{hours} hours')
                GROUP BY c.id
                ORDER BY total_traffic DESC
                LIMIT 3
            '''
            cursor.execute(query_top)
            top_cols = [col[0] for col in cursor.description]
            analytics['top_clients_24h'] = [dict(zip(top_cols, row)) for row in cursor.fetchall()]
            query_dist = f'''
                SELECT
                    SUM(bytes_received) as rx,
                    SUM(bytes_sent) as tx
                FROM {target_table}
                WHERE timestamp >= datetime('now', '-{hours} hours')
            '''
            cursor.execute(query_dist)
            dist_res = cursor.fetchone()
            if dist_res:
                analytics['traffic_distribution'] = {'rx': dist_res[0] or 0, 'tx': dist_res[1] or 0}
            return analytics
        except Exception as e:
            logger.error(f"Analytics error: {e}")
            return analytics
        finally:
            conn.close()
# Module-level singleton used by every route handler below.
api = OpenVPNAPI()
# --- ROUTES ---
@app.route('/api/v1/stats', methods=['GET'])
def get_stats():
    """Get current statistics for all clients"""
    try:
        clients = api.get_current_stats()
        # Derive human-friendly megabyte totals and normalise missing
        # rate values to zero, mutating the rows in place.
        for entry in clients:
            entry['total_received_mb'] = round((entry['total_bytes_received'] or 0) / (1024 * 1024), 2)
            entry['total_sent_mb'] = round((entry['total_bytes_sent'] or 0) / (1024 * 1024), 2)
            entry['current_recv_rate_mbps'] = entry['current_recv_rate'] or 0
            entry['current_sent_rate_mbps'] = entry['current_sent_rate'] or 0
        return jsonify({
            'success': True,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'data': clients,
            'count': len(clients)
        })
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/v1/stats/system', methods=['GET'])
def get_system_stats():
    """Get system-wide statistics"""
    try:
        payload = {
            'success': True,
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'data': api.get_system_stats()
        }
        return jsonify(payload)
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/v1/stats/<string:common_name>', methods=['GET'])
def get_client_stats(common_name):
    """
    Get detailed stats for a client.
    Query Params:
    - range: '24h' (default), '7d', '30d', '1y' OR custom dates
    - resolution: 'auto' (default), 'raw', '5min', 'hourly', 'daily'
    """
    try:
        range_arg = request.args.get('range', default='24h')
        resolution = request.args.get('resolution', default='auto')
        # Work in UTC because SQLite stores its timestamps in UTC.
        end_date = datetime.now(timezone.utc)
        start_date = end_date - timedelta(hours=24)
        # Translate the range suffix (h/d/y) into a start offset.
        if range_arg.endswith('h'):
            start_date = end_date - timedelta(hours=int(range_arg[:-1]))
        elif range_arg.endswith('d'):
            start_date = end_date - timedelta(days=int(range_arg[:-1]))
        elif range_arg.endswith('y'):
            start_date = end_date - timedelta(days=int(range_arg[:-1]) * 365)
        # Current snapshot for this client.
        snapshot = next(
            (c for c in api.get_current_stats() if c['common_name'] == common_name),
            None
        )
        if not snapshot:
            return jsonify({'success': False, 'error': 'Client not found'}), 404
        # Historical series for the requested window.
        history_result = api.get_client_history(
            common_name,
            start_date=start_date,
            end_date=end_date,
            resolution=resolution
        )
        payload = {
            'common_name': snapshot['common_name'],
            'real_address': snapshot['real_address'],
            'status': snapshot['status'],
            'totals': {
                'received_mb': round((snapshot['total_bytes_received'] or 0) / (1024 * 1024), 2),
                'sent_mb': round((snapshot['total_bytes_sent'] or 0) / (1024 * 1024), 2)
            },
            'current_rates': {
                'recv_mbps': snapshot['current_recv_rate'] or 0,
                'sent_mbps': snapshot['current_sent_rate'] or 0
            },
            'last_activity': snapshot['last_activity'],
            'history': history_result.get('data', []),
            'meta': history_result.get('meta', {})
        }
        # A UTC timestamp string is directly consumable by JS `new Date()`.
        return jsonify({
            'success': True,
            'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
            'data': payload
        })
    except Exception as e:
        logger.error(f"API Error: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/v1/certificates', methods=['GET'])
def get_certificates():
    """List parsed certificate metadata from the configured directory."""
    try:
        return jsonify({'success': True, 'data': api.get_certificates_info()})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/v1/clients', methods=['GET'])
def get_clients_list():
    """Return a minimal name/status listing of all known clients."""
    try:
        listing = [
            {'common_name': entry['common_name'], 'status': entry['status']}
            for entry in api.get_current_stats()
        ]
        return jsonify({'success': True, 'data': listing})
    except Exception as e:
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/v1/health', methods=['GET'])
def health_check():
    """Liveness probe: healthy when a DB connection can be opened."""
    try:
        api.get_db_connection().close()
        return jsonify({'success': True, 'status': 'healthy'})
    except Exception as e:
        return jsonify({'success': False, 'status': 'unhealthy', 'error': str(e)}), 500
@app.route('/api/v1/analytics', methods=['GET'])
def get_analytics():
    """Get dashboard analytics data"""
    try:
        requested = request.args.get('range', default='24h')
        # Whitelist the range value before it reaches SQL construction.
        valid_ranges = {'24h': '24h', '7d': '7d', '30d': '30d'}
        selected_range = valid_ranges.get(requested, '24h')
        return jsonify({
            'success': True,
            'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S'),
            'data': api.get_analytics_data(selected_range),
            'range': selected_range
        })
    except Exception as e:
        logger.error(f"Error in analytics endpoint: {e}")
        return jsonify({'success': False, 'error': str(e)}), 500
if __name__ == "__main__":
    # Bind address comes from config; the port is pinned to 5001 to avoid
    # colliding with anything already listening on the default 5000.
    host = api.config.get('api', 'host', fallback='0.0.0.0')
    port = 5001
    debug = api.config.getboolean('api', 'debug', fallback=False)
    logger.info(f"Starting API on {host}:{port}")
    app.run(host=host, port=port, debug=debug)