From 13e7de93b50ce1618f7ba199e2ceaa590a3ca33a Mon Sep 17 00:00:00 2001 From: rabbitmq-ci Date: Wed, 9 Apr 2025 09:00:39 +0000 Subject: [PATCH 01/54] [maven-release-plugin] prepare for next development iteration --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 84d285b149..a0bf5ed037 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.rabbitmq stream-client - 0.24.0 + 1.0.0-SNAPSHOT RabbitMQ Stream Java Client The RabbitMQ Stream Java client library allows Java applications to interface with @@ -43,7 +43,7 @@ https://github.com/rabbitmq/rabbitmq-stream-java-client scm:git:git://github.com/rabbitmq/rabbitmq-stream-java-client.git scm:git:https://github.com/rabbitmq/rabbitmq-stream-java-client.git - v0.24.0 + HEAD From fef3d93f2bdc65f2d881a3666a423aa6b82bc5e9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 11 Apr 2025 16:29:24 +0000 Subject: [PATCH 02/54] Bump com.google.code.gson:gson from 2.12.1 to 2.13.0 Bumps [com.google.code.gson:gson](https://github.com/google/gson) from 2.12.1 to 2.13.0. - [Release notes](https://github.com/google/gson/releases) - [Changelog](https://github.com/google/gson/blob/main/CHANGELOG.md) - [Commits](https://github.com/google/gson/compare/gson-parent-2.12.1...gson-parent-2.13.0) --- updated-dependencies: - dependency-name: com.google.code.gson:gson dependency-version: 2.13.0 dependency-type: direct:development update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index a0bf5ed037..07f4a0e764 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ 5.25.0 3.17.0 1.18.0 - 2.12.1 + 2.13.0 0.10.6 1.2.5 1.4.4 From 7366f859e0a418b2ac420303d6a88cafc53c41fc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 11 Apr 2025 16:29:33 +0000 Subject: [PATCH 03/54] Bump org.junit:junit-bom from 5.12.1 to 5.12.2 Bumps [org.junit:junit-bom](https://github.com/junit-team/junit5) from 5.12.1 to 5.12.2. - [Release notes](https://github.com/junit-team/junit5/releases) - [Commits](https://github.com/junit-team/junit5/compare/r5.12.1...r5.12.2) --- updated-dependencies: - dependency-name: org.junit:junit-bom dependency-version: 5.12.2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index a0bf5ed037..63c3d4db69 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ 1.5.7-2 1.8.0 1.1.10.7 - 5.12.1 + 5.12.2 3.27.3 5.17.0 5.25.0 From 51971e2ca693faf4596cea28f0d6957898a7ba47 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 14 Apr 2025 16:36:53 +0000 Subject: [PATCH 04/54] Bump io.micrometer:micrometer-core from 1.14.5 to 1.14.6 Bumps [io.micrometer:micrometer-core](https://github.com/micrometer-metrics/micrometer) from 1.14.5 to 1.14.6. - [Release notes](https://github.com/micrometer-metrics/micrometer/releases) - [Commits](https://github.com/micrometer-metrics/micrometer/compare/v1.14.5...v1.14.6) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-core dependency-version: 1.14.6 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 04d69506be..b6e4a330b8 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 4.2.0.Final 0.34.1 4.2.30 - 1.14.5 + 1.14.6 13.1.1 4.7.5 1.27.1 From 5e7a66c03763e9be31e1b656192231eda38c52da Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 14 Apr 2025 16:37:05 +0000 Subject: [PATCH 05/54] Bump io.micrometer:micrometer-tracing-integration-test Bumps [io.micrometer:micrometer-tracing-integration-test](https://github.com/micrometer-metrics/tracing) from 1.4.4 to 1.4.5. - [Release notes](https://github.com/micrometer-metrics/tracing/releases) - [Commits](https://github.com/micrometer-metrics/tracing/compare/v1.4.4...v1.4.5) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-tracing-integration-test dependency-version: 1.4.5 dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 04d69506be..856b2a7909 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 2.13.0 0.10.6 1.2.5 - 1.4.4 + 1.4.5 1.0.4 3.14.0 3.5.3 From d566333b33c8bb6110706f3177f16c6be55bd8d6 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 15 Apr 2025 16:13:13 +0000 Subject: [PATCH 06/54] Bump com.swiftmq:swiftmq-client from 13.1.1 to 13.1.2 Bumps [com.swiftmq:swiftmq-client](https://github.com/iitsoftware/swiftmq-client) from 13.1.1 to 13.1.2. - [Release notes](https://github.com/iitsoftware/swiftmq-client/releases) - [Commits](https://github.com/iitsoftware/swiftmq-client/compare/13.1.1...13.1.2) --- updated-dependencies: - dependency-name: com.swiftmq:swiftmq-client dependency-version: 13.1.2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 191cbeb4c2..5b94ad978d 100644 --- a/pom.xml +++ b/pom.xml @@ -54,7 +54,7 @@ 0.34.1 4.2.30 1.14.6 - 13.1.1 + 13.1.2 4.7.5 1.27.1 1.5.7-2 From 7f3a09577bcd04b88bad2070d6e3d2d9a91c6dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 16 Apr 2025 09:48:32 +0200 Subject: [PATCH 07/54] Use RabbitMQ 4.1 as default on CI --- .github/workflows/test-rabbitmq-alphas.yml | 1 - README.adoc | 2 +- ci/cluster/docker-compose.yml | 6 +++--- ci/start-broker.sh | 2 +- ci/start-cluster.sh | 2 +- 5 files changed, 6 insertions(+), 7 deletions(-) diff --git a/.github/workflows/test-rabbitmq-alphas.yml b/.github/workflows/test-rabbitmq-alphas.yml index 0414fce497..6b8e9e32c6 100644 --- a/.github/workflows/test-rabbitmq-alphas.yml +++ b/.github/workflows/test-rabbitmq-alphas.yml @@ -14,7 +14,6 @@ jobs: strategy: matrix: rabbitmq-image: - - pivotalrabbitmq/rabbitmq:v4.0.x-otp27 - pivotalrabbitmq/rabbitmq:v4.1.x-otp27 - pivotalrabbitmq/rabbitmq:main-otp27 name: Test against ${{ matrix.rabbitmq-image }} diff --git a/README.adoc b/README.adoc index 95445f6e73..05af0b4178 100644 --- a/README.adoc +++ b/README.adoc @@ -80,7 +80,7 @@ Launch the broker: ---- docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 \ -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \ - rabbitmq:4.0 + rabbitmq:4.1 ---- Enable the stream plugin: diff --git a/ci/cluster/docker-compose.yml b/ci/cluster/docker-compose.yml index 39345d3e8d..40a6860ee1 100644 --- a/ci/cluster/docker-compose.yml +++ b/ci/cluster/docker-compose.yml @@ -6,7 +6,7 @@ services: - rabbitmq-cluster hostname: node0 container_name: rabbitmq0 - image: ${RABBITMQ_IMAGE:-rabbitmq:4.0} + image: ${RABBITMQ_IMAGE:-rabbitmq:4.1} pull_policy: always ports: - "5672:5672" @@ -22,7 +22,7 @@ services: - rabbitmq-cluster hostname: node1 container_name: rabbitmq1 - image: ${RABBITMQ_IMAGE:-rabbitmq:4.0} + image: ${RABBITMQ_IMAGE:-rabbitmq:4.1} pull_policy: always ports: - "5673:5672" @@ -38,7 +38,7 @@ services: - rabbitmq-cluster hostname: node2 container_name: rabbitmq2 - image: ${RABBITMQ_IMAGE:-rabbitmq:4.0} + image: ${RABBITMQ_IMAGE:-rabbitmq:4.1} pull_policy: always ports: - "5674:5672" diff --git a/ci/start-broker.sh b/ci/start-broker.sh index a45909c7bf..38c60cc1b9 100755 --- a/ci/start-broker.sh +++ b/ci/start-broker.sh @@ -2,7 +2,7 @@ LOCAL_SCRIPT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-rabbitmq:4.0} +RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-rabbitmq:4.1} wait_for_message() { while ! docker logs "$1" | grep -q "$2"; diff --git a/ci/start-cluster.sh b/ci/start-cluster.sh index c7a5a9f321..b8440984fb 100755 --- a/ci/start-cluster.sh +++ b/ci/start-cluster.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -export RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-rabbitmq:4.0} +export RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-rabbitmq:4.1} wait_for_message() { while ! docker logs "$1" | grep -q "$2"; From 8883d16051871398b7a2285fef192fcb536cc074 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 16 Apr 2025 17:11:11 +0200 Subject: [PATCH 08/54] Remove deprecated TlsConfiguration#hostnameVerification methods References #740 --- .../rabbitmq/stream/EnvironmentBuilder.java | 25 ------------------- .../stream/impl/StreamEnvironmentBuilder.java | 14 ----------- 2 files changed, 39 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java index 33fac4dcbc..4c8a5239f5 100644 --- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java @@ -436,31 +436,6 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy( /** Helper to configure TLS. */ interface TlsConfiguration { - /** - * Enable hostname verification. - * - *

Hostname verification is enabled by default. - * - * @return the TLS configuration helper - * @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link - * #sslContext(SslContext)} - */ - @Deprecated(forRemoval = true) - TlsConfiguration hostnameVerification(); - - /** - * Enable or disable hostname verification. - * - *

