8000 Pre-create the shutdown Thread by YannRobert · Pull Request #86 · rabbitmq/rabbitmq-java-client · GitHub
[go: up one dir, main page]

Skip to content

Pre-create the shutdown Thread #86

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions src/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,18 @@
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.*;

import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;

import javax.net.SocketFactory;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;

import com.rabbitmq.client.impl.AMQConnection;
import com.rabbitmq.client.impl.ConnectionParams;
import com.rabbitmq.client.impl.DefaultExceptionHandler;
import com.rabbitmq.client.impl.FrameHandler;
import com.rabbitmq.client.impl.FrameHandlerFactory;
import com.rabbitmq.client.impl.*;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;

/**
Expand Down Expand Up @@ -95,6 +88,11 @@ public class ConnectionFactory implements Cloneable {
private SaslConfig saslConfig = DefaultSaslConfig.PLAIN;
private ExecutorService sharedExecutor;
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
private final ThreadPoolExecutor shutdownThreadPoolExecutor = new ShutdownThreadPoolExecutorFactory().create(new ThreadFactory() {
public Thread newThread(Runnable r) {
return threadFactory.newThread(r);
}
});
private SocketConfigurator socketConf = new DefaultSocketConfigurator();
private ExceptionHandler exceptionHandler = new DefaultExceptionHandler();

Expand Down Expand Up @@ -631,7 +629,7 @@ public Connection newConnection(ExecutorService executor, Address[] addrs)
public ConnectionParams params(ExecutorService executor) {
return new ConnectionParams(username, password, executor, virtualHost, getClientProperties(),
requestedFrameMax, requestedChannelMax, requestedHeartbeat, shutdownTimeout, saslConfig,
networkRecoveryInterval, topologyRecovery, exceptionHandler, threadFactory);
networkRecoveryInterval, topologyRecovery, exceptionHandler, threadFactory, shutdownThreadPoolExecutor);
}

/**
Expand Down
32 changes: 21 additions & 11 deletions src/com/rabbitmq/client/impl/AMQConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.*;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AuthenticationFailureException;
Expand Down Expand Up @@ -67,7 +63,13 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
public static final int HANDSHAKE_TIMEOUT = 10000;
private final ExecutorService executor;
private Thread mainLoopThread;
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
private ThreadFactory threadFactory;
private final ThreadPoolExecutor shutdownThreadPoolExecutor;
private ChannelManagerFactory channelManagerFactory = new ChannelManagerFactory() {
public ChannelManager instantiateChannelManager(int channelMax) {
return new ChannelManager(_workService, channelMax, getShutdownThreadPoolExecutor(), new DefaultChannelNFactory());
}
};

/**
* Retrieve a copy of the default table of client properties that
Expand Down Expand Up @@ -222,6 +224,7 @@ public AMQConnection(ConnectionParams params, FrameHandler frameHandler)
this.saslConfig = params.getSaslConfig();
this.executor = params.getExecutor();
this.threadFactory = params.getThreadFactory();
this.shutdownThreadPoolExecutor = params.getShutdownThreadPoolExecutor();

this._channelManager = null;

Expand Down Expand Up @@ -357,7 +360,7 @@ public void start()
int channelMax =
negotiateChannelMax(this.requestedChannelMax,
connTune.getChannelMax());
_channelManager = instantiateChannelManager(channelMax, threadFactory);
_channelManager = channelManagerFactory.instantiateChannelManager(channelMax);

int frameMax =
negotiatedMaxValue(this.requestedFrameMax,
Expand Down Expand Up @@ -394,10 +397,6 @@ public void start()
return;
}

protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
return new ChannelManager(this._workService, channelMax, threadFactory);
}

/**
* Private API, allows for easier simulation of bogus clients.
*/
Expand Down Expand Up @@ -464,6 +463,17 @@ public ThreadFactory getThreadFactory() {
return threadFactory;
}

/**
* @return Thread factory used by this connection.
*/
public ThreadPoolExecutor getShutdownThreadPoolExecutor() {
return shutdownThreadPoolExecutor;
}

protected void setChannelManagerFactory(ChannelManagerFactory channelManagerFactory) {
this.channelManagerFactory = channelManagerFactory;
}

