8000 KAFKA-18332 fix ClassDataAbstractionCoupling problem in KafkaRaftClie… · coderabbit-test/kafka@6f5be29 · GitHub
[go: up one dir, main page]

Skip to content

Commit 6f5be29

Browse files
authored
KAFKA-18332 fix ClassDataAbstractionCoupling problem in KafkaRaftClientTest(1/2) (apache#18926)
- extract a unit test named `KafkaRaftClientClusterAuthTest` to reduce the number of imported class Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1 parent 50fb993 commit 6f5be29

File tree

2 files changed

+185
-157
lines changed

2 files changed

+185
-157
lines changed
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
< 10000 /td>16+
*/
17+
18+
package org.apache.kafka.raft;
19+
20+
import org.apache.kafka.common.errors.ClusterAuthorizationException;
21+
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
22+
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
23+
import org.apache.kafka.common.message.FetchResponseData;
24+
import org.apache.kafka.common.message.VoteResponseData;
25+
import org.apache.kafka.common.protocol.Errors;
26+
27+
import org.junit.jupiter.params.ParameterizedTest;
28+
import org.junit.jupiter.params.provider.ValueSource;
29+
30+
import java.util.List;
31+
import java.util.Set;
32+
33+
import static org.apache.kafka.raft.KafkaRaftClientTest.randomReplicaId;
34+
import static org.apache.kafka.raft.RaftClientTestContext.Builder.DEFAULT_ELECTION_TIMEOUT_MS;
35+
import static org.junit.jupiter.api.Assertions.assertEquals;
36+
import static org.junit.jupiter.api.Assertions.assertThrows;
37+
38+
public class KafkaRaftClientClusterAuthTest {
39+
@ParameterizedTest
40+
@ValueSource(booleans = { true, false })
41+
void testClusterAuthorizationFailedInFetch(boolean withKip853Rpc) throws Exception {
42+
int localId = randomReplicaId();
43+
int otherNodeId = localId + 1;
44+
int epoch = 5;
45+
Set<Integer> voters = Set.of(localId, otherNodeId);
46+
47+
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
48+
.withKip853Rpc(withKip853Rpc)
49+
.withElectedLeader(epoch, otherNodeId)
50+
.build();
51+
52+
context.assertElectedLeader(epoch, otherNodeId);
53+
54+
context.pollUntilRequest();
55+
56+
RaftRequest.Outbound request = context.assertSentFetchRequest(epoch, 0, 0);
57+
FetchResponseData response = new FetchResponseData()
58+
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
59+
context.deliverResponse(
60+
request.correlationId(),
61+
request.destination(),
62+
response
63+
);
64+
assertThrows(ClusterAuthorizationException.class, context.client::poll);
65+
}
66+
67+
@ParameterizedTest
68+
@ValueSource(booleans = { true, false })
69+
void testClusterAuthorizationFailedInBeginQuorumEpoch(boolean withKip853Rpc) throws Exception {
70+
int localId = randomReplicaId();
71+
int otherNodeId = localId + 1;
72+
int epoch = 5;
73+
Set<Integer> voters = Set.of(localId, otherNodeId);
74+
75+
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
76+
.updateRandom(r -> r.mockNextInt(DEFAULT_ELECTION_TIMEOUT_MS, 0))
77+
.withUnknownLeader(epoch - 1)
78+
.withKip853Rpc(withKip853Rpc)
79+
.build();
80+
81+
context.time.sleep(context.electionTimeoutMs());
82+
context.expectAndGrantPreVotes(epoch - 1);
83+
context.expectAndGrantVotes(epoch);
84+
85+
context.pollUntilRequest();
86+
List<RaftRequest.Outbound> requests = context.collectBeginEpochRequests(epoch);
87+
assertEquals(1, requests.size());
88+
RaftRequest.Outbound request = requests.get(0);
89+
assertEquals(otherNodeId, request.destination().id());
90+
BeginQuorumEpochResponseData response = new BeginQuorumEpochResponseData()
91+
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
92+
93+
context.deliverResponse(request.correlationId(), request.destination(), response);
94+
assertThrows(ClusterAuthorizationException.class, context.client::poll);
95+
}
96+
97+
@ParameterizedTest
98+
@ValueSource(booleans = { true, false })
99+
void testClusterAuthorizationFailedInVote(boolean withKip853Rpc) throws Exception {
100+
int localId = randomReplicaId();
101+
int otherNodeId = localId + 1;
102+
int epoch = 5;
103+
Set<Integer> voters = Set.of(localId, otherNodeId);
104+
105+
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
106+
.withUnknownLeader(epoch - 1)
107+
.withKip853Rpc(withKip853Rpc)
108+
.build();
109+
110+
// Become a candidate
111+
context.unattachedToCandidate();
112+
context.pollUntilRequest();
113+
context.assertVotedCandidate(epoch, localId);
114+
115+
RaftRequest.Outbound request = context.assertSentVoteRequest(epoch, 0, 0L, 1);
116+
VoteResponseData response = new VoteResponseData()
117+
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
118+
119+
context.deliverResponse(request.correlationId(), request.destination(), response);
120+
assertThrows(ClusterAuthorizationException.class, context.client::poll);
121+
}
122+
123+
@ParameterizedTest
124+
@ValueSource(booleans = { true, false })
125+
void testClusterAuthorizationFailedInEndQuorumEpoch(boolean withKip853Rpc) throws Exception {
126+
int localId = randomReplicaId();
127+
int otherNodeId = localId + 1;
128+
Set<Integer> voters = Set.of(localId, otherNodeId);
129+
130+
RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
131+
.withUnknownLeader(1)
132+
.withKip853Rpc(withKip853Rpc)
133+
.build();
134+
135+
context.unattachedToLeader();
136+
int epoch = context.currentEpoch();
137+
138+
context.client.shutdown(5000);
139+
context.pollUntilRequest();
140+
141+
RaftRequest.Outbound request = context.assertSentEndQuorumEpochRequest(epoch, otherNodeId);
142+
EndQuorumEpochResponseData response = new EndQuorumEpochResponseData()
143+
.setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
144+
145+
context.deliverResponse(request.correlationId(), request.destination(), response);
146+
assertThrows(ClusterAuthorizationException.class, context.client::poll);
147+
}
148+
}

0 commit comments

Comments
 (0)
0