diff --git a/go.mod b/go.mod index 56991247f27..a903c040447 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/openshift/generic-admission-server v1.14.1-0.20230920151207-22a210ddee02 github.com/pingcap/advanced-statefulset/client v1.17.1-0.20241017064830-af926cc6da0d github.com/pingcap/errors v0.11.4 - github.com/pingcap/kvproto v0.0.0-20231122054644-fb0f5c2a0a10 + github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 github.com/pingcap/tidb-operator/pkg/apis v1.6.1 github.com/pingcap/tidb-operator/pkg/client v1.6.1 github.com/pingcap/tiproxy/lib v0.0.0-20230907130944-eb5b4b9c9e79 diff --git a/go.sum b/go.sum index 8f8b46c8252..a43e8cdf8e1 100644 --- a/go.sum +++ b/go.sum @@ -651,8 +651,8 @@ github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0/go.mod h1:PYMCGwN0JH github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= -github.com/pingcap/kvproto v0.0.0-20231122054644-fb0f5c2a0a10 h1:qnhfzwdWOy8oOSZYX7/aK9XKDs4hJ6P/Gg+s7Sr9VKY= -github.com/pingcap/kvproto v0.0.0-20231122054644-fb0f5c2a0a10/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= +github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1 h1:rTAyiswGyWSGHJVa4Mkhdi8YfGqfA4LrUVKsH9nrJ8E= +github.com/pingcap/kvproto v0.0.0-20250117122752-2b87602a94a1/go.mod h1:rXxWk2UnwfUhLXha1jxRWPADw9eMZGWEWCg92Tgmb/8= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9 h1:AJD9pZYm72vMgPcQDww9rkZ1DnWfl0pXV3BOWlkYIjA= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/tiproxy/lib v0.0.0-20230907130944-eb5b4b9c9e79 h1:q2ILYQoB7COkkkE+dA3hwIFHZHYxOwDBpR3AYenb/hM= diff --git a/pkg/controller/tidbcluster/pod_control.go b/pkg/controller/tidbcluster/pod_control.go index 891a79f299b..31cbef368b4 100644 --- a/pkg/controller/tidbcluster/pod_control.go +++ b/pkg/controller/tidbcluster/pod_control.go @@ -458,10 +458,15 @@ func (c *PodController) syncTiKVPodForEviction(ctx context.Context, pod *corev1. klog.Infof("Region leader count is %d for Pod %s/%s", leaderCount, pod.Namespace, pod.Name) if leaderCount == 0 { + if err := kvClient.FlushLogBackupTasks(ctx); err != nil { + klog.ErrorS(err, "failed to flush log backup tasks. continue to delete pod...", "pod", pod.Name, "pod.ns", pod.Namespace) + } + err = c.deps.KubeClientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) if err != nil && !errors.IsNotFound(err) { return reconcile.Result{}, perrors.Annotatef(err, "failed to delete pod %q", pod.Name) } + klog.InfoS("successfully deleted the pod.", "pod", pod.Name, "pod.ns", pod.Namespace) } else { // re-check leader count next time return reconcile.Result{RequeueAfter: c.recheckLeaderCountDuration}, nil diff --git a/pkg/controller/tidbcluster/pod_control_test.go b/pkg/controller/tidbcluster/pod_control_test.go index f508ed56664..7563452581e 100644 --- a/pkg/controller/tidbcluster/pod_control_test.go +++ b/pkg/controller/tidbcluster/pod_control_test.go @@ -36,7 +36,8 @@ import ( ) type kvClient struct { - leaderCount int32 + leaderCount int32 + logBackupFlushed atomic.Bool } var _ tikvapi.TiKVClient = &kvClient{} @@ -46,6 +47,12 @@ func (c *kvClient) GetLeaderCount() (int, error) { return int(count), nil } +// FlushLogBackupTasks implements tikvapi.TiKVClient. +func (c *kvClient) FlushLogBackupTasks(ctx context.Context) error { + c.logBackupFlushed.Store(true) + return nil +} + func TestTiKVPodSyncForEviction(t *testing.T) { interval := time.Millisecond * 100 timeout := time.Minute * 1 @@ -166,6 +173,7 @@ func TestTiKVPodSyncForEviction(t *testing.T) { stat := c.getPodStat(pod) return stat.finishAnnotationCounts }, timeout, interval).ShouldNot(Equal(0), "should finish annotation") + g.Expect(kvClient.logBackupFlushed.Load()).To(BeTrue(), "should flushed log backup tasks") } func TestTiKVPodSyncForReplaceVolume(t *testing.T) { diff --git a/pkg/manager/member/tikv_upgrader.go b/pkg/manager/member/tikv_upgrader.go index bf2fe58538f..61e821a5d00 100644 --- a/pkg/manager/member/tikv_upgrader.go +++ b/pkg/manager/member/tikv_upgrader.go @@ -14,6 +14,7 @@ package member import ( + "context" "fmt" "strconv" "time" @@ -249,7 +250,11 @@ func (u *tikvUpgrader) evictLeaderBeforeUpgrade(tc *v1alpha1.TidbCluster, upgrad } if leaderCount == 0 { - klog.Infof("%s: leader count is 0, so ready to upgrade", logPrefix) + klog.Infof("%s: leader count is 0, so ready to upgrade, triggering force flush when there are some log backup tasks", logPrefix) + err := u.triggerForceFlush(tc, upgradePod) + if err != nil { + klog.Warningf("%s: failed to trigger force flush, continuing: %s", logPrefix, err) + } return true, nil } @@ -348,6 +353,12 @@ func (u *tikvUpgrader) endEvictLeaderAfterUpgrade(tc *v1alpha1.TidbCluster, pod } +func (u *tikvUpgrader) triggerForceFlush(tc *v1alpha1.TidbCluster, pod *corev1.Pod) error { + pdcli := u.deps.TiKVControl.GetTiKVPodClient(tc.GetNamespace(), tc.GetName(), pod.GetName(), tc.Spec.ClusterDomain, tc.IsTLSClusterEnabled()) + cxLogger := klog.NewContext(context.Background(), klog.Background().WithValues("pod", pod.GetName(), "pod.ns", pod.GetNamespace())) + return pdcli.FlushLogBackupTasks(cxLogger) +} + func (u *tikvUpgrader) beginEvictLeader(tc *v1alpha1.TidbCluster, storeID uint64, pod *corev1.Pod) error { ns := tc.GetNamespace() podName := pod.GetName() diff --git a/pkg/tikvapi/fake_tikvapi.go b/pkg/tikvapi/fake_tikvapi.go index 7cde1087e61..c4fe0feb8a3 100644 --- a/pkg/tikvapi/fake_tikvapi.go +++ b/pkg/tikvapi/fake_tikvapi.go @@ -14,13 +14,15 @@ package tikvapi import ( + "context" "fmt" ) type ActionType string const ( - GetLeaderCountActionType ActionType = "GetLeaderCount" + GetLeaderCountActionType ActionType = "GetLeaderCount" + FlushLogBackupTasksActionType ActionType = "FlushLogBackupTasks" ) type NotFoundReaction struct { @@ -72,3 +74,10 @@ func (c *FakeTiKVClient) GetLeaderCount() (int, error) { } return result.(int), nil } + +// FlushLogBackupTasks implements TiKVClient. +func (c *FakeTiKVClient) FlushLogBackupTasks(ctx context.Context) error { + action := &Action{} + _, err := c.fakeAPI(FlushLogBackupTasksActionType, action) + return err +} diff --git a/pkg/tikvapi/tikv_control.go b/pkg/tikvapi/tikv_control.go index 4784807fc8f..9c81c9db32e 100644 --- a/pkg/tikvapi/tikv_control.go +++ b/pkg/tikvapi/tikv_control.go @@ -49,20 +49,27 @@ func (tc *defaultTiKVControl) GetTiKVPodClient(namespace string, tcName string, var tlsConfig *tls.Config var err error - var scheme = "http" + var configOfSchema = func(scheme string) TiKVClientOpts { + return TiKVClientOpts{ + HTTPEndpoint: TiKVPodClientURL(namespace, tcName, podName, scheme, clusterDomain), + GRPCEndpoint: TiKVGRPCClientURL(namespace, tcName, podName, scheme, clusterDomain), + Timeout: DefaultTimeout, + TLSConfig: tlsConfig, + DisableKeepAlives: true, + } + } if tlsEnabled { - scheme = "https" tlsConfig, err = pdapi.GetTLSConfig(tc.secretLister, pdapi.Namespace(namespace), util.ClusterClientTLSSecretName(tcName)) if err != nil { klog.Errorf("Unable to get tls config for TiKV cluster %q, tikv client may not work: %v", tcName, err) - return NewTiKVClient(TiKVPodClientURL(namespace, tcName, podName, scheme, clusterDomain), DefaultTimeout, tlsConfig, true) + return NewTiKVClient(configOfSchema("https")) } - return NewTiKVClient(TiKVPodClientURL(namespace, tcName, podName, scheme, clusterDomain), DefaultTimeout, tlsConfig, true) + return NewTiKVClient(configOfSchema("https")) } - return NewTiKVClient(TiKVPodClientURL(namespace, tcName, podName, scheme, clusterDomain), DefaultTimeout, tlsConfig, true) + return NewTiKVClient(configOfSchema("http")) } func tikvPodClientKey(schema, namespace, clusterName, podName string) string { @@ -71,10 +78,23 @@ func tikvPodClientKey(schema, namespace, clusterName, podName string) string { // TiKVPodClientURL builds the url of tikv pod client func TiKVPodClientURL(namespace, clusterName, podName, scheme, clusterDomain string) string { + return attachPort(tiKVBaseURL(namespace, clusterName, podName, scheme, clusterDomain), v1alpha1.DefaultTiKVStatusPort) +} + +// TiKVGRPCURL builds the url of tikv grpc client +func TiKVGRPCClientURL(namespace, clusterName, podName, scheme, clusterDomain string) string { + return attachPort(tiKVBaseURL(namespace, clusterName, podName, scheme, clusterDomain), v1alpha1.DefaultTiKVServerPort) +} + +func tiKVBaseURL(namespace, clusterName, podName, scheme, clusterDomain string) string { if clusterDomain != "" { - return fmt.Sprintf("%s://%s.%s-tikv-peer.%s.svc.%s:%d", scheme, podName, clusterName, namespace, clusterDomain, v1alpha1.DefaultTiKVStatusPort) + return fmt.Sprintf("%s://%s.%s-tikv-peer.%s.svc.%s", scheme, podName, clusterName, namespace, clusterDomain) } - return fmt.Sprintf("%s://%s.%s-tikv-peer.%s:%d", scheme, podName, clusterName, namespace, v1alpha1.DefaultTiKVStatusPort) + return fmt.Sprintf("%s://%s.%s-tikv-peer.%s", scheme, podName, clusterName, namespace) +} + +func attachPort(path string, port int32) string { + return fmt.Sprintf("%s:%d", path, port) } // FakeTiKVControl implements a fake version of TiKVControlInterface. diff --git a/pkg/tikvapi/tikvapi.go b/pkg/tikvapi/tikvapi.go index d67dbd4670b..b6f8f774643 100644 --- a/pkg/tikvapi/tikvapi.go +++ b/pkg/tikvapi/tikvapi.go @@ -14,15 +14,22 @@ package tikvapi import ( + "context" "crypto/tls" "fmt" "net" "net/http" "strconv" + "strings" "time" + "github.com/pingcap/errors" + logbackup "github.com/pingcap/kvproto/pkg/logbackuppb" dto "github.com/prometheus/client_model/go" "github.com/prometheus/prom2json" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" "k8s.io/klog/v2" ) @@ -36,12 +43,69 @@ const ( // TiKVClient provides tikv server's api type TiKVClient interface { GetLeaderCount() (int, error) + FlushLogBackupTasks(ctx context.Context) error +} + +type lazyGRPCConn struct { + target string + opts []grpc.DialOption +} + +func (l *lazyGRPCConn) conn(ctx context.Context) (*grpc.ClientConn, error) { + ctx, cancel := context.WithTimeout(ctx, DefaultTimeout) + defer cancel() + ch, err := grpc.DialContext(ctx, l.target, l.opts...) + if err != nil { + return nil, errors.Annotatef(err, "during connecting to %s", l.target) + } + + return ch, nil } // tikvClient is default implementation of TiKVClient type tikvClient struct { - url string - httpClient *http.Client + url string + httpClient *http.Client + grpcConnector *lazyGRPCConn +} + +// FlushLogBackupTasks implements TiKVClient. +func (c *tikvClient) FlushLogBackupTasks(ctx context.Context) error { + logger := klog.FromContext(ctx) + + // For now we are using one-shot sessions, because the `TiKVClient` + // interface doesn't provide a `Close` method... + conn, err := c.grpcConnector.conn(ctx) + if err != nil { + return err + } + defer func() { + if err := conn.Close(); err != nil { + logger.Error(err, "tikvClient: failed to close grpc connection") + } + }() + + cli := logbackup.NewLogBackupClient(conn) + res, err := cli.FlushNow(ctx, &logbackup.FlushNowRequest{}) + if err != nil { + return err + } + + // Fast path: no task, return early. + if len(res.Results) == 0 { + return nil + } + + for _, r := range res.Results { + if r.Success { + logger.Info("successfully flushed the log backup task.", "task", r.TaskName) + } else { + return errors.Errorf("force flush failed for task %s: %s", r.TaskName, r.ErrorMessage) + } + } + // Wait a while so TiKV is able to send the flush result to the advancer. + time.Sleep(3 * time.Second) + return nil } // GetLeaderCount gets region leader count from the URL @@ -74,15 +138,35 @@ func (c *tikvClient) GetLeaderCount() (int, error) { return 0, fmt.Errorf("metric %s{type=\"%s\"} not found for %s", metricNameRegionCount, labelNameLeaderCount, apiURL) } +type TiKVClientOpts struct { + HTTPEndpoint string + GRPCEndpoint string + Timeout time.Duration + TLSConfig *tls.Config + DisableKeepAlives bool +} + // NewTiKVClient returns a new TiKVClient -func NewTiKVClient(url string, timeout time.Duration, tlsConfig *tls.Config, disableKeepalive bool) TiKVClient { +func NewTiKVClient(opts TiKVClientOpts) TiKVClient { + var conn lazyGRPCConn + if strings.HasPrefix(opts.GRPCEndpoint, "https://") { + conn.opts = append(conn.opts, grpc.WithTransportCredentials(credentials.NewTLS(opts.TLSConfig))) + } else { + conn.opts = append(conn.opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + conn.target = opts.GRPCEndpoint + for _, prefix := range []string{"http://", "https://"} { + conn.target = strings.TrimPrefix(conn.target, prefix) + } + return &tikvClient{ - url: url, + url: opts.HTTPEndpoint, + grpcConnector: &conn, httpClient: &http.Client{ - Timeout: timeout, + Timeout: opts.Timeout, Transport: &http.Transport{ - TLSClientConfig: tlsConfig, - DisableKeepAlives: disableKeepalive, + TLSClientConfig: opts.TLSConfig, + DisableKeepAlives: opts.DisableKeepAlives, ResponseHeaderTimeout: 10 * time.Second, TLSHandshakeTimeout: 10 * time.Second, DialContext: (&net.Dialer{