public Map<String, Object> getClientProperties() {
return new HashMap<String, Object>(_clientProperties);
}
Expand Down
28 changes: 11 additions & 17 deletions src/com/rabbitmq/client/impl/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.IntAllocator;
Expand All @@ -40,22 +37,19 @@ public class ChannelManager {
private final IntAllocator channelNumberAllocator;

private final ConsumerWorkService workService;
private final ChannelNFactory channelNFactory;

private final Set<CountDownLatch> shutdownSet = new HashSet<CountDownLatch>();

/** Maximum channel number available on this connection. */
private final int _channelMax;
private final ThreadFactory threadFactory;
private final ThreadPoolExecutor shutdownThreadPoolExecutor;

public int getChannelMax(){
return _channelMax;
}

public ChannelManager(ConsumerWorkService workService, int channelMax) {
this(workService, channelMax, Executors.defaultThreadFactory());
}

public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) {
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadPoolExecutor shutdownThreadPoolExecutor, ChannelNFactory channelNFactory) {
if (channelMax == 0) {
// The framing encoding only allows for unsigned 16-bit integers
// for the channel number
Expand All @@ -64,8 +58,11 @@ public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFac
_channelMax = channelMax;
channelNumberAllocator = new IntAllocator(1, channelMax);

shutdownThreadPoolExecutor.prestartAllCoreThreads();

this.workService = workService;
this.threadFactory = threadFactory;
this.shutdownThreadPoolExecutor = shutdownThreadPoolExecutor;
this.channelNFactory = channelNFactory;
}

/**
Expand Down Expand Up @@ -117,8 +114,7 @@ public void run() {
ssWorkService.shutdown();
}
};
Thread shutdownThread = Environment.newThread(threadFactory, target, "ConsumerWorkService shutdown monitor", true);
shutdownThread.start();
shutdownThreadPoolExecutor.execute(target);
}

public ChannelN createChannel(AMQConnection connection) throws IOException {
Expand Down Expand Up @@ -158,14 +154,12 @@ private ChannelN addNewChannel(AMQConnection connection, int channelNumber) thro
+ "use. This should never happen. "
+ "Please report this as a bug.");
}
ChannelN ch = instantiateChannel(connection, channelNumber, this.workService);
ChannelN ch = channelNFactory.instanciate(connection, channelNumber, this.workService);
_channelMap.put(ch.getChannelNumber(), ch);
return ch;
}

protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
return new ChannelN(connection, channelNumber, workService);
}


/**
* Remove the channel from the channel map and free the number for re-use.
Expand Down
7 changes: 7 additions & 0 deletions src/com/rabbitmq/client/impl/ChannelManagerFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.rabbitmq.client.impl;

public interface ChannelManagerFactory {

ChannelManager instantiateChannelManager(int channelMax);

}
7 changes: 7 additions & 0 deletions src/com/rabbitmq/client/impl/ChannelNFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.rabbitmq.client.impl;

public interface ChannelNFactory {

ChannelN instanciate(AMQConnection connection, int channelNumber, ConsumerWorkService workService);

}
9 changes: 8 additions & 1 deletion src/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

public class ConnectionParams {
private final String username;
Expand All @@ -23,6 +24,7 @@ public class ConnectionParams {

private final ExceptionHandler exceptionHandler;
private final ThreadFactory threadFactory;
private final ThreadPoolExecutor shutdownThreadPoolExecutor;

/**
* @param username name used to establish connection
Expand All @@ -43,7 +45,7 @@ public ConnectionParams(String username, String password, ExecutorService execut
String virtualHost, Map<String, Object> clientProperties,
int requestedFrameMax, int requestedChannelMax, int requestedHeartbeat,
int shutdownTimeout, SaslConfig saslConfig, long networkRecoveryInterval,
boolean topologyRecovery, ExceptionHandler exceptionHandler, ThreadFactory threadFactory) {
boolean topologyRecovery, ExceptionHandler exceptionHandler, ThreadFactory threadFactory, ThreadPoolExecutor shutdownThreadPoolExecutor) {
this.username = username;
this.password = password;
this.executor = executor;
Expand All @@ -58,6 +60,7 @@ public ConnectionParams(String username, String password, ExecutorService execut
this.topologyRecovery = topologyRecovery;
this.exceptionHandler = exceptionHandler;
this.threadFactory = threadFactory;
this.shutdownThreadPoolExecutor = shutdownThreadPoolExecutor;
}

public String getUsername() {
Expand Down Expand Up @@ -115,4 +118,8 @@ public boolean isTopologyRecoveryEnabled() {
public ThreadFactory getThreadFactory() {
return threadFactory;
}

public ThreadPoolExecutor getShutdownThreadPoolExecutor() {
return shutdownThreadPoolExecutor;
}
}
7 changes: 7 additions & 0 deletions src/com/rabbitmq/client/impl/DefaultChannelNFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.rabbitmq.client.impl;

public class DefaultChannelNFactory implements ChannelNFactory {
public ChannelN instanciate(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
return new ChannelN(connection, channelNumber, workService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.rabbitmq.client.impl;

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;

public class ShutdownThreadPoolExecutorFactory {

public ThreadPoolExecutor create(final ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new IllegalArgumentException();
}
return new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
public Thread newThread(Runnable r) {
return Environment.newThread(threadFactory, r, "ConsumerWorkService shutdown monitor", true);
}
});
}

}
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
package com.rabbitmq.client.impl.recovery;

import com.rabbitmq.client.impl.AMQConnection;
import com.rabbitmq.client.impl.ConnectionParams;
import com.rabbitmq.client.impl.FrameHandler;

import java.util.concurrent.ThreadFactory;
import com.rabbitmq.client.impl.*;

/**
* {@link com.rabbitmq.client.impl.AMQConnection} modification that uses {@link com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN}
*
* @since 3.3.0
*/
public class RecoveryAwareAMQConnection extends AMQConnection {
public RecoveryAwareAMQConnection(ConnectionParams params, FrameHandler handler) {
super(params, handler);
}

@Override
protected RecoveryAwareChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
return new RecoveryAwareChannelManager(super._workService, channelMax, threadFactory);
setChannelManagerFactory(new ChannelManagerFactory() {
public ChannelManager instantiateChannelManager(int channelMax) {
return new ChannelManager(_workService, channelMax, getShutdownThreadPoolExecutor(), new RecoveryAwareChannelNFactory());
}
});
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.rabbitmq.client.impl.recovery;

import com.rabbitmq.client.impl.AMQConnection;
import com.rabbitmq.client.impl.ChannelN;
import com.rabbitmq.client.impl.ChannelNFactory;
import com.rabbitmq.client.impl.ConsumerWorkService;

public class RecoveryAwareChannelNFactory implements ChannelNFactory {
public ChannelN instanciate(AMQConnection connection, int channelNumber, ConsumerWorkService workService) {
return new RecoveryAwareChannelN(connection, channelNumber, workService);
}
}
Loading
0