diff --git a/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java b/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java index a5a9fcf5f..b187c7670 100644 --- a/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java +++ b/database-commons/src/main/java/io/cdap/plugin/db/DBRecord.java @@ -314,10 +314,7 @@ protected void upsertOperation(PreparedStatement stmt) throws SQLException { } private boolean fillUpdateParams(List updatedKeyList, ColumnType columnType) { - if (operationName.equals(Operation.UPDATE) && updatedKeyList.contains(columnType.getName())) { - return true; - } - return false; + return operationName.equals(Operation.UPDATE) && updatedKeyList.contains(columnType.getName()); } private Schema getNonNullableSchema(Schema.Field field) { diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java new file mode 100644 index 000000000..4a7b18278 --- /dev/null +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleETLDBOutputFormat.java @@ -0,0 +1,126 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import io.cdap.plugin.db.sink.ETLDBOutputFormat; + +/** + * Class that extends {@link ETLDBOutputFormat} to implement the abstract methods + */ +public class OracleETLDBOutputFormat extends ETLDBOutputFormat { + + /** + * This method is used to construct the upsert query for Oracle using MERGE statement. + * Example - MERGE INTO my_table target + * USING (SELECT ? AS id, ? AS name, ? AS age FROM dual) source + * ON (target.id = source.id) + * WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age + * WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (source.id, source.name, source.age) + * @param table - Name of the table + * @param fieldNames - All the columns of the table + * @param listKeys - The columns used as keys for matching + * @return Upsert query in the form of string + */ + @Override + public String constructUpsertQuery(String table, String[] fieldNames, String[] listKeys) { + if (listKeys == null) { + throw new IllegalArgumentException( + "'Relation Table Key' must be specified for upsert operations. " + + "Please provide the list of key columns used to match records in the target table."); + } else if (fieldNames == null) { + throw new IllegalArgumentException( + "'Field Names' must be specified for upsert operations. " + + "Please provide the list of columns to be written to the target table."); + } else { + StringBuilder query = new StringBuilder(); + + // MERGE INTO target_table target + query.append("MERGE INTO ").append(table).append(" target "); + + // USING (SELECT ? AS col1, ? AS col2, ... FROM dual) source + query.append("USING (SELECT "); + for (int i = 0; i < fieldNames.length; ++i) { + query.append("? AS ").append(fieldNames[i]); + if (i != fieldNames.length - 1) { + query.append(", "); + } + } + query.append(" FROM dual) source "); + + // ON (target.key1 = source.key1 AND target.key2 = source.key2 ...) + query.append("ON ("); + for (int i = 0; i < listKeys.length; ++i) { + query.append("target.").append(listKeys[i]).append(" = source.").append(listKeys[i]); + if (i != listKeys.length - 1) { + query.append(" AND "); + } + } + query.append(") "); + + // WHEN MATCHED THEN UPDATE SET target.col1 = source.col1, target.col2 = source.col2 ... + // Only update non-key columns + query.append("WHEN MATCHED THEN UPDATE SET "); + boolean firstUpdateColumn = true; + for (String fieldName : fieldNames) { + boolean isKeyColumn = false; + for (String listKey : listKeys) { + String listKeyNoQuote = listKey.replace("\"", ""); + if (listKeyNoQuote.equals(fieldName)) { + isKeyColumn = true; + break; + } + } + if (!isKeyColumn) { + if (!firstUpdateColumn) { + query.append(", "); + } + query.append("target.").append(fieldName).append(" = source.").append(fieldName); + firstUpdateColumn = false; + } + } + + // WHEN NOT MATCHED THEN INSERT (col1, col2, ...) VALUES (source.col1, source.col2, ...) + query.append(" WHEN NOT MATCHED THEN INSERT ("); + for (int i = 0; i < fieldNames.length; ++i) { + query.append(fieldNames[i]); + if (i != fieldNames.length - 1) { + query.append(", "); + } + } + query.append(") VALUES ("); + for (int i = 0; i < fieldNames.length; ++i) { + query.append("source.").append(fieldNames[i]); + if (i != fieldNames.length - 1) { + query.append(", "); + } + } + query.append(")"); + + return query.toString(); + } + } + + @Override + public String constructUpdateQuery(String table, String[] fieldNames, String[] listKeys) { + // Oracle JDBC does not accept a trailing semicolon in prepared statements. + String query = super.constructUpdateQuery(table, fieldNames, listKeys); + if (query.endsWith(";")) { + return query.substring(0, query.length() - 1); + } + return query; + } +} diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java index 40ecfbe9e..0cf6320de 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSink.java @@ -23,6 +23,7 @@ import io.cdap.cdap.api.annotation.MetadataProperty; import io.cdap.cdap.api.annotation.Name; import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.batch.Output; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.etl.api.FailureCollector; import io.cdap.cdap.etl.api.batch.BatchSink; @@ -31,6 +32,7 @@ import io.cdap.plugin.common.Asset; import io.cdap.plugin.common.ConfigUtil; import io.cdap.plugin.common.LineageRecorder; +import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider; import io.cdap.plugin.common.db.DBErrorDetailsProvider; import io.cdap.plugin.db.DBRecord; import io.cdap.plugin.db.SchemaReader; @@ -60,7 +62,8 @@ public OracleSink(OracleSinkConfig oracleSinkConfig) { @Override protected DBRecord getDBRecord(StructuredRecord output) { - return new OracleSinkDBRecord(output, columnTypes); + return new OracleSinkDBRecord(output, columnTypes, oracleSinkConfig.getOperationName(), + oracleSinkConfig.getRelationTableKey()); } @Override @@ -72,6 +75,13 @@ protected FieldsValidator getFieldsValidator() { protected SchemaReader getSchemaReader() { return new OracleSinkSchemaReader(); } + + @Override + protected void addOutputContext(BatchSinkContext context) { + context.addOutput(Output.of(oracleSinkConfig.getReferenceName(), + new SinkOutputFormatProvider(OracleETLDBOutputFormat.class, getConfiguration()))); + } + @Override protected LineageRecorder getLineageRecorder(BatchSinkContext context) { String fqn = DBUtils.constructFQN("oracle", diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java index 01b9a8247..ee77a7fc9 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSinkDBRecord.java @@ -19,6 +19,7 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.plugin.db.ColumnType; +import io.cdap.plugin.db.Operation; import io.cdap.plugin.db.SchemaReader; import java.sql.PreparedStatement; @@ -31,9 +32,12 @@ */ public class OracleSinkDBRecord extends OracleSourceDBRecord { - public OracleSinkDBRecord(StructuredRecord record, List columnTypes) { + public OracleSinkDBRecord(StructuredRecord record, List columnTypes, Operation operationName, + String relationTableKey) { this.record = record; this.columnTypes = columnTypes; + this.operationName = operationName; + this.relationTableKey = relationTableKey; } @Override @@ -50,4 +54,13 @@ protected void insertOperation(PreparedStatement stmt) throws SQLException { writeToDB(stmt, field, fieldIndex); } } + + @Override + protected void upsertOperation(PreparedStatement stmt) throws SQLException { + for (int fieldIndex = 0; fieldIndex < columnTypes.size(); fieldIndex++) { + ColumnType columnType = columnTypes.get(fieldIndex); + Schema.Field field = record.getSchema().getField(columnType.getName()); + writeToDB(stmt, field, fieldIndex); + } + } } diff --git a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java index 7d7c69d2b..79e43d5d7 100644 --- a/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java +++ b/oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceDBRecord.java @@ -19,11 +19,17 @@ import com.google.common.io.ByteStreams; import io.cdap.cdap.api.common.Bytes; import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.format.StructuredRecord.Builder; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.data.schema.Schema.Field; +import io.cdap.cdap.api.data.schema.Schema.LogicalType; +import io.cdap.cdap.api.data.schema.Schema.Type; import io.cdap.cdap.etl.api.validation.InvalidStageException; import io.cdap.plugin.db.ColumnType; import io.cdap.plugin.db.DBRecord; import io.cdap.plugin.db.SchemaReader; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; import java.io.IOException; import java.io.InputStream; @@ -45,8 +51,8 @@ import java.util.List; /** - * Oracle Source implementation {@link org.apache.hadoop.mapreduce.lib.db.DBWritable} and - * {@link org.apache.hadoop.io.Writable}. + * Oracle Source implementation {@link DBWritable} and + * {@link Writable}. */ public class OracleSourceDBRecord extends DBRecord { @@ -77,11 +83,11 @@ protected SchemaReader getSchemaReader() { public void readFields(ResultSet resultSet) throws SQLException { Schema schema = getSchema(); ResultSetMetaData metadata = resultSet.getMetaData(); - StructuredRecord.Builder recordBuilder = StructuredRecord.builder(schema); + Builder recordBuilder = StructuredRecord.builder(schema); // All LONG or LONG RAW columns have to be retrieved from the ResultSet prior to all the other columns. // Otherwise, we will face java.sql.SQLException: Stream has already been closed - for (Schema.Field field : schema.getFields()) { + for (Field field : schema.getFields()) { // Index of a field in the schema may not be same in the ResultSet, // hence find the field by name in the given resultSet int columnIndex = resultSet.findColumn(field.getName()); @@ -91,7 +97,7 @@ public void readFields(ResultSet resultSet) throws SQLException { } // Read fields of other types - for (Schema.Field field : schema.getFields()) { + for (Field field : schema.getFields()) { // Index of a field in the schema may not be same in the ResultSet, // hence find the field by name in the given resultSet int columnIndex = resultSet.findColumn(field.getName()); @@ -104,7 +110,7 @@ record = recordBuilder.build(); } @Override - protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field, + protected void handleField(ResultSet resultSet, Builder recordBuilder, Field field, int columnIndex, int sqlType, int sqlPrecision, int sqlScale) throws SQLException { if (OracleSourceSchemaReader.ORACLE_TYPES.contains(sqlType) || sqlType == Types.NCLOB) { handleOracleSpecificType(resultSet, recordBuilder, field, columnIndex, sqlType, sqlPrecision, sqlScale); @@ -116,8 +122,14 @@ protected void handleField(ResultSet resultSet, StructuredRecord.Builder recordB @Override protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema, String fieldName, int fieldIndex) throws SQLException { - int sqlType = columnTypes.get(fieldIndex).getType(); int sqlIndex = fieldIndex + 1; + ColumnType foundColumnType = columnTypes.stream() + .filter(columnType -> columnType.getName().equals(fieldName)) + .findFirst() // Efficiently gets the first match wrapped in an Optional + .orElseThrow(() -> new IllegalArgumentException( + String.format("Unable to find the column type for field '%s'", fieldName))); + + int sqlType = foundColumnType.getType(); // TIMESTAMP and TIMESTAMPTZ types needs to be handled using the specific oracle types to ensure that the data // inserted matches with the provided value. As Oracle driver internally alters the values provided @@ -126,7 +138,7 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema, // More details here : https://docs.oracle.com/cd/E13222_01/wls/docs91/jdbc_drivers/oracle.html // Handle the case when TimestampTZ type is set to CDAP String type or Timestamp type if (sqlType == OracleSourceSchemaReader.TIMESTAMP_TZ) { - if (Schema.Type.STRING.equals(fieldSchema.getType())) { + if (Type.STRING.equals(fieldSchema.getType())) { // Deprecated: Handle the case when the TimestampTZ is mapped to CDAP String type String timestampString = record.get(fieldName); Object timestampTZ = createOracleTimestampWithTimeZone(stmt.getConnection(), timestampString); @@ -140,13 +152,13 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema, stmt.setObject(sqlIndex, timestampWithTimeZone); } } else if (sqlType == OracleSourceSchemaReader.TIMESTAMP_LTZ) { - if (Schema.LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType())) { + if (LogicalType.TIMESTAMP_MICROS.equals(fieldSchema.getLogicalType())) { // Deprecated: Handle the case when the TimestampLTZ is mapped to CDAP Timestamp type ZonedDateTime timestamp = record.getTimestamp(fieldName); String timestampString = Timestamp.valueOf(timestamp.toLocalDateTime()).toString(); Object timestampWithTimeZone = createOracleTimestampWithLocalTimeZone(stmt.getConnection(), timestampString); stmt.setObject(sqlIndex, timestampWithTimeZone); - } else if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) { + } else if (LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) { // Handle the case when the TimestampLTZ is mapped to CDAP Datetime type LocalDateTime localDateTime = record.getDateTime(fieldName); String timestampString = Timestamp.valueOf(localDateTime).toString(); @@ -154,7 +166,7 @@ protected void writeNonNullToDB(PreparedStatement stmt, Schema fieldSchema, stmt.setObject(sqlIndex, timestampWithTimeZone); } } else if (sqlType == Types.TIMESTAMP) { - if (Schema.LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) { + if (LogicalType.DATETIME.equals(fieldSchema.getLogicalType())) { // Handle the case when Timestamp is mapped to CDAP Datetime type. LocalDateTime localDateTime = record.getDateTime(fieldName); String timestampString = Timestamp.valueOf(localDateTime).toString(); @@ -257,7 +269,7 @@ private byte[] getBfileBytes(ResultSet resultSet, String columnName) throws SQLE } } - private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Builder recordBuilder, Schema.Field field, + private void handleOracleSpecificType(ResultSet resultSet, Builder recordBuilder, Field field, int columnIndex, int sqlType, int precision, int scale) throws SQLException { Schema nonNullSchema = field.getSchema().isNullable() ? @@ -270,7 +282,7 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil recordBuilder.set(field.getName(), resultSet.getString(columnIndex)); break; case OracleSourceSchemaReader.TIMESTAMP_TZ: - if (Schema.Type.STRING.equals(nonNullSchema.getType())) { + if (Type.STRING.equals(nonNullSchema.getType())) { recordBuilder.set(field.getName(), resultSet.getString(columnIndex)); } else { // In case of TimestampTZ datatype the getTimestamp(index, Calendar) method call does not @@ -298,7 +310,7 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil case Types.TIMESTAMP: // Since Oracle Timestamp type does not have any timezone information, it should be converted into the // CDAP Datetime type. - if (Schema.LogicalType.DATETIME.equals(nonNullSchema.getLogicalType())) { + if (LogicalType.DATETIME.equals(nonNullSchema.getLogicalType())) { Timestamp timestamp = resultSet.getTimestamp(columnIndex); if (timestamp != null) { recordBuilder.setDateTime(field.getName(), timestamp.toLocalDateTime()); @@ -315,7 +327,7 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil // super.setField sets this '0000-12-31 09:00:00.000Z[UTC]' in the recordBuilder which is incorrect and the // correct value should be '0001-01-01 09:00:00.000Z[UTC]'. Object timeStampObj = resultSet.getObject(columnIndex); - if (Schema.LogicalType.DATETIME.equals(nonNullSchema.getLogicalType())) { + if (LogicalType.DATETIME.equals(nonNullSchema.getLogicalType())) { Timestamp timestampLTZ = resultSet.getTimestamp(columnIndex); if (timestampLTZ != null) { recordBuilder.setDateTime(field.getName(), @@ -351,7 +363,7 @@ private void handleOracleSpecificType(ResultSet resultSet, StructuredRecord.Buil if (precision == 0) { Schema nonNullableSchema = field.getSchema().isNullable() ? field.getSchema().getNonNullable() : field.getSchema(); - if (Schema.LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType())) { + if (LogicalType.DECIMAL.equals(nonNullableSchema.getLogicalType())) { // Handle the field using the schema set in the output schema BigDecimal decimal = resultSet.getBigDecimal(columnIndex, getScale(field.getSchema())); recordBuilder.setDecimal(field.getName(), decimal); @@ -382,8 +394,8 @@ private boolean isLongOrLongRaw(int columnType) { return columnType == OracleSourceSchemaReader.LONG || columnType == OracleSourceSchemaReader.LONG_RAW; } - private void readField(int columnIndex, ResultSetMetaData metadata, ResultSet resultSet, Schema.Field field, - StructuredRecord.Builder recordBuilder) throws SQLException { + private void readField(int columnIndex, ResultSetMetaData metadata, ResultSet resultSet, Field field, + Builder recordBuilder) throws SQLException { int sqlType = metadata.getColumnType(columnIndex); int sqlPrecision = metadata.getPrecision(columnIndex); int sqlScale = metadata.getScale(columnIndex); diff --git a/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleETLDBOutputFormatTest.java b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleETLDBOutputFormatTest.java new file mode 100644 index 000000000..994c86b55 --- /dev/null +++ b/oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleETLDBOutputFormatTest.java @@ -0,0 +1,129 @@ +/* + * Copyright © 2026 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.oracle; + +import org.junit.Assert; +import org.junit.Test; + +public class OracleETLDBOutputFormatTest { + + private final OracleETLDBOutputFormat outputFormat = new OracleETLDBOutputFormat(); + + @Test + public void testConstructUpsertQueryBasic() { + String[] fieldNames = {"id", "name", "age"}; + String[] listKeys = {"id"}; + String table = "my_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO my_table target " + + "USING (SELECT ? AS id, ? AS name, ? AS age FROM dual) source " + + "ON (target.id = source.id) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name, target.age = source.age " + + "WHEN NOT MATCHED THEN INSERT (id, name, age) VALUES (source.id, source.name, source.age)"; + + Assert.assertEquals(expected, result); + } + + @Test + public void testConstructUpsertQueryMultipleKeys() { + String[] fieldNames = {"id", "code", "name", "value"}; + String[] listKeys = {"id", "code"}; + String table = "composite_key_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO composite_key_table target " + + "USING (SELECT ? AS id, ? AS code, ? AS name, ? AS value FROM dual) source " + + "ON (target.id = source.id AND target.code = source.code) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name, target.value = source.value " + + "WHEN NOT MATCHED THEN INSERT (id, code, name, value) VALUES (source.id, source.code, source.name, source" + + ".value)"; + + Assert.assertEquals(expected, result); + } + + @Test + public void testConstructUpsertQuerySingleField() { + String[] fieldNames = {"id", "name"}; + String[] listKeys = {"id"}; + String table = "single_field_update_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO single_field_update_table target " + + "USING (SELECT ? AS id, ? AS name FROM dual) source " + + "ON (target.id = source.id) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name " + + "WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name)"; + + Assert.assertEquals(expected, result); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructUpsertQueryNullListKeys() { + String[] fieldNames = {"id", "name", "age"}; + String table = "my_table"; + + outputFormat.constructUpsertQuery(table, fieldNames, null); + } + + @Test(expected = IllegalArgumentException.class) + public void testConstructUpsertQueryNullFieldNames() { + String[] listKeys = {"id"}; + String table = "my_table"; + + outputFormat.constructUpsertQuery(table, null, listKeys); + } + + @Test + public void testConstructUpsertQueryAllFieldsAreKeys() { + String[] fieldNames = {"id", "code"}; + String[] listKeys = {"id", "code"}; + String table = "all_keys_table"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + // When all fields are keys, the UPDATE SET clause will be empty after "SET " + // Note: There's an extra space before "WHEN NOT MATCHED" due to implementation + String expected = "MERGE INTO all_keys_table target " + + "USING (SELECT ? AS id, ? AS code FROM dual) source " + + "ON (target.id = source.id AND target.code = source.code) " + + "WHEN MATCHED THEN UPDATE SET " + + "WHEN NOT MATCHED THEN INSERT (id, code) VALUES (source.id, source.code)"; + + Assert.assertEquals(expected, result); + } + + @Test + public void testConstructUpsertQueryWithSpecialTableName() { + String[] fieldNames = {"id", "name"}; + String[] listKeys = {"id"}; + String table = "SCHEMA.MY_TABLE"; + + String result = outputFormat.constructUpsertQuery(table, fieldNames, listKeys); + + String expected = "MERGE INTO SCHEMA.MY_TABLE target " + + "USING (SELECT ? AS id, ? AS name FROM dual) source " + + "ON (target.id = source.id) " + + "WHEN MATCHED THEN UPDATE SET target.name = source.name " + + "WHEN NOT MATCHED THEN INSERT (id, name) VALUES (source.id, source.name)"; + + Assert.assertEquals(expected, result); + } +} diff --git a/oracle-plugin/widgets/Oracle-batchsink.json b/oracle-plugin/widgets/Oracle-batchsink.json index 8d8fc79a2..d48a1cb1c 100644 --- a/oracle-plugin/widgets/Oracle-batchsink.json +++ b/oracle-plugin/widgets/Oracle-batchsink.json @@ -192,6 +192,29 @@ "label": "Schema Name", "name": "dbSchemaName" }, + { + "widget-type": "radio-group", + "label": "Operation Name", + "name": "operationName", + "widget-attributes": { + "default": "insert", + "layout": "inline", + "options": [ + { + "id": "insert", + "label": "INSERT" + }, + { + "id": "update", + "label": "UPDATE" + }, + { + "id": "upsert", + "label": "UPSERT" + } + ] + } + }, { "widget-type": "hidden", "label": "Operation Name", @@ -201,9 +224,10 @@ } }, { - "widget-type": "hidden", + "name": "relationTableKey", + "widget-type": "csv", "label": "Table Key", - "name": "relationTableKey" + "widget-attributes": {} } ] },