Skip to content

Commit dbe75c0

Browse files
authored
pdms: Choose a suitable pdms to transfer primary when upgrade (#5643)
1 parent 545685a commit dbe75c0

File tree

8 files changed

+234
-7
lines changed

8 files changed

+234
-7
lines changed

pkg/controller/pd_control.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,20 @@ func NewFakePDClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster) *
113113
return pdClient
114114
}
115115

116+
// NewFakePDMSClient creates a fake pdmsclient that is set as the pdms client
117+
func NewFakePDMSClient(pdControl *pdapi.FakePDControl, tc *v1alpha1.TidbCluster, curService string) *pdapi.FakePDMSClient {
118+
pdmsClient := pdapi.NewFakePDMSClient()
119+
if tc.Spec.Cluster != nil {
120+
pdControl.SetPDMSClientWithClusterDomain(pdapi.Namespace(tc.Spec.Cluster.Namespace), tc.Spec.Cluster.Name, tc.Spec.Cluster.ClusterDomain, curService, pdmsClient)
121+
}
122+
if tc.Spec.ClusterDomain != "" {
123+
pdControl.SetPDMSClientWithClusterDomain(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), tc.Spec.ClusterDomain, curService, pdmsClient)
124+
}
125+
pdControl.SetPDMSClient(pdapi.Namespace(tc.GetNamespace()), tc.GetName(), curService, pdmsClient)
126+
127+
return pdmsClient
128+
}
129+
116130
// NewFakePDClientWithAddress creates a fake pdclient that is set as the pd client
117131
func NewFakePDClientWithAddress(pdControl *pdapi.FakePDControl, peerURL string) *pdapi.FakePDClient {
118132
pdClient := pdapi.NewFakePDClient()

pkg/manager/member/pd_ms_upgrader.go

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/pingcap/tidb-operator/pkg/controller"
2323
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
2424
"github.com/pingcap/tidb-operator/pkg/third_party/k8s"
25+
"github.com/pingcap/tidb-operator/pkg/util/cmpver"
2526
apps "k8s.io/api/apps/v1"
2627
"k8s.io/klog/v2"
2728
)
@@ -120,12 +121,95 @@ func (u *pdMSUpgrader) gracefulUpgrade(tc *v1alpha1.TidbCluster, oldSet *apps.St
120121
}
121122
continue
122123
}
123-
mngerutils.SetUpgradePartition(newSet, i)
124-
return nil
124+
125+
return u.upgradePDMSPod(tc, i, newSet, curService)
125126
}
126127
return nil
127128
}
128129

