[Hotfix][Connector-V2][Hive] Fix the bug where writing data to Hive throws NullPointerException #3258

Merged · 8 commits · Nov 8, 2022
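Taken together, the changes appear to attack the NullPointerException from several directions: the file sink's column-index map and the ORC/Parquet schema builders now lower-case field names so they line up with Hive's lower-cased metastore columns; the Parquet writer's resolveObject() gains an explicit null guard; transaction recovery scans the configured tmp path and derives transaction ids from plain path strings rather than full hdfs:// URIs; both write strategies clear their writer caches after closing files; and the Hive connector's pom excludes conflicting Avro artifacts. Notes and minimal sketches follow each hunk below.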
@@ -130,12 +130,12 @@ public TextFileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaT
 Map<String, Integer> columnsMap = new HashMap<>(seaTunnelRowTypeInfo.getFieldNames().length);
 String[] fieldNames = seaTunnelRowTypeInfo.getFieldNames();
 for (int i = 0; i < fieldNames.length; i++) {
-    columnsMap.put(fieldNames[i], i);
+    columnsMap.put(fieldNames[i].toLowerCase(), i);
 }

 // init sink column index and partition field index, we will use the column index to found the data in SeaTunnelRow
 this.sinkColumnsIndexInRow = this.sinkColumnList.stream()
-        .map(columnsMap::get)
+        .map(column -> columnsMap.get(column.toLowerCase()))
         .collect(Collectors.toList());

 if (!CollectionUtils.isEmpty(this.partitionFieldList)) {
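Hive reports column names in lower case, while the SeaTunnel row type can carry mixed-case field names. Lower-casing both the map keys and the lookup keys keeps the two in agreement; previously columnsMap.get(column) could return null for a mixed-case sink column, and that null index is the likely source of the NullPointerException later in the write path. A minimal sketch of the failure mode, using illustrative names (CaseInsensitiveLookupDemo and the "UserId" column are not from the connector):

import java.util.HashMap;
import java.util.Map;

public class CaseInsensitiveLookupDemo {
    public static void main(String[] args) {
        Map<String, Integer> columnsMap = new HashMap<>();
        // The row type declares a mixed-case field name...
        columnsMap.put("UserId".toLowerCase(), 0);

        // ...while Hive hands the sink column back lower-cased.
        String sinkColumn = "userid";

        // Without lower-casing both sides, get() returns null, and a
        // later unboxing such as `int idx = columnsMap.get(sinkColumn);`
        // throws NullPointerException.
        Integer index = columnsMap.get(sinkColumn.toLowerCase());
        System.out.println("index = " + index); // prints: index = 0
    }
}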
@@ -40,7 +40,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -248,11 +247,17 @@ public void beginTransaction(Long checkpointId) {
  * @return transaction ids
  */
 public List<String> getTransactionIdFromStates(List<FileSinkState> fileStates) {
-    String[] pathSegments = new String[]{textFileSinkConfig.getPath(), Constant.SEATUNNEL, jobId};
+    String[] pathSegments = new String[]{textFileSinkConfig.getTmpPath(), Constant.SEATUNNEL, jobId};
     String jobDir = String.join(File.separator, pathSegments) + File.separator;
     try {
-        List<String> transactionDirList = FileSystemUtils.dirList(jobDir).stream().map(Path::toString).collect(Collectors.toList());
-        return transactionDirList.stream().map(dir -> dir.replaceAll(jobDir, "")).collect(Collectors.toList());
+        List<String> transactionDirList = FileSystemUtils.dirList(jobDir)
+                .stream()
+                .map(path -> path.toUri().getPath())
+                .collect(Collectors.toList());
+        return transactionDirList
+                .stream()
+                .map(dir -> dir.replaceAll(jobDir, ""))
+                .collect(Collectors.toList());
     } catch (IOException e) {
         throw new RuntimeException(e);
     }
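Two separate fixes meet in this hunk. First, transaction directories are discovered under getTmpPath() rather than getPath(), since in-flight data is staged under the temporary directory. Second, each discovered directory is rendered with path.toUri().getPath() instead of Path::toString: a fully qualified Hadoop Path keeps its scheme and authority in toString(), so replaceAll(jobDir, "") removed the directory segment but left the scheme and authority glued onto the transaction id. (As an aside, replaceAll treats jobDir as a regular expression; plain path characters are harmless here, but it is worth knowing.) A small sketch of the difference, with an illustrative HDFS URI:

import org.apache.hadoop.fs.Path;

public class PathRenderingDemo {
    public static void main(String[] args) {
        // Illustrative URI; any fully qualified Hadoop path behaves the same.
        Path path = new Path("hdfs://namenode:8020/tmp/seatunnel/job-1/T_1");
        String jobDir = "/tmp/seatunnel/job-1/";

        // toString() keeps scheme and authority, so stripping jobDir
        // leaves them fused onto the transaction id.
        System.out.println(path.toString().replaceAll(jobDir, ""));
        // -> hdfs://namenode:8020T_1

        // toUri().getPath() yields only the path component, which the
        // jobDir prefix can cleanly be stripped from.
        System.out.println(path.toUri().getPath().replaceAll(jobDir, ""));
        // -> T_1
    }
}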
@@ -99,6 +99,7 @@ public void finishAndCloseFile() {
         }
         needMoveFiles.put(k, getTargetLocation(k));
     });
+    this.beingWrittenWriter.clear();
 }

 private Writer getOrCreateWriter(@NonNull String filePath) {
@@ -165,7 +166,7 @@ private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
 TypeDescription struct = TypeDescription.createStruct();
 SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) type).getFieldTypes();
 for (int i = 0; i < fieldTypes.length; i++) {
-    struct.addField(((SeaTunnelRowType) type).getFieldName(i), buildFieldWithRowType(fieldTypes[i]));
+    struct.addField(((SeaTunnelRowType) type).getFieldName(i).toLowerCase(), buildFieldWithRowType(fieldTypes[i]));
 }
 return struct;
 case NULL:
@@ -179,7 +180,7 @@ private TypeDescription buildSchemaWithRowType() {
 TypeDescription schema = TypeDescription.createStruct();
 for (Integer i : sinkColumnsIndexInRow) {
     TypeDescription fieldType = buildFieldWithRowType(seaTunnelRowType.getFieldType(i));
-    schema.addField(seaTunnelRowType.getFieldName(i), fieldType);
+    schema.addField(seaTunnelRowType.getFieldName(i).toLowerCase(), fieldType);
 }
 return schema;
 }
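Two ORC-side changes: the struct schema is built with lower-cased field names so it matches Hive's lower-cased columns, and beingWrittenWriter.clear() drops closed Writer instances from the cache so a later checkpoint cannot pick up an already-closed writer. A standalone sketch of the schema idea against the orc-core TypeDescription API (the field names are illustrative):

import org.apache.orc.TypeDescription;

public class OrcSchemaDemo {
    public static void main(String[] args) {
        TypeDescription struct = TypeDescription.createStruct();
        // Lower-case each field name so the ORC schema agrees with
        // Hive's lower-cased column names.
        struct.addField("UserId".toLowerCase(), TypeDescription.createLong());
        struct.addField("UserName".toLowerCase(), TypeDescription.createString());
        System.out.println(struct); // struct<userid:bigint,username:string>
    }
}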
@@ -95,7 +95,7 @@ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
 for (Integer integer : sinkColumnsIndexInRow) {
     String fieldName = seaTunnelRowType.getFieldName(integer);
     Object field = seaTunnelRow.getField(integer);
-    recordBuilder.set(fieldName, resolveObject(field, seaTunnelRowType.getFieldType(integer)));
+    recordBuilder.set(fieldName.toLowerCase(), resolveObject(field, seaTunnelRowType.getFieldType(integer)));
 }
 GenericData.Record record = recordBuilder.build();
 try {
@@ -117,6 +117,7 @@ public void finishAndCloseFile() {
         }
         needMoveFiles.put(k, getTargetLocation(k));
     });
+    this.beingWrittenWriter.clear();
 }

 private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath) {
@@ -154,6 +155,9 @@ private ParquetWriter<GenericRecord> getOrCreateWriter(@NonNull String filePath)

 @SuppressWarnings("checkstyle:MagicNumber")
 private Object resolveObject(Object data, SeaTunnelDataType<?> seaTunnelDataType) {
+    if (data == null) {
+        return null;
+    }
     switch (seaTunnelDataType.getSqlType()) {
         case ARRAY:
             BasicType<?> elementType = ((ArrayType<?, ?>) seaTunnelDataType).getElementType();
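This guard is the most direct NullPointerException fix in the diff: a null cell previously fell into the type switch, where conversions such as array copying and string coercion dereference the value. Returning null early lets the record builder store the field as null instead of crashing. A reduced, runnable sketch of the pattern; the real method dispatches on SeaTunnelDataType's SqlType, which is elided here:

public class NullGuardDemo {
    // Stand-in for the connector's resolveObject(): convert a raw field
    // value before it is handed to the Parquet record builder.
    static Object resolveObject(Object data) {
        if (data == null) {
            return null; // a null cell must pass through, not be converted
        }
        // Any conversion below would throw NullPointerException on null
        // input without the guard above.
        return data.toString();
    }

    public static void main(String[] args) {
        System.out.println(resolveObject("abc")); // abc
        System.out.println(resolveObject(null));  // null
    }
}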
@@ -189,7 +193,7 @@ private Object resolveObject(Object data, SeaTunnelDataType<?> seaTunnelDataType
 Schema recordSchema = buildAvroSchemaWithRowType((SeaTunnelRowType) seaTunnelDataType, sinkColumnsIndex);
 GenericRecordBuilder recordBuilder = new GenericRecordBuilder(recordSchema);
 for (int i = 0; i < fieldNames.length; i++) {
-    recordBuilder.set(fieldNames[i], resolveObject(seaTunnelRow.getField(i), fieldTypes[i]));
+    recordBuilder.set(fieldNames[i].toLowerCase(), resolveObject(seaTunnelRow.getField(i), fieldTypes[i]));
 }
 return recordBuilder.build();
 default:
@@ -284,7 +288,7 @@ private Schema buildAvroSchemaWithRowType(SeaTunnelRowType seaTunnelRowType, Lis
 SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
 String[] fieldNames = seaTunnelRowType.getFieldNames();
 sinkColumnsIndex.forEach(index -> {
-    Type type = seaTunnelDataType2ParquetDataType(fieldNames[index], fieldTypes[index]);
+    Type type = seaTunnelDataType2ParquetDataType(fieldNames[index].toLowerCase(), fieldTypes[index]);
     types.add(type);
 });
 MessageType seaTunnelRow = Types.buildMessage().addFields(types.toArray(new Type[0])).named("SeaTunnelRecord");
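The lower-casing here has to mirror the schema-side change: GenericRecordBuilder.set(...) rejects names that are not fields of the schema, so once buildAvroSchemaWithRowType lower-cases its field names, every set(...) key must be lower-cased too or Avro throws AvroRuntimeException ("Not a valid schema field"). A runnable sketch with an illustrative schema:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;

public class AvroFieldCaseDemo {
    public static void main(String[] args) {
        // Schema built with lower-cased field names, as the writer now does.
        Schema schema = SchemaBuilder.record("SeaTunnelRecord")
                .fields()
                .requiredString("userid")
                .endRecord();

        GenericRecordBuilder builder = new GenericRecordBuilder(schema);
        // set("UserId", ...) would throw AvroRuntimeException
        // ("Not a valid schema field"), so the key is lower-cased too.
        builder.set("UserId".toLowerCase(), "u-001");
        GenericData.Record record = builder.build();
        System.out.println(record); // {"userid": "u-001"}
    }
}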
seatunnel-connectors-v2/connector-hive/pom.xml (10 additions, 0 deletions)
@@ -50,6 +50,12 @@
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-shaded-hadoop-2</artifactId>
     <scope>provided</scope>
+    <exclusions>
+        <exclusion>
+            <artifactId>avro</artifactId>
+            <groupId>org.apache.avro</groupId>
+        </exclusion>
+    </exclusions>
 </dependency>
 <dependency>
     <groupId>org.apache.hive</groupId>
@@ -89,6 +95,10 @@
         <artifactId>pentaho-aggdesigner-algorithm</artifactId>
         <groupId>org.pentaho</groupId>
     </exclusion>
+    <exclusion>
+        <artifactId>avro</artifactId>
+        <groupId>org.apache.avro</groupId>
+    </exclusion>
 </exclusions>
 </dependency>
 </dependencies>
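Both flink-shaded-hadoop-2 and the Hive dependency otherwise drag in their own org.apache.avro:avro, and two Avro versions on the classpath is a classic recipe for NoSuchMethodError-style failures in the Avro/Parquet write path; the exclusions presumably let the connector resolve a single Avro version. If in doubt which artifacts still pull Avro in, `mvn dependency:tree -Dincludes=org.apache.avro` will show the surviving paths.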