Skip to content

Commit

Permalink
Replace env var append function with merge with replace function
Browse files Browse the repository at this point in the history
  • Loading branch information
deadlycoconuts committed May 24, 2024
1 parent 94cd64f commit bb8360c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 18 deletions.
32 changes: 16 additions & 16 deletions api/turing/cluster/servicebuilder/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (sb *clusterSvcBuilder) buildRouterEnvs(
envs := sb.getEnvVars(ver.ResourceRequest, nil)

// Add app name, router timeout, jaeger collector
envs = append(envs,
envs = mergeEnvVars(envs,
[]corev1.EnvVar{
{Name: envAppName, Value: fmt.Sprintf("%s-%d.%s", ver.Router.Name, ver.Version, namespace)},
{Name: envAppEnvironment, Value: environmentType},
Expand All @@ -223,16 +223,16 @@ func (sb *clusterSvcBuilder) buildRouterEnvs(
{Name: envRouterProtocol, Value: string(ver.Protocol)},
{Name: envSentryEnabled, Value: strconv.FormatBool(sentryEnabled)},
{Name: envSentryDSN, Value: sentryDSN},
}...)
})

// Add enricher / ensembler related env vars, if enabled
if ver.Enricher != nil {
endpoint := buildPrePostProcessorEndpoint(ver, namespace,
ComponentTypes.Enricher, ver.Enricher.Endpoint)
envs = append(envs, []corev1.EnvVar{
envs = mergeEnvVars(envs, []corev1.EnvVar{
{Name: envEnricherEndpoint, Value: endpoint},
{Name: envEnricherTimeout, Value: ver.Enricher.Timeout},
}...)
})
}
if ver.HasDockerConfig() {
endpoint := buildPrePostProcessorEndpoint(
Expand All @@ -241,28 +241,28 @@ func (sb *clusterSvcBuilder) buildRouterEnvs(
ComponentTypes.Ensembler,
ver.Ensembler.DockerConfig.Endpoint,
)
envs = append(envs, []corev1.EnvVar{
envs = mergeEnvVars(envs, []corev1.EnvVar{
{Name: envEnsemblerEndpoint, Value: endpoint},
{Name: envEnsemblerTimeout, Value: ver.Ensembler.DockerConfig.Timeout},
}...)
})
}

// Add exp engine secret path as env var if service account key file path is specified for exp engine
if ver.ExperimentEngine != nil && ver.ExperimentEngine.ServiceAccountKeyFilePath != nil {
envs = append(envs, []corev1.EnvVar{
envs = mergeEnvVars(envs, []corev1.EnvVar{
{Name: envExpGoogleApplicationCredentials, Value: secretMountPathExpEngine + secretKeyNameExpEngine},
}...)
})
}

// Process Log config
logConfig := ver.LogConfig
envs = append(envs, []corev1.EnvVar{
envs = mergeEnvVars(envs, []corev1.EnvVar{
{Name: envLogLevel, Value: string(logConfig.LogLevel)},
{Name: envCustomMetrics, Value: strconv.FormatBool(logConfig.CustomMetricsEnabled)},
{Name: envJaegerEnabled, Value: strconv.FormatBool(logConfig.JaegerEnabled)},
{Name: envResultLogger, Value: string(logConfig.ResultLoggerType)},
{Name: envFiberDebugLog, Value: strconv.FormatBool(logConfig.FiberDebugLogEnabled)},
}...)
})

// Add BQ config
switch logConfig.ResultLoggerType {
Expand All @@ -275,29 +275,29 @@ func (sb *clusterSvcBuilder) buildRouterEnvs(
return envs, fmt.Errorf("Invalid BigQuery table name %s",
logConfig.BigQueryConfig.Table)
}
envs = append(envs, []corev1.EnvVar{
envs = mergeEnvVars(envs, []corev1.EnvVar{
{Name: envGcpProject, Value: bqFQN[0]},
{Name: envBQDataset, Value: bqFQN[1]},
{Name: envBQTable, Value: bqFQN[2]},
{Name: envBQBatchLoad, Value: strconv.FormatBool(logConfig.BigQueryConfig.BatchLoad)},
{Name: envGoogleApplicationCredentials, Value: secretMountPathRouter + secretKeyNameRouter},
}...)
})
if logConfig.BigQueryConfig.BatchLoad {
envs = append(envs, []corev1.EnvVar{
envs = mergeEnvVars(envs, []corev1.EnvVar{
{Name: envFluentdHost, Value: buildFluentdHost(ver, namespace)},
{Name: envFluentdPort, Value: strconv.Itoa(fluentdPort)},
{Name: envFluentdTag, Value: routerDefaults.FluentdConfig.Tag},
}...)
})
}
case models.KafkaLogger, models.UPILogger:
// UPILogger's kafka details are created in BuildRouterVersion so that information are persisted in DB
envs = append(envs, []corev1.EnvVar{
envs = mergeEnvVars(envs, []corev1.EnvVar{
{Name: envKafkaBrokers, Value: logConfig.KafkaConfig.Brokers},
{Name: envKafkaTopic, Value: logConfig.KafkaConfig.Topic},
{Name: envKafkaSerializationFormat, Value: string(logConfig.KafkaConfig.SerializationFormat)},
{Name: envKafkaMaxMessageBytes, Value: strconv.Itoa(routerDefaults.KafkaConfig.MaxMessageBytes)},
{Name: envKafkaCompressionType, Value: routerDefaults.KafkaConfig.CompressionType},
}...)
})
}

return envs, nil
Expand Down
24 changes: 22 additions & 2 deletions api/turing/cluster/servicebuilder/service_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,10 @@ func (sb *clusterSvcBuilder) getEnvVars(resourceRequest *models.ResourceRequest,
userEnvVars *models.EnvVars) (newEnvVars []corev1.EnvVar) {
if resourceRequest != nil && (resourceRequest.CPULimit == nil || resourceRequest.CPULimit.IsZero()) &&
sb.knativeServiceConfig.UserContainerCPULimitRequestFactor == 0 {
newEnvVars = append(newEnvVars, sb.knativeServiceConfig.DefaultEnvVarsWithoutCPULimits...)
newEnvVars = mergeEnvVars(newEnvVars, sb.knativeServiceConfig.DefaultEnvVarsWithoutCPULimits)
}
if userEnvVars != nil {
newEnvVars = append(newEnvVars, userEnvVars.ToKubernetesEnvVars()...)
newEnvVars = mergeEnvVars(newEnvVars, userEnvVars.ToKubernetesEnvVars())
}
return
}
Expand All @@ -460,3 +460,23 @@ func buildLabels(
}
return labeller.BuildLabels(r)
}

// mergeEnvVars merges multiple sets of environment variables and return the merging result.
// All the EnvVars passed as arguments will be not mutated.
// EnvVars to the right have higher precedence.
func mergeEnvVars(left []corev1.EnvVar, rightEnvVars ...[]corev1.EnvVar) []corev1.EnvVar {
for _, right := range rightEnvVars {
envIndexMap := make(map[string]int, len(left)+len(right))
for index, ev := range left {
envIndexMap[ev.Name] = index
}
for _, add := range right {
if index, exist := envIndexMap[add.Name]; exist {
left[index].Value = add.Value
} else {
left = append(left, add)
}
}
}
return left
}

0 comments on commit bb8360c

Please sign in to comment.