Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go.work
go.work.sum

__debug_bin*
*.test

openmeter.log

Expand Down
22 changes: 21 additions & 1 deletion app/common/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ import (
appservice "github.com/openmeterio/openmeter/openmeter/app/service"
appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe"
appstripeadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/adapter"
"github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync"
invoicesyncadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync/adapter"
invoicesyncservice "github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync/service"
appstripeservice "github.com/openmeterio/openmeter/openmeter/app/stripe/service"
"github.com/openmeterio/openmeter/openmeter/customer"
entdb "github.com/openmeterio/openmeter/openmeter/ent/db"
Expand All @@ -28,6 +31,8 @@ import (
var App = wire.NewSet(
NewAppRegistry,
NewAppService,
NewSyncPlanAdapter,
NewSyncPlanService,
NewAppStripeService,
NewAppSandboxFactory,
NewAppSandboxProvisioner,
Expand All @@ -54,7 +59,21 @@ func NewAppService(
})
}

func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service, billingRegistry BillingRegistry, publisher eventbus.Publisher) (appstripe.Service, error) {
func NewSyncPlanAdapter(db *entdb.Client) (*invoicesyncadapter.Adapter, error) {
return invoicesyncadapter.New(invoicesyncadapter.Config{
Client: db,
})
}

func NewSyncPlanService(adapter *invoicesyncadapter.Adapter, publisher eventbus.Publisher, logger *slog.Logger) (invoicesync.Service, error) {
return invoicesyncservice.New(invoicesyncservice.Config{
Adapter: adapter,
Publisher: publisher,
Logger: logger.With("component", "invoicesync-service"),
})
}

func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig config.AppsConfiguration, appService app.Service, customerService customer.Service, secretService secret.Service, billingRegistry BillingRegistry, publisher eventbus.Publisher, syncPlanService invoicesync.Service) (appstripe.Service, error) {
appStripeAdapter, err := appstripeadapter.New(appstripeadapter.Config{
Client: db,
AppService: appService,
Expand Down Expand Up @@ -87,6 +106,7 @@ func NewAppStripeService(logger *slog.Logger, db *entdb.Client, appsConfig confi
DisableWebhookRegistration: appsConfig.Stripe.DisableWebhookRegistration,
Publisher: publisher,
WebhookURLGenerator: webhookGenerator,
SyncPlanService: syncPlanService,
})
}

Expand Down
60 changes: 59 additions & 1 deletion app/common/openmeter_billingworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
"context"
"errors"
"fmt"
"log/slog"
"syscall"
Expand All @@ -11,11 +12,19 @@ import (
"github.com/oklog/run"

"github.com/openmeterio/openmeter/app/config"
"github.com/openmeterio/openmeter/openmeter/app"
appstripe "github.com/openmeterio/openmeter/openmeter/app/stripe"
stripeclient "github.com/openmeterio/openmeter/openmeter/app/stripe/client"
"github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync"
invoicesyncadapter "github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync/adapter"
billingworker "github.com/openmeterio/openmeter/openmeter/billing/worker"
"github.com/openmeterio/openmeter/openmeter/billing/worker/subscriptionsync"
"github.com/openmeterio/openmeter/openmeter/secret"
watermillkafka "github.com/openmeterio/openmeter/openmeter/watermill/driver/kafka"
"github.com/openmeterio/openmeter/openmeter/watermill/eventbus"
"github.com/openmeterio/openmeter/openmeter/watermill/grouphandler"
"github.com/openmeterio/openmeter/openmeter/watermill/router"
"github.com/openmeterio/openmeter/pkg/framework/lockr"
pkgkafka "github.com/openmeterio/openmeter/pkg/kafka"
)

Expand Down Expand Up @@ -96,12 +105,61 @@ func NewBillingWorkerOptions(
}
}

func NewBillingWorker(workerOptions billingworker.WorkerOptions) (*billingworker.Worker, error) {
func NewBillingWorker(
workerOptions billingworker.WorkerOptions,
appService app.Service,
stripeAppService appstripe.Service,
secretService secret.Service,
billingRegistry BillingRegistry,
publisher eventbus.Publisher,
logger *slog.Logger,
syncPlanAdapter *invoicesyncadapter.Adapter,
) (*billingworker.Worker, error) {
worker, err := billingworker.New(workerOptions)
if err != nil {
return nil, fmt.Errorf("failed to initialize worker: %w", err)
}

// Create locker for advisory locks on sync plan execution
syncPlanLocker, err := lockr.NewLocker(&lockr.LockerConfig{
Logger: logger.With("component", "stripe-sync-plan-locker"),
})
if err != nil {
return nil, fmt.Errorf("failed to create sync plan locker: %w", err)
}

// Register sync plan handler for async Stripe invoice sync
syncPlanHandler, err := invoicesync.NewHandler(invoicesync.HandlerConfig{
Adapter: syncPlanAdapter,
AppService: appService,
BillingService: billingRegistry.Billing,
StripeAppService: stripeAppService,
SecretService: secretService,
StripeAppClientFactory: stripeclient.NewStripeAppClient,
Publisher: publisher,
LockFunc: func(ctx context.Context, namespace, invoiceID string) error {
key, err := lockr.NewKey("namespace", namespace, "invoice_sync", invoiceID)
if err != nil {
return fmt.Errorf("creating lock key: %w", err)
}
if err := syncPlanLocker.LockForTX(ctx, key); err != nil {
if errors.Is(err, lockr.ErrLockTimeout) {
return invoicesync.ErrSyncPlanLocked
}
return err
}
return nil
},
Logger: logger.With("component", "stripe-sync-plan"),
})
if err != nil {
return nil, fmt.Errorf("failed to create sync plan handler: %w", err)
}

worker.AddHandler(grouphandler.NewGroupEventHandler(func(ctx context.Context, event *invoicesync.ExecuteSyncPlanEvent) error {
return syncPlanHandler.Handle(ctx, event)
}))

return worker, nil
}

Expand Down
50 changes: 35 additions & 15 deletions cmd/billing-worker/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 23 additions & 1 deletion cmd/jobs/internal/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 23 additions & 1 deletion cmd/server/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions openmeter/app/stripe/entity/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
stripeapp "github.com/openmeterio/openmeter/openmeter/app/stripe"
stripeclient "github.com/openmeterio/openmeter/openmeter/app/stripe/client"
appstripeentity "github.com/openmeterio/openmeter/openmeter/app/stripe/entity"
"github.com/openmeterio/openmeter/openmeter/app/stripe/invoicesync"
"github.com/openmeterio/openmeter/openmeter/billing"
"github.com/openmeterio/openmeter/openmeter/secret"
)
Expand Down Expand Up @@ -41,6 +42,9 @@ type App struct {
StripeAppClientFactory stripeclient.StripeAppClientFactory `json:"-"`
StripeAppService stripeapp.Service `json:"-"`
SecretService secret.Service `json:"-"`

// Sync plan support for async invoice sync
SyncPlanService invoicesync.Service `json:"-"`
}

func (a App) Validate() error {
Expand Down Expand Up @@ -84,6 +88,10 @@ func (a App) Validate() error {
return errors.New("logger is required")
}

if a.SyncPlanService == nil {
return errors.New("sync plan service is required")
}

return nil
}

Expand Down
64 changes: 0 additions & 64 deletions openmeter/app/stripe/entity/app/calculator.go

This file was deleted.

Loading
Loading