diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java index 3d2f7399a..4794be082 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleConnector.java @@ -112,7 +112,7 @@ protected DBConnectorPath getDBConnectorPath(String path) { @Override protected SchemaReader getSchemaReader(String sessionID) { - return new OracleSourceSchemaReader(sessionID); + return new OracleSourceSchemaReader(sessionID, false); } @Override diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java index 53f75613b..4cf014d6b 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSource.java @@ -63,7 +63,7 @@ protected String createConnectionString() { @Override protected SchemaReader getSchemaReader() { - return new OracleSourceSchemaReader(); + return new OracleSourceSchemaReader(null, oracleSourceConfig.shouldTreatAsOldTimestamp()); } @Override @@ -101,6 +101,7 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig { public static final String NAME_CONNECTION = "connection"; public static final String DEFAULT_ROW_PREFETCH_VALUE = "40"; public static final String DEFAULT_BATCH_SIZE = "10"; + public static final String TREAT_AS_OLD_TIMESTAMP = "treatAsOldTimestamp"; @Name(NAME_USE_CONNECTION) @Nullable @@ -123,11 +124,19 @@ public static class OracleSourceConfig extends AbstractDBSpecificSourceConfig { @Nullable private Integer defaultRowPrefetch; + @Name(TREAT_AS_OLD_TIMESTAMP) + @Description("For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain" + + "backward compatibility.") + @Macro + @Nullable + private Boolean treatAsOldTimestamp = false; + + public OracleSourceConfig(String host, int port, String user, String password, String jdbcPluginName, String connectionArguments, String connectionType, String database, String role, int defaultBatchValue, int defaultRowPrefetch, String importQuery, Integer numSplits, int fetchSize, - String boundingQuery, String splitBy, Boolean useSSL) { + String boundingQuery, String splitBy, Boolean useSSL, Boolean treatAsOldTimestamp) { this.connection = new OracleConnectorConfig(host, port, user, password, jdbcPluginName, connectionArguments, connectionType, database, role, useSSL); this.defaultBatchValue = defaultBatchValue; @@ -137,6 +146,7 @@ public OracleSourceConfig(String host, int port, String user, String password, S this.numSplits = numSplits; this.boundingQuery = boundingQuery; this.splitBy = splitBy; + this.treatAsOldTimestamp = treatAsOldTimestamp; } @Override @@ -163,6 +173,10 @@ public OracleConnectorConfig getConnection() { return connection; } + public boolean shouldTreatAsOldTimestamp() { + return Boolean.TRUE.equals(treatAsOldTimestamp); + } + @Override public void validate(FailureCollector collector) { ConfigUtil.validateConnection(this, useConnection, connection, collector); diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java index 7d35f9bc7..ad48c6462 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java @@ -66,13 +66,16 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { private final String sessionID; + private final boolean treatAsOldTimestamp; + public OracleSourceSchemaReader() { - this(null); + this(null, false); } - public OracleSourceSchemaReader(String sessionID) { + public OracleSourceSchemaReader(String sessionID, boolean treatAsOldTimestamp) { super(); this.sessionID = sessionID; + this.treatAsOldTimestamp = treatAsOldTimestamp; } @Override @@ -83,8 +86,17 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti case TIMESTAMP_TZ: return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); case Types.TIMESTAMP: - case TIMESTAMP_LTZ: return Schema.of(Schema.LogicalType.DATETIME); + case TIMESTAMP_LTZ: + // TIMESTAMP_LTZ (Local timezone timestamp) + // - Legacy behavior used TIMESTAMP_MICROS + // - New behavior uses DATETIME for accurate semantic representation + // Use treatAsOldTimestamp flag to ensure backward compatibility + if (treatAsOldTimestamp) { + return Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); + } else { + return Schema.of(Schema.LogicalType.DATETIME); + } case BINARY_FLOAT: return Schema.of(Schema.Type.FLOAT); case BINARY_DOUBLE: diff --git a/oracle-plugin/widgets/Oracle-batchsource.json b/oracle-plugin/widgets/Oracle-batchsource.json index 5eca20cc4..16722f1b2 100644 --- a/oracle-plugin/widgets/Oracle-batchsource.json +++ b/oracle-plugin/widgets/Oracle-batchsource.json @@ -246,6 +246,15 @@ "default": "40", "min": "1" } + }, + { + "widget-type": "hidden", + "label": "Treat As Old Timestamp", + "name": "treatAsOldTimestamp", + "description": "For internal use only. If set to true, DATETIME types will be treated as TIMESTAMP_MICROS to maintain backward compatibility.", + "widget-attributes": { + "default": "false" + } } ] }