Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions .semaphore/semaphore.yml

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions felix/calc/calc_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type passthruCallbacks interface {
OnGlobalBGPConfigUpdate(*v3.BGPConfiguration)
OnServiceUpdate(*proto.ServiceUpdate)
OnServiceRemove(*proto.ServiceRemove)
OnTyphaRevisionRemove()
OnTyphaRevisionUpdate(tr *model.TyphaRevision)
}

type routeCallbacks interface {
Expand Down
10 changes: 10 additions & 0 deletions felix/calc/dataplane_passthru.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (h *DataplanePassthru) RegisterWith(dispatcher *dispatcher.Dispatcher) {
dispatcher.Register(model.IPPoolKey{}, h.OnUpdate)
dispatcher.Register(model.WireguardKey{}, h.OnUpdate)
dispatcher.Register(model.ResourceKey{}, h.OnUpdate)
dispatcher.Register(model.TyphaRevisionKey{}, h.OnUpdate)
}

func (h *DataplanePassthru) OnUpdate(update api.Update) (filterOut bool) {
Expand Down Expand Up @@ -95,6 +96,15 @@ func (h *DataplanePassthru) OnUpdate(update api.Update) (filterOut bool) {
wg := update.Value.(*model.Wireguard)
h.callbacks.OnWireguardUpdate(key.NodeName, wg)
}
case model.TyphaRevisionKey:
if update.Value == nil {
log.WithField("update", update).Debug("Passing-through typha revision deletion")
h.callbacks.OnTyphaRevisionRemove()
} else {
log.WithField("update", update).Debug("Passing-through typha revision updatee")
tr := update.Value.(*model.TyphaRevision)
h.callbacks.OnTyphaRevisionUpdate(tr)
}
case model.ResourceKey:
if key.Kind == v3.KindBGPConfiguration && key.Name == "default" {
log.WithField("update", update).Debug("Passing through global BGPConfiguration")
Expand Down
42 changes: 42 additions & 0 deletions felix/calc/event_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

v3 "github.com/projectcalico/api/pkg/apis/projectcalico/v3"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/projectcalico/calico/felix/config"
"github.com/projectcalico/calico/felix/ip"
Expand Down Expand Up @@ -89,6 +90,8 @@ type EventSequencer struct {
pendingGlobalBGPConfig *proto.GlobalBGPConfigUpdate
pendingServiceUpdates map[serviceID]*proto.ServiceUpdate
pendingServiceDeletes set.Set[serviceID]
pendingTyphaRevision *model.TyphaRevision
pendingTyphaRevisionDeletion bool

// Sets to record what we've sent downstream. Updated whenever we flush.
sentIPSets set.Set[string]
Expand All @@ -106,6 +109,7 @@ type EventSequencer struct {
sentWireguard set.Set[string]
sentWireguardV6 set.Set[string]
sentServices set.Set[serviceID]
sentTyphaRevision *model.TyphaRevision

Callback EventHandler
}
Expand Down Expand Up @@ -913,6 +917,9 @@ func (buf *EventSequencer) Flush() {
}

buf.flushServices()

// Must be last!
buf.flushTyphaRevision()
}

func (buf *EventSequencer) flushRemovedIPSets() {
Expand Down Expand Up @@ -1214,6 +1221,39 @@ func (buf *EventSequencer) flushServices() {
log.Debug("Done flushing Services")
}

// OnTyphaRevisionRemove records that the Typha revision has been removed.
// Any not-yet-flushed revision update is discarded; a deletion is only queued
// if a revision has previously been sent downstream.
func (buf *EventSequencer) OnTyphaRevisionRemove() {
	// Drop whatever update was waiting to be flushed.
	buf.pendingTyphaRevision = nil

	if buf.sentTyphaRevision == nil {
		// Nothing was ever sent downstream, so there is nothing to retract.
		return
	}
	buf.pendingTyphaRevisionDeletion = true
}

// OnTyphaRevisionUpdate stores tr as the revision to send on the next flush
// and cancels any deletion that was queued but not yet flushed.
func (buf *EventSequencer) OnTyphaRevisionUpdate(tr *model.TyphaRevision) {
	buf.pendingTyphaRevisionDeletion, buf.pendingTyphaRevision = false, tr
}

// flushTyphaRevisionDeletion emits a TyphaRevisionRemove message through the
// callback if a deletion is pending, then clears both the record of the sent
// revision and the pending flag.  It does nothing when no deletion is pending.
func (buf *EventSequencer) flushTyphaRevisionDeletion() {
	if !buf.pendingTyphaRevisionDeletion {
		return
	}
	removal := &proto.TyphaRevisionRemove{}
	buf.Callback(removal)
	buf.sentTyphaRevision = nil
	buf.pendingTyphaRevisionDeletion = false
}

// flushTyphaRevision sends any pending Typha revision change downstream via
// the callback: first a pending removal (if any), then a pending update (if
// any).
//
// A removal and an update are never pending at the same time:
// OnTyphaRevisionUpdate clears the pending-deletion flag and
// OnTyphaRevisionRemove clears the pending update.  The removal is flushed
// from here because flushTyphaRevisionDeletion is not invoked from anywhere
// else; without this call a queued removal would never reach the dataplane.
//
// NOTE(review): consumers that wrap messages for an external dataplane need to
// accept *proto.TyphaRevisionRemove as well as *proto.TyphaRevisionUpdate --
// confirm WrapPayloadWithEnvelope handles it.
func (buf *EventSequencer) flushTyphaRevision() {
	buf.flushTyphaRevisionDeletion()

	rev := buf.pendingTyphaRevision
	if rev == nil {
		return
	}
	buf.Callback(&proto.TyphaRevisionUpdate{
		ServerID:  rev.ServerID,
		Revision:  rev.Revision,
		Timestamp: timestamppb.New(rev.Timestamp),
	})
	// Remember what was sent so that a later removal knows there is something
	// downstream to retract.
	buf.sentTyphaRevision = rev
	buf.pendingTyphaRevision = nil
}

// cidrToIPPoolID returns the string form of cidr with its first "/" replaced
// by "-".
func cidrToIPPoolID(cidr ip.CIDR) string {
	const (
		slash = "/"
		dash  = "-"
	)
	cidrStr := cidr.String()
	return strings.Replace(cidrStr, slash, dash, 1)
}
Expand Down Expand Up @@ -1301,3 +1341,5 @@ func isVMWorkload(labels uniquelabels.Map) bool {
}
return false
}

// Compile-time check that *EventSequencer implements PipelineCallbacks.
var _ PipelineCallbacks = (*EventSequencer)(nil)
8 changes: 8 additions & 0 deletions felix/calc/profile_decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@ type passthruCallbackRecorder struct {
nsRemoves []types.NamespaceID
}

// OnTyphaRevisionRemove unconditionally calls Fail: this recorder treats any
// Typha revision removal callback as unexpected.
func (p *passthruCallbackRecorder) OnTyphaRevisionRemove() {
Fail("OnTyphaRevisionRemove received")
}

// OnTyphaRevisionUpdate unconditionally calls Fail: this recorder treats any
// Typha revision update callback as unexpected.  The tr argument is ignored.
func (p *passthruCallbackRecorder) OnTyphaRevisionUpdate(tr *model.TyphaRevision) {
Fail("OnTyphaRevisionUpdate received")
}

// OnHostIPUpdate unconditionally calls Fail: this recorder treats any host IP
// update callback as unexpected.  Both arguments are ignored.
func (p *passthruCallbackRecorder) OnHostIPUpdate(hostname string, ip *net.IP) {
Fail("HostIPUpdate received")
}
Expand Down
52 changes: 28 additions & 24 deletions felix/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,31 @@ configRetry:
debugserver.StartDebugPprofServer(configParams.DebugHost, configParams.DebugPort)
}

var typhaConnection *syncclient.SyncerClient
syncerToValidator := dedupebuffer.New()

if typhaDiscoverer.TyphaEnabled() {
// Use a remote Syncer, via the Typha server.
log.Info("Connecting to Typha.")
typhaConnection = syncclient.New(
typhaDiscoverer,
buildinfo.Version,
configParams.FelixHostname,
fmt.Sprintf("Revision: %s; Build date: %s",
buildinfo.GitRevision, buildinfo.BuildDate),
syncerToValidator,
&syncclient.Options{
ReadTimeout: configParams.TyphaReadTimeout,
WriteTimeout: configParams.TyphaWriteTimeout,
KeyFile: configParams.TyphaKeyFile,
CertFile: configParams.TyphaCertFile,
CAFile: configParams.TyphaCAFile,
ServerCN: configParams.TyphaCN,
ServerURISAN: configParams.TyphaURISAN,
},
)
}

// Start up the dataplane driver. This may be the internal go-based driver or an external
// one.
var dpDriver dp.DataplaneDriver
Expand All @@ -449,13 +474,14 @@ configRetry:
}

dpDriver, dpDriverCmd = dp.StartDataplaneDriver(
configParams.Copy(), // Copy to avoid concurrent access.
configParams.Copy(),
healthAggregator,
dpStatsCollector,
configChangedRestartCallback,
fatalErrorCallback,
k8sClientSet,
lookupsCache,
typhaConnection != nil,
)

// Defer reporting ready until we've started the dataplane driver. This
Expand Down Expand Up @@ -536,30 +562,8 @@ configRetry:
// Get a Syncer from the datastore, or a connection to our remote sync daemon, Typha,
// which will feed the calculation graph with updates, bringing Felix into sync.
var syncer Startable
var typhaConnection *syncclient.SyncerClient
syncerToValidator := dedupebuffer.New()

if typhaDiscoverer.TyphaEnabled() {
// Use a remote Syncer, via the Typha server.
log.Info("Connecting to Typha.")
typhaConnection = syncclient.New(
typhaDiscoverer,
buildinfo.Version,
configParams.FelixHostname,
fmt.Sprintf("Revision: %s; Build date: %s",
buildinfo.GitRevision, buildinfo.BuildDate),
syncerToValidator,
&syncclient.Options{
ReadTimeout: configParams.TyphaReadTimeout,
WriteTimeout: configParams.TyphaWriteTimeout,
KeyFile: configParams.TyphaKeyFile,
CertFile: configParams.TyphaCertFile,
CAFile: configParams.TyphaCAFile,
ServerCN: configParams.TyphaCN,
ServerURISAN: configParams.TyphaURISAN,
},
)
} else {
if !typhaDiscoverer.TyphaEnabled() {
// Use the syncer locally.
syncer = felixsyncer.New(backendClient, datastoreConfig.Spec, syncerToValidator, configParams.IsLeader())

Expand Down
2 changes: 2 additions & 0 deletions felix/dataplane/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func StartDataplaneDriver(
fatalErrorCallback func(error),
k8sClientSet *kubernetes.Clientset,
lc *calc.LookupsCache,
typhaEnabled bool,
) (DataplaneDriver, *exec.Cmd) {
if !configParams.IsLeader() {
// Return an inactive dataplane, since we're not the leader.
Expand Down Expand Up @@ -217,6 +218,7 @@ func StartDataplaneDriver(
ResyncInterval: configParams.InterfaceRefreshInterval,
NetlinkTimeout: configParams.NetlinkTimeoutSecs,
},
ReportTyphaStats: typhaEnabled,
RulesConfig: rules.Config{
FlowLogsEnabled: configParams.FlowLogsEnabled(),
NFTables: configParams.NFTablesMode == "Enabled",
Expand Down
1 change: 1 addition & 0 deletions felix/dataplane/driver_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func StartDataplaneDriver(configParams *config.Config,
fatalErrorCallback func(error),
k8sClientSet *kubernetes.Clientset,
_ *calc.LookupsCache,
typhaEnabled bool,
) (DataplaneDriver, *exec.Cmd) {
log.Info("Using Windows dataplane driver.")

Expand Down
2 changes: 2 additions & 0 deletions felix/dataplane/external/ext_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ func WrapPayloadWithEnvelope(msg interface{}, seqNo uint64) (*proto.ToDataplane,
envelope.Payload = &proto.ToDataplane_ServiceUpdate{ServiceUpdate: msg}
case *proto.ServiceRemove:
envelope.Payload = &proto.ToDataplane_ServiceRemove{ServiceRemove: msg}
case *proto.TyphaRevisionUpdate:
envelope.Payload = &proto.ToDataplane_TyphaRevisionUpdate{TyphaRevisionUpdate: msg}

default:
return nil, fmt.Errorf("unknown message type: %T", msg)
Expand Down
75 changes: 73 additions & 2 deletions felix/dataplane/linux/int_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ var (
Help: "Number of interface address messages processed in each batch. Higher " +
"values indicate we're doing more batching to try to keep up.",
})
gaugeLastAppliedTyphaTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "felix_int_dataplane_last_typha_timestamp_seconds",
Help: "Timestamp of the last Typha revision update that was applied to the dataplane.",
}, []string{"server_id"})
summaryTyphaBatchLatency = cprometheus.NewSummary(prometheus.SummaryOpts{
Name: "felix_int_dataplane_typha_batch_latency_seconds",
Help: "Latency between Typha receiving events from the datastore (and producing an " +
"update batch) and completion of dataplane update.",
})

processStartTime time.Time
zeroKey = wgtypes.Key{}
Expand All @@ -150,9 +159,20 @@ func init() {
prometheus.MustRegister(summaryBatchSize)
prometheus.MustRegister(summaryIfaceBatchSize)
prometheus.MustRegister(summaryAddrBatchSize)
prometheus.MustRegister(gaugeLastAppliedTyphaTimestamp)
prometheus.MustRegister(summaryTyphaBatchLatency)
processStartTime = time.Now()
}

// registerTyphaStatsOnce ensures the Typha collectors are registered at most
// once by ensureTyphaStatsRegistered, however many times it is called.
var registerTyphaStatsOnce sync.Once

// ensureTyphaStatsRegistered registers the Typha-related Prometheus collectors
// with the default registry the first time it is called; later calls are
// no-ops.
//
// Registration tolerates the collectors already being present.  init() in
// this file also registers the same two collectors, and a second MustRegister
// of an already-registered collector panics.  Any other registration error
// still panics, matching MustRegister.
//
// NOTE(review): if these metrics are meant to be exported only when Typha is
// in use, the registrations in init() should be removed so that this function
// is the single registration point.
func ensureTyphaStatsRegistered() {
	registerTyphaStatsOnce.Do(func() {
		collectors := []prometheus.Collector{
			gaugeLastAppliedTyphaTimestamp,
			summaryTyphaBatchLatency,
		}
		for _, c := range collectors {
			err := prometheus.Register(c)
			if err == nil {
				continue
			}
			if _, alreadyRegistered := err.(prometheus.AlreadyRegisteredError); alreadyRegistered {
				// Someone (e.g. init()) got there first; nothing to do.
				continue
			}
			panic(err)
		}
	})
}

type Config struct {
Hostname string
NodeZone string
Expand All @@ -162,6 +182,7 @@ type Config struct {
VXLANMTU int
VXLANMTUV6 int
VXLANPort int
ReportTyphaStats bool

MaxIPSetSize int

Expand Down Expand Up @@ -411,6 +432,10 @@ type InternalDataplane struct {

actions generictables.ActionFactory
newMatch func() generictables.MatchCriteria

lastTyphaRevisionUpdate *proto.TyphaRevisionUpdate
typhaRevisionUpdatePending bool
lastReportedTyphaServerID string
}

const (
Expand Down Expand Up @@ -481,6 +506,10 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
actions: actionSet,
newMatch: newMatchFn,
}
if config.ReportTyphaStats {
log.Info("Using Typha, registering stats.")
ensureTyphaStatsRegistered()
}
dp.applyThrottle.Refill() // Allow the first apply() immediately.
dp.ifaceMonitor.StateCallback = dp.onIfaceStateChange
dp.ifaceMonitor.AddrCallback = dp.onIfaceAddrsChange
Expand Down Expand Up @@ -2294,6 +2323,7 @@ func (d *InternalDataplane) loopUpdatingDataplane() {
d.sendDataplaneInSyncOnce.Do(func() {
d.fromDataplane <- &proto.DataplaneInSync{}
})
d.maybeReportTyphaRevision()
}

d.loopSummarizer.EndOfIteration(applyTime)
Expand Down Expand Up @@ -2351,19 +2381,30 @@ func (d *InternalDataplane) onDatastoreMessage(msg interface{}) {

func (d *InternalDataplane) processMsgFromCalcGraph(msg interface{}) {
if log.IsLevelEnabled(log.InfoLevel) {
log.Infof("Received %T update from calculation graph. msg=%s", msg, proto.MsgStringer{Msg: msg}.String())
if _, ok := msg.(*proto.TyphaRevisionUpdate); ok {
// TyphaRevisionUpdate is very noisy, so log at Debug level.
log.Debugf("Received %T update from calculation graph. msg=%s", msg, proto.MsgStringer{Msg: msg}.String())
} else {
log.Infof("Received %T update from calculation graph. msg=%s", msg, proto.MsgStringer{Msg: msg}.String())
}
}
d.datastoreBatchSize++
d.dataplaneNeedsSync = true
d.recordMsgStat(msg)
for _, mgr := range d.allManagers {
mgr.OnUpdate(msg)
}
switch msg.(type) {
switch msg := msg.(type) {
case *proto.InSync:
log.WithField("timeSinceStart", time.Since(processStartTime)).Info(
"Datastore in sync, flushing the dataplane for the first time...")
d.datastoreInSync = true
case *proto.TyphaRevisionUpdate:
d.lastTyphaRevisionUpdate = msg
d.typhaRevisionUpdatePending = true
case *proto.TyphaRevisionRemove:
d.lastTyphaRevisionUpdate = nil
d.typhaRevisionUpdatePending = true
}
}

Expand Down Expand Up @@ -2802,6 +2843,36 @@ func (d *InternalDataplane) reportHealth() {
}
}

// maybeReportTyphaRevision updates the Typha metrics if a Typha revision
// update or removal has been received since the last report; otherwise it
// does nothing.
//
// On removal the timestamp gauge is cleared.  On update the gauge for the
// revision's server ID is set to the revision's timestamp (in seconds) and,
// if the revision was created after this process started, the time elapsed
// since its creation is observed in the batch latency summary.
func (d *InternalDataplane) maybeReportTyphaRevision() {
	if !d.typhaRevisionUpdatePending {
		return
	}
	// The pending change is consumed on every path below.  Clearing the flag
	// up front makes sure the removal path does not re-run (and re-Reset the
	// gauge) on every subsequent call.
	d.typhaRevisionUpdatePending = false

	rev := d.lastTyphaRevisionUpdate
	if rev == nil {
		// Had a revision but now it's been deleted. (Perhaps we reconnected to
		// Typha, and it had been downgraded so we didn't get a revision.)
		gaugeLastAppliedTyphaTimestamp.Reset()
		// No server ID is being reported any more.
		d.lastReportedTyphaServerID = ""
		return
	}

	serverID := rev.ServerID
	if d.lastReportedTyphaServerID != serverID {
		// If we switch Typha, clear out the previous set of labels so that
		// we don't continue to report the old Typha's timestamp.
		gaugeLastAppliedTyphaTimestamp.Reset()
	}

	revCreationTime := rev.Timestamp.AsTime()
	// Convert via microseconds to keep sub-second precision in the gauge.
	unixTime := float64(revCreationTime.UnixMicro()) / 1e6
	gaugeLastAppliedTyphaTimestamp.WithLabelValues(serverID).Set(unixTime)
	d.lastReportedTyphaServerID = serverID

	if revCreationTime.After(processStartTime) {
		// Avoid reporting a huge latency spike when the process starts. The
		// initial Typha breadcrumb may be hours old if the datastore is quiet!
		summaryTyphaBatchLatency.Observe(time.Since(revCreationTime).Seconds())
	}
}

type dummyLock struct{}

func (d dummyLock) Lock() {
Expand Down
Loading