Open
Description
Current example for topic reader:
// Create topic client.
NYdb::NTopic::TTopicClient topicClient(driver);
// Create read session.
NYdb::NTopic::TReadSessionSettings settings;
settings
.ConsumerName(opts.ConsumerName)
.AppendTopics(opts.TopicPath);
ReadSession = topicClient.CreateReadSession(settings);
std::cerr << "Session was created" << std::endl;
// [BEGIN read session process events]
// Event loop
while (true) {
auto future = ReadSession->WaitEvent();
// Wait for next event or ten seconds
future.Wait(TDuration::Seconds(10));
// Get event
std::optional<NYdb::NTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(true/*block - will block if no event received yet*/);
std::cerr << "Got new read session event: " << DebugString(*event) << std::endl;
if (auto* dataEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
for (const auto& message : dataEvent->GetMessages()) {
std::cerr << "Data message: \"" << message.GetData() << "\"" << std::endl;
}
if (opts.CommitAfterProcessing) {
dataEvent->Commit();
}
} else if (auto* startPartitionSessionEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
startPartitionSessionEvent->Confirm();
} else if (auto* stopPartitionSessionEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
stopPartitionSessionEvent->Confirm();
} else if (auto* endPartitionSessionEvent = std::get_if<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent>(&*event)) {
endPartitionSessionEvent->Confirm();
} else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
break;
}
}
It's unfriendly API, we should design more simple and friendly API like ISimpleWriteSession
Metadata
Metadata
Assignees
Labels
No labels