Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

关于入口流量的端口匹配问题 #1297

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apiserver/xdsserverv3/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (sc *XDSCache) CreateDeltaWatch(request *cachev3.DeltaRequest, state stream
}
item := sc.loadCache(request)
if item == nil {
value <- nil
value <- &NoReadyXdsResponse{}
return func() {}
}
return item.CreateDeltaWatch(request, state, value)
Expand Down
20 changes: 20 additions & 0 deletions apiserver/xdsserverv3/cache/response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cache

import (
"errors"

discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
cachev3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)

type NoReadyXdsResponse struct{
cachev3.DeltaResponse
}

func (r *NoReadyXdsResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRequest{
return nil
}

func (r *NoReadyXdsResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscoveryResponse, error){
return nil, errors.New("node xds not created yet")
}
9 changes: 6 additions & 3 deletions apiserver/xdsserverv3/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,14 @@ func (x *XdsResourceGenerator) buildSidecarXDSCache(registryInfo map[string]map[
Name: xdsNode.GetSelfService(),
},
}

if services,ok:=registryInfo[xdsNode.GetSelfNamespace()];ok{
opt.Services=services
}

opt.TrafficDirection = corev3.TrafficDirection_OUTBOUND
// 构建 INBOUND LDS 资源
// 构建 OUTBOUND LDS 资源
x.buildAndDeltaUpdate(resource.LDS, opt)
// 构建 INBOUND RDS 资源
// 构建 OUTBOUND RDS 资源
x.buildAndDeltaUpdate(resource.RDS, opt)
opt.TrafficDirection = corev3.TrafficDirection_INBOUND
// 构建 INBOUND LDS 资源
Expand Down
72 changes: 58 additions & 14 deletions apiserver/xdsserverv3/lds.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/polarismesh/polaris/apiserver/xdsserverv3/resource"
"github.com/polarismesh/polaris/service"
Expand Down Expand Up @@ -135,7 +136,8 @@ func (lds *LDSBuilder) makeListener(option *resource.BuildOption,
}
}

listener := makeDefaultListener(direction, boundHCM, option)
dstPorts := makeListenersMatchDestinationPorts(option)
listener := makeDefaultListener(direction, boundHCM, option, dstPorts)
listener.ListenerFilters = append(listener.ListenerFilters, defaultListenerFilters...)

if option.TLSMode != resource.TLSModeNone {
Expand Down Expand Up @@ -173,11 +175,15 @@ func (lds *LDSBuilder) makeListener(option *resource.BuildOption,
}

func makeDefaultListener(trafficDirection corev3.TrafficDirection,
boundHCM *hcm.HttpConnectionManager, option *resource.BuildOption) *listenerv3.Listener {
boundHCM *hcm.HttpConnectionManager, option *resource.BuildOption, dstPorts []uint32) *listenerv3.Listener {

bindPort := boundBindPort[trafficDirection]
trafficDirectionName := corev3.TrafficDirection_name[int32(trafficDirection)]
ldsName := fmt.Sprintf("%s_%d", trafficDirectionName, bindPort)

filterChain := makeDefaultListenerFilterChain(trafficDirection,
boundHCM, dstPorts)

if trafficDirection == core.TrafficDirection_INBOUND {
ldsName = fmt.Sprintf("%s_%s_%d", option.SelfService.Domain(), trafficDirectionName, bindPort)
}
Expand All @@ -196,20 +202,58 @@ func makeDefaultListener(trafficDirection corev3.TrafficDirection,
},
},
},
FilterChains: []*listenerv3.FilterChain{
{
Filters: []*listenerv3.Filter{
{
Name: wellknown.HTTPConnectionManager,
ConfigType: &listenerv3.Filter_TypedConfig{
TypedConfig: resource.MustNewAny(boundHCM),
},
},
},
},
},
FilterChains: filterChain,
ListenerFilters: []*listenerv3.ListenerFilter{},
}
listener.DefaultFilterChain = resource.MakeDefaultFilterChain()
return listener
}

func makeListenersMatchDestinationPorts(option *resource.BuildOption) []uint32 {
var destination_ports []uint32
selfService := option.SelfService

selfServiceInfo, ok := option.Services[selfService]
if ok && len(selfServiceInfo.Ports) > 0 {
for _, i := range selfServiceInfo.Ports {
destination_ports = append(destination_ports, i.Port)
}
}
return destination_ports

}

func makeDefaultListenerFilterChain(trafficDirection corev3.TrafficDirection,
boundHCM *hcm.HttpConnectionManager, dstPorts []uint32) []*listenerv3.FilterChain {

filterChain := make([]*listenerv3.FilterChain, 0)

defaultHttpFilter := []*listenerv3.Filter{
{
Name: wellknown.HTTPConnectionManager,
ConfigType: &listenerv3.Filter_TypedConfig{
TypedConfig: resource.MustNewAny(boundHCM),
},
},
}

if trafficDirection == core.TrafficDirection_INBOUND {
for _, i := range dstPorts {
filterChain = append(filterChain, &listenerv3.FilterChain{
Filters: defaultHttpFilter,
FilterChainMatch: &listenerv3.FilterChainMatch{
DestinationPort: &wrapperspb.UInt32Value{
Value: i,
},
TransportProtocol: "raw_buffer",
},
})
}
} else {
filterChain = append(filterChain, &listenerv3.FilterChain{
Filters: defaultHttpFilter,
})
}

return filterChain
}
Loading