8000 KAFKA-18888: Add KIP-877 support to Authorizer by mimaison · Pull Request #19050 · apache/kafka · GitHub
[go: up one dir, main page]

Skip to content

KAFKA-18888: Add KIP-877 support to Authorizer #19050

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 15, 2025

Conversation

mimaison
Copy link
Member
@mimaison mimaison commented Feb 27, 2025

This also adds metrics to StandardAuthorizer

Reviewers: Chia-Ping Tsai chia7712@gmail.com, Ken Huang
s7133700@gmail.com, Jhen-Yung Hsu jhenyunghsu@gmail.com, TaiJuWu
tjwu1217@gmail.com

@mimaison mimaison force-pushed the kafka-18888 branch 2 times, most recently from 01beae3 to 9127ff4 Compare February 28, 2025 09:59
@mimaison mimaison marked this pull request as draft March 3, 2025 21:04
@mimaison mimaison marked this pull request as ready for review March 24, 2025 15:02
Copy link
Collaborator
@Yunyung Yunyung left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch. We should mention Implement the Monitorable interface... in Authorizer's javadoc.

8000
@mimaison
Copy link
Member Author
mimaison commented Apr 3, 2025

@Yunyung Sorry the PR was not fully complete. I updated the javadoc and added an integration test. It should be ready to review now.

