Skip to content

Commit

Permalink
fix: scheduler host address passed to runtime (#1421)
Browse files Browse the repository at this point in the history
* fix: scheduler host address passed to runtime

Signed-off-by: mikeee <[email protected]>

* fix: scheduler client stream initialised for 1.14<

Signed-off-by: mikeee <[email protected]>

* fix: modify scheduler host address validation

if the scheduler container is not active, the scheduler flag will
not be passed to the runtime

Signed-off-by: mikeee <[email protected]>

* fix: lint and refactor

Signed-off-by: mikeee <[email protected]>

---------

Signed-off-by: mikeee <[email protected]>
  • Loading branch information
mikeee authored Jul 10, 2024
1 parent 762e2bb commit ed0d3af
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 79 deletions.
1 change: 0 additions & 1 deletion cmd/annotate.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ func readInputsFromFS(path string) ([]io.Reader, error) {
inputs = append(inputs, file)
return nil
})

if err != nil {
return nil, err
}
Expand Down
111 changes: 61 additions & 50 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
"time"

"golang.org/x/mod/semver"

"github.com/spf13/cobra"
"github.com/spf13/viper"

Expand All @@ -38,34 +40,35 @@ import (
)

var (
appPort int
profilePort int
appID string
configFile string
port int
grpcPort int
internalGRPCPort int
maxConcurrency int
enableProfiling bool
logLevel string
protocol string
componentsPath string
resourcesPaths []string
appSSL bool
metricsPort int
maxRequestBodySize int
readBufferSize int
unixDomainSocket string
enableAppHealth bool
appHealthPath string
appHealthInterval int
appHealthTimeout int
appHealthThreshold int
enableAPILogging bool
apiListenAddresses string
runFilePath string
appChannelAddress string
enableRunK8s bool
appPort int
profilePort int
appID string
configFile string
port int
grpcPort int
internalGRPCPort int
maxConcurrency int
enableProfiling bool
logLevel string
protocol string
componentsPath string
resourcesPaths []string
appSSL bool
metricsPort int
maxRequestBodySize int
readBufferSize int
unixDomainSocket string
enableAppHealth bool
appHealthPath string
appHealthInterval int
appHealthTimeout int
appHealthThreshold int
enableAPILogging bool
apiListenAddresses string
schedulerHostAddress string
runFilePath string
appChannelAddress string
enableRunK8s bool
)

