
Chore/audio processing v2 #501

Draft
dangusev wants to merge 13 commits into main from chore/audio-processing-v2

Conversation

Collaborator

dangusev commented Apr 21, 2026

TBD

Summary by CodeRabbit

Release Notes

  • New Features

    • Added stream-based audio processing pipeline for end-to-end inference with STT, turn detection, LLM, and TTS stages
    • Introduced streaming text-to-speech API with async iteration over output chunks
    • Added automatic 20ms PCM audio chunking for consistent audio frame handling
    • Implemented sentence-boundary tokenization for improved TTS output segmentation
  • API Changes

    • Transitioned from event-based callbacks to stream-based outputs for speech-to-text and turn detection
    • Removed manual output format configuration from TTS; stream now emits native sample rates
  • Improvements

    • Enhanced audio buffering with backpressure support and fairness guarantees


coderabbitai Bot commented Apr 21, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: edd562bc-952f-474a-b7f7-6856b71c1661

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough

Walkthrough

This PR introduces a stream-based inference architecture for real-time agent interactions. Core changes include a generic Stream[T] abstraction for async FIFO buffering, new audio/LLM turn primitives, and TranscribingInferenceFlow—a comprehensive pipeline orchestrating STT, turn detection, LLM invocation, and TTS synthesis. Event-based outputs across STT, TTS, and turn detection are replaced with stream-based publish/subscribe patterns. Test utilities transition from synchronous session-based waiting to stream collection. All plugins update to use the new typed output streams.
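
The generic `Stream[T]` idea described above can be sketched in a few lines. This is a simplified illustration built on `asyncio.Queue`, not the PR's implementation; `MiniStream`, its `maxsize` bound, and the close sentinel are assumptions standing in for the real bounded-FIFO, backpressure, and iteration semantics.

```python
import asyncio
from typing import AsyncIterator, Generic, TypeVar

T = TypeVar("T")


class StreamClosed(Exception):
    """Raised when sending to a closed stream (illustrative)."""


class MiniStream(Generic[T]):
    """Simplified sketch of a bounded async FIFO with backpressure on send()
    and async iteration for consumers. Not the PR's Stream[T]."""

    def __init__(self, maxsize: int = 0) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._closed = False
        self._sentinel = object()

    async def send(self, item: T) -> None:
        if self._closed:
            raise StreamClosed()
        await self._queue.put(item)  # blocks when full -> backpressure

    async def close(self) -> None:
        self._closed = True
        await self._queue.put(self._sentinel)  # wakes the consumer

    def __aiter__(self) -> AsyncIterator[T]:
        return self

    async def __anext__(self) -> T:
        item = await self._queue.get()
        if item is self._sentinel:
            raise StopAsyncIteration
        return item


async def demo() -> list[int]:
    s: MiniStream[int] = MiniStream(maxsize=8)
    for i in range(3):
        await s.send(i)
    await s.close()
    return [x async for x in s]


print(asyncio.run(demo()))  # prints [0, 1, 2]
```

A producer awaiting `send()` on a full stream is what gives downstream consumers (e.g. the audio output stage) natural flow control.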

Changes

Cohort / File(s) Summary
Core Stream Infrastructure
agents-core/vision_agents/core/utils/stream.py
Added generic async Stream[T] with bounded FIFO buffering, backpressure semantics (send/send_nowait), consumer iteration (get/__aiter__), and lifecycle methods (close, clear, collect). Includes StreamClosed, StreamEmpty, StreamFull exceptions.
Audio Primitives
agents-core/vision_agents/core/agents/inference/audio.py
New audio stream types: AudioInputChunk (data + participant), AudioOutputChunk (PCM + final flag), AudioOutputFlush control message. AudioOutputStream re-chunks arbitrary PCM into fixed 20ms frames with carry-forward and final-padding logic.
Inference Flow Base & LLM Turn
agents-core/vision_agents/core/agents/inference/base.py, llm_turn.py
Added InferenceFlow abstract interface for lifecycle control (start, stop, interrupt) and utterance handling (simple_response, say). Added LLMTurn dataclass managing LLM response streaming, task lifecycle, and cancellation semantics.
TranscribingInferenceFlow
agents-core/vision_agents/core/agents/inference/transcribing_flow.py
New 514-line end-to-end pipeline: audio → STT (with optional turn detection) → transcript buffering → LLM turn execution → TTS synthesis → audio output. Handles interrupt, buffer management, turn-boundary logic with eager/non-eager modes, and streaming/non-streaming TTS variants.
Inference Module Exports
agents-core/vision_agents/core/agents/inference/__init__.py
Consolidated public API re-exporting audio types, InferenceFlow, RealtimeInferenceFlow, TranscribingInferenceFlow.
STT Refactoring
agents-core/vision_agents/core/stt/stt.py, __init__.py
Replaced event emission with stream-based output (Stream[Transcript | TurnEnded | TurnStarted]). Added typed TranscriptResponse, Transcript dataclasses. Removed output-format/resampling logic. Added send_iter async generator. Updated eager_turn_detection flag.
TTS Refactoring
agents-core/vision_agents/core/tts/tts.py, manual_test.py
Removed output-format configuration and resampling. Added TTSInput, TTSInputEnd, TTSOutputChunk streaming control types. Introduced send_iter async generator for streamed synthesis. Removed TTSSession testing utility (deleted testing.py). Added class-level streaming: bool flag.
Turn Detection Refactoring
agents-core/vision_agents/core/turn_detection/turn_detection.py, __init__.py
Replaced event-based emission with Stream[TurnEnded | TurnStarted] output. Added TurnStarted, TurnEnded dataclasses with confidence/metadata fields. Removed TurnEvent enum. Updated process_audio signature (audio_data → data, optional conversation).
Transcript & Buffer Utilities
agents-core/vision_agents/core/agents/transcript/buffer.py, store.py
Added TranscriptBuffer.final property tracking finality state. Updated TranscriptStore.update_* methods with optional drop parameter for clearing buffer state without mode restrictions. Changed List[str] to list[str] type hints.
Text & Tokenization Utilities
agents-core/vision_agents/core/utils/text.py, tokenizer.py, exceptions.py
Added sanitize_text for markdown removal. Added Tokenizer ABC and TTSSentenceTokenizer for streaming sentence-boundary detection in TTS. Added log_exceptions context manager for exception logging.
AssemblyAI Plugin
plugins/assemblyai/vision_agents/plugins/assemblyai/stt.py, tests/test_assemblyai_stt.py
Updated to emit Transcript/TurnStarted/TurnEnded via output.send_nowait(...) instead of internal event helpers. Test fixture refactored from STTSession to stt.output.collect().
Deepgram Plugin
plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py, tts.py, tests/test_stt.py, test_tts.py
Deepgram STT migrated to typed stream output. Deepgram TTS adds streaming=True and validates sample rates. Tests restructured from STTSession/TTSSession to stream collection. Deleted old test modules (test_deepgram_stt.py, test_deepgram_stt_close.py), added unified test_stt.py.
ElevenLabs Plugin
plugins/elevenlabs/vision_agents/plugins/elevenlabs/stt.py, tests/test_tts.py, test_tts_close.py
ElevenLabs STT emits typed Transcript/turn events via stream. TTS tests migrated from manual_tts_to_wav/TTSSession to send_iter streaming. Deleted old test module test_tts_close.py, added new unit test test_close_closes_http_client.
Fast Whisper Plugin
plugins/fast_whisper/vision_agents/plugins/fast_whisper/stt.py, tests/test_fast_whisper_stt.py
Replaced _emit_* methods with Transcript stream output. Removed error-event emission. Tests refactored to use stt.output.collect() instead of STTSession.
Fish Plugin
plugins/fish/vision_agents/plugins/fish/stt.py, tests/test_stt.py, test_tts.py
Fish STT updated to emit typed Transcript objects. Deleted old tests (test_fish_stt.py, test_fish_tts.py), added unified stream-based tests (test_stt.py, test_tts.py) using send_iter and output collection.
Inworld Plugin
plugins/inworld/vision_agents/plugins/inworld/tts.py, tests/test_tts.py
TTS close() now delegates to parent via await super().close(). Tests migrated from manual_tts_to_wav/TTSSession to send_iter streaming.
Kokoro Plugin
plugins/kokoro/vision_agents/plugins/kokoro/tts.py, tests/test_tts.py
Minor import reordering. Tests refactored from manual test to send_iter-based assertions. Fixture converted to standard @pytest.fixture.
Mistral Plugin
plugins/mistral/vision_agents/plugins/mistral/stt.py, tests/test_mistral_stt.py
Replaced transcript events with Transcript stream objects. Removed error-event emission. Tests use stt.output.collect() instead of STTSession.
OpenAI Plugin
plugins/openai/tests/test_tts.py, test_tts_openai.py
Added new stream-based integration test module. Deleted old test_tts_openai.py (session-based approach).
Pocket Plugin
plugins/pocket/tests/test_tts.py
Tests refactored from TTSSession/set_output_format to send_iter streaming assertions.
Smart Turn Plugin
plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py, tests/test_smart_turn.py
Replaced turn event emission via internal methods with await self.output.send(TurnStarted/TurnEnded). Tests updated to collect from output instead of subscribing to events.
Vogent Plugin
plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py, tests/test_vogent.py
Migrated turn events to await self.output.send() pattern. Updated process_audio signature (audio_data → data, optional conversation). Tests use stream collection instead of event subscriptions.
Wizper Plugin
plugins/wizper/vision_agents/plugins/wizper/stt.py, tests/test_wizper_stt.py
Replaced _emit_transcript_event with direct Transcript stream emission. Removed error-event emission. Tests use stream collection instead of STTSession.
Cartesia Plugin
plugins/cartesia/vision_agents/plugins/cartesia/tts.py, tests/test_tts.py
Minor logging level reduction (info → debug). Tests refactored to use send_iter streaming.
Test Stubs & Utilities
conftest.py, tests/test_agents/test_inference/stubs.py, tests/test_tts_base.py
Removed STTSession from conftest.py. Added comprehensive test stubs (STTStub, TTSStub, LLMStub, RealtimeStub, TurnDetectorStub) for injection-based testing. Refactored base TTS tests from event collection to send_iter streaming assertions.
Inference Tests
tests/test_agents/test_inference/test_audio.py, test_llm_turn.py, test_transcribing_flow.py
Added 1,227+ lines of comprehensive async tests covering AudioOutputStream 20ms chunking, LLMTurn lifecycle and cancellation, and TranscribingInferenceFlow end-to-end orchestration with edge cases (eager turns, barge-in, buffer management).
Stream & Utility Tests
tests/test_utils/test_stream.py, test_tokenizer.py, tests/test_agents/test_transcript/test_buffer.py, test_store.py
Added 600+ lines of tests for Stream FIFO semantics, bounded/unbounded modes, cancellation, TTSSentenceTokenizer boundary detection, and TranscriptBuffer/TranscriptStore finality/drop semantics.
Documentation
docs/ai/instructions/ai-stt.md
Updated STT implementation examples to use Transcript objects and self.output.send_nowait(...) instead of internal event-emission helpers. Added turn-detection example with typed TurnEnded/TurnStarted events.

Sequence Diagram(s)

sequenceDiagram
    participant Audio as Audio Input
    participant STT as STT
    participant TD as Turn Detector
    participant LLM as LLM
    participant TTS as TTS
    participant AudioOut as Audio Output

    Audio->>STT: process_audio(pcm, participant)
    activate STT
    STT->>STT: accumulate & detect speech
    STT-->>TD: TurnStarted event
    TD-->>TD: queue event
    
    Note over STT: Partial & final<br/>transcripts emitted
    STT-->>STT: emit Transcript(mode="delta")
    STT-->>STT: emit Transcript(mode="final")
    
    STT->>TD: process_audio(pcm)
    activate TD
    TD-->>TD: emit TurnEnded event
    deactivate TD
    deactivate STT

    Note over LLM: LLMTurn orchestration
    rect rgba(100, 150, 200, 0.5)
        LLM->>LLM: create & start LLMTurn
        LLM->>LLM: await llm.simple_response(transcript)
        LLM-->>LLM: emit LLMResponseDelta items
        LLM-->>LLM: emit LLMResponseFinal
        LLM->>LLM: confirm & finalize turn
    end

    LLM->>TTS: send LLM text deltas
    activate TTS
    TTS->>TTS: accumulate text via tokenizer
    TTS->>TTS: emit at sentence boundaries
    TTS-->>TTS: emit TTSOutputChunk(data, final)
    deactivate TTS

    TTS->>AudioOut: send TTSOutputChunk
    activate AudioOut
    AudioOut->>AudioOut: rechunk to 20ms frames
    AudioOut->>AudioOut: carry forward partial chunks
    AudioOut-->>Audio: emit AudioOutputChunk
    deactivate AudioOut

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • Cleanup stt #122: Refactors STT event emission system to use typed Transcript/TranscriptResponse objects and stream-based output, directly complementing this PR's broader streaming architecture migration.

Suggested labels

agents-core, plugins, streaming-architecture, inference-flow, breaking-change

Poem

Madness swarms in microstreams—
each chunk a 20-millisecond bell,
tolling text through silicon till words emerge,
corralled by sentences, dispatched by turn.
The old event-ghosts dissolve.
New channels carry voice like blood.


dangusev marked this pull request as draft April 21, 2026 09:43

coderabbitai Bot left a comment


Actionable comments posted: 10

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (7)
plugins/kokoro/vision_agents/plugins/kokoro/tts.py (1)

1-1: ⚠️ Potential issue | 🟡 Minor

Remove the future annotations import.

This import violates the coding guidelines, which explicitly state: "Never use from __future__ import annotations in Python files."

🔧 Proposed fix
-from __future__ import annotations
-
 import asyncio

If any forward references exist in type annotations, replace them with string literals or refactor to avoid the forward reference.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/kokoro/vision_agents/plugins/kokoro/tts.py` at line 1, Remove the
line "from __future__ import annotations" from the module
(plugins/kokoro/vision_agents/plugins/kokoro/tts.py) and update any
forward-referenced type annotations in functions/classes within this file (e.g.,
references used in TTS-related functions or classes) by converting them to
string-literal annotations or refactoring the declarations so forward references
are unnecessary; ensure all annotations remain valid without the future import.
plugins/vogent/tests/test_vogent.py (1)

42-56: ⚠️ Potential issue | 🟡 Minor

Disjunctive assertion weakens the test.

assert kinds == [...] or kinds == [...] papers over nondeterminism in what the detector emits for this fixture. Either:

  • the extra start/stop pair is a real property of the input (silence boundary), in which case assert structural invariants (e.g., kinds[0] == "start", kinds[-1] == "stop", pairs alternate, count is even), or
  • it's a flake, in which case the source should be fixed.

Also worth filtering out None kinds explicitly so a surprise item type doesn't just silently fail equality with a confusing diff.

♻️ Suggested invariant-style assertion
-        items = await vogent_turn_detection.output.collect(timeout=1.0)
-        kinds = [
-            "start"
-            if isinstance(item, TurnStarted)
-            else "stop"
-            if isinstance(item, TurnEnded)
-            else None
-            for item in items
-        ]
-        assert kinds == ["start", "stop"] or kinds == [
-            "start",
-            "stop",
-            "start",
-            "stop",
-        ]
+        items = await vogent_turn_detection.output.collect(timeout=1.0)
+        kinds = [
+            "start" if isinstance(item, TurnStarted) else "stop"
+            for item in items
+            if isinstance(item, (TurnStarted, TurnEnded))
+        ]
+        assert kinds, "expected at least one turn event"
+        assert len(kinds) % 2 == 0, f"unbalanced turn events: {kinds}"
+        assert all(
+            a == "start" and b == "stop"
+            for a, b in zip(kinds[::2], kinds[1::2])
+        ), f"turn events not strictly start/stop paired: {kinds}"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/vogent/tests/test_vogent.py` around lines 42 - 56, The current
disjunctive assertion on kinds hides nondeterminism and can miss unexpected item
types; modify the test around vogent_turn_detection.output.collect(...) so you
first map items to kinds but filter out any None (i.e., explicitly handle
unexpected types), then replace the OR assertion with invariant-style checks:
assert kinds[0] == "start", kinds[-1] == "stop", len(kinds) % 2 == 0 (even
count), and that kinds alternates between "start" and "stop" (e.g., for i, k in
enumerate(kinds): assert k == ("start" if i % 2 == 0 else "stop")). This keeps
the test robust and surfaces unexpected item types instead of silently passing.
plugins/fish/vision_agents/plugins/fish/stt.py (1)

138-157: ⚠️ Potential issue | 🟠 Major

clear() flushes transcripts into the output stream and then immediately wipes them.

The override in Fish.clear() (Line 151-157) calls await self._send_buffer(key) — whose terminal step is self.output.send_nowait(Transcript(...)) here at Line 138-145 — and then calls await super().clear(), which in the base runs self._output.clear() synchronously. There is no await between the send_nowait inside _send_buffer and the base class's clear(), so any transcript enqueued by the flush is evicted before a consumer task can dequeue it. Delivery then becomes a scheduling race: it "works" only when the consumer happens to be woken during the await asyncio.to_thread(self.client.asr, ...) window inside _send_buffer.

Decide the contract for clear():

  • If it means discard pending audio / transcripts (e.g. barge-in), do not flush — just drop the buffers and call super().clear().
  • If it means finalize pending audio first, flush and then do not clear the output stream (don't call super().clear(), or have the base not wipe the queue in this path).

Given the call sites elsewhere (interrupt on turn start), option 1 is most likely the intended semantics.

🐛 Suggested fix (discard-on-clear)
     async def clear(self) -> None:
-        for key in list(self._buffers):
-            if self._buffers[key].duration_ms >= MIN_DURATION_MS:
-                await self._send_buffer(key)
-            else:
-                self._buffers[key].clear()
+        for key in list(self._buffers):
+            self._buffers[key].clear()
         await super().clear()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/fish/vision_agents/plugins/fish/stt.py` around lines 138 - 157, The
current Fish.clear() flushes buffers by awaiting self._send_buffer(key) which
enqueues transcripts via self.output.send_nowait but then immediately calls
super().clear(), racing with consumers; change clear() to discard pending
audio/transcripts instead: remove the await self._send_buffer(key) loop and
instead iterate the keys in self._buffers and call self._buffers[key].clear()
(honoring MIN_DURATION_MS is unnecessary for discard), then call await
super().clear(); update the method in the Fish class (clear) and ensure no calls
to _send_buffer or send_nowait remain in that path so pending transcripts are
not enqueued before the output is cleared.
docs/ai/instructions/ai-stt.md (1)

26-56: ⚠️ Potential issue | 🟡 Minor

Example leaks a loop-scoped response into the final emit.

response is constructed inside for part in parts: and then referenced on Line 53 for the final Transcript. When parts is empty, response is undefined and this snippet raises UnboundLocalError. Since this is copy/pasted by plugin authors (and the AI instructions pipeline), it's worth either hoisting the TranscriptResponse construction out of the loop or guarding the final emit.

📝 Proposed fix
         parts = self.client.stt(pcm_data, stream=True)
         full_text = ""
+        response = TranscriptResponse(
+            confidence=0.9,
+            language='en',
+            processing_time_ms=300,
+            audio_duration_ms=2000,
+            other={}
+        )
         for part in parts:
-            response = TranscriptResponse(
-                confidence=0.9,
-                language='en',
-                processing_time_ms=300,
-                audio_duration_ms=2000,
-                other={}
-            )
             # partials — mode depends on provider semantics:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/ai/instructions/ai-stt.md` around lines 26 - 56, The example leaks the
loop-scoped variable response (constructed inside the for part in parts: loop)
into the final Transcript emit causing UnboundLocalError when parts is empty;
fix by hoisting the TranscriptResponse construction (the response object used in
Transcript) out of the loop (create response before iterating over parts) or
alternatively guard the final self.output.send_nowait(...) so it only emits when
response is defined (e.g., only emit final Transcript if parts produced any
content or if full_text is non-empty); update references to TranscriptResponse,
Transcript, self.output.send_nowait, response, full_text, and parts accordingly.
agents-core/vision_agents/core/agents/transcript/buffer.py (1)

86-89: ⚠️ Potential issue | 🟠 Major

reset() leaves _final stuck at True.

After a "final" update sets self._final = True, calling reset() clears segments and the pending flag but not _final. The next consumer sees buffer.final == True on an empty buffer, which contradicts the semantics of a reset buffer and will mislead the new TranscriptStore/flow code that relies on this property.

Proposed fix
     def reset(self) -> None:
         """Clear all accumulated segments."""
         self._segments.clear()
         self._has_pending_partial = False
+        self._final = False
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/transcript/buffer.py` around lines 86 -
89, The reset() method clears _segments and _has_pending_partial but fails to
reset the _final flag, leaving buffer.final True after reset; update reset() to
also set self._final = False so the buffer's final state is cleared along with
segments and pending partials (reference symbols: reset(), self._segments,
self._has_pending_partial, self._final, and the buffer.final property).
plugins/mistral/vision_agents/plugins/mistral/stt.py (1)

234-268: ⚠️ Potential issue | 🟡 Minor

close() no longer marks the STT closed before doing cleanup.

Previously await super().close() ran first and flipped self.closed = True, so the if self.closed guard in process_audio (line 214) would short-circuit any concurrent calls during shutdown. With super().close() moved to the end, a process_audio call that arrives mid-close can still reach await self._connection.send_audio(...) after end_audio() has been signalled, racing with the connection teardown.

Consider calling await super().close() first (to set the closed flag) and then performing the Mistral-specific drain/close, or otherwise guarding sends during close.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/mistral/vision_agents/plugins/mistral/stt.py` around lines 234 - 268,
The close() implementation must set the STT closed flag before performing
Mistral-specific teardown to prevent process_audio from racing into sending
after end_audio() is signalled; call await super().close() early (at the start
of close()) so self.closed is true, then proceed with end_audio(), waiting on
self._done_received, cancel_and_wait(self._receive_task), and closing
self._connection, or alternately add a send guard in process_audio() that checks
self.closed and self._connection_ready/_connection before calling
self._connection.send_audio(); update close(), process_audio(), and references
to
self._connection.end_audio/self._connection.send_audio/_receive_task/_done_received
accordingly.
agents-core/vision_agents/core/agents/transcript/store.py (1)

53-70: ⚠️ Potential issue | 🟡 Minor

drop=True is ignored when the buffer ends up empty.

If buffer is empty after buffer.update(...) (e.g., an empty-text "final" on a freshly-created entry, or any update that leaves no segments), the method returns at line 67 before the drop branch runs, so the entry remains in self._users. Callers asking for "drop" to clear state won't get that guarantee on no-op updates. Consider moving the drop cleanup before the not buffer short-circuit, or dropping unconditionally when drop=True.

The same pattern exists in update_agent_transcript (lines 115–120).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/transcript/store.py` around lines 53 -
70, The method updates the user's transcript but returns early when the buffer
is empty, so a requested drop (drop=True) is never executed; change the logic so
that when drop is True you remove the entry from self._users regardless of
whether buffer is empty—either move the drop handling to before the "if not
buffer: return None" short-circuit or perform the pop unconditionally after
buffer.update when drop is True; apply the same fix to update_agent_transcript
(the matching agent-transcript update function) and ensure you reference the
same tuple unpack (msg_id, uid, buffer) and self._users.pop(participant_id,
None) call.
🟡 Minor comments (14)
agents-core/vision_agents/core/agents/inference/base.py-7-14 (1)

7-14: ⚠️ Potential issue | 🟡 Minor

Add explicit -> None return types to lifecycle methods.

These abstract methods are part of the public flow interface and should match the typed simple_response()/say() signatures.

Proposed fix
     `@abc.abstractmethod`
-    async def start(self): ...
+    async def start(self) -> None: ...

     `@abc.abstractmethod`
-    async def stop(self): ...
+    async def stop(self) -> None: ...

     `@abc.abstractmethod`
-    async def interrupt(self): ...
+    async def interrupt(self) -> None: ...

As per coding guidelines, use type annotations everywhere with modern syntax.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/inference/base.py` around lines 7 - 14,
The abstract lifecycle methods start, stop, and interrupt in the class currently
lack explicit return annotations; update their signatures to include explicit
"-> None" return types so they match the project's typed public flow interface
(e.g., change "async def start(self): ..." to "async def start(self) -> None:
...", and do the same for stop and interrupt), ensuring type consistency with
simple_response()/say and modern typing guidelines.
agents-core/vision_agents/core/utils/stream.py-83-91 (1)

83-91: ⚠️ Potential issue | 🟡 Minor

Catch asyncio.CancelledError explicitly instead of using bare except:.

These await blocks only raise asyncio.CancelledError when futures are cancelled (no set_exception() calls exist on these futures). Bare except: also catches KeyboardInterrupt, SystemExit, and other base exceptions that should never be silently caught and re-raised.

Proposed fix
-            except:
+            except asyncio.CancelledError:
                 sender.cancel()
                 with contextlib.suppress(ValueError):
                     self._senders.remove(sender)
                 if not self.full() and not sender.cancelled():
                     self._wakeup_next_sender()
                 raise
-            except:
+            except asyncio.CancelledError:
                 getter.cancel()  # Just in case getter is not done yet.
                 with contextlib.suppress(ValueError):
                     # Clean self._getters from canceled getters.
                     self._getters.remove(getter)
                 if not self.empty() and not getter.cancelled():
                     self._wakeup_next_getter()
                 raise
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/utils/stream.py` around lines 83 - 91, The
bare except in the await sender block should be changed to catch
asyncio.CancelledError explicitly: replace "except:" with "except
asyncio.CancelledError:" (and add "import asyncio" if missing) so only
cancellation is handled by calling sender.cancel(), removing from self._senders,
calling self._wakeup_next_sender() when appropriate, and re-raising the
CancelledError; leave other exceptions unhandled so they propagate normally. Use
the existing symbols sender, self._senders, full(), and _wakeup_next_sender() to
locate the code to update.
plugins/deepgram/vision_agents/plugins/deepgram/tts.py-37-38 (1)

37-38: ⚠️ Potential issue | 🟡 Minor

Typo in comment: "detlas" → "deltas".

✏️ Fix
-    # This implementation accepts partial text detlas
+    # This implementation accepts partial text deltas
     streaming = True
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/deepgram/vision_agents/plugins/deepgram/tts.py` around lines 37 - 38,
The comment above the streaming flag contains a typo: change "detlas" to
"deltas" in the comment that currently reads "# This implementation accepts
partial text detlas" (near the streaming = True line) so it correctly reads "#
This implementation accepts partial text deltas".
tests/test_utils/test_stream.py-274-288 (1)

274-288: ⚠️ Potential issue | 🟡 Minor

Assertion doesn't match the test's intent.

The name says served_in_order, but sorted(results) == [10, 20, 30] only checks set membership, not order. If FIFO fairness across waiters is part of the Stream contract (as the sibling test_multiple_senders_served_in_order tests for senders), assert against the unsorted list so a regression in getter fairness is actually caught.

🔧 Suggested tightening
-        await asyncio.gather(*tasks)
-        assert sorted(results) == [10, 20, 30]
+        await asyncio.gather(*tasks)
+        assert results == [10, 20, 30]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_utils/test_stream.py` around lines 274 - 288, The assertion in
test_multiple_getters_served_in_order is wrong: it uses sorted(results) which
ignores ordering; change the final assertion to check the actual delivery order
by asserting results == [10, 20, 30] so the test verifies FIFO fairness for
getters (update the assertion in the test function
test_multiple_getters_served_in_order that collects values via getter which
awaits stream.get() after stream.send(10/20/30) and joins tasks).
agents-core/vision_agents/core/agents/inference/llm_turn.py-55-77 (1)

55-77: ⚠️ Potential issue | 🟡 Minor

started flips back to False after cancel().

started is defined as bool(self._llm_response_task), but cancel() sets self._llm_response_task = None after awaiting. A turn that was started and then cancelled will report started is False and cancelled is True simultaneously, and a subsequent start() call won't raise "already running". If that's intentional, consider documenting; otherwise track start as a dedicated flag and keep the task handle until destruction.

🔧 Suggested fix
-    _llm_response_task: asyncio.Task | None = field(default=None, init=False)
-    _finalize_task: asyncio.Task | None = field(default=None, init=False)
-    _cancelled: bool = field(default=False, init=False)
-    _confirmed: bool = field(default=False, init=False)
+    _llm_response_task: asyncio.Task | None = field(default=None, init=False)
+    _finalize_task: asyncio.Task | None = field(default=None, init=False)
+    _started: bool = field(default=False, init=False)
+    _cancelled: bool = field(default=False, init=False)
+    _confirmed: bool = field(default=False, init=False)
@@
     `@property`
     def started(self) -> bool:
-        return bool(self._llm_response_task)
+        return self._started
@@
     def start(self, llm: LLM) -> None:
-        if self._llm_response_task is not None:
+        if self._started:
             raise RuntimeError("LLM response task is already running")
+        self._started = True
         self._llm_response_task = asyncio.create_task(self._do_llm_response(llm))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/inference/llm_turn.py` around lines 55
- 77, The started property should not be derived solely from _llm_response_task
because cancel() clears that handle; add a dedicated boolean flag (e.g.,
self._started) used by the started property, set self._started = True in
start(self, llm), and set self._started = False only when the turn truly
finishes (e.g., at the end of _do_llm_response or when finalized) rather than
immediately in cancel(); update start(), cancel(), and the
completion/finalization path to maintain this flag so start() will correctly
detect an active turn via the new _started flag while still clearing/awaiting
the task handles (_llm_response_task and _finalize_task) as before.
agents-core/vision_agents/core/tts/tts.py-344-388 (1)

344-388: ⚠️ Potential issue | 🟡 Minor

Document the TTSOutputChunk output contract in the send_iter docstring.

The fast-path (single PcmData response) yields one chunk where data is not None and final=True. The iterator-path yields N chunks with final=False, then a terminal data=None, final=True sentinel. While the inline comment on TTSOutputChunk mentions data "can be None if it's a final chunk", the send_iter docstring should explicitly document this contract so downstream consumers understand when to expect the sentinel and when data may be None.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/tts/tts.py` around lines 344 - 388, Update the
send_iter docstring to explicitly describe the TTSOutputChunk output contract:
when response is a single PcmData (fast-path) send_iter yields one
TTSOutputChunk with data not None and final=True (index 0, includes synthesis_id
and text); when response is an async iterator send_iter yields N chunks with
data set to each PcmData and final=False (index 0..N-1), then emits a terminal
sentinel TTSOutputChunk with data=None and final=True (index N) to signal
completion; also mention that consumers should not expect audio bytes on the
final sentinel and that index, synthesis_id and text are provided on all chunks
for correlation.
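The iterator-path contract can be sketched with hypothetical stand-ins (`Chunk` here is not the real `TTSOutputChunk`; field names are assumed from the finding):

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Optional


@dataclass
class Chunk:
    """Hypothetical stand-in for TTSOutputChunk."""

    index: int
    data: Optional[bytes]  # None only on the terminal sentinel
    final: bool


async def iterate_chunks(parts: list[bytes]) -> AsyncIterator[Chunk]:
    # Iterator path: N data chunks with final=False ...
    for i, part in enumerate(parts):
        yield Chunk(index=i, data=part, final=False)
    # ... then a data=None, final=True sentinel signals completion.
    yield Chunk(index=len(parts), data=None, final=True)


async def main() -> list[Chunk]:
    return [c async for c in iterate_chunks([b"pcm-0", b"pcm-1"])]
```

Consumers should treat `final=True` as end-of-synthesis and must not expect audio bytes on that sentinel — exactly the contract the docstring should spell out.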
tests/test_agents/test_transcript/test_buffer.py-236-246 (1)

236-246: ⚠️ Potential issue | 🟡 Minor

test_replacement_to_final doesn’t actually re-test replacement.

The test name and the comment on Line 244 both claim the third update re-enters replacement mode, but Line 245 passes mode="delta". As written, this is effectively a duplicate of test_delta_to_final's tail and leaves the final → replacement → non-final transition uncovered.

🧾 Proposed fix
         # another "replacement" makes it non-final again
-        buffer.update("test2", mode="delta")
+        buffer.update("test2", mode="replacement")
         assert not buffer.final
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_agents/test_transcript/test_buffer.py` around lines 236 - 246, The
test test_replacement_to_final is supposed to verify that after marking the
buffer final, a subsequent "replacement" update makes it non-final, but the
third update currently uses mode="delta"; update the third call to buffer.update
to use mode="replacement" (the same test function test_replacement_to_final and
the buffer.update(...) invocation) and assert buffer.final is False afterward so
the final→replacement→non-final transition is covered.
plugins/wizper/tests/test_wizper_stt.py-14-16 (1)

14-16: ⚠️ Potential issue | 🟡 Minor

Participant({}, ...) diverges from convention in peer tests — use original=None instead.

Peer tests construct participants as Participant(user_id=..., id=..., original=None) (see plugins/openai/tests/test_tts.py Line 24). Here {} is passed positionally and reads as inconsistent. While original is typed as Any and technically accepts any value, aligning with peer tests improves consistency and readability.

🧵 Proposed fix
-        return Participant({}, user_id="test-user", id="test-user")
+        return Participant(original=None, user_id="test-user", id="test-user")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/wizper/tests/test_wizper_stt.py` around lines 14 - 16, The fixture
participant constructs Participant using a positional empty dict which diverges
from other tests; update the pytest fixture named participant to call
Participant with explicit keyword original=None and the same user_id/id (i.e.,
Participant(user_id="test-user", id="test-user", original=None)) so it matches
peer tests and improves readability, locating the change in the participant
fixture definition where Participant(...) is returned.
plugins/pocket/tests/test_tts.py-7-19 (1)

7-19: ⚠️ Potential issue | 🟡 Minor

Close warmed TTS fixtures during teardown.

Both fixtures warm up a pocket.TTS instance but never close it, which can leak model/runtime resources across integration tests. Use yield fixtures and close in finally.

Proposed fixture cleanup
+from collections.abc import AsyncIterator
+
 import pytest
 from vision_agents.plugins import pocket
@@
     `@pytest.fixture`
-    async def tts(self) -> pocket.TTS:
+    async def tts(self) -> AsyncIterator[pocket.TTS]:
         tts_instance = pocket.TTS()
-        await tts_instance.warmup()
-        return tts_instance
+        try:
+            await tts_instance.warmup()
+            yield tts_instance
+        finally:
+            await tts_instance.close()
 
     `@pytest.fixture`
-    async def tts_custom_voice(self) -> pocket.TTS:
+    async def tts_custom_voice(self) -> AsyncIterator[pocket.TTS]:
         tts_instance = pocket.TTS(
             voice="hf://kyutai/tts-voices/alba-mackenna/casual.wav"
         )
-        await tts_instance.warmup()
-        return tts_instance
+        try:
+            await tts_instance.warmup()
+            yield tts_instance
+        finally:
+            await tts_instance.close()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/pocket/tests/test_tts.py` around lines 7 - 19, The fixtures tts and
tts_custom_voice create and warm a pocket.TTS instance but never close it;
convert each pytest.fixture to an async yield fixture that creates tts_instance,
awaits tts_instance.warmup(), yields the instance, and in a finally block await
tts_instance.close() (or the appropriate shutdown method on pocket.TTS) to
ensure model/runtime resources are released after tests; apply the same pattern
for the custom-voice fixture that constructs pocket.TTS(voice=...).
agents-core/vision_agents/core/agents/transcript/buffer.py-19-21 (1)

19-21: ⚠️ Potential issue | 🟡 Minor

Docstring indentation for the "final" mode is off.

The "final" header is indented deeper than "replacement" and "delta" above it, which breaks the rendered docstring block structure.

Proposed fix
-        ``"final"``
-        Finalizes the current segment. If text is provided it replaces the
-        segment first. An empty-text final just marks the segment as done.
+    ``"final"``
+        Finalizes the current segment. If text is provided it replaces the
+        segment first. An empty-text final just marks the segment as done.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/transcript/buffer.py` around lines 19 -
21, Fix the docstring indentation for the `"final"` mode in
agents-core/vision_agents/core/agents/transcript/buffer.py so that the `"final"`
header aligns with the `"replacement"` and `"delta"` headers above it; adjust
the leading whitespace on the lines describing `"final"` (the triple-backtick
header and its subsequent description) to match the same indentation level as
the other mode sections so the rendered docstring block structure is correct.
plugins/deepgram/tests/test_stt.py-33-39 (1)

33-39: ⚠️ Potential issue | 🟡 Minor

Minor: transcripts[0] can IndexError on a timeout or empty-result run.

If output.collect(timeout=10.0) returns without any Transcript items (e.g., upstream hiccup, or an empty window for the eager parameterization), transcripts[0].participant raises IndexError and obscures the real failure. A tiny length guard makes the failure message point at the right thing.

🪶 A small bell-jar of a guard, lest the list stay empty and the error bloom elsewhere
         transcripts = [i for i in items if isinstance(i, Transcript)]
+        assert transcripts, "no transcripts received from Deepgram within timeout"
         finals = [t for t in transcripts if t.final]
         full_transcript = " ".join(t.text for t in finals)
         assert "forgotten treasures" in full_transcript.lower()
         assert transcripts[0].participant == participant
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/deepgram/tests/test_stt.py` around lines 33 - 39, The test can
IndexError when no Transcript items are produced; add a guard before accessing
transcripts[0] by asserting the transcripts list is non-empty (e.g., assert
transcripts and/or assert len(transcripts) > 0 with a helpful message) after
building transcripts from items returned by output.collect(timeout=10.0), then
proceed to check transcripts[0].participant and other assertions; reference the
variables/types: items, Transcript, transcripts, participant, and
output.collect(timeout=10.0).
agents-core/vision_agents/core/agents/inference/transcribing_flow.py-441-442 (1)

441-442: ⚠️ Potential issue | 🟡 Minor

Minor: misleading log_exceptions message in process_llm_output.

Copy‑paste from process_audio_input — the context manager in the LLM‑output loop logs "Error while processing audio input", which will send future debuggers looking in the wrong pipeline stage.

🪶 A label slipped off its jar — rename so the right ghost answers
         async for item in llm_output:
-            with log_exceptions(logger, "Error while processing audio input"):
+            with log_exceptions(logger, "Error while processing LLM output"):
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/inference/transcribing_flow.py` around
lines 441 - 442, In process_llm_output, the log_exceptions context around the
async for over llm_output uses the wrong message ("Error while processing audio
input"); update that call to use an accurate message like "Error while
processing LLM output" (or similar) so debugging points to the LLM-output stage;
locate the async for loop iterating over llm_output and change the
log_exceptions(logger, ...) invocation accordingly.
tests/test_agents/test_inference/test_audio.py-96-100 (1)

96-100: ⚠️ Potential issue | 🟡 Minor

Minor: test_carry_is_reset_after_final does not actually prove carry reset.

With 5 ms of carry + a 10 ms follow-up, the total is 15 ms — still under the 20 ms emit threshold, so stream.empty() holds whether the final=True reset the carry or not. To make the test actually distinguish the two states, send ≥15 ms after the final so emission only happens if carry was not cleared, and drop the stream.clear() that masks the post-final state.

🪶 A sharper edge, so the carry’s ghost cannot hide under the threshold
     async def test_carry_is_reset_after_final(self, stream: AudioOutputStream):
         stream.send_nowait(AudioOutputChunk(data=make_pcm(5), final=True))
-        stream.clear()
-        stream.send_nowait(AudioOutputChunk(data=make_pcm(10)))
+        # Drain the pad+terminal emitted by the final=True above.
+        _ = stream.peek()
+        # 15ms would emit a 20ms chunk only if 5ms of carry survived final.
+        stream.send_nowait(AudioOutputChunk(data=make_pcm(15)))
         assert stream.empty()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_agents/test_inference/test_audio.py` around lines 96 - 100, The
test test_carry_is_reset_after_final currently sends 5ms then 10ms and calls
stream.clear(), which never distinguishes carry reset because 5+10 < 20ms; to
fix, remove the stream.clear() and send at least 15ms (e.g., make_pcm(15) or any
>=15ms) in the second AudioOutputChunk after the first final=True chunk so the
total crosses the 20ms threshold only if carry was NOT cleared; update the
asserts to check emission behavior using stream.empty()/not empty accordingly
and keep references to AudioOutputChunk, make_pcm, stream.send_nowait, and
stream.empty to locate the change.
tests/test_agents/test_inference/stubs.py-123-125 (1)

123-125: ⚠️ Potential issue | 🟡 Minor

RealtimeStub.simple_response signature doesn't match the base class contract.

The method is declared as a sync def returning AsyncIterator[LLMResponseDelta | LLMResponseFinal], but the Realtime base class inherits from OmniLLM, which expects async def simple_response(...) -> LLMResponseEvent[Any]. The stub's return type annotation and async/sync mismatch make it incompatible with the interface. Consider aligning it with LLMStub (which correctly uses async def) or implementing the proper return type for consistency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_agents/test_inference/stubs.py` around lines 123 - 125, The
RealtimeStub.simple_response signature is incompatible with the OmniLLM/Realtime
contract: change the sync def simple_response to an async def and adjust its
return annotation to match the base interface (LLMResponseEvent[Any]) or mirror
the correct LLMStub implementation; ensure the method body yields/returns the
same event types expected by OmniLLM (use AsyncIterator/LLMResponseEvent[Any] as
in LLMStub) so RealtimeStub implements the same async contract as
Realtime/OmniLLM.
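A minimal sketch of the aligned shape — `FakeResponseEvent` is a hypothetical stand-in for `LLMResponseEvent[Any]`, and the echo body is illustrative, not the real stub logic:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeResponseEvent:
    """Hypothetical stand-in for LLMResponseEvent[Any]."""

    text: str


class CompliantStub:
    # An `async def` returning the event type lets callers simply
    # `await stub.simple_response(...)`, matching the base contract,
    # instead of a sync def annotated as returning an AsyncIterator.
    async def simple_response(self, prompt: str) -> FakeResponseEvent:
        return FakeResponseEvent(text=f"echo: {prompt}")


async def main() -> str:
    event = await CompliantStub().simple_response("hi")
    return event.text
```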
🧹 Nitpick comments (15)
agents-core/vision_agents/core/tts/manual_test.py (1)

15-15: Track or remove this deletion TODO.

The bare # TODO delete is ambiguous after this cleanup: remove the comment if the helper is still intended to stay, or track the deletion explicitly (issue/ticket) if it should go.

Would you like me to open an issue for deleting or relocating this manual helper?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/tts/manual_test.py` at line 15, The stray
comment "# TODO delete" in manual_test.py is ambiguous; either remove the line
if the manual helper should remain, or replace it with a clear actionable note
referencing an issue or RFC (e.g., "TODO: remove this helper when ISSUE-1234 is
resolved") so the intent is tracked; locate the comment in
agents-core/vision_agents/core/tts/manual_test.py and either delete the TODO or
update it to a specific, trackable instruction that names the helper and links
to an issue or PR.
agents-core/vision_agents/core/turn_detection/turn_detection.py (1)

27-33: Missing docstring on TurnEnded.

TurnStarted documents the event; TurnEnded doesn't. Add a short Google-style docstring for symmetry and discoverability.

✍️ Proposed tweak
 `@dataclass`
 class TurnEnded:
+    """Event emitted when a speaker ends their turn."""
+
     participant: Participant
     confidence: float
     eager: bool = False
     trailing_silence_ms: Optional[float] = None
     duration_ms: Optional[float] = None
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/turn_detection/turn_detection.py` around lines
27 - 33, Add a short Google-style docstring to the TurnEnded dataclass to match
TurnStarted: describe that the class represents an event signaling a
participant's turn has ended and briefly document fields participant
(Participant who ended the turn), confidence (float confidence in the detector),
eager (bool for eager end detection), trailing_silence_ms (Optional[float]
silence after end), and duration_ms (Optional[float] duration of the turn).
Place the docstring directly under the TurnEnded class declaration so it mirrors
the documentation style and structure used by TurnStarted.
plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py (1)

312-335: Stream emission looks correct.

await self.output.send(...) for both TurnStarted and TurnEnded is consistent with the new Stream[T] API, and the inlined payloads carry the VAD speech_probability / Vogent confidence as expected.

Minor consistency note: other plugins (e.g. elevenlabs/stt.py) use send_nowait for this same event class. Not a bug — the choice between send and send_nowait depends on whether the producer wants backpressure. Just worth deciding on a single house style in a follow-up so pipeline semantics are uniform across plugins.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py` around
lines 312 - 335, The code currently uses await self.output.send(...) when
emitting TurnStarted and TurnEnded; to match the house style used elsewhere
(e.g., elevenlabs/stt.py) replace these with self.output.send_nowait(...)
(remove await) for the TurnStarted and TurnEnded sends in
vogent_turn_detection.py (the calls inside the branches referencing
self._active_segment and when is_speech is true/false) so pipeline emission
semantics are consistent across plugins.
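The backpressure distinction can be demonstrated with `asyncio.Queue` as a stand-in for `Stream[T]` — `put`/`put_nowait` mirror `send`/`send_nowait` under the assumption that `Stream.send` awaits when the buffer is full:

```python
import asyncio


async def main() -> tuple[bool, bool]:
    q: asyncio.Queue[int] = asyncio.Queue(maxsize=1)
    q.put_nowait(1)  # fills the one-slot buffer

    # send_nowait-style: no backpressure; a full buffer raises immediately.
    try:
        q.put_nowait(2)
        overflowed = False
    except asyncio.QueueFull:
        overflowed = True

    # send-style: awaiting applies backpressure until a consumer drains.
    putter = asyncio.create_task(q.put(2))
    await asyncio.sleep(0)   # putter is now blocked on the full queue
    assert not putter.done()
    await q.get()            # consumer frees a slot...
    await putter             # ...and the blocked send completes
    return overflowed, q.qsize() == 1
```

Either choice is defensible; the point is that mixing them across plugins gives the pipeline different flow-control behavior per provider.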
plugins/elevenlabs/vision_agents/plugins/elevenlabs/stt.py (1)

277-361: Lift the hardcoded confidence=0.5 into a named sentinel.

Four emission sites (Line 278, 324, 361, and the analogous path) all use confidence=0.5 as a placeholder because ElevenLabs' VAD doesn't surface a turn-boundary probability. That's fine, but downstream consumers may gate on confidence thresholds (eager turn handling, TranscribingInferenceFlow), and a magic 0.5 can silently cross those thresholds.

Introduce a named constant so the intent — "no meaningful probability available" — is explicit, and consider a value that can't accidentally satisfy common thresholds (or NaN, which any > comparison will reject).

♻️ Proposed cleanup
 logger = logging.getLogger(__name__)
+
+# ElevenLabs' realtime API does not expose a turn-boundary probability;
+# emit a neutral sentinel so downstream threshold checks are explicit.
+_UNKNOWN_TURN_CONFIDENCE = 0.5

and at each emission site:

-            self.output.send_nowait(
-                TurnStarted(participant=participant, confidence=0.5)
-            )
+            self.output.send_nowait(
+                TurnStarted(participant=participant, confidence=_UNKNOWN_TURN_CONFIDENCE)
+            )

(apply the same substitution to the three TurnEnded(..., confidence=0.5) sites).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/elevenlabs/vision_agents/plugins/elevenlabs/stt.py` around lines 277
- 361, Replace the hardcoded 0.5 confidence literal used when emitting turn
events and transcripts with a named sentinel constant that signals "no
meaningful probability available" (e.g., NO_CONFIDENCE or NO_TURN_CONFIDENCE
defined near the top of this module), then update all emit sites referenced in
_on_committed_transcript and where TurnStarted/TurnEnded/Transcript are sent
(symbols: _on_committed_transcript, TurnStarted, TurnEnded, Transcript) to use
that constant instead of 0.5; choose a sentinel that will not accidentally
satisfy numeric thresholds (NaN or a clearly out-of-band value) and document it
in a short comment next to the constant.
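The NaN suggestion rests on a property of IEEE-754 comparisons that is easy to verify:

```python
import math

# A 0.5 placeholder can silently satisfy common gates:
assert 0.5 >= 0.5            # passes a ">= threshold" check

# NaN fails every ordered comparison, so it can never sneak past one:
assert not (math.nan > 0.5)
assert not (math.nan < 0.5)
assert not (math.nan >= 0.5)

# ...but it also defeats equality, so callers must probe with math.isnan:
assert math.nan != math.nan
assert math.isnan(math.nan)
```

The trade-off: NaN is threshold-proof but forces downstream code to handle it explicitly, whereas a named 0.5 sentinel at least documents the intent.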
plugins/inworld/vision_agents/plugins/inworld/tts.py (1)

189-192: Prefer the implicit super().close().

super(TTS, self).close() works but the two-argument form is only needed when the lexical class isn't the one you're delegating from. Inside inworld.TTS.close, plain super() is clearer and immune to future class-name renames.

♻️ Nit
-        await super(TTS, self).close()
+        await super().close()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/inworld/vision_agents/plugins/inworld/tts.py` around lines 189 - 192,
The call to super uses the explicit two-argument form inside TTS.close; update
the method to call super().close() instead (in the async def close method of
class TTS) so the delegation is clearer and resilient to renames—replace
super(TTS, self).close() with super().close() and keep the existing await
semantics and client aclose behavior.
agents-core/vision_agents/core/agents/inference/audio.py (1)

76-91: Replace hardcoded 50 with self._chunk_frac.

Line 78 hardcodes 50 — the same constant you already derive as self._chunk_frac = 1000 // self._chunk_size_ms. If _chunk_size_ms is ever tuned, this branch will silently produce mis-sized tail chunks.

♻️ Suggested fix
-            if self._carry is not None and len(self._carry.samples) > 0:
-                chunk_size = self._carry.sample_rate // 50
-                padded = next(self._carry.chunks(chunk_size, pad_last=True))
+            if self._carry is not None and len(self._carry.samples) > 0:
+                chunk_size = self._carry.sample_rate // self._chunk_frac
+                padded = next(self._carry.chunks(chunk_size, pad_last=True))
                 super().send_nowait(AudioOutputChunk(data=padded))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/inference/audio.py` around lines 76 -
91, The code in the finalization branch uses a hardcoded 50 to compute
chunk_size which can drift from the configured chunk size; update the chunk_size
calculation in the block that checks item.final and self._carry to use
self._chunk_frac (i.e. chunk_size = self._carry.sample_rate // self._chunk_frac)
instead of 50, keeping the remainder of the logic that pads and sends the last
chunk via super().send_nowait(AudioOutputChunk(...)) and then clears self._carry
and sends the final PcmData chunk; ensure integer division is used and refer to
symbols _carry, _chunk_frac, send_nowait, AudioOutputChunk, and PcmData when
making the change.
agents-core/vision_agents/core/tts/tts.py (1)

298-299: Dead try/except Exception: raise.

Both send() (Line 298–299) and send_iter() (Line 412–413) end with except Exception: raise, which is a no-op. The PR description also mentions dropping the error-event emission. Either remove the try/except entirely or restore a meaningful handler (e.g., emit TTSErrorEvent).

♻️ Suggested cleanup for `send_iter`
-        try:
+        # Synthesize audio in provider-native format
-            # Synthesize audio in provider-native format
-            response = await self.stream_audio(text, *args, **kwargs)
+        response = await self.stream_audio(text, *args, **kwargs)
             ...
-            self.events.send(
-                TTSSynthesisCompleteEvent(
+        self.events.send(
+            TTSSynthesisCompleteEvent(
                     ...
-                )
-            )
-        except Exception:
-            raise
+            )
+        )

Same applies to the identical block in send().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/tts/tts.py` around lines 298 - 299, The
try/except blocks at the end of send() and send_iter() are no-ops (they just
"except Exception: raise"); remove these empty handlers or replace them with a
meaningful handler: catch Exception as e, emit a TTSErrorEvent (or the module's
error event) with the exception details and context (e.g., input/request id),
then re-raise the exception; update both send() and send_iter() to either delete
the try/except entirely or implement the emit-and-raise pattern so errors are
logged/emitted before being propagated.
plugins/kokoro/tests/test_tts.py (1)

8-11: Narrow the import-probe exception.

An import kokoro failure manifests as ImportError (or ModuleNotFoundError); catching bare Exception will also swallow unrelated failures (e.g., bugs raised at kokoro’s import-time side effects) and silently skip the test rather than surfacing them.

As per coding guidelines: "Never write except Exception as e. Catch specific exceptions instead".

♻️ Proposed refactor
         try:
             import kokoro  # noqa: F401
-        except Exception:
+        except ImportError:
             pytest.skip("kokoro package not installed; skipping manual playback test.")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/kokoro/tests/test_tts.py` around lines 8 - 11, The try/except around
the import kokoro line is too broad and hides unrelated errors; narrow it to
only catch import failures by replacing the broad except Exception with except
(ModuleNotFoundError, ImportError) so the test is skipped only when the kokoro
package is absent and other import-time errors still surface; update the import
probe in the test_tts.py block that contains the `import kokoro` statement
accordingly.
plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py (1)

173-173: Prefer the plain super().start() form.

super(STT, self).start() is equivalent here but redundant — inside the STT class method, super() already resolves to the same MRO entry. The verbose form can also be accidentally wrong if the class is ever renamed.

Proposed fix
-        await super(STT, self).start()
+        await super().start()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py` at line 173,
Replace the verbose super call in the STT.start implementation with the plain
form: locate the STT class's start method where it calls super(STT,
self).start(), and change it to use super().start() to avoid redundancy and
potential rename bugs; ensure you only modify that call and keep the await and
surrounding logic unchanged.
agents-core/vision_agents/core/agents/transcript/buffer.py (1)

96-99: Add a short Google-style docstring to the new final property.

As per coding guidelines ("Use Google-style docstrings and keep them short"), the new public property deserves a one-liner explaining its meaning (e.g., "True once a final update has been applied and no partial is pending").

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/transcript/buffer.py` around lines 96 -
99, Add a one-line Google-style docstring to the public property final on the
Transcript buffer class to explain its meaning (e.g., "True once a final update
has been applied and no partial is pending"). Place the docstring directly under
the `@property` decorator and before the def final(self) -> bool: signature, and
ensure it references the semantics driven by self._final and
self._has_pending_partial without changing their logic.
agents-core/vision_agents/core/agents/transcript/store.py (1)

44-53: Document the new drop parameter.

As per coding guidelines ("Use Google-style docstrings and keep them short"), the public update_user_transcript / update_agent_transcript docstrings should describe the new drop flag and its effect on buffered state. It materially changes the return/lifecycle contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/transcript/store.py` around lines 44 -
53, Update the Google-style docstrings for update_user_transcript and
update_agent_transcript to document the new drop: bool parameter, describing
that when drop is True the method discards buffered transcript state for the
given participant/user instead of appending text, and how this alters the return
behavior (may return None or a TranscriptUpdate indicating a dropped/cleared
buffer). Use clear short sentences, include types (drop: bool) and the effect on
buffered state and lifecycle/return contract, and mention the mode:
TranscriptMode parameter context.
plugins/assemblyai/tests/test_assemblyai_stt.py (1)

31-36: Minor: guard transcripts[0] before indexing.

If no finals arrive, line 35 fails with a helpful message and line 36 never runs; even when a non-final transcript is produced without any finals, transcripts is non-empty while finals is empty, so the assertion ordering still holds. Still, consider asserting explicitly that transcripts is non-empty to make the empty-result failure mode obvious.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/assemblyai/tests/test_assemblyai_stt.py` around lines 31 - 36, The
test currently assumes transcripts has at least one element before checking
transcripts[0].participant; add an explicit guard to assert that the transcripts
list is non-empty (after collecting items and building transcripts) so failures
are clearer — e.g., after constructing transcripts (from stt.output.collect and
variable transcripts) assert transcripts to ensure indexing is safe before
referencing transcripts[0].participant and keep the existing finals/fulls
assertions as-is.
plugins/mistral/vision_agents/plugins/mistral/stt.py (1)

114-115: Catch specific exceptions in the receive loop.

As per coding guidelines ("Never write except Exception as e. Catch specific exceptions instead"), the broad except Exception: here swallows everything from SDK and network layers. Narrow it to the concrete exceptions raised by mistralai (e.g., RealtimeTranscriptionError-adjacent SDK errors / ConnectionClosed / asyncio.IncompleteReadError) so programming errors still surface.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@plugins/mistral/vision_agents/plugins/mistral/stt.py` around lines 114 - 115,
The receive loop in plugins/mistral/vision_agents/plugins/mistral/stt.py
currently uses a broad "except Exception:" which hides programming errors;
change that catch to explicitly handle only the concrete runtime/network/SDK
errors (for example: the SDK's realtime transcription error class like
RealtimeTranscriptionError or the Mistral SDK error type, websocket/connection
exceptions such as websockets.exceptions.ConnectionClosed or any
ConnectionClosed from the transport, and asyncio.IncompleteReadError), import
those exception classes at the top, log them with logger.exception("Error in
Mistral receive loop: %s", exc) and re-raise any other unexpected exceptions so
they surface instead of being swallowed; keep the rest of the receive loop logic
unchanged and ensure the catch references the actual exception types used by the
mistralai/websocket/asyncio stack.
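A minimal sketch of the narrow-catch pattern — the tuple below uses builtin stand-ins; the real loop would list the concrete mistralai/websocket classes (e.g. ConnectionClosed) instead:

```python
import asyncio
import logging

logger = logging.getLogger("mistral.stt")

# Stand-ins for the SDK/transport error classes (assumption for illustration).
RECOVERABLE = (ConnectionError, asyncio.IncompleteReadError, TimeoutError)


async def receive_once(recv):
    """Handle one receive step: log known transport errors, let bugs surface."""
    try:
        return await recv()
    except RECOVERABLE as exc:
        logger.exception("Error in receive loop: %s", exc)
        return None
    # Anything else (TypeError, KeyError, ...) propagates unchanged.


async def main() -> tuple[bool, bool]:
    async def flaky():
        raise ConnectionError("socket dropped")

    swallowed = await receive_once(flaky) is None  # logged, not raised

    async def buggy():
        raise TypeError("programming error")

    try:
        await receive_once(buggy)
        surfaced = False
    except TypeError:
        surfaced = True  # programming errors still reach the caller
    return swallowed, surfaced
```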
agents-core/vision_agents/core/agents/inference/transcribing_flow.py (2)

285-291: Nit: docstring typos ("receivng", "avaialble") and dangling sentence.

process_stt_output's docstring has two typos and a sentence that ends mid‑thought ("This task reads" in the outer docstring on process_turn_detection too — line 257). Quick polish.

🪶 A Plath-clean sweep of the margins
-            - The LLM turns are confirmed immediately after receivng TurnEnded
+            - The LLM turns are confirmed immediately after receiving TurnEnded
               regardless of their transcript.
-              They may be received long before the final transcript is avaialble.
+              They may be received long before the final transcript is available.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/inference/transcribing_flow.py` around
lines 285 - 291, Fix typos and incomplete sentences in the docstrings: in
process_stt_output correct "receivng" -> "receiving" and "avaialble" ->
"available", and edit the dangling sentence in process_turn_detection (the "This
task reads" line) to be a complete, clear sentence explaining what the task
reads and returns; update any nearby wording (e.g., "LLM turns are confirmed
immediately after receiving TurnEnded") for clarity while keeping the original
meaning.

443-457: Minor: empty/whitespace deltas still emit TTSInput and dirty the conversation.

When item.delta is empty (or sanitizes to ""), the code still calls conversation.upsert_message(content="") and tts_input.send(TTSInput(text="", delta=True)). Streaming TTS tokenizers tolerate this, but the conversation store is being upserted with empty content per empty delta, and any downstream consumer counting TTS inputs will see noise. A cheap skip keeps the pipeline tidy.

🪶 Empty deltas — small white rooms — best left unopened
                 if isinstance(item, LLMResponseDelta):
                     # Process the delta response
                     text = sanitize_text(item.delta or "")
+                    if not text:
+                        continue
                     logger.debug(f"🤖 [LLM response delta]: {text}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/inference/transcribing_flow.py` around
lines 443 - 457, The LLMResponseDelta handling currently upserts empty deltas
and sends empty TTS inputs; modify the block that processes LLMResponseDelta
(use sanitize_text(item.delta or "")) to skip further work when the sanitized
text is empty or only whitespace: do not call
self._conversation.upsert_message(...) nor tts_input.send(TTSInput(...)) if text
is empty. Keep references to LLMResponseDelta, sanitize_text,
self._conversation.upsert_message, tts_input.send, and TTSInput so the check is
added immediately after computing text and before any upsert/send calls.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 0b3a179f-9ab3-43e0-86c1-10f2aad2d41e

📥 Commits

Reviewing files that changed from the base of the PR and between d887697 and a8a1150.

📒 Files selected for processing (67)
  • agents-core/vision_agents/core/agents/inference/__init__.py
  • agents-core/vision_agents/core/agents/inference/audio.py
  • agents-core/vision_agents/core/agents/inference/base.py
  • agents-core/vision_agents/core/agents/inference/llm_turn.py
  • agents-core/vision_agents/core/agents/inference/transcribing_flow.py
  • agents-core/vision_agents/core/agents/transcript/buffer.py
  • agents-core/vision_agents/core/agents/transcript/store.py
  • agents-core/vision_agents/core/stt/__init__.py
  • agents-core/vision_agents/core/stt/stt.py
  • agents-core/vision_agents/core/tts/manual_test.py
  • agents-core/vision_agents/core/tts/testing.py
  • agents-core/vision_agents/core/tts/tts.py
  • agents-core/vision_agents/core/turn_detection/__init__.py
  • agents-core/vision_agents/core/turn_detection/turn_detection.py
  • agents-core/vision_agents/core/utils/exceptions.py
  • agents-core/vision_agents/core/utils/stream.py
  • agents-core/vision_agents/core/utils/text.py
  • agents-core/vision_agents/core/utils/tokenizer.py
  • conftest.py
  • docs/ai/instructions/ai-stt.md
  • plugins/assemblyai/tests/test_assemblyai_stt.py
  • plugins/assemblyai/vision_agents/plugins/assemblyai/stt.py
  • plugins/aws/tests/test_tts.py
  • plugins/cartesia/tests/test_tts.py
  • plugins/cartesia/vision_agents/plugins/cartesia/tts.py
  • plugins/deepgram/tests/test_deepgram_stt.py
  • plugins/deepgram/tests/test_deepgram_stt_close.py
  • plugins/deepgram/tests/test_stt.py
  • plugins/deepgram/tests/test_tts.py
  • plugins/deepgram/vision_agents/plugins/deepgram/deepgram_stt.py
  • plugins/deepgram/vision_agents/plugins/deepgram/tts.py
  • plugins/elevenlabs/tests/test_elevenlabs_stt.py
  • plugins/elevenlabs/tests/test_tts.py
  • plugins/elevenlabs/tests/test_tts_close.py
  • plugins/elevenlabs/vision_agents/plugins/elevenlabs/stt.py
  • plugins/fast_whisper/tests/test_fast_whisper_stt.py
  • plugins/fast_whisper/vision_agents/plugins/fast_whisper/stt.py
  • plugins/fish/tests/test_fish_stt.py
  • plugins/fish/tests/test_fish_tts.py
  • plugins/fish/tests/test_stt.py
  • plugins/fish/tests/test_tts.py
  • plugins/fish/vision_agents/plugins/fish/stt.py
  • plugins/inworld/tests/test_tts.py
  • plugins/inworld/vision_agents/plugins/inworld/tts.py
  • plugins/kokoro/tests/test_tts.py
  • plugins/kokoro/vision_agents/plugins/kokoro/tts.py
  • plugins/mistral/tests/test_mistral_stt.py
  • plugins/mistral/vision_agents/plugins/mistral/stt.py
  • plugins/openai/tests/test_tts.py
  • plugins/openai/tests/test_tts_openai.py
  • plugins/pocket/tests/test_tts.py
  • plugins/smart_turn/tests/test_smart_turn.py
  • plugins/smart_turn/vision_agents/plugins/smart_turn/smart_turn_detection.py
  • plugins/vogent/tests/test_vogent.py
  • plugins/vogent/vision_agents/plugins/vogent/vogent_turn_detection.py
  • plugins/wizper/tests/test_wizper_stt.py
  • plugins/wizper/vision_agents/plugins/wizper/stt.py
  • tests/test_agents/test_inference/__init__.py
  • tests/test_agents/test_inference/stubs.py
  • tests/test_agents/test_inference/test_audio.py
  • tests/test_agents/test_inference/test_llm_turn.py
  • tests/test_agents/test_inference/test_transcribing_flow.py
  • tests/test_agents/test_transcript/test_buffer.py
  • tests/test_agents/test_transcript/test_store.py
  • tests/test_tts_base.py
  • tests/test_utils/test_stream.py
  • tests/test_utils/test_tokenizer.py
💤 Files with no reviewable changes (8)
  • plugins/deepgram/tests/test_deepgram_stt_close.py
  • plugins/elevenlabs/tests/test_tts_close.py
  • plugins/openai/tests/test_tts_openai.py
  • plugins/deepgram/tests/test_deepgram_stt.py
  • conftest.py
  • plugins/fish/tests/test_fish_tts.py
  • plugins/fish/tests/test_fish_stt.py
  • agents-core/vision_agents/core/tts/testing.py

Comment on lines +39 to +45
    @property
    def buffered(self) -> float:
        """Return the amount of seconds of audio pending in the buffer."""
        seconds = len(self._items) * self._chunk_size_ms / 1000
        if self._carry is not None:
            seconds += self._carry.duration
        return seconds

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -nP -C3 '\b_items\b' --type=py -g '!**/tests/**'

Repository: GetStream/Vision-Agents

Length of output: 4199


🏁 Script executed:

cat -n agents-core/vision_agents/core/agents/inference/audio.py | head -100

Repository: GetStream/Vision-Agents

Length of output: 4043


🏁 Script executed:

rg -n "class AudioOutputChunk|class AudioOutputFlush|def send_nowait" agents-core/vision_agents/core/agents/inference/audio.py -A 5

Repository: GetStream/Vision-Agents

Length of output: 674


🏁 Script executed:

rg -n "AudioOutputFlush|AudioOutputChunk\(data=None" agents-core/vision_agents/core/agents/inference/ -B 2 -A 2

Repository: GetStream/Vision-Agents

Length of output: 2733


buffered miscounts control messages and empty chunks as audio duration.

self._items receives AudioOutputFlush (line 59, 99) and AudioOutputChunk(data=None) (line 56-60) control messages that carry no audio, yet each inflates the reported buffer by 20ms. This causes pacing and back-pressure logic to drift whenever flushes or sentinel chunks are queued.

Additionally, empty PcmData chunks created on final utterance (lines 82-91) are counted despite containing zero samples.

🔧 Proposed fix
     @property
     def buffered(self) -> float:
         """Return the amount of seconds of audio pending in the buffer."""
-        seconds = len(self._items) * self._chunk_size_ms / 1000
+        audio_items = sum(
+            1
+            for item in self._items
+            if isinstance(item, AudioOutputChunk) and item.data is not None
+        )
+        seconds = audio_items * self._chunk_size_ms / 1000
         if self._carry is not None:
             seconds += self._carry.duration
         return seconds

Note: This filters control messages but may still count empty PcmData chunks with final=True. Consider adding a check for non-empty sample data if precision is required.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/inference/audio.py` around lines 39 -
45, The buffered property currently multiplies len(self._items) by
_chunk_size_ms which counts control messages and empty chunks as audio; change
buffered to iterate self._items and only count items that are actual audio
frames (e.g., instances of PcmData or AudioOutputChunk whose .data is not None
and has non-zero samples/length) and sum their durations (use _chunk_size_ms per
non-empty chunk), and also only add self._carry.duration if _carry is an audio
object with positive duration; ignore AudioOutputFlush and
AudioOutputChunk(data=None) sentinel/control messages when computing seconds.

Comment on lines +100 to +119
    async def _do_llm_response(self, llm: LLM):
        with tracer.start_as_current_span("simple_response") as span:
            async for item in llm.simple_response(self.transcript, self.participant):
                await self.stream.send(item)
                if isinstance(item, LLMResponseFinal):
                    self.stream.close()

            span.set_attribute("text", self.transcript)

    async def _do_finalize(self, output: Stream[LLMResponseDelta | LLMResponseFinal]):
        """
        The LLM turn is confirmed and final, we can now read the LLM response chunks
        and send them to the output.

        Args:
            output: output stream for LLM response chunks

        """
        async for item in self.stream:
            await output.send(item)

⚠️ Potential issue | 🔴 Critical

Finalizer can hang forever — stream isn't guaranteed to close.

_do_llm_response only calls self.stream.close() inside the if isinstance(item, LLMResponseFinal) branch. Two realistic paths skip it entirely:

  1. The LLM iterator finishes without ever yielding an LLMResponseFinal (early return, empty response, connection-level end-of-stream, etc.).
  2. llm.simple_response(...) raises — the exception aborts the coroutine, the stream stays open, and the caller's _do_finalize async for iterates on an open-and-empty stream forever.

Additionally, the loop does not break after emitting LLMResponseFinal, so if the LLM ever yields anything after the final, await self.stream.send(item) will raise StreamClosed into the task.

Wrap the producer in try/finally and break on final:

🔧 Proposed fix
     async def _do_llm_response(self, llm: LLM):
         with tracer.start_as_current_span("simple_response") as span:
-            async for item in llm.simple_response(self.transcript, self.participant):
-                await self.stream.send(item)
-                if isinstance(item, LLMResponseFinal):
-                    self.stream.close()
-
-            span.set_attribute("text", self.transcript)
+            span.set_attribute("text", self.transcript)
+            try:
+                async for item in llm.simple_response(self.transcript, self.participant):
+                    await self.stream.send(item)
+                    if isinstance(item, LLMResponseFinal):
+                        break
+            finally:
+                self.stream.close()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/agents/inference/llm_turn.py` around lines 100
- 119, _do_llm_response can leave self.stream open (causing _do_finalize to
hang) and can raise StreamClosed if items are yielded after an LLMResponseFinal;
wrap the producer loop in a try/finally so self.stream.close() is always called,
and break the loop immediately after sending an LLMResponseFinal to avoid
sending further items; specifically modify _do_llm_response (which iterates over
llm.simple_response(self.transcript, self.participant) and calls
self.stream.send/close) to ensure close() in the finally block and to break when
encountering LLMResponseFinal so _do_finalize (which async for's self.stream)
cannot hang or receive items after final.

Comment on lines +17 to +48
@dataclass
class TranscriptResponse:
    confidence: float | None = None
    language: str | None = None
    processing_time_ms: float | None = None
    audio_duration_ms: float | None = None
    model_name: str | None = None
    other: dict | None = None


@dataclass
class Transcript:
    """Event emitted when a complete transcript is available."""

    participant: Participant
    mode: TranscriptMode
    text: str = ""
    confidence: float | None = None
    language: str | None = None
    processing_time_ms: float | None = None
    audio_duration_ms: float | None = None
    model_name: str | None = None
    response: TranscriptResponse = field(default_factory=TranscriptResponse)

    def __post_init__(self):
        if not self.text:
            raise ValueError("Transcript text cannot be empty")

    @property
    def final(self) -> bool:
        return self.mode == "final"


⚠️ Potential issue | 🟠 Major

Transcript duplicates metadata fields with TranscriptResponse — pick one home.

Transcript declares confidence, language, processing_time_ms, audio_duration_ms, model_name at the top level, and then embeds a TranscriptResponse that has the same five fields. Across the plugin changes in this PR (fish, elevenlabs, etc.) only response=TranscriptResponse(...) is populated; the top-level fields are left at None. Consumers now have two places to look and the two can legitimately disagree — a recipe for subtle bugs downstream in TranscribingInferenceFlow and the agent layer.

Decide which surface is canonical and drop the other (or make the top-level fields computed @property delegates to response).

Separately: text: str = "" is accepted as a default by the dataclass but then rejected by __post_init__, so the default is unusable. Either drop the default or loosen the validation to only reject when mode == "final" (partial/replacement transcripts with empty text can be legitimate in some providers).

♻️ Proposed consolidation (keep `TranscriptResponse` as the single source of truth)
 @dataclass
 class Transcript:
     """Event emitted when a complete transcript is available."""

     participant: Participant
     mode: TranscriptMode
-    text: str = ""
-    confidence: float | None = None
-    language: str | None = None
-    processing_time_ms: float | None = None
-    audio_duration_ms: float | None = None
-    model_name: str | None = None
+    text: str
     response: TranscriptResponse = field(default_factory=TranscriptResponse)

     def __post_init__(self):
-        if not self.text:
+        if self.mode == "final" and not self.text:
             raise ValueError("Transcript text cannot be empty")
+
+    @property
+    def confidence(self) -> float | None:
+        return self.response.confidence
+
+    # … similar delegates for language/processing_time_ms/audio_duration_ms/model_name
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/stt/stt.py` around lines 17 - 48, The
Transcript dataclass duplicates metadata fields that already exist on
TranscriptResponse; consolidate by making TranscriptResponse the single source
of truth: remove confidence, language, processing_time_ms, audio_duration_ms,
and model_name from Transcript and update any consumers to read these values
from transcript.response; alternatively implement property delegates on
Transcript (e.g., def confidence(self): return self.response.confidence) if
backward compatibility is required. Also fix the unusable text default by either
removing the default from Transcript.text or change __post_init__ to only raise
ValueError when mode == "final" and text is empty so partial transcripts are
allowed; update references to Transcript (and TranscribingInferenceFlow
consumers) to read metadata from response or the new properties.

Comment on lines +26 to +30
    except BaseException as exc:
        logger.exception(message)
        # Always propagate non-targeted BaseExceptions
        # (includes CancelledError, KeyboardInterrupt, SystemExit, etc.)
        if not isinstance(exc, exceptions) or reraise:

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

cat -n agents-core/vision_agents/core/utils/exceptions.py

Repository: GetStream/Vision-Agents

Length of output: 1274


Don't log non-targeted BaseExceptions.

Catching BaseException and logging all exceptions before filtering causes normal task cancellations (e.g., CancelledError, KeyboardInterrupt) to be logged with tracebacks, making them appear as application failures when they should pass through silently.

Proposed fix
-    except BaseException as exc:
+    except exceptions:
         logger.exception(message)
-        # Always propagate non-targeted BaseExceptions
-        # (includes CancelledError, KeyboardInterrupt, SystemExit, etc.)
-        if not isinstance(exc, exceptions) or reraise:
+        if reraise:
             raise

This aligns with the guideline to catch specific exceptions instead of broad exception types.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/utils/exceptions.py` around lines 26 - 30, The
current except BaseException block logs every exception before checking whether
it’s a targeted exception, causing task cancellations (e.g., CancelledError,
KeyboardInterrupt) to be logged; update the handler so you first check the
exception using the existing isinstance(exc, exceptions) or reraise condition
and immediately re-raise non-targeted exceptions, and only call
logger.exception(message) (or otherwise log) for exceptions that are in the
targeted exceptions set (i.e., when isinstance(exc, exceptions) and not
reraise); reference the except BaseException as exc clause, the
logger.exception(message) call, and the exceptions / reraise variables when
making this change.
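After the fix the helper reduces to catching only the targeted types, so non-targeted `BaseException`s (cancellation, `KeyboardInterrupt`) pass through with neither logging nor swallowing. The helper's real name and signature live in `exceptions.py`; this `log_exceptions` context manager is a hypothetical stand-in:

```python
import contextlib
import logging

logger = logging.getLogger(__name__)


@contextlib.contextmanager
def log_exceptions(message, exceptions=(Exception,), reraise=False):
    # Only targeted exception types are caught and logged; everything
    # else propagates untouched, with no traceback noise in the logs.
    try:
        yield
    except exceptions:
        logger.exception(message)
        if reraise:
            raise
```

A `CancelledError` raised inside the `with` block now unwinds exactly as if the helper were not there, which is the behavior `asyncio` expects from cooperative cancellation.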

Comment on lines +54 to +59
    def clear(self) -> None:
        """
        Empty the stream but allow the running iterators to keep going.
        """
        self._items.clear()
        self._wakeup_next_sender()

⚠️ Potential issue | 🟠 Major

Wake enough blocked senders after clear().

For bounded streams with maxsize > 1, clearing a full buffer frees multiple slots but only wakes one blocked send(). The remaining senders can stay suspended despite available capacity.

Proposed fix
     def clear(self) -> None:
         """
         Empty the stream but allow the running iterators to keep going.
         """
         self._items.clear()
-        self._wakeup_next_sender()
+        wakeups = self._maxsize if self._maxsize > 0 else len(self._senders)
+        for _ in range(wakeups):
+            self._wakeup_next_sender()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/utils/stream.py` around lines 54 - 59, The
clear() method only calls _wakeup_next_sender(), which wakes a single blocked
sender; update clear() (in stream.py) to wake as many blocked senders as there
are newly freed slots: after self._items.clear(), loop while self.maxsize == 0
or len(self._items) < self.maxsize and there are entries in self._send_waiters,
calling _wakeup_next_sender() for each iteration (or otherwise drain/wake send
waiters until capacity is filled or no waiters remain) so all eligible send()
waiters resume; use the existing _send_waiters deque/list and preserve existing
locking semantics around _wakeup_next_sender().
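The failure mode is easy to reproduce with a toy bounded stream that uses the same one-future-per-blocked-sender pattern. This is an illustrative reimplementation (bounded case only), not the repo's `Stream`:

```python
import asyncio
from collections import deque


class TinyStream:
    def __init__(self, maxsize: int):
        self._maxsize = maxsize
        self._items: deque = deque()
        self._senders: deque = deque()  # futures of blocked send() calls

    def _wakeup_next_sender(self) -> None:
        while self._senders:
            fut = self._senders.popleft()
            if not fut.done():
                fut.set_result(None)
                break

    async def send(self, item) -> None:
        while len(self._items) >= self._maxsize:
            fut = asyncio.get_running_loop().create_future()
            self._senders.append(fut)
            await fut
        self._items.append(item)

    def clear(self) -> None:
        self._items.clear()
        # Wake one blocked sender per freed slot, not just one overall.
        # (With only a single _wakeup_next_sender() call, every blocked
        # sender beyond the first would stay suspended forever.)
        for _ in range(self._maxsize):
            self._wakeup_next_sender()
```

With `maxsize=2`, two blocked senders, and the single-wakeup version of `clear()`, the second sender never resumes: the first sender's `send()` fills a slot without freeing one, so no further wakeup is ever triggered.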

Comment on lines +42 to +48
    def flush(self) -> str:
        """
        Empty the buffer and return the accumulated text.
        """
        text = self._buffer.strip()
        self._buffer = ""
        return text

⚠️ Potential issue | 🟠 Major

Sanitize the final flushed TTS chunk too.

flush() emits the last buffered text to TTS, so markdown in an unterminated final response currently bypasses sanitize_text() and can be spoken literally.

Proposed fix
     def flush(self) -> str:
         """
         Empty the buffer and return the accumulated text.
         """
-        text = self._buffer.strip()
+        text = sanitize_text(self._buffer.strip())
         self._buffer = ""
         return text
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agents-core/vision_agents/core/utils/tokenizer.py` around lines 42 - 48, The
flush() method returns the final buffered string without sanitization, allowing
markdown/unsafe content to be emitted; update flush in tokenizer.py to call
sanitize_text on the accumulated text (e.g., text =
sanitize_text(self._buffer.strip())) before clearing self._buffer and returning
it, referencing the existing sanitize_text function and the flush method/_buffer
field to locate the change.

Additional comment threads (collapsed):
  • plugins/aws/tests/test_tts.py
  • plugins/fast_whisper/vision_agents/plugins/fast_whisper/stt.py
  • plugins/wizper/vision_agents/plugins/wizper/stt.py
  • tests/test_agents/test_inference/test_transcribing_flow.py
@dangusev dangusev force-pushed the chore/audio-processing-v2 branch from a8a1150 to e00ebff Compare April 21, 2026 12:51