|
| 1 | +"""Audit Sanitizer — strips sensitive data from fleet audit output. |
| 2 | +
|
| 3 | +Deep-walks all dicts/lists in audit results and removes or redacts: |
| 4 | + - Absolute file paths (replaced with relative) |
| 5 | + - Keys matching sensitive patterns (password, secret, token, etc.) |
| 6 | + - OS/system fingerprints (hostname, IP addresses, platform, pid) |
| 7 | +
|
| 8 | +Preserves: scores, grades, dimension names, findings categories/counts, |
| 9 | +timestamps, before/after deltas. |
| 10 | +
|
| 11 | +Configurable via a dict or optional JSON config file. |
| 12 | +""" |
| 13 | +import copy |
| 14 | +import json |
| 15 | +import logging |
| 16 | +import re |
| 17 | +from pathlib import Path |
| 18 | +from typing import Any |
| 19 | + |
| 20 | +log = logging.getLogger("audit_sanitizer") |
| 21 | + |
| 22 | +# ── Default Configuration ──────────────────────────────────────────────── |
| 23 | + |
# Baseline sanitizer settings. load_sanitizer_config() deep-copies this dict
# before applying overrides, so the defaults themselves are never mutated.
_DEFAULT_CONFIG: dict = {
    # Keys whose values get stripped entirely (case-insensitive substring match).
    # NOTE: because matching is by substring, short entries such as "key",
    # "env", "pid" and "port" also match longer key names that merely contain
    # them (e.g. "report" contains "port") unless keep_keys/preserve_keys
    # exempt that name.
    "strip_keys": [
        "password", "secret", "key", "token", "api_key",
        "endpoint", "hostname", "username", "env", "pid", "port",
        "ip_address", "home_dir", "user_home",
    ],
    # Keys that are always preserved even if they match strip_keys
    # (case-insensitive substring match against the key name).
    "keep_keys": [
        "api_health", "api_key_count",
    ],
    # Path prefixes that indicate absolute paths needing sanitization
    # (matched anywhere inside a string value, not only at its start).
    "path_prefixes": [
        "/home/", "/c/Users/", "C:\\Users\\", "C:/Users/",
        "/opt/", "/usr/", "/var/", "/tmp/",
        "/root/", "/etc/",
    ],
    # Regex for Windows drive letters (D:\, E:\, etc.)
    "windows_drive_re": r"[A-Za-z]:[\\\/]",
    # Regex for IP addresses (v4)
    "ipv4_re": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
    # OS fingerprint keys to strip (case-insensitive exact match on the key).
    "os_fingerprint_keys": [
        "os.name", "os_name", "platform", "sys.platform",
        "hostname", "machine", "node", "release", "version",
    ],
    # Preserve these keys unconditionally (scores, grades, etc.).
    # Case-insensitive exact match; only the key is exempt from stripping,
    # the value stored under it is still walked and sanitized.
    "preserve_keys": [
        "score", "auto_score", "grade", "auto_grade", "manual_grade",
        "overall_score", "overall_grade", "s_tier_grade", "s_tier_eligible",
        "dimension", "dimensions", "tier", "confidence",
        "weight", "part", "type", "message",
        "timestamp", "count", "total", "ok",
        "passed", "failed", "stale", "found", "missing",
        "divergence", "divergences", "acknowledged",
        "gaps", "issues", "evidence",
        "ratchet_grade", "ratchet_score", "ratchet_violations",
        "context_avg", "output_avg",
        "under_1500", "total_checked",
        "ruff_clean", "no_raw_sqlite", "no_bare_excepts",
        "rbac_ok", "rbac_roles", "path_traversal_blocked",
        "page_score", "feedback_score",
        "sse_template", "audit_log",
        "dimensions_scored",
    ],
}

