Skip to content

Commit f13af6c

Browse files
hekikeclaude
andcommitted
fix(invoicesync): use post-commit publish with error propagation
Use transaction.Run to return a postTxEvent from the transaction body, then publish after commit using the original (non-tx) context. This defers event publishing until DB state is visible while keeping publish errors in the normal return flow. Also fix large payload test to use JSONEq for Postgres jsonb normalization. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 9483519 commit f13af6c

2 files changed

Lines changed: 62 additions & 58 deletions

File tree

openmeter/app/stripe/invoicesync/adapter/adapter_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,9 @@ func TestCreateSyncPlan_LargePayload(t *testing.T) {
506506
fetched, err := setup.adapter.GetSyncPlan(ctx, plan.ID)
507507
require.NoError(t, err)
508508
require.Len(t, fetched.Operations, 1)
509-
assert.Equal(t, rawPayload, []byte(fetched.Operations[0].Payload))
509+
assert.JSONEq(t, string(rawPayload), string(fetched.Operations[0].Payload))
510+
// Verify no truncation: fetched payload should be at least as large as the input.
511+
assert.GreaterOrEqual(t, len(fetched.Operations[0].Payload), len(rawPayload))
510512
}
511513

512514
// TestCreateSyncPlan_DuplicateIdempotencyKey verifies that creating two plans that happen to share

openmeter/app/stripe/invoicesync/handler.go

Lines changed: 59 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,17 @@ import (
1414
"github.com/openmeterio/openmeter/openmeter/secret"
1515
secretentity "github.com/openmeterio/openmeter/openmeter/secret/entity"
1616
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
17+
"github.com/openmeterio/openmeter/openmeter/watermill/marshaler"
1718
"github.com/openmeterio/openmeter/pkg/clock"
1819
"github.com/openmeterio/openmeter/pkg/framework/transaction"
1920
)
2021

22+
// postTxEvent holds an event that must be published after the transaction commits.
23+
// Using a concrete type instead of closures avoids capturing the transaction context.
24+
type postTxEvent struct {
25+
event marshaler.Event // nil means nothing to publish
26+
}
27+
2128
// LockFunc acquires an advisory lock for the given invoice within the current transaction context.
2229
// This ensures only one sync plan executes at a time per invoice.
2330
// It should block until the lock is available, or return ErrSyncPlanLocked if the lock is
@@ -132,44 +139,45 @@ func (h *Handler) Handle(ctx context.Context, event *ExecuteSyncPlanEvent) error
132139

