Skip to content

Commit

Permalink
add telemetry agent and local backend collector for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
jlewitt1 committed Sep 1, 2024
1 parent 56cd67b commit dc43df5
Show file tree
Hide file tree
Showing 11 changed files with 538 additions and 1 deletion.
13 changes: 13 additions & 0 deletions docker/telemetry-collector/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
FROM otel/opentelemetry-collector-contrib:latest

# Define an argument for the config file path
ARG CONFIG_FILE=otel-collector-config.yaml

# Copy the config file into the container
COPY ${CONFIG_FILE} /otel-collector-config.yaml

# Expose the necessary ports (health check, grpc, and http ports)
EXPOSE 13133 4319 4316

# Set the command to run the collector with the config file
CMD ["--config", "/otel-collector-config.yaml"]
34 changes: 34 additions & 0 deletions docker/telemetry-collector/otel-collector-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
extensions:
health_check:
endpoint: 0.0.0.0:13133

receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4316
http:
endpoint: 0.0.0.0:4319

exporters:
logging:
verbosity: detailed

processors:
batch:

service:
extensions: [health_check]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [logging]
logs:
receivers: [otlp]
processors: [batch]
exporters: [logging]
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
python-dotenv
fastapi
pexpect
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp
pyopenssl>=23.3.0
ray[default] >= 2.9.0
rich
Expand Down
10 changes: 10 additions & 0 deletions runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,13 @@
DEFAULT_SURFACED_LOG_LENGTH = 20
# Constants for schedulers
INCREASED_INTERVAL = 1 * HOUR

# Telemetry constants
OTEL_VERSION = "0.108.0"
TELEMETRY_AGENT_LOCAL_CONFIG_DIR = "/tmp/otel"

TELEMETRY_AGENT_HTTP_PORT = 4318
TELEMETRY_AGENT_GRPC_PORT = 4317

TELEMETRY_COLLECTOR_ENDPOINT = "telemetry.run.house:4317"
TELEMETRY_COLLECTOR_STATUS_URL = "https://telemetry.run.house/status"
1 change: 1 addition & 0 deletions runhouse/servers/telemetry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .telemetry_agent import TelemetryAgent, TelemetryAgentConfig
287 changes: 287 additions & 0 deletions runhouse/servers/telemetry/telemetry_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
import logging
import os
import platform
import subprocess
import tarfile
import time
import urllib
from datetime import datetime
from pathlib import Path
from typing import Optional

import psutil
import requests
import yaml
from pydantic import BaseModel

from runhouse.constants import (
OTEL_VERSION,
TELEMETRY_AGENT_GRPC_PORT,
TELEMETRY_AGENT_HTTP_PORT,
TELEMETRY_AGENT_LOCAL_CONFIG_DIR,
TELEMETRY_COLLECTOR_ENDPOINT,
TELEMETRY_COLLECTOR_STATUS_URL,
)
from runhouse.logger import get_logger

logger = get_logger(__name__)


class TelemetryAgentConfig(BaseModel):
# Note: Local agent will listen on the default ports (4317 for gRPC and 4318 for HTTP)
http_port: Optional[int] = TELEMETRY_AGENT_HTTP_PORT
grpc_port: Optional[int] = TELEMETRY_AGENT_GRPC_PORT
backend_collector_endpoint: Optional[str] = TELEMETRY_COLLECTOR_ENDPOINT
backend_collector_status_url: Optional[str] = TELEMETRY_COLLECTOR_STATUS_URL
log_level: Optional[str] = logging.getLevelName(logger.getEffectiveLevel())
local_agent_dir: Optional[str] = TELEMETRY_AGENT_LOCAL_CONFIG_DIR


class TelemetryAgent:
"""Runs a local OpenTelemetry Collector instance for telemetry collection
Key actions:
- Installs the OpenTelemetry Collector binary (if not already present)
- Creates a config file for the agent
- Starts the agent collector process as a background process on the host machine
- Listens for incoming telemetry data on specified ports
- Exports telemetry data to the backend collector
"""