130+
func (u *pdMSUpgrader) upgradePDMSPod(tc *v1alpha1.TidbCluster, ordinal int32, newSet *apps.StatefulSet, curService string) error {
131+
// Only support after `8.3.0` to keep compatibility.
132+
if check, err := pdMSSupportMicroServicesWithName.Check(tc.PDMSVersion(curService)); check && err == nil {
133+
ns := tc.GetNamespace()
134+
tcName := tc.GetName()
135+
upgradePDMSName := PDMSName(tcName, ordinal, tc.Namespace, tc.Spec.ClusterDomain, tc.Spec.AcrossK8s, curService)
136+
upgradePodName := PDMSPodName(tcName, ordinal, curService)
137+
138+
pdClient := controller.GetPDClient(u.deps.PDControl, tc)
139+
primary, err := pdClient.GetMSPrimary(curService)
140+
if err != nil {
141+
return err
142+
}
143+
144+
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: check primary: %s, upgradePDMSName: %s, upgradePodName: %s", ns, tcName,
145+
primary, upgradePDMSName, upgradePodName)
146+
// If current pdms is primary, transfer primary to other pdms pod
147+
if strings.Contains(primary, upgradePodName) || strings.Contains(primary, upgradePDMSName) {
148+
targetName := ""
149+
150+
if tc.PDMSStsActualReplicas(curService) > 1 {
151+
targetName = choosePDMSToTransferFromMembers(tc, newSet, ordinal)
152+
}
153+
154+
if targetName != "" {
155+
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s", ns, tcName, targetName)
156+
err := controller.GetPDMSClient(u.deps.PDControl, tc, curService).TransferPrimary(targetName)
157+
if err != nil {
158+
klog.Errorf("TidbCluster: [%s/%s]' pdms upgrader: failed to transfer pdms primary to: %s, %v", ns, tcName, targetName, err)
159+
return err
160+
}
161+
klog.Infof("TidbCluster: [%s/%s]' pdms upgrader: transfer pdms primary to: %s successfully", ns, tcName, targetName)
162+
} else {
163+
klog.Warningf("TidbCluster: [%s/%s]' pdms upgrader: skip to transfer pdms primary, because can not find a suitable pd", ns, tcName)
164+
}
165+
}
166+
}
167+
168+
mngerutils.SetUpgradePartition(newSet, ordinal)
169+
return nil
170+
}
171+
172+
// choosePDMSToTransferFromMembers choose a pdms to transfer primary from members
173+
//
174+
// Assume that current primary ordinal is x, and range is [0, n]
175+
// 1. Find the max suitable ordinal in (x, n], because they have been upgraded
176+
// 2. If no suitable ordinal, find the min suitable ordinal in [0, x) to reduce the count of transfer
177+
func choosePDMSToTransferFromMembers(tc *v1alpha1.TidbCluster, newSet *apps.StatefulSet, ordinal int32) string {
178+
ns := tc.GetNamespace()
179+
tcName := tc.GetName()
180+
klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: start to choose pdms to transfer primary from members", ns, tcName)
181+
ordinals := helper.GetPodOrdinals(*newSet.Spec.Replicas, newSet)
182+
183+
// set ordinal to max ordinal if ordinal isn't exist
184+
if !ordinals.Has(ordinal) {
185+
ordinal = helper.GetMaxPodOrdinal(*newSet.Spec.Replicas, newSet)
186+
}
187+
188+
targetName := ""
189+
list := ordinals.List()
190+
if len(list) == 0 {
191+
return ""
192+
}
193+
194+
// just using pods index for now. TODO: add healthy checker for pdms.
195+
// find the maximum ordinal which is larger than ordinal
196+
if len(list) > int(ordinal)+1 {
197+
targetName = PDMSPodName(tcName, list[len(list)-1], controller.PDMSTrimName(newSet.Name))
198+
}
199+
200+
if targetName == "" && ordinal != 0 {
201+
// find the minimum ordinal which is less than ordinal
202+
targetName = PDMSPodName(tcName, list[0], controller.PDMSTrimName(newSet.Name))
203+
}
204+
205+
klog.Infof("Tidbcluster: [%s/%s]' pdms upgrader: choose pdms to transfer primary from members, targetName: %s", ns, tcName, targetName)
206+
return targetName
207+
}
208+
209+
// PDMSSupportMicroServicesWithName returns true if the given version of PDMS supports microservices with name.
210+
// related https://github.com/tikv/pd/pull/8157.
211+
var pdMSSupportMicroServicesWithName, _ = cmpver.NewConstraint(cmpver.GreaterOrEqual, "v8.3.0")
212+
129213
type fakePDMSUpgrader struct{}
130214

131215
// NewFakePDMSUpgrader returns a fakePDUpgrader

