Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create table and related cache for additional instance data #1262

Open
wants to merge 25 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
b24d2e1
feat: add related cache for additional instance data.
fabian4 Sep 20, 2023
9e821e4
feat: add related cache for additional instance data.
fabian4 Sep 22, 2023
3118f50
feat: add related cache for additional instance data.
fabian4 Sep 22, 2023
7f02df5
feat: add related cache for additional instance data.
fabian4 Sep 23, 2023
f9336bc
feat: add related cache for additional instance data.
fabian4 Sep 25, 2023
7cd8ff9
feat: add related cache for additional instance data.
fabian4 Sep 25, 2023
b17cbee
feat: add related cache for additional instance data.
fabian4 Sep 26, 2023
6770526
Merge branch 'main' into new_table_for_instance_console
fabian4 Sep 26, 2023
b3e121e
feat: add related cache for additional instance data.
fabian4 Sep 26, 2023
ac81b30
feat: add related cache for additional instance data.
fabian4 Sep 27, 2023
65049f4
feat: add related cache for additional instance data.
fabian4 Sep 27, 2023
2c5f961
feat: add related cache for additional instance data.
fabian4 Sep 27, 2023
5058981
feat: add related cache for additional instance data.
fabian4 Oct 10, 2023
7f03488
feat: add related cache for additional instance data.
fabian4 Oct 10, 2023
022dcb8
feat: add related cache for additional instance data.
fabian4 Oct 10, 2023
c0c337f
feat: add related cache for additional instance data.
fabian4 Oct 10, 2023
967abd2
feat: add related cache for additional instance data.
fabian4 Oct 10, 2023
e8133ad
feat: add related cache for additional instance data.
fabian4 Oct 10, 2023
1ee4956
feat: add related cache for additional instance data.
fabian4 Oct 13, 2023
fff45cf
feat: add related cache for additional instance data.
fabian4 Oct 13, 2023
c765ac6
Merge branch 'refs/heads/main' into new_table_for_instance_console
fabian4 Sep 24, 2024
5e19f35
feat: add related cache for additional instance data.
fabian4 Sep 24, 2024
d3ce712
Merge branch 'main' into new_table_for_instance_console
fabian4 Sep 25, 2024
05b2e00
feat: add related cache for additional instance data.
fabian4 Sep 25, 2024
92c12ed
Merge branch 'main' into new_table_for_instance_console
fabian4 Sep 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apiserver/eurekaserver/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func TestEurekaServer_renew(t *testing.T) {
insId: ins,
disableBeatInsId: disableBeatIns,
}, nil)
mockStore.EXPECT().
GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes().
Return(nil, nil)
mockStore.EXPECT().StartReadTx().Return(mockTx, nil).AnyTimes()
mockStore.EXPECT().
GetMoreServices(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Expand Down
31 changes: 30 additions & 1 deletion cache/service/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@
// service id -> [instanceid ->instance]
services *utils.SyncMap[string, *model.ServiceInstances]
// service id -> [instanceCount]
instanceCounts *utils.SyncMap[string, *model.InstanceCount]
instanceCounts *utils.SyncMap[string, *model.InstanceCount]
// service id -> [instanceConsole]
instanceConsoles *utils.SyncMap[string, *model.InstanceConsole]
instancePorts *instancePorts
disableBusiness bool
needMeta bool
Expand All @@ -78,6 +80,7 @@
ic.ids = utils.NewSyncMap[string, *model.Instance]()
ic.services = utils.NewSyncMap[string, *model.ServiceInstances]()
ic.instanceCounts = utils.NewSyncMap[string, *model.InstanceCount]()
ic.instanceConsoles = utils.NewSyncMap[string, *model.InstanceConsole]()
ic.instancePorts = newInstancePorts()
if opt == nil {
return nil
Expand Down Expand Up @@ -192,6 +195,17 @@
return nil, nil, -1, err
}

instanceConsoles, err := ic.storage.GetMoreInstanceConsoles(tx, ic.LastFetchTime(), ic.IsFirstUpdate(),
ic.needMeta, ic.systemServiceID)
if err != nil {
log.Error("[Cache][InstanceConsole] update get storage more", zap.Error(err))
return nil, nil, -1, err

Check warning on line 202 in cache/service/instance.go

View check run for this annotation

Codecov / codecov/patch

cache/service/instance.go#L201-L202

Added lines #L201 - L202 were not covered by tests
}
for _, item := range instanceConsoles {
//Todo: check validation
ic.instanceConsoles.Store(item.Id, item)
}

events, lastMtimes, update, del := ic.setInstances(instances)
log.Info("[Cache][Instance] get more instances",
zap.Int("pull-from-store", len(instances)), zap.Int("update", update), zap.Int("delete", del),
Expand All @@ -205,6 +219,7 @@
ic.ids = utils.NewSyncMap[string, *model.Instance]()
ic.services = utils.NewSyncMap[string, *model.ServiceInstances]()
ic.instanceCounts = utils.NewSyncMap[string, *model.InstanceCount]()
ic.instanceConsoles = utils.NewSyncMap[string, *model.InstanceConsole]()
ic.instancePorts.reset()
ic.instanceCount = 0
return nil
Expand Down Expand Up @@ -434,6 +449,20 @@
return value
}

// GetInstanceConsole 根据实例ID获取实例数据
func (ic *instanceCache) GetInstanceConsole(instanceConsoleID string) *model.InstanceConsole {
if instanceConsoleID == "" {
return nil

Check warning on line 455 in cache/service/instance.go

View check run for this annotation

Codecov / codecov/patch

cache/service/instance.go#L455

Added line #L455 was not covered by tests
}

value, ok := ic.instanceConsoles.Load(instanceConsoleID)
if !ok {
return nil

Check warning on line 460 in cache/service/instance.go

View check run for this annotation

Codecov / codecov/patch

cache/service/instance.go#L460

Added line #L460 was not covered by tests
}

return value
}

// GetInstances 根据服务名获取实例,先查找服务名对应的服务ID,再找实例列表
func (ic *instanceCache) GetInstances(serviceID string) *model.ServiceInstances {
if serviceID == "" {
Expand Down
68 changes: 66 additions & 2 deletions cache/service/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@ func genModelInstances(label string, total int) map[string]*model.Instance {
return out
}

func genModelInstancesConsole(label string, total int) map[string]*model.InstanceConsole {
out := make(map[string]*model.InstanceConsole)
for i := 0; i < total; i++ {
entry := &model.InstanceConsole{
Id: fmt.Sprintf("InstanceConsole-%s-%d", label, i),
Isolate: false,
Weight: 100,
Metadata: "Metadata",
}
out[entry.Id] = entry
}

return out
}

// 对instanceCache的缓存数据进行计数统计
func iteratorInstances(ic *instanceCache) (int, int) {
instancesCount := 0
Expand All @@ -118,6 +133,7 @@ func TestInstanceCache_Update(t *testing.T) {
ret := make(map[string]*model.Instance)
instances1 := genModelInstances("service1", 10) // 每次gen为一个服务的
instances2 := genModelInstances("service2", 5)
instanceConsoles := genModelInstancesConsole("console", 3)

for id, instance := range instances1 {
ret[id] = instance
Expand All @@ -129,13 +145,17 @@ func TestInstanceCache_Update(t *testing.T) {
gomock.InOrder(storage.EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(ret, nil))
gomock.InOrder(storage.EXPECT().
GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instanceConsoles, nil))
gomock.InOrder(storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(15), nil))
if err := ic.Update(); err != nil {
t.Fatalf("error: %s", err.Error())
}

servicesCount, instancesCount := iteratorInstances(ic)
if servicesCount == 2 && instancesCount == 10+5 { // gen两次,有两个不同服务
instanceConsoleCounts := ic.instanceConsoles.Len()
if servicesCount == 2 && instancesCount == 10+5 && instanceConsoleCounts == 3 { // gen两次,有两个不同服务
t.Logf("pass")
} else {
t.Fatalf("error: %d, %d", servicesCount, instancesCount)
Expand All @@ -147,26 +167,34 @@ func TestInstanceCache_Update(t *testing.T) {
gomock.InOrder(storage.EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(nil, nil))
gomock.InOrder(storage.EXPECT().
GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(nil, nil))
if err := ic.Update(); err != nil {
t.Fatalf("error: %s", err.Error())
}

servicesCount, instancesCount := iteratorInstances(ic)
if servicesCount != 0 || instancesCount != 0 {
instanceConsoleCounts := ic.instanceConsoles.Len()
if servicesCount != 0 || instancesCount != 0 || instanceConsoleCounts != 0 {
t.Fatalf("error: %d %d", servicesCount, instancesCount)
}
})

t.Run("lastMtime可以正常更新", func(t *testing.T) {
_ = ic.Clear()
instances := genModelInstances("services", 10)
instanceConsoles := genModelInstancesConsole("console", 3)
maxMtime := time.Now()
instances[fmt.Sprintf("instanceID-%s-%d", "services", 5)].ModifyTime = maxMtime

gomock.InOrder(
storage.EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), gomock.Any(), ic.needMeta, ic.systemServiceID).
Return(instances, nil),
storage.EXPECT().
GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instanceConsoles, nil),
storage.EXPECT().GetUnixSecond(gomock.Any()).Return(maxMtime.Unix(), nil).AnyTimes(),
)
if err := ic.Update(); err != nil {
Expand All @@ -188,6 +216,9 @@ func TestInstanceCache_Update2(t *testing.T) {
gomock.InOrder(storage.EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(nil, fmt.Errorf("storage get error")))
gomock.InOrder(storage.EXPECT().
GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(nil, nil))
gomock.InOrder(storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(0), fmt.Errorf("storage get error")))
if err := ic.Update(); err != nil {
t.Logf("pass: %s", err.Error())
Expand All @@ -199,9 +230,13 @@ func TestInstanceCache_Update2(t *testing.T) {
t.Run("更新数据,再删除部分数据,缓存正常", func(t *testing.T) {
_ = ic.Clear()
instances := genModelInstances("service-a", 20)
instanceConsoles := genModelInstancesConsole("console", 3)
gomock.InOrder(storage.EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instances, nil))
gomock.InOrder(storage.EXPECT().
GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instanceConsoles, nil))
if err := ic.Update(); err != nil {
t.Fatalf("error: %s", err.Error())
}
Expand All @@ -218,6 +253,9 @@ func TestInstanceCache_Update2(t *testing.T) {
gomock.InOrder(storage.EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instances, nil))
gomock.InOrder(storage.EXPECT().
GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instanceConsoles, nil))
if err := ic.Update(); err != nil {
t.Fatalf("error: %s", err.Error())
}
Expand All @@ -231,6 +269,7 @@ func TestInstanceCache_Update2(t *testing.T) {
t.Run("对账发现缓存数据数量和存储层不一致", func(t *testing.T) {
_ = ic.Clear()
instances := genModelInstances("service-a", 20)

queryCount := int32(0)
storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(0), nil).AnyTimes()
storage.EXPECT().
Expand All @@ -256,9 +295,14 @@ func TestInstanceCache_GetInstance(t *testing.T) {
t.Run("缓存有数据,可以正常获取到数据", func(t *testing.T) {
_ = ic.Clear()
instances := genModelInstances("my-services", 10)
instanceConsoles := genModelInstancesConsole("console", 3)

gomock.InOrder(storage.EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instances, nil))
gomock.InOrder(storage.EXPECT().
GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instanceConsoles, nil))
gomock.InOrder(storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(10), nil))
if err := ic.Update(); err != nil {
t.Fatalf("error: %s", err.Error())
Expand All @@ -271,6 +315,14 @@ func TestInstanceCache_GetInstance(t *testing.T) {
if instance := ic.GetInstance("test-instance-xx"); instance != nil {
t.Fatalf("error")
}

if instanceConsole := ic.GetInstanceConsole(instanceConsoles[fmt.Sprintf("InstanceConsole-%s-%d", "console", 2)].Id); instanceConsole == nil {
t.Fatalf("error")
}

if instanceConsole := ic.GetInstance("test-instanceConsole-xx"); instanceConsole != nil {
t.Fatalf("error")
}
})
}

Expand All @@ -280,6 +332,7 @@ func TestInstanceCache_GetServicePorts(t *testing.T) {
t.Run("缓存有数据,可以正常获取到服务的端口列表", func(t *testing.T) {
_ = ic.Clear()
instances := genModelInstances("my-services", 10)
instanceConsoles := genModelInstancesConsole("console", 3)

ports := make(map[string][]*model.ServicePort)

Expand Down Expand Up @@ -311,6 +364,9 @@ func TestInstanceCache_GetServicePorts(t *testing.T) {
gomock.InOrder(storage.EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instances, nil))
gomock.InOrder(storage.EXPECT().
GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instanceConsoles, nil))
gomock.InOrder(storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(10), nil))
if err := ic.Update(); err != nil {
t.Fatalf("error: %s", err.Error())
Expand All @@ -333,6 +389,7 @@ func TestInstanceCache_fillIntrnalLabels(t *testing.T) {
t.Run("向实例Metadata中自动注入北极星默认label信息", func(t *testing.T) {
_ = ic.Clear()
instances := genModelInstances("inject-internal-label", 10)
instanceConsoles := genModelInstancesConsole("console", 3)

ports := make(map[string][]string)

Expand Down Expand Up @@ -362,6 +419,9 @@ func TestInstanceCache_fillIntrnalLabels(t *testing.T) {
gomock.InOrder(storage.EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instances, nil))
gomock.InOrder(storage.EXPECT().
GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instanceConsoles, nil))
gomock.InOrder(storage.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(10), nil))
if err := ic.Update(); err != nil {
t.Fatalf("error: %s", err.Error())
Expand Down Expand Up @@ -393,6 +453,7 @@ func TestGetInstancesByServiceID(t *testing.T) {
instances1 := genModelInstances("my-services", instances1Count)
instances2 := genModelInstances("my-services-a", instances2Count)
// instances2 = append(instances2, instances1...)
instanceConsoles := genModelInstancesConsole("console", 3)

ret := make(map[string]*model.Instance)
for id, instance := range instances1 {
Expand All @@ -405,6 +466,9 @@ func TestGetInstancesByServiceID(t *testing.T) {
gomock.InOrder(storage.EXPECT().
GetMoreInstances(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(ret, nil))
gomock.InOrder(storage.EXPECT().
GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), ic.IsFirstUpdate(), ic.needMeta, ic.systemServiceID).
Return(instanceConsoles, nil))
gomock.InOrder(storage.EXPECT().
GetInstancesCountTx(gomock.Any()).
Return(uint32(instances1Count+instances2Count), nil))
Expand Down
1 change: 1 addition & 0 deletions cache/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ func TestServiceCache_GetServicesByFilter(t *testing.T) {
mockStore.EXPECT().GetInstancesCountTx(gomock.Any()).Return(uint32(len(instances)), nil).AnyTimes()
mockStore.EXPECT().GetMoreServices(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(services, nil).AnyTimes()
mockStore.EXPECT().GetMoreInstances(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(instances, nil).AnyTimes()
mockStore.EXPECT().GetMoreInstanceConsoles(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
ic.setInstances(instances)

hostToService := make(map[string]string)
Expand Down
8 changes: 8 additions & 0 deletions common/model/naming.go
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,14 @@ type InstanceEvent struct {
MetaData map[string]string
}

// instance info from console
type InstanceConsole struct {
Id string
Isolate bool
Weight int8
Metadata string
}

// InjectMetadata 从context中获取metadata并注入到事件对象
func (i *InstanceEvent) InjectMetadata(ctx context.Context) {
value := ctx.Value(CtxEventKeyMetadata)
Expand Down
25 changes: 25 additions & 0 deletions store/boltdb/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,31 @@
return toInstance(instances), nil
}

// GetMoreInstancesConsoles 获取增量修改数据
func (ins *instanceStore) GetMoreInstanceConsoles(tx store.Tx, mtime time.Time, firstUpdate, needMeta bool,
serviceID []string) (map[string]*model.InstanceConsole, error) {
//Todo: get from boltdb
return nil, nil
}

// DeleteInstanceConsole 逻辑删除instanceConsole
func (ins *instanceStore) DeleteInstanceConsole(instanceConsoleID string) error {

Check warning on line 583 in store/boltdb/instance.go

View check run for this annotation

Codecov / codecov/patch

store/boltdb/instance.go#L583

Added line #L583 was not covered by tests
//Todo: delete from boltdb
return nil

Check warning on line 585 in store/boltdb/instance.go

View check run for this annotation

Codecov / codecov/patch

store/boltdb/instance.go#L585

Added line #L585 was not covered by tests
}

// CleanInstanceConsole 物理删除instanceConsole
func (ins *instanceStore) CleanInstanceConsole(instanceConsoleID string) error {

Check warning on line 589 in store/boltdb/instance.go

View check run for this annotation

Codecov / codecov/patch

store/boltdb/instance.go#L589

Added line #L589 was not covered by tests
//Todo: clean from boltdb
return nil

Check warning on line 591 in store/boltdb/instance.go

View check run for this annotation

Codecov / codecov/patch

store/boltdb/instance.go#L591

Added line #L591 was not covered by tests
}

// UpdateInstanceConsole 更新instanceConsole
func (ins *instanceStore) UpdateInstanceConsole(instanceConsole *model.InstanceConsole) error {

Check warning on line 595 in store/boltdb/instance.go

View check run for this annotation

Codecov / codecov/patch

store/boltdb/instance.go#L595

Added line #L595 was not covered by tests
//Todo: update from boltdb
return nil

Check warning on line 597 in store/boltdb/instance.go

View check run for this annotation

Codecov / codecov/patch

store/boltdb/instance.go#L597

Added line #L597 was not covered by tests
}

// BatchSetInstanceHealthStatus 批量设置实例的健康状态
func (i *instanceStore) BatchSetInstanceHealthStatus(ids []interface{}, healthy int, revision string) error {
for _, id := range ids {
Expand Down
4 changes: 4 additions & 0 deletions store/discover_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,15 @@ type InstanceStore interface {
BatchAddInstances(instances []*model.Instance) error
// UpdateInstance 更新实例
UpdateInstance(instance *model.Instance) error
UpdateInstanceConsole(instanceConsole *model.InstanceConsole) error
// DeleteInstance 删除一个实例,实际是把valid置为false
DeleteInstance(instanceID string) error
DeleteInstanceConsole(instanceConsoleID string) error
// BatchDeleteInstances 批量删除实例,flag=1
BatchDeleteInstances(ids []interface{}) error
// CleanInstance 清空一个实例,真正删除
CleanInstance(instanceID string) error
CleanInstanceConsole(instanceConsoleID string) error
// BatchGetInstanceIsolate 检查ID是否存在,并且返回存在的ID,以及ID的隔离状态
BatchGetInstanceIsolate(ids map[string]bool) (map[string]bool, error)
// GetInstancesBrief 获取实例关联的token
Expand All @@ -118,6 +121,7 @@ type InstanceStore interface {
// GetMoreInstances 根据mtime获取增量instances,返回所有store的变更信息
// 此方法用于 cache 增量更新,需要注意 mtime 应为数据库时间戳
GetMoreInstances(tx Tx, mtime time.Time, firstUpdate, needMeta bool, serviceID []string) (map[string]*model.Instance, error)
GetMoreInstanceConsoles(tx Tx, mtime time.Time, firstUpdate, needMeta bool, serviceID []string) (map[string]*model.InstanceConsole, error)
// SetInstanceHealthStatus 设置实例的健康状态
SetInstanceHealthStatus(instanceID string, flag int, revision string) error
// BatchSetInstanceHealthStatus 批量设置实例的健康状态
Expand Down
Loading
Loading