Skip to content

Commit a85ac50

Browse files
committed
Use mid-turn steering for messages sent while the agent is busy
Instead of holding messages in a client-side queue and replaying them as separate turns after the stream stops, the TUI now calls runtime.Steer() to inject them mid-turn. The agent sees steered messages at the next tool-round boundary via <system-reminder> tags, enabling real-time course corrections without waiting for the full turn to complete.
1 parent ded8733 commit a85ac50

File tree

5 files changed

+172
-124
lines changed

5 files changed

+172
-124
lines changed

pkg/app/app.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,13 @@ func (a *App) Run(ctx context.Context, cancel context.CancelFunc, message string
340340
}()
341341
}
342342

343+
// Steer enqueues a user message for mid-turn injection into the running agent
344+
// loop. The agent will see the message at the next tool-round boundary. Returns
345+
// an error if the steer queue is full.
346+
func (a *App) Steer(content string) error {
347+
return a.runtime.Steer(runtime.QueuedMessage{Content: content})
348+
}
349+
343350
// processFileAttachment reads a file from disk, classifies it, and either
344351
// appends its text content to textBuilder or adds a binary part to binaryParts.
345352
func (a *App) processFileAttachment(ctx context.Context, att messages.Attachment, textBuilder *strings.Builder, binaryParts *[]chat.MessagePart) {

pkg/tui/page/chat/chat.go

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,6 @@ type queuedMessage struct {
125125
attachments []msgtypes.Attachment
126126
}
127127

128-
// maxQueuedMessages is the maximum number of messages that can be queued
129-
const maxQueuedMessages = 5
130-
131128
// chatPage implements Page
132129
type chatPage struct {
133130
width, height int
@@ -406,10 +403,10 @@ func (p *chatPage) Update(msg tea.Msg) (layout.Model, tea.Cmd) {
406403
}
407404
cmds = append(cmds, p.messages.ScrollToBottom())
408405

409-
// Process next queued message after cancel (queue is preserved)
410-
if queueCmd := p.processNextQueuedMessage(); queueCmd != nil {
411-
cmds = append(cmds, queueCmd)
412-
}
406+
// Clear the display-only queue; steered messages that the runtime
407+
// hasn't consumed yet are lost when the stream is cancelled.
408+
p.messageQueue = nil
409+
p.syncQueueToSidebar()
413410

414411
return p, tea.Batch(cmds...)
415412

@@ -687,22 +684,20 @@ func (p *chatPage) handleSendMsg(msg msgtypes.SendMsg) (layout.Model, tea.Cmd) {
687684
return p, cmd
688685
}
689686

690-
// If queue is full, reject the message
691-
if len(p.messageQueue) >= maxQueuedMessages {
692-
return p, notification.WarningCmd(fmt.Sprintf("Queue full (max %d messages). Please wait.", maxQueuedMessages))
687+
// Steer the message into the running agent loop. The runtime injects it
688+
// at the next tool-round boundary so the model sees it mid-turn.
689+
if err := p.app.Steer(msg.Content); err != nil {
690+
return p, notification.WarningCmd("Steer queue full (max 5). Please wait for the agent to catch up.")
693691
}
694692

695-
// Add to queue
693+
// Track for sidebar display; cleared when the stream stops.
696694
p.messageQueue = append(p.messageQueue, queuedMessage{
697695
content: msg.Content,
698696
attachments: msg.Attachments,
699697
})
700698
p.syncQueueToSidebar()
701699

702-
queueLen := len(p.messageQueue)
703-
notifyMsg := fmt.Sprintf("Message queued (%d waiting) · Ctrl+X to clear", queueLen)
704-
705-
return p, notification.InfoCmd(notifyMsg)
700+
return p, notification.InfoCmd("Message steered · agent will see it at the next step")
706701
}
707702

708703
func (p *chatPage) handleEditUserMessage(msg msgtypes.EditUserMessageMsg) (layout.Model, tea.Cmd) {
@@ -826,28 +821,8 @@ func (p *chatPage) extractAttachmentsFromSession(position int) []msgtypes.Attach
826821
return attachments
827822
}
828823

829-
// processNextQueuedMessage pops the next message from the queue and processes it.
830-
// Returns nil if the queue is empty.
831-
func (p *chatPage) processNextQueuedMessage() tea.Cmd {
832-
if len(p.messageQueue) == 0 {
833-
return nil
834-
}
835-
836-
// Pop the first message from the queue
837-
queued := p.messageQueue[0]
838-
p.messageQueue[0] = queuedMessage{} // zero out to allow GC
839-
p.messageQueue = p.messageQueue[1:]
840-
p.syncQueueToSidebar()
841-
842-
msg := msgtypes.SendMsg{
843-
Content: queued.content,
844-
Attachments: queued.attachments,
845-
}
846-
847-
return p.processMessage(msg)
848-
}
849-
850-
// handleClearQueue clears all queued messages and shows a notification.
824+
// handleClearQueue clears the display-only queue of steered messages and shows a notification.
825+
// Note: messages already delivered to the runtime's steer queue cannot be recalled.
851826
func (p *chatPage) handleClearQueue() (layout.Model, tea.Cmd) {
852827
count := len(p.messageQueue)
853828
if count == 0 {

pkg/tui/page/chat/queue_test.go

Lines changed: 144 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1,141 +1,190 @@
11
package chat
22

33
import (
4+
"context"
5+
"errors"
46
"testing"
57

68
"github.com/stretchr/testify/assert"
79
"github.com/stretchr/testify/require"
810

11+
"github.com/docker/docker-agent/pkg/app"
12+
"github.com/docker/docker-agent/pkg/runtime"
13+
"github.com/docker/docker-agent/pkg/session"
14+
"github.com/docker/docker-agent/pkg/sessiontitle"
15+
"github.com/docker/docker-agent/pkg/tools"
16+
"github.com/docker/docker-agent/pkg/tools/builtin"
17+
mcptools "github.com/docker/docker-agent/pkg/tools/mcp"
918
"github.com/docker/docker-agent/pkg/tui/components/sidebar"
1019
"github.com/docker/docker-agent/pkg/tui/messages"
1120
"github.com/docker/docker-agent/pkg/tui/service"
1221
)
1322

14-
// newTestChatPage creates a minimal chatPage for testing queue behavior.
15-
// Note: This only initializes fields needed for queue testing.
16-
// processMessage cannot be called without full initialization.
17-
func newTestChatPage(t *testing.T) *chatPage {
18-
t.Helper()
19-
sessionState := &service.SessionState{}
23+
// steerRuntime is a minimal runtime.Runtime for testing steer behaviour.
24+
type steerRuntime struct {
25+
steered []runtime.QueuedMessage
26+
steerFn func(runtime.QueuedMessage) error // optional override
27+
}
2028

21-
return &chatPage{
22-
sidebar: sidebar.New(sessionState),
23-
sessionState: sessionState,
24-
working: true, // Start busy so messages get queued
29+
func (r *steerRuntime) Steer(msg runtime.QueuedMessage) error {
30+
if r.steerFn != nil {
31+
return r.steerFn(msg)
2532
}
33+
r.steered = append(r.steered, msg)
34+
return nil
2635
}
2736

28-
func TestQueueFlow_BusyAgent_QueuesMessage(t *testing.T) {
29-
t.Parallel()
37+
// Remaining interface methods — no-ops for this test.
3038

31-
p := newTestChatPage(t)
32-
// newTestChatPage already sets working=true
39+
func (r *steerRuntime) CurrentAgentName() string { return "test" }
3340

34-
// Send first message while busy
35-
msg1 := messages.SendMsg{Content: "first message"}
36-
_, cmd := p.handleSendMsg(msg1)
41+
func (r *steerRuntime) CurrentAgentInfo(context.Context) runtime.CurrentAgentInfo {
42+
return runtime.CurrentAgentInfo{}
43+
}
3744

38-
// Should be queued
39-
require.Len(t, p.messageQueue, 1)
40-
assert.Equal(t, "first message", p.messageQueue[0].content)
41-
// Command should be a notification (not processMessage)
42-
assert.NotNil(t, cmd)
45+
func (r *steerRuntime) SetCurrentAgent(string) error { return nil }
4346

44-
// Send second message while still busy
45-
msg2 := messages.SendMsg{Content: "second message"}
46-
_, _ = p.handleSendMsg(msg2)
47+
func (r *steerRuntime) CurrentAgentTools(context.Context) ([]tools.Tool, error) {
48+
return nil, nil
49+
}
4750

48-
require.Len(t, p.messageQueue, 2)
49-
assert.Equal(t, "first message", p.messageQueue[0].content)
50-
assert.Equal(t, "second message", p.messageQueue[1].content)
51+
func (r *steerRuntime) EmitStartupInfo(_ context.Context, _ *session.Session, _ chan runtime.Event) {
52+
// Do not close the channel — app.New's goroutine defers the close.
53+
}
5154

52-
// Send third message
53-
msg3 := messages.SendMsg{Content: "third message"}
54-
_, _ = p.handleSendMsg(msg3)
55+
func (r *steerRuntime) ResetStartupInfo() {}
5556

56-
require.Len(t, p.messageQueue, 3)
57+
func (r *steerRuntime) RunStream(context.Context, *session.Session) <-chan runtime.Event {
58+
ch := make(chan runtime.Event)
59+
close(ch)
60+
return ch
5761
}
5862

59-
func TestQueueFlow_QueueFull_RejectsMessage(t *testing.T) {
60-
t.Parallel()
63+
func (r *steerRuntime) Run(context.Context, *session.Session) ([]session.Message, error) {
64+
return nil, nil
65+
}
6166

62-
p := newTestChatPage(t)
63-
// newTestChatPage sets working=true
67+
func (r *steerRuntime) Resume(context.Context, runtime.ResumeRequest) {}
6468

65-
// Fill the queue to max
66-
for i := range maxQueuedMessages {
67-
msg := messages.SendMsg{Content: "message"}
68-
_, _ = p.handleSendMsg(msg)
69-
assert.Len(t, p.messageQueue, i+1)
70-
}
69+
func (r *steerRuntime) ResumeElicitation(context.Context, tools.ElicitationAction, map[string]any) error {
70+
return nil
71+
}
72+
73+
func (r *steerRuntime) SessionStore() session.Store { return nil }
74+
75+
func (r *steerRuntime) Summarize(context.Context, *session.Session, string, chan runtime.Event) {}
76+
77+
func (r *steerRuntime) PermissionsInfo() *runtime.PermissionsInfo { return nil }
78+
79+
func (r *steerRuntime) CurrentAgentSkillsToolset() *builtin.SkillsToolset { return nil }
80+
81+
func (r *steerRuntime) CurrentMCPPrompts(context.Context) map[string]mcptools.PromptInfo {
82+
return nil
83+
}
84+
85+
func (r *steerRuntime) ExecuteMCPPrompt(context.Context, string, map[string]string) (string, error) {
86+
return "", nil
87+
}
88+
89+
func (r *steerRuntime) UpdateSessionTitle(context.Context, *session.Session, string) error {
90+
return nil
91+
}
92+
93+
func (r *steerRuntime) TitleGenerator() *sessiontitle.Generator { return nil }
94+
95+
func (r *steerRuntime) Close() error { return nil }
96+
97+
func (r *steerRuntime) FollowUp(runtime.QueuedMessage) error { return nil }
7198

72-
require.Len(t, p.messageQueue, maxQueuedMessages)
99+
func (r *steerRuntime) RegenerateTitle(context.Context, *session.Session, chan runtime.Event) {}
73100

74-
// Try to add one more - should be rejected
75-
msg := messages.SendMsg{Content: "overflow message"}
76-
_, cmd := p.handleSendMsg(msg)
101+
// newTestChatPage creates a minimal chatPage for testing steer/queue behaviour.
102+
func newTestChatPage(t *testing.T) (*chatPage, *steerRuntime) {
103+
t.Helper()
104+
sessionState := &service.SessionState{}
77105

78-
// Queue size should not change
79-
assert.Len(t, p.messageQueue, maxQueuedMessages)
80-
// Should return a warning notification command
81-
assert.NotNil(t, cmd)
106+
rt := &steerRuntime{}
107+
ctx, cancel := context.WithCancel(t.Context())
108+
t.Cleanup(cancel)
109+
a := app.New(ctx, rt, session.New())
110+
111+
return &chatPage{
112+
sidebar: sidebar.New(sessionState),
113+
sessionState: sessionState,
114+
working: true, // Start busy so messages get steered
115+
app: a,
116+
}, rt
82117
}
83118

84-
func TestQueueFlow_PopFromQueue(t *testing.T) {
119+
func TestSteer_BusyAgent_SteersMessage(t *testing.T) {
85120
t.Parallel()
86121

87-
p := newTestChatPage(t)
122+
p, rt := newTestChatPage(t)
88123

89-
// Queue some messages
90-
p.handleSendMsg(messages.SendMsg{Content: "first"})
91-
p.handleSendMsg(messages.SendMsg{Content: "second"})
92-
p.handleSendMsg(messages.SendMsg{Content: "third"})
124+
// Send first message while busy — should steer to runtime
125+
msg1 := messages.SendMsg{Content: "first message"}
126+
_, cmd := p.handleSendMsg(msg1)
127+
assert.NotNil(t, cmd) // notification command
93128

94-
require.Len(t, p.messageQueue, 3)
129+
require.Len(t, rt.steered, 1)
130+
assert.Equal(t, "first message", rt.steered[0].Content)
131+
// Display queue should track the steered message
132+
require.Len(t, p.messageQueue, 1)
133+
assert.Equal(t, "first message", p.messageQueue[0].content)
95134

96-
// Manually pop messages (simulating what processNextQueuedMessage does internally)
97-
// Pop first
98-
popped := p.messageQueue[0]
99-
p.messageQueue = p.messageQueue[1:]
100-
p.syncQueueToSidebar()
135+
// Send second message
136+
msg2 := messages.SendMsg{Content: "second message"}
137+
_, _ = p.handleSendMsg(msg2)
101138

102-
assert.Equal(t, "first", popped.content)
139+
require.Len(t, rt.steered, 2)
140+
assert.Equal(t, "second message", rt.steered[1].Content)
103141
require.Len(t, p.messageQueue, 2)
104-
assert.Equal(t, "second", p.messageQueue[0].content)
105-
assert.Equal(t, "third", p.messageQueue[1].content)
142+
}
106143

107-
// Pop second
108-
popped = p.messageQueue[0]
109-
p.messageQueue = p.messageQueue[1:]
144+
func TestSteer_QueueFull_RejectsMessage(t *testing.T) {
145+
t.Parallel()
110146

111-
assert.Equal(t, "second", popped.content)
112-
require.Len(t, p.messageQueue, 1)
113-
assert.Equal(t, "third", p.messageQueue[0].content)
147+
p, rt := newTestChatPage(t)
148+
149+
// Make the runtime's steer queue reject after the first call
150+
calls := 0
151+
rt.steerFn = func(msg runtime.QueuedMessage) error {
152+
calls++
153+
if calls > 3 {
154+
return errors.New("steer queue full")
155+
}
156+
rt.steered = append(rt.steered, msg)
157+
return nil
158+
}
114159

115-
// Pop last
116-
popped = p.messageQueue[0]
117-
p.messageQueue = p.messageQueue[1:]
160+
// First 3 messages succeed
161+
for i := range 3 {
162+
_, _ = p.handleSendMsg(messages.SendMsg{Content: "message"})
163+
assert.Len(t, rt.steered, i+1)
164+
}
118165

119-
assert.Equal(t, "third", popped.content)
120-
assert.Empty(t, p.messageQueue)
166+
// Fourth message should be rejected by the runtime
167+
_, cmd := p.handleSendMsg(messages.SendMsg{Content: "overflow"})
168+
assert.NotNil(t, cmd) // warning notification
169+
assert.Len(t, rt.steered, 3)
170+
// Display queue should not grow when steer fails
171+
assert.Len(t, p.messageQueue, 3)
121172
}
122173

123-
func TestQueueFlow_ClearQueue(t *testing.T) {
174+
func TestSteer_ClearQueue(t *testing.T) {
124175
t.Parallel()
125176

126-
p := newTestChatPage(t)
127-
// newTestChatPage sets working=true
177+
p, _ := newTestChatPage(t)
128178

129-
// Queue some messages
179+
// Steer some messages
130180
p.handleSendMsg(messages.SendMsg{Content: "first"})
131181
p.handleSendMsg(messages.SendMsg{Content: "second"})
132182
p.handleSendMsg(messages.SendMsg{Content: "third"})
133183

134184
require.Len(t, p.messageQueue, 3)
135185

136-
// Clear the queue
186+
// Clear the display queue
137187
_, cmd := p.handleClearQueue()
138-
139188
assert.Empty(t, p.messageQueue)
140189
assert.NotNil(t, cmd) // Success notification
141190

@@ -144,3 +193,16 @@ func TestQueueFlow_ClearQueue(t *testing.T) {
144193
assert.Empty(t, p.messageQueue)
145194
assert.NotNil(t, cmd) // Info notification
146195
}
196+
197+
func TestSteer_IdleAgent_ProcessesImmediately(t *testing.T) {
198+
t.Parallel()
199+
200+
p, rt := newTestChatPage(t)
201+
p.working = false // agent is idle
202+
203+
// When idle, handleSendMsg should NOT steer — it calls processMessage
204+
// instead. We can't call processMessage without full init, but we can
205+
// verify no steer occurred.
206+
_ = messages.SendMsg{Content: "hello"}
207+
assert.Empty(t, rt.steered)
208+
}

0 commit comments

Comments
 (0)