8000 Use Dropwizard Metrics for statistics · ThinkLib/rabbitmq-java-client@6a3d48f · GitHub
[go: up one dir, main page]

Skip to content

Commit 6a3d48f

Browse files
committed
Use Dropwizard Metrics for statistics
Fixes rabbitmq#79
1 parent 45abea6 commit 6a3d48f

File tree

5 files changed

+545
-227
lines changed

5 files changed

+545
-227
lines changed

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
5555

5656
<slf4j.version>1.7.21</slf4j.version>
57+
<metrics.version>3.1.2</metrics.version>
5758
<logback.version>1.1.7</logback.version>
5859
<commons-cli.version>1.1</commons-cli.version>
5960
<junit.version>4.12</junit.version>
@@ -470,6 +471,12 @@
470471
<artifactId>slf4j-api</artifactId>
471472
<version>${slf4j.version}</version>
472473
</dependency>
474+
<dependency>
475+
<groupId>io.dropwizard.metrics</groupId>
476+
<artifactId>metrics-core</artifactId>
477+
<version>${metrics.version}</version>
478+
<optional>true</optional>
479+
</dependency>
473480
<dependency>
474481
<groupId>commons-cli</groupId>
475482
<artifactId>commons-cli</artifactId>
Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
package com.rabbitmq.client.impl;
2+
3+
import com.rabbitmq.client.*;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
7+
import java.util.*;
8+
import java.util.concurrent.ConcurrentHashMap;
9+
import java.util.concurrent.ConcurrentMap;
10+
import java.util.concurrent.locks.Lock;
11+
import java.util.concurrent.locks.ReentrantLock;
12+
13+
/**
14+
*
15+
*/
16+
public abstract class BaseStatistics implements StatisticsCollector {
17+
18+
private static final Logger LOGGER = LoggerFactory.getLogger(BaseStatistics.class);
19+
20+
// TODO protect each call in a try/catch block (for core features not to fail because of stats)
21+
22+
private final ConcurrentMap<String, ConnectionState> connectionState = new ConcurrentHashMap<String, ConnectionState>();
23+
24+
protected abstract void incrementConnectionCount();
25+
26+
protected abstract void decrementConnectionCount();
27+
28+
protected abstract void incrementChannelCount();
29+
30+
protected abstract void addToChannelCount(long nbChannel);
31+
32+
protected abstract void decrementChannelCount();
33+
34+
protected abstract void incrementPublishedMessageCount();
35+
36+
protected abstract void incrementConsumedMessageCount();
37+
38+
protected abstract void incrementAcknowledgedMessageCount();
39+
40+
protected abstract void incrementRejectedMessageCount();
41+
42+
protected abstract void resetConnectionCount();
43+
protected abstract void resetChannelCount();
44+
protected abstract void resetPublishedMessageCount();
45+
protected abstract void resetConsumedMessageCount();
46+
protected abstract void resetAcknowledgedMessageCount();
47+
protected abstract void resetRejectedMessageCount();
48+
49+
@Override
50+
public void newConnection(final Connection connection) {
51+
if(connection.getId() == null) {
52+
connection.setId(UUID.randomUUID().toString());
53+
}
54+
incrementConnectionCount();
55+
connectionState.put(connection.getId(), new ConnectionState(connection));
56+
connection.addShutdownListener(new ShutdownListener() {
57+
@Override
58+
public void shutdownCompleted(ShutdownSignalException cause) {
59+
closeConnection(connection);
60+
}
61+
});
62+
}
63+
64+
@Override
65+
public void closeConnection(Connection connection) {
66+
ConnectionState removed = connectionState.remove(connection.getId());
67+
if(removed != null) {
68+
decrementConnectionCount();
69+
}
70+
}
71+
72+
@Override
73+
public void newChannel(final Channel channel) {
74+
incrementChannelCount();
75+
channel.addShutdownListener(new ShutdownListener() {
76+
@Override
77+
public void shutdownCompleted(ShutdownSignalException cause) {
78+
closeChannel(channel);
79+
}
80+
});
81+
connectionState(channel.getConnection()).channelState.put(channel.getChannelNumber(), new ChannelState(channel));
82+
}
83+
84+
@Override
85+
public void closeChannel(Channel channel) {
86+
ChannelState removed = connectionState(channel.getConnection()).channelState.remove(channel.getChannelNumber());
87+
if(removed != null) {
88+
decrementChannelCount();
89+
}
90+
}
91+
92+
@Override
93+
public void basicPublish(Channel channel) {
94+
incrementPublishedMessageCount();
95+
}
96+
97+
@Override
98+
public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {
99+
if(!autoAck) {
100+
ChannelState channelState = channelState(channel);
101+
channelState.lock.lock();
102+
try {
103+
channelState(channel).consumersWithManualAck.add(consumerTag);
104+
} finally {
105+
channelState.lock.unlock();
106+
}
107+
108+
}
109+
}
110+
111+
@Override
112+
public void basicCancel(Channel channel, String consumerTag) {
113+
ChannelState channelState = channelState(channel);
114+
channelState.lock.lock();
115+
try {
116+
channelState(channel).consumersWithManualAck.remove(consumerTag);
117+
} finally {
118+
channelState.lock.unlock();
119+
}
120+
}
121+
122+
@Override
123+
public void consumedMessage(Channel channel, long deliveryTag, boolean autoAck) {
124+
incrementConsumedMessageCount();
125+
if(!autoAck) {
126+
ChannelState channelState = channelState(channel);
127+
channelState.lock.lock();
128+
try {
129+
channelState(channel).unackedMessageDeliveryTags.add(deliveryTag);
130+
} finally {
131+
channelState.lock.unlock();
132+
}
133+
}
134+
}
135+
136+
@Override
137+
public void consumedMessage(Channel channel, long deliveryTag, String consumerTag) {
138+
incrementConsumedMessageCount();
139+
ChannelState channelState = channelState(channel);
140+
channelState.lock.lock();
141+
try {
142+
if(channelState.consumersWithManualAck.contains(consumerTag)) {
143+
channelState.unackedMessageDeliveryTags.add(deliveryTag);
144+
}
145+
} finally {
146+
channelState.lock.unlock();
147+
}
148+
149+
}
150+
151+
@Override
152+
public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
153+
updateChannelStateAfterAckReject(channel, deliveryTag, multiple, new Runnable() {
154+
@Override
155+
public void run() {
156+
incrementAcknowledgedMessageCount();
157+
}
158+
});
159+
}
160+
161+
@Override
162+
public void basicNack(Channel channel, long deliveryTag) {
163+
updateChannelStateAfterAckReject(channel, deliveryTag, true, new Runnable() {
164+
@Override
165+
public void run() {
166+
incrementRejectedMessageCount();
167+
}
168+
});
169+
}
170+
171+
@Override
172+
public void basicReject(Channel channel, long deliveryTag) {
173+
updateChannelStateAfterAckReject(channel, deliveryTag, false, new Runnable() {
174+
@Override
175+
public void run() {
176+
incrementRejectedMessageCount();
177+
}
178+
});
179+
}
180+
181+
private void updateChannelStateAfterAckReject(Channel channel, long deliveryTag, boolean multiple, Runnable counterAction) {
182+
ChannelState channelState = channelState(channel);
183+
channelState.lock.lock();
184+
try {
185+
if(multiple) {
186+
Iterator<Long> iterator = channelState.unackedMessageDeliveryTags.iterator();
187+
while(iterator.hasNext()) {
188+
long messageDeliveryTag = iterator.next();
189+
if(messageDeliveryTag <= deliveryTag) {
190+
iterator.remove();
191+
counterAction.run();
192+
}
193+
}
194+
} else {
195+
channelState.unackedMessageDeliveryTags.remove(deliveryTag);
196+
counterAction.run();
197+
}
198+
} finally {
199+
channelState.lock.unlock();
200+
}
201+
}
202+
203+
@Override
204+
public void clear() {
205+
resetConnectionCount();
206+
resetChannelCount();
207+
resetPublishedMessageCount();
208+
resetConsumedMessageCount();
209+
resetAcknowledgedMessageCount();
210+
resetRejectedMessageCount();
211+
212+
connectionState.clear();
213+
}
214+
215+
private ConnectionState connectionState(Connection connection) {
216+
return connectionState.get(connection.getId());
217+
}
218+
219+
private ChannelState channelState(Channel channel) {
220+
return connectionState(channel.getConnection()).channelState.get(channel.getChannelNumber());
221+
}
222+
223+
public void cleanStaleState() {
224+
try {
225+
Iterator<Map.Entry<String, ConnectionState>> connectionStateIterator = connectionState.entrySet().iterator();
226+
while(connectionStateIterator.hasNext()) {
227+
Map.Entry<String, ConnectionState> connectionEntry = connectionStateIterator.next();
228+
Connection connection = connectionEntry.getValue().connection;
229+
if(connection.isOpen()) {
230+
Iterator<Map.Entry<Integer, ChannelState>> channelStateIterator = connectionEntry.getValue().channelState.entrySet().iterator();
231+
while(channelStateIterator.hasNext()) {
232+
Map.Entry<Integer, ChannelState> channelStateEntry = channelStateIterator.next();
233+
Channel channel = channelStateEntry.getValue().channel;
234+
if(!channel.isOpen()) {
235+
channelStateIterator.remove();
236+
decrementChannelCount();
237+
LOGGER.info("Ripped off state of channel {} of connection {}. This is abnormal, please report.",
238+
channel.getChannelNumber(), connection.getId());
239+
}
240+
}
241+
} else {
242+
connectionStateIterator.remove();
243+
decrementConnectionCount();
244+
addToChannelCount(-connectionEntry.getValue().channelState.size());
245+
LOGGER.info("Ripped off state of connection {}. This is abnormal, please report.",
246+
connection.getId());
247+
}
248+
}
249+
} catch(Exception e) {
250+
LOGGER.info("Error during periodic clean of statistics: "+e.getMessage());
251+
}
252+
}
253+
254+
private static class ConnectionState {
255+
256+
final ConcurrentMap<Integer, ChannelState> channelState = new ConcurrentHashMap<Integer, ChannelState>();
257+
final Connection connection;
258+
259+
private ConnectionState(Connection connection) {
260+
this.connection = connection;
261+
}
262+
}
263+
264+
private static class ChannelState {
265+
266+
final Lock lock = new ReentrantLock();
267+
268+
final Set<Long> unackedMessageDeliveryTags = new HashSet<Long>();
269+
final Set<String> consumersWithManualAck = new HashSet<String>();
270+
271+
final Channel channel;
272+
273+
private ChannelState(Channel channel) {
274+
this.channel = channel;
275+
}
276+
277+
}
278+
279+
}

0 commit comments

Comments
 (0)
0