diff --git a/examples/BUILD.bazel b/examples/BUILD.bazel index 82d6b9fb2c1..563fa07ce84 100644 --- a/examples/BUILD.bazel +++ b/examples/BUILD.bazel @@ -80,6 +80,7 @@ java_library( "@io_grpc_grpc_java//netty", ], deps = [ + ":_health_java_grpc", ":echo_java_grpc", ":echo_java_proto", ":hello_streaming_java_grpc", @@ -93,7 +94,11 @@ java_library( "@io_grpc_grpc_java//api", "@io_grpc_grpc_java//context", "@io_grpc_grpc_java//protobuf", + "@io_grpc_grpc_java//services:health", + "@io_grpc_grpc_java//services:healthlb", "@io_grpc_grpc_java//stub", + "@io_grpc_grpc_proto//:health_proto", + "@io_grpc_grpc_proto//:health_java_proto", "@maven//:com_google_api_grpc_proto_google_common_protos", "@maven//:com_google_code_findbugs_jsr305", "@maven//:com_google_code_gson_gson", @@ -217,3 +222,29 @@ java_binary( ":examples", ], ) + +java_binary( + name = "healthservice-server", + testonly = 1, + main_class = "io.grpc.examples.healthservice.HealthServiceServer", + runtime_deps = [ + ":examples", + ], +) + +java_binary( + name = "healthservice-client", + testonly = 1, + main_class = "io.grpc.examples.healthservice.HealthServiceClient", + runtime_deps = [ + ":examples", + ], +) + +java_grpc_library( + name = "_health_java_grpc", + srcs = ["@io_grpc_grpc_proto//:health_proto"], + visibility = ["//visibility:private"], + deps = ["@io_grpc_grpc_proto//:health_java_proto"], +) + diff --git a/examples/README.md b/examples/README.md index f83d04f08d9..ae849fa7d6d 100644 --- a/examples/README.md +++ b/examples/README.md @@ -119,6 +119,20 @@ before trying out the examples. +-
+ Health Service + + The [health service example](src/main/java/io/grpc/examples/healthservice) + provides a HelloWorld gRPC server that doesn't like short names along with a + health service. It also provides a client application which makes HelloWorld + calls and checks the health status. + + The client application also shows how the round robin load balancer can + utilize the health status to avoid making calls to a service that is + not actively serving. +
+ + - [Keep Alive](src/main/java/io/grpc/examples/keepalive) ### To build the examples diff --git a/examples/build.gradle b/examples/build.gradle index c330fb1cc0f..a2980b28afa 100644 --- a/examples/build.gradle +++ b/examples/build.gradle @@ -27,6 +27,7 @@ def protocVersion = protobufVersion dependencies { implementation "io.grpc:grpc-protobuf:${grpcVersion}" + implementation "io.grpc:grpc-services:${grpcVersion}" implementation "io.grpc:grpc-stub:${grpcVersion}" compileOnly "org.apache.tomcat:annotations-api:6.0.53" @@ -223,6 +224,20 @@ task cancellationServer(type: CreateStartScripts) { classpath = startScripts.classpath } +task healthServiceServer(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.healthservice.HealthServiceServer' + applicationName = 'health-service-server' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + +task healthServiceClient(type: CreateStartScripts) { + mainClass = 'io.grpc.examples.healthservice.HealthServiceClient' + applicationName = 'health-service-client' + outputDir = new File(project.buildDir, 'tmp/scripts/' + name) + classpath = startScripts.classpath +} + task multiplexingServer(type: CreateStartScripts) { mainClass = 'io.grpc.examples.multiplex.MultiplexingServer' applicationName = 'multiplexing-server' @@ -259,6 +274,8 @@ applicationDistribution.into('bin') { from(deadlineClient) from(keepAliveServer) from(keepAliveClient) + from(healthServiceServer) + from(healthServiceClient) from(cancellationClient) from(cancellationServer) from(multiplexingServer) diff --git a/examples/pom.xml b/examples/pom.xml index 4482a2b8c8f..13f8164fb42 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -42,6 +42,10 @@ io.grpc grpc-protobuf + + io.grpc + grpc-services + io.grpc grpc-stub diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java new file mode 100644 index 00000000000..471084feab6 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceClient.java @@ -0,0 +1,194 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.healthservice; + +import io.grpc.Channel; +import io.grpc.Grpc; +import io.grpc.InsecureChannelCredentials; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.StatusRuntimeException; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.health.v1.HealthGrpc; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A client that requests a greeting from the {@link HelloWorldServer}. + */ +public class HealthServiceClient { + private static final Logger logger = Logger.getLogger(HealthServiceClient.class.getName()); + + private final GreeterGrpc.GreeterBlockingStub greeterBlockingStub; + private final HealthGrpc.HealthStub healthStub; + private final HealthGrpc.HealthBlockingStub healthBlockingStub; + + private final HealthCheckRequest healthRequest; + + /** Construct client for accessing HelloWorld server using the existing channel. */ + public HealthServiceClient(Channel channel) { + greeterBlockingStub = GreeterGrpc.newBlockingStub(channel); + healthStub = HealthGrpc.newStub(channel); + healthBlockingStub = HealthGrpc.newBlockingStub(channel); + healthRequest = HealthCheckRequest.getDefaultInstance(); + LoadBalancerProvider roundRobin = LoadBalancerRegistry.getDefaultRegistry() + .getProvider("round_robin"); + + } + + private ServingStatus checkHealth(String prefix) { + HealthCheckResponse response = + healthBlockingStub.check(healthRequest); + logger.info(prefix + ", current health is: " + response.getStatus()); + return response.getStatus(); + } + + /** Say hello to server. */ + public void greet(String name) { + logger.info("Will try to greet " + name + " ..."); + HelloRequest request = HelloRequest.newBuilder().setName(name).build(); + HelloReply response; + try { + response = greeterBlockingStub.sayHello(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } catch (Exception e) { + e.printStackTrace(); + return; + } + logger.info("Greeting: " + response.getMessage()); + } + + + private static void runTest(String target, String[] users, boolean useRoundRobin) + throws InterruptedException { + ManagedChannelBuilder builder = + Grpc.newChannelBuilder(target, InsecureChannelCredentials.create()); + + // Round Robin, when a healthCheckConfig is present in the default service configuration, runs + // a watch on the health service and when picking an endpoint will + // consider a transport to a server whose service is not in SERVING state to be unavailable. + // Since we only have a single server we are connecting to, then the load balancer will + // return an error without sending the RPC. + if (useRoundRobin) { + builder = builder + .defaultLoadBalancingPolicy("round_robin") + .defaultServiceConfig(generateHealthConfig("")); + } + + ManagedChannel channel = builder.build(); + + System.out.println("\nDoing test with" + (useRoundRobin ? "" : "out") + + " the Round Robin load balancer\n"); + + try { + HealthServiceClient client = new HealthServiceClient(channel); + if (!useRoundRobin) { + client.checkHealth("Before call"); + } + client.greet(users[0]); + if (!useRoundRobin) { + client.checkHealth("After user " + users[0]); + } + + for (String user : users) { + client.greet(user); + Thread.sleep(100); // Since the health update is asynchronous give it time to propagate + } + + if (!useRoundRobin) { + client.checkHealth("After all users"); + Thread.sleep(10000); + client.checkHealth("After 10 second wait"); + } else { + Thread.sleep(10000); + } + client.greet("Larry"); + } finally { + // ManagedChannels use resources like threads and TCP connections. To prevent leaking these + // resources the channel should be shut down when it will no longer be used. If it may be used + // again leave it running. + channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS); + } + } + private static Map generateHealthConfig(String serviceName) { + Map config = new HashMap<>(); + Map serviceMap = new HashMap<>(); + + config.put("healthCheckConfig", serviceMap); + serviceMap.put("serviceName", serviceName); + return config; + } + + /** + * Uses a server with both a greet service and the health service. + * If provided, the first element of {@code args} is the name to use in the + * greeting. The second argument is the target server. + * This has an example of using the health service directly through the unary call + * check + * to get the current health. It also utilizes the health of the server's greet service + * indirectly through the round robin load balancer, which uses the streaming rpc + * watch (you can see how it is done in + * {@link io.grpc.protobuf.services.HealthCheckingLoadBalancerFactory}). + */ + public static void main(String[] args) throws Exception { + System.setProperty("java.util.logging.SimpleFormatter.format", + "%1$tH:%1$tM:%1$tS %4$s %2$s: %5$s%6$s%n"); + + String[] users = {"world", "foo", "I am Grut"}; + // Access a service running on the local machine on port 50051 + String target = "localhost:50051"; + // Allow passing in the user and target strings as command line arguments + if (args.length > 0) { + if ("--help".equals(args[0])) { + System.err.println("Usage: [target [name] [name] ...]"); + System.err.println(""); + System.err.println(" target The server to connect to. Defaults to " + target); + System.err.println(" name The names you wish to be greeted by. Defaults to " + Arrays.toString(users)); + System.exit(1); + } + target = args[0]; + } + if (args.length > 1) { + users = new String[args.length-1]; + for (int i=0; i < users.length; i++) { + users[i] = args[i+1]; + } + } + + // Will see failures of rpc's sent while server service is not serving, where the failures come + // from the server + runTest(target, users, false); + + // The client will throw an error when sending the rpc to a non-serving service because the + // round robin load balancer uses the health service's watch rpc. + runTest(target, users, true); + + } +} diff --git a/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java new file mode 100644 index 00000000000..f6547c11103 --- /dev/null +++ b/examples/src/main/java/io/grpc/examples/healthservice/HealthServiceServer.java @@ -0,0 +1,140 @@ +/* + * Copyright 2023 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.examples.healthservice; + +import io.grpc.Grpc; +import io.grpc.InsecureServerCredentials; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.examples.helloworld.GreeterGrpc; +import io.grpc.examples.helloworld.HelloReply; +import io.grpc.examples.helloworld.HelloRequest; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.protobuf.services.HealthStatusManager; +import io.grpc.stub.StreamObserver; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +/** + * Server that manages startup/shutdown of a {@code Greeter} server. + */ +public class HealthServiceServer { + private static final Logger logger = Logger.getLogger(HealthServiceServer.class.getName()); + + private Server server; + private HealthStatusManager health; + + private void start() throws IOException { + /* The port on which the server should run */ + int port = 50051; + health = new HealthStatusManager(); + server = Grpc.newServerBuilderForPort(port, InsecureServerCredentials.create()) + .addService(new GreeterImpl()) + .addService(health.getHealthService()) + .build() + .start(); + logger.info("Server started, listening on " + port); + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println("*** shutting down gRPC server since JVM is shutting down"); + try { + HealthServiceServer.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** server shut down"); + } + }); + + health.setStatus("", ServingStatus.SERVING); + } + + private void stop() throws InterruptedException { + if (server != null) { + server.shutdown().awaitTermination(30, TimeUnit.SECONDS); + } + } + + /** + * Await termination on the main thread since the grpc library uses daemon threads. + */ + private void blockUntilShutdown() throws InterruptedException { + if (server != null) { + server.awaitTermination(); + } + } + + /** + * Main launches the server from the command line. + */ + public static void main(String[] args) throws IOException, InterruptedException { + System.setProperty("java.util.logging.SimpleFormatter.format", + "%1$tH:%1$tM:%1$tS %4$s %2$s: %5$s%6$s%n"); + + final HealthServiceServer server = new HealthServiceServer(); + server.start(); + server.blockUntilShutdown(); + } + + private class GreeterImpl extends GreeterGrpc.GreeterImplBase { + boolean isServing = true; + + @Override + public void sayHello(HelloRequest req, StreamObserver responseObserver) { + if (!isServing) { + responseObserver.onError( + Status.INTERNAL.withDescription("Not Serving right now").asRuntimeException()); + return; + } + + if (isNameLongEnough(req)) { + HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } else { + logger.warning("Tiny message received, throwing a temper tantrum"); + health.setStatus("", ServingStatus.NOT_SERVING); + isServing = false; + + // In 10 seconds set it back to serving + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + isServing = true; + health.setStatus("", ServingStatus.SERVING); + logger.info("tantrum complete"); + } + }).start(); + responseObserver.onError( + Status.INVALID_ARGUMENT.withDescription("Offended by short name").asRuntimeException()); + } + } + + private boolean isNameLongEnough(HelloRequest req) { + return isServing && req.getName().length() >= 5; + } + } +}