diff --git a/.github/workflows/publish-snapshot.yml b/.github/workflows/publish-snapshot.yml index 8d22f67352..a327ebae89 100644 --- a/.github/workflows/publish-snapshot.yml +++ b/.github/workflows/publish-snapshot.yml @@ -14,7 +14,7 @@ jobs: distribution: 'temurin' java-version: '21' cache: 'maven' - server-id: ossrh + server-id: central server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} @@ -22,6 +22,6 @@ jobs: - name: Publish snapshot run: ./mvnw clean deploy -Psnapshots -DskipITs -DskipTests env: - MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }} - MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }} + MAVEN_USERNAME: ${{ secrets.CENTRAL_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.CENTRAL_TOKEN }} MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index bb1c9f9029..3430115816 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -9,59 +9,32 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Evaluate release type - run: ci/evaluate-release.sh - name: Set up JDK uses: actions/setup-java@v4 with: distribution: 'temurin' java-version: '11' cache: 'maven' - server-id: ${{ env.maven_server_id }} + server-id: central server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} gpg-passphrase: MAVEN_GPG_PASSPHRASE - - name: Release Stream Java Client (GA) - if: ${{ env.ga_release == 'true' }} + - name: Release Stream Java Client run: | git config user.name "rabbitmq-ci" git config user.email "rabbitmq-ci@users.noreply.github.com" ci/release-stream-java-client.sh env: - MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }} - MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }} + MAVEN_USERNAME: ${{ secrets.CENTRAL_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.CENTRAL_TOKEN }} MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} - - name: Release Stream Java Client (Milestone/RC) - if: ${{ env.ga_release != 'true' }} - run: | - git config user.name "rabbitmq-ci" - git config user.email "rabbitmq-ci@users.noreply.github.com" - ci/release-stream-java-client.sh - env: - MAVEN_USERNAME: '' - MAVEN_PASSWORD: ${{ secrets.PACKAGECLOUD_TOKEN }} - MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} - - name: Checkout tls-gen - uses: actions/checkout@v4 - with: - repository: rabbitmq/tls-gen - path: './tls-gen' - - name: Start broker - run: ci/start-broker.sh - - name: Set up JDK for sanity check and documentation generation + - name: Set up JDK for documentation generation uses: actions/setup-java@v4 with: distribution: 'temurin' java-version: '21' cache: 'maven' - - name: Sanity Check - run: | - source ./release-versions.txt - export RABBITMQ_LIBRARY_VERSION=$RELEASE_VERSION - curl -Ls https://sh.jbang.dev | bash -s - src/test/java/SanityCheck.java - - name: Stop broker - run: docker stop rabbitmq && docker rm rabbitmq - name: Publish Documentation run: | git config user.name "rabbitmq-ci" diff --git a/.github/workflows/test-rabbitmq-alphas.yml b/.github/workflows/test-rabbitmq-alphas.yml index 0414fce497..6b8e9e32c6 100644 --- a/.github/workflows/test-rabbitmq-alphas.yml +++ b/.github/workflows/test-rabbitmq-alphas.yml @@ -14,7 +14,6 @@ jobs: strategy: matrix: rabbitmq-image: - - pivotalrabbitmq/rabbitmq:v4.0.x-otp27 - pivotalrabbitmq/rabbitmq:v4.1.x-otp27 - pivotalrabbitmq/rabbitmq:main-otp27 name: Test against ${{ matrix.rabbitmq-image }} diff --git a/.github/workflows/test-supported-java-versions.yml b/.github/workflows/test-supported-java-versions.yml index f7694cdc18..f2ac5a1289 100644 --- a/.github/workflows/test-supported-java-versions.yml +++ b/.github/workflows/test-supported-java-versions.yml @@ -11,7 +11,7 @@ jobs: strategy: matrix: distribution: [ 'temurin' ] - version: [ '11', '17', '21', '23', '24-ea', '25-ea' ] + version: [ '11', '17', '21', '24', '25-ea' ] include: - distribution: 'semeru' version: '17' diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 523b05cbf9..8011f560d5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -26,7 +26,7 @@ jobs: distribution: 'temurin' java-version: '21' cache: 'maven' - server-id: ossrh + server-id: central server-username: MAVEN_USERNAME server-password: MAVEN_PASSWORD gpg-private-key: ${{ secrets.MAVEN_GPG_PRIVATE_KEY }} @@ -60,8 +60,8 @@ jobs: - name: Publish snapshot run: ./mvnw clean deploy -Psnapshots -DskipITs -DskipTests env: - MAVEN_USERNAME: ${{ secrets.OSSRH_USERNAME }} - MAVEN_PASSWORD: ${{ secrets.OSSRH_TOKEN }} + MAVEN_USERNAME: ${{ secrets.CENTRAL_USERNAME }} + MAVEN_PASSWORD: ${{ secrets.CENTRAL_TOKEN }} MAVEN_GPG_PASSPHRASE: ${{ secrets.MAVEN_GPG_PASSPHRASE }} - name: Publish Documentation run: | diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 4deefa0459..0000000000 --- a/Dockerfile +++ /dev/null @@ -1,96 +0,0 @@ -FROM ubuntu:22.04 as builder - -ARG stream_perf_test_url="set-url-here" - -RUN set -eux; \ - \ - apt-get update; \ - apt-get -y upgrade; \ - apt-get install --yes --no-install-recommends \ - ca-certificates \ - wget \ - gnupg \ - jq - -ARG JAVA_VERSION="21" - -RUN if [ "$(uname -m)" = "aarch64" ] || [ "$(uname -m)" = "arm64" ]; then echo "ARM"; ARCH="arm"; BUNDLE="jdk"; else echo "x86"; ARCH="x86"; BUNDLE="jdk"; fi \ - && wget "https://api.azul.com/zulu/download/community/v1.0/bundles/latest/?java_version=$JAVA_VERSION&ext=tar.gz&os=linux&arch=$ARCH&hw_bitness=64&release_status=ga&bundle_type=$BUNDLE" -O jdk-info.json -RUN wget --progress=bar:force:noscroll -O "jdk.tar.gz" $(cat jdk-info.json | jq --raw-output .url) -RUN echo "$(cat jdk-info.json | jq --raw-output .sha256_hash) *jdk.tar.gz" | sha256sum --check --strict - - -RUN set -eux; \ - if [ "$(uname -m)" = "x86_64" ] ; then JAVA_PATH="/usr/lib/jdk-$JAVA_VERSION"; \ - mkdir $JAVA_PATH && \ - tar --extract --file jdk.tar.gz --directory "$JAVA_PATH" --strip-components 1; \ - $JAVA_PATH/bin/jlink --compress=2 --output /jre --add-modules java.base,jdk.management,java.naming,java.xml,jdk.unsupported,jdk.crypto.cryptoki,jdk.httpserver; \ - /jre/bin/java -version; \ - fi - -RUN set -eux; \ - if [ "$(uname -m)" = "aarch64" ] || [ "$(uname -m)" = "arm64" ] ; then JAVA_PATH="/jre"; \ - mkdir $JAVA_PATH && \ - tar --extract --file jdk.tar.gz --directory "$JAVA_PATH" --strip-components 1; \ - fi - -# pgpkeys.uk is quite reliable, but allow for substitutions locally -ARG PGP_KEYSERVER=hkps://keys.openpgp.org -# If you are building this image locally and are getting `gpg: keyserver receive failed: No data` errors, -# run the build with a different PGP_KEYSERVER, e.g. docker build --tag rabbitmq:3.7 --build-arg PGP_KEYSERVER=pgpkeys.eu 3.7/ubuntu -# For context, see https://github.com/docker-library/official-images/issues/4252 - -# https://www.rabbitmq.com/signatures.html#importing-gpg -ENV RABBITMQ_PGP_KEY_ID="0x0A9AF2115F4687BD29803A206B73A36E6026DFCA" -ENV STREAM_PERF_TEST_HOME="/stream_perf_test" - -RUN set -eux; \ - \ - wget --progress dot:giga --output-document "/usr/local/src/stream-perf-test.jar.asc" "$stream_perf_test_url.asc"; \ - wget --progress dot:giga --output-document "/usr/local/src/stream-perf-test.jar" "$stream_perf_test_url"; \ - STREAM_PERF_TEST_SHA256="$(wget -qO- $stream_perf_test_url.sha256)"; \ - echo "$STREAM_PERF_TEST_SHA256 /usr/local/src/stream-perf-test.jar" | sha256sum --check --strict -; \ - \ - export GNUPGHOME="$(mktemp -d)"; \ - gpg --batch --keyserver "$PGP_KEYSERVER" --recv-keys "$RABBITMQ_PGP_KEY_ID"; \ - gpg --batch --verify "/usr/local/src/stream-perf-test.jar.asc" "/usr/local/src/stream-perf-test.jar"; \ - gpgconf --kill all; \ - rm -rf "$GNUPGHOME"; \ - \ - mkdir -p "$STREAM_PERF_TEST_HOME"; \ - cp /usr/local/src/stream-perf-test.jar $STREAM_PERF_TEST_HOME/stream-perf-test.jar - -FROM ubuntu:22.04 - -# we need locales support for characters like ยต to show up correctly in the console -RUN set -eux; \ - apt-get update; \ - apt-get -y upgrade; \ - apt-get install -y --no-install-recommends \ - locales \ - wget \ - ; \ - rm -rf /var/lib/apt/lists/*; \ - locale-gen en_US.UTF-8 - -ENV LANG en_US.UTF-8 -ENV LANGUAGE en_US:en -ENV LC_ALL en_US.UTF-8 - -ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk/jre -RUN mkdir -p $JAVA_HOME -COPY --from=builder /jre $JAVA_HOME/ -RUN ln -svT $JAVA_HOME/bin/java /usr/local/bin/java - -RUN mkdir -p /stream_perf_test -WORKDIR /stream_perf_test -COPY --from=builder /stream_perf_test ./ -RUN set -eux; \ - if [ "$(uname -m)" = "x86_64" ] ; then java -jar stream-perf-test.jar --help ; \ - fi - -RUN groupadd --gid 1000 stream-perf-test -RUN useradd --uid 1000 --gid stream-perf-test --comment "perf-test user" stream-perf-test - -USER stream-perf-test:stream-perf-test - -ENTRYPOINT ["java", "-Dio.netty.processId=1", "-jar", "stream-perf-test.jar"] diff --git a/README.adoc b/README.adoc index 95445f6e73..2a96cd7585 100644 --- a/README.adoc +++ b/README.adoc @@ -16,8 +16,7 @@ Please refer to the https://rabbitmq.github.io/rabbitmq-stream-java-client/stabl == Project Maturity -The project is in development and stabilization phase. -Features and API are subject to change, but https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#stability-of-programming-interfaces[breaking changes] will be kept to a minimum. +The library is stable and production-ready. == Support @@ -51,17 +50,7 @@ This library requires at least Java 11, but Java 21 or more is recommended. == Versioning -The RabbitMQ Stream Java Client is in development and stabilization phase. -When the stabilization phase ends, a 1.0.0 version will be cut, and -https://semver.org/[semantic versioning] is likely to be enforced. - -Before reaching the stable phase, the client will use a versioning scheme of `[0.MINOR.PATCH]` where: - -* `0` indicates the project is still in a stabilization phase. -* `MINOR` is a 0-based number incrementing with each new release cycle. It generally reflects significant changes like new features and potentially some programming interfaces changes. -* `PATCH` is a 0-based number incrementing with each service release, that is bux fixes. - -Breaking changes between releases can happen but will be kept to a minimum. +This library uses https://semver.org/[semantic versioning]. == Build Instructions @@ -80,7 +69,7 @@ Launch the broker: ---- docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 \ -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \ - rabbitmq:4.0 + rabbitmq:4.1 ---- Enable the stream plugin: diff --git a/ci/cluster/docker-compose.yml b/ci/cluster/docker-compose.yml index 39345d3e8d..40a6860ee1 100644 --- a/ci/cluster/docker-compose.yml +++ b/ci/cluster/docker-compose.yml @@ -6,7 +6,7 @@ services: - rabbitmq-cluster hostname: node0 container_name: rabbitmq0 - image: ${RABBITMQ_IMAGE:-rabbitmq:4.0} + image: ${RABBITMQ_IMAGE:-rabbitmq:4.1} pull_policy: always ports: - "5672:5672" @@ -22,7 +22,7 @@ services: - rabbitmq-cluster hostname: node1 container_name: rabbitmq1 - image: ${RABBITMQ_IMAGE:-rabbitmq:4.0} + image: ${RABBITMQ_IMAGE:-rabbitmq:4.1} pull_policy: always ports: - "5673:5672" @@ -38,7 +38,7 @@ services: - rabbitmq-cluster hostname: node2 container_name: rabbitmq2 - image: ${RABBITMQ_IMAGE:-rabbitmq:4.0} + image: ${RABBITMQ_IMAGE:-rabbitmq:4.1} pull_policy: always ports: - "5674:5672" diff --git a/ci/evaluate-release.sh b/ci/evaluate-release.sh deleted file mode 100755 index 4ad656d7a0..0000000000 --- a/ci/evaluate-release.sh +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env bash - -source ./release-versions.txt - -if [[ $RELEASE_VERSION == *[RCM]* ]] -then - echo "prerelease=true" >> $GITHUB_ENV - echo "ga_release=false" >> $GITHUB_ENV - echo "maven_server_id=packagecloud-rabbitmq-maven-milestones" >> $GITHUB_ENV -else - echo "prerelease=false" >> $GITHUB_ENV - echo "ga_release=true" >> $GITHUB_ENV - echo "maven_server_id=ossrh" >> $GITHUB_ENV -fi \ No newline at end of file diff --git a/ci/start-broker.sh b/ci/start-broker.sh index a45909c7bf..38c60cc1b9 100755 --- a/ci/start-broker.sh +++ b/ci/start-broker.sh @@ -2,7 +2,7 @@ LOCAL_SCRIPT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-rabbitmq:4.0} +RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-rabbitmq:4.1} wait_for_message() { while ! docker logs "$1" | grep -q "$2"; diff --git a/ci/start-cluster.sh b/ci/start-cluster.sh index c7a5a9f321..b8440984fb 100755 --- a/ci/start-cluster.sh +++ b/ci/start-cluster.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -export RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-rabbitmq:4.0} +export RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-rabbitmq:4.1} wait_for_message() { while ! docker logs "$1" | grep -q "$2"; diff --git a/pom.xml b/pom.xml index 84d285b149..9b0a6102db 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.rabbitmq stream-client - 0.24.0 + 1.2.0-SNAPSHOT RabbitMQ Stream Java Client The RabbitMQ Stream Java client library allows Java applications to interface with @@ -43,33 +43,33 @@ https://github.com/rabbitmq/rabbitmq-stream-java-client scm:git:git://github.com/rabbitmq/rabbitmq-stream-java-client.git scm:git:https://github.com/rabbitmq/rabbitmq-stream-java-client.git - v0.24.0 + HEAD true 1.7.36 1.2.13 - 4.2.0.Final + 4.2.2.Final 0.34.1 - 4.2.30 - 1.14.5 - 13.1.1 + 4.2.32 + 1.15.1 + 13.1.2 4.7.5 1.27.1 - 1.5.7-2 + 1.5.7-3 1.8.0 1.1.10.7 - 5.12.1 + 5.13.1 3.27.3 - 5.17.0 + 5.18.0 5.25.0 3.17.0 1.18.0 - 2.12.1 + 2.13.1 0.10.6 1.2.5 - 1.4.4 + 1.5.1 1.0.4 3.14.0 3.5.3 @@ -78,7 +78,7 @@ 3.2.7 3.2.1 3.3.1 - 3.4.1 + 3.5.0 3.3.1 3.11.2 3.4.2 @@ -88,19 +88,18 @@ 2.3.2 3.2.1 1.37 - 2.44.4 - 1.26.0 + 2.44.5 + 1.27.0 0.8.13 4.9.3.0 4.9.3 - 4.0 + 4.1 6026DFCA yyyy-MM-dd'T'HH:mm:ss'Z' UTF-8 - 0.0.6 - 1.7.0 + 0.8.0 true true @@ -613,14 +612,18 @@ + + org.sonatype.central + central-publishing-maven-plugin + ${central-publishing-maven-plugin.version} + true + + central + false + + + - - - io.packagecloud.maven.wagon - maven-packagecloud-wagon - ${maven.packagecloud.wagon.version} - - @@ -631,26 +634,6 @@ false false - - - ossrh - https://oss.sonatype.org/content/repositories/snapshots - - - - - - milestone - - false - false - - - - packagecloud-rabbitmq-maven-milestones - packagecloud+https://packagecloud.io/rabbitmq/maven-milestones - - @@ -659,29 +642,6 @@ false false - - - - - org.sonatype.plugins - nexus-staging-maven-plugin - ${nexus-staging-maven-plugin.version} - true - - ossrh - https://oss.sonatype.org/ - false - 20 - - - - - - - ossrh - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - - jvm-test-arguments-below-java-21 @@ -702,7 +662,6 @@ - diff --git a/release-versions.txt b/release-versions.txt index 7e4e8269f6..067a873690 100644 --- a/release-versions.txt +++ b/release-versions.txt @@ -1,5 +1,5 @@ -RELEASE_VERSION="0.24.0" -DEVELOPMENT_VERSION="1.0.0-SNAPSHOT" +RELEASE_VERSION="1.1.0" +DEVELOPMENT_VERSION="1.2.0-SNAPSHOT" RELEASE_BRANCH="main" LATEST=true diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index f4c5d0be9a..75a3d0da76 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -263,7 +263,7 @@ It is the developer's responsibility to close the `EventLoopGroup` they provide. |`netty#ByteBufAllocator` |`ByteBuf` allocator. -|ByteBufAllocator.DEFAULT +|PooledByteBufAllocator.DEFAULT |`netty#channelCustomizer` |Extension point to customize Netty's `Channel` instances used for connections. diff --git a/src/docs/asciidoc/overview.adoc b/src/docs/asciidoc/overview.adoc index 4e1f736a02..6040e03172 100644 --- a/src/docs/asciidoc/overview.adoc +++ b/src/docs/asciidoc/overview.adoc @@ -75,30 +75,19 @@ recovery and automatic re-subscription for consumers. == Versioning -The RabbitMQ Stream Java Client is in development and stabilization phase. -When the stabilization phase ends, a 1.0.0 version will be cut, and -https://semver.org/[semantic versioning] is likely to be enforced. +This library uses https://semver.org/[semantic versioning]. -Before reaching the stable phase, the client will use a versioning scheme of `[0.MINOR.PATCH]` where: - -* `0` indicates the project is still in a stabilization phase. -* `MINOR` is a 0-based number incrementing with each new release cycle. It generally reflects significant changes like new features and potentially some programming interfaces changes. -* `PATCH` is a 0-based number incrementing with each service release, that is bux fixes. - -Breaking changes between releases can happen but will be kept to a minimum. The next section provides more details about the evolution of programming interfaces. [[stability-of-programming-interfaces]] == Stability of Programming Interfaces -The RabbitMQ Stream Java Client is in active development but its programming interfaces will remain as stable as possible. -There is no guarantee though that they will remain completely stable, at least until it reaches version 1.0.0. - The client contains 2 sets of programming interfaces whose stability are of interest for application developers: * Application Programming Interfaces (API): those are the ones used to write application logic. They include the interfaces and classes in the `com.rabbitmq.stream` package (e.g. `Producer`, `Consumer`, `Message`). -These API constitute the main programming model of the client and will be kept as stable as possible. +These API constitute the main programming model of the client and are kept as stable as possible. +New features may require to add methods to existing interfaces. * Service Provider Interfaces (SPI): those are interfaces to implement mainly technical behavior in the client. They are not meant to be used to implement application logic. Application developers may have to refer to them in the configuration phase and if they want to customize some internal behavior of the client. diff --git a/src/docs/asciidoc/setup.adoc b/src/docs/asciidoc/setup.adoc index e9eaaa6df8..1db3aef926 100644 --- a/src/docs/asciidoc/setup.adoc +++ b/src/docs/asciidoc/setup.adoc @@ -139,8 +139,8 @@ With Maven: - ossrh - https://oss.sonatype.org/content/repositories/snapshots + central-portal-snapshots + https://central.sonatype.com/repository/maven-snapshots/ true false @@ -154,7 +154,14 @@ With Gradle: [source,groovy,subs="attributes,specialcharacters"] ---- repositories { - maven { url 'https://oss.sonatype.org/content/repositories/snapshots' } + maven { + name = 'Central Portal Snapshots' + url = 'https://central.sonatype.com/repository/maven-snapshots/' + // Only search this repository for the specific dependency + content { + includeModule("com.rabbitmq", "{project-artifact-id}") + } + } mavenCentral() } ---- diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java index 33fac4dcbc..4c8a5239f5 100644 --- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java @@ -436,31 +436,6 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy( /** Helper to configure TLS. */ interface TlsConfiguration { - /** - * Enable hostname verification. - * - *

