From cd6fbb1f445b4e33863c1af659e81d242f798f17 Mon Sep 17 00:00:00 2001 From: Mustafa Elbehery Date: Wed, 21 Feb 2024 15:30:06 +0100 Subject: [PATCH] add raft example --- .../etcdserverpb/raft_internal_stringer.go | 185 ----------- raftexample/go.mod | 15 + raftexample/go.sum | 57 ++++ raftexample/raft.go | 87 ++++-- raftexample/verify/verify.go | 83 ----- raftexample/wal/version.go | 288 ------------------ 6 files changed, 138 insertions(+), 577 deletions(-) delete mode 100644 raftexample/etcdserverpb/raft_internal_stringer.go create mode 100644 raftexample/go.mod create mode 100644 raftexample/go.sum delete mode 100644 raftexample/verify/verify.go delete mode 100644 raftexample/wal/version.go diff --git a/raftexample/etcdserverpb/raft_internal_stringer.go b/raftexample/etcdserverpb/raft_internal_stringer.go deleted file mode 100644 index 3ab1631f..00000000 --- a/raftexample/etcdserverpb/raft_internal_stringer.go +++ /dev/null @@ -1,185 +0,0 @@ -// Copyright 2018 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package etcdserverpb - -import ( - "fmt" - "strings" - - proto "github.com/golang/protobuf/proto" -) - -// InternalRaftStringer implements custom proto Stringer: -// redact password, replace value fields with value_size fields. -type InternalRaftStringer struct { - Request *InternalRaftRequest -} - -func (as *InternalRaftStringer) String() string { - switch { - case as.Request.LeaseGrant != nil: - return fmt.Sprintf("header:<%s> lease_grant:", - as.Request.Header.String(), - as.Request.LeaseGrant.TTL, - as.Request.LeaseGrant.ID, - ) - case as.Request.LeaseRevoke != nil: - return fmt.Sprintf("header:<%s> lease_revoke:", - as.Request.Header.String(), - as.Request.LeaseRevoke.ID, - ) - case as.Request.Authenticate != nil: - return fmt.Sprintf("header:<%s> authenticate:", - as.Request.Header.String(), - as.Request.Authenticate.Name, - as.Request.Authenticate.SimpleToken, - ) - case as.Request.AuthUserAdd != nil: - return fmt.Sprintf("header:<%s> auth_user_add:", - as.Request.Header.String(), - as.Request.AuthUserAdd.Name, - ) - case as.Request.AuthUserChangePassword != nil: - return fmt.Sprintf("header:<%s> auth_user_change_password:", - as.Request.Header.String(), - as.Request.AuthUserChangePassword.Name, - ) - case as.Request.Put != nil: - return fmt.Sprintf("header:<%s> put:<%s>", - as.Request.Header.String(), - NewLoggablePutRequest(as.Request.Put).String(), - ) - case as.Request.Txn != nil: - return fmt.Sprintf("header:<%s> txn:<%s>", - as.Request.Header.String(), - NewLoggableTxnRequest(as.Request.Txn).String(), - ) - default: - // nothing to redact - } - return as.Request.String() -} - -// txnRequestStringer implements a custom proto String to replace value bytes fields with value size -// fields in any nested txn and put operations. -type txnRequestStringer struct { - Request *TxnRequest -} - -//nolint:revive -func NewLoggableTxnRequest(request *TxnRequest) *txnRequestStringer { - return &txnRequestStringer{request} -} - -func (as *txnRequestStringer) String() string { - var compare []string - for _, c := range as.Request.Compare { - switch cv := c.TargetUnion.(type) { - case *Compare_Value: - compare = append(compare, newLoggableValueCompare(c, cv).String()) - default: - // nothing to redact - compare = append(compare, c.String()) - } - } - var success []string - for _, s := range as.Request.Success { - success = append(success, newLoggableRequestOp(s).String()) - } - var failure []string - for _, f := range as.Request.Failure { - failure = append(failure, newLoggableRequestOp(f).String()) - } - return fmt.Sprintf("compare:<%s> success:<%s> failure:<%s>", - strings.Join(compare, " "), - strings.Join(success, " "), - strings.Join(failure, " "), - ) -} - -// requestOpStringer implements a custom proto String to replace value bytes fields with value -// size fields in any nested txn and put operations. -type requestOpStringer struct { - Op *RequestOp -} - -func newLoggableRequestOp(op *RequestOp) *requestOpStringer { - return &requestOpStringer{op} -} - -func (as *requestOpStringer) String() string { - switch op := as.Op.Request.(type) { - case *RequestOp_RequestPut: - return fmt.Sprintf("request_put:<%s>", NewLoggablePutRequest(op.RequestPut).String()) - case *RequestOp_RequestTxn: - return fmt.Sprintf("request_txn:<%s>", NewLoggableTxnRequest(op.RequestTxn).String()) - default: - // nothing to redact - } - return as.Op.String() -} - -// loggableValueCompare implements a custom proto String for Compare.Value union member types to -// replace the value bytes field with a value size field. -// To preserve proto encoding of the key and range_end bytes, a faked out proto type is used here. -type loggableValueCompare struct { - Result Compare_CompareResult `protobuf:"varint,1,opt,name=result,proto3,enum=etcdserverpb.Compare_CompareResult"` - Target Compare_CompareTarget `protobuf:"varint,2,opt,name=target,proto3,enum=etcdserverpb.Compare_CompareTarget"` - Key []byte `protobuf:"bytes,3,opt,name=key,proto3"` - ValueSize int64 `protobuf:"varint,7,opt,name=value_size,proto3"` - RangeEnd []byte `protobuf:"bytes,64,opt,name=range_end,proto3"` -} - -func newLoggableValueCompare(c *Compare, cv *Compare_Value) *loggableValueCompare { - return &loggableValueCompare{ - c.Result, - c.Target, - c.Key, - int64(len(cv.Value)), - c.RangeEnd, - } -} - -func (m *loggableValueCompare) Reset() { *m = loggableValueCompare{} } -func (m *loggableValueCompare) String() string { return proto.CompactTextString(m) } -func (*loggableValueCompare) ProtoMessage() {} - -// loggablePutRequest implements a custom proto String to replace value bytes field with a value -// size field. -// To preserve proto encoding of the key bytes, a faked out proto type is used here. -type loggablePutRequest struct { - Key []byte `protobuf:"bytes,1,opt,name=key,proto3"` - ValueSize int64 `protobuf:"varint,2,opt,name=value_size,proto3"` - Lease int64 `protobuf:"varint,3,opt,name=lease,proto3"` - PrevKv bool `protobuf:"varint,4,opt,name=prev_kv,proto3"` - IgnoreValue bool `protobuf:"varint,5,opt,name=ignore_value,proto3"` - IgnoreLease bool `protobuf:"varint,6,opt,name=ignore_lease,proto3"` -} - -//nolint:revive -func NewLoggablePutRequest(request *PutRequest) *loggablePutRequest { - return &loggablePutRequest{ - request.Key, - int64(len(request.Value)), - request.Lease, - request.PrevKv, - request.IgnoreValue, - request.IgnoreLease, - } -} - -func (m *loggablePutRequest) Reset() { *m = loggablePutRequest{} } -func (m *loggablePutRequest) String() string { return proto.CompactTextString(m) } -func (*loggablePutRequest) ProtoMessage() {} diff --git a/raftexample/go.mod b/raftexample/go.mod new file mode 100644 index 00000000..e15e7449 --- /dev/null +++ b/raftexample/go.mod @@ -0,0 +1,15 @@ +module go.etcd.io/raft/raftexample + +go 1.21.1 + +require ( + github.com/golang/protobuf v1.5.3 + go.etcd.io/raft/v3 v3.6.0-alpha.0 + go.uber.org/zap v1.26.0 +) + +require ( + github.com/gogo/protobuf v1.3.2 // indirect + go.uber.org/multierr v1.10.0 // indirect + google.golang.org/protobuf v1.27.1 // indirect +) diff --git a/raftexample/go.sum b/raftexample/go.sum new file mode 100644 index 00000000..8e5a1b86 --- /dev/null +++ b/raftexample/go.sum @@ -0,0 +1,57 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.etcd.io/raft/v3 v3.6.0-alpha.0 h1:cMmjAEjCKMGiQPowjSWM43Y5ZnBEeNP8RSYcm3ewtns= +go.etcd.io/raft/v3 v3.6.0-alpha.0/go.mod h1:QpxpKeYmocQQFHP75LxNrdJTukZmqQig9lotwYLsUJY= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.26.0 h1:sI7k6L95XOKS281NhVKOFCUNIvv9e0w4BF8N3u+tCRo= +go.uber.org/zap v1.26.0/go.mod h1:dtElttAiwGvoJ/vj4IwHBS/gXsEu/pZ50mUIRWuG0so= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/raftexample/raft.go b/raftexample/raft.go index dfcbd80c..228d1bc2 100644 --- a/raftexample/raft.go +++ b/raftexample/raft.go @@ -21,22 +21,18 @@ import ( "net/http" "net/url" "os" + "path/filepath" "strconv" "time" "go.etcd.io/raft/v3" - "go.etcd.io/raft/v3/raftexample/fileutil" - "go.etcd.io/raft/v3/raftexample/rafthttp" - "go.etcd.io/raft/v3/raftexample/snap" - "go.etcd.io/raft/v3/raftexample/types" - stats "go.etcd.io/raft/v3/raftexample/v2stats" - "go.etcd.io/raft/v3/raftexample/wal" - "go.etcd.io/raft/v3/raftexample/wal/walpb" "go.etcd.io/raft/v3/raftpb" "go.uber.org/zap" ) +const snapSuffix = ".snap" + type commit struct { data []string applyDoneC chan<- struct{} @@ -69,10 +65,8 @@ type raftNode struct { // raft backing for the commit/error channel node raft.Node raftStorage *raft.MemoryStorage - wal *wal.WAL snapCount uint64 - transport *rafthttp.Transport stopc chan struct{} // signals proposal channel closed httpstopc chan struct{} // signals http server to shutdown httpdonec chan struct{} // signals http server shutdown complete @@ -89,10 +83,6 @@ type SnapshotStorage interface { // Load reads and returns the newest snapshot that is // available. Load() (*raftpb.Snapshot, error) - - // LoadNewestAvailable loads the newest available snapshot - // whose term and index matches one of those in walSnaps. - LoadNewestAvailable(walSnaps []walpb.Snapshot) (*raftpb.Snapshot, error) } var defaultSnapshotCount uint64 = 10000 @@ -154,10 +144,7 @@ func startRaftNode( rc.loadAndApplySnapshot() - oldwal := wal.Exist(rc.waldir) - rc.wal = rc.replayWAL() - - go rc.startRaft(oldwal) + go rc.startRaft() return rc } @@ -373,7 +360,7 @@ func (rc *raftNode) writeError(err error) { rc.node.Stop() } -func (rc *raftNode) startRaft(oldwal bool) { +func (rc *raftNode) startRaft() { rpeers := make([]raft.Peer, len(rc.peers)) for i := range rpeers { rpeers[i] = raft.Peer{ID: uint64(i + 1)} @@ -614,10 +601,68 @@ func (rc *raftNode) ReportSnapshot(id uint64, status raft.SnapshotStatus) { } func newSnapshotStorage(lg *zap.Logger, dir string) (SnapshotStorage, error) { - if !fileutil.Exist(dir) { - if err := os.Mkdir(dir, 0750); err != nil { + _, err := os.Stat(dir) + if os.IsNotExist(err) { + if err = os.Mkdir(dir, 0750); err != nil { return nil, fmt.Errorf("cannot create dir for snapshot: %w", err) } } - return snap.New(lg, dir), nil + return New(lg, dir), nil +} + +type snapshotter struct { + lg *zap.Logger + dir string +} + +func New(lg *zap.Logger, dir string) SnapshotStorage { + if lg == nil { + lg = zap.NewNop() + } + return &snapshotter{ + lg: lg, + dir: dir, + } +} + +func (s *snapshotter) SaveSnap(snapshot raftpb.Snapshot) error { + if raft.IsEmptySnap(snapshot) { + return nil + } + return s.save(&snapshot) +} + +func (s *snapshotter) Load() (*raftpb.Snapshot, error) { + snapName := fmt.Sprintf("%s%s", "snapshot", snapSuffix) + fpath := filepath.Join(s.dir, snapName) + data, err := os.ReadFile(fpath) + if err != nil { + s.lg.Warn("failed to read a snap file", zap.String("path", snapName), zap.Error(err)) + return nil, err + } + var snap raftpb.Snapshot + if err = snap.Unmarshal(data); err != nil { + s.lg.Warn("failed to unmarshal raftpb.Snapshot", zap.String("path", snapName), zap.Error(err)) + return nil, err + } + return &snap, nil +} + +func (s *snapshotter) save(snapshot *raftpb.Snapshot) error { + fname := fmt.Sprintf("%s%s", "snapshot", snapSuffix) + data, err := snapshot.Marshal() + if err != nil { + return fmt.Errorf("failed to marshal snapshot: %w", err) + } + spath := filepath.Join(s.dir, fname) + err = os.WriteFile(spath, data, 0666) + if err != nil { + s.lg.Warn("failed to write a snap file", zap.String("path", spath), zap.Error(err)) + rerr := os.Remove(spath) + if rerr != nil { + s.lg.Warn("failed to remove a broken snap file", zap.String("path", spath), zap.Error(rerr)) + } + return err + } + return nil } diff --git a/raftexample/verify/verify.go b/raftexample/verify/verify.go deleted file mode 100644 index 85aff5b0..00000000 --- a/raftexample/verify/verify.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2022 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package verify - -import ( - "fmt" - "os" - "strings" -) - -//nolint:revive -const ENV_VERIFY = "ETCD_VERIFY" - -type VerificationType string - -const ( - //nolint:revive - ENV_VERIFY_VALUE_ALL VerificationType = "all" - //nolint:revive - ENV_VERIFY_VALUE_ASSERT VerificationType = "assert" -) - -func getEnvVerify() string { - return strings.ToLower(os.Getenv(ENV_VERIFY)) -} - -func IsVerificationEnabled(verification VerificationType) bool { - env := getEnvVerify() - return env == string(ENV_VERIFY_VALUE_ALL) || env == strings.ToLower(string(verification)) -} - -// EnableVerifications sets `ENV_VERIFY` and returns a function that -// can be used to bring the original settings. -func EnableVerifications(verification VerificationType) func() { - previousEnv := getEnvVerify() - os.Setenv(ENV_VERIFY, string(verification)) - return func() { - os.Setenv(ENV_VERIFY, previousEnv) - } -} - -// EnableAllVerifications enables verification and returns a function -// that can be used to bring the original settings. -func EnableAllVerifications() func() { - return EnableVerifications(ENV_VERIFY_VALUE_ALL) -} - -// DisableVerifications unsets `ENV_VERIFY` and returns a function that -// can be used to bring the original settings. -func DisableVerifications() func() { - previousEnv := getEnvVerify() - os.Unsetenv(ENV_VERIFY) - return func() { - os.Setenv(ENV_VERIFY, previousEnv) - } -} - -// Verify performs verification if the assertions are enabled. -// In the default setup running in tests and skipped in the production code. -func Verify(f func()) { - if IsVerificationEnabled(ENV_VERIFY_VALUE_ASSERT) { - f() - } -} - -// Assert will panic with a given formatted message if the given condition is false. -func Assert(condition bool, msg string, v ...interface{}) { - if !condition { - panic(fmt.Sprintf("assertion failed: "+msg, v...)) - } -} diff --git a/raftexample/wal/version.go b/raftexample/wal/version.go deleted file mode 100644 index bcf86e02..00000000 --- a/raftexample/wal/version.go +++ /dev/null @@ -1,288 +0,0 @@ -// Copyright 2021 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package wal - -import ( - "fmt" - "strings" - - "github.com/coreos/go-semver/semver" - "github.com/golang/protobuf/proto" - "google.golang.org/protobuf/reflect/protoreflect" - "google.golang.org/protobuf/types/descriptorpb" - - "go.etcd.io/raft/v3/raftexample/version" - - "go.etcd.io/raft/v3/raftexample/etcdserverpb" - "go.etcd.io/raft/v3/raftexample/pbutil" - "go.etcd.io/raft/v3/raftpb" -) - -// ReadWALVersion reads remaining entries from opened WAL and returns struct -// that implements schema.WAL interface. -// -//nolint:revive -func ReadWALVersion(w *WAL) (*walVersion, error) { - _, _, ents, err := w.ReadAll() - if err != nil { - return nil, err - } - return &walVersion{entries: ents}, nil -} - -type walVersion struct { - entries []raftpb.Entry -} - -// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log, -func (w *walVersion) MinimalEtcdVersion() *semver.Version { - return MinimalEtcdVersion(w.entries) -} - -// MinimalEtcdVersion returns minimal etcd able to interpret entries from WAL log, -// determined by looking at entries since the last snapshot and returning the highest -// etcd version annotation from used messages, fields, enums and their values. -func MinimalEtcdVersion(ents []raftpb.Entry) *semver.Version { - var maxVer *semver.Version - for _, ent := range ents { - err := visitEntry(ent, func(path protoreflect.FullName, ver *semver.Version) error { - maxVer = maxVersion(maxVer, ver) - return nil - }) - if err != nil { - panic(err) - } - } - return maxVer -} - -type Visitor func(path protoreflect.FullName, ver *semver.Version) error - -// VisitFileDescriptor calls visitor on each field and enum value with etcd version read from proto definition. -// If field/enum value is not annotated, visitor will be called with nil. -// Upon encountering invalid annotation, will immediately exit with error. -func VisitFileDescriptor(file protoreflect.FileDescriptor, visitor Visitor) error { - msgs := file.Messages() - for i := 0; i < msgs.Len(); i++ { - err := visitMessageDescriptor(msgs.Get(i), visitor) - if err != nil { - return err - } - } - enums := file.Enums() - for i := 0; i < enums.Len(); i++ { - err := visitEnumDescriptor(enums.Get(i), visitor) - if err != nil { - return err - } - } - return nil -} - -func visitEntry(ent raftpb.Entry, visitor Visitor) error { - err := visitMessage(proto.MessageReflect(&ent), visitor) - if err != nil { - return err - } - return visitEntryData(ent.Type, ent.Data, visitor) -} - -func visitEntryData(entryType raftpb.EntryType, data []byte, visitor Visitor) error { - var msg protoreflect.Message - switch entryType { - case raftpb.EntryNormal: - var raftReq etcdserverpb.InternalRaftRequest - if err := pbutil.Unmarshaler(&raftReq).Unmarshal(data); err != nil { - // try V2 Request - var r etcdserverpb.Request - if pbutil.Unmarshaler(&r).Unmarshal(data) != nil { - // return original error - return err - } - msg = proto.MessageReflect(&r) - break - } - msg = proto.MessageReflect(&raftReq) - if raftReq.ClusterVersionSet != nil { - ver, err := semver.NewVersion(raftReq.ClusterVersionSet.Ver) - if err != nil { - return err - } - err = visitor(msg.Descriptor().FullName(), ver) - if err != nil { - return err - } - } - case raftpb.EntryConfChange: - var confChange raftpb.ConfChange - err := pbutil.Unmarshaler(&confChange).Unmarshal(data) - if err != nil { - return nil - } - msg = proto.MessageReflect(&confChange) - return visitor(msg.Descriptor().FullName(), &version.V3_0) - case raftpb.EntryConfChangeV2: - var confChange raftpb.ConfChangeV2 - err := pbutil.Unmarshaler(&confChange).Unmarshal(data) - if err != nil { - return nil - } - msg = proto.MessageReflect(&confChange) - return visitor(msg.Descriptor().FullName(), &version.V3_4) - default: - panic("unhandled") - } - return visitMessage(msg, visitor) -} - -func visitMessageDescriptor(md protoreflect.MessageDescriptor, visitor Visitor) error { - err := visitDescriptor(md, visitor) - if err != nil { - return err - } - fields := md.Fields() - for i := 0; i < fields.Len(); i++ { - fd := fields.Get(i) - err = visitDescriptor(fd, visitor) - if err != nil { - return err - } - } - - enums := md.Enums() - for i := 0; i < enums.Len(); i++ { - err := visitEnumDescriptor(enums.Get(i), visitor) - if err != nil { - return err - } - } - return err -} - -func visitMessage(m protoreflect.Message, visitor Visitor) error { - md := m.Descriptor() - err := visitDescriptor(md, visitor) - if err != nil { - return err - } - m.Range(func(field protoreflect.FieldDescriptor, value protoreflect.Value) bool { - fd := md.Fields().Get(field.Index()) - err = visitDescriptor(fd, visitor) - if err != nil { - return false - } - - switch m := value.Interface().(type) { - case protoreflect.Message: - err = visitMessage(m, visitor) - case protoreflect.EnumNumber: - err = visitEnumNumber(fd.Enum(), m, visitor) - } - if err != nil { - return false - } - return true - }) - return err -} - -func visitEnumDescriptor(enum protoreflect.EnumDescriptor, visitor Visitor) error { - err := visitDescriptor(enum, visitor) - if err != nil { - return err - } - fields := enum.Values() - for i := 0; i < fields.Len(); i++ { - fd := fields.Get(i) - err = visitDescriptor(fd, visitor) - if err != nil { - return err - } - } - return err -} - -func visitEnumNumber(enum protoreflect.EnumDescriptor, number protoreflect.EnumNumber, visitor Visitor) error { - err := visitDescriptor(enum, visitor) - if err != nil { - return err - } - intNumber := int(number) - fields := enum.Values() - if intNumber >= fields.Len() || intNumber < 0 { - return fmt.Errorf("could not visit EnumNumber [%d]", intNumber) - } - return visitEnumValue(fields.Get(intNumber), visitor) -} - -func visitEnumValue(enum protoreflect.EnumValueDescriptor, visitor Visitor) error { - valueOpts := enum.Options().(*descriptorpb.EnumValueOptions) - if valueOpts != nil { - ver, _ := etcdVersionFromOptionsString(valueOpts.String()) - err := visitor(enum.FullName(), ver) - if err != nil { - return err - } - } - return nil -} - -func visitDescriptor(md protoreflect.Descriptor, visitor Visitor) error { - opts, ok := md.Options().(fmt.Stringer) - if !ok { - return nil - } - ver, err := etcdVersionFromOptionsString(opts.String()) - if err != nil { - return fmt.Errorf("%s: %s", md.FullName(), err) - } - return visitor(md.FullName(), ver) -} - -func maxVersion(a *semver.Version, b *semver.Version) *semver.Version { - if a != nil && (b == nil || b.LessThan(*a)) { - return a - } - return b -} - -func etcdVersionFromOptionsString(opts string) (*semver.Version, error) { - // TODO: Use proto.GetExtention when gogo/protobuf is usable with protoreflect - msgs := []string{"[versionpb.etcd_version_msg]:", "[versionpb.etcd_version_field]:", "[versionpb.etcd_version_enum]:", "[versionpb.etcd_version_enum_value]:"} - var end, index int - for _, msg := range msgs { - index = strings.Index(opts, msg) - end = index + len(msg) - if index != -1 { - break - } - } - if index == -1 { - return nil, nil - } - var verStr string - _, err := fmt.Sscanf(opts[end:], "%q", &verStr) - if err != nil { - return nil, err - } - if strings.Count(verStr, ".") == 1 { - verStr = verStr + ".0" - } - ver, err := semver.NewVersion(verStr) - if err != nil { - return nil, err - } - return ver, nil -}