From 61149156ded3631dcc2191d2286e44fbdf6eff32 Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Thu, 23 Oct 2025 14:46:39 +0530 Subject: [PATCH 1/3] Add test duration feature and TLS logging to rdkafka_performance example - Introduced a new command-line option '-E' to specify the test duration in seconds, allowing for time-based message production. - Implemented a logger callback to capture and display TLS version information during connections. - Updated print_stats function to include remaining time for duration-based tests. - Added validation to prevent simultaneous use of message count and test duration options. --- examples/rdkafka_performance.c | 92 +++++++++++++++++++++++++++++++--- 1 file changed, 85 insertions(+), 7 deletions(-) diff --git a/examples/rdkafka_performance.c b/examples/rdkafka_performance.c index dab0b06b8f..e2abe0b346 100644 --- a/examples/rdkafka_performance.c +++ b/examples/rdkafka_performance.c @@ -76,6 +76,7 @@ static int partition_cnt = 0; static int eof_cnt = 0; static int with_dr = 1; static int read_hdrs = 0; +static int test_duration = 0; static void stop(int sig) { @@ -84,6 +85,22 @@ static void stop(int sig) { run = 0; } +/** + * @brief Logger callback to capture and display TLS version information + */ +static void logger_cb(const rd_kafka_t *rk, + int level, + const char *fac, + const char *buf) { + /* Display TLS connection info */ + if (strstr(fac, "SSLVERIFY") && strstr(buf, "TLS version:")) { + fprintf(stderr, "\n[INFO] TLS Connection Established: %s\n\n", buf); + } + + /* Print warnings and errors to stderr */ + fprintf(stderr, "%%%d|%s|%s: %s\n", level, rd_kafka_name(rk), fac, buf); +} + static long int msgs_wait_cnt = 0; static long int msgs_wait_produce_cnt = 0; static rd_ts_t t_end; @@ -594,6 +611,17 @@ print_stats(rd_kafka_t *rk, int mode, int otype, const char *compression) { } if (otype & _OTYPE_SUMMARY) { + /* Add time remaining for duration-based tests */ + if (test_duration > 0 && cnt.t_start > 0) { + rd_ts_t elapsed_us = now - cnt.t_start; + rd_ts_t duration_us = (rd_ts_t)test_duration * 1000 * 1000; + if (elapsed_us < duration_us) { + int remaining_secs = (int)((duration_us - elapsed_us) / 1000000); + extra_of += rd_snprintf(extra + extra_of, sizeof(extra) - extra_of, + ", %ds remaining", remaining_secs); + } + } + printf("%% %" PRIu64 " messages produced " "(%" PRIu64 @@ -607,13 +635,13 @@ print_stats(rd_kafka_t *rk, int mode, int otype, const char *compression) { "%.02f MB/s, " "%" PRIu64 " produce failures, %i in queue, " - "%s compression\n", + "%s compression%s\n", cnt.msgs, cnt.bytes, cnt.msgs_dr_ok, cnt.last_offset, cnt.msgs_dr_err, t_total / 1000, ((cnt.msgs_dr_ok * 1000000) / t_total), (float)((cnt.bytes_dr_ok) / (float)t_total), cnt.tx_err, rk ? rd_kafka_outq_len(rk) : 0, - compression); + compression, extra); } } else { @@ -894,7 +922,7 @@ int main(int argc, char **argv) { while ((opt = getopt(argc, argv, "PCG:t:p:b:s:k:c:fi:MDd:m:S:x:" - "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:")) != -1) { + "R:a:z:o:X:B:eT:Y:qvIur:lA:OwNH:E:")) != -1) { switch (opt) { case 'G': if (rd_kafka_conf_set(conf, "group.id", optarg, errstr, @@ -1118,12 +1146,32 @@ int main(int argc, char **argv) { with_dr = 0; break; + case 'E': + test_duration = atoi(optarg); + if (test_duration <= 0) { + fprintf(stderr, + "%% Invalid test duration: %s " + "(must be > 0)\n", + optarg); + exit(1); + } + break; + default: fprintf(stderr, "Unknown option: %c\n", opt); goto usage; } } + /* Validate that both -c and -E are not specified together */ + if (msgcnt != -1 && test_duration > 0) { + fprintf(stderr, + "%% Error: Cannot specify both -c (message count) " + "and -E (test duration)\n" + "%% Use either -c OR -E , not both\n"); + exit(1); + } + if (topics->cnt == 0 || optind != argc) { if (optind < argc) fprintf(stderr, "Unknown argument: %s\n", argv[optind]); @@ -1148,6 +1196,9 @@ int main(int argc, char **argv) { " -H Add header to message (producer)\n" " -H parse Read message headers (consumer)\n" " -c Messages to transmit/receive\n" + " -E Run test for specified duration in seconds (producer)\n" + " Cannot be used with -c. When used, sends unlimited messages\n" + " until time expires.\n" " -x Hard exit after transmitting " "messages (producer)\n" " -D Copy/Duplicate data buffer (producer)\n" @@ -1221,6 +1272,9 @@ int main(int argc, char **argv) { exit(1); } + /* Set logger callback to display TLS version info */ + rd_kafka_conf_set_log_cb(conf, logger_cb); + /* Always enable stats (for RTT extraction), and if user supplied * the -T option we let her take part of the stats aswell. */ rd_kafka_conf_set_stats_cb(conf, stats_cb); @@ -1368,12 +1422,18 @@ int main(int argc, char **argv) { rof += (off_t)xlen; } - if (msgcnt == -1) + /* Handle time-based testing */ + if (test_duration > 0) { + printf("%% Sending messages of size %i bytes for %i seconds\n", + msgsize, test_duration); + msgcnt = -1; /* Unlimited messages */ + } else if (msgcnt == -1) { printf("%% Sending messages of size %i bytes\n", msgsize); - else + } else { printf("%% Sending %i messages of size %i bytes\n", msgcnt, msgsize); + } if (with_dr) rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered); @@ -1407,6 +1467,18 @@ int main(int argc, char **argv) { msgs_wait_produce_cnt = msgcnt; while (run && (msgcnt == -1 || (int)cnt.msgs < msgcnt)) { + /* Check if test duration has elapsed */ + if (test_duration > 0) { + rd_ts_t elapsed_us = rd_clock() - cnt.t_start; + rd_ts_t duration_us = (rd_ts_t)test_duration * 1000 * 1000; /* seconds to microseconds */ + if (elapsed_us >= duration_us) { + if (verbosity >= 1) + printf("%% Test duration of %d seconds reached, stopping...\n", + test_duration); + break; + } + } + /* Send/Produce message. */ if (idle) { @@ -1507,8 +1579,14 @@ int main(int argc, char **argv) { msgs_wait_cnt); /* Wait for messages to be delivered */ - while (run && rd_kafka_poll(rk, 1000) != -1) - print_stats(rk, mode, otype, compression); + if (test_duration > 0) { + /* For time-based tests, flush remaining messages with timeout */ + rd_kafka_flush(rk, 5000); /* 5 second flush timeout */ + } else { + /* For count-based tests, wait for all deliveries */ + while (run && rd_kafka_poll(rk, 1000) != -1) + print_stats(rk, mode, otype, compression); + } outq = rd_kafka_outq_len(rk); From 340714873cda133148b41038996926549454f519 Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Fri, 24 Oct 2025 12:30:12 +0530 Subject: [PATCH 2/3] Add TLS version restriction to SSL context initialization and enhance SSL verification logging - Set both minimum and maximum TLS version to 1.3 for testing purposes. - Log the TLS version and cipher used during SSL certificate verification for improved debugging. --- src/rdkafka_ssl.c | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/rdkafka_ssl.c b/src/rdkafka_ssl.c index 6747d346e6..c61de6875a 100644 --- a/src/rdkafka_ssl.c +++ b/src/rdkafka_ssl.c @@ -617,6 +617,11 @@ static int rd_kafka_transport_ssl_verify(rd_kafka_transport_t *rktrans) { rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY", "Broker SSL certificate verified"); + rd_rkb_dbg(rktrans->rktrans_rkb, SECURITY, "SSLVERIFY", + "TLS version: %s, Cipher: %s", + SSL_get_version(rktrans->rktrans_ssl), + SSL_get_cipher(rktrans->rktrans_ssl)); + return 0; } @@ -1885,6 +1890,22 @@ int rd_kafka_ssl_ctx_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { goto fail; } + /* Force TLS version to 1.3 for testing */ +#if OPENSSL_VERSION_NUMBER >= 0x10100000 + if (!SSL_CTX_set_min_proto_version(ctx, TLS1_3_VERSION)) { + rd_snprintf(errstr, errstr_size, + "Failed to set minimum TLS version to 1.3"); + goto fail; + } + if (!SSL_CTX_set_max_proto_version(ctx, TLS1_3_VERSION)) { + rd_snprintf(errstr, errstr_size, + "Failed to set maximum TLS version to 1.3"); + goto fail; + } + rd_kafka_dbg(rk, SECURITY, "SSL", + "TLS version restricted to 1.3 only (min=1.3, max=1.3)"); +#endif + #ifdef SSL_OP_NO_SSLv3 /* Disable SSLv3 (unsafe) */ SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv3); From 3235e05404c0335fbf0d1aadd5bcd3474c6c93f2 Mon Sep 17 00:00:00 2001 From: Ankith-Confluent Date: Mon, 3 Nov 2025 20:09:44 +0530 Subject: [PATCH 3/3] formatting changes --- examples/rdkafka_performance.c | 56 +++++++++++++++++++++------------- src/rdkafka_ssl.c | 16 ---------- 2 files changed, 35 insertions(+), 37 deletions(-) diff --git a/examples/rdkafka_performance.c b/examples/rdkafka_performance.c index e2abe0b346..00c6b7d050 100644 --- a/examples/rdkafka_performance.c +++ b/examples/rdkafka_performance.c @@ -76,7 +76,7 @@ static int partition_cnt = 0; static int eof_cnt = 0; static int with_dr = 1; static int read_hdrs = 0; -static int test_duration = 0; +static int test_duration = 0; static void stop(int sig) { @@ -88,15 +88,14 @@ static void stop(int sig) { /** * @brief Logger callback to capture and display TLS version information */ -static void logger_cb(const rd_kafka_t *rk, - int level, - const char *fac, - const char *buf) { +static void +logger_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf) { /* Display TLS connection info */ if (strstr(fac, "SSLVERIFY") && strstr(buf, "TLS version:")) { - fprintf(stderr, "\n[INFO] TLS Connection Established: %s\n\n", buf); + fprintf(stderr, "\n[INFO] TLS Connection Established: %s\n\n", + buf); } - + /* Print warnings and errors to stderr */ fprintf(stderr, "%%%d|%s|%s: %s\n", level, rd_kafka_name(rk), fac, buf); } @@ -614,14 +613,19 @@ print_stats(rd_kafka_t *rk, int mode, int otype, const char *compression) { /* Add time remaining for duration-based tests */ if (test_duration > 0 && cnt.t_start > 0) { rd_ts_t elapsed_us = now - cnt.t_start; - rd_ts_t duration_us = (rd_ts_t)test_duration * 1000 * 1000; + rd_ts_t duration_us = + (rd_ts_t)test_duration * 1000 * 1000; if (elapsed_us < duration_us) { - int remaining_secs = (int)((duration_us - elapsed_us) / 1000000); - extra_of += rd_snprintf(extra + extra_of, sizeof(extra) - extra_of, - ", %ds remaining", remaining_secs); + int remaining_secs = + (int)((duration_us - elapsed_us) / + 1000000); + extra_of += rd_snprintf( + extra + extra_of, + sizeof(extra) - extra_of, + ", %ds remaining", remaining_secs); } } - + printf("%% %" PRIu64 " messages produced " "(%" PRIu64 @@ -1196,8 +1200,10 @@ int main(int argc, char **argv) { " -H Add header to message (producer)\n" " -H parse Read message headers (consumer)\n" " -c Messages to transmit/receive\n" - " -E Run test for specified duration in seconds (producer)\n" - " Cannot be used with -c. When used, sends unlimited messages\n" + " -E Run test for specified duration in seconds " + "(producer)\n" + " Cannot be used with -c. When used, sends " + "unlimited messages\n" " until time expires.\n" " -x Hard exit after transmitting " "messages (producer)\n" @@ -1424,8 +1430,10 @@ int main(int argc, char **argv) { /* Handle time-based testing */ if (test_duration > 0) { - printf("%% Sending messages of size %i bytes for %i seconds\n", - msgsize, test_duration); + printf( + "%% Sending messages of size %i bytes for %i " + "seconds\n", + msgsize, test_duration); msgcnt = -1; /* Unlimited messages */ } else if (msgcnt == -1) { printf("%% Sending messages of size %i bytes\n", @@ -1470,15 +1478,20 @@ int main(int argc, char **argv) { /* Check if test duration has elapsed */ if (test_duration > 0) { rd_ts_t elapsed_us = rd_clock() - cnt.t_start; - rd_ts_t duration_us = (rd_ts_t)test_duration * 1000 * 1000; /* seconds to microseconds */ + rd_ts_t duration_us = + (rd_ts_t)test_duration * 1000 * + 1000; /* seconds to microseconds */ if (elapsed_us >= duration_us) { if (verbosity >= 1) - printf("%% Test duration of %d seconds reached, stopping...\n", - test_duration); + printf( + "%% Test duration of %d " + "seconds reached, " + "stopping...\n", + test_duration); break; } } - + /* Send/Produce message. */ if (idle) { @@ -1580,7 +1593,8 @@ int main(int argc, char **argv) { /* Wait for messages to be delivered */ if (test_duration > 0) { - /* For time-based tests, flush remaining messages with timeout */ + /* For time-based tests, flush remaining messages with + * timeout */ rd_kafka_flush(rk, 5000); /* 5 second flush timeout */ } else { /* For count-based tests, wait for all deliveries */ diff --git a/src/rdkafka_ssl.c b/src/rdkafka_ssl.c index c61de6875a..9229141cc4 100644 --- a/src/rdkafka_ssl.c +++ b/src/rdkafka_ssl.c @@ -1890,22 +1890,6 @@ int rd_kafka_ssl_ctx_init(rd_kafka_t *rk, char *errstr, size_t errstr_size) { goto fail; } - /* Force TLS version to 1.3 for testing */ -#if OPENSSL_VERSION_NUMBER >= 0x10100000 - if (!SSL_CTX_set_min_proto_version(ctx, TLS1_3_VERSION)) { - rd_snprintf(errstr, errstr_size, - "Failed to set minimum TLS version to 1.3"); - goto fail; - } - if (!SSL_CTX_set_max_proto_version(ctx, TLS1_3_VERSION)) { - rd_snprintf(errstr, errstr_size, - "Failed to set maximum TLS version to 1.3"); - goto fail; - } - rd_kafka_dbg(rk, SECURITY, "SSL", - "TLS version restricted to 1.3 only (min=1.3, max=1.3)"); -#endif - #ifdef SSL_OP_NO_SSLv3 /* Disable SSLv3 (unsafe) */ SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv3);