|
36 | 36 | import java.time.Clock; |
37 | 37 | import java.util.ArrayList; |
38 | 38 | import java.util.List; |
| 39 | +import java.util.Optional; |
39 | 40 | import java.util.Random; |
40 | 41 | import java.util.concurrent.CancellationException; |
41 | 42 | import java.util.concurrent.ConcurrentLinkedQueue; |
|
50 | 51 | import java.util.logging.Logger; |
51 | 52 | import javax.annotation.Nullable; |
52 | 53 |
|
| 54 | +import static com.google.bigtable.v2.PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN; |
| 55 | + |
53 | 56 | /** |
54 | 57 | * A {@link ManagedChannel} that will send requests round-robin via a set of channels. |
55 | 58 | * |
@@ -545,9 +548,11 @@ static class Entry implements BigtableChannelObserver { |
545 | 548 | * outstanding RPCs has to happen when the ClientCall is closed or the ClientCall failed to |
546 | 549 | * start. |
547 | 550 | */ |
| 551 | + |
| 552 | + /** this contains the PeerInfo field of the most recent rpc on this channel entry. */ |
548 | 553 | @VisibleForTesting |
549 | | - final AtomicReference<com.google.bigtable.v2.PeerInfo.TransportType> transportTypeHolder = |
550 | | - new AtomicReference<>(com.google.bigtable.v2.PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN); |
| 554 | + volatile PeerInfo.TransportType transportType = |
| 555 | + TRANSPORT_TYPE_UNKNOWN; |
551 | 556 |
|
552 | 557 | @VisibleForTesting final AtomicInteger errorCount = new AtomicInteger(0); |
553 | 558 | @VisibleForTesting final AtomicInteger successCount = new AtomicInteger(0); |
@@ -580,19 +585,17 @@ static class Entry implements BigtableChannelObserver { |
580 | 585 | this.channel = channel; |
581 | 586 | } |
582 | 587 |
|
583 | | - void checkAndSetTransportType(CallOptions callOptions) { |
| 588 | + void setTransportType(CallOptions callOptions) { |
584 | 589 | MetadataExtractorInterceptor.SidebandData sidebandData = |
585 | 590 | MetadataExtractorInterceptor.SidebandData.from(callOptions); |
586 | 591 |
|
587 | 592 | // Set to the specific transport type if present, otherwise default to UNKNOWN |
588 | 593 | // we could check the Status and set it to unknown, but we might have PeerInfo with some non |
589 | 594 | // OK Status |
590 | | - if (sidebandData != null && sidebandData.getPeerInfo() != null) { |
591 | | - transportTypeHolder.set(sidebandData.getPeerInfo().getTransportType()); |
592 | | - } else { |
593 | | - transportTypeHolder.set( |
594 | | - com.google.bigtable.v2.PeerInfo.TransportType.TRANSPORT_TYPE_UNKNOWN); |
595 | | - } |
| 595 | + transportType = Optional.ofNullable(sidebandData) |
| 596 | + .map(MetadataExtractorInterceptor.SidebandData::getPeerInfo) |
| 597 | + .map(PeerInfo::getTransportType) |
| 598 | + .orElse(TRANSPORT_TYPE_UNKNOWN); |
596 | 599 | } |
597 | 600 |
|
598 | 601 | ManagedChannel getManagedChannel() { |
@@ -697,7 +700,7 @@ public long getAndResetSuccessCount() { |
697 | 700 |
|
698 | 701 | @Override |
699 | 702 | public PeerInfo.TransportType getTransportType() { |
700 | | - return transportTypeHolder.get(); |
| 703 | + return transportType; |
701 | 704 | } |
702 | 705 |
|
703 | 706 | void incrementErrorCount() { |
@@ -761,9 +764,13 @@ public void start(Listener<RespT> responseListener, Metadata headers) { |
761 | 764 | try { |
762 | 765 | super.start( |
763 | 766 | new SimpleForwardingClientCallListener<RespT>(responseListener) { |
| 767 | + @Override |
| 768 | + public void onHeaders(Metadata headers) { |
| 769 | + entry.setTransportType(callOptions); |
| 770 | + super.onHeaders(headers); |
| 771 | + } |
764 | 772 | @Override |
765 | 773 | public void onClose(Status status, Metadata trailers) { |
766 | | - entry.checkAndSetTransportType(callOptions); |
767 | 774 | if (!wasClosed.compareAndSet(false, true)) { |
768 | 775 | LOG.log( |
769 | 776 | Level.WARNING, |
|
0 commit comments