Skip to content

Commit

Permalink
[Hotfix][Connector-V2][File] Fix the following bugs: 1. write parquet…
Browse files Browse the repository at this point in the history
… NullPointerException 2. when restore write from states getting error file path
  • Loading branch information
TyrantLucifer committed Nov 2, 2022
1 parent a0bf56c commit 71c6a08
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,14 @@ public List<String> getTransactionIdFromStates(List<FileSinkState> fileStates) {
String[] pathSegments = new String[]{textFileSinkConfig.getTmpPath(), Constant.SEATUNNEL, jobId};
String jobDir = String.join(File.separator, pathSegments) + File.separator;
try {
List<String> transactionDirList = FileSystemUtils.dirList(jobDir).stream().map(Path::toString).collect(Collectors.toList());
return transactionDirList.stream().map(dir -> dir.replaceAll(jobDir, "")).collect(Collectors.toList());
List<String> transactionDirList = FileSystemUtils.dirList(jobDir)
.stream()
.map(path -> path.toUri().getPath())
.collect(Collectors.toList());
return transactionDirList
.stream()
.map(dir -> dir.replaceAll(jobDir, ""))
.collect(Collectors.toList());
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public void finishAndCloseFile() {
}
needMoveFiles.put(k, getTargetLocation(k));
});
this.beingWrittenWriter.clear();
}

private Writer getOrCreateWriter(@NonNull String filePath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void finishAndCloseFile() {
}
needMoveFiles.put(k, getTargetLocation(k));
});
this.beingWrittenWriter.clear();
}

private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath) {
Expand Down Expand Up @@ -189,7 +190,7 @@ private Object resolveObject(Object data, SeaTunnelDataType<?> seaTunnelDataType
Schema recordSchema = buildAvroSchemaWithRowType((SeaTunnelRowType) seaTunnelDataType, sinkColumnsIndex);
GenericRecordBuilder recordBuilder = new GenericRecordBuilder(recordSchema);
for (int i = 0; i < fieldNames.length; i++) {
recordBuilder.set(fieldNames[i], resolveObject(seaTunnelRow.getField(i), fieldTypes[i]));
recordBuilder.set(fieldNames[i].toLowerCase(), resolveObject(seaTunnelRow.getField(i), fieldTypes[i]));
}
return recordBuilder.build();
default:
Expand Down Expand Up @@ -284,7 +285,7 @@ private Schema buildAvroSchemaWithRowType(SeaTunnelRowType seaTunnelRowType, Lis
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
String[] fieldNames = seaTunnelRowType.getFieldNames();
sinkColumnsIndex.forEach(index -> {
Type type = seaTunnelDataType2ParquetDataType(fieldNames[index], fieldTypes[index]);
Type type = seaTunnelDataType2ParquetDataType(fieldNames[index].toLowerCase(), fieldTypes[index]);
types.add(type);
});
MessageType seaTunnelRow = Types.buildMessage().addFields(types.toArray(new Type[0])).named("SeaTunnelRecord");
Expand Down

0 comments on commit 71c6a08

Please sign in to comment.