clientState = new AtomicReference<>(State.Shutdown);
- private final MessageDispatcher receiver;
private final TcpConnection connection;
private final ClientConfig config;
private final Logger logger;
private int hbTimeoutTimes;
- private HttpRequestQueue requestQueue;
+ private AckRequestMgr ackRequestMgr;
+ private HttpRequestMgr httpRequestMgr;
/*package*/ MPushClient(ClientConfig config) {
this.config = config;
this.logger = config.getLogger();
- this.receiver = new MessageDispatcher();
- this.connection = new TcpConnection(this, receiver);
+
+ MessageDispatcher receiver = new MessageDispatcher();
+
if (config.isEnableHttpProxy()) {
- this.requestQueue = new HttpRequestQueue();
- this.receiver.register(Command.HTTP_PROXY, new HttpProxyHandler(requestQueue));
+ this.httpRequestMgr = HttpRequestMgr.I();
+ receiver.register(Command.HTTP_PROXY, new HttpProxyHandler());
}
+
+ this.ackRequestMgr = AckRequestMgr.I();
+ this.connection = new TcpConnection(this, receiver);
+ this.ackRequestMgr.setConnection(this.connection);
}
@Override
@@ -107,6 +117,50 @@ public boolean isRunning() {
return clientState.get() == State.Started && connection.isConnected();
}
+ /**
+ * 这个方法主要用于解决网络十分不稳定的场景:
+ * 正常情况如果网络断开,就应该关闭连接,反之则应去建立连接
+ * 但是在网络抖动厉害时就会发生连接频繁的建立/断开。
+ *
+ * 处理这种场景的其中一个方案是:
+ * 当网络断开时不主动关闭连接,而是尝试发送一次心跳检测,
+ * 如果能收到响应,说明网络短时间内又恢复了,
+ * 否则就断开连接,等待网络恢复并重建连接。
+ *
+ * @param isConnected true/false
+ */
+ @Override
+ public void onNetStateChange(boolean isConnected) {
+ connection.setAutoConnect(isConnected);
+ logger.i("network state change, isConnected=%b, connection=%s", isConnected, connection);
+ if (isConnected) { //当有网络时,去尝试重连
+ connection.connect();
+ } else if (connection.isConnected()) { //无网络,如果连接没有断开,尝试发送一次心跳检测,用于快速校验网络状况
+ connection.resetTimeout();//心跳检测前,重置上次读写数据包的时间戳
+ hbTimeoutTimes = MAX_HB_TIMEOUT_COUNT - 2;//总共要调用两次healthCheck,第一次用于发送心跳,第二次用于检测是否超时
+ final ScheduledExecutorService timer = ExecutorManager.INSTANCE.getTimerThread();
+
+ //隔3s后发送一次心跳检测,看能不能收到服务端的响应
+ timer.schedule(new Runnable() {
+ int checkCount = 0;
+
+ @Override
+ public void run() {
+ logger.w("network disconnected, try test tcp connection checkCount=%d, connection=%s", checkCount, connection);
+ //如果期间连接状态发生变化,取消任务
+ if (connection.isAutoConnect() || !connection.isConnected()) return;
+
+ if (++checkCount <= 2) {
+ if (healthCheck() && checkCount < 2) {
+ timer.schedule(this, 3, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ }, 3, TimeUnit.SECONDS);
+ }
+ }
+
@Override
public boolean healthCheck() {
@@ -159,9 +213,16 @@ public void fastConnect() {
message.sessionId = session.sessionId;
message.maxHeartbeat = config.getMaxHeartbeat();
message.minHeartbeat = config.getMinHeartbeat();
+ message.encodeBody();
+ ackRequestMgr.add(message.getSessionId(), AckContext
+ .build(this)
+ .setRequest(message.getPacket())
+ .setTimeout(config.getHandshakeTimeoutMills())
+ .setRetryCount(config.getHandshakeRetryCount())
+ );
+ logger.w("<<< do fast connect, message=%s", message);
message.sendRaw();
connection.getSessionContext().changeCipher(session.cipher);
- logger.w("<<< do fast connect, message=%s", message);
}
@Override
@@ -177,37 +238,74 @@ public void handshake() {
message.clientVersion = config.getClientVersion();
message.maxHeartbeat = config.getMaxHeartbeat();
message.minHeartbeat = config.getMinHeartbeat();
+ message.encodeBody();
+ ackRequestMgr.add(message.getSessionId(), AckContext
+ .build(this)
+ .setRequest(message.getPacket())
+ .setTimeout(config.getHandshakeTimeoutMills())
+ .setRetryCount(config.getHandshakeRetryCount())
+ );
+ logger.w("<<< do handshake, message=%s", message);
message.send();
context.changeCipher(new AesCipher(message.clientKey, message.iv));
- logger.w("<<< do handshake, message=%s", message);
}
@Override
- public void bindUser(String userId) {
+ public void bindUser(final String userId, final String tags) {
+ if (!connection.getSessionContext().handshakeOk()) {
+ logger.w("connection is not handshake ok!");
+ return;
+ }
+
if (Strings.isBlank(userId)) {
logger.w("bind user is null");
return;
}
SessionContext context = connection.getSessionContext();
- if (userId.equals(context.bindUser)) return;
- context.setBindUser(userId);
- config.setUserId(userId);
- BindUserMessage
+ if (context.bindUser != null) {
+ if (userId.equals(context.bindUser)) {//已经绑定
+ if (tags != null && tags.equals(context.tags)) return;
+ } else {
+ unbindUser();//切换用户,要先解绑老用户
+ }
+ }
+ context.setBindUser(userId).setTags(tags);
+ config.setUserId(userId).setTags(tags);
+ BindUserMessage message = BindUserMessage
.buildBind(connection)
.setUserId(userId)
- .send();
+ .setTags(tags);
+ message.encodeBody();
+ ackRequestMgr.add(message.getSessionId(), AckContext
+ .build(this)
+ .setRequest(message.getPacket())
+ .setTimeout(config.getBindUserTimeoutMills())
+ .setRetryCount(config.getBindUserRetryCount())
+ .setRetryCondition(new RetryCondition() {
+ @Override
+ public boolean test(Connection connection, Packet packet) {
+ return connection.getSessionContext().handshakeOk();
+ }
+ })
+ );
logger.w("<<< do bind user, userId=%s", userId);
+ message.send();
+
}
@Override
public void unbindUser() {
+ if (!connection.getSessionContext().handshakeOk()) {
+ logger.w("connection is not handshake ok!");
+ return;
+ }
String userId = config.getUserId();
if (Strings.isBlank(userId)) {
logger.w("unbind user is null");
return;
}
- config.setUserId(null);
- connection.getSessionContext().setBindUser(null);
+ config.setUserId(null).setTags(null);
+ connection.getSessionContext().setBindUser(null).setTags(null);
BindUserMessage
.buildUnbind(connection)
.setUserId(userId)
@@ -215,6 +313,28 @@ public void unbindUser() {
logger.w("<<< do unbind user, userId=%s", userId);
}
+ @Override
+ public void ack(int messageId) {
+ if (messageId > 0) {
+ AckMessage ackMessage = new AckMessage(messageId, connection);
+ ackMessage.sendRaw();
+ logger.d("<<< send ack for push messageId=%d", messageId);
+ }
+ }
+
+ @Override
+ public Future push(PushContext context) {
+ if (!connection.getSessionContext().handshakeOk()) {
+ logger.w("connection is not handshake ok!");
+ return null;
+ }
+ PushMessage message = new PushMessage(context.content, connection);
+ message.addFlag(context.ackModel.flag);
+ message.send();
+ logger.d("<<< send push message=%s", message);
+ return ackRequestMgr.add(message.getSessionId(), context);
+ }
+
@Override
public Future sendHttp(HttpRequest request) {
if (connection.getSessionContext().handshakeOk()) {
@@ -225,8 +345,18 @@ public Future sendHttp(HttpRequest request) {
message.body = request.getBody();
message.send();
logger.d("<<< send http proxy, request=%s", request);
- return requestQueue.add(message.getSessionId(), request);
+ return httpRequestMgr.add(message.getSessionId(), request);
}
return null;
}
+
+ @Override
+ public void onSuccess(Packet response) {
+
+ }
+
+ @Override
+ public void onTimeout(Packet request) {
+ this.connection.reconnect();
+ }
}
diff --git a/src/main/java/com/mpush/client/MessageDispatcher.java b/src/main/java/com/mpush/client/MessageDispatcher.java
index 89e4773..59bea40 100644
--- a/src/main/java/com/mpush/client/MessageDispatcher.java
+++ b/src/main/java/com/mpush/client/MessageDispatcher.java
@@ -42,6 +42,7 @@ public final class MessageDispatcher implements PacketReceiver {
private final Executor executor = ExecutorManager.INSTANCE.getDispatchThread();
private final Map handlers = new HashMap<>();
private final Logger logger = ClientConfig.I.getLogger();
+ private final AckRequestMgr ackRequestMgr;
public MessageDispatcher() {
register(Command.HEARTBEAT, new HeartbeatHandler());
@@ -51,6 +52,9 @@ public MessageDispatcher() {
register(Command.OK, new OkMessageHandler());
register(Command.ERROR, new ErrorMessageHandler());
register(Command.PUSH, new PushMessageHandler());
+ register(Command.ACK, new AckHandler());
+
+ this.ackRequestMgr = AckRequestMgr.I();
}
public void register(Command command, MessageHandler handler) {
@@ -65,6 +69,7 @@ public void onReceive(final Packet packet, final Connection connection) {
@Override
public void run() {
try {
+ doAckResponse(packet);
handler.handle(packet, connection);
} catch (Throwable throwable) {
logger.e(throwable, "handle message error, packet=%s", packet);
@@ -73,8 +78,15 @@ public void run() {
}
});
} else {
- logger.w("<<< receive unsupported message, do reconnect, packet=%s", packet);
- connection.reconnect();
+ logger.w("<<< receive unsupported message, packet=%s", packet);
+ //connection.reconnect();
+ }
+ }
+
+ private void doAckResponse(Packet packet) {
+ AckRequestMgr.RequestTask task = ackRequestMgr.getAndRemove(packet.sessionId);
+ if (task != null) {
+ task.success(packet);
}
}
}
diff --git a/src/main/java/com/mpush/client/TcpConnection.java b/src/main/java/com/mpush/client/TcpConnection.java
index bfb4b7d..2252616 100644
--- a/src/main/java/com/mpush/client/TcpConnection.java
+++ b/src/main/java/com/mpush/client/TcpConnection.java
@@ -38,6 +38,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
+import static com.mpush.api.Constants.DEFAULT_SO_TIMEOUT;
import static com.mpush.api.Constants.MAX_RESTART_COUNT;
import static com.mpush.api.Constants.MAX_TOTAL_RESTART_COUNT;
import static com.mpush.client.TcpConnection.State.*;
@@ -52,9 +53,8 @@
public final class TcpConnection implements Connection {
public enum State {connecting, connected, disconnecting, disconnected}
- private final AtomicReference state = new AtomicReference(disconnected);
+ private final AtomicReference state = new AtomicReference<>(disconnected);
private final EventLock connLock = new EventLock();
- private final ClientConfig config;
private final Logger logger;
private final ClientListener listener;
private final MPushClient client;
@@ -71,8 +71,8 @@ public enum State {connecting, connected, disconnecting, disconnected}
private volatile boolean autoConnect = true;
public TcpConnection(MPushClient client, PacketReceiver receiver) {
+ ClientConfig config = ClientConfig.I;
this.client = client;
- this.config = ClientConfig.I;
this.logger = config.getLogger();
this.listener = config.getClientListener();
this.allotClient = new AllotClient();
@@ -180,9 +180,8 @@ private boolean doReconnect() {
private boolean doConnect() {
List address = allotClient.getServerAddress();
if (address != null && address.size() > 0) {
- Iterator it = address.iterator();
- while (it.hasNext()) {
- String[] host_port = it.next().split(":");
+ for (int i = 0; i < address.size(); i++) {
+ String[] host_port = address.get(i).split(":");
if (host_port.length == 2) {
String host = host_port[0];
@@ -192,8 +191,7 @@ private boolean doConnect() {
return true;
}
}
-
- it.remove();
+ address.remove(i--);
}
}
return false;
@@ -206,7 +204,6 @@ private boolean doConnect(String host, int port) {
try {
channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true);
- channel.socket().setKeepAlive(true);
channel.connect(new InetSocketAddress(host, port));
logger.w("connect server ok [%s:%s]", host, port);
onConnected(channel);
@@ -264,6 +261,16 @@ public boolean isReadTimeout() {
return System.currentTimeMillis() - lastReadTime > context.heartbeat + 1000;
}
+ @Override
+ public void resetTimeout() {
+ lastReadTime = lastWriteTime = 0;
+ }
+
+ @Override
+ public boolean isAutoConnect() {
+ return autoConnect;
+ }
+
@Override
public SessionContext getSessionContext() {
return context;
@@ -276,7 +283,14 @@ public boolean isWriteTimeout() {
@Override
public String toString() {
- return "TcpConnection{" + ", lastReadTime=" + lastReadTime + ", lastWriteTime=" + lastWriteTime + ", context="
- + context + '}';
+ return "TcpConnection{" +
+ "state=" + state +
+ ", channel=" + channel +
+ ", lastReadTime=" + lastReadTime +
+ ", lastWriteTime=" + lastWriteTime +
+ ", totalReconnectCount=" + totalReconnectCount +
+ ", reconnectCount=" + reconnectCount +
+ ", autoConnect=" + autoConnect +
+ '}';
}
}
diff --git a/src/main/java/com/mpush/codec/AsyncPacketReader.java b/src/main/java/com/mpush/codec/AsyncPacketReader.java
index cf6cace..07088a2 100644
--- a/src/main/java/com/mpush/codec/AsyncPacketReader.java
+++ b/src/main/java/com/mpush/codec/AsyncPacketReader.java
@@ -101,7 +101,15 @@ private boolean read(SocketChannel channel, ByteBuffer in) {
} catch (IOException e) {
logger.e(e, "read packet ex, do reconnect");
readCount = -1;
+ sleep4Reconnect();
}
return readCount > 0;
}
+
+ private void sleep4Reconnect() {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ }
+ }
}
diff --git a/src/main/java/com/mpush/handler/AckHandler.java b/src/main/java/com/mpush/handler/AckHandler.java
new file mode 100644
index 0000000..9e946e9
--- /dev/null
+++ b/src/main/java/com/mpush/handler/AckHandler.java
@@ -0,0 +1,56 @@
+/*
+ * (C) Copyright 2015-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * Contributors:
+ * ohun@live.cn (夜色)
+ */
+
+package com.mpush.handler;
+
+import com.mpush.api.Logger;
+import com.mpush.api.connection.Connection;
+import com.mpush.api.protocol.Packet;
+import com.mpush.client.ClientConfig;
+import com.mpush.client.AckRequestMgr;
+import com.mpush.message.AckMessage;
+
+/**
+ * Created by ohun on 16/9/5.
+ *
+ * @author ohun@live.cn (夜色)
+ */
+public class AckHandler extends BaseMessageHandler {
+
+ private final Logger logger;
+ private final AckRequestMgr ackRequestMgr;
+
+ public AckHandler() {
+ this.logger = ClientConfig.I.getLogger();
+ this.ackRequestMgr = AckRequestMgr.I();
+ }
+
+ @Override
+ public AckMessage decode(Packet packet, Connection connection) {
+ return new AckMessage(packet, connection);
+ }
+
+ @Override
+ public void handle(AckMessage message) {
+ /*AckRequestMgr.RequestTask task = ackRequestMgr.getAndRemove(message.getSessionId());
+ if (task != null) {
+ task.success(message.getPacket());
+ }*/
+ }
+}
diff --git a/src/main/java/com/mpush/handler/BaseMessageHandler.java b/src/main/java/com/mpush/handler/BaseMessageHandler.java
index 9230f73..acc6346 100644
--- a/src/main/java/com/mpush/handler/BaseMessageHandler.java
+++ b/src/main/java/com/mpush/handler/BaseMessageHandler.java
@@ -33,6 +33,7 @@ public abstract class BaseMessageHandler implements MessageHa
public void handle(Packet packet, Connection connection) {
T t = decode(packet, connection);
if (t != null) {
+ t.decodeBody();
handle(t);
}
}
diff --git a/src/main/java/com/mpush/handler/ErrorMessageHandler.java b/src/main/java/com/mpush/handler/ErrorMessageHandler.java
index c5aa8cb..1de83be 100644
--- a/src/main/java/com/mpush/handler/ErrorMessageHandler.java
+++ b/src/main/java/com/mpush/handler/ErrorMessageHandler.java
@@ -20,13 +20,14 @@
package com.mpush.handler;
-
+import com.mpush.api.Logger;
import com.mpush.api.connection.Connection;
import com.mpush.api.protocol.Command;
-import com.mpush.message.ErrorMessage;
-import com.mpush.client.ClientConfig;
-import com.mpush.api.Logger;
import com.mpush.api.protocol.Packet;
+import com.mpush.client.ClientConfig;
+import com.mpush.message.ErrorMessage;
+
+import static com.mpush.api.protocol.ErrorCode.REPEAT_HANDSHAKE;
/**
* Created by ohun on 2015/12/30.
@@ -48,7 +49,10 @@ public void handle(ErrorMessage message) {
ClientConfig.I.getSessionStorage().clearSession();
message.getConnection().getClient().handshake();
} else if (message.cmd == Command.HANDSHAKE.cmd) {
- message.getConnection().getClient().stop();
+ if (message.code != REPEAT_HANDSHAKE.errorCode //重复握手的错误消息直接忽略
+ && !REPEAT_HANDSHAKE.errorMsg.equals(message.reason)) {
+ message.getConnection().getClient().stop();
+ }
} else {
message.getConnection().reconnect();
}
diff --git a/src/main/java/com/mpush/handler/HttpProxyHandler.java b/src/main/java/com/mpush/handler/HttpProxyHandler.java
index b2ae733..8ca9df5 100644
--- a/src/main/java/com/mpush/handler/HttpProxyHandler.java
+++ b/src/main/java/com/mpush/handler/HttpProxyHandler.java
@@ -20,11 +20,10 @@
package com.mpush.handler;
-
import com.mpush.api.connection.Connection;
import com.mpush.api.http.HttpResponse;
import com.mpush.api.protocol.Packet;
-import com.mpush.client.HttpRequestQueue;
+import com.mpush.client.HttpRequestMgr;
import com.mpush.api.Logger;
import com.mpush.client.ClientConfig;
import com.mpush.message.HttpResponseMessage;
@@ -36,10 +35,10 @@
*/
public final class HttpProxyHandler extends BaseMessageHandler {
private final Logger logger = ClientConfig.I.getLogger();
- private final HttpRequestQueue queue;
+ private final HttpRequestMgr httpRequestMgr;
- public HttpProxyHandler(HttpRequestQueue queue) {
- this.queue = queue;
+ public HttpProxyHandler() {
+ this.httpRequestMgr = HttpRequestMgr.I();
}
@Override
@@ -49,7 +48,7 @@ public HttpResponseMessage decode(Packet packet, Connection connection) {
@Override
public void handle(HttpResponseMessage message) {
- HttpRequestQueue.RequestTask task = queue.getAndRemove(message.getSessionId());
+ HttpRequestMgr.RequestTask task = httpRequestMgr.getAndRemove(message.getSessionId());
if (task != null) {
HttpResponse response = new HttpResponse(message.statusCode, message.reasonPhrase, message.headers, message.body);
task.setResponse(response);
diff --git a/src/main/java/com/mpush/handler/OkMessageHandler.java b/src/main/java/com/mpush/handler/OkMessageHandler.java
index 8ef2910..30ecc99 100644
--- a/src/main/java/com/mpush/handler/OkMessageHandler.java
+++ b/src/main/java/com/mpush/handler/OkMessageHandler.java
@@ -20,8 +20,8 @@
package com.mpush.handler;
-
import com.mpush.api.connection.Connection;
+import com.mpush.api.protocol.Command;
import com.mpush.api.protocol.Packet;
import com.mpush.message.OkMessage;
import com.mpush.api.Logger;
@@ -42,6 +42,12 @@ public OkMessage decode(Packet packet, Connection connection) {
@Override
public void handle(OkMessage message) {
+ if (message.cmd == Command.BIND.cmd) {
+ ClientConfig.I.getClientListener().onBind(true, message.getConnection().getSessionContext().bindUser);
+ } else if (message.cmd == Command.UNBIND.cmd) {
+ ClientConfig.I.getClientListener().onUnbind(true, null);
+ }
+
logger.w(">>> receive ok message=%s", message);
}
}
diff --git a/src/main/java/com/mpush/handler/PushMessageHandler.java b/src/main/java/com/mpush/handler/PushMessageHandler.java
index bc69d22..44eaea2 100644
--- a/src/main/java/com/mpush/handler/PushMessageHandler.java
+++ b/src/main/java/com/mpush/handler/PushMessageHandler.java
@@ -23,6 +23,7 @@
import com.mpush.api.ClientListener;
import com.mpush.api.connection.Connection;
import com.mpush.api.protocol.Packet;
+import com.mpush.message.AckMessage;
import com.mpush.message.PushMessage;
import com.mpush.client.ClientConfig;
import com.mpush.api.Logger;
@@ -43,7 +44,13 @@ public PushMessage decode(Packet packet, Connection connection) {
@Override
public void handle(PushMessage message) {
- logger.d(">>> receive push messaged=%s", message.content.length);
- listener.onReceivePush(message.getConnection().getClient(), message.content);
+ logger.d(">>> receive push message=%s", message.content.length);
+ listener.onReceivePush(message.getConnection().getClient(),
+ message.content,
+ message.bizAck() ? message.getSessionId() : 0);
+ if (message.autoAck()) {
+ AckMessage.from(message).sendRaw();
+ logger.d("<<< send ack for push messageId=%d", message.getSessionId());
+ }
}
}
diff --git a/src/main/java/com/mpush/message/AckMessage.java b/src/main/java/com/mpush/message/AckMessage.java
new file mode 100644
index 0000000..053653a
--- /dev/null
+++ b/src/main/java/com/mpush/message/AckMessage.java
@@ -0,0 +1,46 @@
+package com.mpush.message;
+
+import com.mpush.api.connection.Connection;
+import com.mpush.api.protocol.Command;
+import com.mpush.api.protocol.Packet;
+import com.mpush.util.ByteBuf;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Created by ohun on 16/9/5.
+ *
+ * @author ohun@live.cn (夜色)
+ */
+public class AckMessage extends ByteBufMessage {
+
+ public AckMessage(int sessionId, Connection connection) {
+ super(new Packet(Command.ACK, sessionId), connection);
+ }
+
+ public AckMessage(Packet packet, Connection connection) {
+ super(packet, connection);
+ }
+
+ @Override
+ protected void decode(ByteBuffer body) {
+
+ }
+
+ @Override
+ protected void encode(ByteBuf body) {
+
+ }
+
+
+ public static AckMessage from(BaseMessage src) {
+ return new AckMessage(new Packet(Command.ACK, src.getSessionId()), src.connection);
+ }
+
+ @Override
+ public String toString() {
+ return "AckMessage{" +
+ "packet=" + packet +
+ '}';
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/mpush/message/BaseMessage.java b/src/main/java/com/mpush/message/BaseMessage.java
index d072ca9..31545f1 100644
--- a/src/main/java/com/mpush/message/BaseMessage.java
+++ b/src/main/java/com/mpush/message/BaseMessage.java
@@ -20,7 +20,6 @@
package com.mpush.message;
-
import com.mpush.util.IOUtils;
import com.mpush.client.ClientConfig;
import com.mpush.api.Message;
@@ -36,17 +35,23 @@
* @author ohun@live.cn (夜色)
*/
public abstract class BaseMessage implements Message {
+ public static final byte STATUS_DECODED = 1;
+ public static final byte STATUS_ENCODED = 2;
private static final AtomicInteger SID_SEQ = new AtomicInteger();
protected final Packet packet;
protected final Connection connection;
+ protected byte status = 0;
public BaseMessage(Packet packet, Connection connection) {
this.packet = packet;
this.connection = connection;
- this.decodeBody();
}
- protected void decodeBody() {
+ @Override
+ public void decodeBody() {
+ if ((status & STATUS_DECODED) != 0) return;
+ else status |= STATUS_DECODED;
+
if (packet.body != null && packet.body.length > 0) {
//1.解密
byte[] tmp = packet.body;
@@ -70,7 +75,11 @@ protected void decodeBody() {
}
}
- protected void encodeBody() {
+ @Override
+ public void encodeBody() {
+ if ((status & STATUS_ENCODED) != 0) return;
+ else status |= STATUS_ENCODED;
+
byte[] tmp = encode();
if (tmp != null && tmp.length > 0) {
//1.压缩
@@ -95,6 +104,13 @@ protected void encodeBody() {
}
}
+ private void encodeBodyRaw() {
+ if ((status & STATUS_ENCODED) != 0) return;
+ else status |= STATUS_ENCODED;
+
+ packet.body = encode();
+ }
+
protected abstract void decode(byte[] body);
protected abstract byte[] encode();
@@ -121,7 +137,7 @@ public void send() {
@Override
public void sendRaw() {
- packet.body = encode();
+ encodeBodyRaw();
connection.send(packet);
}
diff --git a/src/main/java/com/mpush/message/FastConnectMessage.java b/src/main/java/com/mpush/message/FastConnectMessage.java
index e831818..fad79f5 100644
--- a/src/main/java/com/mpush/message/FastConnectMessage.java
+++ b/src/main/java/com/mpush/message/FastConnectMessage.java
@@ -20,7 +20,6 @@
package com.mpush.message;
-
import com.mpush.api.connection.Connection;
import com.mpush.api.protocol.Command;
import com.mpush.api.protocol.Packet;
@@ -66,7 +65,8 @@ public void encode(ByteBuf body) {
@Override
public String toString() {
return "FastConnectMessage{" +
- "sessionId='" + sessionId + '\'' +
+ "sessionId=" + packet.sessionId +
+ ", sessionId='" + sessionId + '\'' +
", deviceId='" + deviceId + '\'' +
", minHeartbeat=" + minHeartbeat +
", maxHeartbeat=" + maxHeartbeat +
diff --git a/src/main/java/com/mpush/message/FastConnectOkMessage.java b/src/main/java/com/mpush/message/FastConnectOkMessage.java
index d81b5a6..a84c197 100644
--- a/src/main/java/com/mpush/message/FastConnectOkMessage.java
+++ b/src/main/java/com/mpush/message/FastConnectOkMessage.java
@@ -20,7 +20,6 @@
package com.mpush.message;
-
import com.mpush.api.connection.Connection;
import com.mpush.api.protocol.Packet;
import com.mpush.util.ByteBuf;
@@ -61,7 +60,8 @@ public FastConnectOkMessage setHeartbeat(int heartbeat) {
@Override
public String toString() {
return "FastConnectOkMessage{" +
- "heartbeat=" + heartbeat +
+ "sessionId=" + packet.sessionId +
+ ", heartbeat=" + heartbeat +
'}';
}
}
diff --git a/src/main/java/com/mpush/message/HandshakeMessage.java b/src/main/java/com/mpush/message/HandshakeMessage.java
index 27ed9b2..233d1fe 100644
--- a/src/main/java/com/mpush/message/HandshakeMessage.java
+++ b/src/main/java/com/mpush/message/HandshakeMessage.java
@@ -20,7 +20,6 @@
package com.mpush.message;
-
import com.mpush.api.connection.Connection;
import com.mpush.api.protocol.Command;
import com.mpush.api.protocol.Packet;
@@ -82,7 +81,8 @@ protected void encode(ByteBuf body) {
@Override
public String toString() {
return "HandshakeMessage{" +
- "deviceId='" + deviceId + '\'' +
+ "sessionId=" + packet.sessionId +
+ ", deviceId='" + deviceId + '\'' +
", osName='" + osName + '\'' +
", osVersion='" + osVersion + '\'' +
", clientVersion='" + clientVersion + '\'' +
diff --git a/src/main/java/com/mpush/message/PushMessage.java b/src/main/java/com/mpush/message/PushMessage.java
index 58d3f28..f4b84db 100644
--- a/src/main/java/com/mpush/message/PushMessage.java
+++ b/src/main/java/com/mpush/message/PushMessage.java
@@ -21,6 +21,7 @@
import com.mpush.api.connection.Connection;
+import com.mpush.api.protocol.Command;
import com.mpush.api.protocol.Packet;
import com.mpush.api.Constants;
@@ -33,6 +34,11 @@ public final class PushMessage extends BaseMessage {
public byte[] content;
+ public PushMessage(byte[] content, Connection connection) {
+ super(new Packet(Command.PUSH, genSessionId()), connection);
+ this.content = content;
+ }
+
public PushMessage(Packet packet, Connection connection) {
super(packet, connection);
}
@@ -47,6 +53,19 @@ public byte[] encode() {
return content;
}
+ public boolean autoAck() {
+ return packet.hasFlag(Packet.FLAG_AUTO_ACK);
+ }
+
+ public boolean bizAck() {
+ return packet.hasFlag(Packet.FLAG_BIZ_ACK);
+ }
+
+ public PushMessage addFlag(byte flag) {
+ packet.addFlag(flag);
+ return this;
+ }
+
@Override
public String toString() {
return "PushMessage{" +
diff --git a/src/main/java/com/mpush/util/MPUtils.java b/src/main/java/com/mpush/util/MPUtils.java
index 683ec79..4265ee7 100644
--- a/src/main/java/com/mpush/util/MPUtils.java
+++ b/src/main/java/com/mpush/util/MPUtils.java
@@ -24,6 +24,7 @@
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
+import java.util.regex.Pattern;
/**
* Created by ohun on 2016/1/25.
@@ -32,6 +33,19 @@
*/
public final class MPUtils {
+ /**
+ * Quick and dirty pattern to differentiate IP addresses from hostnames. This is an approximation
+ * of Android's private InetAddress#isNumeric API.
+ *
+ *
This matches IPv6 addresses as a hex string containing at least one colon, and possibly
+ * including dots after the first colon. It matches IPv4 addresses as strings containing only
+ * decimal digits and dots. This pattern matches strings like "a:.23" and "54" that are neither IP
+ * addresses nor hostnames; they will be verified as IP addresses (which is a more strict
+ * verification).
+ */
+ private static final Pattern VERIFY_AS_IP_ADDRESS = Pattern.compile(
+ "([0-9a-fA-F]*:[0-9a-fA-F:.]*)|([\\d.]+)");
+
public static String parseHost2Ip(String host) {
InetAddress ia = null;
try {
@@ -79,4 +93,11 @@ public static Map headerFromString(String headersString) {
}
return headers;
}
+
+ /**
+ * Returns true if {@code host} is not a host name and might be an IP address.
+ */
+ public static boolean verifyAsIpAddress(String host) {
+ return VERIFY_AS_IP_ADDRESS.matcher(host).matches();
+ }
}
diff --git a/src/main/java/com/mpush/util/thread/ExecutorManager.java b/src/main/java/com/mpush/util/thread/ExecutorManager.java
index 124b498..f122863 100644
--- a/src/main/java/com/mpush/util/thread/ExecutorManager.java
+++ b/src/main/java/com/mpush/util/thread/ExecutorManager.java
@@ -40,12 +40,11 @@ public final class ExecutorManager {
public static final String READ_THREAD_NAME = THREAD_NAME_PREFIX + "read-t";
public static final String DISPATCH_THREAD_NAME = THREAD_NAME_PREFIX + "dispatch-t";
public static final String START_THREAD_NAME = THREAD_NAME_PREFIX + "start-t";
- public static final String HTTP_THREAD_NAME = THREAD_NAME_PREFIX + "http-t";
+ public static final String TIMER_THREAD_NAME = THREAD_NAME_PREFIX + "timer-t";
public static final ExecutorManager INSTANCE = new ExecutorManager();
private ThreadPoolExecutor writeThread;
private ThreadPoolExecutor dispatchThread;
- private ThreadPoolExecutor startThread;
- private ScheduledExecutorService httpRequestThread;
+ private ScheduledExecutorService timerThread;
public ThreadPoolExecutor getWriteThread() {
if (writeThread == null || writeThread.isShutdown()) {
@@ -69,24 +68,13 @@ public ThreadPoolExecutor getDispatchThread() {
return dispatchThread;
}
- public ThreadPoolExecutor getStartThread() {
- if (startThread == null || startThread.isShutdown()) {
- startThread = new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue(1),
- new NamedThreadFactory(START_THREAD_NAME),
+ public ScheduledExecutorService getTimerThread() {
+ if (timerThread == null || timerThread.isShutdown()) {
+ timerThread = new ScheduledThreadPoolExecutor(1,
+ new NamedThreadFactory(TIMER_THREAD_NAME),
new RejectedHandler());
}
- return startThread;
- }
-
- public ScheduledExecutorService getHttpRequestThread() {
- if (httpRequestThread == null || httpRequestThread.isShutdown()) {
- httpRequestThread = new ScheduledThreadPoolExecutor(1,
- new NamedThreadFactory(HTTP_THREAD_NAME),
- new RejectedHandler());
- }
- return httpRequestThread;
+ return timerThread;
}
public synchronized void shutdown() {
@@ -98,14 +86,9 @@ public synchronized void shutdown() {
dispatchThread.shutdownNow();
dispatchThread = null;
}
- if (startThread != null) {
- startThread.shutdownNow();
- startThread = null;
-
- }
- if (httpRequestThread != null) {
- httpRequestThread.shutdownNow();
- httpRequestThread = null;
+ if (timerThread != null) {
+ timerThread.shutdownNow();
+ timerThread = null;
}
}
@@ -117,7 +100,7 @@ private static class RejectedHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- ClientConfig.I.getLogger().w("a task was rejected execute=%s", executor);
+ ClientConfig.I.getLogger().w("a task was rejected r=%s", r);
}
}
}
diff --git a/src/test/java/com/mpush/client/MPushClientTest.java b/src/test/java/com/mpush/client/MPushClientTest.java
index 5d46ff9..5068191 100644
--- a/src/test/java/com/mpush/client/MPushClientTest.java
+++ b/src/test/java/com/mpush/client/MPushClientTest.java
@@ -22,17 +22,11 @@
import com.mpush.api.Client;
import com.mpush.api.ClientListener;
-import com.mpush.api.Constants;
-import com.mpush.api.http.HttpCallback;
-import com.mpush.api.http.HttpMethod;
-import com.mpush.api.http.HttpRequest;
-import com.mpush.api.http.HttpResponse;
import com.mpush.util.DefaultLogger;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.LockSupport;
/**
* Created by ohun on 2016/1/25.
@@ -41,36 +35,59 @@
*/
public class MPushClientTest {
private static final String publicKey = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCghPCWCobG8nTD24juwSVataW7iViRxcTkey/B792VZEhuHjQvA3cAJgx2Lv8GnX8NIoShZtoCg3Cx6ecs+VEPD2fBcg2L4JK7xldGpOJ3ONEAyVsLOttXZtNXvyDZRijiErQALMTorcgi79M5uVX9/jMv2Ggb2XAeZhlLD28fHwIDAQAB";
- private static final String allocServer = "http://127.0.0.1:9999/";
+ private static final String allocServer = "http://103.60.220.145:9999/";
public static void main(String[] args) throws Exception {
- Client client = ClientConfig
- .build()
- .setPublicKey(publicKey)
- //.setAllotServer(allocServer)
- .setServerHost("111.1.57.148")
- .setServerPort(20882)
- .setDeviceId("1111111111")
- .setOsName("Android")
- .setOsVersion("6.0")
- .setClientVersion("2.0")
- .setUserId("doctor43test")
- .setMaxHeartbeat(10000)
- .setMinHeartbeat(10000)
- .setSessionStorageDir(MPushClientTest.class.getResource("/").getFile())
- .setLogger(new DefaultLogger())
- .setLogEnabled(true)
- .setEnableHttpProxy(true)
- .setClientListener(new L())
- .create();
- client.start();
- }
+ int count = 1;
+ String serverHost = "127.0.0.1";
+ int sleep = 1000;
+
+ if (args != null && args.length > 0) {
+ count = Integer.parseInt(args[0]);
+ if (args.length > 1) {
+ serverHost = args[1];
+ }
+ if (args.length > 2) {
+ sleep = Integer.parseInt(args[1]);
+ }
+ }
+ ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ ClientListener listener = new L(scheduledExecutor);
+ Client client = null;
+ String cacheDir = MPushClientTest.class.getResource("/").getFile();
+ for (int i = 0; i < count; i++) {
+ client = ClientConfig
+ .build()
+ .setPublicKey(publicKey)
+ //.setAllotServer(allocServer)
+ .setServerHost(serverHost)
+ .setServerPort(3000)
+ .setDeviceId("deviceId-test" + i)
+ .setOsName("android")
+ .setOsVersion("6.0")
+ .setClientVersion("2.0")
+ .setUserId("user-" + i)
+ .setTags("tag-" + i)
+ .setSessionStorageDir(cacheDir + i)
+ .setLogger(new DefaultLogger())
+ .setLogEnabled(true)
+ .setEnableHttpProxy(true)
+ .setClientListener(listener)
+ .create();
+ client.start();
+ Thread.sleep(sleep);
+ }
+ }
public static class L implements ClientListener {
- Thread thread;
+ private final ScheduledExecutorService scheduledExecutor;
boolean flag = true;
+ public L(ScheduledExecutorService scheduledExecutor) {
+ this.scheduledExecutor = scheduledExecutor;
+ }
+
@Override
public void onConnected(Client client) {
flag = true;
@@ -79,37 +96,24 @@ public void onConnected(Client client) {
@Override
public void onDisConnected(Client client) {
flag = false;
- thread.interrupt();
}
@Override
public void onHandshakeOk(final Client client, final int heartbeat) {
- thread = new Thread(new Runnable() {
+ scheduledExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
- while (flag && client.isRunning()) {
- try {
- Thread.sleep(heartbeat);
- } catch (InterruptedException e) {
- break;
- }
- client.healthCheck();
- /* client.stop();
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- break;
- }
- client.start();*/
- }
+ client.healthCheck();
}
- });
- thread.start();
+ }, 10, 10, TimeUnit.SECONDS);
+
+ //client.push(PushContext.build("test"));
+
}
@Override
- public void onReceivePush(Client client, byte[] content) {
-
+ public void onReceivePush(Client client, byte[] content, int messageId) {
+ if (messageId > 0) client.ack(messageId);
}
@Override
@@ -117,5 +121,14 @@ public void onKickUser(String deviceId, String userId) {
}
+ @Override
+ public void onBind(boolean success, String userId) {
+
+ }
+
+ @Override
+ public void onUnbind(boolean success, String userId) {
+
+ }
}
}
\ No newline at end of file