Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v7
with:
version: v2.4
version: v2.11
args: -v
76 changes: 54 additions & 22 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package apiserver

import (
"bytes"
"context"
"fmt"
"net"
Expand Down Expand Up @@ -35,6 +36,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
)
Expand All @@ -49,6 +51,8 @@ const (
kappctrlSVCEnvKey = "KAPPCTRL_SYSTEM_SERVICE"

apiServiceName = "v1alpha1.data.packaging.carvel.dev"

apiServiceReconcileInterval = 30 * time.Second
)

var (
Expand Down Expand Up @@ -107,7 +111,7 @@ func NewAPIServer(clientConfig *rest.Config, coreClient kubernetes.Interface, kc
return nil, fmt.Errorf("building aggregation client: %v", err)
}

config, err := newServerConfig(aggClient, opts)
config, caContentProvider, err := newServerConfig(aggClient, opts)
if err != nil {
return nil, err
}
Expand All @@ -117,6 +121,29 @@ func NewAPIServer(clientConfig *rest.Config, coreClient kubernetes.Interface, kc
return nil, err
}

// Register the PostStartHook to reconcile the CA Bundle
if err := server.AddPostStartHook("apiservice-ca-reconciler", func(hookContext genericapiserver.PostStartHookContext) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Comment thread
himsngh marked this conversation as resolved.
defer cancel()
if err := updateAPIService(ctx, opts.Logger, aggClient, caContentProvider); err != nil {
opts.Logger.Error(err, "Initial APIService CA sync failed")
return err
}

// Background Reconciliation
go wait.Until(func() {
ctx, syncCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer syncCancel()
if err := updateAPIService(ctx, opts.Logger, aggClient, caContentProvider); err != nil {
opts.Logger.Error(err, "Background APIService CA reconciliation failed")
}
}, apiServiceReconcileInterval, hookContext.StopCh)

return nil
}); err != nil {
return nil, fmt.Errorf("error registering APIService CA reconciler hook: %v", err)
}

packageMetadatasStorage := packagerest.NewPackageMetadataCRDREST(kcClient, coreClient, opts.GlobalNamespace)
packageStorage := packagerest.NewPackageCRDREST(kcClient, coreClient, opts.GlobalNamespace, opts.Logger)

Expand Down Expand Up @@ -168,7 +195,7 @@ func (as *APIServer) isReady() (bool, error) {
return false, nil
}

