queue = new ConcurrentHashMap<>();
- private final ScheduledExecutorService timer = ExecutorManager.INSTANCE.getHttpRequestThread();
+ private final ScheduledExecutorService timer = ExecutorManager.INSTANCE.getTimerThread();
private final Executor executor = ExecutorManager.INSTANCE.getDispatchThread();
//private final HttpResponse response404 = new HttpResponse(HTTP_NOT_FOUND, "Not Found", null, null);
private final HttpResponse response408 = new HttpResponse(HTTP_CLIENT_TIMEOUT, "Request Timeout", null, null);
diff --git a/src/main/java/com/mpush/client/MPushClient.java b/src/main/java/com/mpush/client/MPushClient.java
index 51c4640..c8fe172 100644
--- a/src/main/java/com/mpush/client/MPushClient.java
+++ b/src/main/java/com/mpush/client/MPushClient.java
@@ -31,7 +31,6 @@
import com.mpush.api.protocol.Command;
import com.mpush.api.protocol.Packet;
import com.mpush.api.push.PushContext;
-import com.mpush.handler.AckHandler;
import com.mpush.handler.HttpProxyHandler;
import com.mpush.message.*;
import com.mpush.security.AesCipher;
@@ -41,6 +40,8 @@
import com.mpush.util.thread.ExecutorManager;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static com.mpush.api.Constants.MAX_HB_TIMEOUT_COUNT;
@@ -114,10 +115,47 @@ public boolean isRunning() {
return clientState.get() == State.Started && connection.isConnected();
}
+ /**
+ * 这个方法主要用于解决网络十分不稳定的场景:
+ * 正常情况如果网络断开,就应该关闭连接,反之则应去建立连接
+ * 但是在网络抖动厉害时就会发生连接频繁的建立/断开。
+ *
+ * 处理这种场景的其中一个方案是:
+ * 当网络断开时不主动关闭连接,而是尝试发送一次心跳检测,
+ * 如果能收到响应,说明网络短时间内又恢复了,
+ * 否则就断开连接,等待网络恢复并重建连接。
+ *
+ * @param isConnected true/false
+ */
@Override
public void onNetStateChange(boolean isConnected) {
connection.setAutoConnect(isConnected);
- if (isConnected) connection.connect();
+
+ if (isConnected) { //当有网络时,去尝试重连
+ connection.connect();
+ } else if (connection.isConnected()) { //无网络,如果连接没有断开,尝试发送一次心跳检测,用于快速校验网络状况
+ connection.resetTimeout();//心跳检测前,重置上次读写数据包的时间戳
+ hbTimeoutTimes = MAX_HB_TIMEOUT_COUNT - 2;//总共要调用两次healthCheck,第一次用于发送心跳,第二次用于检测是否超时
+ final ScheduledExecutorService timer = ExecutorManager.INSTANCE.getTimerThread();
+
+ //隔3s后发送一次心跳检测,看能不能收到服务端的响应
+ timer.schedule(new Runnable() {
+ int checkCount = 0;
+
+ @Override
+ public void run() {
+ //如果期间连接状态发生变化,取消任务
+ if (connection.isAutoConnect() || !connection.isConnected()) return;
+
+ if (++checkCount <= 2) {
+ if (healthCheck() && checkCount < 2) {
+ timer.schedule(this, 3, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ }, 3, TimeUnit.SECONDS);
+ }
}
@Override
diff --git a/src/main/java/com/mpush/client/TcpConnection.java b/src/main/java/com/mpush/client/TcpConnection.java
index 724996e..e70af9f 100644
--- a/src/main/java/com/mpush/client/TcpConnection.java
+++ b/src/main/java/com/mpush/client/TcpConnection.java
@@ -261,6 +261,16 @@ public boolean isReadTimeout() {
return System.currentTimeMillis() - lastReadTime > context.heartbeat + 1000;
}
+ @Override
+ public void resetTimeout() {
+ lastReadTime = lastWriteTime = 0;
+ }
+
+ @Override
+ public boolean isAutoConnect() {
+ return autoConnect;
+ }
+
@Override
public SessionContext getSessionContext() {
return context;
diff --git a/src/main/java/com/mpush/util/thread/ExecutorManager.java b/src/main/java/com/mpush/util/thread/ExecutorManager.java
index 124b498..7b38a71 100644
--- a/src/main/java/com/mpush/util/thread/ExecutorManager.java
+++ b/src/main/java/com/mpush/util/thread/ExecutorManager.java
@@ -40,12 +40,12 @@ public final class ExecutorManager {
public static final String READ_THREAD_NAME = THREAD_NAME_PREFIX + "read-t";
public static final String DISPATCH_THREAD_NAME = THREAD_NAME_PREFIX + "dispatch-t";
public static final String START_THREAD_NAME = THREAD_NAME_PREFIX + "start-t";
- public static final String HTTP_THREAD_NAME = THREAD_NAME_PREFIX + "http-t";
+ public static final String TIMER_THREAD_NAME = THREAD_NAME_PREFIX + "timer-t";
public static final ExecutorManager INSTANCE = new ExecutorManager();
private ThreadPoolExecutor writeThread;
private ThreadPoolExecutor dispatchThread;
private ThreadPoolExecutor startThread;
- private ScheduledExecutorService httpRequestThread;
+ private ScheduledExecutorService timerThread;
public ThreadPoolExecutor getWriteThread() {
if (writeThread == null || writeThread.isShutdown()) {
@@ -80,13 +80,13 @@ public ThreadPoolExecutor getStartThread() {
return startThread;
}
- public ScheduledExecutorService getHttpRequestThread() {
- if (httpRequestThread == null || httpRequestThread.isShutdown()) {
- httpRequestThread = new ScheduledThreadPoolExecutor(1,
- new NamedThreadFactory(HTTP_THREAD_NAME),
+ public ScheduledExecutorService getTimerThread() {
+ if (timerThread == null || timerThread.isShutdown()) {
+ timerThread = new ScheduledThreadPoolExecutor(1,
+ new NamedThreadFactory(TIMER_THREAD_NAME),
new RejectedHandler());
}
- return httpRequestThread;
+ return timerThread;
}
public synchronized void shutdown() {
@@ -103,9 +103,9 @@ public synchronized void shutdown() {
startThread = null;
}
- if (httpRequestThread != null) {
- httpRequestThread.shutdownNow();
- httpRequestThread = null;
+ if (timerThread != null) {
+ timerThread.shutdownNow();
+ timerThread = null;
}
}
diff --git a/src/test/java/com/mpush/client/MPushClientTest.java b/src/test/java/com/mpush/client/MPushClientTest.java
index 414c98e..5f32514 100644
--- a/src/test/java/com/mpush/client/MPushClientTest.java
+++ b/src/test/java/com/mpush/client/MPushClientTest.java
@@ -106,7 +106,7 @@ public void onHandshakeOk(final Client client, final int heartbeat) {
public void run() {
client.healthCheck();
}
- }, heartbeat, heartbeat, TimeUnit.MILLISECONDS);
+ }, 10, 10, TimeUnit.SECONDS);
//client.push(PushContext.build("test"));
From c8f289b722c5af200d24a5c9de1f285fa76f6691 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Fri, 2 Dec 2016 11:14:23 +0800
Subject: [PATCH 39/53] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=AE=BE=E7=BD=AE?=
=?UTF-8?q?=E7=BD=91=E7=BB=9C=E5=8F=98=E5=8C=96=E6=8E=A5=E5=8F=A3,=20?=
=?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BD=91=E7=BB=9C=E4=B8=8D=E7=A8=B3=E5=AE=9A?=
=?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/main/java/com/mpush/client/MPushClient.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/src/main/java/com/mpush/client/MPushClient.java b/src/main/java/com/mpush/client/MPushClient.java
index c8fe172..d14d8de 100644
--- a/src/main/java/com/mpush/client/MPushClient.java
+++ b/src/main/java/com/mpush/client/MPushClient.java
@@ -130,7 +130,7 @@ public boolean isRunning() {
@Override
public void onNetStateChange(boolean isConnected) {
connection.setAutoConnect(isConnected);
-
+ logger.i("network state change, isConnected=%b, connection=%s", isConnected, connection);
if (isConnected) { //当有网络时,去尝试重连
connection.connect();
} else if (connection.isConnected()) { //无网络,如果连接没有断开,尝试发送一次心跳检测,用于快速校验网络状况
@@ -144,6 +144,7 @@ public void onNetStateChange(boolean isConnected) {
@Override
public void run() {
+ logger.w("network disconnected, try test tcp connection checkCount=%d, connection=%s", checkCount, connection);
//如果期间连接状态发生变化,取消任务
if (connection.isAutoConnect() || !connection.isConnected()) return;
From d002e1d187cabb9e2e13c5693ea18185b0e33fd1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Fri, 2 Dec 2016 11:20:07 +0800
Subject: [PATCH 40/53] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=AE=BE=E7=BD=AE?=
=?UTF-8?q?=E7=BD=91=E7=BB=9C=E5=8F=98=E5=8C=96=E6=8E=A5=E5=8F=A3,=20?=
=?UTF-8?q?=E4=BC=98=E5=8C=96=E7=BD=91=E7=BB=9C=E4=B8=8D=E7=A8=B3=E5=AE=9A?=
=?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/main/java/com/mpush/client/TcpConnection.java | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/src/main/java/com/mpush/client/TcpConnection.java b/src/main/java/com/mpush/client/TcpConnection.java
index e70af9f..63a5ad1 100644
--- a/src/main/java/com/mpush/client/TcpConnection.java
+++ b/src/main/java/com/mpush/client/TcpConnection.java
@@ -283,7 +283,14 @@ public boolean isWriteTimeout() {
@Override
public String toString() {
- return "TcpConnection{" + ", lastReadTime=" + lastReadTime + ", lastWriteTime=" + lastWriteTime + ", context="
- + context + '}';
+ return "TcpConnection{" +
+ "state=" + state +
+ ", channel=" + channel +
+ ", lastReadTime=" + lastReadTime +
+ ", lastWriteTime=" + lastWriteTime +
+ ", totalReconnectCount=" + totalReconnectCount +
+ ", reconnectCount=" + reconnectCount +
+ ", autoConnect=" + autoConnect +
+ '}';
}
}
From e0394647360be48b78de151427e9eda8e6eb1c93 Mon Sep 17 00:00:00 2001
From: maksim <154536744@qq.com>
Date: Wed, 14 Dec 2016 13:39:19 +0800
Subject: [PATCH 41/53] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E6=97=A0=E7=94=A8?=
=?UTF-8?q?=E7=9A=84keep-alive?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/main/java/com/mpush/client/TcpConnection.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/main/java/com/mpush/client/TcpConnection.java b/src/main/java/com/mpush/client/TcpConnection.java
index 63a5ad1..2252616 100644
--- a/src/main/java/com/mpush/client/TcpConnection.java
+++ b/src/main/java/com/mpush/client/TcpConnection.java
@@ -38,6 +38,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
+import static com.mpush.api.Constants.DEFAULT_SO_TIMEOUT;
import static com.mpush.api.Constants.MAX_RESTART_COUNT;
import static com.mpush.api.Constants.MAX_TOTAL_RESTART_COUNT;
import static com.mpush.client.TcpConnection.State.*;
@@ -203,7 +204,6 @@ private boolean doConnect(String host, int port) {
try {
channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true);
- channel.socket().setKeepAlive(true);
channel.connect(new InetSocketAddress(host, port));
logger.w("connect server ok [%s:%s]", host, port);
onConnected(channel);
From 9990aecd72cc37abdbdbea7f93fdb622c1a222a4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Wed, 14 Dec 2016 13:40:21 +0800
Subject: [PATCH 42/53] =?UTF-8?q?=E8=AE=BE=E7=BD=AE=E8=B6=85=E6=97=B6?=
=?UTF-8?q?=E6=97=B6=E9=97=B4=E5=B8=B8=E9=87=8F?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/main/java/com/mpush/api/Constants.java | 2 ++
src/main/java/com/mpush/client/AllotClient.java | 6 ++++--
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git a/src/main/java/com/mpush/api/Constants.java b/src/main/java/com/mpush/api/Constants.java
index 402aa4e..a7f5332 100644
--- a/src/main/java/com/mpush/api/Constants.java
+++ b/src/main/java/com/mpush/api/Constants.java
@@ -30,6 +30,8 @@
public interface Constants {
Charset UTF_8 = Charset.forName("UTF-8");
+ int DEFAULT_SO_TIMEOUT = 1000 * 3;//客户端连接超时时间
+
int DEFAULT_WRITE_TIMEOUT = 1000 * 10;//10s默认packet写超时
byte[] EMPTY_BYTES = new byte[0];
diff --git a/src/main/java/com/mpush/client/AllotClient.java b/src/main/java/com/mpush/client/AllotClient.java
index 7aadc48..c65ddcc 100644
--- a/src/main/java/com/mpush/client/AllotClient.java
+++ b/src/main/java/com/mpush/client/AllotClient.java
@@ -35,6 +35,8 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import static com.mpush.api.Constants.DEFAULT_SO_TIMEOUT;
+
/**
* Created by yxx on 2016/6/8.
*
@@ -65,8 +67,8 @@ public List queryServerAddressList() {
try {
URL url = new URL(config.getAllotServer());
connection = (HttpURLConnection) url.openConnection();
- connection.setConnectTimeout(3000);
- connection.setReadTimeout(3000);
+ connection.setConnectTimeout(DEFAULT_SO_TIMEOUT);
+ connection.setReadTimeout(DEFAULT_SO_TIMEOUT);
connection.setUseCaches(false);
connection.setDoInput(true);
connection.setDoOutput(false);
From f6392e92823bd795ee4c90ba31af9ae94b07231b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Wed, 28 Dec 2016 19:25:52 +0800
Subject: [PATCH 43/53] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../java/com/mpush/message/BaseMessage.java | 6 +++---
.../mpush/util/thread/ExecutorManager.java | 19 +------------------
.../com/mpush/client/MPushClientTest.java | 1 -
3 files changed, 4 insertions(+), 22 deletions(-)
diff --git a/src/main/java/com/mpush/message/BaseMessage.java b/src/main/java/com/mpush/message/BaseMessage.java
index ae5786c..31545f1 100644
--- a/src/main/java/com/mpush/message/BaseMessage.java
+++ b/src/main/java/com/mpush/message/BaseMessage.java
@@ -35,12 +35,12 @@
* @author ohun@live.cn (夜色)
*/
public abstract class BaseMessage implements Message {
- public static final int STATUS_DECODED = 1;
- public static final int STATUS_ENCODED = 2;
+ public static final byte STATUS_DECODED = 1;
+ public static final byte STATUS_ENCODED = 2;
private static final AtomicInteger SID_SEQ = new AtomicInteger();
protected final Packet packet;
protected final Connection connection;
- protected int status = 0;
+ protected byte status = 0;
public BaseMessage(Packet packet, Connection connection) {
this.packet = packet;
diff --git a/src/main/java/com/mpush/util/thread/ExecutorManager.java b/src/main/java/com/mpush/util/thread/ExecutorManager.java
index 7b38a71..f122863 100644
--- a/src/main/java/com/mpush/util/thread/ExecutorManager.java
+++ b/src/main/java/com/mpush/util/thread/ExecutorManager.java
@@ -44,7 +44,6 @@ public final class ExecutorManager {
public static final ExecutorManager INSTANCE = new ExecutorManager();
private ThreadPoolExecutor writeThread;
private ThreadPoolExecutor dispatchThread;
- private ThreadPoolExecutor startThread;
private ScheduledExecutorService timerThread;
public ThreadPoolExecutor getWriteThread() {
@@ -69,17 +68,6 @@ public ThreadPoolExecutor getDispatchThread() {
return dispatchThread;
}
- public ThreadPoolExecutor getStartThread() {
- if (startThread == null || startThread.isShutdown()) {
- startThread = new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue(1),
- new NamedThreadFactory(START_THREAD_NAME),
- new RejectedHandler());
- }
- return startThread;
- }
-
public ScheduledExecutorService getTimerThread() {
if (timerThread == null || timerThread.isShutdown()) {
timerThread = new ScheduledThreadPoolExecutor(1,
@@ -98,11 +86,6 @@ public synchronized void shutdown() {
dispatchThread.shutdownNow();
dispatchThread = null;
}
- if (startThread != null) {
- startThread.shutdownNow();
- startThread = null;
-
- }
if (timerThread != null) {
timerThread.shutdownNow();
timerThread = null;
@@ -117,7 +100,7 @@ private static class RejectedHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- ClientConfig.I.getLogger().w("a task was rejected execute=%s", executor);
+ ClientConfig.I.getLogger().w("a task was rejected r=%s", r);
}
}
}
diff --git a/src/test/java/com/mpush/client/MPushClientTest.java b/src/test/java/com/mpush/client/MPushClientTest.java
index 5f32514..d9dae12 100644
--- a/src/test/java/com/mpush/client/MPushClientTest.java
+++ b/src/test/java/com/mpush/client/MPushClientTest.java
@@ -22,7 +22,6 @@
import com.mpush.api.Client;
import com.mpush.api.ClientListener;
-import com.mpush.api.push.PushContext;
import com.mpush.util.DefaultLogger;
import java.util.concurrent.Executors;
From cd688a173a82c9860c7c199db59a0c729e7923c4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Fri, 20 Jan 2017 16:43:44 +0800
Subject: [PATCH 44/53] =?UTF-8?q?=E4=BF=AE=E6=94=B9=20alloc?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/test/java/com/mpush/client/MPushClientTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/test/java/com/mpush/client/MPushClientTest.java b/src/test/java/com/mpush/client/MPushClientTest.java
index d9dae12..536da60 100644
--- a/src/test/java/com/mpush/client/MPushClientTest.java
+++ b/src/test/java/com/mpush/client/MPushClientTest.java
@@ -35,7 +35,7 @@
*/
public class MPushClientTest {
private static final String publicKey = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCghPCWCobG8nTD24juwSVataW7iViRxcTkey/B792VZEhuHjQvA3cAJgx2Lv8GnX8NIoShZtoCg3Cx6ecs+VEPD2fBcg2L4JK7xldGpOJ3ONEAyVsLOttXZtNXvyDZRijiErQALMTorcgi79M5uVX9/jMv2Ggb2XAeZhlLD28fHwIDAQAB";
- private static final String allocServer = "http://103.246.161.44:9999/";
+ private static final String allocServer = "http://103.60.220.145:9999/";
public static void main(String[] args) throws Exception {
int count = 1;
From 47df46732da2603bb5b92266686b6159ed64a4b5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Thu, 23 Feb 2017 16:11:16 +0800
Subject: [PATCH 45/53] =?UTF-8?q?=E4=BF=AE=E6=94=B9=20alloc?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/main/java/com/mpush/client/MessageDispatcher.java | 5 +++--
src/main/java/com/mpush/handler/AckHandler.java | 4 ++--
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/src/main/java/com/mpush/client/MessageDispatcher.java b/src/main/java/com/mpush/client/MessageDispatcher.java
index 8995d24..8e26786 100644
--- a/src/main/java/com/mpush/client/MessageDispatcher.java
+++ b/src/main/java/com/mpush/client/MessageDispatcher.java
@@ -52,6 +52,7 @@ public MessageDispatcher() {
register(Command.OK, new OkMessageHandler());
register(Command.ERROR, new ErrorMessageHandler());
register(Command.PUSH, new PushMessageHandler());
+ register(Command.ACK, new AckHandler());
this.ackRequestMgr = AckRequestMgr.I();
}
@@ -76,8 +77,8 @@ public void run() {
}
});
} else {
- logger.w("<<< receive unsupported message, do reconnect, packet=%s", packet);
- connection.reconnect();
+ logger.w("<<< receive unsupported message, packet=%s", packet);
+ //connection.reconnect();
}
}
diff --git a/src/main/java/com/mpush/handler/AckHandler.java b/src/main/java/com/mpush/handler/AckHandler.java
index bff2794..9e946e9 100644
--- a/src/main/java/com/mpush/handler/AckHandler.java
+++ b/src/main/java/com/mpush/handler/AckHandler.java
@@ -48,9 +48,9 @@ public AckMessage decode(Packet packet, Connection connection) {
@Override
public void handle(AckMessage message) {
- AckRequestMgr.RequestTask task = ackRequestMgr.getAndRemove(message.getSessionId());
+ /*AckRequestMgr.RequestTask task = ackRequestMgr.getAndRemove(message.getSessionId());
if (task != null) {
task.success(message.getPacket());
- }
+ }*/
}
}
From 72904f75fb30b58d3e3b768059ce3c7f2826f5a9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Thu, 23 Feb 2017 16:12:13 +0800
Subject: [PATCH 46/53] register ack handler
---
src/main/java/com/mpush/client/MessageDispatcher.java | 1 +
1 file changed, 1 insertion(+)
diff --git a/src/main/java/com/mpush/client/MessageDispatcher.java b/src/main/java/com/mpush/client/MessageDispatcher.java
index 8e26786..59bea40 100644
--- a/src/main/java/com/mpush/client/MessageDispatcher.java
+++ b/src/main/java/com/mpush/client/MessageDispatcher.java
@@ -53,6 +53,7 @@ public MessageDispatcher() {
register(Command.ERROR, new ErrorMessageHandler());
register(Command.PUSH, new PushMessageHandler());
register(Command.ACK, new AckHandler());
+
this.ackRequestMgr = AckRequestMgr.I();
}
From 7ec1d8d0a2841f366d38d6ec4c22234c29f02a2a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Thu, 9 Mar 2017 09:56:28 +0800
Subject: [PATCH 47/53] add alloc log
---
src/main/java/com/mpush/client/AllotClient.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/main/java/com/mpush/client/AllotClient.java b/src/main/java/com/mpush/client/AllotClient.java
index c65ddcc..1c4cbef 100644
--- a/src/main/java/com/mpush/client/AllotClient.java
+++ b/src/main/java/com/mpush/client/AllotClient.java
@@ -79,7 +79,7 @@ public List queryServerAddressList() {
return serverAddress;
}
} catch (IOException e) {
- logger.e(e, "get server address ex, when connect server.");
+ logger.e(e, "get server address ex, when connect server. allot=%s", config.getAllotServer());
return Collections.emptyList();
}
From f5833a44f0313fa912d2212d75bf6dbe925f7642 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Thu, 17 Aug 2017 15:43:33 +0800
Subject: [PATCH 48/53] =?UTF-8?q?AllotClient=E5=A2=9E=E5=8A=A0https?=
=?UTF-8?q?=E6=94=AF=E6=8C=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../java/com/mpush/client/AllotClient.java | 52 +++++++++++++++++--
src/main/java/com/mpush/util/MPUtils.java | 21 ++++++++
2 files changed, 68 insertions(+), 5 deletions(-)
diff --git a/src/main/java/com/mpush/client/AllotClient.java b/src/main/java/com/mpush/client/AllotClient.java
index 1c4cbef..3d14d09 100644
--- a/src/main/java/com/mpush/client/AllotClient.java
+++ b/src/main/java/com/mpush/client/AllotClient.java
@@ -24,16 +24,19 @@
import com.mpush.api.Logger;
import com.mpush.util.IOUtils;
+import javax.net.ssl.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
import static com.mpush.api.Constants.DEFAULT_SO_TIMEOUT;
@@ -67,6 +70,10 @@ public List queryServerAddressList() {
try {
URL url = new URL(config.getAllotServer());
connection = (HttpURLConnection) url.openConnection();
+ if (config.getAllotServer().startsWith("https")) {
+ ((HttpsURLConnection) connection).setSSLSocketFactory(getSSLSocketFactory());
+ ((HttpsURLConnection) connection).setHostnameVerifier(new NullHostnameVerifier());
+ }
connection.setConnectTimeout(DEFAULT_SO_TIMEOUT);
connection.setReadTimeout(DEFAULT_SO_TIMEOUT);
connection.setUseCaches(false);
@@ -78,7 +85,7 @@ public List queryServerAddressList() {
connection.disconnect();
return serverAddress;
}
- } catch (IOException e) {
+ } catch (Exception e) {
logger.e(e, "get server address ex, when connect server. allot=%s", config.getAllotServer());
return Collections.emptyList();
}
@@ -105,9 +112,7 @@ public List queryServerAddressList() {
String result = new String(content, Constants.UTF_8);
logger.w("get server address success result=%s", result);
List serverAddress = new ArrayList<>();
- for (String s : result.split(",")) {
- serverAddress.add(s);
- }
+ serverAddress.addAll(Arrays.asList(result.split(",")));
this.serverAddress = serverAddress;
} else {
logger.w("get server address failure return content empty.");
@@ -115,4 +120,41 @@ public List queryServerAddressList() {
return serverAddress;
}
+
+ private SSLSocketFactory getSSLSocketFactory() {
+ return getTrustAllContext().getSocketFactory();
+ }
+
+ private SSLContext getTrustAllContext() {
+ SSLContext sslContext = null;
+ try {
+ sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(null, new TrustManager[]{new TrustAnyTrustManager()}, new SecureRandom());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return sslContext;
+ }
+
+ private static class TrustAnyTrustManager implements X509TrustManager {
+ public void checkClientTrusted(X509Certificate[] chain, String authType)
+ throws CertificateException {
+ }
+
+ public void checkServerTrusted(X509Certificate[] chain, String authType)
+ throws CertificateException {
+ }
+
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+ }
+
+ private static class NullHostnameVerifier implements HostnameVerifier {
+
+ @Override
+ public boolean verify(String s, SSLSession sslSession) {
+ return true;
+ }
+ }
}
diff --git a/src/main/java/com/mpush/util/MPUtils.java b/src/main/java/com/mpush/util/MPUtils.java
index 683ec79..4265ee7 100644
--- a/src/main/java/com/mpush/util/MPUtils.java
+++ b/src/main/java/com/mpush/util/MPUtils.java
@@ -24,6 +24,7 @@
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
+import java.util.regex.Pattern;
/**
* Created by ohun on 2016/1/25.
@@ -32,6 +33,19 @@
*/
public final class MPUtils {
+ /**
+ * Quick and dirty pattern to differentiate IP addresses from hostnames. This is an approximation
+ * of Android's private InetAddress#isNumeric API.
+ *
+ *
This matches IPv6 addresses as a hex string containing at least one colon, and possibly
+ * including dots after the first colon. It matches IPv4 addresses as strings containing only
+ * decimal digits and dots. This pattern matches strings like "a:.23" and "54" that are neither IP
+ * addresses nor hostnames; they will be verified as IP addresses (which is a more strict
+ * verification).
+ */
+ private static final Pattern VERIFY_AS_IP_ADDRESS = Pattern.compile(
+ "([0-9a-fA-F]*:[0-9a-fA-F:.]*)|([\\d.]+)");
+
public static String parseHost2Ip(String host) {
InetAddress ia = null;
try {
@@ -79,4 +93,11 @@ public static Map headerFromString(String headersString) {
}
return headers;
}
+
+ /**
+ * Returns true if {@code host} is not a host name and might be an IP address.
+ */
+ public static boolean verifyAsIpAddress(String host) {
+ return VERIFY_AS_IP_ADDRESS.matcher(host).matches();
+ }
}
From 2f5543a70fcbe09f745d6ee074d0cc0e0591ef15 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Thu, 17 Aug 2017 15:44:05 +0800
Subject: [PATCH 49/53] =?UTF-8?q?=E5=8D=87=E7=BA=A7=E7=89=88=E6=9C=AC?=
=?UTF-8?q?=E5=88=B00.8.0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 5ef0e16..546f377 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
com.github.mpusher
mpush-client-java
- 0.0.6
+ 0.8.0
jar
mpush-client-java
MPUSH消息推送客户端SDK
From 70d11a12389e0ba0d8671c97db5f7b9e17683cbd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Sat, 9 Jun 2018 17:33:51 +0800
Subject: [PATCH 50/53] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E4=BC=98=E5=8C=96?=
=?UTF-8?q?=E5=A2=9E=E5=8A=A0ErrorCode?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../com/mpush/api/protocol/ErrorCode.java | 55 +++++++++++++++++++
.../mpush/handler/ErrorMessageHandler.java | 14 +++--
2 files changed, 64 insertions(+), 5 deletions(-)
create mode 100644 src/main/java/com/mpush/api/protocol/ErrorCode.java
diff --git a/src/main/java/com/mpush/api/protocol/ErrorCode.java b/src/main/java/com/mpush/api/protocol/ErrorCode.java
new file mode 100644
index 0000000..9420436
--- /dev/null
+++ b/src/main/java/com/mpush/api/protocol/ErrorCode.java
@@ -0,0 +1,55 @@
+/*
+ * (C) Copyright 2015-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Contributors:
+ * ohun@live.cn (夜色)
+ */
+
+package com.mpush.api.protocol;
+
+/**
+ * Created by ohun on 2015/12/30.
+ *
+ * @author ohun@live.cn
+ */
+public enum ErrorCode {
+ OFFLINE(1, "user offline"),
+ PUSH_CLIENT_FAILURE(2, "push to client failure"),
+ ROUTER_CHANGE(3, "router change"),
+ ACK_TIMEOUT(4, "ack timeout"),
+ DISPATCH_ERROR(100, "handle message error"),
+ UNSUPPORTED_CMD(101, "unsupported command"),
+ REPEAT_HANDSHAKE(102, "repeat handshake"),
+ SESSION_EXPIRED(103, "session expired"),
+ INVALID_DEVICE(104, "invalid device"),
+ UNKNOWN(-1, "unknown");
+
+ ErrorCode(int code, String errorMsg) {
+ this.errorMsg = errorMsg;
+ this.errorCode = (byte) code;
+ }
+
+ public final byte errorCode;
+ public final String errorMsg;
+
+ public static ErrorCode toEnum(int code) {
+ for (ErrorCode errorCode : values()) {
+ if (errorCode.errorCode == code) {
+ return errorCode;
+ }
+ }
+ return UNKNOWN;
+ }
+}
diff --git a/src/main/java/com/mpush/handler/ErrorMessageHandler.java b/src/main/java/com/mpush/handler/ErrorMessageHandler.java
index c5aa8cb..1de83be 100644
--- a/src/main/java/com/mpush/handler/ErrorMessageHandler.java
+++ b/src/main/java/com/mpush/handler/ErrorMessageHandler.java
@@ -20,13 +20,14 @@
package com.mpush.handler;
-
+import com.mpush.api.Logger;
import com.mpush.api.connection.Connection;
import com.mpush.api.protocol.Command;
-import com.mpush.message.ErrorMessage;
-import com.mpush.client.ClientConfig;
-import com.mpush.api.Logger;
import com.mpush.api.protocol.Packet;
+import com.mpush.client.ClientConfig;
+import com.mpush.message.ErrorMessage;
+
+import static com.mpush.api.protocol.ErrorCode.REPEAT_HANDSHAKE;
/**
* Created by ohun on 2015/12/30.
@@ -48,7 +49,10 @@ public void handle(ErrorMessage message) {
ClientConfig.I.getSessionStorage().clearSession();
message.getConnection().getClient().handshake();
} else if (message.cmd == Command.HANDSHAKE.cmd) {
- message.getConnection().getClient().stop();
+ if (message.code != REPEAT_HANDSHAKE.errorCode //重复握手的错误消息直接忽略
+ && !REPEAT_HANDSHAKE.errorMsg.equals(message.reason)) {
+ message.getConnection().getClient().stop();
+ }
} else {
message.getConnection().reconnect();
}
From 0f4357bc11a77fa82755357dea9fcbd5e2c9c919 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Sat, 9 Jun 2018 17:36:13 +0800
Subject: [PATCH 51/53] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=8F=A1=E6=89=8B?=
=?UTF-8?q?=E9=87=8D=E8=AF=95=E5=AF=BC=E8=87=B4=E6=9C=8D=E5=8A=A1=E7=AB=AF?=
=?UTF-8?q?AES=E8=A7=A3=E5=AF=86=E9=94=99=E8=AF=AFbug,=20=E5=90=8C?=
=?UTF-8?q?=E6=97=B6=E5=A2=9E=E5=8A=A0=E6=8F=A1=E6=89=8B=E5=92=8C=E7=BB=91?=
=?UTF-8?q?=E5=AE=9A=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4=E9=85=8D=E7=BD=AE?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../java/com/mpush/client/ClientConfig.java | 36 +++++++++++++++++++
.../java/com/mpush/client/MPushClient.java | 36 ++++++++++++-------
2 files changed, 59 insertions(+), 13 deletions(-)
diff --git a/src/main/java/com/mpush/client/ClientConfig.java b/src/main/java/com/mpush/client/ClientConfig.java
index b32a5ef..27c3803 100644
--- a/src/main/java/com/mpush/client/ClientConfig.java
+++ b/src/main/java/com/mpush/client/ClientConfig.java
@@ -55,6 +55,10 @@ public final class ClientConfig {
private Logger logger;
private boolean logEnabled;
private boolean enableHttpProxy = true;
+ private int handshakeTimeoutMills = 3000;
+ private int handshakeRetryCount = 0;
+ private int bindUserTimeoutMills = 3000;
+ private int bindUserRetryCount = 1;
public static ClientConfig build() {
return I = new ClientConfig();
@@ -256,4 +260,36 @@ public ClientConfig setTags(String tags) {
this.tags = tags;
return this;
}
+
+ public int getHandshakeTimeoutMills() {
+ return handshakeTimeoutMills;
+ }
+
+ public void setHandshakeTimeoutMills(int handshakeTimeoutMills) {
+ this.handshakeTimeoutMills = handshakeTimeoutMills;
+ }
+
+ public int getHandshakeRetryCount() {
+ return handshakeRetryCount;
+ }
+
+ public void setHandshakeRetryCount(int handshakeRetryCount) {
+ this.handshakeRetryCount = handshakeRetryCount;
+ }
+
+ public int getBindUserTimeoutMills() {
+ return bindUserTimeoutMills;
+ }
+
+ public void setBindUserTimeoutMills(int bindUserTimeoutMills) {
+ this.bindUserTimeoutMills = bindUserTimeoutMills;
+ }
+
+ public int getBindUserRetryCount() {
+ return bindUserRetryCount;
+ }
+
+ public void setBindUserRetryCount(int bindUserRetryCount) {
+ this.bindUserRetryCount = bindUserRetryCount;
+ }
}
diff --git a/src/main/java/com/mpush/client/MPushClient.java b/src/main/java/com/mpush/client/MPushClient.java
index d14d8de..ca1c381 100644
--- a/src/main/java/com/mpush/client/MPushClient.java
+++ b/src/main/java/com/mpush/client/MPushClient.java
@@ -215,8 +215,8 @@ public void fastConnect() {
ackRequestMgr.add(message.getSessionId(), AckContext
.build(this)
.setRequest(message.getPacket())
- .setTimeout(1000)
- .setRetryCount(3)
+ .setTimeout(config.getHandshakeTimeoutMills())
+ .setRetryCount(config.getHandshakeRetryCount())
);
logger.w("<<< do fast connect, message=%s", message);
message.sendRaw();
@@ -239,9 +239,9 @@ public void handshake() {
message.encodeBody();
ackRequestMgr.add(message.getSessionId(), AckContext
.build(this)
- .setTimeout(1000)
.setRequest(message.getPacket())
- .setRetryCount(3)
+ .setTimeout(config.getHandshakeTimeoutMills())
+ .setRetryCount(config.getHandshakeRetryCount())
);
logger.w("<<< do handshake, message=%s", message);
message.send();
@@ -250,6 +250,11 @@ public void handshake() {
@Override
public void bindUser(final String userId, final String tags) {
+ if (!connection.getSessionContext().handshakeOk()) {
+ logger.w("connection is not handshake ok!");
+ return;
+ }
+
if (Strings.isBlank(userId)) {
logger.w("bind user is null");
return;
@@ -271,9 +276,9 @@ public void bindUser(final String userId, final String tags) {
message.encodeBody();
ackRequestMgr.add(message.getSessionId(), AckContext
.build(this)
- .setTimeout(3000)
.setRequest(message.getPacket())
- .setRetryCount(5)
+ .setTimeout(config.getBindUserTimeoutMills())
+ .setRetryCount(config.getBindUserRetryCount())
);
logger.w("<<< do bind user, userId=%s", userId);
message.send();
@@ -282,6 +287,10 @@ public void bindUser(final String userId, final String tags) {
@Override
public void unbindUser() {
+ if (!connection.getSessionContext().handshakeOk()) {
+ logger.w("connection is not handshake ok!");
+ return;
+ }
String userId = config.getUserId();
if (Strings.isBlank(userId)) {
logger.w("unbind user is null");
@@ -307,14 +316,15 @@ public void ack(int messageId) {
@Override
public Future push(PushContext context) {
- if (connection.getSessionContext().handshakeOk()) {
- PushMessage message = new PushMessage(context.content, connection);
- message.addFlag(context.ackModel.flag);
- message.send();
- logger.d("<<< send push message=%s", message);
- return ackRequestMgr.add(message.getSessionId(), context);
+ if (!connection.getSessionContext().handshakeOk()) {
+ logger.w("connection is not handshake ok!");
+ return null;
}
- return null;
+ PushMessage message = new PushMessage(context.content, connection);
+ message.addFlag(context.ackModel.flag);
+ message.send();
+ logger.d("<<< send push message=%s", message);
+ return ackRequestMgr.add(message.getSessionId(), context);
}
@Override
From 9967769b0819207605a3580ee1d662c12bb157eb Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Sat, 9 Jun 2018 17:36:43 +0800
Subject: [PATCH 52/53] =?UTF-8?q?=E5=8D=87=E7=BA=A7=E7=89=88=E6=9C=AC?=
=?UTF-8?q?=E5=88=B00.8.1?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 546f377..ce0c433 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
com.github.mpusher
mpush-client-java
- 0.8.0
+ 0.8.1
jar
mpush-client-java
MPUSH消息推送客户端SDK
From e5fb89fefa020ef9be5b2b325d8b7af13e0eed77 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=A4=9C=E8=89=B2?=
Date: Sat, 29 Sep 2018 16:31:06 +0800
Subject: [PATCH 53/53] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=BB=91=E5=AE=9A?=
=?UTF-8?q?=E9=87=8D=E8=AF=95=E5=AF=BC=E8=87=B4=E6=9C=8D=E5=8A=A1=E7=AB=AF?=
=?UTF-8?q?AES=E8=A7=A3=E5=AF=86=E9=94=99=E8=AF=AFbug,=20=E5=90=8C?=
=?UTF-8?q?=E6=97=B6=E5=A2=9E=E5=8A=A0=E8=B6=85=E6=97=B6=E9=87=8D=E8=AF=95?=
=?UTF-8?q?=E6=9D=A1=E4=BB=B6=E8=AE=BE=E7=BD=AE?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../java/com/mpush/api/ack/AckContext.java | 10 ++++++
.../com/mpush/api/ack/RetryCondition.java | 32 +++++++++++++++++++
.../com/mpush/api/connection/Connection.java | 11 +++----
.../java/com/mpush/client/AckRequestMgr.java | 30 ++++++++++-------
.../java/com/mpush/client/MPushClient.java | 8 +++++
.../com/mpush/client/MPushClientTest.java | 2 +-
6 files changed, 74 insertions(+), 19 deletions(-)
create mode 100644 src/main/java/com/mpush/api/ack/RetryCondition.java
diff --git a/src/main/java/com/mpush/api/ack/AckContext.java b/src/main/java/com/mpush/api/ack/AckContext.java
index 253f73e..21f5b32 100644
--- a/src/main/java/com/mpush/api/ack/AckContext.java
+++ b/src/main/java/com/mpush/api/ack/AckContext.java
@@ -32,6 +32,7 @@ public class AckContext {
public int timeout = 1000;
public Packet request;
public int retryCount;
+ public RetryCondition retryCondition;
public static AckContext build(AckCallback callback) {
AckContext context = new AckContext();
@@ -83,4 +84,13 @@ public AckContext setRetryCount(int retryCount) {
this.retryCount = retryCount;
return this;
}
+
+ public RetryCondition getRetryCondition() {
+ return retryCondition;
+ }
+
+ public AckContext setRetryCondition(RetryCondition retryCondition) {
+ this.retryCondition = retryCondition;
+ return this;
+ }
}
diff --git a/src/main/java/com/mpush/api/ack/RetryCondition.java b/src/main/java/com/mpush/api/ack/RetryCondition.java
new file mode 100644
index 0000000..80f09db
--- /dev/null
+++ b/src/main/java/com/mpush/api/ack/RetryCondition.java
@@ -0,0 +1,32 @@
+/*
+ * (C) Copyright 2015-2020 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Contributors:
+ * ohun@live.cn (夜色)
+ */
+
+package com.mpush.api.ack;
+
+import com.mpush.api.connection.Connection;
+import com.mpush.api.protocol.Packet;
+
+/**
+ * Created by ohun on 18/9/29.
+ *
+ * @author ohun@live.cn (夜色)
+ */
+public interface RetryCondition {
+ boolean test(Connection connection, Packet packet);
+}
diff --git a/src/main/java/com/mpush/api/connection/Connection.java b/src/main/java/com/mpush/api/connection/Connection.java
index c962347..1d75ace 100644
--- a/src/main/java/com/mpush/api/connection/Connection.java
+++ b/src/main/java/com/mpush/api/connection/Connection.java
@@ -34,15 +34,15 @@ public interface Connection {
void connect();
- SessionContext getSessionContext();
+ void close();
- void send(Packet packet);
+ void reconnect();
- void close();
+ void send(Packet packet);//TODO add send Listener
boolean isConnected();
- void reconnect();
+ boolean isAutoConnect();
boolean isReadTimeout();
@@ -54,10 +54,9 @@ public interface Connection {
void resetTimeout();
- boolean isAutoConnect();
+ SessionContext getSessionContext();
SocketChannel getChannel();
Client getClient();
-
}
diff --git a/src/main/java/com/mpush/client/AckRequestMgr.java b/src/main/java/com/mpush/client/AckRequestMgr.java
index 7192bfe..76e6f32 100644
--- a/src/main/java/com/mpush/client/AckRequestMgr.java
+++ b/src/main/java/com/mpush/client/AckRequestMgr.java
@@ -23,6 +23,7 @@
import com.mpush.api.ack.AckCallback;
import com.mpush.api.ack.AckContext;
import com.mpush.api.ack.AckModel;
+import com.mpush.api.ack.RetryCondition;
import com.mpush.api.connection.Connection;
import com.mpush.api.protocol.Packet;
import com.mpush.util.thread.ExecutorManager;
@@ -103,8 +104,9 @@ public final class RequestTask extends FutureTask implements Runnable {
private Packet request;
private Future> future;
private int retryCount;
+ private RetryCondition retryCondition;
- private RequestTask(AckCallback callback, int timeout, int sessionId, Packet request, int retryCount) {
+ private RequestTask(AckCallback callback, int timeout, int sessionId, Packet request, int retryCount, RetryCondition retryCondition) {
super(NONE);
this.callback = callback;
this.timeout = timeout;
@@ -112,10 +114,11 @@ private RequestTask(AckCallback callback, int timeout, int sessionId, Packet req
this.sessionId = sessionId;
this.request = request;
this.retryCount = retryCount;
+ this.retryCondition = retryCondition;
}
private RequestTask(int sessionId, AckContext context) {
- this(context.callback, context.timeout, sessionId, context.request, context.retryCount);
+ this(context.callback, context.timeout, sessionId, context.request, context.retryCount, context.retryCondition);
}
@Override
@@ -144,29 +147,32 @@ private void call(Packet response) {
if (callback != null) {
if (success) {
logger.d("receive one ack response, sessionId=%d, costTime=%d, request=%s, response=%s"
- , sessionId, (System.currentTimeMillis() - sendTime), request, response
- );
+ , sessionId, (System.currentTimeMillis() - sendTime), request, response);
callback.onSuccess(response);
} else if (request != null && retryCount > 0) {
- logger.w("one ack request timeout, retry=%d, sessionId=%d, costTime=%d, request=%s"
- , retryCount, sessionId, (System.currentTimeMillis() - sendTime), request
- );
- addTask(copy(retryCount - 1));
- connection.send(request);
+ if (retryCondition == null || retryCondition.test(connection, request)) {
+ logger.w("one ack request timeout, retry=%d, sessionId=%d, costTime=%d, request=%s"
+ , retryCount, sessionId, (System.currentTimeMillis() - sendTime), request);
+ addTask(copy(retryCount - 1));
+ connection.send(request);
+ } else {
+ logger.w("one ack request timeout, but ignore by condition, retry=%d, sessionId=%d, costTime=%d, request=%s"
+ , retryCount, sessionId, (System.currentTimeMillis() - sendTime), request);
+ }
} else {
logger.w("one ack request timeout, sessionId=%d, costTime=%d, request=%s"
- , sessionId, (System.currentTimeMillis() - sendTime), request
- );
+ , sessionId, (System.currentTimeMillis() - sendTime), request);
callback.onTimeout(request);
}
}
callback = null;
request = null;
+ retryCondition = null;
}
}
private RequestTask copy(int retryCount) {
- return new RequestTask(callback, timeout, sessionId, request, retryCount);
+ return new RequestTask(callback, timeout, sessionId, request, retryCount, retryCondition);
}
}
}
diff --git a/src/main/java/com/mpush/client/MPushClient.java b/src/main/java/com/mpush/client/MPushClient.java
index ca1c381..fe5bc82 100644
--- a/src/main/java/com/mpush/client/MPushClient.java
+++ b/src/main/java/com/mpush/client/MPushClient.java
@@ -24,6 +24,8 @@
import com.mpush.api.Logger;
import com.mpush.api.ack.AckCallback;
import com.mpush.api.ack.AckContext;
+import com.mpush.api.ack.RetryCondition;
+import com.mpush.api.connection.Connection;
import com.mpush.api.connection.SessionContext;
import com.mpush.api.connection.SessionStorage;
import com.mpush.api.http.HttpRequest;
@@ -279,6 +281,12 @@ public void bindUser(final String userId, final String tags) {
.setRequest(message.getPacket())
.setTimeout(config.getBindUserTimeoutMills())
.setRetryCount(config.getBindUserRetryCount())
+ .setRetryCondition(new RetryCondition() {
+ @Override
+ public boolean test(Connection connection, Packet packet) {
+ return connection.getSessionContext().handshakeOk();
+ }
+ })
);
logger.w("<<< do bind user, userId=%s", userId);
message.send();
diff --git a/src/test/java/com/mpush/client/MPushClientTest.java b/src/test/java/com/mpush/client/MPushClientTest.java
index 536da60..5068191 100644
--- a/src/test/java/com/mpush/client/MPushClientTest.java
+++ b/src/test/java/com/mpush/client/MPushClientTest.java
@@ -60,7 +60,7 @@ public static void main(String[] args) throws Exception {
client = ClientConfig
.build()
.setPublicKey(publicKey)
- .setAllotServer(allocServer)
+ //.setAllotServer(allocServer)
.setServerHost(serverHost)
.setServerPort(3000)
.setDeviceId("deviceId-test" + i)