
[RayJob] Sidecar Mode #3971


Open
wants to merge 33 commits into base: master

Conversation

Future-Outlier
Member

@Future-Outlier Future-Outlier commented Aug 18, 2025

Why are these changes needed?

Conclusion from the related issue:

  • Add a new application container in the Ray head Pod as the submitter (a rough sketch of such a container follows this list).
    • Pros:
      • Easier to support batch schedulers because there are no additional Pods outside of the RayCluster.
      • Avoids inter-Pod communication — some users have reported WebSocket hangs.
      • Compared to HTTP mode: users can see logs in STDOUT/STDERR.
      • Compared to launching a new process in the Ray head container: using a new container allows us to avoid KubeRay sending requests to the Ray dashboard during each reconciliation. Instead, we can rely on the informer cache to check the container status.
    • Cons:
      • Couples the Ray head and the submitter together.
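For illustration, a rough Go sketch of the submitter container that SidecarMode could inject into the head Pod (assuming the operator package's usual corev1/rayv1 imports; the container name, shell wrapping, and wait loop are illustrative assumptions, not the PR's exact code):

func buildSubmitterSidecarContainer(rayJob *rayv1.RayJob) corev1.Container {
    address := "http://127.0.0.1:8265"
    // Wait for the local dashboard, then submit the job and let the CLI tail its logs.
    entrypoint := "until ray job list --address " + address + " >/dev/null 2>&1; do sleep 2; done; " +
        "ray job submit --address " + address + " --submission-id " + rayJob.Status.JobId + " -- " + rayJob.Spec.Entrypoint
    return corev1.Container{
        Name:    "ray-job-submitter", // container name is an assumption
        Image:   rayJob.Spec.RayClusterSpec.HeadGroupSpec.Template.Spec.Containers[0].Image, // reuse the head's Ray image so the Ray CLI is available
        Command: []string{"/bin/bash", "-c"},
        Args:    []string{entrypoint},
    }
}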

How to test it?

kind create cluster --image=kindest/node:v1.26.0
make -C ray-operator install
make -C ray-operator build
./ray-operator/bin/manager -leader-election-namespace default -use-kubernetes-proxy
kubectl apply -f ray-operator/config/samples/ray-job.sidecar-mode.yaml

You can set shutdownAfterJobFinishes to false or true to observe two different behaviors.

Test Cases

1. shutdownAfterJobFinishes = false

(screenshots)

Note that the Ray job submitter container's state is Completed here:

(screenshot)

2. shutdownAfterJobFinishes = true

(screenshots)

The ray-head Pod and the worker Pod are deleted after the job finishes.
The RayJob CR shows that the job succeeded!

3. Retry Scenario

a. Set the RayJob spec's backoffLimit to 3. (This will execute the job at most 4 times, since 1 initial attempt + 3 retries = 4.)
b. Raise a RuntimeError in the Python sample code to trigger the failure:

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))
        if _ == 3:
            raise RuntimeError("Intentional error at count 4")

The YAML file is too long, so I will put it in a comment below in this PR.
#3971 (comment)

The RayJob CR's JOB STATUS is FAILED.

(screenshot)

The job is attempted 4 times.
(screenshot)

The Backoff Limit is 3.
(screenshot)

Related issue number

#3928

Checks

  • I've made sure the tests are passing.
  • Testing Strategy
    • Unit tests
    • Manual tests
    • This PR is not tested :(

Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
@Future-Outlier
Member Author

Retry YAML file:

apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sidecar-mode-shutdown
spec:
  backoffLimit: 3
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"

  rayClusterSpec:
    rayVersion: '2.46.0'
    headGroupSpec:
      rayStartParams: {}
      template:
        spec:
          containers:
          - name: ray-head
            image: rayproject/ray:2.46.0
            ports:
            - containerPort: 6379
              name: gcs-server
            - containerPort: 8265 # Ray dashboard
              name: dashboard
            - containerPort: 10001
              name: client
            resources:
              limits:
                cpu: "1"
              requests:
                cpu: "200m"
            volumeMounts:
            - mountPath: /home/ray/samples
              name: code-sample
          volumes:
          - name: code-sample
            configMap:
              name: ray-job-code-sample
              items:
              - key: sample_code.py
                path: sample_code.py
    workerGroupSpecs:
    - replicas: 1
      minReplicas: 1
      maxReplicas: 5
      groupName: small-group
      rayStartParams: {}
      # Pod template
      template:
        spec:
          containers:
          - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
            image: rayproject/ray:2.46.0
            resources:
              limits:
                cpu: "1"
              requests:
                cpu: "200m"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))
        if _ == 3:
            raise RuntimeError("Intentional error at count 4")

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"

@Future-Outlier Future-Outlier self-assigned this Aug 19, 2025
@Future-Outlier Future-Outlier marked this pull request as ready for review August 19, 2025 10:23
@Future-Outlier Future-Outlier linked an issue Aug 19, 2025 that may be closed by this pull request
Signed-off-by: Future-Outlier <[email protected]>
Comment on lines 675 to 692
// Set PYTHONUNBUFFERED=1 for real-time logging
submitterContainer.Env = append(submitterContainer.Env, corev1.EnvVar{
Name: PythonUnbufferedEnvVarName,
Value: "1",
})

// Users can use `RAY_DASHBOARD_ADDRESS` to specify the dashboard address and `RAY_JOB_SUBMISSION_ID` to specify the job id to avoid
// double submission in the `ray job submit` command. For example:
// ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...
submitterContainer.Env = append(submitterContainer.Env, corev1.EnvVar{
Name: utils.RAY_DASHBOARD_ADDRESS,
Value: rayJobInstance.Status.DashboardURL,
})
submitterContainer.Env = append(submitterContainer.Env, corev1.EnvVar{
Name: utils.RAY_JOB_SUBMISSION_ID,
Value: rayJobInstance.Status.JobId,
})

Member Author
@Future-Outlier Future-Outlier Aug 19, 2025

Most of the logic here is copied from getSubmitterTemplate.

However, I am thinking that maybe we should check the YAML file's env first before appending env variables to the job submitter's container.
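A small sketch of that idea (illustrative, not the PR's code): only append an env var when the user has not already set it on the submitter container.

func upsertEnvIfAbsent(container *corev1.Container, name, value string) {
    for _, e := range container.Env {
        if e.Name == name {
            // Respect a user-provided value instead of overriding it.
            return
        }
    }
    container.Env = append(container.Env, corev1.EnvVar{Name: name, Value: value})
}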

@Future-Outlier Future-Outlier changed the title [RayJob] Sidecar Mode to avoid cross pod communication [RayJob] Sidecar Mode Aug 19, 2025
Member
@kevin85421 kevin85421 left a comment

Avoid CRD changes for now, and we can consider only the case where submitterPodTemplate is not set.

@@ -219,6 +219,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
logger.Info("RayCluster is created. Transition the status from `Initializing` to `Running`.", "SubmissionMode", rayJobInstance.Spec.SubmissionMode, "RayCluster", rayJobInstance.Status.RayClusterName)
}

if rayJobInstance.Spec.SubmissionMode == rayv1.SidecarMode {
Member

Remove these logs, including:

logger.Info("Both RayCluster and the submitter K8s Job are created. Transition the status from `Initializing` to `Running`.", "SubmissionMode", rayJobInstance.Spec.SubmissionMode, "RayCluster", rayJobInstance.Status.RayClusterName)
logger.Info("RayCluster is created. Transition the status from `Initializing` to `Running`.", "SubmissionMode", rayJobInstance.Spec.SubmissionMode, "RayCluster", rayJobInstance.Status.RayClusterName)

This can be in a separate PR.

@@ -299,12 +303,46 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
isJobTerminal = isJobTerminal && finished
}

// If in SidecarMode, refine terminal condition by checking the sidecar container terminated.
Member

You can check the logic in checkK8sJobAndUpdateStatusIfNeeded, and try to unify the logic for both K8sJobMode and SidecarMode.

@Future-Outlier
Member Author

Future-Outlier commented Aug 20, 2025

  • Tested checkSidecarContainerAndUpdateStatusIfNeeded when the status is rayv1.JobDeploymentStatusRunning, using exit code = 1:
(screenshot)

Comment on lines +64 to +70
// Sidecar submitter talks to the local head Pod dashboard.
address = "http://127.0.0.1:8265"
// Wait until dashboard is reachable before proceeding.
waitLoop = []string{
"until", "ray", "job", "list", "--address", address, ">/dev/null", "2>&1", ";",
"do", "echo", strconv.Quote("Waiting for Ray dashboard at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
}
Member Author

We can't get the address up front, since we create the RayCluster instance first and only then assign rayJobInstance.Status.DashboardURL.

var rayClusterInstance *rayv1.RayCluster
if rayClusterInstance, err = r.getOrCreateRayClusterInstance(ctx, rayJobInstance); err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
// Check the current status of RayCluster before submitting.
if clientURL := rayJobInstance.Status.DashboardURL; clientURL == "" {
if rayClusterInstance.Status.State != rayv1.Ready {
logger.Info("Wait for the RayCluster.Status.State to be ready before submitting the job.", "RayCluster", rayClusterInstance.Name, "State", rayClusterInstance.Status.State)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
if clientURL, err = utils.FetchHeadServiceURL(ctx, r.Client, rayClusterInstance, utils.DashboardPortName); err != nil || clientURL == "" {
logger.Error(err, "Failed to get the dashboard URL after the RayCluster is ready!", "RayCluster", rayClusterInstance.Name)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
rayJobInstance.Status.DashboardURL = clientURL
}

Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
# submissionMode specifies how RayJob submits the Ray job to the RayCluster.
# The default value is "K8sJobMode", meaning RayJob will submit the Ray job via a submitter Kubernetes Job.
# The alternative value is "HTTPMode", indicating that KubeRay will submit the Ray job by sending an HTTP request to the RayCluster.
submissionMode: "SidecarMode"
Member

Do we need to add some comments indicating how the behavior of backoffLimit changes when using SidecarMode?

Member Author

Thank you for the review!
Comment added!

Member

To clarify, the comments need to be added in the API docs as well, in addition to this example.

Member Author

Added, thank you!

cpu: "1"
requests:
cpu: "200m"
volumeMounts:
Member

How does this example work if we don't add the volume mount to the sidecar?

Member Author
@Future-Outlier Future-Outlier Aug 21, 2025

We don’t need the /home/ray/samples/sample_code.py code in the sidecar.
Since the sidecar now uses the Ray image, it will only execute the following commands:

until ray job list --address http://127.0.0.1:8265 >/dev/null 2>&1 ;
do echo "Waiting for Ray dashboard at http://127.0.0.1:8265 ..." ; sleep 2 ; done ;

if ! ray job status --address http://127.0.0.1:8265 rayjob-sidecar-mode-shutdown-rc2s8 >/dev/null 2>&1 ;
then ray job submit --address http://127.0.0.1:8265 --no-wait --runtime-env-json
 "{\"env_vars\":{\"counter_name\":\"test_counter\"},\"pip\":[\"requests==2.26.0\",\"pendulum==2.1.2\"]}"
  --submission-id rayjob-sidecar-mode-shutdown-rc2s8 -- python /home/ray/samples/sample_code.py ;
fi ;

ray job logs --address http://127.0.0.1:8265 --follow rayjob-sidecar-mode-shutdown-rc2s8

As you can see, we are only using Ray’s CLI to perform these actions!

Member

Ah right, the entry point can reference paths only on the Head pod.

Member

The logic of checking if a job exists with ray job status and then running either ray job submit or ray job logs is due to the submitter job being retried on an existing cluster. For the sidecar case, could we just run ray job submit every time, since each retry would be on a fresh cluster?

Member Author

Yes, this makes sense, thank you!
Let me update it.

Member Author

Ah right, the entry point can reference paths only on the Head pod.

The sidecar container only submits the job; the entrypoint path is resolved and executed in the head container.
Since the entrypoint path is mounted on the head container, it works!

Member Author

The logic of checking if a job exists with ray job status and then running either ray job submit or ray job logs is due to the submitter job being retried on an existing cluster. For the sidecar case, could we just run ray job submit every time, since each retry would be on a fresh cluster?

This is a really NICE and SOLID review, thank you!!

@@ -82,6 +82,7 @@ const (
K8sJobMode JobSubmissionMode = "K8sJobMode" // Submit job via Kubernetes Job
HTTPMode JobSubmissionMode = "HTTPMode" // Submit job via HTTP request
InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
SidecarMode JobSubmissionMode = "SidecarMode" // Submit job inside the Ray head Pod via a sidecar container, then tail logs
Member

Suggested change
SidecarMode JobSubmissionMode = "SidecarMode" // Submit job inside the Ray head Pod via a sidecar container, then tail logs
SidecarMode JobSubmissionMode = "SidecarMode" // Submit job via a sidecar container in the Ray head Pod

@@ -174,6 +175,7 @@ type RayJobSpec struct {
// In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.
// In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.
// In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster.
// In "SidecarMode", the operator injects a submitter container into the Ray head Pod which submits the job and tails logs.
Member

Suggested change
// In "SidecarMode", the operator injects a submitter container into the Ray head Pod which submits the job and tails logs.
// In "SidecarMode", the KubeRay operator injects a container into the Ray head Pod that acts as the job submitter to submit the Ray job.

@@ -183,6 +183,13 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
break
}

// Handle case when job submitter container failed to start
Member

remove it.

break
}
}

job := &batchv1.Job{}
Member

  • job := &batchv1.Job{} => isJobFinished := true
func checkSubmitterAndUpdateStatusIfNeeded(submissionMode, ...) {
   // handle K8sJobMode
   // handle SidecarMode
   return isJobFinished
}

Then, we can remove:

		// If in K8sJobMode, further refine the terminal condition by checking if the submitter Job has finished.
		// See https://github.com/ray-project/kuberay/pull/1919 for reasons.
		if rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode {
			_, finished := utils.IsJobFinished(job)
			isJobTerminal = isJobTerminal && finished
		}
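One possible shape for that unification, as an illustrative sketch (the submitter container name and the exact status handling are assumptions, not the final implementation):

func checkSubmitterAndUpdateStatusIfNeeded(mode rayv1.JobSubmissionMode, submitterJob *batchv1.Job, headPod *corev1.Pod) (isSubmitterFinished bool) {
    switch mode {
    case rayv1.K8sJobMode:
        // Reuse the existing helper for the submitter Kubernetes Job.
        _, isSubmitterFinished = utils.IsJobFinished(submitterJob)
    case rayv1.SidecarMode:
        // The submitter is a container in the head Pod; treat it as finished once terminated.
        for _, cs := range headPod.Status.ContainerStatuses {
            if cs.Name == "ray-job-submitter" && cs.State.Terminated != nil { // container name is an assumption
                isSubmitterFinished = true
            }
        }
    }
    return isSubmitterFinished
}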

// Check container statuses for any error conditions
for _, containerStatus := range headPod.Status.ContainerStatuses {
// Check for terminated containers with error exit codes
if containerStatus.State.Terminated != nil && containerStatus.State.Terminated.ExitCode != 0 {
Member

We should only check the submitter container instead of all containers. For example, it's common for users to inject sidecar containers like FluentBit to collect logs, and the RayJob CR will be marked as AppFailed when the FluentBit container fails, which is misleading.

// Check container statuses for any error conditions
for _, containerStatus := range headPod.Status.ContainerStatuses {
// Check for terminated containers with error exit codes
if containerStatus.State.Terminated != nil && containerStatus.State.Terminated.ExitCode != 0 {
Member

Can you verify the ray job submit exit code in different cases?

Member Author

Yes, I have verified this and updated it in the comments.
According to the Ray documentation:

If the job succeeded, it exits with 0. If it failed, it exits with 1.

https://docs.ray.io/en/latest/cluster/running-applications/job-submission/cli.html#ray-job-submit

var waitLoop []string

switch submissionMode {
case rayv1.SidecarMode:
Member

The logic of checking if a job exists with ray job status and then running either ray job submit or ray job logs is due to the submitter job being retried on an existing cluster. For the sidecar case, could we just run ray job submit every time, since each retry would be on a fresh cluster?

Member

Ah, we still need to wait for the dashboard API to be available, but once it's ready can we just run ray job submit, or is there a reason to also check that the job exists with ray job status?

Member Author
@Future-Outlier Future-Outlier Aug 22, 2025

Just removed the ray job status command, thank you!
Right now the entry point will be:

until ray job list --address http://127.0.0.1:8265 >/dev/null 2>&1 ;
do echo "Waiting for Ray dashboard at http://127.0.0.1:8265 ..." ;
sleep 2 ;done ;
ray job submit --address http://127.0.0.1:8265 --no-wait --runtime-env-json "{\"env_vars\":{\"counter_name\":\"test_counter\"},\"pip\":[\"requests==2.26.0\",\"pendulum==2.1.2\"]}" --submission-id rayjob-sidecar-mode-shutdown-ft4hw -- python /home/ray/samples/sample_code.py ;
ray job logs --address http://127.0.0.1:8265 --follow rayjob-sidecar-mode-shutdown-ft4hw

Member

If we can remove --no-wait in ray job submit, can we also remove the ray job logs after?

Thinking about this more, what is the expected behavior if only the submitter pod crashes for some reason? I think K8s will only restart the submitter container, in which case maybe the old logic was correct? Can you test and confirm this?

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Andrew Sy Kim <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
@@ -88,12 +88,12 @@ func TestGetK8sJobCommand(t *testing.T) {
";", "fi", ";",
"ray", "job", "logs", "--address", "http://127.0.0.1:8265", "--follow", "testJobId",
}
command, err := GetK8sJobCommand(testRayJob)
command, err := BuildJobSubmitCommand(testRayJob, rayv1.K8sJobMode)
Contributor

Could we have a command test for sidecar mode?
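A hedged sketch of what such a test could look like, assuming a testRayJob fixture like the one used in TestGetK8sJobCommand; the asserted substring is illustrative:

func TestBuildJobSubmitCommandSidecarMode(t *testing.T) {
    command, err := BuildJobSubmitCommand(testRayJob, rayv1.SidecarMode)
    if err != nil {
        t.Fatalf("unexpected error: %v", err)
    }
    joined := strings.Join(command, " ")
    // In sidecar mode the submitter talks to the head Pod's local dashboard.
    if !strings.Contains(joined, "http://127.0.0.1:8265") {
        t.Errorf("expected the command to target the local dashboard, got: %s", joined)
    }
}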

containerStatus.Name, containerStatus.State.Terminated.ExitCode, containerStatus.State.Terminated.Reason)

return true
}
Contributor

break

}
case rayv1.K8sJobMode:
// Submitter is a separate K8s Job; use cluster dashboard address.
address = rayJobInstance.Status.DashboardURL
Contributor

Should we return an error if the address is empty? It may not happen now, but preventing misuse in the future might be good.
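A sketch of such a guard (the helper name is illustrative):

func dashboardAddressOrError(rayJob *rayv1.RayJob) (string, error) {
    if rayJob.Status.DashboardURL == "" {
        // Fail fast instead of building a submit command with an empty address.
        return "", fmt.Errorf("RayJob %s/%s: dashboard URL is not set yet", rayJob.Namespace, rayJob.Name)
    }
    return rayJob.Status.DashboardURL, nil
}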

@@ -170,6 +170,19 @@ func ValidateRayJobSpec(rayJob *rayv1.RayJob) error {
return fmt.Errorf("BackoffLimit is incompatible with InteractiveMode")
}

if rayJob.Spec.SubmissionMode == rayv1.SidecarMode {
// SidecarMode does not support SubmitterPodTemplate.
Member

The comment doesn’t provide additional details. Please remove it.

// In SidecarMode, the operator automatically injects a submitter container
// into the Ray head Pod, so users should not specify SubmitterPodTemplate.
if rayJob.Spec.SubmitterPodTemplate != nil {
return fmt.Errorf("SubmitterPodTemplate is incompatible with SidecarMode. In SidecarMode, the operator automatically injects a submitter container into the Ray head Pod")
Member

Suggested change
return fmt.Errorf("SubmitterPodTemplate is incompatible with SidecarMode. In SidecarMode, the operator automatically injects a submitter container into the Ray head Pod")
return fmt.Errorf("Currently, `SidecarMode` doesn’t support `SubmitterPodTemplate`.")

@@ -174,6 +175,9 @@ type RayJobSpec struct {
// In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.
// In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.
// In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster.
// In "SidecarMode", the KubeRay operator injects a container into the Ray head Pod that acts as the job submitter to submit the Ray job.
// If SidecarMode is enabled, retries are managed at the Ray job level using a backoff limit.
Member

This is misleading in my opinion. I assume you’re referring to the K8s Job’s backOffLimit? It doesn’t retry the Ray job. It only tails the logs if the submitter crashes while the Ray job is still running. It does not resubmit the job if the Ray job fails.

Please remove it.

Suggested change
// If SidecarMode is enabled, retries are managed at the Ray job level using a backoff limit.

@@ -174,6 +175,9 @@ type RayJobSpec struct {
// In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.
// In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.
// In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster.
// In "SidecarMode", the KubeRay operator injects a container into the Ray head Pod that acts as the job submitter to submit the Ray job.
// If SidecarMode is enabled, retries are managed at the Ray job level using a backoff limit.
// A retry is triggered if the Ray job fails or if the submitter container fails to submit the Ray job.
Member

Suggested change
// A retry is triggered if the Ray job fails or if the submitter container fails to submit the Ray job.

containerStatus.Name, containerStatus.State.Terminated.ExitCode, containerStatus.State.Terminated.Reason)

return true
}
Member

If we have already found the submitter container, we don’t need to iterate through the containers any further. We can break or return false.

return true, nil
}

logger := ctrl.LoggerFrom(ctx)
Member

Why do we need to move logger := ctrl.LoggerFrom(ctx)?

@@ -910,6 +936,43 @@ func updateStatusToSuspendingIfNeeded(ctx context.Context, rayJob *rayv1.RayJob)
return true
}

func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob, submissionMode rayv1.JobSubmissionMode) (shouldUpdate, isK8sJobFinished bool, err error) {
Member

Returning isK8sJobFinished is an abstraction leak for this function, especially if we want to unify all modes with a submitter. We can use isSubmitterFinished instead.

The value of isSubmitterFinished is based on the K8s Job (K8sJobMode) or the submitter sidecar container (SidecarMode)

@@ -910,6 +936,43 @@ func updateStatusToSuspendingIfNeeded(ctx context.Context, rayJob *rayv1.RayJob)
return true
}

func (r *RayJobReconciler) checkSubmitterAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob, submissionMode rayv1.JobSubmissionMode) (shouldUpdate, isK8sJobFinished bool, err error) {
Member

Initialize the logger at the beginning of the function so that we don't need to initialize it in both case rayv1.K8sJobMode: and default:.

err = nil
return
default:
logger := ctrl.LoggerFrom(ctx)
Member

This means the KubeRay logic is wrong. I’m considering using panic instead. It’s not very common to use panic in an operator, but in my opinion, it’s still better than letting it keep reconciling in a wrong state.

In addition, this is a system error instead of an application error, but the error message "you can only call ..." looks like an application error.
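For illustration, the panic in the unreachable default branch could look like this sketch (a fragment of the switch under discussion; the message wording is an assumption):

switch submissionMode {
case rayv1.K8sJobMode:
    // existing K8sJobMode handling ...
case rayv1.SidecarMode:
    // existing SidecarMode handling ...
default:
    // Unreachable by construction; panic so the programming error surfaces immediately
    // instead of the controller silently reconciling in a wrong state.
    panic(fmt.Sprintf("checkSubmitterAndUpdateStatusIfNeeded called with unsupported submission mode %q", submissionMode))
}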

@@ -931,6 +994,52 @@ func checkK8sJobAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJo
return false
}

func (r *RayJobReconciler) checkSidecarContainerAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
Member

Can you change the function signature to checkSidecarContainerAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob, headPod)?

Then, the function signature will be similar to checkK8sJobAndUpdateStatusIfNeeded.


Successfully merging this pull request may close these issues.

[Feature] Add Sidecar Mode in Ray Job to avoid cross pod communication