Hostname verification is enabled by default. - * - * @param hostnameVerification whether to enable hostname verification or not - * @return the TLS configuration helper - * @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link - * #sslContext(SslContext)} - */ - @Deprecated(forRemoval = true) - TlsConfiguration hostnameVerification(boolean hostnameVerification); - /** * Netty {@link SslContext} for TLS connections. * diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index bf7f395fca..f3b38cb884 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -372,20 +372,6 @@ private DefaultTlsConfiguration(EnvironmentBuilder environmentBuilder) { this.environmentBuilder = environmentBuilder; } - @Override - @SuppressWarnings("removal") - public TlsConfiguration hostnameVerification() { - this.hostnameVerification = true; - return this; - } - - @Override - @SuppressWarnings("removal") - public TlsConfiguration hostnameVerification(boolean hostnameVerification) { - this.hostnameVerification = hostnameVerification; - return this; - } - @Override public TlsConfiguration sslContext(SslContext sslContext) { this.sslContext = sslContext; From f2a90a7e22d277d474992f880744c48927d4cbf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 16 Apr 2025 17:24:33 +0200 Subject: [PATCH 09/54] Set release version to 1.0.0 (cherry picked from commit 4cd49eb09a4ff0e7c26086b4ff14a7676fe8c206) --- release-versions.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/release-versions.txt b/release-versions.txt index 7e4e8269f6..9ff62f9ab4 100644 --- a/release-versions.txt +++ b/release-versions.txt @@ -1,5 +1,5 @@ -RELEASE_VERSION="0.24.0" -DEVELOPMENT_VERSION="1.0.0-SNAPSHOT" +RELEASE_VERSION="1.0.0" +DEVELOPMENT_VERSION="1.1.0-SNAPSHOT" RELEASE_BRANCH="main" LATEST=true From 8a6c2ca06c18fe4eb781842e2f92ffd87d8c64c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 16 Apr 2025 17:24:52 +0200 Subject: [PATCH 10/54] Use RabbitMQ 4.1 in documentation (cherry picked from commit d75f6981ae72f97683583a80071c98e4e5cc8a61) --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 5b94ad978d..3c45aabddb 100644 --- a/pom.xml +++ b/pom.xml @@ -94,7 +94,7 @@ 4.9.3.0 4.9.3 - 4.0 + 4.1 6026DFCA yyyy-MM-dd'T'HH:mm:ss'Z' From 1089f6cd48252ca0527369f8b0f49f569fd2d316 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 16 Apr 2025 17:25:41 +0200 Subject: [PATCH 11/54] Mention API is stable now library is in 1.0.0 (cherry picked from commit c48e8f69c9508e5b11921ed47be1ea4d560718dc) --- README.adoc | 7 +++++-- src/docs/asciidoc/overview.adoc | 17 +++-------------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/README.adoc b/README.adoc index 05af0b4178..19869c317b 100644 --- a/README.adoc +++ b/README.adoc @@ -16,8 +16,11 @@ Please refer to the https://rabbitmq.github.io/rabbitmq-stream-java-client/stabl == Project Maturity -The project is in development and stabilization phase. -Features and API are subject to change, but https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#stability-of-programming-interfaces[breaking changes] will be kept to a minimum. +The library is stable and production-ready. + +== Versioning + +This library uses https://semver.org/[semantic versioning]. == Support diff --git a/src/docs/asciidoc/overview.adoc b/src/docs/asciidoc/overview.adoc index 4e1f736a02..6040e03172 100644 --- a/src/docs/asciidoc/overview.adoc +++ b/src/docs/asciidoc/overview.adoc @@ -75,30 +75,19 @@ recovery and automatic re-subscription for consumers. == Versioning -The RabbitMQ Stream Java Client is in development and stabilization phase. -When the stabilization phase ends, a 1.0.0 version will be cut, and -https://semver.org/[semantic versioning] is likely to be enforced. +This library uses https://semver.org/[semantic versioning]. -Before reaching the stable phase, the client will use a versioning scheme of `[0.MINOR.PATCH]` where: - -* `0` indicates the project is still in a stabilization phase. -* `MINOR` is a 0-based number incrementing with each new release cycle. It generally reflects significant changes like new features and potentially some programming interfaces changes. -* `PATCH` is a 0-based number incrementing with each service release, that is bux fixes. - -Breaking changes between releases can happen but will be kept to a minimum. The next section provides more details about the evolution of programming interfaces. [[stability-of-programming-interfaces]] == Stability of Programming Interfaces -The RabbitMQ Stream Java Client is in active development but its programming interfaces will remain as stable as possible. -There is no guarantee though that they will remain completely stable, at least until it reaches version 1.0.0. - The client contains 2 sets of programming interfaces whose stability are of interest for application developers: * Application Programming Interfaces (API): those are the ones used to write application logic. They include the interfaces and classes in the `com.rabbitmq.stream` package (e.g. `Producer`, `Consumer`, `Message`). -These API constitute the main programming model of the client and will be kept as stable as possible. +These API constitute the main programming model of the client and are kept as stable as possible. +New features may require to add methods to existing interfaces. * Service Provider Interfaces (SPI): those are interfaces to implement mainly technical behavior in the client. They are not meant to be used to implement application logic. Application developers may have to refer to them in the configuration phase and if they want to customize some internal behavior of the client. From 28709386e1dcef3130bf00787683bfed586c744a Mon Sep 17 00:00:00 2001 From: rabbitmq-ci Date: Wed, 16 Apr 2025 15:36:58 +0000 Subject: [PATCH 12/54] [maven-release-plugin] prepare release v1.0.0 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 3c45aabddb..375e3457c9 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.rabbitmq stream-client - 1.0.0-SNAPSHOT + 1.0.0 RabbitMQ Stream Java Client The RabbitMQ Stream Java client library allows Java applications to interface with @@ -43,7 +43,7 @@ https://github.com/rabbitmq/rabbitmq-stream-java-client scm:git:git://github.com/rabbitmq/rabbitmq-stream-java-client.git scm:git:https://github.com/rabbitmq/rabbitmq-stream-java-client.git - HEAD + v1.0.0 From 97aaa8445d03a75a807348380538bf4133ed0d3f Mon Sep 17 00:00:00 2001 From: rabbitmq-ci Date: Wed, 16 Apr 2025 15:36:59 +0000 Subject: [PATCH 13/54] [maven-release-plugin] prepare for next development iteration --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 375e3457c9..ea8e4cba52 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.rabbitmq stream-client - 1.0.0 + 1.1.0-SNAPSHOT RabbitMQ Stream Java Client The RabbitMQ Stream Java client library allows Java applications to interface with @@ -43,7 +43,7 @@ https://github.com/rabbitmq/rabbitmq-stream-java-client scm:git:git://github.com/rabbitmq/rabbitmq-stream-java-client.git scm:git:https://github.com/rabbitmq/rabbitmq-stream-java-client.git - v1.0.0 + HEAD From 0b1d720d9d325e9dc98403063c948d3d18cba6aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 16 Apr 2025 17:41:20 +0200 Subject: [PATCH 14/54] Set release version to 1.1.0 --- release-versions.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release-versions.txt b/release-versions.txt index 9ff62f9ab4..c3aa198d6e 100644 --- a/release-versions.txt +++ b/release-versions.txt @@ -1,4 +1,4 @@ -RELEASE_VERSION="1.0.0" +RELEASE_VERSION="1.1.0" DEVELOPMENT_VERSION="1.1.0-SNAPSHOT" RELEASE_BRANCH="main" LATEST=true From 59b95c00d13fe7c6a5060ce5b71d60a45b2b4b1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 16 Apr 2025 17:41:39 +0200 Subject: [PATCH 15/54] Delete Dockerfile Leftover from Stream PerfTest. --- Dockerfile | 96 ------------------------------------------------------ 1 file changed, 96 deletions(-) delete mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 4deefa0459..0000000000 --- a/Dockerfile +++ /dev/null @@ -1,96 +0,0 @@ -FROM ubuntu:22.04 as builder - -ARG stream_perf_test_url="set-url-here" - -RUN set -eux; \ - \ - apt-get update; \ - apt-get -y upgrade; \ - apt-get install --yes --no-install-recommends \ - ca-certificates \ - wget \ - gnupg \ - jq - -ARG JAVA_VERSION="21" - -RUN if [ "$(uname -m)" = "aarch64" ] || [ "$(uname -m)" = "arm64" ]; then echo "ARM"; ARCH="arm"; BUNDLE="jdk"; else echo "x86"; ARCH="x86"; BUNDLE="jdk"; fi \ - && wget "https://api.azul.com/zulu/download/community/v1.0/bundles/latest/?java_version=$JAVA_VERSION&ext=tar.gz&os=linux&arch=$ARCH&hw_bitness=64&release_status=ga&bundle_type=$BUNDLE" -O jdk-info.json -RUN wget --progress=bar:force:noscroll -O "jdk.tar.gz" $(cat jdk-info.json | jq --raw-output .url) -RUN echo "$(cat jdk-info.json | jq --raw-output .sha256_hash) *jdk.tar.gz" | sha256sum --check --strict - - -RUN set -eux; \ - if [ "$(uname -m)" = "x86_64" ] ; then JAVA_PATH="/usr/lib/jdk-$JAVA_VERSION"; \ - mkdir $JAVA_PATH && \ - tar --extract --file jdk.tar.gz --directory "$JAVA_PATH" --strip-components 1; \ - $JAVA_PATH/bin/jlink --compress=2 --output /jre --add-modules java.base,jdk.management,java.naming,java.xml,jdk.unsupported,jdk.crypto.cryptoki,jdk.httpserver; \ - /jre/bin/java -version; \ - fi - -RUN set -eux; \ - if [ "$(uname -m)" = "aarch64" ] || [ "$(uname -m)" = "arm64" ] ; then JAVA_PATH="/jre"; \ - mkdir $JAVA_PATH && \ - tar --extract --file jdk.tar.gz --directory "$JAVA_PATH" --strip-components 1; \ - fi - -# pgpkeys.uk is quite reliable, but allow for substitutions locally -ARG PGP_KEYSERVER=hkps://keys.openpgp.org -# If you are building this image locally and are getting `gpg: keyserver receive failed: No data` errors, -# run the build with a different PGP_KEYSERVER, e.g. docker build --tag rabbitmq:3.7 --build-arg PGP_KEYSERVER=pgpkeys.eu 3.7/ubuntu -# For context, see https://github.com/docker-library/official-images/issues/4252 - -# https://www.rabbitmq.com/signatures.html#importing-gpg -ENV RABBITMQ_PGP_KEY_ID="0x0A9AF2115F4687BD29803A206B73A36E6026DFCA" -ENV STREAM_PERF_TEST_HOME="/stream_perf_test" - -RUN set -eux; \ - \ - wget --progress dot:giga --output-document "/usr/local/src/stream-perf-test.jar.asc" "$stream_perf_test_url.asc"; \ - wget --progress dot:giga --output-document "/usr/local/src/stream-perf-test.jar" "$stream_perf_test_url"; \ - STREAM_PERF_TEST_SHA256="$(wget -qO- $stream_perf_test_url.sha256)"; \ - echo "$STREAM_PERF_TEST_SHA256 /usr/local/src/stream-perf-test.jar" | sha256sum --check --strict -; \ - \ - export GNUPGHOME="$(mktemp -d)"; \ - gpg --batch --keyserver "$PGP_KEYSERVER" --recv-keys "$RABBITMQ_PGP_KEY_ID"; \ - gpg --batch --verify "/usr/local/src/stream-perf-test.jar.asc" "/usr/local/src/stream-perf-test.jar"; \ - gpgconf --kill all; \ - rm -rf "$GNUPGHOME"; \ - \ - mkdir -p "$STREAM_PERF_TEST_HOME"; \ - cp /usr/local/src/stream-perf-test.jar $STREAM_PERF_TEST_HOME/stream-perf-test.jar - -FROM ubuntu:22.04 - -# we need locales support for characters like ยต to show up correctly in the console -RUN set -eux; \ - apt-get update; \ - apt-get -y upgrade; \ - apt-get install -y --no-install-recommends \ - locales \ - wget \ - ; \ - rm -rf /var/lib/apt/lists/*; \ - locale-gen en_US.UTF-8 - -ENV LANG en_US.UTF-8 -ENV LANGUAGE en_US:en -ENV LC_ALL en_US.UTF-8 - -ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk/jre -RUN mkdir -p $JAVA_HOME -COPY --from=builder /jre $JAVA_HOME/ -RUN ln -svT $JAVA_HOME/bin/java /usr/local/bin/java - -RUN mkdir -p /stream_perf_test -WORKDIR /stream_perf_test -COPY --from=builder /stream_perf_test ./ -RUN set -eux; \ - if [ "$(uname -m)" = "x86_64" ] ; then java -jar stream-perf-test.jar --help ; \ - fi - -RUN groupadd --gid 1000 stream-perf-test -RUN useradd --uid 1000 --gid stream-perf-test --comment "perf-test user" stream-perf-test - -USER stream-perf-test:stream-perf-test - -ENTRYPOINT ["java", "-Dio.netty.processId=1", "-jar", "stream-perf-test.jar"] From 9d3cea55c816a8aa9c0c1895dcdb73908f7d3f80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 16 Apr 2025 17:44:45 +0200 Subject: [PATCH 16/54] Remove pre-1.0 versioning section from readme --- README.adoc | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/README.adoc b/README.adoc index 19869c317b..2a96cd7585 100644 --- a/README.adoc +++ b/README.adoc @@ -18,10 +18,6 @@ Please refer to the https://rabbitmq.github.io/rabbitmq-stream-java-client/stabl The library is stable and production-ready. -== Versioning - -This library uses https://semver.org/[semantic versioning]. - == Support * For questions: https://groups.google.com/forum/#!forum/rabbitmq-users[RabbitMQ Users] @@ -54,17 +50,7 @@ This library requires at least Java 11, but Java 21 or more is recommended. == Versioning -The RabbitMQ Stream Java Client is in development and stabilization phase. -When the stabilization phase ends, a 1.0.0 version will be cut, and -https://semver.org/[semantic versioning] is likely to be enforced. - -Before reaching the stable phase, the client will use a versioning scheme of `[0.MINOR.PATCH]` where: - -* `0` indicates the project is still in a stabilization phase. -* `MINOR` is a 0-based number incrementing with each new release cycle. It generally reflects significant changes like new features and potentially some programming interfaces changes. -* `PATCH` is a 0-based number incrementing with each service release, that is bux fixes. - -Breaking changes between releases can happen but will be kept to a minimum. +This library uses https://semver.org/[semantic versioning]. == Build Instructions From e571b1a28439ea82bf7aef3abb834dc1113651eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Thu, 17 Apr 2025 16:40:10 +0200 Subject: [PATCH 17/54] Test against Java 24 stable And remove Java 23. --- .github/workflows/test-supported-java-versions.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-supported-java-versions.yml b/.github/workflows/test-supported-java-versions.yml index f7694cdc18..f2ac5a1289 100644 --- a/.github/workflows/test-supported-java-versions.yml +++ b/.github/workflows/test-supported-java-versions.yml @@ -11,7 +11,7 @@ jobs: strategy: matrix: distribution: [ 'temurin' ] - version: [ '11', '17', '21', '23', '24-ea', '25-ea' ] + version: [ '11', '17', '21', '24', '25-ea' ] include: - distribution: 'semeru' version: '17' From e4b76cc77fcbca7f66497070c3548707f3c9c5ea Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 24 Apr 2025 16:14:36 +0000 Subject: [PATCH 18/54] Bump com.google.code.gson:gson from 2.13.0 to 2.13.1 Bumps [com.google.code.gson:gson](https://github.com/google/gson) from 2.13.0 to 2.13.1. - [Release notes](https://github.com/google/gson/releases) - [Changelog](https://github.com/google/gson/blob/main/CHANGELOG.md) - [Commits](https://github.com/google/gson/compare/gson-parent-2.13.0...gson-parent-2.13.1) --- updated-dependencies: - dependency-name: com.google.code.gson:gson dependency-version: 2.13.1 dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ea8e4cba52..f5c8558d8d 100644 --- a/pom.xml +++ b/pom.xml @@ -66,7 +66,7 @@ 5.25.0 3.17.0 1.18.0 - 2.13.0 + 2.13.1 0.10.6 1.2.5 1.4.5 From f616d4c02fe5b57d4d1a5561d959e1b6c8b60e6b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 5 May 2025 17:07:38 +0000 Subject: [PATCH 19/54] Bump com.github.luben:zstd-jni from 1.5.7-2 to 1.5.7-3 Bumps [com.github.luben:zstd-jni](https://github.com/luben/zstd-jni) from 1.5.7-2 to 1.5.7-3. - [Commits](https://github.com/luben/zstd-jni/compare/v1.5.7-2...v1.5.7-3) --- updated-dependencies: - dependency-name: com.github.luben:zstd-jni dependency-version: 1.5.7-3 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index f5c8558d8d..274b44d75f 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ 13.1.2 4.7.5 1.27.1 - 1.5.7-2 + 1.5.7-3 1.8.0 1.1.10.7 5.12.2 From 5ff3ec21260bd417b106d032b3c32618217d46fe Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 6 May 2025 15:14:00 +0000 Subject: [PATCH 20/54] Bump netty.version from 4.2.0.Final to 4.2.1.Final Bumps `netty.version` from 4.2.0.Final to 4.2.1.Final. Updates `io.netty:netty-transport` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-codec` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-handler` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-transport-native-epoll` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) --- updated-dependencies: - dependency-name: io.netty:netty-transport dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-codec dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-handler dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-transport-native-epoll dependency-version: 4.2.1.Final dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 274b44d75f..ebc8fa1fda 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.0.Final + 4.2.1.Final 0.34.1 4.2.30 1.14.6 From 40a6560c4ac5b8a2f9a6ed13a6cf68b3d8daa59b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 7 May 2025 10:33:02 +0200 Subject: [PATCH 21/54] Back to Netty 4.2.0 Experiencing unexpected disconnections in some tests. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ebc8fa1fda..274b44d75f 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.1.Final + 4.2.0.Final 0.34.1 4.2.30 1.14.6 From 5ff57aabe08ecf8050cd0af0c937994c831ef691 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 7 May 2025 16:03:29 +0000 Subject: [PATCH 22/54] Bump netty.version from 4.2.0.Final to 4.2.1.Final Bumps `netty.version` from 4.2.0.Final to 4.2.1.Final. Updates `io.netty:netty-transport` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-codec` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-handler` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) Updates `io.netty:netty-transport-native-epoll` from 4.2.0.Final to 4.2.1.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.0.Final...netty-4.2.1.Final) --- updated-dependencies: - dependency-name: io.netty:netty-transport dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-codec dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-handler dependency-version: 4.2.1.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-transport-native-epoll dependency-version: 4.2.1.Final dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 274b44d75f..ebc8fa1fda 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.0.Final + 4.2.1.Final 0.34.1 4.2.30 1.14.6 From b2a2718bc580ec3e4798302b25c3da1ed1c7ad48 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 7 May 2025 16:03:45 +0000 Subject: [PATCH 23/54] Bump com.google.googlejavaformat:google-java-format Bumps [com.google.googlejavaformat:google-java-format](https://github.com/google/google-java-format) from 1.26.0 to 1.27.0. - [Release notes](https://github.com/google/google-java-format/releases) - [Commits](https://github.com/google/google-java-format/compare/v1.26.0...v1.27.0) --- updated-dependencies: - dependency-name: com.google.googlejavaformat:google-java-format dependency-version: 1.27.0 dependency-type: direct:development update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 274b44d75f..ce9052892a 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ 3.2.1 1.37 2.44.4 - 1.26.0 + 1.27.0 0.8.13 4.9.3.0 4.9.3 From 2a02f06754416945e5e936dfa57baa7a2ee9044d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 12 May 2025 09:03:35 +0200 Subject: [PATCH 24/54] Back to Netty 4.2.0 Experiencing unexpected disconnections in some tests. --- pom.xml | 2 +- .../stream/impl/StreamProducerTest.java | 63 +++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4c8ac0194a..ce9052892a 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.1.Final + 4.2.0.Final 0.34.1 4.2.30 1.14.6 diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index 63254247ad..bff57ad269 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -57,6 +58,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import wiremock.org.checkerframework.checker.units.qual.A; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class StreamProducerTest { @@ -88,6 +90,67 @@ void tearDown() { environment.close(); } + private static AtomicLong rate() { + AtomicLong count = new AtomicLong(); + AtomicLong tick = new AtomicLong(System.nanoTime()); + + Executors.newSingleThreadScheduledExecutor() + .scheduleAtFixedRate( + () -> { + long now = System.nanoTime(); + long before = tick.getAndSet(now); + long elapsed = now - before; + long sent = count.getAndSet(0); + System.out.println("Rate " + (sent * 1_000_000_000L / elapsed) + " msg/s"); + }, + 1, + 1, + TimeUnit.SECONDS); + return count; + } + + @Test + void test() { + AtomicLong count = rate(); + Producer producer = environment.producerBuilder().stream(stream) + .maxUnconfirmedMessages(10) + .build(); + + while(true) { + producer.send(producer.messageBuilder().build(), s -> { }); + count.incrementAndGet(); + } + + } + + @Test + void client() throws Exception { + int permits = 10; + Semaphore semaphore = new Semaphore(permits); + Client client = cf.get(new Client.ClientParameters().publishConfirmListener(new Client.PublishConfirmListener() { + @Override + public void handle(byte publisherId, long publishingId) { + semaphore.release(); + } + })); + + byte pubId = (byte) 0; + client.declarePublisher(pubId, null, stream); + + AtomicLong count = rate(); + + List messages = IntStream.range(0, permits).mapToObj(ignored -> client + .messageBuilder() + .addData("hello".getBytes(StandardCharsets.UTF_8)) + .build()).collect(Collectors.toList()); + while (true) { + semaphore.acquire(permits); + client.publish(pubId, messages); + count.addAndGet(permits); + } + + } + @Test void send() throws Exception { int batchSize = 10; From 65cb76d93c54f9dcc22ef4a45e8a590c5b196e99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 12 May 2025 10:15:47 +0200 Subject: [PATCH 25/54] Revert "Back to Netty 4.2.0" This reverts commit 2a02f06754416945e5e936dfa57baa7a2ee9044d. --- pom.xml | 2 +- .../stream/impl/StreamProducerTest.java | 63 ------------------- 2 files changed, 1 insertion(+), 64 deletions(-) diff --git a/pom.xml b/pom.xml index ce9052892a..4c8ac0194a 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.0.Final + 4.2.1.Final 0.34.1 4.2.30 1.14.6 diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java index bff57ad269..63254247ad 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerTest.java @@ -41,7 +41,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -58,7 +57,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import wiremock.org.checkerframework.checker.units.qual.A; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class StreamProducerTest { @@ -90,67 +88,6 @@ void tearDown() { environment.close(); } - private static AtomicLong rate() { - AtomicLong count = new AtomicLong(); - AtomicLong tick = new AtomicLong(System.nanoTime()); - - Executors.newSingleThreadScheduledExecutor() - .scheduleAtFixedRate( - () -> { - long now = System.nanoTime(); - long before = tick.getAndSet(now); - long elapsed = now - before; - long sent = count.getAndSet(0); - System.out.println("Rate " + (sent * 1_000_000_000L / elapsed) + " msg/s"); - }, - 1, - 1, - TimeUnit.SECONDS); - return count; - } - - @Test - void test() { - AtomicLong count = rate(); - Producer producer = environment.producerBuilder().stream(stream) - .maxUnconfirmedMessages(10) - .build(); - - while(true) { - producer.send(producer.messageBuilder().build(), s -> { }); - count.incrementAndGet(); - } - - } - - @Test - void client() throws Exception { - int permits = 10; - Semaphore semaphore = new Semaphore(permits); - Client client = cf.get(new Client.ClientParameters().publishConfirmListener(new Client.PublishConfirmListener() { - @Override - public void handle(byte publisherId, long publishingId) { - semaphore.release(); - } - })); - - byte pubId = (byte) 0; - client.declarePublisher(pubId, null, stream); - - AtomicLong count = rate(); - - List messages = IntStream.range(0, permits).mapToObj(ignored -> client - .messageBuilder() - .addData("hello".getBytes(StandardCharsets.UTF_8)) - .build()).collect(Collectors.toList()); - while (true) { - semaphore.acquire(permits); - client.publish(pubId, messages); - count.addAndGet(permits); - } - - } - @Test void send() throws Exception { int batchSize = 10; From 05df2a56da0e9d53d24170e2eb51779d75b96147 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 12 May 2025 10:16:19 +0200 Subject: [PATCH 26/54] Back to Netty 4.2.0 Experiencing unexpected disconnections in some tests. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4c8ac0194a..ce9052892a 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.1.Final + 4.2.0.Final 0.34.1 4.2.30 1.14.6 From 8a198eeb45592726736e21580330ece1bfeb5586 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 12 May 2025 14:54:47 +0200 Subject: [PATCH 27/54] Make dynamic batch pump more aggressively Low value for maxUnconfirmedMessages combined with unfortunate timing can make the dynamic batch flush only on timeout. This commit makes the dynamic batch class "pump" for new items (messages) more aggressively, which mitigates the problem. References #750 --- .../rabbitmq/stream/impl/DynamicBatch.java | 26 +++++++++----- .../stream/impl/DynamicBatchTest.java | 36 +++++++++++++++++++ 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index 7e2d1a7369..c6038dfed6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -69,15 +69,7 @@ private void loop() { if (state.items.size() >= state.batchSize) { this.maybeCompleteBatch(state, true); } else { - item = this.requests.poll(); - if (item == null) { - this.maybeCompleteBatch(state, false); - } else { - state.items.add(item); - if (state.items.size() >= state.batchSize) { - this.maybeCompleteBatch(state, true); - } - } + pump(state, 2); } } else { this.maybeCompleteBatch(state, false); @@ -85,6 +77,22 @@ private void loop() { } } + private void pump(State state, int pumpCount) { + if (pumpCount <= 0) { + return; + } + T item = this.requests.poll(); + if (item == null) { + this.maybeCompleteBatch(state, false); + } else { + state.items.add(item); + if (state.items.size() >= state.batchSize) { + this.maybeCompleteBatch(state, true); + } + this.pump(state, pumpCount - 1); + } + } + private static final class State { int batchSize; diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java index dca9810762..50698320f8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -23,8 +23,11 @@ import com.rabbitmq.stream.impl.TestUtils.Sync; import java.util.Locale; import java.util.Random; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; @@ -118,4 +121,37 @@ void failedProcessingIsReplayed() throws Exception { waitAtMost(() -> collected.get() == itemCount); } } + + @Test + void lowThrottlingValueShouldStillHighPublishingRate() throws Exception { + int batchSize = 10; + Semaphore semaphore = new Semaphore(batchSize); + DynamicBatch.BatchConsumer action = + items -> { + semaphore.release(items.size()); + return true; + }; + + try (DynamicBatch batch = new DynamicBatch<>(action, batchSize)) { + MetricRegistry metrics = new MetricRegistry(); + Meter rate = metrics.meter("publishing-rate"); + AtomicBoolean keepGoing = new AtomicBoolean(true); + AtomicLong sequence = new AtomicLong(); + new Thread( + () -> { + while (keepGoing.get() && !Thread.interrupted()) { + long id = sequence.getAndIncrement(); + if (semaphore.tryAcquire()) { + batch.add(id); + rate.mark(); + } + } + }) + .start(); + long start = System.nanoTime(); + waitAtMost( + () -> + System.nanoTime() - start > TimeUnit.SECONDS.toNanos(1) && rate.getMeanRate() > 1000); + } + } } From 196f917c7835a12ae543bed983d0f05540f632d0 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 May 2025 16:14:01 +0000 Subject: [PATCH 28/54] Bump io.micrometer:micrometer-core from 1.14.6 to 1.14.7 Bumps [io.micrometer:micrometer-core](https://github.com/micrometer-metrics/micrometer) from 1.14.6 to 1.14.7. - [Release notes](https://github.com/micrometer-metrics/micrometer/releases) - [Commits](https://github.com/micrometer-metrics/micrometer/compare/v1.14.6...v1.14.7) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-core dependency-version: 1.14.7 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ce9052892a..fe18f7409c 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 4.2.0.Final 0.34.1 4.2.30 - 1.14.6 + 1.14.7 13.1.2 4.7.5 1.27.1 From c610ef8e7ab25d8cf4b8d5b0c91f0a141e682dcc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 May 2025 16:16:56 +0000 Subject: [PATCH 29/54] Bump io.micrometer:micrometer-tracing-integration-test Bumps [io.micrometer:micrometer-tracing-integration-test](https://github.com/micrometer-metrics/tracing) from 1.4.5 to 1.4.6. - [Release notes](https://github.com/micrometer-metrics/tracing/releases) - [Commits](https://github.com/micrometer-metrics/tracing/compare/v1.4.5...v1.4.6) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-tracing-integration-test dependency-version: 1.4.6 dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ce9052892a..56447fb07f 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 2.13.1 0.10.6 1.2.5 - 1.4.5 + 1.4.6 1.0.4 3.14.0 3.5.3 From ea06465bdbdd5046e84ca24ba93f5c0f1189f853 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 13 May 2025 16:20:26 +0000 Subject: [PATCH 30/54] Bump io.micrometer:micrometer-tracing-integration-test Bumps [io.micrometer:micrometer-tracing-integration-test](https://github.com/micrometer-metrics/tracing) from 1.4.6 to 1.5.0. - [Release notes](https://github.com/micrometer-metrics/tracing/releases) - [Commits](https://github.com/micrometer-metrics/tracing/compare/v1.4.6...v1.5.0) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-tracing-integration-test dependency-version: 1.5.0 dependency-type: direct:development update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e880edfbbe..aab50b070e 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 2.13.1 0.10.6 1.2.5 - 1.4.6 + 1.5.0 1.0.4 3.14.0 3.5.3 From 447e63b664203596091264e44e62e7fdbfcb07a1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 13 May 2025 16:20:41 +0000 Subject: [PATCH 31/54] Bump io.micrometer:micrometer-core from 1.14.7 to 1.15.0 Bumps [io.micrometer:micrometer-core](https://github.com/micrometer-metrics/micrometer) from 1.14.7 to 1.15.0. - [Release notes](https://github.com/micrometer-metrics/micrometer/releases) - [Commits](https://github.com/micrometer-metrics/micrometer/compare/v1.14.7...v1.15.0) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-core dependency-version: 1.15.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e880edfbbe..d2a12b8ac6 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 4.2.0.Final 0.34.1 4.2.30 - 1.14.7 + 1.15.0 13.1.2 4.7.5 1.27.1 From 35beeeb21952edf3222e15ecba9fe28c7e777a40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 20 May 2025 17:44:32 +0200 Subject: [PATCH 32/54] Bump Netty to 4.2.1 References #748 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index de6e8e1c42..d85d136d5a 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.0.Final + 4.2.1.Final 0.34.1 4.2.30 1.15.0 From d364f6b030963a9c92082b088c43dc850c079b55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Tue, 20 May 2025 17:44:57 +0200 Subject: [PATCH 33/54] Use Netty's pooled memory allocator Instead of the adaptative one, which is the default in 4.2. The pooled allocator was the default in 4.1. --- src/main/java/com/rabbitmq/stream/impl/Client.java | 2 +- src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java | 1 + .../com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java | 2 +- src/main/java/com/rabbitmq/stream/impl/Utils.java | 6 ++++++ src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java | 3 +-- src/test/java/com/rabbitmq/stream/impl/FrameTest.java | 3 +-- .../com/rabbitmq/stream/impl/StreamEnvironmentTest.java | 2 +- .../rabbitmq/stream/impl/StreamEnvironmentUnitTest.java | 7 +++---- .../com/rabbitmq/stream/impl/StreamProducerUnitTest.java | 2 +- 9 files changed, 16 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 8679844939..e69681f747 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -256,7 +256,7 @@ public Client(ClientParameters parameters) { b.option( ChannelOption.ALLOCATOR, parameters.byteBufAllocator == null - ? ByteBufAllocator.DEFAULT + ? Utils.byteBufAllocator() : parameters.byteBufAllocator); } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index c6038dfed6..37494535b6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -111,6 +111,7 @@ private void maybeCompleteBatch(State state, boolean increaseIfCompleted) { state.items = new ArrayList<>(state.batchSize); } } catch (Exception e) { + // e.printStackTrace(); LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage()); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index f3b38cb884..1be66aa36b 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -422,7 +422,7 @@ static class DefaultNettyConfiguration implements NettyConfiguration { private final EnvironmentBuilder environmentBuilder; private EventLoopGroup eventLoopGroup; - private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT; + private ByteBufAllocator byteBufAllocator = Utils.byteBufAllocator(); private Consumer channelCustomizer = noOpConsumer(); private Consumer bootstrapCustomizer = noOpConsumer(); diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index 4ea934e911..4b535bb21d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -19,6 +19,8 @@ import com.rabbitmq.stream.*; import com.rabbitmq.stream.impl.Client.ClientParameters; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoopGroup; import io.netty.channel.MultiThreadIoEventLoopGroup; @@ -415,6 +417,10 @@ static EventLoopGroup eventLoopGroup() { return new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); } + static ByteBufAllocator byteBufAllocator() { + return PooledByteBufAllocator.DEFAULT; + } + /* class to help testing SAC on super streams */ diff --git a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java index 02844308dd..987cf311c9 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java @@ -20,7 +20,6 @@ import com.rabbitmq.stream.impl.ServerFrameHandler.DeliverVersion1FrameHandler; import com.rabbitmq.stream.metrics.NoOpMetricsCollector; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -51,7 +50,7 @@ public MessageBuilder messageBuilder() { ByteBuf generateFrameBuffer( int nbMessages, long chunkOffset, int dataSize, Iterable messages) { - ByteBuf bb = ByteBufAllocator.DEFAULT.buffer(1024); + ByteBuf bb = Utils.byteBufAllocator().buffer(1024); bb.writeShort(Utils.encodeRequestCode(Constants.COMMAND_DELIVER)) .writeShort(Constants.VERSION_1) .writeByte(1) // subscription id diff --git a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java index 28422d40ad..7198facf5a 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java @@ -26,7 +26,6 @@ import com.rabbitmq.stream.Message; import com.rabbitmq.stream.Properties; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import java.io.ByteArrayOutputStream; @@ -149,7 +148,7 @@ public TestDesc(String description, List sizes, int expectedCalls) { tests.forEach( test -> { Channel channel = Mockito.mock(Channel.class); - Mockito.when(channel.alloc()).thenReturn(ByteBufAllocator.DEFAULT); + Mockito.when(channel.alloc()).thenReturn(Utils.byteBufAllocator()); Mockito.when(channel.writeAndFlush(Mockito.any())) .thenReturn(Mockito.mock(ChannelFuture.class)); diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index f076a78fde..9588002d54 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -216,7 +216,7 @@ void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed(boolean lazyInit } @Test - void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info) throws Exception { + void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info) { int messageCount = 100; int streamCount = 20; int producersCount = ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT * 3 + 10; diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java index e8d3f25e34..8dafb175ad 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java @@ -26,7 +26,6 @@ import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException; -import io.netty.buffer.ByteBufAllocator; import java.net.URI; import java.time.Duration; import java.util.Arrays; @@ -94,7 +93,7 @@ Client.ClientParameters duplicate() { ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, null, - ByteBufAllocator.DEFAULT, + Utils.byteBufAllocator(), false, type -> "locator-connection", cf, @@ -163,7 +162,7 @@ void shouldTryUrisOnInitializationFailure() throws Exception { ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, null, - ByteBufAllocator.DEFAULT, + Utils.byteBufAllocator(), false, type -> "locator-connection", cf, @@ -195,7 +194,7 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled( ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, null, - ByteBufAllocator.DEFAULT, + Utils.byteBufAllocator(), lazyInit, type -> "locator-connection", cf, diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 1150a90842..51f320a46c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -75,7 +75,7 @@ public class StreamProducerUnitTest { void init() { mocks = MockitoAnnotations.openMocks(this); executorService = Executors.newScheduledThreadPool(2); - when(channel.alloc()).thenReturn(ByteBufAllocator.DEFAULT); + when(channel.alloc()).thenReturn(Utils.byteBufAllocator()); when(channel.writeAndFlush(Mockito.any())).thenReturn(channelFuture); when(client.allocateNoCheck(any(ByteBufAllocator.class), anyInt())) .thenAnswer( From fe8384059b5f9be9c89bda6cf6556eef86353818 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 21 May 2025 08:39:33 +0200 Subject: [PATCH 34/54] Update default Netty memory allocator in documentation --- src/docs/asciidoc/api.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index f4c5d0be9a..75a3d0da76 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -263,7 +263,7 @@ It is the developer's responsibility to close the `EventLoopGroup` they provide. |`netty#ByteBufAllocator` |`ByteBuf` allocator. -|ByteBufAllocator.DEFAULT +|PooledByteBufAllocator.DEFAULT |`netty#channelCustomizer` |Extension point to customize Netty's `Channel` instances used for connections. From dcb1d5658fadc849bbf974c3e1707edbc162d4eb Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 21 May 2025 16:33:30 +0000 Subject: [PATCH 35/54] Bump org.mockito:mockito-core from 5.17.0 to 5.18.0 Bumps [org.mockito:mockito-core](https://github.com/mockito/mockito) from 5.17.0 to 5.18.0. - [Release notes](https://github.com/mockito/mockito/releases) - [Commits](https://github.com/mockito/mockito/compare/v5.17.0...v5.18.0) --- updated-dependencies: - dependency-name: org.mockito:mockito-core dependency-version: 5.18.0 dependency-type: direct:development update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d85d136d5a..26b1feb7f2 100644 --- a/pom.xml +++ b/pom.xml @@ -62,7 +62,7 @@ 1.1.10.7 5.12.2 3.27.3 - 5.17.0 + 5.18.0 5.25.0 3.17.0 1.18.0 From 478db44ee71a5c5e3bac27d1ae35cb76e0091385 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 26 May 2025 16:25:38 +0000 Subject: [PATCH 36/54] Bump io.dropwizard.metrics:metrics-core from 4.2.30 to 4.2.32 Bumps [io.dropwizard.metrics:metrics-core](https://github.com/dropwizard/metrics) from 4.2.30 to 4.2.32. - [Release notes](https://github.com/dropwizard/metrics/releases) - [Commits](https://github.com/dropwizard/metrics/compare/v4.2.30...v4.2.32) --- updated-dependencies: - dependency-name: io.dropwizard.metrics:metrics-core dependency-version: 4.2.32 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 26b1feb7f2..0f394611f7 100644 --- a/pom.xml +++ b/pom.xml @@ -52,7 +52,7 @@ 1.2.13 4.2.1.Final 0.34.1 - 4.2.30 + 4.2.32 1.15.0 13.1.2 4.7.5 From 6268e19033d75dbf7734978788991c2b196fa20e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 28 May 2025 16:55:56 +0000 Subject: [PATCH 37/54] Bump com.diffplug.spotless:spotless-maven-plugin from 2.44.4 to 2.44.5 Bumps [com.diffplug.spotless:spotless-maven-plugin](https://github.com/diffplug/spotless) from 2.44.4 to 2.44.5. - [Release notes](https://github.com/diffplug/spotless/releases) - [Changelog](https://github.com/diffplug/spotless/blob/main/CHANGES.md) - [Commits](https://github.com/diffplug/spotless/compare/maven/2.44.4...maven/2.44.5) --- updated-dependencies: - dependency-name: com.diffplug.spotless:spotless-maven-plugin dependency-version: 2.44.5 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 0f394611f7..4e105dfaf5 100644 --- a/pom.xml +++ b/pom.xml @@ -88,7 +88,7 @@ 2.3.2 3.2.1 1.37 - 2.44.4 + 2.44.5 1.27.0 0.8.13 4.9.3.0 From e53adf956af56a8c76342321c21e3f96c19ac32c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 2 Jun 2025 14:49:33 +0000 Subject: [PATCH 38/54] Bump org.apache.maven.plugins:maven-clean-plugin from 3.4.1 to 3.5.0 Bumps [org.apache.maven.plugins:maven-clean-plugin](https://github.com/apache/maven-clean-plugin) from 3.4.1 to 3.5.0. - [Release notes](https://github.com/apache/maven-clean-plugin/releases) - [Commits](https://github.com/apache/maven-clean-plugin/compare/maven-clean-plugin-3.4.1...maven-clean-plugin-3.5.0) --- updated-dependencies: - dependency-name: org.apache.maven.plugins:maven-clean-plugin dependency-version: 3.5.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4e105dfaf5..3fbdb98893 100644 --- a/pom.xml +++ b/pom.xml @@ -78,7 +78,7 @@ 3.2.7 3.2.1 3.3.1 - 3.4.1 + 3.5.0 3.3.1 3.11.2 3.4.2 From f3cd8bbe78c84f2d2baf77228cbd73c92a097b55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 2 Jun 2025 16:53:46 +0200 Subject: [PATCH 39/54] Use AutoCloseable instead of JUnit closeable resource --- src/test/java/com/rabbitmq/stream/impl/TestUtils.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 59d88cf6aa..60aa2cc2a5 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -73,7 +73,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ExtensionContext.Namespace; -import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -736,7 +735,7 @@ private static Field field(Class cls, String name) { return field; } - private static class ExecutorServiceCloseableResourceWrapper implements CloseableResource { + private static class ExecutorServiceCloseableResourceWrapper implements AutoCloseable { private final ExecutorService executorService; From 4933d809d133b6ac2ede4e7e80b9f0febba9572d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 2 Jun 2025 16:53:55 +0200 Subject: [PATCH 40/54] Bump JUnit to 5.13.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3fbdb98893..cfbe040910 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ 1.5.7-3 1.8.0 1.1.10.7 - 5.12.2 + 5.13.0 3.27.3 5.18.0 5.25.0 From 126f4da8c052e8edba13cef9998fff7269fa60d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Mon, 2 Jun 2025 17:06:27 +0200 Subject: [PATCH 41/54] Set release version to 1.1.0 --- release-versions.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/release-versions.txt b/release-versions.txt index c3aa198d6e..067a873690 100644 --- a/release-versions.txt +++ b/release-versions.txt @@ -1,5 +1,5 @@ RELEASE_VERSION="1.1.0" -DEVELOPMENT_VERSION="1.1.0-SNAPSHOT" +DEVELOPMENT_VERSION="1.2.0-SNAPSHOT" RELEASE_BRANCH="main" LATEST=true From 1d09406460862afff6eab2749be7d27efa6ae36f Mon Sep 17 00:00:00 2001 From: rabbitmq-ci Date: Mon, 2 Jun 2025 15:27:55 +0000 Subject: [PATCH 42/54] [maven-release-plugin] prepare release v1.1.0 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index cfbe040910..6156a897ad 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.rabbitmq stream-client - 1.1.0-SNAPSHOT + 1.1.0 RabbitMQ Stream Java Client The RabbitMQ Stream Java client library allows Java applications to interface with @@ -43,7 +43,7 @@ https://github.com/rabbitmq/rabbitmq-stream-java-client scm:git:git://github.com/rabbitmq/rabbitmq-stream-java-client.git scm:git:https://github.com/rabbitmq/rabbitmq-stream-java-client.git - HEAD + v1.1.0 From 9ddd03d4808683ea62deed999e4848dc6ed55625 Mon Sep 17 00:00:00 2001 From: rabbitmq-ci Date: Mon, 2 Jun 2025 15:27:56 +0000 Subject: [PATCH 43/54] [maven-release-plugin] prepare for next development iteration --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 6156a897ad..43656ddc0a 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.rabbitmq stream-client - 1.1.0 + 1.2.0-SNAPSHOT RabbitMQ Stream Java Client The RabbitMQ Stream Java client library allows Java applications to interface with @@ -43,7 +43,7 @@ https://github.com/rabbitmq/rabbitmq-stream-java-client scm:git:git://github.com/rabbitmq/rabbitmq-stream-java-client.git scm:git:https://github.com/rabbitmq/rabbitmq-stream-java-client.git - v1.1.0 + HEAD From bd03605a05d571f9ab2819106f80118a816e768f Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 5 Jun 2025 16:40:46 +0000 Subject: [PATCH 44/54] Bump netty.version from 4.2.1.Final to 4.2.2.Final Bumps `netty.version` from 4.2.1.Final to 4.2.2.Final. Updates `io.netty:netty-transport` from 4.2.1.Final to 4.2.2.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.1.Final...netty-4.2.2.Final) Updates `io.netty:netty-codec` from 4.2.1.Final to 4.2.2.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.1.Final...netty-4.2.2.Final) Updates `io.netty:netty-handler` from 4.2.1.Final to 4.2.2.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.1.Final...netty-4.2.2.Final) Updates `io.netty:netty-transport-native-epoll` from 4.2.1.Final to 4.2.2.Final - [Commits](https://github.com/netty/netty/compare/netty-4.2.1.Final...netty-4.2.2.Final) --- updated-dependencies: - dependency-name: io.netty:netty-transport dependency-version: 4.2.2.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-codec dependency-version: 4.2.2.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-handler dependency-version: 4.2.2.Final dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: io.netty:netty-transport-native-epoll dependency-version: 4.2.2.Final dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 43656ddc0a..6b6a2d8159 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ true 1.7.36 1.2.13 - 4.2.1.Final + 4.2.2.Final 0.34.1 4.2.32 1.15.0 From 2a87e9937dffee3426871421cda0669746f3c771 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 9 Jun 2025 17:05:50 +0000 Subject: [PATCH 45/54] Bump org.junit:junit-bom from 5.13.0 to 5.13.1 Bumps [org.junit:junit-bom](https://github.com/junit-team/junit5) from 5.13.0 to 5.13.1. - [Release notes](https://github.com/junit-team/junit5/releases) - [Commits](https://github.com/junit-team/junit5/compare/r5.13.0...r5.13.1) --- updated-dependencies: - dependency-name: org.junit:junit-bom dependency-version: 5.13.1 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6b6a2d8159..aa96ed72d2 100644 --- a/pom.xml +++ b/pom.xml @@ -60,7 +60,7 @@ 1.5.7-3 1.8.0 1.1.10.7 - 5.13.0 + 5.13.1 3.27.3 5.18.0 5.25.0 From f9a6cf5cb22d073adb924ac4580d98bb8a1571c5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 10 Jun 2025 16:57:28 +0000 Subject: [PATCH 46/54] Bump io.micrometer:micrometer-tracing-integration-test Bumps [io.micrometer:micrometer-tracing-integration-test](https://github.com/micrometer-metrics/tracing) from 1.5.0 to 1.5.1. - [Release notes](https://github.com/micrometer-metrics/tracing/releases) - [Commits](https://github.com/micrometer-metrics/tracing/compare/v1.5.0...v1.5.1) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-tracing-integration-test dependency-version: 1.5.1 dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index aa96ed72d2..9d4208f926 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ 2.13.1 0.10.6 1.2.5 - 1.5.0 + 1.5.1 1.0.4 3.14.0 3.5.3 From 3e31570ff6ba4bc4e68ca2dfce320d23691a4a37 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 10 Jun 2025 16:57:35 +0000 Subject: [PATCH 47/54] Bump io.micrometer:micrometer-core from 1.15.0 to 1.15.1 Bumps [io.micrometer:micrometer-core](https://github.com/micrometer-metrics/micrometer) from 1.15.0 to 1.15.1. - [Release notes](https://github.com/micrometer-metrics/micrometer/releases) - [Commits](https://github.com/micrometer-metrics/micrometer/compare/v1.15.0...v1.15.1) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-core dependency-version: 1.15.1 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index aa96ed72d2..de7d82cd8f 100644 --- a/pom.xml +++ b/pom.xml @@ -53,7 +53,7 @@ 4.2.2.Final 0.34.1 4.2.32 - 1.15.0 + 1.15.1 13.1.2 4.7.5 1.27.1 From 47f58c0b1a2789a53d4f7493e427db3fdfef31a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Thu, 12 Jun 2025 16:44:21 +0200 Subject: [PATCH 48/54] Configure project to publish to Central Portal OSSRH reaches EOL on June 30th, 2025. --- .github/workflows/publish-snapshot.yml | 6 +-- .github/workflows/release.yml | 35 ++------------ .github/workflows/test.yml | 6 +-- ci/evaluate-release.sh | 14 ------ pom.xml | 65 +++++--------------------- src/test/java/SanityCheck.java | 1 - 6 files changed, 23 insertions(+), 104 deletions(-) delete mode 100755 ci/evaluate-release.sh diff --git a/.github/workflows/publish-snapshot.yml b/.github/workflows/publish-snapshot.yml index 8d22f67352..a327ebae89 100644 --- a/.github/workflows/publish-snapshot.yml +++ b/.github/workflows/publish-snapshot.yml @@ -14,7 +14,7 @@ jobs: distribution: 'temurin' java-version: '21' cache: 'maven' - server-id: ossrh + server-id: central server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} @@ -22,6 +22,6 @@ jobs: - name: Publish snapshot run: ./mvnw clean deploy -Psnapshots -DskipITs -DskipTests env: - MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }} - MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }} + MAVEN_USERNAME: ${{ secrets.CENTRAL_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.CENTRAL_TOKEN }} MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index bb1c9f9029..23b8163a46 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -17,51 +17,26 @@ jobs: distribution: 'temurin' java-version: '11' cache: 'maven' - server-id: ${{ env.maven_server_id }} + server-id: central server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} gpg-passphrase: MAVEN_GPG_PASSPHRASE - - name: Release Stream Java Client (GA) - if: ${{ env.ga_release == 'true' }} + - name: Release Stream Java Client run: | git config user.name "rabbitmq-ci" git config user.email "rabbitmq-ci@users.noreply.github.com" ci/release-stream-java-client.sh env: - MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }} - MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }} + MAVEN_USERNAME: ${{ secrets.CENTRAL_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.CENTRAL_TOKEN }} MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} - - name: Release Stream Java Client (Milestone/RC) - if: ${{ env.ga_release != 'true' }} - run: | - git config user.name "rabbitmq-ci" - git config user.email "rabbitmq-ci@users.noreply.github.com" - ci/release-stream-java-client.sh - env: - MAVEN_USERNAME: '' - MAVEN_PASSWORD: ${{ secrets.PACKAGECLOUD_TOKEN }} - MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} - - name: Checkout tls-gen - uses: actions/checkout@v4 - with: - repository: rabbitmq/tls-gen - path: './tls-gen' - - name: Start broker - run: ci/start-broker.sh - - name: Set up JDK for sanity check and documentation generation + - name: Set up JDK for documentation generation uses: actions/setup-java@v4 with: distribution: 'temurin' java-version: '21' cache: 'maven' - - name: Sanity Check - run: | - source ./release-versions.txt - export RABBITMQ_LIBRARY_VERSION=$RELEASE_VERSION - curl -Ls https://sh.jbang.dev | bash -s - src/test/java/SanityCheck.java - - name: Stop broker - run: docker stop rabbitmq && docker rm rabbitmq - name: Publish Documentation run: | git config user.name "rabbitmq-ci" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 523b05cbf9..8011f560d5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -26,7 +26,7 @@ jobs: distribution: 'temurin' java-version: '21' cache: 'maven' - server-id: ossrh + server-id: central server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} @@ -60,8 +60,8 @@ jobs: - name: Publish snapshot run: ./mvnw clean deploy -Psnapshots -DskipITs -DskipTests env: - MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }} - MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }} + MAVEN_USERNAME: ${{ secrets.CENTRAL_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.CENTRAL_TOKEN }} MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} - name: Publish Documentation run: | diff --git a/ci/evaluate-release.sh b/ci/evaluate-release.sh deleted file mode 100755 index 4ad656d7a0..0000000000 --- a/ci/evaluate-release.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env bash - -source ./release-versions.txt - -if [[ $RELEASE_VERSION == *[RCM]* ]] -then - echo "prerelease=true" >> $GITHUB_ENV - echo "ga_release=false" >> $GITHUB_ENV - echo "maven_server_id=packagecloud-rabbitmq-maven-milestones" >> $GITHUB_ENV -else - echo "prerelease=false" >> $GITHUB_ENV - echo "ga_release=true" >> $GITHUB_ENV - echo "maven_server_id=ossrh" >> $GITHUB_ENV -fi \ No newline at end of file diff --git a/pom.xml b/pom.xml index a9af3d2fb8..7f883871a1 100644 --- a/pom.xml +++ b/pom.xml @@ -99,8 +99,7 @@ 6026DFCA yyyy-MM-dd'T'HH:mm:ss'Z' UTF-8 - 0.0.6 - 1.7.0 + 0.7.0 true true @@ -613,14 +612,18 @@ + + org.sonatype.central + central-publishing-maven-plugin + ${central-publishing-maven-plugin.version} + true + + central + true + + + - - - io.packagecloud.maven.wagon - maven-packagecloud-wagon - ${maven.packagecloud.wagon.version} - - @@ -631,26 +634,6 @@ false false - - - ossrh - https://oss.sonatype.org/content/repositories/snapshots - - - - - - milestone - - false - false - - - - packagecloud-rabbitmq-maven-milestones - packagecloud+https://packagecloud.io/rabbitmq/maven-milestones - - @@ -659,29 +642,6 @@ false false - - - - - org.sonatype.plugins - nexus-staging-maven-plugin - ${nexus-staging-maven-plugin.version} - true - - ossrh - https://oss.sonatype.org/ - false - 20 - - - - - - - ossrh - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - - jvm-test-arguments-below-java-21 @@ -702,7 +662,6 @@ - diff --git a/src/test/java/SanityCheck.java b/src/test/java/SanityCheck.java index 4e550c8bd2..7ee907180c 100755 --- a/src/test/java/SanityCheck.java +++ b/src/test/java/SanityCheck.java @@ -1,5 +1,4 @@ ///usr/bin/env jbang "$0" "$@" ; exit $? -//REPOS mavencentral,ossrh-staging=https://oss.sonatype.org/content/groups/staging/,rabbitmq-packagecloud-milestones=https://packagecloud.io/rabbitmq/maven-milestones/maven2 //DEPS com.rabbitmq:stream-client:${env.RABBITMQ_LIBRARY_VERSION} //DEPS org.slf4j:slf4j-simple:1.7.36 From fc83c4d57a1fef4b70735bd418d3f274767cece3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Thu, 12 Jun 2025 16:59:38 +0200 Subject: [PATCH 49/54] Remove usage of deleted script --- .github/workflows/release.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 23b8163a46..3430115816 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -9,8 +9,6 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Evaluate release type - run: ci/evaluate-release.sh - name: Set up JDK uses: actions/setup-java@v4 with: From f6b389e048ee0e9e0fc07644af73502a27a6626e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Thu, 12 Jun 2025 17:42:56 +0200 Subject: [PATCH 50/54] Use new snapshot repository in documentation --- src/docs/asciidoc/setup.adoc | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/docs/asciidoc/setup.adoc b/src/docs/asciidoc/setup.adoc index e9eaaa6df8..1db3aef926 100644 --- a/src/docs/asciidoc/setup.adoc +++ b/src/docs/asciidoc/setup.adoc @@ -139,8 +139,8 @@ With Maven: - ossrh - https://oss.sonatype.org/content/repositories/snapshots + central-portal-snapshots + https://central.sonatype.com/repository/maven-snapshots/ true false @@ -154,7 +154,14 @@ With Gradle: [source,groovy,subs="attributes,specialcharacters"] ---- repositories { - maven { url 'https://oss.sonatype.org/content/repositories/snapshots' } + maven { + name = 'Central Portal Snapshots' + url = 'https://central.sonatype.com/repository/maven-snapshots/' + // Only search this repository for the specific dependency + content { + includeModule("com.rabbitmq", "{project-artifact-id}") + } + } mavenCentral() } ---- From 3bcea64354af3ac90391037bb7bf0db90cfc4f56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 13 Jun 2025 09:35:27 +0200 Subject: [PATCH 51/54] Disable auto-publish --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7f883871a1..e01f48f3bf 100644 --- a/pom.xml +++ b/pom.xml @@ -619,7 +619,7 @@ true central - true + false From a067613ae4fee4a159816b4686d13093d47490bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Fri, 13 Jun 2025 16:34:52 +0200 Subject: [PATCH 52/54] Consider max unconfirmed messages in dynamic batch configuration A small value for max unconfirmed messages can impact the dynamic batch mechanism. This commit sets the min batch size to half the max unconfirmed messages value if it is less than the configured batch size. References #757 --- .../rabbitmq/stream/impl/DynamicBatch.java | 19 ++++++++++++------- .../impl/DynamicBatchMessageAccumulator.java | 7 +++++-- .../rabbitmq/stream/impl/ProducerUtils.java | 2 ++ .../rabbitmq/stream/impl/StreamProducer.java | 1 + .../stream/impl/DynamicBatchTest.java | 6 +++--- 5 files changed, 23 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index 37494535b6..3c288d5428 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -28,17 +28,22 @@ final class DynamicBatch implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class); - private static final int MIN_BATCH_SIZE = 32; - private static final int MAX_BATCH_SIZE = 8192; + private static final int MIN_BATCH_SIZE = 16; private final BlockingQueue requests = new LinkedBlockingQueue<>(); private final BatchConsumer consumer; - private final int configuredBatchSize; + private final int configuredBatchSize, minBatchSize, maxBatchSize; private final Thread thread; - DynamicBatch(BatchConsumer consumer, int batchSize) { + DynamicBatch(BatchConsumer consumer, int batchSize, int maxUnconfirmed) { this.consumer = consumer; - this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE); + if (batchSize < maxUnconfirmed) { + this.minBatchSize = min(MIN_BATCH_SIZE, batchSize / 2); + } else { + this.minBatchSize = min(1, maxUnconfirmed / 2); + } + this.configuredBatchSize = batchSize; + this.maxBatchSize = batchSize * 2; this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop); this.thread.start(); } @@ -104,9 +109,9 @@ private void maybeCompleteBatch(State state, boolean increaseIfCompleted) { boolean completed = this.consumer.process(state.items); if (completed) { if (increaseIfCompleted) { - state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE); + state.batchSize = min(state.batchSize * 2, this.maxBatchSize); } else { - state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE); + state.batchSize = max(state.batchSize / 2, this.minBatchSize); } state.items = new ArrayList<>(state.batchSize); } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java index ee8c397e13..8c763cde86 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -38,6 +38,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { DynamicBatchMessageAccumulator( int subEntrySize, int batchSize, + int maxUnconfirmedMessages, Codec codec, int maxFrameSize, ToLongFunction publishSequenceFunction, @@ -75,7 +76,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { } return result; }, - batchSize); + batchSize, + maxUnconfirmedMessages); } else { byte compressionCode = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); @@ -124,7 +126,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { } return result; }, - batchSize * subEntrySize); + batchSize * subEntrySize, + maxUnconfirmedMessages); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java index 5ae8faa7dd..691fc65c57 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java @@ -30,6 +30,7 @@ static MessageAccumulator createMessageAccumulator( boolean dynamicBatch, int subEntrySize, int batchSize, + int maxUnconfirmedMessages, CompressionCodec compressionCodec, Codec codec, ByteBufAllocator byteBufAllocator, @@ -44,6 +45,7 @@ static MessageAccumulator createMessageAccumulator( return new DynamicBatchMessageAccumulator( subEntrySize, batchSize, + maxUnconfirmedMessages, codec, maxFrameSize, publishSequenceFunction, diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 27552512c8..fd3cb8f25d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -180,6 +180,7 @@ public int fragmentLength(Object entity) { dynamicBatch, subEntrySize, batchSize, + maxUnconfirmedMessages, compressionCodec, environment.codec(), environment.byteBufAllocator(), diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java index 50698320f8..07a2877385 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -71,7 +71,7 @@ void itemAreProcessed() { sync.down(items.size()); return true; }; - try (DynamicBatch batch = new DynamicBatch<>(action, 100)) { + try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) { RateLimiter rateLimiter = RateLimiter.create(10000); IntStream.range(0, itemCount) .forEach( @@ -102,7 +102,7 @@ void failedProcessingIsReplayed() throws Exception { } return result; }; - try (DynamicBatch batch = new DynamicBatch<>(action, 100)) { + try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) { int firstRoundCount = itemCount / 5; IntStream.range(0, firstRoundCount) .forEach( @@ -132,7 +132,7 @@ void lowThrottlingValueShouldStillHighPublishingRate() throws Exception { return true; }; - try (DynamicBatch batch = new DynamicBatch<>(action, batchSize)) { + try (DynamicBatch batch = new DynamicBatch<>(action, batchSize, 10_000)) { MetricRegistry metrics = new MetricRegistry(); Meter rate = metrics.meter("publishing-rate"); AtomicBoolean keepGoing = new AtomicBoolean(true); From 9e16341d6e9f03c2e01dfa0a2aebc809b309fd1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= <514737+acogoluegnes@users.noreply.github.com> Date: Wed, 11 Jun 2025 15:51:13 +0200 Subject: [PATCH 53/54] Add SAC test against cluster References rabbitmq/rabbitmq-server#13672 --- .../stream/impl/StreamProducerBuilder.java | 6 +- src/test/java/com/rabbitmq/stream/Cli.java | 48 +++++ .../stream/impl/RecoveryClusterTest.java | 197 ++++++++++++++++-- .../com/rabbitmq/stream/impl/TestUtils.java | 3 +- 4 files changed, 234 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 43a57bbc5c..b44290868c 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -28,8 +28,8 @@ class StreamProducerBuilder implements ProducerBuilder { - static final boolean DEFAULT_DYNAMIC_BATCH = true; -// Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true")); + static final boolean DEFAULT_DYNAMIC_BATCH = + Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true")); private final StreamEnvironment environment; @@ -201,7 +201,7 @@ public Producer build() { if (this.routingConfiguration == null && this.superStream != null) { throw new IllegalArgumentException( - "A routing configuration must specified when a super stream is set"); + "A routing configuration must be specified when a super stream is set"); } if (this.stream != null) { diff --git a/src/test/java/com/rabbitmq/stream/Cli.java b/src/test/java/com/rabbitmq/stream/Cli.java index 7097725af4..5c583a461b 100644 --- a/src/test/java/com/rabbitmq/stream/Cli.java +++ b/src/test/java/com/rabbitmq/stream/Cli.java @@ -178,6 +178,30 @@ static List toConnectionInfoList(String json) { return GSON.fromJson(json, new TypeToken>() {}.getType()); } + public static List listGroupConsumers(String stream, String reference) { + ProcessState process = + rabbitmqStreams( + format( + "list_stream_group_consumers -q --stream %s --reference %s " + + "--formatter table subscription_id,state", + stream, reference)); + + List itemList = Collections.emptyList(); + String content = process.output(); + String[] lines = content.split(System.lineSeparator()); + if (lines.length > 1) { + itemList = new ArrayList<>(lines.length - 1); + for (int i = 1; i < lines.length; i++) { + String line = lines[i]; + String[] fields = line.split("\t"); + String id = fields[0]; + String state = fields[1].replace("\"", ""); + itemList.add(new SubscriptionInfo(Integer.parseInt(id), state)); + } + } + return itemList; + } + public static void restartStream(String stream) { rabbitmqStreams(" restart_stream " + stream); } @@ -420,6 +444,30 @@ public String toString() { } } + public static final class SubscriptionInfo { + + private final int id; + private final String state; + + public SubscriptionInfo(int id, String state) { + this.id = id; + this.state = state; + } + + public int id() { + return this.id; + } + + public String state() { + return this.state; + } + + @Override + public String toString() { + return "SubscriptionInfo{id='" + id + '\'' + ", state='" + state + '\'' + '}'; + } + } + public static class ProcessState { private final InputStreamPumpState inputState; diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index 24f50ee0cd..e234d723c4 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -16,18 +16,22 @@ import static com.rabbitmq.stream.impl.Assertions.assertThat; import static com.rabbitmq.stream.impl.LoadBalancerClusterTest.LOAD_BALANCER_ADDRESS; +import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_1_2; import static com.rabbitmq.stream.impl.TestUtils.newLoggerLevel; import static com.rabbitmq.stream.impl.TestUtils.sync; +import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; import static com.rabbitmq.stream.impl.ThreadUtils.threadFactory; import static com.rabbitmq.stream.impl.Tuples.pair; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.stream; import ch.qos.logback.classic.Level; import com.google.common.collect.Streams; import com.google.common.util.concurrent.RateLimiter; import com.rabbitmq.stream.*; +import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import com.rabbitmq.stream.impl.TestUtils.DisabledIfNotCluster; import com.rabbitmq.stream.impl.TestUtils.Sync; import com.rabbitmq.stream.impl.Tuples.Pair; @@ -41,15 +45,20 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; import org.junit.jupiter.api.*; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -201,15 +210,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru syncs = consumers.stream().map(c -> c.waitForNewMessages(100)).collect(toList()); syncs.forEach(s -> assertThat(s).completes()); - nodes.forEach( - n -> { - LOGGER.info("Restarting node {}...", n); - Cli.restartNode(n); - LOGGER.info("Restarted node {}.", n); - }); - LOGGER.info("Rebalancing..."); - Cli.rebalance(); - LOGGER.info("Rebalancing over."); + restartCluster(); Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis()); @@ -291,8 +292,132 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @BrokerVersionAtLeast(RABBITMQ_4_1_2) + void sacWithClusterRestart(boolean superStream) throws Exception { + environment = + environmentBuilder + .uris(URIS) + .netty() + .bootstrapCustomizer( + b -> { + b.option( + ChannelOption.CONNECT_TIMEOUT_MILLIS, + (int) BACK_OFF_DELAY_POLICY.delay(0).toMillis()); + }) + .environmentBuilder() + .maxConsumersByConnection(1) + .build(); + + int consumerCount = 3; + AtomicLong lastOffset = new AtomicLong(0); + String app = "app-name"; + String s = TestUtils.streamName(testInfo); + ProducerState pState = null; + List consumers = Collections.emptyList(); + try { + StreamCreator sCreator = environment.streamCreator().stream(s); + if (superStream) { + sCreator = sCreator.superStream().partitions(1).creator(); + } + sCreator.create(); + + pState = new ProducerState(s, true, superStream, environment); + pState.start(); + + Map consumerStatus = new ConcurrentHashMap<>(); + consumers = + IntStream.range(0, consumerCount) + .mapToObj( + i -> + new ConsumerState( + s, + environment, + b -> { + b.singleActiveConsumer() + .name(app) + .noTrackingStrategy() + .consumerUpdateListener( + ctx -> { + consumerStatus.put(i, ctx.isActive()); + return OffsetSpecification.offset(lastOffset.get()); + }); + if (superStream) { + b.superStream(s); + } else { + b.stream(s); + } + }, + (ctx, m) -> lastOffset.set(ctx.offset()))) + .collect(toList()); + + Sync sync = pState.waitForNewMessages(100); + assertThat(sync).completes(); + sync = consumers.get(0).waitForNewMessages(100); + assertThat(sync).completes(); + + String streamArg = superStream ? s + "-0" : s; + + Callable checkConsumers = + () -> { + waitAtMost( + () -> { + List subscriptions = Cli.listGroupConsumers(streamArg, app); + LOGGER.info("Group consumers: {}", subscriptions); + return subscriptions.size() == consumerCount + && subscriptions.stream() + .filter(sub -> sub.state().startsWith("active")) + .count() + == 1 + && subscriptions.stream() + .filter(sub -> sub.state().startsWith("waiting")) + .count() + == 2; + }, + () -> + "Group consumers not in expected state: " + + Cli.listGroupConsumers(streamArg, app)); + return null; + }; + + checkConsumers.call(); + + restartCluster(); + + Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis()); + + sync = pState.waitForNewMessages(100); + assertThat(sync).completes(ASSERTION_TIMEOUT); + int activeIndex = + consumerStatus.entrySet().stream() + .filter(Map.Entry::getValue) + .map(Map.Entry::getKey) + .findFirst() + .orElseThrow(() -> new IllegalStateException("No active consumer found")); + + sync = consumers.get(activeIndex).waitForNewMessages(100); + assertThat(sync).completes(ASSERTION_TIMEOUT); + + checkConsumers.call(); + + } finally { + if (pState != null) { + pState.close(); + } + consumers.forEach(ConsumerState::close); + if (superStream) { + environment.deleteSuperStream(s); + } else { + environment.deleteStream(s); + } + } + } + private static class ProducerState implements AutoCloseable { + private static final AtomicLong MSG_ID_SEQ = new AtomicLong(0); + private static final byte[] BODY = "hello".getBytes(StandardCharsets.UTF_8); private final String stream; @@ -306,9 +431,19 @@ private static class ProducerState implements AutoCloseable { final AtomicReference lastExceptionInstant = new AtomicReference<>(); private ProducerState(String stream, boolean dynamicBatch, Environment environment) { + this(stream, dynamicBatch, false, environment); + } + + private ProducerState( + String stream, boolean dynamicBatch, boolean superStream, Environment environment) { this.stream = stream; - this.producer = - environment.producerBuilder().stream(stream).dynamicBatch(dynamicBatch).build(); + ProducerBuilder builder = environment.producerBuilder().dynamicBatch(dynamicBatch); + if (superStream) { + builder.superStream(stream).routing(m -> m.getProperties().getMessageIdAsString()); + } else { + builder.stream(stream); + } + this.producer = builder.build(); } void start() { @@ -327,7 +462,14 @@ void start() { try { this.limiter.acquire(1); this.producer.send( - producer.messageBuilder().addData(BODY).build(), confirmationHandler); + producer + .messageBuilder() + .properties() + .messageId(MSG_ID_SEQ.getAndIncrement()) + .messageBuilder() + .addData(BODY) + .build(), + confirmationHandler); } catch (Throwable e) { this.lastException.set(e); this.lastExceptionInstant.set(Instant.now()); @@ -380,16 +522,27 @@ private static class ConsumerState implements AutoCloseable { final AtomicReference postHandle = new AtomicReference<>(() -> {}); private ConsumerState(String stream, Environment environment) { + this(stream, environment, b -> b.stream(stream), (ctx, m) -> {}); + } + + private ConsumerState( + String stream, + Environment environment, + java.util.function.Consumer customizer, + MessageHandler delegateHandler) { this.stream = stream; - this.consumer = - environment.consumerBuilder().stream(stream) + ConsumerBuilder builder = + environment + .consumerBuilder() .offset(OffsetSpecification.first()) .messageHandler( (ctx, m) -> { + delegateHandler.handle(ctx, m); receivedCount.incrementAndGet(); postHandle.get().run(); - }) - .build(); + }); + customizer.accept(builder); + this.consumer = builder.build(); } Sync waitForNewMessages(int messageCount) { @@ -414,4 +567,16 @@ public void close() { this.consumer.close(); } } + + private static void restartCluster() { + nodes.forEach( + n -> { + LOGGER.info("Restarting node {}...", n); + Cli.restartNode(n); + LOGGER.info("Restarted node {}.", n); + }); + LOGGER.info("Rebalancing..."); + Cli.rebalance(); + LOGGER.info("Rebalancing over."); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 60aa2cc2a5..52214838b8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -1064,7 +1064,8 @@ public enum BrokerVersion { RABBITMQ_3_11_11("3.11.11"), RABBITMQ_3_11_14("3.11.14"), RABBITMQ_3_13_0("3.13.0"), - RABBITMQ_4_0_0("4.0.0"); + RABBITMQ_4_0_0("4.0.0"), + RABBITMQ_4_1_2("4.1.2"); final String value; From 50ab623bda3edd756c4319207b2970f6a8eb05e2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 18 Jun 2025 16:12:16 +0000 Subject: [PATCH 54/54] Bump org.sonatype.central:central-publishing-maven-plugin Bumps [org.sonatype.central:central-publishing-maven-plugin](https://github.com/sonatype/central-publishing-maven-plugin) from 0.7.0 to 0.8.0. - [Commits](https://github.com/sonatype/central-publishing-maven-plugin/commits) --- updated-dependencies: - dependency-name: org.sonatype.central:central-publishing-maven-plugin dependency-version: 0.8.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index e01f48f3bf..9b0a6102db 100644 --- a/pom.xml +++ b/pom.xml @@ -99,7 +99,7 @@ 6026DFCA yyyy-MM-dd'T'HH:mm:ss'Z' UTF-8 - 0.7.0 + 0.8.0 true true