Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions basic/grpc-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
gRPC Client Sample
==================

This sample demonstrates how to use Spring Integration's gRPC support with an **Outbound Gateway** to make gRPC requests to a server.

The sample implements a gRPC client that calls a `HelloWorldService` demonstrating different communication patterns:

* **SayHello** - Unary RPC (single request, single response)
* **StreamSayHello** - Server streaming RPC (single request, multiple responses)

The client automatically executes both examples on startup using `ApplicationRunner` beans.

## Running the Sample

**Important:** Start the gRPC server first (see the grpc-server sample) and it must have the same gRPC proto files as the client.

Then start the gRPC client using Gradle:

$ gradlew :grpc-client:bootRun

#### Using an IDE such as SpringSource Tool Suite™ (STS)

In STS (Eclipse), go to package **org.springframework.integration.samples.grpc**, right-click **Application** and select **Run as** --> **Java Application** (or Spring Boot Application).

### Output

The client will automatically send requests to the server and display the responses:

```
Single response reply: message: "Hello Jack"

Stream received reply: Hello Jack
Stream received reply: Hello again!
```

## Configuration

The gRPC server connection is configured in `application.properties`:

```properties
grpc.server.host=localhost
grpc.server.port=9090
```

## Resources

* [Spring Integration gRPC Documentation](https://docs.spring.io/spring-integration/reference/grpc.html)
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.samples.grpc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* Main application class for the gRPC client sample.
* This Spring Boot application demonstrates how to use Spring Integration
* with gRPC for client-side communication.
*
* @author Glenn Renfro
*/
@SpringBootApplication
public final class Application {

private Application() {
}

/**
* Main entry point for the gRPC client application.
*
* @param args command line arguments
*/
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.samples.grpc.configuration;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.grpc.dsl.Grpc;
import org.springframework.integration.grpc.outbound.GrpcOutboundGateway;
import org.springframework.integration.grpc.proto.HelloReply;
import org.springframework.integration.grpc.proto.HelloRequest;
import org.springframework.integration.grpc.proto.HelloWorldServiceGrpc;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

/**
* Configuration class for the gRPC client sample.
* Configures the gRPC channel, message channels, and integration flows
* for both single response and streaming response scenarios.
*
* @author Glenn Renfro
*/
@Configuration
class ClientHelloWorldConfiguration {

private static final Log LOG = LogFactory.getLog(ClientHelloWorldConfiguration.class);

/**
* Creates a managed gRPC channel for communication with the server.
*
* @param host the gRPC server host (default: localhost)
* @param port the gRPC server port (default: 9090)
* @return the configured managed channel
*/
@Bean(destroyMethod = "shutdownNow")
ManagedChannel managedChannel(@Value("${grpc.server.host:localhost}") String host,
@Value("${grpc.server.port:9090}") int port) {
return ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.build();
}

/**
* Creates a message channel for single response gRPC requests.
*
* @return the direct message channel
*/
@Bean
MessageChannel grpcInputChannelSingleResponse() {
return new DirectChannel();
}

/**
* Creates a message channel for streaming response gRPC requests.
*
* @return the direct message channel
*/
@Bean
MessageChannel grpcInputChannelStreamResponse() {
return new DirectChannel();
}

/**
* Creates a FluxMessageChannel for output.
*
* @return the flux message channel
*/
@Bean
FluxMessageChannel grpcStreamOutputChannel() {
return new FluxMessageChannel();
}

/**
* Creates an application runner that sends a single gRPC request and receives a single response.
*
* @param grpcInputChannelSingleResponse the message channel for single response requests
* @param replyTimeout the time in milliseconds to await for the response. Defaults to 10,000 milliseconds.
* @return the application runner
*/
@Bean
ApplicationRunner grpcClientSingleResponse(MessageChannel grpcInputChannelSingleResponse,
@Value("${grpc.client.single.reply.timeout:10000}") long replyTimeout) {
return args -> {
HelloRequest request = HelloRequest.newBuilder().setName("Jack").build();
QueueChannel replyChannel = new QueueChannel();
Message<?> requestMessage = MessageBuilder.withPayload(request)
.setReplyChannel(replyChannel)
.build();
grpcInputChannelSingleResponse.send(requestMessage);
Message<?> reply = replyChannel.receive(replyTimeout);
if (reply != null) {
LOG.info("Single response reply: " + reply.getPayload());
}
else {
LOG.warn("No reply received");
}
};
}

/**
* Creates an application runner that sends a gRPC request and receives a stream of responses.
*
* @param grpcInputChannelStreamResponse the message channel for streaming response requests
* @param grpcStreamOutputChannel channel that contains the responses
* @param replyTimeout the time in seconds to await for the response. Defaults to 1 second.
* @return the application runner
*/
@Bean
ApplicationRunner grpcClientStreamResponse(MessageChannel grpcInputChannelStreamResponse,
FluxMessageChannel grpcStreamOutputChannel,
@Value("${grpc.client.stream.reply.timeout:1}") int replyTimeout) {
return args -> {
CountDownLatch latch = new CountDownLatch(1);

Flux.from(grpcStreamOutputChannel)
.doOnSubscribe(subscription -> {
HelloRequest request = HelloRequest.newBuilder().setName("Jack").build();
Message<?> requestMessage = MessageBuilder.withPayload(request).build();
grpcInputChannelStreamResponse.send(requestMessage);
})
.map(message -> (HelloReply) message.getPayload())
.map(HelloReply::getMessage)
.doOnNext(msg -> LOG.info("Stream received reply: " + msg))
.doOnComplete(latch::countDown)
.doOnError(error -> {
LOG.error("Error in stream: " + error.getMessage(), error);
latch.countDown();
})
.subscribe();

latch.await(replyTimeout, TimeUnit.SECONDS);
};
}

/**
* Creates an integration flow for outbound gRPC requests with single responses.
*
* @param managedChannel the gRPC managed channel
* @param grpcInputChannelSingleResponse the input message channel
* @return the integration flow
*/
@Bean
IntegrationFlow grpcOutboundFlowSingleResponse(ManagedChannel managedChannel,
MessageChannel grpcInputChannelSingleResponse) {
return IntegrationFlow.from(grpcInputChannelSingleResponse)
.handle(Grpc.outboundGateway(managedChannel, HelloWorldServiceGrpc.class)
.methodName("SayHello"))
.get();
}

/**
* Creates an integration flow for outbound gRPC requests with streaming responses.
*
* @param managedChannel the gRPC managed channel
* @param grpcInputChannelStreamResponse the input message channel
* @param grpcStreamOutputChannel channel containing the results
* @return the integration flow
*/
@Bean
IntegrationFlow grpcOutboundFlowStreamResponse(ManagedChannel managedChannel,
MessageChannel grpcInputChannelStreamResponse,
FluxMessageChannel grpcStreamOutputChannel) {
//TODO: Need advice, there has to be a better way than what I have below (or is this a bug in the
// GrpcOutboundGateway?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Let's debug it together to see what is going on!

GrpcOutboundGateway gateway = new GrpcOutboundGateway(managedChannel, HelloWorldServiceGrpc.class) {
@Override
protected boolean shouldSplitOutput(Iterable<?> reply) {
return false;
}
};
gateway.setMethodName("StreamSayHello");
gateway.setAsync(true);

return IntegrationFlow.from(grpcInputChannelStreamResponse)
.handle(gateway)
.channel(grpcStreamOutputChannel)
.get();
}

}
27 changes: 27 additions & 0 deletions basic/grpc-client/src/main/proto/hello.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
syntax = "proto3";
import "types.proto";

package integration.grpc.hello;

option java_package = "org.springframework.integration.grpc.proto";

option java_multiple_files = true;

option java_outer_classname = "HelloWorldProto";

// The greeting service definition.
service HelloWorldService {

// Sends a greeting
rpc SayHello(HelloRequest) returns (HelloReply) {}

// Sends a greeting and something else
rpc StreamSayHello(HelloRequest) returns (stream HelloReply) {}

// Sends a greeting to everyone presenting
rpc HelloToEveryOne(stream HelloRequest) returns (HelloReply) {}

// Streams requests and replies
rpc BidiStreamHello(stream HelloRequest) returns (stream HelloReply) {}

}
23 changes: 23 additions & 0 deletions basic/grpc-client/src/main/proto/types.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
syntax = "proto3";

package integration.grpc.hello;

option java_package = "org.springframework.integration.grpc.proto";

option java_multiple_files = true;

option java_outer_classname = "TypesProto";

// The request message containing the user's name.
message HelloRequest {

string name = 1;

}

// The response message containing the greetings
message HelloReply {

string message = 1;

}
4 changes: 4 additions & 0 deletions basic/grpc-client/src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
grpc.server.host=localhost
grpc.server.port=9090
grpc.client.stream.reply.timeout=1
grpc.client.single.reply.timeout=10000
Loading