diff --git a/docs/reference/api.md b/docs/reference/api.md
index 6d3b803d524..4b495fef69e 100644
--- a/docs/reference/api.md
+++ b/docs/reference/api.md
@@ -246,7 +246,7 @@ _Appears in:_
| `entrypoint` _string_ | Entrypoint represents the command to start execution. | | |
| `runtimeEnvYAML` _string_ | RuntimeEnvYAML represents the runtime environment configuration
provided as a multi-line YAML string. | | |
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
-| `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.
In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.
In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.
In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster. | K8sJobMode | |
+| `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.
In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.
In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.
In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster.
In "SidecarMode", the KubeRay operator injects a container into the Ray head Pod that acts as the job submitter to submit the Ray job. | K8sJobMode | |
| `entrypointResources` _string_ | EntrypointResources specifies the custom resources and quantities to reserve for the
entrypoint command. | | |
| `entrypointNumCpus` _float_ | EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command. | | |
| `entrypointNumGpus` _float_ | EntrypointNumGpus specifies the number of gpus to reserve for the entrypoint command. | | |
diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go
index 1ada1e1ae7d..54a2ef7bce2 100644
--- a/ray-operator/apis/ray/v1/rayjob_types.go
+++ b/ray-operator/apis/ray/v1/rayjob_types.go
@@ -82,6 +82,7 @@ const (
K8sJobMode JobSubmissionMode = "K8sJobMode" // Submit job via Kubernetes Job
HTTPMode JobSubmissionMode = "HTTPMode" // Submit job via HTTP request
InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
+ SidecarMode JobSubmissionMode = "SidecarMode" // Submit job via a sidecar container in the Ray head Pod
)
type DeletionPolicyType string
@@ -174,6 +175,7 @@ type RayJobSpec struct {
// In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.
// In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.
// In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster.
+ // In "SidecarMode", the KubeRay operator injects a container into the Ray head Pod that acts as the job submitter to submit the Ray job.
// +kubebuilder:default:=K8sJobMode
// +optional
SubmissionMode JobSubmissionMode `json:"submissionMode,omitempty"`
diff --git a/ray-operator/config/samples/ray-job.sidecar-mode.yaml b/ray-operator/config/samples/ray-job.sidecar-mode.yaml
new file mode 100644
index 00000000000..ac889f6dc6d
--- /dev/null
+++ b/ray-operator/config/samples/ray-job.sidecar-mode.yaml
@@ -0,0 +1,100 @@
+apiVersion: ray.io/v1
+kind: RayJob
+metadata:
+ name: rayjob-sidecar-mode
+spec:
+ # In SidecarMode, the KubeRay operator injects a container into the Ray head Pod to submit the Ray job and tail logs.
+ # This will avoid inter-Pod communication, which may cause network issues. For example, some users face WebSocket hangs.
+ # For more details, see https://github.com/ray-project/kuberay/issues/3928#issuecomment-3187164736.
+ submissionMode: "SidecarMode"
+ entrypoint: python /home/ray/samples/sample_code.py
+ runtimeEnvYAML: |
+ pip:
+ - requests==2.26.0
+ - pendulum==2.1.2
+ env_vars:
+ counter_name: "test_counter"
+
+ rayClusterSpec:
+ rayVersion: '2.46.0'
+ headGroupSpec:
+ rayStartParams: {}
+ template:
+ spec:
+ containers:
+ - name: ray-head
+ image: rayproject/ray:2.46.0
+ ports:
+ - containerPort: 6379
+ name: gcs-server
+ - containerPort: 8265
+ name: dashboard
+ - containerPort: 10001
+ name: client
+ resources:
+ limits:
+ cpu: "1"
+ requests:
+ cpu: "200m"
+ volumeMounts:
+ - mountPath: /home/ray/samples
+ name: code-sample
+ volumes:
+ - name: code-sample
+ configMap:
+ name: ray-job-code-sample
+ items:
+ - key: sample_code.py
+ path: sample_code.py
+ workerGroupSpecs:
+ - replicas: 1
+ minReplicas: 1
+ maxReplicas: 5
+ groupName: small-group
+ rayStartParams: {}
+ template:
+ spec:
+ containers:
+ - name: ray-worker
+ image: rayproject/ray:2.46.0
+ resources:
+ limits:
+ cpu: "1"
+ requests:
+ cpu: "200m"
+
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: ray-job-code-sample
+data:
+ sample_code.py: |
+ import ray
+ import os
+ import requests
+
+ ray.init()
+
+ @ray.remote
+ class Counter:
+ def __init__(self):
+ # Used to verify runtimeEnv
+ self.name = os.getenv("counter_name")
+ assert self.name == "test_counter"
+ self.counter = 0
+
+ def inc(self):
+ self.counter += 1
+
+ def get_counter(self):
+ return "{} got {}".format(self.name, self.counter)
+
+ counter = Counter.remote()
+
+ for _ in range(5):
+ ray.get(counter.inc.remote())
+ print(ray.get(counter.get_counter.remote()))
+
+ # Verify that the correct runtime env was used for the job.
+ assert requests.__version__ == "2.26.0"
diff --git a/ray-operator/controllers/ray/common/job.go b/ray-operator/controllers/ray/common/job.go
index b47421bade1..6d7e04c53b6 100644
--- a/ray-operator/controllers/ray/common/job.go
+++ b/ray-operator/controllers/ray/common/job.go
@@ -54,9 +54,29 @@ func GetMetadataJson(metadata map[string]string, rayVersion string) (string, err
return pkgutils.ConvertByteSliceToString(metadataBytes), nil
}
-// GetK8sJobCommand builds the K8s job command for the Ray job.
-func GetK8sJobCommand(rayJobInstance *rayv1.RayJob) ([]string, error) {
- address := rayJobInstance.Status.DashboardURL
+// BuildJobSubmitCommand builds the `ray job submit` command based on submission mode.
+func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.JobSubmissionMode) ([]string, error) {
+ var address string
+ port := utils.DefaultDashboardPort
+
+ switch submissionMode {
+ case rayv1.SidecarMode:
+ // The sidecar submitter shares the same network namespace as the Ray dashboard,
+ // so it uses 127.0.0.1 to connect to the Ray dashboard.
+ rayHeadContainer := rayJobInstance.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex]
+ port = utils.FindContainerPort(&rayHeadContainer, utils.DashboardPortName, utils.DefaultDashboardPort)
+ address = "http://127.0.0.1:" + strconv.Itoa(port)
+ case rayv1.K8sJobMode:
+ // Submitter is a separate K8s Job; use cluster dashboard address.
+ address = rayJobInstance.Status.DashboardURL
+ if !strings.HasPrefix(address, "http://") {
+ address = "http://" + address
+ }
+ default:
+ return nil, fmt.Errorf("unsupported submission mode for job submit command: %s", submissionMode)
+ }
+
+ var cmd []string
metadata := rayJobInstance.Spec.Metadata
jobId := rayJobInstance.Status.JobId
entrypoint := strings.TrimSpace(rayJobInstance.Spec.Entrypoint)
@@ -64,11 +84,7 @@ func GetK8sJobCommand(rayJobInstance *rayv1.RayJob) ([]string, error) {
entrypointNumGpus := rayJobInstance.Spec.EntrypointNumGpus
entrypointResources := rayJobInstance.Spec.EntrypointResources
- // add http:// if needed
- if !strings.HasPrefix(address, "http://") {
- address = "http://" + address
- }
-
+ // In K8sJobMode, we need to avoid submitting the job twice, since the job submitter might retry.
// `ray job submit` alone doesn't handle duplicated submission gracefully. See https://github.com/ray-project/kuberay/issues/2154.
// In order to deal with that, we use `ray job status` first to check if the jobId has been submitted.
// If the jobId has been submitted, we use `ray job logs` to follow the logs.
@@ -76,19 +92,49 @@ func GetK8sJobCommand(rayJobInstance *rayv1.RayJob) ([]string, error) {
// if ! ray job status --address http://$RAY_ADDRESS $RAY_JOB_SUBMISSION_ID >/dev/null 2>&1 ;
// then ray job submit --address http://$RAY_ADDRESS --submission-id $RAY_JOB_SUBMISSION_ID --no-wait -- ... ;
// fi ; ray job logs --address http://$RAY_ADDRESS --follow $RAY_JOB_SUBMISSION_ID
+ // In Sidecar mode, the sidecar container's restart policy is set to Never, so duplicated submission won't happen.
jobStatusCommand := []string{"ray", "job", "status", "--address", address, jobId, ">/dev/null", "2>&1"}
+ jobSubmitCommand := []string{"ray", "job", "submit", "--address", address}
jobFollowCommand := []string{"ray", "job", "logs", "--address", address, "--follow", jobId}
- jobSubmitCommand := []string{"ray", "job", "submit", "--address", address, "--no-wait"}
- k8sJobCommand := append([]string{"if", "!"}, jobStatusCommand...)
- k8sJobCommand = append(k8sJobCommand, ";", "then")
- k8sJobCommand = append(k8sJobCommand, jobSubmitCommand...)
+
+ if submissionMode == rayv1.SidecarMode {
+ // Wait until Ray Dashboard GCS is healthy before proceeding.
+ // Use the same Ray Dashboard GCS health check command as the readiness probe
+ rayDashboardGCSHealthCommand := fmt.Sprintf(
+ utils.BaseWgetHealthCommand,
+ utils.DefaultReadinessProbeFailureThreshold,
+ port,
+ utils.RayDashboardGCSHealthPath,
+ )
+
+ waitLoop := []string{
+ "until", rayDashboardGCSHealthCommand, ">/dev/null", "2>&1", ";",
+ "do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
+ }
+ cmd = append(cmd, waitLoop...)
+ }
+
+ // In Sidecar mode, we only support RayJob level retry, which means that the submitter retry won't happen,
+ // so we won't have to check if the job has been submitted.
+ if submissionMode == rayv1.K8sJobMode {
+ // Only check job status in K8s mode to handle duplicated submission gracefully
+ cmd = append(cmd, "if", "!")
+ cmd = append(cmd, jobStatusCommand...)
+ cmd = append(cmd, ";", "then")
+ }
+
+ cmd = append(cmd, jobSubmitCommand...)
+
+ if submissionMode == rayv1.K8sJobMode {
+ cmd = append(cmd, "--no-wait")
+ }
runtimeEnvJson, err := getRuntimeEnvJson(rayJobInstance)
if err != nil {
return nil, err
}
if len(runtimeEnvJson) > 0 {
- k8sJobCommand = append(k8sJobCommand, "--runtime-env-json", strconv.Quote(runtimeEnvJson))
+ cmd = append(cmd, "--runtime-env-json", strconv.Quote(runtimeEnvJson))
}
if len(metadata) > 0 {
@@ -96,30 +142,33 @@ func GetK8sJobCommand(rayJobInstance *rayv1.RayJob) ([]string, error) {
if err != nil {
return nil, err
}
- k8sJobCommand = append(k8sJobCommand, "--metadata-json", strconv.Quote(metadataJson))
+ cmd = append(cmd, "--metadata-json", strconv.Quote(metadataJson))
}
if len(jobId) > 0 {
- k8sJobCommand = append(k8sJobCommand, "--submission-id", jobId)
+ cmd = append(cmd, "--submission-id", jobId)
}
if entrypointNumCpus > 0 {
- k8sJobCommand = append(k8sJobCommand, "--entrypoint-num-cpus", fmt.Sprintf("%f", entrypointNumCpus))
+ cmd = append(cmd, "--entrypoint-num-cpus", fmt.Sprintf("%f", entrypointNumCpus))
}
if entrypointNumGpus > 0 {
- k8sJobCommand = append(k8sJobCommand, "--entrypoint-num-gpus", fmt.Sprintf("%f", entrypointNumGpus))
+ cmd = append(cmd, "--entrypoint-num-gpus", fmt.Sprintf("%f", entrypointNumGpus))
}
if len(entrypointResources) > 0 {
- k8sJobCommand = append(k8sJobCommand, "--entrypoint-resources", strconv.Quote(entrypointResources))
+ cmd = append(cmd, "--entrypoint-resources", strconv.Quote(entrypointResources))
}
// "--" is used to separate the entrypoint from the Ray Job CLI command and its arguments.
- k8sJobCommand = append(k8sJobCommand, "--", entrypoint, ";", "fi", ";")
- k8sJobCommand = append(k8sJobCommand, jobFollowCommand...)
+ cmd = append(cmd, "--", entrypoint, ";")
+ if submissionMode == rayv1.K8sJobMode {
+ cmd = append(cmd, "fi", ";")
+ cmd = append(cmd, jobFollowCommand...)
+ }
- return k8sJobCommand, nil
+ return cmd, nil
}
// GetDefaultSubmitterTemplate creates a default submitter template for the Ray job.
@@ -127,23 +176,28 @@ func GetDefaultSubmitterTemplate(rayClusterInstance *rayv1.RayCluster) corev1.Po
return corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
- {
- Name: "ray-job-submitter",
- // Use the image of the Ray head to be defensive against version mismatch issues
- Image: rayClusterInstance.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Image,
- Resources: corev1.ResourceRequirements{
- Limits: corev1.ResourceList{
- corev1.ResourceCPU: resource.MustParse("1"),
- corev1.ResourceMemory: resource.MustParse("1Gi"),
- },
- Requests: corev1.ResourceList{
- corev1.ResourceCPU: resource.MustParse("500m"),
- corev1.ResourceMemory: resource.MustParse("200Mi"),
- },
- },
- },
+ GetDefaultSubmitterContainer(rayClusterInstance),
},
RestartPolicy: corev1.RestartPolicyNever,
},
}
}
+
+// GetDefaultSubmitterContainer creates a default submitter container for the Ray job.
+func GetDefaultSubmitterContainer(rayClusterInstance *rayv1.RayCluster) corev1.Container {
+ return corev1.Container{
+ Name: utils.SubmitterContainerName,
+ // Use the image of the Ray head to be defensive against version mismatch issues
+ Image: rayClusterInstance.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Image,
+ Resources: corev1.ResourceRequirements{
+ Limits: corev1.ResourceList{
+ corev1.ResourceCPU: resource.MustParse("1"),
+ corev1.ResourceMemory: resource.MustParse("1Gi"),
+ },
+ Requests: corev1.ResourceList{
+ corev1.ResourceCPU: resource.MustParse("500m"),
+ corev1.ResourceMemory: resource.MustParse("200Mi"),
+ },
+ },
+ }
+}
diff --git a/ray-operator/controllers/ray/common/job_test.go b/ray-operator/controllers/ray/common/job_test.go
index f5ce1aa07fa..b4b8fc5d7b1 100644
--- a/ray-operator/controllers/ray/common/job_test.go
+++ b/ray-operator/controllers/ray/common/job_test.go
@@ -2,6 +2,7 @@ package common
import (
"encoding/json"
+ "fmt"
"strconv"
"testing"
@@ -75,7 +76,7 @@ func TestGetMetadataJson(t *testing.T) {
assert.JSONEq(t, expected, metadataJson)
}
-func TestGetK8sJobCommand(t *testing.T) {
+func TestBuildJobSubmitCommandWithK8sJobMode(t *testing.T) {
testRayJob := rayJobTemplate()
expected := []string{
"if",
@@ -93,12 +94,51 @@ func TestGetK8sJobCommand(t *testing.T) {
";", "fi", ";",
"ray", "job", "logs", "--address", "http://127.0.0.1:8265", "--follow", "testJobId",
}
- command, err := GetK8sJobCommand(testRayJob)
+ command, err := BuildJobSubmitCommand(testRayJob, rayv1.K8sJobMode)
require.NoError(t, err)
assert.Equal(t, expected, command)
}
-func TestGetK8sJobCommandWithYAML(t *testing.T) {
+func TestBuildJobSubmitCommandWithSidecarMode(t *testing.T) {
+ testRayJob := rayJobTemplate()
+ testRayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers = []corev1.Container{
+ {
+ Ports: []corev1.ContainerPort{
+ {
+ Name: utils.DashboardPortName,
+ ContainerPort: utils.DefaultDashboardPort,
+ },
+ },
+ },
+ }
+
+ expected := []string{
+ "until",
+ fmt.Sprintf(
+ utils.BaseWgetHealthCommand,
+ utils.DefaultReadinessProbeFailureThreshold,
+ utils.DefaultDashboardPort,
+ utils.RayDashboardGCSHealthPath,
+ ),
+ ">/dev/null", "2>&1", ";",
+ "do", "echo", strconv.Quote("Waiting for Ray Dashboard GCS to become healthy at http://127.0.0.1:8265 ..."), ";", "sleep", "2", ";", "done", ";",
+ "ray", "job", "submit", "--address", "http://127.0.0.1:8265",
+ "--runtime-env-json", strconv.Quote(`{"test":"test"}`),
+ "--metadata-json", strconv.Quote(`{"testKey":"testValue"}`),
+ "--submission-id", "testJobId",
+ "--entrypoint-num-cpus", "1.000000",
+ "--entrypoint-num-gpus", "0.500000",
+ "--entrypoint-resources", strconv.Quote(`{"Custom_1": 1, "Custom_2": 5.5}`),
+ "--",
+ "echo no quote 'single quote' \"double quote\"",
+ ";",
+ }
+ command, err := BuildJobSubmitCommand(testRayJob, rayv1.SidecarMode)
+ require.NoError(t, err)
+ assert.Equal(t, expected, command)
+}
+
+func TestBuildJobSubmitCommandWithK8sJobModeAndYAML(t *testing.T) {
rayJobWithYAML := &rayv1.RayJob{
Spec: rayv1.RayJobSpec{
RuntimeEnvYAML: `
@@ -131,7 +171,7 @@ pip: ["python-multipart==0.0.6"]
";", "fi", ";",
"ray", "job", "logs", "--address", "http://127.0.0.1:8265", "--follow", "testJobId",
}
- command, err := GetK8sJobCommand(rayJobWithYAML)
+ command, err := BuildJobSubmitCommand(rayJobWithYAML, rayv1.K8sJobMode)
require.NoError(t, err)
// Ensure the slices are the same length.
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index 23f25bea924..4f44657b90a 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -236,19 +236,14 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
break
}
- job := &batchv1.Job{}
- if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode {
- // If the submitting Kubernetes Job reaches the backoff limit, transition the status to `Complete` or `Failed`.
- // This is because, beyond this point, it becomes impossible for the submitter to submit any further Ray jobs.
- // For light-weight mode, we don't transition the status to `Complete` or `Failed` based on the number of failed
- // requests. Instead, users can use the `ActiveDeadlineSeconds` to ensure that the RayJob in the light-weight
- // mode is not stuck in the `Running` status indefinitely.
- namespacedName := common.RayJobK8sJobNamespacedName(rayJobInstance)
- if err := r.Client.Get(ctx, namespacedName, job); err != nil {
- logger.Error(err, "Failed to get the submitter Kubernetes Job for RayJob", "NamespacedName", namespacedName)
+ var isSubmitterFinished bool
+ if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode || rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
+ var shouldUpdate bool
+ shouldUpdate, isSubmitterFinished, err = r.checkSubmitterAndUpdateStatusIfNeeded(ctx, rayJobInstance)
+ if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
- if shouldUpdate := checkK8sJobAndUpdateStatusIfNeeded(ctx, rayJobInstance, job); shouldUpdate {
+ if shouldUpdate {
break
}
}
@@ -289,9 +284,8 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
isJobTerminal := rayv1.IsJobTerminal(jobInfo.JobStatus)
// If in K8sJobMode, further refine the terminal condition by checking if the submitter Job has finished.
// See https://github.com/ray-project/kuberay/pull/1919 for reasons.
- if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode {
- _, finished := utils.IsJobFinished(job)
- isJobTerminal = isJobTerminal && finished
+ if utils.HasSubmitter(rayJobInstance) {
+ isJobTerminal = isJobTerminal && isSubmitterFinished
}
if isJobTerminal {
@@ -573,39 +567,49 @@ func getSubmitterTemplate(ctx context.Context, rayJobInstance *rayv1.RayJob, ray
logger.Info("user-provided submitter template is used; the first container is assumed to be the submitter")
}
- // If the command in the submitter pod template isn't set, use the default command.
- if len(submitterTemplate.Spec.Containers[utils.RayContainerIndex].Command) == 0 {
- k8sJobCommand, err := common.GetK8sJobCommand(rayJobInstance)
- if err != nil {
- return corev1.PodTemplateSpec{}, err
- }
+ if err := configureSubmitterContainer(&submitterTemplate.Spec.Containers[utils.RayContainerIndex], rayJobInstance, rayv1.K8sJobMode); err != nil {
+ return corev1.PodTemplateSpec{}, err
+ }
+
+ return submitterTemplate, nil
+}
+
+// getSubmitterContainer builds the submitter container for the Ray job Sidecar mode.
+func getSubmitterContainer(rayJobInstance *rayv1.RayJob, rayClusterInstance *rayv1.RayCluster) (corev1.Container, error) {
+ var submitterContainer corev1.Container = common.GetDefaultSubmitterContainer(rayClusterInstance)
+
+ if err := configureSubmitterContainer(&submitterContainer, rayJobInstance, rayv1.SidecarMode); err != nil {
+ return corev1.Container{}, err
+ }
+
+ return submitterContainer, nil
+}
+
+func configureSubmitterContainer(container *corev1.Container, rayJobInstance *rayv1.RayJob, submissionMode rayv1.JobSubmissionMode) error {
+ // If the command in the submitter container manifest isn't set, use the default command.
+ jobCmd, err := common.BuildJobSubmitCommand(rayJobInstance, submissionMode)
+ if err != nil {
+ return err
+ }
+
+ // K8sJobMode: If the user doesn't specify the command, use the default command.
+ // SidecarMode: Use the default command.
+ if len(container.Command) == 0 || submissionMode == rayv1.SidecarMode {
// Without the -e option, the Bash script will continue executing even if a command returns a non-zero exit code.
- submitterTemplate.Spec.Containers[utils.RayContainerIndex].Command = utils.GetContainerCommand([]string{"e"})
- submitterTemplate.Spec.Containers[utils.RayContainerIndex].Args = []string{strings.Join(k8sJobCommand, " ")}
- logger.Info("No command is specified in the user-provided template. Default command is used", "command", k8sJobCommand)
- } else {
- logger.Info("User-provided command is used", "command", submitterTemplate.Spec.Containers[utils.RayContainerIndex].Command)
+ container.Command = utils.GetContainerCommand([]string{"e"})
+ container.Args = []string{strings.Join(jobCmd, " ")}
}
// Set PYTHONUNBUFFERED=1 for real-time logging
- submitterTemplate.Spec.Containers[utils.RayContainerIndex].Env = append(submitterTemplate.Spec.Containers[utils.RayContainerIndex].Env, corev1.EnvVar{
- Name: PythonUnbufferedEnvVarName,
- Value: "1",
- })
+ container.Env = append(container.Env, corev1.EnvVar{Name: PythonUnbufferedEnvVarName, Value: "1"})
// Users can use `RAY_DASHBOARD_ADDRESS` to specify the dashboard address and `RAY_JOB_SUBMISSION_ID` to specify the job id to avoid
// double submission in the `ray job submit` command. For example:
// ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...
- submitterTemplate.Spec.Containers[utils.RayContainerIndex].Env = append(submitterTemplate.Spec.Containers[utils.RayContainerIndex].Env, corev1.EnvVar{
- Name: utils.RAY_DASHBOARD_ADDRESS,
- Value: rayJobInstance.Status.DashboardURL,
- })
- submitterTemplate.Spec.Containers[utils.RayContainerIndex].Env = append(submitterTemplate.Spec.Containers[utils.RayContainerIndex].Env, corev1.EnvVar{
- Name: utils.RAY_JOB_SUBMISSION_ID,
- Value: rayJobInstance.Status.JobId,
- })
+ container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_DASHBOARD_ADDRESS, Value: rayJobInstance.Status.DashboardURL})
+ container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_JOB_SUBMISSION_ID, Value: rayJobInstance.Status.JobId})
- return submitterTemplate, nil
+ return nil
}
// createNewK8sJob creates a new Kubernetes Job. It returns an error.
@@ -653,10 +657,12 @@ func (r *RayJobReconciler) createNewK8sJob(ctx context.Context, rayJobInstance *
// deleteSubmitterJob deletes the submitter Job associated with the RayJob.
func (r *RayJobReconciler) deleteSubmitterJob(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) {
- logger := ctrl.LoggerFrom(ctx)
- if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode {
+ // In HTTPMode and SidecarMode, there's no job submitter pod to delete.
+ if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode || rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
return true, nil
}
+
+ logger := ctrl.LoggerFrom(ctx)
var isJobDeleted bool
// Since the name of the Kubernetes Job is the same as the RayJob, we need to delete the Kubernetes Job
@@ -896,6 +902,21 @@ func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.Ra
return nil, err
}
+ // Inject a submitter container into the head Pod in SidecarMode.
+ if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
+ sidecar, err := getSubmitterContainer(rayJobInstance, rayCluster)
+ if err != nil {
+ return nil, err
+ }
+ rayCluster.Spec.HeadGroupSpec.Template.Spec.Containers = append(
+ rayCluster.Spec.HeadGroupSpec.Template.Spec.Containers, sidecar)
+ // In K8sJobMode, the submitter Job relies on the K8s Job backoffLimit API to restart if it fails.
+ // This mainly handles WebSocket connection failures caused by transient network issues.
+ // In SidecarMode, however, the submitter container shares the same network namespace as the Ray dashboard,
+ // so restarts are no longer needed.
+ rayCluster.Spec.HeadGroupSpec.Template.Spec.RestartPolicy = corev1.RestartPolicyNever
+ }
+
return rayCluster, nil
}
@@ -918,11 +939,70 @@ func updateStatusToSuspendingIfNeeded(ctx context.Context, rayJob *rayv1.RayJob)
return true
}
-func checkK8sJobAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob, job *batchv1.Job) bool {
+func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) (shouldUpdate, isSubmitterFinished bool, err error) {
logger := ctrl.LoggerFrom(ctx)
- for _, cond := range job.Status.Conditions {
- if cond.Type == batchv1.JobFailed && cond.Status == corev1.ConditionTrue {
- logger.Info("The submitter Kubernetes Job has failed. Attempting to transition the status to `Failed`.", "Submitter K8s Job", job.Name, "Reason", cond.Reason, "Message", cond.Message)
+ shouldUpdate = false
+ isSubmitterFinished = false
+ var submitterContainerStatus *corev1.ContainerStatus
+ var condition *batchv1.JobCondition
+
+ switch rayJob.Spec.SubmissionMode {
+ case rayv1.SidecarMode:
+ var rayClusterInstance *rayv1.RayCluster
+ var headPod *corev1.Pod
+
+ rayClusterInstance, err = r.getOrCreateRayClusterInstance(ctx, rayJob)
+ if err != nil {
+ logger.Error(err, "Failed to get RayCluster instance for checking sidecar container status")
+ return
+ }
+
+ headPod, err = common.GetRayClusterHeadPod(ctx, r.Client, rayClusterInstance)
+ if err != nil {
+ logger.Error(err, "Failed to get Ray head pod for checking sidecar container status")
+ return
+ }
+
+ if headPod == nil {
+ logger.Info("Ray head pod not found, skipping sidecar container status check")
+ return
+ }
+
+ shouldUpdate, submitterContainerStatus = checkSidecarContainerStatus(headPod)
+ if shouldUpdate {
+ logger.Info("The submitter sidecar container has failed. Attempting to transition the status to `Failed`.",
+ "Submitter sidecar container", submitterContainerStatus.Name, "Reason", submitterContainerStatus.State.Terminated.Reason, "Message", submitterContainerStatus.State.Terminated.Message)
+ rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
+ // The submitter sidecar container needs to wait for the user code to finish and retrieve its logs.
+ // Therefore, a failed Submitter sidecar container indicates that the submission itself has failed or the user code has thrown an error.
+ // If the failure is due to user code, the JobStatus and Job message will be updated accordingly from the previous reconciliation.
+ if rayJob.Status.JobStatus == rayv1.JobStatusFailed {
+ rayJob.Status.Reason = rayv1.AppFailed
+ } else {
+ rayJob.Status.Reason = rayv1.SubmissionFailed
+ rayJob.Status.Message = fmt.Sprintf("Ray head pod container %s terminated with exit code %d: %s",
+ submitterContainerStatus.Name, submitterContainerStatus.State.Terminated.ExitCode, submitterContainerStatus.State.Terminated.Reason)
+ }
+ }
+
+ isSubmitterFinished = isSubmitterContainerFinished(headPod)
+ return
+ case rayv1.K8sJobMode:
+ job := &batchv1.Job{}
+ // If the submitting Kubernetes Job reaches the backoff limit, transition the status to `Complete` or `Failed`.
+ // This is because, beyond this point, it becomes impossible for the submitter to submit any further Ray jobs.
+ // For light-weight mode, we don't transition the status to `Complete` or `Failed` based on the number of failed
+ // requests. Instead, users can use the `ActiveDeadlineSeconds` to ensure that the RayJob in the light-weight
+ // mode is not stuck in the `Running` status indefinitely.
+ namespacedName := common.RayJobK8sJobNamespacedName(rayJob)
+ if err = r.Client.Get(ctx, namespacedName, job); err != nil {
+ logger.Error(err, "Failed to get the submitter Kubernetes Job for RayJob", "NamespacedName", namespacedName)
+ return
+ }
+ shouldUpdate, condition = checkK8sJobStatus(job)
+ if shouldUpdate {
+ logger.Info("The submitter Kubernetes Job has failed. Attempting to transition the status to `Failed`.",
+ "Submitter K8s Job", job.Name, "Reason", condition.Reason, "Message", condition.Message)
rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
// The submitter Job needs to wait for the user code to finish and retrieve its logs.
// Therefore, a failed Submitter Job indicates that the submission itself has failed or the user code has thrown an error.
@@ -931,12 +1011,41 @@ func checkK8sJobAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJo
rayJob.Status.Reason = rayv1.AppFailed
} else {
rayJob.Status.Reason = rayv1.SubmissionFailed
- rayJob.Status.Message = fmt.Sprintf("Job submission has failed. Reason: %s. Message: %s", cond.Reason, cond.Message)
+ rayJob.Status.Message = fmt.Sprintf("Job submission has failed. Reason: %s. Message: %s", condition.Reason, condition.Message)
+ }
+ }
+ _, isSubmitterFinished = utils.IsJobFinished(job)
+ return
+ default:
+ // This means that the KubeRay logic is wrong, and it's better to panic as a system error than to allow the operator to
+ // continue reconciling in a wrong state as an application error.
+ // https://github.com/ray-project/kuberay/pull/3971#discussion_r2292808734
+ panic("You can only call checkSubmitterAndUpdateStatusIfNeeded when using K8sJobMode and SidecarMode.")
+ }
+}
+
+func checkK8sJobStatus(job *batchv1.Job) (bool, *batchv1.JobCondition) {
+ for _, condition := range job.Status.Conditions {
+ if condition.Type == batchv1.JobFailed && condition.Status == corev1.ConditionTrue {
+ return true, &condition
+ }
+ }
+ return false, nil
+}
+
+func checkSidecarContainerStatus(headPod *corev1.Pod) (bool, *corev1.ContainerStatus) {
+ for _, containerStatus := range headPod.Status.ContainerStatuses {
+ if containerStatus.Name == utils.SubmitterContainerName {
+ // Check for terminated containers with error exit codes
+ // Based on the document, "ray job submit" will exit with 0 if the job succeeded, or exit with 1 if it failed.
+ // https://docs.ray.io/en/latest/cluster/running-applications/job-submission/cli.html#ray-job-submit
+ if containerStatus.State.Terminated != nil && containerStatus.State.Terminated.ExitCode != 0 {
+ return true, &containerStatus
}
- return true
+ break
}
}
- return false
+ return false, nil
}
func checkActiveDeadlineAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
@@ -981,3 +1090,15 @@ func checkTransitionGracePeriodAndUpdateStatusIfNeeded(ctx context.Context, rayJ
}
return false
}
+
+func isSubmitterContainerFinished(pod *corev1.Pod) bool {
+ for _, containerStatus := range pod.Status.ContainerStatuses {
+ if containerStatus.Name == utils.SubmitterContainerName {
+ if containerStatus.State.Terminated != nil {
+ return true
+ }
+ break
+ }
+ }
+ return false
+}
diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go
index 574f77c9ce0..246127f5681 100644
--- a/ray-operator/controllers/ray/utils/constant.go
+++ b/ray-operator/controllers/ray/utils/constant.go
@@ -206,6 +206,9 @@ const (
// RayNodeHeadGroupLabelValue is the value for the RayNodeGroupLabelKey label on a head node
RayNodeHeadGroupLabelValue = "headgroup"
+ // SubmitterContainerName is the default name of the job submit container injected into the head Pod in SidecarMode.
+ SubmitterContainerName = "ray-job-submitter"
+
// KUBERAY_VERSION is the build version of KubeRay.
// The version is included in the RAY_USAGE_STATS_EXTRA_TAGS environment variable
// as well as the user-agent. This constant is updated before release.
diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go
index 4308c0afb76..6692cb084b2 100644
--- a/ray-operator/controllers/ray/utils/util.go
+++ b/ray-operator/controllers/ray/utils/util.go
@@ -800,3 +800,7 @@ func GetRayHttpProxyClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
}
}
}
+
+func HasSubmitter(rayJobInstance *rayv1.RayJob) bool {
+ return rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode || rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode
+}
diff --git a/ray-operator/controllers/ray/utils/validation.go b/ray-operator/controllers/ray/utils/validation.go
index 88e3c0cd1c5..8d5352b5c42 100644
--- a/ray-operator/controllers/ray/utils/validation.go
+++ b/ray-operator/controllers/ray/utils/validation.go
@@ -182,6 +182,20 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
return fmt.Errorf("BackoffLimit is incompatible with InteractiveMode")
}
+ if rayJob.Spec.SubmissionMode == rayv1.SidecarMode {
+ if rayJob.Spec.SubmitterPodTemplate != nil {
+ return fmt.Errorf("Currently, SidecarMode doesn't support SubmitterPodTemplate")
+ }
+
+ if rayJob.Spec.SubmitterConfig != nil {
+ return fmt.Errorf("Currently, SidecarMode doesn't support SubmitterConfig")
+ }
+
+ if rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.RestartPolicy != "" && rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.RestartPolicy != corev1.RestartPolicyNever {
+ return fmt.Errorf("restartPolicy for head Pod should be Never or unset when using SidecarMode")
+ }
+ }
+
if rayJob.Spec.RayClusterSpec != nil {
if err := ValidateRayClusterSpec(rayJob.Spec.RayClusterSpec, rayJob.Annotations); err != nil {
return err
diff --git a/ray-operator/controllers/ray/utils/validation_test.go b/ray-operator/controllers/ray/utils/validation_test.go
index f416cc37348..b55a8f26c39 100644
--- a/ray-operator/controllers/ray/utils/validation_test.go
+++ b/ray-operator/controllers/ray/utils/validation_test.go
@@ -861,6 +861,46 @@ func TestValidateRayJobSpec(t *testing.T) {
},
expectError: true,
},
+ {
+ name: "SidecarMode",
+ spec: rayv1.RayJobSpec{
+ SubmissionMode: rayv1.SidecarMode,
+ RayClusterSpec: createBasicRayClusterSpec(),
+ },
+ expectError: false,
+ },
+ {
+ name: "SidecarMode doesn't support SubmitterPodTemplate",
+ spec: rayv1.RayJobSpec{
+ SubmissionMode: rayv1.SidecarMode,
+ SubmitterPodTemplate: &corev1.PodTemplateSpec{
+ Spec: corev1.PodSpec{},
+ },
+ },
+ expectError: true,
+ },
+ {
+ name: "SidecarMode doesn't support SubmitterConfig",
+ spec: rayv1.RayJobSpec{
+ SubmissionMode: rayv1.SidecarMode,
+ SubmitterConfig: &rayv1.SubmitterConfig{
+ BackoffLimit: ptr.To[int32](1),
+ },
+ },
+ expectError: true,
+ },
+ {
+ name: "SidecarMode RayCluster head pod should only be Never or unset",
+ spec: rayv1.RayJobSpec{
+ SubmissionMode: rayv1.SidecarMode,
+ RayClusterSpec: &rayv1.RayClusterSpec{
+ HeadGroupSpec: rayv1.HeadGroupSpec{
+ Template: podTemplateSpec(nil, ptr.To(corev1.RestartPolicyAlways)),
+ },
+ },
+ },
+ expectError: true,
+ },
}
for _, tt := range tests {
diff --git a/ray-operator/test/e2erayjob/rayjob_retry_test.go b/ray-operator/test/e2erayjob/rayjob_retry_test.go
index 42a04e4350c..3cbb70ab48a 100644
--- a/ray-operator/test/e2erayjob/rayjob_retry_test.go
+++ b/ray-operator/test/e2erayjob/rayjob_retry_test.go
@@ -212,4 +212,44 @@ func TestRayJobRetry(t *testing.T) {
g.Expect(err).NotTo(HaveOccurred())
LogWithTimestamp(test.T(), "Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
})
+
+ test.T().Run("Failing RayJob with SidecarMode submission mode", func(_ *testing.T) {
+ // Set up the RayJob with HTTP mode and a BackoffLimit
+ rayJobAC := rayv1ac.RayJob("failing-rayjob-in-sidecar-mode", namespace.Name).
+ WithSpec(rayv1ac.RayJobSpec().
+ WithSubmissionMode(rayv1.SidecarMode).
+ WithBackoffLimit(2).
+ WithEntrypoint("python /home/ray/jobs/fail.py").
+ WithShutdownAfterJobFinishes(false).
+ WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
+
+ rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
+ g.Expect(err).NotTo(HaveOccurred())
+ LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
+
+ LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name)
+
+ // Assert that the RayJob deployment status has been updated.
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
+ Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
+
+ // Assert the Ray job has failed.
+ g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
+ To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusFailed)))
+
+ // Check the RayJob reason has been updated.
+ g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
+ To(WithTransform(RayJobReason, Equal(rayv1.AppFailed)))
+
+ // Check whether the controller respects the backoffLimit.
+ g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
+ Should(WithTransform(RayJobFailed, Equal(int32(3)))) // 2 retries + 1 initial attempt = 3 failures
+ g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
+ Should(WithTransform(RayJobSucceeded, Equal(int32(0))))
+
+ // Clean up
+ err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{})
+ g.Expect(err).NotTo(HaveOccurred())
+ LogWithTimestamp(test.T(), "Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
+ })
}
diff --git a/ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go b/ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go
new file mode 100644
index 00000000000..ec94ee29417
--- /dev/null
+++ b/ray-operator/test/e2erayjob/rayjob_sidecar_mode_test.go
@@ -0,0 +1,158 @@
+package e2erayjob
+
+import (
+ "testing"
+
+ . "github.com/onsi/gomega"
+ k8serrors "k8s.io/apimachinery/pkg/api/errors"
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
+
+ rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
+ "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
+ rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
+ . "github.com/ray-project/kuberay/ray-operator/test/support"
+)
+
+func TestRayJobSidecarMode(t *testing.T) {
+ test := With(t)
+ g := NewWithT(t)
+
+ // Create a namespace
+ namespace := test.NewTestNamespace()
+
+ // Job scripts
+ jobsAC := NewConfigMap(namespace.Name, Files(test, "counter.py", "fail.py", "stop.py"))
+ jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
+ g.Expect(err).NotTo(HaveOccurred())
+ LogWithTimestamp(test.T(), "Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)
+
+ test.T().Run("Successful RayJob", func(_ *testing.T) {
+ rayJobAC := rayv1ac.RayJob("counter", namespace.Name).
+ WithSpec(rayv1ac.RayJobSpec().
+ WithSubmissionMode(rayv1.SidecarMode).
+ WithEntrypoint("python /home/ray/jobs/counter.py").
+ WithEntrypointNumCpus(2).
+ WithEntrypointNumGpus(2).
+ WithEntrypointResources(`{"R1": 2}`).
+ WithRuntimeEnvYAML(`
+env_vars:
+ counter_name: test_counter
+`).
+ WithShutdownAfterJobFinishes(true).
+ WithRayClusterSpec(rayv1ac.RayClusterSpec().
+ WithRayVersion(GetRayVersion()).
+ WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
+ WithRayStartParams(map[string]string{
+ "dashboard-host": "0.0.0.0",
+ "num-gpus": "4",
+ "num-cpus": "4",
+ "resources": `'{"R1": 4}'`,
+ }).
+ WithTemplate(PodTemplateSpecApplyConfiguration(HeadPodTemplateApplyConfiguration(),
+ MountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](jobs, "/home/ray/jobs"))))))
+
+ rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
+ g.Expect(err).NotTo(HaveOccurred())
+ LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
+
+ LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name)
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
+ Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal)))
+
+ // Assert the RayJob has completed successfully
+ g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
+ To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded)))
+
+ // And the RayJob deployment status is updated accordingly
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)).
+ Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete)))
+
+ // Refresh the RayJob status
+ rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name)
+ g.Expect(err).NotTo(HaveOccurred())
+
+ // TODO (kevin85421): We may need to use `Eventually` instead if the assertion is flaky.
+ // Assert the RayCluster has been torn down
+ _, err = GetRayCluster(test, namespace.Name, rayJob.Status.RayClusterName)
+ g.Expect(k8serrors.IsNotFound(err)).To(BeTrue())
+ })
+
+ test.T().Run("Failing RayJob without cluster shutdown after finished", func(_ *testing.T) {
+ rayJobAC := rayv1ac.RayJob("fail", namespace.Name).
+ WithSpec(rayv1ac.RayJobSpec().
+ WithSubmissionMode(rayv1.SidecarMode).
+ WithEntrypoint("python /home/ray/jobs/fail.py").
+ WithShutdownAfterJobFinishes(false).
+ WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
+
+ rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
+ g.Expect(err).NotTo(HaveOccurred())
+ LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
+
+ LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name)
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
+ Should(WithTransform(RayJobStatus, Satisfy(rayv1.IsJobTerminal)))
+
+ // Assert the Ray job has failed
+ g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
+ To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusFailed)))
+
+ // Assert that the RayJob deployment status and RayJob reason have been updated accordingly.
+ // Assert that the RayJob failed reason is "AppFailed".
+ // In the sidecar submission mode, the submitter Kubernetes Job should not be created.
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name)).
+ Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
+ g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
+ To(WithTransform(RayJobReason, Equal(rayv1.AppFailed)))
+ g.Eventually(Jobs(test, namespace.Name)).Should(BeEmpty())
+
+ // Refresh the RayJob status
+ rayJob, err = GetRayJob(test, rayJob.Namespace, rayJob.Name)
+ g.Expect(err).NotTo(HaveOccurred())
+
+ // Verify sidecar container injection
+ rayCluster, err := GetRayCluster(test, namespace.Name, rayJob.Status.RayClusterName)
+ g.Expect(err).NotTo(HaveOccurred())
+
+ headPod, err := GetHeadPod(test, rayCluster)
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(headPod).NotTo(BeNil())
+
+ containerNames := make(map[string]bool)
+ for _, container := range headPod.Spec.Containers {
+ containerNames[container.Name] = true
+ }
+ g.Expect(containerNames[utils.SubmitterContainerName]).To(BeTrue(), "submitter container should be present")
+ })
+
+ test.T().Run("Should transition to 'Complete' if the Ray job has stopped.", func(_ *testing.T) {
+ // `stop.py` will sleep for 20 seconds so that the RayJob has enough time to transition to `RUNNING`
+ // and then stop the Ray job. If the Ray job is stopped, the RayJob should transition to `Complete`.
+ rayJobAC := rayv1ac.RayJob("stop", namespace.Name).
+ WithSpec(rayv1ac.RayJobSpec().
+ WithSubmissionMode(rayv1.SidecarMode).
+ WithEntrypoint("python /home/ray/jobs/stop.py").
+ WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
+
+ rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
+ g.Expect(err).NotTo(HaveOccurred())
+ LogWithTimestamp(test.T(), "Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
+
+ LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to be 'Running'", rayJob.Namespace, rayJob.Name)
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
+ Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusRunning)))
+
+ LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to be 'Complete'", rayJob.Namespace, rayJob.Name)
+ g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
+ Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete)))
+
+ // Refresh the RayJob status
+ g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).To(WithTransform(RayJobStatus, Equal(rayv1.JobStatusStopped)))
+
+ // Delete the RayJob
+ err = test.Client().Ray().RayV1().RayJobs(namespace.Name).Delete(test.Ctx(), rayJob.Name, metav1.DeleteOptions{})
+ g.Expect(err).NotTo(HaveOccurred())
+ LogWithTimestamp(test.T(), "Deleted RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)
+ })
+}