[Hotfix][Connector-V2][Hive] Fix the bug that writing data to Hive throws NullPointerException (#3258)

* [Hotfix][Connector-V2][Hive] Fix the bug that writing data to Hive throws NullPointerException

* [Hotfix][Connector-V2][File] Fix the following bugs: 1. NullPointerException when writing Parquet files 2. wrong file path when restoring the writer from states

* [Hotfix][Connector-V2][Hive] Fix code style

* [Hotfix][Connector-V2][Hive] Fix NPE

* [Hotfix][Connector-V2][Hive] Fix NPE

* [Feature][Connector-V2][Hive] Fix file e2e

* [Hotfix][Connector-V2][File] Fix file e2e

* [Hotfix][Connector-V2][Hive] Add changelog
TyrantLucifer authored Nov 8, 2022
1 parent 685978d commit 777bf6b
Showing 13 changed files with 83 additions and 13 deletions.
6 changes: 6 additions & 0 deletions docs/en/connector-v2/sink/FtpFile.md
@@ -167,3 +167,9 @@ FtpFile {
- [BugFix] Fix the bug of incorrect path in Windows environments ([2980](https://github.com/apache/incubator-seatunnel/pull/2980))
- [BugFix] Fix filesystem get error ([3117](https://github.com/apache/incubator-seatunnel/pull/3117))
- [BugFix] Solved the bug that '\t' could not be parsed as a delimiter from the config file ([3083](https://github.com/apache/incubator-seatunnel/pull/3083))

### Next version
- [BugFix] Fixed the following bugs that caused writing data to files to fail ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
  - When a field from upstream is null, a NullPointerException will be thrown
  - Sink column mapping failed
  - When restoring the writer from states, getting the transaction directory failed
5 changes: 5 additions & 0 deletions docs/en/connector-v2/sink/HdfsFile.md
@@ -191,3 +191,8 @@ HdfsFile {
- [BugFix] Fix filesystem get error ([3117](https://github.com/apache/incubator-seatunnel/pull/3117))
- [BugFix] Solved the bug that '\t' could not be parsed as a delimiter from the config file ([3083](https://github.com/apache/incubator-seatunnel/pull/3083))

### Next version
- [BugFix] Fixed the following bugs that caused writing data to files to fail ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
  - When a field from upstream is null, a NullPointerException will be thrown
  - Sink column mapping failed
  - When restoring the writer from states, getting the transaction directory failed
7 changes: 7 additions & 0 deletions docs/en/connector-v2/sink/Hive.md
@@ -168,3 +168,10 @@ sink {

### 2.3.0-beta 2022-10-20
- [Improve] Hive Sink supports automatic partition repair ([3133](https://github.com/apache/incubator-seatunnel/pull/3133))

### Next version
- [BugFix] Fixed the following bugs that caused writing data to files to fail ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
  - When a field from upstream is null, a NullPointerException will be thrown
  - Sink column mapping failed
  - When restoring the writer from states, getting the transaction directory failed

6 changes: 6 additions & 0 deletions docs/en/connector-v2/sink/LocalFile.md
@@ -180,3 +180,9 @@ LocalFile {
- [BugFix] Fix the bug of incorrect path in Windows environments ([2980](https://github.com/apache/incubator-seatunnel/pull/2980))
- [BugFix] Fix filesystem get error ([3117](https://github.com/apache/incubator-seatunnel/pull/3117))
- [BugFix] Solved the bug that '\t' could not be parsed as a delimiter from the config file ([3083](https://github.com/apache/incubator-seatunnel/pull/3083))

### Next version
- [BugFix] Fixed the following bugs that caused writing data to files to fail ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
  - When a field from upstream is null, a NullPointerException will be thrown
  - Sink column mapping failed
  - When restoring the writer from states, getting the transaction directory failed
6 changes: 6 additions & 0 deletions docs/en/connector-v2/sink/OssFile.md
@@ -219,3 +219,9 @@ For orc file format
- [BugFix] Fix the bug of incorrect path in Windows environments ([2980](https://github.com/apache/incubator-seatunnel/pull/2980))
- [BugFix] Fix filesystem get error ([3117](https://github.com/apache/incubator-seatunnel/pull/3117))
- [BugFix] Solved the bug that '\t' could not be parsed as a delimiter from the config file ([3083](https://github.com/apache/incubator-seatunnel/pull/3083))

### Next version
- [BugFix] Fixed the following bugs that caused writing data to files to fail ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
  - When a field from upstream is null, a NullPointerException will be thrown
  - Sink column mapping failed
  - When restoring the writer from states, getting the transaction directory failed
6 changes: 6 additions & 0 deletions docs/en/connector-v2/sink/S3File.md
@@ -202,3 +202,9 @@ For orc file format
### 2.3.0-beta 2022-10-20

- Add S3File Sink Connector

### Next version
- [BugFix] Fixed the following bugs that caused writing data to files to fail ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
  - When a field from upstream is null, a NullPointerException will be thrown
  - Sink column mapping failed
  - When restoring the writer from states, getting the transaction directory failed
8 changes: 6 additions & 2 deletions docs/en/connector-v2/sink/SftpFile.md
@@ -159,6 +159,10 @@ SftpFile {

## Changelog

### next version
### Next version

- Add SftpFile Sink Connector
- Add SftpFile Sink Connector
- [BugFix] Fixed the following bugs that caused writing data to files to fail ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
  - When a field from upstream is null, a NullPointerException will be thrown
  - Sink column mapping failed
  - When restoring the writer from states, getting the transaction directory failed
@@ -130,12 +130,12 @@ public TextFileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaT
Map<String, Integer> columnsMap = new HashMap<>(seaTunnelRowTypeInfo.getFieldNames().length);
String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
for (int i = 0; i < fieldNames.length; i++) {
columnsMap.put(fieldNames[i], i);
columnsMap.put(fieldNames[i].toLowerCase(), i);
}

// init sink column index and partition field index, we will use the column index to find the data in SeaTunnelRow
this.sinkColumnsIndexInRow = this.sinkColumnList.stream()
.map(columnsMap::get)
.map(column -> columnsMap.get(column.toLowerCase()))
.collect(Collectors.toList());

if (!CollectionUtils.isEmpty(this.partitionFieldList)) {
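The two lowercased calls above make the column lookup case-insensitive: Hive reports column names in lower case, while the upstream row type may carry mixed-case names, so a case-sensitive map returned null indexes. A minimal, self-contained sketch of the fixed mapping (the field and column names are hypothetical):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ColumnIndexMappingSketch {
    public static void main(String[] args) {
        // Field names as carried by the upstream row type (mixed case).
        String[] fieldNames = {"Id", "UserName", "CreateTime"};

        // Keys are lower-cased when the map is built, mirroring the fix.
        Map<String, Integer> columnsMap = new HashMap<>(fieldNames.length);
        for (int i = 0; i < fieldNames.length; i++) {
            columnsMap.put(fieldNames[i].toLowerCase(), i);
        }

        // Sink columns as Hive reports them (always lower case).
        List<String> sinkColumnList = Arrays.asList("id", "username");

        // Lookups are lower-cased too, so "UserName" and "username" agree.
        List<Integer> sinkColumnsIndexInRow = sinkColumnList.stream()
                .map(column -> columnsMap.get(column.toLowerCase()))
                .collect(Collectors.toList());

        // Prints [0, 1]; with case-sensitive keys both lookups returned null.
        System.out.println(sinkColumnsIndexInRow);
    }
}
```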
@@ -40,7 +40,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -248,11 +247,17 @@ public void beginTransaction(Long checkpointId) {
* @return transaction ids
*/
public List<String> getTransactionIdFromStates(List<FileSinkState> fileStates) {
String[] pathSegments = new String[]{textFileSinkConfig.getPath(), Constant.SEATUNNEL, jobId};
String[] pathSegments = new String[]{textFileSinkConfig.getTmpPath(), Constant.SEATUNNEL, jobId};
String jobDir = String.join(File.separator, pathSegments) + File.separator;
try {
List<String> transactionDirList = FileSystemUtils.dirList(jobDir).stream().map(Path::toString).collect(Collectors.toList());
return transactionDirList.stream().map(dir -> dir.replaceAll(jobDir, "")).collect(Collectors.toList());
List<String> transactionDirList = FileSystemUtils.dirList(jobDir)
.stream()
.map(path -> path.toUri().getPath())
.collect(Collectors.toList());
return transactionDirList
.stream()
.map(dir -> dir.replaceAll(jobDir, ""))
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
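Two things changed in getTransactionIdFromStates: the recovery path is now rooted at getTmpPath(), since transaction directories live under the temporary path rather than the final output path, and listed directories are rendered with path.toUri().getPath() instead of Path.toString(). The latter matters because toString() can keep the filesystem scheme and authority, so stripping the jobDir prefix left junk in the recovered transaction ids. A minimal sketch of the difference, assuming hadoop-common on the classpath; the concrete paths are hypothetical:

```java
import org.apache.hadoop.fs.Path;

public class TransactionDirSketch {
    public static void main(String[] args) {
        // jobDir is assembled from plain string segments, so it carries no scheme.
        String jobDir = "/tmp/seatunnel/job-1/";

        // A directory listing on HDFS may return fully qualified paths.
        Path transactionDir =
                new Path("hdfs://namenode:8020/tmp/seatunnel/job-1/T_1_0_1");

        // toString() keeps scheme and authority, so the leftover id is polluted:
        // prints "hdfs://namenode:8020T_1_0_1"
        System.out.println(transactionDir.toString().replaceAll(jobDir, ""));

        // toUri().getPath() drops scheme and authority, leaving a clean id:
        // prints "T_1_0_1"
        System.out.println(transactionDir.toUri().getPath().replaceAll(jobDir, ""));
    }
}
```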
@@ -99,6 +99,7 @@ public void finishAndCloseFile() {
}
needMoveFiles.put(k, getTargetLocation(k));
});
this.beingWrittenWriter.clear();
}

private Writer getOrCreateWriter(@NonNull String filePath) {
@@ -165,7 +166,7 @@ private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
TypeDescription struct = TypeDescription.createStruct();
SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) type).getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
struct.addField(((SeaTunnelRowType) type).getFieldName(i), buildFieldWithRowType(fieldTypes[i]));
struct.addField(((SeaTunnelRowType) type).getFieldName(i).toLowerCase(), buildFieldWithRowType(fieldTypes[i]));
}
return struct;
case NULL:
@@ -179,7 +180,7 @@ private TypeDescription buildSchemaWithRowType() {
TypeDescription schema = TypeDescription.createStruct();
for (Integer i : sinkColumnsIndexInRow) {
TypeDescription fieldType = buildFieldWithRowType(seaTunnelRowType.getFieldType(i));
schema.addField(seaTunnelRowType.getFieldName(i), fieldType);
schema.addField(seaTunnelRowType.getFieldName(i).toLowerCase(), fieldType);
}
return schema;
}
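Besides lowercasing the ORC struct field names to match Hive's lower-cased metadata, the strategy now clears its writer cache after closing all writers; otherwise the next record for the same file path would be handed an already-closed writer. A minimal sketch of the close-then-clear pattern, with a hypothetical SketchWriter standing in for the ORC Writer:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class WriterCacheSketch {
    /** Hypothetical stand-in for org.apache.orc.Writer. */
    interface SketchWriter {
        void close() throws IOException;
    }

    private final Map<String, SketchWriter> beingWrittenWriter = new HashMap<>();

    public void finishAndCloseFile() {
        beingWrittenWriter.forEach((path, writer) -> {
            try {
                writer.close();
            } catch (IOException e) {
                throw new RuntimeException("Close file " + path + " failed", e);
            }
        });
        // Without this, a later lookup for the same path would return the
        // closed writer from the cache and the next write would fail.
        beingWrittenWriter.clear();
    }
}
```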
@@ -95,7 +95,7 @@ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
for (Integer integer : sinkColumnsIndexInRow) {
String fieldName = seaTunnelRowType.getFieldName(integer);
Object field = seaTunnelRow.getField(integer);
recordBuilder.set(fieldName, resolveObject(field, seaTunnelRowType.getFieldType(integer)));
recordBuilder.set(fieldName.toLowerCase(), resolveObject(field, seaTunnelRowType.getFieldType(integer)));
}
GenericData.Record record = recordBuilder.build();
try {
@@ -117,6 +117,7 @@ public void finishAndCloseFile() {
}
needMoveFiles.put(k, getTargetLocation(k));
});
this.beingWrittenWriter.clear();
}

private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath) {
@@ -154,6 +155,9 @@ private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath)

@SuppressWarnings("checkstyle:MagicNumber")
private Object resolveObject(Object data, SeaTunnelDataType<?> seaTunnelDataType) {
if (data == null) {
return null;
}
switch (seaTunnelDataType.getSqlType()) {
case ARRAY:
BasicType<?> elementType = ((ArrayType<?, ?>) seaTunnelDataType).getElementType();
@@ -189,7 +193,7 @@ private Object resolveObject(Object data, SeaTunnelDataType<?> seaTunnelDataType
Schema recordSchema = buildAvroSchemaWithRowType((SeaTunnelRowType) seaTunnelDataType, sinkColumnsIndex);
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(recordSchema);
for (int i = 0; i < fieldNames.length; i++) {
recordBuilder.set(fieldNames[i], resolveObject(seaTunnelRow.getField(i), fieldTypes[i]));
recordBuilder.set(fieldNames[i].toLowerCase(), resolveObject(seaTunnelRow.getField(i), fieldTypes[i]));
}
return recordBuilder.build();
default:
@@ -284,7 +288,7 @@ private Schema buildAvroSchemaWithRowType(SeaTunnelRowType seaTunnelRowType, Lis
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
String[] fieldNames = seaTunnelRowType.getFieldNames();
sinkColumnsIndex.forEach(index -> {
Type type = seaTunnelDataType2ParquetDataType(fieldNames[index], fieldTypes[index]);
Type type = seaTunnelDataType2ParquetDataType(fieldNames[index].toLowerCase(), fieldTypes[index]);
types.add(type);
});
MessageType seaTunnelRow = Types.buildMessage().addFields(types.toArray(new Type[0])).named("SeaTunnelRecord");
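The early return at the top of resolveObject is what stops the NullPointerException: the array, map, and row branches all dereference the field value before the Avro record is built, so a single null field from upstream failed the whole write. A minimal sketch of the guard, with a hypothetical enum standing in for SeaTunnel's SqlType:

```java
import java.util.Arrays;

public class ResolveObjectSketch {
    /** Hypothetical stand-in for SeaTunnel's SqlType enum. */
    enum SqlType { STRING, ARRAY }

    static Object resolveObject(Object data, SqlType type) {
        if (data == null) {
            // Return early: every branch below dereferences data, and a
            // nullable Avro record field accepts null anyway.
            return null;
        }
        switch (type) {
            case ARRAY:
                // Would throw NullPointerException on a null field
                // without the guard above.
                return Arrays.asList((Object[]) data);
            case STRING:
            default:
                return data.toString();
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveObject(null, SqlType.STRING));                  // null
        System.out.println(resolveObject(new String[]{"a", "b"}, SqlType.ARRAY)); // [a, b]
    }
}
```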
10 changes: 10 additions & 0 deletions seatunnel-connectors-v2/connector-hive/pom.xml
@@ -50,6 +50,12 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>avro</artifactId>
<groupId>org.apache.avro</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
@@ -89,6 +95,10 @@
<artifactId>pentaho-aggdesigner-algorithm</artifactId>
<groupId>org.pentaho</groupId>
</exclusion>
<exclusion>
<artifactId>avro</artifactId>
<groupId>org.apache.avro</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
@@ -26,11 +26,21 @@
<artifactId>connector-file-local-e2e</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-fake</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file-local</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
