From 4782751f82fdb3ffbf0b2d0dad17de4777589b9d Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 20 Mar 2015 14:33:08 +0100 Subject: [PATCH 1/5] Test for rabbitmq/rabbitmq-server#78 --- src/com/rabbitmq/client/Channel.java | 11 ++ src/com/rabbitmq/client/impl/ChannelN.java | 7 ++ .../impl/recovery/AutorecoveringChannel.java | 7 ++ .../rabbitmq/client/test/BrokerTestCase.java | 13 +++ .../test/server/XDeathHeaderGrowth.java | 103 ++++++++++++++++++ 5 files changed, 141 insertions(+) create mode 100644 test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java diff --git a/src/com/rabbitmq/client/Channel.java b/src/com/rabbitmq/client/Channel.java index fae5e22437..b400914135 100644 --- a/src/com/rabbitmq/client/Channel.java +++ b/src/com/rabbitmq/client/Channel.java @@ -563,6 +563,17 @@ void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolea */ void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException; + /** + * Bind a queue to an exchange, with a blank routing key and no extra arguments. + * @see com.rabbitmq.client.AMQP.Queue.Bind + * @see com.rabbitmq.client.AMQP.Queue.BindOk + * @param queue the name of the queue + * @param exchange the name of the exchange + * @return a binding-confirm method if the binding was successfully created + * @throws java.io.IOException if an error is encountered + */ + Queue.BindOk queueBind(String queue, String exchange) throws IOException; + /** * Bind a queue to an exchange, with no extra arguments. * @see com.rabbitmq.client.AMQP.Queue.Bind diff --git a/src/com/rabbitmq/client/impl/ChannelN.java b/src/com/rabbitmq/client/impl/ChannelN.java index 481dbf51bc..f3461d65dd 100644 --- a/src/com/rabbitmq/client/impl/ChannelN.java +++ b/src/com/rabbitmq/client/impl/ChannelN.java @@ -925,6 +925,13 @@ public Queue.BindOk queueBind(String queue, String exchange, .getMethod(); } + /** Public API - {@inheritDoc} */ + public Queue.BindOk queueBind(String queue, String exchange) + throws IOException + { + return queueBind(queue, exchange, "", null); + } + /** Public API - {@inheritDoc} */ public Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException diff --git a/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index 5225f06921..7c8849edf4 100644 --- a/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -14,6 +14,7 @@ import com.rabbitmq.client.ReturnListener; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; +import com.rabbitmq.client.impl.AMQImpl; import java.io.IOException; import java.util.ArrayList; @@ -301,6 +302,12 @@ public void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) t delegate.queueDeleteNoWait(queue, ifUnused, ifEmpty); } + public AMQP.Queue.BindOk queueBind(String queue, String exchange) + throws IOException + { + return queueBind(queue, exchange, "", null); + } + public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException { return queueBind(queue, exchange, routingKey, null); } diff --git a/test/src/com/rabbitmq/client/test/BrokerTestCase.java b/test/src/com/rabbitmq/client/test/BrokerTestCase.java index fc57d270a6..2521e1f19b 100644 --- a/test/src/com/rabbitmq/client/test/BrokerTestCase.java +++ b/test/src/com/rabbitmq/client/test/BrokerTestCase.java @@ -18,6 +18,7 @@ package com.rabbitmq.client.test; import java.io.IOException; +import java.util.Map; import java.util.UUID; import junit.framework.TestCase; @@ -226,6 +227,14 @@ protected void declareDurableQueue(String q) throws IOException { channel.queueDeclare(q, true, false, false, null); } + protected void declareTransientQueue(String q) throws IOException { + channel.queueDeclare(q, false, false, false, null); + } + + protected void declareTransientQueue(String q, Map args) throws IOException { + channel.queueDeclare(q, false, false, false, args); + } + protected void declareDurableTopicExchange(String x) throws IOException { channel.exchangeDeclare(x, "topic", true); } @@ -234,6 +243,10 @@ protected void declareTransientTopicExchange(String x) throws IOException { channel.exchangeDeclare(x, "topic", false); } + protected void declareTransientFanoutExchange(String x) throws IOException { + channel.exchangeDeclare(x, "fanout", false); + } + protected void deleteExchange(String x) throws IOException { channel.exchangeDelete(x); } diff --git a/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java b/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java new file mode 100644 index 0000000000..3bc4e3bb47 --- /dev/null +++ b/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java @@ -0,0 +1,103 @@ +package com.rabbitmq.client.test.server; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.test.BrokerTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +class RejectingConsumer extends DefaultConsumer { + private CountDownLatch latch; + private Map headers; + + public RejectingConsumer(Channel channel, CountDownLatch latch) { + super(channel); + this.latch = latch; + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, + AMQP.BasicProperties properties, byte[] body) + throws IOException { + if(this.latch.getCount() > 0) { + this.getChannel().basicReject(envelope.getDeliveryTag(), false); + } else { + this.getChannel().basicAck(envelope.getDeliveryTag(), false); + } + this.headers = properties.getHeaders(); + latch.countDown(); + } + + public Map getHeaders() { + return headers; + } +} + +public class XDeathHeaderGrowth extends BrokerTestCase { + public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedException { + final String x1 = "issues.rabbitmq-server-78.fanout1"; + declareTransientFanoutExchange(x1); + final String x2 = "issues.rabbitmq-server-78.fanout2"; + declareTransientFanoutExchange(x2); + final String x3 = "issues.rabbitmq-server-78.fanout3"; + declareTransientFanoutExchange(x3); + + final String q1 = "issues.rabbitmq-server-78.queue1"; + Map args1 = argumentsForDeadLetteringTo(x1); + declareTransientQueue(q1, args1); + + final String q2 = "issues.rabbitmq-server-78.queue2"; + Map args2 = argumentsForDeadLetteringTo(x2); + declareTransientQueue(q2, args2); + this.channel.queueBind(q2, x1); + + final String q3 = "issues.rabbitmq-server-78.queue3"; + Map args3 = argumentsForDeadLetteringTo(x3); + declareTransientQueue(q3, args3); + this.channel.queueBind(q3, x2); + + final String qz = "issues.rabbitmq-server-78.destination"; + Map args4 = argumentsForDeadLetteringTo(x3); + declareTransientQueue(qz, args4); + this.channel.queueBind(qz, x3); + + CountDownLatch latch = new CountDownLatch(5); + RejectingConsumer cons = new RejectingConsumer(this.channel, latch); + this.channel.basicConsume(qz, cons); + + this.channel.basicPublish("", q1, null, "msg".getBytes()); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + List> events = (List>)cons.getHeaders().get("x-death"); + assertEquals(4, events.size()); + + List qs = new ArrayList(); + for (Map evt : events) { + qs.add(evt.get("queue").toString()); + } + Collections.sort(qs); + assertEquals(Arrays.asList(qz, q1, q2, q3), qs); + } + + private Map argumentsForDeadLetteringTo(String dlx) { + return argumentsForDeadLetteringTo(dlx, 1); + } + + private Map argumentsForDeadLetteringTo(String dlx, int ttl) { + Map m = new HashMap(); + m.put("x-dead-letter-exchange", dlx); + m.put("x-dead-letter-routing-key", "some-routing-key"); + m.put("x-message-ttl", ttl); + return m; + } +} From 854ce3bb9eaeb7fd63a97cd252696ea9be31acf0 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 20 Mar 2015 17:03:36 +0100 Subject: [PATCH 2/5] Take Channel#QueueBind/2 out Per team feedback. --- src/com/rabbitmq/client/Channel.java | 11 ----------- src/com/rabbitmq/client/impl/ChannelN.java | 7 ------- .../client/impl/recovery/AutorecoveringChannel.java | 6 ------ .../client/test/server/XDeathHeaderGrowth.java | 7 +++---- 4 files changed, 3 insertions(+), 28 deletions(-) diff --git a/src/com/rabbitmq/client/Channel.java b/src/com/rabbitmq/client/Channel.java index b400914135..fae5e22437 100644 --- a/src/com/rabbitmq/client/Channel.java +++ b/src/com/rabbitmq/client/Channel.java @@ -563,17 +563,6 @@ void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolea */ void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException; - /** - * Bind a queue to an exchange, with a blank routing key and no extra arguments. - * @see com.rabbitmq.client.AMQP.Queue.Bind - * @see com.rabbitmq.client.AMQP.Queue.BindOk - * @param queue the name of the queue - * @param exchange the name of the exchange - * @return a binding-confirm method if the binding was successfully created - * @throws java.io.IOException if an error is encountered - */ - Queue.BindOk queueBind(String queue, String exchange) throws IOException; - /** * Bind a queue to an exchange, with no extra arguments. * @see com.rabbitmq.client.AMQP.Queue.Bind diff --git a/src/com/rabbitmq/client/impl/ChannelN.java b/src/com/rabbitmq/client/impl/ChannelN.java index f3461d65dd..481dbf51bc 100644 --- a/src/com/rabbitmq/client/impl/ChannelN.java +++ b/src/com/rabbitmq/client/impl/ChannelN.java @@ -925,13 +925,6 @@ public Queue.BindOk queueBind(String queue, String exchange, .getMethod(); } - /** Public API - {@inheritDoc} */ - public Queue.BindOk queueBind(String queue, String exchange) - throws IOException - { - return queueBind(queue, exchange, "", null); - } - /** Public API - {@inheritDoc} */ public Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException diff --git a/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index 7c8849edf4..244de5b667 100644 --- a/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -302,12 +302,6 @@ public void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) t delegate.queueDeleteNoWait(queue, ifUnused, ifEmpty); } - public AMQP.Queue.BindOk queueBind(String queue, String exchange) - throws IOException - { - return queueBind(queue, exchange, "", null); - } - public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException { return queueBind(queue, exchange, routingKey, null); } diff --git a/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java b/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java index 3bc4e3bb47..2692d5f6eb 100644 --- a/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java +++ b/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java @@ -2,7 +2,6 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.test.BrokerTestCase; @@ -60,17 +59,17 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep final String q2 = "issues.rabbitmq-server-78.queue2"; Map args2 = argumentsForDeadLetteringTo(x2); declareTransientQueue(q2, args2); - this.channel.queueBind(q2, x1); + this.channel.queueBind(q2, x1, ""); final String q3 = "issues.rabbitmq-server-78.queue3"; Map args3 = argumentsForDeadLetteringTo(x3); declareTransientQueue(q3, args3); - this.channel.queueBind(q3, x2); + this.channel.queueBind(q3, x2, ""); final String qz = "issues.rabbitmq-server-78.destination"; Map args4 = argumentsForDeadLetteringTo(x3); declareTransientQueue(qz, args4); - this.channel.queueBind(qz, x3); + this.channel.queueBind(qz, x3, ""); CountDownLatch latch = new CountDownLatch(5); RejectingConsumer cons = new RejectingConsumer(this.channel, latch); From a0c92f1bdd0ea90ec3f12a043bcfa4788bd5b86c Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 25 Mar 2015 00:32:37 +0300 Subject: [PATCH 3/5] Assert on x-death event counters --- .../client/test/server/XDeathHeaderGrowth.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java b/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java index 2692d5f6eb..edf39edf2b 100644 --- a/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java +++ b/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java @@ -32,7 +32,9 @@ public void handleDelivery(String consumerTag, Envelope envelope, if(this.latch.getCount() > 0) { this.getChannel().basicReject(envelope.getDeliveryTag(), false); } else { - this.getChannel().basicAck(envelope.getDeliveryTag(), false); + if(this.getChannel().isOpen()) { + this.getChannel().basicAck(envelope.getDeliveryTag(), false); + } } this.headers = properties.getHeaders(); latch.countDown(); @@ -44,6 +46,7 @@ public Map getHeaders() { } public class XDeathHeaderGrowth extends BrokerTestCase { + @SuppressWarnings("unchecked") public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedException { final String x1 = "issues.rabbitmq-server-78.fanout1"; declareTransientFanoutExchange(x1); @@ -71,7 +74,7 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep declareTransientQueue(qz, args4); this.channel.queueBind(qz, x3, ""); - CountDownLatch latch = new CountDownLatch(5); + CountDownLatch latch = new CountDownLatch(10); RejectingConsumer cons = new RejectingConsumer(this.channel, latch); this.channel.basicConsume(qz, cons); @@ -86,6 +89,12 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep } Collections.sort(qs); assertEquals(Arrays.asList(qz, q1, q2, q3), qs); + List cs = new ArrayList(); + for (Map evt : events) { + cs.add((Long)evt.get("counter")); + } + Collections.sort(cs); + assertEquals(Arrays.asList(1L, 1L, 1L, 9L), cs); } private Map argumentsForDeadLetteringTo(String dlx) { From 017d016899a3b3e06c8ed6eae1a79e1b673895f7 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 25 Mar 2015 19:19:20 +0300 Subject: [PATCH 4/5] Remove unused import --- src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java b/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java index 244de5b667..5225f06921 100644 --- a/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java +++ b/src/com/rabbitmq/client/impl/recovery/AutorecoveringChannel.java @@ -14,7 +14,6 @@ import com.rabbitmq.client.ReturnListener; import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; -import com.rabbitmq.client.impl.AMQImpl; import java.io.IOException; import java.util.ArrayList; From a3736509597d3f5631145236027c15ea3a72a2ad Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 26 Mar 2015 00:22:05 +0300 Subject: [PATCH 5/5] x-death.counter => x-death.count --- .../src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java b/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java index edf39edf2b..9a259672e8 100644 --- a/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java +++ b/test/src/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java @@ -91,7 +91,7 @@ public void testBoundedXDeathHeaderGrowth() throws IOException, InterruptedExcep assertEquals(Arrays.asList(qz, q1, q2, q3), qs); List cs = new ArrayList(); for (Map evt : events) { - cs.add((Long)evt.get("counter")); + cs.add((Long)evt.get("count")); } Collections.sort(cs); assertEquals(Arrays.asList(1L, 1L, 1L, 9L), cs);