1717
1818import com .google .api .core .InternalApi ;
1919import com .google .api .gax .grpc .ChannelFactory ;
20+ import com .google .bigtable .v2 .PeerInfo ;
21+ import com .google .cloud .bigtable .data .v2 .stub .MetadataExtractorInterceptor ;
2022import com .google .cloud .bigtable .gaxx .grpc .ChannelPoolHealthChecker .ProbeResult ;
2123import com .google .common .annotations .VisibleForTesting ;
2224import com .google .common .base .Preconditions ;
@@ -543,7 +545,9 @@ static class Entry implements BigtableChannelObserver {
543545 * outstanding RPCs has to happen when the ClientCall is closed or the ClientCall failed to
544546 * start.
545547 */
546- @ VisibleForTesting final AtomicReference <Boolean > isAltsHolder = new AtomicReference <>(null );
548+ @ VisibleForTesting
549+ final AtomicReference <com .google .bigtable .v2 .PeerInfo .TransportType > transportChannelHolder =
550+ new AtomicReference <>(com .google .bigtable .v2 .PeerInfo .TransportType .TRANSPORT_TYPE_UNKNOWN );
547551
548552 @ VisibleForTesting final AtomicInteger errorCount = new AtomicInteger (0 );
549553 @ VisibleForTesting final AtomicInteger successCount = new AtomicInteger (0 );
@@ -576,10 +580,22 @@ static class Entry implements BigtableChannelObserver {
576580 this .channel = channel ;
577581 }
578582
579- void checkAndSetIsAlts (ClientCall <?, ?> call ) {
580- // TODO(populate ALTS holder)
581- boolean result = false ;
582- isAltsHolder .compareAndSet (null , result );
583+ void checkAndSetTransportType (Status status , CallOptions callOptions ) {
584+ // set to UNKNOWN if error
585+ if (!status .isOk ()) {
586+ transportChannelHolder .set (
587+ com .google .bigtable .v2 .PeerInfo .TransportType .TRANSPORT_TYPE_UNKNOWN );
588+ return ;
589+ }
590+ MetadataExtractorInterceptor .SidebandData sidebandData =
591+ MetadataExtractorInterceptor .SidebandData .from (callOptions );
592+
593+ if (sidebandData != null ) {
594+ com .google .bigtable .v2 .PeerInfo peerInfo = sidebandData .getPeerInfo ();
595+ if (peerInfo != null ) {
596+ transportChannelHolder .set (peerInfo .getTransportType ());
597+ }
598+ }
583599 }
584600
585601 ManagedChannel getManagedChannel () {
@@ -683,9 +699,8 @@ public long getAndResetSuccessCount() {
683699 }
684700
685701 @ Override
686- public boolean isAltsChannel () {
687- Boolean val = isAltsHolder .get ();
688- return val != null && val ;
702+ public PeerInfo .TransportType getTransportType () {
703+ return transportChannelHolder .get ();
689704 }
690705
691706 void incrementErrorCount () {
@@ -717,7 +732,7 @@ public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
717732 methodDescriptor .getType () == MethodDescriptor .MethodType .SERVER_STREAMING ;
718733 Entry entry = getRetainedEntry (index , isStreaming );
719734 return new ReleasingClientCall <>(
720- entry .channel .newCall (methodDescriptor , callOptions ), entry , isStreaming );
735+ entry .channel .newCall (methodDescriptor , callOptions ), entry , isStreaming , callOptions );
721736 }
722737 }
723738
@@ -726,13 +741,19 @@ static class ReleasingClientCall<ReqT, RespT> extends SimpleForwardingClientCall
726741 @ Nullable private CancellationException cancellationException ;
727742 final Entry entry ;
728743 private final boolean isStreaming ;
744+ private final CallOptions callOptions ;
729745 private final AtomicBoolean wasClosed = new AtomicBoolean ();
730746 private final AtomicBoolean wasReleased = new AtomicBoolean ();
731747
732- public ReleasingClientCall (ClientCall <ReqT , RespT > delegate , Entry entry , boolean isStreaming ) {
748+ public ReleasingClientCall (
749+ ClientCall <ReqT , RespT > delegate ,
750+ Entry entry ,
751+ boolean isStreaming ,
752+ CallOptions callOptions ) {
733753 super (delegate );
734754 this .entry = entry ;
735755 this .isStreaming = isStreaming ;
756+ this .callOptions = callOptions ;
736757 }
737758
738759 @ Override
@@ -741,12 +762,11 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
741762 throw new IllegalStateException ("Call is already cancelled" , cancellationException );
742763 }
743764 try {
744- entry .checkAndSetIsAlts (delegate ());
745-
746765 super .start (
747766 new SimpleForwardingClientCallListener <RespT >(responseListener ) {
748767 @ Override
749768 public void onClose (Status status , Metadata trailers ) {
769+ entry .checkAndSetTransportType (status , callOptions );
750770 if (!wasClosed .compareAndSet (false , true )) {
751771 LOG .log (
752772 Level .WARNING ,
0 commit comments