Files
ASN-and-FQDN-IP-Collector/api_server.py
2026-02-02 11:02:32 +03:00

135 lines
4.0 KiB
Python

import json
import logging
import os
from enum import Enum
from typing import List, Dict

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import FastAPI, Query, Body, HTTPException

from cidr_collector import CIDRCollector, FQDNCollector, load_full_config
# ASGI application object; the route and lifecycle decorators in this module attach to it.
app = FastAPI(title="RIPE CIDR/FQDN API")
class AddressType(str, Enum):
    """Values accepted by the ``type`` query parameter of ``/addresses``.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # Only prefixes gathered from the ASN data file.
    cidr = "cidr"
    # Only IPs gathered from the FQDN data file.
    fqdn = "fqdn"
    # Both sources merged together.
    all_types = "all"
# JSON file read by get_cidrs().
DATA_FILE = "data.json"
# JSON file read by get_fqdn_ips().
FQDN_DATA_FILE = "fqdn_data.json"
# JSON file that update_schedule() rewrites when a cron expression changes.
CONFIG_FILE = "config.json"
# One scheduler instance shared by the startup/shutdown hooks and the /schedule endpoint.
scheduler = BackgroundScheduler()
# Job wrappers invoked by the scheduler
def run_asn_job():
    """Run one ASN collection pass with a newly created ``CIDRCollector``."""
    print("Running scheduled ASN collection...")
    # A new collector is built on every run so that it starts from fresh
    # config rather than reusing state from a previous run.
    CIDRCollector().run_collection()
def run_fqdn_job():
    """Run one FQDN collection pass with a newly created ``FQDNCollector``."""
    print("Running scheduled FQDN collection...")
    # A new collector is built on every run so that it starts from fresh
    # config rather than reusing state from a previous run.
    FQDNCollector().run_collection()
def load_json(filename):
    """Return the parsed JSON content of *filename*, or ``{}`` on failure.

    Args:
        filename: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` produces for the file. An empty dict is
        returned when the file does not exist, cannot be read, or does not
        contain valid JSON, so callers can treat all of these as "no data".
    """
    try:
        # Open directly instead of checking for existence first: the file
        # could disappear between the check and the open.
        with open(filename, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # A missing file is an expected state, not worth a warning.
        return {}
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Stay best-effort, but leave a trace so a corrupt or unreadable
        # file is not mistaken for an empty data set.
        logging.getLogger(__name__).warning(
            "Could not load JSON from %s: %s", filename, exc
        )
        return {}
def get_cidrs() -> List[str]:
    """Return every distinct prefix stored in ``DATA_FILE``.

    Each top-level value of the file is expected to be a mapping with an
    optional ``"prefixes"`` list. The result has no defined order.
    """
    records = load_json(DATA_FILE)
    unique_prefixes = {
        prefix
        for record in records.values()
        for prefix in record.get("prefixes", [])
    }
    return list(unique_prefixes)
def get_fqdn_ips() -> List[str]:
    """Return every distinct IP stored in ``FQDN_DATA_FILE``.

    Each top-level value of the file is expected to be a mapping with an
    optional ``"ips"`` list. The result has no defined order.
    """
    records = load_json(FQDN_DATA_FILE)
    unique_ips = {
        ip
        for record in records.values()
        for ip in record.get("ips", [])
    }
    return list(unique_ips)
@app.on_event("startup")
def start_scheduler():
    """Register both collection jobs and start the scheduler at app startup.

    Cron expressions are read from the ``"schedule"`` section of the loaded
    config; ``"0 2 * * *"`` (ASN) and ``"0 3 * * *"`` (FQDN) are used when a
    key is absent.
    """
    schedule_config = load_full_config().get("schedule", {})
    asn_cron = schedule_config.get("asn", "0 2 * * *")
    fqdn_cron = schedule_config.get("fqdn", "0 3 * * *")

    # (job id, callable, cron expression) for every job to register.
    planned_jobs = (
        ("asn_job", run_asn_job, asn_cron),
        ("fqdn_job", run_fqdn_job, fqdn_cron),
    )
    for job_id, job_func, cron_expr in planned_jobs:
        scheduler.add_job(
            job_func,
            CronTrigger.from_crontab(cron_expr),
            id=job_id,
            replace_existing=True,
        )

    scheduler.start()
    print(f"Scheduler started. ASN: {asn_cron}, FQDN: {fqdn_cron}")
@app.on_event("shutdown")
def shutdown_scheduler():
    """Stop the shared background scheduler when the application shuts down."""
    scheduler.shutdown()
@app.get("/addresses", response_model=List[str])
def get_addresses(type: AddressType = Query(AddressType.all_types, description="Filter by address type")):
    """Return the sorted, de-duplicated addresses for the requested type.

    Args:
        type: Which source(s) to include: CIDR prefixes, FQDN-derived IPs,
            or both (the default).

    Returns:
        A lexicographically sorted list of unique address strings.
    """
    want_cidrs = type in (AddressType.cidr, AddressType.all_types)
    want_fqdn_ips = type in (AddressType.fqdn, AddressType.all_types)

    addresses = set()
    if want_cidrs:
        addresses |= set(get_cidrs())
    if want_fqdn_ips:
        addresses |= set(get_fqdn_ips())
    return sorted(addresses)
@app.get("/schedule")
def get_schedule():
    """Return the ``"schedule"`` section of the config, or ``{}`` if absent."""
    return load_full_config().get("schedule", {})
@app.post("/schedule")
def update_schedule(schedule_update: Dict[str, str] = Body(..., example={"type": "asn", "cron": "*/10 * * * *"})):
    """Change the cron schedule of one collection job and persist it.

    Args:
        schedule_update: Request body with two keys: ``"type"`` (``"asn"``
            or ``"fqdn"``) and ``"cron"`` (a crontab expression).

    Returns:
        A confirmation dict echoing the job type and the cron string.

    Raises:
        HTTPException: Status 400 when the type is not ``"asn"``/``"fqdn"``,
            the cron string is missing or empty, or the cron string cannot
            be turned into a trigger.
    """
    job_type = schedule_update.get("type")
    cron_str = schedule_update.get("cron")

    if job_type not in ("asn", "fqdn"):
        raise HTTPException(status_code=400, detail="Invalid type. Must be 'asn' or 'fqdn'.")
    if not cron_str:
        raise HTTPException(status_code=400, detail="Cron string required.")

    # Building the trigger doubles as validation of the cron string; any
    # failure to build it is reported to the client as a bad request.
    try:
        trigger = CronTrigger.from_crontab(cron_str)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid cron string: {e}") from e

    # Record the new expression in the config's "schedule" section.
    config = load_full_config()
    config.setdefault("schedule", {})[job_type] = cron_str

    # Serialize before opening the file: opening with "w" truncates it, so
    # a serialization error after that point would leave an empty or
    # partial config file behind.
    serialized = json.dumps(config, indent=4)
    with open(CONFIG_FILE, "w", encoding="utf-8") as f:
        f.write(serialized)

    # Swap the trigger of the running job (same id, replace_existing=True).
    job_id = f"{job_type}_job"
    func = run_asn_job if job_type == "asn" else run_fqdn_job
    scheduler.add_job(func, trigger, id=job_id, replace_existing=True)

    return {"message": "Schedule updated", "type": job_type, "cron": cron_str}
if __name__ == "__main__":
    # uvicorn is imported here because it is only needed when this module is
    # executed directly as a script.
    import uvicorn

    # Listen on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)