From 8fbc812a09c2c0758ed3a206a24d2a9d8af93fda Mon Sep 17 00:00:00 2001 From: Tomas Turek Date: Mon, 27 Jan 2025 16:44:52 +0100 Subject: [PATCH] Add Kubernetes election module Introduce a new election module using the Kubernetes Lease API. Modify the existing mechanism to allow selection of election mechanisms by introducing the `--election_system` parameter, following the same pattern used for storage and quota system selection. --- cmd/internal/provider/default_systems.go | 3 + cmd/trillian_log_signer/main.go | 12 +- go.mod | 20 ++ go.sum | 49 +++- util/election2/etcd/election.go | 10 - util/election2/etcd/election_test.go | 14 +- util/election2/etcd/provider.go | 56 +++++ util/election2/k8s/election.go | 308 +++++++++++++++++++++++ util/election2/k8s/election_test.go | 226 +++++++++++++++++ util/election2/k8s/provider.go | 66 +++++ util/election2/noop.go | 16 +- util/election2/provider.go | 71 ++++++ util/election2/testonly/tests.go | 28 +-- 13 files changed, 842 insertions(+), 37 deletions(-) create mode 100644 util/election2/etcd/provider.go create mode 100644 util/election2/k8s/election.go create mode 100644 util/election2/k8s/election_test.go create mode 100644 util/election2/k8s/provider.go create mode 100644 util/election2/provider.go diff --git a/cmd/internal/provider/default_systems.go b/cmd/internal/provider/default_systems.go index 7c0a37c695..aeeec8102b 100644 --- a/cmd/internal/provider/default_systems.go +++ b/cmd/internal/provider/default_systems.go @@ -5,6 +5,9 @@ import ( "github.com/google/trillian/quota" "github.com/google/trillian/storage" + + _ "github.com/google/trillian/util/election2/etcd" + _ "github.com/google/trillian/util/election2/k8s" ) var ( diff --git a/cmd/trillian_log_signer/main.go b/cmd/trillian_log_signer/main.go index e772dae431..f614504887 100644 --- a/cmd/trillian_log_signer/main.go +++ b/cmd/trillian_log_signer/main.go @@ -46,7 +46,6 @@ import ( "github.com/google/trillian/util/clock" "github.com/google/trillian/util/election" "github.com/google/trillian/util/election2" - etcdelect "github.com/google/trillian/util/election2/etcd" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" "k8s.io/klog/v2" @@ -66,7 +65,6 @@ var ( sequencerGuardWindowFlag = flag.Duration("sequencer_guard_window", 0, "If set, the time elapsed before submitted leaves are eligible for sequencing") forceMaster = flag.Bool("force_master", false, "If true, assume master for all logs") etcdHTTPService = flag.String("etcd_http_service", "trillian-logsigner-http", "Service name to announce our HTTP endpoint under") - lockDir = flag.String("lock_file_path", "/test/multimaster", "etcd lock file directory path") healthzTimeout = flag.Duration("healthz_timeout", time.Second*5, "Timeout used during healthz checks") quotaSystem = flag.String("quota_system", provider.DefaultQuotaSystem, fmt.Sprintf("Quota system to use. One of: %v", quota.Providers())) @@ -76,6 +74,7 @@ var ( storageSystem = flag.String("storage_system", provider.DefaultStorageSystem, fmt.Sprintf("Storage system to use. One of: %v", storage.Providers())) + electionSystem = flag.String("election_system", "etcd", fmt.Sprintf("Election system to use. One of: %v", election2.Providers())) preElectionPause = flag.Duration("pre_election_pause", 1*time.Second, "Maximum time to wait before starting elections") masterHoldInterval = flag.Duration("master_hold_interval", 60*time.Second, "Minimum interval to hold mastership for") masterHoldJitter = flag.Duration("master_hold_jitter", 120*time.Second, "Maximal random addition to --master_hold_interval") @@ -133,17 +132,16 @@ func main() { defer cancel() go util.AwaitSignal(ctx, cancel) - hostname, _ := os.Hostname() - instanceID := fmt.Sprintf("%s.%d", hostname, os.Getpid()) var electionFactory election2.Factory switch { case *forceMaster: klog.Warning("**** Acting as master for all logs ****") electionFactory = election2.NoopFactory{} - case client != nil: - electionFactory = etcdelect.NewFactory(instanceID, client, *lockDir) default: - klog.Exit("Either --force_master or --etcd_servers must be supplied") + electionFactory, err = election2.NewProvider(*electionSystem) + if err != nil { + klog.Exitf("Failed to get election provider: %v", err) + } } qm, err := quota.NewManager(*quotaSystem) diff --git a/go.mod b/go.mod index e337e267f0..bc1cda4402 100644 --- a/go.mod +++ b/go.mod @@ -43,7 +43,11 @@ require ( google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 google.golang.org/protobuf v1.36.3 gopkg.in/yaml.v2 v2.4.0 + k8s.io/api v0.31.5 + k8s.io/apimachinery v0.31.5 + k8s.io/client-go v0.31.5 k8s.io/klog/v2 v2.130.1 + k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 ) require ( @@ -85,16 +89,23 @@ require ( github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect + github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/envoyproxy/go-control-plane v0.13.1 // indirect github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-openapi/jsonpointer v0.20.2 // indirect + github.com/go-openapi/jsonreference v0.20.4 // indirect + github.com/go-openapi/swag v0.22.9 // indirect github.com/gofrs/flock v0.8.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.1 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/gofuzz v1.2.0 // indirect github.com/google/licenseclassifier/v2 v2.0.0 // indirect github.com/google/pprof v0.0.0-20240528025155-186aa0362fba // indirect github.com/google/s2a-go v0.1.9 // indirect @@ -120,8 +131,10 @@ require ( github.com/jhump/protoreflect v1.16.0 // indirect github.com/jmespath/go-jmespath v0.4.1-0.20220621161143-b0104c826a24 // indirect github.com/jonboulle/clockwork v0.4.0 // indirect + github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.11 // indirect + github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect github.com/miekg/pkcs11 v1.1.1 // indirect github.com/mitchellh/copystructure v1.0.0 // indirect @@ -152,6 +165,7 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20220101234140-673ab2c3ae75 // indirect github.com/urfave/cli v1.22.14 // indirect + github.com/x448/float16 v0.8.4 // indirect github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 // indirect go.etcd.io/bbolt v1.3.11 // indirect go.etcd.io/etcd/api/v3 v3.5.17 // indirect @@ -178,10 +192,16 @@ require ( golang.org/x/mod v0.22.0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/oauth2 v0.25.0 // indirect + golang.org/x/term v0.28.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.9.0 // indirect gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect + gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect + sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect sigs.k8s.io/yaml v1.4.0 // indirect ) diff --git a/go.sum b/go.sum index eff59d417b..3261c1c610 100644 --- a/go.sum +++ b/go.sum @@ -744,6 +744,8 @@ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= +github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -777,6 +779,8 @@ github.com/fsouza/fake-gcs-server v1.49.2 h1:fukDqzEQM50QkA0jAbl6cLqeDu3maQjwZBu github.com/fsouza/fake-gcs-server v1.49.2/go.mod h1:17SYzJEXRcaAA5ATwwvgBkSIqIy7r1icnGM0y/y4foY= github.com/fullstorydev/grpcurl v1.9.2 h1:ObqVQTZW7aFnhuqQoppUrvep2duMBanB0UYK2Mm8euo= github.com/fullstorydev/grpcurl v1.9.2/go.mod h1:jLfcF55HAz6TYIJY9xFFWgsl0D7o2HlxA5Z4lUG0Tdo= +github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= +github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-fonts/dejavu v0.1.0/go.mod h1:4Wt4I4OU2Nq9asgDCteaAaWZOV24E+0/Pwo0gppep4g= github.com/go-fonts/latin-modern v0.2.0/go.mod h1:rQVLdDMK+mK1xscDwsqM5J8U2jrRa3T0ecnM9pNujks= @@ -795,6 +799,12 @@ github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-openapi/jsonpointer v0.20.2 h1:mQc3nmndL8ZBzStEo3JYF8wzmeWffDH4VbXz58sAx6Q= +github.com/go-openapi/jsonpointer v0.20.2/go.mod h1:bHen+N0u1KEO3YlmqOjTT9Adn1RfD91Ar825/PuiRVs= +github.com/go-openapi/jsonreference v0.20.4 h1:bKlDxQxQJgwpUSgOENiMPzCTBVuc7vTdXSSgNeAhojU= +github.com/go-openapi/jsonreference v0.20.4/go.mod h1:5pZJyJP2MnYCpoeoMAql78cCHauHj0V9Lhc506VOpw4= +github.com/go-openapi/swag v0.22.9 h1:XX2DssF+mQKM2DHsbgZK74y/zj4mo9I99+89xUmuZCE= +github.com/go-openapi/swag v0.22.9/go.mod h1:3/OXnFfnMAwBD099SwYRk7GD3xOrr1iL7d/XNLXVVwE= github.com/go-pdf/fpdf v0.5.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-pdf/fpdf v0.6.0/go.mod h1:HzcnA+A23uwogo0tp9yU+l3V+KXhiESpt1PMayhOh5M= github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= @@ -803,6 +813,9 @@ github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpv github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= +github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw= @@ -860,6 +873,8 @@ github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= +github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -882,6 +897,8 @@ github.com/google/go-licenses/v2 v2.0.0-alpha.1/go.mod h1:HlMUpsa+mbs8EqdlY0BDfC github.com/google/go-replayers/httpreplay v1.2.0 h1:VM1wEyyjaoU53BwrOnaf9VhAyQQEEioJvFYxYcLRKzk= github.com/google/go-replayers/httpreplay v1.2.0/go.mod h1:WahEFFZZ7a1P4VM1qEeHy+tME4bwyqPcwWbNlUI1Mcg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/licenseclassifier/v2 v2.0.0 h1:1Y57HHILNf4m0ABuMVb6xk4vAJYEUO0gDxNpog0pyeA= github.com/google/licenseclassifier/v2 v2.0.0/go.mod h1:cOjbdH0kyC9R22sdQbYsFkto4NGCAc+ZSwbeThazEtM= github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= @@ -1036,6 +1053,8 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGw github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= +github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= +github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= @@ -1077,6 +1096,8 @@ github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lyft/protoc-gen-star v0.6.0/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star v0.6.1/go.mod h1:TGAoBVkt8w7MPG72TrKIu85MIdXwDuzJYeZuUPFPNwA= github.com/lyft/protoc-gen-star/v2 v2.0.1/go.mod h1:RcCdONR2ScXaYnQC5tUzxzlpA3WVYF7/opLeUgcQs/o= +github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= +github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= @@ -1125,6 +1146,8 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg= @@ -1180,8 +1203,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= -github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= @@ -1235,6 +1258,8 @@ github.com/transparency-dev/merkle v0.0.2 h1:Q9nBoQcZcgPamMkGn7ghV8XiTZ/kRxn1yCG github.com/transparency-dev/merkle v0.0.2/go.mod h1:pqSy+OXefQ1EDUVmAJ8MUhHB9TXGuzVAT58PqBoHz1A= github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk= github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA= +github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= +github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510 h1:S2dVYn90KE98chqDkyE9Z4N61UnQd+KOfgp5Iu53llk= github.com/xiang90/probing v0.0.0-20221125231312-a49e3df8f510/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -1628,6 +1653,8 @@ golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= +golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -2024,8 +2051,12 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/cheggaaa/pb.v1 v1.0.28 h1:n1tBJnnK2r7g9OW2btFH91V92STTUevLXYFb8gy9EMk= gopkg.in/cheggaaa/pb.v1 v1.0.28/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4= +gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= @@ -2051,8 +2082,18 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= +k8s.io/api v0.31.5 h1:7jP74egbPUOCLJV5KheUnwo9gz3zzUsMIj2EPkuYK1E= +k8s.io/api v0.31.5/go.mod h1:RMyMdZG1kJjou2ng5buEti0OHlo0uFXgSzTZ/k5LeVk= +k8s.io/apimachinery v0.31.5 h1:NxhAVGcfrSdTMx3M2v1OnvcMS7h1ZnWyt2x2z8CJJBU= +k8s.io/apimachinery v0.31.5/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= +k8s.io/client-go v0.31.5 h1:rmDswcUaIFAJ5vJaB82pjyqc52DgHCPv0G6af3OupO0= +k8s.io/client-go v0.31.5/go.mod h1:js93IlRSzRHql9o9zP54N56rMR249uH4+srnSOcFLsU= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= +k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7FjZpUb45WallggurYhKGag= +k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= +k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 h1:M3sRQVHv7vB20Xc2ybTt7ODCeFj6JSWYFzOFnYeS6Ro= +k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= modernc.org/cc/v3 v3.36.0/go.mod h1:NFUHyPn4ekoC/JHeZFfZurN6ixxawE1BnVonP/oahEI= @@ -2091,5 +2132,9 @@ rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8 rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= +sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= +sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= diff --git a/util/election2/etcd/election.go b/util/election2/etcd/election.go index cadc0fdd42..8691312be2 100644 --- a/util/election2/etcd/election.go +++ b/util/election2/etcd/election.go @@ -132,16 +132,6 @@ type Factory struct { lockDir string } -// NewFactory builds an election factory that uses the given parameters. The -// passed in etcd client should remain valid for the lifetime of the object. -func NewFactory(instanceID string, client *clientv3.Client, lockDir string) *Factory { - return &Factory{ - client: client, - instanceID: instanceID, - lockDir: lockDir, - } -} - // NewElection creates a specific Election instance. func (f *Factory) NewElection(ctx context.Context, resourceID string) (election2.Election, error) { // TODO(pavelkalinnikov): Re-create the session if it expires. diff --git a/util/election2/etcd/election_test.go b/util/election2/etcd/election_test.go index 97f0a193f0..52b0efc1bf 100644 --- a/util/election2/etcd/election_test.go +++ b/util/election2/etcd/election_test.go @@ -31,7 +31,11 @@ func TestElectionThroughCommonClient(t *testing.T) { defer cleanup() ctx := context.Background() - fact := NewFactory("serv", client, "res/") + fact := Factory{ + client: client, + instanceID: "serv", + lockDir: "res/", + } el1, err := fact.NewElection(ctx, "10") if err != nil { @@ -66,9 +70,13 @@ func TestElection(t *testing.T) { for _, nt := range testonly.Tests { // Create a new Factory for each test for better isolation. - fact := NewFactory("testID", client, fmt.Sprintf("%s/resources/", nt.Name)) + fact := Factory{ + client: client, + instanceID: "testID", + lockDir: fmt.Sprintf("%s/resources/", nt.Name), + } t.Run(nt.Name, func(t *testing.T) { - nt.Run(t, fact) + nt.Run(t, &fact) }) } } diff --git a/util/election2/etcd/provider.go b/util/election2/etcd/provider.go new file mode 100644 index 0000000000..871ecb637e --- /dev/null +++ b/util/election2/etcd/provider.go @@ -0,0 +1,56 @@ +package etcd + +import ( + "errors" + "flag" + "fmt" + "github.com/google/trillian/util/election2" + clientv3 "go.etcd.io/etcd/client/v3" + "k8s.io/klog/v2" + "os" + "strings" + "time" +) + +// ElectionName identifies the etcd election implementation. +const ElectionName = "etcd" + +var ( + lockDir = flag.String("lock_file_path", "/test/multimaster", "etcd lock file directory path") +) + +func init() { + if err := election2.RegisterProvider(ElectionName, newFactory); err != nil { + klog.Fatalf("Failed to register election implementation %v: %v", ElectionName, err) + } +} + +// NewFactory builds an election factory that uses the given parameters. +func newFactory() (election2.Factory, error) { + hostname, _ := os.Hostname() + instanceID := fmt.Sprintf("%s.%d", hostname, os.Getpid()) + + servers := flag.Lookup("etcd_servers").Value.String() + + var client *clientv3.Client + var err error + if servers != "" { + if client, err = clientv3.New(clientv3.Config{ + Endpoints: strings.Split(servers, ","), + DialTimeout: 5 * time.Second, + }); err != nil { + klog.Exitf("Failed to connect to etcd at %v: %v", servers, err) + } + } + + if client == nil { + return nil, errors.New("Either --force_master or --etcd_servers must be supplied") + } + + // The passed in etcd client should remain valid for the lifetime of the object. + return &Factory{ + client: client, + instanceID: instanceID, + lockDir: *lockDir, + }, nil +} diff --git a/util/election2/k8s/election.go b/util/election2/k8s/election.go new file mode 100644 index 0000000000..95c86a7622 --- /dev/null +++ b/util/election2/k8s/election.go @@ -0,0 +1,308 @@ +package k8s + +import ( + "bytes" + "context" + "fmt" + "github.com/google/trillian/util/election2" + v1 "k8s.io/api/coordination/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/klog/v2" + "k8s.io/utils/clock" + "strings" + "sync" + "time" +) + +// Factory creates Election instances. +type Factory struct { + client coordinationv1.CoordinationV1Interface + instanceID string + namespace string + leaseDuration time.Duration + retryPeriod time.Duration +} + +// NewElection creates a specific Election instance. +func (f *Factory) NewElection(ctx context.Context, resourceID string) (election2.Election, error) { + el := Election{ + lock: &resourcelock.LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Name: strings.ToLower(resourceID), + Namespace: f.namespace, + }, + Client: f.client, + LockConfig: resourcelock.ResourceLockConfig{ + Identity: f.instanceID, + }, + }, + clock: clock.RealClock{}, + leaseDuration: f.leaseDuration, + retryPeriod: f.retryPeriod, + client: f.client, + } + el.onLeaseChanged = sync.NewCond(&el.mu) + err := el.watchLease(ctx, el.onLeaseChanged) + if err != nil { + return nil, err + } + + return &el, nil +} + +type Election struct { + lock *resourcelock.LeaseLock + + client coordinationv1.CoordinationV1Interface + + // clock is wrapper around time to allow for less flaky testing + clock clock.Clock + + // internal bookkeeping + observedRecord resourcelock.LeaderElectionRecord + observedRawRecord []byte + observedTime time.Time + // used to lock the observedRecord + observedRecordLock sync.Mutex + + leaseDuration time.Duration + retryPeriod time.Duration + + mu sync.Mutex + onLeaseChanged *sync.Cond +} + +func (e *Election) Await(ctx context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + + err := wait.PollUntilContextCancel(ctx, e.retryPeriod, true, func(ctx context.Context) (done bool, err error) { + succeeded := e.tryAcquireOrRenew(ctx) + if !succeeded { + return false, nil + } + return true, nil + }) + if err != nil { + return err + } + return nil +} + +func (e *Election) tryAcquireOrRenew(ctx context.Context) bool { + now := metav1.NewTime(e.clock.Now()) + leaderElectionRecord := resourcelock.LeaderElectionRecord{ + HolderIdentity: e.lock.Identity(), + LeaseDurationSeconds: int(e.leaseDuration / time.Second), + RenewTime: now, + AcquireTime: now, + } + + // 1. fast path for the leader to update optimistically assuming that the record observed + // last time is the current version. + if e.IsLeader() && e.isLeaseValid(now.Time) { + oldObservedRecord := e.getObservedRecord() + leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime + leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions + + err := e.lock.Update(ctx, leaderElectionRecord) + if err == nil { + e.setObservedRecord(&leaderElectionRecord) + return true + } + klog.Errorf("Failed to update lock optimistically: %v, falling back to slow path", err) + } + + // 2. obtain or create the ElectionRecord + oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := e.lock.Get(ctx) + if err != nil { + if !apierrors.IsNotFound(err) { + klog.Errorf("error retrieving resource lock %v: %v", e.lock.Describe(), err) + return false + } + if err = e.lock.Create(ctx, leaderElectionRecord); err != nil { + klog.Errorf("error initially creating leader election record: %v", err) + return false + } + + e.setObservedRecord(&leaderElectionRecord) + + return true + } + + // 3. Record obtained, check the Identity & Time + if !bytes.Equal(e.observedRawRecord, oldLeaderElectionRawRecord) { + e.setObservedRecord(oldLeaderElectionRecord) + + e.observedRawRecord = oldLeaderElectionRawRecord + } + if len(oldLeaderElectionRecord.HolderIdentity) > 0 && e.isLeaseValid(now.Time) && !e.IsLeader() { + klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) + return false + } + + // 4. We're going to try to update. The leaderElectionRecord is set to it's default + // here. Let's correct it before updating. + if e.IsLeader() { + leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime + leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + } else { + leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 + } + + // update the lock itself + if err = e.lock.Update(ctx, leaderElectionRecord); err != nil { + klog.Errorf("Failed to update lock: %v", err) + return false + } + + e.setObservedRecord(&leaderElectionRecord) + return true +} + +// WithMastership returns a context that is canceled if mastership is lost. +func (e *Election) WithMastership(ctx context.Context) (context.Context, error) { + e.mu.Lock() + defer e.mu.Unlock() + if !e.IsLeader() { + cctx, cancel := context.WithCancel(ctx) + cancel() + return cctx, nil + } + + cctx, cancel := e.watchContext(ctx, &e.mu, e.onLeaseChanged) + + go func() { // Goroutine to cancel the mastership context when leadership is lost. + defer cancel() + e.mu.Lock() + defer e.mu.Unlock() + + now := metav1.NewTime(e.clock.Now()) + for e.IsLeader() && e.isLeaseValid(now.Time) && cctx.Err() == nil { + e.onLeaseChanged.Wait() + } + }() + + return cctx, nil +} + +func (e *Election) watchContext(ctx context.Context, l sync.Locker, cond *sync.Cond) (context.Context, context.CancelFunc) { + cctx, cancel := context.WithCancel(ctx) + go func() { + defer cancel() + <-cctx.Done() + l.Lock() // Avoid racing with cond waiters on ctx status. + defer l.Unlock() + cond.Broadcast() + }() + return cctx, cancel +} + +func (e *Election) watchLease(ctx context.Context, onLeaseChanged *sync.Cond) error { + watcher, err := e.client.Leases(e.lock.LeaseMeta.Namespace).Watch(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("metadata.name=%s", e.lock.LeaseMeta.Name), + }) + if err != nil { + return fmt.Errorf("lease watcher `%s` failed: %w", e.lock.LockConfig.Identity, err) + } + go func() { + defer watcher.Stop() + defer onLeaseChanged.Broadcast() + + channel := watcher.ResultChan() + for { + select { + case <-ctx.Done(): + return + case event, ok := <-channel: + if !ok { + // channel closed + return + } + switch event.Type { + case watch.Modified, watch.Added: + lease, ok := event.Object.(*v1.Lease) + if !ok { + continue + } + record := resourcelock.LeaseSpecToLeaderElectionRecord(&lease.Spec) + e.setObservedRecord(record) + case watch.Deleted: + e.setObservedRecord(&resourcelock.LeaderElectionRecord{}) + } + } + } + }() + return nil +} + +func (e *Election) Resign(ctx context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + defer e.onLeaseChanged.Broadcast() + + if !e.IsLeader() { + return nil + } + + now := metav1.NewTime(e.clock.Now()) + leaderElectionRecord := resourcelock.LeaderElectionRecord{ + LeaderTransitions: e.observedRecord.LeaderTransitions, + LeaseDurationSeconds: 1, + RenewTime: now, + AcquireTime: now, + } + + if err := e.lock.Update(ctx, leaderElectionRecord); err != nil { + return fmt.Errorf("failed to release lock: %w", err) + } + + e.setObservedRecord(&leaderElectionRecord) + return nil +} + +func (e *Election) Close(ctx context.Context) error { + return e.Resign(ctx) +} + +func (e *Election) isLeaseValid(now time.Time) bool { + return e.observedTime.Add(time.Second * time.Duration(e.getObservedRecord().LeaseDurationSeconds)).After(now) +} + +// setObservedRecord will set a new observedRecord and update observedTime to the current time. +// Protect critical sections with lock. +func (e *Election) setObservedRecord(observedRecord *resourcelock.LeaderElectionRecord) { + e.observedRecordLock.Lock() + defer e.observedRecordLock.Unlock() + + e.observedRecord = *observedRecord + e.observedTime = e.clock.Now() + + e.onLeaseChanged.Broadcast() +} + +// getObservedRecord returns observersRecord. +// Protect critical sections with lock. +func (e *Election) getObservedRecord() resourcelock.LeaderElectionRecord { + e.observedRecordLock.Lock() + defer e.observedRecordLock.Unlock() + + return e.observedRecord +} + +// GetLeader returns the identity of the last observed leader or returns the empty string if +// no leader has yet been observed. +// This function is for informational purposes. (e.g. monitoring, logs, etc.) +func (e *Election) GetLeader() string { + return e.getObservedRecord().HolderIdentity +} + +// IsLeader returns true if the last observed leader was this client else returns false. +func (e *Election) IsLeader() bool { + return e.getObservedRecord().HolderIdentity == e.lock.Identity() +} diff --git a/util/election2/k8s/election_test.go b/util/election2/k8s/election_test.go new file mode 100644 index 0000000000..587bfd2413 --- /dev/null +++ b/util/election2/k8s/election_test.go @@ -0,0 +1,226 @@ +// Copyright 2018 Google LLC. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8s + +import ( + "context" + "errors" + "github.com/google/trillian/util/election2" + "github.com/google/trillian/util/election2/testonly" + v1 "k8s.io/api/coordination/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/fake" + "testing" + "time" +) + +var Tests []testonly.NamedTest + +func init() { + Tests = append(testonly.Tests, + testonly.NamedTest{Name: "RunLeaseEvents", Run: runLeaseEvents}, + testonly.NamedTest{Name: "RunParallelElections", Run: runParallelElections}, + testonly.NamedTest{Name: "RunWithMastershipMultiple", Run: runWithMastershipMultiple}, + ) +} + +func runLeaseEvents(t *testing.T, f election2.Factory) { + for _, tc := range []struct { + desc string + event watch.EventType + done bool + }{ + {desc: "modified", event: watch.Modified, done: false}, + {desc: "modified-cancel", event: watch.Modified, done: true}, + {desc: "deleted", event: watch.Deleted, done: true}, + } { + t.Run(tc.desc, func(t *testing.T) { + factory := f.(*Factory) + client := factory.client + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + e, err := f.NewElection(ctx, tc.desc) + if err != nil { + t.Fatalf("NewElection(): %v", err) + } + if err = e.Await(ctx); err != nil { + t.Fatalf("Await(): %v", err) + } + mctx, err := e.WithMastership(ctx) + if err != nil { + t.Errorf("WithMastership): %v", err) + } + testonly.CheckNotDone(mctx, t) + + // Simulate events + lease := &v1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: tc.desc, + Namespace: factory.namespace, + }, + Spec: v1.LeaseSpec{ + HolderIdentity: &factory.instanceID, + }, + } + + newHolder := "new-holder-identity" + if tc.done { + lease.Spec.HolderIdentity = &newHolder + } + + switch tc.event { + case watch.Modified: + _, err = client.Leases(factory.namespace).Update(ctx, lease, metav1.UpdateOptions{}) + if err != nil { + t.Errorf("failed to update lease %v", err) + } + case watch.Deleted: + err = client.Leases(factory.namespace).Delete(ctx, tc.desc, metav1.DeleteOptions{}) + if err != nil { + t.Errorf("failed to delete lease %v", err) + } + } + + if tc.done { + testonly.CheckDone(mctx, t, 1*time.Second) + } else { + testonly.CheckNotDone(mctx, t) + } + }) + } +} + +func runParallelElections(t *testing.T, f election2.Factory) { + for _, tc := range []struct { + desc string + block bool + err error + }{ + {desc: "same", block: true, err: context.DeadlineExceeded}, + {desc: "different", block: false}, + } { + t.Run(tc.desc, func(t *testing.T) { + resource1, resource2 := "resource-1", "resource-2" + + factory1 := *f.(*Factory) + factory1.instanceID = "instance-1" + + factory2 := *f.(*Factory) + factory2.instanceID = "instance-2" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + e1, err := factory1.NewElection(ctx, resource1) + if err != nil { + t.Fatalf("NewElection(): %v", err) + } + + if tc.block { + resource2 = resource1 + } + e2, err := factory2.NewElection(ctx, resource2) + if err != nil { + t.Fatalf("NewElection(): %v", err) + } + + if got := e1.Await(ctx); got != nil { + t.Errorf("Await(): %v, want %v", got, nil) + } + + if tc.block { + ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond) + defer cancel() + } + + if got, want := e2.Await(ctx), tc.err; !errors.Is(got, want) { + t.Errorf("Await(): %v, want %v", got, want) + } + }) + } +} + +func runWithMastershipMultiple(t *testing.T, f election2.Factory) { + ctxErr := errors.New("WithMastership error") + for _, tc := range []struct { + desc string + beMaster bool + cancel bool + err error + wantErr error + }{ + {desc: "master", beMaster: true}, + {desc: "master-cancel", beMaster: true, cancel: true}, + {desc: "not-master", beMaster: false}, + {desc: "not-master-cancel", cancel: true}, + {desc: "error", err: ctxErr, wantErr: ctxErr}, + } { + t.Run(tc.desc, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + e, err := f.NewElection(ctx, tc.desc) + if err != nil { + t.Fatalf("NewElection(): %v", err) + } + d := testonly.NewDecorator(e) + d.Update(testonly.Errs{WithMastership: tc.err}) + if tc.beMaster { + if err := d.Await(ctx); err != nil { + t.Fatalf("Await(): %v", err) + } + } + mctx1, err := d.WithMastership(ctx) + if want := tc.wantErr; err != want { + t.Fatalf("WithMastership(): %v, want %v", err, want) + } + mctx2, err := d.WithMastership(ctx) + if want := tc.wantErr; err != want { + t.Fatalf("WithMastership(): %v, want %v", err, want) + } + if err != nil { + return + } + if tc.cancel { + cancel() + } + if tc.beMaster && !tc.cancel { + testonly.CheckNotDone(mctx1, t) + testonly.CheckNotDone(mctx2, t) + } else { + testonly.CheckDone(mctx1, t, 0) + testonly.CheckDone(mctx2, t, 0) + } + }) + } +} + +func TestElection(t *testing.T) { + for _, nt := range Tests { + c := fake.NewClientset() + + fact := &Factory{ + client: c.CoordinationV1(), + instanceID: "testID", + namespace: "default", + leaseDuration: 15 * time.Second, + retryPeriod: 2 * time.Second, + } + t.Run(nt.Name, func(t *testing.T) { + nt.Run(t, fact) + }) + } +} diff --git a/util/election2/k8s/provider.go b/util/election2/k8s/provider.go new file mode 100644 index 0000000000..ced9c19f81 --- /dev/null +++ b/util/election2/k8s/provider.go @@ -0,0 +1,66 @@ +package k8s + +import ( + "flag" + "fmt" + "github.com/google/trillian/util/election2" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" + "time" +) + +// ElectionName identifies the kubernetes election implementation. +const ElectionName = "k8s" + +var ( + kubeconfig = flag.String("kubeconfig", "", "Paths to a kubeconfig. Only required if out-of-cluster.") + namespace = flag.String("lock_namespace", "", "The lease lock resource namespace. Only effective for election_system=k8s.") + instanceID = flag.String("lock_holder_identity", "", "The identity of the holder of a current lease") +) + +func init() { + if err := election2.RegisterProvider(ElectionName, newFactory); err != nil { + klog.Fatalf("Failed to register election implementation %v: %v", ElectionName, err) + } +} + +// NewFactory builds an election factory that uses the given parameters. +func newFactory() (election2.Factory, error) { + var instance = *instanceID + if instance == "" { + instance = "trillian-logsigner-" + string(uuid.NewUUID()) + } + + if *namespace == "" { + return nil, fmt.Errorf("namespace for lease lock need to be configured") + } + + config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) + if err != nil { + panic(err) + } + + clientset := kubernetes.NewForConfigOrDie(config) + + holdInterval := flag.Lookup("master_hold_interval").Value + holdIntervalDuration, err := time.ParseDuration(holdInterval.String()) + if err != nil { + return nil, fmt.Errorf("master_hold_interval: %w", err) + } + + holdJitter := flag.Lookup("master_hold_jitter").Value + holdJitterDuration, err := time.ParseDuration(holdJitter.String()) + if err != nil { + return nil, fmt.Errorf("master_hold_jitter: %w", err) + } + + return &Factory{ + client: clientset.CoordinationV1(), + namespace: *namespace, + instanceID: instance, + leaseDuration: holdJitterDuration, + retryPeriod: holdIntervalDuration, + }, nil +} diff --git a/util/election2/noop.go b/util/election2/noop.go index 0e0a88f417..ac151c09d8 100644 --- a/util/election2/noop.go +++ b/util/election2/noop.go @@ -14,7 +14,21 @@ package election2 -import "context" +import ( + "context" + "k8s.io/klog/v2" +) + +// noopElectionName represents the noop election implementation. +const noopElectionName = "noop" + +func init() { + if err := RegisterProvider(noopElectionName, func() (Factory, error) { + return NoopFactory{}, nil + }); err != nil { + klog.Fatalf("Failed to register %q: %v", noopElectionName, err) + } +} // NoopElection is a stub Election that always believes to be the master. type NoopElection string diff --git a/util/election2/provider.go b/util/election2/provider.go new file mode 100644 index 0000000000..87d8e254b4 --- /dev/null +++ b/util/election2/provider.go @@ -0,0 +1,71 @@ +// Copyright 2018 Google LLC. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package election2 + +import ( + "fmt" + "sync" +) + +var ( + qpMu sync.RWMutex + qpByName map[string]NewFactoryFunc +) + +// NewFactoryFunc is the signature of a function which can be registered +// to provide instances of an election factory. +type NewFactoryFunc func() (Factory, error) + +// RegisterProvider registers a function that provides Manager instances. +func RegisterProvider(name string, qp NewFactoryFunc) error { + qpMu.Lock() + defer qpMu.Unlock() + + if qpByName == nil { + qpByName = make(map[string]NewFactoryFunc) + } + + _, exists := qpByName[name] + if exists { + return fmt.Errorf("election provider %v already registered", name) + } + qpByName[name] = qp + return nil +} + +// Providers returns a slice of registered quota provider names. +func Providers() []string { + qpMu.RLock() + defer qpMu.RUnlock() + + r := []string{} + for k := range qpByName { + r = append(r, k) + } + + return r +} + +// NewProvider returns a Manager implementation. +func NewProvider(name string) (Factory, error) { + qpMu.RLock() + defer qpMu.RUnlock() + + f, exists := qpByName[name] + if !exists { + return nil, fmt.Errorf("unknown election provider: %v", name) + } + return f() +} diff --git a/util/election2/testonly/tests.go b/util/election2/testonly/tests.go index a765a0d946..c7865c843c 100644 --- a/util/election2/testonly/tests.go +++ b/util/election2/testonly/tests.go @@ -40,16 +40,16 @@ type NamedTest struct { Run func(t *testing.T, f election2.Factory) } -// checkNotDone ensures that the context is not done for some time. -func checkNotDone(ctx context.Context, t *testing.T) { +// CheckNotDone ensures that the context is not done for some time. +func CheckNotDone(ctx context.Context, t *testing.T) { t.Helper() if err := clock.SleepContext(ctx, 100*time.Millisecond); err != nil { t.Error("unexpected context cancelation") } } -// checkDone ensures that the context is done within the specified duration. -func checkDone(ctx context.Context, t *testing.T, wait time.Duration) { +// CheckDone ensures that the context is done within the specified duration. +func CheckDone(ctx context.Context, t *testing.T, wait time.Duration) { t.Helper() if wait == 0 { select { @@ -142,9 +142,9 @@ func runElectionWithMastership(t *testing.T, f election2.Factory) { cancel() } if tc.beMaster && !tc.cancel { - checkNotDone(mctx, t) + CheckNotDone(mctx, t) } else { - checkDone(mctx, t, 0) + CheckDone(mctx, t, 0) } }) } @@ -186,11 +186,11 @@ func runElectionResign(t *testing.T, f election2.Factory) { t.Errorf("Resign(): %v, want %v", got, want) } if tc.beMaster && tc.wantErr == nil { - checkDone(mctx, t, 1*time.Second) + CheckDone(mctx, t, 1*time.Second) } else if tc.beMaster { - checkNotDone(mctx, t) + CheckNotDone(mctx, t) } else { - checkDone(mctx, t, 0) + CheckDone(mctx, t, 0) } }) } @@ -241,11 +241,11 @@ func runElectionClose(t *testing.T, f election2.Factory) { t.Errorf("Close(): %v, want %v", got, want) } if tc.beMaster && tc.wantErr == nil { - checkDone(mctx, t, 1*time.Second) + CheckDone(mctx, t, 1*time.Second) } else if tc.beMaster { - checkNotDone(mctx, t) + CheckNotDone(mctx, t) } else { - checkDone(mctx, t, 0) + CheckDone(mctx, t, 0) } }) } @@ -275,10 +275,10 @@ func runElectionLoop(t *testing.T, f election2.Factory) { if err != nil { t.Fatalf("WithMastership(): %v", err) } - checkNotDone(mctx, t) // Do some work as master. + CheckNotDone(mctx, t) // Do some work as master. if err := d.Resign(ctx); err != nil { t.Errorf("Resign(): %v", err) } - checkDone(mctx, t, 1*time.Second) // The mastership context should close. + CheckDone(mctx, t, 1*time.Second) // The mastership context should close. } }