diff --git a/.gitignore b/.gitignore index 00fd2b4..07273ce 100644 --- a/.gitignore +++ b/.gitignore @@ -5,16 +5,11 @@ */target/* */bin/* */WebContent/* -/.idea -/*.iml /*/*.iml -.idea/* -*.jar -*.war -*.ear -*.iml -./target/* +.idea/ + +target/ # Mobile Tools for Java (J2ME) .mtj.tmp/ @@ -23,6 +18,7 @@ *.jar *.war *.ear +*.iml # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* diff --git a/README.md b/README.md index 5ac9f66..7c76cc5 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,52 @@ -# mpush-client-java -mpush java client +## 介绍 +#### mpush-client-java是一个纯java实现的一个MPUS客户端,不依赖其他任何第三方框架。 + +## 用途 +#### 主要用于android sdk底层通信,该工程本身不包含任何android相关代码。 + +## 当前版本 + +```groovy +compile 'com.github.mpusher:mpush-client-java:0.0.2' +``` + +```xml + + com.github.mpusher + mpush-client-java + 0.0.2 + +``` + +## 源码测试 + +参见 [`com.mpush.client.MPushClientTest.java`](https://github.com/mpusher/mpush-client-java/blob/master/src/test/java/com/mpush/client/MPushClientTest.java) +```java +public class MPushClientTest { + private static final String publicKey = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCghPCWCobG8nTD24juwSVataW7iViRxcTkey/B792VZEhuHjQvA3cAJgx2Lv8GnX8NIoShZtoCg3Cx6ecs+VEPD2fBcg2L4JK7xldGpOJ3ONEAyVsLOttXZtNXvyDZRijiErQALMTorcgi79M5uVX9/jMv2Ggb2XAeZhlLD28fHwIDAQAB";//公钥对应服务端的私钥 + private static final String allocServer = "http://127.0.0.1:9999/";//用于获取MPUSH server的ip:port, 用于负载均衡 + + public static void main(String[] args) throws Exception { + Client client = ClientConfig + .build() + .setPublicKey(publicKey) + .setAllotServer(allocServer) + .setDeviceId("1111111111") + .setOsName("Android") + .setOsVersion("6.0") + .setClientVersion("2.0") + .setUserId("doctor43test") + .setSessionStorageDir(MPushClientTest.class.getResource("/").getFile()) + .setLogger(new DefaultLogger()) + .setLogEnabled(true) + .setEnableHttpProxy(false) + .setClientListener(new L()) + .create(); + client.start(); + + LockSupport.park(); + } +``` +#### 说明: + +allocServer的实现参照[AllocServer.java](https://github.com/mpusher/alloc/blob/master/src/main/java/com/shinemo/mpush/alloc/AllocServer.java) diff --git a/pom.xml b/pom.xml index f08b54b..ce0c433 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ com.github.mpusher mpush-client-java - 0.0.2 + 0.8.1 jar mpush-client-java MPUSH消息推送客户端SDK @@ -48,9 +48,76 @@ https://oss.sonatype.org/service/local/staging/deploy/maven2 - - true - + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar-no-fork + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + attach-javadocs + + jar + + + + + + org.apache.maven.plugins + maven-gpg-plugin + + + sign-artifacts + verify + + sign + + + + + + + + + jar + + + + maven-jar-plugin + + + + false + + + true + + ../lib/ + + com.mpush.test.Main + + + + + + package + + + + + @@ -73,43 +140,6 @@ true - - org.apache.maven.plugins - maven-source-plugin - - - attach-sources - - jar-no-fork - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - - - attach-javadocs - - jar - - - - - - org.apache.maven.plugins - maven-gpg-plugin - - - sign-artifacts - verify - - sign - - - - @@ -120,4 +150,4 @@ 4.10 - \ No newline at end of file + diff --git a/src/main/java/com/mpush/api/BindCallback.java b/src/main/java/com/mpush/api/BindCallback.java new file mode 100644 index 0000000..51da223 --- /dev/null +++ b/src/main/java/com/mpush/api/BindCallback.java @@ -0,0 +1,29 @@ +/* + * (C) Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.mpush.api; + +/** + * Created by ohun on 16/10/17. + * + * @author ohun@live.cn (夜色) + */ +public interface BindCallback { + +} diff --git a/src/main/java/com/mpush/api/Client.java b/src/main/java/com/mpush/api/Client.java index 9a9f6b6..ef2f53a 100644 --- a/src/main/java/com/mpush/api/Client.java +++ b/src/main/java/com/mpush/api/Client.java @@ -37,4 +37,6 @@ public interface Client extends MPushProtocol { boolean isRunning(); + void onNetStateChange(boolean isConnected); + } diff --git a/src/main/java/com/mpush/api/ClientListener.java b/src/main/java/com/mpush/api/ClientListener.java index bf7658c..719d1ac 100644 --- a/src/main/java/com/mpush/api/ClientListener.java +++ b/src/main/java/com/mpush/api/ClientListener.java @@ -33,7 +33,11 @@ public interface ClientListener { void onHandshakeOk(Client client, int heartbeat); - void onReceivePush(Client client, byte[] content); + void onReceivePush(Client client, byte[] content, int messageId); void onKickUser(String deviceId, String userId); + + void onBind(boolean success, String userId); + + void onUnbind(boolean success, String userId); } diff --git a/src/main/java/com/mpush/api/Constants.java b/src/main/java/com/mpush/api/Constants.java index 402aa4e..a7f5332 100644 --- a/src/main/java/com/mpush/api/Constants.java +++ b/src/main/java/com/mpush/api/Constants.java @@ -30,6 +30,8 @@ public interface Constants { Charset UTF_8 = Charset.forName("UTF-8"); + int DEFAULT_SO_TIMEOUT = 1000 * 3;//客户端连接超时时间 + int DEFAULT_WRITE_TIMEOUT = 1000 * 10;//10s默认packet写超时 byte[] EMPTY_BYTES = new byte[0]; diff --git a/src/main/java/com/mpush/api/Message.java b/src/main/java/com/mpush/api/Message.java index 5b581cb..a4ee885 100644 --- a/src/main/java/com/mpush/api/Message.java +++ b/src/main/java/com/mpush/api/Message.java @@ -32,6 +32,10 @@ public interface Message { Connection getConnection(); + void decodeBody(); + + void encodeBody(); + void send(); void sendRaw(); diff --git a/src/main/java/com/mpush/api/ack/AckCallback.java b/src/main/java/com/mpush/api/ack/AckCallback.java new file mode 100644 index 0000000..37bcdbe --- /dev/null +++ b/src/main/java/com/mpush/api/ack/AckCallback.java @@ -0,0 +1,33 @@ +/* + * (C) Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.mpush.api.ack; + +import com.mpush.api.protocol.Packet; + +/** + * Created by ohun on 2016/11/13. + * + * @author ohun@live.cn (夜色) + */ +public interface AckCallback { + void onSuccess(Packet response); + + void onTimeout(Packet request); +} diff --git a/src/main/java/com/mpush/api/ack/AckContext.java b/src/main/java/com/mpush/api/ack/AckContext.java new file mode 100644 index 0000000..21f5b32 --- /dev/null +++ b/src/main/java/com/mpush/api/ack/AckContext.java @@ -0,0 +1,96 @@ +/* + * (C) Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.mpush.api.ack; + +import com.mpush.api.protocol.Packet; + +/** + * Created by ohun on 2016/11/13. + * + * @author ohun@live.cn (夜色) + */ +public class AckContext { + public AckCallback callback; + public AckModel ackModel = AckModel.AUTO_ACK; + public int timeout = 1000; + public Packet request; + public int retryCount; + public RetryCondition retryCondition; + + public static AckContext build(AckCallback callback) { + AckContext context = new AckContext(); + context.setCallback(callback); + return context; + } + + public AckCallback getCallback() { + return callback; + } + + public AckContext setCallback(AckCallback callback) { + this.callback = callback; + return this; + } + + public AckModel getAckModel() { + return ackModel; + } + + public AckContext setAckModel(AckModel ackModel) { + this.ackModel = ackModel; + return this; + } + + public int getTimeout() { + return timeout; + } + + public AckContext setTimeout(int timeout) { + this.timeout = timeout; + return this; + } + + public Packet getRequest() { + return request; + } + + public AckContext setRequest(Packet request) { + this.request = request; + return this; + } + + public int getRetryCount() { + return retryCount; + } + + public AckContext setRetryCount(int retryCount) { + this.retryCount = retryCount; + return this; + } + + public RetryCondition getRetryCondition() { + return retryCondition; + } + + public AckContext setRetryCondition(RetryCondition retryCondition) { + this.retryCondition = retryCondition; + return this; + } +} diff --git a/src/main/java/com/mpush/api/ack/AckModel.java b/src/main/java/com/mpush/api/ack/AckModel.java new file mode 100644 index 0000000..42fa6bc --- /dev/null +++ b/src/main/java/com/mpush/api/ack/AckModel.java @@ -0,0 +1,38 @@ +/* + * (C) Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.mpush.api.ack; + +import com.mpush.api.protocol.Packet; + +/** + * Created by ohun on 16/9/6. + * + * @author ohun@live.cn (夜色) + */ +public enum AckModel { + NO_ACK((byte) 0),//不需要ACK + AUTO_ACK(Packet.FLAG_AUTO_ACK),//客户端收到消息后自动确认消息 + BIZ_ACK(Packet.FLAG_BIZ_ACK);//由客户端业务自己确认消息是否到达 + public final byte flag; + + AckModel(byte flag) { + this.flag = flag; + } +} diff --git a/src/main/java/com/mpush/api/ack/RetryCondition.java b/src/main/java/com/mpush/api/ack/RetryCondition.java new file mode 100644 index 0000000..80f09db --- /dev/null +++ b/src/main/java/com/mpush/api/ack/RetryCondition.java @@ -0,0 +1,32 @@ +/* + * (C) Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.mpush.api.ack; + +import com.mpush.api.connection.Connection; +import com.mpush.api.protocol.Packet; + +/** + * Created by ohun on 18/9/29. + * + * @author ohun@live.cn (夜色) + */ +public interface RetryCondition { + boolean test(Connection connection, Packet packet); +} diff --git a/src/main/java/com/mpush/api/connection/Connection.java b/src/main/java/com/mpush/api/connection/Connection.java index 06a3b81..1d75ace 100644 --- a/src/main/java/com/mpush/api/connection/Connection.java +++ b/src/main/java/com/mpush/api/connection/Connection.java @@ -20,7 +20,6 @@ package com.mpush.api.connection; - import com.mpush.api.Client; import com.mpush.api.protocol.Packet; @@ -35,15 +34,15 @@ public interface Connection { void connect(); - SessionContext getSessionContext(); + void close(); - void send(Packet packet); + void reconnect(); - void close(); + void send(Packet packet);//TODO add send Listener boolean isConnected(); - void reconnect(); + boolean isAutoConnect(); boolean isReadTimeout(); @@ -53,8 +52,11 @@ public interface Connection { void setLastWriteTime(); + void resetTimeout(); + + SessionContext getSessionContext(); + SocketChannel getChannel(); Client getClient(); - } diff --git a/src/main/java/com/mpush/api/connection/SessionContext.java b/src/main/java/com/mpush/api/connection/SessionContext.java index 9902a5c..f47ba38 100644 --- a/src/main/java/com/mpush/api/connection/SessionContext.java +++ b/src/main/java/com/mpush/api/connection/SessionContext.java @@ -29,6 +29,7 @@ public final class SessionContext { public int heartbeat; public Cipher cipher; public String bindUser; + public String tags; public void changeCipher(Cipher cipher) { this.cipher = cipher; @@ -38,8 +39,14 @@ public void setHeartbeat(int heartbeat) { this.heartbeat = heartbeat; } - public void setBindUser(String bindUser) { + public SessionContext setBindUser(String bindUser) { this.bindUser = bindUser; + return this; + } + + public SessionContext setTags(String tags) { + this.tags = tags; + return this; } public boolean handshakeOk() { diff --git a/src/main/java/com/mpush/api/protocol/Command.java b/src/main/java/com/mpush/api/protocol/Command.java index 55ff9a1..18aa70d 100644 --- a/src/main/java/com/mpush/api/protocol/Command.java +++ b/src/main/java/com/mpush/api/protocol/Command.java @@ -48,6 +48,7 @@ public enum Command { GATEWAY_CHAT(20), GROUP(21), GATEWAY_GROUP(22), + ACK(23), UNKNOWN(-1); Command(int cmd) { diff --git a/src/main/java/com/mpush/api/protocol/ErrorCode.java b/src/main/java/com/mpush/api/protocol/ErrorCode.java new file mode 100644 index 0000000..9420436 --- /dev/null +++ b/src/main/java/com/mpush/api/protocol/ErrorCode.java @@ -0,0 +1,55 @@ +/* + * (C) Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.mpush.api.protocol; + +/** + * Created by ohun on 2015/12/30. + * + * @author ohun@live.cn + */ +public enum ErrorCode { + OFFLINE(1, "user offline"), + PUSH_CLIENT_FAILURE(2, "push to client failure"), + ROUTER_CHANGE(3, "router change"), + ACK_TIMEOUT(4, "ack timeout"), + DISPATCH_ERROR(100, "handle message error"), + UNSUPPORTED_CMD(101, "unsupported command"), + REPEAT_HANDSHAKE(102, "repeat handshake"), + SESSION_EXPIRED(103, "session expired"), + INVALID_DEVICE(104, "invalid device"), + UNKNOWN(-1, "unknown"); + + ErrorCode(int code, String errorMsg) { + this.errorMsg = errorMsg; + this.errorCode = (byte) code; + } + + public final byte errorCode; + public final String errorMsg; + + public static ErrorCode toEnum(int code) { + for (ErrorCode errorCode : values()) { + if (errorCode.errorCode == code) { + return errorCode; + } + } + return UNKNOWN; + } +} diff --git a/src/main/java/com/mpush/api/protocol/MPushProtocol.java b/src/main/java/com/mpush/api/protocol/MPushProtocol.java index 47a51de..9622ec3 100644 --- a/src/main/java/com/mpush/api/protocol/MPushProtocol.java +++ b/src/main/java/com/mpush/api/protocol/MPushProtocol.java @@ -22,6 +22,7 @@ import com.mpush.api.http.HttpRequest; import com.mpush.api.http.HttpResponse; +import com.mpush.api.push.PushContext; import java.util.concurrent.Future; @@ -32,15 +33,24 @@ */ public interface MPushProtocol { + /** + * 健康检查, 检测读写超时, 发送心跳 + * + * @return true/false Client + */ boolean healthCheck(); void fastConnect(); void handshake(); - void bindUser(String userId); + void bindUser(String userId, String tags); void unbindUser(); + void ack(int messageId); + + Future push(PushContext context); + Future sendHttp(HttpRequest request); } diff --git a/src/main/java/com/mpush/api/protocol/Packet.java b/src/main/java/com/mpush/api/protocol/Packet.java index 07c73b6..0d91b48 100644 --- a/src/main/java/com/mpush/api/protocol/Packet.java +++ b/src/main/java/com/mpush/api/protocol/Packet.java @@ -20,19 +20,21 @@ package com.mpush.api.protocol; - import java.nio.ByteBuffer; /** * Created by ohun on 2015/12/19. * * @author ohun@live.cn (夜色) - * bodyLength(4)+cmd(1)+cc(2)+flags(1)+sessionId(4)+lrc(1)+body(n) + * bodyLength(4)+cmd(1)+cc(2)+flags(1)+sessionId(4)+lrc(1)+body(n) */ public final class Packet { public static final int HEADER_LEN = 13;//packet包头协议长度 + public static final byte FLAG_CRYPTO = 0x01;//packet包启用加密 public static final byte FLAG_COMPRESS = 0x02;//packet包启用压缩 + public static final byte FLAG_BIZ_ACK = 0x04; + public static final byte FLAG_AUTO_ACK = 0x08; public static final byte HB_PACKET_BYTE = -33; public static final Packet HB_PACKET = new Packet(Command.HEARTBEAT); @@ -71,7 +73,7 @@ public void addFlag(byte flag) { } public boolean hasFlag(byte flag) { - return (flags & flag) != 0; + return (flags & flag) == flag; } public short calcCheckCode() { @@ -99,7 +101,7 @@ public byte calcLrc() { return lrc; } - public boolean vaildCheckCode() { + public boolean validCheckCode() { return calcCheckCode() == cc; } diff --git a/src/main/java/com/mpush/api/push/PushCallback.java b/src/main/java/com/mpush/api/push/PushCallback.java new file mode 100644 index 0000000..8c9bb87 --- /dev/null +++ b/src/main/java/com/mpush/api/push/PushCallback.java @@ -0,0 +1,32 @@ +/* + * (C) Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.mpush.api.push; + +/** + * Created by ohun on 16/10/13. + * + * @author ohun@live.cn (夜色) + */ +public interface PushCallback { + + void onSuccess(); + + void onTimeout(); +} diff --git a/src/main/java/com/mpush/api/push/PushContext.java b/src/main/java/com/mpush/api/push/PushContext.java new file mode 100644 index 0000000..7939da7 --- /dev/null +++ b/src/main/java/com/mpush/api/push/PushContext.java @@ -0,0 +1,63 @@ +/* + * (C) Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.mpush.api.push; + +import com.mpush.api.Constants; +import com.mpush.api.ack.AckContext; +import com.mpush.api.ack.AckModel; + +/** + * Created by ohun on 16/10/13. + * + * @author ohun@live.cn (夜色) + */ +public final class PushContext extends AckContext { + public byte[] content; + + public PushContext(byte[] content) { + this.content = content; + } + + public static PushContext build(byte[] content) { + return new PushContext(content); + } + + public static PushContext build(String content) { + return new PushContext(content.getBytes(Constants.UTF_8)); + } + + public byte[] getContent() { + return content; + } + + public PushContext setContent(byte[] content) { + this.content = content; + return this; + } + + public AckModel getAckModel() { + return ackModel; + } + + public PushContext setAckModel(AckModel ackModel) { + this.ackModel = ackModel; + return this; + } +} diff --git a/src/main/java/com/mpush/api/push/PushResult.java b/src/main/java/com/mpush/api/push/PushResult.java new file mode 100644 index 0000000..01e255f --- /dev/null +++ b/src/main/java/com/mpush/api/push/PushResult.java @@ -0,0 +1,28 @@ +/* + * (C) Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.mpush.api.push; + +/** + * Created by ohun on 16/10/13. + * + * @author ohun@live.cn (夜色) + */ +public final class PushResult { +} diff --git a/src/main/java/com/mpush/client/AckRequestMgr.java b/src/main/java/com/mpush/client/AckRequestMgr.java new file mode 100644 index 0000000..76e6f32 --- /dev/null +++ b/src/main/java/com/mpush/client/AckRequestMgr.java @@ -0,0 +1,178 @@ +/* + * (C) Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.mpush.client; + +import com.mpush.api.Logger; +import com.mpush.api.ack.AckCallback; +import com.mpush.api.ack.AckContext; +import com.mpush.api.ack.AckModel; +import com.mpush.api.ack.RetryCondition; +import com.mpush.api.connection.Connection; +import com.mpush.api.protocol.Packet; +import com.mpush.util.thread.ExecutorManager; + +import java.util.Map; +import java.util.concurrent.*; + +/** + * Created by ohun on 2016/11/13. + * + * @author ohun@live.cn (夜色) + */ +public final class AckRequestMgr { + private static AckRequestMgr I; + + private final Logger logger = ClientConfig.I.getLogger(); + + private final Map queue = new ConcurrentHashMap<>(); + private final ScheduledExecutorService timer = ExecutorManager.INSTANCE.getTimerThread(); + private final Callable NONE = new Callable() { + @Override + public Boolean call() throws Exception { + return Boolean.FALSE; + } + }; + private Connection connection; + + + public static AckRequestMgr I() { + if (I == null) { + synchronized (AckRequestMgr.class) { + if (I == null) { + I = new AckRequestMgr(); + } + } + } + return I; + } + + private AckRequestMgr() { + } + + public Future add(int sessionId, AckContext context) { + if (context.ackModel == AckModel.NO_ACK) return null; + if (context.callback == null) return null; + return addTask(new RequestTask(sessionId, context)); + } + + public RequestTask getAndRemove(int sessionId) { + return queue.remove(sessionId); + } + + + public void clear() { + for (RequestTask task : queue.values()) { + try { + task.future.cancel(true); + } catch (Exception e) { + } + } + } + + private RequestTask addTask(RequestTask task) { + queue.put(task.sessionId, task); + task.future = timer.schedule(task, task.timeout, TimeUnit.MILLISECONDS); + return task; + } + + public void setConnection(Connection connection) { + this.connection = connection; + } + + public final class RequestTask extends FutureTask implements Runnable { + private final int timeout; + private final long sendTime; + private final int sessionId; + private AckCallback callback; + private Packet request; + private Future future; + private int retryCount; + private RetryCondition retryCondition; + + private RequestTask(AckCallback callback, int timeout, int sessionId, Packet request, int retryCount, RetryCondition retryCondition) { + super(NONE); + this.callback = callback; + this.timeout = timeout; + this.sendTime = System.currentTimeMillis(); + this.sessionId = sessionId; + this.request = request; + this.retryCount = retryCount; + this.retryCondition = retryCondition; + } + + private RequestTask(int sessionId, AckContext context) { + this(context.callback, context.timeout, sessionId, context.request, context.retryCount, context.retryCondition); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException(); + } + + @Override + public void run() { + queue.remove(sessionId); + timeout(); + } + + public void timeout() { + call(null); + } + + public void success(Packet packet) { + call(packet); + } + + private void call(Packet response) { + if (this.future.cancel(true)) { + boolean success = response != null; + this.set(success); + if (callback != null) { + if (success) { + logger.d("receive one ack response, sessionId=%d, costTime=%d, request=%s, response=%s" + , sessionId, (System.currentTimeMillis() - sendTime), request, response); + callback.onSuccess(response); + } else if (request != null && retryCount > 0) { + if (retryCondition == null || retryCondition.test(connection, request)) { + logger.w("one ack request timeout, retry=%d, sessionId=%d, costTime=%d, request=%s" + , retryCount, sessionId, (System.currentTimeMillis() - sendTime), request); + addTask(copy(retryCount - 1)); + connection.send(request); + } else { + logger.w("one ack request timeout, but ignore by condition, retry=%d, sessionId=%d, costTime=%d, request=%s" + , retryCount, sessionId, (System.currentTimeMillis() - sendTime), request); + } + } else { + logger.w("one ack request timeout, sessionId=%d, costTime=%d, request=%s" + , sessionId, (System.currentTimeMillis() - sendTime), request); + callback.onTimeout(request); + } + } + callback = null; + request = null; + retryCondition = null; + } + } + + private RequestTask copy(int retryCount) { + return new RequestTask(callback, timeout, sessionId, request, retryCount, retryCondition); + } + } +} diff --git a/src/main/java/com/mpush/client/AllotClient.java b/src/main/java/com/mpush/client/AllotClient.java index d82ffac..3d14d09 100644 --- a/src/main/java/com/mpush/client/AllotClient.java +++ b/src/main/java/com/mpush/client/AllotClient.java @@ -24,16 +24,22 @@ import com.mpush.api.Logger; import com.mpush.util.IOUtils; +import javax.net.ssl.*; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import static com.mpush.api.Constants.DEFAULT_SO_TIMEOUT; + /** * Created by yxx on 2016/6/8. * @@ -64,8 +70,12 @@ public List queryServerAddressList() { try { URL url = new URL(config.getAllotServer()); connection = (HttpURLConnection) url.openConnection(); - connection.setConnectTimeout(3000); - connection.setReadTimeout(3000); + if (config.getAllotServer().startsWith("https")) { + ((HttpsURLConnection) connection).setSSLSocketFactory(getSSLSocketFactory()); + ((HttpsURLConnection) connection).setHostnameVerifier(new NullHostnameVerifier()); + } + connection.setConnectTimeout(DEFAULT_SO_TIMEOUT); + connection.setReadTimeout(DEFAULT_SO_TIMEOUT); connection.setUseCaches(false); connection.setDoInput(true); connection.setDoOutput(false); @@ -75,8 +85,8 @@ public List queryServerAddressList() { connection.disconnect(); return serverAddress; } - } catch (IOException e) { - logger.e(e, "get server address ex, when connect server."); + } catch (Exception e) { + logger.e(e, "get server address ex, when connect server. allot=%s", config.getAllotServer()); return Collections.emptyList(); } @@ -101,11 +111,50 @@ public List queryServerAddressList() { if (content.length > 0) { String result = new String(content, Constants.UTF_8); logger.w("get server address success result=%s", result); + List serverAddress = new ArrayList<>(); serverAddress.addAll(Arrays.asList(result.split(","))); + this.serverAddress = serverAddress; } else { logger.w("get server address failure return content empty."); } return serverAddress; } + + private SSLSocketFactory getSSLSocketFactory() { + return getTrustAllContext().getSocketFactory(); + } + + private SSLContext getTrustAllContext() { + SSLContext sslContext = null; + try { + sslContext = SSLContext.getInstance("TLS"); + sslContext.init(null, new TrustManager[]{new TrustAnyTrustManager()}, new SecureRandom()); + } catch (Exception e) { + throw new RuntimeException(e); + } + return sslContext; + } + + private static class TrustAnyTrustManager implements X509TrustManager { + public void checkClientTrusted(X509Certificate[] chain, String authType) + throws CertificateException { + } + + public void checkServerTrusted(X509Certificate[] chain, String authType) + throws CertificateException { + } + + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } + + private static class NullHostnameVerifier implements HostnameVerifier { + + @Override + public boolean verify(String s, SSLSession sslSession) { + return true; + } + } } diff --git a/src/main/java/com/mpush/client/ClientConfig.java b/src/main/java/com/mpush/client/ClientConfig.java index dbd4a1a..27c3803 100644 --- a/src/main/java/com/mpush/client/ClientConfig.java +++ b/src/main/java/com/mpush/client/ClientConfig.java @@ -45,6 +45,7 @@ public final class ClientConfig { private String osVersion; private String clientVersion; private String userId; + private String tags; private int maxHeartbeat = Constants.DEF_HEARTBEAT; private int minHeartbeat = Constants.DEF_HEARTBEAT; private int aesKeyLength = 16; @@ -54,6 +55,10 @@ public final class ClientConfig { private Logger logger; private boolean logEnabled; private boolean enableHttpProxy = true; + private int handshakeTimeoutMills = 3000; + private int handshakeRetryCount = 0; + private int bindUserTimeoutMills = 3000; + private int bindUserRetryCount = 1; public static ClientConfig build() { return I = new ClientConfig(); @@ -246,4 +251,45 @@ public ClientConfig setServerPort(int serverPort) { this.serverPort = serverPort; return this; } + + public String getTags() { + return tags; + } + + public ClientConfig setTags(String tags) { + this.tags = tags; + return this; + } + + public int getHandshakeTimeoutMills() { + return handshakeTimeoutMills; + } + + public void setHandshakeTimeoutMills(int handshakeTimeoutMills) { + this.handshakeTimeoutMills = handshakeTimeoutMills; + } + + public int getHandshakeRetryCount() { + return handshakeRetryCount; + } + + public void setHandshakeRetryCount(int handshakeRetryCount) { + this.handshakeRetryCount = handshakeRetryCount; + } + + public int getBindUserTimeoutMills() { + return bindUserTimeoutMills; + } + + public void setBindUserTimeoutMills(int bindUserTimeoutMills) { + this.bindUserTimeoutMills = bindUserTimeoutMills; + } + + public int getBindUserRetryCount() { + return bindUserRetryCount; + } + + public void setBindUserRetryCount(int bindUserRetryCount) { + this.bindUserRetryCount = bindUserRetryCount; + } } diff --git a/src/main/java/com/mpush/client/DefaultClientListener.java b/src/main/java/com/mpush/client/DefaultClientListener.java index f732fe1..ce4b274 100644 --- a/src/main/java/com/mpush/client/DefaultClientListener.java +++ b/src/main/java/com/mpush/client/DefaultClientListener.java @@ -57,34 +57,42 @@ public void run() { } }); } + AckRequestMgr.I().clear(); } @Override public void onHandshakeOk(final Client client, final int heartbeat) { - if (listener != null) { - executor.execute(new Runnable() { - @Override - public void run() { - listener.onHandshakeOk(client, heartbeat); - } - }); - } else { - //do heathCheck + if (listener != null) {//dispatcher已经使用了Executor,此处直接同步调用 + listener.onHandshakeOk(client, heartbeat); } - client.bindUser(ClientConfig.I.getUserId()); + client.bindUser(ClientConfig.I.getUserId(), ClientConfig.I.getTags()); } @Override - public void onReceivePush(final Client client, final byte[] content) { - if (listener != null) { - listener.onReceivePush(client, content); + public void onReceivePush(final Client client, final byte[] content, int messageId) { + if (listener != null) {//dispatcher已经使用了Executor,此处直接同步调用 + listener.onReceivePush(client, content, messageId); } } @Override public void onKickUser(String deviceId, String userId) { - if (listener != null) { + if (listener != null) {//dispatcher已经使用了Executor,此处直接同步调用 listener.onKickUser(deviceId, userId); } } + + @Override + public void onBind(boolean success, String userId) { + if (listener != null) {//dispatcher已经使用了Executor,此处直接同步调用 + listener.onBind(success, userId); + } + } + + @Override + public void onUnbind(boolean success, String userId) { + if (listener != null) {//dispatcher已经使用了Executor,此处直接同步调用 + listener.onUnbind(success, userId); + } + } } \ No newline at end of file diff --git a/src/main/java/com/mpush/client/HttpRequestQueue.java b/src/main/java/com/mpush/client/HttpRequestMgr.java similarity index 91% rename from src/main/java/com/mpush/client/HttpRequestQueue.java rename to src/main/java/com/mpush/client/HttpRequestMgr.java index 75c8097..5f35271 100644 --- a/src/main/java/com/mpush/client/HttpRequestQueue.java +++ b/src/main/java/com/mpush/client/HttpRequestMgr.java @@ -36,16 +36,16 @@ import java.util.concurrent.TimeUnit; import static java.net.HttpURLConnection.HTTP_CLIENT_TIMEOUT; -import static java.net.HttpURLConnection.HTTP_NOT_FOUND; /** * Created by yxx on 2016/2/16. * * @author ohun@live.cn */ -public final class HttpRequestQueue { +public final class HttpRequestMgr { + private static HttpRequestMgr I; private final Map queue = new ConcurrentHashMap<>(); - private final ScheduledExecutorService timer = ExecutorManager.INSTANCE.getHttpRequestThread(); + private final ScheduledExecutorService timer = ExecutorManager.INSTANCE.getTimerThread(); private final Executor executor = ExecutorManager.INSTANCE.getDispatchThread(); //private final HttpResponse response404 = new HttpResponse(HTTP_NOT_FOUND, "Not Found", null, null); private final HttpResponse response408 = new HttpResponse(HTTP_CLIENT_TIMEOUT, "Request Timeout", null, null); @@ -57,6 +57,20 @@ public HttpResponse call() throws Exception { }; private final Logger logger = ClientConfig.I.getLogger(); + public static HttpRequestMgr I() { + if (I == null) { + synchronized (AckRequestMgr.class) { + if (I == null) { + I = new HttpRequestMgr(); + } + } + } + return I; + } + + private HttpRequestMgr() { + } + public Future add(int sessionId, HttpRequest request) { RequestTask task = new RequestTask(sessionId, request); queue.put(sessionId, task); diff --git a/src/main/java/com/mpush/client/MPushClient.java b/src/main/java/com/mpush/client/MPushClient.java index 335e03a..fe5bc82 100644 --- a/src/main/java/com/mpush/client/MPushClient.java +++ b/src/main/java/com/mpush/client/MPushClient.java @@ -22,17 +22,19 @@ import com.mpush.api.Client; import com.mpush.api.Logger; +import com.mpush.api.ack.AckCallback; +import com.mpush.api.ack.AckContext; +import com.mpush.api.ack.RetryCondition; +import com.mpush.api.connection.Connection; import com.mpush.api.connection.SessionContext; import com.mpush.api.connection.SessionStorage; import com.mpush.api.http.HttpRequest; import com.mpush.api.http.HttpResponse; import com.mpush.api.protocol.Command; import com.mpush.api.protocol.Packet; +import com.mpush.api.push.PushContext; import com.mpush.handler.HttpProxyHandler; -import com.mpush.message.BindUserMessage; -import com.mpush.message.FastConnectMessage; -import com.mpush.message.HandshakeMessage; -import com.mpush.message.HttpRequestMessage; +import com.mpush.message.*; import com.mpush.security.AesCipher; import com.mpush.security.CipherBox; import com.mpush.session.PersistentSession; @@ -40,37 +42,45 @@ import com.mpush.util.thread.ExecutorManager; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import static com.mpush.api.Constants.*; +import static com.mpush.api.Constants.MAX_HB_TIMEOUT_COUNT; /** * Created by ohun on 2016/1/17. * * @author ohun@live.cn (夜色) */ -public final class MPushClient implements Client { - public enum State {Started, Shutdown, Destroyed} +/*package*/final class MPushClient implements Client, AckCallback { - private final AtomicReference clientState = new AtomicReference(State.Shutdown); + private enum State {Started, Shutdown, Destroyed} + + private final AtomicReference clientState = new AtomicReference<>(State.Shutdown); - private final MessageDispatcher receiver; private final TcpConnection connection; private final ClientConfig config; private final Logger logger; private int hbTimeoutTimes; - private HttpRequestQueue requestQueue; + private AckRequestMgr ackRequestMgr; + private HttpRequestMgr httpRequestMgr; /*package*/ MPushClient(ClientConfig config) { this.config = config; this.logger = config.getLogger(); - this.receiver = new MessageDispatcher(); - this.connection = new TcpConnection(this, receiver); + + MessageDispatcher receiver = new MessageDispatcher(); + if (config.isEnableHttpProxy()) { - this.requestQueue = new HttpRequestQueue(); - this.receiver.register(Command.HTTP_PROXY, new HttpProxyHandler(requestQueue)); + this.httpRequestMgr = HttpRequestMgr.I(); + receiver.register(Command.HTTP_PROXY, new HttpProxyHandler()); } + + this.ackRequestMgr = AckRequestMgr.I(); + this.connection = new TcpConnection(this, receiver); + this.ackRequestMgr.setConnection(this.connection); } @Override @@ -107,6 +117,50 @@ public boolean isRunning() { return clientState.get() == State.Started && connection.isConnected(); } + /** + * 这个方法主要用于解决网络十分不稳定的场景: + * 正常情况如果网络断开,就应该关闭连接,反之则应去建立连接 + * 但是在网络抖动厉害时就会发生连接频繁的建立/断开。 + *

+ * 处理这种场景的其中一个方案是: + * 当网络断开时不主动关闭连接,而是尝试发送一次心跳检测, + * 如果能收到响应,说明网络短时间内又恢复了, + * 否则就断开连接,等待网络恢复并重建连接。 + * + * @param isConnected true/false + */ + @Override + public void onNetStateChange(boolean isConnected) { + connection.setAutoConnect(isConnected); + logger.i("network state change, isConnected=%b, connection=%s", isConnected, connection); + if (isConnected) { //当有网络时,去尝试重连 + connection.connect(); + } else if (connection.isConnected()) { //无网络,如果连接没有断开,尝试发送一次心跳检测,用于快速校验网络状况 + connection.resetTimeout();//心跳检测前,重置上次读写数据包的时间戳 + hbTimeoutTimes = MAX_HB_TIMEOUT_COUNT - 2;//总共要调用两次healthCheck,第一次用于发送心跳,第二次用于检测是否超时 + final ScheduledExecutorService timer = ExecutorManager.INSTANCE.getTimerThread(); + + //隔3s后发送一次心跳检测,看能不能收到服务端的响应 + timer.schedule(new Runnable() { + int checkCount = 0; + + @Override + public void run() { + logger.w("network disconnected, try test tcp connection checkCount=%d, connection=%s", checkCount, connection); + //如果期间连接状态发生变化,取消任务 + if (connection.isAutoConnect() || !connection.isConnected()) return; + + if (++checkCount <= 2) { + if (healthCheck() && checkCount < 2) { + timer.schedule(this, 3, TimeUnit.SECONDS); + } + } + } + + }, 3, TimeUnit.SECONDS); + } + } + @Override public boolean healthCheck() { @@ -159,9 +213,16 @@ public void fastConnect() { message.sessionId = session.sessionId; message.maxHeartbeat = config.getMaxHeartbeat(); message.minHeartbeat = config.getMinHeartbeat(); + message.encodeBody(); + ackRequestMgr.add(message.getSessionId(), AckContext + .build(this) + .setRequest(message.getPacket()) + .setTimeout(config.getHandshakeTimeoutMills()) + .setRetryCount(config.getHandshakeRetryCount()) + ); + logger.w("<<< do fast connect, message=%s", message); message.sendRaw(); connection.getSessionContext().changeCipher(session.cipher); - logger.w("<<< do fast connect, message=%s", message); } @Override @@ -177,37 +238,74 @@ public void handshake() { message.clientVersion = config.getClientVersion(); message.maxHeartbeat = config.getMaxHeartbeat(); message.minHeartbeat = config.getMinHeartbeat(); + message.encodeBody(); + ackRequestMgr.add(message.getSessionId(), AckContext + .build(this) + .setRequest(message.getPacket()) + .setTimeout(config.getHandshakeTimeoutMills()) + .setRetryCount(config.getHandshakeRetryCount()) + ); + logger.w("<<< do handshake, message=%s", message); message.send(); context.changeCipher(new AesCipher(message.clientKey, message.iv)); - logger.w("<<< do handshake, message=%s", message); } @Override - public void bindUser(String userId) { + public void bindUser(final String userId, final String tags) { + if (!connection.getSessionContext().handshakeOk()) { + logger.w("connection is not handshake ok!"); + return; + } + if (Strings.isBlank(userId)) { logger.w("bind user is null"); return; } SessionContext context = connection.getSessionContext(); - if (userId.equals(context.bindUser)) return; - context.setBindUser(userId); - config.setUserId(userId); - BindUserMessage + if (context.bindUser != null) { + if (userId.equals(context.bindUser)) {//已经绑定 + if (tags != null && tags.equals(context.tags)) return; + } else { + unbindUser();//切换用户,要先解绑老用户 + } + } + context.setBindUser(userId).setTags(tags); + config.setUserId(userId).setTags(tags); + BindUserMessage message = BindUserMessage .buildBind(connection) .setUserId(userId) - .send(); + .setTags(tags); + message.encodeBody(); + ackRequestMgr.add(message.getSessionId(), AckContext + .build(this) + .setRequest(message.getPacket()) + .setTimeout(config.getBindUserTimeoutMills()) + .setRetryCount(config.getBindUserRetryCount()) + .setRetryCondition(new RetryCondition() { + @Override + public boolean test(Connection connection, Packet packet) { + return connection.getSessionContext().handshakeOk(); + } + }) + ); logger.w("<<< do bind user, userId=%s", userId); + message.send(); + } @Override public void unbindUser() { + if (!connection.getSessionContext().handshakeOk()) { + logger.w("connection is not handshake ok!"); + return; + } String userId = config.getUserId(); if (Strings.isBlank(userId)) { logger.w("unbind user is null"); return; } - config.setUserId(null); - connection.getSessionContext().setBindUser(null); + config.setUserId(null).setTags(null); + connection.getSessionContext().setBindUser(null).setTags(null); BindUserMessage .buildUnbind(connection) .setUserId(userId) @@ -215,6 +313,28 @@ public void unbindUser() { logger.w("<<< do unbind user, userId=%s", userId); } + @Override + public void ack(int messageId) { + if (messageId > 0) { + AckMessage ackMessage = new AckMessage(messageId, connection); + ackMessage.sendRaw(); + logger.d("<<< send ack for push messageId=%d", messageId); + } + } + + @Override + public Future push(PushContext context) { + if (!connection.getSessionContext().handshakeOk()) { + logger.w("connection is not handshake ok!"); + return null; + } + PushMessage message = new PushMessage(context.content, connection); + message.addFlag(context.ackModel.flag); + message.send(); + logger.d("<<< send push message=%s", message); + return ackRequestMgr.add(message.getSessionId(), context); + } + @Override public Future sendHttp(HttpRequest request) { if (connection.getSessionContext().handshakeOk()) { @@ -225,8 +345,18 @@ public Future sendHttp(HttpRequest request) { message.body = request.getBody(); message.send(); logger.d("<<< send http proxy, request=%s", request); - return requestQueue.add(message.getSessionId(), request); + return httpRequestMgr.add(message.getSessionId(), request); } return null; } + + @Override + public void onSuccess(Packet response) { + + } + + @Override + public void onTimeout(Packet request) { + this.connection.reconnect(); + } } diff --git a/src/main/java/com/mpush/client/MessageDispatcher.java b/src/main/java/com/mpush/client/MessageDispatcher.java index 89e4773..59bea40 100644 --- a/src/main/java/com/mpush/client/MessageDispatcher.java +++ b/src/main/java/com/mpush/client/MessageDispatcher.java @@ -42,6 +42,7 @@ public final class MessageDispatcher implements PacketReceiver { private final Executor executor = ExecutorManager.INSTANCE.getDispatchThread(); private final Map handlers = new HashMap<>(); private final Logger logger = ClientConfig.I.getLogger(); + private final AckRequestMgr ackRequestMgr; public MessageDispatcher() { register(Command.HEARTBEAT, new HeartbeatHandler()); @@ -51,6 +52,9 @@ public MessageDispatcher() { register(Command.OK, new OkMessageHandler()); register(Command.ERROR, new ErrorMessageHandler()); register(Command.PUSH, new PushMessageHandler()); + register(Command.ACK, new AckHandler()); + + this.ackRequestMgr = AckRequestMgr.I(); } public void register(Command command, MessageHandler handler) { @@ -65,6 +69,7 @@ public void onReceive(final Packet packet, final Connection connection) { @Override public void run() { try { + doAckResponse(packet); handler.handle(packet, connection); } catch (Throwable throwable) { logger.e(throwable, "handle message error, packet=%s", packet); @@ -73,8 +78,15 @@ public void run() { } }); } else { - logger.w("<<< receive unsupported message, do reconnect, packet=%s", packet); - connection.reconnect(); + logger.w("<<< receive unsupported message, packet=%s", packet); + //connection.reconnect(); + } + } + + private void doAckResponse(Packet packet) { + AckRequestMgr.RequestTask task = ackRequestMgr.getAndRemove(packet.sessionId); + if (task != null) { + task.success(packet); } } } diff --git a/src/main/java/com/mpush/client/TcpConnection.java b/src/main/java/com/mpush/client/TcpConnection.java index bfb4b7d..2252616 100644 --- a/src/main/java/com/mpush/client/TcpConnection.java +++ b/src/main/java/com/mpush/client/TcpConnection.java @@ -38,6 +38,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicReference; +import static com.mpush.api.Constants.DEFAULT_SO_TIMEOUT; import static com.mpush.api.Constants.MAX_RESTART_COUNT; import static com.mpush.api.Constants.MAX_TOTAL_RESTART_COUNT; import static com.mpush.client.TcpConnection.State.*; @@ -52,9 +53,8 @@ public final class TcpConnection implements Connection { public enum State {connecting, connected, disconnecting, disconnected} - private final AtomicReference state = new AtomicReference(disconnected); + private final AtomicReference state = new AtomicReference<>(disconnected); private final EventLock connLock = new EventLock(); - private final ClientConfig config; private final Logger logger; private final ClientListener listener; private final MPushClient client; @@ -71,8 +71,8 @@ public enum State {connecting, connected, disconnecting, disconnected} private volatile boolean autoConnect = true; public TcpConnection(MPushClient client, PacketReceiver receiver) { + ClientConfig config = ClientConfig.I; this.client = client; - this.config = ClientConfig.I; this.logger = config.getLogger(); this.listener = config.getClientListener(); this.allotClient = new AllotClient(); @@ -180,9 +180,8 @@ private boolean doReconnect() { private boolean doConnect() { List address = allotClient.getServerAddress(); if (address != null && address.size() > 0) { - Iterator it = address.iterator(); - while (it.hasNext()) { - String[] host_port = it.next().split(":"); + for (int i = 0; i < address.size(); i++) { + String[] host_port = address.get(i).split(":"); if (host_port.length == 2) { String host = host_port[0]; @@ -192,8 +191,7 @@ private boolean doConnect() { return true; } } - - it.remove(); + address.remove(i--); } } return false; @@ -206,7 +204,6 @@ private boolean doConnect(String host, int port) { try { channel = SocketChannel.open(); channel.socket().setTcpNoDelay(true); - channel.socket().setKeepAlive(true); channel.connect(new InetSocketAddress(host, port)); logger.w("connect server ok [%s:%s]", host, port); onConnected(channel); @@ -264,6 +261,16 @@ public boolean isReadTimeout() { return System.currentTimeMillis() - lastReadTime > context.heartbeat + 1000; } + @Override + public void resetTimeout() { + lastReadTime = lastWriteTime = 0; + } + + @Override + public boolean isAutoConnect() { + return autoConnect; + } + @Override public SessionContext getSessionContext() { return context; @@ -276,7 +283,14 @@ public boolean isWriteTimeout() { @Override public String toString() { - return "TcpConnection{" + ", lastReadTime=" + lastReadTime + ", lastWriteTime=" + lastWriteTime + ", context=" - + context + '}'; + return "TcpConnection{" + + "state=" + state + + ", channel=" + channel + + ", lastReadTime=" + lastReadTime + + ", lastWriteTime=" + lastWriteTime + + ", totalReconnectCount=" + totalReconnectCount + + ", reconnectCount=" + reconnectCount + + ", autoConnect=" + autoConnect + + '}'; } } diff --git a/src/main/java/com/mpush/codec/AsyncPacketReader.java b/src/main/java/com/mpush/codec/AsyncPacketReader.java index cf6cace..07088a2 100644 --- a/src/main/java/com/mpush/codec/AsyncPacketReader.java +++ b/src/main/java/com/mpush/codec/AsyncPacketReader.java @@ -101,7 +101,15 @@ private boolean read(SocketChannel channel, ByteBuffer in) { } catch (IOException e) { logger.e(e, "read packet ex, do reconnect"); readCount = -1; + sleep4Reconnect(); } return readCount > 0; } + + private void sleep4Reconnect() { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + } + } } diff --git a/src/main/java/com/mpush/handler/AckHandler.java b/src/main/java/com/mpush/handler/AckHandler.java new file mode 100644 index 0000000..9e946e9 --- /dev/null +++ b/src/main/java/com/mpush/handler/AckHandler.java @@ -0,0 +1,56 @@ +/* + * (C) Copyright 2015-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * ohun@live.cn (夜色) + */ + +package com.mpush.handler; + +import com.mpush.api.Logger; +import com.mpush.api.connection.Connection; +import com.mpush.api.protocol.Packet; +import com.mpush.client.ClientConfig; +import com.mpush.client.AckRequestMgr; +import com.mpush.message.AckMessage; + +/** + * Created by ohun on 16/9/5. + * + * @author ohun@live.cn (夜色) + */ +public class AckHandler extends BaseMessageHandler { + + private final Logger logger; + private final AckRequestMgr ackRequestMgr; + + public AckHandler() { + this.logger = ClientConfig.I.getLogger(); + this.ackRequestMgr = AckRequestMgr.I(); + } + + @Override + public AckMessage decode(Packet packet, Connection connection) { + return new AckMessage(packet, connection); + } + + @Override + public void handle(AckMessage message) { + /*AckRequestMgr.RequestTask task = ackRequestMgr.getAndRemove(message.getSessionId()); + if (task != null) { + task.success(message.getPacket()); + }*/ + } +} diff --git a/src/main/java/com/mpush/handler/BaseMessageHandler.java b/src/main/java/com/mpush/handler/BaseMessageHandler.java index 9230f73..acc6346 100644 --- a/src/main/java/com/mpush/handler/BaseMessageHandler.java +++ b/src/main/java/com/mpush/handler/BaseMessageHandler.java @@ -33,6 +33,7 @@ public abstract class BaseMessageHandler implements MessageHa public void handle(Packet packet, Connection connection) { T t = decode(packet, connection); if (t != null) { + t.decodeBody(); handle(t); } } diff --git a/src/main/java/com/mpush/handler/ErrorMessageHandler.java b/src/main/java/com/mpush/handler/ErrorMessageHandler.java index c5aa8cb..1de83be 100644 --- a/src/main/java/com/mpush/handler/ErrorMessageHandler.java +++ b/src/main/java/com/mpush/handler/ErrorMessageHandler.java @@ -20,13 +20,14 @@ package com.mpush.handler; - +import com.mpush.api.Logger; import com.mpush.api.connection.Connection; import com.mpush.api.protocol.Command; -import com.mpush.message.ErrorMessage; -import com.mpush.client.ClientConfig; -import com.mpush.api.Logger; import com.mpush.api.protocol.Packet; +import com.mpush.client.ClientConfig; +import com.mpush.message.ErrorMessage; + +import static com.mpush.api.protocol.ErrorCode.REPEAT_HANDSHAKE; /** * Created by ohun on 2015/12/30. @@ -48,7 +49,10 @@ public void handle(ErrorMessage message) { ClientConfig.I.getSessionStorage().clearSession(); message.getConnection().getClient().handshake(); } else if (message.cmd == Command.HANDSHAKE.cmd) { - message.getConnection().getClient().stop(); + if (message.code != REPEAT_HANDSHAKE.errorCode //重复握手的错误消息直接忽略 + && !REPEAT_HANDSHAKE.errorMsg.equals(message.reason)) { + message.getConnection().getClient().stop(); + } } else { message.getConnection().reconnect(); } diff --git a/src/main/java/com/mpush/handler/HttpProxyHandler.java b/src/main/java/com/mpush/handler/HttpProxyHandler.java index b2ae733..8ca9df5 100644 --- a/src/main/java/com/mpush/handler/HttpProxyHandler.java +++ b/src/main/java/com/mpush/handler/HttpProxyHandler.java @@ -20,11 +20,10 @@ package com.mpush.handler; - import com.mpush.api.connection.Connection; import com.mpush.api.http.HttpResponse; import com.mpush.api.protocol.Packet; -import com.mpush.client.HttpRequestQueue; +import com.mpush.client.HttpRequestMgr; import com.mpush.api.Logger; import com.mpush.client.ClientConfig; import com.mpush.message.HttpResponseMessage; @@ -36,10 +35,10 @@ */ public final class HttpProxyHandler extends BaseMessageHandler { private final Logger logger = ClientConfig.I.getLogger(); - private final HttpRequestQueue queue; + private final HttpRequestMgr httpRequestMgr; - public HttpProxyHandler(HttpRequestQueue queue) { - this.queue = queue; + public HttpProxyHandler() { + this.httpRequestMgr = HttpRequestMgr.I(); } @Override @@ -49,7 +48,7 @@ public HttpResponseMessage decode(Packet packet, Connection connection) { @Override public void handle(HttpResponseMessage message) { - HttpRequestQueue.RequestTask task = queue.getAndRemove(message.getSessionId()); + HttpRequestMgr.RequestTask task = httpRequestMgr.getAndRemove(message.getSessionId()); if (task != null) { HttpResponse response = new HttpResponse(message.statusCode, message.reasonPhrase, message.headers, message.body); task.setResponse(response); diff --git a/src/main/java/com/mpush/handler/OkMessageHandler.java b/src/main/java/com/mpush/handler/OkMessageHandler.java index 8ef2910..30ecc99 100644 --- a/src/main/java/com/mpush/handler/OkMessageHandler.java +++ b/src/main/java/com/mpush/handler/OkMessageHandler.java @@ -20,8 +20,8 @@ package com.mpush.handler; - import com.mpush.api.connection.Connection; +import com.mpush.api.protocol.Command; import com.mpush.api.protocol.Packet; import com.mpush.message.OkMessage; import com.mpush.api.Logger; @@ -42,6 +42,12 @@ public OkMessage decode(Packet packet, Connection connection) { @Override public void handle(OkMessage message) { + if (message.cmd == Command.BIND.cmd) { + ClientConfig.I.getClientListener().onBind(true, message.getConnection().getSessionContext().bindUser); + } else if (message.cmd == Command.UNBIND.cmd) { + ClientConfig.I.getClientListener().onUnbind(true, null); + } + logger.w(">>> receive ok message=%s", message); } } diff --git a/src/main/java/com/mpush/handler/PushMessageHandler.java b/src/main/java/com/mpush/handler/PushMessageHandler.java index bc69d22..44eaea2 100644 --- a/src/main/java/com/mpush/handler/PushMessageHandler.java +++ b/src/main/java/com/mpush/handler/PushMessageHandler.java @@ -23,6 +23,7 @@ import com.mpush.api.ClientListener; import com.mpush.api.connection.Connection; import com.mpush.api.protocol.Packet; +import com.mpush.message.AckMessage; import com.mpush.message.PushMessage; import com.mpush.client.ClientConfig; import com.mpush.api.Logger; @@ -43,7 +44,13 @@ public PushMessage decode(Packet packet, Connection connection) { @Override public void handle(PushMessage message) { - logger.d(">>> receive push messaged=%s", message.content.length); - listener.onReceivePush(message.getConnection().getClient(), message.content); + logger.d(">>> receive push message=%s", message.content.length); + listener.onReceivePush(message.getConnection().getClient(), + message.content, + message.bizAck() ? message.getSessionId() : 0); + if (message.autoAck()) { + AckMessage.from(message).sendRaw(); + logger.d("<<< send ack for push messageId=%d", message.getSessionId()); + } } } diff --git a/src/main/java/com/mpush/message/AckMessage.java b/src/main/java/com/mpush/message/AckMessage.java new file mode 100644 index 0000000..053653a --- /dev/null +++ b/src/main/java/com/mpush/message/AckMessage.java @@ -0,0 +1,46 @@ +package com.mpush.message; + +import com.mpush.api.connection.Connection; +import com.mpush.api.protocol.Command; +import com.mpush.api.protocol.Packet; +import com.mpush.util.ByteBuf; + +import java.nio.ByteBuffer; + +/** + * Created by ohun on 16/9/5. + * + * @author ohun@live.cn (夜色) + */ +public class AckMessage extends ByteBufMessage { + + public AckMessage(int sessionId, Connection connection) { + super(new Packet(Command.ACK, sessionId), connection); + } + + public AckMessage(Packet packet, Connection connection) { + super(packet, connection); + } + + @Override + protected void decode(ByteBuffer body) { + + } + + @Override + protected void encode(ByteBuf body) { + + } + + + public static AckMessage from(BaseMessage src) { + return new AckMessage(new Packet(Command.ACK, src.getSessionId()), src.connection); + } + + @Override + public String toString() { + return "AckMessage{" + + "packet=" + packet + + '}'; + } +} \ No newline at end of file diff --git a/src/main/java/com/mpush/message/BaseMessage.java b/src/main/java/com/mpush/message/BaseMessage.java index d072ca9..31545f1 100644 --- a/src/main/java/com/mpush/message/BaseMessage.java +++ b/src/main/java/com/mpush/message/BaseMessage.java @@ -20,7 +20,6 @@ package com.mpush.message; - import com.mpush.util.IOUtils; import com.mpush.client.ClientConfig; import com.mpush.api.Message; @@ -36,17 +35,23 @@ * @author ohun@live.cn (夜色) */ public abstract class BaseMessage implements Message { + public static final byte STATUS_DECODED = 1; + public static final byte STATUS_ENCODED = 2; private static final AtomicInteger SID_SEQ = new AtomicInteger(); protected final Packet packet; protected final Connection connection; + protected byte status = 0; public BaseMessage(Packet packet, Connection connection) { this.packet = packet; this.connection = connection; - this.decodeBody(); } - protected void decodeBody() { + @Override + public void decodeBody() { + if ((status & STATUS_DECODED) != 0) return; + else status |= STATUS_DECODED; + if (packet.body != null && packet.body.length > 0) { //1.解密 byte[] tmp = packet.body; @@ -70,7 +75,11 @@ protected void decodeBody() { } } - protected void encodeBody() { + @Override + public void encodeBody() { + if ((status & STATUS_ENCODED) != 0) return; + else status |= STATUS_ENCODED; + byte[] tmp = encode(); if (tmp != null && tmp.length > 0) { //1.压缩 @@ -95,6 +104,13 @@ protected void encodeBody() { } } + private void encodeBodyRaw() { + if ((status & STATUS_ENCODED) != 0) return; + else status |= STATUS_ENCODED; + + packet.body = encode(); + } + protected abstract void decode(byte[] body); protected abstract byte[] encode(); @@ -121,7 +137,7 @@ public void send() { @Override public void sendRaw() { - packet.body = encode(); + encodeBodyRaw(); connection.send(packet); } diff --git a/src/main/java/com/mpush/message/FastConnectMessage.java b/src/main/java/com/mpush/message/FastConnectMessage.java index e831818..fad79f5 100644 --- a/src/main/java/com/mpush/message/FastConnectMessage.java +++ b/src/main/java/com/mpush/message/FastConnectMessage.java @@ -20,7 +20,6 @@ package com.mpush.message; - import com.mpush.api.connection.Connection; import com.mpush.api.protocol.Command; import com.mpush.api.protocol.Packet; @@ -66,7 +65,8 @@ public void encode(ByteBuf body) { @Override public String toString() { return "FastConnectMessage{" + - "sessionId='" + sessionId + '\'' + + "sessionId=" + packet.sessionId + + ", sessionId='" + sessionId + '\'' + ", deviceId='" + deviceId + '\'' + ", minHeartbeat=" + minHeartbeat + ", maxHeartbeat=" + maxHeartbeat + diff --git a/src/main/java/com/mpush/message/FastConnectOkMessage.java b/src/main/java/com/mpush/message/FastConnectOkMessage.java index d81b5a6..a84c197 100644 --- a/src/main/java/com/mpush/message/FastConnectOkMessage.java +++ b/src/main/java/com/mpush/message/FastConnectOkMessage.java @@ -20,7 +20,6 @@ package com.mpush.message; - import com.mpush.api.connection.Connection; import com.mpush.api.protocol.Packet; import com.mpush.util.ByteBuf; @@ -61,7 +60,8 @@ public FastConnectOkMessage setHeartbeat(int heartbeat) { @Override public String toString() { return "FastConnectOkMessage{" + - "heartbeat=" + heartbeat + + "sessionId=" + packet.sessionId + + ", heartbeat=" + heartbeat + '}'; } } diff --git a/src/main/java/com/mpush/message/HandshakeMessage.java b/src/main/java/com/mpush/message/HandshakeMessage.java index 27ed9b2..233d1fe 100644 --- a/src/main/java/com/mpush/message/HandshakeMessage.java +++ b/src/main/java/com/mpush/message/HandshakeMessage.java @@ -20,7 +20,6 @@ package com.mpush.message; - import com.mpush.api.connection.Connection; import com.mpush.api.protocol.Command; import com.mpush.api.protocol.Packet; @@ -82,7 +81,8 @@ protected void encode(ByteBuf body) { @Override public String toString() { return "HandshakeMessage{" + - "deviceId='" + deviceId + '\'' + + "sessionId=" + packet.sessionId + + ", deviceId='" + deviceId + '\'' + ", osName='" + osName + '\'' + ", osVersion='" + osVersion + '\'' + ", clientVersion='" + clientVersion + '\'' + diff --git a/src/main/java/com/mpush/message/PushMessage.java b/src/main/java/com/mpush/message/PushMessage.java index 58d3f28..f4b84db 100644 --- a/src/main/java/com/mpush/message/PushMessage.java +++ b/src/main/java/com/mpush/message/PushMessage.java @@ -21,6 +21,7 @@ import com.mpush.api.connection.Connection; +import com.mpush.api.protocol.Command; import com.mpush.api.protocol.Packet; import com.mpush.api.Constants; @@ -33,6 +34,11 @@ public final class PushMessage extends BaseMessage { public byte[] content; + public PushMessage(byte[] content, Connection connection) { + super(new Packet(Command.PUSH, genSessionId()), connection); + this.content = content; + } + public PushMessage(Packet packet, Connection connection) { super(packet, connection); } @@ -47,6 +53,19 @@ public byte[] encode() { return content; } + public boolean autoAck() { + return packet.hasFlag(Packet.FLAG_AUTO_ACK); + } + + public boolean bizAck() { + return packet.hasFlag(Packet.FLAG_BIZ_ACK); + } + + public PushMessage addFlag(byte flag) { + packet.addFlag(flag); + return this; + } + @Override public String toString() { return "PushMessage{" + diff --git a/src/main/java/com/mpush/util/MPUtils.java b/src/main/java/com/mpush/util/MPUtils.java index 683ec79..4265ee7 100644 --- a/src/main/java/com/mpush/util/MPUtils.java +++ b/src/main/java/com/mpush/util/MPUtils.java @@ -24,6 +24,7 @@ import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; +import java.util.regex.Pattern; /** * Created by ohun on 2016/1/25. @@ -32,6 +33,19 @@ */ public final class MPUtils { + /** + * Quick and dirty pattern to differentiate IP addresses from hostnames. This is an approximation + * of Android's private InetAddress#isNumeric API. + *

+ *

This matches IPv6 addresses as a hex string containing at least one colon, and possibly + * including dots after the first colon. It matches IPv4 addresses as strings containing only + * decimal digits and dots. This pattern matches strings like "a:.23" and "54" that are neither IP + * addresses nor hostnames; they will be verified as IP addresses (which is a more strict + * verification). + */ + private static final Pattern VERIFY_AS_IP_ADDRESS = Pattern.compile( + "([0-9a-fA-F]*:[0-9a-fA-F:.]*)|([\\d.]+)"); + public static String parseHost2Ip(String host) { InetAddress ia = null; try { @@ -79,4 +93,11 @@ public static Map headerFromString(String headersString) { } return headers; } + + /** + * Returns true if {@code host} is not a host name and might be an IP address. + */ + public static boolean verifyAsIpAddress(String host) { + return VERIFY_AS_IP_ADDRESS.matcher(host).matches(); + } } diff --git a/src/main/java/com/mpush/util/thread/ExecutorManager.java b/src/main/java/com/mpush/util/thread/ExecutorManager.java index 124b498..f122863 100644 --- a/src/main/java/com/mpush/util/thread/ExecutorManager.java +++ b/src/main/java/com/mpush/util/thread/ExecutorManager.java @@ -40,12 +40,11 @@ public final class ExecutorManager { public static final String READ_THREAD_NAME = THREAD_NAME_PREFIX + "read-t"; public static final String DISPATCH_THREAD_NAME = THREAD_NAME_PREFIX + "dispatch-t"; public static final String START_THREAD_NAME = THREAD_NAME_PREFIX + "start-t"; - public static final String HTTP_THREAD_NAME = THREAD_NAME_PREFIX + "http-t"; + public static final String TIMER_THREAD_NAME = THREAD_NAME_PREFIX + "timer-t"; public static final ExecutorManager INSTANCE = new ExecutorManager(); private ThreadPoolExecutor writeThread; private ThreadPoolExecutor dispatchThread; - private ThreadPoolExecutor startThread; - private ScheduledExecutorService httpRequestThread; + private ScheduledExecutorService timerThread; public ThreadPoolExecutor getWriteThread() { if (writeThread == null || writeThread.isShutdown()) { @@ -69,24 +68,13 @@ public ThreadPoolExecutor getDispatchThread() { return dispatchThread; } - public ThreadPoolExecutor getStartThread() { - if (startThread == null || startThread.isShutdown()) { - startThread = new ThreadPoolExecutor(1, 1, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(1), - new NamedThreadFactory(START_THREAD_NAME), + public ScheduledExecutorService getTimerThread() { + if (timerThread == null || timerThread.isShutdown()) { + timerThread = new ScheduledThreadPoolExecutor(1, + new NamedThreadFactory(TIMER_THREAD_NAME), new RejectedHandler()); } - return startThread; - } - - public ScheduledExecutorService getHttpRequestThread() { - if (httpRequestThread == null || httpRequestThread.isShutdown()) { - httpRequestThread = new ScheduledThreadPoolExecutor(1, - new NamedThreadFactory(HTTP_THREAD_NAME), - new RejectedHandler()); - } - return httpRequestThread; + return timerThread; } public synchronized void shutdown() { @@ -98,14 +86,9 @@ public synchronized void shutdown() { dispatchThread.shutdownNow(); dispatchThread = null; } - if (startThread != null) { - startThread.shutdownNow(); - startThread = null; - - } - if (httpRequestThread != null) { - httpRequestThread.shutdownNow(); - httpRequestThread = null; + if (timerThread != null) { + timerThread.shutdownNow(); + timerThread = null; } } @@ -117,7 +100,7 @@ private static class RejectedHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - ClientConfig.I.getLogger().w("a task was rejected execute=%s", executor); + ClientConfig.I.getLogger().w("a task was rejected r=%s", r); } } } diff --git a/src/test/java/com/mpush/client/MPushClientTest.java b/src/test/java/com/mpush/client/MPushClientTest.java index 5d46ff9..5068191 100644 --- a/src/test/java/com/mpush/client/MPushClientTest.java +++ b/src/test/java/com/mpush/client/MPushClientTest.java @@ -22,17 +22,11 @@ import com.mpush.api.Client; import com.mpush.api.ClientListener; -import com.mpush.api.Constants; -import com.mpush.api.http.HttpCallback; -import com.mpush.api.http.HttpMethod; -import com.mpush.api.http.HttpRequest; -import com.mpush.api.http.HttpResponse; import com.mpush.util.DefaultLogger; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.LockSupport; /** * Created by ohun on 2016/1/25. @@ -41,36 +35,59 @@ */ public class MPushClientTest { private static final String publicKey = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCghPCWCobG8nTD24juwSVataW7iViRxcTkey/B792VZEhuHjQvA3cAJgx2Lv8GnX8NIoShZtoCg3Cx6ecs+VEPD2fBcg2L4JK7xldGpOJ3ONEAyVsLOttXZtNXvyDZRijiErQALMTorcgi79M5uVX9/jMv2Ggb2XAeZhlLD28fHwIDAQAB"; - private static final String allocServer = "http://127.0.0.1:9999/"; + private static final String allocServer = "http://103.60.220.145:9999/"; public static void main(String[] args) throws Exception { - Client client = ClientConfig - .build() - .setPublicKey(publicKey) - //.setAllotServer(allocServer) - .setServerHost("111.1.57.148") - .setServerPort(20882) - .setDeviceId("1111111111") - .setOsName("Android") - .setOsVersion("6.0") - .setClientVersion("2.0") - .setUserId("doctor43test") - .setMaxHeartbeat(10000) - .setMinHeartbeat(10000) - .setSessionStorageDir(MPushClientTest.class.getResource("/").getFile()) - .setLogger(new DefaultLogger()) - .setLogEnabled(true) - .setEnableHttpProxy(true) - .setClientListener(new L()) - .create(); - client.start(); - } + int count = 1; + String serverHost = "127.0.0.1"; + int sleep = 1000; + + if (args != null && args.length > 0) { + count = Integer.parseInt(args[0]); + if (args.length > 1) { + serverHost = args[1]; + } + if (args.length > 2) { + sleep = Integer.parseInt(args[1]); + } + } + ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + ClientListener listener = new L(scheduledExecutor); + Client client = null; + String cacheDir = MPushClientTest.class.getResource("/").getFile(); + for (int i = 0; i < count; i++) { + client = ClientConfig + .build() + .setPublicKey(publicKey) + //.setAllotServer(allocServer) + .setServerHost(serverHost) + .setServerPort(3000) + .setDeviceId("deviceId-test" + i) + .setOsName("android") + .setOsVersion("6.0") + .setClientVersion("2.0") + .setUserId("user-" + i) + .setTags("tag-" + i) + .setSessionStorageDir(cacheDir + i) + .setLogger(new DefaultLogger()) + .setLogEnabled(true) + .setEnableHttpProxy(true) + .setClientListener(listener) + .create(); + client.start(); + Thread.sleep(sleep); + } + } public static class L implements ClientListener { - Thread thread; + private final ScheduledExecutorService scheduledExecutor; boolean flag = true; + public L(ScheduledExecutorService scheduledExecutor) { + this.scheduledExecutor = scheduledExecutor; + } + @Override public void onConnected(Client client) { flag = true; @@ -79,37 +96,24 @@ public void onConnected(Client client) { @Override public void onDisConnected(Client client) { flag = false; - thread.interrupt(); } @Override public void onHandshakeOk(final Client client, final int heartbeat) { - thread = new Thread(new Runnable() { + scheduledExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { - while (flag && client.isRunning()) { - try { - Thread.sleep(heartbeat); - } catch (InterruptedException e) { - break; - } - client.healthCheck(); - /* client.stop(); - try { - Thread.sleep(3000); - } catch (InterruptedException e) { - break; - } - client.start();*/ - } + client.healthCheck(); } - }); - thread.start(); + }, 10, 10, TimeUnit.SECONDS); + + //client.push(PushContext.build("test")); + } @Override - public void onReceivePush(Client client, byte[] content) { - + public void onReceivePush(Client client, byte[] content, int messageId) { + if (messageId > 0) client.ack(messageId); } @Override @@ -117,5 +121,14 @@ public void onKickUser(String deviceId, String userId) { } + @Override + public void onBind(boolean success, String userId) { + + } + + @Override + public void onUnbind(boolean success, String userId) { + + } } } \ No newline at end of file