pkg/manager/member/pd_ms_upgrader_test.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
2222
"github.com/pingcap/tidb-operator/pkg/controller"
2323
mngerutils "github.com/pingcap/tidb-operator/pkg/manager/utils"
24+
"github.com/pingcap/tidb-operator/pkg/pdapi"
2425
apps "k8s.io/api/apps/v1"
2526
corev1 "k8s.io/api/core/v1"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -44,8 +45,20 @@ func TestPDMSUpgraderUpgrade(t *testing.T) {
4445

4546
testFn := func(test *testcase) {
4647
t.Log(test.name)
47-
upgrader, podInformer := newPDMSUpgrader()
48+
upgrader, pdControl, podInformer := newPDMSUpgrader()
4849
tc := newTidbClusterForPDMSUpgrader()
50+
pdClient := controller.NewFakePDClient(pdControl, tc)
51+
pdMSClient := controller.NewFakePDMSClient(pdControl, tc, "tso")
52+
53+
pdClient.AddReaction(pdapi.GetPDMSPrimaryActionType, func(action *pdapi.Action) (interface{}, error) {
54+
return "upgrader-tso-1", nil
55+
})
56+
pdMSClient.AddReaction(pdapi.GetHealthActionType, func(action *pdapi.Action) (interface{}, error) {
57+
return nil, nil
58+
})
59+
pdMSClient.AddReaction(pdapi.PDMSTransferPrimaryActionType, func(action *pdapi.Action) (interface{}, error) {
60+
return nil, nil
61+
})
4962

5063
if test.changeFn != nil {
5164
test.changeFn(tc)
@@ -218,11 +231,12 @@ func TestPDMSUpgraderUpgrade(t *testing.T) {
218231
}
219232
}
220233

221-
func newPDMSUpgrader() (Upgrader, podinformers.PodInformer) {
234+
func newPDMSUpgrader() (Upgrader, *pdapi.FakePDControl, podinformers.PodInformer) {
222235
fakeDeps := controller.NewFakeDependencies()
223236
pdMSUpgrader := &pdMSUpgrader{deps: fakeDeps}
224237
podInformer := fakeDeps.KubeInformerFactory.Core().V1().Pods()
225-
return pdMSUpgrader, podInformer
238+
pdControl := fakeDeps.PDControl.(*pdapi.FakePDControl)
239+
return pdMSUpgrader, pdControl, podInformer
226240
}
227241

228242
func newStatefulSetForPDMSUpgrader() *apps.StatefulSet {

pkg/manager/member/utils.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,21 @@ func PdName(tcName string, ordinal int32, namespace string, clusterDomain string
160160
return PdPodName(tcName, ordinal)
161161
}
162162

163+
// PDMSName should match the start arg `--name` of pd-server
164+
// See the start script of PDMS in pkg/manager/member/startscript/v2.renderPDMSStartScript
165+
func PDMSName(tcName string, ordinal int32, namespace, clusterDomain string, acrossK8s bool, component string) string {
166+
if len(clusterDomain) > 0 {
167+
return fmt.Sprintf("%s.%s-%s-peer.%s.svc.%s", PDMSPodName(tcName, ordinal, component), component, tcName, namespace, clusterDomain)
168+
}
169+
170+
// clusterDomain is not set
171+
if acrossK8s {
172+
return fmt.Sprintf("%s.%s-%s-peer.%s.svc", PDMSPodName(tcName, ordinal, component), component, tcName, namespace)
173+
}
174+
175+
return PDMSPodName(tcName, ordinal, component)
176+
}
177+
163178
// NeedForceUpgrade check if force upgrade is necessary
164179
func NeedForceUpgrade(ann map[string]string) bool {
165180
// Check if annotation 'pingcap.com/force-upgrade: "true"' is set

pkg/pdapi/fake_pdapi.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
GetClusterActionType ActionType = "GetCluster"
2929
GetMembersActionType ActionType = "GetMembers"
3030
GetPDMSMembersActionType ActionType = "GetPDMSMembers"
31+
GetPDMSPrimaryActionType ActionType = "GetPDMSPrimary"
3132
GetStoresActionType ActionType = "GetStores"
3233
GetTombStoneStoresActionType ActionType = "GetTombStoneStores"
3334
GetStoreActionType ActionType = "GetStore"
@@ -45,6 +46,7 @@ const (
4546
TransferPDLeaderActionType ActionType = "TransferPDLeader"
4647
GetAutoscalingPlansActionType ActionType = "GetAutoscalingPlans"
4748
GetRecoveringMarkActionType ActionType = "GetRecoveringMark"
49+
PDMSTransferPrimaryActionType ActionType = "PDMSTransferPrimary"
4850
)
4951

5052
type NotFoundReaction struct {
@@ -78,6 +80,15 @@ func (c *FakePDClient) GetMSMembers(_ string) ([]string, error) {
7880
return result.([]string), nil
7981
}
8082

83+
func (c *FakePDClient) GetMSPrimary(_ string) (string, error) {
84+
action := &Action{}
85+
result, err := c.fakeAPI(GetPDMSPrimaryActionType, action)
86+
if err != nil {
87+
return "", err
88+
}
89+
return result.(string), nil
90+
}
91+
8192
func NewFakePDClient() *FakePDClient {
8293
return &FakePDClient{reactions: map[ActionType]Reaction{}}
8394
}
@@ -291,3 +302,40 @@ func (c *FakePDClient) GetRecoveringMark() (bool, error) {
291302

292303
return true, nil
293304
}
305+
306+
// FakePDMSClient implements a fake version of PDMSClient.
307+
type FakePDMSClient struct {
308+
reactions map[ActionType]Reaction
309+
}
310+
311+
func NewFakePDMSClient() *FakePDMSClient {
312+
return &FakePDMSClient{reactions: map[ActionType]Reaction{}}
313+
}
314+
315+
func (c *FakePDMSClient) AddReaction(actionType ActionType, reaction Reaction) {
316+
c.reactions[actionType] = reaction
317+
}
318+
319+
// fakeAPI is a small helper for fake API calls
320+
func (c *FakePDMSClient) fakeAPI(actionType ActionType, action *Action) (interface{}, error) {
321+
if reaction, ok := c.reactions[actionType]; ok {
322+
result, err := reaction(action)
323+
if err != nil {
324+
return nil, err
325+
}
326+
return result, nil
327+
}
328+
return nil, &NotFoundReaction{actionType}
329+
}
330+
331+
func (c *FakePDMSClient) GetHealth() error {
332+
action := &Action{}
333+
_, err := c.fakeAPI(GetHealthActionType, action)
334+
return err
335+
}
336+
337+
func (c *FakePDMSClient) TransferPrimary(newPrimary string) error {
338+
action := &Action{Name: newPrimary}
339+
_, err := c.fakeAPI(PDMSTransferPrimaryActionType, action)
340+
return err
341+
}

pkg/pdapi/pd_control.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,7 +337,7 @@ type FakePDControl struct {
337337

338338
func NewFakePDControl(secretLister corelisterv1.SecretLister) *FakePDControl {
339339
return &FakePDControl{
340-
defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{}},
340+
defaultPDControl{secretLister: secretLister, pdClients: map[string]PDClient{}, pdMSClients: map[string]PDMSClient{}},
341341
}
342342
}
343343

@@ -352,3 +352,15 @@ func (fpc *FakePDControl) SetPDClientWithClusterDomain(namespace Namespace, tcNa
352352
func (fpc *FakePDControl) SetPDClientWithAddress(peerURL string, pdclient PDClient) {
353353
fpc.defaultPDControl.pdClients[peerURL] = pdclient
354354
}
355+
356+
func (fpc *FakePDControl) SetPDMSClient(namespace Namespace, tcName, curService string, pdmsclient PDMSClient) {
357+
fpc.defaultPDControl.pdMSClients[genClientUrl(namespace, tcName, "http", "", curService, false)] = pdmsclient
358+
}
359+
360+
func (fpc *FakePDControl) SetPDMSClientWithClusterDomain(namespace Namespace, tcName, tcClusterDomain, curService string, pdmsclient PDMSClient) {
361+
fpc.defaultPDControl.pdMSClients[genClientUrl(namespace, tcName, "http", tcClusterDomain, curService, false)] = pdmsclient
362+
}
363+
364+
func (fpc *FakePDControl) SetPDMSClientWithAddress(peerURL string, pdmsclient PDMSClient) {
365+
fpc.defaultPDControl.pdMSClients[peerURL] = pdmsclient
366+
}

pkg/pdapi/pdapi.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ type PDClient interface {
9696
GetRecoveringMark() (bool, error)
9797
// GetMSMembers returns all PDMS members service-addr from cluster by specific Micro Service
9898
GetMSMembers(service string) ([]string, error)
99+
// GetMSPrimary returns the primary PDMS member service-addr from cluster by specific Micro Service
100+
GetMSPrimary(service string) (string, error)
99101
}
100102

101103
var (
@@ -341,6 +343,21 @@ func (c *pdClient) GetMSMembers(service string) ([]string, error) {
341343
return addrs, nil
342344
}
343345

346+
func (c *pdClient) GetMSPrimary(service string) (string, error) {
347+
apiURL := fmt.Sprintf("%s/%s/primary/%s", c.url, MicroServicePrefix, service)
348+
body, err := httputil.GetBodyOK(c.httpClient, apiURL)
349+
if err != nil {
350+
return "", err
351+
}
352+
var primary string
353+
err = json.Unmarshal(body, &primary)
354+
if err != nil {
355+
return "", err
356+
}
357+
358+
return primary, nil
359+
}
360+
344361
func (c *pdClient) getStores(apiURL string) (*StoresInfo, error) {
345362
body, err := httputil.GetBodyOK(c.httpClient, apiURL)
346363
if err != nil {

pkg/pdapi/pdms_api.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
package pdapi
1515

1616
import (
17+
"bytes"
1718
"crypto/tls"
19+
"encoding/json"
1820
"fmt"
1921
"net/http"
2022
"time"
@@ -27,10 +29,13 @@ import (
2729
type PDMSClient interface {
2830
// GetHealth returns ping result
2931
GetHealth() error
32+
// TransferPrimary transfers the primary to the newPrimary
33+
TransferPrimary(newPrimary string) error
3034
}
3135

3236
var (
33-
pdMSHealthPrefix = "api/v1/health"
37+
pdMSHealthPrefix = "api/v1/health"
38+
pdMSPrimaryTransferPrefix = "api/v1/primary/transfer"
3439
)
3540

3641
// pdMSClient is default implementation of PDClient
@@ -69,3 +74,21 @@ func (c *pdMSClient) GetHealth() error {
6974
}
7075
return nil
7176
}
77+
78+
func (c *pdMSClient) TransferPrimary(newPrimary string) error {
79+
apiURL := fmt.Sprintf("%s/%s/%s", c.url, c.serviceName, pdMSPrimaryTransferPrefix)
80+
data, err := json.Marshal(struct {
81+
NewPrimary string `json:"new_primary"`
82+
}{
83+
NewPrimary: newPrimary,
84+
})
85+
if err != nil {
86+
return err
87+
}
88+
_, err = httputil.PostBodyOK(c.httpClient, apiURL, bytes.NewBuffer(data))
89+
if err != nil {
90+
return err
91+
}
92+
93+
return nil
94+
}

0 commit comments

Comments
 (0)