Skip to content

Commit

Permalink
instrument obj store method with otel agent
Browse files Browse the repository at this point in the history
  • Loading branch information
jlewitt1 committed Sep 1, 2024
1 parent 6942323 commit fba07fc
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 39 deletions.
2 changes: 1 addition & 1 deletion runhouse/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ def clean_up_ssh_connections():

# Note: this initalizes a dummy global object. The obj_store must
# be properly initialized by a servlet via initialize.
obj_store = ObjStore()
obj_store = ObjStore(skip_init=True)
5 changes: 2 additions & 3 deletions runhouse/servers/http/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,10 @@ async def get_object(
serialization: Optional[str] = "json",
remote: bool = False,
):
request_id = request.headers.get("X-Request-ID")
try:
return await obj_store.aget(
key=key,
serialization=serialization,
remote=remote,
key=key, serialization=serialization, remote=remote, span_id=request_id
)
except Exception as e:
return handle_exception_response(
Expand Down
114 changes: 111 additions & 3 deletions runhouse/servers/obj_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@
from typing import Any, Dict, List, Optional, Set, Union

import ray

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace.status import Status, StatusCode
from pydantic import BaseModel

from runhouse.logger import get_logger

from runhouse.rns.defaults import req_ctx
from runhouse.rns.utils.api import ResourceVisibility
from runhouse.rns.utils.api import generate_uuid, ResourceVisibility
from runhouse.utils import (
arun_in_thread,
generate_default_name,
Expand Down Expand Up @@ -119,6 +125,67 @@ async def wrapper(self, *args, **kwargs):
return wrapper


def trace_method(name=None):
def decorator(func):
@wraps(func)
async def wrapper(self, *args, **kwargs):
if not hasattr(self, "tracer") or self.tracer is None:
return await func(self, *args, **kwargs)

span_id = kwargs.pop("span_id", None) or generate_uuid()
span_name = name or f"{self.__class__.__name__}.{func.__name__}"
func_name = func.__name__

try:
with self.tracer.start_as_current_span(span_name) as span:
try:
span.set_attribute("function_name", func_name)
span.set_attribute("span_id", span_id)

# Set attributes for arguments
sig = inspect.signature(func)
param_names = list(sig.parameters.keys())
for i, arg in enumerate(args, start=1):
if i < len(param_names):
span.set_attribute(param_names[i], str(arg))
for arg_name, arg_value in kwargs.items():
span.set_attribute(arg_name, str(arg_value))

# Manually add log to the span
span.add_event(f"Starting execution for func: {func_name}")

result = await func(self, *args, **kwargs)

# Add another log event for successful execution
span.add_event(f"Finished execution for func: {func_name}")
span.set_status(Status(StatusCode.OK))
return result

except Exception as e:
# Log the exception in the span
span.add_event(
"Exception occurred",
{
"exception.type": type(e).__name__,
"exception.message": str(e),
},
)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise Exception

except Exception as otel_error:
# Catch any OpenTelemetry-related exceptions
logger.warning(f"OpenTelemetry error in {func_name}: {otel_error}")

# Execute the function without tracing
return await func(self, *args, **kwargs)

return wrapper

return decorator


class ObjStore:
"""Class to handle internal IPC and storage for Runhouse.
Expand All @@ -142,7 +209,7 @@ class ObjStore:
if functions within that Servlet make key/value requests.
"""

def __init__(self):
def __init__(self, skip_init=False, **kwargs):
self.servlet_name: Optional[str] = None
self.cluster_servlet: Optional[ray.actor.ActorHandle] = None
self.cluster_config: Optional[Dict[str, Any]] = None
Expand All @@ -152,6 +219,46 @@ def __init__(self):
self.env_servlet_cache = {}
self.active_function_calls = {}

# TODO: add flag for enabling observability
if not skip_init:
self.telemetry_agent = self._initialize_telemetry_agent(**kwargs)
self.tracer = self._initialize_tracer()

##############################################
# Telemetry setup
##############################################
def _initialize_telemetry_agent(self, **kwargs):
from runhouse.servers.telemetry import TelemetryAgent, TelemetryAgentConfig

try:
config = TelemetryAgentConfig(
backend_collector_endpoint=kwargs.get("backend_collector_endpoint"),
backend_collector_status_url=kwargs.get("backend_collector_status_url"),
)
ta = TelemetryAgent(config)
ta.start()
return ta

except Exception as e:
logger.warning(f"Failed to start telemetry agent: {e}")
return None

def _initialize_tracer(self):
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Export to local agent, which handles sending to the backend collector
span_processor = BatchSpanProcessor(
OTLPSpanExporter(
endpoint=f"localhost:{self.telemetry_agent.config.grpc_port}",
insecure=True,
)
)
trace.get_tracer_provider().add_span_processor(span_processor)

return tracer

# --------------------
async def ainitialize(
self,
servlet_name: Optional[str] = None,
Expand Down Expand Up @@ -764,15 +871,16 @@ def get_local(self, key: Any, default: Optional[Any] = None, remote: bool = Fals
raise KeyError(f"No local store exists; key {key} not found.")
return default

@trace_method()
async def aget(
self,
key: Any,
serialization: Optional[str] = None,
remote: bool = False,
default: Optional[Any] = None,
span_id: Optional[str] = None,
):
env_servlet_name_containing_key = await self.aget_env_servlet_name_for_key(key)

if not env_servlet_name_containing_key:
if default == KeyError:
raise KeyError(f"No local store exists; key {key} not found.")
Expand Down
147 changes: 116 additions & 31 deletions runhouse/servers/telemetry/telemetry_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import platform
import subprocess
import tarfile
import time
import urllib
from datetime import datetime
from pathlib import Path
from typing import Optional

Expand Down Expand Up @@ -103,7 +105,8 @@ def _create_default_config(self):
with open(self.local_config_path, "w") as f:
yaml.dump(otel_config, f, default_flow_style=False)

def _load_install_url(self):
def _generate_install_url(self):
"""Generate the download URL for the agent binary based on the host system."""
# https://opentelemetry.io/docs/collector/installation/
system = platform.system().lower()
arch = platform.machine()
Expand All @@ -117,29 +120,58 @@ def _load_install_url(self):
url = f"https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v{OTEL_VERSION}/otelcol_{OTEL_VERSION}_{system}_{arch}.tar.gz"
return url

def _find_existing_agent_process(self):
"""Finds the running agent process by name"""
for proc in psutil.process_iter(["pid", "name"]):
try:
if proc.info["name"] == "otelcol":
return proc
except (
psutil.NoSuchProcess,
psutil.AccessDenied,
psutil.TimeoutExpired,
psutil.ZombieProcess,
):
pass

def collector_health_check(self) -> int:
"""Ping the health status port of the backend collector - should return a status code of 200 if healthy"""
resp = requests.get(self.config.backend_collector_status_url)
return resp.status_code

def is_up(self) -> bool:
if self._agent_process is None:
return False
"""
Checks if the local agent process is running.
Handles two scenarios:
1. The TelemetryAgent object has a reference to a process it started.
2. The TelemetryAgent object is newly initialized and needs to find an existing process.
In both cases, we maintain a ref to the running process.
Returns:
bool: True if the agent process is running, False otherwise.
"""
if self._agent_process:
# TelemetryAgent is still tracking a running process
return psutil.pid_exists(self._agent_process.pid)

exec_path = Path(self.config.local_agent_dir).exists()
if not exec_path:
proc = self._find_existing_agent_process()
if not proc:
return False

return psutil.pid_exists(self._agent_process.pid)
self._agent_process = proc
return True

def is_installed(self) -> bool:
"""Check if the binary path exists on the host machine."""
return Path(self.executable_path).exists()

def install(self):
"""Install the otel binary used for running the telemetry agent."""
logger.debug("Installing OTel agent")
try:
install_url = self._load_install_url()
install_url = self._generate_install_url()
logger.debug(f"Downloading OTel agent from url: {install_url}")

# Download and extract
Expand All @@ -164,39 +196,92 @@ def install(self):
except Exception as e:
raise RuntimeError(f"Failed to install OTel agent: {e}")

def start(self, force_reinstall=False, reload_config=False):
"""Start the local telemetry agent. Ignore if the agent is already running."""
def start(self, force_reinstall=False, reload_config=False, timeout=30):
"""
Start the local telemetry agent.
Args:
force_reinstall (bool): If True, reinstall the agent even if it's already installed.
reload_config (bool): If True, reload the agent config even if it exists.
timeout (int): Maximum time to wait for the agent to start, in seconds.
Returns:
bool: True if the agent was successfully started, False otherwise.
"""
if force_reinstall or not self.is_installed():
self.install()

if self.is_up() and not reload_config:
return

config_path = self.local_config_path
if reload_config or not Path(config_path).exists():
self._create_default_config()
logger.info("Otel agent is already running.")
return True

try:
# Start the agent as a background process
self._agent_process = subprocess.Popen(
[self.executable_path, "--config", config_path],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
config_path = self.local_config_path
if reload_config or not Path(config_path).exists():
self._create_default_config()

logs_dir = os.path.join(os.path.dirname(self.executable_path), "logs")
os.makedirs(logs_dir, exist_ok=True)
log_file = os.path.join(logs_dir, "agent.log")

logger.debug(f"Starting OpenTelemetry agent at {datetime.now()}\n")

with open(log_file, "a") as out_file:
self._agent_process = subprocess.Popen(
[self.executable_path, "--config", config_path],
stdout=out_file,
stderr=subprocess.STDOUT,
text=True,
)

except PermissionError as e:
logger.error(f"Insufficient permissions to run Otel agent: {e}")
# Wait for the process to start
start_time = datetime.now()
while (datetime.now() - start_time).total_seconds() < timeout:
if self.is_up():
logger.info(
f"Successfully started Otel agent (pid={self._agent_process.pid})"
)
return True

time.sleep(0.5)
raise TimeoutError(
f"Otel agent failed to start within the specified timeout ({timeout} seconds)."
)

except subprocess.CalledProcessError as e:
except (PermissionError, subprocess.CalledProcessError, TimeoutError) as e:
logger.error(f"Failed to start Otel agent: {e}")

logger.info(f"Successfully started Otel agent (pid={self._agent_process.pid})")
return False

def stop(self):
"""Stop the local telemetry agent."""
if self._agent_process is None:
logger.debug("Otel agent not currently running")
return
"""
Stop the local telemetry agent.
Returns:
bool: True if a process was stopped, False otherwise.
"""
process_to_stop = self._agent_process or self._find_existing_agent_process()
if not process_to_stop:
logger.info("No running Otel agent found.")
return False

self._agent_process.terminate()
logger.info("Stopped Otel agent")
try:
process_to_stop.terminate()
process_to_stop.wait(
timeout=5
) # Wait for up to 5 seconds for the process to terminate
logger.info(f"Stopped Otel agent (pid={process_to_stop.pid})")
self._agent_process = None # Clear the reference
return True

except psutil.NoSuchProcess:
logger.info("Otel agent no longer running.")
return True

except psutil.TimeoutExpired:
process_to_stop.kill() # Force kill if it doesn't terminate
logger.info("Stopped the Otel agent.")
return True

except Exception as e:
logger.error(f"Error stopping Otel agent: {e}")
return False
3 changes: 2 additions & 1 deletion tests/test_servers/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ def cert_config():
def local_telemetry_agent():
from runhouse.servers.telemetry import TelemetryAgent, TelemetryAgentConfig

# Note: For local testing purposes the backend collector will run on a lower port for HTTP and GRPC to
# Note: For local testing purposes the backend collector will run on different ports for HTTP and GRPC to
# avoid collisions with the local agent
agent_config = TelemetryAgentConfig(
backend_collector_endpoint="localhost:4316",
backend_collector_status_url="http://localhost:13133",
log_level="DEBUG",
)
ta = TelemetryAgent(agent_config)
ta.start(reload_config=True)
Expand Down
Loading

0 comments on commit fba07fc

Please sign in to comment.