Skip to content

Commit

Permalink
Support set_offset_timestamp for KafkaToppar in automatic mode (#1475)
Browse files Browse the repository at this point in the history
  • Loading branch information
kedixa authored Jan 12, 2024
1 parent 0907da9 commit fefd0f5
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 35 deletions.
8 changes: 8 additions & 0 deletions src/client/WFKafkaClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,9 @@ bool KafkaClientTask::add_toppar(const KafkaToppar& toppar)
}

new_toppar.set_offset(toppar.get_offset());
new_toppar.set_offset_timestamp(toppar.get_offset_timestamp());
new_toppar.set_low_watermark(toppar.get_low_watermark());
new_toppar.set_high_watermark(toppar.get_high_watermark());
this->toppar_list.add_item(new_toppar);

this->meta_list.add_item(tmp);
Expand Down Expand Up @@ -1240,6 +1243,9 @@ bool KafkaClientTask::add_toppar(const KafkaToppar& toppar)
}

new_toppar.set_offset(toppar.get_offset());
new_toppar.set_offset_timestamp(toppar.get_offset_timestamp());
new_toppar.set_low_watermark(toppar.get_low_watermark());
new_toppar.set_high_watermark(toppar.get_high_watermark());
this->toppar_list.add_item(new_toppar);

this->meta_list.add_item(*meta);
Expand Down Expand Up @@ -1451,7 +1457,9 @@ int KafkaClientTask::arrange_fetch()
return -1;

new_toppar.set_offset(toppar->get_offset());
new_toppar.set_offset_timestamp(toppar->get_offset_timestamp());
new_toppar.set_low_watermark(toppar->get_low_watermark());
new_toppar.set_high_watermark(toppar->get_high_watermark());
this->toppar_list_map[node_id].add_item(std::move(new_toppar));
}
}
Expand Down
60 changes: 25 additions & 35 deletions src/factory/KafkaTaskImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,62 +207,53 @@ CommMessageOut *__ComplexKafkaTask::message_out()

KafkaConnectionInfo *conn_info =
(KafkaConnectionInfo *)this->get_connection()->get_context();
this->get_req()->set_api(&conn_info->api);
KafkaRequest *req = this->get_req();
req->set_api(&conn_info->api);

if (this->get_req()->get_api_type() == Kafka_Fetch ||
this->get_req()->get_api_type() == Kafka_ListOffsets)
if (req->get_api_type() == Kafka_Fetch ||
req->get_api_type() == Kafka_ListOffsets)
{
KafkaRequest *req = this->get_req();
req->get_toppar_list()->rewind();
KafkaTopparList *req_toppar_lst = req->get_toppar_list();
KafkaToppar *toppar;
KafkaTopparList toppar_list;
bool flag = false;
long long cfg_ts = req->get_config()->get_offset_timestamp();
long long tp_ts;

while ((toppar = req->get_toppar_list()->get_next()) != NULL)
req_toppar_lst->rewind();
while ((toppar = req_toppar_lst->get_next()) != NULL)
{
if (toppar->get_low_watermark() < 0)
toppar->set_offset_timestamp(KAFKA_TIMESTAMP_EARLIEST);
else if (toppar->get_high_watermark() < 0)
toppar->set_offset_timestamp(KAFKA_TIMESTAMP_LATEST);
else if (toppar->get_offset() == KAFKA_OFFSET_UNINIT)
tp_ts = toppar->get_offset_timestamp();
if (tp_ts == KAFKA_TIMESTAMP_UNINIT)
tp_ts = cfg_ts;

if (toppar->get_offset() == KAFKA_OFFSET_UNINIT)
{
long long conf_ts =
this->get_req()->get_config()->get_offset_timestamp();
if (conf_ts == KAFKA_TIMESTAMP_EARLIEST)
{
if (tp_ts == KAFKA_TIMESTAMP_EARLIEST)
toppar->set_offset(toppar->get_low_watermark());
continue;
}
else if (conf_ts == KAFKA_TIMESTAMP_LATEST)
else if (tp_ts < 0)
{
toppar->set_offset(toppar->get_high_watermark());
continue;
}
else
{
toppar->set_offset_timestamp(conf_ts);
tp_ts = KAFKA_TIMESTAMP_LATEST;
}
}
else if (toppar->get_offset() == KAFKA_OFFSET_OVERFLOW)
{
if (this->get_req()->get_config()->get_offset_timestamp() ==
KAFKA_TIMESTAMP_EARLIEST)
{
if (tp_ts == KAFKA_TIMESTAMP_EARLIEST)
toppar->set_offset(toppar->get_low_watermark());
}
else
{
toppar->set_offset(toppar->get_high_watermark());
tp_ts = KAFKA_TIMESTAMP_LATEST;
}
continue;
}
else

if (toppar->get_offset() < 0)
{
continue;
toppar->set_offset_timestamp(tp_ts);
toppar_list.add_item(*toppar);
flag = true;
}

toppar_list.add_item(*toppar);
flag = true;
}

if (flag)
Expand Down Expand Up @@ -530,8 +521,7 @@ bool __ComplexKafkaTask::process_fetch()
this->get_resp()->get_toppar_list()->rewind();
while ((toppar = this->get_resp()->get_toppar_list()->get_next()) != NULL)
{
if (toppar->get_error() == KAFKA_OFFSET_OUT_OF_RANGE &&
toppar->get_high_watermark() - toppar->get_low_watermark() > 0)
if (toppar->get_error() == KAFKA_OFFSET_OUT_OF_RANGE)
{
toppar->set_offset(KAFKA_OFFSET_OVERFLOW);
toppar->set_low_watermark(KAFKA_OFFSET_UNINIT);
Expand Down

0 comments on commit fefd0f5

Please sign in to comment.