from fastapi import FastAPI, Query, Body, HTTPException from enum import Enum import json import os from typing import List, Dict from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from cidr_collector import CIDRCollector, FQDNCollector, load_full_config app = FastAPI(title="RIPE CIDR/FQDN API") class AddressType(str, Enum): cidr = "cidr" fqdn = "fqdn" all_types = "all" DATA_FILE = "data.json" FQDN_DATA_FILE = "fqdn_data.json" CONFIG_FILE = "config.json" scheduler = BackgroundScheduler() # Wrapper functions for scheduler def run_asn_job(): print("Running scheduled ASN collection...") # Re-instantiate to ensure fresh config collector = CIDRCollector() collector.run_collection() def run_fqdn_job(): print("Running scheduled FQDN collection...") # Re-instantiate to ensure fresh config collector = FQDNCollector() collector.run_collection() def load_json(filename): if not os.path.exists(filename): return {} try: with open(filename, 'r') as f: return json.load(f) except Exception: return {} def get_cidrs() -> List[str]: data = load_json(DATA_FILE) cidrs = set() for asn_data in data.values(): for prefix in asn_data.get("prefixes", []): cidrs.add(prefix) return list(cidrs) def get_fqdn_ips() -> List[str]: data = load_json(FQDN_DATA_FILE) ips = set() for domain_data in data.values(): for ip in domain_data.get("ips", []): ips.add(ip) return list(ips) @app.on_event("startup") def start_scheduler(): config = load_full_config() schedule_config = config.get("schedule", {}) asn_cron = schedule_config.get("asn", "0 2 * * *") fqdn_cron = schedule_config.get("fqdn", "0 3 * * *") # Add jobs scheduler.add_job(run_asn_job, CronTrigger.from_crontab(asn_cron), id="asn_job", replace_existing=True) scheduler.add_job(run_fqdn_job, CronTrigger.from_crontab(fqdn_cron), id="fqdn_job", replace_existing=True) scheduler.start() print(f"Scheduler started. ASN: {asn_cron}, FQDN: {fqdn_cron}") @app.on_event("shutdown") def shutdown_scheduler(): scheduler.shutdown() @app.get("/addresses", response_model=List[str]) def get_addresses(type: AddressType = Query(AddressType.all_types, description="Filter by address type")): results = set() if type in [AddressType.cidr, AddressType.all_types]: results.update(get_cidrs()) if type in [AddressType.fqdn, AddressType.all_types]: results.update(get_fqdn_ips()) return sorted(list(results)) @app.get("/schedule") def get_schedule(): config = load_full_config() return config.get("schedule", {}) @app.post("/schedule") def update_schedule(schedule_update: Dict[str, str] = Body(..., example={"type": "asn", "cron": "*/10 * * * *"})): job_type = schedule_update.get("type") cron_str = schedule_update.get("cron") if job_type not in ["asn", "fqdn"]: raise HTTPException(status_code=400, detail="Invalid type. Must be 'asn' or 'fqdn'.") if not cron_str: raise HTTPException(status_code=400, detail="Cron string required.") # Validate cron string by attempting to create trigger try: trigger = CronTrigger.from_crontab(cron_str) except Exception as e: raise HTTPException(status_code=400, detail=f"Invalid cron string: {e}") # Update config file config = load_full_config() if "schedule" not in config: config["schedule"] = {} config["schedule"][job_type] = cron_str # Save config with open(CONFIG_FILE, 'w') as f: json.dump(config, f, indent=4) # Update running job job_id = f"{job_type}_job" func = run_asn_job if job_type == "asn" else run_fqdn_job scheduler.add_job(func, trigger, id=job_id, replace_existing=True) return {"message": "Schedule updated", "type": job_type, "cron": cron_str} if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)