From 97aaa8445d03a75a807348380538bf4133ed0d3f Mon Sep 17 00:00:00 2001 From: rabbitmq-ci Date: Wed, 16 Apr 2025 15:36:59 +0000 Subject: [PATCH 01/43] [maven-release-plugin] prepare for next development iteration --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 375e3457c9..ea8e4cba52 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.rabbitmq stream-client - 1.0.0 + 1.1.0-SNAPSHOT RabbitMQ Stream Java Client The RabbitMQ Stream Java client library allows Java applications to interface with @@ -43,7 +43,7 @@ https://github.com/rabbitmq/rabbitmq-stream-java-client scm:git:git://github.com/rabbitmq/rabbitmq-stream-java-client.git scm:git:https://github.com/rabbitmq/rabbitmq-stream-java-client.git - v1.0.0 + HEAD From 0b1d720d9d325e9dc98403063c948d3d18cba6aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 16 Apr 2025 17:41:20 +0200 Subject: [PATCH 02/43] Set release version to 1.1.0 --- release-versions.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release-versions.txt b/release-versions.txt index 9ff62f9ab4..c3aa198d6e 100644 --- a/release-versions.txt +++ b/release-versions.txt @@ -1,4 +1,4 @@ -RELEASE_VERSION="1.0.0" +RELEASE_VERSION="1.1.0" DEVELOPMENT_VERSION="1.1.0-SNAPSHOT" RELEASE_BRANCH="main" LATEST=true From 59b95c00d13fe7c6a5060ce5b71d60a45b2b4b1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 16 Apr 2025 17:41:39 +0200 Subject: [PATCH 03/43] Delete Dockerfile Leftover from Stream PerfTest. --- Dockerfile | 96 ------------------------------------------------------ 1 file changed, 96 deletions(-) delete mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 4deefa0459..0000000000 --- a/Dockerfile +++ /dev/null @@ -1,96 +0,0 @@ -FROM ubuntu:22.04 as builder - -ARG stream_perf_test_url="set-url-here" - -RUN set -eux; \ - \ - apt-get update; \ - apt-get -y upgrade; \ - apt-get install --yes --no-install-recommends \ - ca-certificates \ - wget \ - gnupg \ - jq - -ARG JAVA_VERSION="21" - -RUN if [ "$(uname -m)" = "aarch64" ] || [ "$(uname -m)" = "arm64" ]; then echo "ARM"; ARCH="arm"; BUNDLE="jdk"; else echo "x86"; ARCH="x86"; BUNDLE="jdk"; fi \ - && wget "https://api.azul.com/zulu/download/community/v1.0/bundles/latest/?java_version=$JAVA_VERSION&ext=tar.gz&os=linux&arch=$ARCH&hw_bitness=64&release_status=ga&bundle_type=$BUNDLE" -O jdk-info.json -RUN wget --progress=bar:force:noscroll -O "jdk.tar.gz" $(cat jdk-info.json | jq --raw-output .url) -RUN echo "$(cat jdk-info.json | jq --raw-output .sha256_hash) *jdk.tar.gz" | sha256sum --check --strict - - -RUN set -eux; \ - if [ "$(uname -m)" = "x86_64" ] ; then JAVA_PATH="/usr/lib/jdk-$JAVA_VERSION"; \ - mkdir $JAVA_PATH && \ - tar --extract --file jdk.tar.gz --directory "$JAVA_PATH" --strip-components 1; \ - $JAVA_PATH/bin/jlink --compress=2 --output /jre --add-modules java.base,jdk.management,java.naming,java.xml,jdk.unsupported,jdk.crypto.cryptoki,jdk.httpserver; \ - /jre/bin/java -version; \ - fi - -RUN set -eux; \ - if [ "$(uname -m)" = "aarch64" ] || [ "$(uname -m)" = "arm64" ] ; then JAVA_PATH="/jre"; \ - mkdir $JAVA_PATH && \ - tar --extract --file jdk.tar.gz --directory "$JAVA_PATH" --strip-components 1; \ - fi - -# pgpkeys.uk is quite reliable, but allow for substitutions locally -ARG PGP_KEYSERVER=hkps://keys.openpgp.org -# If you are building this image locally and are getting `gpg: keyserver receive failed: No data` errors, -# run the build with a different PGP_KEYSERVER, e.g. docker build --tag rabbitmq:3.7 --build-arg PGP_KEYSERVER=pgpkeys.eu 3.7/ubuntu -# For context, see https://github.com/docker-library/official-images/issues/4252 - -# https://www.rabbitmq.com/signatures.html#importing-gpg -ENV RABBITMQ_PGP_KEY_ID="0x0A9AF2115F4687BD29803A206B73A36E6026DFCA" -ENV STREAM_PERF_TEST_HOME="/stream_perf_test" - -RUN set -eux; \ - \ - wget --progress dot:giga --output-document "/usr/local/src/stream-perf-test.jar.asc" "$stream_perf_test_url.asc"; \ - wget --progress dot:giga --output-document "/usr/local/src/stream-perf-test.jar" "$stream_perf_test_url"; \ - STREAM_PERF_TEST_SHA256="$(wget -qO- $stream_perf_test_url.sha256)"; \ - echo "$STREAM_PERF_TEST_SHA256 /usr/local/src/stream-perf-test.jar" | sha256sum --check --strict -; \ - \ - export GNUPGHOME="$(mktemp -d)"; \ - gpg --batch --keyserver "$PGP_KEYSERVER" --recv-keys "$RABBITMQ_PGP_KEY_ID"; \ - gpg --batch --verify "/usr/local/src/stream-perf-test.jar.asc" "/usr/local/src/stream-perf-test.jar"; \ - gpgconf --kill all; \ - rm -rf "$GNUPGHOME"; \ - \ - mkdir -p "$STREAM_PERF_TEST_HOME"; \ - cp /usr/local/src/stream-perf-test.jar $STREAM_PERF_TEST_HOME/stream-perf-test.jar - -FROM ubuntu:22.04 - -# we need locales support for characters like ยต to show up correctly in the console -RUN set -eux; \ - apt-get update; \ - apt-get -y upgrade; \ - apt-get install -y --no-install-recommends \ - locales \ - wget \ - ; \ - rm -rf /var/lib/apt/lists/*; \ - locale-gen en_US.UTF-8 - -ENV LANG en_US.UTF-8 -ENV LANGUAGE en_US:en -ENV LC_ALL en_US.UTF-8 - -ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk/jre -RUN mkdir -p $JAVA_HOME -COPY --from=builder /jre $JAVA_HOME/ -RUN ln -svT $JAVA_HOME/bin/java /usr/local/bin/java - -RUN mkdir -p /stream_perf_test -WORKDIR /stream_perf_test -COPY --from=builder /stream_perf_test ./ -RUN set -eux; \ - if [ "$(uname -m)" = "x86_64" ] ; then java -jar stream-perf-test.jar --help ; \ - fi - -RUN groupadd --gid 1000 stream-perf-test -RUN useradd --uid 1000 --gid stream-perf-test --comment "perf-test user" stream-perf-test - -USER stream-perf-test:stream-perf-test - -ENTRYPOINT ["java", "-Dio.netty.processId=1", "-jar", "stream-perf-test.jar"] From 9d3cea55c816a8aa9c0c1895dcdb73908f7d3f80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 16 Apr 2025 17:44:45 +0200 Subject: [PATCH 04/43] Remove pre-1.0 versioning section from readme --- README.adoc | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/README.adoc b/README.adoc index 19869c317b..2a96cd7585 100644 --- a/README.adoc +++ b/README.adoc @@ -18,10 +18,6 @@ Please refer to the https://rabbitmq.github.io/rabbitmq-stream-java-client/stabl The library is stable and production-ready. -== Versioning - -This library uses https://semver.org/[semantic versioning]. - == Support * For questions: https://groups.google.com/forum/#!forum/rabbitmq-users[RabbitMQ Users] @@ -54,17 +50,7 @@ This library requires at least Java 11, but Java 21 or more is recommended. == Versioning -The RabbitMQ Stream Java Client is in development and stabilization phase. -When the stabilization phase ends, a 1.0.0 version will be cut, and -https://semver.org/[semantic versioning] is likely to be enforced. - -Before reaching the stable phase, the client will use a versioning scheme of `[0.MINOR.PATCH]` where: - -* `0` indicates the project is still in a stabilization phase. -* `MINOR` is a 0-based number incrementing with each new release cycle. It generally reflects significant changes like new features and potentially some programming interfaces changes. -* `PATCH` is a 0-based number incrementing with each service release, that is bux fixes. - -Breaking changes between releases can happen but will be kept to a minimum. +This library uses https://semver.org/[semantic versioning]. == Build Instructions From e571b1a28439ea82bf7aef3abb834dc1113651eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Thu, 17 Apr 2025 16:40:10 +0200 Subject: [PATCH 05/43] Test against Java 24 stable And remove Java 23. --- .github/workflows/test-supported-java-versions.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-supported-java-versions.yml b/.github/workflows/test-supported-java-versions.yml index f7694cdc18..f2ac5a1289 100644 --- a/.github/workflows/test-supported-java-versions.yml +++ b/.github/workflows/test-supported-java-versions.yml @@ -11,7 +11,7 @@ jobs: strategy: matrix: distribution: [ 'temurin' ] - version: [ '11', '17', '21', '23', '24-ea', '25-ea' ] + version: [ '11', '17', '21', '24', '25-ea' ] include: - distribution: 'semeru' version: '17' From e4b76cc77fcbca7f66497070c3548707f3c9c5ea Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 24 Apr 2025 16:14:36 +0000 Subject: [PATCH 06/43] Bump com.google.code.gson:gson from 2.13.0 to 2.13.1 Bumps [com.google.code.gson:gson](https://github.com/google/gson) from 2.13.0 to 2.13.1. - [Release notes](https://github.com/google/gson/releases) - [Changelog](https://github.com/google/gson/blob/main/CHANGELOG.md) - [Commits](https://github.com/google/gson/compare/gson-parent-2.13.0...gson-parent-2.13.1) --- updated-dependencies: - dependency-name: com.google.code.gson:gson dependency-version: 2.13.1 dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ea8e4cba52..f5c8558d8d 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ 5.25.0 3.17.0 1.18.0 - 2.13.0 + 2.13.1 0.10.6 1.2.5 1.4.5 From f616d4c02fe5b57d4d1a5561d959e1b6c8b60e6b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 5 May 2025 17:07:38 +0000 Subject: [PATCH 07/43] Bump com.github.luben:zstd-jni from 1.5.7-2 to 1.5.7-3 Bumps [com.github.luben:zstd-jni](https://github.com/luben/zstd-jni) from 1.5.7-2 to 1.5.7-3. - [Commits](https://github.com/luben/zstd-jni/compare/v1.5.7-2...v1.5.7-3) --- updated-dependencies: - dependency-name: com.github.luben:zstd-jni dependency-version: 1.5.7-3 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index f5c8558d8d..274b44d75f 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ 13.1.2 4.7.5 1.27.1 - 1.5.7-2 + 1.5.7-3 1.8.0 1.1.10.7 5.12.2 From 5ff3ec21260bd417b106d032b3c32618217d46fe Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 6 May 2025 15:14:00 +0000 Subject: [PATCH 08/43] Bump netty.version from 4.2.0.Final to 4.2.1.Final Bumps `netty.version` from 4.2.0.Final to 4.2.1.Final. Updates `io.netty:netty-transport` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-codec` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-handler` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-transport-native-epoll` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) --- updated-dependencies: - dependency-name: io.netty:netty-transport dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-codec dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-handler dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-transport-native-epoll dependency-version: 4.2.1.Final dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 274b44d75f..ebc8fa1fda 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.0.Final + 4.2.1.Final 0.34.1 4.2.30 1.14.6 From 40a6560c4ac5b8a2f9a6ed13a6cf68b3d8daa59b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 7 May 2025 10:33:02 +0200 Subject: [PATCH 09/43] Back to Netty 4.2.0 Experiencing unexpected disconnections in some tests. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ebc8fa1fda..274b44d75f 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.1.Final + 4.2.0.Final 0.34.1 4.2.30 1.14.6 From 5ff57aabe08ecf8050cd0af0c937994c831ef691 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 7 May 2025 16:03:29 +0000 Subject: [PATCH 10/43] Bump netty.version from 4.2.0.Final to 4.2.1.Final Bumps `netty.version` from 4.2.0.Final to 4.2.1.Final. Updates `io.netty:netty-transport` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-codec` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-handler` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-transport-native-epoll` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) --- updated-dependencies: - dependency-name: io.netty:netty-transport dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-codec dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-handler dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-transport-native-epoll dependency-version: 4.2.1.Final dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 274b44d75f..ebc8fa1fda 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.0.Final + 4.2.1.Final 0.34.1 4.2.30 1.14.6 From b2a2718bc580ec3e4798302b25c3da1ed1c7ad48 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 7 May 2025 16:03:45 +0000 Subject: [PATCH 11/43] Bump com.google.googlejavaformat:google-java-format Bumps [com.google.googlejavaformat:google-java-format](https://github.com/google/google-java-format) from 1.26.0 to 1.27.0. - [Release notes](https://github.com/google/google-java-format/releases) - [Commits](https://github.com/google/google-java-format/compare/v1.26.0...v1.27.0) --- updated-dependencies: - dependency-name: com.google.googlejavaformat:google-java-format dependency-version: 1.27.0 dependency-type: direct:development update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 274b44d75f..ce9052892a 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ 3.2.1 1.37 2.44.4 - 1.26.0 + 1.27.0 0.8.13 4.9.3.0 4.9.3 From 2a02f06754416945e5e936dfa57baa7a2ee9044d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 12 May 2025 09:03:35 +0200 Subject: [PATCH 12/43] Back to Netty 4.2.0 Experiencing unexpected disconnections in some tests. --- pom.xml | 2 +- .../stream/impl/StreamProducerTest.java | 63 +++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4c8ac0194a..ce9052892a 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.1.Final + 4.2.0.Final 0.34.1 4.2.30 1.14.6 diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index 63254247ad..bff57ad269 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -57,6 +58,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import wiremock.org.checkerframework.checker.units.qual.A; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class StreamProducerTest { @@ -88,6 +90,67 @@ void tearDown() { environment.close(); } + private static AtomicLong rate() { + AtomicLong count = new AtomicLong(); + AtomicLong tick = new AtomicLong(System.nanoTime()); + + Executors.newSingleThreadScheduledExecutor() + .scheduleAtFixedRate( + () -> { + long now = System.nanoTime(); + long before = tick.getAndSet(now); + long elapsed = now - before; + long sent = count.getAndSet(0); + System.out.println("Rate " + (sent * 1_000_000_000L / elapsed) + " msg/s"); + }, + 1, + 1, + TimeUnit.SECONDS); + return count; + } + + @Test + void test() { + AtomicLong count = rate(); + Producer producer = environment.producerBuilder().stream(stream) + .maxUnconfirmedMessages(10) + .build(); + + while(true) { + producer.send(producer.messageBuilder().build(), s -> { }); + count.incrementAndGet(); + } + + } + + @Test + void client() throws Exception { + int permits = 10; + Semaphore semaphore = new Semaphore(permits); + Client client = cf.get(new Client.ClientParameters().publishConfirmListener(new Client.PublishConfirmListener() { + @Override + public void handle(byte publisherId, long publishingId) { + semaphore.release(); + } + })); + + byte pubId = (byte) 0; + client.declarePublisher(pubId, null, stream); + + AtomicLong count = rate(); + + List messages = IntStream.range(0, permits).mapToObj(ignored -> client + .messageBuilder() + .addData("hello".getBytes(StandardCharsets.UTF_8)) + .build()).collect(Collectors.toList()); + while (true) { + semaphore.acquire(permits); + client.publish(pubId, messages); + count.addAndGet(permits); + } + + } + @Test void send() throws Exception { int batchSize = 10; From 65cb76d93c54f9dcc22ef4a45e8a590c5b196e99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 12 May 2025 10:15:47 +0200 Subject: [PATCH 13/43] Revert "Back to Netty 4.2.0" This reverts commit 2a02f06754416945e5e936dfa57baa7a2ee9044d. --- pom.xml | 2 +- .../stream/impl/StreamProducerTest.java | 63 ------------------- 2 files changed, 1 insertion(+), 64 deletions(-) diff --git a/pom.xml b/pom.xml index ce9052892a..4c8ac0194a 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.0.Final + 4.2.1.Final 0.34.1 4.2.30 1.14.6 diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index bff57ad269..63254247ad 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -41,7 +41,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -58,7 +57,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import wiremock.org.checkerframework.checker.units.qual.A; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class StreamProducerTest { @@ -90,67 +88,6 @@ void tearDown() { environment.close(); } - private static AtomicLong rate() { - AtomicLong count = new AtomicLong(); - AtomicLong tick = new AtomicLong(System.nanoTime()); - - Executors.newSingleThreadScheduledExecutor() - .scheduleAtFixedRate( - () -> { - long now = System.nanoTime(); - long before = tick.getAndSet(now); - long elapsed = now - before; - long sent = count.getAndSet(0); - System.out.println("Rate " + (sent * 1_000_000_000L / elapsed) + " msg/s"); - }, - 1, - 1, - TimeUnit.SECONDS); - return count; - } - - @Test - void test() { - AtomicLong count = rate(); - Producer producer = environment.producerBuilder().stream(stream) - .maxUnconfirmedMessages(10) - .build(); - - while(true) { - producer.send(producer.messageBuilder().build(), s -> { }); - count.incrementAndGet(); - } - - } - - @Test - void client() throws Exception { - int permits = 10; - Semaphore semaphore = new Semaphore(permits); - Client client = cf.get(new Client.ClientParameters().publishConfirmListener(new Client.PublishConfirmListener() { - @Override - public void handle(byte publisherId, long publishingId) { - semaphore.release(); - } - })); - - byte pubId = (byte) 0; - client.declarePublisher(pubId, null, stream); - - AtomicLong count = rate(); - - List messages = IntStream.range(0, permits).mapToObj(ignored -> client - .messageBuilder() - .addData("hello".getBytes(StandardCharsets.UTF_8)) - .build()).collect(Collectors.toList()); - while (true) { - semaphore.acquire(permits); - client.publish(pubId, messages); - count.addAndGet(permits); - } - - } - @Test void send() throws Exception { int batchSize = 10; From 05df2a56da0e9d53d24170e2eb51779d75b96147 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 12 May 2025 10:16:19 +0200 Subject: [PATCH 14/43] Back to Netty 4.2.0 Experiencing unexpected disconnections in some tests. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4c8ac0194a..ce9052892a 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.1.Final + 4.2.0.Final 0.34.1 4.2.30 1.14.6 From 8a198eeb45592726736e21580330ece1bfeb5586 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 12 May 2025 14:54:47 +0200 Subject: [PATCH 15/43] Make dynamic batch pump more aggressively Low value for maxUnconfirmedMessages combined with unfortunate timing can make the dynamic batch flush only on timeout. This commit makes the dynamic batch class "pump" for new items (messages) more aggressively, which mitigates the problem. References #750 --- .../rabbitmq/stream/impl/DynamicBatch.java | 26 +++++++++----- .../stream/impl/DynamicBatchTest.java | 36 +++++++++++++++++++ 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index 7e2d1a7369..c6038dfed6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -69,15 +69,7 @@ private void loop() { if (state.items.size() >= state.batchSize) { this.maybeCompleteBatch(state, true); } else { - item = this.requests.poll(); - if (item == null) { - this.maybeCompleteBatch(state, false); - } else { - state.items.add(item); - if (state.items.size() >= state.batchSize) { - this.maybeCompleteBatch(state, true); - } - } + pump(state, 2); } } else { this.maybeCompleteBatch(state, false); @@ -85,6 +77,22 @@ private void loop() { } } + private void pump(State state, int pumpCount) { + if (pumpCount <= 0) { + return; + } + T item = this.requests.poll(); + if (item == null) { + this.maybeCompleteBatch(state, false); + } else { + state.items.add(item); + if (state.items.size() >= state.batchSize) { + this.maybeCompleteBatch(state, true); + } + this.pump(state, pumpCount - 1); + } + } + private static final class State { int batchSize; diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java index dca9810762..50698320f8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -23,8 +23,11 @@ import com.rabbitmq.stream.impl.TestUtils.Sync; import java.util.Locale; import java.util.Random; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; @@ -118,4 +121,37 @@ void failedProcessingIsReplayed() throws Exception { waitAtMost(() -> collected.get() == itemCount); } } + + @Test + void lowThrottlingValueShouldStillHighPublishingRate() throws Exception { + int batchSize = 10; + Semaphore semaphore = new Semaphore(batchSize); + DynamicBatch.BatchConsumer action = + items -> { + semaphore.release(items.size()); + return true; + }; + + try (DynamicBatch batch = new DynamicBatch<>(action, batchSize)) { + MetricRegistry metrics = new MetricRegistry(); + Meter rate = metrics.meter("publishing-rate"); + AtomicBoolean keepGoing = new AtomicBoolean(true); + AtomicLong sequence = new AtomicLong(); + new Thread( + () -> { + while (keepGoing.get() && !Thread.interrupted()) { + long id = sequence.getAndIncrement(); + if (semaphore.tryAcquire()) { + batch.add(id); + rate.mark(); + } + } + }) + .start(); + long start = System.nanoTime(); + waitAtMost( + () -> + System.nanoTime() - start > TimeUnit.SECONDS.toNanos(1) && rate.getMeanRate() > 1000); + } + } } From 196f917c7835a12ae543bed983d0f05540f632d0 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 May 2025 16:14:01 +0000 Subject: [PATCH 16/43] Bump io.micrometer:micrometer-core from 1.14.6 to 1.14.7 Bumps [io.micrometer:micrometer-core](https://github.com/micrometer-metrics/micrometer) from 1.14.6 to 1.14.7. - [Release notes](https://github.com/micrometer-metrics/micrometer/releases) - [Commits](https://github.com/micrometer-metrics/micrometer/compare/v1.14.6...v1.14.7) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-core dependency-version: 1.14.7 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ce9052892a..fe18f7409c 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 4.2.0.Final 0.34.1 4.2.30 - 1.14.6 + 1.14.7 13.1.2 4.7.5 1.27.1 From c610ef8e7ab25d8cf4b8d5b0c91f0a141e682dcc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 May 2025 16:16:56 +0000 Subject: [PATCH 17/43] Bump io.micrometer:micrometer-tracing-integration-test Bumps [io.micrometer:micrometer-tracing-integration-test](https://github.com/micrometer-metrics/tracing) from 1.4.5 to 1.4.6. - [Release notes](https://github.com/micrometer-metrics/tracing/releases) - [Commits](https://github.com/micrometer-metrics/tracing/compare/v1.4.5...v1.4.6) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-tracing-integration-test dependency-version: 1.4.6 dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ce9052892a..56447fb07f 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 2.13.1 0.10.6 1.2.5 - 1.4.5 + 1.4.6 1.0.4 3.14.0 3.5.3 From ea06465bdbdd5046e84ca24ba93f5c0f1189f853 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 13 May 2025 16:20:26 +0000 Subject: [PATCH 18/43] Bump io.micrometer:micrometer-tracing-integration-test Bumps [io.micrometer:micrometer-tracing-integration-test](https://github.com/micrometer-metrics/tracing) from 1.4.6 to 1.5.0. - [Release notes](https://github.com/micrometer-metrics/tracing/releases) - [Commits](https://github.com/micrometer-metrics/tracing/compare/v1.4.6...v1.5.0) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-tracing-integration-test dependency-version: 1.5.0 dependency-type: direct:development update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e880edfbbe..aab50b070e 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 2.13.1 0.10.6 1.2.5 - 1.4.6 + 1.5.0 1.0.4 3.14.0 3.5.3 From 447e63b664203596091264e44e62e7fdbfcb07a1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 13 May 2025 16:20:41 +0000 Subject: [PATCH 19/43] Bump io.micrometer:micrometer-core from 1.14.7 to 1.15.0 Bumps [io.micrometer:micrometer-core](https://github.com/micrometer-metrics/micrometer) from 1.14.7 to 1.15.0. - [Release notes](https://github.com/micrometer-metrics/micrometer/releases) - [Commits](https://github.com/micrometer-metrics/micrometer/compare/v1.14.7...v1.15.0) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-core dependency-version: 1.15.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e880edfbbe..d2a12b8ac6 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 4.2.0.Final 0.34.1 4.2.30 - 1.14.7 + 1.15.0 13.1.2 4.7.5 1.27.1 From 35beeeb21952edf3222e15ecba9fe28c7e777a40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 20 May 2025 17:44:32 +0200 Subject: [PATCH 20/43] Bump Netty to 4.2.1 References #748 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index de6e8e1c42..d85d136d5a 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.0.Final + 4.2.1.Final 0.34.1 4.2.30 1.15.0 From d364f6b030963a9c92082b088c43dc850c079b55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 20 May 2025 17:44:57 +0200 Subject: [PATCH 21/43] Use Netty's pooled memory allocator Instead of the adaptative one, which is the default in 4.2. The pooled allocator was the default in 4.1. --- src/main/java/com/rabbitmq/stream/impl/Client.java | 2 +- src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java | 1 + .../com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java | 2 +- src/main/java/com/rabbitmq/stream/impl/Utils.java | 6 ++++++ src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java | 3 +-- src/test/java/com/rabbitmq/stream/impl/FrameTest.java | 3 +-- .../com/rabbitmq/stream/impl/StreamEnvironmentTest.java | 2 +- .../rabbitmq/stream/impl/StreamEnvironmentUnitTest.java | 7 +++---- .../com/rabbitmq/stream/impl/StreamProducerUnitTest.java | 2 +- 9 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 8679844939..e69681f747 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -256,7 +256,7 @@ public Client(ClientParameters parameters) { b.option( ChannelOption.ALLOCATOR, parameters.byteBufAllocator == null - ? ByteBufAllocator.DEFAULT + ? Utils.byteBufAllocator() : parameters.byteBufAllocator); } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index c6038dfed6..37494535b6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -111,6 +111,7 @@ private void maybeCompleteBatch(State state, boolean increaseIfCompleted) { state.items = new ArrayList<>(state.batchSize); } } catch (Exception e) { + // e.printStackTrace(); LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage()); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index f3b38cb884..1be66aa36b 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -422,7 +422,7 @@ static class DefaultNettyConfiguration implements NettyConfiguration { private final EnvironmentBuilder environmentBuilder; private EventLoopGroup eventLoopGroup; - private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT; + private ByteBufAllocator byteBufAllocator = Utils.byteBufAllocator(); private Consumer channelCustomizer = noOpConsumer(); private Consumer bootstrapCustomizer = noOpConsumer(); diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index 4ea934e911..4b535bb21d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -19,6 +19,8 @@ import com.rabbitmq.stream.*; import com.rabbitmq.stream.impl.Client.ClientParameters; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoopGroup; import io.netty.channel.MultiThreadIoEventLoopGroup; @@ -415,6 +417,10 @@ static EventLoopGroup eventLoopGroup() { return new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); } + static ByteBufAllocator byteBufAllocator() { + return PooledByteBufAllocator.DEFAULT; + } + /* class to help testing SAC on super streams */ diff --git a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java index 02844308dd..987cf311c9 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java @@ -20,7 +20,6 @@ import com.rabbitmq.stream.impl.ServerFrameHandler.DeliverVersion1FrameHandler; import com.rabbitmq.stream.metrics.NoOpMetricsCollector; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -51,7 +50,7 @@ public MessageBuilder messageBuilder() { ByteBuf generateFrameBuffer( int nbMessages, long chunkOffset, int dataSize, Iterable messages) { - ByteBuf bb = ByteBufAllocator.DEFAULT.buffer(1024); + ByteBuf bb = Utils.byteBufAllocator().buffer(1024); bb.writeShort(Utils.encodeRequestCode(Constants.COMMAND_DELIVER)) .writeShort(Constants.VERSION_1) .writeByte(1) // subscription id diff --git a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java index 28422d40ad..7198facf5a 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java @@ -26,7 +26,6 @@ import com.rabbitmq.stream.Message; import com.rabbitmq.stream.Properties; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import java.io.ByteArrayOutputStream; @@ -149,7 +148,7 @@ public TestDesc(String description, List sizes, int expectedCalls) { tests.forEach( test -> { Channel channel = Mockito.mock(Channel.class); - Mockito.when(channel.alloc()).thenReturn(ByteBufAllocator.DEFAULT); + Mockito.when(channel.alloc()).thenReturn(Utils.byteBufAllocator()); Mockito.when(channel.writeAndFlush(Mockito.any())) .thenReturn(Mockito.mock(ChannelFuture.class)); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index f076a78fde..9588002d54 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -216,7 +216,7 @@ void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed(boolean lazyInit } @Test - void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info) throws Exception { + void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info) { int messageCount = 100; int streamCount = 20; int producersCount = ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT * 3 + 10; diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java index e8d3f25e34..8dafb175ad 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java @@ -26,7 +26,6 @@ import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException; -import io.netty.buffer.ByteBufAllocator; import java.net.URI; import java.time.Duration; import java.util.Arrays; @@ -94,7 +93,7 @@ Client.ClientParameters duplicate() { ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, null, - ByteBufAllocator.DEFAULT, + Utils.byteBufAllocator(), false, type -> "locator-connection", cf, @@ -163,7 +162,7 @@ void shouldTryUrisOnInitializationFailure() throws Exception { ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, null, - ByteBufAllocator.DEFAULT, + Utils.byteBufAllocator(), false, type -> "locator-connection", cf, @@ -195,7 +194,7 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled( ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, null, - ByteBufAllocator.DEFAULT, + Utils.byteBufAllocator(), lazyInit, type -> "locator-connection", cf, diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 1150a90842..51f320a46c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -75,7 +75,7 @@ public class StreamProducerUnitTest { void init() { mocks = MockitoAnnotations.openMocks(this); executorService = Executors.newScheduledThreadPool(2); - when(channel.alloc()).thenReturn(ByteBufAllocator.DEFAULT); + when(channel.alloc()).thenReturn(Utils.byteBufAllocator()); when(channel.writeAndFlush(Mockito.any())).thenReturn(channelFuture); when(client.allocateNoCheck(any(ByteBufAllocator.class), anyInt())) .thenAnswer( From fe8384059b5f9be9c89bda6cf6556eef86353818 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 21 May 2025 08:39:33 +0200 Subject: [PATCH 22/43] Update default Netty memory allocator in documentation --- src/docs/asciidoc/api.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index f4c5d0be9a..75a3d0da76 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -263,7 +263,7 @@ It is the developer's responsibility to close the `EventLoopGroup` they provide. |`netty#ByteBufAllocator` |`ByteBuf` allocator. -|ByteBufAllocator.DEFAULT +|PooledByteBufAllocator.DEFAULT |`netty#channelCustomizer` |Extension point to customize Netty's `Channel` instances used for connections. From dcb1d5658fadc849bbf974c3e1707edbc162d4eb Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 21 May 2025 16:33:30 +0000 Subject: [PATCH 23/43] Bump org.mockito:mockito-core from 5.17.0 to 5.18.0 Bumps [org.mockito:mockito-core](https://github.com/mockito/mockito) from 5.17.0 to 5.18.0. - [Release notes](https://github.com/mockito/mockito/releases) - [Commits](https://github.com/mockito/mockito/compare/v5.17.0...v5.18.0) --- updated-dependencies: - dependency-name: org.mockito:mockito-core dependency-version: 5.18.0 dependency-type: direct:development update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d85d136d5a..26b1feb7f2 100644 --- a/pom.xml +++ b/pom.xml @@ -62,7 +62,7 @@ 1.1.10.7 5.12.2 3.27.3 - 5.17.0 + 5.18.0 5.25.0 3.17.0 1.18.0 From 478db44ee71a5c5e3bac27d1ae35cb76e0091385 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 26 May 2025 16:25:38 +0000 Subject: [PATCH 24/43] Bump io.dropwizard.metrics:metrics-core from 4.2.30 to 4.2.32 Bumps [io.dropwizard.metrics:metrics-core](https://github.com/dropwizard/metrics) from 4.2.30 to 4.2.32. - [Release notes](https://github.com/dropwizard/metrics/releases) - [Commits](https://github.com/dropwizard/metrics/compare/v4.2.30...v4.2.32) --- updated-dependencies: - dependency-name: io.dropwizard.metrics:metrics-core dependency-version: 4.2.32 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 26b1feb7f2..0f394611f7 100644 --- a/pom.xml +++ b/pom.xml @@ -52,7 +52,7 @@ 1.2.13 4.2.1.Final 0.34.1 - 4.2.30 + 4.2.32 1.15.0 13.1.2 4.7.5 From 6268e19033d75dbf7734978788991c2b196fa20e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 16:55:56 +0000 Subject: [PATCH 25/43] Bump com.diffplug.spotless:spotless-maven-plugin from 2.44.4 to 2.44.5 Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.44.4 to 2.44.5. - [Release notes](https://github.com/diffplug/spotless/releases) - [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md) - [Commits](https://github.com/diffplug/spotless/compare/maven/2.44.4...maven/2.44.5) --- updated-dependencies: - dependency-name: com.diffplug.spotless:spotless-maven-plugin dependency-version: 2.44.5 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 0f394611f7..4e105dfaf5 100644 --- a/pom.xml +++ b/pom.xml @@ -88,7 +88,7 @@ 2.3.2 3.2.1 1.37 - 2.44.4 + 2.44.5 1.27.0 0.8.13 4.9.3.0 From e53adf956af56a8c76342321c21e3f96c19ac32c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 2 Jun 2025 14:49:33 +0000 Subject: [PATCH 26/43] Bump org.apache.maven.plugins:maven-clean-plugin from 3.4.1 to 3.5.0 Bumps [org.apache.maven.plugins:maven-clean-plugin](https://github.com/apache/maven-clean-plugin) from 3.4.1 to 3.5.0. - [Release notes](https://github.com/apache/maven-clean-plugin/releases) - [Commits](https://github.com/apache/maven-clean-plugin/compare/maven-clean-plugin-3.4.1...maven-clean-plugin-3.5.0) --- updated-dependencies: - dependency-name: org.apache.maven.plugins:maven-clean-plugin dependency-version: 3.5.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4e105dfaf5..3fbdb98893 100644 --- a/pom.xml +++ b/pom.xml @@ -78,7 +78,7 @@ 3.2.7 3.2.1 3.3.1 - 3.4.1 + 3.5.0 3.3.1 3.11.2 3.4.2 From f3cd8bbe78c84f2d2baf77228cbd73c92a097b55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 2 Jun 2025 16:53:46 +0200 Subject: [PATCH 27/43] Use AutoCloseable instead of JUnit closeable resource --- src/test/java/com/rabbitmq/stream/impl/TestUtils.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 59d88cf6aa..60aa2cc2a5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -73,7 +73,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ExtensionContext.Namespace; -import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -736,7 +735,7 @@ private static Field field(Class cls, String name) { return field; } - private static class ExecutorServiceCloseableResourceWrapper implements CloseableResource { + private static class ExecutorServiceCloseableResourceWrapper implements AutoCloseable { private final ExecutorService executorService; From 4933d809d133b6ac2ede4e7e80b9f0febba9572d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 2 Jun 2025 16:53:55 +0200 Subject: [PATCH 28/43] Bump JUnit to 5.13.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3fbdb98893..cfbe040910 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ 1.5.7-3 1.8.0 1.1.10.7 - 5.12.2 + 5.13.0 3.27.3 5.18.0 5.25.0 From 126f4da8c052e8edba13cef9998fff7269fa60d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 2 Jun 2025 17:06:27 +0200 Subject: [PATCH 29/43] Set release version to 1.1.0 --- release-versions.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release-versions.txt b/release-versions.txt index c3aa198d6e..067a873690 100644 --- a/release-versions.txt +++ b/release-versions.txt @@ -1,5 +1,5 @@ RELEASE_VERSION="1.1.0" -DEVELOPMENT_VERSION="1.1.0-SNAPSHOT" +DEVELOPMENT_VERSION="1.2.0-SNAPSHOT" RELEASE_BRANCH="main" LATEST=true From 1d09406460862afff6eab2749be7d27efa6ae36f Mon Sep 17 00:00:00 2001 From: rabbitmq-ci Date: Mon, 2 Jun 2025 15:27:55 +0000 Subject: [PATCH 30/43] [maven-release-plugin] prepare release v1.1.0 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index cfbe040910..6156a897ad 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.rabbitmq stream-client - 1.1.0-SNAPSHOT + 1.1.0 RabbitMQ Stream Java Client The RabbitMQ Stream Java client library allows Java applications to interface with @@ -43,7 +43,7 @@ https://github.com/rabbitmq/rabbitmq-stream-java-client scm:git:git://github.com/rabbitmq/rabbitmq-stream-java-client.git scm:git:https://github.com/rabbitmq/rabbitmq-stream-java-client.git - HEAD + v1.1.0 From 9ddd03d4808683ea62deed999e4848dc6ed55625 Mon Sep 17 00:00:00 2001 From: rabbitmq-ci Date: Mon, 2 Jun 2025 15:27:56 +0000 Subject: [PATCH 31/43] [maven-release-plugin] prepare for next development iteration --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 6156a897ad..43656ddc0a 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.rabbitmq stream-client - 1.1.0 + 1.2.0-SNAPSHOT RabbitMQ Stream Java Client The RabbitMQ Stream Java client library allows Java applications to interface with @@ -43,7 +43,7 @@ https://github.com/rabbitmq/rabbitmq-stream-java-client scm:git:git://github.com/rabbitmq/rabbitmq-stream-java-client.git scm:git:https://github.com/rabbitmq/rabbitmq-stream-java-client.git - v1.1.0 + HEAD From bd03605a05d571f9ab2819106f80118a816e768f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 5 Jun 2025 16:40:46 +0000 Subject: [PATCH 32/43] Bump netty.version from 4.2.1.Final to 4.2.2.Final Bumps `netty.version` from 4.2.1.Final to 4.2.2.Final. Updates `io.netty:netty-transport` from 4.2.1.Final to 4.2.2.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.1.Final...netty-4.2.2.Final) Updates `io.netty:netty-codec` from 4.2.1.Final to 4.2.2.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.1.Final...netty-4.2.2.Final) Updates `io.netty:netty-handler` from 4.2.1.Final to 4.2.2.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.1.Final...netty-4.2.2.Final) Updates `io.netty:netty-transport-native-epoll` from 4.2.1.Final to 4.2.2.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.1.Final...netty-4.2.2.Final) --- updated-dependencies: - dependency-name: io.netty:netty-transport dependency-version: 4.2.2.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-codec dependency-version: 4.2.2.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-handler dependency-version: 4.2.2.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-transport-native-epoll dependency-version: 4.2.2.Final dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 43656ddc0a..6b6a2d8159 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.1.Final + 4.2.2.Final 0.34.1 4.2.32 1.15.0 From 2a87e9937dffee3426871421cda0669746f3c771 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 9 Jun 2025 17:05:50 +0000 Subject: [PATCH 33/43] Bump org.junit:junit-bom from 5.13.0 to 5.13.1 Bumps [org.junit:junit-bom](https://github.com/junit-team/junit5) from 5.13.0 to 5.13.1. - [Release notes](https://github.com/junit-team/junit5/releases) - [Commits](https://github.com/junit-team/junit5/compare/r5.13.0...r5.13.1) --- updated-dependencies: - dependency-name: org.junit:junit-bom dependency-version: 5.13.1 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6b6a2d8159..aa96ed72d2 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ 1.5.7-3 1.8.0 1.1.10.7 - 5.13.0 + 5.13.1 3.27.3 5.18.0 5.25.0 From f9a6cf5cb22d073adb924ac4580d98bb8a1571c5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 10 Jun 2025 16:57:28 +0000 Subject: [PATCH 34/43] Bump io.micrometer:micrometer-tracing-integration-test Bumps [io.micrometer:micrometer-tracing-integration-test](https://github.com/micrometer-metrics/tracing) from 1.5.0 to 1.5.1. - [Release notes](https://github.com/micrometer-metrics/tracing/releases) - [Commits](https://github.com/micrometer-metrics/tracing/compare/v1.5.0...v1.5.1) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-tracing-integration-test dependency-version: 1.5.1 dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index aa96ed72d2..9d4208f926 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 2.13.1 0.10.6 1.2.5 - 1.5.0 + 1.5.1 1.0.4 3.14.0 3.5.3 From 3e31570ff6ba4bc4e68ca2dfce320d23691a4a37 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 10 Jun 2025 16:57:35 +0000 Subject: [PATCH 35/43] Bump io.micrometer:micrometer-core from 1.15.0 to 1.15.1 Bumps [io.micrometer:micrometer-core](https://github.com/micrometer-metrics/micrometer) from 1.15.0 to 1.15.1. - [Release notes](https://github.com/micrometer-metrics/micrometer/releases) - [Commits](https://github.com/micrometer-metrics/micrometer/compare/v1.15.0...v1.15.1) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-core dependency-version: 1.15.1 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index aa96ed72d2..de7d82cd8f 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 4.2.2.Final 0.34.1 4.2.32 - 1.15.0 + 1.15.1 13.1.2 4.7.5 1.27.1 From 47f58c0b1a2789a53d4f7493e427db3fdfef31a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Thu, 12 Jun 2025 16:44:21 +0200 Subject: [PATCH 36/43] Configure project to publish to Central Portal OSSRH reaches EOL on June 30th, 2025. --- .github/workflows/publish-snapshot.yml | 6 +-- .github/workflows/release.yml | 35 ++------------ .github/workflows/test.yml | 6 +-- ci/evaluate-release.sh | 14 ------ pom.xml | 65 +++++--------------------- src/test/java/SanityCheck.java | 1 - 6 files changed, 23 insertions(+), 104 deletions(-) delete mode 100755 ci/evaluate-release.sh diff --git a/.github/workflows/publish-snapshot.yml b/.github/workflows/publish-snapshot.yml index 8d22f67352..a327ebae89 100644 --- a/.github/workflows/publish-snapshot.yml +++ b/.github/workflows/publish-snapshot.yml @@ -14,7 +14,7 @@ jobs: distribution: 'temurin' java-version: '21' cache: 'maven' - server-id: ossrh + server-id: central server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} @@ -22,6 +22,6 @@ jobs: - name: Publish snapshot run: ./mvnw clean deploy -Psnapshots -DskipITs -DskipTests env: - MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }} - MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }} + MAVEN_USERNAME: ${{ secrets.CENTRAL_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.CENTRAL_TOKEN }} MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index bb1c9f9029..23b8163a46 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -17,51 +17,26 @@ jobs: distribution: 'temurin' java-version: '11' cache: 'maven' - server-id: ${{ env.maven_server_id }} + server-id: central server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} gpg-passphrase: MAVEN_GPG_PASSPHRASE - - name: Release Stream Java Client (GA) - if: ${{ env.ga_release == 'true' }} + - name: Release Stream Java Client run: | git config user.name "rabbitmq-ci" git config user.email "rabbitmq-ci@users.noreply.github.com" ci/release-stream-java-client.sh env: - MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }} - MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }} + MAVEN_USERNAME: ${{ secrets.CENTRAL_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.CENTRAL_TOKEN }} MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} - - name: Release Stream Java Client (Milestone/RC) - if: ${{ env.ga_release != 'true' }} - run: | - git config user.name "rabbitmq-ci" - git config user.email "rabbitmq-ci@users.noreply.github.com" - ci/release-stream-java-client.sh - env: - MAVEN_USERNAME: '' - MAVEN_PASSWORD: ${{ secrets.PACKAGECLOUD_TOKEN }} - MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} - - name: Checkout tls-gen - uses: actions/checkout@v4 - with: - repository: rabbitmq/tls-gen - path: './tls-gen' - - name: Start broker - run: ci/start-broker.sh - - name: Set up JDK for sanity check and documentation generation + - name: Set up JDK for documentation generation uses: actions/setup-java@v4 with: distribution: 'temurin' java-version: '21' cache: 'maven' - - name: Sanity Check - run: | - source ./release-versions.txt - export RABBITMQ_LIBRARY_VERSION=$RELEASE_VERSION - curl -Ls https://sh.jbang.dev | bash -s - src/test/java/SanityCheck.java - - name: Stop broker - run: docker stop rabbitmq && docker rm rabbitmq - name: Publish Documentation run: | git config user.name "rabbitmq-ci" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 523b05cbf9..8011f560d5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -26,7 +26,7 @@ jobs: distribution: 'temurin' java-version: '21' cache: 'maven' - server-id: ossrh + server-id: central server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} @@ -60,8 +60,8 @@ jobs: - name: Publish snapshot run: ./mvnw clean deploy -Psnapshots -DskipITs -DskipTests env: - MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }} - MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }} + MAVEN_USERNAME: ${{ secrets.CENTRAL_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.CENTRAL_TOKEN }} MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} - name: Publish Documentation run: | diff --git a/ci/evaluate-release.sh b/ci/evaluate-release.sh deleted file mode 100755 index 4ad656d7a0..0000000000 --- a/ci/evaluate-release.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env bash - -source ./release-versions.txt - -if [[ $RELEASE_VERSION == *[RCM]* ]] -then - echo "prerelease=true" >> $GITHUB_ENV - echo "ga_release=false" >> $GITHUB_ENV - echo "maven_server_id=packagecloud-rabbitmq-maven-milestones" >> $GITHUB_ENV -else - echo "prerelease=false" >> $GITHUB_ENV - echo "ga_release=true" >> $GITHUB_ENV - echo "maven_server_id=ossrh" >> $GITHUB_ENV -fi \ No newline at end of file diff --git a/pom.xml b/pom.xml index a9af3d2fb8..7f883871a1 100644 --- a/pom.xml +++ b/pom.xml @@ -99,8 +99,7 @@ 6026DFCA yyyy-MM-dd'T'HH:mm:ss'Z' UTF-8 - 0.0.6 - 1.7.0 + 0.7.0 true true @@ -613,14 +612,18 @@ + + org.sonatype.central + central-publishing-maven-plugin + ${central-publishing-maven-plugin.version} + true + + central + true + + + - - - io.packagecloud.maven.wagon - maven-packagecloud-wagon - ${maven.packagecloud.wagon.version} - - @@ -631,26 +634,6 @@ false false - - - ossrh - https://oss.sonatype.org/content/repositories/snapshots - - - - - - milestone - - false - false - - - - packagecloud-rabbitmq-maven-milestones - packagecloud+https://packagecloud.io/rabbitmq/maven-milestones - - @@ -659,29 +642,6 @@ false false - - - - - org.sonatype.plugins - nexus-staging-maven-plugin - ${nexus-staging-maven-plugin.version} - true - - ossrh - https://oss.sonatype.org/ - false - 20 - - - - - - - ossrh - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - - jvm-test-arguments-below-java-21 @@ -702,7 +662,6 @@ - diff --git a/src/test/java/SanityCheck.java b/src/test/java/SanityCheck.java index 4e550c8bd2..7ee907180c 100755 --- a/src/test/java/SanityCheck.java +++ b/src/test/java/SanityCheck.java @@ -1,5 +1,4 @@ ///usr/bin/env jbang "$0" "$@" ; exit $? -//REPOS mavencentral,ossrh-staging=https://oss.sonatype.org/content/groups/staging/,rabbitmq-packagecloud-milestones=https://packagecloud.io/rabbitmq/maven-milestones/maven2 //DEPS com.rabbitmq:stream-client:${env.RABBITMQ_LIBRARY_VERSION} //DEPS org.slf4j:slf4j-simple:1.7.36 From fc83c4d57a1fef4b70735bd418d3f274767cece3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Thu, 12 Jun 2025 16:59:38 +0200 Subject: [PATCH 37/43] Remove usage of deleted script --- .github/workflows/release.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 23b8163a46..3430115816 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -9,8 +9,6 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Evaluate release type - run: ci/evaluate-release.sh - name: Set up JDK uses: actions/setup-java@v4 with: From f6b389e048ee0e9e0fc07644af73502a27a6626e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Thu, 12 Jun 2025 17:42:56 +0200 Subject: [PATCH 38/43] Use new snapshot repository in documentation --- src/docs/asciidoc/setup.adoc | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/docs/asciidoc/setup.adoc b/src/docs/asciidoc/setup.adoc index e9eaaa6df8..1db3aef926 100644 --- a/src/docs/asciidoc/setup.adoc +++ b/src/docs/asciidoc/setup.adoc @@ -139,8 +139,8 @@ With Maven: - ossrh - https://oss.sonatype.org/content/repositories/snapshots + central-portal-snapshots + https://central.sonatype.com/repository/maven-snapshots/ true false @@ -154,7 +154,14 @@ With Gradle: [source,groovy,subs="attributes,specialcharacters"] ---- repositories { - maven { url 'https://oss.sonatype.org/content/repositories/snapshots' } + maven { + name = 'Central Portal Snapshots' + url = 'https://central.sonatype.com/repository/maven-snapshots/' + // Only search this repository for the specific dependency + content { + includeModule("com.rabbitmq", "{project-artifact-id}") + } + } mavenCentral() } ---- From 3bcea64354af3ac90391037bb7bf0db90cfc4f56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 13 Jun 2025 09:35:27 +0200 Subject: [PATCH 39/43] Disable auto-publish --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7f883871a1..e01f48f3bf 100644 --- a/pom.xml +++ b/pom.xml @@ -619,7 +619,7 @@ true central - true + false From a067613ae4fee4a159816b4686d13093d47490bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 13 Jun 2025 16:34:52 +0200 Subject: [PATCH 40/43] Consider max unconfirmed messages in dynamic batch configuration A small value for max unconfirmed messages can impact the dynamic batch mechanism. This commit sets the min batch size to half the max unconfirmed messages value if it is less than the configured batch size. References #757 --- .../rabbitmq/stream/impl/DynamicBatch.java | 19 ++++++++++++------- .../impl/DynamicBatchMessageAccumulator.java | 7 +++++-- .../rabbitmq/stream/impl/ProducerUtils.java | 2 ++ .../rabbitmq/stream/impl/StreamProducer.java | 1 + .../stream/impl/DynamicBatchTest.java | 6 +++--- 5 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index 37494535b6..3c288d5428 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -28,17 +28,22 @@ final class DynamicBatch implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class); - private static final int MIN_BATCH_SIZE = 32; - private static final int MAX_BATCH_SIZE = 8192; + private static final int MIN_BATCH_SIZE = 16; private final BlockingQueue requests = new LinkedBlockingQueue<>(); private final BatchConsumer consumer; - private final int configuredBatchSize; + private final int configuredBatchSize, minBatchSize, maxBatchSize; private final Thread thread; - DynamicBatch(BatchConsumer consumer, int batchSize) { + DynamicBatch(BatchConsumer consumer, int batchSize, int maxUnconfirmed) { this.consumer = consumer; - this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE); + if (batchSize < maxUnconfirmed) { + this.minBatchSize = min(MIN_BATCH_SIZE, batchSize / 2); + } else { + this.minBatchSize = min(1, maxUnconfirmed / 2); + } + this.configuredBatchSize = batchSize; + this.maxBatchSize = batchSize * 2; this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop); this.thread.start(); } @@ -104,9 +109,9 @@ private void maybeCompleteBatch(State state, boolean increaseIfCompleted) { boolean completed = this.consumer.process(state.items); if (completed) { if (increaseIfCompleted) { - state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE); + state.batchSize = min(state.batchSize * 2, this.maxBatchSize); } else { - state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE); + state.batchSize = max(state.batchSize / 2, this.minBatchSize); } state.items = new ArrayList<>(state.batchSize); } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java index ee8c397e13..8c763cde86 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -38,6 +38,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { DynamicBatchMessageAccumulator( int subEntrySize, int batchSize, + int maxUnconfirmedMessages, Codec codec, int maxFrameSize, ToLongFunction publishSequenceFunction, @@ -75,7 +76,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { } return result; }, - batchSize); + batchSize, + maxUnconfirmedMessages); } else { byte compressionCode = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); @@ -124,7 +126,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { } return result; }, - batchSize * subEntrySize); + batchSize * subEntrySize, + maxUnconfirmedMessages); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java index 5ae8faa7dd..691fc65c57 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java @@ -30,6 +30,7 @@ static MessageAccumulator createMessageAccumulator( boolean dynamicBatch, int subEntrySize, int batchSize, + int maxUnconfirmedMessages, CompressionCodec compressionCodec, Codec codec, ByteBufAllocator byteBufAllocator, @@ -44,6 +45,7 @@ static MessageAccumulator createMessageAccumulator( return new DynamicBatchMessageAccumulator( subEntrySize, batchSize, + maxUnconfirmedMessages, codec, maxFrameSize, publishSequenceFunction, diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 27552512c8..fd3cb8f25d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -180,6 +180,7 @@ public int fragmentLength(Object entity) { dynamicBatch, subEntrySize, batchSize, + maxUnconfirmedMessages, compressionCodec, environment.codec(), environment.byteBufAllocator(), diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java index 50698320f8..07a2877385 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -71,7 +71,7 @@ void itemAreProcessed() { sync.down(items.size()); return true; }; - try (DynamicBatch batch = new DynamicBatch<>(action, 100)) { + try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) { RateLimiter rateLimiter = RateLimiter.create(10000); IntStream.range(0, itemCount) .forEach( @@ -102,7 +102,7 @@ void failedProcessingIsReplayed() throws Exception { } return result; }; - try (DynamicBatch batch = new DynamicBatch<>(action, 100)) { + try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) { int firstRoundCount = itemCount / 5; IntStream.range(0, firstRoundCount) .forEach( @@ -132,7 +132,7 @@ void lowThrottlingValueShouldStillHighPublishingRate() throws Exception { return true; }; - try (DynamicBatch batch = new DynamicBatch<>(action, batchSize)) { + try (DynamicBatch batch = new DynamicBatch<>(action, batchSize, 10_000)) { MetricRegistry metrics = new MetricRegistry(); Meter rate = metrics.meter("publishing-rate"); AtomicBoolean keepGoing = new AtomicBoolean(true); From 9e16341d6e9f03c2e01dfa0a2aebc809b309fd1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 11 Jun 2025 15:51:13 +0200 Subject: [PATCH 41/43] Add SAC test against cluster References rabbitmq/rabbitmq-server#13672 --- .../stream/impl/StreamProducerBuilder.java | 6 +- src/test/java/com/rabbitmq/stream/Cli.java | 48 +++++ .../stream/impl/RecoveryClusterTest.java | 197 ++++++++++++++++-- .../com/rabbitmq/stream/impl/TestUtils.java | 3 +- 4 files changed, 234 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 43a57bbc5c..b44290868c 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -28,8 +28,8 @@ class StreamProducerBuilder implements ProducerBuilder { - static final boolean DEFAULT_DYNAMIC_BATCH = true; -// Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true")); + static final boolean DEFAULT_DYNAMIC_BATCH = + Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true")); private final StreamEnvironment environment; @@ -201,7 +201,7 @@ public Producer build() { if (this.routingConfiguration == null && this.superStream != null) { throw new IllegalArgumentException( - "A routing configuration must specified when a super stream is set"); + "A routing configuration must be specified when a super stream is set"); } if (this.stream != null) { diff --git a/src/test/java/com/rabbitmq/stream/Cli.java b/src/test/java/com/rabbitmq/stream/Cli.java index 7097725af4..5c583a461b 100644 --- a/src/test/java/com/rabbitmq/stream/Cli.java +++ b/src/test/java/com/rabbitmq/stream/Cli.java @@ -178,6 +178,30 @@ static List toConnectionInfoList(String json) { return GSON.fromJson(json, new TypeToken>() {}.getType()); } + public static List listGroupConsumers(String stream, String reference) { + ProcessState process = + rabbitmqStreams( + format( + "list_stream_group_consumers -q --stream %s --reference %s " + + "--formatter table subscription_id,state", + stream, reference)); + + List itemList = Collections.emptyList(); + String content = process.output(); + String[] lines = content.split(System.lineSeparator()); + if (lines.length > 1) { + itemList = new ArrayList<>(lines.length - 1); + for (int i = 1; i < lines.length; i++) { + String line = lines[i]; + String[] fields = line.split("\t"); + String id = fields[0]; + String state = fields[1].replace("\"", ""); + itemList.add(new SubscriptionInfo(Integer.parseInt(id), state)); + } + } + return itemList; + } + public static void restartStream(String stream) { rabbitmqStreams(" restart_stream " + stream); } @@ -420,6 +444,30 @@ public String toString() { } } + public static final class SubscriptionInfo { + + private final int id; + private final String state; + + public SubscriptionInfo(int id, String state) { + this.id = id; + this.state = state; + } + + public int id() { + return this.id; + } + + public String state() { + return this.state; + } + + @Override + public String toString() { + return "SubscriptionInfo{id='" + id + '\'' + ", state='" + state + '\'' + '}'; + } + } + public static class ProcessState { private final InputStreamPumpState inputState; diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index 24f50ee0cd..e234d723c4 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -16,18 +16,22 @@ import static com.rabbitmq.stream.impl.Assertions.assertThat; import static com.rabbitmq.stream.impl.LoadBalancerClusterTest.LOAD_BALANCER_ADDRESS; +import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_1_2; import static com.rabbitmq.stream.impl.TestUtils.newLoggerLevel; import static com.rabbitmq.stream.impl.TestUtils.sync; +import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; import static com.rabbitmq.stream.impl.ThreadUtils.threadFactory; import static com.rabbitmq.stream.impl.Tuples.pair; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.stream; import ch.qos.logback.classic.Level; import com.google.common.collect.Streams; import com.google.common.util.concurrent.RateLimiter; import com.rabbitmq.stream.*; +import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import com.rabbitmq.stream.impl.TestUtils.DisabledIfNotCluster; import com.rabbitmq.stream.impl.TestUtils.Sync; import com.rabbitmq.stream.impl.Tuples.Pair; @@ -41,15 +45,20 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; import org.junit.jupiter.api.*; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -201,15 +210,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru syncs = consumers.stream().map(c -> c.waitForNewMessages(100)).collect(toList()); syncs.forEach(s -> assertThat(s).completes()); - nodes.forEach( - n -> { - LOGGER.info("Restarting node {}...", n); - Cli.restartNode(n); - LOGGER.info("Restarted node {}.", n); - }); - LOGGER.info("Rebalancing..."); - Cli.rebalance(); - LOGGER.info("Rebalancing over."); + restartCluster(); Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis()); @@ -291,8 +292,132 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @BrokerVersionAtLeast(RABBITMQ_4_1_2) + void sacWithClusterRestart(boolean superStream) throws Exception { + environment = + environmentBuilder + .uris(URIS) + .netty() + .bootstrapCustomizer( + b -> { + b.option( + ChannelOption.CONNECT_TIMEOUT_MILLIS, + (int) BACK_OFF_DELAY_POLICY.delay(0).toMillis()); + }) + .environmentBuilder() + .maxConsumersByConnection(1) + .build(); + + int consumerCount = 3; + AtomicLong lastOffset = new AtomicLong(0); + String app = "app-name"; + String s = TestUtils.streamName(testInfo); + ProducerState pState = null; + List consumers = Collections.emptyList(); + try { + StreamCreator sCreator = environment.streamCreator().stream(s); + if (superStream) { + sCreator = sCreator.superStream().partitions(1).creator(); + } + sCreator.create(); + + pState = new ProducerState(s, true, superStream, environment); + pState.start(); + + Map consumerStatus = new ConcurrentHashMap<>(); + consumers = + IntStream.range(0, consumerCount) + .mapToObj( + i -> + new ConsumerState( + s, + environment, + b -> { + b.singleActiveConsumer() + .name(app) + .noTrackingStrategy() + .consumerUpdateListener( + ctx -> { + consumerStatus.put(i, ctx.isActive()); + return OffsetSpecification.offset(lastOffset.get()); + }); + if (superStream) { + b.superStream(s); + } else { + b.stream(s); + } + }, + (ctx, m) -> lastOffset.set(ctx.offset()))) + .collect(toList()); + + Sync sync = pState.waitForNewMessages(100); + assertThat(sync).completes(); + sync = consumers.get(0).waitForNewMessages(100); + assertThat(sync).completes(); + + String streamArg = superStream ? s + "-0" : s; + + Callable checkConsumers = + () -> { + waitAtMost( + () -> { + List subscriptions = Cli.listGroupConsumers(streamArg, app); + LOGGER.info("Group consumers: {}", subscriptions); + return subscriptions.size() == consumerCount + && subscriptions.stream() + .filter(sub -> sub.state().startsWith("active")) + .count() + == 1 + && subscriptions.stream() + .filter(sub -> sub.state().startsWith("waiting")) + .count() + == 2; + }, + () -> + "Group consumers not in expected state: " + + Cli.listGroupConsumers(streamArg, app)); + return null; + }; + + checkConsumers.call(); + + restartCluster(); + + Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis()); + + sync = pState.waitForNewMessages(100); + assertThat(sync).completes(ASSERTION_TIMEOUT); + int activeIndex = + consumerStatus.entrySet().stream() + .filter(Map.Entry::getValue) + .map(Map.Entry::getKey) + .findFirst() + .orElseThrow(() -> new IllegalStateException("No active consumer found")); + + sync = consumers.get(activeIndex).waitForNewMessages(100); + assertThat(sync).completes(ASSERTION_TIMEOUT); + + checkConsumers.call(); + + } finally { + if (pState != null) { + pState.close(); + } + consumers.forEach(ConsumerState::close); + if (superStream) { + environment.deleteSuperStream(s); + } else { + environment.deleteStream(s); + } + } + } + private static class ProducerState implements AutoCloseable { + private static final AtomicLong MSG_ID_SEQ = new AtomicLong(0); + private static final byte[] BODY = "hello".getBytes(StandardCharsets.UTF_8); private final String stream; @@ -306,9 +431,19 @@ private static class ProducerState implements AutoCloseable { final AtomicReference lastExceptionInstant = new AtomicReference<>(); private ProducerState(String stream, boolean dynamicBatch, Environment environment) { + this(stream, dynamicBatch, false, environment); + } + + private ProducerState( + String stream, boolean dynamicBatch, boolean superStream, Environment environment) { this.stream = stream; - this.producer = - environment.producerBuilder().stream(stream).dynamicBatch(dynamicBatch).build(); + ProducerBuilder builder = environment.producerBuilder().dynamicBatch(dynamicBatch); + if (superStream) { + builder.superStream(stream).routing(m -> m.getProperties().getMessageIdAsString()); + } else { + builder.stream(stream); + } + this.producer = builder.build(); } void start() { @@ -327,7 +462,14 @@ void start() { try { this.limiter.acquire(1); this.producer.send( - producer.messageBuilder().addData(BODY).build(), confirmationHandler); + producer + .messageBuilder() + .properties() + .messageId(MSG_ID_SEQ.getAndIncrement()) + .messageBuilder() + .addData(BODY) + .build(), + confirmationHandler); } catch (Throwable e) { this.lastException.set(e); this.lastExceptionInstant.set(Instant.now()); @@ -380,16 +522,27 @@ private static class ConsumerState implements AutoCloseable { final AtomicReference postHandle = new AtomicReference<>(() -> {}); private ConsumerState(String stream, Environment environment) { + this(stream, environment, b -> b.stream(stream), (ctx, m) -> {}); + } + + private ConsumerState( + String stream, + Environment environment, + java.util.function.Consumer customizer, + MessageHandler delegateHandler) { this.stream = stream; - this.consumer = - environment.consumerBuilder().stream(stream) + ConsumerBuilder builder = + environment + .consumerBuilder() .offset(OffsetSpecification.first()) .messageHandler( (ctx, m) -> { + delegateHandler.handle(ctx, m); receivedCount.incrementAndGet(); postHandle.get().run(); - }) - .build(); + }); + customizer.accept(builder); + this.consumer = builder.build(); } Sync waitForNewMessages(int messageCount) { @@ -414,4 +567,16 @@ public void close() { this.consumer.close(); } } + + private static void restartCluster() { + nodes.forEach( + n -> { + LOGGER.info("Restarting node {}...", n); + Cli.restartNode(n); + LOGGER.info("Restarted node {}.", n); + }); + LOGGER.info("Rebalancing..."); + Cli.rebalance(); + LOGGER.info("Rebalancing over."); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 60aa2cc2a5..52214838b8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -1064,7 +1064,8 @@ public enum BrokerVersion { RABBITMQ_3_11_11("3.11.11"), RABBITMQ_3_11_14("3.11.14"), RABBITMQ_3_13_0("3.13.0"), - RABBITMQ_4_0_0("4.0.0"); + RABBITMQ_4_0_0("4.0.0"), + RABBITMQ_4_1_2("4.1.2"); final String value; From 50ab623bda3edd756c4319207b2970f6a8eb05e2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 18 Jun 2025 16:12:16 +0000 Subject: [PATCH 42/43] Bump org.sonatype.central:central-publishing-maven-plugin Bumps [org.sonatype.central:central-publishing-maven-plugin](https://github.com/sonatype/central-publishing-maven-plugin) from 0.7.0 to 0.8.0. - [Commits](https://github.com/sonatype/central-publishing-maven-plugin/commits) --- updated-dependencies: - dependency-name: org.sonatype.central:central-publishing-maven-plugin dependency-version: 0.8.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e01f48f3bf..9b0a6102db 100644 --- a/pom.xml +++ b/pom.xml @@ -99,7 +99,7 @@ 6026DFCA yyyy-MM-dd'T'HH:mm:ss'Z' UTF-8 - 0.7.0 + 0.8.0 true true From 9dfe66bdd0169259bc690a7b480d2460c4c7cc98 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 20 Jun 2025 10:26:42 +0200 Subject: [PATCH 43/43] Add test with message too large to fit in one frame --- src/main/java/com/rabbitmq/stream/impl/Client.java | 3 ++- .../com/rabbitmq/stream/impl/StreamProducerTest.java | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index e69681f747..e1b3f069dc 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -117,6 +117,7 @@ public class Client implements AutoCloseable { public static final int DEFAULT_PORT = 5552; public static final int DEFAULT_TLS_PORT = 5551; static final int MAX_REFERENCE_SIZE = 256; + static final int DEFAULT_MAX_FRAME_SIZE = 1048576; static final OutboundEntityWriteCallback OUTBOUND_MESSAGE_WRITE_CALLBACK = new OutboundMessageWriteCallback(); static final OutboundEntityWriteCallback OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK = @@ -2363,7 +2364,7 @@ public static class ClientParameters { CompressionCodecFactory compressionCodecFactory; private String virtualHost = "/"; private Duration requestedHeartbeat = Duration.ofSeconds(60); - private int requestedMaxFrameSize = 1048576; + private int requestedMaxFrameSize = DEFAULT_MAX_FRAME_SIZE; private PublishConfirmListener publishConfirmListener = NO_OP_PUBLISH_CONFIRM_LISTENER; private PublishErrorListener publishErrorListener = NO_OP_PUBLISH_ERROR_LISTENER; private ChunkListener chunkListener = diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index 63254247ad..47780c26c0 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -666,4 +666,16 @@ void creationShouldFailWithDetailsWhenUnknownHost() { .isInstanceOfAny(ConnectTimeoutException.class, UnknownHostException.class); } } + + @Test + void messageLargerThanMaxFrameSizeShouldThrowException() { + int messageSize = Client.DEFAULT_MAX_FRAME_SIZE + 1; + Producer producer = environment.producerBuilder().stream(stream).build(); + assertThatThrownBy( + () -> + producer.send( + producer.messageBuilder().addData(new byte[messageSize]).build(), + confirmationStatus -> {})) + .isInstanceOf(IllegalArgumentException.class); + } }