Files

135 lines
4.0 KiB
Python
Raw Permalink Normal View History

2026-02-02 11:02:32 +03:00
from fastapi import FastAPI, Query, Body, HTTPException
from enum import Enum
import json
import os
from typing import List, Dict
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from cidr_collector import CIDRCollector, FQDNCollector, load_full_config
app = FastAPI(title="RIPE CIDR/FQDN API")
class AddressType(str, Enum):
cidr = "cidr"
fqdn = "fqdn"
all_types = "all"
DATA_FILE = "data.json"
FQDN_DATA_FILE = "fqdn_data.json"
CONFIG_FILE = "config.json"
scheduler = BackgroundScheduler()
# Wrapper functions for scheduler
def run_asn_job():
print("Running scheduled ASN collection...")
# Re-instantiate to ensure fresh config
collector = CIDRCollector()
collector.run_collection()
def run_fqdn_job():
print("Running scheduled FQDN collection...")
# Re-instantiate to ensure fresh config
collector = FQDNCollector()
collector.run_collection()
def load_json(filename):
if not os.path.exists(filename):
return {}
try:
with open(filename, 'r') as f:
return json.load(f)
except Exception:
return {}
def get_cidrs() -> List[str]:
data = load_json(DATA_FILE)
cidrs = set()
for asn_data in data.values():
for prefix in asn_data.get("prefixes", []):
cidrs.add(prefix)
return list(cidrs)
def get_fqdn_ips() -> List[str]:
data = load_json(FQDN_DATA_FILE)
ips = set()
for domain_data in data.values():
for ip in domain_data.get("ips", []):
ips.add(ip)
return list(ips)
@app.on_event("startup")
def start_scheduler():
config = load_full_config()
schedule_config = config.get("schedule", {})
asn_cron = schedule_config.get("asn", "0 2 * * *")
fqdn_cron = schedule_config.get("fqdn", "0 3 * * *")
# Add jobs
scheduler.add_job(run_asn_job, CronTrigger.from_crontab(asn_cron), id="asn_job", replace_existing=True)
scheduler.add_job(run_fqdn_job, CronTrigger.from_crontab(fqdn_cron), id="fqdn_job", replace_existing=True)
scheduler.start()
print(f"Scheduler started. ASN: {asn_cron}, FQDN: {fqdn_cron}")
@app.on_event("shutdown")
def shutdown_scheduler():
scheduler.shutdown()
@app.get("/addresses", response_model=List[str])
def get_addresses(type: AddressType = Query(AddressType.all_types, description="Filter by address type")):
results = set()
if type in [AddressType.cidr, AddressType.all_types]:
results.update(get_cidrs())
if type in [AddressType.fqdn, AddressType.all_types]:
results.update(get_fqdn_ips())
return sorted(list(results))
@app.get("/schedule")
def get_schedule():
config = load_full_config()
return config.get("schedule", {})
@app.post("/schedule")
def update_schedule(schedule_update: Dict[str, str] = Body(..., example={"type": "asn", "cron": "*/10 * * * *"})):
job_type = schedule_update.get("type")
cron_str = schedule_update.get("cron")
if job_type not in ["asn", "fqdn"]:
raise HTTPException(status_code=400, detail="Invalid type. Must be 'asn' or 'fqdn'.")
if not cron_str:
raise HTTPException(status_code=400, detail="Cron string required.")
# Validate cron string by attempting to create trigger
try:
trigger = CronTrigger.from_crontab(cron_str)
except Exception as e:
raise HTTPException(status_code=400, detail=f"Invalid cron string: {e}")
# Update config file
config = load_full_config()
if "schedule" not in config:
config["schedule"] = {}
config["schedule"][job_type] = cron_str
# Save config
with open(CONFIG_FILE, 'w') as f:
json.dump(config, f, indent=4)
# Update running job
job_id = f"{job_type}_job"
func = run_asn_job if job_type == "asn" else run_fqdn_job
scheduler.add_job(func, trigger, id=job_id, replace_existing=True)
return {"message": "Schedule updated", "type": job_type, "cron": cron_str}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)