"""Collector for RIPE AS CIDRs and FQDN IPs with optional route injection.

Maintains lists of ASNs and FQDNs in ``config.json``, fetches announced
prefixes from the RIPEstat API and resolves FQDNs via DNS, persists the
results to JSON files, and can inject the collected IPv4 prefixes into a
named Linux policy-routing table via ``ip route``.
"""

import argparse
import datetime
import ipaddress
import json
import logging
import os
import socket
import subprocess
import sys

# --- Configuration ---
CONFIG_FILE = "config.json"
DATA_FILE = "data.json"
FQDN_DATA_FILE = "fqdn_data.json"
BASE_URL = "https://stat.ripe.net/data/announced-prefixes/data.json"
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, stream=sys.stderr)
logger = logging.getLogger(__name__)


# --- Helper Functions ---
def load_full_config():
    """Load ``CONFIG_FILE``, creating a default file when it is missing.

    Returns a dict with at least the keys ``asns``, ``fqdns`` and
    ``routing_table``. A corrupt file is left untouched on disk; in-memory
    defaults are returned instead.
    """
    if not os.path.exists(CONFIG_FILE):
        # Create default config if missing
        default_config = {"asns": [], "fqdns": [], "routing_table": "main"}
        save_full_config(default_config)
        return default_config
    try:
        with open(CONFIG_FILE, 'r') as f:
            config = json.load(f)
        # Backward compatibility: configs written before "routing_table"
        # existed are upgraded in place.
        if "routing_table" not in config:
            config["routing_table"] = "main"
            save_full_config(config)
        return config
    except json.JSONDecodeError:
        logger.error(f"Failed to decode {CONFIG_FILE}. Using defaults.")
        return {"asns": [], "fqdns": [], "routing_table": "main"}


def save_full_config(config):
    """Persist *config* to ``CONFIG_FILE``; log (never raise) on failure."""
    try:
        with open(CONFIG_FILE, 'w') as f:
            json.dump(config, f, indent=4)
    except Exception as e:
        logger.error(f"Failed to save config: {e}")


