Skip to content

Commit 99b4c9b

Browse files
hekikeclaude
andcommitted
fix(invoicesync): use post-commit publish with error propagation
Replace transaction.OnCommit fire-and-forget publish with a postCommit closure pattern that defers event publishing until after transaction commits while still propagating publish errors to the caller. Also fix large payload test to use JSONEq for Postgres jsonb normalization. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9483519 commit 99b4c9b

File tree

2 files changed

+30
-25
lines changed

2 files changed

+30
-25
lines changed

openmeter/app/stripe/invoicesync/adapter/adapter_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,9 @@ func TestCreateSyncPlan_LargePayload(t *testing.T) {
506506
fetched, err := setup.adapter.GetSyncPlan(ctx, plan.ID)
507507
require.NoError(t, err)
508508
require.Len(t, fetched.Operations, 1)
509-
assert.Equal(t, rawPayload, []byte(fetched.Operations[0].Payload))
509+
assert.JSONEq(t, string(rawPayload), string(fetched.Operations[0].Payload))
510+
// Verify no truncation: fetched payload should be at least as large as the input.
511+
assert.GreaterOrEqual(t, len(fetched.Operations[0].Payload), len(rawPayload))
510512
}
511513

512514
// TestCreateSyncPlan_DuplicateIdempotencyKey verifies that creating two plans that happen to share

openmeter/app/stripe/invoicesync/handler.go

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,14 @@ func (h *Handler) Handle(ctx context.Context, event *ExecuteSyncPlanEvent) error
130130
return fmt.Errorf("sync plan %s has no app ID", plan.ID)
131131
}
132132

133+
// postCommit holds a publish call that must run after the transaction commits.
134+
// By capturing it here and executing after RunWithNoValue returns, we ensure
135+
// the DB state is visible before the event fires, while still propagating errors.
136+
var postCommit func() error
137+
133138
// Execute within a transaction with an advisory lock to prevent parallel execution
134139
// of multiple plans for the same invoice (e.g., draft and issuing plans).
135-
return transaction.RunWithNoValue(ctx, h.adapter, func(ctx context.Context) error {
140+
if err := transaction.RunWithNoValue(ctx, h.adapter, func(ctx context.Context) error {
136141
// Acquire advisory lock scoped to this invoice
137142
if err := h.lockFunc(ctx, event.Namespace, event.InvoiceID); err != nil {
138143
if errors.Is(err, ErrSyncPlanLocked) {
@@ -204,20 +209,14 @@ func (h *Handler) Handle(ctx context.Context, event *ExecuteSyncPlanEvent) error
204209
}
205210

206211
if !result.Done {
207-
transaction.OnCommit(ctx, func(ctx context.Context) {
208-
if err := h.publisher.Publish(ctx, ExecuteSyncPlanEvent{
212+
postCommit = func() error {
213+
return h.publisher.Publish(ctx, ExecuteSyncPlanEvent{
209214
PlanID: event.PlanID,
210215
InvoiceID: event.InvoiceID,
211216
Namespace: event.Namespace,
212217
CustomerID: event.CustomerID,
213-
}); err != nil {
214-
h.logger.ErrorContext(ctx, "failed to publish next sync plan event",
215-
"plan_id", event.PlanID,
216-
"invoice_id", event.InvoiceID,
217-
"error", err,
218-
)
219-
}
220-
})
218+
})
219+
}
221220
return nil
222221
}
223222

@@ -232,12 +231,21 @@ func (h *Handler) Handle(ctx context.Context, event *ExecuteSyncPlanEvent) error
232231
return fmt.Errorf("refreshing sync plan: %w", err)
233232
}
234233

235-
return h.handlePlanCompletion(ctx, event, plan)
236-
})
234+
return h.handlePlanCompletion(ctx, event, plan, &postCommit)
235+
}); err != nil {
236+
return err
237+
}
238+
239+
if postCommit != nil {
240+
return postCommit()
241+
}
242+
243+
return nil
237244
}
238245

239246
// handlePlanCompletion writes results back to the invoice and triggers advancement.
240-
func (h *Handler) handlePlanCompletion(ctx context.Context, event *ExecuteSyncPlanEvent, plan *SyncPlan) error {
247+
// The postCommit parameter captures any publish that must happen after the transaction commits.
248+
func (h *Handler) handlePlanCompletion(ctx context.Context, event *ExecuteSyncPlanEvent, plan *SyncPlan, postCommit *func() error) error {
241249
invoiceID := billing.InvoiceID{
242250
Namespace: event.Namespace,
243251
ID: event.InvoiceID,
@@ -299,18 +307,13 @@ func (h *Handler) handlePlanCompletion(ctx context.Context, event *ExecuteSyncPl
299307
}
300308

301309
case SyncPlanPhaseDelete:
302-
// For delete, we just need to advance the invoice
303-
transaction.OnCommit(ctx, func(ctx context.Context) {
304-
if err := h.publisher.Publish(ctx, billing.AdvanceStandardInvoiceEvent{
310+
// For delete, we just need to advance the invoice after the transaction commits.
311+
*postCommit = func() error {
312+
return h.publisher.Publish(ctx, billing.AdvanceStandardInvoiceEvent{
305313
Invoice: invoiceID,
306314
CustomerID: event.CustomerID,
307-
}); err != nil {
308-
h.logger.ErrorContext(ctx, "failed to publish advance event after delete sync",
309-
"invoice_id", invoiceID.ID,
310-
"error", err,
311-
)
312-
}
313-
})
315+
})
316+
}
314317
return nil
315318

316319
default:

0 commit comments

Comments
 (0)