From 8620f98b3aea28555b51c2d3759c0ece8a45d77c Mon Sep 17 00:00:00 2001 From: ABHINAV KUMAR Date: Fri, 17 Apr 2026 20:27:34 +0530 Subject: [PATCH] Fix for https://github.com/agno-agi/agno/issues/7517 --- libs/agno/agno/client/os.py | 21 ++++++++++++++++---- libs/agno/tests/unit/os/test_client.py | 27 ++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/libs/agno/agno/client/os.py b/libs/agno/agno/client/os.py index 8bb1c7460d..ae4d5219e0 100644 --- a/libs/agno/agno/client/os.py +++ b/libs/agno/agno/client/os.py @@ -390,10 +390,23 @@ async def _parse_sse_events( # Extract and parse JSON payload json_str = line[6:] # Remove "data: " prefix event_dict = json.loads(json_str) - - # Parse into typed event using provided factory - event = event_parser(event_dict) - yield event + # Some streaming paths emit a full RunOutput/TeamRunOutput object + # containing an `events` list instead of a single `event` object. + # Normalize both payload shapes into a sequence of event dictionaries. + if "event" in event_dict: + event_payloads = [event_dict] + elif isinstance(event_dict.get("events"), list): + event_payloads = event_dict["events"] + else: + event_payloads = [event_dict] + + for event_payload in event_payloads: + if not isinstance(event_payload, dict): + logger.error(f"Invalid SSE event payload (expected dict): {event_payload}") + continue + # Parse into typed event using provided factory + event = event_parser(event_payload) + yield event except json.JSONDecodeError: logger.exception(f"Failed to parse SSE JSON: {line[:100]}...") diff --git a/libs/agno/tests/unit/os/test_client.py b/libs/agno/tests/unit/os/test_client.py index 671d060e4b..21ce450d2b 100644 --- a/libs/agno/tests/unit/os/test_client.py +++ b/libs/agno/tests/unit/os/test_client.py @@ -723,6 +723,33 @@ async def async_generator(): assert mock_logger.exception.called +@pytest.mark.asyncio +async def test_stream_handles_run_output_payload_with_events_list(): + """Verify stream parser supports RunOutput-shaped payloads with `events`.""" + from agno.run.agent import RunCompletedEvent, RunStartedEvent + + client = AgentOSClient(base_url="http://localhost:7777") + + mock_lines = [ + 'data: {"run_id": "run-123", "events": [{"event": "RunStarted", "run_id": "run-123", "agent_id": "agent-1", "created_at": 1234567890}, {"event": "RunCompleted", "run_id": "run-123", "agent_id": "agent-1", "created_at": 1234567890}]}', + ] + + async def async_generator(): + for line in mock_lines: + yield line + + with patch.object(client, "_astream_post_form_data") as mock_stream: + mock_stream.return_value = async_generator() + + events = [] + async for event in client.run_agent_stream("agent-123", "test"): + events.append(event) + + assert len(events) == 2 + assert isinstance(events[0], RunStartedEvent) + assert isinstance(events[1], RunCompletedEvent) + + @pytest.mark.asyncio async def test_stream_handles_empty_lines(): """Verify empty lines and comments are skipped."""