Skip to content

Commit 0d498df

Browse files
committed
finalize leveldb ops
1 parent 472fb0f commit 0d498df

File tree

8 files changed

+358
-145
lines changed

8 files changed

+358
-145
lines changed

pkg/config/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const Prefix = "honu"
1818
// values that are omitted. The Config should be validated in preparation for running
1919
// the honudb instance to ensure that all server operations work as expected.
2020
type Config struct {
21+
PID uint32 `required:"true" desc:"the unique process id for this replica"`
2122
Maintenance bool `default:"false" desc:"if true, the replica will start in maintenance mode"`
2223
LogLevel logger.LevelDecoder `split_words:"true" default:"info" desc:"specify the verbosity of logging (trace, debug, info, warn, error, fatal panic)"`
2324
ConsoleLog bool `split_words:"true" default:"false" desc:"if true logs colorized human readable output instead of json"`

pkg/config/config_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
)
1111

1212
var testEnv = map[string]string{
13+
"HONU_PID": "24",
1314
"HONU_MAINTENANCE": "true",
1415
"HONU_LOG_LEVEL": "debug",
1516
"HONU_CONSOLE_LOG": "true",
@@ -28,6 +29,7 @@ func TestConfig(t *testing.T) {
2829
require.False(t, conf.IsZero(), "processed config should not be zero valued")
2930

3031
// Ensure configuration is correctly set from the environment
32+
require.Equal(t, uint32(24), conf.PID)
3133
require.True(t, conf.Maintenance)
3234
require.Equal(t, zerolog.DebugLevel, conf.GetLogLevel())
3335
require.True(t, conf.ConsoleLog)

pkg/store/engine/engine.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"github.com/rotationalio/honu/pkg/store/iterator"
55
"github.com/rotationalio/honu/pkg/store/key"
66
"github.com/rotationalio/honu/pkg/store/object"
7-
"github.com/rotationalio/honu/pkg/store/opts"
87
)
98

109
// Engines are the disk storage mechanism that Honu wraps. Users may chose different
@@ -19,15 +18,17 @@ type Engine interface {
1918
}
2019

2120
// Store is a simple key/value interface that allows for Get, Put, and Delete. Nearly
22-
// all engines should support the Store interface.
21+
// all engines should support the Store interface. Note that transactions and options
22+
// are a higher level construct than engine stores and are provided by the Honu store.
2323
type Store interface {
24-
Has(key.Key, *opts.ReadOptions) (exists bool, err error)
25-
Get(key.Key, *opts.ReadOptions) (object.Object, error)
26-
Put(key.Key, object.Object, *opts.WriteOptions) error
27-
Delete(key.Key, *opts.WriteOptions) error
24+
Has(key.Key) (exists bool, err error)
25+
Get(key.Key) (object.Object, error)
26+
Put(key.Key, object.Object) error
27+
Delete(key.Key) error
2828
}
2929

3030
// Iterator engines allow queries that scan a range of consecutive keys.
3131
type Iterator interface {
32-
Iter(prefix []byte, ro *opts.ReadOptions) (i iterator.Iterator, err error)
32+
Iter(prefix []byte) (i iterator.Iterator, err error)
33+
Range(start, limit []byte) (i iterator.Iterator, err error)
3334
}

pkg/store/engine/leveldb/iter.go

+6-92
Original file line numberDiff line numberDiff line change
@@ -4,105 +4,19 @@ import (
44
"github.com/rotationalio/honu/pkg/store/iterator"
55
"github.com/rotationalio/honu/pkg/store/key"
66
"github.com/rotationalio/honu/pkg/store/object"
7-
"github.com/rotationalio/honu/pkg/store/opts"
87

98
ldbiter "github.com/syndtr/goleveldb/leveldb/iterator"
109
)
1110

12-
func NewIterator(iter ldbiter.Iterator, ro *opts.ReadOptions) iterator.Iterator {
13-
return &ldbIterator{ldb: iter, ro: ro}
11+
func NewIterator(iter ldbiter.Iterator) iterator.Iterator {
12+
return &ldbIterator{Iterator: iter}
1413
}
1514

1615
type ldbIterator struct {
17-
ldb ldbiter.Iterator
18-
ro *opts.ReadOptions
16+
ldbiter.Iterator
1917
}
2018

21-
// Iterator management and error handling.
22-
func (i *ldbIterator) Error() error { return i.ldb.Error() }
23-
func (i *ldbIterator) Release() { i.ldb.Release() }
24-
2519
// Iterator accessor methods.
26-
func (i *ldbIterator) Key() key.Key { return key.Key(i.ldb.Key()) }
27-
func (i *ldbIterator) Object() object.Object { return object.Object(i.ldb.Value()) }
28-
29-
//===========================================================================
30-
// Implement iterator.Seeker interface
31-
//===========================================================================
32-
33-
func (i *ldbIterator) Seek(key []byte) bool {
34-
// Attempt to seek to the specified key.
35-
if ok := i.ldb.Seek(key); !ok {
36-
return false
37-
}
38-
39-
// If we aren't including tombstones, skip over any tombstones.
40-
if !i.ro.Tombstones {
41-
if i.Object().Tombstone() {
42-
return i.Next()
43-
}
44-
}
45-
return true
46-
}
47-
48-
func (i *ldbIterator) Next() bool {
49-
if ok := i.ldb.Next(); !ok {
50-
return false
51-
}
52-
53-
// Keep skipping over tombstones until we find a non-tombstone object by
54-
// recursively calling Next()
55-
if !i.ro.Tombstones {
56-
if i.Object().Tombstone() {
57-
return i.Next()
58-
}
59-
}
60-
61-
return true
62-
}
63-
64-
func (i *ldbIterator) Prev() bool {
65-
if ok := i.ldb.Prev(); !ok {
66-
return false
67-
}
68-
69-
// Keep skipping over tombstones until we find a non-tombstone object by
70-
// recursively calling Prev()
71-
if !i.ro.Tombstones {
72-
if i.Object().Tombstone() {
73-
return i.Prev()
74-
}
75-
}
76-
77-
return true
78-
}
79-
80-
func (i *ldbIterator) First() bool {
81-
if ok := i.ldb.First(); !ok {
82-
return false
83-
}
84-
85-
// If we aren't including tombstones, skip over any tombstones by calling Next
86-
if !i.ro.Tombstones {
87-
if i.Object().Tombstone() {
88-
return false
89-
}
90-
}
91-
92-
return true
93-
}
94-
95-
func (i *ldbIterator) Last() bool {
96-
if ok := i.ldb.Last(); !ok {
97-
return false
98-
}
99-
100-
// If we aren't including tombstones, skip over any tombstones by calling Prev
101-
if !i.ro.Tombstones {
102-
if i.Object().Tombstone() {
103-
return i.Prev()
104-
}
105-
}
106-
107-
return true
108-
}
20+
func (i *ldbIterator) Key() key.Key { return key.Key(i.Iterator.Key()) }
21+
func (i *ldbIterator) Object() object.Object { return object.Object(i.Iterator.Value()) }
22+
func (i *ldbIterator) Error() error { return Wrap(i.Iterator.Error()) }

pkg/store/engine/leveldb/iter_test.go

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package leveldb_test
2+
3+
import (
4+
"math/rand/v2"
5+
"testing"
6+
7+
"github.com/rotationalio/honu/pkg/store/iterator"
8+
"github.com/rotationalio/honu/pkg/store/key"
9+
"github.com/stretchr/testify/require"
10+
"go.rtnl.ai/ulid"
11+
)
12+
13+
func TestIterator(t *testing.T) {
14+
// Create a database and populate it with some data
15+
db := OpenLevelDB(t, false)
16+
17+
// Testing Data Structures
18+
cid := ulid.Make()
19+
oids := make(map[ulid.ULID]int)
20+
nkeys := 0
21+
22+
// Create three different objects with multiple versions each in the same collection
23+
var target ulid.ULID
24+
for i := 0; i < 3; i++ {
25+
count := rand.IntN(20) + 4
26+
nkeys += count
27+
28+
oid := ulid.Make()
29+
oids[oid] = count
30+
31+
Populate(t, db, count, cid, oid)
32+
33+
if i == 1 {
34+
target = oid
35+
}
36+
}
37+
38+
t.Run("All", func(t *testing.T) {
39+
actual := 0
40+
iter, err := db.Iter(nil)
41+
require.NoError(t, err)
42+
43+
for iter.Next() {
44+
actual++
45+
46+
k := iter.Key()
47+
require.Equal(t, cid, k.CollectionID())
48+
49+
obj := iter.Object()
50+
require.NotNil(t, obj)
51+
}
52+
53+
iter.Release()
54+
require.NoError(t, iter.Error())
55+
require.Equal(t, nkeys, actual)
56+
})
57+
58+
t.Run("Prefix", func(t *testing.T) {
59+
k := key.New(cid, target, nil)
60+
iter, err := db.Iter(k.ObjectPrefix())
61+
require.NoError(t, err)
62+
63+
actual := 0
64+
for iter.Next() {
65+
actual++
66+
require.Equal(t, target, iter.Key().ObjectID())
67+
}
68+
69+
iter.Release()
70+
require.NoError(t, iter.Error())
71+
require.Equal(t, oids[target], actual)
72+
})
73+
74+
t.Run("Range", func(t *testing.T) {
75+
k := key.New(cid, target, nil)
76+
iter, err := db.Range(k.ObjectPrefix(), k.ObjectLimit())
77+
require.NoError(t, err)
78+
79+
actual := 0
80+
for iter.Next() {
81+
actual++
82+
require.Equal(t, target, iter.Key().ObjectID())
83+
}
84+
85+
iter.Release()
86+
require.NoError(t, iter.Error())
87+
require.Equal(t, oids[target], actual)
88+
})
89+
90+
t.Run("Error", func(t *testing.T) {
91+
iter, err := db.Iter(nil)
92+
require.NoError(t, err)
93+
94+
iter.Release()
95+
require.False(t, iter.Next())
96+
require.ErrorIs(t, iter.Error(), iterator.ErrIterReleased)
97+
})
98+
}

pkg/store/engine/leveldb/leveldb.go

+19-42
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ import (
99
"github.com/rotationalio/honu/pkg/store/iterator"
1010
"github.com/rotationalio/honu/pkg/store/key"
1111
"github.com/rotationalio/honu/pkg/store/object"
12-
"github.com/rotationalio/honu/pkg/store/opts"
12+
1313
"github.com/syndtr/goleveldb/leveldb"
14+
"github.com/syndtr/goleveldb/leveldb/util"
1415
)
1516

1617
func Open(conf config.StoreConfig) (engine *Engine, err error) {
@@ -44,7 +45,7 @@ func (e *Engine) DB() *leveldb.DB {
4445

4546
// Returns the name of the engine type
4647
func (e *Engine) Engine() string {
47-
return "honu.LevelDBEngine"
48+
return "leveldb"
4849
}
4950

5051
// Close the database and flush all remaining writes to disk. LevelDB requires a close
@@ -57,60 +58,30 @@ func (e *Engine) Close() error {
5758
// Implement engine.Store Interface
5859
//===========================================================================
5960

60-
func (e *Engine) Has(key key.Key, ro *opts.ReadOptions) (exists bool, err error) {
61-
// TODO: tombstone handling
62-
if exists, err = e.ldb.Has(key[:], nil); err != nil {
61+
func (e *Engine) Has(key key.Key) (exists bool, err error) {
62+
if exists, err = e.ldb.Has(key, nil); err != nil {
6363
return exists, Wrap(err)
6464
}
6565
return exists, nil
6666
}
6767

68-
func (e *Engine) Get(key key.Key, ro *opts.ReadOptions) (_ object.Object, err error) {
69-
// TODO: tombstone handling
68+
func (e *Engine) Get(key key.Key) (_ object.Object, err error) {
7069
var val []byte
71-
if val, err = e.ldb.Get(key[:], nil); err != nil {
70+
if val, err = e.ldb.Get(key, nil); err != nil {
7271
return nil, Wrap(err)
7372
}
7473
return object.Object(val), nil
7574
}
7675

77-
func (e *Engine) Put(key key.Key, obj object.Object, wo *opts.WriteOptions) (err error) {
78-
// TODO: do we need a transaction here?
79-
if wo.GetNoOverwrite() || wo.GetCheckUpdate() {
80-
var exists bool
81-
if exists, err = e.ldb.Has(key, nil); err != nil {
82-
return Wrap(err)
83-
}
84-
85-
if exists && wo.GetNoOverwrite() {
86-
return engine.ErrAlreadyExists
87-
}
88-
89-
if !exists && wo.GetCheckUpdate() {
90-
return engine.ErrNotFound
91-
}
92-
}
93-
94-
if err = e.ldb.Put(key[:], obj[:], nil); err != nil {
76+
func (e *Engine) Put(key key.Key, obj object.Object) (err error) {
77+
if err = e.ldb.Put(key, obj, nil); err != nil {
9578
return Wrap(err)
9679
}
9780
return nil
9881
}
9982

100-
func (e *Engine) Delete(key key.Key, wo *opts.WriteOptions) (err error) {
101-
// TODO: do we need a transaction here?
102-
if wo.CheckDelete {
103-
var exists bool
104-
if exists, err = e.ldb.Has(key, nil); err != nil {
105-
return Wrap(err)
106-
}
107-
108-
if !exists {
109-
return engine.ErrNotFound
110-
}
111-
}
112-
113-
if err = e.ldb.Delete(key[:], nil); err != nil {
83+
func (e *Engine) Delete(key key.Key) (err error) {
84+
if err = e.ldb.Delete(key, nil); err != nil {
11485
return Wrap(err)
11586
}
11687
return nil
@@ -120,8 +91,12 @@ func (e *Engine) Delete(key key.Key, wo *opts.WriteOptions) (err error) {
12091
// Implement engine.Iterator Interface
12192
//===========================================================================
12293

123-
func (e *Engine) Iter(prefix []byte, ro *opts.ReadOptions) (_ iterator.Iterator, err error) {
124-
return nil, nil
94+
func (e *Engine) Iter(prefix []byte) (_ iterator.Iterator, err error) {
95+
return NewIterator(e.ldb.NewIterator(util.BytesPrefix(prefix), nil)), nil
96+
}
97+
98+
func (e *Engine) Range(start, limit []byte) (_ iterator.Iterator, err error) {
99+
return NewIterator(e.ldb.NewIterator(&util.Range{Start: start, Limit: limit}, nil)), nil
125100
}
126101

127102
//===========================================================================
@@ -130,6 +105,8 @@ func (e *Engine) Iter(prefix []byte, ro *opts.ReadOptions) (_ iterator.Iterator,
130105

131106
func Wrap(err error) error {
132107
switch {
108+
case err == nil:
109+
return nil
133110
case errors.Is(err, leveldb.ErrNotFound):
134111
return engine.ErrNotFound
135112
case errors.Is(err, leveldb.ErrReadOnly):

0 commit comments

Comments
 (0)