8000 Change each StreamingSubscriberConnection to have its own executor by default. by dpcollins-google · Pull Request #4622 · googleapis/google-cloud-java · GitHub
[go: up one dir, main page]

Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ public class Subscriber extends AbstractApiService {
private final String subscriptionName;
private final FlowControlSettings flowControlSettings;
private final Duration maxAckExtensionPeriod;
private final ScheduledExecutorService executor;
// The ExecutorProvider used to generate executors for processing messages.
private final ExecutorProvider executorProvider;
// An instantiation of the SystemExecutorProvider used for processing acks
// and other system actions.
@Nullable private final ScheduledExecutorService alarmsExecutor;
private final Distribution ackLatencyDistribution =
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
Expand Down Expand Up @@ -132,16 +135,7 @@ private Subscriber(Builder builder) {

this.numPullers = builder.parallelPullCount;

executor = builder.executorProvider.getExecutor();
if (builder.executorProvider.shouldAutoClose()) {
closeables.add(
new AutoCloseable() {
@Override
public void close() throws IOException {
executor.shutdown();
}
});
}
executorProvider = builder.executorProvider;

ExecutorProvider systemExecutorProvider = builder.systemExecutorProvider;
if (systemExecutorProvider == null) {
Expand Down Expand Up @@ -322,6 +316,17 @@ public void run() {
private void startStreamingConnections() {
synchronized (streamingSubscriberConnections) {
for (int i = 0; i < numPullers; i++) {
final ScheduledExecutorService executor = executorProvider.getExecutor();
if (executorProvider.shouldAutoClose()) {
closeables.add(
new AutoCloseable() {
@Override
public void close() {
executor.shutdown();
}
});
}

streamingSubscriberConnections.add(
new StreamingSubscriberConnection(
subscriptionName,
Expand Down Expand Up @@ -364,7 +369,7 @@ private void stopAllStreamingConnections() {
private void startConnections(
List<? extends ApiService> connections, final ApiService.Listener connectionsListener) {
for (ApiService subscriber : connections) {
subscriber.addListener(connectionsListener, executor);
subscriber.addListener(connectionsListener, alarmsExecutor);
subscriber.startAsync();
}
for (ApiService subscriber : connections) {
Expand Down Expand Up @@ -398,8 +403,7 @@ public static final class Builder {

static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(
THREADS_PER_CHANNEL * Runtime.getRuntime().availableProcessors())
.setExecutorThreadCount(THREADS_PER_CHANNEL)
.build();

String subscriptionName;
Expand Down Expand Up @@ -502,7 +506,10 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) {
return this;
}

/** Gives the ability to set a custom executor. */
/**
* Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be
* called {@link Builder#parallelPullCount} times.
*/
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = Preconditions.checkNotNull(executorProvider);
return this;
Expand Down
0