Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6124be9
aws: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
f31189b
aws: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
567cc11
aws: optimize MSK IAM authentication and credential management
kalavt Nov 26, 2025
e609789
aws: initialize AWS provider in sync mode for MSK IAM
kalavt Nov 26, 2025
fe8c1eb
fix(aws): force credential refresh in provider refresh functions
kalavt Nov 27, 2025
b2e1dc5
Merge branch 'fluent:master' into fix/aws-msk-iam-optimization
kalavt Nov 27, 2025
4741f25
fix(aws): Minor leak on empty_payload_hex when canonical request buil…
kalavt Nov 27, 2025
84b39a5
aws: optimize MSK IAM authentication and credential management
kalavt Nov 27, 2025
afa3222
fix(aws): AWS MSK IAM authentication failures caused by stale credent…
kalavt Nov 27, 2025
b23c843
aws: optimize MSK IAM authentication and credential management
kalavt Nov 27, 2025
8a1ba68
fix(aws): AWS MSK IAM authentication failures on low traffic and Miss…
kalavt Nov 28, 2025
2b43128
fix(aws): Fix potential overflow in md_lifetime_ms on 32‑bit time_t
kalavt Nov 28, 2025
abdd42d
fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections …
kalavt Nov 28, 2025
5b21891
fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections …
kalavt Nov 28, 2025
76eb90d
fix(aws): Fix AWS MSK IAM OAuth Token Expiration on Idle Connections …
kalavt Nov 28, 2025
d039a8b
fix(aws): Fix AWS MSK IAM remove cluster_arn dependency
kalavt Nov 28, 2025
b559bb2
Merge branch 'fluent:master' into feature/rdkafka-sasl-mechanism-aws-…
kalavt Nov 28, 2025
9e10ffa
fix(aws): Fix AWS MSK IAM remove cluster_arn dependency
kalavt Nov 28, 2025
a55c115
kafka: enable AWS MSK IAM authentication
kalavt Nov 28, 2025
7de936c
kafka: enable AWS MSK IAM authentication
kalavt Nov 28, 2025
1fcb163
fix(aws): Fix use strlen for non-SDS buffer
kalavt Nov 28, 2025
56461a8
fix(aws_msk_iam): eliminate type confusion race condition in OAuth ca…
kalavt Nov 28, 2025
c01adf3
fix(kafka): fix critical concurrency and memory issues in AWS MSK IAM…
kalavt Nov 28, 2025
514b1e0
fix(kafka): Improve log clarity for cluster detection and token refresh
kalavt Nov 28, 2025
a6d39b1
feat(aws)!: AWS MSK IAM - support VPC endpoint
kalavt Nov 29, 2025
35a5e62
Merge branch 'fluent:master' into feature/rdkafka-sasl-mechanism-aws-…
kalavt Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions include/fluent-bit/aws/flb_aws_msk_iam.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@ struct flb_msk_iam_cb {

/*
* Register the oauthbearer refresh callback for MSK IAM authentication.
* Parameters:
* - config: Fluent Bit configuration
* - kconf: rdkafka configuration
* - opaque: Kafka opaque context (will be set with MSK IAM context)
* - brokers: Comma-separated list of broker addresses (used to extract AWS region)
* Returns context pointer on success or NULL on failure.
*/
struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *config,
rd_kafka_conf_t *kconf,
const char *cluster_arn,
struct flb_kafka_opaque *opaque);
struct flb_kafka_opaque *opaque,
const char *brokers);
void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx);

#endif
157 changes: 96 additions & 61 deletions plugins/in_kafka/in_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,40 +268,33 @@ static int in_kafka_init(struct flb_input_instance *ins,
return -1;
}

/* Retrieve SASL mechanism if configured */
conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
if (conf) {
ctx->sasl_mechanism = flb_sds_create(conf);
flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);

