21
21
import io .netty .handler .codec .http .HttpObject ;
22
22
import io .netty .handler .codec .http .HttpRequest ;
23
23
import io .netty .handler .codec .http .HttpResponse ;
24
- import io .netty .handler .codec .http .HttpUtil ;
25
24
import io .netty .handler .codec .http .LastHttpContent ;
26
25
27
26
import java .io .IOException ;
@@ -45,75 +44,30 @@ public HttpHandler(AsyncHttpClientConfig config, ChannelManager channelManager,
45
44
super (config , channelManager , requestSender );
46
45
}
47
46
48
- private void finishUpdate (final NettyResponseFuture <?> future , Channel channel , boolean expectOtherChunks ) throws IOException {
49
-
50
- future .cancelTimeouts ();
51
-
52
- boolean keepAlive = future .isKeepAlive ();
53
- if (expectOtherChunks && keepAlive )
54
- channelManager .drainChannelAndOffer (channel , future );
55
- else
56
- channelManager .tryToOfferChannelToPool (channel , future .getAsyncHandler (), keepAlive , future .getPartitionKey ());
57
-
58
- try {
59
- future .done ();
60
- } catch (Exception t ) {
61
- // Never propagate exception once we know we are done.
62
- logger .debug (t .getMessage (), t );
63
- }
64
- }
65
-
66
- private boolean updateBodyAndInterrupt (NettyResponseFuture <?> future , AsyncHandler <?> handler , HttpResponseBodyPart bodyPart ) throws Exception {
67
- boolean interrupt = handler .onBodyPartReceived (bodyPart ) != State .CONTINUE ;
68
- if (interrupt )
69
- future .setKeepAlive (false );
70
- return interrupt ;
71
- }
72
-
73
- private void notifyHandler (Channel channel , NettyResponseFuture <?> future , HttpResponse response , AsyncHandler <?> handler , NettyResponseStatus status ,
74
- HttpRequest httpRequest , HttpResponseHeaders responseHeaders ) throws IOException , Exception {
75
-
76
- boolean exit = exitAfterHandlingStatus (channel , future , response , handler , status , httpRequest ) || //
77
- exitAfterHandlingHeaders (channel , future , response , handler , responseHeaders , httpRequest ) || //
78
- exitAfterHandlingReactiveStreams (channel , future , response , handler , httpRequest );
79
-
80
- if (exit )
81
- finishUpdate (future , channel , HttpUtil .isTransferEncodingChunked (httpRequest ) || HttpUtil .isTransferEncodingChunked (response ));
82
- }
83
-
84
- private boolean exitAfterHandlingStatus (//
85
- Channel channel ,//
86
- NettyResponseFuture <?> future ,//
87
- HttpResponse response , AsyncHandler <?> handler ,//
88
- NettyResponseStatus status ,//
89
- HttpRequest httpRequest ) throws IOException , Exception {
90
- return handler .onStatusReceived (status ) != State .CONTINUE ;
47
+ private boolean abortAfterHandlingStatus (//
48
+ AsyncHandler <?> handler ,//
49
+ NettyResponseStatus status ) throws IOException , Exception {
50
+ return handler .onStatusReceived (status ) == State .ABORT ;
91
51
}
92
52
93
- private boolean exitAfterHandlingHeaders (//
94
- Channel channel ,//
95
- NettyResponseFuture <?> future ,//
96
- HttpResponse response ,//
53
+ private boolean abortAfterHandlingHeaders (//
97
54
AsyncHandler <?> handler ,//
98
- HttpResponseHeaders responseHeaders ,//
99
- HttpRequest httpRequest ) throws IOException , Exception {
100
- return !response .headers ().isEmpty () && handler .onHeadersReceived (responseHeaders ) != State .CONTINUE ;
55
+ HttpResponseHeaders responseHeaders ) throws IOException , Exception {
56
+ return !responseHeaders .getHeaders ().isEmpty () && handler .onHeadersReceived (responseHeaders ) == State .ABORT ;
101
57
}
102
58
103
- private boolean exitAfterHandlingReactiveStreams (//
59
+ private boolean abortAfterHandlingReactiveStreams (//
104
60
Channel channel ,//
105
61
NettyResponseFuture <?> future ,//
106
- HttpResponse response ,//
107
- AsyncHandler <?> handler ,//
108
- HttpRequest httpRequest ) throws IOException {
62
+ AsyncHandler <?> handler ) throws IOException {
109
63
if (handler instanceof StreamedAsyncHandler ) {
110
64
StreamedAsyncHandler <?> streamedAsyncHandler = (StreamedAsyncHandler <?>) handler ;
111
65
StreamedResponsePublisher publisher = new StreamedResponsePublisher (channel .eventLoop (), channelManager , future , channel );
112
66
// FIXME do we really need to pass the event loop?
113
67
// FIXME move this to ChannelManager
114
68
channel .pipeline ().addLast (channel .eventLoop (), "streamedAsyncHandler" , publisher );
115
69
Channels .setAttribute (channel , publisher );
116
- return streamedAsyncHandler .onStream (publisher ) != State .CONTINUE ;
70
+ return streamedAsyncHandler .onStream (publisher ) == State .ABORT ;
117
71
}
118
72
return false ;
119
73
}
@@ -129,7 +83,13 @@ private void handleHttpResponse(final HttpResponse response, final Channel chann
129
83
HttpResponseHeaders responseHeaders = new HttpResponseHeaders (response .headers ());
130
84
131
85
if (!interceptors .exitAfterIntercept (channel , future , handler , response , status , responseHeaders )) {
132
- notifyHandler (channel , future , response , handler , status , httpRequest , responseHeaders );
86
+ boolean abort = abortAfterHandlingStatus (handler , status ) || //
87
+ abortAfterHandlingHeaders (handler , responseHeaders ) || //
88
+ abortAfterHandlingReactiveStreams (channel , future , handler );
89
+
90
+ if (abort ) {
91
+ finishUpdate (future , channel , false , false );
92
+ }
133
93
}
134
94
}
135
95
@@ -138,26 +98,28 @@ private void handleChunk(HttpContent chunk,//
138
98
final NettyResponseFuture <?> future ,//
139
99
AsyncHandler <?> handler ) throws IOException , Exception {
140
100
141
- boolean interrupt = false ;
101
+ boolean abort = false ;
142
102
boolean last = chunk instanceof LastHttpContent ;
143
103
144
104
// Netty 4: the last chunk is not empty
145
105
if (last ) {
146
106
LastHttpContent lastChunk = (LastHttpContent ) chunk ;
147
107
HttpHeaders trailingHeaders = lastChunk .trailingHeaders ();
148
108
if (!trailingHeaders .isEmpty ()) {
149
- interrupt = handler .onHeadersReceived (new HttpResponseHeaders (trailingHeaders , true )) != State .CONTINUE ;
109
+ abort = handler .onHeadersReceived (new HttpResponseHeaders (trailingHeaders , true )) == State .ABORT ;
150
110
}
151
111
}
152
112
153
113
ByteBuf buf = chunk .content ();
154
- if (!interrupt && !(handler instanceof StreamedAsyncHandler ) && (buf .readableBytes () > 0 || last )) {
155
- HttpResponseBodyPart part = config .getResponseBodyPartFactory ().newResponseBodyPart (buf , last );
156
- interrupt = updateBodyAndInterrupt ( future , handler , part ) ;
114
+ if (!abort && !(handler instanceof StreamedAsyncHandler ) && (buf .readableBytes () > 0 || last )) {
115
+ HttpResponseBodyPart bodyPart = config .getResponseBodyPartFactory ().newResponseBodyPart (buf , last );
116
+ abort = handler . onBodyPartReceived ( bodyPart ) == State . ABORT ;
157
117
}
158
118
159
- if (interrupt || last )
160
- finishUpdate (future , channel , !last );
119
+ if (abort || last ) {
120
+ boolean keepAlive = !abort && future .isKeepAlive ();
121
+ finishUpdate (future , channel , keepAlive , !last );
122
+ }
161
123
}
162
124
163
125
@ Override
@@ -207,7 +169,7 @@ private void readFailed(Channel channel, NettyResponseFuture<?> future, Throwabl
207
169
} catch (Exception abortException ) {
208
170
logger .debug ("Abort failed" , abortException );
209
171
} finally {
210
- finishUpdate (future , channel , false );
172
+ finishUpdate (future , channel , false , false );
211
173
}
212
174
}
213
175
0 commit comments