Skip to content

Commit

Permalink
*: support CREATE/DROP graph statements
Browse files Browse the repository at this point in the history
  • Loading branch information
lonng committed Dec 4, 2022
1 parent 2a3bc62 commit cfd8a45
Show file tree
Hide file tree
Showing 16 changed files with 481 additions and 36 deletions.
13 changes: 12 additions & 1 deletion catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (

// Catalog maintains the catalog of graphs and label information.
type Catalog struct {
mu sync.RWMutex
mdl sync.Mutex // mdl prevent executing DDL concurrently.
mu sync.RWMutex // mu protect the catalog fields.

byName map[string]*Graph
byID map[int64]*Graph
Expand Down Expand Up @@ -94,3 +95,13 @@ func (c *Catalog) LabelByID(graphID, labelID int64) *Label {

return g.LabelByID(labelID)
}

// MDLock locks the catalog to prevent executing DDL concurrently.
func (c *Catalog) MDLock() {
c.mdl.Lock()
}

// MDUnlock unlocks the catalog.
func (c *Catalog) MDUnlock() {
c.mdl.Unlock()
}
15 changes: 15 additions & 0 deletions catalog/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,18 @@ func (g *Graph) LabelByID(id int64) *Label {

return g.byID[id]
}

// Labels returns the labels.
func (g *Graph) Labels() []*Label {
g.mu.RLock()
defer g.mu.RUnlock()

if len(g.byID) < 1 {
return nil
}
labels := make([]*Label, 0, len(g.byID))
for _, label := range g.byID {
labels = append(labels, label)
}
return labels
}
56 changes: 56 additions & 0 deletions catalog/patch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2022 zGraph Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package catalog

import "github.com/vescale/zgraph/parser/model"

// PatchType represents the type of patch.
type PatchType byte

const (
PatchTypeCreateGraph PatchType = iota
PatchTypeCreateLabel
PatchTypeCreateIndex
PatchTypeDropGraph
PatchTypeDropLabel
PatchTypeDropIndex
)

// Patch represents patch which contains a DDL change.
type Patch struct {
Type PatchType
Data interface{}
}

// Apply applies the patch to catalog.
// Note: we need to ensure the DDL changes have applied to persistent storage first.
func (c *Catalog) Apply(patch *Patch) {
switch patch.Type {
case PatchTypeCreateGraph:
data := patch.Data.(*model.GraphInfo)
graph := NewGraph(data)
c.mu.Lock()
c.byName[data.Name.L] = graph
c.byID[data.ID] = graph
c.mu.Unlock()

case PatchTypeDropGraph:
data := patch.Data.(*model.GraphInfo)
c.mu.Lock()
delete(c.byName, data.Name.L)
delete(c.byID, data.ID)
c.mu.Unlock()
}
}
8 changes: 4 additions & 4 deletions compiler/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ func Compile(sc *stmtctx.Context, catalog *catalog.Catalog, node ast.StmtNode) (
if err != nil {
return nil, err
}
p, isLogicalPlan := plan.(planner.LogicalPlan)
logicalPlan, isLogicalPlan := plan.(planner.LogicalPlan)
if !isLogicalPlan {
return executor.NewStatement(p), nil
return executor.NewStatement(sc, catalog, plan), nil
}

// Optimize the logical plan and generate physical plan.
optimized := planner.Optimize(plan)
optimized := planner.Optimize(logicalPlan)

return executor.NewStatement(optimized), nil
return executor.NewStatement(sc, catalog, optimized), nil
}
6 changes: 0 additions & 6 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package zgraph

import (
"context"
"sync"

"github.com/vescale/zgraph/catalog"
Expand Down Expand Up @@ -91,11 +90,6 @@ func (db *DB) NewSession() *session.Session {
return s
}

// Execute executes a query and reports whether the query executed successfully or not.
func (db *DB) Execute(ctx context.Context, query string) (session.ResultSet, error) {
return db.NewSession().Execute(ctx, query)
}

// Close destroys the zGraph database instances and all sessions will be terminated.
func (db *DB) Close() error {
db.mu.Lock()
Expand Down
24 changes: 24 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package zgraph

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -26,3 +27,26 @@ func TestOpen(t *testing.T) {
assert.Nil(err)
assert.NotNil(db)
}

func TestDB_NewSession(t *testing.T) {
assert := assert.New(t)
db, err := Open(t.TempDir(), nil)
assert.Nil(err)
assert.NotNil(db)
defer db.Close()

session := db.NewSession()
assert.NotNil(session)

ctx := context.Background()
rs, err := session.Execute(ctx, "create graph graph1000")
assert.Nil(err)

err = rs.Next(ctx)
assert.Nil(err)

// Check the catalog.
catalog := db.Catalog()
graph := catalog.Graph("graph1000")
assert.NotNil(graph)
}
73 changes: 73 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2022 zGraph Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"github.com/pingcap/errors"
"github.com/vescale/zgraph/catalog"
"github.com/vescale/zgraph/planner"
"github.com/vescale/zgraph/stmtctx"
)

// Builder is used to build from a plan into executor.
type Builder struct {
sc *stmtctx.Context
catalog *catalog.Catalog
err error
}

// NewBuilder returns a build instance.
func NewBuilder(sc *stmtctx.Context, catalog *catalog.Catalog) *Builder {
return &Builder{
sc: sc,
catalog: catalog,
}
}

// Build builds an executor from a plan.
func (b *Builder) Build(plan planner.Plan) Executor {
switch p := plan.(type) {
case *planner.DDL:
return b.buildDDL(p)
case *planner.Simple:
return b.buildSimple(p)
default:
b.err = errors.Errorf("unknown plan: %T", plan)
}
return nil
}

// Error returns the internal error encountered while building.
func (b *Builder) Error() error {
return b.err
}

func (b *Builder) buildDDL(plan *planner.DDL) Executor {
exec := &DDLExec{
baseExecutor: newBaseExecutor(b.sc, plan.Schema(), plan.ID()),
sc: b.sc,
statement: plan.Statement,
catalog: b.catalog,
}
return exec
}

func (b *Builder) buildSimple(plan *planner.Simple) Executor {
exec := &SimpleExec{
baseExecutor: newBaseExecutor(b.sc, plan.Schema(), plan.ID()),
statement: plan.Statement,
}
return exec
}
135 changes: 135 additions & 0 deletions executor/ddl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2022 zGraph Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"

"github.com/pingcap/errors"
"github.com/vescale/zgraph/catalog"
"github.com/vescale/zgraph/internal/chunk"
"github.com/vescale/zgraph/meta"
"github.com/vescale/zgraph/parser/ast"
"github.com/vescale/zgraph/parser/model"
"github.com/vescale/zgraph/stmtctx"
"github.com/vescale/zgraph/storage/kv"
)

// DDLExec is used to execute a DDL operation.
type DDLExec struct {
baseExecutor

sc *stmtctx.Context
done bool
statement ast.DDLNode
catalog *catalog.Catalog
}

// Next implements the Executor interface.
func (e *DDLExec) Next(_ context.Context, _ *chunk.Chunk) error {
if e.done {
return nil
}
e.done = true

// TODO: prevent executing DDL in transaction context.
// Prevent executing DDL concurrently.
e.catalog.MDLock()
defer e.catalog.MDUnlock()

var patch *catalog.Patch
err := kv.RunNewTxn(e.sc.Store(), func(txn kv.Transaction) error {
m := meta.New(txn)
var err error
switch stmt := e.statement.(type) {
case *ast.CreateGraphStmt:
patch, err = e.createGraph(m, stmt)
case *ast.DropGraphStmt:
patch, err = e.dropGraph(m, stmt)
default:
return errors.Errorf("unknown DDL(%T)", e.statement)
}
return err
})
if err != nil {
return err
}

// Apply the patch to catalog after the DDL changes have persistent in storage.
if patch != nil {
e.catalog.Apply(patch)
}
return nil
}

func (e *DDLExec) createGraph(m *meta.Meta, stmt *ast.CreateGraphStmt) (*catalog.Patch, error) {
graph := e.catalog.Graph(stmt.Graph.L)
if graph != nil {
if stmt.IfNotExists {
return nil, nil
}
return nil, meta.ErrGraphExists
}

// Persistent to storage.
id, err := m.NextGlobalID()
if err != nil {
return nil, err
}
graphInfo := &model.GraphInfo{
ID: id,
Name: stmt.Graph,
}
err = m.CreateGraph(graphInfo)
if err != nil {
return nil, err
}

patch := &catalog.Patch{
Type: catalog.PatchTypeCreateGraph,
Data: graphInfo,
}
return patch, nil
}

func (e *DDLExec) dropGraph(m *meta.Meta, stmt *ast.DropGraphStmt) (*catalog.Patch, error) {
graph := e.catalog.Graph(stmt.Graph.L)
if graph == nil {
if stmt.IfExists {
return nil, nil
}
return nil, meta.ErrGraphNotExists
}

// Persistent to storage.
graphID := graph.Meta().ID
labels := graph.Labels()
for _, label := range labels {
err := m.DropLabel(graphID, label.Meta().ID)
if err != nil {
return nil, err
}
}
err := m.DropGraph(graphID)
if err != nil {
return nil, err
}

patch := &catalog.Patch{
Type: catalog.PatchTypeDropGraph,
Data: graph.Meta(),
}
return patch, nil
}
Loading

0 comments on commit cfd8a45

Please sign in to comment.