diff --git a/include/fluent-bit/aws/flb_aws_msk_iam.h b/include/fluent-bit/aws/flb_aws_msk_iam.h index df0ea258557..b745fa03d35 100644 --- a/include/fluent-bit/aws/flb_aws_msk_iam.h +++ b/include/fluent-bit/aws/flb_aws_msk_iam.h @@ -36,12 +36,17 @@ struct flb_msk_iam_cb { /* * Register the oauthbearer refresh callback for MSK IAM authentication. + * Parameters: + * - config: Fluent Bit configuration + * - kconf: rdkafka configuration + * - opaque: Kafka opaque context (will be set with MSK IAM context) + * - brokers: Comma-separated list of broker addresses (used to extract AWS region) * Returns context pointer on success or NULL on failure. */ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *config, rd_kafka_conf_t *kconf, - const char *cluster_arn, - struct flb_kafka_opaque *opaque); + struct flb_kafka_opaque *opaque, + const char *brokers); void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx); #endif diff --git a/plugins/in_kafka/in_kafka.c b/plugins/in_kafka/in_kafka.c index e07d7970a7c..612d3b37259 100644 --- a/plugins/in_kafka/in_kafka.c +++ b/plugins/in_kafka/in_kafka.c @@ -268,40 +268,33 @@ static int in_kafka_init(struct flb_input_instance *ins, return -1; } + /* Retrieve SASL mechanism if configured */ + conf = flb_input_get_property("rdkafka.sasl.mechanism", ins); + if (conf) { + ctx->sasl_mechanism = flb_sds_create(conf); + flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism); + #ifdef FLB_HAVE_AWS_MSK_IAM - /* - * When MSK IAM auth is enabled, default the required - * security settings so users don't need to specify them. 
- */ - if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn) { - conf = flb_input_get_property("rdkafka.security.protocol", ins); - if (!conf) { - flb_input_set_property(ins, "rdkafka.security.protocol", "SASL_SSL"); - } - - conf = flb_input_get_property("rdkafka.sasl.mechanism", ins); - if (!conf) { + /* Check if using aws_msk_iam as SASL mechanism */ + if (strcasecmp(conf, "aws_msk_iam") == 0) { + /* Mark that user explicitly requested AWS MSK IAM */ + ctx->aws_msk_iam = FLB_TRUE; + + /* Set SASL mechanism to OAUTHBEARER for librdkafka */ flb_input_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER"); + flb_sds_destroy(ctx->sasl_mechanism); ctx->sasl_mechanism = flb_sds_create("OAUTHBEARER"); + + /* Ensure security protocol is set */ + conf = flb_input_get_property("rdkafka.security.protocol", ins); + if (!conf) { + flb_input_set_property(ins, "rdkafka.security.protocol", "SASL_SSL"); + } + + flb_plg_info(ins, "AWS MSK IAM authentication enabled via rdkafka.sasl.mechanism"); } - else { - ctx->sasl_mechanism = flb_sds_create(conf); - flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism); - } - } - else { #endif - - /* Retrieve SASL mechanism if configured */ - conf = flb_input_get_property("rdkafka.sasl.mechanism", ins); - if (conf) { - ctx->sasl_mechanism = flb_sds_create(conf); - flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism); - } - -#ifdef FLB_HAVE_AWS_MSK_IAM } -#endif kafka_conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 1); if (!kafka_conf) { @@ -351,25 +344,45 @@ static int in_kafka_init(struct flb_input_instance *ins, flb_kafka_opaque_set(ctx->opaque, ctx, NULL); rd_kafka_conf_set_opaque(kafka_conf, ctx->opaque); + /* + * Enable SASL queue for all OAUTHBEARER configurations. + * This allows librdkafka to handle OAuth token refresh in a background thread, + * which is essential for idle connections or when poll intervals are large. 
+ * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc. + */ + if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { + rd_kafka_conf_enable_sasl_queue(kafka_conf, 1); + flb_plg_debug(ins, "SASL queue enabled for OAUTHBEARER mechanism"); + } + #ifdef FLB_HAVE_AWS_MSK_IAM - if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism && + /* Only register MSK IAM if user explicitly requested it via rdkafka.sasl.mechanism=aws_msk_iam */ + if (ctx->aws_msk_iam && ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { - flb_plg_info(ins, "registering MSK IAM authentication with cluster ARN: %s", - ctx->aws_msk_iam_cluster_arn); - ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, - kafka_conf, - ctx->aws_msk_iam_cluster_arn, - ctx->opaque); - if (!ctx->msk_iam) { - flb_plg_error(ins, "failed to setup MSK IAM authentication"); - } - else { - res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config", - "principal=admin", errstr, sizeof(errstr)); - if (res != RD_KAFKA_CONF_OK) { - flb_plg_error(ins, - "failed to set sasl.oauthbearer.config: %s", - errstr); + /* Check if brokers are configured for MSK IAM */ + if (ctx->kafka.brokers && + (strstr(ctx->kafka.brokers, ".kafka.") || strstr(ctx->kafka.brokers, ".kafka-serverless.")) && + strstr(ctx->kafka.brokers, ".amazonaws.com")) { + + /* Register MSK IAM OAuth callback - pass brokers string directly */ + flb_plg_info(ins, "registering AWS MSK IAM authentication OAuth callback"); + ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, + kafka_conf, + ctx->opaque, + ctx->kafka.brokers); + + if (!ctx->msk_iam) { + flb_plg_error(ins, "failed to setup MSK IAM authentication OAuth callback"); + goto init_error; + } + else { + res = rd_kafka_conf_set(kafka_conf, "sasl.oauthbearer.config", + "principal=admin", errstr, sizeof(errstr)); + if (res != RD_KAFKA_CONF_OK) { + flb_plg_error(ins, + "failed to set sasl.oauthbearer.config: %s", 
+ errstr); + } } } } @@ -380,9 +393,36 @@ static int in_kafka_init(struct flb_input_instance *ins, /* Create Kafka consumer handle */ if (!ctx->kafka.rk) { flb_plg_error(ins, "Failed to create new consumer: %s", errstr); + /* rd_kafka_new() did NOT take ownership on failure; kafka_conf is + * still valid and will be destroyed by init_error cleanup path. */ goto init_error; } + /* rd_kafka_new() takes ownership of kafka_conf on success */ + kafka_conf = NULL; + + /* + * Enable SASL background callbacks for all OAUTHBEARER configurations. + * This ensures OAuth tokens are refreshed automatically even when: + * - Poll intervals are large + * - Topics have no messages + * - Collector is paused + * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc. + */ + if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { + rd_kafka_error_t *error; + error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk); + if (error) { + flb_plg_warn(ins, "failed to enable SASL background callbacks: %s. 
" + "OAuth tokens may not refresh during idle periods.", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + } + else { + flb_plg_info(ins, "OAUTHBEARER: SASL background callbacks enabled"); + } + } + /* Trigger initial token refresh for OAUTHBEARER */ rd_kafka_poll(ctx->kafka.rk, 0); @@ -449,15 +489,23 @@ static int in_kafka_init(struct flb_input_instance *ins, } if (ctx->kafka.rk) { rd_kafka_consumer_close(ctx->kafka.rk); + /* rd_kafka_destroy also destroys the conf that was passed to rd_kafka_new */ rd_kafka_destroy(ctx->kafka.rk); } + else if (kafka_conf) { + /* If rd_kafka was never created, we need to destroy conf manually */ + rd_kafka_conf_destroy(kafka_conf); + } if (ctx->opaque) { flb_kafka_opaque_destroy(ctx->opaque); } - else if (kafka_conf) { - /* conf is already destroyed when rd_kafka is initialized */ - rd_kafka_conf_destroy(kafka_conf); + +#ifdef FLB_HAVE_AWS_MSK_IAM + if (ctx->msk_iam) { + flb_aws_msk_iam_destroy(ctx->msk_iam); } +#endif + flb_sds_destroy(ctx->sasl_mechanism); flb_free(ctx); @@ -571,19 +619,6 @@ static struct flb_config_map config_map[] = { "Rely on kafka auto-commit and commit messages in batches" }, -#ifdef FLB_HAVE_AWS_MSK_IAM - { - FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", (char *)NULL, - 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam_cluster_arn), - "ARN of the MSK cluster when using AWS IAM authentication" - }, - { - FLB_CONFIG_MAP_BOOL, "aws_msk_iam", "false", - 0, FLB_TRUE, offsetof(struct flb_in_kafka_config, aws_msk_iam), - "Enable AWS MSK IAM authentication" - }, -#endif - /* EOF */ {0} }; diff --git a/plugins/in_kafka/in_kafka.h b/plugins/in_kafka/in_kafka.h index 096cf1c561b..8319b08ec82 100644 --- a/plugins/in_kafka/in_kafka.h +++ b/plugins/in_kafka/in_kafka.h @@ -55,12 +55,11 @@ struct flb_in_kafka_config { struct flb_kafka_opaque *opaque; #ifdef FLB_HAVE_AWS_MSK_IAM - flb_sds_t aws_msk_iam_cluster_arn; struct flb_aws_msk_iam *msk_iam; + int aws_msk_iam; /* Flag to indicate user 
explicitly requested AWS MSK IAM */ #endif /* SASL mechanism configured in rdkafka.sasl.mechanism */ - int aws_msk_iam; flb_sds_t sasl_mechanism; }; diff --git a/plugins/out_kafka/kafka.c b/plugins/out_kafka/kafka.c index dadd4725f74..b6ff6f45307 100644 --- a/plugins/out_kafka/kafka.c +++ b/plugins/out_kafka/kafka.c @@ -678,19 +678,6 @@ static struct flb_config_map config_map[] = { "that key will be sent to Kafka." }, -#ifdef FLB_HAVE_AWS_MSK_IAM - { - FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", NULL, - 0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam_cluster_arn), - "ARN of the MSK cluster when using AWS IAM authentication" - }, - { - FLB_CONFIG_MAP_BOOL, "aws_msk_iam", "false", - 0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam), - "Enable AWS MSK IAM authentication" - }, -#endif - /* EOF */ {0} }; diff --git a/plugins/out_kafka/kafka_config.c b/plugins/out_kafka/kafka_config.c index b4bb9be6acf..287e61c7ba4 100644 --- a/plugins/out_kafka/kafka_config.c +++ b/plugins/out_kafka/kafka_config.c @@ -58,37 +58,33 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, return NULL; } + /* Retrieve SASL mechanism if configured */ + tmp = flb_output_get_property("rdkafka.sasl.mechanism", ins); + if (tmp) { + ctx->sasl_mechanism = flb_sds_create(tmp); + flb_plg_info(ins, "SASL mechanism configured: %s", ctx->sasl_mechanism); + #ifdef FLB_HAVE_AWS_MSK_IAM - /* - * When MSK IAM auth is enabled, default the required - * security settings so users don't need to specify them. 
- */ - if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn) { - tmp = flb_output_get_property("rdkafka.security.protocol", ins); - if (!tmp) { - flb_output_set_property(ins, "rdkafka.security.protocol", "SASL_SSL"); - } - - tmp = flb_output_get_property("rdkafka.sasl.mechanism", ins); - if (!tmp) { + /* Check if using aws_msk_iam as SASL mechanism */ + if (strcasecmp(tmp, "aws_msk_iam") == 0) { + /* Mark that user explicitly requested AWS MSK IAM */ + ctx->aws_msk_iam = FLB_TRUE; + + /* Set SASL mechanism to OAUTHBEARER for librdkafka */ flb_output_set_property(ins, "rdkafka.sasl.mechanism", "OAUTHBEARER"); + flb_sds_destroy(ctx->sasl_mechanism); ctx->sasl_mechanism = flb_sds_create("OAUTHBEARER"); + + /* Ensure security protocol is set */ + tmp = flb_output_get_property("rdkafka.security.protocol", ins); + if (!tmp) { + flb_output_set_property(ins, "rdkafka.security.protocol", "SASL_SSL"); + } + + flb_plg_info(ins, "AWS MSK IAM authentication enabled via rdkafka.sasl.mechanism"); } - else { - ctx->sasl_mechanism = flb_sds_create(tmp); - } - } - else { #endif - /* Retrieve SASL mechanism if configured */ - tmp = flb_output_get_property("rdkafka.sasl.mechanism", ins); - if (tmp) { - ctx->sasl_mechanism = flb_sds_create(tmp); - } - -#ifdef FLB_HAVE_AWS_MSK_IAM } -#endif /* rdkafka config context */ ctx->conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 0); @@ -210,18 +206,38 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, flb_kafka_opaque_set(ctx->opaque, ctx, NULL); rd_kafka_conf_set_opaque(ctx->conf, ctx->opaque); + /* + * Enable SASL queue for all OAUTHBEARER configurations. + * This allows librdkafka to handle OAuth token refresh in a background thread, + * which is essential for idle connections where rd_kafka_poll() is not called. + * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc. 
+ */ + if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { + rd_kafka_conf_enable_sasl_queue(ctx->conf, 1); + flb_plg_debug(ins, "SASL queue enabled for OAUTHBEARER mechanism"); + } + #ifdef FLB_HAVE_AWS_MSK_IAM - if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism && + /* Only register MSK IAM if user explicitly requested it via rdkafka.sasl.mechanism=aws_msk_iam */ + if (ctx->aws_msk_iam && ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { - - ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, - ctx->conf, - ctx->aws_msk_iam_cluster_arn, - ctx->opaque); - if (!ctx->msk_iam) { - flb_plg_error(ctx->ins, "failed to setup MSK IAM authentication"); - } - else { + /* Check if brokers are configured for MSK IAM */ + if (ctx->kafka.brokers && + (strstr(ctx->kafka.brokers, ".kafka.") || strstr(ctx->kafka.brokers, ".kafka-serverless.")) && + strstr(ctx->kafka.brokers, ".amazonaws.com")) { + + /* Register MSK IAM OAuth callback - pass brokers string directly */ + flb_plg_info(ins, "registering AWS MSK IAM authentication OAuth callback"); + ctx->msk_iam = flb_aws_msk_iam_register_oauth_cb(config, + ctx->conf, + ctx->opaque, + ctx->kafka.brokers); + if (!ctx->msk_iam) { + flb_plg_error(ctx->ins, "failed to setup MSK IAM authentication OAuth callback"); + flb_out_kafka_destroy(ctx); + return NULL; + } + res = rd_kafka_conf_set(ctx->conf, "sasl.oauthbearer.config", "principal=admin", errstr, sizeof(errstr)); if (res != RD_KAFKA_CONF_OK) { @@ -236,13 +252,38 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins, /* Kafka Producer */ ctx->kafka.rk = rd_kafka_new(RD_KAFKA_PRODUCER, ctx->conf, errstr, sizeof(errstr)); + if (!ctx->kafka.rk) { flb_plg_error(ctx->ins, "failed to create producer: %s", errstr); + /* rd_kafka_new() did NOT take ownership on failure; ctx->conf is + * still valid and will be destroyed by flb_out_kafka_destroy(). 
*/ flb_out_kafka_destroy(ctx); return NULL; } + /* rd_kafka_new() takes ownership of ctx->conf on success */ + ctx->conf = NULL; + + /* + * Enable SASL background callbacks for all OAUTHBEARER configurations. + * This ensures OAuth tokens are refreshed automatically even on idle connections. + * This benefits all OAUTHBEARER methods: AWS IAM, OIDC, custom OAuth, etc. + */ + if (ctx->sasl_mechanism && strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) { + rd_kafka_error_t *error; + error = rd_kafka_sasl_background_callbacks_enable(ctx->kafka.rk); + if (error) { + flb_plg_warn(ctx->ins, "failed to enable SASL background callbacks: %s. " + "OAuth tokens may not refresh on idle connections.", + rd_kafka_error_string(error)); + rd_kafka_error_destroy(error); + } + else { + flb_plg_info(ctx->ins, "OAUTHBEARER: SASL background callbacks enabled"); + } + } + #ifdef FLB_HAVE_AVRO_ENCODER /* Config AVRO */ tmp = flb_output_get_property("schema_str", ins); @@ -301,8 +342,13 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx) flb_kafka_topic_destroy_all(ctx); if (ctx->kafka.rk) { + /* rd_kafka_destroy also destroys the conf that was passed to rd_kafka_new */ rd_kafka_destroy(ctx->kafka.rk); } + else if (ctx->conf) { + /* If rd_kafka was never created, we need to destroy conf manually */ + rd_kafka_conf_destroy(ctx->conf); + } if (ctx->opaque) { flb_kafka_opaque_destroy(ctx->opaque); diff --git a/plugins/out_kafka/kafka_config.h b/plugins/out_kafka/kafka_config.h index e1ebc04e65c..57bd6ae92f7 100644 --- a/plugins/out_kafka/kafka_config.h +++ b/plugins/out_kafka/kafka_config.h @@ -126,12 +126,10 @@ struct flb_out_kafka { #endif #ifdef FLB_HAVE_AWS_MSK_IAM - flb_sds_t aws_msk_iam_cluster_arn; struct flb_aws_msk_iam *msk_iam; + int aws_msk_iam; /* Flag to indicate user explicitly requested AWS MSK IAM */ #endif - int aws_msk_iam; - struct flb_kafka_opaque *opaque; /* SASL mechanism configured in rdkafka.sasl.mechanism */ diff --git a/src/aws/flb_aws_credentials_ec2.c 
b/src/aws/flb_aws_credentials_ec2.c index 2722e26d223..9aa1444f1fb 100644 --- a/src/aws/flb_aws_credentials_ec2.c +++ b/src/aws/flb_aws_credentials_ec2.c @@ -130,6 +130,7 @@ int refresh_fn_ec2(struct flb_aws_provider *provider) { int ret = -1; flb_debug("[aws_credentials] Refresh called on the EC2 IMDS provider"); + if (try_lock_provider(provider)) { ret = get_creds_ec2(implementation); unlock_provider(provider); diff --git a/src/aws/flb_aws_credentials_profile.c b/src/aws/flb_aws_credentials_profile.c index 48cb9299572..7ad7099ff45 100644 --- a/src/aws/flb_aws_credentials_profile.c +++ b/src/aws/flb_aws_credentials_profile.c @@ -663,8 +663,7 @@ static int get_shared_credentials(char* credentials_path, if (flb_read_file(credentials_path, &buf, &size) < 0) { if (errno == ENOENT) { - AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Shared credentials file %s does not exist", - credentials_path); + AWS_CREDS_DEBUG("Shared credentials file %s does not exist", credentials_path); } else { flb_errno(); AWS_CREDS_ERROR_OR_DEBUG(debug_only, "Could not read shared credentials file %s", diff --git a/src/aws/flb_aws_credentials_sts.c b/src/aws/flb_aws_credentials_sts.c index 554fac20353..155a41d3998 100644 --- a/src/aws/flb_aws_credentials_sts.c +++ b/src/aws/flb_aws_credentials_sts.c @@ -175,7 +175,7 @@ int refresh_fn_sts(struct flb_aws_provider *provider) { struct flb_aws_provider_sts *implementation = provider->implementation; flb_debug("[aws_credentials] Refresh called on the STS provider"); - + if (try_lock_provider(provider)) { ret = sts_assume_role_request(implementation->sts_client, &implementation->creds, implementation->uri, @@ -480,6 +480,7 @@ int refresh_fn_eks(struct flb_aws_provider *provider) { struct flb_aws_provider_eks *implementation = provider->implementation; flb_debug("[aws_credentials] Refresh called on the EKS provider"); + if (try_lock_provider(provider)) { ret = assume_with_web_identity(implementation); unlock_provider(provider); diff --git 
a/src/aws/flb_aws_msk_iam.c b/src/aws/flb_aws_msk_iam.c index cf8af7d0cc8..4be3ea7e261 100644 --- a/src/aws/flb_aws_msk_iam.c +++ b/src/aws/flb_aws_msk_iam.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include @@ -36,15 +37,23 @@ #include #include #include +#include + +/* + * OAuth token lifetime of 5 minutes (industry standard). + * Matches AWS Go SDK and Kafka Connect implementations. + */ +#define MSK_IAM_TOKEN_LIFETIME_SECONDS 300 -/* Lightweight config - NO persistent AWS provider */ struct flb_aws_msk_iam { - struct flb_config *flb_config; /* For creating AWS provider on-demand */ + struct flb_config *flb_config; flb_sds_t region; - flb_sds_t cluster_arn; + int is_serverless; /* Flag to indicate if this is MSK Serverless */ + struct flb_tls *cred_tls; + struct flb_aws_provider *provider; + pthread_mutex_t lock; /* Protects credential provider access from concurrent threads */ }; -/* Utility functions - same as before */ static int to_encode(char c) { if ((c >= '0' && c <= '9') || @@ -125,49 +134,88 @@ static int hmac_sha256_sign(unsigned char out[32], return 0; } -static char *extract_region(const char *arn) +/* Extract region from MSK broker address + * Supported formats: + * - MSK Standard: b-1.example.c1.kafka.<region>.amazonaws.com:port + * - MSK Serverless: boot-<id>.c<n>.kafka-serverless.<region>.amazonaws.com:port + * - VPC Endpoint: vpce-<id>.kafka.<region>.vpce.amazonaws.com:port + */ +static flb_sds_t extract_region_from_broker(const char *broker) { const char *p; - const char *r; + const char *start; + const char *end; + const char *port_pos; size_t len; - char *out; - - /* arn:partition:service:region:... 
*/ - p = strchr(arn, ':'); - if (!p) { + flb_sds_t out; + + if (!broker || strlen(broker) == 0) { return NULL; } - p = strchr(p + 1, ':'); - if (!p) { + + /* Remove port if present (e.g., :9098) */ + port_pos = strchr(broker, ':'); + if (port_pos) { + len = port_pos - broker; + } else { + len = strlen(broker); + } + + /* Find .amazonaws.com */ + p = strstr(broker, ".amazonaws.com"); + if (!p || p >= broker + len) { return NULL; } - p = strchr(p + 1, ':'); - if (!p) { + + /* Region is between the last dot before .amazonaws.com and .amazonaws.com + * Handle VPC endpoints (vpce-xxx.kafka.region.vpce.amazonaws.com) + * Example formats: + * Standard: ...kafka.us-east-1.amazonaws.com + * Serverless: ...kafka-serverless.us-east-1.amazonaws.com + * VPC Endpoint: ...kafka.us-east-1.vpce.amazonaws.com + */ + end = p; /* Points to .amazonaws.com */ + + /* Check for VPC endpoint format: .vpce.amazonaws.com */ + if (p >= broker + 5 && strncmp(p - 5, ".vpce", 5) == 0) { + /* For VPC endpoints, region ends at .vpce */ + end = p - 5; + } + + /* Find the start of region by going backwards to find the previous dot */ + start = end - 1; + while (start > broker && *start != '.') { + start--; + } + + if (*start == '.') { + start++; /* Skip the dot */ + } + + if (start >= end) { return NULL; } - - r = p + 1; - p = strchr(r, ':'); - if (!p) { + + len = end - start; + + /* Sanity check on region length (AWS regions are typically 9-20 chars) */ + if (len == 0 || len > 32) { return NULL; } - len = p - r; - out = flb_malloc(len + 1); + + out = flb_sds_create_len(start, len); if (!out) { return NULL; } - memcpy(out, r, len); - out[len] = '\0'; - + return out; } -/* Stateless payload generator - creates AWS provider on demand */ +/* Payload generator - builds MSK IAM authentication payload */ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, - const char *host) + const char *host, + struct flb_aws_credentials *creds) { - struct flb_aws_provider *temp_provider = NULL; - 
struct flb_aws_credentials *creds = NULL; flb_sds_t payload = NULL; int encode_result; char *p; @@ -205,46 +253,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, /* Validate inputs */ if (!config || !config->region || flb_sds_len(config->region) == 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: region is not set or invalid"); + flb_error("[aws_msk_iam] region is not set or invalid"); return NULL; } if (!host || strlen(host) == 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: host is required"); - return NULL; - } - - flb_info("[aws_msk_iam] build_msk_iam_payload: generating payload for host: %s, region: %s", - host, config->region); - - /* Create AWS provider on-demand */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, - config->region, NULL, NULL, - flb_aws_client_generator(), - NULL); - if (!temp_provider) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create AWS credentials provider"); - return NULL; - } - - if (temp_provider->provider_vtable->init(temp_provider) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to initialize AWS credentials provider"); - flb_aws_provider_destroy(temp_provider); - return NULL; - } - - /* Get credentials */ - creds = temp_provider->provider_vtable->get_credentials(temp_provider); - if (!creds) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to get credentials"); - flb_aws_provider_destroy(temp_provider); + flb_error("[aws_msk_iam] host is required"); return NULL; } - if (!creds->access_key_id || !creds->secret_access_key) { - flb_error("[aws_msk_iam] build_msk_iam_payload: incomplete credentials"); - flb_aws_credentials_destroy(creds); - flb_aws_provider_destroy(temp_provider); + if (!creds || !creds->access_key_id || !creds->secret_access_key) { + flb_error("[aws_msk_iam] invalid or incomplete credentials"); return NULL; } @@ -269,19 +288,17 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, 
goto error; } - /* CRITICAL: Encode the action parameter */ action_enc = uri_encode_params("kafka-cluster:Connect", 21); if (!action_enc) { goto error; } - /* Build canonical query string with ACTION parameter first (alphabetical order) */ + /* Build canonical query string */ query = flb_sds_create_size(8192); if (!query) { goto error; } - /* note: Action must be FIRST in alphabetical order */ query = flb_sds_printf(&query, "Action=%s&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=%s" "&X-Amz-Date=%s&X-Amz-Expires=900", @@ -290,27 +307,23 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* Add session token if present (before SignedHeaders alphabetically) */ + /* Add session token if present */ if (creds->session_token && flb_sds_len(creds->session_token) > 0) { session_token_enc = uri_encode_params(creds->session_token, flb_sds_len(creds->session_token)); if (!session_token_enc) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to encode session token"); goto error; } tmp = flb_sds_printf(&query, "&X-Amz-Security-Token=%s", session_token_enc); if (!tmp) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to append session token to query"); goto error; } query = tmp; } - /* Add SignedHeaders LAST (alphabetically after Security-Token) */ tmp = flb_sds_printf(&query, "&X-Amz-SignedHeaders=host"); if (!tmp) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to append SignedHeaders"); goto error; } query = tmp; @@ -321,10 +334,8 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* CRITICAL: MSK IAM canonical request format - use SHA256 of empty string, not UNSIGNED-PAYLOAD */ if (flb_hash_simple(FLB_HASH_SHA256, (unsigned char *) "", 0, empty_payload_hash, sizeof(empty_payload_hash)) != FLB_CRYPTO_SUCCESS) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to hash empty payload"); goto error; } @@ -338,17 +349,15 @@ static flb_sds_t 
build_msk_iam_payload(struct flb_aws_msk_iam *config, query, host, empty_payload_hex); flb_sds_destroy(empty_payload_hex); - empty_payload_hex = NULL; /* Prevent double-free */ + empty_payload_hex = NULL; if (!canonical) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to build canonical request"); goto error; } - /* Hash canonical request immediately */ + /* Hash canonical request */ if (flb_hash_simple(FLB_HASH_SHA256, (unsigned char *) canonical, flb_sds_len(canonical), sha256_buf, sizeof(sha256_buf)) != FLB_CRYPTO_SUCCESS) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to hash canonical request"); goto error; } @@ -384,34 +393,28 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, len = strlen(datestamp); if (hmac_sha256_sign(key_date, (unsigned char *) key, flb_sds_len(key), (unsigned char *) datestamp, len) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign date"); goto error; } - /* Clean up key immediately after use - prevent double-free */ flb_sds_destroy(key); key = NULL; len = strlen(config->region); if (hmac_sha256_sign(key_region, key_date, 32, (unsigned char *) config->region, len) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign region"); goto error; } if (hmac_sha256_sign(key_service, key_region, 32, (unsigned char *) "kafka-cluster", 13) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign service"); goto error; } if (hmac_sha256_sign(key_signing, key_service, 32, (unsigned char *) "aws4_request", 12) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to create signing key"); goto error; } if (hmac_sha256_sign(sig, key_signing, 32, (unsigned char *) string_to_sign, flb_sds_len(string_to_sign)) != 0) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to sign request"); goto error; } @@ -420,85 +423,28 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, goto error; } - /* Append signature to query 
*/ tmp = flb_sds_printf(&query, "&X-Amz-Signature=%s", hexsig); if (!tmp) { goto error; } query = tmp; - /* Build the complete presigned URL */ - presigned_url = flb_sds_create_size(16384); - if (!presigned_url) { - goto error; - } - - presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s", host, query); - if (!presigned_url) { - goto error; - } - - /* Base64 URL encode the presigned URL */ - url_len = flb_sds_len(presigned_url); - encoded_len = ((url_len + 2) / 3) * 4 + 1; /* Base64 encoding size + null terminator */ - - payload = flb_sds_create_size(encoded_len); - if (!payload) { - goto error; - } - - encode_result = flb_base64_encode((unsigned char*) payload, encoded_len, &actual_encoded_len, - (const unsigned char*) presigned_url, url_len); - if (encode_result == -1) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to base64 encode URL"); - goto error; - } - flb_sds_len_set(payload, actual_encoded_len); - - /* Convert to Base64 URL encoding (replace + with -, / with _, remove padding =) */ - p = payload; - while (*p) { - if (*p == '+') { - *p = '-'; - } - else if (*p == '/') { - *p = '_'; - } - p++; - } - - /* Remove padding */ - len = flb_sds_len(payload); - while (len > 0 && payload[len-1] == '=') { - len--; - } - flb_sds_len_set(payload, len); - payload[len] = '\0'; - - /* Build the complete presigned URL */ - flb_sds_destroy(presigned_url); + /* Build complete presigned URL */ presigned_url = flb_sds_create_size(16384); if (!presigned_url) { goto error; } - presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s", host, query); + presigned_url = flb_sds_printf(&presigned_url, "https://%s/?%s&User-Agent=fluent-bit-msk-iam", + host, query); if (!presigned_url) { goto error; } - /* Add User-Agent parameter to the signed URL (like Go implementation) */ - tmp = flb_sds_printf(&presigned_url, "&User-Agent=fluent-bit-msk-iam"); - if (!tmp) { - goto error; - } - presigned_url = tmp; - - /* Base64 URL encode the presigned URL 
(RawURLEncoding - no padding like Go) */ + /* Base64 URL encode */ url_len = flb_sds_len(presigned_url); - encoded_len = ((url_len + 2) / 3) * 4 + 1; /* Base64 encoding size + null terminator */ + encoded_len = ((url_len + 2) / 3) * 4 + 1; - flb_sds_destroy(payload); payload = flb_sds_create_size(encoded_len); if (!payload) { goto error; @@ -507,14 +453,12 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, encode_result = flb_base64_encode((unsigned char*) payload, encoded_len, &actual_encoded_len, (const unsigned char *) presigned_url, url_len); if (encode_result == -1) { - flb_error("[aws_msk_iam] build_msk_iam_payload: failed to base64 encode URL"); goto error; } - /* Update the SDS length to match actual encoded length */ flb_sds_len_set(payload, actual_encoded_len); - /* Convert to Base64 URL encoding AND remove padding (RawURLEncoding like Go) */ + /* Convert to Base64 URL encoding and remove padding */ p = payload; while (*p) { if (*p == '+') { @@ -526,7 +470,6 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, p++; } - /* Remove ALL padding (RawURLEncoding) */ final_len = flb_sds_len(payload); while (final_len > 0 && payload[final_len-1] == '=') { final_len--; @@ -534,7 +477,7 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, flb_sds_len_set(payload, final_len); payload[final_len] = '\0'; - /* Clean up before successful return */ + /* Clean up */ flb_sds_destroy(credential); flb_sds_destroy(credential_enc); flb_sds_destroy(canonical); @@ -547,65 +490,28 @@ static flb_sds_t build_msk_iam_payload(struct flb_aws_msk_iam *config, if (session_token_enc) { flb_sds_destroy(session_token_enc); } - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); - } return payload; error: - /* Clean up everything - check for NULL to prevent double-free */ - if (credential) { - flb_sds_destroy(credential); - } - if (credential_enc) { - 
flb_sds_destroy(credential_enc); - } - if (canonical) { - flb_sds_destroy(canonical); - } - if (hexhash) { - flb_sds_destroy(hexhash); - } - if (string_to_sign) { - flb_sds_destroy(string_to_sign); - } - if (hexsig) { - flb_sds_destroy(hexsig); - } - if (query) { - flb_sds_destroy(query); - } - if (action_enc) { - flb_sds_destroy(action_enc); - } - if (presigned_url) { - flb_sds_destroy(presigned_url); - } - if (key) { /* Only destroy if not already destroyed */ - flb_sds_destroy(key); - } - if (payload) { - flb_sds_destroy(payload); - } - if (session_token_enc) { - flb_sds_destroy(session_token_enc); - } - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); - } + if (credential) flb_sds_destroy(credential); + if (credential_enc) flb_sds_destroy(credential_enc); + if (canonical) flb_sds_destroy(canonical); + if (hexhash) flb_sds_destroy(hexhash); + if (string_to_sign) flb_sds_destroy(string_to_sign); + if (hexsig) flb_sds_destroy(hexsig); + if (query) flb_sds_destroy(query); + if (action_enc) flb_sds_destroy(action_enc); + if (presigned_url) flb_sds_destroy(presigned_url); + if (key) flb_sds_destroy(key); + if (payload) flb_sds_destroy(payload); + if (session_token_enc) flb_sds_destroy(session_token_enc); + if (empty_payload_hex) flb_sds_destroy(empty_payload_hex); return NULL; } - -/* Stateless callback - creates AWS provider on-demand for each refresh */ +/* OAuth token refresh callback */ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque) @@ -614,101 +520,103 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, flb_sds_t payload = NULL; rd_kafka_resp_err_t err; char errstr[512]; - int64_t now; + time_t now; int64_t md_lifetime_ms; - const char *s3_suffix = "-s3"; - size_t arn_len; - size_t suffix_len; struct flb_aws_msk_iam *config; struct flb_aws_credentials *creds = NULL; struct flb_kafka_opaque *kafka_opaque; - struct 
flb_aws_provider *temp_provider = NULL; (void) oauthbearer_config; kafka_opaque = (struct flb_kafka_opaque *) opaque; if (!kafka_opaque || !kafka_opaque->msk_iam_ctx) { - flb_error("[aws_msk_iam] oauthbearer_token_refresh_cb: invalid opaque context"); + flb_error("[aws_msk_iam] invalid opaque context"); rd_kafka_oauthbearer_set_token_failure(rk, "invalid context"); return; } - flb_debug("[aws_msk_iam] running OAuth bearer token refresh callback"); - - /* get the msk_iam config (not persistent context!) */ config = kafka_opaque->msk_iam_ctx; - /* validate region (mandatory) */ if (!config->region || flb_sds_len(config->region) == 0) { - flb_error("[aws_msk_iam] region is not set or invalid"); + flb_error("[aws_msk_iam] region is not set"); rd_kafka_oauthbearer_set_token_failure(rk, "region not set"); return; } - /* Determine host endpoint */ - if (config->cluster_arn) { - arn_len = strlen(config->cluster_arn); - suffix_len = strlen(s3_suffix); - if (arn_len >= suffix_len && strcmp(config->cluster_arn + arn_len - suffix_len, s3_suffix) == 0) { - snprintf(host, sizeof(host), "kafka-serverless.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] MSK Serverless cluster, using generic endpoint: %s", host); - } - else { - snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); - } + /* Determine MSK endpoint based on cluster type */ + if (config->is_serverless) { + snprintf(host, sizeof(host), "kafka-serverless.%s.amazonaws.com", config->region); } else { snprintf(host, sizeof(host), "kafka.%s.amazonaws.com", config->region); - flb_info("[aws_msk_iam] Regular MSK cluster, using generic endpoint: %s", host); } - flb_info("[aws_msk_iam] requesting MSK IAM payload for region: %s, host: %s", config->region, host); + flb_debug("[aws_msk_iam] OAuth token refresh callback triggered"); + + /* + * CRITICAL CONCURRENCY FIX: + * Lock the credential provider to prevent race 
conditions. + * The librdkafka refresh callback executes in its internal thread context, + * while Fluent Bit may access the same provider from other threads. + * Without synchronization, concurrent refresh/get_credentials calls can + * corrupt provider state and cause authentication failures. + */ + pthread_mutex_lock(&config->lock); - /* Generate payload using stateless function - creates and destroys AWS provider internally */ - payload = build_msk_iam_payload(config, host); + /* Refresh credentials */ + if (config->provider->provider_vtable->refresh(config->provider) < 0) { + pthread_mutex_unlock(&config->lock); + flb_warn("[aws_msk_iam] credential refresh failed, will retry on next callback"); + rd_kafka_oauthbearer_set_token_failure(rk, "credential refresh failed"); + return; + } + + /* Get credentials */ + creds = config->provider->provider_vtable->get_credentials(config->provider); + if (!creds) { + pthread_mutex_unlock(&config->lock); + flb_error("[aws_msk_iam] failed to get AWS credentials from provider"); + rd_kafka_oauthbearer_set_token_failure(rk, "credential retrieval failed"); + return; + } + + /* Unlock immediately after getting credentials - no need to hold lock during payload generation */ + pthread_mutex_unlock(&config->lock); + + /* Generate payload */ + payload = build_msk_iam_payload(config, host, creds); if (!payload) { flb_error("[aws_msk_iam] failed to generate MSK IAM payload"); + flb_aws_credentials_destroy(creds); rd_kafka_oauthbearer_set_token_failure(rk, "payload generation failed"); return; } - /* Get credentials for principal (create temporary provider just for this) */ - temp_provider = flb_standard_chain_provider_create(config->flb_config, NULL, - config->region, NULL, NULL, - flb_aws_client_generator(), - NULL); - if (temp_provider) { - if (temp_provider->provider_vtable->init(temp_provider) == 0) { - creds = temp_provider->provider_vtable->get_credentials(temp_provider); - } - } - + /* + * Set OAuth token with fixed 5-minute 
lifetime (AWS industry standard). + * librdkafka's background thread will automatically trigger a refresh callback + * at 80% of the token's lifetime (4 minutes) to ensure the token never expires, + * even on completely idle connections. + */ now = time(NULL); - md_lifetime_ms = (now + 900) * 1000; + md_lifetime_ms = ((int64_t)now + MSK_IAM_TOKEN_LIFETIME_SECONDS) * 1000; err = rd_kafka_oauthbearer_set_token(rk, payload, md_lifetime_ms, - creds ? creds->access_key_id : "unknown", + creds->access_key_id, NULL, 0, errstr, sizeof(errstr)); + flb_aws_credentials_destroy(creds); + if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { flb_error("[aws_msk_iam] failed to set OAuth bearer token: %s", errstr); rd_kafka_oauthbearer_set_token_failure(rk, errstr); } else { - flb_info("[aws_msk_iam] OAuth bearer token successfully set"); - } - - /* Clean up everything immediately - no memory leaks possible! */ - if (creds) { - flb_aws_credentials_destroy(creds); - } - if (temp_provider) { - flb_aws_provider_destroy(temp_provider); + flb_info("[aws_msk_iam] OAuth bearer token refreshed"); } if (payload) { @@ -716,86 +624,163 @@ static void oauthbearer_token_refresh_cb(rd_kafka_t *rk, } } -/* Register callback with lightweight config - keeps your current interface */ +/* Register OAuth callback */ struct flb_aws_msk_iam *flb_aws_msk_iam_register_oauth_cb(struct flb_config *config, rd_kafka_conf_t *kconf, - const char *cluster_arn, - struct flb_kafka_opaque *opaque) + struct flb_kafka_opaque *opaque, + const char *brokers) { struct flb_aws_msk_iam *ctx; - char *region_str; - - flb_info("[aws_msk_iam] registering OAuth callback with cluster ARN: %s", cluster_arn); + flb_sds_t region_str = NULL; + char *first_broker = NULL; + char *comma; - if (!cluster_arn) { - flb_error("[aws_msk_iam] cluster ARN is required"); + /* Validate inputs */ + if (!opaque) { + flb_error("[aws_msk_iam] opaque context is required"); return NULL; } - /* Allocate lightweight config - NO AWS provider! 
*/ + if (!brokers || strlen(brokers) == 0) { + flb_error("[aws_msk_iam] brokers configuration is required for region extraction"); + return NULL; + } + + /* Extract first broker from comma-separated list */ + first_broker = flb_strdup(brokers); + if (!first_broker) { + flb_error("[aws_msk_iam] failed to allocate memory for broker parsing"); + return NULL; + } + + comma = strchr(first_broker, ','); + if (comma) { + *comma = '\0'; /* Terminate at first comma */ + } + + /* Extract region from broker address */ + region_str = extract_region_from_broker(first_broker); + if (!region_str || flb_sds_len(region_str) == 0) { + flb_error("[aws_msk_iam] failed to extract region from broker address: %s", + brokers); + flb_free(first_broker); + if (region_str) { + flb_sds_destroy(region_str); + } + return NULL; + } + + /* Detect if this is MSK Serverless by checking broker address */ ctx = flb_calloc(1, sizeof(struct flb_aws_msk_iam)); if (!ctx) { flb_errno(); + flb_free(first_broker); + flb_sds_destroy(region_str); return NULL; } - /* Store the flb_config for on-demand provider creation */ ctx->flb_config = config; - - ctx->cluster_arn = flb_sds_create(cluster_arn); - if (!ctx->cluster_arn) { - flb_error("[aws_msk_iam] failed to create cluster ARN string"); + ctx->region = region_str; + + /* Detect cluster type (Standard vs Serverless) */ + if (strstr(first_broker, ".kafka-serverless.")) { + ctx->is_serverless = 1; + flb_info("[aws_msk_iam] detected MSK Serverless cluster"); + } + else { + ctx->is_serverless = 0; + } + + flb_free(first_broker); + first_broker = NULL; + + flb_info("[aws_msk_iam] detected %s MSK cluster, region: %s", + ctx->is_serverless ? 
"Serverless" : "Standard", + region_str); + + /* Create TLS instance */ + ctx->cred_tls = flb_tls_create(FLB_TLS_CLIENT_MODE, + FLB_TRUE, + 0, /* TLS debug off by default */ + NULL, NULL, NULL, NULL, NULL, NULL); + if (!ctx->cred_tls) { + flb_error("[aws_msk_iam] failed to create TLS instance"); + flb_sds_destroy(ctx->region); flb_free(ctx); return NULL; } - /* Extract region */ - region_str = extract_region(cluster_arn); - if (!region_str || strlen(region_str) == 0) { - flb_error("[aws_msk_iam] failed to extract region from cluster ARN: %s", cluster_arn); - flb_sds_destroy(ctx->cluster_arn); + /* Create AWS provider */ + ctx->provider = flb_standard_chain_provider_create(config, + ctx->cred_tls, + ctx->region, + NULL, NULL, + flb_aws_client_generator(), + NULL); + if (!ctx->provider) { + flb_error("[aws_msk_iam] failed to create AWS credentials provider"); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); flb_free(ctx); - if (region_str) flb_free(region_str); return NULL; } - ctx->region = flb_sds_create(region_str); - flb_free(region_str); - - if (!ctx->region) { - flb_error("[aws_msk_iam] failed to create region string"); - flb_sds_destroy(ctx->cluster_arn); + /* Initialize provider */ + ctx->provider->provider_vtable->sync(ctx->provider); + if (ctx->provider->provider_vtable->init(ctx->provider) != 0) { + flb_error("[aws_msk_iam] failed to initialize AWS credentials provider"); + flb_aws_provider_destroy(ctx->provider); + flb_tls_destroy(ctx->cred_tls); + flb_sds_destroy(ctx->region); flb_free(ctx); return NULL; } + ctx->provider->provider_vtable->async(ctx->provider); - flb_info("[aws_msk_iam] extracted region: %s", ctx->region); + /* Initialize mutex to protect credential provider access from concurrent threads */ + if (pthread_mutex_init(&ctx->lock, NULL) != 0) { + flb_error("[aws_msk_iam] failed to initialize credential provider mutex"); + flb_aws_provider_destroy(ctx->provider); + flb_tls_destroy(ctx->cred_tls); + 
flb_sds_destroy(ctx->region); + flb_free(ctx); + return NULL; + } - /* Set the callback and opaque */ - rd_kafka_conf_set_oauthbearer_token_refresh_cb(kconf, oauthbearer_token_refresh_cb); + /* + * Set MSK IAM context in opaque - now opaque->msk_iam_ctx only holds + * struct flb_aws_msk_iam * throughout its lifetime, eliminating type confusion. + */ flb_kafka_opaque_set(opaque, NULL, ctx); rd_kafka_conf_set_opaque(kconf, opaque); - - flb_info("[aws_msk_iam] OAuth callback registered successfully"); + + /* Register OAuth token refresh callback */ + rd_kafka_conf_set_oauthbearer_token_refresh_cb(kconf, oauthbearer_token_refresh_cb); return ctx; } -/* Simple destroy - just config cleanup, no AWS provider to leak! */ +/* Destroy MSK IAM config */ void flb_aws_msk_iam_destroy(struct flb_aws_msk_iam *ctx) { if (!ctx) { return; } - flb_info("[aws_msk_iam] destroying MSK IAM config"); + if (ctx->provider) { + flb_aws_provider_destroy(ctx->provider); + } - /* NO AWS provider to destroy! */ + if (ctx->cred_tls) { + flb_tls_destroy(ctx->cred_tls); + } + if (ctx->region) { flb_sds_destroy(ctx->region); } - if (ctx->cluster_arn) { - flb_sds_destroy(ctx->cluster_arn); - } + + /* Destroy the credential provider mutex */ + pthread_mutex_destroy(&ctx->lock); + flb_free(ctx); } diff --git a/src/flb_kafka.c b/src/flb_kafka.c index 316c9ba9719..6a76c0dca33 100644 --- a/src/flb_kafka.c +++ b/src/flb_kafka.c @@ -95,7 +95,7 @@ rd_kafka_conf_t *flb_kafka_conf_create(struct flb_kafka *kafka, err: if (kafka_cfg) { - flb_free(kafka_cfg); + rd_kafka_conf_destroy(kafka_cfg); } return NULL; }