# Compiled regexes (built once at import time).
# These are compiled from the *default* patterns above only; they do not
# reflect "ipv4_re" / "windows_drive_re" overrides supplied through a config.
_IPV4_RE = re.compile(_DEFAULT_CONFIG["ipv4_re"])
_WIN_DRIVE_RE = re.compile(_DEFAULT_CONFIG["windows_drive_re"])
| 74 | + |
| 75 | + |
| 76 | +# ── Config Loader ──────────────────────────────────────────────────────── |
| 77 | + |
def load_sanitizer_config(config_path: Path | None = None) -> dict:
    """Load sanitizer config from JSON file, falling back to defaults.

    Overrides are validated before they are applied, so a malformed config
    file can only ever leave a setting at its default; it can never install
    a value that the sanitizer would later misinterpret (for example a bare
    string where a list of key names is expected).

    Args:
        config_path: optional path to a JSON config file. Recognised keys in
            the file replace the corresponding default entries. A missing
            file is not an error.

    Returns:
        A fresh config dict (deep copy of the defaults with any valid
        overrides applied). The caller may mutate it freely.
    """
    config = copy.deepcopy(_DEFAULT_CONFIG)
    if not config_path:
        return config

    # EAFP: open directly instead of exists()+open(), which is race-prone.
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            overrides = json.load(f)
    except FileNotFoundError:
        # An absent config file simply means "use the defaults".
        return config
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        log.warning("Failed to load sanitizer config from %s", config_path,
                    exc_info=True)
        return config

    if not isinstance(overrides, dict):
        log.warning("Ignoring sanitizer config %s: top-level JSON value "
                    "must be an object", config_path)
        return config

    # List-valued settings: every entry must be a string, because the
    # matchers call .lower() / substring tests on each element.
    for key in ("strip_keys", "keep_keys", "path_prefixes",
                "os_fingerprint_keys", "preserve_keys"):
        if key not in overrides:
            continue
        value = overrides[key]
        if isinstance(value, list) and all(isinstance(v, str) for v in value):
            config[key] = value
        else:
            log.warning("Ignoring sanitizer config key %r in %s: expected a "
                        "list of strings", key, config_path)

    # Regex settings: must be strings that actually compile.
    for key in ("windows_drive_re", "ipv4_re"):
        if key not in overrides:
            continue
        value = overrides[key]
        if not isinstance(value, str):
            log.warning("Ignoring sanitizer config key %r in %s: expected a "
                        "regex string", key, config_path)
            continue
        try:
            re.compile(value)
        except re.error:
            log.warning("Ignoring sanitizer config key %r in %s: invalid "
                        "regular expression", key, config_path, exc_info=True)
        else:
            config[key] = value

    return config
| 104 | + |
| 105 | + |
| 106 | +# ── Path Sanitization ──────────────────────────────────────────────────── |
| 107 | + |
| 108 | +def _sanitize_path(value: str, config: dict) -> str: |
| 109 | + """Replace absolute paths with relative equivalents. |
| 110 | +
|
| 111 | + Detects common prefixes (/home/user/..., C:\\Users\\..., /opt/..., etc.) |
| 112 | + and strips them down to the project-relative portion. |
| 113 | + """ |
| 114 | + result = value |
| 115 | + |
| 116 | + # Handle Windows-style paths: C:\Users\max\Projects\Education\fleet\foo |
| 117 | + # Convert to forward slashes first for uniform processing |
| 118 | + for prefix in config.get("path_prefixes", []): |
| 119 | + if prefix in result: |
| 120 | + # Find the path segment after common project markers |
| 121 | + idx = result.find(prefix) |
| 122 | + path_portion = result[idx:] |
| 123 | + # Normalize to forward slashes |
| 124 | + normalized = path_portion.replace("\\", "/") |
| 125 | + # Try to find a project-relative anchor |
| 126 | + for anchor in ("fleet/", "BigEd/", "Education/fleet/", |
| 127 | + "Education/BigEd/", "autoresearch/"): |
| 128 | + anchor_idx = normalized.find(anchor) |
| 129 | + if anchor_idx >= 0: |
| 130 | + relative = normalized[anchor_idx:] |
| 131 | + result = result[:idx] + relative |
| 132 | + break |
| 133 | + else: |
| 134 | + # No anchor found — just strip the home/system prefix |
| 135 | + # Keep the last 2 path components |
| 136 | + parts = normalized.rstrip("/").split("/") |
| 137 | + if len(parts) > 2: |
| 138 | + result = result[:idx] + "/".join(parts[-2:]) |
| 139 | + |
| 140 | + # Handle bare Windows drive letters not caught by prefix list |
| 141 | + if _WIN_DRIVE_RE.search(result): |
| 142 | + normalized = result.replace("\\", "/") |
| 143 | + for anchor in ("fleet/", "BigEd/", "Education/"): |
| 144 | + anchor_idx = normalized.find(anchor) |
| 145 | + if anchor_idx >= 0: |
| 146 | + result = normalized[anchor_idx:] |
| 147 | + break |
| 148 | + |
| 149 | + return result |
| 150 | + |
| 151 | + |
| 152 | +# ── Value Sanitization ─────────────────────────────────────────────────── |
| 153 | + |
| 154 | +def _sanitize_value(value: Any, config: dict) -> Any: |
| 155 | + """Sanitize a single value — redact IPs, paths, OS fingerprints.""" |
| 156 | + if isinstance(value, str): |
| 157 | + # Replace IP addresses with placeholder |
| 158 | + sanitized = _IPV4_RE.sub("[REDACTED_IP]", value) |
| 159 | + # Sanitize absolute paths |
| 160 | + sanitized = _sanitize_path(sanitized, config) |
| 161 | + return sanitized |
| 162 | + return value |
| 163 | + |
| 164 | + |
| 165 | +def _is_strip_key(key: str, config: dict) -> bool: |
| 166 | + """Check if a key matches the strip list but not the keep list.""" |
| 167 | + key_lower = key.lower() |
| 168 | + |
| 169 | + # Check keep list first (exact match or substring) |
| 170 | + for keep in config.get("keep_keys", []): |
| 171 | + if keep.lower() == key_lower or keep.lower() in key_lower: |
| 172 | + return False |
| 173 | + |
| 174 | + # Check preserve list (exact match) |
| 175 | + if key_lower in [k.lower() for k in config.get("preserve_keys", [])]: |
| 176 | + return False |
| 177 | + |
| 178 | + # Check strip list (substring match) |
| 179 | + for strip in config.get("strip_keys", []): |
| 180 | + if strip.lower() in key_lower: |
| 181 | + return True |
| 182 | + |
| 183 | + # Check OS fingerprint keys (exact match) |
| 184 | + for fp_key in config.get("os_fingerprint_keys", []): |
| 185 | + if fp_key.lower() == key_lower: |
| 186 | + return True |
| 187 | + |
| 188 | + return False |
| 189 | + |
| 190 | + |
| 191 | +# ── Deep Walk ──────────────────────────────────────────────────────────── |
| 192 | + |
| 193 | +def _sanitize_dict(data: dict, config: dict) -> dict: |
| 194 | + """Deep-walk a dict, stripping sensitive keys and sanitizing values.""" |
| 195 | + result = {} |
| 196 | + for key, value in data.items(): |
| 197 | + if _is_strip_key(key, config): |
| 198 | + result[key] = "[REDACTED]" |
| 199 | + continue |
| 200 | + result[key] = _sanitize_any(value, config) |
| 201 | + return result |
| 202 | + |
| 203 | + |
| 204 | +def _sanitize_list(data: list, config: dict) -> list: |
| 205 | + """Deep-walk a list, sanitizing each element.""" |
| 206 | + return [_sanitize_any(item, config) for item in data] |
| 207 | + |
| 208 | + |
| 209 | +def _sanitize_any(data: Any, config: dict) -> Any: |
| 210 | + """Dispatch to the appropriate sanitizer based on type.""" |
| 211 | + if isinstance(data, dict): |
| 212 | + return _sanitize_dict(data, config) |
| 213 | + if isinstance(data, list): |
| 214 | + return _sanitize_list(data, config) |
| 215 | + if isinstance(data, str): |
| 216 | + return _sanitize_value(data, config) |
| 217 | + # int, float, bool, None — pass through |
| 218 | + return data |
| 219 | + |
| 220 | + |
| 221 | +# ── Public API ─────────────────────────────────────────────────────────── |
| 222 | + |
def sanitize(data: Any, config: dict | None = None,
             config_path: Path | None = None) -> Any:
    """Return a sanitized copy of audit output data.

    The structure is walked recursively (dicts and lists) and:
      - absolute paths inside strings are shortened to relative ones
      - values under sensitive keys are redacted
      - IP addresses inside strings are redacted
      - scores, grades, dimensions, timestamps and findings are kept

    Args:
        data: The audit data to sanitize (dict, list, or primitive).
        config: Optional settings; each top-level key replaces the
            corresponding entry of the loaded configuration.
        config_path: Optional path to a JSON config file, loaded before
            ``config`` is applied.

    Returns:
        A deep copy of the data with sensitive information removed; the
        input object is never modified.
    """
    settings = load_sanitizer_config(config_path)
    if config:
        # Explicit settings win over file/default values, key by key.
        settings.update(config)

    # Sanitize a private copy so the caller's object stays untouched.
    snapshot = copy.deepcopy(data)
    return _sanitize_any(snapshot, settings)