133140
// Execute within a transaction with an advisory lock to prevent parallel execution
134141
// of multiple plans for the same invoice (e.g., draft and issuing plans).
135-
return transaction.RunWithNoValue(ctx, h.adapter, func(ctx context.Context) error {
142+
// Returns a postTxEvent carrying any event that must be published after commit.
143+
result, err := transaction.Run(ctx, h.adapter, func(ctx context.Context) (postTxEvent, error) {
136144
// Acquire advisory lock scoped to this invoice
137145
if err := h.lockFunc(ctx, event.Namespace, event.InvoiceID); err != nil {
138146
if errors.Is(err, ErrSyncPlanLocked) {
139147
logger.InfoContext(ctx, "invoice sync locked by another plan, skipping")
140-
return nil
148+
return postTxEvent{}, nil
141149
}
142-
return fmt.Errorf("acquiring sync plan lock: %w", err)
150+
return postTxEvent{}, fmt.Errorf("acquiring sync plan lock: %w", err)
143151
}
144152

145153
// Re-fetch plan inside lock — another worker may have advanced it
146154
plan, err = h.adapter.GetSyncPlan(ctx, event.PlanID)
147155
if err != nil {
148-
return fmt.Errorf("getting sync plan under lock: %w", err)
156+
return postTxEvent{}, fmt.Errorf("getting sync plan under lock: %w", err)
149157
}
150158
if plan == nil || plan.Status == PlanStatusCompleted || plan.Status == PlanStatusFailed {
151-
return nil
159+
return postTxEvent{}, nil
152160
}
153161

154162
// Check if a newer plan exists for this invoice — if so, this plan has been
155163
// superseded and should not continue executing (even if cancelAllActivePlans
156164
// already marked it as failed, the events may already be in-flight).
157165
superseded, err := h.isSuperseded(ctx, plan)
158166
if err != nil {
159-
return fmt.Errorf("checking if plan is superseded: %w", err)
167+
return postTxEvent{}, fmt.Errorf("checking if plan is superseded: %w", err)
160168
}
161169
if superseded {
162170
logger.InfoContext(ctx, "plan superseded by a newer plan, canceling")
163171
if err := h.adapter.FailPlan(ctx, plan.ID, "superseded by newer plan"); err != nil {
164172
logger.ErrorContext(ctx, "failed to cancel superseded plan", "error", err)
165173
}
166-
return nil
174+
return postTxEvent{}, nil
167175
}
168176

169177
// Create Stripe client
170178
stripeClient, err := h.createStripeClient(ctx, event.Namespace, plan)
171179
if err != nil {
172-
return fmt.Errorf("creating stripe client: %w", err)
180+
return postTxEvent{}, fmt.Errorf("creating stripe client: %w", err)
173181
}
174182

175183
// Execute next operation
@@ -178,9 +186,9 @@ func (h *Handler) Handle(ctx context.Context, event *ExecuteSyncPlanEvent) error
178186
Logger: logger,
179187
}
180188

181-
result, err := executor.ExecuteNextOperation(ctx, stripeClient, plan)
189+
execResult, err := executor.ExecuteNextOperation(ctx, stripeClient, plan)
182190
if err != nil {
183-
return err
191+
return postTxEvent{}, err
184192
}
185193

186194
// Write back external IDs immediately so the invoice always reflects Stripe state.
@@ -189,55 +197,57 @@ func (h *Handler) Handle(ctx context.Context, event *ExecuteSyncPlanEvent) error
189197
//
190198
// On failure this rolls back the transaction (including CompleteOperation), causing
191199
// Kafka to redeliver. The idempotency key ensures no duplicate Stripe API calls.
192-
if result.InvoicingExternalID != nil || len(result.LineExternalIDs) > 0 || len(result.LineDiscountExternalIDs) > 0 {
200+
if execResult.InvoicingExternalID != nil || len(execResult.LineExternalIDs) > 0 || len(execResult.LineDiscountExternalIDs) > 0 {
193201
if err := h.billingService.SyncExternalIDs(ctx, billing.SyncExternalIDsInput{
194202
Invoice: billing.InvoiceID{
195203
Namespace: event.Namespace,
196204
ID: event.InvoiceID,
197205
},
198-
InvoicingExternalID: result.InvoicingExternalID,
199-
LineExternalIDs: result.LineExternalIDs,
200-
LineDiscountExternalIDs: result.LineDiscountExternalIDs,
206+
InvoicingExternalID: execResult.InvoicingExternalID,
207+
LineExternalIDs: execResult.LineExternalIDs,
208+
LineDiscountExternalIDs: execResult.LineDiscountExternalIDs,
201209
}); err != nil {
202-
return fmt.Errorf("syncing external IDs: %w", err)
210+
return postTxEvent{}, fmt.Errorf("syncing external IDs: %w", err)
203211
}
204212
}
205213

206-
if !result.Done {
207-
transaction.OnCommit(ctx, func(ctx context.Context) {
208-
if err := h.publisher.Publish(ctx, ExecuteSyncPlanEvent{
209-
PlanID: event.PlanID,
210-
InvoiceID: event.InvoiceID,
211-
Namespace: event.Namespace,
212-
CustomerID: event.CustomerID,
213-
}); err != nil {
214-
h.logger.ErrorContext(ctx, "failed to publish next sync plan event",
215-
"plan_id", event.PlanID,
216-
"invoice_id", event.InvoiceID,
217-
"error", err,
218-
)
219-
}
220-
})
221-
return nil
214+
if !execResult.Done {
215+
return postTxEvent{event: ExecuteSyncPlanEvent{
216+
PlanID: event.PlanID,
217+
InvoiceID: event.InvoiceID,
218+
Namespace: event.Namespace,
219+
CustomerID: event.CustomerID,
220+
}}, nil
222221
}
223222

224-
if result.Failed {
225-
logger.ErrorContext(ctx, "sync plan failed", "error", result.FailError)
226-
return h.handlePlanFailure(ctx, event, plan.Phase, result.FailError)
223+
if execResult.Failed {
224+
logger.ErrorContext(ctx, "sync plan failed", "error", execResult.FailError)
225+
return postTxEvent{}, h.handlePlanFailure(ctx, event, plan.Phase, execResult.FailError)
227226
}
228227

229228
// Refresh plan to get completed operation responses
230229
plan, err = h.adapter.GetSyncPlan(ctx, event.PlanID)
231230
if err != nil {
232-
return fmt.Errorf("refreshing sync plan: %w", err)
231+
return postTxEvent{}, fmt.Errorf("refreshing sync plan: %w", err)
233232
}
234233

235234
return h.handlePlanCompletion(ctx, event, plan)
236235
})
236+
if err != nil {
237+
return err
238+
}
239+
240+
// Publish after commit so DB state is visible, using the original (non-tx) context.
241+
if result.event != nil {
242+
return h.publisher.Publish(ctx, result.event)
243+
}
244+
245+
return nil
237246
}
238247

239248
// handlePlanCompletion writes results back to the invoice and triggers advancement.
240-
func (h *Handler) handlePlanCompletion(ctx context.Context, event *ExecuteSyncPlanEvent, plan *SyncPlan) error {
249+
// Returns a postTxEvent if an event must be published after the transaction commits.
250+
func (h *Handler) handlePlanCompletion(ctx context.Context, event *ExecuteSyncPlanEvent, plan *SyncPlan) (postTxEvent, error) {
241251
invoiceID := billing.InvoiceID{
242252
Namespace: event.Namespace,
243253
ID: event.InvoiceID,
@@ -248,7 +258,7 @@ func (h *Handler) handlePlanCompletion(ctx context.Context, event *ExecuteSyncPl
248258
case SyncPlanPhaseDraft:
249259
upsertResult, err := BuildUpsertResultFromPlan(plan)
250260
if err != nil {
251-
return fmt.Errorf("building upsert result: %w", err)
261+
return postTxEvent{}, fmt.Errorf("building upsert result: %w", err)
252262
}
253263

254264
_, err = h.billingService.SyncDraftInvoice(ctx, billing.SyncDraftStandardInvoiceInput{
@@ -268,15 +278,15 @@ func (h *Handler) handlePlanCompletion(ctx context.Context, event *ExecuteSyncPl
268278
"plan_id", plan.ID,
269279
"error", err,
270280
)
271-
return nil
281+
return postTxEvent{}, nil
272282
}
273-
return fmt.Errorf("syncing draft invoice: %w", err)
283+
return postTxEvent{}, fmt.Errorf("syncing draft invoice: %w", err)
274284
}
275285

276286
case SyncPlanPhaseIssuing:
277287
finalizeResult, err := BuildFinalizeResultFromPlan(plan)
278288
if err != nil {
279-
return fmt.Errorf("building finalize result: %w", err)
289+
return postTxEvent{}, fmt.Errorf("building finalize result: %w", err)
280290
}
281291

282292
_, err = h.billingService.SyncIssuingInvoice(ctx, billing.SyncIssuingStandardInvoiceInput{
@@ -293,31 +303,23 @@ func (h *Handler) handlePlanCompletion(ctx context.Context, event *ExecuteSyncPl
293303
"plan_id", plan.ID,
294304
"error", err,
295305
)
296-
return nil
306+
return postTxEvent{}, nil
297307
}
298-
return fmt.Errorf("syncing issuing invoice: %w", err)
308+
return postTxEvent{}, fmt.Errorf("syncing issuing invoice: %w", err)
299309
}
300310

301311
case SyncPlanPhaseDelete:
302-
// For delete, we just need to advance the invoice
303-
transaction.OnCommit(ctx, func(ctx context.Context) {
304-
if err := h.publisher.Publish(ctx, billing.AdvanceStandardInvoiceEvent{
305-
Invoice: invoiceID,
306-
CustomerID: event.CustomerID,
307-
}); err != nil {
308-
h.logger.ErrorContext(ctx, "failed to publish advance event after delete sync",
309-
"invoice_id", invoiceID.ID,
310-
"error", err,
311-
)
312-
}
313-
})
314-
return nil
312+
// For delete, we just need to advance the invoice after the transaction commits.
313+
return postTxEvent{event: billing.AdvanceStandardInvoiceEvent{
314+
Invoice: invoiceID,
315+
CustomerID: event.CustomerID,
316+
}}, nil
315317

316318
default:
317-
return fmt.Errorf("unknown sync plan phase %q for plan %s", plan.Phase, plan.ID)
319+
return postTxEvent{}, fmt.Errorf("unknown sync plan phase %q for plan %s", plan.Phase, plan.ID)
318320
}
319321

320-
return nil
322+
return postTxEvent{}, nil
321323
}
322324

323325
// handlePlanFailure triggers the invoice into a sync-failed state, surfacing the Stripe error

0 commit comments

Comments
 (0)