Skip to content

Commit f30b268

Browse files
hekikeclaude
andcommitted
feat(app): switch Stripe invoice operations to use invoicesync
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent abc9817 commit f30b268

File tree

18 files changed

+972
-1010
lines changed

18 files changed

+972
-1010
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go.work
1515
go.work.sum
1616

1717
__debug_bin*
18+
*.test
1819

1920
openmeter.log
2021

app/common/app.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ import (
1717
appservice "github.com/openmeterio/openmeter/openmeter/app/service"
1818
appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe"
1919
appstripeadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/adapter"
20+
"github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync"
21+
invoicesyncadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync/adapter"
22+
invoicesyncservice "github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync/service"
2023
appstripeservice "github.com/openmeterio/openmeter/openmeter/app/stripe/service"
2124
"github.com/openmeterio/openmeter/openmeter/billing"
2225
"github.com/openmeterio/openmeter/openmeter/customer"
@@ -29,6 +32,8 @@ import (
2932
var App = wire.NewSet(
3033
NewAppRegistry,
3134
NewAppService,
35+
NewSyncPlanAdapter,
36+
NewSyncPlanService,
3237
NewAppStripeService,
3338
NewAppSandboxFactory,
3439
NewAppSandboxProvisioner,
@@ -55,7 +60,21 @@ func NewAppService(
5560
})
5661
}
5762

58-
func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service, billingService billing.Service, publisher eventbus.Publisher) (appstripe.Service, error) {
63+
func NewSyncPlanAdapter(db *entdb.Client) (*invoicesyncadapter.Adapter, error) {
64+
return invoicesyncadapter.New(invoicesyncadapter.Config{
65+
Client: db,
66+
})
67+
}
68+
69+
func NewSyncPlanService(adapter *invoicesyncadapter.Adapter, publisher eventbus.Publisher, logger *slog.Logger) (invoicesync.Service, error) {
70+
return invoicesyncservice.New(invoicesyncservice.Config{
71+
Adapter: adapter,
72+
Publisher: publisher,
73+
Logger: logger.With("component", "invoicesync-service"),
74+
})
75+
}
76+
77+
func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service, billingService billing.Service, publisher eventbus.Publisher, syncPlanService invoicesync.Service) (appstripe.Service, error) {
5978
appStripeAdapter, err := appstripeadapter.New(appstripeadapter.Config{
6079
Client: db,
6180
AppService: appService,
@@ -88,6 +107,7 @@ func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig confi
88107
DisableWebhookRegistration: appsConfig.Stripe.DisableWebhookRegistration,
89108
Publisher: publisher,
90109
WebhookURLGenerator: webhookGenerator,
110+
SyncPlanService: syncPlanService,
91111
})
92112
}
93113

app/common/openmeter_billingworker.go

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package common
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"log/slog"
78
"syscall"
@@ -11,12 +12,20 @@ import (
1112
"github.com/oklog/run"
1213

1314
"github.com/openmeterio/openmeter/app/config"
15+
"github.com/openmeterio/openmeter/openmeter/app"
16+
appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe"
17+
stripeclient "github.com/openmeterio/openmeter/openmeter/app/stripe/client"
18+
"github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync"
19+
invoicesyncadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync/adapter"
1420
"github.com/openmeterio/openmeter/openmeter/billing"
1521
billingworker "github.com/openmeterio/openmeter/openmeter/billing/worker"
1622
"github.com/openmeterio/openmeter/openmeter/billing/worker/subscriptionsync"
23+
"github.com/openmeterio/openmeter/openmeter/secret"
1724
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
1825
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
26+
"github.com/openmeterio/openmeter/openmeter/watermill/grouphandler"
1927
"github.com/openmeterio/openmeter/openmeter/watermill/router"
28+
"github.com/openmeterio/openmeter/pkg/framework/lockr"
2029
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
2130
)
2231

@@ -97,12 +106,61 @@ func NewBillingWorkerOptions(
97106
}
98107
}
99108

