Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fb52bbf
Save changes.
kannanjgithub Jan 19, 2026
f518d5e
Save changes.
kannanjgithub Jan 20, 2026
3874631
Save changes.
kannanjgithub Jan 20, 2026
0bad82f
Save changes.
kannanjgithub Jan 21, 2026
51b38e2
Fix enum test.
kannanjgithub Jan 21, 2026
4da06a4
Revert temp changes.
kannanjgithub Jan 21, 2026
df70a27
Revert temp changes.
kannanjgithub Jan 21, 2026
bee005f
Try with server streaming alone.
kannanjgithub Jan 23, 2026
efce818
Interceptor logic.
kannanjgithub Jan 31, 2026
2ac2d47
Closing the stream after the rpc is done.
kannanjgithub Feb 10, 2026
4214889
Style fixes.
kannanjgithub Feb 10, 2026
e099d1d
Fix test name.
kannanjgithub Feb 10, 2026
3d75bf8
Fix style warnings.
kannanjgithub Feb 10, 2026
84d9528
Fix build.
kannanjgithub Feb 10, 2026
3136bca
Review comments - Build channel separately for MCS connection scaling…
kannanjgithub Feb 12, 2026
c3fc7c3
Address Review comments.
kannanjgithub Feb 12, 2026
93cb9ad
Expand the test name on the client side as well.
kannanjgithub Feb 13, 2026
3719011
Add debug print statements to diagnose why server is not starting.
kannanjgithub Feb 13, 2026
dbb3881
Revert "Add debug print statements to diagnose why server is not star…
kannanjgithub Feb 13, 2026
bd36d59
Add temp debug stmts for server start.
kannanjgithub Feb 13, 2026
0216d3b
Rename request proto field.
kannanjgithub Feb 19, 2026
3346bf8
Rename request proto field.
kannanjgithub Feb 19, 2026
d27128b
Rename response proto field.
kannanjgithub Feb 19, 2026
a8d66e1
Address review comments.
kannanjgithub Feb 20, 2026
9b66653
Specify MCS limit directly via command line arg.
kannanjgithub Mar 31, 2026
809ae0f
Merge remote-tracking branch 'origin/mcs-interop-tests' into mcs-inte…
kannanjgithub Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Style fixes.
  • Loading branch information
kannanjgithub committed Feb 10, 2026
commit 42148896ee20cecb9b6a99044ceebc472d42fc80
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.ComputeEngineCredentials;
Expand All @@ -40,16 +39,12 @@
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.InsecureServerCredentials;
import io.grpc.InternalManagedChannelBuilder;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.LongUpDownCounterMetricInstrument;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MetricInstrument;
import io.grpc.MetricSink;
import io.grpc.ServerBuilder;
import io.grpc.TlsChannelCredentials;
import io.grpc.alts.AltsChannelCredentials;
Expand All @@ -58,7 +53,6 @@
import io.grpc.auth.MoreCallCredentials;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.JsonParser;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.netty.InsecureFromHttp1ChannelCredentials;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
Expand All @@ -72,8 +66,6 @@
import io.grpc.testing.integration.Messages.ResponseParameters;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;
import io.grpc.testing.integration.Messages.StreamingInputCallRequest;
import io.grpc.testing.integration.Messages.StreamingInputCallResponse;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import io.grpc.testing.integration.Messages.TestOrcaReport;
Expand All @@ -83,9 +75,7 @@
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -612,12 +602,10 @@ private ClientInterceptor maybeCreateAdditionalMetadataInterceptor(
}

