diff --git a/docs/en/connector-v2/sink/Druid.md b/docs/en/connector-v2/sink/Druid.md index cbf290b2c37..e591a19488e 100644 --- a/docs/en/connector-v2/sink/Druid.md +++ b/docs/en/connector-v2/sink/Druid.md @@ -4,37 +4,47 @@ ## Description -Write data to Apache Druid. +Used to write data to Druid. +## Key features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) + +The Druid sink plug-in can achieve accuracy once by implementing idempotent writing, and needs to cooperate with aggregatingmergetree and other engines that support deduplication. + +- [ ] [schema projection](../../concept/connector-v2-features.md) + + +::: ## Options | name | type | required | default value | | ----------------------- | -------- | -------- | ------------- | -| coordinator_url | `String` | yes | - | -| datasource | `String` | yes | - | -| columns | `List` | yes| __time | -| timestamp_column | `String` | no | timestamp | -| timestamp_format | `String` | no | auto | -| timestamp_missing_value | `String` | no | - | +| coordinator_url | string | yes | - | +| datasource | string | yes | - | +| columns | List | yes| - | +| timestamp_column | string | no | timestamp | +| timestamp_format | string | no | auto | +| timestamp_missing_value | string| no | - | -### coordinator_url [`String`] +### coordinator_url [string] -The URL of Coordinator service in Apache Druid. +`Druid` cluster coordinator address, the format is `host:port` ,Such as `"host:8081"` . -### datasource [`String`] +### datasource [string] -The DataSource name in Apache Druid. +The `Druid` datasource -### columns [`List`] +### columns [array] -These columns that you want to write of Druid. +The data field that needs to be output to `Druid` , if not configured, it will be automatically adapted according to the sink table `schema` . -### timestamp_column [`String`] +### timestamp_column [string] The timestamp column name in Apache Druid, the default value is `timestamp`. 
-### timestamp_format [`String`] +### timestamp_format [string] The timestamp format in Apache Druid, the default value is `auto`, it could be: @@ -58,12 +68,10 @@ The timestamp format in Apache Druid, the default value is `auto`, it could be: - any [Joda DateTimeFormat](http://joda-time.sourceforge.net/apidocs/org/joda/time/format/DateTimeFormat.html) string -### timestamp_missing_value [`String`] +### timestamp_missing_value [string] The timestamp missing value in Apache Druid, which is used for input records that have a null or missing timestamp. The value of `timestamp_missing_value` should be in ISO 8601 format, for example `"2022-02-02T02:02:02.222"`. -## Example - ### Simple ```hocon @@ -97,5 +105,4 @@ DruidSink { timestamp_missing_value = "2022-02-02T02:02:02.222" columns = ["flags","page"] } -``` - +``` \ No newline at end of file diff --git a/docs/en/connector-v2/source/Druid.md b/docs/en/connector-v2/source/Druid.md index 4343fd31aa0..aa7732dd931 100644 --- a/docs/en/connector-v2/source/Druid.md +++ b/docs/en/connector-v2/source/Druid.md @@ -6,33 +6,44 @@ Read data from Apache Druid. +## Key features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [schema projection](../../concept/connector-v2-features.md) + +:::tip + +Reading data from Druid can also be done using JDBC + ## Options | name | type | required | default value | | ---------- | -------------- | -------- | ------------- | -| url | `String` | yes | - | -| datasource | `String` | yes | - | -| start_date | `String` | no | - | -| end_date | `String` | no | - | -| columns | `List` | no | `*` | +| url | string | yes | - | +| datasource | string | yes | - | +| start_date | string | no | - | +| end_date | string | no | - | +| columns | List | no | * | -### url [`String`] +### url [string] -The URL of JDBC of Apache Druid. 
+`Druid` cluster broker address, the format is `jdbc:avatica:remote:url=http://host:port/druid/v2/sql/avatica/` ,Such as `"jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/"` . -### datasource [`String`] +### datasource [string] The DataSource name in Apache Druid. -### start_date [`String`] +### start_date [string] The start date of DataSource, for example, `'2016-06-27'`, `'2016-06-27 00:00:00'`, etc. -### end_date [`String`] +### end_date [string] The end date of DataSource, for example, `'2016-06-28'`, `'2016-06-28 00:00:00'`, etc. -### columns [`List`] +### columns [List] These columns that you want to write of DataSource. @@ -43,6 +54,15 @@ Source Plugin common parameters, refer to [Source Plugin](common-options.mdx) fo ## Example +```hocon +DruidSource { + url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/" + datasource = "wikipedia" + start_date = "2016-06-27 00:00:00" + end_date = "2016-06-28 00:00:00" +} +``` + ```hocon DruidSource { url = "jdbc:avatica:remote:url=http://localhost:8082/druid/v2/sql/avatica/" diff --git a/pom.xml b/pom.xml index 1678604b297..91f87a5c422 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,7 @@ seatunnel-formats seatunnel-dist seatunnel-server + seatunnel-engine @@ -143,9 +144,6 @@ 2.12.6 1.18.0 1.20 - 8.0.16 - 42.3.3 - 8.1.2.141 false false false @@ -164,8 +162,11 @@ 7.5.1 2.7.5-7.0 3.4 + 2.11.0 4.4 3.3.0 + 1.20 + 1.8.0 provided provided 1.13 @@ -184,7 +185,7 @@ 0.38.0 1.3.0 1.9.5 - 1.16.3 + 1.17.3 false true @@ -198,8 +199,6 @@ 1.1.8.3 3.10.0 4.2.0 - 4.5.13 - 1.2.83 1.17.0 0.23.0 2.10.14 @@ -281,6 +280,12 @@ ${jackson.version} + + org.apache.commons + commons-compress + ${commons-compress.version} + + org.testcontainers testcontainers @@ -298,12 +303,6 @@ - - org.apache.commons - commons-compress - ${commons-compress.version} - - org.apache.logging.log4j log4j-core @@ -326,6 +325,7 @@ guava ${guava.version} + org.slf4j slf4j-api @@ -358,15 +358,28 @@ - org.apache.httpcomponents - 
httpclient - ${httpclient.version} + commons-io + commons-io + ${commons-io.version} + + + + io.protostuff + protostuff-core + ${protostuff.version} - com.alibaba - fastjson - ${fastjson.version} + io.protostuff + protostuff-runtime + ${protostuff.version} + + + + com.google.auto.service + auto-service + ${auto-service.version} + provided diff --git a/seatunnel-connectors-v2/connector-druid/pom.xml b/seatunnel-connectors-v2/connector-druid/pom.xml index 1ab81976133..9cf9faf65d4 100644 --- a/seatunnel-connectors-v2/connector-druid/pom.xml +++ b/seatunnel-connectors-v2/connector-druid/pom.xml @@ -1,4 +1,18 @@ + @@ -13,6 +27,7 @@ 2.6.7 + 4.5.13 @@ -30,10 +45,6 @@ org.apache.httpcomponents httpclient - - com.alibaba - fastjson - joda-time joda-time @@ -50,6 +61,11 @@ org.apache.calcite.avatica avatica-core + + org.apache.httpcomponents + httpclient + ${httpclient.version} + - \ No newline at end of file + diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java index a4d7e69a00e..4848eddd4b7 100644 --- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidInputFormat.java @@ -18,8 +18,6 @@ package org.apache.seatunnel.connectors.seatunnel.druid.client; -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.LocalTimeType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -27,6 +25,8 @@ import org.apache.seatunnel.connectors.seatunnel.druid.config.DruidTypeMapper; import lombok.Data; 
+import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +43,6 @@ import java.time.ZoneId; import java.util.ArrayList; import java.util.Calendar; -import java.util.List; import java.util.TimeZone; @Data @@ -71,7 +70,7 @@ public ResultSetMetaData getResultSetMetaData() throws SQLException { statement = connection.prepareStatement(quarySQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); return statement.getMetaData(); } catch (SQLException se) { - throw new SQLException("ResultSetMetaData() failed." + se.getMessage(), se); + throw new SQLException("ResultSetMetaData() failed." + ExceptionUtils.getMessage(se), se); } } @@ -82,7 +81,7 @@ public void openInputFormat() { resultSet = statement.executeQuery(); hasNext = resultSet.next(); } catch (SQLException se) { - throw new IllegalArgumentException("openInputFormat() failed." + se.getMessage(), se); + throw new IllegalArgumentException("openInputFormat() failed." 
+ ExceptionUtils.getMessage(se), se); } } @@ -91,7 +90,7 @@ private String getSQL() throws SQLException { String startTimestamp = druidSourceOptions.getStartTimestamp(); String endTimestamp = druidSourceOptions.getEndTimestamp(); String dataSource = druidSourceOptions.getDatasource(); - if (druidSourceOptions.getColumns() != null && druidSourceOptions.getColumns().size() > 0) { + if (CollectionUtils.isNotEmpty(druidSourceOptions.getColumns())) { columns = String.join(",", druidSourceOptions.getColumns()); } String sql = String.format(QUERY_TEMPLATE, columns, dataSource); @@ -143,69 +142,76 @@ public SeaTunnelRow nextRecord() throws IOException { hasNext = resultSet.next(); return seaTunnelRow; } catch (SQLException se) { - throw new IOException("Couldn't read data - " + se.getMessage(), se); + throw new IOException("Couldn't read data - " + ExceptionUtils.getMessage(se), se); } catch (NullPointerException npe) { throw new IOException("Couldn't access resultSet", npe); } } public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType rowTypeInfo) throws SQLException { - List fields = new ArrayList<>(); SeaTunnelDataType[] seaTunnelDataTypes = rowTypeInfo.getFieldTypes(); + Object[] fields = new Object[seaTunnelDataTypes.length]; for (int i = 1; i <= seaTunnelDataTypes.length; i++) { Object seatunnelField; SeaTunnelDataType seaTunnelDataType = seaTunnelDataTypes[i - 1]; + seaTunnelDataType.getSqlType(); if (null == rs.getObject(i)) { seatunnelField = null; - } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getBoolean(i); - } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getByte(i); - } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getShort(i); - } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getInt(i); - } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) { - seatunnelField =
rs.getLong(i); - } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getFloat(i); - } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getDouble(i); - } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getString(i); - } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) { - Timestamp ts = rs.getTimestamp(i, Calendar.getInstance(TimeZone.getTimeZone("UTC"))); - LocalDateTime localDateTime = LocalDateTime.ofInstant(ts.toInstant(), ZoneId.of("UTC")); // good - seatunnelField = localDateTime; - } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getDate(i); - } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) { - seatunnelField = rs.getDate(i); } else { - throw new IllegalStateException("Unexpected value: " + seaTunnelDataType); + switch (seaTunnelDataType.getSqlType()) { + case BOOLEAN: + seatunnelField = rs.getBoolean(i); + break; + case TINYINT: + seatunnelField = rs.getByte(i); + break; + case SMALLINT: + seatunnelField = rs.getShort(i); + break; + case INT: + seatunnelField = rs.getInt(i); + break; + case BIGINT: + seatunnelField = rs.getLong(i); + break; + case FLOAT: + seatunnelField = rs.getFloat(i); + break; + case DOUBLE: + seatunnelField = rs.getDouble(i); + break; + case STRING: + seatunnelField = rs.getString(i); + break; + case TIMESTAMP: + Timestamp ts = rs.getTimestamp(i, Calendar.getInstance(TimeZone.getTimeZone("UTC"))); + LocalDateTime localDateTime = LocalDateTime.ofInstant(ts.toInstant(), ZoneId.of("UTC")); // good + seatunnelField = localDateTime; + break; + default: + throw new IllegalStateException("Unexpected value: " + seaTunnelDataType); + } } - - fields.add(seatunnelField); + fields[i - 1] = seatunnelField; } - return new SeaTunnelRow(fields.toArray()); + return new SeaTunnelRow(fields); } private SeaTunnelRowType initTableField() { ArrayList> seaTunnelDataTypes = new
ArrayList<>(); - ArrayList fieldNames = new ArrayList<>(); - try { ResultSetMetaData resultSetMetaData = getResultSetMetaData(); + String[] fieldNames = new String[resultSetMetaData.getColumnCount()]; for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { - fieldNames.add(resultSetMetaData.getColumnName(i)); + fieldNames[i - 1] = resultSetMetaData.getColumnName(i); seaTunnelDataTypes.add(DruidTypeMapper.DRUID_TYPE_MAPPS.get(resultSetMetaData.getColumnTypeName(i))); } + rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes.toArray(new SeaTunnelDataType[seaTunnelDataTypes.size()])); } catch (SQLException e) { LOGGER.warn("get row type info exception", e); } - rowTypeInfo = new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType[seaTunnelDataTypes.size()])); return rowTypeInfo; } diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java index 62ef37ea87d..4edfd793a5e 100644 --- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/client/DruidOutputFormat.java @@ -20,8 +20,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -29,6 +27,7 @@ import com.fasterxml.jackson.datatype.joda.JodaModule; import lombok.Data; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import 
org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -58,6 +57,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; @Data @@ -107,7 +107,7 @@ public void write(SeaTunnelRow element) { this.data.append(DEFAULT_LINE_DELIMITER); } - public void closeOutputFormat() { + public void closeOutputFormat() throws IOException { try { ParallelIndexIOConfig ioConfig = parallelIndexIOConfig(); ParallelIndexTuningConfig tuningConfig = tuningConfig(); @@ -122,20 +122,21 @@ public void closeOutputFormat() { mapper.configure(SerializationFeature.INDENT_OUTPUT, false); mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); String taskJSON = mapper.writeValueAsString(indexTask); - JSONObject jsonObject = JSON.parseObject(taskJSON); - jsonObject.remove("id"); - jsonObject.remove("groupId"); - jsonObject.remove("resource"); - JSONObject spec = jsonObject.getJSONObject("spec"); - spec.remove("tuningConfig"); - jsonObject.put("spec", spec); - taskJSON = jsonObject.toJSONString(); - + Map map = mapper.readValue(taskJSON, Map.class); + map.remove("id"); + map.remove("groupId"); + map.remove("resource"); + String spec = mapper.writeValueAsString(map.get("spec")); + Map specMap = mapper.readValue(spec, Map.class); + specMap.remove("tuningConfig"); + map.put("spec", mapper.readValue(mapper.writeValueAsString(specMap), Object.class)); + taskJSON = mapper.writeValueAsString(map); URL url = new URL(this.coordinatorURL + "druid/indexer/v1/task"); HttpURLConnection con = (HttpURLConnection) url.openConnection(); con.setRequestMethod("POST"); con.setRequestProperty("Content-Type", "application/json"); con.setRequestProperty("Accept", "application/json, text/plain, */*"); + con.setRequestProperty("charset", "utf-8"); con.setDoOutput(true); try (OutputStream os = con.getOutputStream()) { byte[] input = 
taskJSON.getBytes(StandardCharsets.UTF_8); @@ -149,8 +150,8 @@ public void closeOutputFormat() { } LOGGER.info("Druid write task has been sent, and the response is {}", response.toString()); } - } catch (IOException e) { - e.printStackTrace(); + } catch (IOException se) { + throw new IOException("Couldn't write data - " + ExceptionUtils.getMessage(se), se); } } diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkOptions.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkOptions.java index c351767eb0f..38214cd05fe 100644 --- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkOptions.java +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSinkOptions.java @@ -43,9 +43,9 @@ public class DruidSinkOptions implements Serializable { private int parallelism; public DruidSinkOptions(Config pluginConfig) { - this.coordinatorURL = pluginConfig.getString(DruidSinkConfig.COORDINATOR_URL); - this.datasource = pluginConfig.getString(DruidSinkConfig.DATASOURCE); - this.columns = pluginConfig.getStringList(DruidSinkConfig.COLUMNS); + this.coordinatorURL = pluginConfig.hasPath(DruidSinkConfig.COORDINATOR_URL) ? pluginConfig.getString(DruidSinkConfig.COORDINATOR_URL) : null; + this.datasource = pluginConfig.hasPath(DruidSinkConfig.DATASOURCE) ? pluginConfig.getString(DruidSinkConfig.DATASOURCE) : null; + this.columns = pluginConfig.hasPath(DruidSinkConfig.COLUMNS) ? pluginConfig.getStringList(DruidSinkConfig.COLUMNS) : null; this.timestampColumn = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_COLUMN) ? pluginConfig.getString(DruidSinkConfig.TIMESTAMP_COLUMN) : null; this.timestampFormat = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_FORMAT) ? 
pluginConfig.getString(DruidSinkConfig.TIMESTAMP_FORMAT) : null; this.timestampMissingValue = pluginConfig.hasPath(DruidSinkConfig.TIMESTAMP_MISSING_VALUE) ? pluginConfig.getString(DruidSinkConfig.TIMESTAMP_MISSING_VALUE) : null; diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceOptions.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceOptions.java index 87333ca78cb..66b7288910e 100644 --- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceOptions.java +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/config/DruidSourceOptions.java @@ -41,8 +41,8 @@ public class DruidSourceOptions implements Serializable { private Integer parallelism; public DruidSourceOptions(Config pluginConfig) { - this.url = pluginConfig.getString(DruidSourceConfig.URL); - this.datasource = pluginConfig.getString(DruidSourceConfig.DATASOURCE); + this.url = pluginConfig.hasPath(DruidSourceConfig.URL) ? pluginConfig.getString(DruidSourceConfig.URL) : null; + this.datasource = pluginConfig.hasPath(DruidSourceConfig.DATASOURCE) ? pluginConfig.getString(DruidSourceConfig.DATASOURCE) : null; this.columns = pluginConfig.hasPath(DruidSourceConfig.COLUMNS) ? pluginConfig.getStringList(DruidSourceConfig.COLUMNS) : null; this.startTimestamp = pluginConfig.hasPath(DruidSourceConfig.START_TIMESTAMP) ? pluginConfig.getString(DruidSourceConfig.START_TIMESTAMP) : null; this.endTimestamp = pluginConfig.hasPath(DruidSourceConfig.END_TIMESTAMP) ? 
pluginConfig.getString(DruidSourceConfig.END_TIMESTAMP) : null; diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSource.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSource.java index dd4ba46b0a8..50f14491eef 100644 --- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSource.java +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/seatunnel/druid/source/DruidSource.java @@ -34,6 +34,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import com.google.auto.service.AutoService; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,8 +58,8 @@ public void prepare(Config pluginConfig) throws PrepareFailException { druidSourceOptions = new DruidSourceOptions(pluginConfig); druidInputFormat = new DruidInputFormat(druidSourceOptions); this.rowTypeInfo = druidInputFormat.getRowTypeInfo(); - } catch (Exception e) { - throw new PrepareFailException("Druid", PluginType.SOURCE, e.toString()); + } catch (Exception se) { + throw new PrepareFailException("Druid", PluginType.SOURCE, ExceptionUtils.getMessage(se)); } }