Tikv rpc cli #6057

Draft: wants to merge 6 commits into base: master
2 changes: 1 addition & 1 deletion go.mod
@@ -37,7 +37,7 @@ require (
github.com/openshift/generic-admission-server v1.14.1-0.20230920151207-22a210ddee02
github.com/pingcap/advanced-statefulset/client v1.17.1-0.20241017064830-af926cc6da0d
github.com/pingcap/errors v0.11.4
github.com/pingcap/kvproto v0.0.0-20231122054644-fb0f5c2a0a10
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1
github.com/pingcap/tidb-operator/pkg/apis v1.6.1
github.com/pingcap/tidb-operator/pkg/client v1.6.1
github.com/pingcap/tiproxy/lib v0.0.0-20230907130944-eb5b4b9c9e79
4 changes: 2 additions & 2 deletions go.sum
@@ -651,8 +651,8 @@ github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0/go.mod h1:PYMCGwN0JH
github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
github.com/pingcap/kvproto v0.0.0-20231122054644-fb0f5c2a0a10 h1:qnhfzwdWOy8oOSZYX7/aK9XKDs4hJ6P/Gg+s7Sr9VKY=
github.com/pingcap/kvproto v0.0.0-20231122054644-fb0f5c2a0a10/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 h1:rTAyiswGyWSGHJVa4Mkhdi8YfGqfA4LrUVKsH9nrJ8E=
github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/tiproxy/lib v0.0.0-20230907130944-eb5b4b9c9e79 h1:q2ILYQoB7COkkkE+dA3hwIFHZHYxOwDBpR3AYenb/hM=
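For reference, this kvproto bump is what brings in the log backup FlushNow RPC that the new TiKV client code later in this PR calls. A minimal sketch of using that RPC directly, assuming an already-dialed *grpc.ClientConn to a TiKV server port, could look like the following (only identifiers that appear elsewhere in this PR are used):

package kvprotoexample

import (
    "context"
    "fmt"

    logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
    "google.golang.org/grpc"
)

// flushNow asks every registered log backup task on one TiKV store to flush
// immediately and reports the first failure it sees.
func flushNow(ctx context.Context, conn *grpc.ClientConn) error {
    cli := logbackup.NewLogBackupClient(conn)
    res, err := cli.FlushNow(ctx, &logbackup.FlushNowRequest{})
    if err != nil {
        return err
    }
    for _, r := range res.Results {
        if !r.Success {
            return fmt.Errorf("force flush failed for task %s: %s", r.TaskName, r.ErrorMessage)
        }
    }
    return nil
}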
5 changes: 5 additions & 0 deletions pkg/controller/tidbcluster/pod_control.go
@@ -458,10 +458,15 @@ func (c *PodController) syncTiKVPodForEviction(ctx context.Context, pod *corev1.
klog.Infof("Region leader count is %d for Pod %s/%s", leaderCount, pod.Namespace, pod.Name)

if leaderCount == 0 {
if err := kvClient.FlushLogBackupTasks(ctx); err != nil {
klog.ErrorS(err, "failed to flush log backup tasks. continue to delete pod...", "pod", pod.Name, "pod.ns", pod.Namespace)
}

err = c.deps.KubeClientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return reconcile.Result{}, perrors.Annotatef(err, "failed to delete pod %q", pod.Name)
}
klog.InfoS("successfully deleted the pod.", "pod", pod.Name, "pod.ns", pod.Namespace)
} else {
// re-check leader count next time
return reconcile.Result{RequeueAfter: c.recheckLeaderCountDuration}, nil
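The flush above is deliberately best-effort: a failure is only logged and the pod is still deleted. One optional refinement, not part of this change, would be to give the flush its own deadline so a slow or unreachable store can never delay deletion. A sketch of such a hypothetical helper, using only the TiKVClient interface added in this PR:

package podcontrolexample

import (
    "context"
    "time"

    "github.com/pingcap/tidb-operator/pkg/tikvapi"
    "k8s.io/klog/v2"
)

// flushBestEffort gives the force flush its own deadline and never returns an
// error, mirroring the log-and-continue behaviour of syncTiKVPodForEviction.
func flushBestEffort(ctx context.Context, c tikvapi.TiKVClient, timeout time.Duration) {
    flushCtx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    if err := c.FlushLogBackupTasks(flushCtx); err != nil {
        klog.ErrorS(err, "failed to flush log backup tasks, continuing")
    }
}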
10 changes: 9 additions & 1 deletion pkg/controller/tidbcluster/pod_control_test.go
@@ -36,7 +36,8 @@ import (
)

type kvClient struct {
leaderCount int32
leaderCount int32
logBackupFlushed atomic.Bool
}

var _ tikvapi.TiKVClient = &kvClient{}
@@ -46,6 +47,12 @@ func (c *kvClient) GetLeaderCount() (int, error) {
return int(count), nil
}

// FlushLogBackupTasks implements tikvapi.TiKVClient.
func (c *kvClient) FlushLogBackupTasks(ctx context.Context) error {
c.logBackupFlushed.Store(true)
return nil
}

func TestTiKVPodSyncForEviction(t *testing.T) {
interval := time.Millisecond * 100
timeout := time.Minute * 1
@@ -166,6 +173,7 @@ func TestTiKVPodSyncForEviction(t *testing.T) {
stat := c.getPodStat(pod)
return stat.finishAnnotationCounts
}, timeout, interval).ShouldNot(Equal(0), "should finish annotation")
g.Expect(kvClient.logBackupFlushed.Load()).To(BeTrue(), "should have flushed log backup tasks")
}

func TestTiKVPodSyncForReplaceVolume(t *testing.T) {
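The logBackupFlushed flag is an atomic.Bool because the flush runs on the controller's goroutine while the assertion runs on the test goroutine. If the flush could complete after the finish annotation is observed, a polling assertion in the gomega style the test already uses would be the more robust form; a possible variant (g, kvClient, timeout and interval as defined in the test above):

g.Eventually(func() bool {
    return kvClient.logBackupFlushed.Load()
}, timeout, interval).Should(BeTrue(), "should have flushed log backup tasks")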
13 changes: 12 additions & 1 deletion pkg/manager/member/tikv_upgrader.go
@@ -14,6 +14,7 @@
package member

import (
"context"
"fmt"
"strconv"
"time"
@@ -249,7 +250,11 @@ func (u *tikvUpgrader) evictLeaderBeforeUpgrade(tc *v1alpha1.TidbCluster, upgrad
}

if leaderCount == 0 {
klog.Infof("%s: leader count is 0, so ready to upgrade", logPrefix)
klog.Infof("%s: leader count is 0, so ready to upgrade, triggering force flush when there are some log backup tasks", logPrefix)
err := u.triggerForceFlush(tc, upgradePod)
if err != nil {
klog.Warningf("%s: failed to trigger force flush, continuing: %s", logPrefix, err)
}
return true, nil
}

@@ -348,6 +353,12 @@ func (u *tikvUpgrader) endEvictLeaderAfterUpgrade(tc *v1alpha1.TidbCluster, pod

}

func (u *tikvUpgrader) triggerForceFlush(tc *v1alpha1.TidbCluster, pod *corev1.Pod) error {
pdcli := u.deps.TiKVControl.GetTiKVPodClient(tc.GetNamespace(), tc.GetName(), pod.GetName(), tc.Spec.ClusterDomain, tc.IsTLSClusterEnabled())
cxLogger := klog.NewContext(context.Background(), klog.Background().WithValues("pod", pod.GetName(), "pod.ns", pod.GetNamespace()))
return pdcli.FlushLogBackupTasks(cxLogger)
}

func (u *tikvUpgrader) beginEvictLeader(tc *v1alpha1.TidbCluster, storeID uint64, pod *corev1.Pod) error {
ns := tc.GetNamespace()
podName := pod.GetName()
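Note that triggerForceFlush does not pass the pod to the TiKV client directly: it attaches a logger carrying the pod metadata to the context, and the client retrieves it with klog.FromContext (see pkg/tikvapi/tikvapi.go below). A minimal sketch of that pattern in isolation:

package klogctxexample

import (
    "context"

    "k8s.io/klog/v2"
)

// The caller attaches structured metadata to the context via a logger ...
func caller(podName, podNS string) {
    ctx := klog.NewContext(context.Background(),
        klog.Background().WithValues("pod", podName, "pod.ns", podNS))
    flush(ctx)
}

// ... and the callee retrieves the same logger, so every line it emits carries
// the pod and pod.ns key/value pairs without extra parameters.
func flush(ctx context.Context) {
    logger := klog.FromContext(ctx)
    logger.Info("flushing log backup tasks")
}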
11 changes: 10 additions & 1 deletion pkg/tikvapi/fake_tikvapi.go
@@ -14,13 +14,15 @@
package tikvapi

import (
"context"
"fmt"
)

type ActionType string

const (
GetLeaderCountActionType ActionType = "GetLeaderCount"
GetLeaderCountActionType ActionType = "GetLeaderCount"
FlushLogBackupTasksActionType ActionType = "FlushLogBackupTasks"
)

type NotFoundReaction struct {
@@ -72,3 +74,10 @@ func (c *FakeTiKVClient) GetLeaderCount() (int, error) {
}
return result.(int), nil
}

// FlushLogBackupTasks implements TiKVClient.
func (c *FakeTiKVClient) FlushLogBackupTasks(ctx context.Context) error {
action := &Action{}
_, err := c.fakeAPI(FlushLogBackupTasksActionType, action)
return err
}
34 changes: 27 additions & 7 deletions pkg/tikvapi/tikv_control.go
@@ -49,20 +49,27 @@ func (tc *defaultTiKVControl) GetTiKVPodClient(namespace string, tcName string,

var tlsConfig *tls.Config
var err error
var scheme = "http"
var configOfSchema = func(scheme string) TiKVClientOpts {
return TiKVClientOpts{
HTTPEndpoint: TiKVPodClientURL(namespace, tcName, podName, scheme, clusterDomain),
GRPCEndpoint: TiKVGRPCClientURL(namespace, tcName, podName, scheme, clusterDomain),
Timeout: DefaultTimeout,
TLSConfig: tlsConfig,
DisableKeepAlives: true,
}
}

if tlsEnabled {
scheme = "https"
tlsConfig, err = pdapi.GetTLSConfig(tc.secretLister, pdapi.Namespace(namespace), util.ClusterClientTLSSecretName(tcName))
if err != nil {
klog.Errorf("Unable to get tls config for TiKV cluster %q, tikv client may not work: %v", tcName, err)
return NewTiKVClient(TiKVPodClientURL(namespace, tcName, podName, scheme, clusterDomain), DefaultTimeout, tlsConfig, true)
return NewTiKVClient(configOfSchema("https"))
}

return NewTiKVClient(TiKVPodClientURL(namespace, tcName, podName, scheme, clusterDomain), DefaultTimeout, tlsConfig, true)
return NewTiKVClient(configOfSchema("https"))
}

return NewTiKVClient(TiKVPodClientURL(namespace, tcName, podName, scheme, clusterDomain), DefaultTimeout, tlsConfig, true)
return NewTiKVClient(configOfSchema("http"))
}

func tikvPodClientKey(schema, namespace, clusterName, podName string) string {
@@ -71,10 +78,23 @@ func tikvPodClientKey(schema, namespace, clusterName, podName string) string {

// TiKVPodClientURL builds the url of tikv pod client
func TiKVPodClientURL(namespace, clusterName, podName, scheme, clusterDomain string) string {
return attachPort(tiKVBaseURL(namespace, clusterName, podName, scheme, clusterDomain), v1alpha1.DefaultTiKVStatusPort)
}

// TiKVGRPCClientURL builds the url of tikv grpc client
func TiKVGRPCClientURL(namespace, clusterName, podName, scheme, clusterDomain string) string {
return attachPort(tiKVBaseURL(namespace, clusterName, podName, scheme, clusterDomain), v1alpha1.DefaultTiKVServerPort)
}

func tiKVBaseURL(namespace, clusterName, podName, scheme, clusterDomain string) string {
if clusterDomain != "" {
return fmt.Sprintf("%s://%s.%s-tikv-peer.%s.svc.%s:%d", scheme, podName, clusterName, namespace, clusterDomain, v1alpha1.DefaultTiKVStatusPort)
return fmt.Sprintf("%s://%s.%s-tikv-peer.%s.svc.%s", scheme, podName, clusterName, namespace, clusterDomain)
}
return fmt.Sprintf("%s://%s.%s-tikv-peer.%s:%d", scheme, podName, clusterName, namespace, v1alpha1.DefaultTiKVStatusPort)
return fmt.Sprintf("%s://%s.%s-tikv-peer.%s", scheme, podName, clusterName, namespace)
}

func attachPort(path string, port int32) string {
return fmt.Sprintf("%s:%d", path, port)
}

// FakeTiKVControl implements a fake version of TiKVControlInterface.
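With the base-URL helper split out, the status (HTTP) client and the gRPC client now address the same pod and differ only in the attached port. For illustration, assuming the usual TiKV defaults of 20180 for the status port and 20160 for the gRPC server port (the real values come from v1alpha1.DefaultTiKVStatusPort and v1alpha1.DefaultTiKVServerPort), the helpers would produce URLs like these:

package tikvapi

import "fmt"

// Illustrative only; the printed ports assume the common TiKV defaults
// (status 20180, gRPC 20160) behind v1alpha1.DefaultTiKVStatusPort and
// v1alpha1.DefaultTiKVServerPort.
func ExampleTiKVGRPCClientURL() {
    fmt.Println(TiKVPodClientURL("tidb", "basic", "basic-tikv-0", "http", ""))
    fmt.Println(TiKVGRPCClientURL("tidb", "basic", "basic-tikv-0", "http", ""))
    fmt.Println(TiKVGRPCClientURL("tidb", "basic", "basic-tikv-0", "https", "cluster.local"))
    // Assumed output:
    // http://basic-tikv-0.basic-tikv-peer.tidb:20180
    // http://basic-tikv-0.basic-tikv-peer.tidb:20160
    // https://basic-tikv-0.basic-tikv-peer.tidb.svc.cluster.local:20160
}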
98 changes: 91 additions & 7 deletions pkg/tikvapi/tikvapi.go
@@ -14,15 +14,22 @@
package tikvapi

import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
logbackup "github.com/pingcap/kvproto/pkg/logbackuppb"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/prom2json"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/klog/v2"
)

@@ -36,12 +43,69 @@ const (
// TiKVClient provides tikv server's api
type TiKVClient interface {
GetLeaderCount() (int, error)
FlushLogBackupTasks(ctx context.Context) error
}

type lazyGRPCConn struct {
target string
opts []grpc.DialOption
}

func (l *lazyGRPCConn) conn(ctx context.Context) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(ctx, DefaultTimeout)
defer cancel()
ch, err := grpc.DialContext(ctx, l.target, l.opts...)
if err != nil {
return nil, errors.Annotatef(err, "during connecting to %s", l.target)
}

return ch, nil
}

// tikvClient is default implementation of TiKVClient
type tikvClient struct {
url string
httpClient *http.Client
url string
httpClient *http.Client
grpcConnector *lazyGRPCConn
}

// FlushLogBackupTasks implements TiKVClient.
func (c *tikvClient) FlushLogBackupTasks(ctx context.Context) error {
logger := klog.FromContext(ctx)

// For now we are using one-shot sessions, because the `TiKVClient`
// interface doesn't provide a `Close` method...
conn, err := c.grpcConnector.conn(ctx)
if err != nil {
return err
}
defer func() {
if err := conn.Close(); err != nil {
logger.Error(err, "tikvClient: failed to close grpc connection")
}
}()

cli := logbackup.NewLogBackupClient(conn)
res, err := cli.FlushNow(ctx, &logbackup.FlushNowRequest{})
if err != nil {
return err
}

// Fast path: no tasks, return early.
if len(res.Results) == 0 {
return nil
}

for _, r := range res.Results {
if r.Success {
logger.Info("successfully flushed the log backup task.", "task", r.TaskName)
} else {
return errors.Errorf("force flush failed for task %s: %s", r.TaskName, r.ErrorMessage)
}
}
// Wait a while so TiKV is able to send the flush result to the advancer.
time.Sleep(3 * time.Second)
return nil
}

// GetLeaderCount gets region leader count from the URL
@@ -74,15 +138,35 @@ func (c *tikvClient) GetLeaderCount() (int, error) {
return 0, fmt.Errorf("metric %s{type=\"%s\"} not found for %s", metricNameRegionCount, labelNameLeaderCount, apiURL)
}

type TiKVClientOpts struct {
HTTPEndpoint string
GRPCEndpoint string
Timeout time.Duration
TLSConfig *tls.Config
DisableKeepAlives bool
}

// NewTiKVClient returns a new TiKVClient
func NewTiKVClient(url string, timeout time.Duration, tlsConfig *tls.Config, disableKeepalive bool) TiKVClient {
func NewTiKVClient(opts TiKVClientOpts) TiKVClient {
var conn lazyGRPCConn
if strings.HasPrefix(opts.GRPCEndpoint, "https://") {
conn.opts = append(conn.opts, grpc.WithTransportCredentials(credentials.NewTLS(opts.TLSConfig)))
} else {
conn.opts = append(conn.opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
conn.target = opts.GRPCEndpoint
for _, prefix := range []string{"http://", "https://"} {
conn.target = strings.TrimPrefix(conn.target, prefix)
}

return &tikvClient{
url: url,
url: opts.HTTPEndpoint,
grpcConnector: &conn,
httpClient: &http.Client{
Timeout: timeout,
Timeout: opts.Timeout,
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
DisableKeepAlives: disableKeepalive,
TLSClientConfig: opts.TLSConfig,
DisableKeepAlives: opts.DisableKeepAlives,
ResponseHeaderTimeout: 10 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
DialContext: (&net.Dialer{
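Since NewTiKVClient now takes an options struct rather than positional arguments, a caller outside this package would construct a client roughly as below. This is an illustrative sketch only: the endpoints and the 20180/20160 ports are assumptions, and inside the operator a client is normally obtained through TiKVControlInterface.GetTiKVPodClient rather than built by hand.

package main

import (
    "context"
    "log"
    "time"

    "github.com/pingcap/tidb-operator/pkg/tikvapi"
)

func main() {
    // Plain HTTP/gRPC endpoints for one TiKV pod; with TLS enabled the scheme
    // would be https and TLSConfig would carry the cluster client certs.
    cli := tikvapi.NewTiKVClient(tikvapi.TiKVClientOpts{
        HTTPEndpoint:      "http://basic-tikv-0.basic-tikv-peer.tidb:20180",
        GRPCEndpoint:      "http://basic-tikv-0.basic-tikv-peer.tidb:20160",
        Timeout:           tikvapi.DefaultTimeout,
        DisableKeepAlives: true,
    })

    leaders, err := cli.GetLeaderCount()
    if err != nil {
        log.Fatalf("get leader count: %v", err)
    }

    // Mirror the controller's flow: force a flush only once no leaders remain.
    if leaders == 0 {
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()
        if err := cli.FlushLogBackupTasks(ctx); err != nil {
            log.Printf("flush log backup tasks: %v", err)
        }
    }
}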