func newServerConfig(aggClient aggregatorclient.Interface, opts NewAPIServerOpts) (*genericapiserver.RecommendedConfig, error) {
func newServerConfig(aggClient aggregatorclient.Interface, opts NewAPIServerOpts) (*genericapiserver.RecommendedConfig, *dynamiccertificates.DynamicFileCAContent, error) {

recommendedOptions := genericoptions.NewRecommendedOptions("", Codecs.LegacyCodec(v1alpha1.SchemeGroupVersion))
recommendedOptions.Etcd = nil
Expand All @@ -180,26 +207,22 @@ func newServerConfig(aggClient aggregatorclient.Interface, opts NewAPIServerOpts

// ports below 1024 are probably the wrong port, see https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Well-known_ports
if opts.BindPort < 1024 {
return nil, fmt.Errorf("error initializing API Port to %v - try passing a port above 1023", opts.BindPort)
return nil, nil, fmt.Errorf("error initializing API Port to %v - try passing a port above 1023", opts.BindPort)
}
recommendedOptions.SecureServing.BindPort = opts.BindPort

if err := recommendedOptions.SecureServing.MaybeDefaultWithSelfSignedCerts("kapp-controller", []string{apiServiceEndoint()}, []net.IP{net.ParseIP("127.0.0.1")}); err != nil {
return nil, fmt.Errorf("error creating self-signed certificates: %v", err)
return nil, nil, fmt.Errorf("error creating self-signed certificates: %v", err)
}

caContentProvider, err := dynamiccertificates.NewDynamicCAContentFromFile("self-signed cert", recommendedOptions.SecureServing.ServerCert.CertKey.CertFile)
if err != nil {
return nil, fmt.Errorf("error reading self-signed CA certificate: %v", err)
}

if err := updateAPIService(opts.Logger, aggClient, caContentProvider); err != nil {
return nil, fmt.Errorf("error updating api service with generated certs: %v", err)
return nil, nil, fmt.Errorf("error reading self-signed CA certificate: %v", err)
}

serverVersion, err := getServerVersion(aggClient.Discovery())
if err != nil {
return nil, err
return nil, nil, err
}

// this feature gate is not enabled in k8s <1.29 as the
Expand All @@ -208,7 +231,7 @@ func newServerConfig(aggClient aggregatorclient.Interface, opts NewAPIServerOpts
// so the best we can do for older k8s clusters is to allow it to be disabled.
minSupportedVersionForAPF, err := semver.New("1.29.0")
if err != nil {
return nil, err
return nil, nil, err
}
isServerVerLTminSupportedVer := serverVersion.LT(*minSupportedVersionForAPF)
if !opts.EnableAPIPriorityAndFairness || isServerVerLTminSupportedVer {
Expand All @@ -223,7 +246,7 @@ func newServerConfig(aggClient aggregatorclient.Interface, opts NewAPIServerOpts
// However, we will still run namespaceLifecycle, mutatingAdmissionWebhook and validatingAdmissionWebhooks.
minSupportedVersionForValidatingAdmissionPolicy, err := semver.New("1.30.0")
if err != nil {
return nil, err
return nil, nil, err
}
isServerVerLTminSupportedVer = serverVersion.LT(*minSupportedVersionForValidatingAdmissionPolicy)
if isServerVerLTminSupportedVer {
Expand All @@ -248,10 +271,10 @@ func newServerConfig(aggClient aggregatorclient.Interface, opts NewAPIServerOpts
serverConfig.OpenAPIConfig.Info.Version = "v1alpha1"

if err := recommendedOptions.ApplyTo(serverConfig); err != nil {
return nil, err
return nil, nil, err
}

return serverConfig, nil
return serverConfig, caContentProvider, nil
}

func getServerVersion(discoveryClient discovery.DiscoveryInterface) (semver.Version, error) {
Expand All @@ -268,14 +291,23 @@ func getServerVersion(discoveryClient discovery.DiscoveryInterface) (semver.Vers
return retv, nil
}

func updateAPIService(logger logr.Logger, client aggregatorclient.Interface, caProvider dynamiccertificates.CAContentProvider) error {
logger.Info("Syncing CA certificate with APIServices")
apiService, err := client.ApiregistrationV1().APIServices().Get(context.TODO(), apiServiceName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error getting APIService %s: %v", apiServiceName, err)
}
apiService.Spec.CABundle = caProvider.CurrentCABundleContent()
if _, err := client.ApiregistrationV1().APIServices().Update(context.TODO(), apiService, metav1.UpdateOptions{}); err != nil {
func updateAPIService(ctx context.Context, logger logr.Logger, client aggregatorclient.Interface, caProvider dynamiccertificates.CAContentProvider) error {
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
apiService, err := client.ApiregistrationV1().APIServices().Get(ctx, apiServiceName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error getting APIService %s: %v", apiServiceName, err)
}

caBundle := caProvider.CurrentCABundleContent()
if bytes.Equal(apiService.Spec.CABundle, caBundle) {
return nil
}

logger.Info("Syncing CA certificate with APIServices")
apiService.Spec.CABundle = caBundle
_, err = client.ApiregistrationV1().APIServices().Update(ctx, apiService, metav1.UpdateOptions{})
return err
}); err != nil {
return fmt.Errorf("error updating kapp-controller CA cert of APIService %s: %v", apiServiceName, err)
}
return nil
Expand Down
94 changes: 94 additions & 0 deletions pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2026 The Carvel Authors.
Comment thread
praveenrewar marked this conversation as resolved.
// SPDX-License-Identifier: Apache-2.0

package apiserver
Comment thread
himsngh marked this conversation as resolved.

import (
"context"
"crypto/x509"
"testing"

"github.com/go-logr/logr"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
clienttesting "k8s.io/client-go/testing"
apiregv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
fakeaggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake"
)

// fakeCAProvider implements dynamiccertificates.CAContentProvider
type fakeCAProvider struct {
bundle []byte
}

func (f *fakeCAProvider) Name() string { return "fake-ca-provider" }
func (f *fakeCAProvider) CurrentCABundleContent() []byte { return f.bundle }
func (f *fakeCAProvider) AddListener(_ dynamiccertificates.Listener) {}
func (f *fakeCAProvider) VerifyOptions() (x509.VerifyOptions, bool) {
return x509.VerifyOptions{}, false
}

func Test_updateAPIService(t *testing.T) {
logger := logr.Discard()

tests := []struct {
name string
existingBundle []byte
newBundle []byte
expectUpdate bool
}{
{
name: "updates APIService when CA bundle is different",
existingBundle: []byte("old-dead-pod-cert"),
newBundle: []byte("new-active-pod-cert"),
expectUpdate: true,
},
{
name: "does nothing when CA bundle is identical",
existingBundle: []byte("active-pod-cert"),
newBundle: []byte("active-pod-cert"),
expectUpdate: false,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
apiSvc := &apiregv1.APIService{
ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
Spec: apiregv1.APIServiceSpec{
CABundle: tc.existingBundle,
},
}
fakeClient := fakeaggregator.NewSimpleClientset(apiSvc)

fakeProvider := &fakeCAProvider{bundle: tc.newBundle}

err := updateAPIService(context.TODO(), logger, fakeClient, fakeProvider)
require.NoError(t, err)

actions := fakeClient.Actions()
require.GreaterOrEqual(t, len(actions), 1, "expected at least a GET action")
require.Equal(t, "get", actions[0].GetVerb())

var updateActionFound bool
for _, action := range actions {
if action.GetVerb() == "update" {
updateActionFound = true
updateAction, ok := action.(clienttesting.UpdateAction)
if !ok {
t.Fatalf("Expected UpdateAction, got %T", action)
}
updatedSvc := updateAction.GetObject().(*apiregv1.APIService)
require.Equal(t, tc.newBundle, updatedSvc.Spec.CABundle)
}
}

if tc.expectUpdate {
require.True(t, updateActionFound, "expected an UPDATE action to be executed, but none was found")
} else {
require.False(t, updateActionFound, "expected NO UPDATE action, but one was executed")
}
})
}
}
Comment thread
himsngh marked this conversation as resolved.
4 changes: 4 additions & 0 deletions vendor/k8s.io/client-go/util/retry/OWNERS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

105 changes: 105 additions & 0 deletions vendor/k8s.io/client-go/util/retry/util.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading