Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,33 @@
# librdkafka v2.13.0

librdkafka v2.13.0 is a feature release:

* Fix memory management for interceptors in rd_kafka_conf to prevent
double-free errors (#5240).
* Fix for the pseudo-random generator seed on Windows involving as well
the uniqueness of the new consumer group protocol member id (#5265).


## Fixes

### General fixes

* Issues: #4142.
Fix memory management for interceptors in rd_kafka_conf to prevent double-free errors.
In case the client instance fails the users needs to destroy the configuration
data structure, it was causing a double-free because the interceptors were
already freed in the constructor.
Happening since 1.x (#5240).
* Issues: #5263, #3929.
Fix for the pseudo-random seed on Windows. The function `rand_r` isn't present
on Windows and the global seed wasn't based on the current microseconds and thread
id. Also it wasn't called on every thread as required on this platform but
only once per process. The fix allows on this platform the uniqueness of client side
member id generation in next-generation consumer group protocol.
Happening since 1.x (#5265).



# librdkafka v2.12.1

librdkafka v2.12.1 is a maintenance release:
Expand Down
69 changes: 51 additions & 18 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "rdkafka_interceptor.h"
#include "rdkafka_idempotence.h"
#include "rdkafka_sasl_oauthbearer.h"
#include "rdmurmur2.h"
#if WITH_OAUTHBEARER_OIDC
#include "rdkafka_sasl_oauthbearer_oidc.h"
#endif
Expand All @@ -82,8 +83,14 @@
#endif


static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
static once_flag rd_kafka_global_srand_once = ONCE_FLAG_INIT;
static once_flag rd_kafka_global_init_once = ONCE_FLAG_INIT;
#ifdef _WIN32
/* On Windows srand needs to be called on each thread. */
static RD_TLS once_flag rd_kafka_srand_once = ONCE_FLAG_INIT;
#else
static once_flag rd_kafka_srand_once = ONCE_FLAG_INIT;
#endif


/**
* @brief Global counter+lock for all active librdkafka instances
Expand Down Expand Up @@ -130,6 +137,22 @@ void rd_kafka_set_thread_name(const char *fmt, ...) {
*/
static char RD_TLS rd_kafka_thread_sysname[16] = "app";

/**
* @brief Seed the PRNG with current microseconds and thread ID.
*/
static void rd_kafka_srand(void) {
unsigned int seed = 0;
struct timeval tv;
rd_gettimeofday(&tv, NULL);
seed = (unsigned int)(tv.tv_usec);
seed ^= thrd_current_id();

/* Apply the murmur2 hash to distribute entropy to
* the whole seed. */
seed = (unsigned int)rd_murmur2(&seed, sizeof(seed));
srand(seed);
}

void rd_kafka_set_thread_sysname(const char *fmt, ...) {
va_list ap;

Expand All @@ -141,6 +164,30 @@ void rd_kafka_set_thread_sysname(const char *fmt, ...) {
thrd_setname(rd_kafka_thread_sysname);
}

/**
* @brief Seed the PRNG for the current thread or for the whole process.
* Depending on the platform implementation of srand() the seed can
* be a thread local or global one. In case it's thread local we
* need to call it on each thread.
*
* @param rk Client instance.
* @param internal_thread If true, seed the PRNG if
* it's required per-thread.
*/
void rd_kafka_thread_srand(rd_kafka_t *rk, rd_bool_t internal_thread) {
#ifdef _WIN32
rd_bool_t required_per_thread = rd_true;
#else
rd_bool_t required_per_thread = rd_false;
#endif
if ((required_per_thread &&
(rk->rk_conf.enable_random_seed || internal_thread)) ||
(!required_per_thread && rk->rk_conf.enable_random_seed &&
!internal_thread)) {
call_once(&rd_kafka_srand_once, rd_kafka_srand);
}
}

static void rd_kafka_global_init0(void) {
cJSON_Hooks json_hooks = {.malloc_fn = rd_malloc, .free_fn = rd_free};

Expand Down Expand Up @@ -171,18 +218,6 @@ void rd_kafka_global_init(void) {
}


/**
* @brief Seed the PRNG with current_time.milliseconds
*/
static void rd_kafka_global_srand(void) {
struct timeval tv;

rd_gettimeofday(&tv, NULL);

srand((unsigned int)(tv.tv_usec / 1000));
}


/**
* @returns the current number of active librdkafka instances
*/
Expand Down Expand Up @@ -2218,6 +2253,7 @@ static int rd_kafka_thread_main(void *arg) {

rd_kafka_set_thread_name("main");
rd_kafka_set_thread_sysname("rdk:main");
rd_kafka_thread_srand(rk, rd_true /* we're in an internal thread */);

rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_MAIN);

Expand Down Expand Up @@ -2367,10 +2403,7 @@ rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,
* freed from rd_kafka_destroy_internal()
* as the rk itself is destroyed. */

/* Seed PRNG, don't bother about HAVE_RAND_R, since it is pretty cheap.
*/
if (rk->rk_conf.enable_random_seed)
call_once(&rd_kafka_global_srand_once, rd_kafka_global_srand);
rd_kafka_thread_srand(rk, rd_false /* we're on an app thread */);

/* Call on_new() interceptors */
rd_kafka_interceptors_on_new(rk, &rk->rk_conf);
Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_background.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ int rd_kafka_background_thread_main(void *arg) {

rd_kafka_set_thread_name("background");
rd_kafka_set_thread_sysname("rdk:bg");
rd_kafka_thread_srand(rk, rd_true /* we're in an internal thread */);

rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BACKGROUND);

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -4512,6 +4512,7 @@ static int rd_kafka_broker_thread_main(void *arg) {

rd_kafka_set_thread_name("%s", rkb->rkb_name);
rd_kafka_set_thread_sysname("rdk:broker%" PRId32, rkb->rkb_nodeid);
rd_kafka_thread_srand(rk, rd_true /* we're in an internal thread */);

rd_kafka_interceptors_on_thread_start(rk, RD_KAFKA_THREAD_BROKER);

Expand Down
1 change: 1 addition & 0 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,7 @@ extern char RD_TLS rd_kafka_thread_name[64];

void rd_kafka_set_thread_name(const char *fmt, ...) RD_FORMAT(printf, 1, 2);
void rd_kafka_set_thread_sysname(const char *fmt, ...) RD_FORMAT(printf, 1, 2);
void rd_kafka_thread_srand(rd_kafka_t *rk, rd_bool_t internal_thread);

int rd_kafka_path_is_dir(const char *path);
rd_bool_t rd_kafka_dir_is_empty(const char *path);
Expand Down
2 changes: 2 additions & 0 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,8 @@ static int rd_kafka_mock_cluster_thread_main(void *arg) {

rd_kafka_set_thread_name("mock");
rd_kafka_set_thread_sysname("rdk:mock");
rd_kafka_thread_srand(mcluster->rk,
rd_true /* we're in an internal thread */);
rd_kafka_interceptors_on_thread_start(mcluster->rk,
RD_KAFKA_THREAD_BACKGROUND);
rd_atomic32_add(&rd_kafka_thread_cnt_curr, 1);
Expand Down
9 changes: 1 addition & 8 deletions src/rdkafka_ssl.c
Original file line number Diff line number Diff line change
Expand Up @@ -1993,14 +1993,7 @@ rd_kafka_transport_ssl_lock_cb(int mode, int i, const char *file, int line) {
#endif

static RD_UNUSED unsigned long rd_kafka_transport_ssl_threadid_cb(void) {
#ifdef _WIN32
/* Windows makes a distinction between thread handle
* and thread id, which means we can't use the
* thrd_current() API that returns the handle. */
return (unsigned long)GetCurrentThreadId();
#else
return (unsigned long)(intptr_t)thrd_current();
#endif
return thrd_current_id();
}

#ifdef HAVE_OPENSSL_CRYPTO_THREADID_SET_CALLBACK
Expand Down
2 changes: 1 addition & 1 deletion src/rdrand.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ int rd_jitter(int low, int high) {
struct timeval tv;
rd_gettimeofday(&tv, NULL);
seed = (unsigned int)(tv.tv_usec);
seed ^= (unsigned int)(intptr_t)thrd_current();
seed ^= thrd_current_id();

/* When many threads are created at the same time and the
* thread id is different only by a few bits it's possible that
Expand Down
11 changes: 11 additions & 0 deletions src/tinycthread_extra.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ int thrd_is_current(thrd_t thr) {
#endif
}

unsigned long thrd_current_id(void) {
#ifdef _WIN32
/* Windows makes a distinction between thread handle
* and thread id, which means we can't use the
* thrd_current() API that returns the handle. */
return (unsigned long)GetCurrentThreadId();
#else
return (unsigned long)(intptr_t)thrd_current();
#endif
}


#ifdef _WIN32
void cnd_wait_enter(cnd_t *cond) {
Expand Down
6 changes: 6 additions & 0 deletions src/tinycthread_extra.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ int thrd_setname(const char *name);
*/
int thrd_is_current(thrd_t thr);

/**
* @brief Get current thread ID as an unsigned long.
* @return Current thread ID.
*/
unsigned long thrd_current_id(void);


#ifdef _WIN32
/**
Expand Down