From ed8c78515bd82fed7d26fcc83a4d5d34e1589a89 Mon Sep 17 00:00:00 2001 From: Thomas Bonk <130759028+t-bonk@users.noreply.github.com> Date: Fri, 27 Jun 2025 14:49:10 +0200 Subject: [PATCH 1/7] Provided a better solution for reading dead outbox entries --- java/outbox.md | 74 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 58 insertions(+), 16 deletions(-) diff --git a/java/outbox.md b/java/outbox.md index 7a400f9830..48fd8602d3 100644 --- a/java/outbox.md +++ b/java/outbox.md @@ -355,29 +355,71 @@ It is crucial to make the service `OutboxDeadLetterQueueService` accessible for ::: -### Filter for Dead Entries +### Reading Dead Entries -This filtering can't be done on the database since the maximum number of attempts is only available from the CDS properties. - -To ensure that only dead outbox entries are returned when reading `DeadOutboxMessages`, the following code provides the handler for the `DeadLetterQueueService` and the `@After-READ` handler that filters for the dead outbox entries: +This filtering of dead entries are done on the database by adding `where` clauses for each outbox and their maximum number of retries. The following code provides the handler for the `DeadLetterQueueService` that reads the dead entries directly from the entity `Messages`: ```java @Component @ServiceName(OutboxDeadLetterQueueService_.CDS_NAME) public class DeadOutboxMessagesHandler implements EventHandler { - @After(entity = DeadOutboxMessages_.CDS_NAME) - public void filterDeadEntries(CdsReadEventContext context) { - CdsProperties.Outbox outboxConfigs = context.getCdsRuntime().getEnvironment().getCdsProperties().getOutbox(); - List deadEntries = context - .getResult() - .listOf(DeadOutboxMessages.class) - .stream() - .filter(entry -> entry.getAttempts() >= outboxConfigs.getService(entry.getTarget()).getMaxAttempts()) - .toList(); - - context.setResult(deadEntries); - } + private final PersistenceService db; + + public DeadOutboxMessagesHandler(@Qualifier(PersistenceService.DEFAULT_NAME) PersistenceService db) { + this.db = db; + } + + @On(service = PersistenceService.DEFAULT_NAME, entity = DeadOutboxMessages_.CDS_NAME) + public void readDeadOutboxMessages(CdsReadEventContext context) { + Optional outboxFilters = this.createOutboxFilters(context.getCdsRuntime()); + CqnSelect cqn = context.getCqn(); + Select> select = Select + .from(Messages_.CDS_NAME) + .columns(cqn.items()); + + select = select.groupBy(cqn.groupBy()); + select = select.excluding(cqn.excluding()); + if(cqn.having().isPresent()) { + select = select.having(cqn.having().get()); + } + if(cqn.search().isPresent()) { + select.search(cqn.search().get()); + } + if(cqn.where().isPresent()) { + CqnPredicate where = cqn.where().get(); + if (outboxFilters.isPresent()) { + where = outboxFilters.get().and(where); + } + select = select.where(where); + } else if (outboxFilters.isPresent()) { + select = select.where(outboxFilters.get()); + } + select = select.orderBy(cqn.orderBy()).limit(cqn.top(), cqn.skip()).inlineCount(); + + List deadMessages = this.db.run(select).list(); + context.setResult(ResultBuilder.selectedRows(deadMessages).inlineCount(deadMessages.size()).result()); + } + + private Optional createOutboxFilters(CdsRuntime runtime) { + List outboxServices = runtime.getServiceCatalog().getServices(OutboxService.class) + .filter(s -> !s.getName().equals(OutboxService.INMEMORY_NAME)).toList(); + CdsProperties.Outbox outboxConfigs = runtime.getEnvironment().getCdsProperties().getOutbox(); + + Predicate where = null; + for(OutboxService service : outboxServices) { + OutboxServiceConfig config = outboxConfigs.getService(service.getName()); + Predicate targetPredicate = CQL.get(Messages.TARGET).eq(service.getName()).and(CQL.get(Messages.ATTEMPTS).ge(config.getMaxAttempts())); + + if (where == null) { + where = targetPredicate; + } else { + where = where.or(targetPredicate); + } + } + + return Optional.ofNullable(where); + } } ``` From e01d28123f2fae8411c3d668c394a920016e3125 Mon Sep 17 00:00:00 2001 From: Thomas Bonk <130759028+t-bonk@users.noreply.github.com> Date: Mon, 14 Jul 2025 10:22:26 +0200 Subject: [PATCH 2/7] Added a before handler to modify the select statement --- java/outbox.md | 55 ++++++++++++++++++++++---------------------------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/java/outbox.md b/java/outbox.md index 48fd8602d3..3bac8b19b4 100644 --- a/java/outbox.md +++ b/java/outbox.md @@ -357,7 +357,8 @@ It is crucial to make the service `OutboxDeadLetterQueueService` accessible for ### Reading Dead Entries -This filtering of dead entries are done on the database by adding `where` clauses for each outbox and their maximum number of retries. The following code provides the handler for the `DeadLetterQueueService` that reads the dead entries directly from the entity `Messages`: +This filtering of dead entries is done on the database by adding `where` clauses for each outbox and their maximum number of retries. The following code provides the handler for the `DeadLetterQueueService` that modifies the where clause by adding +additional conditions for filtering the outbox entries: ```java @Component @@ -370,35 +371,28 @@ public class DeadOutboxMessagesHandler implements EventHandler { this.db = db; } - @On(service = PersistenceService.DEFAULT_NAME, entity = DeadOutboxMessages_.CDS_NAME) - public void readDeadOutboxMessages(CdsReadEventContext context) { - Optional outboxFilters = this.createOutboxFilters(context.getCdsRuntime()); + @Before(entity = DeadOutboxMessages_.CDS_NAME) + public void modifyWhereClause(CdsReadEventContext context) { CqnSelect cqn = context.getCqn(); - Select> select = Select - .from(Messages_.CDS_NAME) - .columns(cqn.items()); - - select = select.groupBy(cqn.groupBy()); - select = select.excluding(cqn.excluding()); - if(cqn.having().isPresent()) { - select = select.having(cqn.having().get()); - } - if(cqn.search().isPresent()) { - select.search(cqn.search().get()); - } - if(cqn.where().isPresent()) { - CqnPredicate where = cqn.where().get(); - if (outboxFilters.isPresent()) { - where = outboxFilters.get().and(where); - } - select = select.where(where); - } else if (outboxFilters.isPresent()) { - select = select.where(outboxFilters.get()); - } - select = select.orderBy(cqn.orderBy()).limit(cqn.top(), cqn.skip()).inlineCount(); - - List deadMessages = this.db.run(select).list(); - context.setResult(ResultBuilder.selectedRows(deadMessages).inlineCount(deadMessages.size()).result()); + Optional outboxFilters = this.createOutboxFilters(context.getCdsRuntime()); + CqnSelect modifiedCqn = copy( + cqn, + new Modifier() { + @Override + public CqnPredicate where(Predicate where) { + if (where != null && outboxFilters.isPresent()) { + return where.and(outboxFilters.get()); + } else if (where == null && outboxFilters.isPresent()) { + return outboxFilters.get(); + } else if (where != null && !outboxFilters.isPresent()) { + return where; + } else { + return null; + } + } + }); + + context.setCqn(modifiedCqn); } private Optional createOutboxFilters(CdsRuntime runtime) { @@ -461,8 +455,7 @@ The injected `PersistenceService` instance is used to perform the operations on [Learn more about CQL statement inspection.](./working-with-cql/query-introspection#cqnanalyzer){.learn-more} ::: tip Use paging logic -Avoid to read all entries of the `cds.outbox.Messages` or `OutboxDeadLetterQueueService.DeadOutboxMessages` table at once, as the size of an entry is unpredictable -and depends on the size of the payload. Prefer paging logic instead. +Avoid to read all entries of the `cds.outbox.Messages` or `OutboxDeadLetterQueueService.DeadOutboxMessages` table at once, as the size of an entry is unpredictable and depends on the size of the payload. Prefer paging logic instead. ::: ## Observability using Open Telemetry From 5ac85af2f2d8cac3e15672bf0e43eec392037b2a Mon Sep 17 00:00:00 2001 From: Thomas Bonk <130759028+t-bonk@users.noreply.github.com> Date: Fri, 18 Jul 2025 09:48:51 +0200 Subject: [PATCH 3/7] Apply suggestions from code review Co-authored-by: BraunMatthias <59841349+BraunMatthias@users.noreply.github.com> --- java/outbox.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/java/outbox.md b/java/outbox.md index 3bac8b19b4..7ecb787646 100644 --- a/java/outbox.md +++ b/java/outbox.md @@ -357,8 +357,7 @@ It is crucial to make the service `OutboxDeadLetterQueueService` accessible for ### Reading Dead Entries -This filtering of dead entries is done on the database by adding `where` clauses for each outbox and their maximum number of retries. The following code provides the handler for the `DeadLetterQueueService` that modifies the where clause by adding -additional conditions for filtering the outbox entries: +Filtering the dead entries is done by adding by adding an appropriate `where`-clause to `READ`-queries on the outbox message entries which reached maximum number of retries. The following code provides an example handler implementation defining this behaviour for the `DeadLetterQueueService`: ```java @Component @@ -372,11 +371,10 @@ public class DeadOutboxMessagesHandler implements EventHandler { } @Before(entity = DeadOutboxMessages_.CDS_NAME) - public void modifyWhereClause(CdsReadEventContext context) { - CqnSelect cqn = context.getCqn(); + public void addDeadEntryFilter(CdsReadEventContext context) { Optional outboxFilters = this.createOutboxFilters(context.getCdsRuntime()); CqnSelect modifiedCqn = copy( - cqn, + context.getCqn(), new Modifier() { @Override public CqnPredicate where(Predicate where) { @@ -455,7 +453,7 @@ The injected `PersistenceService` instance is used to perform the operations on [Learn more about CQL statement inspection.](./working-with-cql/query-introspection#cqnanalyzer){.learn-more} ::: tip Use paging logic -Avoid to read all entries of the `cds.outbox.Messages` or `OutboxDeadLetterQueueService.DeadOutboxMessages` table at once, as the size of an entry is unpredictable and depends on the size of the payload. Prefer paging logic instead. +Avoid to read all outbox entries at once as a single entry can have significant size reflecting the request's payload. Prefer `READ`-queries with paging instead. ::: ## Observability using Open Telemetry From 0d5b5edbbb80004fe75b18510228cacad57626a6 Mon Sep 17 00:00:00 2001 From: Thomas Bonk <130759028+t-bonk@users.noreply.github.com> Date: Fri, 18 Jul 2025 10:39:52 +0200 Subject: [PATCH 4/7] changes after code review --- java/outbox.md | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/java/outbox.md b/java/outbox.md index 7ecb787646..5a006806c8 100644 --- a/java/outbox.md +++ b/java/outbox.md @@ -364,7 +364,7 @@ Filtering the dead entries is done by adding by adding an appropriate `where`-cl @ServiceName(OutboxDeadLetterQueueService_.CDS_NAME) public class DeadOutboxMessagesHandler implements EventHandler { - private final PersistenceService db; + private final PersistenceService db; public DeadOutboxMessagesHandler(@Qualifier(PersistenceService.DEFAULT_NAME) PersistenceService db) { this.db = db; @@ -373,24 +373,19 @@ public class DeadOutboxMessagesHandler implements EventHandler { @Before(entity = DeadOutboxMessages_.CDS_NAME) public void addDeadEntryFilter(CdsReadEventContext context) { Optional outboxFilters = this.createOutboxFilters(context.getCdsRuntime()); - CqnSelect modifiedCqn = copy( - context.getCqn(), - new Modifier() { - @Override - public CqnPredicate where(Predicate where) { - if (where != null && outboxFilters.isPresent()) { - return where.and(outboxFilters.get()); - } else if (where == null && outboxFilters.isPresent()) { - return outboxFilters.get(); - } else if (where != null && !outboxFilters.isPresent()) { - return where; - } else { - return null; - } - } - }); - - context.setCqn(modifiedCqn); + + if (outboxFilters.isPresent()) { + CqnSelect modifiedCqn = + copy( + context.getCqn(), + new Modifier() { + @Override + public CqnPredicate where(Predicate where) { + return outboxFilters.get().and(where); + } + }); + context.setCqn(modifiedCqn); + } } private Optional createOutboxFilters(CdsRuntime runtime) { From 589ee70726cffe342684d7016f1fad09d09db2a1 Mon Sep 17 00:00:00 2001 From: Thomas Bonk <130759028+t-bonk@users.noreply.github.com> Date: Fri, 22 Aug 2025 13:33:43 +0200 Subject: [PATCH 5/7] Update java/outbox.md Co-authored-by: BraunMatthias <59841349+BraunMatthias@users.noreply.github.com> --- java/outbox.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/outbox.md b/java/outbox.md index 5a006806c8..67838bab67 100644 --- a/java/outbox.md +++ b/java/outbox.md @@ -357,7 +357,7 @@ It is crucial to make the service `OutboxDeadLetterQueueService` accessible for ### Reading Dead Entries -Filtering the dead entries is done by adding by adding an appropriate `where`-clause to `READ`-queries on the outbox message entries which reached maximum number of retries. The following code provides an example handler implementation defining this behaviour for the `DeadLetterQueueService`: +Filtering the dead entries is done by adding an appropriate `where`-clause to all `READ`-queries which covers all outbox message entries with maximum number of retries. The following code provides an example handler implementation defining this behaviour for the `DeadLetterQueueService`: ```java @Component From ce44db54450991aa57aa28fdd4a24ba8a5f48605 Mon Sep 17 00:00:00 2001 From: Thomas Bonk <130759028+t-bonk@users.noreply.github.com> Date: Thu, 22 Jan 2026 16:09:16 +0100 Subject: [PATCH 6/7] optimized createOutboxFilters method --- java/outbox.md | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/java/outbox.md b/java/outbox.md index 9c60fa0948..4c41b4ad45 100644 --- a/java/outbox.md +++ b/java/outbox.md @@ -49,7 +49,7 @@ To enable the persistence for the outbox, you need to add the service `outbox` o } ``` -::: warning +::: warning Be aware that you need to migrate the database schemas of all tenants after you've enhanced your model with an outbox version from `@sap/cds` version 6.0.0 or later. ::: @@ -173,7 +173,7 @@ To avoid this, you can apply a manual workaround as follows: 1. Customize the outbox configuration and isolating them via distinct namespaces for each service. 2. Adapt the Audit Log outbox configuration. 3. Adapt the messaging outbox configuration per service. - + These steps are described in the following sections. #### Deactivate Default Outboxes @@ -228,7 +228,7 @@ cds: ::: tip Important Note It is crucial to **deactivate** the default outboxes, and ensure **unique outbox namespaces** in order to achieve proper isolation between services in a shared DB scenario. ::: - + ## Outboxing CAP Service Events @@ -458,23 +458,15 @@ public class DeadOutboxMessagesHandler implements EventHandler { } private Optional createOutboxFilters(CdsRuntime runtime) { - List outboxServices = runtime.getServiceCatalog().getServices(OutboxService.class) - .filter(s -> !s.getName().equals(OutboxService.INMEMORY_NAME)).toList(); CdsProperties.Outbox outboxConfigs = runtime.getEnvironment().getCdsProperties().getOutbox(); - Predicate where = null; - for(OutboxService service : outboxServices) { - OutboxServiceConfig config = outboxConfigs.getService(service.getName()); - Predicate targetPredicate = CQL.get(Messages.TARGET).eq(service.getName()).and(CQL.get(Messages.ATTEMPTS).ge(config.getMaxAttempts())); - - if (where == null) { - where = targetPredicate; - } else { - where = where.or(targetPredicate); - } - } - - return Optional.ofNullable(where); + return runtime.getServiceCatalog().getServices(OutboxService.class) + .map(service -> { + OutboxServiceConfig config = outboxConfigs.getService(service.getName()); + return CQL.get(Messages.TARGET).eq(service.getName()) + .and(CQL.get(Messages.ATTEMPTS).ge(config.getMaxAttempts())); + }) + .reduce(Predicate::or); } } ``` From f6b9211bcb75d72390f0a54c7ba08b443e343b0a Mon Sep 17 00:00:00 2001 From: Mahati Shankar <93712176+smahati@users.noreply.github.com> Date: Thu, 22 Jan 2026 22:56:08 +0100 Subject: [PATCH 7/7] cosmetics --- java/outbox.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/outbox.md b/java/outbox.md index 4c41b4ad45..b2c84c51c6 100644 --- a/java/outbox.md +++ b/java/outbox.md @@ -426,7 +426,7 @@ It is crucial to make the service `OutboxDeadLetterQueueService` accessible for ### Reading Dead Entries -Filtering the dead entries is done by adding an appropriate `where`-clause to all `READ`-queries which covers all outbox message entries with maximum number of retries. The following code provides an example handler implementation defining this behaviour for the `DeadLetterQueueService`: +Filtering the dead entries is done by adding an appropriate `where`-clause to all `READ`-queries which matches all outbox message entries that have been retried for the maximum number of times. The following code provides an example handler implementation defining this behavior for the `DeadLetterQueueService`: ```java @Component @@ -509,7 +509,7 @@ The injected `PersistenceService` instance is used to perform the operations on [Learn more about CQL statement inspection.](./working-with-cql/query-introspection#cqnanalyzer){.learn-more} ::: tip Use paging logic -Avoid to read all outbox entries at once as a single entry can have significant size reflecting the request's payload. Prefer `READ`-queries with paging instead. +Avoid reading all outbox entries at once in case entries which have large request payloads are present. Prefer `READ`-queries with paging instead. ::: ## Observability using Open Telemetry