#ifdef FLB_HAVE_AWS_MSK_IAM
/*
* When MSK IAM auth is enabled, default the required
* security settings so users don't need to specify them.
*/
if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn) {
conf = flb_input_get_property("rdkafka.security.protocol", ins);
if (!conf) {
flb_input_set_property(ins, "rdkafka.security.protocol", "SASL_SSL");
}

conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
if (!conf) {
/* Check if using aws_msk_iam as SASL mechanism */
if (strcasecmp(conf, "aws_msk_iam") == 0) {
/* Mark that user explicitly requested AWS MSK IAM */
ctx->aws_msk_iam = FLB_TRUE;

/* Set SASL mechanism to OAUTHBEARER for librdkafka */
flb_input_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER");
flb_sds_destroy(ctx->sasl_mechanism);
ctx->sasl_mechanism = flb_sds_create("OAUTHBEARER");

/* Ensure security protocol is set */
conf = flb_input_get_property("rdkafka.security.protocol", ins);
if (!conf) {
flb_input_set_property(ins, "rdkafka.security.protocol", "SASL_SSL");
}

flb_plg_info(ins, "AWS MSK IAM authentication enabled via rdkafka.sasl.mechanism");
}
else {
ctx->sasl_mechanism = flb_sds_create(conf);
flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
}
}
else {
#endif

/* Retrieve SASL mechanism if configured */
conf = flb_input_get_property("rdkafka.sasl.mechanism", ins);
if (conf) {
ctx->sasl_mechanism = flb_sds_create(conf);
flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism);
}

#ifdef FLB_HAVE_AWS_MSK_IAM
}
#endif

kafka_conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1);
if (!kafka_conf) {
Expand Down Expand Up @@ -351,25 +344,45 @@ static int in_kafka_init(struct flb_input_instance *ins,
flb_kafka_opaque_set(ctx->opaque, ctx, NULL);
rd_kafka_conf_set_opaque(kafka_conf, ctx->opaque);

/*
* Enable SASL queue for all OAUTHBEARER configurations.
* This allows librdkafka to handle OAuth token refresh in a background thread,
* which is essential for idle connections or when poll intervals are large.
* This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc.
*/
if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
rd_kafka_conf_enable_sasl_queue(kafka_conf, 1);
flb_plg_debug(ins, "SASL queue enabled for OAUTHBEARER mechanism");
}

#ifdef FLB_HAVE_AWS_MSK_IAM
if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism &&
/* Only register MSK IAM if user explicitly requested it via rdkafka.sasl.mechanism=aws_msk_iam */
if (ctx->aws_msk_iam && ctx->sasl_mechanism &&
strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
flb_plg_info(ins, "registering MSK IAM authentication with cluster ARN: %s",
ctx->aws_msk_iam_cluster_arn);
ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config,
kafka_conf,
ctx->aws_msk_iam_cluster_arn,
ctx->opaque);
if (!ctx->msk_iam) {
flb_plg_error(ins, "failed to setup MSK IAM authentication");
}
else {
res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config",
"principal=admin", errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins,
"failed to set sasl.oauthbearer.config: %s",
errstr);
/* Check if brokers are configured for MSK IAM */
if (ctx->kafka.brokers &&
(strstr(ctx->kafka.brokers, ".kafka.") || strstr(ctx->kafka.brokers, ".kafka-serverless.")) &&
strstr(ctx->kafka.brokers, ".amazonaws.com")) {

/* Register MSK IAM OAuth callback - pass brokers string directly */
flb_plg_info(ins, "registering AWS MSK IAM authentication OAuth callback");
ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config,
kafka_conf,
ctx->opaque,
ctx->kafka.brokers);

if (!ctx->msk_iam) {
flb_plg_error(ins, "failed to setup MSK IAM authentication OAuth callback");
goto init_error;
}
else {
res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config",
"principal=admin", errstr, sizeof(errstr));
if (res != RD_KAFKA_CONF_OK) {
flb_plg_error(ins,
"failed to set sasl.oauthbearer.config: %s",
errstr);
}
}
}
}
Expand All @@ -380,9 +393,36 @@ static int in_kafka_init(struct flb_input_instance *ins,
/* Create Kafka consumer handle */
if (!ctx->kafka.rk) {
flb_plg_error(ins, "Failed to create new consumer: %s", errstr);
/* rd_kafka_new() did NOT take ownership on failure; kafka_conf is
* still valid and will be destroyed by init_error cleanup path. */
goto init_error;
}

/* rd_kafka_new() takes ownership of kafka_conf on success */
kafka_conf = NULL;

/*
* Enable SASL background callbacks for all OAUTHBEARER configurations.
* This ensures OAuth tokens are refreshed automatically even when:
* - Poll intervals are large
* - Topics have no messages
* - Collector is paused
* This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc.
*/
if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
rd_kafka_error_t *error;
error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk);
if (error) {
flb_plg_warn(ins, "failed to enable SASL background callbacks: %s. "
"OAuth tokens may not refresh during idle periods.",
rd_kafka_error_string(error));
rd_kafka_error_destroy(error);
}
else {
flb_plg_info(ins, "OAUTHBEARER: SASL background callbacks enabled");
}
}