def __init__(self, telemetry_config: TelemetryAgentConfig = None):
self._agent_process = None
self.config = telemetry_config or TelemetryAgentConfig()

@property
def executable_path(self):
"""Path to the otel binary."""
return f"{self.config.local_agent_dir}/otelcol"

@property
def local_config_path(self):
"""Path to the agent config file."""
return f"{self.config.local_agent_dir}/otel-config.yaml"

def _create_default_config(self):
"""Base config for the local agent, which forwards the collected telemetry data to the collector backend."""
otel_config = {
"receivers": {
"otlp": {
"protocols": {
"grpc": {"endpoint": f"0.0.0.0:{self.config.grpc_port}"},
"http": {"endpoint": f"0.0.0.0:{self.config.http_port}"},
}
}
},
"processors": {"batch": {}},
"exporters": {
"logging": {"loglevel": self.config.log_level.lower()},
"otlp/grpc": {
"endpoint": self.config.backend_collector_endpoint,
"tls": {"insecure": True},
},
},
"service": {
"pipelines": {
"traces": {
"receivers": ["otlp"],
"processors": ["batch"],
"exporters": ["logging", "otlp/grpc"],
},
"metrics": {
"receivers": ["otlp"],
"processors": ["batch"],
"exporters": ["logging", "otlp/grpc"],
},
"logs": {
"receivers": ["otlp"],
"processors": ["batch"],
"exporters": ["logging", "otlp/grpc"],
},
}
},
}

with open(self.local_config_path, "w") as f:
yaml.dump(otel_config, f, default_flow_style=False)

def _generate_install_url(self):
"""Generate the download URL for the agent binary based on the host system."""
# https://opentelemetry.io/docs/collector/installation/
system = platform.system().lower()
arch = platform.machine()
if system == "linux":
arch = "amd64" if arch in ["x86_64", "amd64"] else arch
elif system == "darwin":
arch = "amd64" if arch == "x86_64" else "arm64" if arch == "arm64" else arch
else:
raise ValueError(f"Unsupported system: {system}")

url = f"https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v{OTEL_VERSION}/otelcol_{OTEL_VERSION}_{system}_{arch}.tar.gz"
return url

def _find_existing_agent_process(self):
"""Finds the running agent process by name"""
for proc in psutil.process_iter(["pid", "name"]):
try:
if proc.info["name"] == "otelcol":
return proc
except (
psutil.NoSuchProcess,
psutil.AccessDenied,
psutil.TimeoutExpired,
psutil.ZombieProcess,
):
pass

def collector_health_check(self) -> int:
"""Ping the health status port of the backend collector - should return a status code of 200 if healthy"""
resp = requests.get(self.config.backend_collector_status_url)
return resp.status_code

def is_up(self) -> bool:
"""
Checks if the local agent process is running.
Handles two scenarios:
1. The TelemetryAgent object has a reference to a process it started.
2. The TelemetryAgent object is newly initialized and needs to find an existing process.
In both cases, we maintain a ref to the running process.
Returns:
bool: True if the agent process is running, False otherwise.
"""
if self._agent_process:
# TelemetryAgent is still tracking a running process
return psutil.pid_exists(self._agent_process.pid)

proc = self._find_existing_agent_process()
if not proc:
return False

self._agent_process = proc
return True

def is_installed(self) -> bool:
"""Check if the binary path exists on the host machine."""
return Path(self.executable_path).exists()

def install(self):
"""Install the otel binary used for running the telemetry agent."""
logger.debug("Installing OTel agent")
try:
install_url = self._generate_install_url()
logger.debug(f"Downloading OTel agent from url: {install_url}")

# Download and extract
tar_path = "/tmp/otelcol.tar.gz"
urllib.request.urlretrieve(install_url, tar_path)

local_agent_dir = self.config.local_agent_dir
Path(local_agent_dir).mkdir(parents=True, exist_ok=True)
os.chmod(local_agent_dir, 0o755)

