From 130ba683391fbe5ff26474f74c871bc30f942771 Mon Sep 17 00:00:00 2001
From: Rob Rudin
Date: Tue, 10 Feb 2026 11:06:00 -0500
Subject: [PATCH] MLE-26918 Added fromView support for incremental write

The plan is to dump eval support and just offer fromLexicons and fromView,
but need to test out fromView a bit first. Did some refactoring too because
the constructors had gotten so ugly; there's now an IncrementalWriteConfig
class that holds all the inputs from the Builder, so that filter
constructors only need that as an arg.
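
For reference, a caller wires this up through the existing builder. This
sketch is based on the new fromView test below; the schema/view names come
from the test TDE template, and any deployed view with "uri" and "hash"
columns would work the same way:

    IncrementalWriteFilter filter = IncrementalWriteFilter.newBuilder()
        .fromView("javaClient", "incrementalWriteHash")
        .onDocumentsSkipped(docs -> System.out.println("Skipped " + docs.length + " documents"))
        .build();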
---
 .../filter/IncrementalWriteConfig.java        | 79 ++++++++++++++++++
 .../filter/IncrementalWriteEvalFilter.java    | 10 +--
 .../filter/IncrementalWriteFilter.java        | 80 ++++++++++++-------
 .../filter/IncrementalWriteOpticFilter.java   |  8 +-
 .../filter/IncrementalWriteViewFilter.java    | 57 +++++++++++++
 .../filter/IncrementalWriteTest.java          | 51 ++++++++++++
 .../ml-schemas/tde/incrementalWriteHash.json  | 25 ++++++
 7 files changed, 271 insertions(+), 39 deletions(-)
 create mode 100644 marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteConfig.java
 create mode 100644 marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteViewFilter.java
 create mode 100644 test-app/src/main/ml-schemas/tde/incrementalWriteHash.json

diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteConfig.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteConfig.java
new file mode 100644
index 000000000..d350f71f1
--- /dev/null
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteConfig.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
+ */
+package com.marklogic.client.datamovement.filter;
+
+import com.marklogic.client.document.DocumentWriteOperation;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Configuration for incremental write filtering.
+ *
+ * @since 8.1.0
+ */
+public class IncrementalWriteConfig {
+
+    private final String hashKeyName;
+    private final String timestampKeyName;
+    private final boolean canonicalizeJson;
+    private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
+    private final String[] jsonExclusions;
+    private final String[] xmlExclusions;
+    private final Map<String, String> xmlNamespaces;
+    private final String schemaName;
+    private final String viewName;
+
+    public IncrementalWriteConfig(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
+                                  Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer,
+                                  String[] jsonExclusions, String[] xmlExclusions, Map<String, String> xmlNamespaces,
+                                  String schemaName, String viewName) {
+        this.hashKeyName = hashKeyName;
+        this.timestampKeyName = timestampKeyName;
+        this.canonicalizeJson = canonicalizeJson;
+        this.skippedDocumentsConsumer = skippedDocumentsConsumer;
+        this.jsonExclusions = jsonExclusions;
+        this.xmlExclusions = xmlExclusions;
+        this.xmlNamespaces = xmlNamespaces != null ? Collections.unmodifiableMap(xmlNamespaces) : null;
+        this.schemaName = schemaName;
+        this.viewName = viewName;
+    }
+
+    public String getHashKeyName() {
+        return hashKeyName;
+    }
+
+    public String getTimestampKeyName() {
+        return timestampKeyName;
+    }
+
+    public boolean isCanonicalizeJson() {
+        return canonicalizeJson;
+    }
+
+    public Consumer<DocumentWriteOperation[]> getSkippedDocumentsConsumer() {
+        return skippedDocumentsConsumer;
+    }
+
+    public String[] getJsonExclusions() {
+        return jsonExclusions;
+    }
+
+    public String[] getXmlExclusions() {
+        return xmlExclusions;
+    }
+
+    public Map<String, String> getXmlNamespaces() {
+        return xmlNamespaces != null ? xmlNamespaces : Collections.emptyMap();
+    }
+
+    public String getSchemaName() {
+        return schemaName;
+    }
+
+    public String getViewName() {
+        return viewName;
+    }
+}
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java
index fc0546798..838087203 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteEvalFilter.java
@@ -12,9 +12,6 @@
 import com.marklogic.client.document.DocumentWriteSet;
 import com.marklogic.client.io.JacksonHandle;
 
-import java.util.Map;
-import java.util.function.Consumer;
-
 /**
  * Uses server-side JavaScript code to get the existing hash values for a set of URIs.
  *
@@ -31,9 +28,8 @@ class IncrementalWriteEvalFilter extends IncrementalWriteFilter {
         response
         """;
 
-    IncrementalWriteEvalFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
-                               Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer, String[] jsonExclusions, String[] xmlExclusions, Map<String, String> xmlNamespaces) {
-        super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces);
+    IncrementalWriteEvalFilter(IncrementalWriteConfig config) {
+        super(config);
     }
 
@@ -47,7 +43,7 @@ public DocumentWriteSet apply(DocumentWriteSetFilter.Context context) {
         try {
             JsonNode response = context.getDatabaseClient().newServerEval().javascript(EVAL_SCRIPT)
-                .addVariable("hashKeyName", hashKeyName)
+                .addVariable("hashKeyName", getConfig().getHashKeyName())
                 .addVariable("uris", new JacksonHandle(uris))
                 .evalAs(JsonNode.class);
 
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
index 730910c0b..61df914a9 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteFilter.java
@@ -53,6 +53,8 @@ public static class Builder {
         private String[] jsonExclusions;
         private String[] xmlExclusions;
         private Map<String, String> xmlNamespaces;
+        private String schemaName;
+        private String viewName;
 
         /**
          * @param keyName the name of the MarkLogic metadata key that will hold the hash value; defaults to "incrementalWriteHash".
@@ -128,13 +130,43 @@ public Builder xmlNamespaces(Map<String, String> namespaces) {
             return this;
         }
 
+        /**
+         * Configures the filter to use a TDE view for retrieving hash values instead of field range indexes.
+         * This approach requires a TDE template to be deployed that extracts the URI and hash metadata.
+         *
+         * @param schemaName the schema name of the TDE view
+         * @param viewName the view name of the TDE view
+         * @return this builder
+         */
+        public Builder fromView(String schemaName, String viewName) {
+            boolean schemaEmpty = schemaName == null || schemaName.trim().isEmpty();
+            boolean viewEmpty = viewName == null || viewName.trim().isEmpty();
+
+            if (schemaEmpty && !viewEmpty) {
+                throw new IllegalArgumentException("Schema name cannot be null or empty when view name is provided");
+            }
+            if (!schemaEmpty && viewEmpty) {
+                throw new IllegalArgumentException("View name cannot be null or empty when schema name is provided");
+            }
+
+            this.schemaName = schemaName;
+            this.viewName = viewName;
+            return this;
+        }
+
         public IncrementalWriteFilter build() {
             validateJsonExclusions();
             validateXmlExclusions();
 
+            IncrementalWriteConfig config = new IncrementalWriteConfig(hashKeyName, timestampKeyName, canonicalizeJson,
+                skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces, schemaName, viewName);
+
+            if (schemaName != null && viewName != null) {
+                return new IncrementalWriteViewFilter(config);
+            }
             if (useEvalQuery) {
-                return new IncrementalWriteEvalFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces);
+                return new IncrementalWriteEvalFilter(config);
             }
-            return new IncrementalWriteOpticFilter(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces);
+            return new IncrementalWriteOpticFilter(config);
         }
 
         private void validateJsonExclusions() {
@@ -181,26 +213,18 @@ private void validateXmlExclusions() {
         }
     }
 
-    protected final String hashKeyName;
-    private final String timestampKeyName;
-    private final boolean canonicalizeJson;
-    private final Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer;
-    private final String[] jsonExclusions;
-    private final String[] xmlExclusions;
-    private final Map<String, String> xmlNamespaces;
+    private final IncrementalWriteConfig config;
 
     // Hardcoding this for now, with a good general purpose hashing function.
     // See https://xxhash.com for benchmarks.
     private final LongHashFunction hashFunction = LongHashFunction.xx3();
 
-    public IncrementalWriteFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson, Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer, String[] jsonExclusions, String[] xmlExclusions, Map<String, String> xmlNamespaces) {
-        this.hashKeyName = hashKeyName;
-        this.timestampKeyName = timestampKeyName;
-        this.canonicalizeJson = canonicalizeJson;
-        this.skippedDocumentsConsumer = skippedDocumentsConsumer;
-        this.jsonExclusions = jsonExclusions;
-        this.xmlExclusions = xmlExclusions;
-        this.xmlNamespaces = xmlNamespaces;
+    public IncrementalWriteFilter(IncrementalWriteConfig config) {
+        this.config = config;
+    }
+
+    public IncrementalWriteConfig getConfig() {
+        return config;
     }
 
@@ -230,19 +254,19 @@ protected final DocumentWriteSet filterDocuments(Context context, Function<String, String> hashRetriever) {
-        if (jsonExclusions != null && jsonExclusions.length > 0) {
-            content = ContentExclusionUtil.applyJsonExclusions(doc.getUri(), content, jsonExclusions);
+        if (config.getJsonExclusions() != null && config.getJsonExclusions().length > 0) {
+            content = ContentExclusionUtil.applyJsonExclusions(doc.getUri(), content, config.getJsonExclusions());
         }
                 jc = new JsonCanonicalizer(content);
                 return jc.getEncodedString();
@@ -274,9 +298,9 @@ private String serializeContent(DocumentWriteOperation doc) {
                 logger.warn("Unable to canonicalize JSON content for URI {}, using original content for hashing; cause: {}", doc.getUri(), e.getMessage());
             }
-        } else if (xmlExclusions != null && xmlExclusions.length > 0) {
+        } else if (config.getXmlExclusions() != null && config.getXmlExclusions().length > 0) {
             try {
-                content = ContentExclusionUtil.applyXmlExclusions(doc.getUri(), content, xmlNamespaces, xmlExclusions);
+                content = ContentExclusionUtil.applyXmlExclusions(doc.getUri(), content, config.getXmlNamespaces(), config.getXmlExclusions());
             } catch (Exception e) {
                 logger.warn("Unable to apply XML exclusions for URI {}, using original content for hashing; cause: {}", doc.getUri(), e.getMessage());
@@ -316,4 +340,6 @@ protected static DocumentWriteOperation addHashToMetadata(DocumentWriteOperation
         return new DocumentWriteOperationImpl(op.getUri(), newMetadata, op.getContent(), op.getTemporalDocumentURI());
     }
+
+
 }
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java
index a52d21ad3..b7fed3099 100644
--- a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteOpticFilter.java
@@ -10,7 +10,6 @@
 import java.util.HashMap;
 import java.util.Map;
-import java.util.function.Consumer;
 
 /**
  * Uses an Optic query to get the existing hash values for a set of URIs.
@@ -19,9 +18,8 @@
  */
 class IncrementalWriteOpticFilter extends IncrementalWriteFilter {
 
-    IncrementalWriteOpticFilter(String hashKeyName, String timestampKeyName, boolean canonicalizeJson,
-                                Consumer<DocumentWriteOperation[]> skippedDocumentsConsumer, String[] jsonExclusions, String[] xmlExclusions, Map<String, String> xmlNamespaces) {
-        super(hashKeyName, timestampKeyName, canonicalizeJson, skippedDocumentsConsumer, jsonExclusions, xmlExclusions, xmlNamespaces);
+    IncrementalWriteOpticFilter(IncrementalWriteConfig config) {
+        super(config);
     }
 
@@ -39,7 +37,7 @@ public DocumentWriteSet apply(Context context) {
         Map<String, String> existingHashes = rowTemplate.query(op ->
             op.fromLexicons(Map.of(
                 "uri", op.cts.uriReference(),
-                "hash", op.cts.fieldReference(super.hashKeyName)
+                "hash", op.cts.fieldReference(getConfig().getHashKeyName())
             )).where(
                 op.cts.documentQuery(op.xs.stringSeq(uris))
             ),
diff --git a/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteViewFilter.java b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteViewFilter.java
new file mode 100644
index 000000000..02dc75439
--- /dev/null
+++ b/marklogic-client-api/src/main/java/com/marklogic/client/datamovement/filter/IncrementalWriteViewFilter.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
+ */
+package com.marklogic.client.datamovement.filter;
+
+import com.marklogic.client.FailedRequestException;
+import com.marklogic.client.document.DocumentWriteOperation;
+import com.marklogic.client.document.DocumentWriteSet;
+import com.marklogic.client.row.RowTemplate;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Uses an Optic query with fromView to get the existing hash values for a set of URIs from a TDE view.
+ * This implementation requires a TDE template to be deployed that extracts the URI and hash metadata.
+ *
+ * @since 8.1.0
+ */
+class IncrementalWriteViewFilter extends IncrementalWriteFilter {
+
+    IncrementalWriteViewFilter(IncrementalWriteConfig config) {
+        super(config);
+    }
+
+    @Override
+    public DocumentWriteSet apply(Context context) {
+        final String[] uris = context.getDocumentWriteSet().stream()
+            .filter(op -> DocumentWriteOperation.OperationType.DOCUMENT_WRITE.equals(op.getOperationType()))
+            .map(DocumentWriteOperation::getUri)
+            .toArray(String[]::new);
+
+        RowTemplate rowTemplate = new RowTemplate(context.getDatabaseClient());
+
+        try {
+            Map<String, String> existingHashes = rowTemplate.query(op ->
+                op.fromView(getConfig().getSchemaName(), getConfig().getViewName())
+                    .where(op.in(op.col("uri"), op.xs.stringSeq(uris))),
+
+                rows -> {
+                    Map<String, String> map = new HashMap<>();
+                    rows.forEach(row -> {
+                        String uri = row.getString("uri");
+                        String existingHash = row.getString("hash");
+                        map.put(uri, existingHash);
+                    });
+                    return map;
+                }
+            );
+
+            return filterDocuments(context, uri -> existingHashes.get(uri));
+        } catch (FailedRequestException e) {
+            String message = "Unable to query for existing incremental write hashes from view " + getConfig().getSchemaName() + "." +
+                getConfig().getViewName() + "; cause: " + e.getMessage();
+            throw new FailedRequestException(message, e.getFailedRequest());
+        }
+    }
+}
diff --git a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
index 0ac60b97a..0d0817ed3 100644
--- a/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
+++ b/marklogic-client-api/src/test/java/com/marklogic/client/datamovement/filter/IncrementalWriteTest.java
@@ -228,6 +228,57 @@ void binaryDocument() {
             "expected. Exclusions cannot be specified for them.");
     }
 
+    @Test
+    void fromView() {
+        filter = IncrementalWriteFilter.newBuilder()
+            .fromView("javaClient", "incrementalWriteHash")
+            .onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
+            .build();
+
+        verifyIncrementalWriteWorks();
+    }
+
+    @Test
+    void emptyValuesForFromView() {
+        filter = IncrementalWriteFilter.newBuilder()
+            // Empty/null values are ignored, as long as both schema/view are empty/null. This makes life a little
+            // easier for a connector in that the connector does not need to check for empty/null values.
+            .fromView("", null)
+            .onDocumentsSkipped(docs -> skippedCount.addAndGet(docs.length))
+            .build();
+
+        verifyIncrementalWriteWorks();
+    }
+
+    @Test
+    void invalidSchemaArg() {
+        IncrementalWriteFilter.Builder builder = IncrementalWriteFilter.newBuilder();
+        IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> builder.fromView(null, "theView"));
+        assertEquals("Schema name cannot be null or empty when view name is provided", ex.getMessage());
+    }
+
+    @Test
+    void invalidViewArg() {
+        IncrementalWriteFilter.Builder builder = IncrementalWriteFilter.newBuilder();
+        IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, () -> builder.fromView("javaClient", null));
+        assertEquals("View name cannot be null or empty when schema name is provided", ex.getMessage());
+    }
+
+    @Test
+    void invalidView() {
+        filter = IncrementalWriteFilter.newBuilder()
+            .fromView("javaClient", "this-view-doesnt-exist")
+            .build();
+
+        writeTenDocuments();
+
+        assertNotNull(batchFailure.get());
+        String message = batchFailure.get().getMessage();
+        assertTrue(message.contains("Unable to query for existing incremental write hashes") && message.contains("SQL-TABLENOTFOUND"),
+            "When the user tries to use the incremental write feature with an invalid view, " +
+                "we should fail with a helpful error message. Actual message: " + message);
+    }
+
     private void verifyIncrementalWriteWorks() {
         writeTenDocuments();
         verifyDocumentsHasHashInMetadataKey();
diff --git a/test-app/src/main/ml-schemas/tde/incrementalWriteHash.json b/test-app/src/main/ml-schemas/tde/incrementalWriteHash.json
new file mode 100644
index 000000000..d5044414c
--- /dev/null
+++ b/test-app/src/main/ml-schemas/tde/incrementalWriteHash.json
@@ -0,0 +1,25 @@
+{
+  "template": {
+    "description": "For incremental write that uses op.fromView instead of op.fromLexicons",
+    "context": "/doc",
+    "rows": [
+      {
+        "schemaName": "javaClient",
+        "viewName": "incrementalWriteHash",
+        "columns": [
+          {
+            "name": "uri",
+            "scalarType": "string",
+            "val": "xdmp:node-uri(.)"
+          },
+          {
+            "name": "hash",
+            "scalarType": "string",
+            "val": "xdmp:node-metadata-value(., 'incrementalWriteHash')",
+            "nullable": true
+          }
+        ]
+      }
+    ]
+  }
+}
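
A quick way to confirm the template above is producing rows is to query the
view directly with the standard RowManager API. This is just a sketch for
reviewers, not part of the patch; it assumes a DatabaseClient named "client"
pointing at a database containing documents that match the template's /doc
context:

    // Uses com.marklogic.client.row.RowManager and com.marklogic.client.expression.PlanBuilder.
    // Prints each uri/hash pair projected by the javaClient.incrementalWriteHash view; the
    // hash column is declared nullable, so documents written without the incrementalWriteHash
    // metadata key will not have a hash value.
    RowManager rowManager = client.newRowManager();
    PlanBuilder op = rowManager.newPlanBuilder();
    rowManager.resultRows(op.fromView("javaClient", "incrementalWriteHash"))
        .forEach(row -> System.out.println(row.getString("uri") + " -> " + row.getString("hash")));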