10000 Merge pull request #186 from rabbitmq/rabbitmq-java-client-79 · aashish13/rabbitmq-java-client@019e76d · GitHub
[go: up one dir, main page]

Skip to content

Commit 019e76d

Browse files
Merge pull request rabbitmq#186 from rabbitmq/rabbitmq-java-client-79
Introduce metrics support
2 parents d352e28 + fc5b603 commit 019e76d

20 files changed

+1389
-36
lines changed

pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,12 @@
5454
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
5555

5656
<slf4j.version>1.7.21</slf4j.version>
57+
<metrics.version>3.1.2</metrics.version>
5758
<logback.version>1.1.7</logback.version>
5859
<commons-cli.version>1.1</commons-cli.version>
5960
<junit.version>4.12</junit.version>
61+
<awaitility.version>2.0.0</awaitility.version>
62+
<mockito.version>1.10.19</mockito.version>
6063

6164
<!--
6265
These groovy scripts are used later in this POM file to generate
@@ -468,6 +471,12 @@
468471
<artifactId>slf4j-api</artifactId>
469472
<version>${slf4j.version}</version>
470473
</dependency>
474+
<dependency>
475+
<groupId>io.dropwizard.metrics</groupId>
476+
<artifactId>metrics-core</artifactId>
477+
<version>${metrics.version}</version>
478+
<optional>true</optional>
479+
</dependency>
471480
<dependency>
472481
<groupId>commons-cli</groupId>
473482
<artifactId>commons-cli</artifactId>
@@ -486,6 +495,18 @@
486495
<version>${logback.version}</version>
487496
<scope>test</scope>
488497
</dependency>
498+
<dependency>
499+
<groupId>org.awaitility</groupId>
500+
<artifactId>awaitility</artifactId>
501+
<version>${awaitility.version}</version>
502+
<scope>test</scope>
503+
</dependency>
504+
<dependency>
505+
<groupId>org.mockito</groupId>
506+
<artifactId>mockito-core</artifactId>
507+
<version>${mockito.version}</version>
508+
<scope>test</scope>
509+
</dependency>
489510
</dependencies>
490511

491512
<build>

src/main/java/com/rabbitmq/client/Channel.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1026,4 +1026,5 @@ void basicNack(long deliveryTag, boolean multiple, boolean requeue)
10261026
* @throws IOException Problem transmitting method.
10271027
*/
10281028
long consumerCount(String queue) throws IOException;
1029+
10291030
}

