Skip to content
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## v3.0.0 (unreleased)
- Implement MultiUpsert method in client and expose it via the Client interface (#304)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any breaking changes here, what's the reason for wanting to call this 3.0?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess no reason. should i add a 2.7.0?


## v2.6.0 (2018-04-16)
- Fix bug in invalidating fallback cache on upsert (#292)
Expand Down
80 changes: 70 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,12 @@ type Client interface {
// to update in fieldsToUpdate (or all the fields if you use dosa.All())
Upsert(ctx context.Context, fieldsToUpdate []string, objectToUpdate DomainObject) error

// TODO: Coming in v2.1
// MultiUpsert creates or updates multiple rows. A list of fields to
// update can be specified. Use All() or nil for all fields.
// MultiUpsert(context.Context, []string, ...DomainObject) (MultiResult, error)
// update can be specified. Use All() or nil for all fields. Partial
// successes are possible, so it is critical to inspect the MultiResult response
// to check for failures.
// NOTE: This API only upserts objects of same entity type from same scope.
MultiUpsert(context.Context, []string, ...DomainObject) (MultiResult, error)

// Remove removes a row by primary key. The passed-in entity should contain
// the primary key field values, all other fields are ignored.
Expand All @@ -151,7 +153,7 @@ type Client interface {
// given RemoveRangeOp.
RemoveRange(ctx context.Context, removeRangeOp *RemoveRangeOp) error

// TODO: Coming in v2.1
// TODO: Coming in future versions
// MultiRemove removes multiple rows by primary key. The passed-in entity should
// contain the primary key field values.
// MultiRemove(context.Context, ...DomainObject) (MultiResult, error)
Expand Down Expand Up @@ -410,12 +412,70 @@ func (c *client) createOrUpsert(ctx context.Context, fieldsToUpdate []string, en
return fn(ctx, re.EntityInfo(), fieldValues)
}

// MultiUpsert updates several entities by primary key, The entities provided
// must contain values for all components of its primary key for the operation
// to succeed. If `fieldsToUpdate` is provided, only a subset of fields will be
// updated.
func (c *client) MultiUpsert(context.Context, []string, ...DomainObject) (MultiResult, error) {
panic("not implemented")
// MultiUpsert updates several entities of the same type by primary key, The
// entities provided must contain values for all components of its primary key
// for the operation to succeed. If `fieldsToUpdate` is provided, only a subset
// of fields will be updated. Moreover, all entities being upserted must be part
// of the same partition otherwise the request will be rejected. This is enforced
// server side.
// NOTE: This endpoint is not officially released. No guarantees about correctness
// or performance of this API will be guaranteed until v3.0.0 is released.
func (c *client) MultiUpsert(ctx context.Context, fieldsToUpdate []string, entities ...DomainObject) (MultiResult, error) {
if !c.initialized {
return nil, &ErrNotInitialized{}
}

if len(entities) == 0 {
return nil, fmt.Errorf("the number of entities to upsert is zero")
}

// lookup registered entity, registry will return error if registration
// is not found
var re *RegisteredEntity
var listMultiValues []map[string]FieldValue
for _, entity := range entities {
ere, err := c.registrar.Find(entity)
if err != nil {
return nil, err
}

if re == nil {
re = ere
} else if re != ere {
return nil, fmt.Errorf("inconsistent entity type for multi upsert: %v vs %v", re, ere)
}

// translate entity field values to a map of primary key name/values pairs
keyFieldValues := re.KeyFieldValues(entity)

// translate remaining entity fields values to map of column name/value pairs
fieldValues, err := re.OnlyFieldValues(entity, fieldsToUpdate)
if err != nil {
return nil, err
}

// merge key and remaining values
for k, v := range keyFieldValues {
fieldValues[k] = v
}

listMultiValues = append(listMultiValues, fieldValues)
}

results, err := c.connector.MultiUpsert(ctx, re.EntityInfo(), listMultiValues)
if err != nil {
return nil, errors.Wrap(err, "MultiUpsert")
}

multiResult := MultiResult{}
// map results to entity fields
for i, entity := range entities {
if results[i] != nil {
multiResult[entity] = results[i]
}
}

return multiResult, nil
}

// Remove deletes an entity by primary key, The entity provided must contain
Expand Down
83 changes: 67 additions & 16 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,70 @@ func TestClient_Upsert(t *testing.T) {
assert.NoError(t, c3.Upsert(ctx, fieldsToUpdate, cte1))
assert.Equal(t, cte1.Email, updatedEmail)
}

func TestClient_MultiUpsert(t *testing.T) {
reg1, _ := dosaRenamed.NewRegistrar(scope, namePrefix, cte1)
reg2, _ := dosaRenamed.NewRegistrar(scope, namePrefix, cte1, cte2)
fieldsToUpsert := []string{"Email"}

e1 := &ClientTestEntity1{ID: int64(1), Email: "bar@email.com"}
e2 := &ClientTestEntity1{ID: int64(2), Email: "foo@email.com"}

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
_, err := c1.MultiUpsert(ctx, dosaRenamed.All(), cte1)
assert.Error(t, err)

// empty upsert
c1.Initialize(ctx)
_, err = c1.MultiUpsert(ctx, dosaRenamed.All())
assert.Error(t, err)
assert.Contains(t, err.Error(), "zero")

// unregistered object
c1.Initialize(ctx)
_, err = c1.MultiUpsert(ctx, dosaRenamed.All(), cte2)
assert.Error(t, err)
assert.Contains(t, err.Error(), "ClientTestEntity2")

// multi read different types of object
c1.Initialize(ctx)
_, err = c1.MultiUpsert(ctx, dosaRenamed.All(), cte2, cte1)
assert.Error(t, err)
assert.Contains(t, err.Error(), "ClientTestEntity2")

// happy path, mock connector
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockConn := mocks.NewMockConnector(ctrl)
mockConn.EXPECT().CheckSchema(ctx, gomock.Any(), gomock.Any(), gomock.Any()).Return(int32(1), nil).AnyTimes()
mockConn.EXPECT().MultiUpsert(ctx, gomock.Any(), gomock.Any()).
Do(func(_ context.Context, _ *dosaRenamed.EntityInfo, allValues []map[string]dosaRenamed.FieldValue) {
assert.Equal(t, allValues[0]["id"], e1.ID)
assert.Equal(t, allValues[0]["email"], e1.Email)
assert.Equal(t, allValues[1]["id"], e2.ID)
assert.Equal(t, allValues[1]["email"], e2.Email)

}).Return([]error{nil, nil}, nil).Times(1)
c2 := dosaRenamed.NewClient(reg2, mockConn)
assert.NoError(t, c2.Initialize(ctx))
rs, err := c2.MultiUpsert(ctx, fieldsToUpsert, e1, e2)
assert.NoError(t, err)
assert.Empty(t, rs)

// system error, mock connector
mockConn.EXPECT().MultiUpsert(ctx, gomock.Any(), gomock.Any()).Return([]error{nil, nil}, errors.New("connector error"))
rs, err = c2.MultiUpsert(ctx, fieldsToUpsert, e1, e2)
assert.Error(t, err)
assert.Empty(t, rs)

// single entity error, mock connector
mockConn.EXPECT().MultiUpsert(ctx, gomock.Any(), gomock.Any()).Return([]error{errors.New("single error"), nil}, nil)
rs, err = c2.MultiUpsert(ctx, fieldsToUpsert, e1, e2)
assert.NoError(t, err)
assert.NotNil(t, rs[e1])
}

func TestClient_CreateIfNotExists(t *testing.T) {
reg1, _ := dosaRenamed.NewRegistrar("test", "team.service", cte1)
reg2, _ := dosaRenamed.NewRegistrar("test", "team.service", cte1, cte2)
Expand Down Expand Up @@ -726,11 +790,12 @@ func TestClient_MultiRead(t *testing.T) {

// uninitialized
c1 := dosaRenamed.NewClient(reg1, nullConnector)
assert.Error(t, c1.Read(ctx, fieldsToRead, cte1))
_, err := c1.MultiRead(ctx, fieldsToRead, cte1)
assert.Error(t, err)

// unregistered object
c1.Initialize(ctx)
_, err := c1.MultiRead(ctx, dosaRenamed.All(), cte2)
_, err = c1.MultiRead(ctx, dosaRenamed.All(), cte2)
assert.Error(t, err)
assert.Contains(t, err.Error(), "ClientTestEntity2")

Expand Down Expand Up @@ -762,20 +827,6 @@ func TestClient_MultiRead(t *testing.T) {
assert.Equal(t, rs[e2].Error(), "not fonud")
}

/* TODO: Coming in v2.1
func TestClient_Unimplemented(t *testing.T) {
reg1, _ := dosaRenamed.NewRegistrar(scope, namePrefix, cte1)

c := dosaRenamed.NewClient(reg1, nullConnector)
assert.Panics(t, func() {
c.MultiUpsert(ctx, dosaRenamed.All(), &ClientTestEntity1{})
})
assert.Panics(t, func() {
c.MultiRemove(ctx, &ClientTestEntity1{})
})
}
*/

func TestAdminClient_CreateScope(t *testing.T) {
c := dosaRenamed.NewAdminClient(nullConnector)
assert.NotNil(t, c)
Expand Down
16 changes: 16 additions & 0 deletions mocks/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ func (_mr *_MockClientRecorder) MultiRead(arg0, arg1 interface{}, arg2 ...interf
return _mr.mock.ctrl.RecordCall(_mr.mock, "MultiRead", _s...)
}

func (_m *MockClient) MultiUpsert(_param0 context.Context, _param1 []string, _param2 ...dosa.DomainObject) (dosa.MultiResult, error) {
_s := []interface{}{_param0, _param1}
for _, _x := range _param2 {
_s = append(_s, _x)
}
ret := _m.ctrl.Call(_m, "MultiUpsert", _s...)
ret0, _ := ret[0].(dosa.MultiResult)
ret1, _ := ret[1].(error)
return ret0, ret1
}

func (_mr *_MockClientRecorder) MultiUpsert(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
_s := append([]interface{}{arg0, arg1}, arg2...)
return _mr.mock.ctrl.RecordCall(_mr.mock, "MultiUpsert", _s...)
}

func (_m *MockClient) Range(_param0 context.Context, _param1 *dosa.RangeOp) ([]dosa.DomainObject, string, error) {
ret := _m.ctrl.Call(_m, "Range", _param0, _param1)
ret0, _ := ret[0].([]dosa.DomainObject)
Expand Down