Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public class Subscriber {

public static void main(String[] args) throws Exception {
try (var client = new DaprClientBuilder().buildPreviewClient()) {
// Subscribe to events - receives raw String data directly
client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING)
// Subscribe to topic - receives raw String data directly
client.subscribeToTopic(PUBSUB_NAME, topicName, TypeRef.STRING)
.doOnNext(message -> {
System.out.println("Subscriber got: " + message);
})
Expand All @@ -79,8 +79,8 @@ public class SubscriberCloudEvent {

public static void main(String[] args) throws Exception {
try (var client = new DaprClientBuilder().buildPreviewClient()) {
// Subscribe to events - receives CloudEvent<String> with full metadata
client.subscribeToEvents(PUBSUB_NAME, topicName, new TypeRef<CloudEvent<String>>() {})
// Subscribe to topic - receives CloudEvent<String> with full metadata
client.subscribeToTopic(PUBSUB_NAME, topicName, new TypeRef<CloudEvent<String>>() {})
.doOnNext(cloudEvent -> {
System.out.println("Received CloudEvent:");
System.out.println(" ID: " + cloudEvent.getId());
Expand All @@ -101,7 +101,7 @@ public class SubscriberCloudEvent {
You can also pass metadata to the subscription, for example to enable raw payload mode:

```java
client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING, Map.of("rawPayload", "true"))
client.subscribeToTopic(PUBSUB_NAME, topicName, TypeRef.STRING, Map.of("rawPayload", "true"))
.doOnNext(message -> {
System.out.println("Subscriber got: " + message);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public static void main(String[] args) throws Exception {
try (var client = new DaprClientBuilder().buildPreviewClient()) {
System.out.println("Subscribing to topic: " + topicName);

// Subscribe to events - receives raw String data directly
client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING)
// Subscribe to topic - receives raw String data directly
client.subscribeToTopic(PUBSUB_NAME, topicName, TypeRef.STRING)
.doOnNext(message -> {
System.out.println("Subscriber got: " + message);
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public static void main(String[] args) throws Exception {
try (var client = new DaprClientBuilder().buildPreviewClient()) {
System.out.println("Subscribing to topic: " + topicName + " (CloudEvent mode)");

// Subscribe to events - receives CloudEvent<String> with full metadata
// Subscribe to topic - receives CloudEvent<String> with full metadata
// Use TypeRef<CloudEvent<String>> to get CloudEvent wrapper with metadata
client.subscribeToEvents(PUBSUB_NAME, topicName, new TypeRef<CloudEvent<String>>() {})
client.subscribeToTopic(PUBSUB_NAME, topicName, new TypeRef<CloudEvent<String>>() {})
.doOnNext(cloudEvent -> {
System.out.println("Received CloudEvent:");
System.out.println(" ID: " + cloudEvent.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ public void testPubSubFlux() throws Exception {

Set<String> messages = Collections.synchronizedSet(new HashSet<>());

// subscribeToEvents now returns Flux<T> directly (raw data)
var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME_FLUX, TypeRef.STRING)
// subscribeToTopic returns Flux<T> directly (raw data)
var disposable = previewClient.subscribeToTopic(PUBSUB_NAME, TOPIC_NAME_FLUX, TypeRef.STRING)
.doOnNext(rawMessage -> {
// rawMessage is String directly
if (rawMessage.contains(runId)) {
Expand Down Expand Up @@ -196,7 +196,7 @@ public void testPubSubCloudEvent() throws Exception {
Set<String> messageIds = Collections.synchronizedSet(new HashSet<>());

// Use TypeRef<CloudEvent<String>> to receive full CloudEvent with metadata
var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME_CLOUDEVENT, new TypeRef<CloudEvent<String>>(){})
var disposable = previewClient.subscribeToTopic(PUBSUB_NAME, TOPIC_NAME_CLOUDEVENT, new TypeRef<CloudEvent<String>>(){})
.doOnNext(cloudEvent -> {
if (cloudEvent.getData() != null && cloudEvent.getData().contains(runId)) {
messageIds.add(cloudEvent.getId());
Expand Down Expand Up @@ -241,8 +241,8 @@ public void testPubSubRawPayload() throws Exception {
Set<String> messages = Collections.synchronizedSet(new HashSet<>());
Map<String, String> metadata = Map.of("rawPayload", "true");

// Use subscribeToEvents with rawPayload metadata
var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME_RAWPAYLOAD, TypeRef.STRING, metadata)
// Use subscribeToTopic with rawPayload metadata
var disposable = previewClient.subscribeToTopic(PUBSUB_NAME, TOPIC_NAME_RAWPAYLOAD, TypeRef.STRING, metadata)
.doOnNext(rawMessage -> {
if (rawMessage.contains(runId)) {
messages.add(rawMessage);
Expand Down
6 changes: 3 additions & 3 deletions sdk/src/main/java/io/dapr/client/DaprClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -482,15 +482,15 @@ public <T> Subscription subscribeToEvents(
* {@inheritDoc}
*/
@Override
public <T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type) {
return subscribeToEvents(pubsubName, topic, type, null);
public <T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T> type) {
return subscribeToTopic(pubsubName, topic, type, null);
}

/**
* {@inheritDoc}
*/
@Override
public <T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type, Map<String, String> metadata) {
public <T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T> type, Map<String, String> metadata) {
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder =
DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder()
.setTopic(topic)
Expand Down
8 changes: 4 additions & 4 deletions sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ <T> Mono<BulkPublishResponse<T>> publishEvents(String pubsubName, String topicNa
* @param type Type for object deserialization.
* @param <T> Type of object deserialization.
* @return An active subscription.
* @deprecated Use {@link #subscribeToEvents(String, String, TypeRef)} instead for a more reactive approach.
* @deprecated Use {@link #subscribeToTopic(String, String, TypeRef)} instead for a more reactive approach.
*/
@Deprecated
<T> Subscription subscribeToEvents(
Expand All @@ -298,12 +298,12 @@ <T> Subscription subscribeToEvents(
* @return A Flux of deserialized event payloads.
* @param <T> Type of the event payload.
*/
<T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type);
<T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T> type);

/**
* Subscribe to pubsub events via streaming using Project Reactor Flux with metadata support.
*
* <p>If metadata is null or empty, this method delegates to {@link #subscribeToEvents(String, String, TypeRef)}.
* <p>If metadata is null or empty, this method delegates to {@link #subscribeToTopic(String, String, TypeRef)}.
* Use metadata {@code {"rawPayload": "true"}} for raw payload subscriptions where Dapr
* delivers messages without CloudEvent wrapping.
*
Expand All @@ -314,7 +314,7 @@ <T> Subscription subscribeToEvents(
* @return A Flux of deserialized event payloads.
* @param <T> Type of the event payload.
*/
<T> Flux<T> subscribeToEvents(String pubsubName, String topic, TypeRef<T> type, Map<String, String> metadata);
<T> Flux<T> subscribeToTopic(String pubsubName, String topic, TypeRef<T> type, Map<String, String> metadata);

/*
* Converse with an LLM.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,8 @@ public void onCompleted() {
final AtomicInteger eventCount = new AtomicInteger(0);
final Semaphore gotAll = new Semaphore(0);

// subscribeToEvents now returns Flux<T> directly (raw data)
var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING)
// subscribeToTopic returns Flux<T> directly (raw data)
var disposable = previewClient.subscribeToTopic(pubsubName, topicName, TypeRef.STRING)
.doOnNext(rawData -> {
// rawData is String directly, not CloudEvent
assertEquals(data, rawData);
Expand Down Expand Up @@ -728,8 +728,8 @@ public void onCompleted() {
final Semaphore gotAll = new Semaphore(0);
Map<String, String> metadata = Map.of("rawPayload", "true");

// Use subscribeToEvents with rawPayload metadata
var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING, metadata)
// Use subscribeToTopic with rawPayload metadata
var disposable = previewClient.subscribeToTopic(pubsubName, topicName, TypeRef.STRING, metadata)
.doOnNext(rawData -> {
assertEquals(data, rawData);
assertTrue(rawData instanceof String);
Expand Down
Loading