# --- Routing Manager ---
class RoutingManager:
    """Manage a named Linux routing table and inject IPv4 routes into it."""

    RT_TABLES_FILE = "/etc/iproute2/rt_tables"

    def __init__(self, table_name, gateway=None):
        self.table_name = str(table_name)
        self.gateway = gateway          # optional next-hop IP for injected routes
        self.existing_routes = set()    # prefixes already present in the table
        self.table_id = None            # numeric ID from rt_tables, as a string

    def ensure_table_exists(self):
        """Check if table exists, create if missing."""
        if self._read_table_id():
            logger.info(f"Routing table '{self.table_name}' (ID: {self.table_id}) found.")
            return True
        logger.warning(f"Routing table '{self.table_name}' not found. Creating...")
        if self._create_table():
            logger.info(f"Routing table '{self.table_name}' created successfully.")
            self._read_table_id()
            return True
        else:
            logger.error(f"Failed to create routing table '{self.table_name}'.")
            return False

    def _read_table_id(self):
        """Read table ID from /etc/iproute2/rt_tables.

        Sets ``self.table_id`` and returns True when the table name is found.
        """
        try:
            if not os.path.exists(self.RT_TABLES_FILE):
                return False
            with open(self.RT_TABLES_FILE, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith('#'):
                        continue
                    parts = line.split()
                    if len(parts) >= 2:
                        tid, tname = parts[0], parts[1]
                        if tname == self.table_name:
                            self.table_id = tid
                            return True
            return False
        except Exception as e:
            logger.error(f"Error reading {self.RT_TABLES_FILE}: {e}")
            return False

    def _create_table(self):
        """Add new table to /etc/iproute2/rt_tables with a free ID."""
        try:
            rt_dir = os.path.dirname(self.RT_TABLES_FILE)
            if not os.path.exists(rt_dir):
                os.makedirs(rt_dir, exist_ok=True)
            # Collect numeric IDs already assigned in the file.
            used_ids = set()
            if os.path.exists(self.RT_TABLES_FILE):
                with open(self.RT_TABLES_FILE, 'r') as f:
                    for line in f:
                        line = line.strip()
                        if not line or line.startswith('#'):
                            continue
                        parts = line.split()
                        if len(parts) >= 2:
                            try:
                                used_ids.add(int(parts[0]))
                            except ValueError:
                                continue
            # IDs outside 2-252 are avoided (0/253-255 are reserved by iproute2).
            free_id = None
            for tid in range(2, 253):
                if tid not in used_ids:
                    free_id = tid
                    break
            if free_id is None:
                logger.error("No available table IDs (2-252).")
                return False
            with open(self.RT_TABLES_FILE, 'a') as f:
                f.write(f"{free_id}\t{self.table_name}\n")
            self.table_id = str(free_id)
            return True
        except PermissionError:
            logger.error(f"Permission denied writing to {self.RT_TABLES_FILE}. Run as root.")
            return False
        except Exception as e:
            logger.error(f"Error creating table: {e}")
            return False

    def check_table_exists(self):
        """Wrapper to ensure table exists, validate gateway and load routes."""
        if not self.ensure_table_exists():
            return False
        # Validate Gateway
        if self.gateway:
            try:
                ipaddress.ip_address(self.gateway)
                logger.info(f"Using gateway: {self.gateway}")
            except ValueError:
                logger.error(f"Invalid gateway IP address: {self.gateway}")
                return False
        else:
            logger.warning("No gateway specified. Routes will be added without 'via'.")
        # Load existing routes so add_routes() can skip duplicates.
        try:
            result = subprocess.run(
                ["ip", "route", "show", "table", self.table_name],
                capture_output=True, text=True, timeout=5
            )
            if result.returncode == 0:
                self._parse_existing_routes(result.stdout)
            return True
        except Exception as e:
            logger.error(f"Error listing routes: {e}")
            return False

    def _parse_existing_routes(self, output):
        """Parse 'ip route' output to extract valid CIDR prefixes."""
        for line in output.splitlines():
            parts = line.split()
            if parts:
                prefix = parts[0]
                # Keep only CIDR-looking first tokens; skip "default" and
                # host routes without a mask.
                if prefix != "default" and "/" in prefix:
                    try:
                        ipaddress.ip_network(prefix, strict=False)
                        self.existing_routes.add(prefix)
                    except ValueError:
                        continue

    def add_routes(self, prefixes):
        """Add IPv4 prefixes to the table if they don't exist.

        Returns True when no route addition failed.
        """
        added_count = 0
        failed_count = 0
        for prefix in prefixes:
            try:
                net = ipaddress.ip_network(prefix, strict=False)
                if net.version != 4:
                    continue  # only IPv4 routes are injected
            except ValueError:
                logger.warning(f"Invalid prefix format: {prefix}")
                continue
            if prefix in self.existing_routes:
                logger.debug(f"Route {prefix} already exists in table {self.table_name}.")
                continue
            try:
                # Correct syntax: ip route add <prefix> [via <gw>] table <name>
                cmd = ["ip", "route", "add", prefix]
                if self.gateway:
                    cmd.extend(["via", self.gateway])
                cmd.extend(["table", self.table_name])
                subprocess.run(cmd, check=True, capture_output=True, text=True)
                logger.info(f"Injected route: {prefix} via {self.gateway or 'direct'} -> table {self.table_name}")
                added_count += 1
            except subprocess.CalledProcessError as e:
                # "File exists": the route appeared between listing and adding.
                if "File exists" in (e.stderr or ""):
                    logger.debug(f"Route {prefix} already exists (race condition).")
                else:
                    logger.error(f"Failed to add route {prefix}: {e.stderr.strip()}")
                    failed_count += 1
            except Exception as e:
                logger.error(f"Unexpected error adding {prefix}: {e}")
                failed_count += 1
        logger.info(f"Routing injection complete: {added_count} added, {failed_count} failed.")
        return failed_count == 0


class CIDRCollector:
    """Manage the ASN list and collect announced prefixes from RIPEstat."""

    def __init__(self):
        self.config = load_full_config()
        self.asns = self.config.get("asns", [])

    def save_config(self):
        """Write the current ASN list back to the shared config file."""
        self.config["asns"] = self.asns
        save_full_config(self.config)

    def add_asn(self, asn):
        """Add *asn* to the tracked list (no-op if already present)."""
        if asn not in self.asns:
            self.asns.append(asn)
            self.save_config()
            logger.info(f"ASN {asn} added.")
        else:
            logger.info(f"ASN {asn} already in list.")

    def remove_asn(self, asn):
        """Remove *asn* from the tracked list (warns if absent)."""
        if asn in self.asns:
            self.asns.remove(asn)
            self.save_config()
            logger.info(f"ASN {asn} removed.")
        else:
            logger.warning(f"ASN {asn} not found in list.")

    def list_asns(self):
        """Log the currently tracked ASNs."""
        logger.info(f"Current ASNs: {self.asns}")

    def fetch_prefixes(self, asn):
        """Fetch announced prefixes for AS<asn> from the RIPEstat API.

        Returns a list of prefix strings, or None on any error.
        """
        params = {'resource': f'AS{asn}'}
        try:
            # Imported lazily so management commands (add/remove/list) work
            # even when the third-party 'requests' package is unavailable;
            # a missing package is then reported like any other fetch error.
            import requests
            response = requests.get(BASE_URL, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            prefixes = []
            if 'data' in data and 'prefixes' in data['data']:
                for item in data['data']['prefixes']:
                    if 'prefix' in item:
                        prefixes.append(item['prefix'])
            return prefixes
        except Exception as e:
            logger.error(f"Error fetching data for AS{asn}: {e}")
            return None

    def load_data(self):
        """Load previously collected prefix data; {} when missing/corrupt."""
        if not os.path.exists(DATA_FILE):
            return {}
        try:
            with open(DATA_FILE, 'r') as f:
                return json.load(f)
        except json.JSONDecodeError:
            logger.error(f"Failed to decode {DATA_FILE}")
            return {}

    def save_data(self, data):
        """Persist collected prefix data to DATA_FILE."""
        with open(DATA_FILE, 'w') as f:
            json.dump(data, f, indent=4)

    def run_collection(self):
        """Fetch prefixes for every tracked ASN, merge into stored data.

        Prefixes are only ever added, never removed, so data.json accumulates
        everything ever announced. Returns the (possibly updated) data dict.
        """
        current_data = self.load_data()
        updated = False
        current_time = datetime.datetime.now().isoformat()
        logger.info("Starting ASN CIDR collection...")
        for asn in self.asns:
            str_asn = str(asn)  # JSON keys are strings
            logger.info(f"Processing AS{asn}...")
            fetched_prefixes = self.fetch_prefixes(asn)
            if fetched_prefixes is None:
                continue  # fetch failed; keep existing data untouched
            fetched_set = set(fetched_prefixes)
            if str_asn not in current_data:
                current_data[str_asn] = {
                    "last_updated": current_time,
                    "prefixes": sorted(fetched_set)
                }
                logger.info(f" - New ASN. Added {len(fetched_set)} prefixes.")
                updated = True
            else:
                existing_prefixes = set(current_data[str_asn].get("prefixes", []))
                new_prefixes = fetched_set - existing_prefixes
                if new_prefixes:
                    updated_set = existing_prefixes.union(fetched_set)
                    current_data[str_asn]["prefixes"] = sorted(updated_set)
                    current_data[str_asn]["last_updated"] = current_time
                    logger.info(f" - Updates found. Added {len(new_prefixes)} new prefixes.")
                    updated = True
                else:
                    logger.debug(f" - No new prefixes for AS{asn}.")
        if updated:
            self.save_data(current_data)
            logger.info("CIDR Data saved to data.json")
        else:
            logger.info("No CIDR changes to save.")
        return current_data


class FQDNCollector:
    """Manage the FQDN list and collect their resolved IP addresses."""

    def __init__(self):
        self.config = load_full_config()
        self.fqdns = self.config.get("fqdns", [])

    def save_config(self):
        """Write the current FQDN list back to the shared config file."""
        self.config["fqdns"] = self.fqdns
        save_full_config(self.config)

    def add_fqdn(self, fqdn):
        """Add *fqdn* to the tracked list (no-op if already present)."""
        if fqdn not in self.fqdns:
            self.fqdns.append(fqdn)
            self.save_config()
            logger.info(f"FQDN {fqdn} added.")
        else:
            logger.info(f"FQDN {fqdn} already in list.")

    def remove_fqdn(self, fqdn):
        """Remove *fqdn* from the tracked list (warns if absent)."""
        if fqdn in self.fqdns:
            self.fqdns.remove(fqdn)
            self.save_config()
            logger.info(f"FQDN {fqdn} removed.")
        else:
            logger.warning(f"FQDN {fqdn} not found in list.")

    def list_fqdns(self):
        """Log the currently tracked FQDNs."""
        logger.info(f"Current FQDNs: {self.fqdns}")

    def resolve_fqdn(self, fqdn):
        """Resolve *fqdn* to a deduplicated list of IPs (IPv4 and IPv6).

        Returns [] when resolution fails.
        """
        try:
            results = socket.getaddrinfo(fqdn, None)
            ips = set()
            for result in results:
                ip_addr = result[4][0]  # sockaddr tuple; first field is the IP
                ips.add(ip_addr)
            return list(ips)
        except socket.gaierror as e:
            logger.error(f"Error resolving {fqdn}: {e}")
            return []

    def load_data(self):
        """Load previously collected FQDN data; {} when missing/corrupt."""
        if not os.path.exists(FQDN_DATA_FILE):
            return {}
        try:
            with open(FQDN_DATA_FILE, 'r') as f:
                return json.load(f)
        except json.JSONDecodeError:
            return {}

    def save_data(self, data):
        """Persist collected FQDN data to FQDN_DATA_FILE."""
        with open(FQDN_DATA_FILE, 'w') as f:
            json.dump(data, f, indent=4)

    def run_collection(self):
        """Resolve every tracked FQDN and merge new IPs into stored data.

        IPs are only ever added, never removed (same accumulate-only policy
        as the ASN collector).
        """
        current_data = self.load_data()
        updated = False
        current_time = datetime.datetime.now().isoformat()
        logger.info("Starting FQDN IP collection...")
        for fqdn in self.fqdns:
            logger.info(f"Processing {fqdn}...")
            resolved_ips = self.resolve_fqdn(fqdn)
            if not resolved_ips:
                logger.warning(f" - No IPs resolved for {fqdn}")
                continue
            fetched_set = set(resolved_ips)
            if fqdn not in current_data:
                current_data[fqdn] = {
                    "last_updated": current_time,
                    "ips": sorted(fetched_set)
                }
                logger.info(f" - New FQDN. Added {len(fetched_set)} IPs.")
                updated = True
            else:
                existing_ips = set(current_data[fqdn].get("ips", []))
                new_ips = fetched_set - existing_ips
                if new_ips:
                    updated_set = existing_ips.union(fetched_set)
                    current_data[fqdn]["ips"] = sorted(updated_set)
                    current_data[fqdn]["last_updated"] = current_time
                    logger.info(f" - Updates found. Added {len(new_ips)} new IPs.")
                    updated = True
                else:
                    logger.debug(" - No new IPs found.")
        if updated:
            self.save_data(current_data)
            logger.info(f"FQDN Data saved to {FQDN_DATA_FILE}")
        else:
            logger.info("No FQDN changes to save.")


# --- Main Logic ---
def main():
    """CLI entry point: manage ASN/FQDN lists, collect data, inject routes."""
    parser = argparse.ArgumentParser(description="Collector for RIPE AS CIDRs and FQDN IPs")
    subparsers = parser.add_subparsers(dest="command")

    # Command: run
    parser_run = subparsers.add_parser("run", help="Run the collection process")
    parser_run.add_argument("--mode", choices=["asn", "fqdn", "all"], default="all",
                            help="Collection mode")
    parser_run.add_argument("--inject", action="store_true",
                            help="Inject collected IPv4 prefixes into routing table")

    # ASN Commands
    parser_add = subparsers.add_parser("add", help="Add an ASN")
    parser_add.add_argument("asn", type=int, help="ASN to add")
    parser_remove = subparsers.add_parser("remove", help="Remove an ASN")
    parser_remove.add_argument("asn", type=int, help="ASN to remove")

    # FQDN Commands
    parser_add_fqdn = subparsers.add_parser("add-fqdn", help="Add an FQDN")
    parser_add_fqdn.add_argument("fqdn", type=str, help="FQDN to add")
    parser_remove_fqdn = subparsers.add_parser("remove-fqdn", help="Remove an FQDN")
    parser_remove_fqdn.add_argument("fqdn", type=str, help="FQDN to remove")

    # Command: list (takes no arguments)
    subparsers.add_parser("list", help="List ASNs and FQDNs")

    args = parser.parse_args()
    asn_collector = CIDRCollector()
    fqdn_collector = FQDNCollector()

    try:
        if args.command == "add":
            asn_collector.add_asn(args.asn)
        elif args.command == "remove":
            asn_collector.remove_asn(args.asn)
        elif args.command == "add-fqdn":
            fqdn_collector.add_fqdn(args.fqdn)
        elif args.command == "remove-fqdn":
            fqdn_collector.remove_fqdn(args.fqdn)
        elif args.command == "list":
            asn_collector.list_asns()
            fqdn_collector.list_fqdns()
        elif args.command == "run":
            mode = args.mode
            all_prefixes = []

            # 1. Collection Phase
            if mode in ["asn", "all"]:
                data = asn_collector.run_collection()
                # Aggregate all prefixes for potential injection.
                # NOTE: this includes every ASN present in data.json, even
                # ones removed from the config since last collection.
                for asn_data in data.values():
                    all_prefixes.extend(asn_data.get("prefixes", []))
                if mode == "all":
                    logger.info("-" * 20)
            if mode in ["fqdn", "all"]:
                fqdn_collector.run_collection()

            # 2. Injection Phase (Only if --inject is present)
            if args.inject:
                logger.info("Starting Routing Injection phase...")
                if os.geteuid() != 0:
                    logger.error("Error: --inject requires root privileges (sudo).")
                    sys.exit(1)
                config = load_full_config()
                table_name = config.get("routing_table", "main")
                gateway = config.get("pbr_gateway")  # read the gateway from the config
                router = RoutingManager(table_name, gateway=gateway)  # pass the gateway along
                if router.check_table_exists():
                    unique_prefixes = list(set(all_prefixes))
                    logger.info(f"Attempting to inject {len(unique_prefixes)} unique prefixes into table '{table_name}'.")
                    router.add_routes(unique_prefixes)
                else:
                    logger.error("Routing table check failed. Injection aborted.")
                    sys.exit(1)
            else:
                logger.debug("Injection skipped (--inject not provided).")
        else:
            parser.print_help()
    except Exception as e:
        logger.critical(f"Unhandled exception: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()