Skip to content

feat: add Confluent v2-compatible consumer REST API#73

Merged
haochenpan merged 5 commits intomainfrom
web-refactor
Feb 28, 2026
Merged

feat: add Confluent v2-compatible consumer REST API#73
haochenpan merged 5 commits intomainfrom
web-refactor

Conversation

@haochenpan
Copy link
Owner

Summary

  • Add 13 Confluent Kafka REST Proxy v2-compatible consumer endpoints, allowing users to consume events over HTTP using only their Globus access token — no Kafka client or AWS credentials needed
  • Refactor web service: remove legacy code (web_service_v3, action_provider, common, lambda), rename web_service_v3 to web_service, consolidate tests
  • Fix auth error tuple bug, prevent orphaned Kafka topics, bump version to 0.4.6

Consumer REST API endpoints

All under /api/v3/{namespace}/consumers/...:

# Method Path Description
1 POST /{group} Create consumer instance
2 DELETE /{group}/instances/{id} Delete consumer instance
3 POST /{group}/instances/{id}/subscription Subscribe to topics
4 GET /{group}/instances/{id}/subscription Get subscription
5 DELETE /{group}/instances/{id}/subscription Unsubscribe
6 GET /{group}/instances/{id}/records Fetch records
7 POST /{group}/instances/{id}/offsets Commit offsets
8 GET /{group}/instances/{id}/offsets Get committed offsets
9 POST /{group}/instances/{id}/assignments Assign partitions
10 GET /{group}/instances/{id}/assignments Get assignments
11 POST /{group}/instances/{id}/positions Seek to offset
12 POST /{group}/instances/{id}/positions/beginning Seek to beginning
13 POST /{group}/instances/{id}/positions/end Seek to end

Key design decisions

  • Confluent v2 API compatibility: request/response shapes, dot-notation field names, HTTP status codes, and error format match the Confluent REST Proxy spec
  • Namespace-scoped isolation: Kafka group ID = {namespace}.{group_name}, topic = {namespace}.{topic}
  • Thread safety: global lock for consumer registry, per-consumer lock for operations
  • Idle consumer reaping: daemon thread cleans up consumers idle >5 min (configurable via CONSUMER_INSTANCE_TIMEOUT_MS)
  • No new dependencies: uses kafka-python's KafkaConsumer (already a dependency)

New files

  • web_service/consumer_models.py — Pydantic request models with Confluent dot-notation field aliases
  • web_service/consumer_service.py — Consumer lifecycle management (registry, MSK IAM auth, reaper thread)

Test plan

  • All 65 existing unit tests pass
  • Ruff lint and format pass
  • All 13 endpoints verified end-to-end via ConsumerRESTDemo notebook against local server + real MSK cluster

🤖 Generated with Claude Code

haochenpan and others added 4 commits February 28, 2026 13:05
- Remove action_provider/, common/, lambda/, examples/, docs/ap/
- Rename web_service_v3/ to web_service/ as the canonical service
- Merge common/ utilities into web_service/utils.py
- Move test files from web_service_v3/ to tests/ with *_test.py naming
- Remove old v2 web_service code and action_provider tests
- Update Dockerfile, CI workflows, docs, and pyproject.toml accordingly

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move DynamoDB deletion after Kafka deletion check in delete_topic()
so metadata isn't removed when Kafka deletion fails. Add direct Kafka
cleanup fallback in race condition test.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fix the "not in audience" auth error that incorrectly passed a tuple
instead of a string as the error reason. Also remove the unused
validate_name method and its NAME_MIN/NAME_MAX constants from
AuthManager, since NamespaceService.validate_name is the active one.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Implement a full Kafka consumer REST API matching the Confluent Kafka
REST Proxy v2 spec, allowing users to consume events over HTTP using
only their Globus access token — no Kafka client or AWS credentials.

Endpoints (under /api/v3/{namespace}/consumers/...):
  1. POST   /{group}                          — Create consumer instance
  2. DELETE /{group}/instances/{id}            — Delete consumer instance
  3. POST   /{group}/instances/{id}/subscription   — Subscribe to topics
  4. GET    /{group}/instances/{id}/subscription   — Get subscription
  5. DELETE /{group}/instances/{id}/subscription   — Unsubscribe
  6. GET    /{group}/instances/{id}/records         — Fetch records
  7. POST   /{group}/instances/{id}/offsets         — Commit offsets
  8. GET    /{group}/instances/{id}/offsets         — Get committed offsets
  9. POST   /{group}/instances/{id}/assignments     — Assign partitions
 10. GET    /{group}/instances/{id}/assignments     — Get assignments
 11. POST   /{group}/instances/{id}/positions       — Seek to offset
 12. POST   /{group}/instances/{id}/positions/beginning — Seek to beginning
 13. POST   /{group}/instances/{id}/positions/end       — Seek to end

New files:
- consumer_models.py: Pydantic models with Confluent dot-notation fields
- consumer_service.py: Consumer lifecycle with thread-safe registry,
  per-consumer locks, idle timeout reaping, and MSK IAM auth

Key design decisions:
- Namespace-scoped isolation (group ID = {ns}.{group}, topic = {ns}.{topic})
- Confluent-compatible request/response shapes and HTTP status codes
- Header-only auth extraction (replaces extract_val body/header hybrid)
- Blocking poll() runs in threadpool via sync def to avoid event loop stall
- Daemon reaper thread cleans up consumers idle >5min (configurable)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@haochenpan haochenpan merged commit 6b95b07 into main Feb 28, 2026
2 checks passed
@haochenpan haochenpan deleted the web-refactor branch February 28, 2026 22:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant