diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index d862fd2..3c2dd89 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -1,31 +1,7 @@ -# Which issue does this PR close? +## Rationale -Closes # -# Rationale for this change - - +## Detailed Changes -# What changes are included in this PR? - - -# Are there any user-facing changes? - - - -# How does this change test - - +## Test Plan diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..4a69db2 --- /dev/null +++ b/Makefile @@ -0,0 +1,3 @@ + +fmt: + mvn formatter:format diff --git a/ceresdb-grpc/src/main/java/io/ceresdb/rpc/GrpcClient.java b/ceresdb-grpc/src/main/java/io/ceresdb/rpc/GrpcClient.java index c2f54ba..80946c9 100644 --- a/ceresdb-grpc/src/main/java/io/ceresdb/rpc/GrpcClient.java +++ b/ceresdb-grpc/src/main/java/io/ceresdb/rpc/GrpcClient.java @@ -5,6 +5,7 @@ import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -68,7 +69,6 @@ /** * Grpc client implementation. - * */ public class GrpcClient implements RpcClient { @@ -596,11 +596,29 @@ private Channel getCheckedChannel(final Endpoint endpoint, final Consumer maxAge + backoff) { + ch.shutdown(); + IdChannel newChannel = this.newChannel(endpoint); + this.managedChannelPool.put(endpoint, newChannel); + return newChannel; + } + } + + return ch; } private IdChannel newChannel(final Endpoint endpoint) { diff --git a/ceresdb-grpc/src/main/java/io/ceresdb/rpc/IdChannel.java b/ceresdb-grpc/src/main/java/io/ceresdb/rpc/IdChannel.java index 75bd204..3d3067b 100644 --- a/ceresdb-grpc/src/main/java/io/ceresdb/rpc/IdChannel.java +++ b/ceresdb-grpc/src/main/java/io/ceresdb/rpc/IdChannel.java @@ -14,11 +14,11 @@ /** * A managed channel that has a channel id. - * */ public class IdChannel extends ManagedChannel { - private static final AtomicLong ID_ALLOC = new AtomicLong(); + private static final AtomicLong ID_ALLOC = new AtomicLong(); + private final long createTime = System.currentTimeMillis(); private final long channelId; private final ManagedChannel channel; @@ -32,6 +32,10 @@ public IdChannel(ManagedChannel channel) { this.channel = channel; } + public long getCreateTime() { + return createTime; + } + public long getChannelId() { return channelId; } @@ -98,9 +102,7 @@ public void enterIdle() { @Override public String toString() { - return "IdChannel{" + // - "channelId=" + channelId + // - ", channel=" + channel + // - '}'; + return "IdChannel{" + "createTime=" + createTime + ", channelId=" + channelId + '}'; } + } diff --git a/ceresdb-rpc/src/main/java/io/ceresdb/rpc/RpcOptions.java b/ceresdb-rpc/src/main/java/io/ceresdb/rpc/RpcOptions.java index e32ed2b..20cf072 100644 --- a/ceresdb-rpc/src/main/java/io/ceresdb/rpc/RpcOptions.java +++ b/ceresdb-rpc/src/main/java/io/ceresdb/rpc/RpcOptions.java @@ -91,6 +91,11 @@ public class RpcOptions implements Copiable { private boolean logOnLimitChange = true; + /** + * Max time in milliseconds a connection can live, 0 means forever. + */ + private long connectionMaxAgeMs = 0; + public int getDefaultRpcTimeout() { return defaultRpcTimeout; } @@ -99,6 +104,14 @@ public void setDefaultRpcTimeout(int defaultRpcTimeout) { this.defaultRpcTimeout = defaultRpcTimeout; } + public long getConnectionMaxAgeMs() { + return connectionMaxAgeMs; + } + + public void setConnectionMaxAgeMs(long connectionMaxAgeMs) { + this.connectionMaxAgeMs = connectionMaxAgeMs; + } + public int getRpcThreadPoolSize() { return rpcThreadPoolSize; } @@ -238,29 +251,20 @@ public RpcOptions copy() { opts.smoothing = this.smoothing; opts.blockOnLimit = this.blockOnLimit; opts.logOnLimitChange = this.logOnLimitChange; + opts.connectionMaxAgeMs = this.connectionMaxAgeMs; return opts; } @Override public String toString() { - return "RpcOptions{" + // - "defaultRpcTimeout=" + defaultRpcTimeout + // - ", rpcThreadPoolSize=" + rpcThreadPoolSize + // - ", rpcThreadPoolQueueSize=" + rpcThreadPoolQueueSize + // - ", maxInboundMessageSize=" + maxInboundMessageSize + // - ", flowControlWindow=" + flowControlWindow + // - ", idleTimeoutSeconds=" + idleTimeoutSeconds + // - ", keepAliveTimeSeconds=" + keepAliveTimeSeconds + // - ", keepAliveTimeoutSeconds=" + keepAliveTimeoutSeconds + // - ", keepAliveWithoutCalls=" + keepAliveWithoutCalls + // - ", limitKind=" + limitKind + // - ", initialLimit=" + initialLimit + // - ", maxLimit=" + maxLimit + // - ", longRttWindow=" + longRttWindow + // - ", smoothing=" + smoothing + // - ", blockOnLimit=" + blockOnLimit + // - ", logOnLimitChange=" + logOnLimitChange + // - '}'; + return "RpcOptions{" + "defaultRpcTimeout=" + defaultRpcTimeout + ", rpcThreadPoolSize=" + rpcThreadPoolSize + + ", rpcThreadPoolQueueSize=" + rpcThreadPoolQueueSize + ", maxInboundMessageSize=" + + maxInboundMessageSize + ", flowControlWindow=" + flowControlWindow + ", idleTimeoutSeconds=" + + idleTimeoutSeconds + ", keepAliveTimeSeconds=" + keepAliveTimeSeconds + ", keepAliveTimeoutSeconds=" + + keepAliveTimeoutSeconds + ", keepAliveWithoutCalls=" + keepAliveWithoutCalls + ", limitKind=" + + limitKind + ", initialLimit=" + initialLimit + ", maxLimit=" + maxLimit + ", longRttWindow=" + + longRttWindow + ", smoothing=" + smoothing + ", blockOnLimit=" + blockOnLimit + ", logOnLimitChange=" + + logOnLimitChange + ", connectionMaxAge=" + connectionMaxAgeMs + '}'; } public static RpcOptions newDefault() { diff --git a/pom.xml b/pom.xml index 5fd8430..b29053f 100644 --- a/pom.xml +++ b/pom.xml @@ -77,7 +77,7 @@ 1.0.0 3.21.7 - 1.0.3 + 1.0.4 1.7.21