private class Tester extends AbstractInteropTest {
private FakeMetricsSink fakeMetricsSink = new FakeMetricsSink();

@Override
protected ManagedChannelBuilder<?> createChannelBuilder() {
boolean useSubchannelMetricsSink = testCase.equals(MCS_CS.toString());
boolean useGeneric = testCase.equals(MCS_CS.toString())? true : false;
boolean useGeneric = testCase.equals(MCS_CS.toString()) ? true : false;
ChannelCredentials channelCredentials;
if (customCredentialsType != null) {
useGeneric = true; // Retain old behavior; avoids erroring if incompatible
Expand Down Expand Up @@ -694,9 +682,6 @@ protected ManagedChannelBuilder<?> createChannelBuilder() {
if (addMdInterceptor != null) {
channelBuilder.intercept(addMdInterceptor);
}
if (useSubchannelMetricsSink) {
InternalManagedChannelBuilder.addMetricSink(channelBuilder, fakeMetricsSink);
}
return channelBuilder;
}
if (!useOkHttp) {
Expand Down Expand Up @@ -1070,7 +1055,8 @@ protected int operationTimeoutMillis() {
return 15000;
}

class StreamingOutputCallResponseObserver implements StreamObserver<StreamingOutputCallResponse> {
class StreamingOutputCallResponseObserver implements
StreamObserver<StreamingOutputCallResponse> {
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
Comment thread
ejona86 marked this conversation as resolved.
private volatile boolean isCompleted = true;

Expand Down Expand Up @@ -1101,30 +1087,35 @@ public void testMcs() throws Exception {
asyncStub.fullDuplexCall(responseObserver1);
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
.setPayload(Payload.newBuilder().setBody(
ByteString.copyFrom(MCS_CS.description().getBytes())).build()).build();
ByteString.copyFromUtf8(MCS_CS.description())).build()).build();
streamObserver1.onNext(request);
Object responseObj = responseObserver1.take();
StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse) responseObj;
String clientSocketAddressInCall1 = new String(callResponse.getPayload().getBody().toByteArray());
String clientSocketAddressInCall1 = new String(callResponse.getPayload().getBody()
.toByteArray(), UTF_8);
assertThat(clientSocketAddressInCall1).isNotEmpty();

StreamingOutputCallResponseObserver responseObserver2 = new StreamingOutputCallResponseObserver();
StreamingOutputCallResponseObserver responseObserver2 =
new StreamingOutputCallResponseObserver();
StreamObserver<StreamingOutputCallRequest> streamObserver2 =
asyncStub.fullDuplexCall(responseObserver2);
streamObserver2.onNext(request);
callResponse = (StreamingOutputCallResponse) responseObserver2.take();
String clientSocketAddressInCall2 = new String(callResponse.getPayload().getBody().toByteArray());
String clientSocketAddressInCall2 =
new String(callResponse.getPayload().getBody().toByteArray(), UTF_8);

assertThat(clientSocketAddressInCall1).isEqualTo(clientSocketAddressInCall2);

// The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new
// connection to be created in the same subchannel and not get queued.
StreamingOutputCallResponseObserver responseObserver3 = new StreamingOutputCallResponseObserver();
StreamingOutputCallResponseObserver responseObserver3 =
new StreamingOutputCallResponseObserver();
StreamObserver<StreamingOutputCallRequest> streamObserver3 =
asyncStub.fullDuplexCall(responseObserver3);
streamObserver3.onNext(request);
callResponse = (StreamingOutputCallResponse) responseObserver3.take();
String clientSocketAddressInCall3 = new String(callResponse.getPayload().getBody().toByteArray());
String clientSocketAddressInCall3 =
new String(callResponse.getPayload().getBody().toByteArray(), UTF_8);

assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1);

Expand All @@ -1135,8 +1126,7 @@ public void testMcs() throws Exception {
streamObserver3.onCompleted();
assertThat(responseObserver3.isCompleted).isTrue();
}

}
}

private static String validTestCasesHelpText() {
StringBuilder builder = new StringBuilder();
Expand All @@ -1149,39 +1139,4 @@ private static String validTestCasesHelpText() {
}
return builder.toString();
}

static class FakeMetricsSink implements MetricSink {
private volatile long openConnectionCount;

@Override
public Map<String, Boolean> getEnabledMetrics() {
return null;
}

@Override
public Set<String> getOptionalLabels() {
return null;
}

@Override
public int getMeasuresSize() {
return 0;
}

@Override
public void updateMeasures(List<MetricInstrument> instruments) {}

@Override
public void addLongUpDownCounter(LongUpDownCounterMetricInstrument metricInstrument, long value,
List<String> requiredLabelValues,
List<String> optionalLabelValues) {
if (metricInstrument.getName().equals("grpc.subchannel.open_connections")) {
openConnectionCount = value;
}
}

synchronized long getOpenConnectionCount() {
return openConnectionCount;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.google.common.collect.Queues;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.ByteString;
import io.grpc.Attributes;
import io.grpc.Context;
import io.grpc.Contexts;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Metadata;
Expand Down Expand Up @@ -62,7 +62,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import io.grpc.Context;

/**
* Implementation of the business logic for the TestService. Uses an executor to schedule chunks
Expand All @@ -71,7 +70,6 @@
public class TestServiceImpl implements io.grpc.BindableService, AsyncService {
static Context.Key<SocketAddress> PEER_ADDRESS_CONTEXT_KEY = Context.key("peer-address");
private final Random random = new Random();

private final ScheduledExecutorService executor;
private final ByteString compressableBuffer;
private final MetricRecorder metricRecorder;
Expand Down Expand Up @@ -568,9 +566,6 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
// Create a new context with the peer address value
Context newContext = Context.current().withValue(PEER_ADDRESS_CONTEXT_KEY, peerAddress);
try {

// Continue the call processing within the new context
// return newContext.call(() -> next.startCall(call, headers));
return Contexts.interceptCall(newContext, call, headers, next);
} catch (Exception ex) {
throw new RuntimeException(ex);
Expand Down