11package io .github .kimmking .gateway .outbound .okhttp ;
22
3+ import io .github .kimmking .gateway .outbound .httpclient4 .NamedThreadFactory ;
4+ import io .netty .buffer .Unpooled ;
5+ import io .netty .channel .ChannelFutureListener ;
6+ import io .netty .channel .ChannelHandlerContext ;
7+ import io .netty .handler .codec .http .DefaultFullHttpResponse ;
8+ import io .netty .handler .codec .http .FullHttpRequest ;
9+ import io .netty .handler .codec .http .FullHttpResponse ;
10+ import io .netty .handler .codec .http .HttpUtil ;
11+ import okhttp3 .OkHttpClient ;
12+ import okhttp3 .Request ;
13+ import okhttp3 .Response ;
14+ import org .apache .http .HttpResponse ;
15+ import org .apache .http .client .methods .HttpGet ;
16+ import org .apache .http .concurrent .FutureCallback ;
17+ import org .apache .http .impl .nio .client .HttpAsyncClients ;
18+ import org .apache .http .impl .nio .reactor .IOReactorConfig ;
19+ import org .apache .http .protocol .HTTP ;
20+ import org .apache .http .util .EntityUtils ;
21+
22+ import java .io .IOException ;
23+ import java .util .concurrent .*;
24+
25+ import static io .netty .handler .codec .http .HttpResponseStatus .NO_CONTENT ;
26+ import static io .netty .handler .codec .http .HttpResponseStatus .OK ;
27+ import static io .netty .handler .codec .http .HttpVersion .HTTP_1_1 ;
28+
329public class OkhttpOutboundHandler {
4- }
30+
31+ private OkHttpClient client ;
32+ private ExecutorService proxyService ;
33+ private String backendUrl ;
34+
35+ public OkhttpOutboundHandler (String backendUrl ) {
36+ this .backendUrl = backendUrl .endsWith ("/" ) ? backendUrl .substring (0 , backendUrl .length () - 1 ) : backendUrl ;
37+ int cores = Runtime .getRuntime ().availableProcessors () * 2 ;
38+ long keepAliveTime = 1000 ;
39+ int queueSize = 2048 ;
40+ RejectedExecutionHandler handler = new ThreadPoolExecutor .CallerRunsPolicy ();//.DiscardPolicy();
41+ proxyService = new ThreadPoolExecutor (cores , cores ,
42+ keepAliveTime , TimeUnit .MILLISECONDS , new ArrayBlockingQueue <>(queueSize ),
43+ new NamedThreadFactory ("proxyService" ), handler );
44+
45+ client = new OkHttpClient ();
46+ }
47+
48+ public void handle (final FullHttpRequest fullRequest , final ChannelHandlerContext ctx ) {
49+ final String url = this .backendUrl + fullRequest .uri ();
50+ proxyService .submit (() -> fetchGet (fullRequest , ctx , url ));
51+ }
52+
53+ private void fetchGet (final FullHttpRequest inbound , final ChannelHandlerContext ctx , final String url ) {
54+ System .out .println (inbound .headers ().get ("nio" ));
55+ Request request = new Request .Builder ()
56+ .addHeader ("Content-Type" , "text/html" )
57+ .addHeader ("charset" , "UTF-8" )
58+ .addHeader ("nio" , inbound .headers ().get ("nio" ))
59+ .url (url )
60+ .build ();
61+ try {
62+ Response response = client .newCall (request ).execute ();
63+ handleResponse (inbound , ctx , response );
64+
C1E2
} catch (Exception e ) {
65+ e .printStackTrace ();
66+ }
67+ }
68+
69+ private void handleResponse (final FullHttpRequest fullRequest , final ChannelHandlerContext ctx , final Response endpointResponse ) throws Exception {
70+ FullHttpResponse response = null ;
71+ try {
72+ byte [] body = endpointResponse .body ().bytes ();
73+
74+ response = new DefaultFullHttpResponse (HTTP_1_1 , OK , Unpooled .wrappedBuffer (body ));
75+ response .headers ().set ("Content-Type" , "application/json" );
76+ response .headers ().setInt ("Content-Length" , Integer .parseInt (endpointResponse .header ("Content-Length" )));
77+
78+ } catch (Exception e ) {
79+ e .printStackTrace ();
80+ response = new DefaultFullHttpResponse (HTTP_1_1 , NO_CONTENT );
81+ exceptionCaught (ctx , e );
82+ } finally {
83+ if (fullRequest != null ) {
84+ if (!HttpUtil .isKeepAlive (fullRequest )) {
85+ ctx .write (response ).addListener (ChannelFutureListener .CLOSE );
86+ } else {
87+ ctx .write (response );
88+ }
89+ }
90+ ctx .flush ();
91+ }
92+
93+ }
94+
95+ public void exceptionCaught (ChannelHandlerContext ctx , Throwable cause ) {
96+ cause .printStackTrace ();
97+ ctx .close ();
98+ }
99+ }
0 commit comments