10000 Always close Channel when AsyncHandler returns ABORT, close #1350, cl… · chakra-coder/async-http-client@cb92e37 · GitHub
[go: up one dir, main page]

Skip to content

Commit cb92e37

Browse files
committed
Always close Channel when AsyncHandler returns ABORT, close AsyncHttpClient#1350, close AsyncHttpClient#1306
Motivation: We currently handle ABORT in a very gentle manner: ignore next chunks, drain channel until last chunk is received and then offer the channel to the pool if keep-alive is possible. This is way too gentle, and prevents us from closing infinite streams. Modifications: Have ABORT close the Channel, both for HTTP and WebSocket Result: We now close infinite streams.
1 parent 06798b9 commit cb92e37

File tree

3 files changed

+49
-68
lines changed

3 files changed

+49
-68
lines changed

client/src/main/java/org/asynchttpclient/netty/handler/AsyncHttpClientHandler.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,25 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
225225
private boolean isHandledByReactiveStreams(ChannelHandlerContext ctx) {
226226
return Channels.getAttribute(ctx.channel()) instanceof StreamedResponsePublisher;
227227
}
228+
229+
protected void finishUpdate(NettyResponseFuture<?> future, Channel channel, boolean keepAlive, boolean expectOtherChunks) throws IOException {
230+
future.cancelTimeouts();
231+
232+
if (!keepAlive) {
233+
channelManager.closeChannel(channel);
234+
} else if (expectOtherChunks) {
235+
channelManager.drainChannelAndOffer(channel, future);
236+
} else {
237+
channelManager.tryToOfferChannelToPool(channel, future.getAsyncHandler(), keepAlive, future.getPartitionKey());
238+
}
239+
240+
try {
241+
future.done();
242+
} catch (Exception t) {
243+
// Never propagate exception once we know we are done.
244+
logger.debug(t.getMessage(), t);
245+
}
246+
}
228247

229248
public abstract void handleRead(Channel channel, NettyResponseFuture<?> future, Object message) throws Exception;
230249

client/src/main/java/org/asynchttpclient/netty/handler/HttpHandler.java

Lines changed: 27 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.netty.handler.codec.http.HttpObject;
2222
import io.netty.handler.codec.http.HttpRequest;
2323
import io.netty.handler.codec.http.HttpResponse;
24-
import io.netty.handler.codec.http.HttpUtil;
2524
import io.netty.handler.codec.http.LastHttpContent;
2625

2726
import java.io.IOException;
@@ -45,75 +44,30 @@ public HttpHandler(AsyncHttpClientConfig config, ChannelManager channelManager,
4544
super(config, channelManager, requestSender);
4645
}
4746

