88import okhttp3 .OkHttpClient ;
99import okhttp3 .Request ;
1010import okhttp3 .RequestBody ;
11+ import org .apache .curator .RetryPolicy ;
12+ import org .apache .curator .framework .CuratorFramework ;
13+ import org .apache .curator .framework .CuratorFrameworkFactory ;
14+ import org .apache .curator .framework .recipes .cache .TreeCache ;
15+ import org .apache .curator .framework .recipes .cache .TreeCacheEvent ;
16+ import org .apache .curator .framework .recipes .cache .TreeCacheListener ;
17+ import org .apache .curator .retry .ExponentialBackoffRetry ;
18+ import org .apache .zookeeper .CreateMode ;
1119
1220import java .io .IOException ;
1321import java .lang .reflect .InvocationHandler ;
1422import java .lang .reflect .Method ;
1523import java .lang .reflect .Proxy ;
24+ import java .net .InetAddress ;
1625import java .util .ArrayList ;
1726import java .util .List ;
1827
@@ -26,37 +35,89 @@ public static <T, filters> T createFromRegistry(final Class<T> serviceClass, fin
2635
2736 // 加filte之一
2837
29- // curator Provider list from zk
38+ String service = serviceClass .getCanonicalName ();//"io.kimking.rpcfx.demo.api.UserService";
39+ System .out .println ("====> " +service );
3040 List <String > invokers = new ArrayList <>();
41+
42+ RetryPolicy retryPolicy = new ExponentialBackoffRetry (1000 , 3 );
43+ CuratorFramework client = CuratorFrameworkFactory .builder ().connectString ("localhost:2181" ).namespace ("rpcfx" ).retryPolicy (retryPolicy ).build ();
44+ client .start ();
45+
46+ try {
47+ // ServiceProviderDesc userServiceSesc = ServiceProviderDesc.builder()
48+ // .host(InetAddress.getLocalHost().getHostAddress())
49+ // .port(8082).serviceClass(service).build();
50+ // String userServiceSescJson = JSON.toJSONString(userServiceSesc);
51+
52+
53+ if ( null == client .checkExists ().forPath ("/" + service )) {
54+ return null ;
55+ }
56+
57+ fetchInvokers (client , service , invokers );
58+
59+ final TreeCache treeCache = TreeCache .newBuilder (client , "/" + service ).setCacheData (true ).setMaxDepth (2 ).build ();
60+ treeCache .getListenable ().addListener (new TreeCacheListener () {
61+ public void childEvent (CuratorFramework curatorFramework , TreeCacheEvent treeCacheEvent ) throws Exception {
62+ System .out .println ("treeCacheEvent: " +treeCacheEvent );
63+ fetchInvokers (client , service , invokers );
64+ }
65+ });
66+ treeCache .start ();
67+
68+ } catch (Exception ex ) {
69+ ex .printStackTrace ();
70+ }
71+
72+ //
73+ //
74+ // // register service
75+ // // xxx "io.kimmking.rpcfx.demo.api.UserService"
76+ //
77+
78+
79+ // curator Provider list from zk
80+
3181 // 1. 简单:从zk拿到服务提供的列表
3282 // 2. 挑战:监听zk的临时节点,根据事件更新这个list(注意,需要做个全局map保持每个服务的提供者List)
3383
34- List < String > urls = router . route ( invokers );
84+ return ( T ) create ( serviceClass , invokers , router , loadBalance , filter );
3585
36- String url = loadBalance . select ( urls ); // router, loadbalance
86+ }
3787
38- return (T ) create (serviceClass , url , filter );
3988
40- }
4189
42- public static <T > T create (final Class <T > serviceClass , final String url , Filter ... filters ) {
90+ private static void fetchInvokers (CuratorFramework client , String service , List <String > invokers ) throws Exception {
91+ List <String > services = client .getChildren ().forPath ("/" + service );
92+ invokers .clear ();
93+ for (String svc : services ) {
94+ System .out .println (svc );
95+ String url = svc .replace ("_" , ":" );
96+ invokers .add ("http://" + url );
97+ }
98+ }
4399
100+ private static <T > Object create (Class <T > serviceClass , List <String > invokers , Router router , LoadBalancer loadBalance , Filter ... filters ) {
44101 // 0. 替换动态代理 -> 字节码生成
45- return (T ) Proxy .newProxyInstance (Rpcfx .class .getClassLoader (), new Class []{serviceClass }, new RpcfxInvocationHandler ( serviceClass , url , filters ));
46-
102+ return (T ) Proxy .newProxyInstance (Rpcfx .class .getClassLoader (), new Class []{serviceClass },
103+ new RpcfxInvocationHandler ( serviceClass , invokers , router , loadBalance , filters ));
47104 }
48105
49106 public static class RpcfxInvocationHandler implements InvocationHandler {
50107
51108 public static final MediaType JSONTYPE = MediaType .get ("application/json; charset=utf-8" );
52109
53110 private final Class <?> serviceClass ;
54- private final String url ;
111+ private final List <String > invokers ;
112+ private final Router router ;
113+ private final LoadBalancer loadBalance ;
55114 private final Filter [] filters ;
56115
57- public <T > RpcfxInvocationHandler (Class <T > serviceClass , String url , Filter ... filters ) {
116+ public <T > RpcfxInvocationHandler (Class <T > serviceClass , List < String > invokers , Router router , LoadBalancer loadBalance , Filter ... filters ) {
58117 this .serviceClass = serviceClass ;
59- this .url = url ;
118+ this .invokers = invokers ;
119+ this .router = router ;
120+ this .loadBalance = loadBalance ;
60121 this .filters = filters ;
61122 }
62123
@@ -67,6 +128,17 @@ public <T> RpcfxInvocationHandler(Class<T> serviceClass, String url, Filter... f
67128 @ Override
68129 public Object invoke (Object proxy , Method method , Object [] params ) throws Throwable {
69130
131+ List <String > urls = router .route (invokers );
132+ System .out .println ("router.route => " );
133+ urls .forEach (System .out ::println );
134+ String url = loadBalance .select (urls ); // router, loadbalance
135+ System .out .println ("loadBalance.select => " );
136+ System .out .println ("final => " + url );
137+
138+ if (url == null ) {
139+ throw new RuntimeException ("No available providers from registry center." );
140+ }
141+
70142 // 加filter地方之二
71143 // mock == true, new Student("hubao");
72144
@@ -94,13 +166,15 @@ public Object invoke(Object proxy, Method method, Object[] params) throws Throwa
94166 return JSON .parse (response .getResult ().toString ());
95167 }
96168
169+ OkHttpClient client = new OkHttpClient ();
170+
97171 private RpcfxResponse post (RpcfxRequest req , String url ) throws IOException {
98172 String reqJson = JSON .toJSONString (req );
99173 System .out .println ("req json: " +reqJson );
100174
101175 // 1.可以复用client
102176 // 2.尝试使用httpclient或者netty client
103- OkHttpClient client = new OkHttpClient ();
177+
104178 final Request request = new Request .Builder ()
105179 .url (url )
106180 .post (RequestBody .create (JSONTYPE , reqJson ))
0 commit comments