Skip to content

Commit

Permalink
Merge branch 'local-telemetry-agent' into obj-store-otel-export
Browse files Browse the repository at this point in the history
# Conflicts:
#	runhouse/constants.py
#	runhouse/servers/telemetry/telemetry_agent.py
  • Loading branch information
jlewitt1 committed Sep 1, 2024
2 parents 3685fc6 + 99c89ca commit 4a5b09f
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 77 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/local_den_unit_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ name: Local Den Unit Tests

on: workflow_dispatch

env:
TELEMETRY_COLLECTOR_ENDPOINT: localhost:4316
TELEMETRY_COLLECTOR_STATUS_URL: http://localhost:13134

jobs:
local-den-tests:
runs-on: ubuntu-latest
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/local_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ on:
env:
API_SERVER_URL: https://api.run.house
RH_LOG_LEVEL: INFO
TELEMETRY_COLLECTOR_ENDPOINT: localhost:4316
TELEMETRY_COLLECTOR_STATUS_URL: http://localhost:13134

jobs:
# TODO: THESE ARE ONLY SEPARATE JOBS BECAUSE THERE ARE
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/local_tests_den_dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ on:

env:
API_SERVER_URL: https://api-dev.run.house
TELEMETRY_COLLECTOR_ENDPOINT: localhost:4316
TELEMETRY_COLLECTOR_STATUS_URL: http://localhost:13134

jobs:
# TODO: THESE ARE ONLY SEPARATE JOBS BECAUSE THERE ARE
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pexpect
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp
opentelemetry-instrumentation-logging
pyopenssl>=23.3.0
ray[default] >= 2.9.0
rich
Expand Down
1 change: 0 additions & 1 deletion runhouse/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@

# Telemetry constants
OTEL_VERSION = "0.108.0"
TELEMETRY_AGENT_LOCAL_CONFIG_DIR = "/tmp/otel"

TELEMETRY_AGENT_HTTP_PORT = 4318
TELEMETRY_AGENT_GRPC_PORT = 4317
Expand Down
8 changes: 4 additions & 4 deletions runhouse/servers/http/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import inspect
import json
import traceback
import uuid
from functools import wraps
from pathlib import Path
from typing import Optional
Expand Down Expand Up @@ -48,6 +47,7 @@
FolderParams,
FolderPutParams,
FolderRmParams,
generate_request_id_from_headers,
get_token_from_request,
handle_exception_response,
OutputType,
Expand Down Expand Up @@ -88,7 +88,7 @@ async def wrapper(*args, **kwargs):
)
token = get_token_from_request(request)

request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
request_id = generate_request_id_from_headers(request)
ctx_token = obj_store.set_ctx(request_id=request_id, token=token)

