diff --git a/apiserver/xdsserverv3/cache/cache.go b/apiserver/xdsserverv3/cache/cache.go index 83ba3f24f..a718b7547 100644 --- a/apiserver/xdsserverv3/cache/cache.go +++ b/apiserver/xdsserverv3/cache/cache.go @@ -86,7 +86,7 @@ func (sc *XDSCache) CreateDeltaWatch(request *cachev3.DeltaRequest, state stream } item := sc.loadCache(request) if item == nil { - value <- nil + value <- &NoReadyXdsResponse{} return func() {} } return item.CreateDeltaWatch(request, state, value) diff --git a/apiserver/xdsserverv3/cache/response.go b/apiserver/xdsserverv3/cache/response.go new file mode 100644 index 000000000..d2e074cf5 --- /dev/null +++ b/apiserver/xdsserverv3/cache/response.go @@ -0,0 +1,20 @@ +package cache + +import ( + "errors" + + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3" +) + +type NoReadyXdsResponse struct{ + cachev3.DeltaResponse +} + +func (r *NoReadyXdsResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest{ + return nil +} + +func (r *NoReadyXdsResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error){ + return nil, errors.New("node xds not created yet") +} \ No newline at end of file diff --git a/apiserver/xdsserverv3/generate.go b/apiserver/xdsserverv3/generate.go index 3a712ab33..70a5278d7 100644 --- a/apiserver/xdsserverv3/generate.go +++ b/apiserver/xdsserverv3/generate.go @@ -127,11 +127,14 @@ func (x *XdsResourceGenerator) buildSidecarXDSCache(registryInfo map[string]map[ Name: xdsNode.GetSelfService(), }, } - + if services,ok:=registryInfo[xdsNode.GetSelfNamespace()];ok{ + opt.Services=services + } + opt.TrafficDirection = corev3.TrafficDirection_OUTBOUND - // 构建 INBOUND LDS 资源 + // 构建 OUTBOUND LDS 资源 x.buildAndDeltaUpdate(resource.LDS, opt) - // 构建 INBOUND RDS 资源 + // 构建 OUTBOUND RDS 资源 x.buildAndDeltaUpdate(resource.RDS, opt) opt.TrafficDirection = corev3.TrafficDirection_INBOUND // 构建 INBOUND LDS 资源 diff --git a/apiserver/xdsserverv3/lds.go b/apiserver/xdsserverv3/lds.go index 58bfff6e8..601171588 100644 --- a/apiserver/xdsserverv3/lds.go +++ b/apiserver/xdsserverv3/lds.go @@ -31,6 +31,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/wellknown" "github.com/golang/protobuf/ptypes/wrappers" + "google.golang.org/protobuf/types/known/wrapperspb" "github.com/polarismesh/polaris/apiserver/xdsserverv3/resource" "github.com/polarismesh/polaris/service" @@ -135,7 +136,8 @@ func (lds *LDSBuilder) makeListener(option *resource.BuildOption, } } - listener := makeDefaultListener(direction, boundHCM, option) + dstPorts := makeListenersMatchDestinationPorts(option) + listener := makeDefaultListener(direction, boundHCM, option, dstPorts) listener.ListenerFilters = append(listener.ListenerFilters, defaultListenerFilters...) if option.TLSMode != resource.TLSModeNone { @@ -173,11 +175,15 @@ func (lds *LDSBuilder) makeListener(option *resource.BuildOption, } func makeDefaultListener(trafficDirection corev3.TrafficDirection, - boundHCM *hcm.HttpConnectionManager, option *resource.BuildOption) *listenerv3.Listener { + boundHCM *hcm.HttpConnectionManager, option *resource.BuildOption, dstPorts []uint32) *listenerv3.Listener { bindPort := boundBindPort[trafficDirection] trafficDirectionName := corev3.TrafficDirection_name[int32(trafficDirection)] ldsName := fmt.Sprintf("%s_%d", trafficDirectionName, bindPort) + + filterChain := makeDefaultListenerFilterChain(trafficDirection, + boundHCM, dstPorts) + if trafficDirection == core.TrafficDirection_INBOUND { ldsName = fmt.Sprintf("%s_%s_%d", option.SelfService.Domain(), trafficDirectionName, bindPort) } @@ -196,20 +202,58 @@ func makeDefaultListener(trafficDirection corev3.TrafficDirection, }, }, }, - FilterChains: []*listenerv3.FilterChain{ - { - Filters: []*listenerv3.Filter{ - { - Name: wellknown.HTTPConnectionManager, - ConfigType: &listenerv3.Filter_TypedConfig{ - TypedConfig: resource.MustNewAny(boundHCM), - }, - }, - }, - }, - }, + FilterChains: filterChain, ListenerFilters: []*listenerv3.ListenerFilter{}, } listener.DefaultFilterChain = resource.MakeDefaultFilterChain() return listener } + +func makeListenersMatchDestinationPorts(option *resource.BuildOption) []uint32 { + var destination_ports []uint32 + selfService := option.SelfService + + selfServiceInfo, ok := option.Services[selfService] + if ok && len(selfServiceInfo.Ports) > 0 { + for _, i := range selfServiceInfo.Ports { + destination_ports = append(destination_ports, i.Port) + } + } + return destination_ports + +} + +func makeDefaultListenerFilterChain(trafficDirection corev3.TrafficDirection, + boundHCM *hcm.HttpConnectionManager, dstPorts []uint32) []*listenerv3.FilterChain { + + filterChain := make([]*listenerv3.FilterChain, 0) + + defaultHttpFilter := []*listenerv3.Filter{ + { + Name: wellknown.HTTPConnectionManager, + ConfigType: &listenerv3.Filter_TypedConfig{ + TypedConfig: resource.MustNewAny(boundHCM), + }, + }, + } + + if trafficDirection == core.TrafficDirection_INBOUND { + for _, i := range dstPorts { + filterChain = append(filterChain, &listenerv3.FilterChain{ + Filters: defaultHttpFilter, + FilterChainMatch: &listenerv3.FilterChainMatch{ + DestinationPort: &wrapperspb.UInt32Value{ + Value: i, + }, + TransportProtocol: "raw_buffer", + }, + }) + } + } else { + filterChain = append(filterChain, &listenerv3.FilterChain{ + Filters: defaultHttpFilter, + }) + } + + return filterChain +}