Hostname verification is enabled by default. - * - * @return the TLS configuration helper - * @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link - * #sslContext(SslContext)} - */ - @Deprecated(forRemoval = true) - TlsConfiguration hostnameVerification(); - - /** - * Enable or disable hostname verification. - * - *

Hostname verification is enabled by default. - * - * @param hostnameVerification whether to enable hostname verification or not - * @return the TLS configuration helper - * @deprecated use {@link SslContextBuilder#endpointIdentificationAlgorithm(String)} with {@link - * #sslContext(SslContext)} - */ - @Deprecated(forRemoval = true) - TlsConfiguration hostnameVerification(boolean hostnameVerification); - /** * Netty {@link SslContext} for TLS connections. * diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 8679844939..e69681f747 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -256,7 +256,7 @@ public Client(ClientParameters parameters) { b.option( ChannelOption.ALLOCATOR, parameters.byteBufAllocator == null - ? ByteBufAllocator.DEFAULT + ? Utils.byteBufAllocator() : parameters.byteBufAllocator); } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java index 7e2d1a7369..3c288d5428 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java @@ -28,17 +28,22 @@ final class DynamicBatch implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class); - private static final int MIN_BATCH_SIZE = 32; - private static final int MAX_BATCH_SIZE = 8192; + private static final int MIN_BATCH_SIZE = 16; private final BlockingQueue requests = new LinkedBlockingQueue<>(); private final BatchConsumer consumer; - private final int configuredBatchSize; + private final int configuredBatchSize, minBatchSize, maxBatchSize; private final Thread thread; - DynamicBatch(BatchConsumer consumer, int batchSize) { + DynamicBatch(BatchConsumer consumer, int batchSize, int maxUnconfirmed) { this.consumer = consumer; - this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE); + if (batchSize < maxUnconfirmed) { + this.minBatchSize = min(MIN_BATCH_SIZE, batchSize / 2); + } else { + this.minBatchSize = min(1, maxUnconfirmed / 2); + } + this.configuredBatchSize = batchSize; + this.maxBatchSize = batchSize * 2; this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop); this.thread.start(); } @@ -69,15 +74,7 @@ private void loop() { if (state.items.size() >= state.batchSize) { this.maybeCompleteBatch(state, true); } else { - item = this.requests.poll(); - if (item == null) { - this.maybeCompleteBatch(state, false); - } else { - state.items.add(item); - if (state.items.size() >= state.batchSize) { - this.maybeCompleteBatch(state, true); - } - } + pump(state, 2); } } else { this.maybeCompleteBatch(state, false); @@ -85,6 +82,22 @@ private void loop() { } } + private void pump(State state, int pumpCount) { + if (pumpCount <= 0) { + return; + } + T item = this.requests.poll(); + if (item == null) { + this.maybeCompleteBatch(state, false); + } else { + state.items.add(item); + if (state.items.size() >= state.batchSize) { + this.maybeCompleteBatch(state, true); + } + this.pump(state, pumpCount - 1); + } + } + private static final class State { int batchSize; @@ -96,13 +109,14 @@ private void maybeCompleteBatch(State state, boolean increaseIfCompleted) { boolean completed = this.consumer.process(state.items); if (completed) { if (increaseIfCompleted) { - state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE); + state.batchSize = min(state.batchSize * 2, this.maxBatchSize); } else { - state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE); + state.batchSize = max(state.batchSize / 2, this.minBatchSize); } state.items = new ArrayList<>(state.batchSize); } } catch (Exception e) { + // e.printStackTrace(); LOGGER.warn("Error during dynamic batch completion: {}", e.getMessage()); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java index ee8c397e13..8c763cde86 100644 --- a/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java +++ b/src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java @@ -38,6 +38,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { DynamicBatchMessageAccumulator( int subEntrySize, int batchSize, + int maxUnconfirmedMessages, Codec codec, int maxFrameSize, ToLongFunction publishSequenceFunction, @@ -75,7 +76,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { } return result; }, - batchSize); + batchSize, + maxUnconfirmedMessages); } else { byte compressionCode = compressionCodec == null ? Compression.NONE.code() : compressionCodec.code(); @@ -124,7 +126,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator { } return result; }, - batchSize * subEntrySize); + batchSize * subEntrySize, + maxUnconfirmedMessages); } } diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java index 5ae8faa7dd..691fc65c57 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java @@ -30,6 +30,7 @@ static MessageAccumulator createMessageAccumulator( boolean dynamicBatch, int subEntrySize, int batchSize, + int maxUnconfirmedMessages, CompressionCodec compressionCodec, Codec codec, ByteBufAllocator byteBufAllocator, @@ -44,6 +45,7 @@ static MessageAccumulator createMessageAccumulator( return new DynamicBatchMessageAccumulator( subEntrySize, batchSize, + maxUnconfirmedMessages, codec, maxFrameSize, publishSequenceFunction, diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index bf7f395fca..1be66aa36b 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -372,20 +372,6 @@ private DefaultTlsConfiguration(EnvironmentBuilder environmentBuilder) { this.environmentBuilder = environmentBuilder; } - @Override - @SuppressWarnings("removal") - public TlsConfiguration hostnameVerification() { - this.hostnameVerification = true; - return this; - } - - @Override - @SuppressWarnings("removal") - public TlsConfiguration hostnameVerification(boolean hostnameVerification) { - this.hostnameVerification = hostnameVerification; - return this; - } - @Override public TlsConfiguration sslContext(SslContext sslContext) { this.sslContext = sslContext; @@ -436,7 +422,7 @@ static class DefaultNettyConfiguration implements NettyConfiguration { private final EnvironmentBuilder environmentBuilder; private EventLoopGroup eventLoopGroup; - private ByteBufAllocator byteBufAllocator = ByteBufAllocator.DEFAULT; + private ByteBufAllocator byteBufAllocator = Utils.byteBufAllocator(); private Consumer channelCustomizer = noOpConsumer(); private Consumer bootstrapCustomizer = noOpConsumer(); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java index 27552512c8..fd3cb8f25d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducer.java @@ -180,6 +180,7 @@ public int fragmentLength(Object entity) { dynamicBatch, subEntrySize, batchSize, + maxUnconfirmedMessages, compressionCodec, environment.codec(), environment.byteBufAllocator(), diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java index 43a57bbc5c..b44290868c 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamProducerBuilder.java @@ -28,8 +28,8 @@ class StreamProducerBuilder implements ProducerBuilder { - static final boolean DEFAULT_DYNAMIC_BATCH = true; -// Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true")); + static final boolean DEFAULT_DYNAMIC_BATCH = + Boolean.parseBoolean(System.getProperty("rabbitmq.stream.producer.dynamic.batch", "true")); private final StreamEnvironment environment; @@ -201,7 +201,7 @@ public Producer build() { if (this.routingConfiguration == null && this.superStream != null) { throw new IllegalArgumentException( - "A routing configuration must specified when a super stream is set"); + "A routing configuration must be specified when a super stream is set"); } if (this.stream != null) { diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index 4ea934e911..4b535bb21d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -19,6 +19,8 @@ import com.rabbitmq.stream.*; import com.rabbitmq.stream.impl.Client.ClientParameters; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoopGroup; import io.netty.channel.MultiThreadIoEventLoopGroup; @@ -415,6 +417,10 @@ static EventLoopGroup eventLoopGroup() { return new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory()); } + static ByteBufAllocator byteBufAllocator() { + return PooledByteBufAllocator.DEFAULT; + } + /* class to help testing SAC on super streams */ diff --git a/src/test/java/SanityCheck.java b/src/test/java/SanityCheck.java index 4e550c8bd2..7ee907180c 100755 --- a/src/test/java/SanityCheck.java +++ b/src/test/java/SanityCheck.java @@ -1,5 +1,4 @@ ///usr/bin/env jbang "$0" "$@" ; exit $? -//REPOS mavencentral,ossrh-staging=https://oss.sonatype.org/content/groups/staging/,rabbitmq-packagecloud-milestones=https://packagecloud.io/rabbitmq/maven-milestones/maven2 //DEPS com.rabbitmq:stream-client:${env.RABBITMQ_LIBRARY_VERSION} //DEPS org.slf4j:slf4j-simple:1.7.36 diff --git a/src/test/java/com/rabbitmq/stream/Cli.java b/src/test/java/com/rabbitmq/stream/Cli.java index 7097725af4..5c583a461b 100644 --- a/src/test/java/com/rabbitmq/stream/Cli.java +++ b/src/test/java/com/rabbitmq/stream/Cli.java @@ -178,6 +178,30 @@ static List toConnectionInfoList(String json) { return GSON.fromJson(json, new TypeToken>() {}.getType()); } + public static List listGroupConsumers(String stream, String reference) { + ProcessState process = + rabbitmqStreams( + format( + "list_stream_group_consumers -q --stream %s --reference %s " + + "--formatter table subscription_id,state", + stream, reference)); + + List itemList = Collections.emptyList(); + String content = process.output(); + String[] lines = content.split(System.lineSeparator()); + if (lines.length > 1) { + itemList = new ArrayList<>(lines.length - 1); + for (int i = 1; i < lines.length; i++) { + String line = lines[i]; + String[] fields = line.split("\t"); + String id = fields[0]; + String state = fields[1].replace("\"", ""); + itemList.add(new SubscriptionInfo(Integer.parseInt(id), state)); + } + } + return itemList; + } + public static void restartStream(String stream) { rabbitmqStreams(" restart_stream " + stream); } @@ -420,6 +444,30 @@ public String toString() { } } + public static final class SubscriptionInfo { + + private final int id; + private final String state; + + public SubscriptionInfo(int id, String state) { + this.id = id; + this.state = state; + } + + public int id() { + return this.id; + } + + public String state() { + return this.state; + } + + @Override + public String toString() { + return "SubscriptionInfo{id='" + id + '\'' + ", state='" + state + '\'' + '}'; + } + } + public static class ProcessState { private final InputStreamPumpState inputState; diff --git a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java index 02844308dd..987cf311c9 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DeliveryTest.java @@ -20,7 +20,6 @@ import com.rabbitmq.stream.impl.ServerFrameHandler.DeliverVersion1FrameHandler; import com.rabbitmq.stream.metrics.NoOpMetricsCollector; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -51,7 +50,7 @@ public MessageBuilder messageBuilder() { ByteBuf generateFrameBuffer( int nbMessages, long chunkOffset, int dataSize, Iterable messages) { - ByteBuf bb = ByteBufAllocator.DEFAULT.buffer(1024); + ByteBuf bb = Utils.byteBufAllocator().buffer(1024); bb.writeShort(Utils.encodeRequestCode(Constants.COMMAND_DELIVER)) .writeShort(Constants.VERSION_1) .writeByte(1) // subscription id diff --git a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java index dca9810762..07a2877385 100644 --- a/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java @@ -23,8 +23,11 @@ import com.rabbitmq.stream.impl.TestUtils.Sync; import java.util.Locale; import java.util.Random; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; @@ -68,7 +71,7 @@ void itemAreProcessed() { sync.down(items.size()); return true; }; - try (DynamicBatch batch = new DynamicBatch<>(action, 100)) { + try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) { RateLimiter rateLimiter = RateLimiter.create(10000); IntStream.range(0, itemCount) .forEach( @@ -99,7 +102,7 @@ void failedProcessingIsReplayed() throws Exception { } return result; }; - try (DynamicBatch batch = new DynamicBatch<>(action, 100)) { + try (DynamicBatch batch = new DynamicBatch<>(action, 100, 10_000)) { int firstRoundCount = itemCount / 5; IntStream.range(0, firstRoundCount) .forEach( @@ -118,4 +121,37 @@ void failedProcessingIsReplayed() throws Exception { waitAtMost(() -> collected.get() == itemCount); } } + + @Test + void lowThrottlingValueShouldStillHighPublishingRate() throws Exception { + int batchSize = 10; + Semaphore semaphore = new Semaphore(batchSize); + DynamicBatch.BatchConsumer action = + items -> { + semaphore.release(items.size()); + return true; + }; + + try (DynamicBatch batch = new DynamicBatch<>(action, batchSize, 10_000)) { + MetricRegistry metrics = new MetricRegistry(); + Meter rate = metrics.meter("publishing-rate"); + AtomicBoolean keepGoing = new AtomicBoolean(true); + AtomicLong sequence = new AtomicLong(); + new Thread( + () -> { + while (keepGoing.get() && !Thread.interrupted()) { + long id = sequence.getAndIncrement(); + if (semaphore.tryAcquire()) { + batch.add(id); + rate.mark(); + } + } + }) + .start(); + long start = System.nanoTime(); + waitAtMost( + () -> + System.nanoTime() - start > TimeUnit.SECONDS.toNanos(1) && rate.getMeanRate() > 1000); + } + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java index 28422d40ad..7198facf5a 100644 --- a/src/test/java/com/rabbitmq/stream/impl/FrameTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/FrameTest.java @@ -26,7 +26,6 @@ import com.rabbitmq.stream.Message; import com.rabbitmq.stream.Properties; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import java.io.ByteArrayOutputStream; @@ -149,7 +148,7 @@ public TestDesc(String description, List sizes, int expectedCalls) { tests.forEach( test -> { Channel channel = Mockito.mock(Channel.class); - Mockito.when(channel.alloc()).thenReturn(ByteBufAllocator.DEFAULT); + Mockito.when(channel.alloc()).thenReturn(Utils.byteBufAllocator()); Mockito.when(channel.writeAndFlush(Mockito.any())) .thenReturn(Mockito.mock(ChannelFuture.class)); diff --git a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java index 24f50ee0cd..e234d723c4 100644 --- a/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java @@ -16,18 +16,22 @@ import static com.rabbitmq.stream.impl.Assertions.assertThat; import static com.rabbitmq.stream.impl.LoadBalancerClusterTest.LOAD_BALANCER_ADDRESS; +import static com.rabbitmq.stream.impl.TestUtils.BrokerVersion.RABBITMQ_4_1_2; import static com.rabbitmq.stream.impl.TestUtils.newLoggerLevel; import static com.rabbitmq.stream.impl.TestUtils.sync; +import static com.rabbitmq.stream.impl.TestUtils.waitAtMost; import static com.rabbitmq.stream.impl.ThreadUtils.threadFactory; import static com.rabbitmq.stream.impl.Tuples.pair; import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.InstanceOfAssertFactories.stream; import ch.qos.logback.classic.Level; import com.google.common.collect.Streams; import com.google.common.util.concurrent.RateLimiter; import com.rabbitmq.stream.*; +import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; import com.rabbitmq.stream.impl.TestUtils.DisabledIfNotCluster; import com.rabbitmq.stream.impl.TestUtils.Sync; import com.rabbitmq.stream.impl.Tuples.Pair; @@ -41,15 +45,20 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; import org.junit.jupiter.api.*; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -201,15 +210,7 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru syncs = consumers.stream().map(c -> c.waitForNewMessages(100)).collect(toList()); syncs.forEach(s -> assertThat(s).completes()); - nodes.forEach( - n -> { - LOGGER.info("Restarting node {}...", n); - Cli.restartNode(n); - LOGGER.info("Restarted node {}.", n); - }); - LOGGER.info("Rebalancing..."); - Cli.rebalance(); - LOGGER.info("Rebalancing over."); + restartCluster(); Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis()); @@ -291,8 +292,132 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru } } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @BrokerVersionAtLeast(RABBITMQ_4_1_2) + void sacWithClusterRestart(boolean superStream) throws Exception { + environment = + environmentBuilder + .uris(URIS) + .netty() + .bootstrapCustomizer( + b -> { + b.option( + ChannelOption.CONNECT_TIMEOUT_MILLIS, + (int) BACK_OFF_DELAY_POLICY.delay(0).toMillis()); + }) + .environmentBuilder() + .maxConsumersByConnection(1) + .build(); + + int consumerCount = 3; + AtomicLong lastOffset = new AtomicLong(0); + String app = "app-name"; + String s = TestUtils.streamName(testInfo); + ProducerState pState = null; + List consumers = Collections.emptyList(); + try { + StreamCreator sCreator = environment.streamCreator().stream(s); + if (superStream) { + sCreator = sCreator.superStream().partitions(1).creator(); + } + sCreator.create(); + + pState = new ProducerState(s, true, superStream, environment); + pState.start(); + + Map consumerStatus = new ConcurrentHashMap<>(); + consumers = + IntStream.range(0, consumerCount) + .mapToObj( + i -> + new ConsumerState( + s, + environment, + b -> { + b.singleActiveConsumer() + .name(app) + .noTrackingStrategy() + .consumerUpdateListener( + ctx -> { + consumerStatus.put(i, ctx.isActive()); + return OffsetSpecification.offset(lastOffset.get()); + }); + if (superStream) { + b.superStream(s); + } else { + b.stream(s); + } + }, + (ctx, m) -> lastOffset.set(ctx.offset()))) + .collect(toList()); + + Sync sync = pState.waitForNewMessages(100); + assertThat(sync).completes(); + sync = consumers.get(0).waitForNewMessages(100); + assertThat(sync).completes(); + + String streamArg = superStream ? s + "-0" : s; + + Callable checkConsumers = + () -> { + waitAtMost( + () -> { + List subscriptions = Cli.listGroupConsumers(streamArg, app); + LOGGER.info("Group consumers: {}", subscriptions); + return subscriptions.size() == consumerCount + && subscriptions.stream() + .filter(sub -> sub.state().startsWith("active")) + .count() + == 1 + && subscriptions.stream() + .filter(sub -> sub.state().startsWith("waiting")) + .count() + == 2; + }, + () -> + "Group consumers not in expected state: " + + Cli.listGroupConsumers(streamArg, app)); + return null; + }; + + checkConsumers.call(); + + restartCluster(); + + Thread.sleep(BACK_OFF_DELAY_POLICY.delay(0).toMillis()); + + sync = pState.waitForNewMessages(100); + assertThat(sync).completes(ASSERTION_TIMEOUT); + int activeIndex = + consumerStatus.entrySet().stream() + .filter(Map.Entry::getValue) + .map(Map.Entry::getKey) + .findFirst() + .orElseThrow(() -> new IllegalStateException("No active consumer found")); + + sync = consumers.get(activeIndex).waitForNewMessages(100); + assertThat(sync).completes(ASSERTION_TIMEOUT); + + checkConsumers.call(); + + } finally { + if (pState != null) { + pState.close(); + } + consumers.forEach(ConsumerState::close); + if (superStream) { + environment.deleteSuperStream(s); + } else { + environment.deleteStream(s); + } + } + } + private static class ProducerState implements AutoCloseable { + private static final AtomicLong MSG_ID_SEQ = new AtomicLong(0); + private static final byte[] BODY = "hello".getBytes(StandardCharsets.UTF_8); private final String stream; @@ -306,9 +431,19 @@ private static class ProducerState implements AutoCloseable { final AtomicReference lastExceptionInstant = new AtomicReference<>(); private ProducerState(String stream, boolean dynamicBatch, Environment environment) { + this(stream, dynamicBatch, false, environment); + } + + private ProducerState( + String stream, boolean dynamicBatch, boolean superStream, Environment environment) { this.stream = stream; - this.producer = - environment.producerBuilder().stream(stream).dynamicBatch(dynamicBatch).build(); + ProducerBuilder builder = environment.producerBuilder().dynamicBatch(dynamicBatch); + if (superStream) { + builder.superStream(stream).routing(m -> m.getProperties().getMessageIdAsString()); + } else { + builder.stream(stream); + } + this.producer = builder.build(); } void start() { @@ -327,7 +462,14 @@ void start() { try { this.limiter.acquire(1); this.producer.send( - producer.messageBuilder().addData(BODY).build(), confirmationHandler); + producer + .messageBuilder() + .properties() + .messageId(MSG_ID_SEQ.getAndIncrement()) + .messageBuilder() + .addData(BODY) + .build(), + confirmationHandler); } catch (Throwable e) { this.lastException.set(e); this.lastExceptionInstant.set(Instant.now()); @@ -380,16 +522,27 @@ private static class ConsumerState implements AutoCloseable { final AtomicReference postHandle = new AtomicReference<>(() -> {}); private ConsumerState(String stream, Environment environment) { + this(stream, environment, b -> b.stream(stream), (ctx, m) -> {}); + } + + private ConsumerState( + String stream, + Environment environment, + java.util.function.Consumer customizer, + MessageHandler delegateHandler) { this.stream = stream; - this.consumer = - environment.consumerBuilder().stream(stream) + ConsumerBuilder builder = + environment + .consumerBuilder() .offset(OffsetSpecification.first()) .messageHandler( (ctx, m) -> { + delegateHandler.handle(ctx, m); receivedCount.incrementAndGet(); postHandle.get().run(); - }) - .build(); + }); + customizer.accept(builder); + this.consumer = builder.build(); } Sync waitForNewMessages(int messageCount) { @@ -414,4 +567,16 @@ public void close() { this.consumer.close(); } } + + private static void restartCluster() { + nodes.forEach( + n -> { + LOGGER.info("Restarting node {}...", n); + Cli.restartNode(n); + LOGGER.info("Restarted node {}.", n); + }); + LOGGER.info("Rebalancing..."); + Cli.rebalance(); + LOGGER.info("Rebalancing over."); + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java index f076a78fde..9588002d54 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java @@ -216,7 +216,7 @@ void producersAndConsumersShouldBeClosedWhenEnvironmentIsClosed(boolean lazyInit } @Test - void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info) throws Exception { + void growShrinkResourcesWhenProducersConsumersAreOpenedAndClosed(TestInfo info) { int messageCount = 100; int streamCount = 20; int producersCount = ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT * 3 + 10; diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java index e8d3f25e34..8dafb175ad 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java @@ -26,7 +26,6 @@ import com.rabbitmq.stream.StreamException; import com.rabbitmq.stream.impl.Client.ClientParameters; import com.rabbitmq.stream.impl.StreamEnvironment.LocatorNotAvailableException; -import io.netty.buffer.ByteBufAllocator; import java.net.URI; import java.time.Duration; import java.util.Arrays; @@ -94,7 +93,7 @@ Client.ClientParameters duplicate() { ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, null, - ByteBufAllocator.DEFAULT, + Utils.byteBufAllocator(), false, type -> "locator-connection", cf, @@ -163,7 +162,7 @@ void shouldTryUrisOnInitializationFailure() throws Exception { ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, null, - ByteBufAllocator.DEFAULT, + Utils.byteBufAllocator(), false, type -> "locator-connection", cf, @@ -195,7 +194,7 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled( ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT, null, - ByteBufAllocator.DEFAULT, + Utils.byteBufAllocator(), lazyInit, type -> "locator-connection", cf, diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java index 1150a90842..51f320a46c 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java @@ -75,7 +75,7 @@ public class StreamProducerUnitTest { void init() { mocks = MockitoAnnotations.openMocks(this); executorService = Executors.newScheduledThreadPool(2); - when(channel.alloc()).thenReturn(ByteBufAllocator.DEFAULT); + when(channel.alloc()).thenReturn(Utils.byteBufAllocator()); when(channel.writeAndFlush(Mockito.any())).thenReturn(channelFuture); when(client.allocateNoCheck(any(ByteBufAllocator.class), anyInt())) .thenAnswer( diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 59d88cf6aa..52214838b8 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -73,7 +73,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ExtensionContext.Namespace; -import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; @@ -736,7 +735,7 @@ private static Field field(Class cls, String name) { return field; } - private static class ExecutorServiceCloseableResourceWrapper implements CloseableResource { + private static class ExecutorServiceCloseableResourceWrapper implements AutoCloseable { private final ExecutorService executorService; @@ -1065,7 +1064,8 @@ public enum BrokerVersion { RABBITMQ_3_11_11("3.11.11"), RABBITMQ_3_11_14("3.11.14"), RABBITMQ_3_13_0("3.13.0"), - RABBITMQ_4_0_0("4.0.0"); + RABBITMQ_4_0_0("4.0.0"), + RABBITMQ_4_1_2("4.1.2"); final String value;