Python client library for KaaIoT platform REST APIs.
pip install kaa-rest-client

import asyncio
from kaa_rest_client import KaaClient, EprClient, EptsClient
async def main():
    async with KaaClient(api_key="your-api-key") as kaa:
        epr = EprClient(kaa)
        endpoints = await epr.list_endpoints(limit=10)
        print(endpoints)
        epts = EptsClient(kaa)
        config = await epts.get_time_series_config()
        print(config)
asyncio.run(main())

Two authentication methods are supported:
| Method | Header | Env variable |
|---|---|---|
| API key (primary) | x-iamcore-api-key | KAA_API_KEY |
| Bearer token | Authorization: Bearer ... | KAA_BEARER_TOKEN |
# API key (recommended)
kaa = KaaClient(api_key="your-api-key")
# Bearer token
kaa = KaaClient(bearer_token="your-token")
# Custom base URL
kaa = KaaClient(base_url="https://your-instance.kaaiot.com", api_key="your-api-key")

If both an API key and a bearer token are provided, the API key takes precedence.
All service clients use composition — pass a KaaClient instance to the constructor.
EprClient manages endpoints, metadata, filters, and relations.
epr = EprClient(kaa)
# List endpoints
endpoints = await epr.list_endpoints(limit=10, sort="endpointId")
# Register a new endpoint
result = await epr.register_endpoint({"endpointId": "my-device", "applicationName": "my-app"})
# Metadata
metadata = await epr.get_endpoint_metadata("endpoint-id")
await epr.update_endpoint_metadata("endpoint-id", {"key": "value"})
# Filters
filters = await epr.list_filters("my-app")
await epr.create_filter("my-app", {"name": "active-devices", "query": "..."})

CexClient invokes commands on endpoints and manages batch operations.
cex = CexClient(kaa)
# Invoke a command
result = await cex.invoke_command("endpoint-id", "reboot", await_timeout=30)
# With payload
result = await cex.invoke_command("endpoint-id", "config", {"brightness": 80})
# Batch command by filter
task = await cex.batch_command_by_filter("reboot", "filter-id")

EptsClient retrieves time series configuration and data.
epts = EptsClient(kaa)
# Get time series config
config = await epts.get_time_series_config(application_names="my-app")
# Get latest data points
latest = await epts.get_last_time_series("my-app")
# Get historical data
data = await epts.get_time_series_data(
"my-app",
time_series_name="temperature",
from_date="now-7d",
to_date="now",
)

AsfClient manages ingest pipelines and index templates, and searches documents.
asf = AsfClient(kaa)
# Ingest pipelines
await asf.upsert_pipeline("my-pipeline", {"processors": [...]})
pipeline = await asf.get_pipeline("my-pipeline")
# Search documents
results = await asf.search_documents("tenant-id", "my-app", "2024.01.15", {
"query": {"match_all": {}}
})

You can pass an external httpx.AsyncClient to share connection pools:
import httpx
async with httpx.AsyncClient() as http:
    kaa = KaaClient(api_key="key", client=http)
    epr = EprClient(kaa)
    # ...

from kaa_rest_client import KaaApiError, KaaAuthError, KaaNotFoundError
try:
    endpoint = await epr.get_endpoint("nonexistent")
except KaaNotFoundError:
    print("Endpoint not found")
except KaaAuthError:
    print("Authentication failed")
except KaaApiError as e:
    print(f"API error {e.status_code}: {e.message}")