
Handling no coordinator and data loss in ICR mode #12372

Open
wants to merge 7 commits into base: main
Conversation

kumarpritam863

A few observations:

  • In ICR mode, "open" receives only the newly added partitions, and the Connect framework will not call "open" at all if no new partitions are assigned to the task.

  • Similarly, "close" receives only the removed partitions, yet we blindly close the coordinator.

  • The coordinator is created only when "open" is called, but when a partition is revoked and no partition is added to the task, only "close" is called with the revoked partition and there is no "open" call (see the sketch below).
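
To make this concrete, here is a minimal sketch of how open/close behave today as described above; the committer method names (startCoordinator, stopCoordinator) and the hasFirstPartition helper are illustrative, not the exact code in this repository.

@Override
public void open(Collection<TopicPartition> addedPartitions) {
  // ICR: called only with the newly added partitions, and not called at all
  // if the rebalance assigns no new partitions to this task.
  if (hasFirstPartition(addedPartitions)) {
    committer.startCoordinator();
  }
}

@Override
public void close(Collection<TopicPartition> removedPartitions) {
  // ICR: called only with the removed partitions, but the coordinator is
  // stopped unconditionally, even when partition 0 was not revoked.
  committer.stopCoordinator();
}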

How this leads to a no-coordinator scenario:

  • Consider the case where a partition other than partition zero is removed from the leader task and assigned to some other task.
  • The close call on the leader will close the coordinator, but since the leader task gets no open call, no leader election happens on it.
  • The task that received the moved partition did not receive partition zero, so leader election on that task will not elect it as leader either.

Let's walk through an example:

Initially we have one worker "W0" with two tasks "T0" and "T1" consuming from two partitions "P0" and "P1" of one topic, so the initial configuration is:

W0 -> [{T0, P0}, {T1, P1}] -> "T0" is elected coordinator because it owns "P0"

Now another worker "W1" joins. This triggers rebalancing of both the tasks and the topic partitions within those tasks.

  • The Connect framework will stop T1 on W0.
  • This causes a partition-level rebalance, and partition P1 is assigned to T0.

State at this point:

W0 -> [{T0,[P0, P1]}]
W1 -> []

Now,

  • The Connect framework will start T1 on W1.
  • This again triggers a rebalance at the partition level.

Assume P1 is removed from T0:

  • The Connect framework calls "close" on T0 with partition P1.
  • This closes the coordinator on T0.
  • An "open" call is made by the Connect framework on T1 with partition P1.
  • This triggers leader election on T1, but since T1 is assigned only P1 (not P0), it will not be elected coordinator.

Hence this leads to a No-Coordinator scenario.
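
A minimal sketch of the guard this change aims for: stop the coordinator only when the revoked partitions actually include the partition that made this task the coordinator. The method names mirror the diff snippets discussed below (stopWorker, isCoordinator) plus an illustrative stopCoordinator; treat this as a sketch, not the final code.

@Override
public void close(Collection<TopicPartition> removedPartitions) {
  // Always stop the worker for the revoked partitions.
  committer.stopWorker();
  // Stop the coordinator only if the election partition (partition 0 of the
  // first topic) is actually among the revoked partitions.
  if (committer.isCoordinator(removedPartitions)) {
    committer.stopCoordinator();
  }
}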

Data Loss Scenario:

In Incremental Cooperative Rebalancing (ICR) mode, consumers do not stop consuming during a rebalance; there is no stop-the-world pause as in the "Eager" rebalancing mode.
When a partition is removed from a task, the consumer coordinator calls close(Collection<TopicPartition>) on the sink task. In this call we currently flush all open files blindly, which also flushes records for partitions the task still retains. Moreover, close sets the committer to null, and because put(Collection<SinkRecord>) only saves records when the committer is not null, incoming records are silently ignored. When the subsequent open(Collection<TopicPartition>) call arrives from Kafka, the committer is restarted without resetting the offsets, which leads to data loss.

Document explaining both scenarios: https://docs.google.com/document/d/1okqGq1HXu2rDnq88wIlVDv0EmNFZYB1PhgwyAzWIKT8/edit?tab=t.0#heading=h.51qcys2ewbsa
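
One possible direction for the data-loss path, sketched here only to illustrate the argument above: flush state for the revoked partitions only, and rewind the consumer for newly assigned partitions so that records dropped while the committer was null are re-consumed. The per-partition close and lastCommittedOffsets methods on the committer are hypothetical; context.offset(...) is the standard Kafka Connect SinkTaskContext call.

@Override
public void close(Collection<TopicPartition> removedPartitions) {
  // Flush/commit only the revoked partitions instead of dumping every open
  // file, so records for partitions this task still owns are not committed early.
  committer.close(removedPartitions);
}

@Override
public void open(Collection<TopicPartition> addedPartitions) {
  // Rewind to the last committed offsets for the newly assigned partitions so
  // that records silently dropped while the committer was null are re-read.
  Map<TopicPartition, Long> lastCommitted = committer.lastCommittedOffsets(addedPartitions);
  context.offset(lastCommitted);
}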

close();
// We need to close the worker here in every case to ensure exactly-once semantics.
committer.stop(ResourceType.WORKER);
if (committer.isLeader(partitions)) {
Contributor

Now that isLeader has been reworked, we should rename it.

Author

Sure, will incorporate that.


package org.apache.iceberg.connect;

public enum ResourceType {
Contributor

I don't think we need this enum, we can just have methods for each in the committer.
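
For illustration, this suggestion would replace the enum-based start(ResourceType)/stop(ResourceType) with dedicated lifecycle methods on the committer, roughly along these lines (a sketch only, showing just the lifecycle methods):

public interface Committer {
  void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context);
  void startCoordinator();
  void stopCoordinator();
  void stopWorker();
  void stop();
}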

Author

Yeah, I thought that initially but didn't do it since it would add a lot of methods to the committer. I will make that change.

worker = new Worker(config, clientFactory, sinkWriter, context);
worker.start();
@Override
public void start(ResourceType resourceType) {
Contributor
@bryanck Feb 23, 2025

We can just call the type-specific methods directly.

Author

Sure

coordinatorThread.terminate();
coordinatorThread = null;
@Override
public void stop(ResourceType resourceType) {
Contributor

Likewise, we can just call the type-specific methods directly instead.

public void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context) {
KafkaClientFactory clientFactory = new KafkaClientFactory(config.kafkaProps());

public boolean isLeader(Collection<TopicPartition> currentAssignedPartitions) {
Contributor

We should rename this now that its functionality has changed.

Contributor

bryanck commented Feb 23, 2025

We should add some comments, and also tests if feasible. Also looks like there are some code formatting issues.

@bryanck changed the title from "Handling no coordinator and data loss in current design in icr mode" to "Handling no coordinator and data loss in ICR mode" on Feb 24, 2025

public interface Committer {
void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context);
void startCoordinator();
Contributor

Something I should have brought up sooner, the Committer interface is meant to be generic, i.e. you could plug in a committer that uses server-side coordination, and in that case these new methods aren't relevant. Also, this is modifying a public interface, and that could be a breaking change for some.

Author

If we do not want to modify the interface, we still need some way to pass the currently added or removed partitions so the committer can handle the coordinator start and termination logic.

Author

Since the committer is only used in the sink task, and CommitterImpl is the only class implementing this interface, I don't see how this would break compatibility. Also, are we planning to expose a config for providing a committer implementation? And what is meant by "server-side coordination" here?
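
For context, one way to pass the partition information without exposing coordinator-specific methods is to make the committer's open/close partition-aware and keep coordination an implementation detail, so a committer relying on server-side coordination could simply ignore the leader-election logic. This is only a sketch of the idea being debated, not the interface in this PR:

public interface Committer {
  void start(Catalog catalog, IcebergSinkConfig config, SinkTaskContext context);
  // Each implementation decides what, if anything, coordination means here.
  void open(Collection<TopicPartition> addedPartitions);
  void close(Collection<TopicPartition> removedPartitions);
  void save(Collection<SinkRecord> sinkRecords);
  void stop();
}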


void save(Collection<SinkRecord> sinkRecords);

boolean isCoordinator(Collection<TopicPartition> currentAssignedPartitions);
Contributor

I don't think the naming here is accurate either, i.e. this could return false even for a task that is the coordinator node. Also, this isn't relevant to other types of committers that aren't using a coordinator.

Author

Can you please suggest a name for this?

Author

Why would this return false for a node that is the coordinator? It would not return false for a coordinator node.

Contributor

If a coordinator node calls close with the partitions being closed, and the partitions don't contain the first partition, then it will return false.
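
To make that concrete with a tiny illustrative snippet (assuming a committer instance and a single topic named "topic"):

// The coordinator task currently owns P0 and P1; a rebalance revokes only P1,
// so close(...) is invoked with just that partition.
Collection<TopicPartition> closed = java.util.List.of(new TopicPartition("topic", 1));
// isCoordinator checks whether the given partitions contain the first partition,
// so it returns false here even though this task is still the coordinator node.
boolean looksLikeCoordinator = committer.isCoordinator(closed); // false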

Author

Yeah, isn't that correct? Since the closed partitions did not contain partition 0, the committer is saying that the caller is not the coordinator, hence the name isCoordinator. Alternatively I can name it amITheCoordinator.

Author

It is similar to Kafka, where the method isLeader() is called to check whether the consumer is the leader of the group. Likewise, the task checks isCoordinator.

committer = CommitterFactory.createCommitter(config);
committer.start(catalog, config, context);
// We should start the coordinator only if the list of partitions contains the zeroth partition.
if (committer.isCoordinator(partitions)) {
Contributor

We should move this logic into the committer implementation.

Author

If we want to move this logic into the committer, then we need to somehow pass the partitions information, and that would require modifying the committer method signatures.

// We need to close the worker here in every case to ensure exactly-once semantics; otherwise this will lead to duplicate records.
committer.stopWorker();
// The coordinator should only be closed if the closed partitions contain the partition which elected this task as coordinator.
if (committer.isCoordinator(partitions)) {
Contributor

Likewise, we should move this into the committer implementation rather than having it in the sink task.

Author

If we want to move this logic into the committer, then we need to somehow pass the partitions information, and that would require modifying the committer method signatures.

}

private void close() {
if (committer != null) {
committer.stop();
committer.stopWorker();
Contributor

Same as above, we should keep a generic stop and let the committer implementation decide what to do.
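
For illustration, a generic stop along the lines described here might look roughly like this inside the coordinator-based implementation (a sketch, assuming the coordinatorThread field shown in the diff above):

@Override
public void stop() {
  // Every implementation stops its own worker.
  stopWorker();
  // Only this implementation knows it may also be running a coordinator thread.
  if (coordinatorThread != null) {
    coordinatorThread.terminate();
    coordinatorThread = null;
  }
}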
