Skip to content

Commit 336ad1d

Browse files
committed
fix: harden LLM guardrails + extract shared scan helper
- XML entity escaping for user-controlled values in LLM prompt - Strict boolean validation (approved is True, not truthy coercion) - Unicode NFKC normalization before keyword detection - Regex word boundaries on injection patterns to reduce false positives - Extract _scan_and_validate() helper to deduplicate security scan logic
1 parent 857beba commit 336ad1d

File tree

3 files changed

+63
-69
lines changed

3 files changed

+63
-69
lines changed

pop_pay/engine/guardrails.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import re
22
import os
3+
import unicodedata
34
from urllib.parse import urlparse
45
from pop_pay.core.models import PaymentIntent, GuardrailPolicy
56

@@ -62,7 +63,7 @@ async def evaluate_intent(self, intent: PaymentIntent, policy: GuardrailPolicy)
6263

6364
# Rule 2: Hallucination/Loop detection
6465
if policy.block_hallucination_loops:
65-
reasoning_lower = intent.reasoning.lower()
66+
reasoning_lower = unicodedata.normalize("NFKC", intent.reasoning).lower()
6667
loop_keywords = ["retry", "failed again", "loop", "ignore previous", "stuck"]
6768

6869
for keyword in loop_keywords:
@@ -72,11 +73,11 @@ async def evaluate_intent(self, intent: PaymentIntent, policy: GuardrailPolicy)
7273
# Rule 3: Injection pattern detection
7374
injection_patterns = [
7475
r'\{.*".*".*:', # JSON-like structure
75-
r'output\s*:', # "output:" pattern
76-
r'you are now', # role injection
77-
r'ignore (all |previous |your |the )', # instruction override
78-
r'already (approved|authorized|confirmed)', # false pre-approval
79-
r'system (says|has|override)', # system impersonation
76+
r'\boutput\s*:', # "output:" pattern
77+
r'\byou are now\b', # role injection
78+
r'\bignore (all |previous |your |the )', # instruction override
79+
r'\balready (approved|authorized|confirmed)\b', # false pre-approval
80+
r'\bsystem (says|has|override)\b', # system impersonation
8081
]
8182
for pattern in injection_patterns:
8283
if re.search(pattern, reasoning_lower):

pop_pay/engine/llm_guardrails.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
import json
2+
from html import escape as _html_escape
23
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
34
from pop_pay.core.models import PaymentIntent, GuardrailPolicy
45
from pop_pay.engine.guardrails import GuardrailEngine
56

