Merged
Changes from 33 commits
55 commits
3b5b865
[RayJob] Sidecar Mode to avoid cross pod communication
Future-Outlier Aug 18, 2025
4530cb5
make generate
Future-Outlier Aug 18, 2025
ef92c89
Remove logs
Future-Outlier Aug 18, 2025
5016896
# shutdownAfterJobFinishes: false
Future-Outlier Aug 19, 2025
a3ce5fc
Add a new field submitterContainer in rayjob_types.go
Future-Outlier Aug 19, 2025
9600d8e
make sync
Future-Outlier Aug 19, 2025
dad2e43
update
Future-Outlier Aug 19, 2025
f4b8908
update
Future-Outlier Aug 20, 2025
9803619
add validation logic when submission mode is sidecar and SubmitterPod…
Future-Outlier Aug 20, 2025
4bf6c68
Remove sidecar error handling logic, need to re-use k8sjob mode's cod…
Future-Outlier Aug 20, 2025
8f5c8e8
add function checkSidecarContainerAndUpdateStatusIfNeeded to handle s…
Future-Outlier Aug 20, 2025
b94f3ad
polish checkSidecarContainerAndUpdateStatusIfNeeded for rayJob.Status…
Future-Outlier Aug 20, 2025
89ca572
refactor GetDefaultSubmitterTemplate and GetDefaultSubmitterContainer…
Future-Outlier Aug 20, 2025
69d0715
refactor GetSidecarJobCommand/GetK8sJobCommand
Future-Outlier Aug 20, 2025
204515b
mode -> submissionMode
Future-Outlier Aug 20, 2025
4d21705
remove GetSidecarJobCommand
Future-Outlier Aug 20, 2025
9edd606
use configureSubmitterContainer to avoid repeated code
Future-Outlier Aug 20, 2025
d91789a
Remove sidecarmode logging when JobDeploymentStatusInitializing
Future-Outlier Aug 20, 2025
0c6effa
nit
Future-Outlier Aug 20, 2025
7d9a852
nit
Future-Outlier Aug 20, 2025
8f378d0
nit
Future-Outlier Aug 20, 2025
125f0ae
sidecar name string matching
Future-Outlier Aug 20, 2025
bdbaed5
handle initialize status failed scenario
Future-Outlier Aug 20, 2025
b3c1d20
add comments
Future-Outlier Aug 20, 2025
f68ff65
Update Kai-Hsun's advice
Future-Outlier Aug 21, 2025
14ae58b
update api doc
Future-Outlier Aug 21, 2025
a936880
fix validation and container status problem
Future-Outlier Aug 21, 2025
ddda86f
Merge branch 'master' into rayjob-sidecar-mode-v2
Future-Outlier Aug 21, 2025
e35a2a6
Use checkSubmitterAndUpdateStatusIfNeeded
Future-Outlier Aug 21, 2025
e8fe3c2
add comments in sidecar mode yaml!
Future-Outlier Aug 21, 2025
cd01094
Merge branch 'master' into rayjob-sidecar-mode-v2
Future-Outlier Aug 21, 2025
1383ec5
Update Andrew's advice
Future-Outlier Aug 22, 2025
f4ae235
update andrew's api doc advice
Future-Outlier Aug 22, 2025
a1e71d4
update andrew's sidecar mode command advice
Future-Outlier Aug 24, 2025
6e9185c
Merge branch 'ray-project:master' into rayjob-sidecar-mode-v2
Future-Outlier Aug 24, 2025
94b2ddb
update kai hsun's SOLID advices
Future-Outlier Aug 24, 2025
563a09d
add submitterConfig
Future-Outlier Aug 26, 2025
35f0d0d
RayJob checkSidecarContainerAndUpdateStatusIfNeeded
Future-Outlier Aug 26, 2025
aefc08e
Update most minor fix
Future-Outlier Aug 26, 2025
ed6e025
use getDashboardPortFromRayJobSpec
Future-Outlier Aug 26, 2025
107066e
use rayDashboardGCSHealthCommand in BuildJobSubmitCommand
Future-Outlier Aug 26, 2025
0fdc468
update
Future-Outlier Aug 26, 2025
7e7e931
Apply Kai-Hsun and Jun Hao's advices
Future-Outlier Aug 29, 2025
bef4401
update
Future-Outlier Aug 29, 2025
f0de681
port = utils.FindContainerPort
Future-Outlier Aug 29, 2025
35f1383
integration test, most are copied from http mode
Future-Outlier Aug 29, 2025
7c586e2
Add other tests
Future-Outlier Aug 29, 2025
4a6e714
remove unnecessary test
Future-Outlier Aug 30, 2025
a793386
Simpler and clearer yaml description.
Future-Outlier Aug 30, 2025
538a439
add tests for case whether the sidecar container is injected in the h…
Future-Outlier Aug 31, 2025
a0012af
merge
Future-Outlier Aug 31, 2025
948c362
Verify sidecar container injection
Future-Outlier Sep 1, 2025
e3c9ae2
update
Future-Outlier Sep 1, 2025
bf81123
update
Future-Outlier Sep 1, 2025
d2df229
We don't need to use Eventually
Future-Outlier Sep 1, 2025
2 changes: 1 addition & 1 deletion docs/reference/api.md
@@ -246,7 +246,7 @@ _Appears in:_
| `entrypoint` _string_ | Entrypoint represents the command to start execution. | | |
| `runtimeEnvYAML` _string_ | RuntimeEnvYAML represents the runtime environment configuration<br />provided as a multi-line YAML string. | | |
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
| `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.<br />In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.<br />In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.<br />In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster. | K8sJobMode | |
| `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.<br />In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.<br />In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.<br />In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster.<br />In "SidecarMode", the KubeRay operator injects a container into the Ray head Pod that acts as the job submitter to submit the Ray job.<br />If SidecarMode is enabled, retries are managed at the Ray job level using a backoff limit.<br />A retry is triggered if the Ray job fails or if the submitter container fails to submit the Ray job. | K8sJobMode | |
| `entrypointResources` _string_ | EntrypointResources specifies the custom resources and quantities to reserve for the<br />entrypoint command. | | |
| `entrypointNumCpus` _float_ | EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command. | | |
| `entrypointNumGpus` _float_ | EntrypointNumGpus specifies the number of gpus to reserve for the entrypoint command. | | |
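
The retry rule described for SidecarMode in the `submissionMode` row can be sketched as follows. This is a minimal illustration, not the operator's code: `attemptOutcome` and `shouldRetry` are hypothetical names, and the sketch assumes the backoff limit counts retries, so a limit of 2 allows up to three attempts.

```go
package main

import "fmt"

// attemptOutcome is a hypothetical summary of one RayJob attempt.
type attemptOutcome struct {
	rayJobFailed    bool // the Ray job itself ended in failure
	submitterFailed bool // the submitter container failed to submit the job
}

// shouldRetry reports whether another attempt is allowed. It assumes
// failedAttempts already includes the attempt described by o, and that
// backoffLimit is the number of retries permitted after the first attempt.
func shouldRetry(o attemptOutcome, failedAttempts, backoffLimit int32) bool {
	if !o.rayJobFailed && !o.submitterFailed {
		return false // nothing failed, so there is nothing to retry
	}
	return failedAttempts <= backoffLimit
}

func main() {
	const backoffLimit = 2
	failure := attemptOutcome{submitterFailed: true}
	for failed := int32(1); failed <= 3; failed++ {
		fmt.Printf("failed attempts=%d retry=%v\n", failed, shouldRetry(failure, failed, backoffLimit))
	}
}
```

Both failure causes lead to the same decision here, which matches the description that retries are handled at the Ray job level rather than by restarting the submitter alone.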
4 changes: 4 additions & 0 deletions ray-operator/apis/ray/v1/rayjob_types.go
@@ -82,6 +82,7 @@ const (
	K8sJobMode JobSubmissionMode = "K8sJobMode" // Submit job via Kubernetes Job
	HTTPMode JobSubmissionMode = "HTTPMode" // Submit job via HTTP request
	InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
	SidecarMode JobSubmissionMode = "SidecarMode" // Submit job via a sidecar container in the Ray head Pod
)

type DeletionPolicyType string
@@ -174,6 +175,9 @@ type RayJobSpec struct {
	// In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.
	// In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.
	// In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster.
	// In "SidecarMode", the KubeRay operator injects a container into the Ray head Pod that acts as the job submitter to submit the Ray job.
	// If SidecarMode is enabled, retries are managed at the Ray job level using a backoff limit.
	// A retry is triggered if the Ray job fails or if the submitter container fails to submit the Ray job.
	// +kubebuilder:default:=K8sJobMode
	// +optional
	SubmissionMode JobSubmissionMode `json:"submissionMode,omitempty"`
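
As a rough illustration of how the four modes differ, the sketch below redeclares the `JobSubmissionMode` constants locally (so it compiles without the KubeRay module) and classifies each mode by whether a container runs the submission. The helper `usesSubmitterContainer` is hypothetical and exists only for this example.

```go
package main

import "fmt"

// JobSubmissionMode mirrors the string type from the diff; it is redeclared
// here so the sketch is self-contained.
type JobSubmissionMode string

const (
	K8sJobMode      JobSubmissionMode = "K8sJobMode"
	HTTPMode        JobSubmissionMode = "HTTPMode"
	InteractiveMode JobSubmissionMode = "InteractiveMode"
	SidecarMode     JobSubmissionMode = "SidecarMode"
)

// usesSubmitterContainer is a hypothetical helper. It reports whether the
// mode relies on a container to submit the job: a separate Kubernetes Job in
// K8sJobMode, or a container injected into the head Pod in SidecarMode.
func usesSubmitterContainer(mode JobSubmissionMode) (bool, error) {
	switch mode {
	case K8sJobMode, SidecarMode:
		return true, nil
	case HTTPMode, InteractiveMode:
		return false, nil
	default:
		return false, fmt.Errorf("unknown submission mode: %q", mode)
	}
}

func main() {
	modes := []JobSubmissionMode{K8sJobMode, HTTPMode, InteractiveMode, SidecarMode, "Bogus"}
	for _, m := range modes {
		ok, err := usesSubmitterContainer(m)
		fmt.Println(m, ok, err)
	}
}
```

Returning an error for unrecognized values follows the same pattern as the `default` branch in `BuildJobSubmitCommand`, which rejects modes it cannot build a command for.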
140 changes: 140 additions & 0 deletions ray-operator/config/samples/ray-job.sidecar-mode.yaml
@@ -0,0 +1,140 @@
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sidecar-mode-shutdown
spec:
  # submissionMode specifies how RayJob submits the Ray job to the RayCluster.
  # The default value is "K8sJobMode", meaning RayJob will submit the Ray job via a submitter Kubernetes Job.
  # The alternative value is "HTTPMode", indicating that KubeRay will submit the Ray job by sending an HTTP request to the RayCluster.
  # In "SidecarMode":
  # - The KubeRay operator injects a container into the Ray head Pod that acts as the job submitter to submit the Ray job.
  # - Retries are managed at the Ray job level using a backoff limit.
  # - A retry is triggered if the Ray job fails or if the submitter container fails to submit the Ray job.
  submissionMode: "SidecarMode"
  entrypoint: python /home/ray/samples/sample_code.py
  # shutdownAfterJobFinishes specifies whether the RayCluster should be deleted after the RayJob finishes. Default is false.
  # shutdownAfterJobFinishes: false

  # ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes.
  # ttlSecondsAfterFinished: 10

  # activeDeadlineSeconds is the duration in seconds that the RayJob may be active before
  # KubeRay actively tries to terminate the RayJob; the value must be a positive integer.
  # activeDeadlineSeconds: 120

  # RuntimeEnvYAML represents the runtime environment configuration provided as a multi-line YAML string.
  # See https://docs.ray.io/en/latest/ray-core/handling-dependencies.html for details.
  # (New in KubeRay version 1.0.)
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"

  # Suspend specifies whether the RayJob controller should create a RayCluster instance.
  # If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
  # If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluster will be created.
  # suspend: false

  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
  rayClusterSpec:
    rayVersion: '2.46.0' # should match the Ray version in the image of the containers
    # Ray head pod template
    headGroupSpec:
      # The `rayStartParams` are used to configure the `ray start` command.
      # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
      # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
      rayStartParams: {}
      # pod template
      template:
        spec:
          containers:
          - name: ray-head
            image: rayproject/ray:2.46.0
            ports:
            - containerPort: 6379
              name: gcs-server
            - containerPort: 8265 # Ray dashboard
              name: dashboard
            - containerPort: 10001
              name: client
            resources:
              limits:
                cpu: "1"
              requests:
                cpu: "200m"
            volumeMounts:
            - mountPath: /home/ray/samples
              name: code-sample
          volumes:
          # You set volumes at the Pod level, then mount them into containers inside that Pod
          - name: code-sample
            configMap:
              # Provide the name of the ConfigMap you want to mount.
              name: ray-job-code-sample
              # An array of keys from the ConfigMap to create as files
              items:
              - key: sample_code.py
                path: sample_code.py
    workerGroupSpecs:
    # the number of worker Pod replicas in this group
    - replicas: 1
      minReplicas: 1
      maxReplicas: 5
      # logical group name, here "small-group"; it can also describe the group's function
      groupName: small-group
      # The `rayStartParams` are used to configure the `ray start` command.
      # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
      # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
      rayStartParams: {}
      # pod template
      template:
        spec:
          containers:
          - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc')
            image: rayproject/ray:2.46.0
            resources:
              limits:
                cpu: "1"
              requests:
                cpu: "200m"

######################Ray code sample#################################
# this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example
# it is mounted into the container and executed to show the Ray job at work
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"
106 changes: 70 additions & 36 deletions ray-operator/controllers/ray/common/job.go
@@ -54,21 +54,38 @@ func GetMetadataJson(metadata map[string]string, rayVersion string) (string, err
	return pkgutils.ConvertByteSliceToString(metadataBytes), nil
}

// GetK8sJobCommand builds the K8s job command for the Ray job.
func GetK8sJobCommand(rayJobInstance *rayv1.RayJob) ([]string, error) {
	address := rayJobInstance.Status.DashboardURL
// BuildJobSubmitCommand builds the job submit command based on submission mode.
func BuildJobSubmitCommand(rayJobInstance *rayv1.RayJob, submissionMode rayv1.JobSubmissionMode) ([]string, error) {
	var address string
	var waitLoop []string

	switch submissionMode {
	case rayv1.SidecarMode:
		// Sidecar submitter talks to the local head Pod dashboard.
		address = "http://127.0.0.1:8265"
		// Wait until dashboard is reachable before proceeding.
		waitLoop = []string{
			"until", "ray", "job", "list", "--address", address, ">/dev/null", "2>&1", ";",
			"do", "echo", strconv.Quote("Waiting for Ray dashboard at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
		}
	case rayv1.K8sJobMode:
		// Submitter is a separate K8s Job; use cluster dashboard address.
		address = rayJobInstance.Status.DashboardURL
		if !strings.HasPrefix(address, "http://") {
			address = "http://" + address
		}
	default:
		return nil, fmt.Errorf("unsupported submission mode for job submit command: %s", submissionMode)
	}

	var cmd []string
	metadata := rayJobInstance.Spec.Metadata
	jobId := rayJobInstance.Status.JobId
	entrypoint := strings.TrimSpace(rayJobInstance.Spec.Entrypoint)
	entrypointNumCpus := rayJobInstance.Spec.EntrypointNumCpus
	entrypointNumGpus := rayJobInstance.Spec.EntrypointNumGpus
	entrypointResources := rayJobInstance.Spec.EntrypointResources

	// add http:// if needed
	if !strings.HasPrefix(address, "http://") {
		address = "http://" + address
	}

	// `ray job submit` alone doesn't handle duplicated submission gracefully. See https://github.com/ray-project/kuberay/issues/2154.
	// In order to deal with that, we use `ray job status` first to check if the jobId has been submitted.
	// If the jobId has been submitted, we use `ray job logs` to follow the logs.
@@ -77,73 +94,90 @@ func GetK8sJobCommand(rayJobInstance *rayv1.RayJob) ([]string, error) {
	// then ray job submit --address http://$RAY_ADDRESS --submission-id $RAY_JOB_SUBMISSION_ID --no-wait -- ... ;
	// fi ; ray job logs --address http://$RAY_ADDRESS --follow $RAY_JOB_SUBMISSION_ID
	jobStatusCommand := []string{"ray", "job", "status", "--address", address, jobId, ">/dev/null", "2>&1"}
	jobFollowCommand := []string{"ray", "job", "logs", "--address", address, "--follow", jobId}
	jobSubmitCommand := []string{"ray", "job", "submit", "--address", address, "--no-wait"}
	k8sJobCommand := append([]string{"if", "!"}, jobStatusCommand...)
	k8sJobCommand = append(k8sJobCommand, ";", "then")
	k8sJobCommand = append(k8sJobCommand, jobSubmitCommand...)
	jobFollowCommand := []string{"ray", "job", "logs", "--address", address, "--follow", jobId}

	cmd = append(cmd, waitLoop...)
	// In Sidecar mode, we only support RayJob level retry, which means that the submitter job being retried on an existing cluster won't happen
	// so we won't have to check if the job has been submitted.
	if submissionMode == rayv1.K8sJobMode {
		// Only check job status in K8s mode to handle duplicated submission gracefully
		cmd = append(cmd, "if", "!")
		cmd = append(cmd, jobStatusCommand...)
		cmd = append(cmd, ";", "then")
	}
	cmd = append(cmd, jobSubmitCommand...)

	runtimeEnvJson, err := getRuntimeEnvJson(rayJobInstance)
	if err != nil {
		return nil, err
	}
	if len(runtimeEnvJson) > 0 {
		k8sJobCommand = append(k8sJobCommand, "--runtime-env-json", strconv.Quote(runtimeEnvJson))
		cmd = append(cmd, "--runtime-env-json", strconv.Quote(runtimeEnvJson))
	}

	if len(metadata) > 0 {
		metadataJson, err := GetMetadataJson(metadata, rayJobInstance.Spec.RayClusterSpec.RayVersion)
		if err != nil {
			return nil, err
		}
		k8sJobCommand = append(k8sJobCommand, "--metadata-json", strconv.Quote(metadataJson))
		cmd = append(cmd, "--metadata-json", strconv.Quote(metadataJson))
	}

	if len(jobId) > 0 {
		k8sJobCommand = append(k8sJobCommand, "--submission-id", jobId)
		cmd = append(cmd, "--submission-id", jobId)
	}

	if entrypointNumCpus > 0 {
		k8sJobCommand = append(k8sJobCommand, "--entrypoint-num-cpus", fmt.Sprintf("%f", entrypointNumCpus))
		cmd = append(cmd, "--entrypoint-num-cpus", fmt.Sprintf("%f", entrypointNumCpus))
	}

	if entrypointNumGpus > 0 {
		k8sJobCommand = append(k8sJobCommand, "--entrypoint-num-gpus", fmt.Sprintf("%f", entrypointNumGpus))
		cmd = append(cmd, "--entrypoint-num-gpus", fmt.Sprintf("%f", entrypointNumGpus))
	}

	if len(entrypointResources) > 0 {
		k8sJobCommand = append(k8sJobCommand, "--entrypoint-resources", strconv.Quote(entrypointResources))
		cmd = append(cmd, "--entrypoint-resources", strconv.Quote(entrypointResources))
	}

	// "--" is used to separate the entrypoint from the Ray Job CLI command and its arguments.
	k8sJobCommand = append(k8sJobCommand, "--", entrypoint, ";", "fi", ";")
	k8sJobCommand = append(k8sJobCommand, jobFollowCommand...)
	cmd = append(cmd, "--", entrypoint, ";")
	if submissionMode == rayv1.K8sJobMode {
		cmd = append(cmd, "fi", ";")
	}

	cmd = append(cmd, jobFollowCommand...)

	return k8sJobCommand, nil
	return cmd, nil
}

// GetDefaultSubmitterTemplate creates a default submitter template for the Ray job.
func GetDefaultSubmitterTemplate(rayClusterInstance *rayv1.RayCluster) corev1.PodTemplateSpec {
	return corev1.PodTemplateSpec{
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name: "ray-job-submitter",
					// Use the image of the Ray head to be defensive against version mismatch issues
					Image: rayClusterInstance.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Image,
					Resources: corev1.ResourceRequirements{
						Limits: corev1.ResourceList{
							corev1.ResourceCPU: resource.MustParse("1"),
							corev1.ResourceMemory: resource.MustParse("1Gi"),
						},
						Requests: corev1.ResourceList{
							corev1.ResourceCPU: resource.MustParse("500m"),
							corev1.ResourceMemory: resource.MustParse("200Mi"),
						},
					},
				},
				GetDefaultSubmitterContainer(rayClusterInstance),
			},
			RestartPolicy: corev1.RestartPolicyNever,
		},
	}
}

// GetDefaultSubmitterContainer creates a default submitter container for the Ray job.
func GetDefaultSubmitterContainer(rayClusterInstance *rayv1.RayCluster) corev1.Container {
	return corev1.Container{
		Name: utils.SubmitterContainerName,
		// Use the image of the Ray head to be defensive against version mismatch issues
		Image: rayClusterInstance.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Image,
		Resources: corev1.ResourceRequirements{
			Limits: corev1.ResourceList{
				corev1.ResourceCPU: resource.MustParse("1"),
				corev1.ResourceMemory: resource.MustParse("1Gi"),
			},
			Requests: corev1.ResourceList{
				corev1.ResourceCPU: resource.MustParse("500m"),
				corev1.ResourceMemory: resource.MustParse("200Mi"),
			},
		},
	}
}
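
The address selection at the top of `BuildJobSubmitCommand` can be tried in isolation with the sketch below. It is a simplified stand-in, not the operator's function: `submitAddress` is a hypothetical name, the mode type is redeclared locally, and the head service hostname passed in `main` is an invented example value.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// JobSubmissionMode is redeclared locally so the sketch needs no KubeRay import.
type JobSubmissionMode string

const (
	K8sJobMode  JobSubmissionMode = "K8sJobMode"
	SidecarMode JobSubmissionMode = "SidecarMode"
)

// submitAddress (hypothetical) returns the dashboard address to submit to and,
// for SidecarMode, the shell tokens that wait for the dashboard to come up.
func submitAddress(mode JobSubmissionMode, dashboardURL string) (string, []string, error) {
	switch mode {
	case SidecarMode:
		// The sidecar shares the head Pod's network namespace, so it uses loopback.
		address := "http://127.0.0.1:8265"
		waitLoop := []string{
			"until", "ray", "job", "list", "--address", address, ">/dev/null", "2>&1", ";",
			"do", "echo", strconv.Quote("Waiting for Ray dashboard at " + address + " ..."), ";", "sleep", "2", ";", "done", ";",
		}
		return address, waitLoop, nil
	case K8sJobMode:
		// A separate submitter Job reaches the dashboard through the cluster URL.
		address := dashboardURL
		if !strings.HasPrefix(address, "http://") {
			address = "http://" + address
		}
		return address, nil, nil
	default:
		return "", nil, fmt.Errorf("unsupported submission mode for job submit command: %s", mode)
	}
}

func main() {
	addr, loop, _ := submitAddress(SidecarMode, "")
	fmt.Println(addr)
	fmt.Println(strings.Join(loop, " "))

	// Example hostname only; the real value comes from the RayJob status.
	addr, _, _ = submitAddress(K8sJobMode, "example-head-svc.default.svc.cluster.local:8265")
	fmt.Println(addr)
}
```

The wait loop exists only in SidecarMode because the submitter starts alongside the head container and may run before the dashboard is listening.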
8 changes: 4 additions & 4 deletions ray-operator/controllers/ray/common/job_test.go
@@ -71,7 +71,7 @@ func TestGetMetadataJson(t *testing.T) {
	assert.JSONEq(t, expected, metadataJson)
}

func TestGetK8sJobCommand(t *testing.T) {
func TestBuildJobSubmitCommandWithK8sJobMode(t *testing.T) {
	expected := []string{
		"if",
		"!", "ray", "job", "status", "--address", "http://127.0.0.1:8265", "testJobId", ">/dev/null", "2>&1",
@@ -88,12 +88,12 @@ func TestGetK8sJobCommand(t *testing.T) {
		";", "fi", ";",
		"ray", "job", "logs", "--address", "http://127.0.0.1:8265", "--follow", "testJobId",
	}
	command, err := GetK8sJobCommand(testRayJob)
	command, err := BuildJobSubmitCommand(testRayJob, rayv1.K8sJobMode)
Contributor


Could we have a command test for sidecar mode?

Member Author


After confirming with Kai-Hsun, I will add tests once the non-test code is finalized. Thank you!

	require.NoError(t, err)
	assert.Equal(t, expected, command)
}

func TestGetK8sJobCommandWithYAML(t *testing.T) {
func TestBuildJobSubmitCommandWithK8sJobModeAndYAML(t *testing.T) {
	rayJobWithYAML := &rayv1.RayJob{
		Spec: rayv1.RayJobSpec{
			RuntimeEnvYAML: `
@@ -126,7 +126,7 @@ pip: ["python-multipart==0.0.6"]
		";", "fi", ";",
		"ray", "job", "logs", "--address", "http://127.0.0.1:8265", "--follow", "testJobId",
	}
	command, err := GetK8sJobCommand(rayJobWithYAML)
	command, err := BuildJobSubmitCommand(rayJobWithYAML, rayv1.K8sJobMode)
	require.NoError(t, err)

	// Ensure the slices are the same length.
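
The review thread above asks for a command test covering sidecar mode. One possible shape for such a check is sketched below, under the assumption that it runs against a token slice like the one `BuildJobSubmitCommand` returns. `checkSidecarCommand` is a hypothetical helper, and the token slice in `main` is written by hand from the `job.go` diff, with the entrypoint borrowed from the sample YAML, rather than produced by the real function.

```go
package main

import (
	"errors"
	"fmt"
)

// checkSidecarCommand (hypothetical) verifies three structural properties of a
// sidecar-mode command: it starts with the dashboard wait loop, it has no
// if/fi wrapper for duplicate-submission checks, and it ends by following the
// logs of the given job ID.
func checkSidecarCommand(cmd []string, jobID string) error {
	if len(cmd) == 0 || cmd[0] != "until" {
		return errors.New("sidecar command should start with the dashboard wait loop")
	}
	for _, tok := range cmd {
		if tok == "if" || tok == "fi" {
			return fmt.Errorf("unexpected %q: sidecar mode skips the job status check", tok)
		}
	}
	if cmd[len(cmd)-1] != jobID {
		return errors.New("command should end by following the logs of the submitted job")
	}
	return nil
}

func main() {
	addr := "http://127.0.0.1:8265"
	// Hand-written expectation for illustration only.
	sidecar := []string{
		"until", "ray", "job", "list", "--address", addr, ">/dev/null", "2>&1", ";",
		"do", "echo", `"Waiting for Ray dashboard at http://127.0.0.1:8265 ..."`, ";", "sleep", "2", ";", "done", ";",
		"ray", "job", "submit", "--address", addr, "--no-wait",
		"--submission-id", "testJobId",
		"--", "python /home/ray/samples/sample_code.py", ";",
		"ray", "job", "logs", "--address", addr, "--follow", "testJobId",
	}
	fmt.Println(checkSidecarCommand(sidecar, "testJobId"))
}
```

A structural check like this is looser than the exact `assert.Equal` comparison used for K8sJobMode, so it would complement an exact expected slice rather than replace it.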