2929
3030import java .io .IOException ;
3131import java .io .InterruptedIOException ;
32+ import java .nio .ByteBuffer ;
33+ import java .util .List ;
34+ import java .util .concurrent .atomic .AtomicReference ;
3235
3336import org .apache .hc .client5 .http .AuthenticationStrategy ;
3437import org .apache .hc .client5 .http .HttpRoute ;
5356import org .apache .hc .core5 .concurrent .CancellableDependency ;
5457import org .apache .hc .core5 .concurrent .FutureCallback ;
5558import org .apache .hc .core5 .http .EntityDetails ;
59+ import org .apache .hc .core5 .http .Header ;
5660import org .apache .hc .core5 .http .HttpException ;
5761import org .apache .hc .core5 .http .HttpHost ;
5862import org .apache .hc .core5 .http .HttpRequest ;
6266import org .apache .hc .core5 .http .Method ;
6367import org .apache .hc .core5 .http .message .BasicHttpRequest ;
6468import org .apache .hc .core5 .http .message .StatusLine ;
69+ import org .apache .hc .core5 .http .nio .AsyncClientExchangeHandler ;
6570import org .apache .hc .core5 .http .nio .AsyncDataConsumer ;
6671import org .apache .hc .core5 .http .nio .AsyncEntityProducer ;
72+ import org .apache .hc .core5 .http .nio .CapacityChannel ;
73+ import org .apache .hc .core5 .http .nio .DataStreamChannel ;
74+ import org .apache .hc .core5 .http .nio .RequestChannel ;
75+ import org .apache .hc .core5 .http .protocol .HttpContext ;
6776import org .apache .hc .core5 .http .protocol .HttpCoreContext ;
6877import org .apache .hc .core5 .http .protocol .HttpProcessor ;
6978import org .apache .hc .core5 .util .Args ;
@@ -255,20 +264,20 @@ public void cancelled() {
255264 if (LOG .isDebugEnabled ()) {
256265 LOG .debug ("{} create tunnel" , exchangeId );
257266 }
258- createTunnel (state , proxy , target , scope , chain , new AsyncExecCallback () {
267+ createTunnel (state , proxy , target , scope , new AsyncExecCallback () {
259268
260- @ Override
261- public AsyncDataConsumer handleResponse (
262- final HttpResponse response ,
263- final EntityDetails entityDetails ) throws HttpException , IOException {
264- return asyncExecCallback .handleResponse (response , entityDetails );
265- }
269+ @ Override
270+ public AsyncDataConsumer handleResponse (
271+ final HttpResponse response ,
272+ final EntityDetails entityDetails ) throws HttpException , IOException {
273+ return asyncExecCallback .handleResponse (response , entityDetails );
274+ }
266275
267- @ Override
268- public void handleInformationResponse (
269- final HttpResponse response ) throws HttpException , IOException {
270- asyncExecCallback .handleInformationResponse (response );
271- }
276+ @ Override
277+ public void handleInformationResponse (
278+ final HttpResponse response ) throws HttpException , IOException {
279+ asyncExecCallback .handleInformationResponse (response );
280+ }
272281
273282 @ Override
274283 public void completed () {
@@ -302,6 +311,7 @@ public void completed() {
302311
303312 @ Override
304313 public void failed (final Exception cause ) {
314+ execRuntime .markConnectionNonReusable ();
305315 asyncExecCallback .failed (cause );
306316 }
307317
@@ -370,30 +380,75 @@ private void createTunnel(
370380 final HttpHost proxy ,
371381 final HttpHost nextHop ,
372382 final AsyncExecChain .Scope scope ,
373- final AsyncExecChain chain ,
374383 final AsyncExecCallback asyncExecCallback ) throws HttpException , IOException {
375384
385+ final CancellableDependency operation = scope .cancellableDependency ;
376386 final HttpClientContext clientContext = scope .clientContext ;
387+ final AsyncExecRuntime execRuntime = scope .execRuntime ;
388+ final String exchangeId = scope .exchangeId ;
377389
378390 final AuthExchange proxyAuthExchange = proxy != null ? clientContext .getAuthExchange (proxy ) : new AuthExchange ();
379391
380392 if (authCacheKeeper != null ) {
381393 authCacheKeeper .loadPreemptively (proxy , null , proxyAuthExchange , clientContext );
382394 }
383395
384- final HttpRequest connect = new BasicHttpRequest (Method .CONNECT , nextHop , nextHop .toHostString ());
385- connect .setVersion (HttpVersion .HTTP_1_1 );
396+ final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler () {
397+
398+ private final AtomicReference <AsyncDataConsumer > entityConsumerRef = new AtomicReference <>();
386399
387- proxyHttpProcessor .process (connect , null , clientContext );
388- authenticator .addAuthResponse (proxy , ChallengeType .PROXY , connect , proxyAuthExchange , clientContext );
400+ @ Override
401+ public void releaseResources () {
402+ final AsyncDataConsumer entityConsumer = entityConsumerRef .getAndSet (null );
403+ if (entityConsumer != null ) {
404+ entityConsumer .releaseResources ();
405+ }
406+ }
389407
390- chain .proceed (connect , null , scope , new AsyncExecCallback () {
408+ @ Override
409+ public void failed (final Exception cause ) {
410+ final AsyncDataConsumer entityConsumer = entityConsumerRef .getAndSet (null );
411+ if (entityConsumer != null ) {
412+ entityConsumer .releaseResources ();
413+ }
414+ asyncExecCallback .failed (cause );
415+ }
416+
417+ @ Override
418+ public void cancel () {
419+ failed (new InterruptedIOException ());
420+ }
391421
392422 @ Override
393- public AsyncDataConsumer handleResponse (
394- final HttpResponse response ,
395- final EntityDetails entityDetails ) throws HttpException , IOException {
423+ public void produceRequest (final RequestChannel requestChannel ,
424+ final HttpContext httpContext ) throws HttpException , IOException {
425+ final HttpRequest connect = new BasicHttpRequest (Method .CONNECT , nextHop , nextHop .toHostString ());
426+ connect .setVersion (HttpVersion .HTTP_1_1 );
427+
428+ proxyHttpProcessor .process (connect , null , clientContext );
429+ authenticator .addAuthResponse (proxy , ChallengeType .PROXY , connect , proxyAuthExchange , clientContext );
396430
431+ requestChannel .sendRequest (connect , null , clientContext );
432+ }
433+
434+ @ Override
435+ public void produce (final DataStreamChannel dataStreamChannel ) throws IOException {
436+ }
437+
438+ @ Override
439+ public int available () {
440+ return 0 ;
441+ }
442+
443+ @ Override
444+ public void consumeInformation (final HttpResponse httpResponse ,
445+ final HttpContext httpContext ) throws HttpException , IOException {
446+ }
447+
448+ @ Override
449+ public void consumeResponse (final HttpResponse response ,
450+ final EntityDetails entityDetails ,
451+ final HttpContext httpContext ) throws HttpException , IOException {
397452 clientContext .setAttribute (HttpCoreContext .HTTP_RESPONSE , response );
398453 proxyHttpProcessor .process (response , entityDetails , clientContext );
399454
@@ -404,31 +459,56 @@ public AsyncDataConsumer handleResponse(
404459
405460 if (needAuthentication (proxyAuthExchange , proxy , response , clientContext )) {
406461 state .challenged = true ;
407- return null ;
408- }
409- state .challenged = false ;
410- if (status >= HttpStatus .SC_REDIRECTION ) {
411- state .tunnelRefused = true ;
412- return asyncExecCallback .handleResponse (response , entityDetails );
462+ } else {
463+ state .challenged = false ;
464+ if (status >= HttpStatus .SC_REDIRECTION ) {
465+ state .tunnelRefused = true ;
466+ entityConsumerRef .set (asyncExecCallback .handleResponse (response , entityDetails ));
467+ } else if (status == HttpStatus .SC_OK ) {
468+ asyncExecCallback .completed ();
469+ } else {
470+ throw new HttpException ("Unexpected response to CONNECT request: " + new StatusLine (response ));
471+ }
413472 }
414- return null ;
415473 }
416474
417475 @ Override
418- public void handleInformationResponse (final HttpResponse response ) throws HttpException , IOException {
476+ public void updateCapacity (final CapacityChannel capacityChannel ) throws IOException {
477+ final AsyncDataConsumer entityConsumer = entityConsumerRef .get ();
478+ if (entityConsumer != null ) {
479+ entityConsumer .updateCapacity (capacityChannel );
480+ } else {
481+ capacityChannel .update (Integer .MAX_VALUE );
482+ }
419483 }
420484
421485 @ Override
422- public void completed () {
423- asyncExecCallback .completed ();
486+ public void consume (final ByteBuffer src ) throws IOException {
487+ final AsyncDataConsumer entityConsumer = entityConsumerRef .get ();
488+ if (entityConsumer != null ) {
489+ entityConsumer .consume (src );
490+ }
424491 }
425492
426493 @ Override
427- public void failed (final Exception cause ) {
428- asyncExecCallback .failed (cause );
494+ public void streamEnd (final List <? extends Header > trailers ) throws HttpException , IOException {
495+ final AsyncDataConsumer entityConsumer = entityConsumerRef .getAndSet (null );
496+ if (entityConsumer != null ) {
497+ entityConsumer .streamEnd (trailers );
498+ }
499+ asyncExecCallback .completed ();
429500 }
430501
431- });
502+ };
503+
504+ if (LOG .isDebugEnabled ()) {
505+ operation .setDependency (execRuntime .execute (
506+ exchangeId ,
507+ new LoggingAsyncClientExchangeHandler (LOG , exchangeId , internalExchangeHandler ),
508+ clientContext ));
509+ } else {
510+ operation .setDependency (execRuntime .execute (exchangeId , internalExchangeHandler , clientContext ));
511+ }
432512
433513 }
434514
0 commit comments