7+
8+
def _escape_xml(s: str) -> str:
9+
return _html_escape(s, quote=True)
10+
611
# Exceptions that warrant a retry (rate limits, transient server errors).
712
# Defined at module level so the @retry decorator can reference them before
813
# openai is imported — the actual classes are resolved lazily inside the engine.
@@ -34,10 +39,10 @@ async def evaluate_intent(self, intent: PaymentIntent, policy: GuardrailPolicy)
3439
prompt = f"""Evaluate the following agent payment intent and determine if it should be approved.
3540
3641
<payment_request>
37-
<vendor>{intent.target_vendor}</vendor>
42+
<vendor>{_escape_xml(intent.target_vendor)}</vendor>
3843
<amount>{intent.requested_amount}</amount>
39-
<allowed_categories>{policy.allowed_categories}</allowed_categories>
40-
<agent_reasoning>{intent.reasoning}</agent_reasoning>
44+
<allowed_categories>{_escape_xml(str(policy.allowed_categories))}</allowed_categories>
45+
<agent_reasoning>{_escape_xml(intent.reasoning)}</agent_reasoning>
4146
</payment_request>
4247
4348
Rules:
@@ -61,7 +66,8 @@ async def evaluate_intent(self, intent: PaymentIntent, policy: GuardrailPolicy)
6166
response = await self.client.chat.completions.create(**kwargs)
6267
result_text = response.choices[0].message.content
6368
result = json.loads(result_text)
64-
return result.get("approved", False), result.get("reason", "Unknown")
69+
approved = result.get("approved", False) is True
70+
return approved, result.get("reason", "Unknown")
6571
except self._openai.APIStatusError as e:
6672
# Re-raise retriable status codes (rate limit, server errors) so
6773
# tenacity's @retry decorator can back off and retry.

pop_pay/mcp_server.py

Lines changed: 46 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,45 @@ async def _request_human_approval(
267267
return True, "auto-approved (no approval webhook configured)"
268268

269269

270+
async def _scan_and_validate(page_url: str, action_label: str = "Payment") -> tuple[bool, str]:
271+
"""Run security scan on page_url. Returns (ok, message).
272+
273+
ok=True means scan passed (or was skipped). ok=False means the caller
274+
should return `message` immediately as a rejection.
275+
"""
276+
if not page_url:
277+
return True, f" (security scan skipped — no page_url provided)"
278+
279+
# Check cache first (reuse recent scan within 5 minutes)
280+
cached = snapshot_cache.get(page_url)
281+
if cached and datetime.now() - cached["timestamp"] < timedelta(minutes=5):
282+
scan_result = {
283+
"flags": cached["flags"],
284+
"snapshot_id": cached["snapshot_id"],
285+
"safe": "hidden_instructions_detected" not in cached["flags"],
286+
"error": None,
287+
}
288+
else:
289+
scan_result = await _scan_page(page_url)
290+
291+
if scan_result.get("error"):
292+
return False, (
293+
f"{action_label} rejected. Security scan failed: {scan_result['error']} "
294+
f"Snapshot ID: {scan_result['snapshot_id']}. "
295+
f"Fix the URL or skip page_url if the page has no associated URL."
296+
)
297+
298+
if not scan_result["safe"]:
299+
return False, (
300+
f"{action_label} rejected. Security scan detected hidden prompt injection. "
301+
f"Snapshot ID: {scan_result['snapshot_id']}. "
302+
f"Flags: {scan_result['flags']}. "
303+
f"Do not retry this."
304+
)
305+
306+
return True, ""
307+
308+
270309
# ---------------------------------------------------------------------------
271310
# MCP Tools
272311
# ---------------------------------------------------------------------------
@@ -303,37 +342,10 @@ async def request_virtual_card(
303342
# -------------------------------------------------------------------
304343
# P1: Automatic security scan (runs whenever page_url is provided)
305344
# -------------------------------------------------------------------
306-
scan_note = ""
307-
if page_url:
308-
# Check cache first (reuse recent scan within 5 minutes)
309-
cached = snapshot_cache.get(page_url)
310-
if cached and datetime.now() - cached["timestamp"] < timedelta(minutes=5):
311-
scan_result = {
312-
"flags": cached["flags"],
313-
"snapshot_id": cached["snapshot_id"],
314-
"safe": "hidden_instructions_detected" not in cached["flags"],
315-
"error": None,
316-
}
317-
else:
318-
scan_result = await _scan_page(page_url)
319-
320-
if scan_result.get("error"):
321-
# Network/URL error — treat as unsafe; do not issue card
322-
return (
323-
f"Payment rejected. Security scan failed: {scan_result['error']} "
324-
f"Snapshot ID: {scan_result['snapshot_id']}. "
325-
f"Fix the URL or skip page_url if the checkout has no associated URL."
326-
)
327-
328-
if not scan_result["safe"]:
329-
return (
330-
f"Payment rejected. Security scan detected hidden prompt injection. "
331-
f"Snapshot ID: {scan_result['snapshot_id']}. "
332-
f"Flags: {scan_result['flags']}. "
333-
f"Do not retry this payment."
334-
)
335-
else:
336-
scan_note = " (security scan skipped — no page_url provided)"
345+
scan_ok, scan_msg = await _scan_and_validate(page_url, action_label="Payment")
346+
if not scan_ok:
347+
return scan_msg
348+
scan_note = scan_msg # empty string when scan passed, skip note when no page_url
337349

338350
# Human approval gate (if POP_APPROVAL_WEBHOOK is configured)
339351
require_approval = os.getenv("POP_REQUIRE_HUMAN_APPROVAL", "false").lower() == "true"
@@ -503,34 +515,9 @@ async def request_purchaser_info(
503515
# -------------------------------------------------------------------
504516
# P1: Automatic security scan (runs whenever page_url is provided)
505517
# -------------------------------------------------------------------
506-
if page_url:
507-
# Check cache first (reuse recent scan within 5 minutes)
508-
cached = snapshot_cache.get(page_url)
509-
if cached and datetime.now() - cached["timestamp"] < timedelta(minutes=5):
510-
scan_result = {
511-
"flags": cached["flags"],
512-
"snapshot_id": cached["snapshot_id"],
513-
"safe": "hidden_instructions_detected" not in cached["flags"],
514-
"error": None,
515-
}
516-
else:
517-
scan_result = await _scan_page(page_url)
518-
519-
if scan_result.get("error"):
520-
# Network/URL error — treat as unsafe; do not inject info
521-
return (
522-
f"Billing info rejected. Security scan failed: {scan_result['error']} "
523-
f"Snapshot ID: {scan_result['snapshot_id']}. "
524-
f"Fix the URL or skip page_url if the page has no associated URL."
525-
)
526-
527-
if not scan_result["safe"]:
528-
return (
529-
f"Billing info rejected. Security scan detected hidden prompt injection. "
530-
f"Snapshot ID: {scan_result['snapshot_id']}. "
531-
f"Flags: {scan_result['flags']}. "
532-
f"Do not retry this."
533-
)
518+
scan_ok, scan_msg = await _scan_and_validate(page_url, action_label="Billing info")
519+
if not scan_ok:
520+
return scan_msg
534521

535522
if injector is None:
536523
return (

0 commit comments

Comments
 (0)