try:
Expand Down Expand Up @@ -568,6 +568,7 @@ async def put_resource(request: Request, params: PutResourceParams):
serialized_data=params.serialized_data,
serialization=params.serialization,
env_name=env_name,
request_id=generate_request_id_from_headers(request),
)
except Exception as e:
return handle_exception_response(
Expand Down Expand Up @@ -604,13 +605,12 @@ async def get_object(
serialization: Optional[str] = "json",
remote: bool = False,
):
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
try:
return await obj_store.aget(
key=key,
serialization=serialization,
remote=remote,
request_id=request_id,
request_id=generate_request_id_from_headers(request),
)
except Exception as e:
return handle_exception_response(
Expand Down
5 changes: 5 additions & 0 deletions runhouse/servers/http/http_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
import shutil
import sys
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

Expand Down Expand Up @@ -251,6 +252,10 @@ def auth_headers_from_request(request):
return request.headers.get("Authorization", "")


def generate_request_id_from_headers(request):
return request.headers.get("X-Request-ID", str(uuid.uuid4()))


def load_current_cluster_rns_address():
from runhouse.resources.hardware import _current_cluster, _get_cluster_from

Expand Down
80 changes: 55 additions & 25 deletions runhouse/servers/obj_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,17 @@
import ray

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.logging import LoggingInstrumentor

# Note: logs in experimental state (https://opentelemetry-python.readthedocs.io/en/latest/examples/logs/README.html)
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Status, StatusCode

from pydantic import BaseModel

Expand Down Expand Up @@ -135,18 +143,24 @@ async def wrapper(self, *args, **kwargs):
if tracer is None:
return await func(self, *args, **kwargs)

span_id = kwargs.pop("span_id", None) or generate_uuid()
span_id = kwargs.pop("request_id", None) or generate_uuid()
span_name = f"{self.__class__.__name__}.{func.__name__}"
func_name = func.__name__
cluster_config = self.cluster_config

# Set up logger named for the module containing the decorated method
logger = logging.getLogger(func.__module__)

try:
from opentelemetry.trace.status import Status, StatusCode
# Instrument logger to include trace context in logs
LoggingInstrumentor().instrument(set_logging_format=True)

with tracer.start_as_current_span(span_name) as span:
try:
span.set_attribute("function_name", func_name)
span.set_attribute("span_id", span_id)

# Set cluster config attributes
for k, v in cluster_config.items():
if isinstance(v, rh.Resource):
v = {
Expand All @@ -159,49 +173,47 @@ async def wrapper(self, *args, **kwargs):
json_value = json.dumps(v)
span.set_attribute(span_key, json_value)
except TypeError:
# If the value is not JSON serializable, convert it to a string
span.set_attribute(span_key, str(v))

# Set attributes for arguments
sig = inspect.signature(func)
param_names = list(sig.parameters.keys())
# Ignore the 'self' arg
for i, arg in enumerate(args, start=1):
for i, arg in enumerate(args[1:], start=1): # Skip 'self'
if i < len(param_names):
span.set_attribute(param_names[i], str(arg))

# TODO: might want to be more selective about what to include here
for arg_name, arg_value in kwargs.items():
span.set_attribute(arg_name, str(arg_value))
if arg_name != "serialized_data":
span.set_attribute(arg_name, str(arg_value))

# Manually add log to the span
span.add_event(f"Starting execution for func: {func_name}")
# Log start of function execution
logger.info(f"Starting execution for func: {func_name}")

result = await func(self, *args, **kwargs)

# Add another log event for successful execution
span.add_event(f"Finished execution for func: {func_name}")
# Log end of function execution
logger.info(f"Finished execution for func: {func_name}")
span.set_status(Status(StatusCode.OK))
return result

except Exception as e:
# Log the exception in the span
span.add_event(
"Exception occurred",
{
"exception.type": type(e).__name__,
"exception.message": str(e),
},
)
# Log the exception
logger.exception(f"Exception in {func_name}: {str(e)}")
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
raise Exception
raise

except Exception as e:
# Catch any OpenTelemetry-related exceptions
logger.warning(f"OpenTelemetry error in {func_name}: {e}")

# Execute the function without tracing
return await func(self, *args, **kwargs)

finally:
# Uninstrument logger to avoid affecting other parts of the application
LoggingInstrumentor().uninstrument()

return wrapper

return decorator
Expand Down Expand Up @@ -274,10 +286,10 @@ def tracer(self):
return self._tracer

def _initialize_telemetry_agent(self):
from runhouse.servers.telemetry import TelemetryAgent
from runhouse.servers.telemetry import TelemetryAgentExporter

try:
ta = TelemetryAgent()
ta = TelemetryAgentExporter()
ta.start()
return ta

Expand All @@ -286,18 +298,34 @@ def _initialize_telemetry_agent(self):
return None

def _initialize_tracer(self):
trace.set_tracer_provider(TracerProvider())
resource = Resource.create({"service.name": "runhouse-oss"})
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)

# Export to local agent, which handles sending to the backend collector
span_processor = BatchSpanProcessor(
OTLPSpanExporter(
endpoint=f"localhost:{self.telemetry_agent.config.grpc_port}",
endpoint=f"localhost:{self.telemetry_agent.agent_config.grpc_port}",
insecure=True,
)
)
trace.get_tracer_provider().add_span_processor(span_processor)

# Set up logging
log_provider = LoggerProvider(resource=resource)
log_exporter = OTLPLogExporter(
endpoint=f"localhost:{self.telemetry_agent.agent_config.grpc_port}",
insecure=True,
)
log_processor = BatchLogRecordProcessor(log_exporter)
log_provider.add_log_record_processor(log_processor)

# Set up logging handler
handler = LoggingHandler(level=logging.NOTSET, logger_provider=log_provider)

# Attach OTLP handler to root logger
logging.getLogger().addHandler(handler)

return tracer

# ----------------------------------------------------
Expand Down Expand Up @@ -920,7 +948,7 @@ async def aget(
serialization: Optional[str] = None,
remote: bool = False,
default: Optional[Any] = None,
request_id: Optional[str] = None,
**kwargs,
):
env_servlet_name_containing_key = await self.aget_env_servlet_name_for_key(key)
if not env_servlet_name_containing_key:
Expand Down Expand Up @@ -1605,11 +1633,13 @@ def get_obj_refs_dict(self, d: Dict[Any, Any]):
##############################################
# More specific helpers
##############################################
@trace_method()
async def aput_resource(
self,
serialized_data: Any,
serialization: Optional[str] = None,
env_name: Optional[str] = None,
**kwargs,
) -> "Response":
from runhouse.servers.http.http_utils import deserialize_data

Expand Down
Loading

0 comments on commit 4a5b09f

Please sign in to comment.