Skip to content

Commit

Permalink
add basic functions to execute query
Browse files Browse the repository at this point in the history
  • Loading branch information
lonng committed Dec 3, 2022
1 parent 3537994 commit fc547f5
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 10 deletions.
91 changes: 89 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,95 @@

package zgraph

type Graph struct{}
import (
"context"
"sync"

"github.com/vescale/zgraph/catalog"
"github.com/vescale/zgraph/storage"
"github.com/vescale/zgraph/storage/kv"
)

// DB represents the zGraph database instance.
type DB struct {
// All fields are not been protected by Mutex will be read-only.
options *Options
store kv.Storage
catalog *catalog.Catalog

mu struct {
sync.RWMutex
sessions map[int64]*Session
}
}

// Open opens a zGraph database instance with specified directory name.
func Open(dirname string, opt *Options) (*DB, error) {
if opt == nil {
opt = &Options{}
}
opt.SetDefaults()

store, err := storage.Open(dirname)
if err != nil {
return nil, err
}

// Load the catalog from storage.
snapshot, err := store.Snapshot(store.CurrentVersion())
if err != nil {
return nil, err
}
catalog, err := catalog.Load(snapshot)
if err != nil {
return nil, err
}

db := &DB{
options: opt,
store: store,
catalog: catalog,
}
db.mu.sessions = map[int64]*Session{}

return db, nil
}

// NewSession returns a new session.
func (db *DB) NewSession() *Session {
// TODO: concurrency limitation
db.mu.Lock()
defer db.mu.Unlock()

s := newSession(db)
db.mu.sessions[s.ID()] = s
return s
}

// Execute executes a query and reports whether the query executed successfully or not.
func (db *DB) Execute(ctx context.Context, query string) (ResultSet, error) {
return db.NewSession().Execute(ctx, query)
}

