
Fix #176: Changes needed to get the tests working, at least #177

Draft · wants to merge 9 commits into main
Conversation

tombentley (Contributor):

Type of change


  • Enhancement / new feature

Description

Adds support for running Kraft-based clusters with separate controller nodes.

  • Write tests
  • Make sure all tests pass
  • Update documentation
  • Reference relevant issue(s) and close them after merging
  • Update CHANGELOG.md

* See the class JavaDoc for example topologies.
* @return true to use combined mode.
*/
public boolean combinedMode() default true;
Member:

Does this really make sense? I admit it's what we talked about in the other PR, and thus what you've done.

I wonder if we should just expose MetadataMode here instead? I guess metadataMode = Zookeeper doesn't make sense here.

I almost wonder if we should drop the KraftCluster & ZookeeperCluster annotations and just have KafkaCluster(metadataMode = [KRaftCombined | KRaftIndependent | Zookeeper])?

Contributor Author:

I agree it's a bit murky. It is certainly true that we could express all topologies using something like this:

@KafkaCluster(numXxx=X, metadataMode=Y, numBrokers=Z)

But I'm not sure what to call numXxx. It can't be numControllers, because in ZK mode the controller is a broker and the ZK nodes are counted separately. Maybe metadataQuorumSize would work?

I'm inclined to do the simplest possible thing in this PR and consider the API question separately. If nothing else to make the PRs of a sensible size.
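To make the API question concrete, the unified annotation floated above could look something like the sketch below. This is purely illustrative: MetadataMode, metadataQuorumSize, and numBrokers are placeholder names from the discussion, not the project's actual API.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class AnnotationSketch {

    enum MetadataMode { ZOOKEEPER, KRAFT_COMBINED, KRAFT_SEPARATE }

    @Retention(RetentionPolicy.RUNTIME)
    @interface KafkaCluster {
        // "metadataQuorumSize" sidesteps the numControllers ambiguity: in ZK mode
        // the quorum is the ZK ensemble, in KRaft mode it is the controller quorum.
        int metadataQuorumSize() default 1;

        MetadataMode metadataMode() default MetadataMode.KRAFT_COMBINED;

        int numBrokers() default 1;
    }

    // Example usage on a hypothetical test class.
    @KafkaCluster(metadataMode = MetadataMode.KRAFT_SEPARATE, metadataQuorumSize = 3, numBrokers = 3)
    static class ExampleTest { }

    public static void main(String[] args) {
        KafkaCluster cfg = ExampleTest.class.getAnnotation(KafkaCluster.class);
        System.out.println(cfg.metadataMode() + " quorum=" + cfg.metadataQuorumSize()
                + " brokers=" + cfg.numBrokers());
    }
}
```

One attraction of this shape is that all three topologies become expressible through a single annotation, at the cost of allowing nonsensical combinations (e.g. a quorum size in combined mode that differs from the broker count) that would need validation.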

Member:

Yeah I think separate PR for an api change makes sense.

I'll give this another pass with that in mind.

Because it's not supported yet.

Signed-off-by: Tom Bentley <[email protected]>

sonarcloud bot commented Sep 13, 2023

Kudos, SonarCloud Quality Gate passed!

0 Bugs
0 Vulnerabilities
0 Security Hotspots
33 Code Smells

No coverage information
0.0% Duplication

Member (@SamBarker) left a comment:

I'm happy with the general shape of this. A few thoughts to be discussed about the details.

public enum MetadataMode {
ZOOKEEPER,
KRAFT_COMBINED,
KRAFT_SEPARATE;
Member:

Suggested change:
- KRAFT_SEPARATE;
+ KRAFT_DEDICATED;

@@ -218,7 +216,7 @@ public Stream<ConfigHolder> getBrokerConfigs(Supplier<KafkaEndpoints> endPointCo
*/
@NotNull
public ConfigHolder generateConfigForSpecificNode(KafkaEndpoints kafkaEndpoints, int nodeId) {
- final var role = determineRole(nodeId);
+ final var roles = nodeId >= numNodes() ? EnumSet.of(NodeRole.BROKER) : processRoles(nodeId);
Member:

Shouldn't this nodeId >= numNodes() check be part of processRoles()?
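The suggested fold-in might look like the sketch below. NodeRole, the constant standing in for clusterConfig.numNodes(), and the combined-mode rule are simplified placeholders, not the project's real implementation.

```java
import java.util.EnumSet;

public class RoleSketch {
    enum NodeRole { BROKER, CONTROLLER }

    static final int CONFIGURED_NODES = 3; // stand-in for clusterConfig.numNodes()

    static EnumSet<NodeRole> processRoles(int nodeId) {
        // Nodes beyond the originally configured set were added dynamically,
        // and dynamically added nodes are always pure brokers.
        if (nodeId >= CONFIGURED_NODES) {
            return EnumSet.of(NodeRole.BROKER);
        }
        // Within the configured set, combined mode gives every node both roles.
        return EnumSet.of(NodeRole.BROKER, NodeRole.CONTROLLER);
    }

    public static void main(String[] args) {
        System.out.println(processRoles(1)); // configured node: [BROKER, CONTROLLER]
        System.out.println(processRoles(5)); // dynamically added: [BROKER]
    }
}
```

With the check inside processRoles(), callers no longer need to special-case dynamically added node ids themselves.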

- if (role.contains(BROKER_ROLE)) {
-     configHolder = configureBroker(kafkaEndpoints, nodeId, protocolMap, listeners, advertisedListeners, earlyStart, nodeConfiguration);
+ if (NodeRole.hasBrokerRole(roles)) {
+     var interBrokerEndpoint = kafkaEndpoints.getEndpointPair(Listener.INTERNAL, nodeId);
Member:

did you inline configureBroker again for a particular reason?

Contributor Author:

Broker and controller are non-exclusive roles which a node may have. Having separate methods for broker and controller obscures the combined case. Certainly we can try to tidy this up a bit, but I think it would be better to move the ConfigHolder instantiation outside of the if/else, to make it clear that it's only the endpoints which are different when a node is a broker.
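The restructuring proposed here could be sketched roughly as below: compute the endpoint-dependent values inside the if/else, but instantiate the single ConfigHolder once, afterwards. ConfigHolder and Endpoint here are simplified placeholders for the real types.

```java
public class ConfigSketch {
    record Endpoint(String host, int port) { }
    record ConfigHolder(int nodeId, Endpoint clientEndpoint) { }

    static ConfigHolder configFor(int nodeId, boolean isBroker) {
        Endpoint endpoint;
        if (isBroker) {
            // brokers expose a client-facing listener (ports are illustrative)
            endpoint = new Endpoint("localhost", 9092 + nodeId);
        } else {
            // pure controllers only need the controller listener
            endpoint = new Endpoint("localhost", 9093 + nodeId);
        }
        // Only the endpoints differ between the branches; the ConfigHolder
        // itself is created exactly once, outside the if/else.
        return new ConfigHolder(nodeId, endpoint);
    }

    public static void main(String[] args) {
        System.out.println(configFor(0, true));
        System.out.println(configFor(1, false));
    }
}
```

The design point is that the branch structure then mirrors what actually varies between roles, which keeps the combined broker-and-controller case from being obscured by parallel methods.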

- if (clusterConfig.isKraftMode() && isController(nodeId)) {
-     throw new UnsupportedOperationException("Cannot remove controller node " + nodeId + " from a kraft cluster.");
+ if (!clusterConfig.isPureBroker(nodeId)) {
+     throw new UnsupportedOperationException("Node " + nodeId + " is not a pure broker.");
Member:

I understand what this message means but is the pure broker terminology going to be meaningful to users?

@@ -315,7 +331,7 @@ public synchronized void removeBroker(int nodeId) throws IllegalArgumentExceptio
throw new IllegalStateException("Cannot remove nodes from a cluster with stopped nodes.");
}

- var target = servers.keySet().stream().filter(n -> n != nodeId).findFirst();
+ var target = servers.keySet().stream().filter(n -> n != nodeId && clusterConfig.hasBrokerRole(n)).findFirst();
Member:

is && clusterConfig.hasBrokerRole(n) really needed in the filter clause? Shouldn't we have validated that up front that clusterConfig.hasBrokerRole(nodeId) is true?

Contributor Author:

This is about finding the target brokers for replicas currently on nodeId, i.e. n here is not the same as nodeId. Without the extra condition, n might be a controller, but not a broker, and thus could not replicate any partitions hosted on the broker which is being removed.
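A toy illustration of the point, assuming a three-node cluster where node 0 is a dedicated controller; hasBrokerRole and pickTarget stand in for clusterConfig.hasBrokerRole(n) and the filter in removeBroker:

```java
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

public class TargetSketch {
    static final Set<Integer> PURE_CONTROLLERS = Set.of(0); // node 0 is controller-only

    // stand-in for clusterConfig.hasBrokerRole(n)
    static boolean hasBrokerRole(int n) {
        return !PURE_CONTROLLERS.contains(n);
    }

    // pick a node that can host replicas moved off the broker being removed
    static Optional<Integer> pickTarget(Set<Integer> servers, int nodeId) {
        return servers.stream()
                .filter(n -> n != nodeId && hasBrokerRole(n))
                .findFirst();
    }

    public static void main(String[] args) {
        Set<Integer> servers = new TreeSet<>(Set.of(0, 1, 2)); // all live nodes
        // Without hasBrokerRole in the filter, node 0 (a pure controller)
        // would be chosen as the reassignment target.
        System.out.println(pickTarget(servers, 1)); // Optional[2]
    }
}
```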

Comment on lines +415 to +416
return key < clusterConfig.numNodes() // dynamically added nodes are always pure brokers
&& clusterConfig.hasControllerRole(key);
Member:

I think pushing this back to the clusterConfig is a good move, but it moves us back to playing games with nodeIds rather than testing for something that is an inherent property of the feature we care about.

Separately, I don't see why we need the first half of the condition key < clusterConfig.numNodes(): a node either has the controller role or not. Sprinkling key < clusterConfig.numNodes() throughout just diffuses the definition of what is and isn't a controller across the codebase.

Contributor Author:

The cluster config is immutable and encodes the requested number of brokers and controllers. When nodes are added dynamically it doesn't know about them. We might be able to have it reason about node.id >= numNodes(), but then what does numNodes() actually mean? Since it's the KafkaCluster that exposes add/removeBroker(), it seems reasonable to expect it to be responsible for knowing about the node ids required.

Member:

I'm not sure if we are agreeing or not.

I think guessing whether something is a controller or not based on its ID is not a good idea. It just bakes in assumptions about the order of operations. The old model of checking the port allocations was an odd choice, but at least it tested for a property of being a controller or not.

It still strikes me that we should have some way to say, given a nodeId, what roles it has. Pushing the question of what a given nodeId does into clusterConfig, and thus hiding the nodeId guessing, is a decent compromise.

Contributor Author:

I think we agree? With what I've done in this PR then given a MetadataMode and the number of brokers and controllers it's completely deterministic what roles any given node id should have. There is no guessing, and the order of operations cannot change the role: If a node ends up configured with different roles it's simply a bug.
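The determinism claim can be sketched as a pure function from configuration to role set. The id layout here (dedicated controllers occupy the lowest ids) is an assumption for illustration only, and MetadataMode/NodeRole are simplified stand-ins:

```java
import java.util.EnumSet;

public class DeterministicRoles {
    enum MetadataMode { ZOOKEEPER, KRAFT_COMBINED, KRAFT_SEPARATE }
    enum NodeRole { BROKER, CONTROLLER }

    // Given the mode and configured counts, a node id maps to exactly one
    // role set; no order of operations can change the answer.
    static EnumSet<NodeRole> rolesOf(MetadataMode mode, int numControllers, int nodeId) {
        switch (mode) {
            case ZOOKEEPER:
                // in ZK mode the controller is a broker; ZK nodes are counted separately
                return EnumSet.of(NodeRole.BROKER);
            case KRAFT_COMBINED:
                return EnumSet.of(NodeRole.BROKER, NodeRole.CONTROLLER);
            case KRAFT_SEPARATE:
                // assumption: the first numControllers ids are dedicated controllers
                return nodeId < numControllers
                        ? EnumSet.of(NodeRole.CONTROLLER)
                        : EnumSet.of(NodeRole.BROKER);
            default:
                throw new IllegalArgumentException("unknown mode " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(rolesOf(MetadataMode.KRAFT_SEPARATE, 1, 0)); // [CONTROLLER]
        System.out.println(rolesOf(MetadataMode.KRAFT_SEPARATE, 1, 1)); // [BROKER]
    }
}
```

A function of this shape would also give the codebase a single place to answer "what roles does this nodeId have", which is the property both sides of the discussion seem to want.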

- // TODO this is nasty. We shouldn't need to go via the portAllocator to figure out what a node is
- // But it is at least testing something meaningful about the configuration
- return portsAllocator.hasRegisteredPort(Listener.ANON, key);
+ return key >= clusterConfig.numNodes() // dynamically added nodes are always pure brokers
Member:

For the record similar thoughts apply here as to isController above

Labels: none yet · Projects: none yet · 2 participants