Skip to content

Commit 1e55f1e

Browse files
Fixed following issues:
- consumer and producer yields in lua __gc meta methods which leads to crash for whole tarantool (?) - fixed by removing lua __gc meta methods and moving it's logic into separate destroy C functions which will be called from lua close methods of Consumer / Producer - consumer:close sometimes hangs forever - fixed byfollowing changes: - now we instantly destroying librdkafka messages on consume in poll thread which prevents rd_kafka_close and rd_kafka_destroy C functions hangs cause of busy resources - now lua Consumer stops polling fibers only after librdkafka consumer was closed which prevents non handeled callbacks from closing operation - producer sometimes hangs forever - updated librdkafka to 1.5.2
1 parent 7fe2a09 commit 1e55f1e

15 files changed

+143
-76
lines changed

Makefile

+4-1
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,10 @@ tests-run:
100100
pytest -W ignore -vv && \
101101
deactivate
102102

103-
test-run-with-docker: tests-dep docker-run-all
103+
test-sleep: tests-dep docker-run-all
104104
sleep 10
105105

106+
test-run-with-docker: test-sleep
106107
docker run \
107108
--net=${NETWORK} \
108109
--rm confluentinc/cp-kafka:5.0.0 \
@@ -151,6 +152,8 @@ test-run-with-docker: tests-dep docker-run-all
151152
kafka-topics --create --topic test_consuming_from_last_committed_offset --partitions 1 --replication-factor 1 \
152153
--if-not-exists --zookeeper zookeeper:2181
153154

155+
sleep 5
156+
154157
cd ./tests && \
155158
python3 -m venv venv && \
156159
. venv/bin/activate && \

README.md

+5
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,11 @@ Local run in docker:
379379
# Developing
380380