/* Trigger initial token refresh for OAUTHBEARER */
rd_kafka_poll(ctx->kafka.rk, 0);

Expand Down Expand Up @@ -449,15 +489,23 @@ static int in_kafka_init(struct flb_input_instance *ins,
}
if (ctx->kafka.rk) {
rd_kafka_consumer_close(ctx->kafka.rk);
/* rd_kafka_destroy also destroys the conf that was passed to rd_kafka_new */
rd_kafka_destroy(ctx->kafka.rk);
}
else if (kafka_conf) {
/* If rd_kafka was never created, we need to destroy conf manually */
rd_kafka_conf_destroy(kafka_conf);
}
if (ctx->opaque) {
flb_kafka_opaque_destroy(ctx->opaque);
}
else if (kafka_conf) {
/* conf is already destroyed when rd_kafka is initialized */
rd_kafka_conf_destroy(kafka_conf);

#ifdef FLB_HAVE_AWS_MSK_IAM
if (ctx->msk_iam) {
flb_aws_msk_iam_destroy(ctx->msk_iam);
}
#endif

flb_sds_destroy(ctx->sasl_mechanism);
flb_free(ctx);

Expand Down Expand Up @@ -571,19 +619,6 @@ static struct flb_config_map config_map[] = {
"Rely on kafka auto-commit and commit messages in batches"
},

#ifdef FLB_HAVE_AWS_MSK_IAM
{
FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam_cluster_arn),
"ARN of the MSK cluster when using AWS IAM authentication"
},
{
FLB_CONFIG_MAP_BOOL, "aws_msk_iam", "false",
0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam),
"Enable AWS MSK IAM authentication"
},
#endif

/* EOF */
{0}
};
Expand Down
3 changes: 1 addition & 2 deletions plugins/in_kafka/in_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,11 @@ struct flb_in_kafka_config {
struct flb_kafka_opaque *opaque;

#ifdef FLB_HAVE_AWS_MSK_IAM
flb_sds_t aws_msk_iam_cluster_arn;
struct flb_aws_msk_iam *msk_iam;
int aws_msk_iam; /* Flag to indicate user explicitly requested AWS MSK IAM */
#endif

/* SASL mechanism configured in rdkafka.sasl.mechanism */
int aws_msk_iam;
flb_sds_t sasl_mechanism;
};

Expand Down
13 changes: 0 additions & 13 deletions plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -678,19 +678,6 @@ static struct flb_config_map config_map[] = {
"that key will be sent to Kafka."
},

#ifdef FLB_HAVE_AWS_MSK_IAM
{
FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", NULL,
0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam_cluster_arn),
"ARN of the MSK cluster when using AWS IAM authentication"
},
{
FLB_CONFIG_MAP_BOOL, "aws_msk_iam", "false",
0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam),
"Enable AWS MSK IAM authentication"
},
#endif

/* EOF */
{0}
};
Expand Down
Loading
Loading