Skip to content

SASL_SSL OAUTHBEARER support for high level consumer #581

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 26, 2025
Merged
1 change: 1 addition & 0 deletions config.m4
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ if test "$PHP_RDKAFKA" != "no"; then

AC_CHECK_LIB($LIBNAME,[rd_kafka_oauthbearer_set_token],[
AC_DEFINE(HAS_RD_KAFKA_OAUTHBEARER,1,[ ])
SOURCES="$SOURCES oauthbearer.c"
],[
AC_MSG_WARN([oauthbearer support is not available])
])
Expand Down
87 changes: 87 additions & 0 deletions kafka_consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include "topic.h"
#include "message.h"
#include "metadata.h"
#ifdef HAS_RD_KAFKA_OAUTHBEARER
#include "oauthbearer.h"
#endif
#if PHP_VERSION_ID < 80000
#include "kafka_consumer_legacy_arginfo.h"
#else
Expand Down Expand Up @@ -868,6 +871,90 @@ PHP_METHOD(RdKafka_KafkaConsumer, resumePartitions)
}
/* }}} */

/* {{{ proto int RdKafka::poll(int $timeout_ms)
Polls the provided kafka handle for events */
PHP_METHOD(RdKafka_KafkaConsumer, poll)
{
object_intern *intern;
zend_long timeout;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &timeout) == FAILURE) {
return;
}

intern = get_object(getThis());
if (!intern) {
return;
}

RETURN_LONG(rd_kafka_poll(intern->rk, timeout));
}
/* }}} */

#ifdef HAS_RD_KAFKA_OAUTHBEARER
/* {{{ proto void RdKafka\KafkaConsumer::oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = [])
* Set SASL/OAUTHBEARER token and metadata
*
* The SASL/OAUTHBEARER token refresh callback or event handler should cause
* this method to be invoked upon success, via
* $consumer->oauthbearerSetToken(). The extension keys must not include the
* reserved key "`auth`", and all extension keys and values must conform to the
* required format as per https://tools.ietf.org/html/rfc7628#section-3.1:
*
* key = 1*(ALPHA)
* value = *(VCHAR / SP / HTAB / CR / LF )
*/
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken)
{
object_intern *intern;
char *token_value;
size_t token_value_len;
zval *zlifetime_ms;
int64_t lifetime_ms;
char *principal_name;
size_t principal_len;
HashTable *extensions_hash = NULL;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "szs|h", &token_value, &token_value_len, &zlifetime_ms, &principal_name, &principal_len, &extensions_hash) == FAILURE) {
return;
}

lifetime_ms = zval_to_int64(zlifetime_ms, "Argument #2 ($lifetime_ms) must be a valid integer");

intern = get_object(getThis());
if (!intern) {
return;
}

oauthbearer_set_token(intern->rk, token_value, lifetime_ms, principal_name, extensions_hash);
}
/* }}} */

/* {{{ proto void RdKafka::oauthbearerSetTokenFailure(string $error)
The SASL/OAUTHBEARER token refresh callback or event handler should cause
this method to be invoked upon failure, via
rd_kafka_oauthbearer_set_token_failure().
*/
PHP_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure)
{
object_intern *intern;
const char *errstr;
size_t errstr_len;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &errstr, &errstr_len) == FAILURE) {
return;
}

intern = get_object(getThis());
if (!intern) {
return;
}

oauthbearer_set_token_failure(intern->rk, errstr);
}
/* }}} */
#endif

void kafka_kafka_consumer_minit(INIT_FUNC_ARGS) /* {{{ */
{
ce = register_class_RdKafka_KafkaConsumer();
Expand Down
11 changes: 11 additions & 0 deletions kafka_consumer.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,15 @@ public function pausePartitions(array $topic_partitions): array {}

/** @tentative-return-type */
public function resumePartitions(array $topic_partitions): array {}

/** @tentative-return-type */
public function poll(int $timeout_ms): int {}

#ifdef HAS_RD_KAFKA_OAUTHBEARER
/** @tentative-return-type */
public function oauthbearerSetToken(string $token_value, int $lifetime_ms, string $principal_name, array $extensions = []): void {}

/** @tentative-return-type */
public function oauthbearerSetTokenFailure(string $error): void {}
#endif
}
53 changes: 48 additions & 5 deletions kafka_consumer_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */
* Stub hash: c992beb679f8970a0d6af9fb06592d8c3c4cb232 */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1)
ZEND_ARG_OBJ_INFO(0, conf, RdKafka\\Conf, 0)
Expand Down Expand Up @@ -135,6 +135,38 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions

#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 1, IS_LONG, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_poll, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, timeout_ms, IS_LONG, 0)
ZEND_END_ARG_INFO()

#if defined(HAS_RD_KAFKA_OAUTHBEARER)
#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 3, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3)
#endif
ZEND_ARG_TYPE_INFO(0, token_value, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, lifetime_ms, IS_LONG, 0)
ZEND_ARG_TYPE_INFO(0, principal_name, IS_STRING, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 0, "[]")
ZEND_END_ARG_INFO()
#endif