381381
## Tests
382+
Before run any test you should add to `/etc/hosts` entry
383+
```
384+
127.0.0.1 kafka
385+
```
386+
382387
You can run docker based integration tests via makefile target
383388
```bash
384389
make test-run-with-docker

kafka/common.c

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
#include <lualib.h>
33
#include <lauxlib.h>
44

5+
#include <tarantool/module.h>
6+
57
#include <common.h>
68

79
const char* const consumer_label = "__tnt_kafka_consumer";

kafka/common.h

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
#include <lualib.h>
1515
#include <lauxlib.h>
1616

17+
#include <tarantool/module.h>
18+
1719
const char* const consumer_label;
1820
const char* const consumer_msg_label;
1921
const char* const producer_label;

kafka/consumer.c

+23-21
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,14 @@ consumer_poll_loop(void *arg) {
4646
// FIXME: push errors to error queue?
4747
rd_kafka_message_destroy(rd_msg);
4848
} else {
49+
msg_t *msg = new_consumer_msg(rd_msg);
50+
// free rdkafka message instantly to prevent hang on close / destroy consumer
51+
rd_kafka_message_destroy(rd_msg);
52+
rd_msg = NULL;
53+
4954
pthread_mutex_lock(&event_queues->consume_queue->lock);
5055

51-
queue_lockfree_push(event_queues->consume_queue, rd_msg);
56+
queue_lockfree_push(event_queues->consume_queue, msg);
5257
count = event_queues->consume_queue->count;
5358

5459
pthread_mutex_unlock(&event_queues->consume_queue->lock);
@@ -225,18 +230,14 @@ lua_consumer_poll_msg(struct lua_State *L) {
225230
int msgs_limit = lua_tonumber(L, 2);
226231

227232
lua_createtable(L, msgs_limit, 0);
228-
rd_kafka_message_t *rd_msg = NULL;
233+
msg_t *msg = NULL;
229234
while (msgs_limit > counter) {
230-
rd_msg = queue_pop(consumer->event_queues->consume_queue);
231-
if (rd_msg == NULL) {
235+
msg = queue_pop(consumer->event_queues->consume_queue);
236+
if (msg == NULL) {
232237
break;
233238
}
234239
counter += 1;
235240

236-
msg_t *msg;
237-
msg = malloc(sizeof(msg_t));
238-
msg->rd_message = rd_msg;
239-
240241
msg_t **msg_p = (msg_t **)lua_newuserdata(L, sizeof(msg));
241242
*msg_p = msg;
242243

@@ -500,7 +501,7 @@ lua_consumer_store_offset(struct lua_State *L) {
500501
luaL_error(L, "Usage: err = consumer:store_offset(msg)");
501502

502503
msg_t *msg = lua_check_consumer_msg(L, 2);
503-
rd_kafka_resp_err_t err = rd_kafka_offset_store(msg->rd_message->rkt, msg->rd_message->partition, msg->rd_message->offset);
504+
rd_kafka_resp_err_t err = rd_kafka_offset_store(msg->topic, msg->partition, msg->offset);
504505
if (err) {
505506
const char *const_err_str = rd_kafka_err2str(err);
506507
char err_str[512];
@@ -523,6 +524,14 @@ wait_consumer_close(va_list args) {
523524
return 0;
524525
}
525526

527+
static ssize_t
528+
wait_consumer_destroy(va_list args) {
529+
rd_kafka_t *rd_kafka = va_arg(args, rd_kafka_t *);
530+
// prevents hanging forever
531+
rd_kafka_destroy_flags(rd_kafka, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
532+
return 0;
533+
}
534+
526535
static rd_kafka_resp_err_t
527536
consumer_destroy(struct lua_State *L, consumer_t *consumer) {
528537
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
@@ -531,11 +540,6 @@ consumer_destroy(struct lua_State *L, consumer_t *consumer) {
531540
rd_kafka_topic_partition_list_destroy(consumer->topics);
532541
}
533542

534-
// trying to close in background until success
535-
while (coio_call(wait_consumer_close, consumer->rd_consumer) == -1) {
536-
// FIXME: maybe send errors to error queue?
537-
}
538-
539543
if (consumer->poller != NULL) {
540544
destroy_consumer_poller(consumer->poller);
541545
}
@@ -547,7 +551,7 @@ consumer_destroy(struct lua_State *L, consumer_t *consumer) {
547551
if (consumer->rd_consumer != NULL) {
548552
/* Destroy handle */
549553
// FIXME: kafka_destroy hangs forever
550-
// coio_call(kafka_destroy, consumer->rd_consumer);
554+
coio_call(wait_consumer_destroy, consumer->rd_consumer);
551555
}
552556

553557
free(consumer);
@@ -563,22 +567,20 @@ lua_consumer_close(struct lua_State *L) {
563567
return 1;
564568
}
565569

570+
rd_kafka_commit((*consumer_p)->rd_consumer, NULL, 0); // sync commit of current offsets
571+
rd_kafka_unsubscribe((*consumer_p)->rd_consumer);
572+
566573
// trying to close in background until success
567574
while (coio_call(wait_consumer_close, (*consumer_p)->rd_consumer) == -1) {
568575
// FIXME: maybe send errors to error queue?
569576
}
570577

571-
if ((*consumer_p)->poller != NULL) {
572-
destroy_consumer_poller((*consumer_p)->poller);
573-
(*consumer_p)->poller = NULL;
574-
}
575-
576578
lua_pushboolean(L, 1);
577579
return 1;
578580
}
579581

580582
int
581-
lua_consumer_gc(struct lua_State *L) {
583+
lua_consumer_destroy(struct lua_State *L) {
582584
consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, 1, consumer_label);
583585
if (consumer_p && *consumer_p) {
584586
consumer_destroy(L, *consumer_p);

kafka/consumer.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ int lua_consumer_store_offset(struct lua_State *L);
5050

5151
int lua_consumer_close(struct lua_State *L);
5252

53-
int lua_consumer_gc(struct lua_State *L);
53+
int lua_consumer_destroy(struct lua_State *L);
5454

5555
int lua_create_consumer(struct lua_State *L);
5656

kafka/consumer_msg.c

+66-24
Original file line numberDiff line numberDiff line change
@@ -22,83 +22,83 @@ lua_check_consumer_msg(struct lua_State *L, int index) {
2222
int
2323
lua_consumer_msg_topic(struct lua_State *L) {
2424
msg_t *msg = lua_check_consumer_msg(L, 1);
25-
lua_pushstring(L, rd_kafka_topic_name(msg->rd_message->rkt));
25+
lua_pushstring(L, rd_kafka_topic_name(msg->topic));
2626
return 1;
2727
}
2828

2929
int
3030
lua_consumer_msg_partition(struct lua_State *L) {
3131
msg_t *msg = lua_check_consumer_msg(L, 1);
3232

33-
lua_pushnumber(L, (double)msg->rd_message->partition);
33+
lua_pushnumber(L, (double)msg->partition);
3434
return 1;
3535
}
3636

3737
int
3838
lua_consumer_msg_offset(struct lua_State *L) {
3939
msg_t *msg = lua_check_consumer_msg(L, 1);
4040

41-
luaL_pushint64(L, msg->rd_message->offset);
41+
luaL_pushint64(L, msg->offset);
4242
return 1;
4343
}
4444

4545
int
4646
lua_consumer_msg_key(struct lua_State *L) {
4747
msg_t *msg = lua_check_consumer_msg(L, 1);
4848

49-
if (msg->rd_message->key_len <= 0 || msg->rd_message->key == NULL || ((char*)msg->rd_message->key) == NULL) {
49+
if (msg->key_len <= 0 || msg->key == NULL || ((char*)msg->key) == NULL) {
5050
return 0;
5151
}
5252

53-
lua_pushlstring(L, (char*)msg->rd_message->key, msg->rd_message->key_len);
53+
lua_pushlstring(L, msg->key, msg->key_len);
5454
return 1;
5555
}
5656

5757
int
5858
lua_consumer_msg_value(struct lua_State *L) {
5959
msg_t *msg = lua_check_consumer_msg(L, 1);
6060

61-
if (msg->rd_message->len <= 0 || msg->rd_message->payload == NULL || ((char*)msg->rd_message->payload) == NULL) {
61+
if (msg->value_len <= 0 || msg->value == NULL || ((char*)msg->value) == NULL) {
6262
return 0;
6363
}
6464

65-
lua_pushlstring(L, (char*)msg->rd_message->payload, msg->rd_message->len);
65+
lua_pushlstring(L, msg->value, msg->value_len);
6666
return 1;
6767
}
6868

6969
int
7070
lua_consumer_msg_tostring(struct lua_State *L) {
7171
msg_t *msg = lua_check_consumer_msg(L, 1);
7272

73-
size_t key_len = msg->rd_message->key_len <= 0 ? 5: msg->rd_message->key_len + 1;
73+
size_t key_len = msg->key_len <= 0 ? 5: msg->key_len + 1;
7474
char key[key_len];
7575

76-
if (msg->rd_message->key_len <= 0 || msg->rd_message->key == NULL || ((char*)msg->rd_message->key) == NULL) {
76+
if (msg->key_len <= 0 || msg->key == NULL || ((char*)msg->key) == NULL) {
7777
strncpy(key, "NULL", 5);
7878
} else {
79-
strncpy(key, msg->rd_message->key, msg->rd_message->key_len + 1);
80-
if (key[msg->rd_message->key_len] != '\0') {
81-
key[msg->rd_message->key_len] = '\0';
79+
strncpy(key, msg->key, msg->key_len + 1);
80+
if (key[msg->key_len] != '\0') {
81+
key[msg->key_len] = '\0';
8282
}
8383
}
8484

85-
size_t value_len = msg->rd_message->len <= 0 ? 5: msg->rd_message->len + 1;
85+
size_t value_len = msg->value_len <= 0 ? 5: msg->value_len + 1;
8686
char value[value_len];
8787

88-
if (msg->rd_message->len <= 0 || msg->rd_message->payload == NULL || ((char*)msg->rd_message->payload) == NULL) {
88+
if (msg->value_len <= 0 || msg->value == NULL || ((char*)msg->value) == NULL) {
8989
strncpy(value, "NULL", 5);
9090
} else {
91-
strncpy(value, msg->rd_message->payload, msg->rd_message->len + 1);
92-
if (value[msg->rd_message->len] != '\0') {
93-
value[msg->rd_message->len] = '\0';
91+
strncpy(value, msg->value, msg->value_len + 1);
92+
if (value[msg->value_len] != '\0') {
93+
value[msg->value_len] = '\0';
9494
}
9595
}
9696

9797
lua_pushfstring(L,
9898
"Kafka Consumer Message: topic=%s partition=%d offset=%d key=%s value=%s",
99-
rd_kafka_topic_name(msg->rd_message->rkt),
100-
msg->rd_message->partition,
101-
msg->rd_message->offset,
99+
rd_kafka_topic_name(msg->topic),
100+
msg->partition,
101+
msg->offset,
102102
key,
103103
value);
104104
return 1;
@@ -108,13 +108,55 @@ int
108108
lua_consumer_msg_gc(struct lua_State *L) {
109109
msg_t **msg_p = (msg_t **)luaL_checkudata(L, 1, consumer_msg_label);
110110
if (msg_p && *msg_p) {
111-
if ((*msg_p)->rd_message != NULL) {
112-
rd_kafka_message_destroy((*msg_p)->rd_message);
113-
}
114-
free(*msg_p);
111+
destroy_consumer_msg(*msg_p);
115112
}
116113
if (msg_p)
117114
*msg_p = NULL;
118115

119116
return 0;
117+
}
118+
119+
msg_t *
120+
new_consumer_msg(rd_kafka_message_t *rd_message) {
121+
msg_t *msg;
122+
msg = malloc(sizeof(msg_t));
123+
msg->topic = rd_message->rkt;
124+
msg->partition = rd_message->partition;
125+
126+
// value
127+
if (rd_message->len > 0) {
128+
msg->value = malloc(rd_message->len);
129+
memcpy(msg->value, rd_message->payload, rd_message->len);
130+
}
131+
msg->value_len = rd_message->len;
132+
133+
// key
134+
if (rd_message->key_len > 0) {
135+
msg->key = malloc(rd_message->key_len);
136+
memcpy(msg->key, rd_message->key, rd_message->key_len);
137+
}
138+
msg->key_len = rd_message->key_len;
139+
140+
msg->offset = rd_message->offset;
141+
142+
return msg;
143+
}
144+
145+
void
146+
destroy_consumer_msg(msg_t *msg) {
147+
if (msg == NULL) {
148+
return;
149+
}
150+
151+
if (msg->key != NULL) {
152+
free(msg->key);
153+
}
154+
155+
if (msg->value != NULL) {
156+
free(msg->value);
157+
}
158+
159+
free(msg);
160+
161+
return;
120162
}

kafka/consumer_msg.h

+11-1
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,21 @@
1212
* Consumer Message
1313
*/
1414
typedef struct {
15-
rd_kafka_message_t *rd_message;
15+
rd_kafka_topic_t *topic;
16+
int32_t partition;
17+
char *value;
18+
size_t value_len;
19+
char *key;
20+
size_t key_len;
21+
int64_t offset;
1622
} msg_t;
1723

1824
msg_t *lua_check_consumer_msg(struct lua_State *L, int index);
1925

26+
msg_t *new_consumer_msg(rd_kafka_message_t *rd_message);
27+
28+
void destroy_consumer_msg(msg_t *msg);
29+
2030
int lua_consumer_msg_topic(struct lua_State *L);
2131

2232
int lua_consumer_msg_partition(struct lua_State *L);

0 commit comments

Comments
 (0)