@@ -2654,7 +2655,7 @@ class ReplicaManager(val config: KafkaConfig,
config.replicaSelectorClassName.map { className =>
val tmpReplicaSelector: ReplicaSelector = Utils.newInstance(className, classOf[ReplicaSelector])
tmpReplicaSelector.configure(config.originals())
Plugin.wrapInstance(tmpReplicaSelector, metrics, className)
Plugin.wrapInstance(tmpReplicaSelector, metrics, ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes an issue we introduced in 79fe130

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fix it!

Copy link
Collaborator
@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mimaison for this patch, left some comments, PTAL

@@ -766,7 +766,7 @@ class BrokerServer(
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (dataPlaneRequestProcessor != null)
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
authorizer.foreach(Utils.closeQuietly(_, "authorizer"))
authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename "authorizer" to "authorizer plugin" for clarity? Since we explicitly close the authorizer in Plugin#close().

@@ -467,7 +470,7 @@ class ControllerServer(
CoreUtils.swallow(quotaManagers.shutdown(), this)
Utils.closeQuietly(controller, "controller")
Utils.closeQuietly(quorumControllerMetrics, "quorum controller metrics")
authorizer.foreach(Utils.closeQuietly(_, "authorizer"))
authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: Should we rename "authorizer" to "authorizer plugin" for clarity? Since we explicitly close the authorizer in Plugin#close().

Comment on lines 97 to 98
@Warmup(iterations = 1)
@Measurement(iterations = 3)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why is this change necessary?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No this is unwanted, I just used these smaller numbers to run the benchmark more quickly. I'll revert it.

Comment on lines 54 to 55
* The standard authorizer which is used in KRaft-based clusters if no other authorizer is
* configured.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove KRaft-related keywords to improve this document. because we only have KRaft based cluster in 4.X.

@@ -250,7 +251,7 @@ public GroupCoordinatorShard build() {
throw new IllegalArgumentException("TopicPartition must be set.");
if (groupConfigManager == null)
throw new IllegalArgumentException("GroupConfigManager must be set.");
if (authorizer == null)
if (authorizerPlugin == null)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also check authorizerPlugin.isEmpty()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to pass Optional.empty() down the call stack.

Comment on lines +509 to +510
public Builder withAuthorizerPlugin(Plugin<Authorizer> authorizerPlugin) {
this.authorizerPlugin = Optional.of(authorizerPlugin);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We set the parameter is Optional<Plugin<Authorizer>> authorizerPlugin in GroupMetadataManager, If we consistently pass Plugin<Authorizer> into the builder and wrap it inside the method, can we avoid the need to check for null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell the other fields are also always set, so I'll leave the null check for consistency.

Comment on lines 67 to 69
try (Admin admin = Admin.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
admin.describeCluster().clusterId().get();
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need to descibe the cluster information in this test, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right we don't need it now, I had it because I was also testing other server plugins. I'll delete it for now.

authorizer.ifPresent(authorizer -> {
ClusterMetadataAuthorizer clusterMetadataAuthorizer = (ClusterMetadataAuthorizer) authorizer.get();
clusterMetadataAuthorizer.completeInitialLoad(new TimeoutException());
});
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: new line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was already missing. Added

Copy link
Collaborator
@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for update, a few more comments about the documentation updates

@@ -3284,7 +3285,7 @@ private boolean maybeUpdateRegularExpressions(
* @param log The log instance.
* @param time The time instance.
* @param image The metadata image to use for listing the topics.
* @param authorizer The authorizer.
* @param authorizerPlugin The authorizer.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about update to The authorizer is wrapped in a {@link org.apache.kafka.common.internals.Plugin}?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a bit overkill. This field was an Optional<Authorizer> and we just said The authorizer.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, make sense to me.

@@ -3352,15 +3353,15 @@ public static Map<String, ResolvedRegularExpression> refreshRegularExpressions(
* that the member is authorized to describe.
*
* @param context The request context.
* @param authorizer The authorizer.
* @param authorizerPlugin The authorizer.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: How about update to The authorizer is wrapped in a {@link org.apache.kafka.common.internals.Plugin}?

@@ -79,17 +80,17 @@ public Builder addReadinessFutures(
/**
* Build the EndpointReadyFutures object.
*
* @param authorizer The authorizer to use, if any. Will be started.
* @param info Server information to be passed to the authorizer.
* @param authorizerPlugin The authorizer to use, if any. Will be started.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about update to The authorizer which is wrapped in a {@link org.apache.kafka.common.internals.Plugin} to use?

Copy link
Collaborator
@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Will you be addressing #19068 (comment) in this PR?

@mimaison
Copy link
Member Author

Overall LGTM. Will you be addressing #19068 (comment) in this PR?

I pushed a commit to address it.

Copy link
Collaborator
@Yunyung Yunyung left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

Copy link
Collaborator
@m1a2st m1a2st left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for @mimaison update, left some comments

public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key, Map<String, String> extraTags) {
/**
* Wrap an instance into a Plugin.
* @param instance the instance to wrap
Copy link
Collaborator
@m1a2st m1a2st Apr 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should also mention that the instance's class name will be used as the value for the <code>class</code> tag.

* @param key the value for the <code>config</code> tag
* @return the plugin
*/
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key, String name, String value) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to use a LinkedHashMap instead of separate String name/value pairs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment all the callers have a single extra tag, so it's simpler than having them build a LinkedHashMap.

@@ -60,6 +99,13 @@ private static <T> Map<String, String> tags(String key, T instance) {
return tags;
}

/**
* Wrap a list of instances into Plugins.
* @param instances the instances to wrap
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto I think we should also mention that the instance's class name will be used as the value for the <code>class</code> tag.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already mentioned in the class javadoc

Comment on lines +57 to +64
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, Supplier<Map<String, String>> tagsSupplier) {
PluginMetricsImpl pluginMetrics = null;
if (instance instanceof Monitorable && metrics != null) {
pluginMetrics = new PluginMetricsImpl(metrics, tagsSupplier.get());
((Monitorable) instance).withPluginMetrics(pluginMetrics);
}
return new Plugin<>(instance, pluginMetrics);
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also ask the method PluginMetricsImpl#metricName, it also use Map.putAll(), Should we also update it for order?

@Override
public MetricName metricName(String name, String description, Map<String, String> tags) {
    if (closing) throw new IllegalStateException("This PluginMetrics instance is closed");
    for (String tagName : tags.keySet()) {
        if (this.tags.containsKey(tagName)) {
            throw new IllegalArgumentException("Cannot use " + tagName + " as a tag name");
        }
    }
    Map<String, String> metricsTags = new LinkedHashMap<>(this.tags);
    metricsTags.putAll(tags);
    return metrics.metricName(name, GROUP, description, metricsTags);
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should update the PluginMetrics API to require a LinkedHashMap, or at least mention it in the javadoc.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I’d prefer to require a LinkedHashMap rather than just mentioning it in the JavaDoc, since JavaDoc alone doesn’t enforce anything.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to open a jira if you want to handle this.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!


/**
* Wrap an instance into a Plugin.
* @param instance the instance to wrap
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto I think we should also mention that the instance's class name will be used as the value for the <code>class</code> tag.

@@ -40,14 +47,46 @@ private Plugin(T instance, PluginMetricsImpl pluginMetrics) {
this.pluginMetrics = Optional.ofNullable(pluginMetrics);
}

/**
* Wrap an instance into a Plugin.
* @param instance the instance to wrap
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto I think we should also mention that the instance's class name will be used as the value for the <code>class</code> tag.

@mimaison
Copy link
Member Author

@chia7712 If you have time, can you take a look? Thanks

this.authorizerMetrics = new AuthorizerMetrics(metrics);
}

private class AuthorizerMetrics {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you considered exposing AuthorizerMetrics as public APIs for custom Authorizer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Frankly, no I didn't. This would require updating KIP-877 if we wanted to do so.

@github-actions github-actions bot added storage Pull requests that target the storage module tiered-storage Related to the Tiered Storage feature build Gradle build or GitHub Actions labels Apr 15, 2025
Copy link
Member
@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall LGTM, and only one small comment is left

private final Sensor authorizationRequestSensor;

private AuthorizerMetrics(PluginMetrics metrics) {
authorizationAllowedSensor = metrics.addSensor("authorizer-authorization-allowed");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those metrics are bound with StandardAuthorizer, and maybe the metrics in KIP-877 should use the explicit authorizer name.

kafka.server:type=plugins,config=authorizer.class.name,class=StandardAuthorizer,name=acls-total-count

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was a typo in the KIP, updated.


public static class MonitorableReplicaSelector extends RackAwareReplicaSelector implements Monitorable {

private static final int METRICS_COUNT = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excuse me, why to set a static constant in a inner class? Maybe it can be moved to MonitorablePluginsIntegrationTest?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've put it in the class as it's the number of metrics this class registers.

@mimaison mimaison merged commit fb2ce76 into apache:trunk Apr 15, 2025
41 checks passed
@mimaison mimaison deleted the kafka-18888 branch April 15, 2025 17:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
build Gradle build or GitHub Actions clients core Kafka Broker kraft performance storage Pull requests that target the storage module tiered-storage Related to the Tiered Storage feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants
0