Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .envrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
layout python python3.12
dotenv_if_exists
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ share/python-wheels/
*.egg
MANIFEST

# Virtual environments
# Virtual environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
.direnv

# IDE
.vscode/
Expand Down Expand Up @@ -61,4 +62,4 @@ Thumbs.db
# Replicated SDK state
Library/
.local/
AppData/
AppData/
9 changes: 1 addition & 8 deletions examples/async_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import asyncio

from replicated import AsyncReplicatedClient, InstanceStatus
from replicated import AsyncReplicatedClient


async def main():
Expand All @@ -31,13 +31,6 @@ async def main():
)
print("Metrics sent successfully")

# Set the instance status and version concurrently
await asyncio.gather(
instance.set_status(InstanceStatus.RUNNING),
instance.set_version("1.2.0"),
)
print("Instance status set to RUNNING and version set to 1.2.0")

print("Example completed successfully!")


Expand Down
85 changes: 85 additions & 0 deletions examples/metrics_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""
Basic example of using the Replicated Python SDK.
This script initializes the replicated package, creates a customer and instance.
"""

import argparse
import asyncio

from replicated import AsyncReplicatedClient


async def main():
parser = argparse.ArgumentParser(description="Basic Replicated SDK example")
parser.add_argument(
"--base-url",
default="https://replicated.app",
help="Base URL for the Replicated API (default: https://replicated.app)",
)
parser.add_argument(
"--publishable-key",
required=True,
help="Your Replicated publishable key (required)",
)
parser.add_argument(
"--app-slug", required=True, help="Your application slug (required)"
)
parser.add_argument(
"--customer-email",
default="[email protected]",
help="Customer email address (default: [email protected])",
)
parser.add_argument("--channel", help="Channel for the customer (optional)")
parser.add_argument("--customer-name", help="Customer name (optional)")

args = parser.parse_args()

print("Initializing Replicated client...")
print(f"Base URL: {args.base_url}")
print(f"App Slug: {args.app_slug}")

# Initialize the client
async with AsyncReplicatedClient(
publishable_key=args.publishable_key,
app_slug=args.app_slug,
base_url=args.base_url,
) as client:
print("✓ Replicated client initialized successfully")

# Create or get customer
channel_info = f" (channel: {args.channel})" if args.channel else ""
name_info = f" (name: {args.customer_name})" if args.customer_name else ""
print(
f"\nCreating/getting customer with email: "
f"{args.customer_email}{channel_info}{name_info}"
)
customer = await client.customer.get_or_create(
email_address=args.customer_email,
channel=args.channel,
name=args.customer_name,
)
print(f"✓ Customer created/retrieved - ID: {customer.customer_id}")

# Get or create the associated instance
instance = await customer.get_or_create_instance()
print(f"Instance ID: {instance.instance_id}")
print(f"✓ Instance created/retrieved - ID: {instance.instance_id}")

# Get or create the associated instance
instance = await customer.get_or_create_instance()
print(f"Instance ID: {instance.instance_id}")

# Send some metrics concurrently
await asyncio.gather(
instance.send_metric("cpu_usage", 0.83),
instance.send_metric("memory_usage", 0.67),
instance.send_metric("disk_usage", 0.45),
)
print("Metrics sent successfully")

print(f"Instance ID: {instance.instance_id}")


if __name__ == "__main__":
asyncio.run(main())
10 changes: 1 addition & 9 deletions examples/sync_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Synchronous example of using the Replicated Python SDK.
"""

from replicated import InstanceStatus, ReplicatedClient
from replicated import ReplicatedClient


def main():
Expand All @@ -27,14 +27,6 @@ def main():
instance.send_metric("disk_usage", 0.45)
print("Metrics sent successfully")

# Set the instance status
instance.set_status(InstanceStatus.RUNNING)
print("Instance status set to RUNNING")

# Set the application version
instance.set_version("1.2.0")
print("Instance version set to 1.2.0")

print("Example completed successfully!")


Expand Down
4 changes: 3 additions & 1 deletion replicated/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def _get_auth_headers(self) -> Dict[str, str]:
# Try to use dynamic token first, fall back to publishable key
dynamic_token = self.state_manager.get_dynamic_token()
if dynamic_token:
return {"Authorization": f"Bearer {dynamic_token}"}
# Service tokens are sent without Bearer prefix
return {"Authorization": dynamic_token}
else:
# Publishable keys use Bearer prefix
return {"Authorization": f"Bearer {self.publishable_key}"}
4 changes: 3 additions & 1 deletion replicated/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def _get_auth_headers(self) -> Dict[str, str]:
# Try to use dynamic token first, fall back to publishable key
dynamic_token = self.state_manager.get_dynamic_token()
if dynamic_token:
return {"Authorization": f"Bearer {dynamic_token}"}
# Service tokens are sent without Bearer prefix
return {"Authorization": dynamic_token}
else:
# Publishable keys use Bearer prefix
return {"Authorization": f"Bearer {self.publishable_key}"}
5 changes: 0 additions & 5 deletions replicated/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ def _handle_response(self, response: httpx.Response) -> Dict[str, Any]:
error_message = json_body.get("message", default_msg)
error_code = json_body.get("code")

# Debug: print the full error response
print(f"DEBUG: HTTP {response.status_code} Error Response:")
print(f"DEBUG: Response body: {response.text}")
print(f"DEBUG: JSON body: {json_body}")

if response.status_code == 401:
raise ReplicatedAuthError(
message=error_message,
Expand Down
Loading