diff --git a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
index 47e8b0a52..94ea6004b 100644
--- a/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
+++ b/amazon-redshift-plugin/src/test/java/io/cdap/plugin/amazon/redshift/RedshiftConnectorUnitTest.java
@@ -28,7 +28,9 @@ public class RedshiftConnectorUnitTest {
@Rule
public ExpectedException expectedEx = ExpectedException.none();
- private static final RedshiftConnector CONNECTOR = new RedshiftConnector(null);
+ private static final RedshiftConnector CONNECTOR = new RedshiftConnector(new RedshiftConnectorConfig(
+ "username", "password", "jdbc", "", "localhost",
+ "db", 5432));
/**
* Unit test for getTableName()
diff --git a/amazon-redshift-plugin/widgets/Redshift-batchsource.json b/amazon-redshift-plugin/widgets/Redshift-batchsource.json
index 943e2d24e..586a8993b 100644
--- a/amazon-redshift-plugin/widgets/Redshift-batchsource.json
+++ b/amazon-redshift-plugin/widgets/Redshift-batchsource.json
@@ -156,6 +156,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
@@ -228,7 +259,7 @@
"name": "connection"
}
]
- },
+ }
],
"jump-config": {
"datasets": [
diff --git a/amazon-redshift-plugin/widgets/Redshift-connector.json b/amazon-redshift-plugin/widgets/Redshift-connector.json
index 3a2af8e01..f392e3a78 100644
--- a/amazon-redshift-plugin/widgets/Redshift-connector.json
+++ b/amazon-redshift-plugin/widgets/Redshift-connector.json
@@ -69,6 +69,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": []
diff --git a/aurora-mysql-plugin/widgets/AuroraMysql-action.json b/aurora-mysql-plugin/widgets/AuroraMysql-action.json
index efc5f98ff..bd2bac558 100644
--- a/aurora-mysql-plugin/widgets/AuroraMysql-action.json
+++ b/aurora-mysql-plugin/widgets/AuroraMysql-action.json
@@ -90,6 +90,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json b/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json
index a435e4e4f..6663be7ce 100644
--- a/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json
+++ b/aurora-mysql-plugin/widgets/AuroraMysql-batchsink.json
@@ -116,6 +116,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/aurora-mysql-plugin/widgets/AuroraMysql-batchsource.json b/aurora-mysql-plugin/widgets/AuroraMysql-batchsource.json
index 50b435645..bd2bb88a9 100644
--- a/aurora-mysql-plugin/widgets/AuroraMysql-batchsource.json
+++ b/aurora-mysql-plugin/widgets/AuroraMysql-batchsource.json
@@ -135,6 +135,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
diff --git a/aurora-mysql-plugin/widgets/AuroraMysql-postaction.json b/aurora-mysql-plugin/widgets/AuroraMysql-postaction.json
index cc33cf0a1..64da4f1bc 100644
--- a/aurora-mysql-plugin/widgets/AuroraMysql-postaction.json
+++ b/aurora-mysql-plugin/widgets/AuroraMysql-postaction.json
@@ -105,6 +105,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/aurora-postgresql-plugin/widgets/AuroraPostgres-action.json b/aurora-postgresql-plugin/widgets/AuroraPostgres-action.json
index 1f3bca862..e012f65eb 100644
--- a/aurora-postgresql-plugin/widgets/AuroraPostgres-action.json
+++ b/aurora-postgresql-plugin/widgets/AuroraPostgres-action.json
@@ -79,6 +79,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json b/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json
index 53979d6d4..bfc83bd4e 100644
--- a/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json
+++ b/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsink.json
@@ -121,6 +121,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsource.json b/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsource.json
index 14b00b974..fc2503c67 100644
--- a/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsource.json
+++ b/aurora-postgresql-plugin/widgets/AuroraPostgres-batchsource.json
@@ -124,6 +124,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
diff --git a/aurora-postgresql-plugin/widgets/AuroraPostgres-postaction.json b/aurora-postgresql-plugin/widgets/AuroraPostgres-postaction.json
index 3fdb1a14b..8b328160d 100644
--- a/aurora-postgresql-plugin/widgets/AuroraPostgres-postaction.json
+++ b/aurora-postgresql-plugin/widgets/AuroraPostgres-postaction.json
@@ -94,6 +94,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
]
}
diff --git a/cloudsql-mysql-plugin/pom.xml b/cloudsql-mysql-plugin/pom.xml
index e5f6810e4..312a82ec5 100644
--- a/cloudsql-mysql-plugin/pom.xml
+++ b/cloudsql-mysql-plugin/pom.xml
@@ -26,11 +26,45 @@
CloudSQL MySQL plugin
cloudsql-mysql-plugin
4.0.0
+ CloudSQL MySQL database plugins
+ https://github.com/data-integrations/database-plugins
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+ A business-friendly OSS license
+
+
+
+
+
+ CDAP
+ cdap-dev@googlegroups.com
+ CDAP
+ http://cdap.io
+
+
+
+
+ scm:git:https://github.com/cdapio/hydrator-plugins.git
+ scm:git:git@github.com:cdapio/hydrator-plugins.git
+ https://github.com/cdapio/hydrator-plugins.git
+ HEAD
+
io.cdap.cdap
cdap-etl-api
+ ${cdap.version}
+ provided
+
+
+ io.cdap.cdap
+ cdap-api
+ ${cdap.version}
provided
@@ -41,11 +75,12 @@
io.cdap.plugin
hydrator-common
+ ${cdap.plugin.version}
io.cdap.plugin
mysql-plugin
- 1.13.0-SNAPSHOT
+ ${project.version}
@@ -59,24 +94,26 @@
io.cdap.cdap
hydrator-test
+ ${cdap.version}
+ test
io.cdap.cdap
cdap-data-pipeline3_2.12
+ ${cdap.version}
test
junit
junit
-
-
- io.cdap.cdap
- cdap-api
- provided
+ ${junit.version}
+ test
org.mockito
mockito-core
+ ${mockito.version}
+ test
org.jetbrains
diff --git a/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties b/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties
index 89a1b23df..ec101ed02 100644
--- a/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties
+++ b/cloudsql-mysql-plugin/src/e2e-test/resources/errorMessage.properties
@@ -1,9 +1,9 @@
-errorMessageInvalidSourceDatabase=SQL error while getting query schema: Error: Unknown database 'invalidDatabase', SQLState: 42000, ErrorCode: 1049
+errorMessageInvalidSourceDatabase=SQL Error occurred, sqlState: '42000', errorCode: '1049'
errorMessageInvalidImportQuery=Import Query select must contain the string '$CONDITIONS'. if Number of Splits is not set\
\ to 1. Include '$CONDITIONS' in the Import Query
errorMessageCloudMySqlInvalidReferenceName=Invalid reference name
errorMessageBlankUsername=Username is required when password is given.
-errorMessageBlankPassword=SQL error while getting query schema: Error: Access denied for user
+errorMessageBlankPassword=SQL Error occurred, sqlState: '28000', errorCode: '1045', errorMessage: SQL Exception occurred: [Message='Access denied for user '
errorMessageInvalidFetchSize=Invalid fetch size. Fetch size must be a positive integer.
errorMessageBlankSplitBy=Split-By Field Name must be specified if Number of Splits is not set to 1. Specify the Split-by Field Name.
errorMessageInvalidNumberOfSplits=Invalid value for Number of Splits '0'. Must be at least 1. Specify a Number of Splits no less than 1.
@@ -17,10 +17,9 @@ validationSuccessMessage=No errors found.
validationErrorMessage=COUNT ERROR found
errorLogsMessageInvalidTableName=Spark program 'phase-1' failed with error: Stage 'CloudSQL MySQL' encountered : io.cdap.cdap.etl.api.validation.ValidationException: Errors were encountered during validation. \
Table 'Table123' does not exist.. Please check the system logs for more details.
-errorLogsMessageInvalidCredentials =Spark program 'phase-1' failed with error: Stage 'CloudSQL MySQL' encountered : io.cdap.cdap.etl.api.validation.ValidationException: Errors were encountered during validation. \
- Exception while trying to validate schema of database table
+errorLogsMessageInvalidCredentials=Spark program 'phase-1' failed with error: Stage 'CloudSQL MySQL' encountered : io.cdap.cdap.api.exception.ProgramFailureException: SQL Error occurred, sqlState: '28000', errorCode: '1045', errorMessage: SQL Exception occurred: [Message='Access denied for user 'testUser'
errorLogsMessageInvalidBoundingQuery=Spark program 'phase-1' failed with error: Stage 'CloudSQL MySQL' encountered : java.io.IOException: You have an error in your SQL syntax; \
check the manual that corresponds to your MySQL server version for the right syntax to use near 'table' at line 1. Please check the system logs for more details.
-errorMessageInvalidPassword=SQL error while getting query schema: Error: Access denied for user
+errorMessageInvalidPassword=SQL Error occurred, sqlState: '28000', errorCode: '1045', errorMessage: SQL Exception occurred: [Message='Access denied for user '
errorMessagePrivateConnectionName=Enter the internal IP address of the Compute Engine VM cloudsql proxy is running on, to connect to a private
-errorMessageWithBlankPassword=Exception while trying to validate schema of database table
+errorMessageWithBlankPassword=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '28000', errorCode: '1045', errorMessage: SQL Exception occurred: [Message='Access denied for user '
diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java
index b4b87c81b..0751bd160 100644
--- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java
+++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLConnector.java
@@ -31,6 +31,7 @@
import io.cdap.plugin.common.Constants;
import io.cdap.plugin.common.ReferenceNames;
import io.cdap.plugin.common.db.DBConnectorPath;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.db.ConnectionConfig;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.connector.AbstractDBSpecificConnector;
@@ -108,4 +109,12 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
properties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
properties.put(CloudSQLMySQLSink.CloudSQLMySQLSinkConfig.TABLE_NAME, table);
}
+
+ @Override
+ protected DBErrorDetailsProvider getErrorDetailsProvider() {
+ if (dbErrorDetailsProvider == null) {
+ dbErrorDetailsProvider = new CloudSQLMySQLErrorDetailsProvider();
+ }
+ return dbErrorDetailsProvider;
+ }
}
diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java
index 6cd1b0031..30fc229fe 100644
--- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java
+++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSink.java
@@ -35,6 +35,7 @@
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.db.DBRecord;
import io.cdap.plugin.db.config.AbstractDBSpecificSinkConfig;
import io.cdap.plugin.db.sink.AbstractDBSink;
@@ -110,8 +111,11 @@ protected String getErrorDetailsProviderClassName() {
}
@Override
- protected String getExternalDocumentationLink() {
- return DBUtils.CLOUDSQLMYSQL_SUPPORTED_DOC_URL;
+ protected DBErrorDetailsProvider getErrorDetailsProvider() {
+ if (dbErrorDetailsProvider == null) {
+ dbErrorDetailsProvider = new CloudSQLMySQLErrorDetailsProvider();
+ }
+ return dbErrorDetailsProvider;
}
@Override
diff --git a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java
index 201360c67..80358ce4b 100644
--- a/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java
+++ b/cloudsql-mysql-plugin/src/main/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSource.java
@@ -31,6 +31,7 @@
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig;
import io.cdap.plugin.db.source.AbstractDBSource;
@@ -82,11 +83,6 @@ protected Class extends DBWritable> getDBRecordType() {
return MysqlDBRecord.class;
}
- @Override
- protected String getExternalDocumentationLink() {
- return DBUtils.CLOUDSQLMYSQL_SUPPORTED_DOC_URL;
- }
-
@Override
protected String createConnectionString() {
if (CloudSQLUtil.PRIVATE_INSTANCE.equalsIgnoreCase(
@@ -138,6 +134,14 @@ protected String getErrorDetailsProviderClassName() {
return CloudSQLMySQLErrorDetailsProvider.class.getName();
}
+ @Override
+ protected DBErrorDetailsProvider getErrorDetailsProvider() {
+ if (dbErrorDetailsProvider == null) {
+ dbErrorDetailsProvider = new CloudSQLMySQLErrorDetailsProvider();
+ }
+ return dbErrorDetailsProvider;
+ }
+
/** CloudSQL MySQL source config. */
public static class CloudSQLMySQLSourceConfig extends AbstractDBSpecificSourceConfig {
diff --git a/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java b/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java
index 65a14502e..93c06981a 100644
--- a/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java
+++ b/cloudsql-mysql-plugin/src/test/java/io/cdap/plugin/cloudsql/mysql/CloudSQLMySQLSinkTest.java
@@ -20,6 +20,9 @@
import org.junit.Assert;
import org.junit.Test;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class CloudSQLMySQLSinkTest {
@Test
public void testSetColumnsInfo() {
@@ -27,7 +30,13 @@ public void testSetColumnsInfo() {
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("insert", Schema.of(Schema.Type.STRING)));
- CloudSQLMySQLSink cloudSQLMySQLSink = new CloudSQLMySQLSink(new CloudSQLMySQLSink.CloudSQLMySQLSinkConfig());
+
+ CloudSQLMySQLSink.CloudSQLMySQLSinkConfig mockConfig = mock(CloudSQLMySQLSink.CloudSQLMySQLSinkConfig.class);
+ when(mockConfig.getInitialRetryDuration()).thenReturn(5); // or appropriate value
+ when(mockConfig.getMaxRetryDuration()).thenReturn(80); // or appropriate value
+ when(mockConfig.getMaxRetryCount()).thenReturn(5); // or appropriate value
+
+ CloudSQLMySQLSink cloudSQLMySQLSink = new CloudSQLMySQLSink(mockConfig);
Assert.assertNotNull(outputSchema.getFields());
cloudSQLMySQLSink.setColumnsInfo(outputSchema.getFields());
Assert.assertEquals("`id`,`name`,`insert`", cloudSQLMySQLSink.getDbColumns());
diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-action.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-action.json
index 66d6ebb85..0dd6f8f41 100644
--- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-action.json
+++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-action.json
@@ -112,6 +112,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"filters": [
diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json
index 89a7d7736..3a3277ed8 100644
--- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json
+++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsink.json
@@ -176,6 +176,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"filters": [
diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json
index 4ac7747f4..a90154670 100644
--- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json
+++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-batchsource.json
@@ -175,6 +175,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
diff --git a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-connector.json b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-connector.json
index b5c2c9993..1cebc7850 100644
--- a/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-connector.json
+++ b/cloudsql-mysql-plugin/widgets/CloudSQLMySQL-connector.json
@@ -94,6 +94,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/cloudsql-postgresql-plugin/pom.xml b/cloudsql-postgresql-plugin/pom.xml
index f3ba5520d..2256d7b44 100644
--- a/cloudsql-postgresql-plugin/pom.xml
+++ b/cloudsql-postgresql-plugin/pom.xml
@@ -26,11 +26,45 @@
CloudSQL PostgreSQL plugin
cloudsql-postgresql-plugin
4.0.0
+ CloudSQL PostgreSQL database plugins
+ https://github.com/data-integrations/database-plugins
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+ A business-friendly OSS license
+
+
+
+
+
+ CDAP
+ cdap-dev@googlegroups.com
+ CDAP
+ http://cdap.io
+
+
+
+
+ scm:git:https://github.com/cdapio/hydrator-plugins.git
+ scm:git:git@github.com:cdapio/hydrator-plugins.git
+ https://github.com/cdapio/hydrator-plugins.git
+ HEAD
+
io.cdap.cdap
cdap-etl-api
+ ${cdap.version}
+ provided
+
+
+ io.cdap.cdap
+ cdap-api
+ ${cdap.version}
provided
@@ -41,6 +75,7 @@
io.cdap.plugin
hydrator-common
+ ${cdap.plugin.version}
io.cdap.plugin
@@ -63,24 +98,26 @@
io.cdap.cdap
hydrator-test
+ ${cdap.version}
+ test
io.cdap.cdap
cdap-data-pipeline3_2.12
+ ${cdap.version}
test
junit
junit
-
-
- io.cdap.cdap
- cdap-api
- provided
+ ${junit.version}
+ test
org.mockito
mockito-core
+ ${mockito.version}
+ test
org.jetbrains
diff --git a/cloudsql-postgresql-plugin/src/e2e-test/features/source/RunTime.feature b/cloudsql-postgresql-plugin/src/e2e-test/features/source/RunTime.feature
index a9e3ff26b..c4c2eeae6 100644
--- a/cloudsql-postgresql-plugin/src/e2e-test/features/source/RunTime.feature
+++ b/cloudsql-postgresql-plugin/src/e2e-test/features/source/RunTime.feature
@@ -147,7 +147,9 @@ Feature: CloudSQL-PostGreSQL Source - Run Time scenarios
And Save and Deploy Pipeline
And Run the Pipeline in Runtime
And Wait till pipeline is in running state
+ And Open and capture logs
And Verify the pipeline status is "Failed"
+ And Close the pipeline logs
Then Open Pipeline logs and verify Log entries having below listed Level and Message:
| Level | Message |
| ERROR | errorLogsMessageInvalidBoundingQuery |
@@ -189,6 +191,7 @@ Feature: CloudSQL-PostGreSQL Source - Run Time scenarios
Then Save the pipeline
Then Preview and run the pipeline
Then Wait till pipeline preview is in running state and check if any error occurs
+ Then Open and capture pipeline preview logs
Then Verify the preview run status of pipeline in the logs is "failed"
@CLOUDSQLPOSTGRESQL_SOURCE_TEST @CLOUDSQLPOSTGRESQL_TARGET_TEST
diff --git a/cloudsql-postgresql-plugin/src/e2e-test/resources/errorMessage.properties b/cloudsql-postgresql-plugin/src/e2e-test/resources/errorMessage.properties
index 7e9cd2337..a344e472f 100644
--- a/cloudsql-postgresql-plugin/src/e2e-test/resources/errorMessage.properties
+++ b/cloudsql-postgresql-plugin/src/e2e-test/resources/errorMessage.properties
@@ -7,19 +7,19 @@ errorMessageInvalidNumberOfSplits=Invalid value for Number of Splits '0'. Must b
errorMessageNumberOfSplitNotNumber=Unable to create config for batchsource CloudSQLPostgreSQL 'numSplits' is invalid: Value of \
field class io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig.numSplits is expected to be a number.
errorMessageInvalidFetchSize=Invalid fetch size. Fetch size must be a positive integer.
-errorMessageInvalidSourceDatabase=SQL error while getting query schema: Error: FATAL: database "invalidDatabase" does not exist,
+errorMessageInvalidSourceDatabase=SQL Error occurred, sqlState: '3D000', errorCode: '0', errorMessage: SQL Exception occurred: [Message='FATAL: database "invalidDatabase" does not exist', SQLState='3D000', ErrorCode='0'].
errorMessageInvalidImportQuery=Import Query select must contain the string '$CONDITIONS'. if Number of Splits is not set\
\ to 1. Include '$CONDITIONS' in the Import Query
errorMessageBlankUsername=Username is required when password is given.
-errorMessageBlankPassword=SQL error while getting query schema:
-errorMessageInvalidPassword=SQL error while getting query schema: Error: FATAL: password authentication failed for user
+errorMessageBlankPassword=SQL Error occurred, sqlState: '99999', errorCode: '0', errorMessage: SQL Exception occurred: [Message='Something unusual has occurred to cause the driver to fail. Please report this exception.', SQLState='99999', ErrorCode='0'].
+errorMessageInvalidPassword=SQL Error occurred, sqlState: '28P01', errorCode: '0', errorMessage: SQL Exception occurred: [Message='FATAL: password authentication failed for user
errorMessageInvalidSourceHost=SQL error while getting query schema: The connection attempt failed.
errorMessageInvalidTableName=Table 'table' does not exist. Ensure table '"table"' is set correctly and that the
-errorMessageInvalidSinkDatabase=Exception while trying to validate schema of database table
+errorMessageInvalidSinkDatabase=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '3D000', errorCode: '0', errorMessage: SQL Exception occurred: [Message='FATAL: database "invalidDB" does not exist', SQLState='3D000', ErrorCode='0'].'
errorLogsMessageInvalidBoundingQuery=The column index is out of range: 1, number of columns: 0..
errorMessageConnectionName=Connection Name must be in the format :: to connect to \
a public CloudSQL PostgreSQL instance.
errorMessagePrivateConnectionName=Enter the internal IP address of the Compute Engine VM cloudsql proxy is running on, \
to connect to a private CloudSQL PostgreSQL instance.
-errorMessageWithBlankPassword=Exception while trying to validate schema of database table
+errorMessageWithBlankPassword=Error encountered while configuring the stage: 'SQL Error occurred, sqlState: '99999', errorCode: '0', errorMessage: SQL Exception occurred: [Message='Something unusual has occurred to cause the driver to fail. Please report this exception.', SQLState='99999', ErrorCode='0'].'
errorMessageUpdateUpsertOperationName=Table key must be set if the operation is 'Update' or 'Upsert'.
diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java
index 07c83ebbe..348a1b94c 100644
--- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java
+++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLConnector.java
@@ -30,6 +30,7 @@
import io.cdap.plugin.common.Constants;
import io.cdap.plugin.common.ReferenceNames;
import io.cdap.plugin.common.db.DBConnectorPath;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.common.db.DBPath;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.connector.AbstractDBSpecificConnector;
@@ -118,4 +119,12 @@ protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath pa
sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
sinkProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table));
}
+
+ @Override
+ protected DBErrorDetailsProvider getErrorDetailsProvider() {
+ if (dbErrorDetailsProvider == null) {
+ dbErrorDetailsProvider = new CloudSQLPostgreSQLErrorDetailsProvider();
+ }
+ return dbErrorDetailsProvider;
+ }
}
diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java
index 060b67f82..dcfdd3579 100644
--- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java
+++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSink.java
@@ -36,6 +36,7 @@
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.db.DBRecord;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.config.AbstractDBSpecificSinkConfig;
@@ -154,8 +155,11 @@ protected String getErrorDetailsProviderClassName() {
}
@Override
- protected String getExternalDocumentationLink() {
- return DBUtils.CLOUDSQLPOSTGRES_SUPPORTED_DOC_URL;
+ protected DBErrorDetailsProvider getErrorDetailsProvider() {
+ if (dbErrorDetailsProvider == null) {
+ dbErrorDetailsProvider = new CloudSQLPostgreSQLErrorDetailsProvider();
+ }
+ return dbErrorDetailsProvider;
}
/** CloudSQL PostgreSQL sink config. */
diff --git a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java
index db3f2d708..7ebc7d809 100644
--- a/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java
+++ b/cloudsql-postgresql-plugin/src/main/java/io/cdap/plugin/cloudsql/postgres/CloudSQLPostgreSQLSource.java
@@ -31,6 +31,7 @@
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ConfigUtil;
import io.cdap.plugin.common.LineageRecorder;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.config.AbstractDBSpecificSourceConfig;
import io.cdap.plugin.db.source.AbstractDBSource;
@@ -88,13 +89,16 @@ protected Class extends DBWritable> getDBRecordType() {
}
@Override
- protected String getExternalDocumentationLink() {
- return DBUtils.CLOUDSQLPOSTGRES_SUPPORTED_DOC_URL;
+ protected String getErrorDetailsProviderClassName() {
+ return CloudSQLPostgreSQLErrorDetailsProvider.class.getName();
}
@Override
- protected String getErrorDetailsProviderClassName() {
- return CloudSQLPostgreSQLErrorDetailsProvider.class.getName();
+ protected DBErrorDetailsProvider getErrorDetailsProvider() {
+ if (dbErrorDetailsProvider == null) {
+ dbErrorDetailsProvider = new CloudSQLPostgreSQLErrorDetailsProvider();
+ }
+ return dbErrorDetailsProvider;
}
@Override
diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-action.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-action.json
index eab240679..e14646154 100644
--- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-action.json
+++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-action.json
@@ -112,6 +112,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"filters": [
diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json
index 2fda594dd..8d6578413 100644
--- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json
+++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsink.json
@@ -192,6 +192,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json
index 96ea97ac2..ea449120d 100644
--- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json
+++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-batchsource.json
@@ -175,6 +175,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [
diff --git a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-connector.json b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-connector.json
index 9824f91bd..36013ac40 100644
--- a/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-connector.json
+++ b/cloudsql-postgresql-plugin/widgets/CloudSQLPostgreSQL-connector.json
@@ -94,6 +94,37 @@
}
}
]
+ },
+ {
+ "properties": [
+ {
+ "widget-type": "hidden",
+ "label": "Initial Retry Duration (sec)",
+ "name": "initialRetryDuration",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Duration (sec)",
+ "name": "maxRetryDuration",
+ "widget-attributes": {
+ "default": 80,
+ "minimum": 0
+ }
+ },
+ {
+ "widget-type": "hidden",
+ "label": "Maximum Retry Count",
+ "name": "maxRetryCount",
+ "widget-attributes": {
+ "default": 5,
+ "minimum": 0
+ }
+ }
+ ]
}
],
"outputs": [],
diff --git a/database-commons/pom.xml b/database-commons/pom.xml
index 67dc8e82e..98bcb3803 100644
--- a/database-commons/pom.xml
+++ b/database-commons/pom.xml
@@ -26,33 +26,76 @@
Database Commons
database-commons
4.0.0
+ Database Commons
+ https://github.com/data-integrations/database-plugins
+
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+ A business-friendly OSS license
+
+
+
+
+
+ CDAP
+ cdap-dev@googlegroups.com
+ CDAP
+ http://cdap.io
+
+
+
+
+ scm:git:https://github.com/cdapio/hydrator-plugins.git
+ scm:git:git@github.com:cdapio/hydrator-plugins.git
+ https://github.com/cdapio/hydrator-plugins.git
+ HEAD
+
io.cdap.cdap
cdap-etl-api
+ ${cdap.version}
+ provided
io.cdap.plugin
hydrator-common
+ ${cdap.plugin.version}
com.google.guava
guava
+ ${guava.version}
+
+
+
+
+ dev.failsafe
+ failsafe
io.cdap.cdap
hydrator-test
+ ${cdap.version}
+ test
io.cdap.cdap
cdap-data-pipeline3_2.12
+ ${cdap.version}
+ test
junit
junit
+ ${junit.version}
+ test
com.mockrunner
@@ -63,6 +106,8 @@
org.mockito
mockito-core
+ ${mockito.version}
+ test
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java
index c5320e25e..2a5aaa988 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/ConnectionConfig.java
@@ -24,6 +24,7 @@
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.plugin.common.KeyValueListParser;
import io.cdap.plugin.db.config.DatabaseConnectionConfig;
+import io.cdap.plugin.util.RetryUtils;
import java.util.Collections;
import java.util.HashMap;
@@ -72,6 +73,37 @@ public abstract class ConnectionConfig extends PluginConfig implements DatabaseC
@Macro
public String connectionArguments;
+ @Name(RetryUtils.NAME_INITIAL_RETRY_DURATION)
+ @Description("Time taken for the first retry. Default is 5 seconds.")
+ @Nullable
+ @Macro
+ private Integer initialRetryDuration;
+
+ @Name(RetryUtils.NAME_MAX_RETRY_DURATION)
+ @Description("Maximum time in seconds retries can take. Default is 80 seconds.")
+ @Nullable
+ @Macro
+ private Integer maxRetryDuration;
+
+ @Name(RetryUtils.NAME_MAX_RETRY_COUNT)
+ @Description("Maximum number of retries allowed. Default is 5.")
+ @Nullable
+ @Macro
+ private Integer maxRetryCount;
+
+
+ public Integer getInitialRetryDuration() {
+ return initialRetryDuration == null ? RetryUtils.DEFAULT_INITIAL_RETRY_DURATION_SECONDS : initialRetryDuration;
+ }
+
+ public Integer getMaxRetryDuration() {
+ return maxRetryDuration == null ? RetryUtils.DEFAULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration;
+ }
+
+ public Integer getMaxRetryCount() {
+ return maxRetryCount == null ? RetryUtils.DEFAULT_MAX_RETRY_COUNT : maxRetryCount;
+ }
+
public ConnectionConfig() {
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/RetryExceptions.java b/database-commons/src/main/java/io/cdap/plugin/db/RetryExceptions.java
new file mode 100644
index 000000000..13720ab9a
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/db/RetryExceptions.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db;
+
+import java.sql.SQLTransientException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Check if an exception or any of its causes is a retryable {@link java.sql.SQLTransientException}.
+ */
+public class RetryExceptions {
+ public static boolean isRetryable(Throwable t) {
+ Set seen = new HashSet<>();
+ while (t != null && seen.add(t)) {
+ if (t instanceof SQLTransientException) {
+ return true;
+ }
+ t = t.getCause();
+ }
+ return false;
+ }
+}
+
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBArgumentSetter.java b/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBArgumentSetter.java
index 5e22abf85..d7aeb51c3 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBArgumentSetter.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/action/AbstractDBArgumentSetter.java
@@ -16,19 +16,22 @@
package io.cdap.plugin.db.action;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.StageConfigurer;
import io.cdap.cdap.etl.api.action.Action;
import io.cdap.cdap.etl.api.action.ActionContext;
import io.cdap.cdap.etl.api.action.SettableArguments;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.db.ConnectionConfig;
import io.cdap.plugin.util.DBUtils;
import io.cdap.plugin.util.DriverCleanup;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import io.cdap.plugin.util.RetryUtils;
import java.sql.Connection;
import java.sql.Driver;
-import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -41,9 +44,13 @@ public class AbstractDBArgumentSetter extends Action {
private static final String JDBC_PLUGIN_ID = "driver";
private final ArgumentSetterConfig config;
+ private final RetryPolicy> retryPolicy;
+ protected DBErrorDetailsProvider dbErrorDetailsProvider;
public AbstractDBArgumentSetter(ArgumentSetterConfig config) {
this.config = config;
+ this.retryPolicy = RetryPolicyUtil.getRetryPolicy(config.getInitialRetryDuration(), config.getMaxRetryDuration(),
+ config.getMaxRetryCount());
}
@Override
@@ -100,10 +107,22 @@ private void processArguments(Class extends Driver> driverClass,
Properties connectionProperties = new Properties();
connectionProperties.putAll(config.getConnectionArguments());
try {
- Connection connection = DriverManager
- .getConnection(config.getConnectionString(), connectionProperties);
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery(config.getQuery());
+ executeWithRetry(failureCollector, settableArguments, connectionProperties);
+ } finally {
+ driverCleanup.destroy();
+ }
+ }
+
+ private void executeWithRetry(FailureCollector failureCollector, SettableArguments settableArguments,
+ Properties connectionProperties) throws SQLException {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ config.getConnectionString(), connectionProperties, getErrorDetailsProvider())) {
+ ResultSet resultSet;
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getErrorDetailsProvider())) {
+ resultSet = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, statement,
+ config.getQuery(), getErrorDetailsProvider());
+ }
boolean hasRecord = resultSet.next();
if (!hasRecord) {
failureCollector.addFailure("No record found.",
@@ -118,8 +137,6 @@ private void processArguments(Class extends Driver> driverClass,
.addFailure("More than one records found.",
"The argument selection conditions must match only one record.");
}
- } finally {
- driverCleanup.destroy();
}
}
@@ -138,4 +155,17 @@ private void setArguments(ResultSet resultSet, FailureCollector failureCollector
arguments.set(column, resultSet.getString(column));
}
}
+
+ /**
+ * Returns the DBErrorDetailsProvider instance.
+ * Override this method to provide a custom DBErrorDetailsProvider instance.
+ *
+ * @return DBErrorDetailsProvider instance
+ */
+ protected DBErrorDetailsProvider getErrorDetailsProvider() {
+ if (dbErrorDetailsProvider == null) {
+ dbErrorDetailsProvider = new DBErrorDetailsProvider();
+ }
+ return dbErrorDetailsProvider;
+ }
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/action/DBRun.java b/database-commons/src/main/java/io/cdap/plugin/db/action/DBRun.java
index e2ccfc57e..1aaf000bd 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/action/DBRun.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/action/DBRun.java
@@ -16,12 +16,15 @@
package io.cdap.plugin.db.action;
+import dev.failsafe.RetryPolicy;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.util.DBUtils;
import io.cdap.plugin.util.DriverCleanup;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import io.cdap.plugin.util.RetryUtils;
import java.sql.Connection;
import java.sql.Driver;
-import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
@@ -34,6 +37,8 @@ public class DBRun {
private final QueryConfig config;
private final Class extends Driver> driverClass;
private boolean enableAutoCommit;
+ private final RetryPolicy> retryPolicy;
+ protected DBErrorDetailsProvider dbErrorDetailsProvider;
public DBRun(QueryConfig config, Class extends Driver> driverClass, Boolean enableAutocommit) {
this.config = config;
@@ -41,6 +46,20 @@ public DBRun(QueryConfig config, Class extends Driver> driverClass, Boolean en
if (enableAutocommit != null) {
this.enableAutoCommit = enableAutocommit;
}
+ this.retryPolicy = RetryPolicyUtil.getRetryPolicy(config.getInitialRetryDuration(), config.getMaxRetryDuration(),
+ config.getMaxRetryCount());
+ }
+
+ /**
+ * Returns the DBErrorDetailsProvider instance.
+ *
+ * @return DBErrorDetailsProvider instance
+ */
+ protected DBErrorDetailsProvider getErrorDetailsProvider() {
+ if (dbErrorDetailsProvider == null) {
+ dbErrorDetailsProvider = new DBErrorDetailsProvider();
+ }
+ return dbErrorDetailsProvider;
}
/**
@@ -55,13 +74,15 @@ public void run() throws SQLException, InstantiationException, IllegalAccessExce
Properties connectionProperties = new Properties();
connectionProperties.putAll(config.getConnectionArguments());
- try (Connection connection = DriverManager.getConnection(config.getConnectionString(), connectionProperties)) {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ config.getConnectionString(), connectionProperties, getErrorDetailsProvider())) {
executeInitQueries(connection, config.getInitQueries());
if (!enableAutoCommit) {
connection.setAutoCommit(false);
}
- try (Statement statement = connection.createStatement()) {
- statement.execute(config.query);
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getErrorDetailsProvider())) {
+ RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, config.query, getErrorDetailsProvider());
if (!enableAutoCommit) {
connection.commit();
}
@@ -76,8 +97,9 @@ public void run() throws SQLException, InstantiationException, IllegalAccessExce
private void executeInitQueries(Connection connection, List initQueries) throws SQLException {
for (String query : initQueries) {
- try (Statement statement = connection.createStatement()) {
- statement.execute(query);
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getErrorDetailsProvider())) {
+ RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, query, getErrorDetailsProvider());
}
}
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java
index 5b92a85f7..3d9ed16ff 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSinkConfig.java
@@ -155,4 +155,16 @@ public Operation getOperationName() {
public String getRelationTableKey() {
return relationTableKey;
}
+
+ public Integer getInitialRetryDuration() {
+ return getConnection().getInitialRetryDuration();
+ }
+
+ public Integer getMaxRetryDuration() {
+ return getConnection().getMaxRetryDuration();
+ }
+
+ public Integer getMaxRetryCount() {
+ return getConnection().getMaxRetryCount();
+ }
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
index 41c577397..f15939ab7 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/config/AbstractDBSpecificSourceConfig.java
@@ -268,4 +268,16 @@ public Integer getFetchSize() {
return fetchSize;
}
+ public Integer getInitialRetryDuration() {
+ return getConnection().getInitialRetryDuration();
+ }
+
+ public Integer getMaxRetryDuration() {
+ return getConnection().getMaxRetryDuration();
+ }
+
+ public Integer getMaxRetryCount() {
+ return getConnection().getMaxRetryCount();
+ }
+
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseConnectionConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseConnectionConfig.java
index 55cfe363f..e1fde69a1 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseConnectionConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/config/DatabaseConnectionConfig.java
@@ -50,4 +50,10 @@ public interface DatabaseConnectionConfig {
*/
String getPassword();
+ Integer getInitialRetryDuration();
+
+ Integer getMaxRetryDuration();
+
+ Integer getMaxRetryCount();
+
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBConnectorConfig.java b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBConnectorConfig.java
index 4bee056f8..646c5e388 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBConnectorConfig.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBConnectorConfig.java
@@ -25,6 +25,7 @@
import io.cdap.plugin.common.KeyValueListParser;
import io.cdap.plugin.common.db.DBConnectorProperties;
import io.cdap.plugin.db.ConnectionConfig;
+import io.cdap.plugin.util.RetryUtils;
import java.util.Collections;
import java.util.HashMap;
@@ -63,6 +64,26 @@ public abstract class AbstractDBConnectorConfig extends PluginConfig implements
@Macro
protected String connectionArguments;
+
+ @Name(RetryUtils.NAME_INITIAL_RETRY_DURATION)
+ @Description("Time taken for the first retry. Default is 5 seconds.")
+ @Nullable
+ @Macro
+ private Integer initialRetryDuration;
+
+ @Name(RetryUtils.NAME_MAX_RETRY_DURATION)
+ @Description("Maximum time in seconds retries can take. Default is 80 seconds.")
+ @Nullable
+ @Macro
+ private Integer maxRetryDuration;
+
+ @Name(RetryUtils.NAME_MAX_RETRY_COUNT)
+ @Description("Maximum number of retries allowed. Default is 5.")
+ @Nullable
+ @Macro
+ private Integer maxRetryCount;
+
+
@Nullable
@Override
public String getUser() {
@@ -74,6 +95,18 @@ public String getUser() {
public String getPassword() {
return password;
}
+
+ public Integer getInitialRetryDuration() {
+ return initialRetryDuration == null ? RetryUtils.DEFAULT_INITIAL_RETRY_DURATION_SECONDS : initialRetryDuration;
+ }
+
+ public Integer getMaxRetryDuration() {
+ return maxRetryDuration == null ? RetryUtils.DEFAULT_MAX_RETRY_DURATION_SECONDS : maxRetryDuration;
+ }
+
+ public Integer getMaxRetryCount() {
+ return maxRetryCount == null ? RetryUtils.DEFAULT_MAX_RETRY_COUNT : maxRetryCount;
+ }
@Override
public Properties getConnectionArgumentsProperties() {
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java
index 8a9b7b6e4..0308cf7a4 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/connector/AbstractDBSpecificConnector.java
@@ -17,6 +17,7 @@
package io.cdap.plugin.db.connector;
import com.google.common.collect.Maps;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.data.batch.InputFormatProvider;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.batch.BatchConnector;
@@ -28,11 +29,14 @@
import io.cdap.plugin.common.SourceInputFormatProvider;
import io.cdap.plugin.common.db.AbstractDBConnector;
import io.cdap.plugin.common.db.DBConnectorPath;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.common.util.ExceptionUtils;
import io.cdap.plugin.db.CommonSchemaReader;
import io.cdap.plugin.db.ConnectionConfigAccessor;
import io.cdap.plugin.db.SchemaReader;
import io.cdap.plugin.db.source.DataDrivenETLDBInputFormat;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import io.cdap.plugin.util.RetryUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -56,10 +60,14 @@ public abstract class AbstractDBSpecificConnector extends
implements BatchConnector {
private final AbstractDBConnectorConfig config;
+ private final RetryPolicy> retryPolicy;
+ protected DBErrorDetailsProvider dbErrorDetailsProvider;
protected AbstractDBSpecificConnector(AbstractDBConnectorConfig config) {
super(config);
this.config = config;
+ this.retryPolicy = RetryPolicyUtil.getRetryPolicy(config.getInitialRetryDuration(), config.getMaxRetryDuration(),
+ config.getMaxRetryCount());
}
public abstract boolean supportSchema();
@@ -116,6 +124,19 @@ public InputFormatProvider getInputFormatProvider(ConnectorContext context, Samp
return new SourceInputFormatProvider(DataDrivenETLDBInputFormat.class, connectionConfigAccessor.getConfiguration());
}
+ /**
+ * Returns the DBErrorDetailsProvider instance.
+ * Override this method to provide a custom DBErrorDetailsProvider instance.
+ *
+ * @return DBErrorDetailsProvider instance
+ */
+ protected DBErrorDetailsProvider getErrorDetailsProvider() {
+ if (dbErrorDetailsProvider == null) {
+ dbErrorDetailsProvider = new DBErrorDetailsProvider();
+ }
+ return dbErrorDetailsProvider;
+ }
+
protected Connection getConnection(DBConnectorPath path) {
return getConnection(getConnectionString(path.getDatabase()), config.getConnectionArgumentsProperties());
}
@@ -172,13 +193,16 @@ protected String getStratifiedQuery(String tableName, int limit, String strata,
protected Schema loadTableSchema(Connection connection, String query, @Nullable Integer timeoutSec, String sessionID)
throws SQLException {
- Statement statement = connection.createStatement();
- statement.setMaxRows(1);
- if (timeoutSec != null) {
- statement.setQueryTimeout(timeoutSec);
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getErrorDetailsProvider())) {
+ statement.setMaxRows(1);
+ if (timeoutSec != null) {
+ statement.setQueryTimeout(timeoutSec);
+ }
+ ResultSet resultSet = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, statement, query,
+ getErrorDetailsProvider());
+ return Schema.recordOf("outputSchema", getSchemaReader(sessionID).getSchemaFields(resultSet));
}
- ResultSet resultSet = statement.executeQuery(query);
- return Schema.recordOf("outputSchema", getSchemaReader(sessionID).getSchemaFields(resultSet));
}
protected void setConnectionProperties(Map properties, ConnectorSpecRequest request) {
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java b/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java
index 0bb4bf123..0923b9114 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/sink/AbstractDBSink.java
@@ -18,6 +18,7 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
@@ -25,10 +26,6 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
-import io.cdap.cdap.api.exception.ErrorCategory;
-import io.cdap.cdap.api.exception.ErrorCodeType;
-import io.cdap.cdap.api.exception.ErrorType;
-import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
@@ -42,6 +39,7 @@
import io.cdap.plugin.common.ReferenceBatchSink;
import io.cdap.plugin.common.ReferencePluginConfig;
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.db.ColumnType;
import io.cdap.plugin.db.CommonSchemaReader;
import io.cdap.plugin.db.ConnectionConfig;
@@ -53,6 +51,8 @@
import io.cdap.plugin.db.config.DatabaseSinkConfig;
import io.cdap.plugin.util.DBUtils;
import io.cdap.plugin.util.DriverCleanup;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import io.cdap.plugin.util.RetryUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
@@ -61,7 +61,6 @@
import java.sql.Connection;
import java.sql.Driver;
-import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@@ -98,12 +97,16 @@ public abstract class AbstractDBSink retryPolicy;
+ protected DBErrorDetailsProvider dbErrorDetailsProvider;
public AbstractDBSink(T dbSinkConfig) {
super(new ReferencePluginConfig(dbSinkConfig.getReferenceName()));
this.dbSinkConfig = dbSinkConfig;
this.configAccessor = new ConnectionConfigAccessor();
this.configuration = configAccessor.getConfiguration();
+ this.retryPolicy = RetryPolicyUtil.getRetryPolicy(dbSinkConfig.getInitialRetryDuration(),
+ dbSinkConfig.getMaxRetryDuration(), dbSinkConfig.getMaxRetryCount());
}
private String getJDBCPluginId() {
@@ -126,7 +129,8 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
Class extends Driver> driverClass = DBUtils.getDriverClass(
pipelineConfigurer, dbSinkConfig, ConnectionConfig.JDBC_PLUGIN_TYPE);
if (driverClass != null && dbSinkConfig.canConnect()) {
- validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema, dbSinkConfig.getDBSchemaName());
+ validateSchema(collector, driverClass, dbSinkConfig.getTableName(), inputSchema,
+ dbSinkConfig.getDBSchemaName());
}
}
public void validateOperations(FailureCollector collector, T dbSinkConfig, @Nullable Schema inputSchema) {
@@ -179,13 +183,16 @@ protected String getErrorDetailsProviderClassName() {
}
/**
- * Returns the external documentation link.
- * Override this method to provide a custom external documentation link.
+ * Returns the DBErrorDetailsProvider instance.
+ * Override this method to provide a custom DBErrorDetailsProvider instance.
*
- * @return external documentation link
+ * @return DBErrorDetailsProvider instance
*/
- protected String getExternalDocumentationLink() {
- return null;
+ protected DBErrorDetailsProvider getErrorDetailsProvider() {
+ if (dbErrorDetailsProvider == null) {
+ dbErrorDetailsProvider = new DBErrorDetailsProvider();
+ }
+ return dbErrorDetailsProvider;
}
@Override
@@ -206,16 +213,16 @@ public void prepareRun(BatchSinkContext context) {
// Load the plugin class to make sure it is available.
Class extends Driver> driverClass = context.loadPluginClass(getJDBCPluginId());
// make sure that the destination table exists and column types are correct
+ FailureCollector collector = context.getFailureCollector();
try {
if (Objects.nonNull(outputSchema)) {
- FailureCollector collector = context.getFailureCollector();
validateOperations(collector, dbSinkConfig, outputSchema);
validateSchema(collector, driverClass, tableName,
outputSchema, dbSchemaName);
- collector.getOrThrowException();
} else {
- outputSchema = inferSchema(driverClass);
+ outputSchema = inferSchema(collector, driverClass);
}
+ collector.getOrThrowException();
} finally {
DBUtils.cleanup(driverClass);
}
@@ -286,13 +293,14 @@ protected void setColumnsInfo(List fields) {
public void initialize(BatchRuntimeContext context) throws Exception {
super.initialize(context);
driverClass = context.loadPluginClass(getJDBCPluginId());
- Schema outputSchema = Optional.ofNullable(context.getInputSchema()).orElse(inferSchema(driverClass));
+ FailureCollector collector = context.getFailureCollector();
+ Schema outputSchema = Optional.ofNullable(context.getInputSchema()).orElse(inferSchema(collector, driverClass));
setColumnsInfo(outputSchema.getFields());
setResultSetMetadata();
}
- private Schema inferSchema(Class extends Driver> driverClass) {
+ private Schema inferSchema(FailureCollector collector, Class extends Driver> driverClass) {
List inferredFields = new ArrayList<>();
String dbSchemaName = dbSinkConfig.getDBSchemaName();
String fullyQualifiedTableName = dbSchemaName == null ? dbSinkConfig.getEscapedTableName()
@@ -302,33 +310,20 @@ private Schema inferSchema(Class extends Driver> driverClass) {
dbSinkConfig.getJdbcPluginName());
Properties connectionProperties = new Properties();
connectionProperties.putAll(dbSinkConfig.getConnectionArguments());
- try (Connection connection = DriverManager.getConnection(dbSinkConfig.getConnectionString(),
- connectionProperties)) {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ dbSinkConfig.getConnectionString(), connectionProperties, getErrorDetailsProvider())) {
executeInitQueries(connection, dbSinkConfig.getInitQueries());
-
- try (Statement statement = connection.createStatement();
- ResultSet rs = statement.executeQuery("SELECT * FROM " + fullyQualifiedTableName
- + " WHERE 1 = 0")) {
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy,
+ connection, getErrorDetailsProvider());
+ ResultSet rs = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, statement,
+ String.format("SELECT * FROM %s WHERE 1 = 0", fullyQualifiedTableName), getErrorDetailsProvider())) {
inferredFields.addAll(getSchemaReader().getSchemaFields(rs));
}
} catch (SQLException e) {
- // wrap exception to ensure SQLException-child instances not exposed to contexts w/o jdbc driver in classpath
- String errorMessage =
- String.format("SQL Exception occurred: [Message='%s', SQLState='%s', ErrorCode='%s'].", e.getMessage(),
- e.getSQLState(), e.getErrorCode());
- String errorMessageWithDetails = String.format("Error while reading table metadata." +
- "Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(), e.getErrorCode(), e.getSQLState());
- String externalDocumentationLink = getExternalDocumentationLink();
- if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
- if (!errorMessage.endsWith(".")) {
- errorMessage = errorMessage + ".";
- }
- errorMessage = String.format("%s For more details, see %s", errorMessageWithDetails, errorMessage);
- }
- throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
- errorMessage, errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE,
- e.getSQLState(), externalDocumentationLink, new SQLException(e.getMessage(),
- e.getSQLState(), e.getErrorCode()));
+ collector.addFailure(
+ String.format("Exception while trying to infer schema of database table '%s' for connection '%s' with %s. "
+ + "[SQLState='%s', ErrorCode='%s']", fullyQualifiedTableName, dbSinkConfig.getConnectionString(),
+ e.getMessage(), e.getSQLState(), e.getErrorCode()), null).withStacktrace(e.getStackTrace());
}
} catch (IllegalAccessException | InstantiationException | SQLException e) {
throw new InvalidStageException("JDBC Driver unavailable: " + dbSinkConfig.getJdbcPluginName(), e);
@@ -357,7 +352,7 @@ public void destroy() {
}
}
- private void setResultSetMetadata() throws Exception {
+ private void setResultSetMetadata() throws SQLException, IllegalAccessException, InstantiationException {
List columnTypes = new ArrayList<>(columns.size());
String connectionString = dbSinkConfig.getConnectionString();
String dbSchemaName = dbSinkConfig.getDBSchemaName();
@@ -369,14 +364,16 @@ private void setResultSetMetadata() throws Exception {
Properties connectionProperties = new Properties();
connectionProperties.putAll(dbSinkConfig.getConnectionArguments());
- try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ connectionString, connectionProperties, getErrorDetailsProvider())) {
executeInitQueries(connection, dbSinkConfig.getInitQueries());
- try (Statement statement = connection.createStatement();
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getErrorDetailsProvider());
// Run a query against the DB table that returns 0 records, but returns valid ResultSetMetadata
// that can be used to construct DBRecord objects to sink to the database table.
- ResultSet rs = statement.executeQuery(String.format("SELECT %s FROM %s WHERE 1 = 0",
- dbColumns, fullyQualifiedTableName))
- ) {
+ ResultSet rs = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, statement,
+ String.format("SELECT %s FROM %s WHERE 1 = 0", dbColumns, fullyQualifiedTableName),
+ getErrorDetailsProvider())) {
columnTypes.addAll(getMatchedColumnTypeList(rs, columns));
}
}
@@ -438,32 +435,31 @@ private void validateSchema(FailureCollector collector, Class extends Driver>
Properties connectionProperties = new Properties();
connectionProperties.putAll(dbSinkConfig.getConnectionArguments());
- try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ connectionString, connectionProperties, getErrorDetailsProvider())) {
executeInitQueries(connection, dbSinkConfig.getInitQueries());
try (ResultSet tables = connection.getMetaData().getTables(null, dbSchemaName, tableName, null)) {
if (!tables.next()) {
- collector.addFailure(
- String.format("Table '%s' does not exist.", tableName),
- String.format("Ensure table '%s' is set correctly and that the connection string '%s' " +
- "points to a valid database.", fullyQualifiedTableName, connectionString))
+ collector.addFailure(String.format("Table '%s' does not exist.", tableName),
+ String.format("Ensure table '%s' is set correctly and that the connection string '%s' " +
+ "points to a valid database.", fullyQualifiedTableName, connectionString))
.withConfigProperty(DBSinkConfig.TABLE_NAME);
return;
}
}
setColumnsInfo(inputSchema.getFields());
- try (PreparedStatement pStmt = connection.prepareStatement(String.format("SELECT %s FROM %s WHERE 1 = 0",
- dbColumns,
- fullyQualifiedTableName));
- ResultSet rs = pStmt.executeQuery()) {
+ try (PreparedStatement pStmt = RetryUtils.prepareStatementWithRetry((RetryPolicy) retryPolicy,
+ connection, String.format("SELECT %s FROM %s WHERE 1 = 0", dbColumns, fullyQualifiedTableName),
+ getErrorDetailsProvider());
+ ResultSet rs = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, pStmt,
+ getErrorDetailsProvider())) {
getFieldsValidator().validateFields(inputSchema, rs, collector);
}
} catch (SQLException e) {
- LOG.error("Exception while trying to validate schema of database table {} for connection {}.",
- fullyQualifiedTableName, connectionString, e);
collector.addFailure(
- String.format("Exception while trying to validate schema of database table '%s' for connection '%s' with %s",
- fullyQualifiedTableName, connectionString, e.getMessage()),
- null).withStacktrace(e.getStackTrace());
+ String.format("Exception while trying to validate schema of database table '%s' for connection '%s' with %s. "
+ + "[SQLState='%s', ErrorCode='%s']", fullyQualifiedTableName, connectionString, e.getMessage(),
+ e.getSQLState(), e.getErrorCode()), null).withStacktrace(e.getStackTrace());
}
}
@@ -486,8 +482,9 @@ protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
private void executeInitQueries(Connection connection, List initQueries) throws SQLException {
for (String query : initQueries) {
- try (Statement statement = connection.createStatement()) {
- statement.execute(query);
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getErrorDetailsProvider())) {
+ RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, query, getErrorDetailsProvider());
}
}
}
diff --git a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
index 54d1e2ab6..908d27eb9 100644
--- a/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
+++ b/database-commons/src/main/java/io/cdap/plugin/db/source/AbstractDBSource.java
@@ -18,6 +18,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
+import dev.failsafe.RetryPolicy;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
@@ -25,10 +26,6 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
-import io.cdap.cdap.api.exception.ErrorCategory;
-import io.cdap.cdap.api.exception.ErrorCodeType;
-import io.cdap.cdap.api.exception.ErrorType;
-import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.plugin.PluginConfig;
import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.FailureCollector;
@@ -42,6 +39,7 @@
import io.cdap.plugin.common.ReferenceBatchSource;
import io.cdap.plugin.common.ReferencePluginConfig;
import io.cdap.plugin.common.SourceInputFormatProvider;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
import io.cdap.plugin.db.CommonSchemaReader;
import io.cdap.plugin.db.ConnectionConfig;
import io.cdap.plugin.db.ConnectionConfigAccessor;
@@ -52,6 +50,8 @@
import io.cdap.plugin.db.config.DatabaseSourceConfig;
import io.cdap.plugin.util.DBUtils;
import io.cdap.plugin.util.DriverCleanup;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import io.cdap.plugin.util.RetryUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
@@ -62,7 +62,6 @@
import java.io.IOException;
import java.sql.Connection;
import java.sql.Driver;
-import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -87,13 +86,16 @@ public abstract class AbstractDBSource retryPolicy;
+ protected DBErrorDetailsProvider dbErrorDetailsProvider;
protected final T sourceConfig;
protected Class extends Driver> driverClass;
public AbstractDBSource(T sourceConfig) {
super(new ReferencePluginConfig(sourceConfig.getReferenceName()));
this.sourceConfig = sourceConfig;
+ this.retryPolicy = RetryPolicyUtil.getRetryPolicy(sourceConfig.getInitialRetryDuration(),
+ sourceConfig.getMaxRetryDuration(), sourceConfig.getMaxRetryCount());
}
@Override
@@ -137,7 +139,6 @@ public Schema getSchema(Class extends Driver> driverClass) throws IllegalAcces
SQLException, InstantiationException {
DriverCleanup driverCleanup;
try {
-
driverCleanup = loadPluginClassAndGetDriver(driverClass);
try {
return getSchema();
@@ -168,13 +169,14 @@ public Schema getSchema() throws SQLException {
}
private Schema loadSchemaFromDB(Connection connection, String query) throws SQLException {
- Statement statement = connection.createStatement();
- statement.setMaxRows(1);
- if (query.contains("$CONDITIONS")) {
- query = removeConditionsClause(query);
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getErrorDetailsProvider())) {
+ statement.setMaxRows(1);
+ String finalQuery = query.contains("$CONDITIONS") ? removeConditionsClause(query) : query;
+ ResultSet resultSet = RetryUtils.executeQueryWithRetry((RetryPolicy) retryPolicy, statement,
+ finalQuery, getErrorDetailsProvider());
+ return Schema.recordOf("outputSchema", getSchemaReader().getSchemaFields(resultSet));
}
- ResultSet resultSet = statement.executeQuery(query);
- return Schema.recordOf("outputSchema", getSchemaReader().getSchemaFields(resultSet));
}
@VisibleForTesting
@@ -194,28 +196,10 @@ private Schema loadSchemaFromDB(Class extends Driver> driverClass)
Properties connectionProperties = new Properties();
connectionProperties.putAll(sourceConfig.getConnectionArguments());
- try (Connection connection = DriverManager.getConnection(connectionString, connectionProperties)) {
+ try (Connection connection = RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ connectionString, connectionProperties, getErrorDetailsProvider())) {
executeInitQueries(connection, sourceConfig.getInitQueries());
return loadSchemaFromDB(connection, sourceConfig.getImportQuery());
-
- } catch (SQLException e) {
- // wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc driver in classpath
- String errorMessage =
- String.format("SQL Exception occurred: [Message='%s', SQLState='%s', ErrorCode='%s'].", e.getMessage(),
- e.getSQLState(), e.getErrorCode());
- String errorMessageWithDetails = String.format("Error occurred while trying to get schema from database." +
- "Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(), e.getErrorCode(), e.getSQLState());
- String externalDocumentationLink = getExternalDocumentationLink();
- if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
- if (!errorMessage.endsWith(".")) {
- errorMessage = errorMessage + ".";
- }
- errorMessage = String.format("%s For more details, see %s", errorMessage, externalDocumentationLink);
- }
- throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
- errorMessage, errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE,
- e.getSQLState(), externalDocumentationLink, new SQLException(e.getMessage(),
- e.getSQLState(), e.getErrorCode()));
} finally {
driverCleanup.destroy();
}
@@ -223,8 +207,9 @@ private Schema loadSchemaFromDB(Class extends Driver> driverClass)
private void executeInitQueries(Connection connection, List initQueries) throws SQLException {
for (String query : initQueries) {
- try (Statement statement = connection.createStatement()) {
- statement.execute(query);
+ try (Statement statement = RetryUtils.createStatementWithRetry((RetryPolicy) retryPolicy, connection,
+ getErrorDetailsProvider())) {
+ RetryUtils.executeInitQueryWithRetry(retryPolicy, statement, query, getErrorDetailsProvider());
}
}
}
@@ -243,6 +228,19 @@ protected String getErrorDetailsProviderClassName() {
return null;
}
+ /**
+ * Returns the DBErrorDetailsProvider instance.
+ * Override this method to provide a custom DBErrorDetailsProvider instance.
+ *
+ * @return DBErrorDetailsProvider instance
+ */
+ protected DBErrorDetailsProvider getErrorDetailsProvider() {
+ if (dbErrorDetailsProvider == null) {
+ dbErrorDetailsProvider = new DBErrorDetailsProvider();
+ }
+ return dbErrorDetailsProvider;
+ }
+
private DriverCleanup loadPluginClassAndGetDriver(Class extends Driver> driverClass)
throws IllegalAccessException, InstantiationException, SQLException {
@@ -262,11 +260,12 @@ private DriverCleanup loadPluginClassAndGetDriver(Class extends Driver> driver
}
}
- private Connection getConnection() throws SQLException {
+ private Connection getConnection() {
String connectionString = createConnectionString();
Properties connectionProperties = new Properties();
connectionProperties.putAll(sourceConfig.getConnectionArguments());
- return DriverManager.getConnection(connectionString, connectionProperties);
+ return RetryUtils.createConnectionWithRetry((RetryPolicy) retryPolicy,
+ connectionString, connectionProperties, getErrorDetailsProvider());
}
@Override
@@ -376,16 +375,6 @@ protected Class extends DBWritable> getDBRecordType() {
return DBRecord.class;
}
- /**
- * Returns the external documentation link.
- * Override this method to provide a custom external documentation link.
- *
- * @return external documentation link
- */
- protected String getExternalDocumentationLink() {
- return null;
- }
-
@Override
public void initialize(BatchRuntimeContext context) throws Exception {
super.initialize(context);
diff --git a/database-commons/src/main/java/io/cdap/plugin/util/RetryPolicyUtil.java b/database-commons/src/main/java/io/cdap/plugin/util/RetryPolicyUtil.java
new file mode 100644
index 000000000..91d6bd6ba
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/util/RetryPolicyUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package io.cdap.plugin.util;
+
+import dev.failsafe.RetryPolicy;
+import io.cdap.cdap.api.Config;
+import io.cdap.plugin.db.RetryExceptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+/**
+ * Utility class for creating standardized {@link dev.failsafe.RetryPolicy} configurations
+ * to handle transient SQL exceptions using the Failsafe library.
+ */
+public class RetryPolicyUtil extends Config {
+ public static final Logger LOG = LoggerFactory.getLogger(RetryPolicyUtil.class);
+
+ /**
+ * Create a RetryPolicy using custom config values.
+ */
+ public static RetryPolicy getRetryPolicy(Integer initialRetryDuration,
+ Integer maxRetryDuration, Integer maxRetryCount) {
+ return RetryPolicy.builder()
+ .handleIf((failure) -> RetryExceptions.isRetryable(failure))
+ .withBackoff(Duration.ofSeconds(initialRetryDuration), Duration.ofSeconds(maxRetryDuration))
+ .withMaxRetries(maxRetryCount)
+ .onRetry(e -> LOG.debug("Retrying... Attempt {}",
+ e.getAttemptCount()))
+ .onFailedAttempt(e -> LOG.debug("Failed Attempt : {}", e.getLastException()))
+ .onFailure(e -> LOG.debug("Failed after retries." +
+ " Reason: {}",
+ e.getException() != null ? e.getException().getMessage() : "Unknown error"))
+ .build();
+ }
+}
diff --git a/database-commons/src/main/java/io/cdap/plugin/util/RetryUtils.java b/database-commons/src/main/java/io/cdap/plugin/util/RetryUtils.java
new file mode 100644
index 000000000..433978358
--- /dev/null
+++ b/database-commons/src/main/java/io/cdap/plugin/util/RetryUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.util;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeException;
+import dev.failsafe.RetryPolicy;
+import io.cdap.plugin.common.db.DBErrorDetailsProvider;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+/**
+ * Utility class for retrieving common methods using {@link dev.failsafe.RetryPolicy}
+ */
+public final class RetryUtils {
+
+ public static final String NAME_INITIAL_RETRY_DURATION = "initialRetryDuration";
+ public static final String NAME_MAX_RETRY_DURATION = "maxRetryDuration";
+ public static final String NAME_MAX_RETRY_COUNT = "maxRetryCount";
+ public static final int DEFAULT_INITIAL_RETRY_DURATION_SECONDS = 5;
+ public static final int DEFAULT_MAX_RETRY_COUNT = 5;
+ public static final int DEFAULT_MAX_RETRY_DURATION_SECONDS = 80;
+
+ public static Connection createConnectionWithRetry(RetryPolicy retryPolicy, String connectionString,
+ Properties connectionProperties, DBErrorDetailsProvider dbErrorDetailsProvider) {
+ try {
+ return Failsafe.with(retryPolicy).get(() -> DriverManager
+ .getConnection(connectionString, connectionProperties)
+ );
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, dbErrorDetailsProvider);
+ }
+ }
+
+ public static Statement createStatementWithRetry(RetryPolicy retryPolicy, Connection connection,
+ DBErrorDetailsProvider dbErrorDetailsProvider) {
+ try {
+ return Failsafe.with(retryPolicy).get(connection::createStatement);
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, dbErrorDetailsProvider);
+ }
+ }
+
+ public static PreparedStatement prepareStatementWithRetry(RetryPolicy retryPolicy,
+ Connection connection, String sqlQuery, DBErrorDetailsProvider dbErrorDetailsProvider) {
+ try {
+ return Failsafe.with(retryPolicy).get(() -> connection.prepareStatement(sqlQuery));
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, dbErrorDetailsProvider);
+ }
+ }
+
+ public static ResultSet executeQueryWithRetry(RetryPolicy retryPolicy,
+ PreparedStatement preparedStatement, DBErrorDetailsProvider dbErrorDetailsProvider) {
+ try {
+ return Failsafe.with(retryPolicy).get(() -> preparedStatement.executeQuery());
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, dbErrorDetailsProvider);
+ }
+ }
+
+ public static ResultSet executeQueryWithRetry(RetryPolicy retryPolicy, Statement statement,
+ String query, DBErrorDetailsProvider dbErrorDetailsProvider) {
+ try {
+ return Failsafe.with(retryPolicy).get(() -> statement.executeQuery(query));
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, dbErrorDetailsProvider);
+ }
+ }
+
+ public static void executeInitQueryWithRetry(RetryPolicy> retryPolicy, Statement statement, String query,
+ DBErrorDetailsProvider dbErrorDetailsProvider) {
+ try {
+ Failsafe.with(retryPolicy).run(() -> statement.execute(query));
+ } catch (Exception e) {
+ throw unwrapFailsafeException(e, dbErrorDetailsProvider);
+ }
+ }
+
+ private static RuntimeException unwrapFailsafeException(Exception e,
+ DBErrorDetailsProvider dbErrorDetailsProvider) {
+ if (e instanceof FailsafeException) {
+ Throwable cause = e.getCause();
+ if (cause instanceof SQLException) {
+ return dbErrorDetailsProvider.getProgramFailureException((SQLException) cause, null);
+ } else if (cause instanceof RuntimeException) {
+ return (RuntimeException) cause;
+ } else if (cause instanceof Error) {
+ return new RuntimeException("Operation failed with error", cause);
+ } else {
+ return new RuntimeException("Operation failed", cause);
+ }
+ }
+ if (e instanceof SQLException) {
+ return dbErrorDetailsProvider.getProgramFailureException((SQLException) e, null);
+ }
+ if (e instanceof RuntimeException) {
+ return (RuntimeException) e;
+ }
+ return new RuntimeException("Unexpected checked exception", e);
+ }
+}
diff --git a/database-commons/src/test/java/io/cdap/plugin/db/RetryPolicyUtilTest.java b/database-commons/src/test/java/io/cdap/plugin/db/RetryPolicyUtilTest.java
new file mode 100644
index 000000000..fa330de4e
--- /dev/null
+++ b/database-commons/src/test/java/io/cdap/plugin/db/RetryPolicyUtilTest.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright © 2025 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db;
+
+import dev.failsafe.Failsafe;
+import dev.failsafe.FailsafeException;
+import dev.failsafe.RetryPolicy;
+import io.cdap.plugin.db.connector.AbstractDBConnectorConfig;
+import io.cdap.plugin.util.RetryPolicyUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.SQLSyntaxErrorException;
+import java.sql.SQLTransientConnectionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RetryPolicyUtilTest {
+
+ private AbstractDBConnectorConfig mockConfig;
+
+ @Before
+ public void setup() {
+ mockConfig = mock(AbstractDBConnectorConfig.class);
+ when(mockConfig.getInitialRetryDuration()).thenReturn(5);
+ when(mockConfig.getMaxRetryDuration()).thenReturn(10);
+ when(mockConfig.getMaxRetryCount()).thenReturn(2);
+ }
+
+ @Test
+ public void testCreateConnectionRetryPolicy_Retryable() {
+ RetryPolicy