// Close destroys the zGraph database instances and all sessions will be terminated.
func (db *DB) Close() error {
db.mu.Lock()
defer db.mu.Unlock()

for _, s := range db.mu.sessions {
s.setCloseCallback(db.onSessionClosedLocked)
s.Close()
}

func (g *Graph) Execute(query string) (ResultSet error) {
return nil
}

func (db *DB) onSessionClosed(s *Session) {
db.mu.Lock()
defer db.mu.Unlock()
db.onSessionClosedLocked(s)
}

func (db *DB) onSessionClosedLocked(s *Session) {
delete(db.mu.sessions, s.ID())
}
28 changes: 28 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2022 zGraph Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zgraph

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestOpen(t *testing.T) {
assert := assert.New(t)
db, err := Open(t.TempDir(), nil)
assert.Nil(err)
assert.NotNil(db)
}
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ module github.com/vescale/zgraph
go 1.19

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cockroachdb/pebble v0.0.0-20221122204154-936e011bb911
github.com/coocood/freecache v1.2.3
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8
github.com/cznic/strutil v0.0.0-20181122101858-275e90344537
github.com/google/btree v1.1.2
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63
github.com/stretchr/testify v1.8.1
github.com/twmb/murmur3 v1.1.6
go.uber.org/atomic v1.7.0
golang.org/x/exp v0.0.0-20221114191408-850992195362
modernc.org/parser v1.0.3
modernc.org/y v1.0.4
Expand All @@ -19,7 +21,6 @@ require (
require (
github.com/DataDog/zstd v1.4.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cockroachdb/errors v1.8.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20190617123548-eb05cc24525f // indirect
Expand All @@ -40,8 +41,7 @@ require (
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/twmb/murmur3 v1.1.6 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.1.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ github.com/cockroachdb/redact v1.0.8/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZ
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 h1:IKgmqgMQlVJIZj19CdocBeSfSaiCbEBZGKODaixqtHM=
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk0R8eg+OTkcqI6baNH4xAkpiYVvQ=
github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM=
github.com/coocood/freecache v1.2.3 h1:lcBwpZrwBZRZyLk/8EMyQVXRiFl663cCuMOrjCALeto=
github.com/coocood/freecache v1.2.3/go.mod h1:RBUWa/Cy+OHdfTGFEhEuE1pMCMX51Ncizj7rthiQ3vk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand Down Expand Up @@ -462,7 +460,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
32 changes: 32 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2022 zGraph Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zgraph

const defaultConcurrency = 512

// Options contains some options which is used to customize the zGraph database
// instance while instantiating zGraph.
type Options struct {
// Concurrency is used to limit the max concurrent sessions count. The NewSession
// method will block if the current alive sessions count reach this limitation.
Concurrency int64
}

// SetDefaults sets the missing options into default value.
func (opt *Options) SetDefaults() {
if opt.Concurrency <= 0 {
opt.Concurrency = defaultConcurrency
}
}
24 changes: 23 additions & 1 deletion resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,26 @@

package zgraph

type ResultSet interface{}
// SelectField represents a field information.
type SelectField struct {
Graph string
Label string
OrgLabel string
Name string
OrgName string
ColumnLength uint32
}

// ResultSet represents the result of a query.
type ResultSet interface {
// Fields returns the fields information of the current query.
Fields() []*SelectField
// Valid reports whether the current result set valid.
Valid() bool
// Next advances the current result set to the next row of query result.
Next() error
// Scan reads the current row.
Scan(fields ...interface{}) error
// Close closes the current result set, which will release all query intermediate resources..
Close() error
}
89 changes: 89 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2022 zGraph Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package zgraph

import (
"context"
"sync"
"sync/atomic"

"github.com/vescale/zgraph/stmtctx"
)

var sessionIDGenerator atomic.Int64

// Session represents the session to interact with zGraph database instance.
// Typically, the number of session will be same as the concurrent thread
// count of the application.
// All execution intermediate variables should be placed in the Context.
type Session struct {
// Protect the current session will not be used concurrently.
mu sync.Mutex
id int64
db *DB
sc *stmtctx.Context
wg sync.WaitGroup
closed atomic.Bool
cancelFn context.CancelFunc

// Callback function while session closing.
closeCallback func(s *Session)
}

// newSession returns a new session instance.
func newSession(db *DB) *Session {
return &Session{
id: sessionIDGenerator.Add(1),
db: db,
sc: stmtctx.New(),
}
}

// ID returns a integer identifier of the current session.
func (s *Session) ID() int64 {
return s.id
}

// Execute executes a query.
func (s *Session) Execute(ctx context.Context, query string) (ResultSet, error) {
s.mu.Lock()
defer s.mu.Unlock()

ctx, cancelFn := context.WithCancel(ctx)
s.cancelFn = cancelFn
s.wg.Add(1)
defer s.wg.Done()

return s.execute(ctx, query)
}

func (s *Session) execute(ctx context.Context, query string) (ResultSet, error) {
return nil, nil
}

// Close terminates the current session.
func (s *Session) Close() {
if s.closed.Swap(true) {
return
}
s.cancelFn()
s.wg.Wait()
}

func (s *Session) setCloseCallback(cb func(session *Session)) {
s.mu.Lock()
s.mu.Unlock()
s.closeCallback = cb
}
59 changes: 59 additions & 0 deletions stmtctx/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2022 zGraph Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stmtctx

import (
"sync"
)

// Context represent the intermediate state of a query execution and will be
// reset after a query finished.
type Context struct {
mu struct {
sync.RWMutex

affectedRows uint64
foundRows uint64
records uint64
deleted uint64
updated uint64
copied uint64
touched uint64

warnings []SQLWarn
errorCount uint16
}
}

// New returns a session statement context instance.
func New() *Context {
return &Context{}
}

// Reset resets all variables associated to execute a query.
func (sc *Context) Reset() {
sc.mu.Lock()
defer sc.mu.Unlock()

sc.mu.affectedRows = 0
sc.mu.foundRows = 0
sc.mu.records = 0
sc.mu.deleted = 0
sc.mu.updated = 0
sc.mu.copied = 0
sc.mu.touched = 0
sc.mu.warnings = sc.mu.warnings[:0]
sc.mu.errorCount = 0
}
Loading

0 comments on commit fc547f5

Please sign in to comment.