100-
func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error) {
109+
func NewBillingWorker(
110+
workerOptions billingworker.WorkerOptions,
111+
appService app.Service,
112+
stripeAppService appstripe.Service,
113+
secretService secret.Service,
114+
billingService billing.Service,
115+
publisher eventbus.Publisher,
116+
logger *slog.Logger,
117+
syncPlanAdapter *invoicesyncadapter.Adapter,
118+
) (*billingworker.Worker, error) {
101119
worker, err := billingworker.New(workerOptions)
102120
if err != nil {
103121
return nil, fmt.Errorf("failed to initialize worker: %w", err)
104122
}
105123

124+
// Create locker for advisory locks on sync plan execution
125+
syncPlanLocker, err := lockr.NewLocker(&lockr.LockerConfig{
126+
Logger: logger.With("component", "stripe-sync-plan-locker"),
127+
})
128+
if err != nil {
129+
return nil, fmt.Errorf("failed to create sync plan locker: %w", err)
130+
}
131+
132+
// Register sync plan handler for async Stripe invoice sync
133+
syncPlanHandler, err := invoicesync.NewHandler(invoicesync.HandlerConfig{
134+
Adapter: syncPlanAdapter,
135+
AppService: appService,
136+
BillingService: billingService,
137+
StripeAppService: stripeAppService,
138+
SecretService: secretService,
139+
StripeAppClientFactory: stripeclient.NewStripeAppClient,
140+
Publisher: publisher,
141+
LockFunc: func(ctx context.Context, namespace, invoiceID string) error {
142+
key, err := lockr.NewKey("namespace", namespace, "invoice_sync", invoiceID)
143+
if err != nil {
144+
return fmt.Errorf("creating lock key: %w", err)
145+
}
146+
if err := syncPlanLocker.LockForTX(ctx, key); err != nil {
147+
if errors.Is(err, lockr.ErrLockTimeout) {
148+
return invoicesync.ErrSyncPlanLocked
149+
}
150+
return err
151+
}
152+
return nil
153+
},
154+
Logger: logger.With("component", "stripe-sync-plan"),
155+
})
156+
if err != nil {
157+
return nil, fmt.Errorf("failed to create sync plan handler: %w", err)
158+
}
159+
160+
worker.AddHandler(grouphandler.NewGroupEventHandler(func(ctx context.Context, event *invoicesync.ExecuteSyncPlanEvent) error {
161+
return syncPlanHandler.Handle(ctx, event)
162+
}))
163+
106164
return worker, nil
107165
}
108166

cmd/billing-worker/wire_gen.go

Lines changed: 35 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/jobs/internal/wire_gen.go

Lines changed: 23 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/server/wire_gen.go

Lines changed: 23 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1407,6 +1407,8 @@ github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
14071407
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
14081408
github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
14091409
github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
1410+
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
1411+
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
14101412
github.com/oliveagle/jsonpath v0.1.0 h1:W0H0fIMZPYED7cyrVdAQ6vETH9vCNMSL9Yh9h5sL3gs=
14111413
github.com/oliveagle/jsonpath v0.1.0/go.mod h1:eqOVx5Vwu4gd2mmMZvVZsgIqNSaW3xxRThUJ0k/TPk4=
14121414
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=

openmeter/app/stripe/entity/app/app.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
stripeapp "github.com/openmeterio/openmeter/openmeter/app/stripe"
1010
stripeclient "github.com/openmeterio/openmeter/openmeter/app/stripe/client"
1111
appstripeentity "github.com/openmeterio/openmeter/openmeter/app/stripe/entity"
12+
"github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync"
1213
"github.com/openmeterio/openmeter/openmeter/billing"
1314
"github.com/openmeterio/openmeter/openmeter/secret"
1415
)
@@ -41,6 +42,9 @@ type App struct {
4142
StripeAppClientFactory stripeclient.StripeAppClientFactory `json:"-"`
4243
StripeAppService stripeapp.Service `json:"-"`
4344
SecretService secret.Service `json:"-"`
45+
46+
// Sync plan support for async invoice sync
47+
SyncPlanService invoicesync.Service `json:"-"`
4448
}
4549

4650
func (a App) Validate() error {
@@ -84,6 +88,10 @@ func (a App) Validate() error {
8488
return errors.New("logger is required")
8589
}
8690

91+
if a.SyncPlanService == nil {
92+
return errors.New("sync plan service is required")
93+
}
94+
8795
return nil
8896
}
8997

0 commit comments

Comments
 (0)