| 248 | + |
| 249 | + |
def sanitize_scores(scores: list[dict], config: dict | None = None) -> list[dict]:
    """Convenience: sanitize a list of score dicts from get_latest_scores().

    Only the ``auto_detail`` field of each row is sanitized; all other
    fields are copied through unchanged, preserving the
    score/grade/dimension structure.

    ``auto_detail`` handling by type:
      - JSON string: decoded, sanitized, and re-encoded as a JSON string
      - non-JSON string: sanitized as plain text
      - dict or list: sanitized in decoded form
      - anything else (including a missing field): left as-is

    Args:
        scores: score rows; each must be convertible with ``dict(row)``.
        config: Optional settings; each top-level key replaces the
            corresponding entry of the default configuration.

    Returns:
        A new list of new row dicts; the input rows are not modified.
    """
    effective_config = load_sanitizer_config()
    if config:
        effective_config.update(config)

    sanitized = []
    for score_row in scores:
        row = dict(score_row)
        detail = row.get("auto_detail")
        if isinstance(detail, str):
            # Keep the try body to the parse step so that only "this is not
            # JSON" triggers the plain-text fallback.
            try:
                parsed = json.loads(detail)
            except (json.JSONDecodeError, TypeError):
                row["auto_detail"] = _sanitize_value(detail, effective_config)
            else:
                row["auto_detail"] = json.dumps(
                    _sanitize_any(parsed, effective_config)
                )
        elif isinstance(detail, (dict, list)):
            # Already-decoded detail: lists need sanitizing just like dicts.
            row["auto_detail"] = _sanitize_any(detail, effective_config)
        sanitized.append(row)
    return sanitized
0 commit comments