Skip to content

Commit 99bd993

Browse files
fix(a2a): resubscribe on graceful SSE close with in-flight task (#2406)
* fix(a2a): resubscribe on graceful SSE close with in-flight task Load balancers with idle timeouts (commonly ~5 minutes) close idle TCP connections gracefully with a FIN rather than an error. The A2A streaming iterator surfaces this as end-of-stream, so the catch-block reconnect never runs and the message is finalized with a non-terminal taskState, orphaning the task server-side. Route clean closes through the same resubscribeLoop used in the error path when the task is still resubscribable. The loop already handles backoff, progress detection, and give-up; it just needed a second entry point. Covered by three regression tests: clean-close with in-flight task, clean-close with terminal task (no resubscribe), and clean-close where resubscribe exhausts retries (finalizes with gave-up status). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(a2a): guard clean-close reconnect from second round on finalize failure After the clean-close path enters resubscribeLoop and exhausts its 5 attempts without reaching terminal state, the captured task state is still 'working' and isResubscribable() remains true. If finalizeMessage then throws (e.g., the DB write fails), the outer catch would re-enter resubscribeLoop for another full 31s backoff round. Track entry with a resubscribeAttempted flag and skip the catch-path reconnect when the clean-close path already ran one. Mirrors the defensive pattern in the existing catch block around finalizeMessage. Add a regression test asserting exactly one round of 5 retries even when both resubscribe and finalize fail. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(a2a): address review comments on clean-close reconnect fix Production code (use-message-streaming.ts): - Remove redundant closeActiveTextBlock call in the clean-close branch; the block at the top of that stretch already closed it and nothing between could have opened a new one. - Report success=false when the clean-close path's resubscribe gives up, mirroring the error-path's gave-up semantics. An orphaned task is a failure regardless of whether the original disconnect was graceful. Tests (use-message-streaming.test.ts): - Renumber scenarios into a clean 1-25 sequence (was 1-13, 13b, 13c, 14-17 with an added 16b/c/d/e block mid-file). - Rename the TypeError test: the code breaks out of the retry loop rather than rethrowing, so "stops retrying immediately on TypeError" is accurate. - Update gave-up clean-close test to assert success=false. - Add scenario 23: clean-close resubscribe succeeds then finalizeMessage throws — guards the success-path arm of resubscribeAttempted against future regressions that change terminal-state tracking. - Add scenario 24: clean-close with taskId captured only from response metadata fallback — confirms the metadata block runs before the clean-close isResubscribable check and does not trigger spurious reconnects. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(a2a): log finalize failures on the clean-close reconnect path Mirror the catch-block recovery branch's inner try/catch so a DB write failure after a clean-close reconnect is observable in production telemetry instead of silently producing an a2a-error block. Closes the last review observation from the final Claude pass: the clean-close path previously let finalizeMessage errors propagate to the outer catch, which runs parseA2AError on the DB error but emits no log. Scenarios 22 and 23 now also assert the log fires exactly once from the clean-close branch, making the previously-inaccurate console.error spy comments accurate. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 2623d4f commit 99bd993

File tree

2 files changed

+338
-9
lines changed

2 files changed

+338
-9
lines changed

frontend/src/components/pages/agents/details/a2a/chat/hooks/use-message-streaming.test.ts

Lines changed: 298 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ describe('streamMessage - SSE reconnection via tasks/resubscribe', () => {
690690
});
691691

692692
// -------------------------------------------------------------------
693-
// Scenario 13b: Attempt counter resets when resubscribe makes progress
693+
// Scenario 14: Attempt counter resets when resubscribe makes progress
694694
// -------------------------------------------------------------------
695695
test('resets attempt counter when resubscribe delivers events, allowing unlimited retries with progress', async () => {
696696
const TASK_ID = 'task-reset-attempts';
@@ -746,7 +746,7 @@ describe('streamMessage - SSE reconnection via tasks/resubscribe', () => {
746746
});
747747

748748
// -------------------------------------------------------------------
749-
// Scenario 13c: processResubscribeStream pushes "reconnected" on first event
749+
// Scenario 15: processResubscribeStream pushes "reconnected" on first event
750750
// -------------------------------------------------------------------
751751
test('shows reconnected status as soon as resubscribe delivers first event', async () => {
752752
const TASK_ID = 'task-reconnected-early';
@@ -800,7 +800,7 @@ describe('streamMessage - SSE reconnection via tasks/resubscribe', () => {
800800
});
801801

802802
// -------------------------------------------------------------------
803-
// Scenario 14: updateMessage called with correct args after recovery
803+
// Scenario 16: updateMessage called with correct args after recovery
804804
// -------------------------------------------------------------------
805805
test('persists correct state to database after successful recovery', async () => {
806806
const TASK_ID = 'task-db-check';
@@ -842,9 +842,9 @@ describe('streamMessage - SSE reconnection via tasks/resubscribe', () => {
842842
});
843843

844844
// -------------------------------------------------------------------
845-
// Scenario 15: TypeError in resubscribe rethrown immediately, no retry
845+
// Scenario 17: TypeError in resubscribe stops retrying immediately
846846
// -------------------------------------------------------------------
847-
test('rethrows TypeError from resubscribe instead of retrying', async () => {
847+
test('stops retrying immediately on TypeError from resubscribe', async () => {
848848
const TASK_ID = 'task-typeerror';
849849
const onMessageUpdate = vi.fn();
850850

@@ -872,7 +872,7 @@ describe('streamMessage - SSE reconnection via tasks/resubscribe', () => {
872872
});
873873

874874
// -------------------------------------------------------------------
875-
// Scenario 16: finalizeMessage failure after recovery falls through to error path
875+
// Scenario 18: finalizeMessage failure after recovery falls through to error path
876876
// -------------------------------------------------------------------
877877
test('falls through to error path when finalizeMessage fails after recovery', async () => {
878878
// The production code path under test intentionally logs
@@ -918,7 +918,298 @@ describe('streamMessage - SSE reconnection via tasks/resubscribe', () => {
918918
});
919919

920920
// -------------------------------------------------------------------
921-
// Scenario 17: gave-up replaces stale reconnecting block (not appended)
921+
// Scenario 19: Stream ends cleanly with in-flight task — resubscribe runs
922+
// -------------------------------------------------------------------
923+
// Regression guard: load balancers with idle timeouts (commonly ~5 min)
924+
// close the TCP connection gracefully (FIN), which the SDK surfaces as a
925+
// clean end-of-stream rather than a thrown error. Before the fix, this
926+
// bypassed the catch-block resubscribe and finalized the message with a
927+
// non-terminal taskState. We now route clean closes through resubscribeLoop
928+
// too when the task is still in-flight.
929+
test('resubscribes when stream ends cleanly but task is still in-flight (LB idle-timeout)', async () => {
930+
const TASK_ID = 'task-clean-close-inflight';
931+
const onMessageUpdate = vi.fn();
932+
933+
// Initial stream emits task + working state and then ENDS CLEANLY (no throw).
934+
// Note the absence of a `crashAfter` argument to buildFullStream.
935+
streamTextImpl = () =>
936+
buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID, 'Still thinking...')), {
937+
responseId: TASK_ID,
938+
});
939+
940+
// Resubscribe stream picks up and drives the task to completion.
941+
const mockClient = buildMockClient([
942+
statusUpdateEvent(TASK_ID, 'completed', {
943+
text: 'All done after the idle-timeout reconnect.',
944+
messageId: 'msg-after-idle',
945+
final: true,
946+
}),
947+
]);
948+
createA2AClientImpl = vi.fn(async () => mockClient);
949+
950+
const result = await streamMessage({ ...baseParams, onMessageUpdate });
951+
952+
expect(result.success).toBe(true);
953+
expect(result.assistantMessage.taskState).toBe('completed');
954+
955+
// The fix: resubscribeTask must be called even though no error was thrown.
956+
expect(mockClient.resubscribeTask).toHaveBeenCalledWith({ id: TASK_ID });
957+
958+
// Connection-status blocks should show the disconnect → reconnected flow.
959+
const connBlocks = connectionStatuses(result.assistantMessage.contentBlocks);
960+
expect(connBlocks.some((b) => b.type === 'connection-status' && b.status === 'reconnected')).toBe(true);
961+
962+
// Final content from the resubscribe stream is present.
963+
const statusBlocks = result.assistantMessage.contentBlocks.filter((b) => b.type === 'task-status-update');
964+
expect(
965+
statusBlocks.some(
966+
(b) => b.type === 'task-status-update' && b.text === 'All done after the idle-timeout reconnect.'
967+
)
968+
).toBe(true);
969+
});
970+
971+
// -------------------------------------------------------------------
972+
// Scenario 20: Clean close on terminal task — does NOT resubscribe
973+
// -------------------------------------------------------------------
974+
test('does not resubscribe when stream ends cleanly and task is already terminal', async () => {
975+
const TASK_ID = 'task-clean-close-terminal';
976+
const onMessageUpdate = vi.fn();
977+
978+
const events = [
979+
...initialWorkingTaskEvents(TASK_ID, 'Processing...'),
980+
{
981+
type: 'raw' as const,
982+
rawValue: statusUpdateEvent(TASK_ID, 'completed', {
983+
text: 'Done normally.',
984+
messageId: 'msg-done',
985+
final: true,
986+
}),
987+
},
988+
];
989+
990+
// Stream ends cleanly after emitting the terminal 'completed' event.
991+
streamTextImpl = () =>
992+
buildStreamTextResult(buildFullStream(events), {
993+
responseId: TASK_ID,
994+
});
995+
996+
createA2AClientImpl = vi.fn(async () => buildMockClient([]));
997+
998+
const result = await streamMessage({ ...baseParams, onMessageUpdate });
999+
1000+
expect(result.success).toBe(true);
1001+
expect(result.assistantMessage.taskState).toBe('completed');
1002+
// Terminal state means isResubscribable() is false — no client should be created.
1003+
expect(vi.mocked(createA2AClientImpl)).not.toHaveBeenCalled();
1004+
1005+
const connBlocks = connectionStatuses(result.assistantMessage.contentBlocks);
1006+
expect(connBlocks).toHaveLength(0);
1007+
});
1008+
1009+
// -------------------------------------------------------------------
1010+
// Scenario 21: Clean close on in-flight task where resubscribe gives up
1011+
// -------------------------------------------------------------------
1012+
test('finalizes with gave-up status when clean-close triggers resubscribe but it exhausts retries', async () => {
1013+
const TASK_ID = 'task-clean-close-giveup';
1014+
const onMessageUpdate = vi.fn();
1015+
1016+
streamTextImpl = () =>
1017+
buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID, 'Starting slow op...')), {
1018+
responseId: TASK_ID,
1019+
});
1020+
1021+
// Every resubscribe attempt fails — backend is unreachable.
1022+
createA2AClientImpl = vi.fn(async () => buildMockClient([], new Error('still down')));
1023+
1024+
vi.useRealTimers();
1025+
vi.useFakeTimers({ shouldAdvanceTime: false });
1026+
1027+
const resultPromise = streamMessage({ ...baseParams, onMessageUpdate });
1028+
1029+
// Advance past all 5 backoff delays (1s + 2s + 4s + 8s + 16s).
1030+
for (let i = 0; i < 5; i++) {
1031+
await vi.advanceTimersByTimeAsync(2 ** i * 1000 + 100);
1032+
}
1033+
1034+
const result = await resultPromise;
1035+
1036+
// Clean-close gave-up is reported as success=false, mirroring the error-path
1037+
// gave-up: an orphaned task is a failure regardless of whether the original
1038+
// disconnect threw or was graceful. The UI surfaces the cause via the
1039+
// gave-up connection-status block.
1040+
expect(result.success).toBe(false);
1041+
expect(vi.mocked(createA2AClientImpl)).toHaveBeenCalledTimes(5);
1042+
1043+
const connBlocks = result.assistantMessage.contentBlocks.filter((b) => b.type === 'connection-status');
1044+
expect(connBlocks).toHaveLength(1);
1045+
expect(connBlocks[0].type === 'connection-status' && connBlocks[0].status).toBe('gave-up');
1046+
1047+
// Content received before the idle-timeout close is preserved.
1048+
expect(
1049+
result.assistantMessage.contentBlocks.some(
1050+
(b) => b.type === 'task-status-update' && b.text === 'Starting slow op...'
1051+
)
1052+
).toBe(true);
1053+
});
1054+
1055+
// -------------------------------------------------------------------
1056+
// Scenario 22: Clean-close resubscribe guard — no second round on gave-up + finalize failure
1057+
// -------------------------------------------------------------------
1058+
// Regression guard: after the clean-close path enters resubscribeLoop and
1059+
// gives up (task still non-terminal), if finalizeMessage subsequently
1060+
// throws (e.g., DB write fails), the outer catch must NOT invoke
1061+
// resubscribeLoop a second time. We track this via the `resubscribeAttempted`
1062+
// flag so that state-is-still-working + DB-error doesn't trigger another
1063+
// full round of exponential-backoff retries.
1064+
test('does not re-enter resubscribeLoop when finalizeMessage fails after a gave-up clean-close', async () => {
1065+
// The clean-close branch logs 'finalizeMessage failed after clean-close
1066+
// recovery:' on this path. Capture the spy so we can silence the expected
1067+
// log and also assert it fires exactly once.
1068+
const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {
1069+
// Swallow expected negative-path log to keep test output clean.
1070+
});
1071+
1072+
const TASK_ID = 'task-clean-close-no-double-reconnect';
1073+
const onMessageUpdate = vi.fn();
1074+
1075+
// Stream ends cleanly with task in 'working' state (triggers clean-close resubscribe path).
1076+
streamTextImpl = () =>
1077+
buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID)), {
1078+
responseId: TASK_ID,
1079+
});
1080+
1081+
// All resubscribe attempts fail → loop will hit gave-up.
1082+
createA2AClientImpl = vi.fn(async () => buildMockClient([], new Error('server down')));
1083+
1084+
// Make updateMessage reject on the post-resubscribe finalize call. There is
1085+
// also a saveMessage call at the top, but saveMessage is a separate mock.
1086+
vi.mocked(updateMessage).mockRejectedValueOnce(new Error('DB write failed'));
1087+
1088+
vi.useRealTimers();
1089+
vi.useFakeTimers({ shouldAdvanceTime: false });
1090+
1091+
const resultPromise = streamMessage({ ...baseParams, onMessageUpdate });
1092+
1093+
// Advance past the 5 backoff delays of the single resubscribe round.
1094+
// If the guard is broken, a second round would need another 31s of
1095+
// advancement; the assertion below will catch that.
1096+
for (let i = 0; i < 5; i++) {
1097+
await vi.advanceTimersByTimeAsync(2 ** i * 1000 + 100);
1098+
}
1099+
1100+
const result = await resultPromise;
1101+
1102+
// Exactly one round of 5 attempts — not 10.
1103+
expect(vi.mocked(createA2AClientImpl)).toHaveBeenCalledTimes(5);
1104+
1105+
// Fell through to error path because finalizeMessage threw.
1106+
expect(result.success).toBe(false);
1107+
expect(result.assistantMessage.contentBlocks.some((b) => b.type === 'a2a-error')).toBe(true);
1108+
1109+
// The clean-close recovery logs finalize failures for production
1110+
// observability; assert it fires exactly once (not once per resubscribe
1111+
// attempt and not silently swallowed).
1112+
const cleanCloseLogs = errorSpy.mock.calls.filter(
1113+
(args) => typeof args[0] === 'string' && args[0].startsWith('finalizeMessage failed after clean-close recovery')
1114+
);
1115+
expect(cleanCloseLogs).toHaveLength(1);
1116+
});
1117+
1118+
// -------------------------------------------------------------------
1119+
// Scenario 23: Clean-close resubscribe SUCCEEDS, then finalizeMessage throws
1120+
// -------------------------------------------------------------------
1121+
// Companion to scenario 22 (the gave-up arm): verifies that the
1122+
// resubscribeAttempted guard also protects the success-path arm. After a
1123+
// clean-close resubscribe that drives the task to terminal, if the DB
1124+
// write in finalizeMessage throws, the outer catch must not re-enter
1125+
// resubscribeLoop a second time (once or twice). In this particular case
1126+
// isResubscribable would already be false because the task is terminal,
1127+
// but we still want an explicit regression test so a future refactor that
1128+
// changes terminal-state tracking does not silently regress the guard.
1129+
test('does not re-enter resubscribeLoop when finalizeMessage fails after a successful clean-close resubscribe', async () => {
1130+
// Same pattern as scenario 22: silence the expected log and assert it fires.
1131+
const errorSpy = vi.spyOn(console, 'error').mockImplementation(() => {
1132+
// Swallow expected negative-path log to keep test output clean.
1133+
});
1134+
1135+
const TASK_ID = 'task-clean-close-success-then-db-fail';
1136+
const onMessageUpdate = vi.fn();
1137+
1138+
// Stream ends cleanly with task in 'working' state (clean-close path triggers).
1139+
streamTextImpl = () =>
1140+
buildStreamTextResult(buildFullStream(initialWorkingTaskEvents(TASK_ID)), {
1141+
responseId: TASK_ID,
1142+
});
1143+
1144+
// Resubscribe succeeds and drives task to terminal 'completed'.
1145+
const mockClient = buildMockClient([
1146+
statusUpdateEvent(TASK_ID, 'completed', {
1147+
text: 'Done after idle-timeout reconnect.',
1148+
messageId: 'msg-done',
1149+
final: true,
1150+
}),
1151+
]);
1152+
createA2AClientImpl = vi.fn(async () => mockClient);
1153+
1154+
// Make the post-resubscribe finalizeMessage throw on its updateMessage call.
1155+
vi.mocked(updateMessage).mockRejectedValueOnce(new Error('DB write failed'));
1156+
1157+
const result = await streamMessage({ ...baseParams, onMessageUpdate });
1158+
1159+
// Exactly one createA2AClient call — the guard prevented a second round.
1160+
expect(vi.mocked(createA2AClientImpl)).toHaveBeenCalledTimes(1);
1161+
1162+
// Finalize threw, so the outer catch ran and reported failure.
1163+
expect(result.success).toBe(false);
1164+
expect(result.assistantMessage.contentBlocks.some((b) => b.type === 'a2a-error')).toBe(true);
1165+
1166+
// Task state captured during recovery is preserved in the final message.
1167+
expect(result.assistantMessage.taskState).toBe('completed');
1168+
1169+
// Finalize-failure log fires exactly once from the clean-close branch.
1170+
const cleanCloseLogs = errorSpy.mock.calls.filter(
1171+
(args) => typeof args[0] === 'string' && args[0].startsWith('finalizeMessage failed after clean-close recovery')
1172+
);
1173+
expect(cleanCloseLogs).toHaveLength(1);
1174+
});
1175+
1176+
// -------------------------------------------------------------------
1177+
// Scenario 24: Clean-close with taskId captured from response metadata fallback
1178+
// -------------------------------------------------------------------
1179+
// The metadata-fallback block at streamMessage lines 344–352 runs BEFORE
1180+
// the clean-close isResubscribable check, so a stream that produced only a
1181+
// text-delta but has a valid "task-" id in response metadata should not
1182+
// enter resubscribeLoop (no capturedTaskState). This test asserts that
1183+
// ordering and also confirms that metadata fallback alone does not cause
1184+
// spurious reconnects.
1185+
test('does not resubscribe when only metadata fallback populates taskId and stream ended cleanly', async () => {
1186+
const TASK_ID = 'task-metadata-only';
1187+
const onMessageUpdate = vi.fn();
1188+
1189+
// No task/status-update events — just a text-delta. capturedTaskState
1190+
// will never be populated by the handlers, so isResubscribable returns
1191+
// false even after metadata fallback fills in capturedTaskId.
1192+
streamTextImpl = () =>
1193+
buildStreamTextResult(buildFullStream([{ type: 'text-delta', text: 'Quick reply.' }]), {
1194+
text: 'Quick reply.',
1195+
responseId: TASK_ID,
1196+
});
1197+
1198+
createA2AClientImpl = vi.fn(async () => buildMockClient([]));
1199+
1200+
const result = await streamMessage({ ...baseParams, onMessageUpdate });
1201+
1202+
expect(result.success).toBe(true);
1203+
expect(result.assistantMessage.taskId).toBe(TASK_ID);
1204+
// No resubscribe attempt because capturedTaskState was never set.
1205+
expect(vi.mocked(createA2AClientImpl)).not.toHaveBeenCalled();
1206+
// No connection-status blocks surfaced to the UI.
1207+
const connBlocks = connectionStatuses(result.assistantMessage.contentBlocks);
1208+
expect(connBlocks).toHaveLength(0);
1209+
});
1210+
1211+
// -------------------------------------------------------------------
1212+
// Scenario 25: gave-up replaces stale reconnecting block (not appended)
9221213
// -------------------------------------------------------------------
9231214
test('gave-up replaces the last reconnecting block instead of stacking', async () => {
9241215
const TASK_ID = 'task-gaveup-replace';

0 commit comments

Comments
 (0)