From 717070c703a5921bb7edfacff4e1ef39d616b1cf Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 27 May 2025 15:30:44 +0530 Subject: [PATCH 01/10] Added oldTimeStamp field --- .../io/cdap/plugin/oracle/OracleSource.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 53f75613b..d28b4c5b3 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -101,6 +101,7 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig { public static final String NAME_CONNECTION = "connection"; public static final String DEFAULT_ROW_PREFETCH_VALUE = "40"; public static final String DEFAULT_BATCH_SIZE = "10"; + public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp"; @Name(NAME_USE_CONNECTION) @Nullable @@ -123,6 +124,14 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig { @Nullable private Integer defaultRowPrefetch; + @Name("treatAsOldTimestamp") + @Description("For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain backward compatibility.") + @Macro + @Nullable + @MetadataProperty(key = "hidden", value = "true") + private boolean treatAsOldTimestamp; + + public OracleSourceConfig(String host, int port, String user, String password, String jdbcPluginName, String connectionArguments, String connectionType, String database, String role, int defaultBatchValue, int defaultRowPrefetch, @@ -163,6 +172,10 @@ public OracleConnectorConfig getConnection() { return connection; } + public boolean shouldTreatAsOldTimestamp() { + return treatAsOldTimestamp; + } + @Override public void validate(FailureCollector collector) { ConfigUtil.validateConnection(this, useConnection, connection, collector); @@ -177,6 +190,17 @@ public String getTransactionIsolationLevel() { @Override protected void validateField(FailureCollector collector, Schema.Field field, Schema actualFieldSchema, Schema expectedFieldSchema) { + // For handling backward compatibility with pipelines built prior to plugin version change. + // If the config flag 'treatAsOldTimestamp' is enabled, allow DATETIME fields (actual schema) + // to be treated as TIMESTAMP_MICROS (expected schema). This preserves legacy behavior where + // non-UTC timestamp fields were interpreted as TIMESTAMP_MICROS. + if (shouldTreatAsOldTimestamp()) { + if (Schema.LogicalType.DATETIME.equals(actualFieldSchema.getLogicalType()) + && Schema.LogicalType.TIMESTAMP_MICROS.equals(expectedFieldSchema.getLogicalType())) { + return; + } + } + // This change is needed to make sure that the pipeline upgrade continues to work post upgrade. // Since the older handling of the precision less used to convert to the decimal type, // and the new version would try to convert to the String type. In that case the output schema would From 59f1c1b62ca4c6072a0838ecb4192ca1fc5f8569 Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 27 May 2025 16:11:29 +0530 Subject: [PATCH 02/10] Made default treatAsOldTimestamp false --- .../src/main/java/io/cdap/plugin/oracle/OracleSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index d28b4c5b3..333b40b1c 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -124,12 +124,12 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig { @Nullable private Integer defaultRowPrefetch; - @Name("treatAsOldTimestamp") + @Name(TREAT_AS_OLD_TIMESTAMP) @Description("For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain backward compatibility.") @Macro @Nullable @MetadataProperty(key = "hidden", value = "true") - private boolean treatAsOldTimestamp; + private boolean treatAsOldTimestamp = false; public OracleSourceConfig(String host, int port, String user, String password, String jdbcPluginName, From fa0fe82b0ba168f08b5017f476c0d73634975366 Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 27 May 2025 18:00:43 +0530 Subject: [PATCH 03/10] Added treatAsOldTimeStamp in widgets --- oracle-plugin/widgets/Oracle-batchsource.json | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/oracle-plugin/widgets/Oracle-batchsource.json b/oracle-plugin/widgets/Oracle-batchsource.json index 5eca20cc4..16722f1b2 100644 --- a/oracle-plugin/widgets/Oracle-batchsource.json +++ b/oracle-plugin/widgets/Oracle-batchsource.json @@ -246,6 +246,15 @@ "default": "40", "min": "1" } + }, + { + "widget-type": "hidden", + "label": "Treat As Old Timestamp", + "name": "treatAsOldTimestamp", + "description": "For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain backward compatibility.", + "widget-attributes": { + "default": "false" + } } ] } From 757daf5161ffd3dc24cbf006224cfb40ca23ef32 Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 27 May 2025 19:10:56 +0530 Subject: [PATCH 04/10] Added treatAsOldTimestamp in OracleSourceSchemaReader --- .../java/io/cdap/plugin/oracle/OracleConnector.java | 2 +- .../java/io/cdap/plugin/oracle/OracleSource.java | 4 ++-- .../plugin/oracle/OracleSourceSchemaReader.java | 13 ++++++++++--- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java index 3d2f7399a..4794be082 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java @@ -112,7 +112,7 @@ protected DBConnectorPath getDBConnectorPath(String path) { @Override protected SchemaReader getSchemaReader(String sessionID) { - return new OracleSourceSchemaReader(sessionID); + return new OracleSourceSchemaReader(sessionID, false); } @Override diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 333b40b1c..c21ff10cf 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -125,10 +125,10 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig { private Integer defaultRowPrefetch; @Name(TREAT_AS_OLD_TIMESTAMP) - @Description("For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain backward compatibility.") + @Description("For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain" + + "backward compatibility.") @Macro @Nullable - @MetadataProperty(key = "hidden", value = "true") private boolean treatAsOldTimestamp = false; diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index 7d35f9bc7..aabad6fdc 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -66,13 +66,16 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { private final String sessionID; + private final boolean treatAsOldTimestamp; + public OracleSourceSchemaReader() { - this(null); + this(null, false); } - public OracleSourceSchemaReader(String sessionID) { + public OracleSourceSchemaReader(String sessionID, boolean treatAsOldTimestamp) { super(); this.sessionID = sessionID; + this.treatAsOldTimestamp = treatAsOldTimestamp; } @Override @@ -84,7 +87,11 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); case Types.TIMESTAMP: case TIMESTAMP_LTZ: - return Schema.of(Schema.LogicalType.DATETIME); + if (treatAsOldTimestamp) { + return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + } else { + return Schema.of(Schema.LogicalType.DATETIME); + } case BINARY_FLOAT: return Schema.of(Schema.Type.FLOAT); case BINARY_DOUBLE: From 4c94f9b49b43b534ae339892954921f2de470a43 Mon Sep 17 00:00:00 2001 From: Krish Date: Tue, 27 May 2025 19:23:28 +0530 Subject: [PATCH 05/10] Removed extra validation --- .../main/java/io/cdap/plugin/oracle/OracleSource.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index c21ff10cf..b2150206e 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -190,17 +190,6 @@ public String getTransactionIsolationLevel() { @Override protected void validateField(FailureCollector collector, Schema.Field field, Schema actualFieldSchema, Schema expectedFieldSchema) { - // For handling backward compatibility with pipelines built prior to plugin version change. - // If the config flag 'treatAsOldTimestamp' is enabled, allow DATETIME fields (actual schema) - // to be treated as TIMESTAMP_MICROS (expected schema). This preserves legacy behavior where - // non-UTC timestamp fields were interpreted as TIMESTAMP_MICROS. - if (shouldTreatAsOldTimestamp()) { - if (Schema.LogicalType.DATETIME.equals(actualFieldSchema.getLogicalType()) - && Schema.LogicalType.TIMESTAMP_MICROS.equals(expectedFieldSchema.getLogicalType())) { - return; - } - } - // This change is needed to make sure that the pipeline upgrade continues to work post upgrade. // Since the older handling of the precision less used to convert to the decimal type, // and the new version would try to convert to the String type. In that case the output schema would From 6d2795917720c17fa32b1fa2fbce8ee331c6596e Mon Sep 17 00:00:00 2001 From: Krish Date: Wed, 28 May 2025 13:27:23 +0530 Subject: [PATCH 06/10] Updated source constructor --- .../src/main/java/io/cdap/plugin/oracle/OracleSource.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index b2150206e..7e3b7de03 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -136,9 +136,9 @@ public OracleSourceConfig(String host, int port, String user, String password, S String connectionArguments, String connectionType, String database, String role, int defaultBatchValue, int defaultRowPrefetch, String importQuery, Integer numSplits, int fetchSize, - String boundingQuery, String splitBy, Boolean useSSL) { + String boundingQuery, String splitBy, Boolean useSSL, boolean treatAsOldTimestamp) { this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments, - connectionType, database, role, useSSL); + connectionType, database, role, useSSL, treatAsOldTimestamp); this.defaultBatchValue = defaultBatchValue; this.defaultRowPrefetch = defaultRowPrefetch; this.fetchSize = fetchSize; @@ -146,6 +146,7 @@ public OracleSourceConfig(String host, int port, String user, String password, S this.numSplits = numSplits; this.boundingQuery = boundingQuery; this.splitBy = splitBy; + this.treatAsOldTimestamp = treatAsOldTimestamp; } @Override From 444d0fa12c32b87d39229f483f0394da28af380c Mon Sep 17 00:00:00 2001 From: Krish Date: Wed, 28 May 2025 13:50:44 +0530 Subject: [PATCH 07/10] Removed treatAsOldTimeStamp from OracleConnectorConfig --- .../src/main/java/io/cdap/plugin/oracle/OracleSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 7e3b7de03..4515cc238 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -138,7 +138,7 @@ public OracleSourceConfig(String host, int port, String user, String password, S String importQuery, Integer numSplits, int fetchSize, String boundingQuery, String splitBy, Boolean useSSL, boolean treatAsOldTimestamp) { this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments, - connectionType, database, role, useSSL, treatAsOldTimestamp); + connectionType, database, role, useSSL); this.defaultBatchValue = defaultBatchValue; this.defaultRowPrefetch = defaultRowPrefetch; this.fetchSize = fetchSize; From 209683c920b29dc7ef2652af59bd242ef81fda3f Mon Sep 17 00:00:00 2001 From: Krish Date: Thu, 29 May 2025 12:27:05 +0530 Subject: [PATCH 08/10] Changed primitive boolean to Boolean --- .../java/io/cdap/plugin/oracle/OracleSource.java | 14 +++++++++----- .../plugin/oracle/OracleSourceSchemaReader.java | 3 ++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 4515cc238..d0cff4f23 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -63,7 +63,7 @@ protected String createConnectionString() { @Override protected SchemaReader getSchemaReader() { - return new OracleSourceSchemaReader(); + return new OracleSourceSchemaReader(null, oracleSourceConfig.shouldTreatAsOldTimestamp()); } @Override @@ -129,14 +129,14 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig { + "backward compatibility.") @Macro @Nullable - private boolean treatAsOldTimestamp = false; + private Boolean treatAsOldTimestamp = false; public OracleSourceConfig(String host, int port, String user, String password, String jdbcPluginName, String connectionArguments, String connectionType, String database, String role, int defaultBatchValue, int defaultRowPrefetch, String importQuery, Integer numSplits, int fetchSize, - String boundingQuery, String splitBy, Boolean useSSL, boolean treatAsOldTimestamp) { + String boundingQuery, String splitBy, Boolean useSSL, Boolean treatAsOldTimestamp) { this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, role, useSSL); this.defaultBatchValue = defaultBatchValue; @@ -173,8 +173,12 @@ public OracleConnectorConfig getConnection() { return connection; } - public boolean shouldTreatAsOldTimestamp() { - return treatAsOldTimestamp; + public Boolean shouldTreatAsOldTimestamp() { + if (treatAsOldTimestamp == null) { + return false; + } else { + return treatAsOldTimestamp; + } } @Override diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index aabad6fdc..6da370c26 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -72,7 +72,7 @@ public OracleSourceSchemaReader() { this(null, false); } - public OracleSourceSchemaReader(String sessionID, boolean treatAsOldTimestamp) { + public OracleSourceSchemaReader(String sessionID, Boolean treatAsOldTimestamp) { super(); this.sessionID = sessionID; this.treatAsOldTimestamp = treatAsOldTimestamp; @@ -86,6 +86,7 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti case TIMESTAMP_TZ: return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); case Types.TIMESTAMP: + return Schema.of(Schema.LogicalType.DATETIME); case TIMESTAMP_LTZ: if (treatAsOldTimestamp) { return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); From 681c7f4c94fbd4aefd7e73b2388c89213848ac90 Mon Sep 17 00:00:00 2001 From: Krish Date: Thu, 29 May 2025 12:33:05 +0530 Subject: [PATCH 09/10] Fixed Boolean return logic --- .../src/main/java/io/cdap/plugin/oracle/OracleSource.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index d0cff4f23..1e4b158ae 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -174,11 +174,7 @@ public OracleConnectorConfig getConnection() { } public Boolean shouldTreatAsOldTimestamp() { - if (treatAsOldTimestamp == null) { - return false; - } else { - return treatAsOldTimestamp; - } + return Boolean.TRUE.equals(treatAsOldTimestamp); } @Override From f70a62bb9d5978577fc6bd43ef51f51f74f7bb61 Mon Sep 17 00:00:00 2001 From: Krish Date: Fri, 30 May 2025 10:56:29 +0530 Subject: [PATCH 10/10] Changed datatype and added comments --- .../src/main/java/io/cdap/plugin/oracle/OracleSource.java | 2 +- .../io/cdap/plugin/oracle/OracleSourceSchemaReader.java | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 1e4b158ae..4cf014d6b 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -173,7 +173,7 @@ public OracleConnectorConfig getConnection() { return connection; } - public Boolean shouldTreatAsOldTimestamp() { + public boolean shouldTreatAsOldTimestamp() { return Boolean.TRUE.equals(treatAsOldTimestamp); } diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index 6da370c26..ad48c6462 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -72,7 +72,7 @@ public OracleSourceSchemaReader() { this(null, false); } - public OracleSourceSchemaReader(String sessionID, Boolean treatAsOldTimestamp) { + public OracleSourceSchemaReader(String sessionID, boolean treatAsOldTimestamp) { super(); this.sessionID = sessionID; this.treatAsOldTimestamp = treatAsOldTimestamp; @@ -88,6 +88,10 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti case Types.TIMESTAMP: return Schema.of(Schema.LogicalType.DATETIME); case TIMESTAMP_LTZ: + // TIMESTAMP_LTZ (Local timezone timestamp) + // - Legacy behavior used TIMESTAMP_MICROS + // - New behavior uses DATETIME for accurate semantic representation + // Use treatAsOldTimestamp flag to ensure backward compatibility if (treatAsOldTimestamp) { return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); } else {