src/main/java/com/rabbitmq/client/Connection.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,4 +257,26 @@ public interface Connection extends ShutdownNotifier, Closeable { // rename to A
257257
* @see com.rabbitmq.client.ExceptionHandler
258258
*/
259259
ExceptionHandler getExceptionHandler();
260+
261+
/**
262+
* Unique ID for this connection.
263+
* This ID should be unique, otherwise some services
264+
* like the metrics collector won't work properly.
265+
* This ID doesn't have to be provided by the client,
266+
* services that require it will assign it automatically
267+
* if it's not set.
268+
* @return
269+
*/
270+
String getId();
271+
272+
/**
273+
* Set the unique ID for this connection.
274+
* This ID should be unique, otherwise some services
275+
* like the metrics collector won't work properly.
276+
* This ID doesn't have to be provided by the client,
277+
* services that require it will assign it automatically
278+
* if it's not set.
279+
* @return
280+
*/
281+
void setId(String id);
260282
}

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ public class ConnectionFactory implements Cloneable {
106106
// to use recovery intervals > Integer.MAX_VALUE in practice.
107107
private long networkRecoveryInterval = 5000;
108108

109+
private MetricsCollector metricsCollector;
110+
109111
/** @return the default host to use for connections */
110112
public String getHost() {
111113
return host;
@@ -631,6 +633,14 @@ public void setTopologyRecoveryEnabled(boolean topologyRecovery) {
631633
this.topologyRecovery = topologyRecovery;
632634
}
633635

636+
public void setMetricsCollector(MetricsCollector metricsCollector) {
637+
this.metricsCollector = metricsCollector;
638+
}
639+
640+
public MetricsCollector getMetricsCollector() {
641+
return metricsCollector;
642+
}
643+
634644
protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {
635645
return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL());
636646
}
@@ -850,6 +860,9 @@ public Connection newConnection(ExecutorService executor, List<Address> addrs, S
850860
*/
851861
public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
852862
throws IOException, TimeoutException {
863+
if(this.metricsCollector == null) {
864+
this.metricsCollector = new NoOpMetricsCollector();
865+
}
853866
// make sure we respect the provided thread factory
854867
FrameHandlerFactory fhFactory = createFrameHandlerFactory();
855868
ConnectionParams params = params(executor);
@@ -862,7 +875,7 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
862875

863876
if (isAutomaticRecoveryEnabled()) {
864877
// see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
865-
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver);
878+
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
866879

867880
conn.init();
868881
return conn;
@@ -872,8 +885,9 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres
872885
for (Address addr : addrs) {
873886
try {
874887
FrameHandler handler = fhFactory.create(addr);
875-
AMQConnection conn = new AMQConnection(params, handler);
888+
AMQConnection conn = new AMQConnection(params, handler, metricsCollector);
876889
conn.start();
890+
this.metricsCollector.newConnection(conn);
877891
return conn;
878892
} catch (IOException e) {
879893
lastException = e;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client;
17+
18+
/**
19+
* Interface to gather execution data of the client.
20+
* Note transactions are not supported: they deal with
21+
* publishing and acknowledgments and the collector contract
22+
* assumes then that published messages and acks sent
23+
* in a transaction are always counted, even if the
24+
* transaction is rolled back.
25+
*
26+
*/
27+
public interface MetricsCollector {
28+
29+
void newConnection(Connection connection);
30+
31+
void closeConnection(Connection connection);
32+
33+
void newChannel(Channel channel);
34+
35+
void closeChannel(Channel channel);
36+
37+
void basicPublish(Channel channel);
38+
39+
void consumedMessage(Channel channel, long deliveryTag, boolean autoAck);
40+
41+
void consumedMessage(Channel channel, long deliveryTag, String consumerTag);
42+
43+
void basicAck(Channel channel, long deliveryTag, boolean multiple);
44+
45+
void basicNack(Channel channel, long deliveryTag);
46+
47+
void basicReject(Channel channel, long deliveryTag);
48+
49+
void basicConsume(Channel channel, String consumerTag, boolean autoAck);
50+
51+
void basicCancel(Channel channel, String consumerTag);
52+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Java client library, is triple-licensed under the
4+
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
5+
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
6+
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
// info@rabbitmq.com.
15+
16+
package com.rabbitmq.client;
17+
18+
/**
19+
*
20+
*/
21+
public class NoOpMetricsCollector implements MetricsCollector {
22+
23+
@Override
24+
public void newConnection(Connection connection) {
25+
26+
}
27+
28+
@Override
29+
public void closeConnection(Connection connection) {
30+
31+
}
32+
33+
@Override
34+
public void newChannel(Channel channel) {
35+
36+
}
37+
38+
@Override
39+
public void closeChannel(Channel channel) {
40+
41+
}
42+
43+
@Override
44+
public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
45+
46+
}
47+
48+
@Override
49+
public void basicNack(Channel channel, long deliveryTag) {
50+
51+
}
52+
53+
@Override
54+
public void basicReject(Channel channel, long deliveryTag) {
55+
56+
}
57+
58+
@Override
59+
public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {
60+
61+
}
62+
63+
@Override
64+
public void basicCancel(Channel channel, String consumerTag) {
65+
66+
}
67+
68+
@Override
69+
public void basicPublish(Channel channel) {
70+
71+
}
72+
73+
@Override
74+
public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) {
75+
76+
}
77+
78+
@Override
79+
public void consumedMessage(Channel channel, long deliveryTag, String consumerTag) {
80+
81+
}
82+
83+
}

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,8 @@
2323
import java.util.*;
2424
import java.util.concurrent.*;
2525

26-
import com.rabbitmq.client.AMQP;
27-
import com.rabbitmq.client.AuthenticationFailureException;
28-
import com.rabbitmq.client.BlockedListener;
29-
import com.rabbitmq.client.ExceptionHandler;
26+
import com.rabbitmq.client.*;
3027
import com.rabbitmq.client.Method;
31-
import com.rabbitmq.client.AlreadyClosedException;
32-
import com.rabbitmq.client.Channel;
33-
import com.rabbitmq.client.Command;
34-
import com.rabbitmq.client.Connection;
35-
import com.rabbitmq.client.ConnectionFactory;
36-
import com.rabbitmq.client.LongString;
37-
import com.rabbitmq.client.MissedHeartbeatException;
38-
import com.rabbitmq.client.PossibleAuthenticationFailureException;
39-
import com.rabbitmq.client.ProtocolVersionMismatchException;
40-
import com.rabbitmq.client.SaslConfig;
41-
import com.rabbitmq.client.SaslMechanism;
42-
import com.rabbitmq.client.ShutdownSignalException;
4328
import com.rabbitmq.client.impl.AMQChannel.BlockingRpcContinuation;
4429
import com.rabbitmq.client.impl.recovery.RecoveryCanBeginListener;
4530
import com.rabbitmq.utility.BlockingCell;
@@ -61,6 +46,7 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
6146
private final ExecutorService shutdownExecutor;
6247
private Thread mainLoopThread;
6348
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
49+
private String id;
6450

6551
private final List<RecoveryCanBeginListener> recoveryCanBeginListeners =
6652
new ArrayList<RecoveryCanBeginListener>();
@@ -140,6 +126,7 @@ public static Map<String, Object> defaultClientProperties() {
140126
private final String username;
141127
private final String password;
142128
private final Collection<BlockedListener> blockedListeners = new CopyOnWriteArrayList<BlockedListener>();
129+
protected final MetricsCollector metricsCollector;
143130

144131
/* State modified after start - all volatile */
145132

@@ -199,10 +186,14 @@ public Map<String, Object> getServerProperties() {
199186
return _serverProperties;
200187
}
201188

189+
public AMQConnection(ConnectionParams params, FrameHandler frameHandler) {
190+
this(params, frameHandler, new NoOpMetricsCollector());
191+
}
192+
202193
/** Construct a new connection
203194
* @param params parameters for it
204195
*/
205-
public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
196+
public AMQConnection(ConnectionParams params, FrameHandler frameHandler, MetricsCollector metricsCollector)
206197
{
207198
checkPreconditions();
208199
this.username = params.getUsername();
@@ -228,6 +219,8 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
228219
this._brokerInitiatedShutdown = false;
229220

230221
this._inConnectionNegotiation = true; // we start out waiting for the first protocol response
222+
223+
this.metricsCollector = metricsCollector;
231224
}
232225

233226
private void initializeConsumerWorkService() {
@@ -393,7 +386,7 @@ public void start()
393386
}
394387

395388
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
396-
ChannelManager result = new ChannelManager(this._workService, channelMax, threadFactory);
389+
ChannelManager result = new ChannelManager(this._workService, channelMax, threadFactory, this.metricsCollector);
397390
result.setShutdownExecutor(this.shutdownExecutor);
398391
return result;
399392
}
@@ -496,15 +489,19 @@ public Channel createChannel(int channelNumber) throws IOException {
496489
ensureIsOpen();
497490
ChannelManager cm = _channelManager;
498491
if (cm == null) return null;
499-
return cm.createChannel(this, channelNumber);
492+
Channel channel = cm.createChannel(this, channelNumber);
493+
metricsCollector.newChannel(channel);
494+
return channel;
500495
}
501496

502497
/** Public API - {@inheritDoc} */
503498
public Channel createChannel() throws IOException {
504499
ensureIsOpen();
505500
ChannelManager cm = _channelManager;
506501
if (cm == null) return null;
507-
return cm.createChannel(this);
502+
Channel channel = cm.createChannel(this);
503+
metricsCollector.newChannel(channel);
504+
return channel;
508505
}
509506

510507
/**
@@ -926,4 +923,16 @@ public boolean removeBlockedListener(BlockedListener listener) {
926923
public void clearBlockedListeners() {
927924
blockedListeners.clear();
928925
}
926+
927+
/** Public API - {@inheritDoc} */
928+
@Override
929+
public String getId() {
930+
return id;
931+
}
932+
933+
/** Public API - {@inheritDoc} */
934+
@Override
935+
public void setId(String id) {
936+
this.id = id;
937+
}
929938
}

0 commit comments

Comments
 (0)
0