-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-18888: Add KIP-877 support to Authorizer #19050
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
01beae3
to
9127ff4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch. We should mention Implement the Monitorable interface...
in Authorizer's javadoc.
@Yunyung Sorry the PR was not fully complete. I updated the javadoc and added an integration test. It should be ready to review now. |
@@ -2654,7 +2655,7 @@ class ReplicaManager(val config: KafkaConfig, | |||
config.replicaSelectorClassName.map { className => | |||
val tmpReplicaSelector: ReplicaSelector = Utils.newInstance(className, classOf[ReplicaSelector]) | |||
tmpReplicaSelector.configure(config.originals()) | |||
Plugin.wrapInstance(tmpReplicaSelector, metrics, className) | |||
Plugin.wrapInstance(tmpReplicaSelector, metrics, ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This fixes an issue we introduced in 79fe130
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for fix it!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mimaison for this patch, left some comments, PTAL
@@ -766,7 +766,7 @@ class BrokerServer( | |||
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this) | |||
if (dataPlaneRequestProcessor != null) | |||
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this) | |||
authorizer.foreach(Utils.closeQuietly(_, "authorizer")) | |||
authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename "authorizer" to "authorizer plugin" for clarity? Since we explicitly close the authorizer in Plugin#close()
.
@@ -467,7 +470,7 @@ class ControllerServer( | |||
CoreUtils.swallow(quotaManagers.shutdown(), this) | |||
Utils.closeQuietly(controller, "controller") | |||
Utils.closeQuietly(quorumControllerMetrics, "quorum controller metrics") | |||
authorizer.foreach(Utils.closeQuietly(_, "authorizer")) | |||
authorizerPlugin.foreach(Utils.closeQuietly(_, "authorizer")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto: Should we rename "authorizer" to "authorizer plugin" for clarity? Since we explicitly close the authorizer in Plugin#close()
.
@Warmup(iterations = 1) | ||
@Measurement(iterations = 3) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, why is this change necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No this is unwanted, I just used these smaller numbers to run the benchmark more quickly. I'll revert it.
* The standard authorizer which is used in KRaft-based clusters if no other authorizer is | ||
* configured. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove KRaft-related keywords to improve this document. because we only have KRaft based cluster in 4.X.
@@ -250,7 +251,7 @@ public GroupCoordinatorShard build() { | |||
throw new IllegalArgumentException("TopicPartition must be set."); | |||
if (groupConfigManager == null) | |||
throw new IllegalArgumentException("GroupConfigManager must be set."); | |||
if (authorizer == null) | |||
if (authorizerPlugin == null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we also check authorizerPlugin.isEmpty()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine to pass Optional.empty()
down the call stack.
public Builder withAuthorizerPlugin(Plugin<Authorizer> authorizerPlugin) { | ||
this.authorizerPlugin = Optional.of(authorizerPlugin); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We set the parameter is Optional<Plugin<Authorizer>> authorizerPlugin
in GroupMetadataManager
, If we consistently pass Plugin<Authorizer>
into the builder and wrap it inside the method, can we avoid the need to check for null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I can tell the other fields are also always set, so I'll leave the null
check for consistency.
try (Admin admin = Admin.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) { | ||
admin.describeCluster().clusterId().get(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need to descibe the cluster information in this test, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right we don't need it now, I had it because I was also testing other server plugins. I'll delete it for now.
authorizer.ifPresent(authorizer -> { | ||
ClusterMetadataAuthorizer clusterMetadataAuthorizer = (ClusterMetadataAuthorizer) authorizer.get(); | ||
clusterMetadataAuthorizer.completeInitialLoad(new TimeoutException()); | ||
}); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: new line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was already missing. Added
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for update, a few more comments about the documentation updates
@@ -3284,7 +3285,7 @@ private boolean maybeUpdateRegularExpressions( | |||
* @param log The log instance. | |||
* @param time The time instance. | |||
* @param image The metadata image to use for listing the topics. | |||
* @param authorizer The authorizer. | |||
* @param authorizerPlugin The authorizer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about update to The authorizer is wrapped in a {@link org.apache.kafka.common.internals.Plugin}
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's a bit overkill. This field was an Optional<Authorizer>
and we just said The authorizer
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, make sense to me.
@@ -3352,15 +3353,15 @@ public static Map<String, ResolvedRegularExpression> refreshRegularExpressions( | |||
* that the member is authorized to describe. | |||
* | |||
* @param context The request context. | |||
* @param authorizer The authorizer. | |||
* @param authorizerPlugin The authorizer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto: How about update to The authorizer is wrapped in a {@link org.apache.kafka.common.internals.Plugin}
?
@@ -79,17 +80,17 @@ public Builder addReadinessFutures( | |||
/** | |||
* Build the EndpointReadyFutures object. | |||
* | |||
* @param authorizer The authorizer to use, if any. Will be started. | |||
* @param info Server information to be passed to the authorizer. | |||
* @param authorizerPlugin The authorizer to use, if any. Will be started. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about update to The authorizer which is wrapped in a {@link org.apache.kafka.common.internals.Plugin} to use
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Will you be addressing #19068 (comment) in this PR?
I pushed a commit to address it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @mimaison update, left some comments
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key, Map<String, String> extraTags) { | ||
/** | ||
* Wrap an instance into a Plugin. | ||
* @param instance the instance to wrap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should also mention that the instance's class name will be used as the value for the <code>class</code>
tag.
* @param key the value for the <code>config</code> tag | ||
* @return the plugin | ||
*/ | ||
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, String key, String name, String value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be better to use a LinkedHashMap
instead of separate String name/value pairs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the moment all the callers have a single extra tag, so it's simpler than having them build a LinkedHashMap
.
@@ -60,6 +99,13 @@ private static <T> Map<String, String> tags(String key, T instance) { | |||
return tags; | |||
} | |||
|
|||
/** | |||
* Wrap a list of instances into Plugins. | |||
* @param instances the instances to wrap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto I think we should also mention that the instance's class name will be used as the value for the <code>class</code>
tag.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is already mentioned in the class javadoc
public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, Supplier<Map<String, String>> tagsSupplier) { | ||
PluginMetricsImpl pluginMetrics = null; | ||
if (instance instanceof Monitorable && metrics != null) { | ||
pluginMetrics = new PluginMetricsImpl(metrics, tagsSupplier.get()); | ||
((Monitorable) instance).withPluginMetrics(pluginMetrics); | ||
} | ||
return new Plugin<>(instance, pluginMetrics); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also ask the method PluginMetricsImpl#metricName, it also use Map.putAll()
, Should we also update it for order?
@Override
public MetricName metricName(String name, String description, Map<String, String> tags) {
if (closing) throw new IllegalStateException("This PluginMetrics instance is closed");
for (String tagName : tags.keySet()) {
if (this.tags.containsKey(tagName)) {
throw new IllegalArgumentException("Cannot use " + tagName + " as a tag name");
}
}
Map<String, String> metricsTags = new LinkedHashMap<>(this.tags);
metricsTags.putAll(tags);
return metrics.metricName(name, GROUP, description, metricsTags);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should update the PluginMetrics
API to require a LinkedHashMap
, or at least mention it in the javadoc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. I’d prefer to require a LinkedHashMap
rather than just mentioning it in the JavaDoc, since JavaDoc alone doesn’t enforce anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feel free to open a jira if you want to handle this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, Open https://issues.apache.org/jira/browse/KAFKA-19139 to trace it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
|
||
/** | ||
* Wrap an instance into a Plugin. | ||
* @param instance the instance to wrap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto I think we should also mention that the instance's class name will be used as the value for the <code>class</code>
tag.
@@ -40,14 +47,46 @@ private Plugin(T instance, PluginMetricsImpl pluginMetrics) { | |||
this.pluginMetrics = Optional.ofNullable(pluginMetrics); | |||
} | |||
|
|||
/** | |||
* Wrap an instance into a Plugin. | |||
* @param instance the instance to wrap |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto I think we should also mention that the instance's class name will be used as the value for the <code>class</code>
tag.
@chia7712 If you have time, can you take a look? Thanks |
this.authorizerMetrics = new AuthorizerMetrics(metrics); | ||
} | ||
|
||
private class AuthorizerMetrics { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have you considered exposing AuthorizerMetrics
as public APIs for custom Authorizer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Frankly, no I didn't. This would require updating KIP-877 if we wanted to do so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall LGTM, and only one small comment is left
private final Sensor authorizationRequestSensor; | ||
|
||
private AuthorizerMetrics(PluginMetrics metrics) { | ||
authorizationAllowedSensor = metrics.addSensor("authorizer-authorization-allowed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
those metrics are bound with StandardAuthorizer
, and maybe the metrics in KIP-877 should use the explicit authorizer name.
kafka.server:type=plugins,config=authorizer.class.name,class=StandardAuthorizer,name=acls-total-count
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was a typo in the KIP, updated.
|
||
public static class MonitorableReplicaSelector extends RackAwareReplicaSelector implements Monitorable { | ||
|
||
private static final int METRICS_COUNT = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Excuse me, why to set a static constant in a inner class? Maybe it can be moved to MonitorablePluginsIntegrationTest
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've put it in the class as it's the number of metrics this class registers.
This also adds metrics to StandardAuthorizer
Reviewers: Chia-Ping Tsai chia7712@gmail.com, Ken Huang
s7133700@gmail.com, Jhen-Yung Hsu jhenyunghsu@gmail.com, TaiJuWu
tjwu1217@gmail.com