diff --git a/cookbook/05_agent_os/approvals/approval_multi_tool.py b/cookbook/05_agent_os/approvals/approval_multi_tool.py new file mode 100644 index 0000000000..cbc6534ca8 --- /dev/null +++ b/cookbook/05_agent_os/approvals/approval_multi_tool.py @@ -0,0 +1,113 @@ +""" +Approval Multi-Tool (AgentOS web UI) +===================================== + +Two @approval tools on a single agent. When the user asks for both actions in +the same message, OpenAI's Responses API batches them into one model turn and +the run pauses with **two** RunRequirements in a single pause event — agno +records this as one approval row whose `context.tool_names` lists both tools +and whose `requirements[]` carries each tool_execution. + +Run this alongside os.agno.com to inspect the web UI's handling of the +multi-tool case. The cookbook serves AgentOS on port 7779; point os.agno.com +at a tunnel to this port (e.g. ngrok) and trigger: + + "please delete /tmp/demo.txt and transfer $500 to account 42" + +Expected web UI behaviour (per agno-os/src/pages/Approvals/utils/approvalHelpers.ts): + - card title : "delete_file, transfer_funds" (from context.tool_names) + - args section : one row per tool (from requirements[]) + - buttons : single Approve / Reject pair (row-level resolution) + +One click approves the whole row — both tools run. This is by design: agno +stamps a single approval_id on every paused tool and the web UI surfaces all +of them so the user can see exactly what they are authorizing. +""" + +import os + +from agno.agent import Agent +from agno.approval import approval +from agno.db.sqlite import SqliteDb +from agno.models.openai import OpenAIResponses +from agno.os.app import AgentOS +from agno.tools import tool + +DB_FILE = "tmp/approval_multi_tool.db" + + +@approval +@tool(requires_confirmation=True) +def delete_file(path: str) -> str: + """Delete a file at the given path. + + Args: + path (str): Absolute path to the file that should be deleted. + + Returns: + str: Confirmation message. + """ + return f"Deleted {path}" + + +@approval +@tool(requires_confirmation=True) +def transfer_funds(account_id: str, amount_usd: float) -> str: + """Transfer funds from the company account to another account. + + Args: + account_id (str): Destination account identifier. + amount_usd (float): Amount to transfer in USD. + + Returns: + str: Confirmation of the transfer. + """ + return f"Transferred ${amount_usd:.2f} to account {account_id}" + + +os.makedirs("tmp", exist_ok=True) + +db = SqliteDb( + db_file=DB_FILE, + session_table="agent_sessions", + approvals_table="approvals", +) + +agent = Agent( + name="Approval Multi-Tool Agent", + model=OpenAIResponses(id="gpt-5.4"), + tools=[delete_file, transfer_funds], + db=db, + markdown=True, + instructions=( + "When the user asks you to delete a file, call delete_file. When the " + "user asks you to transfer funds, call transfer_funds. Do not ask the " + "user to confirm in chat — the framework pauses the run for human " + "approval automatically. Just make the tool calls." + ), +) + +# db= is passed to AgentOS so /approvals router is enabled (without it the +# endpoint returns 503 "Approvals not available: pass a db to AgentOS"). +agent_os = AgentOS( + description="Multi-tool approval demo for the AgentOS web UI", + agents=[agent], + db=db, +) +app = agent_os.get_app() + + +if __name__ == "__main__": + """Run AgentOS. + + After starting: open os.agno.com and connect it to this server + (via ngrok if you need a public URL for the web frontend to reach). + Trigger a run such as: + + "please delete /tmp/demo.txt and transfer $500 to account 42" + + The run will pause with both tools listed in context.tool_names and + both tool_executions in requirements[]. Inspect how the web UI + renders this vs the Slack HITL interface. + """ + agent_os.serve(app="approval_multi_tool:app", port=7779, reload=True) diff --git a/cookbook/05_agent_os/interfaces/slack/approval_flow.py b/cookbook/05_agent_os/interfaces/slack/approval_flow.py new file mode 100644 index 0000000000..d9b6c09f1a --- /dev/null +++ b/cookbook/05_agent_os/interfaces/slack/approval_flow.py @@ -0,0 +1,144 @@ +""" +Slack Approval Flow (HITL) +========================== + +Human-in-the-loop tool approval via Slack Block Kit buttons. When the agent +calls a tool decorated with @approval + @tool(requires_confirmation=True), +the run pauses and an approval message appears in the Slack thread. + +Clicking Approve resumes the run; clicking Reject stops the tool and lets +the agent respond around the denial. 'Reject with reason' opens a modal +that collects an optional note passed to the LLM as the rejection reason. + +Key concepts: + - ``hitl_enabled=True`` mounts POST /slack/interactions alongside /slack/events. + - ``approval_authorization="requester_only"`` (default) — only the user + who triggered the run can resolve the approval. + - ``rejection_note_mode="optional"`` (default) — two reject buttons: quick + reject (default note) and reject-with-reason (opens modal). + +Slack app requirements: + - Event Subscriptions URL: ``/slack/events`` (existing) + - Interactivity URL: ``/slack/interactions`` (NEW — required) + - Bot scopes: chat:write, app_mentions:read, assistant:write, im:history + +Env: + - SLACK_TOKEN, SLACK_SIGNING_SECRET, OPENAI_API_KEY +""" + +import os + +from agno.agent import Agent +from agno.approval import approval +from agno.db.sqlite import SqliteDb +from agno.models.openai import OpenAIResponses +from agno.os.app import AgentOS +from agno.os.interfaces.slack import Slack +from agno.tools import tool + +DB_FILE = "tmp/slack_approval.db" + + +@approval +@tool(requires_confirmation=True) +def delete_file(path: str) -> str: + """Delete a file at the given path. + + Args: + path (str): Absolute path to the file that should be deleted. + + Returns: + str: Confirmation message. + """ + # Demo stub — no real deletion happens. In production this would call + # Path(path).unlink(), hit an S3 API, or similar irreversible action. + return f"Deleted {path}" + + +@approval +@tool(requires_confirmation=True) +def transfer_funds(account_id: str, amount_usd: float) -> str: + """Transfer funds from the company account to another account. + + Args: + account_id (str): Destination account identifier. + amount_usd (float): Amount to transfer in USD. + + Returns: + str: Confirmation of the transfer. + """ + # Demo stub — no real transfer happens. + return f"Transferred ${amount_usd:.2f} to account {account_id}" + + +os.makedirs("tmp", exist_ok=True) + +agent_db = SqliteDb( + db_file=DB_FILE, + session_table="agent_sessions", + approvals_table="approvals", +) + +approval_agent = Agent( + name="Approval Demo Agent", + model=OpenAIResponses(id="gpt-5.4"), + tools=[delete_file, transfer_funds], + db=agent_db, + add_history_to_context=True, + num_history_runs=3, + add_datetime_to_context=True, + markdown=True, + instructions=( + "You are a safety-conscious assistant. When asked to delete files or " + "transfer funds, call the appropriate tool. Every such call pauses " + "the run for human approval — wait for the decision and respond " + "naturally to both approvals and rejections. When the user requests " + "multiple sensitive actions in one message, call all of them in the " + "same turn — Slack renders each as its own approvable row, and the " + "run continues only after every row is resolved." + ), +) + +# Passing ``db=agent_db`` to AgentOS enables the /approvals router so the +# os.agno.com dashboard can observe and resolve approvals alongside Slack. +# Without this, /approvals returns 503 even though the agent writes approval +# rows into agent_db. +agent_os = AgentOS( + agents=[approval_agent], + db=agent_db, + interfaces=[ + Slack( + agent=approval_agent, + reply_to_mentions_only=False, + hitl_enabled=True, + ) + ], +) + +app = agent_os.get_app() + + +if __name__ == "__main__": + """Run AgentOS. + + Mount point summary: + - Slack events: POST /slack/events + - Slack interactions: POST /slack/interactions + - AgentOS config: GET /config + + Test flow after tunneling and configuring the Slack app: + 1. @mention the bot: "please delete /tmp/demo.txt" + 2. Bot streams a tool call task card, then a separate approval message + appears in the same thread with Approve / Reject / Reject with reason. + 3. Click Approve — the row updates to "APPROVED by @you"; once every + row in the pause is resolved, the agent posts the follow-up response. + 4. For the reject-with-reason path, a modal opens immediately on click; + the note entered there is passed to the LLM as the rejection reason. + + Multi-tool test: + @mention "please delete /tmp/demo.txt and transfer $500 to account 42" + → the pause message shows two rows with per-tool Approve/Reject buttons + → resolve each row independently (or use "Approve all confirmations") + → the run continues only after every row is decided. + """ + agent_os.serve(app="approval_flow:app", port=7778, reload=True) diff --git a/libs/agno/agno/os/interfaces/slack/authorization.py b/libs/agno/agno/os/interfaces/slack/authorization.py new file mode 100644 index 0000000000..f2a9fee971 --- /dev/null +++ b/libs/agno/agno/os/interfaces/slack/authorization.py @@ -0,0 +1,82 @@ +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Literal, Union + +if TYPE_CHECKING: + from slack_sdk.web.async_client import AsyncWebClient + +ApprovalPolicy = Union[ + Literal["requester_only", "channel_members", "any_authenticated"], + Callable[[str, Dict[str, Any]], Awaitable[bool]], +] + + +async def check_approval_authorization( + policy: ApprovalPolicy, + clicker_user_id: str, + approval: Dict[str, Any], + slack_client: "AsyncWebClient", +) -> bool: + if callable(policy): + return await policy(clicker_user_id, approval) + + if policy == "any_authenticated": + return True + + if policy == "requester_only": + slack_meta = get_slack_meta(approval) + return clicker_user_id == slack_meta.get("requester_slack_user_id") + + if policy == "channel_members": + slack_meta = get_slack_meta(approval) + channel = slack_meta.get("channel_id") + if not channel: + return False + return await _is_channel_member(slack_client, channel, clicker_user_id) + + return False + + +def get_slack_meta(approval: Dict[str, Any]) -> Dict[str, Any]: + return (approval.get("resolution_data") or {}).get("interface", {}).get("slack", {}) + + +# Mirrors the closure-scoped _db_call in libs/agno/agno/os/routers/approvals/router.py:37. +# TODO: when agno.os.approval_service lands, delegate both to that. +async def call_db(db: Any, method_name: str, *args: Any, **kwargs: Any) -> Any: + fn = getattr(db, method_name, None) + if fn is None: + return None + if asyncio.iscoroutinefunction(fn): + return await fn(*args, **kwargs) + return fn(*args, **kwargs) + + +async def _is_channel_member(client: "AsyncWebClient", channel: str, user: str) -> bool: + # conversations.members paginates; walk until we find the user or exhaust. + # For very large channels, a caller-supplied callback policy is the escape hatch. + from agno.utils.log import log_error, log_warning + + cursor = None + try: + while True: + resp = await client.conversations_members(channel=channel, limit=200, cursor=cursor) + if user in (resp.get("members") or []): + return True + cursor = (resp.get("response_metadata") or {}).get("next_cursor") + if not cursor: + return False + except Exception as exc: + # Distinguish rate limits from genuine "not a member". 429 should log + # loudly — fail-closed would otherwise silently deny legitimate approvers. + slack_error = getattr(getattr(exc, "response", None), "data", None) or {} + err_code = slack_error.get("error") if isinstance(slack_error, dict) else None + if err_code == "ratelimited": + log_error( + f"channel_members check rate-limited for channel={channel}; user={user} denied. " + "Consider a callback policy for high-volume channels." + ) + else: + log_warning(f"channel_members check failed for channel={channel} user={user}: {err_code or exc}") + return False diff --git a/libs/agno/agno/os/interfaces/slack/block_kit.py b/libs/agno/agno/os/interfaces/slack/block_kit.py new file mode 100644 index 0000000000..e66e821df3 --- /dev/null +++ b/libs/agno/agno/os/interfaces/slack/block_kit.py @@ -0,0 +1,272 @@ +from __future__ import annotations + +from typing import Annotated, Any, Dict, List, Literal, Optional, Union + +from pydantic import BaseModel, ConfigDict, Field + +# Tier 1 message-scoped Block Kit primitives. Scope is intentionally narrow: +# larger schemas make LLM strict-mode output harder to satisfy and harder to +# maintain. Exotic types (multi-selects, channels/conversations pickers, +# timepicker, datetimepicker, radio_buttons, option_groups) are left for +# per-demand additions. Users who need raw Block Kit can post list[dict] +# through an escape-hatch tool. + +_MAX_TEXT = 3000 +_MAX_OPTION_VALUE = 150 +_MAX_BUTTON_VALUE = 2000 +_MAX_ACTION_ID = 255 +_MAX_BLOCK_ID = 255 +_MAX_ALT_TEXT = 2000 +_MAX_URL = 3000 +_MAX_FALLBACK_TEXT = 500 +_MAX_SECTION_FIELDS = 10 +_MAX_ACTIONS_ELEMENTS = 25 +_MAX_CONTEXT_ELEMENTS = 10 +_MAX_MESSAGE_BLOCKS = 50 +_MAX_STATIC_OPTIONS = 100 +_MAX_OVERFLOW_OPTIONS = 5 +_MAX_CHOICE_OPTIONS = 10 + + +class _Strict(BaseModel): + model_config = ConfigDict(extra="forbid", populate_by_name=True) + + +# Text primitives -------------------------------------------------------------- + + +class PlainText(_Strict): + type: Literal["plain_text"] = "plain_text" + text: str = Field(..., max_length=_MAX_TEXT) + emoji: bool = True + + +class Markdown(_Strict): + type: Literal["mrkdwn"] = "mrkdwn" + text: str = Field(..., max_length=_MAX_TEXT) + + +Text = Annotated[Union[PlainText, Markdown], Field(discriminator="type")] + + +# Composition ------------------------------------------------------------------ + + +class Option(_Strict): + text: PlainText + value: str = Field(..., max_length=_MAX_OPTION_VALUE) + description: Optional[PlainText] = None + + +class ConfirmDialog(_Strict): + title: PlainText + text: Union[PlainText, Markdown] + confirm: PlainText + deny: PlainText + style: Optional[Literal["primary", "danger"]] = None + + +# Interactive elements --------------------------------------------------------- + + +class Button(_Strict): + type: Literal["button"] = "button" + action_id: str = Field(..., max_length=_MAX_ACTION_ID) + text: PlainText + value: Optional[str] = Field(None, max_length=_MAX_BUTTON_VALUE) + style: Optional[Literal["primary", "danger"]] = None + url: Optional[str] = Field(None, max_length=_MAX_URL) + confirm: Optional[ConfirmDialog] = None + + +class StaticSelect(_Strict): + type: Literal["static_select"] = "static_select" + action_id: str = Field(..., max_length=_MAX_ACTION_ID) + placeholder: PlainText + options: List[Option] = Field(..., min_length=1, max_length=_MAX_STATIC_OPTIONS) + initial_option: Optional[Option] = None + confirm: Optional[ConfirmDialog] = None + + +class UsersSelect(_Strict): + type: Literal["users_select"] = "users_select" + action_id: str = Field(..., max_length=_MAX_ACTION_ID) + placeholder: PlainText + initial_user: Optional[str] = None + confirm: Optional[ConfirmDialog] = None + + +class Overflow(_Strict): + type: Literal["overflow"] = "overflow" + action_id: str = Field(..., max_length=_MAX_ACTION_ID) + options: List[Option] = Field(..., min_length=1, max_length=_MAX_OVERFLOW_OPTIONS) + confirm: Optional[ConfirmDialog] = None + + +class Datepicker(_Strict): + type: Literal["datepicker"] = "datepicker" + action_id: str = Field(..., max_length=_MAX_ACTION_ID) + placeholder: Optional[PlainText] = None + initial_date: Optional[str] = Field(None, pattern=r"^\d{4}-\d{2}-\d{2}$") + confirm: Optional[ConfirmDialog] = None + + +class Checkboxes(_Strict): + type: Literal["checkboxes"] = "checkboxes" + action_id: str = Field(..., max_length=_MAX_ACTION_ID) + options: List[Option] = Field(..., min_length=1, max_length=_MAX_CHOICE_OPTIONS) + initial_options: Optional[List[Option]] = None + confirm: Optional[ConfirmDialog] = None + + +# Modal-only input element — required for the HITL rejection reason modal. +class PlainTextInput(_Strict): + type: Literal["plain_text_input"] = "plain_text_input" + action_id: str = Field(..., max_length=_MAX_ACTION_ID) + placeholder: Optional[PlainText] = None + initial_value: Optional[str] = None + multiline: Optional[bool] = None + max_length: Optional[int] = Field(None, ge=1, le=_MAX_TEXT) + + +# Image element — used as Section accessory or Context child (non-interactive). +class ImageElement(_Strict): + type: Literal["image"] = "image" + image_url: str = Field(..., max_length=_MAX_URL) + alt_text: str = Field(..., max_length=_MAX_ALT_TEXT) + + +InteractiveElement = Annotated[ + Union[Button, StaticSelect, UsersSelect, Overflow, Datepicker, Checkboxes], + Field(discriminator="type"), +] + +SectionAccessory = Annotated[ + Union[Button, StaticSelect, UsersSelect, Overflow, Datepicker, Checkboxes, ImageElement], + Field(discriminator="type"), +] + +ContextElement = Annotated[Union[PlainText, Markdown, ImageElement], Field(discriminator="type")] + + +# Blocks ----------------------------------------------------------------------- + + +class Header(_Strict): + type: Literal["header"] = "header" + text: PlainText + block_id: Optional[str] = Field(None, max_length=_MAX_BLOCK_ID) + + +class Section(_Strict): + type: Literal["section"] = "section" + text: Optional[Union[PlainText, Markdown]] = None + fields: Optional[List[Union[PlainText, Markdown]]] = Field(None, max_length=_MAX_SECTION_FIELDS) + accessory: Optional[SectionAccessory] = None + block_id: Optional[str] = Field(None, max_length=_MAX_BLOCK_ID) + + +class Divider(_Strict): + type: Literal["divider"] = "divider" + block_id: Optional[str] = Field(None, max_length=_MAX_BLOCK_ID) + + +class Actions(_Strict): + type: Literal["actions"] = "actions" + elements: List[InteractiveElement] = Field(..., min_length=1, max_length=_MAX_ACTIONS_ELEMENTS) + block_id: Optional[str] = Field(None, max_length=_MAX_BLOCK_ID) + + +class Context(_Strict): + type: Literal["context"] = "context" + elements: List[ContextElement] = Field(..., min_length=1, max_length=_MAX_CONTEXT_ELEMENTS) + block_id: Optional[str] = Field(None, max_length=_MAX_BLOCK_ID) + + +class ImageBlock(_Strict): + type: Literal["image"] = "image" + image_url: str = Field(..., max_length=_MAX_URL) + alt_text: str = Field(..., max_length=_MAX_ALT_TEXT) + title: Optional[PlainText] = None + block_id: Optional[str] = Field(None, max_length=_MAX_BLOCK_ID) + + +# Modal-only block — required for the HITL rejection reason modal. +class InputBlock(_Strict): + type: Literal["input"] = "input" + label: PlainText + element: Union[PlainTextInput, InteractiveElement] + hint: Optional[PlainText] = None + optional: Optional[bool] = None + block_id: Optional[str] = Field(None, max_length=_MAX_BLOCK_ID) + + +Block = Annotated[ + Union[Header, Section, Divider, Actions, Context, ImageBlock], + Field(discriminator="type"), +] + +ModalBlock = Annotated[ + Union[Header, Section, Divider, Actions, Context, ImageBlock, InputBlock], + Field(discriminator="type"), +] + + +# Top-level wrappers ----------------------------------------------------------- + + +class BlockKitMessage(_Strict): + text: str = Field(..., max_length=_MAX_FALLBACK_TEXT) + blocks: List[Block] = Field(..., min_length=1, max_length=_MAX_MESSAGE_BLOCKS) + + def to_slack_payload(self) -> Dict[str, Any]: + return { + "text": self.text, + "blocks": [b.model_dump(exclude_none=True, mode="json") for b in self.blocks], + } + + +class ModalView(_Strict): + type: Literal["modal"] = "modal" + title: PlainText + blocks: List[ModalBlock] = Field(..., min_length=1, max_length=100) + close: Optional[PlainText] = None + submit: Optional[PlainText] = None + private_metadata: Optional[str] = Field(None, max_length=3000) + callback_id: Optional[str] = Field(None, max_length=_MAX_ACTION_ID) + clear_on_close: Optional[bool] = None + notify_on_close: Optional[bool] = None + + def to_slack_payload(self) -> Dict[str, Any]: + return self.model_dump(exclude_none=True, mode="json") + + +__all__ = [ + "PlainText", + "Markdown", + "Text", + "Option", + "ConfirmDialog", + "Button", + "StaticSelect", + "UsersSelect", + "Overflow", + "Datepicker", + "Checkboxes", + "PlainTextInput", + "ImageElement", + "InteractiveElement", + "SectionAccessory", + "ContextElement", + "Header", + "Section", + "Divider", + "Actions", + "Context", + "ImageBlock", + "InputBlock", + "Block", + "ModalBlock", + "BlockKitMessage", + "ModalView", +] diff --git a/libs/agno/agno/os/interfaces/slack/blocks.py b/libs/agno/agno/os/interfaces/slack/blocks.py new file mode 100644 index 0000000000..df414f81ae --- /dev/null +++ b/libs/agno/agno/os/interfaces/slack/blocks.py @@ -0,0 +1,642 @@ +from __future__ import annotations + +import json +from typing import Any, Dict, List, Literal, Optional + +from agno.os.interfaces.slack.block_kit import ( + Actions, + BlockKitMessage, + Button, + Checkboxes, + Context, + Divider, + Header, + InputBlock, + Markdown, + ModalView, + Option, + PlainText, + PlainTextInput, + Section, + StaticSelect, +) +from agno.os.interfaces.slack.resolutions import ( + RequirementKind, + REQ_CONFIRMATION, + REQ_EXTERNAL_EXECUTION, + REQ_USER_FEEDBACK, + REQ_USER_INPUT, + classify_tool_execution, +) +from agno.utils.log import log_warning + +# Block Kit option-count caps, mirrored from block_kit.py internals so we can +# clamp user_feedback question options before Slack rejects the view payload. +# Kept as local constants rather than importing the block_kit privates so that +# schema-level changes there don't silently change behavior here. +_CHECKBOXES_MAX_OPTIONS = 10 # block_kit._MAX_CHOICE_OPTIONS +_STATIC_SELECT_MAX_OPTIONS = 100 # block_kit._MAX_STATIC_OPTIONS + +# ----------------------------------------------------------------------------- +# action_id / callback_id constants — the /interactions dispatcher routes on these +# ----------------------------------------------------------------------------- + +# Confirmation rows (row-level) +ACTION_APPROVE = "slack_hitl_approve" +ACTION_REJECT_QUICK = "slack_hitl_reject_quick" +ACTION_REJECT_WITH_REASON = "slack_hitl_reject_with_reason" + +# Non-confirmation rows — opens a modal +ACTION_PROVIDE_INPUT = "slack_hitl_provide_input" +ACTION_ANSWER_FEEDBACK = "slack_hitl_answer_feedback" +ACTION_SUBMIT_RESULT = "slack_hitl_submit_result" + +# view.callback_id for each modal +VIEW_REJECT_MODAL = "slack_hitl_reject_modal" +VIEW_USER_INPUT_MODAL = "slack_hitl_user_input_modal" +VIEW_USER_FEEDBACK_MODAL = "slack_hitl_user_feedback_modal" +VIEW_EXTERNAL_EXECUTION_MODAL = "slack_hitl_external_execution_modal" + +# Tool arg preview is truncated so long payloads don't blow up the message; +# the full args remain accessible via agent logs / DB. +_ARG_PREVIEW_MAX = 400 +_ARG_VALUE_MAX = 120 + +# Slack caps a chat.postMessage at 50 blocks. Each row uses Section + optional +# Actions (2 blocks for confirmation, 1 for non-confirmation with accessory). +# Header/divider/footer eat ~4. We hard-cap pause rendering at 20 rows so the +# worst case (all confirmation) stays under 50 blocks with margin. +_MAX_REQUIREMENTS_PER_MESSAGE = 15 # lower cap: each row now uses Divider + Section + Actions (3 blocks) + +# Emoji prefixes per pause kind — visual cue for the row without requiring +# users to read the tool name. Kept as ASCII slack codes so Slack's emoji +# renderer converts them; falls back to literal on clients that don't. +_KIND_EMOJI: Dict[RequirementKind, str] = { + REQ_CONFIRMATION: ":wrench:", + REQ_USER_INPUT: ":pencil2:", + REQ_USER_FEEDBACK: ":speech_balloon:", + REQ_EXTERNAL_EXECUTION: ":rocket:", +} + +RejectionNoteMode = Literal["optional", "required", "disabled"] + + +def encode_action_value( + entity_id: str, + session_id: str, + run_id: str, + approval_id: str, + kind: str, + tool_name: Optional[str] = None, + requirement_id: Optional[str] = None, +) -> str: + payload: Dict[str, Any] = {"e": entity_id, "s": session_id, "r": run_id, "a": approval_id, "k": kind} + if tool_name: + payload["t"] = tool_name + if requirement_id: + payload["q"] = requirement_id + return json.dumps(payload, separators=(",", ":")) + + +def decode_action_value(value: str) -> Dict[str, str]: + return json.loads(value) + + +def _encode_private_metadata( + *, + entity_id: str, + session_id: str, + run_id: str, + approval_id: str, + requirement_id: Optional[str] = None, +) -> str: + """Compact JSON envelope for modal ``private_metadata`` — 1-letter keys keep + the payload under Slack's 3000-char limit even with long UUIDs.""" + payload: Dict[str, Any] = {"e": entity_id, "s": session_id, "r": run_id, "a": approval_id} + if requirement_id is not None: + payload["q"] = requirement_id + return json.dumps(payload, separators=(",", ":")) + + +def pause_request_blocks( + *, + entity_id: str, + session_id: str, + run_id: str, + approval_id: str, + requirements: List[Dict[str, Any]], + requirement_resolutions: Dict[str, Dict[str, Any]], + rejection_note_mode: RejectionNoteMode = "optional", +) -> List[Dict[str, Any]]: + if not requirements: + # Should never happen — pause_handler guards against it — but degrade + # gracefully to a plain acknowledgement rather than crashing. + msg = BlockKitMessage( + text="Approval required", + blocks=[Section(text=Markdown(text="*Approval required* (no requirements found)"))], + ) + return msg.to_slack_payload()["blocks"] + + truncated = False + rendered_reqs = requirements + if len(requirements) > _MAX_REQUIREMENTS_PER_MESSAGE: + log_warning( + f"Pause has {len(requirements)} requirements; clamping Slack UI to {_MAX_REQUIREMENTS_PER_MESSAGE}. " + "Remaining must be resolved via dashboard." + ) + rendered_reqs = requirements[:_MAX_REQUIREMENTS_PER_MESSAGE] + truncated = True + + blocks: List[Any] = [] + + row_count = len(rendered_reqs) + blocks.append(Header(text=PlainText(text="Approval required"))) + + if truncated: + subtitle = f"Showing {row_count} of {len(requirements)} actions — resolve the rest from the dashboard" + elif row_count == 1: + subtitle = "1 action needs your decision" + else: + subtitle = f"{row_count} actions need your decision — tap Confirm or Reject on each row" + blocks.append(Context(elements=[Markdown(text=subtitle)])) # type: ignore[arg-type] + + for idx, requirement in enumerate(rendered_reqs): + requirement_id = requirement.get("id") or "" + tool_exec = requirement.get("tool_execution") or {} + row_state = requirement_resolutions.get(requirement_id) or {} + + tool_name = tool_exec.get("tool_name") or "tool" + kind = classify_tool_execution(tool_exec) + + blocks.append(Divider()) + + if row_state.get("status", "pending") != "pending": + blocks.append(Section(text=Markdown(text=_row_resolved_text(tool_name, kind, row_state)))) + continue + + if kind == REQ_CONFIRMATION: + blocks.extend( + _confirmation_row_blocks( + entity_id=entity_id, + session_id=session_id, + run_id=run_id, + approval_id=approval_id, + requirement_id=requirement_id, + tool_name=tool_name, + tool_args=tool_exec.get("tool_args"), + rejection_note_mode=rejection_note_mode, + ) + ) + else: + blocks.extend( + _modal_row_blocks( + entity_id=entity_id, + session_id=session_id, + run_id=run_id, + approval_id=approval_id, + requirement_id=requirement_id, + tool_name=tool_name, + tool_args=tool_exec.get("tool_args"), + kind=kind, + ) + ) + + fallback = f"Approval required for {row_count} action{'s' if row_count != 1 else ''}" + msg = BlockKitMessage(text=fallback, blocks=blocks) # type: ignore[arg-type] + return msg.to_slack_payload()["blocks"] + + +def _confirmation_row_blocks( + *, + entity_id: str, + session_id: str, + run_id: str, + approval_id: str, + requirement_id: str, + tool_name: str, + tool_args: Optional[Dict[str, Any]], + rejection_note_mode: RejectionNoteMode, +) -> List[Any]: + emoji = _KIND_EMOJI[REQ_CONFIRMATION] + title_section = Section(text=Markdown(text=f"{emoji} *{tool_name}*")) + + arg_fields = _arg_fields(tool_args) + blocks: List[Any] = [title_section] + if arg_fields: + blocks.append(Section(fields=arg_fields)) # type: ignore[arg-type] + + if rejection_note_mode == "disabled": + reject_action = ACTION_REJECT_QUICK + reject_kind = "reject_quick" + else: + reject_action = ACTION_REJECT_WITH_REASON + reject_kind = "reject_with_reason" + + buttons: List[Button] = [ + Button( + action_id=ACTION_APPROVE, + text=PlainText(text="Confirm"), + value=encode_action_value(entity_id, session_id, run_id, approval_id, "approve", tool_name, requirement_id), + style="primary", + ), + Button( + action_id=reject_action, + text=PlainText(text="Reject"), + value=encode_action_value( + entity_id, session_id, run_id, approval_id, reject_kind, tool_name, requirement_id + ), + style="danger", + ), + ] + blocks.append(Actions(elements=buttons)) # type: ignore[arg-type] + return blocks + + +_MODAL_ROW_UI: Dict[RequirementKind, Dict[str, str]] = { + REQ_USER_INPUT: { + "button_text": "Provide input", + "action_id": ACTION_PROVIDE_INPUT, + "type_label": "Input needed", + "act_kind": "provide_input", + }, + REQ_USER_FEEDBACK: { + "button_text": "Answer", + "action_id": ACTION_ANSWER_FEEDBACK, + "type_label": "Feedback needed", + "act_kind": "answer_feedback", + }, + REQ_EXTERNAL_EXECUTION: { + "button_text": "Submit result", + "action_id": ACTION_SUBMIT_RESULT, + "type_label": "External result needed", + "act_kind": "submit_result", + }, +} + + +def _modal_row_blocks( + *, + entity_id: str, + session_id: str, + run_id: str, + approval_id: str, + requirement_id: str, + tool_name: str, + tool_args: Optional[Dict[str, Any]], + kind: RequirementKind, +) -> List[Any]: + ui = _MODAL_ROW_UI[kind] + button_text = ui["button_text"] + action_id = ui["action_id"] + type_label = ui["type_label"] + act_kind = ui["act_kind"] + emoji = _KIND_EMOJI[kind] + title_section = Section( + text=Markdown(text=f"{emoji} *{tool_name}* _{type_label}_"), + accessory=Button( + action_id=action_id, + text=PlainText(text=button_text), + value=encode_action_value(entity_id, session_id, run_id, approval_id, act_kind, tool_name, requirement_id), + style="primary", + ), + ) + + arg_fields = _arg_fields(tool_args) + blocks: List[Any] = [title_section] + if arg_fields: + blocks.append(Section(fields=arg_fields)) # type: ignore[arg-type] + return blocks + + +def _row_resolved_text(tool_name: str, kind: RequirementKind, row_state: Dict[str, Any]) -> str: + status = row_state.get("status", "approved") + by = row_state.get("by") + at = row_state.get("at") + by_str = f"<@{by}>" if by else "system" + time_str = _format_time(at) if at else "just now" + emoji = _KIND_EMOJI[kind] + + if kind == REQ_CONFIRMATION: + label = status.upper() if isinstance(status, str) else "RESOLVED" + line = f"{emoji} *{label}* `{tool_name}` by {by_str} {time_str}" + note = row_state.get("note") + if note and status == "rejected": + line = f"{line}\n>{_truncate(note, 500)}" + return line + + if kind == REQ_USER_INPUT: + values = row_state.get("values") or {} + count = len(values) + return ( + f"{emoji} *INPUT PROVIDED* `{tool_name}` — {count} field{'s' if count != 1 else ''} by {by_str} {time_str}" + ) + + if kind == REQ_USER_FEEDBACK: + selections = row_state.get("selections") or {} + count = len(selections) + return f"{emoji} *FEEDBACK PROVIDED* `{tool_name}` — {count} answer{'s' if count != 1 else ''} by {by_str} {time_str}" + + if kind == REQ_EXTERNAL_EXECUTION: + return f"{emoji} *RESULT SUBMITTED* `{tool_name}` by {by_str} {time_str}" + + return f"{emoji} *RESOLVED* `{tool_name}` by {by_str} {time_str}" + + +def pause_resolved_message_blocks( + *, + aggregate_status: Literal["approved", "rejected", "cancelled", "expired"], + requirements: List[Dict[str, Any]], + requirement_resolutions: Dict[str, Dict[str, Any]], +) -> List[Dict[str, Any]]: + count = len(requirements) + status_emoji = {"approved": ":white_check_mark:", "rejected": ":x:"}.get(aggregate_status, ":information_source:") + header = Header(text=PlainText(text=f"{aggregate_status.title()}")) + subtitle = Context( + elements=[Markdown(text=f"{status_emoji} {count} action{'s' if count != 1 else ''} resolved — run continued")] + ) # type: ignore[arg-type] + + blocks: List[Any] = [header, subtitle, Divider()] + for requirement in requirements: + requirement_id = requirement.get("id") or "" + tool_exec = requirement.get("tool_execution") or {} + tool_name = tool_exec.get("tool_name") or "tool" + kind = classify_tool_execution(tool_exec) + row_state = requirement_resolutions.get(requirement_id) or {"status": "pending"} + blocks.append(Section(text=Markdown(text=_row_resolved_text(tool_name, kind, row_state)))) + + fallback = f"Approval {aggregate_status}" + msg = BlockKitMessage(text=fallback, blocks=blocks) # type: ignore[arg-type] + return msg.to_slack_payload()["blocks"] + + +def reject_reason_modal_view( + *, + entity_id: str, + session_id: str, + run_id: str, + approval_id: str, + tool_name: str, + requirement_id: Optional[str] = None, +) -> Dict[str, Any]: + private_metadata = _encode_private_metadata( + entity_id=entity_id, + session_id=session_id, + run_id=run_id, + approval_id=approval_id, + requirement_id=requirement_id or "", + ) + view = ModalView( + title=PlainText(text="Reject approval"), + submit=PlainText(text="Reject"), + close=PlainText(text="Cancel"), + callback_id=VIEW_REJECT_MODAL, + private_metadata=private_metadata, + blocks=[ + Section(text=Markdown(text=f"Rejecting `{tool_name}`")), + InputBlock( + block_id="rejection_reason", + optional=True, + label=PlainText(text="Reason (optional)"), + element=PlainTextInput( + action_id="reason_input", + multiline=True, + max_length=500, + placeholder=PlainText(text="Why are you rejecting this?"), + ), + ), + ], + ) + return view.to_slack_payload() + + +def user_input_modal_view( + *, + entity_id: str, + session_id: str, + run_id: str, + approval_id: str, + requirement_id: str, + tool_name: str, + user_input_schema: List[Dict[str, Any]], +) -> Dict[str, Any]: + private_metadata = _encode_private_metadata( + entity_id=entity_id, + session_id=session_id, + run_id=run_id, + approval_id=approval_id, + requirement_id=requirement_id, + ) + blocks: List[Any] = [Section(text=Markdown(text=f"Provide input for `{tool_name}`"))] + + for i, field in enumerate(user_input_schema): + field_name = field.get("name") or f"field_{i}" + description = field.get("description") or "" + field_type = field.get("field_type") or "str" + label = f"{field_name}" + (f" ({field_type})" if field_type != "str" else "") + placeholder = description[:150] if description else f"Value for {field_name}" + + if field_type == "bool": + element: Any = StaticSelect( + action_id="value", + placeholder=PlainText(text="Select"), + options=[ + Option(text=PlainText(text="True"), value="true"), + Option(text=PlainText(text="False"), value="false"), + ], + ) + else: + current_value = field.get("value") + initial = None + if current_value is not None: + try: + initial = str(current_value)[:3000] + except Exception: # noqa: BLE001 + initial = None + element = PlainTextInput( + action_id="value", + placeholder=PlainText(text=placeholder), + multiline=field_type in {"list", "dict"}, + initial_value=initial, + ) + + blocks.append( + InputBlock( + block_id=f"f_{i}", + label=PlainText(text=label[:75]), + element=element, + ) + ) + + view = ModalView( + title=PlainText(text="Provide input"), + submit=PlainText(text="Submit"), + close=PlainText(text="Cancel"), + callback_id=VIEW_USER_INPUT_MODAL, + private_metadata=private_metadata, + blocks=blocks, + ) + return view.to_slack_payload() + + +def user_feedback_modal_view( + *, + entity_id: str, + session_id: str, + run_id: str, + approval_id: str, + requirement_id: str, + tool_name: str, + user_feedback_schema: List[Dict[str, Any]], +) -> Dict[str, Any]: + private_metadata = _encode_private_metadata( + entity_id=entity_id, + session_id=session_id, + run_id=run_id, + approval_id=approval_id, + requirement_id=requirement_id, + ) + blocks: List[Any] = [Section(text=Markdown(text=f"Feedback for `{tool_name}`"))] + + for i, question in enumerate(user_feedback_schema): + q_text = question.get("question") or f"Question {i + 1}" + options_raw = question.get("options") or [] + multi = bool(question.get("multi_select")) + + if not options_raw: + log_warning(f"user_feedback question {i} has no options; skipping") + continue + + # Option.value must be unique within a select; use label itself (clamped). + options: List[Option] = [] + option_cap = _CHECKBOXES_MAX_OPTIONS if multi else _STATIC_SELECT_MAX_OPTIONS + for opt in options_raw[:option_cap]: + label = (opt.get("label") or "").strip() or "Option" + desc = opt.get("description") + options.append( + Option( + text=PlainText(text=label[:75]), + value=label[:150], + description=PlainText(text=desc[:75]) if desc else None, + ) + ) + + if not options: + continue + + if multi: + if len(options_raw) > _CHECKBOXES_MAX_OPTIONS: + log_warning( + f"user_feedback multi_select question {i} truncated to " + f"{_CHECKBOXES_MAX_OPTIONS} options (Slack Checkboxes limit)" + ) + element: Any = Checkboxes(action_id="value", options=options) + else: + element = StaticSelect(action_id="value", placeholder=PlainText(text="Select"), options=options) + + blocks.append( + InputBlock( + block_id=f"q_{i}", + label=PlainText(text=q_text[:2000]), + element=element, + ) + ) + + view = ModalView( + title=PlainText(text="Provide feedback"), + submit=PlainText(text="Submit"), + close=PlainText(text="Cancel"), + callback_id=VIEW_USER_FEEDBACK_MODAL, + private_metadata=private_metadata, + blocks=blocks, + ) + return view.to_slack_payload() + + +def external_execution_modal_view( + *, + entity_id: str, + session_id: str, + run_id: str, + approval_id: str, + requirement_id: str, + tool_name: str, +) -> Dict[str, Any]: + private_metadata = _encode_private_metadata( + entity_id=entity_id, + session_id=session_id, + run_id=run_id, + approval_id=approval_id, + requirement_id=requirement_id, + ) + view = ModalView( + title=PlainText(text="Submit result"), + submit=PlainText(text="Submit"), + close=PlainText(text="Cancel"), + callback_id=VIEW_EXTERNAL_EXECUTION_MODAL, + private_metadata=private_metadata, + blocks=[ + Section(text=Markdown(text=f"Provide the result for `{tool_name}`")), + InputBlock( + block_id="result", + label=PlainText(text="Result"), + element=PlainTextInput( + action_id="value", + multiline=True, + max_length=3000, + placeholder=PlainText(text="Paste the result the tool produced"), + ), + ), + ], + ) + return view.to_slack_payload() + + +def pending_approval_reminder_text() -> str: + return "This thread is waiting on an approval. Use the buttons above, or type `cancel` to abort the pending run." + + +def _format_time(epoch: Optional[int]) -> str: + if not epoch: + return "just now" + import time as _time + + fallback = _time.strftime("%Y-%m-%d %H:%M", _time.gmtime(int(epoch))) + return f"" + + +def _arg_fields(args: Optional[Dict[str, Any]]) -> List[Markdown]: + if not args: + return [] + fields: List[Markdown] = [] + max_pairs = 4 # leave room for the "more truncated" hint if needed (5 pairs total) + items = list(args.items()) + total = 0 + for key, value in items[:max_pairs]: + rendered = _render_value(value) + key_md = f"*{key}*" + val_md = f"`{rendered}`" + pair_len = len(key_md) + len(val_md) + if total + pair_len > _ARG_PREVIEW_MAX: + break + fields.append(Markdown(text=key_md)) + fields.append(Markdown(text=val_md)) + total += pair_len + remaining = len(items) - (len(fields) // 2) + if remaining > 0: + fields.append(Markdown(text="_…_")) + fields.append(Markdown(text=f"_{remaining} more field{'s' if remaining != 1 else ''} truncated_")) + return fields + + +def _render_value(value: Any) -> str: + try: + rendered = value if isinstance(value, str) else json.dumps(value, default=str) + except (TypeError, ValueError): + rendered = str(value) + return _truncate(rendered, _ARG_VALUE_MAX) + + +def _truncate(text: str, limit: int) -> str: + if len(text) <= limit: + return text + return text[: limit - 1] + "…" diff --git a/libs/agno/agno/os/interfaces/slack/events.py b/libs/agno/agno/os/interfaces/slack/events.py index fb44af199a..5c6130ba12 100644 --- a/libs/agno/agno/os/interfaces/slack/events.py +++ b/libs/agno/agno/os/interfaces/slack/events.py @@ -263,6 +263,16 @@ async def _on_run_error(chunk: BaseRunOutputEvent, state: StreamState, stream: A return True +async def _on_run_paused(chunk: BaseRunOutputEvent, state: StreamState, stream: AsyncChatStream) -> bool: + # Pause UI rendering needs entity/DB/channel context that handlers don't have. + # Capture the event on state; the streaming loop's caller renders the UI. + state.paused_event = chunk + # Keep the paused tool's task card in_progress rather than forcing it to + # "complete" via the default final-flush — the tool hasn't executed yet. + state.terminal_status = "in_progress" + return True + + # ============================================================================= # Workflow Event Handlers (require custom logic) # ============================================================================= @@ -387,6 +397,7 @@ async def _on_loop_execution_completed(chunk: BaseRunOutputEvent, state: StreamS RunEvent.run_completed.value: _on_run_completed, RunEvent.run_error.value: _on_run_error, RunEvent.run_cancelled.value: _on_run_error, # Treat cancellation as terminal error + RunEvent.run_paused.value: _on_run_paused, # ------------------------------------------------------------------------- # Workflow Lifecycle Events # ------------------------------------------------------------------------- diff --git a/libs/agno/agno/os/interfaces/slack/interactions.py b/libs/agno/agno/os/interfaces/slack/interactions.py new file mode 100644 index 0000000000..ebc3be83ed --- /dev/null +++ b/libs/agno/agno/os/interfaces/slack/interactions.py @@ -0,0 +1,1117 @@ +from __future__ import annotations + +import json +import time +from dataclasses import dataclass +from ssl import SSLContext +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional + +from fastapi import APIRouter, BackgroundTasks, HTTPException, Request + +from agno.agent import Agent +from agno.team import Team +from agno.os.interfaces.slack.authorization import ( + ApprovalPolicy, + call_db, + check_approval_authorization, + get_slack_meta, +) +from agno.os.interfaces.slack.blocks import ( + ACTION_ANSWER_FEEDBACK, + ACTION_APPROVE, + ACTION_PROVIDE_INPUT, + ACTION_REJECT_QUICK, + ACTION_REJECT_WITH_REASON, + ACTION_SUBMIT_RESULT, + VIEW_EXTERNAL_EXECUTION_MODAL, + VIEW_REJECT_MODAL, + VIEW_USER_FEEDBACK_MODAL, + VIEW_USER_INPUT_MODAL, + RejectionNoteMode, + decode_action_value, + external_execution_modal_view, + pause_request_blocks, + pause_resolved_message_blocks, + reject_reason_modal_view, + user_feedback_modal_view, + user_input_modal_view, +) +from agno.os.interfaces.slack.resolutions import ( + UnresolvedRequirementError, + apply_resolutions_to_requirements, + compute_aggregate_status, + initialize_requirement_resolutions, + write_row_resolution, +) +from agno.os.interfaces.slack.events import process_event +from agno.os.interfaces.slack.helpers import upload_response_media_async +from agno.os.interfaces.slack.security import verify_slack_signature +from agno.os.interfaces.slack.state import STREAM_CHAR_LIMIT, StreamState +from agno.run.requirement import RunRequirement +from agno.utils.log import log_debug, log_error, log_exception, log_info, log_warning + +if TYPE_CHECKING: + from slack_sdk.web.async_client import AsyncWebClient + + +_GENERIC_DENIAL = "You are not authorized to resolve this approval." +_STALE_TEXT = "This row has already been resolved." +_MISSING_APPROVAL = "This approval no longer exists." +_CONTINUE_FAILED = ( + "The run failed to resume. Your decision was saved — retry from the dashboard or re-click a pending row." +) +_FALLBACK_REJECT_NOTE = "Rejected via Slack." + + +EntityType = Literal["agent", "team", "workflow"] + + +@dataclass(frozen=True) +class StreamingConfig: + entity_name: str + entity_type: EntityType + task_display_mode: str + buffer_size: int + + +def attach_interaction_routes( + router: APIRouter, + *, + entity: Agent | Team, + entity_id: str, + token: Optional[str], + signing_secret: Optional[str], + ssl: Optional[SSLContext], + approval_authorization: ApprovalPolicy, + rejection_note_mode: RejectionNoteMode, + streaming_config: StreamingConfig, + op_suffix: str, +) -> None: + # Lazy import — the router handlers need AsyncWebClient per-request, not module-level. + @router.post( + "/interactions", + operation_id=f"slack_interactions_{op_suffix}", + name="slack_interactions", + description="Process Slack interactive payloads (buttons, modal submissions)", + responses={ + 200: {"description": "Interaction accepted"}, + 400: {"description": "Malformed interaction payload"}, + 403: {"description": "Invalid Slack signature"}, + }, + ) + async def slack_interactions(request: Request, background_tasks: BackgroundTasks): + # Signature is HMAC'd over raw bytes — decode only after verification. + body = await request.body() + timestamp = request.headers.get("X-Slack-Request-Timestamp") + slack_signature = request.headers.get("X-Slack-Signature", "") + + if not timestamp or not slack_signature: + raise HTTPException(status_code=400, detail="Missing Slack headers") + + if not verify_slack_signature(body, timestamp, slack_signature, signing_secret=signing_secret): + raise HTTPException(status_code=403, detail="Invalid signature") + + # Slack retries after ~3s on missing 200; since we ack immediately any + # retry duplicates scheduled work — drop it. + if request.headers.get("X-Slack-Retry-Num"): + return {"ok": True} + + form = await request.form() + payload_raw = form.get("payload") + # Slack always sends payload as a URL-encoded string; guard against + # the UploadFile case starlette's form() can technically return. + if not isinstance(payload_raw, str) or not payload_raw: + raise HTTPException(status_code=400, detail="Missing payload") + try: + payload = json.loads(payload_raw) + except json.JSONDecodeError as exc: + raise HTTPException(status_code=400, detail="Invalid payload JSON") from exc + + payload_type = payload.get("type") + + if payload_type == "block_actions": + return await _route_block_actions( + payload=payload, + background_tasks=background_tasks, + entity=entity, + entity_id=entity_id, + token=token, + ssl=ssl, + approval_authorization=approval_authorization, + rejection_note_mode=rejection_note_mode, + streaming_config=streaming_config, + ) + + if payload_type == "view_submission": + return await _route_view_submission( + payload=payload, + background_tasks=background_tasks, + entity=entity, + entity_id=entity_id, + token=token, + ssl=ssl, + approval_authorization=approval_authorization, + rejection_note_mode=rejection_note_mode, + streaming_config=streaming_config, + ) + + log_debug(f"Unhandled Slack interaction type: {payload_type}") + return {"ok": True} + + +# ============================================================================= +# block_actions routing +# ============================================================================= + + +async def _route_block_actions( + *, + payload: Dict[str, Any], + background_tasks: BackgroundTasks, + entity: Agent | Team, + entity_id: str, + token: Optional[str], + ssl: Optional[SSLContext], + approval_authorization: ApprovalPolicy, + rejection_note_mode: RejectionNoteMode, + streaming_config: StreamingConfig, +) -> Dict[str, Any]: + from slack_sdk.web.async_client import AsyncWebClient + + actions = payload.get("actions") or [] + if not actions: + return {"ok": True} + action = actions[0] + action_id = action.get("action_id") + + try: + action_value = decode_action_value(action.get("value") or "{}") + except json.JSONDecodeError: + log_warning(f"Slack action {action_id} had malformed value") + return {"ok": True} + + # entity_id mismatch could indicate a stale button from a different agent + # mounted at the same /interactions endpoint. Silently drop. + if action_value.get("e") != entity_id: + return {"ok": True} + + log_info(f"[slack_hitl] block_actions action_id={action_id} requirement_id={action_value.get('q')}") + + modal_openers = { + ACTION_REJECT_WITH_REASON, + ACTION_PROVIDE_INPUT, + ACTION_ANSWER_FEEDBACK, + ACTION_SUBMIT_RESULT, + } + if action_id in modal_openers: + client = AsyncWebClient(token=token, ssl=ssl) + await _open_modal_for_row( + client=client, + action_id=action_id, + action_value=action_value, + trigger_id=payload.get("trigger_id") or "", + entity=entity, + ) + return {"ok": True} + + background_tasks.add_task( + _handle_resolution_action, + action_id=action_id, + action_value=action_value, + payload=payload, + entity=entity, + entity_id=entity_id, + token=token, + ssl=ssl, + approval_authorization=approval_authorization, + rejection_note_mode=rejection_note_mode, + streaming_config=streaming_config, + ) + return {"ok": True} + + +async def _open_modal_for_row( + *, + client: "AsyncWebClient", + action_id: str, + action_value: Dict[str, Any], + trigger_id: str, + entity: Agent | Team, +) -> None: + if not trigger_id: + log_error("Missing trigger_id on modal-open action") + return + + approval_id = action_value.get("a", "") + requirement_id = action_value.get("q", "") + tool_name = action_value.get("t") or "tool" + + # For reject-with-reason, no schema lookup needed — the modal is just a text input. + if action_id == ACTION_REJECT_WITH_REASON: + view = reject_reason_modal_view( + entity_id=action_value.get("e", ""), + session_id=action_value.get("s", ""), + run_id=action_value.get("r", ""), + approval_id=approval_id, + tool_name=tool_name, + requirement_id=requirement_id, + ) + await _open_view_safely(client, trigger_id, view, approval_id) + return + + # Other modals need the requirement schema, which lives on the approval row. + approval = await call_db(getattr(entity, "db", None), "get_approval", approval_id) + if approval is None: + log_warning(f"Approval {approval_id} not found when opening modal") + return + + req = _find_requirement_in_approval(approval, requirement_id) + if req is None: + log_warning(f"Requirement {requirement_id} not found on approval {approval_id}") + return + + tool_exec = req.get("tool_execution") or {} + + if action_id == ACTION_PROVIDE_INPUT: + view = user_input_modal_view( + entity_id=action_value.get("e", ""), + session_id=action_value.get("s", ""), + run_id=action_value.get("r", ""), + approval_id=approval_id, + requirement_id=requirement_id, + tool_name=tool_exec.get("tool_name") or tool_name, + user_input_schema=tool_exec.get("user_input_schema") or [], + ) + elif action_id == ACTION_ANSWER_FEEDBACK: + view = user_feedback_modal_view( + entity_id=action_value.get("e", ""), + session_id=action_value.get("s", ""), + run_id=action_value.get("r", ""), + approval_id=approval_id, + requirement_id=requirement_id, + tool_name=tool_exec.get("tool_name") or tool_name, + user_feedback_schema=tool_exec.get("user_feedback_schema") or [], + ) + elif action_id == ACTION_SUBMIT_RESULT: + view = external_execution_modal_view( + entity_id=action_value.get("e", ""), + session_id=action_value.get("s", ""), + run_id=action_value.get("r", ""), + approval_id=approval_id, + requirement_id=requirement_id, + tool_name=tool_exec.get("tool_name") or tool_name, + ) + else: + log_warning(f"Unknown modal-opening action_id: {action_id}") + return + + await _open_view_safely(client, trigger_id, view, approval_id) + + +async def _open_view_safely(client: "AsyncWebClient", trigger_id: str, view: Dict[str, Any], approval_id: str) -> None: + try: + await client.views_open(trigger_id=trigger_id, view=view) + except Exception: + log_exception(f"views_open failed for approval {approval_id}") + + +async def _handle_resolution_action( + *, + action_id: str, + action_value: Dict[str, Any], + payload: Dict[str, Any], + entity: Agent | Team, + entity_id: str, + token: Optional[str], + ssl: Optional[SSLContext], + approval_authorization: ApprovalPolicy, + rejection_note_mode: RejectionNoteMode, + streaming_config: StreamingConfig, +) -> None: + from slack_sdk.web.async_client import AsyncWebClient + + client = AsyncWebClient(token=token, ssl=ssl) + actor_slack_user_id = (payload.get("user") or {}).get("id", "") + channel_id = (payload.get("channel") or {}).get("id", "") + message_ts = (payload.get("message") or {}).get("ts", "") + + approval_id = action_value.get("a", "") + requirement_id = action_value.get("q", "") + + approval = await _load_and_authorize( + entity=entity, + approval_id=approval_id, + actor_slack_user_id=actor_slack_user_id, + channel_id=channel_id, + client=client, + approval_authorization=approval_authorization, + ) + if approval is None: + return + + if action_id == ACTION_APPROVE: + if not requirement_id: + log_warning("approve action missing requirement_id") + return + await _resolve_one_row_and_maybe_continue( + entity=entity, + entity_id=entity_id, + client=client, + approval=approval, + requirement_id=requirement_id, + row_update={"status": "approved", "by": actor_slack_user_id, "at": int(time.time())}, + actor_slack_user_id=actor_slack_user_id, + channel_id=channel_id, + message_ts=message_ts, + rejection_note_mode=rejection_note_mode, + streaming_config=streaming_config, + ) + return + + if action_id == ACTION_REJECT_QUICK: + if not requirement_id: + log_warning("reject_quick action missing requirement_id") + return + note = f"{_FALLBACK_REJECT_NOTE} by <@{actor_slack_user_id}>" if actor_slack_user_id else _FALLBACK_REJECT_NOTE + await _resolve_one_row_and_maybe_continue( + entity=entity, + entity_id=entity_id, + client=client, + approval=approval, + requirement_id=requirement_id, + row_update={"status": "rejected", "by": actor_slack_user_id, "at": int(time.time()), "note": note}, + actor_slack_user_id=actor_slack_user_id, + channel_id=channel_id, + message_ts=message_ts, + rejection_note_mode=rejection_note_mode, + streaming_config=streaming_config, + ) + return + + log_debug(f"Unhandled resolution action_id: {action_id}") + + +# ============================================================================= +# view_submission routing +# ============================================================================= + + +async def _route_view_submission( + *, + payload: Dict[str, Any], + background_tasks: BackgroundTasks, + entity: Agent | Team, + entity_id: str, + token: Optional[str], + ssl: Optional[SSLContext], + approval_authorization: ApprovalPolicy, + rejection_note_mode: RejectionNoteMode, + streaming_config: StreamingConfig, +) -> Dict[str, Any]: + view = payload.get("view") or {} + callback_id = view.get("callback_id") + + try: + meta = json.loads(view.get("private_metadata") or "{}") + except json.JSONDecodeError: + log_warning("view_submission has invalid private_metadata JSON") + return {} + + if meta.get("e") != entity_id: + return {} + + approval_id = meta.get("a", "") + requirement_id = meta.get("q", "") + state_values = (view.get("state") or {}).get("values") or {} + actor_slack_user_id = (payload.get("user") or {}).get("id", "") + + try: + if callback_id == VIEW_REJECT_MODAL: + row_update = _extract_reject_row(state_values, actor_slack_user_id) + elif callback_id == VIEW_USER_INPUT_MODAL: + row_update = await _extract_user_input_row( + entity, approval_id, requirement_id, state_values, actor_slack_user_id + ) + elif callback_id == VIEW_USER_FEEDBACK_MODAL: + row_update = await _extract_user_feedback_row( + entity, approval_id, requirement_id, state_values, actor_slack_user_id + ) + elif callback_id == VIEW_EXTERNAL_EXECUTION_MODAL: + row_update = _extract_external_execution_row(state_values, actor_slack_user_id) + else: + log_debug(f"Unknown view callback_id: {callback_id}") + return {} + except _ValidationError as ve: + return {"response_action": "errors", "errors": ve.errors} + + background_tasks.add_task( + _apply_modal_resolution_and_maybe_continue, + approval_id=approval_id, + requirement_id=requirement_id, + row_update=row_update, + actor_slack_user_id=actor_slack_user_id, + entity=entity, + entity_id=entity_id, + token=token, + ssl=ssl, + approval_authorization=approval_authorization, + rejection_note_mode=rejection_note_mode, + streaming_config=streaming_config, + ) + return {} + + +class _ValidationError(Exception): + def __init__(self, errors: Dict[str, str]): + super().__init__("modal validation failed") + self.errors = errors + + +def _extract_reject_row(state_values: Dict[str, Any], actor_slack_user_id: str) -> Dict[str, Any]: + reason = (state_values.get("rejection_reason") or {}).get("reason_input") or {} + note = (reason.get("value") or "").strip() or None + note_with_user = ( + f"{note} (by <@{actor_slack_user_id}>)" if note else f"{_FALLBACK_REJECT_NOTE} by <@{actor_slack_user_id}>" + ) + return {"status": "rejected", "by": actor_slack_user_id, "at": int(time.time()), "note": note_with_user} + + +async def _extract_user_input_row( + entity: Agent | Team, + approval_id: str, + requirement_id: str, + state_values: Dict[str, Any], + actor_slack_user_id: str, +) -> Dict[str, Any]: + schema = await _load_modal_tool_schema( + entity=entity, + approval_id=approval_id, + requirement_id=requirement_id, + schema_key="user_input_schema", + ) + values: Dict[str, Any] = {} + errors: Dict[str, str] = {} + + for i, field in enumerate(schema): + block_id = f"f_{i}" + field_name = field.get("name") or f"field_{i}" + field_type = field.get("field_type") or "str" + block_state = state_values.get(block_id) or {} + element = block_state.get("value") or {} + + if field_type == "bool": + selected = (element.get("selected_option") or {}).get("value") + if selected not in {"true", "false"}: + errors[block_id] = "Please choose True or False" + continue + values[field_name] = selected == "true" + continue + + raw = element.get("value") + if raw is None or (isinstance(raw, str) and not raw.strip()): + errors[block_id] = "Value required" + continue + + try: + values[field_name] = _coerce_scalar(raw, field_type) + except ValueError as e: + errors[block_id] = str(e) + + if errors: + raise _ValidationError(errors) + + return {"status": "resolved", "by": actor_slack_user_id, "at": int(time.time()), "values": values} + + +async def _load_modal_tool_schema( + *, + entity: Agent | Team, + approval_id: str, + requirement_id: str, + schema_key: str, +) -> List[Dict[str, Any]]: + # Shared approval+requirement+schema lookup for view_submission extractors + # (user_input + user_feedback both need the same three-step walk). + approval = await call_db(getattr(entity, "db", None), "get_approval", approval_id) + if approval is None: + raise _ValidationError({"__general__": "Approval no longer exists"}) + requirement = _find_requirement_in_approval(approval, requirement_id) + if requirement is None: + raise _ValidationError({"__general__": "Requirement not found"}) + return (requirement.get("tool_execution") or {}).get(schema_key) or [] + + +def _coerce_scalar(raw: str, field_type: str) -> Any: + if field_type == "str": + return raw + if field_type == "int": + try: + return int(raw.strip()) + except ValueError as exc: + raise ValueError("Must be a whole number") from exc + if field_type == "float": + try: + return float(raw.strip()) + except ValueError as exc: + raise ValueError("Must be a number") from exc + if field_type == "list": + try: + parsed = json.loads(raw) + except json.JSONDecodeError as exc: + raise ValueError("Must be valid JSON (e.g. [1, 2, 3])") from exc + if not isinstance(parsed, list): + raise ValueError("Must be a JSON array") + return parsed + if field_type == "dict": + try: + parsed = json.loads(raw) + except json.JSONDecodeError as exc: + raise ValueError('Must be valid JSON (e.g. {"k": "v"})') from exc + if not isinstance(parsed, dict): + raise ValueError("Must be a JSON object") + return parsed + # Unknown type — keep as string rather than crash. + return raw + + +async def _extract_user_feedback_row( + entity: Agent | Team, + approval_id: str, + requirement_id: str, + state_values: Dict[str, Any], + actor_slack_user_id: str, +) -> Dict[str, Any]: + schema = await _load_modal_tool_schema( + entity=entity, + approval_id=approval_id, + requirement_id=requirement_id, + schema_key="user_feedback_schema", + ) + selections: Dict[str, List[str]] = {} + errors: Dict[str, str] = {} + + for i, question in enumerate(schema): + block_id = f"q_{i}" + q_text = question.get("question") or f"Question {i + 1}" + multi = bool(question.get("multi_select")) + block_state = state_values.get(block_id) or {} + element = block_state.get("value") or {} + + if multi: + raw_options = element.get("selected_options") or [] + picked = [o.get("value", "") for o in raw_options if o.get("value")] + else: + raw = element.get("selected_option") + picked = [raw.get("value", "")] if raw and raw.get("value") else [] + + if not picked: + errors[block_id] = "Please answer" + continue + selections[q_text] = picked + + if errors: + raise _ValidationError(errors) + + return {"status": "resolved", "by": actor_slack_user_id, "at": int(time.time()), "selections": selections} + + +def _extract_external_execution_row(state_values: Dict[str, Any], actor_slack_user_id: str) -> Dict[str, Any]: + block_state = state_values.get("result") or {} + element = block_state.get("value") or {} + raw = (element.get("value") or "").strip() + if not raw: + raise _ValidationError({"result": "Result required"}) + return {"status": "resolved", "by": actor_slack_user_id, "at": int(time.time()), "result": raw} + + +async def _apply_modal_resolution_and_maybe_continue( + *, + approval_id: str, + requirement_id: str, + row_update: Dict[str, Any], + actor_slack_user_id: str, + entity: Agent | Team, + entity_id: str, + token: Optional[str], + ssl: Optional[SSLContext], + approval_authorization: ApprovalPolicy, + rejection_note_mode: RejectionNoteMode, + streaming_config: StreamingConfig, +) -> None: + from slack_sdk.web.async_client import AsyncWebClient + + client = AsyncWebClient(token=token, ssl=ssl) + + approval = await call_db(getattr(entity, "db", None), "get_approval", approval_id) + if approval is None: + log_warning(f"Approval {approval_id} missing when applying modal resolution") + return + + slack_meta = get_slack_meta(approval) + channel_id = slack_meta.get("channel_id", "") + message_ts = slack_meta.get("message_ts", "") + + authorized = await check_approval_authorization(approval_authorization, actor_slack_user_id, approval, client) + if not authorized: + await _post_ephemeral_safely(client, channel_id, actor_slack_user_id, _GENERIC_DENIAL) + return + + await _resolve_one_row_and_maybe_continue( + entity=entity, + entity_id=entity_id, + client=client, + approval=approval, + requirement_id=requirement_id, + row_update=row_update, + actor_slack_user_id=actor_slack_user_id, + channel_id=channel_id, + message_ts=message_ts, + rejection_note_mode=rejection_note_mode, + streaming_config=streaming_config, + ) + + +async def _resolve_one_row_and_maybe_continue( + *, + entity: Agent | Team, + entity_id: str, + client: "AsyncWebClient", + approval: Dict[str, Any], + requirement_id: str, + row_update: Dict[str, Any], + actor_slack_user_id: str, + channel_id: str, + message_ts: str, + rejection_note_mode: RejectionNoteMode, + streaming_config: StreamingConfig, +) -> None: + updated = await _write_row_resolution_cas( + entity=entity, + approval=approval, + requirement_id=requirement_id, + row_update=row_update, + ) + + if updated is None: + await _post_ephemeral_safely(client, channel_id, actor_slack_user_id, _STALE_TEXT) + return + + # Always refresh the pause message so the user sees their row resolved. + await _rerender_pause_message( + client=client, + entity_id=entity_id, + approval=updated, + channel_id=channel_id, + message_ts=message_ts, + rejection_note_mode=rejection_note_mode, + ) + + # Check aggregate — continue only when no rows are pending. + resolutions = (updated.get("resolution_data") or {}).get("requirement_resolutions") or {} + aggregate = compute_aggregate_status(resolutions) + if aggregate is None: + return # still rows pending + + await _continue_run_and_finalize( + entity=entity, + entity_id=entity_id, + client=client, + approval=updated, + aggregate_status=aggregate, + actor_slack_user_id=actor_slack_user_id, + channel_id=channel_id, + message_ts=message_ts, + rejection_note_mode=rejection_note_mode, + streaming_config=streaming_config, + ) + + +async def _write_row_resolution_cas( + *, + entity: Agent | Team, + approval: Dict[str, Any], + requirement_id: str, + row_update: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + existing_res = (approval.get("resolution_data") or {}).get("requirement_resolutions") or {} + existing_res = initialize_requirement_resolutions(approval, existing_res) + + # Refuse to overwrite a row that's already decided — would clobber the + # other user's vote. Equivalent to CAS miss from the user's perspective. + current_row = existing_res.get(requirement_id) or {"status": "pending"} + if current_row.get("status") != "pending": + return None + + new_res = write_row_resolution(existing_res, requirement_id, row_update) + return await _cas_write_resolution( + entity=entity, + approval_id=approval["id"], + new_resolutions=new_res, + base_resolution_data=approval.get("resolution_data") or {}, + ) + + +async def _cas_write_resolution( + *, + entity: Agent | Team, + approval_id: str, + new_resolutions: Dict[str, Dict[str, Any]], + base_resolution_data: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + new_resolution_data = dict(base_resolution_data) + new_resolution_data["requirement_resolutions"] = new_resolutions + return await call_db( + getattr(entity, "db", None), + "update_approval", + approval_id, + expected_status="pending", + resolution_data=new_resolution_data, + ) + + +def _hydrate_requirements(raw: List[Any]) -> List[RunRequirement]: + """Coerce session-serialized requirement dicts back into RunRequirement + instances. DB round-trips of older sessions sometimes yield dicts.""" + hydrated: List[RunRequirement] = [] + for req in raw or []: + if isinstance(req, RunRequirement): + hydrated.append(req) + elif isinstance(req, dict): + try: + hydrated.append(RunRequirement.from_dict(req)) + except Exception: + log_warning("Failed to hydrate requirement dict; dropping it") + else: + log_warning(f"Unexpected requirement type {type(req).__name__}; dropping it") + return hydrated + + +async def _stream_continuation( + *, + entity: Agent | Team, + client: "AsyncWebClient", + run_id: str, + session_id: str, + hydrated: List[RunRequirement], + streaming_config: StreamingConfig, + channel_id: str, + thread_ts: str, + requester_user_id: str, + team_id: Optional[str], +) -> Optional[StreamState]: + """Run acontinue_run as a streaming response into the Slack thread, reusing + the same ``chat_stream`` + ``process_event`` pipeline as the initial run. + + Returns the populated ``StreamState`` on success, or ``None`` when the + continuation raised — in which case the stream, "Thinking..." status, and + task cards are all cleaned up before returning. + """ + state = StreamState( + entity_type=streaming_config.entity_type, + entity_name=streaming_config.entity_name, + ) + stream = None + try: + # Non-assistant threads don't support setStatus — best-effort. + try: + await client.assistant_threads_setStatus(channel_id=channel_id, thread_ts=thread_ts, status="Thinking...") + except Exception: + pass + + response_stream = entity.acontinue_run( + run_id=run_id, + session_id=session_id, + requirements=hydrated, + stream=True, + stream_events=True, + ) + + stream = await client.chat_stream( + channel=channel_id, + thread_ts=thread_ts, + recipient_team_id=team_id, + recipient_user_id=requester_user_id, + task_display_mode=streaming_config.task_display_mode, + buffer_size=streaming_config.buffer_size, + ) + + async for chunk in response_stream: + state.collect_media(chunk) # type: ignore[arg-type] + + ev = getattr(chunk, "event", None) + if ev and await process_event(ev, chunk, state, stream): # type: ignore[arg-type] + break + + if state.has_content(): + content = state.flush() + content_len = len(content) + # Rare continuation overruns: skip the rest rather than rotate. + if state.stream_chars_sent + content_len <= STREAM_CHAR_LIMIT: + await stream.append(markdown_text=content) + state.stream_chars_sent += content_len + else: + log_warning(f"Continuation exceeded STREAM_CHAR_LIMIT; truncating run {run_id}") + break + + final_status = state.terminal_status or "complete" + completion_chunks = state.resolve_all_pending(final_status) if state.task_cards else [] + stop_kwargs: Dict[str, Any] = {} + if state.has_content(): + stop_kwargs["markdown_text"] = state.flush() + if completion_chunks: + stop_kwargs["chunks"] = completion_chunks + + # Nested-pause before any content: don't open an empty bubble above + # the new pause UI (mirrors router.py's same guard). + stream_opened = getattr(stream, "_stream_ts", None) is not None + if state.paused_event is not None and not stop_kwargs and not stream_opened: + try: + await client.assistant_threads_setStatus(channel_id=channel_id, thread_ts=thread_ts, status="") + except Exception: + pass + else: + await stream.stop(**stop_kwargs) # type: ignore[arg-type] + + await upload_response_media_async(client, state, channel_id, thread_ts) + return state + + except Exception: + log_exception(f"acontinue_run streaming failed for run {run_id}") + try: + await client.assistant_threads_setStatus(channel_id=channel_id, thread_ts=thread_ts, status="") + except Exception: + pass + if stream is not None: + try: + err_chunks = state.resolve_all_pending("error") if state.task_cards else [] + await stream.stop(**({"chunks": err_chunks} if err_chunks else {})) # type: ignore[arg-type] + except Exception: + pass + return None + + +async def _finalize_approval_status( + *, + entity: Agent | Team, + approval: Dict[str, Any], + aggregate_status: str, + actor_slack_user_id: str, + client: "AsyncWebClient", + channel_id: str, + message_ts: str, +) -> None: + """Flip approval.status to the aggregate after the stream closed cleanly + and refresh the pause card with the resolved-state summary.""" + finalized = await call_db( + getattr(entity, "db", None), + "update_approval", + approval["id"], + expected_status="pending", + status=aggregate_status, + resolved_by=actor_slack_user_id or None, + resolved_at=int(time.time()), + ) + render_approval = finalized or approval + resolutions_final = (render_approval.get("resolution_data") or {}).get("requirement_resolutions") or {} + await _safe_chat_update( + client, + channel=channel_id, + ts=message_ts, + blocks=pause_resolved_message_blocks( + aggregate_status=aggregate_status, # type: ignore[arg-type] + requirements=render_approval.get("requirements") or [], + requirement_resolutions=resolutions_final, + ), + ) + + +async def _render_nested_pause( + *, + entity: Agent | Team, + entity_id: str, + client: "AsyncWebClient", + state: StreamState, + fallback_run_id: str, + fallback_session_id: str, + channel_id: str, + thread_ts: str, + requester_user_id: str, + team_id: Optional[str], + rejection_note_mode: RejectionNoteMode, +) -> None: + from agno.os.interfaces.slack.pause_handler import render_pause_ui + + paused = state.paused_event + if paused is None: + return + await render_pause_ui( + entity=entity, + entity_id=entity_id, + client=client, + run_id=getattr(paused, "run_id", "") or fallback_run_id, + session_id=getattr(paused, "session_id", None) or fallback_session_id, + channel_id=channel_id, + thread_ts=thread_ts, + requester_slack_user_id=requester_user_id, + team_id=team_id, + rejection_note_mode=rejection_note_mode, + ) + + +async def _continue_run_and_finalize( + *, + entity: Agent | Team, + entity_id: str, + client: "AsyncWebClient", + approval: Dict[str, Any], + aggregate_status: str, + actor_slack_user_id: str, + channel_id: str, + message_ts: str, + rejection_note_mode: RejectionNoteMode, + streaming_config: StreamingConfig, +) -> None: + run_id = approval.get("run_id") or "" + session_id = approval.get("session_id") or "" + slack_meta = get_slack_meta(approval) + thread_ts = slack_meta.get("thread_ts", "") + requester_user_id = slack_meta.get("requester_slack_user_id", "") + team_id = slack_meta.get("team_id") + + log_info(f"[slack_hitl] continue run_id={run_id} aggregate={aggregate_status}") + + session = await entity.aget_session(session_id=session_id) + paused_run = session.get_run(run_id) if session is not None else None + if paused_run is None: + log_error(f"Paused run {run_id} not found on session {session_id}") + await _post_ephemeral_safely(client, channel_id, actor_slack_user_id, _CONTINUE_FAILED) + return + + # Workflow runs lack `requirements`; hitl_enabled is gated to Agent/Team. + hydrated = _hydrate_requirements(list(paused_run.requirements or [])) # type: ignore[union-attr] + resolutions = (approval.get("resolution_data") or {}).get("requirement_resolutions") or {} + try: + apply_resolutions_to_requirements(hydrated, resolutions) + except UnresolvedRequirementError as e: + log_error(f"Cannot apply resolutions for run {run_id}: {e}") + await _post_ephemeral_safely(client, channel_id, actor_slack_user_id, _CONTINUE_FAILED) + return + + state = await _stream_continuation( + entity=entity, + client=client, + run_id=run_id, + session_id=session_id, + hydrated=hydrated, + streaming_config=streaming_config, + channel_id=channel_id, + thread_ts=thread_ts, + requester_user_id=requester_user_id, + team_id=team_id, + ) + if state is None: + await _post_ephemeral_safely(client, channel_id, actor_slack_user_id, _CONTINUE_FAILED) + return + + await _finalize_approval_status( + entity=entity, + approval=approval, + aggregate_status=aggregate_status, + actor_slack_user_id=actor_slack_user_id, + client=client, + channel_id=channel_id, + message_ts=message_ts, + ) + + await _render_nested_pause( + entity=entity, + entity_id=entity_id, + client=client, + state=state, + fallback_run_id=run_id, + fallback_session_id=session_id, + channel_id=channel_id, + thread_ts=thread_ts, + requester_user_id=requester_user_id, + team_id=team_id, + rejection_note_mode=rejection_note_mode, + ) + + +async def _rerender_pause_message( + *, + client: "AsyncWebClient", + entity_id: str, + approval: Dict[str, Any], + channel_id: str, + message_ts: str, + rejection_note_mode: RejectionNoteMode, +) -> None: + resolutions = (approval.get("resolution_data") or {}).get("requirement_resolutions") or {} + await _safe_chat_update( + client, + channel=channel_id, + ts=message_ts, + blocks=pause_request_blocks( + entity_id=entity_id, + session_id=approval.get("session_id") or "", + run_id=approval.get("run_id") or "", + approval_id=approval["id"], + requirements=approval.get("requirements") or [], + requirement_resolutions=resolutions, + rejection_note_mode=rejection_note_mode, + ), + ) + + +async def _load_and_authorize( + *, + entity: Agent | Team, + approval_id: str, + actor_slack_user_id: str, + channel_id: str, + client: "AsyncWebClient", + approval_authorization: ApprovalPolicy, +) -> Optional[Dict[str, Any]]: + db = getattr(entity, "db", None) + approval = await call_db(db, "get_approval", approval_id) + if approval is None: + await _post_ephemeral_safely(client, channel_id, actor_slack_user_id, _MISSING_APPROVAL) + return None + + authorized = await check_approval_authorization(approval_authorization, actor_slack_user_id, approval, client) + if not authorized: + await _post_ephemeral_safely(client, channel_id, actor_slack_user_id, _GENERIC_DENIAL) + log_warning( + f"unauthorized_approval_attempt approval_id={approval_id} actor_slack_user_id={actor_slack_user_id} " + f"policy={approval_authorization!r}" + ) + return None + return approval + + +def _find_requirement_in_approval(approval: Dict[str, Any], requirement_id: str) -> Optional[Dict[str, Any]]: + for req in approval.get("requirements") or []: + if isinstance(req, dict) and req.get("id") == requirement_id: + return req + return None + + +async def _safe_chat_update(client: "AsyncWebClient", *, channel: str, ts: str, blocks: List[Dict[str, Any]]) -> None: + if not (channel and ts): + return + try: + await client.chat_update(channel=channel, ts=ts, blocks=blocks, text="Approval update") + except Exception as exc: + # chat.update failure is cosmetic; DB is source of truth. + slack_err = getattr(getattr(exc, "response", None), "data", None) or {} + log_warning(f"chat.update failed for channel={channel} ts={ts}: {slack_err.get('error', exc)}") + + +async def _post_ephemeral_safely(client: "AsyncWebClient", channel: str, user: str, text: str) -> None: + if not (channel and user): + return + try: + await client.chat_postEphemeral(channel=channel, user=user, text=text) + except Exception: + log_warning(f"chat.postEphemeral failed for channel={channel} user={user}") diff --git a/libs/agno/agno/os/interfaces/slack/pause_handler.py b/libs/agno/agno/os/interfaces/slack/pause_handler.py new file mode 100644 index 0000000000..62b72f8e4b --- /dev/null +++ b/libs/agno/agno/os/interfaces/slack/pause_handler.py @@ -0,0 +1,275 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +from agno.os.interfaces.slack.authorization import call_db +from agno.os.interfaces.slack.blocks import ( + RejectionNoteMode, + pause_request_blocks, + pause_resolved_message_blocks, +) +from agno.os.interfaces.slack.resolutions import ( + compute_aggregate_status, + initialize_requirement_resolutions, +) +from agno.utils.log import log_error, log_warning + +if TYPE_CHECKING: + from slack_sdk.web.async_client import AsyncWebClient + + +async def render_pause_ui( + *, + entity: Any, + entity_id: str, + client: "AsyncWebClient", + run_id: str, + session_id: str, + channel_id: str, + thread_ts: str, + requester_slack_user_id: str, + team_id: Optional[str], + rejection_note_mode: RejectionNoteMode, +) -> None: + approval = await _get_pending_approval_for_run(entity, run_id=run_id) + if approval is None: + log_warning( + f"Run {run_id} paused but no 'required' approval record found. " + "Slack HITL currently supports @approval(type='required') only." + ) + return + + requirements = approval.get("requirements") or [] + if not requirements: + log_error(f"Approval {approval['id']} has no requirements — cannot render UI") + return + + reserved = await _reserve_slack_delivery( + entity=entity, + approval=approval, + channel_id=channel_id, + thread_ts=thread_ts, + requester_slack_user_id=requester_slack_user_id, + team_id=team_id, + ) + if reserved is None: + return + + message_ts = await _post_pause_message( + client=client, + entity_id=entity_id, + session_id=session_id, + run_id=run_id, + approval=approval, + reserved_resolution_data=reserved, + requirements=requirements, + rejection_note_mode=rejection_note_mode, + channel_id=channel_id, + thread_ts=thread_ts, + requester_slack_user_id=requester_slack_user_id, + ) + if message_ts is None: + return + + patched = await _patch_message_ts( + entity=entity, + approval_id=approval["id"], + reserved_resolution_data=reserved, + message_ts=message_ts, + ) + if patched is None: + await _replace_stale_pause_message( + client=client, + entity=entity, + approval_id=approval["id"], + message_ts=message_ts, + channel_id=channel_id, + fallback_requirements=requirements, + ) + + +async def _reserve_slack_delivery( + *, + entity: Any, + approval: Dict[str, Any], + channel_id: str, + thread_ts: str, + requester_slack_user_id: str, + team_id: Optional[str], +) -> Optional[Dict[str, Any]]: + # CAS-write Slack metadata into resolution_data BEFORE posting so that + # authorization (which reads slack meta) works even if chat.postMessage + # fails. Returns the resolution_data we wrote, or None if CAS missed + # (approval was resolved by a concurrent write). + reserved_resolution_data = _build_reserved_resolution_data( + approval, + channel_id=channel_id, + thread_ts=thread_ts, + requester_slack_user_id=requester_slack_user_id, + team_id=team_id, + ) + result = await call_db( + getattr(entity, "db", None), + "update_approval", + approval["id"], + expected_status="pending", + resolution_data=reserved_resolution_data, + ) + if result is None: + log_warning(f"Approval {approval['id']} was resolved before UI could render; skipping post") + return None + return reserved_resolution_data + + +async def _post_pause_message( + *, + client: "AsyncWebClient", + entity_id: str, + session_id: str, + run_id: str, + approval: Dict[str, Any], + reserved_resolution_data: Dict[str, Any], + requirements: List[Dict[str, Any]], + rejection_note_mode: RejectionNoteMode, + channel_id: str, + thread_ts: str, + requester_slack_user_id: str, +) -> Optional[str]: + blocks = pause_request_blocks( + entity_id=entity_id, + session_id=session_id, + run_id=run_id, + approval_id=approval["id"], + requirements=requirements, + requirement_resolutions=reserved_resolution_data["requirement_resolutions"], + rejection_note_mode=rejection_note_mode, + ) + fallback = f"Approval required for {len(requirements)} action{'s' if len(requirements) != 1 else ''}" + try: + post_resp = await client.chat_postMessage( + channel=channel_id, + thread_ts=thread_ts, + blocks=blocks, + text=fallback, + ) + except Exception: + log_error(f"Failed to post pause UI for approval {approval['id']}") + try: + await client.chat_postEphemeral( + channel=channel_id, + user=requester_slack_user_id, + text=( + "A tool is waiting for approval but the Slack approval message could not be posted. " + "Resolve via the dashboard or resend the request." + ), + ) + except Exception: + pass + return None + + # AsyncSlackResponse exposes `ts` via either dict access or .data mapping. + message_ts = post_resp.get("ts") if hasattr(post_resp, "get") else getattr(post_resp, "data", {}).get("ts") + if not message_ts: + log_warning(f"chat.postMessage returned no ts for approval {approval['id']}") + return None + return message_ts + + +async def _patch_message_ts( + *, + entity: Any, + approval_id: str, + reserved_resolution_data: Dict[str, Any], + message_ts: str, +) -> Optional[Dict[str, Any]]: + # Store the posted message's ts on the approval so subsequent chat.update + # calls know which message to edit. CAS guards against the approval being + # resolved during the chat.postMessage window. + message_ts_resolution_data = dict(reserved_resolution_data) + interface_meta = dict(message_ts_resolution_data.get("interface") or {}) + slack_meta = dict(interface_meta.get("slack") or {}) + slack_meta["message_ts"] = message_ts + interface_meta["slack"] = slack_meta + message_ts_resolution_data["interface"] = interface_meta + return await call_db( + getattr(entity, "db", None), + "update_approval", + approval_id, + expected_status="pending", + resolution_data=message_ts_resolution_data, + ) + + +async def _replace_stale_pause_message( + *, + client: "AsyncWebClient", + entity: Any, + approval_id: str, + message_ts: str, + channel_id: str, + fallback_requirements: List[Dict[str, Any]], +) -> None: + # The approval resolved during the chat.postMessage window so the buttons + # we just posted are stale. Replace them with the resolved summary so + # users don't click pointlessly. + latest = await call_db(getattr(entity, "db", None), "get_approval", approval_id) or {} + latest_resolutions = (latest.get("resolution_data") or {}).get("requirement_resolutions") or {} + aggregate = compute_aggregate_status(latest_resolutions) or latest.get("status", "approved") + try: + await client.chat_update( + channel=channel_id, + ts=message_ts, + blocks=pause_resolved_message_blocks( + aggregate_status=aggregate, # type: ignore[arg-type] + requirements=latest.get("requirements") or fallback_requirements, + requirement_resolutions=latest_resolutions, + ), + text="Approval update", + ) + except Exception: + log_warning(f"Failed to replace buttons after race on approval {approval_id}") + + +def _build_reserved_resolution_data( + approval: Dict[str, Any], + *, + channel_id: str, + thread_ts: str, + requester_slack_user_id: str, + team_id: Optional[str], +) -> Dict[str, Any]: + resolution_data: Dict[str, Any] = dict(approval.get("resolution_data") or {}) + interface_meta = dict(resolution_data.get("interface") or {}) + interface_meta["slack"] = { + "channel_id": channel_id, + "thread_ts": thread_ts, + "message_ts": None, + "requester_slack_user_id": requester_slack_user_id, + "team_id": team_id, + } + resolution_data["interface"] = interface_meta + existing_res = resolution_data.get("requirement_resolutions") or {} + resolution_data["requirement_resolutions"] = initialize_requirement_resolutions(approval, existing_res) + return resolution_data + + +async def _get_pending_approval_for_run(entity: Any, run_id: str) -> Optional[Dict[str, Any]]: + db = getattr(entity, "db", None) + if db is None or not run_id: + return None + try: + approvals, _ = await call_db( + db, + "get_approvals", + run_id=run_id, + status="pending", + approval_type="required", + ) + except Exception: + log_error(f"Failed to query approvals for run {run_id}") + return None + if not approvals: + return None + # Most recent pending approval wins — DB adapters may return in insertion + # order, which would surface a stale one first on multi-pause runs. + return max(approvals, key=lambda approval: approval.get("created_at") or 0) diff --git a/libs/agno/agno/os/interfaces/slack/resolutions.py b/libs/agno/agno/os/interfaces/slack/resolutions.py new file mode 100644 index 0000000000..3ed3642af6 --- /dev/null +++ b/libs/agno/agno/os/interfaces/slack/resolutions.py @@ -0,0 +1,167 @@ +from __future__ import annotations + +from typing import Any, Dict, List, Literal, Optional + +from agno.run.requirement import RunRequirement +from agno.utils.log import log_warning + +# Narrower than core's pause_type (which conflates user_input and user_feedback). +# Slack needs the 4-way split so it can render the right row UI / modal. +RequirementKind = Literal["confirmation", "user_input", "user_feedback", "external_execution"] + +REQ_CONFIRMATION: RequirementKind = "confirmation" +REQ_USER_INPUT: RequirementKind = "user_input" +REQ_USER_FEEDBACK: RequirementKind = "user_feedback" +REQ_EXTERNAL_EXECUTION: RequirementKind = "external_execution" + + +def _classify_tool_flags( + *, + user_feedback_schema: Any, + requires_user_input: bool, + external_execution_required: bool, +) -> RequirementKind: + if user_feedback_schema: + return REQ_USER_FEEDBACK + if requires_user_input: + return REQ_USER_INPUT + if external_execution_required: + return REQ_EXTERNAL_EXECUTION + return REQ_CONFIRMATION + + +def classify_tool_execution(tool_execution: Optional[Dict[str, Any]]) -> RequirementKind: + if not tool_execution: + # A requirement without a tool_execution shouldn't happen, but if the + # DB round-trip ever loses it we fall through to confirmation so the + # user can at least approve/reject. + return REQ_CONFIRMATION + return _classify_tool_flags( + user_feedback_schema=tool_execution.get("user_feedback_schema"), + requires_user_input=bool(tool_execution.get("requires_user_input")), + external_execution_required=bool(tool_execution.get("external_execution_required")), + ) + + +def classify_requirement(requirement: RunRequirement) -> RequirementKind: + tool = requirement.tool_execution + if tool is None: + return REQ_CONFIRMATION + return _classify_tool_flags( + user_feedback_schema=getattr(tool, "user_feedback_schema", None), + requires_user_input=bool(getattr(tool, "requires_user_input", False)), + external_execution_required=bool(getattr(tool, "external_execution_required", False)), + ) + + +def initialize_requirement_resolutions( + approval: Dict[str, Any], + existing: Optional[Dict[str, Dict[str, Any]]] = None, +) -> Dict[str, Dict[str, Any]]: + if existing: + return existing + + seeded: Dict[str, Dict[str, Any]] = {} + for requirement in approval.get("requirements") or []: + if not isinstance(requirement, dict): + continue + requirement_id = requirement.get("id") + if not requirement_id: + log_warning("Skipping requirement without id during resolution seeding") + continue + seeded[requirement_id] = {"status": "pending"} + return seeded + + +def compute_aggregate_status( + resolutions: Dict[str, Dict[str, Any]], +) -> Optional[str]: + if not resolutions: + # Empty resolutions = the approval has no requirements or they + # haven't been seeded yet. Treat as pending — caller will seed. + return None + + statuses = [row.get("status", "pending") for row in resolutions.values()] + if any(s == "pending" for s in statuses): + return None + if all(s == "rejected" for s in statuses): + return "rejected" + return "approved" + + +class UnresolvedRequirementError(RuntimeError): + pass + + +def _apply_confirmation(requirement: RunRequirement, requirement_id: str, row: Dict[str, Any]) -> None: + status = row["status"] + if status == "approved": + requirement.confirm() + return + if status == "rejected": + requirement.reject(row.get("note")) + return + # "resolved" on a confirmation row is nonsensical — raise rather than + # silently flipping the tool to confirmed=True. + raise UnresolvedRequirementError(f"Confirmation requirement {requirement_id} has status={status!r}") + + +def _apply_user_input(requirement: RunRequirement, requirement_id: str, row: Dict[str, Any]) -> None: + values = row.get("values") + if not isinstance(values, dict): + raise UnresolvedRequirementError(f"user_input requirement {requirement_id} missing values dict") + requirement.provide_user_input(values) + + +def _apply_user_feedback(requirement: RunRequirement, requirement_id: str, row: Dict[str, Any]) -> None: + selections = row.get("selections") + if not isinstance(selections, dict): + raise UnresolvedRequirementError(f"user_feedback requirement {requirement_id} missing selections dict") + requirement.provide_user_feedback(selections) + + +def _apply_external_execution(requirement: RunRequirement, requirement_id: str, row: Dict[str, Any]) -> None: + result = row.get("result") + # External execution RAISES at _tools.py:533 if tool.result is None, so we + # enforce a non-empty string here instead of letting that propagate. + if not isinstance(result, str) or not result.strip(): + raise UnresolvedRequirementError(f"external_execution requirement {requirement_id} missing non-empty result") + requirement.set_external_execution_result(result) + + +_APPLY_BY_KIND: Dict[RequirementKind, Any] = { + REQ_CONFIRMATION: _apply_confirmation, + REQ_USER_INPUT: _apply_user_input, + REQ_USER_FEEDBACK: _apply_user_feedback, + REQ_EXTERNAL_EXECUTION: _apply_external_execution, +} + + +def apply_resolutions_to_requirements( + requirements: List[RunRequirement], + resolutions: Dict[str, Dict[str, Any]], +) -> None: + for requirement in requirements: + requirement_id = getattr(requirement, "id", None) + if not requirement_id: + raise UnresolvedRequirementError("Requirement missing id; cannot match to resolution") + + row = resolutions.get(requirement_id) + if row is None or row.get("status") == "pending": + raise UnresolvedRequirementError(f"Requirement {requirement_id} has no resolution (row={row!r})") + + kind = classify_requirement(requirement) + applicator = _APPLY_BY_KIND.get(kind) + if applicator is None: + raise UnresolvedRequirementError(f"Unknown requirement kind {kind!r} for {requirement_id}") + applicator(requirement, requirement_id, row) + + +def write_row_resolution( + current_resolutions: Dict[str, Dict[str, Any]], + requirement_id: str, + row: Dict[str, Any], +) -> Dict[str, Dict[str, Any]]: + merged = dict(current_resolutions or {}) + merged[requirement_id] = dict(row) + return merged diff --git a/libs/agno/agno/os/interfaces/slack/router.py b/libs/agno/agno/os/interfaces/slack/router.py index 0c946ea852..787090d199 100644 --- a/libs/agno/agno/os/interfaces/slack/router.py +++ b/libs/agno/agno/os/interfaces/slack/router.py @@ -7,6 +7,8 @@ from pydantic import BaseModel, Field from agno.agent import Agent, RemoteAgent +from agno.os.interfaces.slack.authorization import ApprovalPolicy +from agno.os.interfaces.slack.blocks import RejectionNoteMode, pending_approval_reminder_text from agno.os.interfaces.slack.events import process_event from agno.os.interfaces.slack.helpers import ( build_run_metadata, @@ -19,11 +21,14 @@ strip_bot_mention, upload_response_media_async, ) +from agno.os.interfaces.slack.interactions import StreamingConfig, attach_interaction_routes +from agno.os.interfaces.slack.pause_handler import render_pause_ui from agno.os.interfaces.slack.security import verify_slack_signature -from agno.os.interfaces.slack.state import StreamState +from agno.os.interfaces.slack.state import STREAM_CHAR_LIMIT, StreamState +from agno.run.base import RunStatus from agno.team import RemoteTeam, Team from agno.tools.slack import SlackTools -from agno.utils.log import log_error +from agno.utils.log import log_error, log_warning from agno.workflow import RemoteWorkflow, Workflow # Slack sends lifecycle events for bots with these subtypes. Without this @@ -43,8 +48,7 @@ # User-facing error message for failed requests _ERROR_MESSAGE = "Sorry, there was an error processing your message." -# Slack caps streamed messages at ~40K total payload (text + task card blocks) -_STREAM_CHAR_LIMIT = 39000 +# Card overflow triggers stream rotation; the char limit now lives in state.py. _STREAM_CARD_LIMIT = 45 @@ -73,6 +77,9 @@ def attach_routes( buffer_size: int = 100, max_file_size: int = 1_073_741_824, # 1GB resolve_user_identity: bool = False, + hitl_enabled: bool = False, + approval_authorization: ApprovalPolicy = "requester_only", + rejection_note_mode: RejectionNoteMode = "optional", ) -> APIRouter: # Inner functions capture config via closure to keep each instance isolated entity = agent or team or workflow @@ -88,6 +95,27 @@ def attach_routes( slack_tools = SlackTools(token=token, ssl=ssl, max_file_size=max_file_size) + async def _pending_approvals_block_new_run(session_id: str, channel_id: str, user_id: str, async_client) -> bool: + # True → caller should post nothing and return (thread already waiting on approval). + if not hitl_enabled: + return False + db = getattr(entity, "db", None) + if db is None: + return False + try: + approvals, _ = await db.get_approvals(session_id=session_id, status="pending", approval_type="required") + except Exception: + return False + if not approvals: + return False + try: + await async_client.chat_postEphemeral( + channel=channel_id, user=user_id, text=pending_approval_reminder_text() + ) + except Exception: + log_warning(f"Failed to post pending-approval reminder to {user_id} in {channel_id}") + return True + @router.post( "/events", operation_id=f"slack_events_{op_suffix}", @@ -165,6 +193,9 @@ async def _process_slack_event(data: dict): session_id = f"{entity_id}:{ctx['thread_id']}" async_client = AsyncWebClient(token=slack_tools.token, ssl=ssl) + if await _pending_approvals_block_new_run(session_id, ctx["channel_id"], ctx["user"], async_client): + return + try: await async_client.assistant_threads_setStatus( channel_id=ctx["channel_id"], @@ -220,6 +251,21 @@ async def _process_slack_event(data: dict): ) return + if hitl_enabled and getattr(response, "status", None) == RunStatus.paused.value: + await render_pause_ui( + entity=entity, # type: ignore[arg-type] + entity_id=entity_id, + client=async_client, + run_id=getattr(response, "run_id", "") or "", + session_id=session_id, + channel_id=ctx["channel_id"], + thread_ts=ctx["thread_id"], + requester_slack_user_id=ctx["user"], + team_id=data.get("team_id") or event.get("team"), + rejection_note_mode=rejection_note_mode, + ) + return + if hasattr(response, "reasoning_content") and response.reasoning_content: rc = str(response.reasoning_content) formatted = "*Reasoning:*\n> " + rc.replace("\n", "\n> ") @@ -280,6 +326,10 @@ async def _stream_slack_response(data: dict): user_id = ctx["user"] async_client = AsyncWebClient(token=slack_tools.token, ssl=ssl) + + if await _pending_approvals_block_new_run(session_id, ctx["channel_id"], user_id, async_client): + return + state = StreamState(entity_type=entity_type, entity_name=entity_name) stream = None @@ -411,7 +461,7 @@ async def _rotate_stream(pending_text: str = ""): content = state.flush() content_len = len(content) - if state.stream_chars_sent + content_len <= _STREAM_CHAR_LIMIT: + if state.stream_chars_sent + content_len <= STREAM_CHAR_LIMIT: await stream.append(markdown_text=content) state.stream_chars_sent += content_len else: @@ -424,10 +474,40 @@ async def _rotate_stream(pending_text: str = ""): stop_kwargs["markdown_text"] = state.flush() if completion_chunks: stop_kwargs["chunks"] = completion_chunks - await stream.stop(**stop_kwargs) + + # chat_stream is lazy — _stream_ts stays None until the first append or + # stop forces a chat.startStream call. When the agent pauses on its + # very first event (tool call before any text), we have no content and + # no task cards, and calling stop() here would open a bubble purely to + # close it, leaving a visible empty message above the approval card. + stream_opened = getattr(stream, "_stream_ts", None) is not None + if hitl_enabled and state.paused_event is not None and not stop_kwargs and not stream_opened: + try: + await async_client.assistant_threads_setStatus( + channel_id=ctx["channel_id"], thread_ts=ctx["thread_id"], status="" + ) + except Exception: + pass + else: + await stream.stop(**stop_kwargs) await upload_response_media_async(async_client, state, ctx["channel_id"], ctx["thread_id"]) + if hitl_enabled and state.paused_event is not None: + paused = state.paused_event + await render_pause_ui( + entity=entity, # type: ignore[arg-type] + entity_id=entity_id, + client=async_client, + run_id=getattr(paused, "run_id", "") or "", + session_id=getattr(paused, "session_id", None) or session_id, + channel_id=ctx["channel_id"], + thread_ts=ctx["thread_id"], + requester_slack_user_id=user_id, + team_id=team_id, + rejection_note_mode=rejection_note_mode, + ) + except Exception as e: # Check structured response first (cheap); fall back to str(e) only if needed slack_resp = getattr(e, "response", None) @@ -486,4 +566,26 @@ async def _handle_thread_started(event: dict): except Exception as e: log_error(f"Failed to set suggested prompts: {str(e)}") + if hitl_enabled and isinstance(entity, (Agent, Team)): + attach_interaction_routes( + router, + entity=entity, + entity_id=entity_id, + # slack_tools resolves SLACK_TOKEN env var fallback; passing raw + # `token` here would be None when users don't set it explicitly, + # and every AsyncWebClient call in the handler would 401. + token=slack_tools.token, + signing_secret=signing_secret, + ssl=ssl, + approval_authorization=approval_authorization, + rejection_note_mode=rejection_note_mode, + streaming_config=StreamingConfig( + entity_name=entity_name, + entity_type=entity_type, + task_display_mode=task_display_mode, + buffer_size=buffer_size, + ), + op_suffix=op_suffix, + ) + return router diff --git a/libs/agno/agno/os/interfaces/slack/slack.py b/libs/agno/agno/os/interfaces/slack/slack.py index 15d7eb1a1c..52a341864c 100644 --- a/libs/agno/agno/os/interfaces/slack/slack.py +++ b/libs/agno/agno/os/interfaces/slack/slack.py @@ -5,6 +5,8 @@ from agno.agent import Agent, RemoteAgent from agno.os.interfaces.base import BaseInterface +from agno.os.interfaces.slack.authorization import ApprovalPolicy +from agno.os.interfaces.slack.blocks import RejectionNoteMode from agno.os.interfaces.slack.router import attach_routes from agno.team import RemoteTeam, Team from agno.workflow import RemoteWorkflow, Workflow @@ -34,6 +36,9 @@ def __init__( buffer_size: int = 100, max_file_size: int = 1_073_741_824, # 1GB resolve_user_identity: bool = False, + hitl_enabled: bool = False, + approval_authorization: ApprovalPolicy = "requester_only", + rejection_note_mode: RejectionNoteMode = "optional", ): self.agent = agent self.team = team @@ -52,13 +57,42 @@ def __init__( self.buffer_size = buffer_size self.max_file_size = max_file_size self.resolve_user_identity = resolve_user_identity + self.hitl_enabled = hitl_enabled + self.approval_authorization = approval_authorization + self.rejection_note_mode = rejection_note_mode if not (self.agent or self.team or self.workflow): raise ValueError("Slack requires an agent, team, or workflow") + # HITL backend validation deferred to get_router() — AgentOS injects the + # default db into agents AFTER Slack.__init__ but BEFORE get_router(), + # so validating here would false-reject the standard AgentOS pattern. + + def _validate_hitl_backend(self) -> None: + # Validate capabilities rather than concrete DB class so the capability + # check works for any adapter exposing the approvals API. + entity = self.agent or self.team + if entity is None: + raise ValueError("hitl_enabled=True requires an Agent or Team (workflow support is planned)") + if isinstance(entity, (RemoteAgent, RemoteTeam)): + raise ValueError("hitl_enabled=True requires a local Agent or Team; remote variants are not supported") + db = getattr(entity, "db", None) + if db is None: + raise ValueError( + "hitl_enabled=True requires the agent/team to have a db configured (SqliteDb or PostgresDb)" + ) + for method in ("get_approval", "get_approvals", "update_approval"): + if getattr(db, method, None) is None: + raise ValueError( + f"hitl_enabled=True requires a DB implementing {method!r}; " + f"{type(db).__name__} does not. Use SqliteDb or PostgresDb." + ) + def get_router(self) -> APIRouter: + if self.hitl_enabled: + self._validate_hitl_backend() self.router = attach_routes( - router=APIRouter(prefix=self.prefix, tags=self.tags), # type: ignore + router=APIRouter(prefix=self.prefix, tags=self.tags), # type: ignore[arg-type] agent=self.agent, team=self.team, workflow=self.workflow, @@ -74,6 +108,9 @@ def get_router(self) -> APIRouter: buffer_size=self.buffer_size, max_file_size=self.max_file_size, resolve_user_identity=self.resolve_user_identity, + hitl_enabled=self.hitl_enabled, + approval_authorization=self.approval_authorization, + rejection_note_mode=self.rejection_note_mode, ) return self.router diff --git a/libs/agno/agno/os/interfaces/slack/state.py b/libs/agno/agno/os/interfaces/slack/state.py index 3d87e7ff17..6893ac22ec 100644 --- a/libs/agno/agno/os/interfaces/slack/state.py +++ b/libs/agno/agno/os/interfaces/slack/state.py @@ -12,6 +12,9 @@ # Literal not Enum — values flow directly into Slack API dicts as plain strings TaskStatus = Literal["in_progress", "complete", "error"] +# Slack caps streamed messages at ~40K payload (text + task card blocks combined). +STREAM_CHAR_LIMIT = 39000 + class TaskUpdateDict(TypedDict): type: str @@ -59,6 +62,9 @@ class StreamState: # Total chars sent to the current Slack stream; reset on rotation stream_chars_sent: int = 0 + # Set by _on_run_paused; caller inspects after stream ends to render approval UI + paused_event: Optional["BaseRunOutputEvent"] = None + def track_task(self, key: str, title: str) -> None: self.task_cards[key] = TaskCard(title=title) diff --git a/libs/agno/agno/tools/slack.py b/libs/agno/agno/tools/slack.py index ba9a978afe..f01859488e 100644 --- a/libs/agno/agno/tools/slack.py +++ b/libs/agno/agno/tools/slack.py @@ -391,6 +391,7 @@ def send_message_thread(self, channel: str, text: str, thread_ts: str) -> str: logger.exception("Error sending message") return json.dumps({"error": str(e)}) + def list_channels(self) -> str: """List all channels in the Slack workspace. diff --git a/libs/agno/pyproject.toml b/libs/agno/pyproject.toml index f028a69fee..f40ab49f57 100644 --- a/libs/agno/pyproject.toml +++ b/libs/agno/pyproject.toml @@ -193,7 +193,7 @@ agui = ["ag-ui-protocol"] a2a = ["a2a-sdk"] # Dependencies for Slack integration -slack = ["slack_sdk>=3.40.0"] +slack = ["slack_sdk>=3.41.0"] # Dependencies for Embedders huggingface = [