48-
private void finishUpdate(final NettyResponseFuture<?> future, Channel channel, boolean expectOtherChunks) throws IOException {
49-
50-
future.cancelTimeouts();
51-
52-
boolean keepAlive = future.isKeepAlive();
53-
if (expectOtherChunks && keepAlive)
54-
channelManager.drainChannelAndOffer(channel, future);
55-
else
56-
channelManager.tryToOfferChannelToPool(channel, future.getAsyncHandler(), keepAlive, future.getPartitionKey());
57-
58-
try {
59-
future.done();
60-
} catch (Exception t) {
61-
// Never propagate exception once we know we are done.
62-
logger.debug(t.getMessage(), t);
63-
}
64-
}
65-
66-
private boolean updateBodyAndInterrupt(NettyResponseFuture<?> future, AsyncHandler<?> handler, HttpResponseBodyPart bodyPart) throws Exception {
67-
boolean interrupt = handler.onBodyPartReceived(bodyPart) != State.CONTINUE;
68-
if (interrupt)
69-
future.setKeepAlive(false);
70-
return interrupt;
71-
}
72-
73-
private void notifyHandler(Channel channel, NettyResponseFuture<?> future, HttpResponse response, AsyncHandler<?> handler, NettyResponseStatus status,
74-
HttpRequest httpRequest, HttpResponseHeaders responseHeaders) throws IOException, Exception {
75-
76-
boolean exit = exitAfterHandlingStatus(channel, future, response, handler, status, httpRequest) || //
77-
exitAfterHandlingHeaders(channel, future, response, handler, responseHeaders, httpRequest) || //
78-
exitAfterHandlingReactiveStreams(channel, future, response, handler, httpRequest);
79-
80-
if (exit)
81-
finishUpdate(future, channel, HttpUtil.isTransferEncodingChunked(httpRequest) || HttpUtil.isTransferEncodingChunked(response));
82-
}
83-
84-
private boolean exitAfterHandlingStatus(//
85-
Channel channel,//
86-
NettyResponseFuture<?> future,//
87-
HttpResponse response, AsyncHandler<?> handler,//
88-
NettyResponseStatus status,//
89-
HttpRequest httpRequest) throws IOException, Exception {
90-
return handler.onStatusReceived(status) != State.CONTINUE;
47+
private boolean abortAfterHandlingStatus(//
48+
AsyncHandler<?> handler,//
49+
NettyResponseStatus status) throws IOException, Exception {
50+
return handler.onStatusReceived(status) == State.ABORT;
9151
}
9252

93-
private boolean exitAfterHandlingHeaders(//
94-
Channel channel,//
95-
NettyResponseFuture<?> future,//
96-
HttpResponse response,//
53+
private boolean abortAfterHandlingHeaders(//
9754
AsyncHandler<?> handler,//
98-
HttpResponseHeaders responseHeaders,//
99-
HttpRequest httpRequest) throws IOException, Exception {
100-
return !response.headers().isEmpty() && handler.onHeadersReceived(responseHeaders) != State.CONTINUE;
55+
HttpResponseHeaders responseHeaders) throws IOException, Exception {
56+
return !responseHeaders.getHeaders().isEmpty() && handler.onHeadersReceived(responseHeaders) == State.ABORT;
10157
}
10258

103-
private boolean exitAfterHandlingReactiveStreams(//
59+
private boolean abortAfterHandlingReactiveStreams(//
10460
Channel channel,//
10561
NettyResponseFuture<?> future,//
106-
HttpResponse response,//
107-
AsyncHandler<?> handler,//
108-
HttpRequest httpRequest) throws IOException {
62+
AsyncHandler<?> handler) throws IOException {
10963
if (handler instanceof StreamedAsyncHandler) {
11064
StreamedAsyncHandler<?> streamedAsyncHandler = (StreamedAsyncHandler<?>) handler;
11165
StreamedResponsePublisher publisher = new StreamedResponsePublisher(channel.eventLoop(), channelManager, future, channel);
11266
// FIXME do we really need to pass the event loop?
11367
// FIXME move this to ChannelManager
11468
channel.pipeline().addLast(channel.eventLoop(), "streamedAsyncHandler", publisher);
11569
Channels.setAttribute(channel, publisher);
116-
return streamedAsyncHandler.onStream(publisher) != State.CONTINUE;
70+
return streamedAsyncHandler.onStream(publisher) == State.ABORT;
11771
}
11872
return false;
11973
}
@@ -129,7 +83,13 @@ private void handleHttpResponse(final HttpResponse response, final Channel chann
12983
HttpResponseHeaders responseHeaders = new HttpResponseHeaders(response.headers());
13084

13185
if (!interceptors.exitAfterIntercept(channel, future, handler, response, status, responseHeaders)) {
132-
notifyHandler(channel, future, response, handler, status, httpRequest, responseHeaders);
86+
boolean abort = abortAfterHandlingStatus(handler, status) || //
87+
abortAfterHandlingHeaders(handler, responseHeaders) || //
88+
abortAfterHandlingReactiveStreams(channel, future, handler);
89+
90+
if (abort) {
91+
finishUpdate(future, channel, false, false);
92+
}
13393
}
13494
}
13595

@@ -138,26 +98,28 @@ private void handleChunk(HttpContent chunk,//
13898
final NettyResponseFuture<?> future,//
13999
AsyncHandler<?> handler) throws IOException, Exception {
140100

141-
boolean interrupt = false;
101+
boolean abort = false;
142102
boolean last = chunk instanceof LastHttpContent;
143103

144104
// Netty 4: the last chunk is not empty
145105
if (last) {
146106
LastHttpContent lastChunk = (LastHttpContent) chunk;
147107
HttpHeaders trailingHeaders = lastChunk.trailingHeaders();
148108
if (!trailingHeaders.isEmpty()) {
149-
interrupt = handler.onHeadersReceived(new HttpResponseHeaders(trailingHeaders, true)) != State.CONTINUE;
109+
abort = handler.onHeadersReceived(new HttpResponseHeaders(trailingHeaders, true)) == State.ABORT;
150110
}
151111
}
152112

153113
ByteBuf buf = chunk.content();
154-
if (!interrupt && !(handler instanceof StreamedAsyncHandler) && (buf.readableBytes() > 0 || last)) {
155-
HttpResponseBodyPart part = config.getResponseBodyPartFactory().newResponseBodyPart(buf, last);
156-
interrupt = updateBodyAndInterrupt(future, handler, part);
114+
if (!abort && !(handler instanceof StreamedAsyncHandler) && (buf.readableBytes() > 0 || last)) {
115+
HttpResponseBodyPart bodyPart = config.getResponseBodyPartFactory().newResponseBodyPart(buf, last);
116+
abort = handler.onBodyPartReceived(bodyPart) == State.ABORT;
157117
}
158118

159-
if (interrupt || last)
160-
finishUpdate(future, channel, !last);
119+
if (abort || last) {
120+
boolean keepAlive = !abort && future.isKeepAlive();
121+
finishUpdate(future, channel, keepAlive, !last);
122+
}
161123
}
162124

163125
@Override
@@ -207,7 +169,7 @@ private void readFailed(Channel channel, NettyResponseFuture<?> future, Throwabl
207169
} catch (Exception abortException) {
208170
logger.debug("Abort failed", abortException);
209171
} finally {
210-
finishUpdate(future, channel, false);
172+
finishUpdate(future, channel, false, false);
211173
}
212174
}
213175

client/src/main/java/org/asynchttpclient/netty/handler/WebSocketHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,11 @@ private void upgrade(Channel channel, NettyResponseFuture<?> future, WebSocketUp
8787
future.done();
8888
}
8989

90-
private void abort(NettyResponseFuture<?> future, WebSocketUpgradeHandler handler, HttpResponseStatus status) throws Exception {
90+
private void abort(Channel channel, NettyResponseFuture<?> future, WebSocketUpgradeHandler handler, HttpResponseStatus status) throws Exception {
9191
try {
9292
handler.onThrowable(new IOException("Invalid Status code=" + status.getStatusCode() + " text=" + status.getStatusText()));
9393
} finally {
94-
future.done();
94+
finishUpdate(future, channel, false, false);
9595
}
9696
}
9797

@@ -115,7 +115,7 @@ public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e)
115115
upgrade(channel, future, handler, response, responseHeaders);
116116
break;
117117
default:
118-
abort(future, handler, status);
118+
abort(channel, future, handler, status);
119119
}
120120
}
121121

0 commit comments

Comments
 (0)
0