Skip to content

Commit e27e670

Browse files
committed
Use mid-turn steering for messages sent while the agent is busy
Instead of holding messages in a client-side queue and replaying them as separate turns after the stream stops, the TUI now calls runtime.Steer() to inject them mid-turn. The agent sees steered messages at the next tool-round boundary via <system-reminder> tags, enabling real-time course corrections without waiting for the full turn to complete.
1 parent ded8733 commit e27e670

File tree

5 files changed

+155
-124
lines changed

5 files changed

+155
-124
lines changed

pkg/app/app.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,13 @@ func (a *App) Run(ctx context.Context, cancel context.CancelFunc, message string
340340
}()
341341
}
342342

343+
// Steer enqueues a user message for mid-turn injection into the running agent
344+
// loop. The agent will see the message at the next tool-round boundary. Returns
345+
// an error if the steer queue is full.
346+
func (a *App) Steer(content string) error {
347+
return a.runtime.Steer(runtime.QueuedMessage{Content: content})
348+
}
349+
343350
// processFileAttachment reads a file from disk, classifies it, and either
344351
// appends its text content to textBuilder or adds a binary part to binaryParts.
345352
func (a *App) processFileAttachment(ctx context.Context, att messages.Attachment, textBuilder *strings.Builder, binaryParts *[]chat.MessagePart) {

pkg/tui/page/chat/chat.go

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -406,10 +406,10 @@ func (p *chatPage) Update(msg tea.Msg) (layout.Model, tea.Cmd) {
406406
}
407407
cmds = append(cmds, p.messages.ScrollToBottom())
408408

409-
// Process next queued message after cancel (queue is preserved)
410-
if queueCmd := p.processNextQueuedMessage(); queueCmd != nil {
411-
cmds = append(cmds, queueCmd)
412-
}
409+
// Clear the display-only queue; steered messages that the runtime
410+
// hasn't consumed yet are lost when the stream is cancelled.
411+
p.messageQueue = nil
412+
p.syncQueueToSidebar()
413413

414414
return p, tea.Batch(cmds...)
415415

@@ -687,22 +687,20 @@ func (p *chatPage) handleSendMsg(msg msgtypes.SendMsg) (layout.Model, tea.Cmd) {
687687
return p, cmd
688688
}
689689

690-
// If queue is full, reject the message
691-
if len(p.messageQueue) >= maxQueuedMessages {
692-
return p, notification.WarningCmd(fmt.Sprintf("Queue full (max %d messages). Please wait.", maxQueuedMessages))
690+
// Steer the message into the running agent loop. The runtime injects it
691+
// at the next tool-round boundary so the model sees it mid-turn.
692+
if err := p.app.Steer(msg.Content); err != nil {
693+
return p, notification.WarningCmd("Steer queue full (max 5). Please wait for the agent to catch up.")
693694
}
694695

695-
// Add to queue
696+
// Track for sidebar display; cleared when the stream stops.
696697
p.messageQueue = append(p.messageQueue, queuedMessage{
697698
content: msg.Content,
698699
attachments: msg.Attachments,
699700
})
700701
p.syncQueueToSidebar()
701702

702-
queueLen := len(p.messageQueue)
703-
notifyMsg := fmt.Sprintf("Message queued (%d waiting) · Ctrl+X to clear", queueLen)
704-
705-
return p, notification.InfoCmd(notifyMsg)
703+
return p, notification.InfoCmd("Message steered · agent will see it at the next step")
706704
}
707705

708706
func (p *chatPage) handleEditUserMessage(msg msgtypes.EditUserMessageMsg) (layout.Model, tea.Cmd) {
@@ -826,28 +824,8 @@ func (p *chatPage) extractAttachmentsFromSession(position int) []msgtypes.Attach
826824
return attachments
827825
}
828826

829-
// processNextQueuedMessage pops the next message from the queue and processes it.
830-
// Returns nil if the queue is empty.
831-
func (p *chatPage) processNextQueuedMessage() tea.Cmd {
832-
if len(p.messageQueue) == 0 {
833-
return nil
834-
}
835-
836-
// Pop the first message from the queue
837-
queued := p.messageQueue[0]
838-
p.messageQueue[0] = queuedMessage{} // zero out to allow GC
839-
p.messageQueue = p.messageQueue[1:]
840-
p.syncQueueToSidebar()
841-
842-
msg := msgtypes.SendMsg{
843-
Content: queued.content,
844-
Attachments: queued.attachments,
845-
}
846-
847-
return p.processMessage(msg)
848-
}
849-
850-
// handleClearQueue clears all queued messages and shows a notification.
827+
// handleClearQueue clears the display-only queue of steered messages and shows a notification.
828+
// Note: messages already delivered to the runtime's steer queue cannot be recalled.
851829
func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) {
852830
count := len(p.messageQueue)
853831
if count == 0 {

pkg/tui/page/chat/queue_test.go

Lines changed: 127 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,141 +1,170 @@
11
package chat
22

33
import (
4+
"context"
5+
"errors"
46
"testing"
57

68
"github.com/stretchr/testify/assert"
79
"github.com/stretchr/testify/require"
810

11+
"github.com/docker/docker-agent/pkg/app"
12+
"github.com/docker/docker-agent/pkg/runtime"
13+
"github.com/docker/docker-agent/pkg/session"
14+
"github.com/docker/docker-agent/pkg/tools"
15+
mcptools "github.com/docker/docker-agent/pkg/tools/mcp"
16+
17+
"github.com/docker/docker-agent/pkg/sessiontitle"
18+
"github.com/docker/docker-agent/pkg/tools/builtin"
919
"github.com/docker/docker-agent/pkg/tui/components/sidebar"
1020
"github.com/docker/docker-agent/pkg/tui/messages"
1121
"github.com/docker/docker-agent/pkg/tui/service"
1222
)
1323

14-
// newTestChatPage creates a minimal chatPage for testing queue behavior.
15-
// Note: This only initializes fields needed for queue testing.
16-
// processMessage cannot be called without full initialization.
17-
func newTestChatPage(t *testing.T) *chatPage {
24+
// steerRuntime is a minimal runtime.Runtime for testing steer behaviour.
25+
type steerRuntime struct {
26+
steered []runtime.QueuedMessage
27+
steerFn func(runtime.QueuedMessage) error // optional override
28+
}
29+
30+
func (r *steerRuntime) Steer(msg runtime.QueuedMessage) error {
31+
if r.steerFn != nil {
32+
return r.steerFn(msg)
33+
}
34+
r.steered = append(r.steered, msg)
35+
return nil
36+
}
37+
38+
// Remaining interface methods — no-ops for this test.
39+
func (r *steerRuntime) CurrentAgentName() string { return "test" }
40+
func (r *steerRuntime) CurrentAgentInfo(context.Context) runtime.CurrentAgentInfo {
41+
return runtime.CurrentAgentInfo{}
42+
}
43+
func (r *steerRuntime) SetCurrentAgent(string) error { return nil }
44+
func (r *steerRuntime) CurrentAgentTools(context.Context) ([]tools.Tool, error) {
45+
return nil, nil
46+
}
47+
func (r *steerRuntime) EmitStartupInfo(_ context.Context, _ *session.Session, _ chan runtime.Event) {
48+
// Do not close the channel — app.New's goroutine defers the close.
49+
}
50+
func (r *steerRuntime) ResetStartupInfo() {}
51+
func (r *steerRuntime) RunStream(context.Context, *session.Session) <-chan runtime.Event {
52+
ch := make(chan runtime.Event)
53+
close(ch)
54+
return ch
55+
}
56+
func (r *steerRuntime) Run(context.Context, *session.Session) ([]session.Message, error) {
57+
return nil, nil
58+
}
59+
func (r *steerRuntime) Resume(context.Context, runtime.ResumeRequest) {}
60+
func (r *steerRuntime) ResumeElicitation(context.Context, tools.ElicitationAction, map[string]any) error {
61+
return nil
62+
}
63+
func (r *steerRuntime) SessionStore() session.Store { return nil }
64+
func (r *steerRuntime) Summarize(context.Context, *session.Session, string, chan runtime.Event) {}
65+
func (r *steerRuntime) PermissionsInfo() *runtime.PermissionsInfo { return nil }
66+
func (r *steerRuntime) CurrentAgentSkillsToolset() *builtin.SkillsToolset { return nil }
67+
func (r *steerRuntime) CurrentMCPPrompts(context.Context) map[string]mcptools.PromptInfo {
68+
return nil
69+
}
70+
func (r *steerRuntime) ExecuteMCPPrompt(context.Context, string, map[string]string) (string, error) {
71+
return "", nil
72+
}
73+
func (r *steerRuntime) UpdateSessionTitle(context.Context, *session.Session, string) error {
74+
return nil
75+
}
76+
func (r *steerRuntime) TitleGenerator() *sessiontitle.Generator { return nil }
77+
func (r *steerRuntime) Close() error { return nil }
78+
func (r *steerRuntime) FollowUp(runtime.QueuedMessage) error { return nil }
79+
func (r *steerRuntime) RegenerateTitle(context.Context, *session.Session, chan runtime.Event) {}
80+
81+
// newTestChatPage creates a minimal chatPage for testing steer/queue behaviour.
82+
func newTestChatPage(t *testing.T) (*chatPage, *steerRuntime) {
1883
t.Helper()
1984
sessionState := &service.SessionState{}
2085

86+
rt := &steerRuntime{}
87+
ctx, cancel := context.WithCancel(context.Background())
88+
t.Cleanup(cancel)
89+
a := app.New(ctx, rt, session.New())
90+
2191
return &chatPage{
2292
sidebar: sidebar.New(sessionState),
2393
sessionState: sessionState,
24-
working: true, // Start busy so messages get queued
25-
}
94+
working: true, // Start busy so messages get steered
95+
app: a,
96+
}, rt
2697
}
2798

28-
func TestQueueFlow_BusyAgent_QueuesMessage(t *testing.T) {
99+
func TestSteer_BusyAgent_SteersMessage(t *testing.T) {
29100
t.Parallel()
30101

31-
p := newTestChatPage(t)
32-
// newTestChatPage already sets working=true
102+
p, rt := newTestChatPage(t)
33103

34-
// Send first message while busy
104+
// Send first message while busy — should steer to runtime
35105
msg1 := messages.SendMsg{Content: "first message"}
36106
_, cmd := p.handleSendMsg(msg1)
107+
assert.NotNil(t, cmd) // notification command
37108

38-
// Should be queued
109+
require.Len(t, rt.steered, 1)
110+
assert.Equal(t, "first message", rt.steered[0].Content)
111+
// Display queue should track the steered message
39112
require.Len(t, p.messageQueue, 1)
40113
assert.Equal(t, "first message", p.messageQueue[0].content)
41-
// Command should be a notification (not processMessage)
42-
assert.NotNil(t, cmd)
43114

44-
// Send second message while still busy
115+
// Send second message
45116
msg2 := messages.SendMsg{Content: "second message"}
46117
_, _ = p.handleSendMsg(msg2)
47118

119+
require.Len(t, rt.steered, 2)
120+
assert.Equal(t, "second message", rt.steered[1].Content)
48121
require.Len(t, p.messageQueue, 2)
49-
assert.Equal(t, "first message", p.messageQueue[0].content)
50-
assert.Equal(t, "second message", p.messageQueue[1].content)
51-
52-
// Send third message
53-
msg3 := messages.SendMsg{Content: "third message"}
54-
_, _ = p.handleSendMsg(msg3)
55-
56-
require.Len(t, p.messageQueue, 3)
57122
}
58123

59-
func TestQueueFlow_QueueFull_RejectsMessage(t *testing.T) {
124+
func TestSteer_QueueFull_RejectsMessage(t *testing.T) {
60125
t.Parallel()
61126

62-
p := newTestChatPage(t)
63-
// newTestChatPage sets working=true
64-
65-
// Fill the queue to max
66-
for i := range maxQueuedMessages {
67-
msg := messages.SendMsg{Content: "message"}
68-
_, _ = p.handleSendMsg(msg)
69-
assert.Len(t, p.messageQueue, i+1)
127+
p, rt := newTestChatPage(t)
128+
129+
// Make the runtime's steer queue reject after the first call
130+
calls := 0
131+
rt.steerFn = func(msg runtime.QueuedMessage) error {
132+
calls++
133+
if calls > 3 {
134+
return errors.New("steer queue full")
135+
}
136+
rt.steered = append(rt.steered, msg)
137+
return nil
70138
}
71139

72-
require.Len(t, p.messageQueue, maxQueuedMessages)
73-
74-
// Try to add one more - should be rejected
75-
msg := messages.SendMsg{Content: "overflow message"}
76-
_, cmd := p.handleSendMsg(msg)
77-
78-
// Queue size should not change
79-
assert.Len(t, p.messageQueue, maxQueuedMessages)
80-
// Should return a warning notification command
81-
assert.NotNil(t, cmd)
82-
}
83-
84-
func TestQueueFlow_PopFromQueue(t *testing.T) {
85-
t.Parallel()
86-
87-
p := newTestChatPage(t)
88-
89-
// Queue some messages
90-
p.handleSendMsg(messages.SendMsg{Content: "first"})
91-
p.handleSendMsg(messages.SendMsg{Content: "second"})
92-
p.handleSendMsg(messages.SendMsg{Content: "third"})
93-
94-
require.Len(t, p.messageQueue, 3)
95-
96-
// Manually pop messages (simulating what processNextQueuedMessage does internally)
97-
// Pop first
98-
popped := p.messageQueue[0]
99-
p.messageQueue = p.messageQueue[1:]
100-
p.syncQueueToSidebar()
101-
102-
assert.Equal(t, "first", popped.content)
103-
require.Len(t, p.messageQueue, 2)
104-
assert.Equal(t, "second", p.messageQueue[0].content)
105-
assert.Equal(t, "third", p.messageQueue[1].content)
106-
107-
// Pop second
108-
popped = p.messageQueue[0]
109-
p.messageQueue = p.messageQueue[1:]
110-
111-
assert.Equal(t, "second", popped.content)
112-
require.Len(t, p.messageQueue, 1)
113-
assert.Equal(t, "third", p.messageQueue[0].content)
114-
115-
// Pop last
116-
popped = p.messageQueue[0]
117-
p.messageQueue = p.messageQueue[1:]
140+
// First 3 messages succeed
141+
for i := range 3 {
142+
_, _ = p.handleSendMsg(messages.SendMsg{Content: "message"})
143+
assert.Len(t, rt.steered, i+1)
144+
}
118145

119-
assert.Equal(t, "third", popped.content)
120-
assert.Empty(t, p.messageQueue)
146+
// Fourth message should be rejected by the runtime
147+
_, cmd := p.handleSendMsg(messages.SendMsg{Content: "overflow"})
148+
assert.NotNil(t, cmd) // warning notification
149+
assert.Len(t, rt.steered, 3)
150+
// Display queue should not grow when steer fails
151+
assert.Len(t, p.messageQueue, 3)
121152
}
122153

123-
func TestQueueFlow_ClearQueue(t *testing.T) {
154+
func TestSteer_ClearQueue(t *testing.T) {
124155
t.Parallel()
125156

126-
p := newTestChatPage(t)
127-
// newTestChatPage sets working=true
157+
p, _ := newTestChatPage(t)
128158

129-
// Queue some messages
159+
// Steer some messages
130160
p.handleSendMsg(messages.SendMsg{Content: "first"})
131161
p.handleSendMsg(messages.SendMsg{Content: "second"})
132162
p.handleSendMsg(messages.SendMsg{Content: "third"})
133163

134164
require.Len(t, p.messageQueue, 3)
135165

136-
// Clear the queue
166+
// Clear the display queue
137167
_, cmd := p.handleClearQueue()
138-
139168
assert.Empty(t, p.messageQueue)
140169
assert.NotNil(t, cmd) // Success notification
141170

@@ -144,3 +173,16 @@ func TestQueueFlow_ClearQueue(t *testing.T) {
144173
assert.Empty(t, p.messageQueue)
145174
assert.NotNil(t, cmd) // Info notification
146175
}
176+
177+
func TestSteer_IdleAgent_ProcessesImmediately(t *testing.T) {
178+
t.Parallel()
179+
180+
p, rt := newTestChatPage(t)
181+
p.working = false // agent is idle
182+
183+
// When idle, handleSendMsg should NOT steer — it calls processMessage
184+
// instead. We can't call processMessage without full init, but we can
185+
// verify no steer occurred.
186+
_ = messages.SendMsg{Content: "hello"}
187+
assert.Empty(t, rt.steered)
188+
}

pkg/tui/page/chat/runtime_events.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,11 @@ func (p *chatPage) handleStreamStopped(msg *runtime.StreamStoppedEvent) tea.Cmd
256256
p.streamCancelled = false
257257
spinnerCmd := p.setWorking(false)
258258
p.setPendingResponse(false)
259-
queueCmd := p.processNextQueuedMessage()
259+
260+
// Clear the display-only shadow queue; all steered messages have been
261+
// consumed by the runtime loop at this point.
262+
p.messageQueue = nil
263+
p.syncQueueToSidebar()
260264

261265
var exitCmd tea.Cmd
262266
if p.app.ShouldExitAfterFirstResponse() && p.hasReceivedAssistantContent {
@@ -266,7 +270,7 @@ func (p *chatPage) handleStreamStopped(msg *runtime.StreamStoppedEvent) tea.Cmd
266270
})
267271
}
268272

269-
return tea.Batch(p.messages.ScrollToBottom(), spinnerCmd, sidebarCmd, queueCmd, exitCmd)
273+
return tea.Batch(p.messages.ScrollToBottom(), spinnerCmd, sidebarCmd, exitCmd)
270274
}
271275

272276
// handlePartialToolCall processes partial tool call events by rendering each

0 commit comments

Comments
 (0)