diff --git a/java/outbox.md b/java/outbox.md index 13e8af955..b2c84c51c 100644 --- a/java/outbox.md +++ b/java/outbox.md @@ -49,7 +49,7 @@ To enable the persistence for the outbox, you need to add the service `outbox` o } ``` -::: warning +::: warning Be aware that you need to migrate the database schemas of all tenants after you've enhanced your model with an outbox version from `@sap/cds` version 6.0.0 or later. ::: @@ -173,7 +173,7 @@ To avoid this, you can apply a manual workaround as follows: 1. Customize the outbox configuration and isolating them via distinct namespaces for each service. 2. Adapt the Audit Log outbox configuration. 3. Adapt the messaging outbox configuration per service. - + These steps are described in the following sections. #### Deactivate Default Outboxes @@ -228,7 +228,7 @@ cds: ::: tip Important Note It is crucial to **deactivate** the default outboxes, and ensure **unique outbox namespaces** in order to achieve proper isolation between services in a shared DB scenario. ::: - + ## Outboxing CAP Service Events @@ -424,29 +424,50 @@ It is crucial to make the service `OutboxDeadLetterQueueService` accessible for ::: -### Filter for Dead Entries - -This filtering can't be done on the database since the maximum number of attempts is only available from the CDS properties. +### Reading Dead Entries -To ensure that only dead outbox entries are returned when reading `DeadOutboxMessages`, the following code provides the handler for the `DeadLetterQueueService` and the `@After-READ` handler that filters for the dead outbox entries: +Filtering the dead entries is done by adding an appropriate `where`-clause to all `READ`-queries which matches all outbox message entries that have been retried for the maximum number of times. The following code provides an example handler implementation defining this behavior for the `DeadLetterQueueService`: ```java @Component @ServiceName(OutboxDeadLetterQueueService_.CDS_NAME) public class DeadOutboxMessagesHandler implements EventHandler { - @After(entity = DeadOutboxMessages_.CDS_NAME) - public void filterDeadEntries(CdsReadEventContext context) { - CdsProperties.Outbox outboxConfigs = context.getCdsRuntime().getEnvironment().getCdsProperties().getOutbox(); - List deadEntries = context - .getResult() - .listOf(DeadOutboxMessages.class) - .stream() - .filter(entry -> entry.getAttempts() >= outboxConfigs.getService(entry.getTarget()).getMaxAttempts()) - .toList(); - - context.setResult(deadEntries); - } + private final PersistenceService db; + + public DeadOutboxMessagesHandler(@Qualifier(PersistenceService.DEFAULT_NAME) PersistenceService db) { + this.db = db; + } + + @Before(entity = DeadOutboxMessages_.CDS_NAME) + public void addDeadEntryFilter(CdsReadEventContext context) { + Optional outboxFilters = this.createOutboxFilters(context.getCdsRuntime()); + + if (outboxFilters.isPresent()) { + CqnSelect modifiedCqn = + copy( + context.getCqn(), + new Modifier() { + @Override + public CqnPredicate where(Predicate where) { + return outboxFilters.get().and(where); + } + }); + context.setCqn(modifiedCqn); + } + } + + private Optional createOutboxFilters(CdsRuntime runtime) { + CdsProperties.Outbox outboxConfigs = runtime.getEnvironment().getCdsProperties().getOutbox(); + + return runtime.getServiceCatalog().getServices(OutboxService.class) + .map(service -> { + OutboxServiceConfig config = outboxConfigs.getService(service.getName()); + return CQL.get(Messages.TARGET).eq(service.getName()) + .and(CQL.get(Messages.ATTEMPTS).ge(config.getMaxAttempts())); + }) + .reduce(Predicate::or); + } } ``` @@ -488,8 +509,7 @@ The injected `PersistenceService` instance is used to perform the operations on [Learn more about CQL statement inspection.](./working-with-cql/query-introspection#cqnanalyzer){.learn-more} ::: tip Use paging logic -Avoid to read all entries of the `cds.outbox.Messages` or `OutboxDeadLetterQueueService.DeadOutboxMessages` table at once, as the size of an entry is unpredictable -and depends on the size of the payload. Prefer paging logic instead. +Avoid reading all outbox entries at once in case entries which have large request payloads are present. Prefer `READ`-queries with paging instead. ::: ## Observability using Open Telemetry