#if defined(HAS_RD_KAFKA_OAUTHBEARER)
#if (PHP_VERSION_ID >= 80100)
ZEND_BEGIN_ARG_WITH_TENTATIVE_RETURN_TYPE_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 1, IS_VOID, 0)
#else
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1)
#endif
ZEND_ARG_TYPE_INFO(0, error, IS_STRING, 0)
ZEND_END_ARG_INFO()
#endif


ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
Expand Down Expand Up @@ -162,6 +194,14 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets);
ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, poll);
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken);
#endif
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure);
#endif


static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
Expand Down Expand Up @@ -191,6 +231,13 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC)
#endif
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
};

Expand All @@ -199,11 +246,7 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void)
zend_class_entry ce, *class_entry;

INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods);
#if (PHP_VERSION_ID >= 80400)
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
#endif

zval property_error_cb_default_value;
ZVAL_UNDEF(&property_error_cb_default_value);
Expand Down
39 changes: 34 additions & 5 deletions kafka_consumer_legacy_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 19d0e5f9de1e91016dd8e8c87e88c3d17e0c094f */
* Stub hash: c992beb679f8970a0d6af9fb06592d8c3c4cb232 */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer___construct, 0, 0, 1)
ZEND_ARG_INFO(0, conf)
Expand Down Expand Up @@ -82,6 +82,24 @@ ZEND_END_ARG_INFO()

#define arginfo_class_RdKafka_KafkaConsumer_resumePartitions arginfo_class_RdKafka_KafkaConsumer_getOffsetPositions

#define arginfo_class_RdKafka_KafkaConsumer_poll arginfo_class_RdKafka_KafkaConsumer_consume

#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, 0, 0, 3)
ZEND_ARG_INFO(0, token_value)
ZEND_ARG_INFO(0, lifetime_ms)
ZEND_ARG_INFO(0, principal_name)
ZEND_ARG_INFO(0, extensions)
ZEND_END_ARG_INFO()
#endif

#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, 0, 0, 1)
ZEND_ARG_INFO(0, error)
ZEND_END_ARG_INFO()
#endif


ZEND_METHOD(RdKafka_KafkaConsumer, __construct);
ZEND_METHOD(RdKafka_KafkaConsumer, assign);
#if defined(HAS_RD_KAFKA_INCREMENTAL_ASSIGN)
Expand Down Expand Up @@ -109,6 +127,14 @@ ZEND_METHOD(RdKafka_KafkaConsumer, queryWatermarkOffsets);
ZEND_METHOD(RdKafka_KafkaConsumer, offsetsForTimes);
ZEND_METHOD(RdKafka_KafkaConsumer, pausePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, resumePartitions);
ZEND_METHOD(RdKafka_KafkaConsumer, poll);
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetToken);
#endif
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_METHOD(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure);
#endif


static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, __construct, arginfo_class_RdKafka_KafkaConsumer___construct, ZEND_ACC_PUBLIC)
Expand Down Expand Up @@ -138,6 +164,13 @@ static const zend_function_entry class_RdKafka_KafkaConsumer_methods[] = {
ZEND_ME(RdKafka_KafkaConsumer, offsetsForTimes, arginfo_class_RdKafka_KafkaConsumer_offsetsForTimes, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, pausePartitions, arginfo_class_RdKafka_KafkaConsumer_pausePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, resumePartitions, arginfo_class_RdKafka_KafkaConsumer_resumePartitions, ZEND_ACC_PUBLIC)
ZEND_ME(RdKafka_KafkaConsumer, poll, arginfo_class_RdKafka_KafkaConsumer_poll, ZEND_ACC_PUBLIC)
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetToken, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetToken, ZEND_ACC_PUBLIC)
#endif
#if defined(HAS_RD_KAFKA_OAUTHBEARER)
ZEND_ME(RdKafka_KafkaConsumer, oauthbearerSetTokenFailure, arginfo_class_RdKafka_KafkaConsumer_oauthbearerSetTokenFailure, ZEND_ACC_PUBLIC)
#endif
ZEND_FE_END
};

Expand All @@ -146,11 +179,7 @@ static zend_class_entry *register_class_RdKafka_KafkaConsumer(void)
zend_class_entry ce, *class_entry;

INIT_NS_CLASS_ENTRY(ce, "RdKafka", "KafkaConsumer", class_RdKafka_KafkaConsumer_methods);
#if (PHP_VERSION_ID >= 80400)
class_entry = zend_register_internal_class_with_flags(&ce, NULL, 0);
#else
class_entry = zend_register_internal_class_ex(&ce, NULL);
#endif

zval property_error_cb_default_value;
ZVAL_NULL(&property_error_cb_default_value);
Expand Down
Loading
Loading