const (
Expand Down Expand Up @@ -120,7 +123,6 @@ dapr run --run-file /path/to/directory -k
Args: cobra.MinimumNArgs(0),
PreRun: func(cmd *cobra.Command, args []string) {
viper.BindPFlag("placement-host-address", cmd.Flags().Lookup("placement-host-address"))
viper.BindPFlag("scheduler-host-address", cmd.Flags().Lookup("scheduler-host-address"))
},
Run: func(cmd *cobra.Command, args []string) {
if len(runFilePath) > 0 {
Expand Down Expand Up @@ -166,26 +168,26 @@ dapr run --run-file /path/to/directory -k
}

sharedRunConfig := &standalone.SharedRunConfig{
ConfigFile: configFile,
EnableProfiling: enableProfiling,
LogLevel: logLevel,
MaxConcurrency: maxConcurrency,
AppProtocol: protocol,
PlacementHostAddr: viper.GetString("placement-host-address"),
SchedulerHostAddr: viper.GetString("scheduler-host-address"),
ComponentsPath: componentsPath,
ResourcesPaths: resourcesPaths,
AppSSL: appSSL,
MaxRequestBodySize: maxRequestBodySize,
HTTPReadBufferSize: readBufferSize,
EnableAppHealth: enableAppHealth,
AppHealthPath: appHealthPath,
AppHealthInterval: appHealthInterval,
AppHealthTimeout: appHealthTimeout,
AppHealthThreshold: appHealthThreshold,
EnableAPILogging: enableAPILogging,
APIListenAddresses: apiListenAddresses,
DaprdInstallPath: daprRuntimePath,
ConfigFile: configFile,
EnableProfiling: enableProfiling,
LogLevel: logLevel,
MaxConcurrency: maxConcurrency,
AppProtocol: protocol,
PlacementHostAddr: viper.GetString("placement-host-address"),
ComponentsPath: componentsPath,
ResourcesPaths: resourcesPaths,
AppSSL: appSSL,
MaxRequestBodySize: maxRequestBodySize,
HTTPReadBufferSize: readBufferSize,
EnableAppHealth: enableAppHealth,
AppHealthPath: appHealthPath,
AppHealthInterval: appHealthInterval,
AppHealthTimeout: appHealthTimeout,
AppHealthThreshold: appHealthThreshold,
EnableAPILogging: enableAPILogging,
APIListenAddresses: apiListenAddresses,
SchedulerHostAddress: schedulerHostAddress,
DaprdInstallPath: daprRuntimePath,
}
output, err := runExec.NewOutput(&standalone.RunConfig{
AppID: appID,
Expand Down Expand Up @@ -227,6 +229,15 @@ dapr run --run-file /path/to/directory -k
output.DaprHTTPPort,
output.DaprGRPCPort)
}

if semver.Compare(fmt.Sprintf("v%v", daprVer.RuntimeVersion), "v1.14.0-rc.1") == -1 {
print.InfoStatusEvent(os.Stdout, "The scheduler is only compatible with dapr runtime 1.14 onwards.")
for i, arg := range output.DaprCMD.Args {
if strings.HasPrefix(arg, "--scheduler-host-address") {
output.DaprCMD.Args[i] = ""
}
}
}
print.InfoStatusEvent(os.Stdout, startInfo)

output.DaprCMD.Stdout = os.Stdout
Expand Down Expand Up @@ -456,7 +467,7 @@ func init() {
// By marking this as deprecated, the flag will be hidden from the help menu, but will continue to work. It will show a warning message when used.
RunCmd.Flags().MarkDeprecated("components-path", "This flag is deprecated and will be removed in the future releases. Use \"resources-path\" flag instead")
RunCmd.Flags().String("placement-host-address", "localhost", "The address of the placement service. Format is either <hostname> for default port or <hostname>:<port> for custom port")
RunCmd.Flags().String("scheduler-host-address", "localhost", "The address of the scheduler service. Format is either <hostname> for default port or <hostname>:<port> for custom port")
RunCmd.Flags().StringVarP(&schedulerHostAddress, "scheduler-host-address", "", "localhost", "The address of the scheduler service. Format is either <hostname> for default port or <hostname>:<port> for custom port")
// TODO: Remove below flag once the flag is removed in runtime in future release.
RunCmd.Flags().BoolVar(&appSSL, "app-ssl", false, "Enable https when Dapr invokes the application")
RunCmd.Flags().MarkDeprecated("app-ssl", "This flag is deprecated and will be removed in the future releases. Use \"app-protocol\" flag with https or grpcs values instead")
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ require (
sigs.k8s.io/yaml v1.4.0
)

require github.com/Masterminds/semver/v3 v3.2.0
require (
github.com/Masterminds/semver/v3 v3.2.0
golang.org/x/mod v0.14.0
)

require (
github.com/alphadose/haxmap v1.3.1 // indirect
Expand All @@ -61,7 +64,6 @@ require (
go.mongodb.org/mongo-driver v1.12.1 // indirect
go.opentelemetry.io/otel/metric v1.23.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/tools v0.17.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240116215550-a9fa1716bcac // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240205150955-31a09d347014 // indirect
Expand Down
2 changes: 0 additions & 2 deletions pkg/kubernetes/renew_certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,13 @@ func RenewCertificate(conf RenewCertificateParams) error {
conf.RootCertificateFilePath,
conf.IssuerCertificateFilePath,
conf.IssuerPrivateKeyFilePath)

if err != nil {
return err
}
} else {
rootCertBytes, issuerCertBytes, issuerKeyBytes, err = GenerateNewCertificates(
conf.ValidUntil,
conf.RootPrivateKeyFilePath)

if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/kubernetes/uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ func Uninstall(namespace string, uninstallAll bool, uninstallDev bool, timeout u
}

_, err = uninstallClient.Run(daprReleaseName)

if err != nil {
return err
}
Expand Down
42 changes: 26 additions & 16 deletions pkg/standalone/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package standalone

import (
"context"
"fmt"
"net"
"os"
Expand All @@ -23,6 +24,8 @@ import (
"strconv"
"strings"

dockerClient "github.com/docker/docker/client"

"github.com/Pallinder/sillyname-go"
"github.com/phayes/freeport"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -70,8 +73,6 @@ type SharedRunConfig struct {
MaxConcurrency int `arg:"app-max-concurrency" annotation:"dapr.io/app-max-concurrerncy" yaml:"appMaxConcurrency" default:"-1"`
// Speicifcally omitted from annotations similar to config file path above.
PlacementHostAddr string `arg:"placement-host-address" yaml:"placementHostAddress"`
// Must use env for scheduler host address because using arg would cause a sidecar crash in older daprd versions.
SchedulerHostAddr string `env:"DAPR_SCHEDULER_HOST_ADDRESS" yaml:"schedulerHostAddress"`
// Speicifcally omitted from annotations similar to config file path above.
ComponentsPath string `arg:"components-path"` // Deprecated in run template file: use ResourcesPaths instead.
// Speicifcally omitted from annotations similar to config file path above.
Expand All @@ -89,10 +90,11 @@ type SharedRunConfig struct {
AppHealthThreshold int `arg:"app-health-threshold" annotation:"dapr.io/app-health-threshold" ifneq:"0" yaml:"appHealthThreshold"`
EnableAPILogging bool `arg:"enable-api-logging" annotation:"dapr.io/enable-api-logging" yaml:"enableApiLogging"`
// Specifically omitted from annotations see https://github.com/dapr/cli/issues/1324 .
DaprdInstallPath string `yaml:"runtimePath"`
Env map[string]string `yaml:"env"`
DaprdLogDestination LogDestType `yaml:"daprdLogDestination"`
AppLogDestination LogDestType `yaml:"appLogDestination"`
DaprdInstallPath string `yaml:"runtimePath"`
Env map[string]string `yaml:"env"`
DaprdLogDestination LogDestType `yaml:"daprdLogDestination"`
AppLogDestination LogDestType `yaml:"appLogDestination"`
SchedulerHostAddress string `arg:"scheduler-host-address" yaml:"schedulerHostAddress"`
}

func (meta *DaprMeta) newAppID() string {
Expand Down Expand Up @@ -139,18 +141,27 @@ func (config *RunConfig) validatePlacementHostAddr() error {
}

func (config *RunConfig) validateSchedulerHostAddr() error {
schedulerHostAddr := config.SchedulerHostAddr
if len(schedulerHostAddr) == 0 {
schedulerHostAddr = "localhost"
// If the scheduler isn't running - don't add the flag to the runtime cmd.
docker, err := dockerClient.NewClientWithOpts()
if err != nil {
return err
}
if indx := strings.Index(schedulerHostAddr, ":"); indx == -1 {
if runtime.GOOS == daprWindowsOS {
schedulerHostAddr = fmt.Sprintf("%s:6060", schedulerHostAddr)
} else {
schedulerHostAddr = fmt.Sprintf("%s:50006", schedulerHostAddr)
_, err = docker.ContainerInspect(context.Background(), "dapr_scheduler")
if err == nil {
schedulerHostAddr := config.SchedulerHostAddress
if len(schedulerHostAddr) == 0 {
schedulerHostAddr = "localhost"
}
if indx := strings.Index(schedulerHostAddr, ":"); indx == -1 {
if runtime.GOOS == daprWindowsOS {
schedulerHostAddr = fmt.Sprintf("%s:6060", schedulerHostAddr)
} else {
schedulerHostAddr = fmt.Sprintf("%s:50006", schedulerHostAddr)
}
}
config.SchedulerHostAddress = schedulerHostAddr
return nil
}
config.SchedulerHostAddr = schedulerHostAddr
return nil
}

Expand Down Expand Up @@ -237,7 +248,6 @@ func (config *RunConfig) Validate() error {
}

err = config.validateSchedulerHostAddr()

if err != nil {
return err
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/standalone/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,6 @@ func runZipkin(wg *sync.WaitGroup, errorChan chan<- error, info initInfo) {
args = append(args, imageName)
}
_, err = utils.RunCmdAndWait(runtimeCmd, args...)

if err != nil {
runError := isContainerRunError(err)
if !runError {
Expand Down Expand Up @@ -481,7 +480,6 @@ func runRedis(wg *sync.WaitGroup, errorChan chan<- error, info initInfo) {
args = append(args, imageName)
}
_, err = utils.RunCmdAndWait(runtimeCmd, args...)

if err != nil {
runError := isContainerRunError(err)
if !runError {
Expand Down Expand Up @@ -568,7 +566,6 @@ func runPlacementService(wg *sync.WaitGroup, errorChan chan<- error, info initIn
args = append(args, image)

_, err = utils.RunCmdAndWait(runtimeCmd, args...)

if err != nil {
runError := isContainerRunError(err)
if !runError {
Expand Down Expand Up @@ -668,7 +665,6 @@ func runSchedulerService(wg *sync.WaitGroup, errorChan chan<- error, info initIn
args = append(args, image)

_, err = utils.RunCmdAndWait(runtimeCmd, args...)

if err != nil {
runError := isContainerRunError(err)
if !runError {
Expand Down
1 change: 0 additions & 1 deletion tests/e2e/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,6 @@ func exportCurrentCertificate(daprPath string) error {
os.RemoveAll("./certs")
}
_, err = spawn.Command(daprPath, "mtls", "export", "-o", "./certs")

if err != nil {
return fmt.Errorf("error in exporting certificate %w", err)
}
Expand Down
2 changes: 0 additions & 2 deletions tests/e2e/standalone/windows_run_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func startAppsWithAppLogDestFile(t *testing.T, file string) {
assert.NotContains(t, output, "msg=\"All outstanding components processed\" app_id=emit-metrics")

assert.Contains(t, output, "Received signal to stop Dapr and app processes. Shutting down Dapr and app processes.")

}

func startAppsWithAppLogDestConsole(t *testing.T, file string) {
Expand All @@ -139,5 +138,4 @@ func startAppsWithAppLogDestConsole(t *testing.T, file string) {
assert.NotContains(t, output, "msg=\"All outstanding components processed\" app_id=emit-metrics")

assert.Contains(t, output, "Received signal to stop Dapr and app processes. Shutting down Dapr and app processes.")

}

0 comments on commit ed0d3af

Please sign in to comment.