Skip to content

Commit

Permalink
[Hotfix][Connector-V2][Hive] Fix the bug that writing data to Hive…
Browse files Browse the repository at this point in the history
… throws NullPointerException
  • Loading branch information
TyrantLucifer committed Nov 1, 2022
1 parent 9bd076c commit 907ecad
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ public TextFileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaT
Map<String, Integer> columnsMap = new HashMap<>(seaTunnelRowTypeInfo.getFieldNames().length);
String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
for (int i = 0; i < fieldNames.length; i++) {
columnsMap.put(fieldNames[i], i);
columnsMap.put(fieldNames[i].toLowerCase(), i);
}

// init sink column index and partition field index; we will use the column index to find the data in SeaTunnelRow
this.sinkColumnsIndexInRow = this.sinkColumnList.stream()
.map(columnsMap::get)
.map(column -> columnsMap.get(column.toLowerCase()))
.collect(Collectors.toList());

if (!CollectionUtils.isEmpty(this.partitionFieldList)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void beginTransaction(Long checkpointId) {
* @return transaction ids
*/
public List<String> getTransactionIdFromStates(List<FileSinkState> fileStates) {
String[] pathSegments = new String[]{textFileSinkConfig.getPath(), Constant.SEATUNNEL, jobId};
String[] pathSegments = new String[]{textFileSinkConfig.getTmpPath(), Constant.SEATUNNEL, jobId};
String jobDir = String.join(File.separator, pathSegments) + File.separator;
try {
List<String> transactionDirList = FileSystemUtils.dirList(jobDir).stream().map(Path::toString).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ private TypeDescription buildSchemaWithRowType() {
TypeDescription schema = TypeDescription.createStruct();
for (Integer i : sinkColumnsIndexInRow) {
TypeDescription fieldType = buildFieldWithRowType(seaTunnelRowType.getFieldType(i));
schema.addField(seaTunnelRowType.getFieldName(i), fieldType);
schema.addField(seaTunnelRowType.getFieldName(i).toLowerCase(), fieldType);
}
return schema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
for (Integer integer : sinkColumnsIndexInRow) {
String fieldName = seaTunnelRowType.getFieldName(integer);
Object field = seaTunnelRow.getField(integer);
recordBuilder.set(fieldName, resolveObject(field, seaTunnelRowType.getFieldType(integer)));
recordBuilder.set(fieldName.toLowerCase(), resolveObject(field, seaTunnelRowType.getFieldType(integer)));
}
GenericData.Record record = recordBuilder.build();
try {
Expand Down
10 changes: 10 additions & 0 deletions seatunnel-connectors-v2/connector-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>avro</artifactId>
<groupId>org.apache.avro</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
Expand Down Expand Up @@ -89,6 +95,10 @@
<artifactId>pentaho-aggdesigner-algorithm</artifactId>
<groupId>org.pentaho</groupId>
</exclusion>
<exclusion>
<artifactId>avro</artifactId>
<groupId>org.apache.avro</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
Expand Down

0 comments on commit 907ecad

Please sign in to comment.