diff --git a/openmeter/subscription/service/service.go b/openmeter/subscription/service/service.go index 7c40a5305d..964ae60ce3 100644 --- a/openmeter/subscription/service/service.go +++ b/openmeter/subscription/service/service.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/samber/lo" + "go.opentelemetry.io/otel/attribute" "github.com/openmeterio/openmeter/openmeter/customer" "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" @@ -98,6 +99,11 @@ func (s *service) lockCustomer(ctx context.Context, customerId string) error { func (s *service) Create(ctx context.Context, namespace string, spec subscription.SubscriptionSpec) (subscription.Subscription, error) { ctx = subscription.NewSubscriptionOperationContext(ctx) + setSpanAttrs(ctx, + attribute.String("subscription.namespace", namespace), + attribute.String("subscription.operation", "create"), + ) + setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.input", spec)...) def := subscription.Subscription{} @@ -193,6 +199,12 @@ func (s *service) Create(ctx context.Context, namespace string, spec subscriptio func (s *service) Update(ctx context.Context, subscriptionID models.NamespacedID, newSpec subscription.SubscriptionSpec) (subscription.Subscription, error) { ctx = subscription.NewSubscriptionOperationContext(ctx) + setSpanAttrs(ctx, + attribute.String("subscription.namespace", subscriptionID.Namespace), + attribute.String("subscription.id", subscriptionID.ID), + attribute.String("subscription.operation", "update"), + ) + setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.input", newSpec)...) var def subscription.Subscription @@ -201,6 +213,8 @@ func (s *service) Update(ctx context.Context, subscriptionID models.NamespacedID if err != nil { return def, fmt.Errorf("failed to get view: %w", err) } + setSpanAttrs(ctx, addViewAttrs([]attribute.KeyValue{}, "subscription.view.current", view)...) + setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.current", view.Spec)...) if err := s.validateUpdate(ctx, view, newSpec); err != nil { return def, err @@ -227,6 +241,8 @@ func (s *service) Update(ctx context.Context, subscriptionID models.NamespacedID if err != nil { return subs, err } + setSpanAttrs(ctx, addViewAttrs([]attribute.KeyValue{}, "subscription.view.updated", updatedView)...) + setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.updated", updatedView.Spec)...) err = errors.Join(lo.Map(s.Hooks, func(v subscription.SubscriptionCommandHook, _ int) error { return v.AfterUpdate(ctx, updatedView) diff --git a/openmeter/subscription/service/sync.go b/openmeter/subscription/service/sync.go index 9e9c6efd20..7d8517feb4 100644 --- a/openmeter/subscription/service/sync.go +++ b/openmeter/subscription/service/sync.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/samber/lo" + "go.opentelemetry.io/otel/attribute" "github.com/openmeterio/openmeter/openmeter/entitlement" "github.com/openmeterio/openmeter/openmeter/subscription" @@ -26,8 +27,19 @@ import ( // TODO: localize error so phase and item keys are always included (alongside subscription reference) // TODO (OM-1074): clean up this control flow func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, newSpec subscription.SubscriptionSpec) (subscription.Subscription, error) { + setSpanAttrs(ctx, + attribute.String("subscription.namespace", view.Subscription.Namespace), + attribute.String("subscription.id", view.Subscription.ID), + attribute.String("subscription.sync.operation", "spec_sync"), + ) + setSpanAttrs(ctx, addViewAttrs([]attribute.KeyValue{}, "subscription.view.before", view)...) + setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.before", view.Spec)...) + setSpanAttrs(ctx, addSpecAttrs([]attribute.KeyValue{}, "subscription.spec.target", newSpec)...) + return transaction.Run(ctx, s.TransactionManager, func(ctx context.Context) (subscription.Subscription, error) { var def subscription.Subscription + var phaseDeleted, phaseCreated int + var itemDeleted, itemCreated int // Some sanity checks for good measure if view.Subscription.CustomerId != newSpec.CustomerId { @@ -63,6 +75,12 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, if err := s.deletePhase(ctx, currentPhaseView); err != nil { return def, fmt.Errorf("failed to delete phase: %w", err) } + phaseDeleted++ + addSpanEvent(ctx, "subscription.sync.phase.delete", + attribute.String("phase.key", currentPhaseView.SubscriptionPhase.Key), + attribute.String("phase.id", currentPhaseView.SubscriptionPhase.ID), + attribute.String("reason", "removed"), + ) dirty.mark(subscription.NewPhasePath(currentPhaseView.SubscriptionPhase.Key)) @@ -99,6 +117,12 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, if err := s.deletePhase(ctx, currentPhaseView); err != nil { return def, fmt.Errorf("failed to delete phase: %w", err) } + phaseDeleted++ + addSpanEvent(ctx, "subscription.sync.phase.delete", + attribute.String("phase.key", currentPhaseView.SubscriptionPhase.Key), + attribute.String("phase.id", currentPhaseView.SubscriptionPhase.ID), + attribute.String("reason", "changed"), + ) dirty.mark(subscription.NewPhasePath(currentPhaseView.SubscriptionPhase.Key)) @@ -124,6 +148,13 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, if err := s.deleteItem(ctx, currentItemView); err != nil { return def, fmt.Errorf("failed to delete item: %w", err) } + itemDeleted++ + addSpanEvent(ctx, "subscription.sync.item.delete", + attribute.String("phase.key", currentItemView.Spec.PhaseKey), + attribute.String("item.key", currentItemView.Spec.ItemKey), + attribute.String("item.id", currentItemView.SubscriptionItem.ID), + attribute.String("reason", "key_removed"), + ) dirty.mark(subscription.NewItemPath(currentItemView.Spec.PhaseKey, currentItemView.Spec.ItemKey)) } @@ -140,6 +171,14 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, if err := s.deleteItem(ctx, currentItemView); err != nil { return def, fmt.Errorf("failed to delete item: %w", err) } + itemDeleted++ + addSpanEvent(ctx, "subscription.sync.item.delete", + attribute.String("phase.key", currentItemView.Spec.PhaseKey), + attribute.String("item.key", currentItemView.Spec.ItemKey), + attribute.String("item.id", currentItemView.SubscriptionItem.ID), + attribute.Int("item.version", currentItemIdx), + attribute.String("reason", "version_removed"), + ) dirty.mark(subscription.NewItemVersionPath(currentItemView.Spec.PhaseKey, currentItemView.Spec.ItemKey, currentItemIdx)) @@ -191,6 +230,14 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, if err := s.deleteItem(ctx, currentItemView); err != nil { return def, fmt.Errorf("failed to delete item: %w", err) } + itemDeleted++ + addSpanEvent(ctx, "subscription.sync.item.delete", + attribute.String("phase.key", currentItemView.Spec.PhaseKey), + attribute.String("item.key", currentItemView.Spec.ItemKey), + attribute.String("item.id", currentItemView.SubscriptionItem.ID), + attribute.Int("item.version", currentItemIdx), + attribute.String("reason", "changed"), + ) dirty.mark(subscription.NewItemVersionPath(currentItemView.Spec.PhaseKey, currentItemView.Spec.ItemKey, currentItemIdx)) @@ -228,6 +275,7 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, if _, err := s.createPhase(ctx, view.Customer, *matchingPhaseFromNewSpec, view.Subscription, newPhaseCadence); err != nil { return def, fmt.Errorf("failed to create phase: %w", err) } + phaseCreated++ // There's nothing more to be done for this phase, so lets skip to the next one continue @@ -265,6 +313,7 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, }); err != nil { return def, fmt.Errorf("failed to create item: %w", err) } + itemCreated++ // There's nothing more to be done for this item, so lets skip to the next one continue @@ -294,6 +343,7 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, if _, err := s.createPhase(ctx, view.Customer, *phase, view.Subscription, phaseCadence); err != nil { return def, fmt.Errorf("failed to create phase: %w", err) } + phaseCreated++ continue } @@ -318,6 +368,7 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, }); err != nil { return def, fmt.Errorf("failed to create item: %w", err) } + itemCreated++ // There's nothing left to do for this item continue @@ -335,13 +386,31 @@ func (s *service) sync(ctx context.Context, view subscription.SubscriptionView, }); err != nil { return def, fmt.Errorf("failed to create item: %w", err) } + itemCreated++ } } } } // 4. Finally we're done with syncing everything, we should just re-fetch the subscription - return s.Get(ctx, view.Subscription.NamespacedID) + setSpanAttrs(ctx, + attribute.Int("subscription.sync.touched_paths.count", len(dirty)), + attribute.Int("subscription.sync.phases.deleted", phaseDeleted), + attribute.Int("subscription.sync.phases.created", phaseCreated), + attribute.Int("subscription.sync.items.deleted", itemDeleted), + attribute.Int("subscription.sync.items.created", itemCreated), + ) + + sub, err := s.Get(ctx, view.Subscription.NamespacedID) + if err != nil { + return def, err + } + setSpanAttrs(ctx, + attribute.String("subscription.sync.result_id", sub.ID), + attribute.String("subscription.sync.result_namespace", sub.Namespace), + ) + + return sub, nil }) } diff --git a/openmeter/subscription/service/trace.go b/openmeter/subscription/service/trace.go new file mode 100644 index 0000000000..98c9ad9bb5 --- /dev/null +++ b/openmeter/subscription/service/trace.go @@ -0,0 +1,101 @@ +package service + +import ( + "context" + "slices" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/openmeterio/openmeter/openmeter/subscription" +) + +func setSpanAttrs(ctx context.Context, attrs ...attribute.KeyValue) { + span := trace.SpanFromContext(ctx) + if span == nil { + return + } + + span.SetAttributes(attrs...) +} + +func addSpanEvent(ctx context.Context, name string, attrs ...attribute.KeyValue) { + span := trace.SpanFromContext(ctx) + if span == nil { + return + } + + span.AddEvent(name, trace.WithAttributes(attrs...)) +} + +func addSpecAttrs(attrs []attribute.KeyValue, prefix string, spec subscription.SubscriptionSpec) []attribute.KeyValue { + phaseKeys := make([]string, 0, len(spec.Phases)) + itemKeySet := make(map[string]struct{}) + itemVersions := 0 + for k, phase := range spec.Phases { + phaseKeys = append(phaseKeys, k) + for ik, items := range phase.ItemsByKey { + itemKeySet[ik] = struct{}{} + itemVersions += len(items) + } + } + slices.Sort(phaseKeys) + + itemKeys := make([]string, 0, len(itemKeySet)) + for k := range itemKeySet { + itemKeys = append(itemKeys, k) + } + slices.Sort(itemKeys) + + attrs = append(attrs, + attribute.String(prefix+".customer_id", spec.CustomerId), + attribute.Int(prefix+".phases.count", len(spec.Phases)), + attribute.StringSlice(prefix+".phase_keys", phaseKeys), + attribute.Int(prefix+".item_keys.count", len(itemKeySet)), + attribute.StringSlice(prefix+".item_keys", itemKeys), + attribute.Int(prefix+".item_versions.count", itemVersions), + attribute.Bool(prefix+".has_billables", spec.HasBillables()), + attribute.Bool(prefix+".has_metered_billables", spec.HasMeteredBillables()), + attribute.Bool(prefix+".has_entitlements", spec.HasEntitlements()), + ) + + if spec.Plan != nil { + attrs = append(attrs, + attribute.String(prefix+".plan.id", spec.Plan.Id), + attribute.String(prefix+".plan.key", spec.Plan.Key), + attribute.Int(prefix+".plan.version", spec.Plan.Version), + ) + } + + return attrs +} + +func addViewAttrs(attrs []attribute.KeyValue, prefix string, view subscription.SubscriptionView) []attribute.KeyValue { + phaseKeys := make([]string, 0, len(view.Phases)) + itemKeySet := make(map[string]struct{}) + itemVersions := 0 + for _, phase := range view.Phases { + phaseKeys = append(phaseKeys, phase.SubscriptionPhase.Key) + for ik, items := range phase.ItemsByKey { + itemKeySet[ik] = struct{}{} + itemVersions += len(items) + } + } + slices.Sort(phaseKeys) + + itemKeys := make([]string, 0, len(itemKeySet)) + for k := range itemKeySet { + itemKeys = append(itemKeys, k) + } + slices.Sort(itemKeys) + + return append(attrs, + attribute.String(prefix+".subscription_id", view.Subscription.ID), + attribute.String(prefix+".customer_id", view.Subscription.CustomerId), + attribute.Int(prefix+".phases.count", len(view.Phases)), + attribute.StringSlice(prefix+".phase_keys", phaseKeys), + attribute.Int(prefix+".item_keys.count", len(itemKeySet)), + attribute.StringSlice(prefix+".item_keys", itemKeys), + attribute.Int(prefix+".item_versions.count", itemVersions), + ) +} diff --git a/openmeter/subscription/workflow/service/addon.go b/openmeter/subscription/workflow/service/addon.go index 4e63691b71..d7873df3a8 100644 --- a/openmeter/subscription/workflow/service/addon.go +++ b/openmeter/subscription/workflow/service/addon.go @@ -5,9 +5,11 @@ import ( "encoding/json" "errors" "fmt" + "slices" "time" "github.com/samber/lo" + "go.opentelemetry.io/otel/attribute" "github.com/openmeterio/openmeter/openmeter/subscription" subscriptionaddon "github.com/openmeterio/openmeter/openmeter/subscription/addon" @@ -22,6 +24,20 @@ func (s *service) AddAddon(ctx context.Context, subscriptionID models.Namespaced var def1 subscription.SubscriptionView var def2 subscriptionaddon.SubscriptionAddon + setSpanAttrs(ctx, + attribute.String("subscription.namespace", subscriptionID.Namespace), + attribute.String("subscription.id", subscriptionID.ID), + attribute.String("workflow.operation", "add_addon"), + attribute.String("addon.id", addonInp.AddonID), + attribute.Int("addon.initial_quantity", addonInp.InitialQuantity), + attribute.Bool("subscription.timing.has_custom", addonInp.Timing.Custom != nil), + attribute.String("subscription.timing.enum", lo.TernaryF(addonInp.Timing.Enum != nil, func() string { + return string(*addonInp.Timing.Enum) + }, func() string { + return "" + })), + ) + if err := addonInp.Validate(); err != nil { return def1, def2, models.NewGenericValidationError(err) } @@ -60,6 +76,7 @@ func (s *service) AddAddon(ctx context.Context, subscriptionID models.Namespaced if err != nil { return def, fmt.Errorf("failed to resolve timing: %w", err) } + setSpanAttrs(ctx, attribute.String("subscription.edit_time", editTime.UTC().Format(time.RFC3339Nano))) if !subView.Subscription.IsActiveAt(editTime) { return def, models.NewGenericValidationError(fmt.Errorf("subscription is not active at the time of adding the addon")) @@ -69,6 +86,13 @@ func (s *service) AddAddon(ctx context.Context, subscriptionID models.Namespaced if err != nil { return def, fmt.Errorf("failed to get diffable from addons: %w", err) } + setSpanAttrs(ctx, + addSubscriptionViewAttrs([]attribute.KeyValue{}, "subscription.view.before", subView)..., + ) + setSpanAttrs(ctx, + addSubscriptionAddonsAttrs([]attribute.KeyValue{}, "subscription.addons.before", subsAdds.Items)..., + ) + setSpanAttrs(ctx, attribute.Int("subscription.addons.before.diffables.count", len(diffs))) if len(diffs) != len(subsAdds.Items) { return def, fmt.Errorf("failed to get diffable from addons, got %d addons but %d diffs", len(subsAdds.Items), len(diffs)) @@ -119,6 +143,20 @@ func (s *service) ChangeAddonQuantity(ctx context.Context, subscriptionID models var def1 subscription.SubscriptionView var def2 subscriptionaddon.SubscriptionAddon + setSpanAttrs(ctx, + attribute.String("subscription.namespace", subscriptionID.Namespace), + attribute.String("subscription.id", subscriptionID.ID), + attribute.String("workflow.operation", "change_addon_quantity"), + attribute.String("subscription_addon.id", changeInp.SubscriptionAddonID.ID), + attribute.Int("addon.quantity.new", changeInp.Quantity), + attribute.Bool("subscription.timing.has_custom", changeInp.Timing.Custom != nil), + attribute.String("subscription.timing.enum", lo.TernaryF(changeInp.Timing.Enum != nil, func() string { + return string(*changeInp.Timing.Enum) + }, func() string { + return "" + })), + ) + if subscriptionID.Namespace != changeInp.SubscriptionAddonID.Namespace { return def1, def2, models.NewGenericValidationError(fmt.Errorf("subscription and subscription addon are in different namespaces")) } @@ -156,6 +194,7 @@ func (s *service) ChangeAddonQuantity(ctx context.Context, subscriptionID models if err != nil { return def, fmt.Errorf("failed to resolve timing: %w", err) } + setSpanAttrs(ctx, attribute.String("subscription.edit_time", editTime.UTC().Format(time.RFC3339Nano))) subsAdd, err := s.AddonService.ChangeQuantity(ctx, changeInp.SubscriptionAddonID, subscriptionaddon.CreateSubscriptionAddonQuantityInput{ Quantity: changeInp.Quantity, @@ -171,6 +210,15 @@ func (s *service) ChangeAddonQuantity(ctx context.Context, subscriptionID models if err != nil { return def, err } + setSpanAttrs(ctx, + addSubscriptionViewAttrs([]attribute.KeyValue{}, "subscription.view.before", subView)..., + ) + setSpanAttrs(ctx, + addSubscriptionAddonsAttrs([]attribute.KeyValue{}, "subscription.addons.before", subsAddsBefore.Items)..., + ) + setSpanAttrs(ctx, + addSubscriptionAddonsAttrs([]attribute.KeyValue{}, "subscription.addons.after", subsAddsAfter.Items)..., + ) subView, err = s.syncWithAddons(ctx, subView, subsAddsBefore.Items, subsAddsAfter.Items, editTime) if err != nil { @@ -193,6 +241,18 @@ func (s *service) syncWithAddons( after []subscriptionaddon.SubscriptionAddon, currentTime time.Time, ) (subscription.SubscriptionView, error) { + setSpanAttrs(ctx, + attribute.String("workflow.operation", "sync_with_addons"), + attribute.String("subscription.namespace", view.Subscription.Namespace), + attribute.String("subscription.id", view.Subscription.ID), + attribute.String("subscription.sync.current_time", currentTime.UTC().Format(time.RFC3339Nano)), + ) + setSpanAttrs(ctx, addSubscriptionViewAttrs([]attribute.KeyValue{}, "subscription.view.input", view)...) + setSpanAttrs(ctx, addSubscriptionAddonsAttrs([]attribute.KeyValue{}, "subscription.addons.before", before)...) + setSpanAttrs(ctx, addSubscriptionAddonsAttrs([]attribute.KeyValue{}, "subscription.addons.after", after)...) + emitAddonApplyPlanEvents(ctx, "restore", view, before) + emitAddonApplyPlanEvents(ctx, "apply", view, after) + return transaction.Run(ctx, s.TransactionManager, func(ctx context.Context) (subscription.SubscriptionView, error) { var def subscription.SubscriptionView @@ -228,11 +288,13 @@ func (s *service) syncWithAddons( if err != nil { return def, fmt.Errorf("failed to get diffable from addons: %w", err) } + setSpanAttrs(ctx, attribute.Int("subscription.addons.before.diffables.count", len(restores))) applies, err := asDiffs(view, after) if err != nil { return def, fmt.Errorf("failed to get diffable from addons: %w", err) } + setSpanAttrs(ctx, attribute.Int("subscription.addons.after.diffables.count", len(applies))) if err := spec.ApplyMany(lo.Map(restores, func(d addondiff.Diffable, _ int) subscription.AppliesToSpec { return d.GetRestores() @@ -259,11 +321,75 @@ func (s *service) syncWithAddons( return def, fmt.Errorf("failed to update subscription: %w", err) } + updated, err := s.Service.GetView(ctx, view.Subscription.NamespacedID) + if err != nil { + return def, err + } + setSpanAttrs(ctx, addSubscriptionViewAttrs([]attribute.KeyValue{}, "subscription.view.output", updated)...) + setSpanAttrs(ctx, addSubscriptionSpecAttrs([]attribute.KeyValue{}, "subscription.spec.output", updated.Spec)...) - return s.Service.GetView(ctx, view.Subscription.NamespacedID) + return updated, nil }) } +func emitAddonApplyPlanEvents(ctx context.Context, source string, view subscription.SubscriptionView, addons []subscriptionaddon.SubscriptionAddon) { + order := 0 + + for addonOrder, add := range addons { + affectedByRateCardKey := addondiff.GetAffectedItemIDs(view, add) + affectedRateCardKeys := lo.Keys(affectedByRateCardKey) + slices.Sort(affectedRateCardKeys) + + addSpanEvent(ctx, "subscription.addon.apply.plan", + attribute.String("apply.source", source), + attribute.Int("apply.order", order), + attribute.Int("apply.addon_order", addonOrder), + attribute.String("subscription.namespace", add.Namespace), + attribute.String("subscription.id", add.SubscriptionID), + attribute.String("subscription_addon.id", add.ID), + attribute.String("addon.id", add.Addon.ID), + attribute.StringSlice("addon.affected_ratecard_keys", affectedRateCardKeys), + ) + order++ + + for _, rateCardKey := range affectedRateCardKeys { + addSpanEvent(ctx, "subscription.addon.apply.plan", + attribute.String("apply.source", source), + attribute.Int("apply.order", order), + attribute.String("subscription_addon.id", add.ID), + attribute.String("addon.id", add.Addon.ID), + attribute.String("addon.ratecard_key", rateCardKey), + attribute.StringSlice("subscription.item_ids", affectedByRateCardKey[rateCardKey]), + ) + order++ + } + + instances := add.GetInstances() + for instanceOrder, inst := range instances { + rateCardKeys := lo.Map(inst.RateCards, func(rc subscriptionaddon.SubscriptionAddonRateCard, _ int) string { + return rc.AddonRateCard.Key() + }) + + addSpanEvent(ctx, "subscription.addon.apply.plan", + attribute.String("apply.source", source), + attribute.Int("apply.order", order), + attribute.Int("apply.instance_order", instanceOrder), + attribute.String("subscription_addon.id", inst.ID), + attribute.String("addon.id", inst.Addon.ID), + attribute.Int("addon.quantity", inst.Quantity), + attribute.String("addon.instance_active_from", inst.ActiveFrom.UTC().Format(time.RFC3339Nano)), + attribute.String("addon.instance_active_to", lo.TernaryF(inst.ActiveTo != nil, func() string { + return inst.ActiveTo.UTC().Format(time.RFC3339Nano) + }, func() string { + return "" + })), + attribute.StringSlice("addon.instance_ratecard_keys", rateCardKeys), + ) + order++ + } + } +} + // The sub has addons if it has a non-0 quantity on any of them during its cadence func hasAddons(view subscription.SubscriptionView, addons []subscriptionaddon.SubscriptionAddon) bool { subPer := view.Subscription.CadencedModel.AsPeriod() @@ -289,7 +415,9 @@ func asDiffs(view subscription.SubscriptionView, subsAdds []subscriptionaddon.Su return nil, fmt.Errorf("failed to get diffable from addon: %w", err) } - return lo.Filter(diffs, func(d addondiff.Diffable, _ int) bool { + filtered := lo.Filter(diffs, func(d addondiff.Diffable, _ int) bool { return d != nil - }), nil + }) + + return filtered, nil } diff --git a/openmeter/subscription/workflow/service/subscription.go b/openmeter/subscription/workflow/service/subscription.go index 2af5fe31f2..d5792f4aca 100644 --- a/openmeter/subscription/workflow/service/subscription.go +++ b/openmeter/subscription/workflow/service/subscription.go @@ -2,10 +2,13 @@ package service import ( "context" + "encoding/json" "fmt" "maps" + "time" "github.com/samber/lo" + "go.opentelemetry.io/otel/attribute" "github.com/openmeterio/openmeter/openmeter/customer" "github.com/openmeterio/openmeter/openmeter/subscription" @@ -20,6 +23,17 @@ import ( ) func (s *service) CreateFromPlan(ctx context.Context, inp subscriptionworkflow.CreateSubscriptionWorkflowInput, plan subscription.Plan) (subscription.SubscriptionView, error) { + setSpanAttrs(ctx, + attribute.String("subscription.namespace", inp.Namespace), + attribute.String("workflow.operation", "create_from_plan"), + attribute.Bool("subscription.timing.has_custom", inp.Timing.Custom != nil), + attribute.String("subscription.timing.enum", lo.TernaryF(inp.Timing.Enum != nil, func() string { + return string(*inp.Timing.Enum) + }, func() string { + return "" + })), + ) + return transaction.Run(ctx, s.TransactionManager, func(ctx context.Context) (subscription.SubscriptionView, error) { var def subscription.SubscriptionView @@ -76,6 +90,8 @@ func (s *service) CreateFromPlan(ctx context.Context, inp subscriptionworkflow.C return def, fmt.Errorf("failed to create spec from plan: %w", err) } + setSpanAttrs(ctx, addSubscriptionSpecAttrs([]attribute.KeyValue{}, "subscription.spec", spec)...) + if err := spec.ValidateAlignment(); err != nil { return def, err } @@ -91,6 +107,19 @@ func (s *service) CreateFromPlan(ctx context.Context, inp subscriptionworkflow.C } func (s *service) EditRunning(ctx context.Context, subscriptionID models.NamespacedID, customizations []subscription.Patch, timing subscription.Timing) (subscription.SubscriptionView, error) { + setSpanAttrs(ctx, + attribute.String("subscription.namespace", subscriptionID.Namespace), + attribute.String("subscription.id", subscriptionID.ID), + attribute.String("workflow.operation", "edit_running"), + attribute.Int("subscription.customizations.count", len(customizations)), + attribute.Bool("subscription.timing.has_custom", timing.Custom != nil), + attribute.String("subscription.timing.enum", lo.TernaryF(timing.Enum != nil, func() string { + return string(*timing.Enum) + }, func() string { + return "" + })), + ) + // Finally, let's update the subscription return transaction.Run(ctx, s.TransactionManager, func(ctx context.Context) (subscription.SubscriptionView, error) { // First, let's fetch the current state of the Subscription @@ -154,20 +183,73 @@ func (s *service) EditRunning(ctx context.Context, subscriptionID models.Namespa if err != nil { return subscription.SubscriptionView{}, fmt.Errorf("failed to resolve timing: %w", err) } + setSpanAttrs(ctx, attribute.String("subscription.edit_time", editTime.UTC().Format(time.RFC3339Nano))) // Let's apply the customizations spec := curr.AsSpec() + setSpanAttrs(ctx, addSubscriptionSpecAttrs([]attribute.KeyValue{}, "subscription.spec.before_apply", spec)...) + for idx, p := range customizations { + path := p.Path() + eventAttrs := []attribute.KeyValue{ + attribute.String("apply.source", "subscription.customization"), + attribute.Int("apply.order", idx), + attribute.String("patch.op", string(p.Op())), + attribute.String("patch.path", string(path)), + attribute.String("patch.path.type", string(path.Type())), + attribute.String("patch.phase_key", path.PhaseKey()), + } + if itemKey := path.ItemKey(); itemKey != "" { + eventAttrs = append(eventAttrs, attribute.String("patch.item_key", itemKey)) + } + if itemVersion := path.ItemVersion(); itemVersion >= 0 { + eventAttrs = append(eventAttrs, attribute.Int("patch.item_version", itemVersion)) + } + + // Add cadence override details for add-item patches (critical for debugging sort order issues) + addItemCadenceAttrs := func(inp subscription.SubscriptionItemSpec) { + if inp.ActiveFromOverrideRelativeToPhaseStart != nil { + eventAttrs = append(eventAttrs, attribute.String("patch.item.active_from_override", inp.ActiveFromOverrideRelativeToPhaseStart.ISOString().String())) + } + if inp.ActiveToOverrideRelativeToPhaseStart != nil { + eventAttrs = append(eventAttrs, attribute.String("patch.item.active_to_override", inp.ActiveToOverrideRelativeToPhaseStart.ISOString().String())) + } + } + if ap, ok := p.(patch.PatchAddItem); ok { + addItemCadenceAttrs(ap.CreateInput) + } else if ap, ok := p.(*patch.PatchAddItem); ok { + addItemCadenceAttrs(ap.CreateInput) + } + + addSpanEvent(ctx, "subscription.apply.plan", eventAttrs...) + } + + // TODO: remove after issue is fixed + specBeforeApplyJSON, _ := json.Marshal(spec) + logApplyErr := func(mErr error) { + customizationsJSON, err := json.Marshal(customizations) + if err != nil { + s.Logger.DebugContext(ctx, "failed to marshal customizations for error logging", "error", err) + } + s.Logger.DebugContext(ctx, "failed to apply customizations", + "apply_error", mErr, + "spec_before_apply", specBeforeApplyJSON, + "customizations", customizationsJSON, + "edit_time", editTime, + ) + } err = spec.ApplyMany(lo.Map(customizations, subscription.ToApplies), subscription.ApplyContext{ CurrentTime: editTime, }) if err := subscriptionworkflow.MapSubscriptionErrors(err); err != nil { + logApplyErr(err) return subscription.SubscriptionView{}, fmt.Errorf("failed to apply customizations: %w", err) } if err := spec.ValidateAlignment(); err != nil { return subscription.SubscriptionView{}, err } + setSpanAttrs(ctx, addSubscriptionSpecAttrs([]attribute.KeyValue{}, "subscription.spec.after_apply", spec)...) sub, err := s.Service.Update(ctx, subscriptionID, spec) if err != nil { @@ -179,6 +261,18 @@ func (s *service) EditRunning(ctx context.Context, subscriptionID models.Namespa } func (s *service) ChangeToPlan(ctx context.Context, subscriptionID models.NamespacedID, inp subscriptionworkflow.ChangeSubscriptionWorkflowInput, plan subscription.Plan) (subscription.Subscription, subscription.SubscriptionView, error) { + setSpanAttrs(ctx, + attribute.String("subscription.namespace", subscriptionID.Namespace), + attribute.String("subscription.id", subscriptionID.ID), + attribute.String("workflow.operation", "change_to_plan"), + attribute.Bool("subscription.timing.has_custom", inp.Timing.Custom != nil), + attribute.String("subscription.timing.enum", lo.TernaryF(inp.Timing.Enum != nil, func() string { + return string(*inp.Timing.Enum) + }, func() string { + return "" + })), + ) + // typing helper type res struct { curr subscription.Subscription diff --git a/openmeter/subscription/workflow/service/trace.go b/openmeter/subscription/workflow/service/trace.go new file mode 100644 index 0000000000..366bd6a4fe --- /dev/null +++ b/openmeter/subscription/workflow/service/trace.go @@ -0,0 +1,123 @@ +package service + +import ( + "context" + "slices" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + + "github.com/openmeterio/openmeter/openmeter/subscription" + subscriptionaddon "github.com/openmeterio/openmeter/openmeter/subscription/addon" +) + +func setSpanAttrs(ctx context.Context, attrs ...attribute.KeyValue) { + span := trace.SpanFromContext(ctx) + if span == nil { + return + } + + span.SetAttributes(attrs...) +} + +func addSpanEvent(ctx context.Context, name string, attrs ...attribute.KeyValue) { + span := trace.SpanFromContext(ctx) + if span == nil { + return + } + + span.AddEvent(name, trace.WithAttributes(attrs...)) +} + +func addSubscriptionSpecAttrs(attrs []attribute.KeyValue, prefix string, spec subscription.SubscriptionSpec) []attribute.KeyValue { + phaseKeys := make([]string, 0, len(spec.Phases)) + itemKeySet := make(map[string]struct{}) + itemVersions := 0 + for k, phase := range spec.Phases { + phaseKeys = append(phaseKeys, k) + for ik, items := range phase.ItemsByKey { + itemKeySet[ik] = struct{}{} + itemVersions += len(items) + } + } + slices.Sort(phaseKeys) + + itemKeys := make([]string, 0, len(itemKeySet)) + for k := range itemKeySet { + itemKeys = append(itemKeys, k) + } + slices.Sort(itemKeys) + + attrs = append(attrs, + attribute.String(prefix+".customer_id", spec.CustomerId), + attribute.Int(prefix+".phases.count", len(spec.Phases)), + attribute.StringSlice(prefix+".phase_keys", phaseKeys), + attribute.Int(prefix+".item_keys.count", len(itemKeySet)), + attribute.StringSlice(prefix+".item_keys", itemKeys), + attribute.Int(prefix+".item_versions.count", itemVersions), + attribute.Bool(prefix+".has_billables", spec.HasBillables()), + attribute.Bool(prefix+".has_metered_billables", spec.HasMeteredBillables()), + attribute.Bool(prefix+".has_entitlements", spec.HasEntitlements()), + ) + + if spec.Plan != nil { + attrs = append(attrs, + attribute.String(prefix+".plan.id", spec.Plan.Id), + attribute.String(prefix+".plan.key", spec.Plan.Key), + attribute.Int(prefix+".plan.version", spec.Plan.Version), + ) + } + + return attrs +} + +func addSubscriptionViewAttrs(attrs []attribute.KeyValue, prefix string, view subscription.SubscriptionView) []attribute.KeyValue { + phaseKeys := make([]string, 0, len(view.Phases)) + itemKeySet := make(map[string]struct{}) + itemVersions := 0 + for _, phase := range view.Phases { + phaseKeys = append(phaseKeys, phase.SubscriptionPhase.Key) + for ik, items := range phase.ItemsByKey { + itemKeySet[ik] = struct{}{} + itemVersions += len(items) + } + } + slices.Sort(phaseKeys) + + itemKeys := make([]string, 0, len(itemKeySet)) + for k := range itemKeySet { + itemKeys = append(itemKeys, k) + } + slices.Sort(itemKeys) + + return append(attrs, + attribute.String(prefix+".subscription_id", view.Subscription.ID), + attribute.String(prefix+".customer_id", view.Subscription.CustomerId), + attribute.Int(prefix+".phases.count", len(view.Phases)), + attribute.StringSlice(prefix+".phase_keys", phaseKeys), + attribute.Int(prefix+".item_keys.count", len(itemKeySet)), + attribute.StringSlice(prefix+".item_keys", itemKeys), + attribute.Int(prefix+".item_versions.count", itemVersions), + ) +} + +func addSubscriptionAddonsAttrs(attrs []attribute.KeyValue, prefix string, addons []subscriptionaddon.SubscriptionAddon) []attribute.KeyValue { + instances := 0 + subAddonIDs := make([]string, 0, len(addons)) + addonIDs := make([]string, 0, len(addons)) + addonKeys := make([]string, 0, len(addons)) + for _, add := range addons { + instances += len(add.GetInstances()) + subAddonIDs = append(subAddonIDs, add.ID) + addonIDs = append(addonIDs, add.Addon.ID) + addonKeys = append(addonKeys, add.Addon.Key) + } + + return append(attrs, + attribute.Int(prefix+".addons.count", len(addons)), + attribute.Int(prefix+".instances.count", instances), + attribute.StringSlice(prefix+".subscription_addon_ids", subAddonIDs), + attribute.StringSlice(prefix+".addon_ids", addonIDs), + attribute.StringSlice(prefix+".addon_keys", addonKeys), + ) +}