Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decompose the interface #798

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ type DB struct {
rwtx *Tx
txs []*Tx

freelist fl.Interface
freelistLoad sync.Once
freelist fl.Interface
freelistLoad sync.Once
freelistReadWriter fl.ReadWriter

pagePool sync.Pool

Expand Down Expand Up @@ -417,12 +418,13 @@ func (db *DB) getPageSizeFromSecondMeta() (int, bool, error) {
func (db *DB) loadFreelist() {
db.freelistLoad.Do(func() {
db.freelist = newFreelist(db.FreelistType)
db.freelistReadWriter = fl.NewSortedSerializer()
if !db.hasSyncedFreelist() {
// Reconstruct free list by scanning the DB.
db.freelist.Init(db.freepages())
} else {
// Read free list from freelist page.
db.freelist.Read(db.page(db.meta().Freelist()))
db.freelistReadWriter.Read(db.freelist, db.page(db.meta().Freelist()))
}
db.stats.FreePageN = db.freelist.FreeCount()
})
Expand Down
17 changes: 5 additions & 12 deletions internal/freelist/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@ import (
)

type array struct {
*shared

ids []common.Pgid // all free and available free page ids.
}

func (f *array) Init(ids common.Pgids) {
f.ids = ids
f.reindex()
}

func (f *array) Allocate(txid common.Txid, n int) common.Pgid {
func (f *array) alloc(txid common.Txid, n int, allocs *map[common.Pgid]common.Txid, cache *map[common.Pgid]struct{}) common.Pgid {
if len(f.ids) == 0 {
return 0
}
Expand Down Expand Up @@ -49,9 +46,9 @@ func (f *array) Allocate(txid common.Txid, n int) common.Pgid {

// Remove from the free cache.
for i := common.Pgid(0); i < common.Pgid(n); i++ {
delete(f.cache, initial+i)
delete(*cache, initial+i)
}
f.allocs[initial] = txid
(*allocs)[initial] = txid
return initial
}

Expand All @@ -64,7 +61,7 @@ func (f *array) FreeCount() int {
return len(f.ids)
}

func (f *array) freePageIds() common.Pgids {
func (f *array) FreePageIds() common.Pgids {
return f.ids
}

Expand Down Expand Up @@ -99,9 +96,5 @@ func (f *array) mergeSpans(ids common.Pgids) {
}

func NewArrayFreelist() Interface {
a := &array{
shared: newShared(),
}
a.Interface = a
return a
return newShared(&array{})
}
8 changes: 4 additions & 4 deletions internal/freelist/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func TestFreelistArray_allocate(t *testing.T) {
if id := int(f.Allocate(1, 0)); id != 0 {
t.Fatalf("exp=0; got=%v", id)
}
if exp := common.Pgids([]common.Pgid{9, 18}); !reflect.DeepEqual(exp, f.freePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f.freePageIds())
if exp := common.Pgids([]common.Pgid{9, 18}); !reflect.DeepEqual(exp, f.FreePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f.FreePageIds())
}

if id := int(f.Allocate(1, 1)); id != 9 {
Expand All @@ -46,7 +46,7 @@ func TestFreelistArray_allocate(t *testing.T) {
if id := int(f.Allocate(1, 1)); id != 0 {
t.Fatalf("exp=0; got=%v", id)
}
if exp := common.Pgids([]common.Pgid{}); !reflect.DeepEqual(exp, f.freePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f.freePageIds())
if exp := common.Pgids([]common.Pgid{}); !reflect.DeepEqual(exp, f.FreePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f.FreePageIds())
}
}
82 changes: 43 additions & 39 deletions internal/freelist/freelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,34 @@ import (

type ReadWriter interface {
// Read calls Init with the page ids stored in the given page.
Read(page *common.Page)
Read(f Interface, page *common.Page)

// Write writes the freelist into the given page.
Write(page *common.Page)
Write(f Interface, page *common.Page)

// EstimatedWritePageSize returns the size of the freelist after serialization in Write.
// This should never underestimate the size.
EstimatedWritePageSize() int
EstimatedWritePageSize(f Interface) int
}

type Interface interface {
ReadWriter

type allocator interface {
// Init initializes this freelist with the given list of pages.
Init(ids common.Pgids)

// Allocate tries to allocate the given number of contiguous pages
// from the free list pages. It returns the starting page ID if
// available; otherwise, it returns 0.
Allocate(txid common.Txid, numPages int) common.Pgid

// Count returns the number of free and pending pages.
Count() int

// FreeCount returns the number of free pages.
FreeCount() int

// PendingCount returns the number of pending pages.
PendingCount() int
// FreePageIds returns the IDs of all free pages.
FreePageIds() common.Pgids

// mergeSpans is merging the given pages into the freelist
mergeSpans(ids common.Pgids)

// TODO(thomas): this is necessary to decouple, but leaks internals
alloc(txid common.Txid, numPages int, allocs *map[common.Pgid]common.Txid, cache *map[common.Pgid]struct{}) common.Pgid
}

type txManager interface {
// AddReadonlyTXID adds a given read-only transaction id for pending page tracking.
AddReadonlyTXID(txid common.Txid)

Expand All @@ -45,6 +43,31 @@ type Interface interface {
// ReleasePendingPages releases any pages associated with closed read-only transactions.
ReleasePendingPages()

// pendingPageIds returns all pending pages by transaction id.
pendingPageIds() map[common.Txid]*txPending

// release moves all page ids for a transaction id (or older) to the freelist.
release(txId common.Txid)

// releaseRange moves pending pages allocated within an extent [begin,end] to the free list.
releaseRange(begin, end common.Txid)
}

type Interface interface {
allocator
txManager

// Allocate tries to allocate the given number of contiguous pages
// from the free list pages. It returns the starting page ID if
// available; otherwise, it returns 0.
Allocate(txid common.Txid, numPages int) common.Pgid

// Count returns the number of free and pending pages.
Count() int

// PendingCount returns the number of pending pages.
PendingCount() int

// Free releases a page and its overflow for a given transaction id.
// If the page is already free then a panic will occur.
Free(txId common.Txid, p *common.Page)
Expand All @@ -55,28 +78,9 @@ type Interface interface {
// Rollback removes the pages from a given pending tx.
Rollback(txId common.Txid)

// Copyall copies a list of all free ids and all pending ids in one sorted list.
// f.count returns the minimum length required for dst.
Copyall(dst []common.Pgid)

// Reload reads the freelist from a page and filters out pending items.
Reload(p *common.Page)
// List returns a list of all free ids and all pending ids in one sorted list.
List() common.Pgids

// NoSyncReload reads the freelist from Pgids and filters out pending items.
NoSyncReload(pgIds common.Pgids)

// freePageIds returns the IDs of all free pages.
freePageIds() common.Pgids

// pendingPageIds returns all pending pages by transaction id.
pendingPageIds() map[common.Txid]*txPending

// release moves all page ids for a transaction id (or older) to the freelist.
release(txId common.Txid)

// releaseRange moves pending pages allocated within an extent [begin,end] to the free list.
releaseRange(begin, end common.Txid)

// mergeSpans is merging the given pages into the freelist
mergeSpans(ids common.Pgids)
// Reload reads the freelist from Pgids and filters out pending items.
Reload(pgIds common.Pgids)
}
32 changes: 17 additions & 15 deletions internal/freelist/freelist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func TestFreelist_release(t *testing.T) {
f.Free(102, common.NewPage(39, 0, 0, 0))
f.release(100)
f.release(101)
if exp := common.Pgids([]common.Pgid{9, 12, 13}); !reflect.DeepEqual(exp, f.freePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f.freePageIds())
if exp := common.Pgids([]common.Pgid{9, 12, 13}); !reflect.DeepEqual(exp, f.FreePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f.FreePageIds())
}

f.release(102)
if exp := common.Pgids([]common.Pgid{9, 12, 13, 39}); !reflect.DeepEqual(exp, f.freePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f.freePageIds())
if exp := common.Pgids([]common.Pgid{9, 12, 13, 39}); !reflect.DeepEqual(exp, f.FreePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f.FreePageIds())
}
}

Expand Down Expand Up @@ -173,8 +173,8 @@ func TestFreelist_releaseRange(t *testing.T) {
f.releaseRange(r.begin, r.end)
}

if exp := common.Pgids(c.wantFree); !reflect.DeepEqual(exp, f.freePageIds()) {
t.Errorf("exp=%v; got=%v for %s", exp, f.freePageIds(), c.title)
if exp := common.Pgids(c.wantFree); !reflect.DeepEqual(exp, f.FreePageIds()) {
t.Errorf("exp=%v; got=%v for %s", exp, f.FreePageIds(), c.title)
}
}
}
Expand All @@ -194,11 +194,12 @@ func TestFreelist_read(t *testing.T) {

// Deserialize page into a freelist.
f := newTestFreelist()
f.Read(page)
s := SortedSerializer{}
s.Read(f, page)

// Ensure that there are two page ids in the freelist.
if exp := common.Pgids([]common.Pgid{23, 50}); !reflect.DeepEqual(exp, f.freePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f.freePageIds())
if exp := common.Pgids([]common.Pgid{23, 50}); !reflect.DeepEqual(exp, f.FreePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f.FreePageIds())
}
}

Expand All @@ -207,21 +208,22 @@ func TestFreelist_write(t *testing.T) {
// Create a freelist and write it to a page.
var buf [4096]byte
f := newTestFreelist()
s := SortedSerializer{}

f.Init([]common.Pgid{12, 39})
f.pendingPageIds()[100] = &txPending{ids: []common.Pgid{28, 11}}
f.pendingPageIds()[101] = &txPending{ids: []common.Pgid{3}}
p := (*common.Page)(unsafe.Pointer(&buf[0]))
f.Write(p)
s.Write(f, p)

// Read the page back out.
f2 := newTestFreelist()
f2.Read(p)
s.Read(f2, p)

// Ensure that the freelist is correct.
// All pages should be present and in reverse order.
if exp := common.Pgids([]common.Pgid{3, 11, 12, 28, 39}); !reflect.DeepEqual(exp, f2.freePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f2.freePageIds())
if exp := common.Pgids([]common.Pgid{3, 11, 12, 28, 39}); !reflect.DeepEqual(exp, f2.FreePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f2.FreePageIds())
}
}

Expand Down Expand Up @@ -258,15 +260,15 @@ func Test_freelist_ReadIDs_and_getFreePageIDs(t *testing.T) {

f.Init(exp)

if got := f.freePageIds(); !reflect.DeepEqual(exp, got) {
if got := f.FreePageIds(); !reflect.DeepEqual(exp, got) {
t.Fatalf("exp=%v; got=%v", exp, got)
}

f2 := newTestFreelist()
var exp2 []common.Pgid
f2.Init(exp2)

if got2 := f2.freePageIds(); !reflect.DeepEqual(got2, common.Pgids(exp2)) {
if got2 := f2.FreePageIds(); !reflect.DeepEqual(got2, common.Pgids(exp2)) {
t.Fatalf("exp2=%#v; got2=%#v", exp2, got2)
}

Expand Down
27 changes: 12 additions & 15 deletions internal/freelist/hashmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
type pidSet map[common.Pgid]struct{}

type hashMap struct {
*shared

freePagesCount uint64 // count of free pages(hashmap version)
freemaps map[uint64]pidSet // key is the size of continuous pages(span), value is a set which contains the starting pgids of same size
forwardMap map[common.Pgid]uint64 // key is start pgid, value is its span size
Expand Down Expand Up @@ -54,11 +52,9 @@ func (f *hashMap) Init(pgids common.Pgids) {
if size != 0 && start != 0 {
f.addSpan(start, size)
}

f.reindex()
}

func (f *hashMap) Allocate(txid common.Txid, n int) common.Pgid {
func (f *hashMap) alloc(txid common.Txid, n int, allocs *map[common.Pgid]common.Txid, cache *map[common.Pgid]struct{}) common.Pgid {
if n == 0 {
return 0
}
Expand All @@ -69,10 +65,10 @@ func (f *hashMap) Allocate(txid common.Txid, n int) common.Pgid {
// remove the span
f.delSpan(pid, uint64(n))

f.allocs[pid] = txid
(*allocs)[pid] = txid

for i := common.Pgid(0); i < common.Pgid(n); i++ {
delete(f.cache, pid+i)
delete(*cache, pid+i)
}
return pid
}
Expand All @@ -88,15 +84,15 @@ func (f *hashMap) Allocate(txid common.Txid, n int) common.Pgid {
// remove the initial
f.delSpan(pid, size)

f.allocs[pid] = txid
(*allocs)[pid] = txid

remain := size - uint64(n)

// add remain span
f.addSpan(pid+common.Pgid(n), remain)

for i := common.Pgid(0); i < common.Pgid(n); i++ {
delete(f.cache, pid+i)
delete(*cache, pid+i)
}
return pid
}
Expand All @@ -114,7 +110,7 @@ func (f *hashMap) FreeCount() int {
return int(f.freePagesCount)
}

func (f *hashMap) freePageIds() common.Pgids {
func (f *hashMap) FreePageIds() common.Pgids {
count := f.FreeCount()
if count == 0 {
return nil
Expand Down Expand Up @@ -280,13 +276,14 @@ func (f *hashMap) idsFromBackwardMap() map[common.Pgid]struct{} {
return ids
}

func NewHashMapFreelist() Interface {
hm := &hashMap{
shared: newShared(),
func newHashMap() *hashMap {
return &hashMap{
freemaps: make(map[uint64]pidSet),
forwardMap: make(map[common.Pgid]uint64),
backwardMap: make(map[common.Pgid]uint64),
}
hm.Interface = hm
return hm
}

func NewHashMapFreelist() Interface {
return newShared(newHashMap())
}
Loading