Skip to content

Commit f7ac968

Browse files
committed
HTTPCLIENT-2310: Async Connect exec handler incorrectly pipes CONNECT requests through the main request protocol chain
1 parent a46c9e5 commit f7ac968

1 file changed

Lines changed: 114 additions & 34 deletions

File tree

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java

Lines changed: 114 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929

3030
import java.io.IOException;
3131
import java.io.InterruptedIOException;
32+
import java.nio.ByteBuffer;
33+
import java.util.List;
34+
import java.util.concurrent.atomic.AtomicReference;
3235

3336
import org.apache.hc.client5.http.AuthenticationStrategy;
3437
import org.apache.hc.client5.http.HttpRoute;
@@ -53,6 +56,7 @@
5356
import org.apache.hc.core5.concurrent.CancellableDependency;
5457
import org.apache.hc.core5.concurrent.FutureCallback;
5558
import org.apache.hc.core5.http.EntityDetails;
59+
import org.apache.hc.core5.http.Header;
5660
import org.apache.hc.core5.http.HttpException;
5761
import org.apache.hc.core5.http.HttpHost;
5862
import org.apache.hc.core5.http.HttpRequest;
@@ -62,8 +66,13 @@
6266
import org.apache.hc.core5.http.Method;
6367
import org.apache.hc.core5.http.message.BasicHttpRequest;
6468
import org.apache.hc.core5.http.message.StatusLine;
69+
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
6570
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
6671
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
72+
import org.apache.hc.core5.http.nio.CapacityChannel;
73+
import org.apache.hc.core5.http.nio.DataStreamChannel;
74+
import org.apache.hc.core5.http.nio.RequestChannel;
75+
import org.apache.hc.core5.http.protocol.HttpContext;
6776
import org.apache.hc.core5.http.protocol.HttpCoreContext;
6877
import org.apache.hc.core5.http.protocol.HttpProcessor;
6978
import org.apache.hc.core5.util.Args;
@@ -255,20 +264,20 @@ public void cancelled() {
255264
if (LOG.isDebugEnabled()) {
256265
LOG.debug("{} create tunnel", exchangeId);
257266
}
258-
createTunnel(state, proxy, target, scope, chain, new AsyncExecCallback() {
267+
createTunnel(state, proxy, target, scope, new AsyncExecCallback() {
259268

260-
@Override
261-
public AsyncDataConsumer handleResponse(
262-
final HttpResponse response,
263-
final EntityDetails entityDetails) throws HttpException, IOException {
264-
return asyncExecCallback.handleResponse(response, entityDetails);
265-
}
269+
@Override
270+
public AsyncDataConsumer handleResponse(
271+
final HttpResponse response,
272+
final EntityDetails entityDetails) throws HttpException, IOException {
273+
return asyncExecCallback.handleResponse(response, entityDetails);
274+
}
266275

267-
@Override
268-
public void handleInformationResponse(
269-
final HttpResponse response) throws HttpException, IOException {
270-
asyncExecCallback.handleInformationResponse(response);
271-
}
276+
@Override
277+
public void handleInformationResponse(
278+
final HttpResponse response) throws HttpException, IOException {
279+
asyncExecCallback.handleInformationResponse(response);
280+
}
272281

273282
@Override
274283
public void completed() {
@@ -302,6 +311,7 @@ public void completed() {
302311

303312
@Override
304313
public void failed(final Exception cause) {
314+
execRuntime.markConnectionNonReusable();
305315
asyncExecCallback.failed(cause);
306316
}
307317

@@ -370,30 +380,75 @@ private void createTunnel(
370380
final HttpHost proxy,
371381
final HttpHost nextHop,
372382
final AsyncExecChain.Scope scope,
373-
final AsyncExecChain chain,
374383
final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
375384

385+
final CancellableDependency operation = scope.cancellableDependency;
376386
final HttpClientContext clientContext = scope.clientContext;
387+
final AsyncExecRuntime execRuntime = scope.execRuntime;
388+
final String exchangeId = scope.exchangeId;
377389

378390
final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange();
379391

380392
if (authCacheKeeper != null) {
381393
authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext);
382394
}
383395

384-
final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
385-
connect.setVersion(HttpVersion.HTTP_1_1);
396+
final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
397+
398+
private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>();
386399

387-
proxyHttpProcessor.process(connect, null, clientContext);
388-
authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
400+
@Override
401+
public void releaseResources() {
402+
final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
403+
if (entityConsumer != null) {
404+
entityConsumer.releaseResources();
405+
}
406+
}
389407

390-
chain.proceed(connect, null, scope, new AsyncExecCallback() {
408+
@Override
409+
public void failed(final Exception cause) {
410+
final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
411+
if (entityConsumer != null) {
412+
entityConsumer.releaseResources();
413+
}
414+
asyncExecCallback.failed(cause);
415+
}
416+
417+
@Override
418+
public void cancel() {
419+
failed(new InterruptedIOException());
420+
}
391421

392422
@Override
393-
public AsyncDataConsumer handleResponse(
394-
final HttpResponse response,
395-
final EntityDetails entityDetails) throws HttpException, IOException {
423+
public void produceRequest(final RequestChannel requestChannel,
424+
final HttpContext httpContext) throws HttpException, IOException {
425+
final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString());
426+
connect.setVersion(HttpVersion.HTTP_1_1);
427+
428+
proxyHttpProcessor.process(connect, null, clientContext);
429+
authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext);
396430

431+
requestChannel.sendRequest(connect, null, clientContext);
432+
}
433+
434+
@Override
435+
public void produce(final DataStreamChannel dataStreamChannel) throws IOException {
436+
}
437+
438+
@Override
439+
public int available() {
440+
return 0;
441+
}
442+
443+
@Override
444+
public void consumeInformation(final HttpResponse httpResponse,
445+
final HttpContext httpContext) throws HttpException, IOException {
446+
}
447+
448+
@Override
449+
public void consumeResponse(final HttpResponse response,
450+
final EntityDetails entityDetails,
451+
final HttpContext httpContext) throws HttpException, IOException {
397452
clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
398453
proxyHttpProcessor.process(response, entityDetails, clientContext);
399454

@@ -404,31 +459,56 @@ public AsyncDataConsumer handleResponse(
404459

405460
if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) {
406461
state.challenged = true;
407-
return null;
408-
}
409-
state.challenged = false;
410-
if (status >= HttpStatus.SC_REDIRECTION) {
411-
state.tunnelRefused = true;
412-
return asyncExecCallback.handleResponse(response, entityDetails);
462+
} else {
463+
state.challenged = false;
464+
if (status >= HttpStatus.SC_REDIRECTION) {
465+
state.tunnelRefused = true;
466+
entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
467+
} else if (status == HttpStatus.SC_OK) {
468+
asyncExecCallback.completed();
469+
} else {
470+
throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response));
471+
}
413472
}
414-
return null;
415473
}
416474

417475
@Override
418-
public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException {
476+
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
477+
final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
478+
if (entityConsumer != null) {
479+
entityConsumer.updateCapacity(capacityChannel);
480+
} else {
481+
capacityChannel.update(Integer.MAX_VALUE);
482+
}
419483
}
420484

421485
@Override
422-
public void completed() {
423-
asyncExecCallback.completed();
486+
public void consume(final ByteBuffer src) throws IOException {
487+
final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
488+
if (entityConsumer != null) {
489+
entityConsumer.consume(src);
490+
}
424491
}
425492

426493
@Override
427-
public void failed(final Exception cause) {
428-
asyncExecCallback.failed(cause);
494+
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
495+
final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
496+
if (entityConsumer != null) {
497+
entityConsumer.streamEnd(trailers);
498+
}
499+
asyncExecCallback.completed();
429500
}
430501

431-
});
502+
};
503+
504+
if (LOG.isDebugEnabled()) {
505+
operation.setDependency(execRuntime.execute(
506+
exchangeId,
507+
new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler),
508+
clientContext));
509+
} else {
510+
operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext));
511+
}
432512

433513
}
434514

0 commit comments

Comments
 (0)