-
Notifications
You must be signed in to change notification settings - Fork 384
/
Copy pathtree_storage.go
249 lines (218 loc) · 6.41 KB
/
tree_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
// Copyright 2017 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package memory
import (
"container/list"
"context"
"fmt"
"strings"
"sync"
"github.com/google/btree"
"github.com/google/trillian"
"github.com/google/trillian/storage/cache"
"github.com/google/trillian/storage/storagepb"
stree "github.com/google/trillian/storage/tree"
"google.golang.org/protobuf/proto"
"k8s.io/klog/v2"
)
const degree = 8
// subtreeKey formats a key for use in a tree's BTree store. The associated
// Item value will be the SubtreeProto with the given prefix.
func subtreeKey(treeID, rev int64, prefix []byte) btree.Item {
return &kv{k: fmt.Sprintf("/%d/subtree/%x/%d", treeID, prefix, rev)}
}
// tree stores all data for a given treeID
type tree struct {
// mu protects access to all tree members.
mu sync.RWMutex
// store is a key-value representation of a Trillian tree storage.
// The keyspace is partitioned off into various prefixes for the different
// 'tables' of things stored in there.
// e.g. subtree protos are stored with a key returned by subtreeKey() above.
//
// Other prefixes are used by Log/Map Storage.
//
// See the various key formatting functions for details of what is stored
// under the formatted keys.
//
// store uses a BTree so that we can have a defined ordering over things
// (such as sequenced leaves), while still accessing by key.
store *btree.BTree
// currentSTH is the timestamp of the current STH.
currentSTH uint64
meta *trillian.Tree
}
func (t *tree) Lock() {
t.mu.Lock()
}
func (t *tree) Unlock() {
t.mu.Unlock()
}
func (t *tree) RLock() {
t.mu.RLock()
}
func (t *tree) RUnlock() {
t.mu.RUnlock()
}
// TreeStorage is shared between the memoryLog and (forthcoming) memoryMap-
// Storage implementations, and contains functionality which is common to both,
type TreeStorage struct {
// mu only protects access to the trees map.
mu sync.RWMutex
trees map[int64]*tree
}
// NewTreeStorage returns a new instance of the in-memory tree storage database.
func NewTreeStorage() *TreeStorage {
return &TreeStorage{
trees: make(map[int64]*tree),
}
}
// getTree returns the tree associated with id, or nil if no such tree exists.
func (m *TreeStorage) getTree(id int64) *tree {
m.mu.RLock()
defer m.mu.RUnlock()
return m.trees[id]
}
// kv is a simple key->value type which implements btree's Item interface.
type kv struct {
k string
v interface{}
}
// Less than by k's string key
func (a kv) Less(b btree.Item) bool {
return strings.Compare(a.k, b.(*kv).k) < 0
}
// newTree creates and initializes a tree struct.
func newTree(t *trillian.Tree) *tree {
ret := &tree{
store: btree.New(degree),
meta: proto.Clone(t).(*trillian.Tree),
}
k := unseqKey(t.TreeId)
k.(*kv).v = list.New()
ret.store.ReplaceOrInsert(k)
k = hashToSeqKey(t.TreeId)
k.(*kv).v = make(map[string][]int64)
ret.store.ReplaceOrInsert(k)
return ret
}
func (m *TreeStorage) beginTreeTX(ctx context.Context, treeID int64, hashSizeBytes int, cache *cache.SubtreeCache, readonly bool) (treeTX, error) {
tree := m.getTree(treeID)
// Lock the tree for the duration of the TX.
// It will be unlocked by a call to Commit or Close.
var unlock func()
if readonly {
tree.RLock()
unlock = tree.RUnlock
} else {
tree.Lock()
unlock = tree.Unlock
}
return treeTX{
ts: m,
tx: tree.store.Clone(),
tree: tree,
treeID: treeID,
hashSizeBytes: hashSizeBytes,
subtreeCache: cache,
writeRevision: -1,
unlock: unlock,
}, nil
}
type treeTX struct {
closed bool
tx *btree.BTree
ts *TreeStorage
tree *tree
treeID int64
hashSizeBytes int
subtreeCache *cache.SubtreeCache
writeRevision int64
unlock func()
}
func (t *treeTX) getSubtrees(ctx context.Context, treeRevision int64, ids [][]byte) ([]*storagepb.SubtreeProto, error) {
if len(ids) == 0 {
return nil, nil
}
ret := make([]*storagepb.SubtreeProto, 0, len(ids))
for _, id := range ids {
// Look for a nodeID at or below treeRevision:
for r := treeRevision; r >= 0; r-- {
s := t.tx.Get(subtreeKey(t.treeID, r, id))
if s == nil {
continue
}
// Return a copy of the proto to protect against the caller modifying the stored one.
p := s.(*kv).v.(*storagepb.SubtreeProto)
v := proto.Clone(p).(*storagepb.SubtreeProto)
ret = append(ret, v)
break
}
}
// The InternalNodes cache is possibly nil here, but the SubtreeCache (which called
// this method) will re-populate it.
return ret, nil
}
func (t *treeTX) storeSubtrees(ctx context.Context, subtrees []*storagepb.SubtreeProto) error {
if len(subtrees) == 0 {
klog.Warning("attempted to store 0 subtrees...")
return nil
}
for _, s := range subtrees {
s := s
if s.Prefix == nil {
panic(fmt.Errorf("nil prefix on %v", s))
}
k := subtreeKey(t.treeID, t.writeRevision, s.Prefix)
k.(*kv).v = s
t.tx.ReplaceOrInsert(k)
}
return nil
}
// getSubtreesAtRev returns a GetSubtreesFunc which reads at the passed in rev.
func (t *treeTX) getSubtreesAtRev(ctx context.Context, rev int64) cache.GetSubtreesFunc {
return func(ids [][]byte) ([]*storagepb.SubtreeProto, error) {
return t.getSubtrees(ctx, rev, ids)
}
}
func (t *treeTX) SetMerkleNodes(ctx context.Context, nodes []stree.Node) error {
rev := t.writeRevision - 1
return t.subtreeCache.SetNodes(nodes, t.getSubtreesAtRev(ctx, rev))
}
func (t *treeTX) Commit(ctx context.Context) error {
defer t.unlock()
if t.writeRevision > -1 {
tiles, err := t.subtreeCache.UpdatedTiles()
if err != nil {
klog.Warningf("SubtreeCache updated tiles error: %v", err)
return err
}
if err := t.storeSubtrees(ctx, tiles); err != nil {
klog.Warningf("TX commit flush error: %v", err)
return err
}
}
t.closed = true
// update the shared view of the tree post TX:
t.tree.store = t.tx
return nil
}
func (t *treeTX) Close() error {
if t.closed {
return nil
}
defer t.unlock()
t.closed = true
return nil
}