with tarfile.open(tar_path, "r:gz") as tar:
tar.extractall(path=local_agent_dir)

# Verify installation
if not self.is_installed():
raise FileExistsError(
f"No otel binary found in path: {self.executable_path}"
)

logger.info("OTel agent installed successfully.")

except Exception as e:
raise RuntimeError(f"Failed to install OTel agent: {e}")

def start(self, force_reinstall=False, reload_config=False, timeout=30):
"""
Start the local telemetry agent.
Args:
force_reinstall (bool): If True, reinstall the agent even if it's already installed.
reload_config (bool): If True, reload the agent config even if it exists.
timeout (int): Maximum time to wait for the agent to start, in seconds.
Returns:
bool: True if the agent was successfully started, False otherwise.
"""
if force_reinstall or not self.is_installed():
self.install()

if self.is_up() and not reload_config:
logger.info("Otel agent is already running.")
return True

try:
config_path = self.local_config_path
if reload_config or not Path(config_path).exists():
self._create_default_config()

logs_dir = os.path.join(os.path.dirname(self.executable_path), "logs")
os.makedirs(logs_dir, exist_ok=True)
log_file = os.path.join(logs_dir, "agent.log")

logger.debug(f"Starting OpenTelemetry agent at {datetime.now()}\n")

with open(log_file, "a") as out_file:
self._agent_process = subprocess.Popen(
[self.executable_path, "--config", config_path],
stdout=out_file,
stderr=subprocess.STDOUT,
text=True,
)

# Wait for the process to start
start_time = datetime.now()
while (datetime.now() - start_time).total_seconds() < timeout:
if self.is_up():
logger.info(
f"Successfully started Otel agent (pid={self._agent_process.pid})"
)
return True

time.sleep(0.5)
raise TimeoutError(
f"Otel agent failed to start within the specified timeout ({timeout} seconds)."
)

except (PermissionError, subprocess.CalledProcessError, TimeoutError) as e:
logger.error(f"Failed to start Otel agent: {e}")
return False

def stop(self):
"""
Stop the local telemetry agent.
Returns:
bool: True if a process was stopped, False otherwise.
"""
process_to_stop = self._agent_process or self._find_existing_agent_process()
if not process_to_stop:
logger.info("No running Otel agent found.")
return False

try:
process_to_stop.terminate()
process_to_stop.wait(
timeout=5
) # Wait for up to 5 seconds for the process to terminate
logger.info(f"Stopped Otel agent (pid={process_to_stop.pid})")
self._agent_process = None # Clear the reference
return True

except psutil.NoSuchProcess:
logger.info("Otel agent no longer running.")
return True

except psutil.TimeoutExpired:
process_to_stop.kill() # Force kill if it doesn't terminate
logger.info("Stopped the Otel agent.")
return True

except Exception as e:
logger.error(f"Error stopping Otel agent: {e}")
return False
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ def parse_readme(readme: str) -> str:
"python-dotenv",
"fastapi",
"pexpect",
"opentelemetry-api",
"opentelemetry-sdk",
"opentelemetry-exporter-otlp",
"pyOpenSSL>=23.3.0",
"ray[default] >= 2.9.0",
"async_timeout", # Needed for ray<=2.9
Expand Down
6 changes: 5 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ def event_loop():
shared_function, # noqa: F401
)

# ----------------- Telemetry -----------------
from tests.fixtures.docker_telemetry_fixtures import (
local_telemetry_collector, # noqa: F401
)

from tests.fixtures.on_demand_cluster_fixtures import (
a10g_gpu_cluster, # noqa: F401
k80_gpu_cluster, # noqa: F401
Expand Down Expand Up @@ -268,7 +273,6 @@ def event_loop():
sm_gpu_cluster, # noqa: F401
)


# ----------------- Folders -----------------

from tests.fixtures.folder_fixtures import ( # usort: skip
Expand Down
Loading

0 comments on commit dc43df5

Please sign in to comment.