- ossrh
- https://oss.sonatype.org/content/repositories/snapshots
+ central-portal-snapshots
+ https://central.sonatype.com/repository/maven-snapshots/
true
false
@@ -154,7 +154,14 @@ With Gradle:
[source,groovy,subs="attributes,specialcharacters"]
----
repositories {
- maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
+ maven {
+ name = 'Central Portal Snapshots'
+ url = 'https://central.sonatype.com/repository/maven-snapshots/'
+ // Only search this repository for the specific dependency
+ content {
+ includeModule("com.rabbitmq", "{project-artifact-id}")
+ }
+ }
mavenCentral()
}
----
diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
index 33fac4dcbc..4c8a5239f5 100644
--- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
@@ -436,31 +436,6 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
/** Helper to configure TLS. */
interface TlsConfiguration {
- /**
- * Enable hostname verification.
- *
- * Hostname verification is enabled by default.
- *
- * @return the TLS configuration helper
- * @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link
- * #sslContext(SslContext)}
- */
- @Deprecated(forRemoval = true)
- TlsConfiguration hostnameVerification();
-
- /**
- * Enable or disable hostname verification.
- *
- *
Hostname verification is enabled by default.
- *
- * @param hostnameVerification whether to enable hostname verification or not
- * @return the TLS configuration helper
- * @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link
- * #sslContext(SslContext)}
- */
- @Deprecated(forRemoval = true)
- TlsConfiguration hostnameVerification(boolean hostnameVerification);
-
/**
* Netty {@link SslContext} for TLS connections.
*
diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java
index 8679844939..e69681f747 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Client.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Client.java
@@ -256,7 +256,7 @@ public Client(ClientParameters parameters) {
b.option(
ChannelOption.ALLOCATOR,
parameters.byteBufAllocator == null
- ? ByteBufAllocator.DEFAULT
+ ? Utils.byteBufAllocator()
: parameters.byteBufAllocator);
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java
index 7e2d1a7369..3c288d5428 100644
--- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java
+++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java
@@ -28,17 +28,22 @@
final class DynamicBatch implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class);
- private static final int MIN_BATCH_SIZE = 32;
- private static final int MAX_BATCH_SIZE = 8192;
+ private static final int MIN_BATCH_SIZE = 16;
private final BlockingQueue requests = new LinkedBlockingQueue<>();
private final BatchConsumer consumer;
- private final int configuredBatchSize;
+ private final int configuredBatchSize, minBatchSize, maxBatchSize;
private final Thread thread;
- DynamicBatch(BatchConsumer consumer, int batchSize) {
+ DynamicBatch(BatchConsumer consumer, int batchSize, int maxUnconfirmed) {
this.consumer = consumer;
- this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
+ if (batchSize < maxUnconfirmed) {
+ this.minBatchSize = min(MIN_BATCH_SIZE, batchSize / 2);
+ } else {
+ this.minBatchSize = min(1, maxUnconfirmed / 2);
+ }
+ this.configuredBatchSize = batchSize;
+ this.maxBatchSize = batchSize * 2;
this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop);
this.thread.start();
}
@@ -69,15 +74,7 @@ private void loop() {
if (state.items.size() >= state.batchSize) {
this.maybeCompleteBatch(state, true);
} else {
- item = this.requests.poll();
- if (item == null) {
- this.maybeCompleteBatch(state, false);
- } else {
- state.items.add(item);
- if (state.items.size() >= state.batchSize) {
- this.maybeCompleteBatch(state, true);
- }
- }
+ pump(state, 2);
}
} else {
this.maybeCompleteBatch(state, false);
@@ -85,6 +82,22 @@ private void loop() {
}
}
+ private void pump(State state, int pumpCount) {
+ if (pumpCount <= 0) {
+ return;
+ }
+ T item = this.requests.poll();
+ if (item == null) {
+ this.maybeCompleteBatch(state, false);
+ } else {
+ state.items.add(item);
+ if (state.items.size() >= state.batchSize) {
+ this.maybeCompleteBatch(state, true);
+ }
+ this.pump(state, pumpCount - 1);
+ }
+ }
+
private static final class State {
int batchSize;
@@ -96,13 +109,14 @@ private void maybeCompleteBatch(State state, boolean increaseIfCompleted) {
boolean completed = this.consumer.process(state.items);
if (completed) {
if (increaseIfCompleted) {
- state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE);
+ state.batchSize = min(state.batchSize * 2, this.maxBatchSize);
} else {
- state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE);
+ state.batchSize = max(state.batchSize / 2, this.minBatchSize);
}
state.items = new ArrayList<>(state.batchSize);
}
} catch (Exception e) {
+ // e.printStackTrace();
LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage());
}
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java
index ee8c397e13..8c763cde86 100644
--- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java
+++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java
@@ -38,6 +38,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
DynamicBatchMessageAccumulator(
int subEntrySize,
int batchSize,
+ int maxUnconfirmedMessages,
Codec codec,
int maxFrameSize,
ToLongFunction publishSequenceFunction,
@@ -75,7 +76,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
}
return result;
},
- batchSize);
+ batchSize,
+ maxUnconfirmedMessages);
} else {
byte compressionCode =
compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
@@ -124,7 +126,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
}
return result;
},
- batchSize * subEntrySize);
+ batchSize * subEntrySize,
+ maxUnconfirmedMessages);
}
}
diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java
index 5ae8faa7dd..691fc65c57 100644
--- a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java
+++ b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java
@@ -30,6 +30,7 @@ static MessageAccumulator createMessageAccumulator(
boolean dynamicBatch,
int subEntrySize,
int batchSize,
+ int maxUnconfirmedMessages,
CompressionCodec compressionCodec,
Codec codec,
ByteBufAllocator byteBufAllocator,
@@ -44,6 +45,7 @@ static MessageAccumulator createMessageAccumulator(
return new DynamicBatchMessageAccumulator(
subEntrySize,
batchSize,
+ maxUnconfirmedMessages,
codec,
maxFrameSize,
publishSequenceFunction,
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
index bf7f395fca..1be66aa36b 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java
@@ -372,20 +372,6 @@ private DefaultTlsConfiguration(EnvironmentBuilder environmentBuilder) {
this.environmentBuilder = environmentBuilder;
}
- @Override
- @SuppressWarnings("removal")
- public TlsConfiguration hostnameVerification() {
- this.hostnameVerification = true;
- return this;
- }
-
- @Override
- @SuppressWarnings("removal")
- public TlsConfiguration hostnameVerification(boolean hostnameVerification) {
- this.hostnameVerification = hostnameVerification;
- return this;
- }
-
@Override
public TlsConfiguration sslContext(SslContext sslContext) {
this.sslContext = sslContext;
@@ -436,7 +422,7 @@ static class DefaultNettyConfiguration implements NettyConfiguration {
private final EnvironmentBuilder environmentBuilder;
private EventLoopGroup eventLoopGroup;
- private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT;
+ private ByteBufAllocator byteBufAllocator = Utils.byteBufAllocator();
private Consumer channelCustomizer = noOpConsumer();
private Consumer bootstrapCustomizer = noOpConsumer();
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
index 27552512c8..fd3cb8f25d 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
@@ -180,6 +180,7 @@ public int fragmentLength(Object entity) {
dynamicBatch,
subEntrySize,
batchSize,
+ maxUnconfirmedMessages,
compressionCodec,
environment.codec(),
environment.byteBufAllocator(),
diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
index 43a57bbc5c..b44290868c 100644
--- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
+++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java
@@ -28,8 +28,8 @@
class StreamProducerBuilder implements ProducerBuilder {
- static final boolean DEFAULT_DYNAMIC_BATCH = true;
-// Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true"));
+ static final boolean DEFAULT_DYNAMIC_BATCH =
+ Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true"));
private final StreamEnvironment environment;
@@ -201,7 +201,7 @@ public Producer build() {
if (this.routingConfiguration == null && this.superStream != null) {
throw new IllegalArgumentException(
- "A routing configuration must specified when a super stream is set");
+ "A routing configuration must be specified when a super stream is set");
}
if (this.stream != null) {
diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java
index 4ea934e911..4b535bb21d 100644
--- a/src/main/java/com/rabbitmq/stream/impl/Utils.java
+++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java
@@ -19,6 +19,8 @@
import com.rabbitmq.stream.*;
import com.rabbitmq.stream.impl.Client.ClientParameters;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultiThreadIoEventLoopGroup;
@@ -415,6 +417,10 @@ static EventLoopGroup eventLoopGroup() {
return new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
}
+ static ByteBufAllocator byteBufAllocator() {
+ return PooledByteBufAllocator.DEFAULT;
+ }
+
/*
class to help testing SAC on super streams
*/
diff --git a/src/test/java/SanityCheck.java b/src/test/java/SanityCheck.java
index 4e550c8bd2..7ee907180c 100755
--- a/src/test/java/SanityCheck.java
+++ b/src/test/java/SanityCheck.java
@@ -1,5 +1,4 @@
///usr/bin/env jbang "$0" "$@" ; exit $?
-//REPOS mavencentral,ossrh-staging=https://oss.sonatype.org/content/groups/staging/,rabbitmq-packagecloud-milestones=https://packagecloud.io/rabbitmq/maven-milestones/maven2
//DEPS com.rabbitmq:stream-client:${env.RABBITMQ_LIBRARY_VERSION}
//DEPS org.slf4j:slf4j-simple:1.7.36
diff --git a/src/test/java/com/rabbitmq/stream/Cli.java b/src/test/java/com/rabbitmq/stream/Cli.java
index 7097725af4..5c583a461b 100644
--- a/src/test/java/com/rabbitmq/stream/Cli.java
+++ b/src/test/java/com/rabbitmq/stream/Cli.java
@@ -178,6 +178,30 @@ static List toConnectionInfoList(String json) {
return GSON.fromJson(json, new TypeToken>() {}.getType());
}
+ public static List listGroupConsumers(String stream, String reference) {
+ ProcessState process =
+ rabbitmqStreams(
+ format(
+ "list_stream_group_consumers -q --stream %s --reference %s "
+ + "--formatter table subscription_id,state",
+ stream, reference));
+
+ List itemList = Collections.emptyList();
+ String content = process.output();
+ String[] lines = content.split(System.lineSeparator());
+ if (lines.length > 1) {
+ itemList = new ArrayList<>(lines.length - 1);
+ for (int i = 1; i < lines.length; i++) {
+ String line = lines[i];
+ String[] fields = line.split("\t");
+ String id = fields[0];
+ String state = fields[1].replace("\"", "");
+ itemList.add(new SubscriptionInfo(Integer.parseInt(id), state));
+ }
+ }
+ return itemList;
+ }
+
public static void restartStream(String stream) {
rabbitmqStreams(" restart_stream " + stream);
}
@@ -420,6 +444,30 @@ public String toString() {
}
}
+ public static final class SubscriptionInfo {
+
+ private final int id;
+ private final String state;
+
+ public SubscriptionInfo(int id, String state) {
+ this.id = id;
+ this.state = state;
+ }
+
+ public int id() {
+ return this.id;
+ }
+
+ public String state() {
+ return this.state;
+ }
+
+ @Override
+ public String toString() {
+ return "SubscriptionInfo{id='" + id + '\'' + ", state='" + state + '\'' + '}';
+ }
+ }
+
public static class ProcessState {
private final InputStreamPumpState inputState;
diff --git a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java
index 02844308dd..987cf311c9 100644
--- a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java
@@ -20,7 +20,6 @@
import com.rabbitmq.stream.impl.ServerFrameHandler.DeliverVersion1FrameHandler;
import com.rabbitmq.stream.metrics.NoOpMetricsCollector;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -51,7 +50,7 @@ public MessageBuilder messageBuilder() {
ByteBuf generateFrameBuffer(
int nbMessages, long chunkOffset, int dataSize, Iterable messages) {
- ByteBuf bb = ByteBufAllocator.DEFAULT.buffer(1024);
+ ByteBuf bb = Utils.byteBufAllocator().buffer(1024);
bb.writeShort(Utils.encodeRequestCode(Constants.COMMAND_DELIVER))
.writeShort(Constants.VERSION_1)
.writeByte(1) // subscription id
diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java
index dca9810762..07a2877385 100644
--- a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java
@@ -23,8 +23,11 @@
import com.rabbitmq.stream.impl.TestUtils.Sync;
import java.util.Locale;
import java.util.Random;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
@@ -68,7 +71,7 @@ void itemAreProcessed() {
sync.down(items.size());
return true;
};
- try (DynamicBatch batch = new DynamicBatch<>(action, 100)) {
+ try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) {
RateLimiter rateLimiter = RateLimiter.create(10000);
IntStream.range(0, itemCount)
.forEach(
@@ -99,7 +102,7 @@ void failedProcessingIsReplayed() throws Exception {
}
return result;
};
- try (DynamicBatch batch = new DynamicBatch<>(action, 100)) {
+ try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) {
int firstRoundCount = itemCount / 5;
IntStream.range(0, firstRoundCount)
.forEach(
@@ -118,4 +121,37 @@ void failedProcessingIsReplayed() throws Exception {
waitAtMost(() -> collected.get() == itemCount);
}
}
+
+ @Test
+ void lowThrottlingValueShouldStillHighPublishingRate() throws Exception {
+ int batchSize = 10;
+ Semaphore semaphore = new Semaphore(batchSize);
+ DynamicBatch.BatchConsumer action =
+ items -> {
+ semaphore.release(items.size());
+ return true;
+ };
+
+ try (DynamicBatch batch = new DynamicBatch<>(action, batchSize, 10_000)) {
+ MetricRegistry metrics = new MetricRegistry();
+ Meter rate = metrics.meter("publishing-rate");
+ AtomicBoolean keepGoing = new AtomicBoolean(true);
+ AtomicLong sequence = new AtomicLong();
+ new Thread(
+ () -> {
+ while (keepGoing.get() && !Thread.interrupted()) {
+ long id = sequence.getAndIncrement();
+ if (semaphore.tryAcquire()) {
+ batch.add(id);
+ rate.mark();
+ }
+ }
+ })
+ .start();
+ long start = System.nanoTime();
+ waitAtMost(
+ () ->
+ System.nanoTime() - start > TimeUnit.SECONDS.toNanos(1) && rate.getMeanRate() > 1000);
+ }
+ }
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java
index 28422d40ad..7198facf5a 100644
--- a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java
@@ -26,7 +26,6 @@
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.Properties;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.io.ByteArrayOutputStream;
@@ -149,7 +148,7 @@ public TestDesc(String description, List sizes, int expectedCalls) {
tests.forEach(
test -> {
Channel channel = Mockito.mock(Channel.class);
- Mockito.when(channel.alloc()).thenReturn(ByteBufAllocator.DEFAULT);
+ Mockito.when(channel.alloc()).thenReturn(Utils.byteBufAllocator());
Mockito.when(channel.writeAndFlush(Mockito.any()))
.thenReturn(Mockito.mock(ChannelFuture.class));
diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java
index 24f50ee0cd..e234d723c4 100644
--- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java
@@ -16,18 +16,22 @@
import static com.rabbitmq.stream.impl.Assertions.assertThat;
import static com.rabbitmq.stream.impl.LoadBalancerClusterTest.LOAD_BALANCER_ADDRESS;
+import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_1_2;
import static com.rabbitmq.stream.impl.TestUtils.newLoggerLevel;
import static com.rabbitmq.stream.impl.TestUtils.sync;
+import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
import static com.rabbitmq.stream.impl.ThreadUtils.threadFactory;
import static com.rabbitmq.stream.impl.Tuples.pair;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.InstanceOfAssertFactories.stream;
import ch.qos.logback.classic.Level;
import com.google.common.collect.Streams;
import com.google.common.util.concurrent.RateLimiter;
import com.rabbitmq.stream.*;
+import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
import com.rabbitmq.stream.impl.TestUtils.DisabledIfNotCluster;
import com.rabbitmq.stream.impl.TestUtils.Sync;
import com.rabbitmq.stream.impl.Tuples.Pair;
@@ -41,15 +45,20 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
import org.junit.jupiter.api.*;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -201,15 +210,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
syncs = consumers.stream().map(c -> c.waitForNewMessages(100)).collect(toList());
syncs.forEach(s -> assertThat(s).completes());
- nodes.forEach(
- n -> {
- LOGGER.info("Restarting node {}...", n);
- Cli.restartNode(n);
- LOGGER.info("Restarted node {}.", n);
- });
- LOGGER.info("Rebalancing...");
- Cli.rebalance();
- LOGGER.info("Rebalancing over.");
+ restartCluster();
Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis());
@@ -291,8 +292,132 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ @BrokerVersionAtLeast(RABBITMQ_4_1_2)
+ void sacWithClusterRestart(boolean superStream) throws Exception {
+ environment =
+ environmentBuilder
+ .uris(URIS)
+ .netty()
+ .bootstrapCustomizer(
+ b -> {
+ b.option(
+ ChannelOption.CONNECT_TIMEOUT_MILLIS,
+ (int) BACK_OFF_DELAY_POLICY.delay(0).toMillis());
+ })
+ .environmentBuilder()
+ .maxConsumersByConnection(1)
+ .build();
+
+ int consumerCount = 3;
+ AtomicLong lastOffset = new AtomicLong(0);
+ String app = "app-name";
+ String s = TestUtils.streamName(testInfo);
+ ProducerState pState = null;
+ List consumers = Collections.emptyList();
+ try {
+ StreamCreator sCreator = environment.streamCreator().stream(s);
+ if (superStream) {
+ sCreator = sCreator.superStream().partitions(1).creator();
+ }
+ sCreator.create();
+
+ pState = new ProducerState(s, true, superStream, environment);
+ pState.start();
+
+ Map consumerStatus = new ConcurrentHashMap<>();
+ consumers =
+ IntStream.range(0, consumerCount)
+ .mapToObj(
+ i ->
+ new ConsumerState(
+ s,
+ environment,
+ b -> {
+ b.singleActiveConsumer()
+ .name(app)
+ .noTrackingStrategy()
+ .consumerUpdateListener(
+ ctx -> {
+ consumerStatus.put(i, ctx.isActive());
+ return OffsetSpecification.offset(lastOffset.get());
+ });
+ if (superStream) {
+ b.superStream(s);
+ } else {
+ b.stream(s);
+ }
+ },
+ (ctx, m) -> lastOffset.set(ctx.offset())))
+ .collect(toList());
+
+ Sync sync = pState.waitForNewMessages(100);
+ assertThat(sync).completes();
+ sync = consumers.get(0).waitForNewMessages(100);
+ assertThat(sync).completes();
+
+ String streamArg = superStream ? s + "-0" : s;
+
+ Callable checkConsumers =
+ () -> {
+ waitAtMost(
+ () -> {
+ List subscriptions = Cli.listGroupConsumers(streamArg, app);
+ LOGGER.info("Group consumers: {}", subscriptions);
+ return subscriptions.size() == consumerCount
+ && subscriptions.stream()
+ .filter(sub -> sub.state().startsWith("active"))
+ .count()
+ == 1
+ && subscriptions.stream()
+ .filter(sub -> sub.state().startsWith("waiting"))
+ .count()
+ == 2;
+ },
+ () ->
+ "Group consumers not in expected state: "
+ + Cli.listGroupConsumers(streamArg, app));
+ return null;
+ };
+
+ checkConsumers.call();
+
+ restartCluster();
+
+ Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis());
+
+ sync = pState.waitForNewMessages(100);
+ assertThat(sync).completes(ASSERTION_TIMEOUT);
+ int activeIndex =
+ consumerStatus.entrySet().stream()
+ .filter(Map.Entry::getValue)
+ .map(Map.Entry::getKey)
+ .findFirst()
+ .orElseThrow(() -> new IllegalStateException("No active consumer found"));
+
+ sync = consumers.get(activeIndex).waitForNewMessages(100);
+ assertThat(sync).completes(ASSERTION_TIMEOUT);
+
+ checkConsumers.call();
+
+ } finally {
+ if (pState != null) {
+ pState.close();
+ }
+ consumers.forEach(ConsumerState::close);
+ if (superStream) {
+ environment.deleteSuperStream(s);
+ } else {
+ environment.deleteStream(s);
+ }
+ }
+ }
+
private static class ProducerState implements AutoCloseable {
+ private static final AtomicLong MSG_ID_SEQ = new AtomicLong(0);
+
private static final byte[] BODY = "hello".getBytes(StandardCharsets.UTF_8);
private final String stream;
@@ -306,9 +431,19 @@ private static class ProducerState implements AutoCloseable {
final AtomicReference lastExceptionInstant = new AtomicReference<>();
private ProducerState(String stream, boolean dynamicBatch, Environment environment) {
+ this(stream, dynamicBatch, false, environment);
+ }
+
+ private ProducerState(
+ String stream, boolean dynamicBatch, boolean superStream, Environment environment) {
this.stream = stream;
- this.producer =
- environment.producerBuilder().stream(stream).dynamicBatch(dynamicBatch).build();
+ ProducerBuilder builder = environment.producerBuilder().dynamicBatch(dynamicBatch);
+ if (superStream) {
+ builder.superStream(stream).routing(m -> m.getProperties().getMessageIdAsString());
+ } else {
+ builder.stream(stream);
+ }
+ this.producer = builder.build();
}
void start() {
@@ -327,7 +462,14 @@ void start() {
try {
this.limiter.acquire(1);
this.producer.send(
- producer.messageBuilder().addData(BODY).build(), confirmationHandler);
+ producer
+ .messageBuilder()
+ .properties()
+ .messageId(MSG_ID_SEQ.getAndIncrement())
+ .messageBuilder()
+ .addData(BODY)
+ .build(),
+ confirmationHandler);
} catch (Throwable e) {
this.lastException.set(e);
this.lastExceptionInstant.set(Instant.now());
@@ -380,16 +522,27 @@ private static class ConsumerState implements AutoCloseable {
final AtomicReference postHandle = new AtomicReference<>(() -> {});
private ConsumerState(String stream, Environment environment) {
+ this(stream, environment, b -> b.stream(stream), (ctx, m) -> {});
+ }
+
+ private ConsumerState(
+ String stream,
+ Environment environment,
+ java.util.function.Consumer customizer,
+ MessageHandler delegateHandler) {
this.stream = stream;
- this.consumer =
- environment.consumerBuilder().stream(stream)
+ ConsumerBuilder builder =
+ environment
+ .consumerBuilder()
.offset(OffsetSpecification.first())
.messageHandler(
(ctx, m) -> {
+ delegateHandler.handle(ctx, m);
receivedCount.incrementAndGet();
postHandle.get().run();
- })
- .build();
+ });
+ customizer.accept(builder);
+ this.consumer = builder.build();
}
Sync waitForNewMessages(int messageCount) {
@@ -414,4 +567,16 @@ public void close() {
this.consumer.close();
}
}
+
+ private static void restartCluster() {
+ nodes.forEach(
+ n -> {
+ LOGGER.info("Restarting node {}...", n);
+ Cli.restartNode(n);
+ LOGGER.info("Restarted node {}.", n);
+ });
+ LOGGER.info("Rebalancing...");
+ Cli.rebalance();
+ LOGGER.info("Rebalancing over.");
+ }
}
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
index f076a78fde..9588002d54 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
@@ -216,7 +216,7 @@ void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed(boolean lazyInit
}
@Test
- void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info) throws Exception {
+ void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info) {
int messageCount = 100;
int streamCount = 20;
int producersCount = ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT * 3 + 10;
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
index e8d3f25e34..8dafb175ad 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java
@@ -26,7 +26,6 @@
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.impl.Client.ClientParameters;
import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException;
-import io.netty.buffer.ByteBufAllocator;
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
@@ -94,7 +93,7 @@ Client.ClientParameters duplicate() {
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
null,
- ByteBufAllocator.DEFAULT,
+ Utils.byteBufAllocator(),
false,
type -> "locator-connection",
cf,
@@ -163,7 +162,7 @@ void shouldTryUrisOnInitializationFailure() throws Exception {
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
null,
- ByteBufAllocator.DEFAULT,
+ Utils.byteBufAllocator(),
false,
type -> "locator-connection",
cf,
@@ -195,7 +194,7 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled(
ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT,
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT,
null,
- ByteBufAllocator.DEFAULT,
+ Utils.byteBufAllocator(),
lazyInit,
type -> "locator-connection",
cf,
diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
index 1150a90842..51f320a46c 100644
--- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
+++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java
@@ -75,7 +75,7 @@ public class StreamProducerUnitTest {
void init() {
mocks = MockitoAnnotations.openMocks(this);
executorService = Executors.newScheduledThreadPool(2);
- when(channel.alloc()).thenReturn(ByteBufAllocator.DEFAULT);
+ when(channel.alloc()).thenReturn(Utils.byteBufAllocator());
when(channel.writeAndFlush(Mockito.any())).thenReturn(channelFuture);
when(client.allocateNoCheck(any(ByteBufAllocator.class), anyInt()))
.thenAnswer(
diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
index 59d88cf6aa..52214838b8 100644
--- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
+++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java
@@ -73,7 +73,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
-import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
@@ -736,7 +735,7 @@ private static Field field(Class> cls, String name) {
return field;
}
- private static class ExecutorServiceCloseableResourceWrapper implements CloseableResource {
+ private static class ExecutorServiceCloseableResourceWrapper implements AutoCloseable {
private final ExecutorService executorService;
@@ -1065,7 +1064,8 @@ public enum BrokerVersion {
RABBITMQ_3_11_11("3.11.11"),
RABBITMQ_3_11_14("3.11.14"),
RABBITMQ_3_13_0("3.13.0"),
- RABBITMQ_4_0_0("4.0.0");
+ RABBITMQ_4_0_0("4.0.0"),
+ RABBITMQ_4_1_2("4.1.2");
final String value;