Skip to content

Commit

Permalink
feature(file): 增加是否携带文件名开关以及对应逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
chaorongzhi committed Aug 30, 2024
1 parent 8c6ebc1 commit 061e1ae
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,11 @@ public class BaseSourceConfigOptions {
.noDefaultValue()
.withDescription(
"Local file source configs, used to create multiple local file source.");

public static final Option<Boolean> DATA_CARRY_FILENAME =
Options.key("data_carry_filename")
.booleanType()
.defaultValue(false)
.withDescription(
"Specifies whether to carry the corresponding file name in each piece of data.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
Expand Down Expand Up @@ -71,6 +72,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {

protected Pattern pattern;

protected boolean dataCarryFilename = false;

@Override
public void init(HadoopConf conf) {
this.hadoopConf = conf;
Expand Down Expand Up @@ -146,6 +149,10 @@ public void setPluginConfig(Config pluginConfig) {
pluginConfig.getString(BaseSourceConfigOptions.FILE_FILTER_PATTERN.key());
this.pattern = Pattern.compile(Matcher.quoteReplacement(filterPattern));
}
if (pluginConfig.hasPath(BaseSourceConfigOptions.DATA_CARRY_FILENAME.key())) {
this.dataCarryFilename =
pluginConfig.getBoolean(BaseSourceConfigOptions.DATA_CARRY_FILENAME.key());
}
}

@Override
Expand Down Expand Up @@ -211,4 +218,16 @@ public void close() throws IOException {
} catch (Exception ignore) {
}
}

protected SeaTunnelRow dataCarryFilename(SeaTunnelRow seaTunnelRow, String path) {
Object[] fields = seaTunnelRow.getFields();
Object[] newFields = new Object[fields.length];
String[] splitPath = path.split("/");
newFields[0] = splitPath[splitPath.length - 1];
System.arraycopy(fields, 0, newFields, 1, fields.length - 1);
SeaTunnelRow newSeaTunnelRow = new SeaTunnelRow(newFields);
newSeaTunnelRow.setRowKind(seaTunnelRow.getRowKind());
newSeaTunnelRow.setTableId(seaTunnelRow.getTableId());
return newSeaTunnelRow;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
seaTunnelRow.setField(index++, value);
}
}

if (this.dataCarryFilename) {
seaTunnelRow = dataCarryFilename(seaTunnelRow, path);
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
seaTunnelRow.setField(index++, value);
}
}

if (this.dataCarryFilename) {
seaTunnelRow = dataCarryFilename(seaTunnelRow, path);
}
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
}
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);

if (this.dataCarryFilename) {
seaTunnelRow = dataCarryFilename(seaTunnelRow, path);
}

seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
num++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);

if (this.dataCarryFilename) {
seaTunnelRow = dataCarryFilename(seaTunnelRow, path);
}

seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
seaTunnelRow.setField(index++, value);
}
}

if (this.dataCarryFilename) {
seaTunnelRow = dataCarryFilename(seaTunnelRow, path);
}

seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,16 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)

if (CollectionUtils.isEmpty(fields)) return;

fields.forEach(
field -> {
int fieldIndex =
ArrayUtils.indexOf(
seaTunnelRowType.getFieldNames(),
field.getName());
seaTunnelRow.setField(
fieldIndex,
convert(
field.getText(),
seaTunnelRowType
.getFieldTypes()[fieldIndex]));
});
for (Node field : fields) {
int fieldIndex =
ArrayUtils.indexOf(
seaTunnelRowType.getFieldNames(), field.getName());
seaTunnelRow.setField(
fieldIndex,
convert(
field.getText(),
seaTunnelRowType.getFieldTypes()[fieldIndex]));
}

if (isMergePartition) {
int partitionIndex = seaTunnelRowType.getTotalFields();
Expand All @@ -149,6 +146,10 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
}
}

if (this.dataCarryFilename) {
seaTunnelRow = dataCarryFilename(seaTunnelRow, path);
}

seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public class SeaTunnelEngineExample {

public static void main(String[] args)
throws FileNotFoundException, URISyntaxException, CommandException {
String configurePath = args.length > 0 ? args[0] : "/examples/ojdbc_clickhouse.conf";
String configurePath = args.length > 0 ? args[0] : "/examples/localfile_to_console.conf";
String configFile = getTestConfigFile(configurePath);
ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
clientCommandArgs.setConfigFile(configFile);
clientCommandArgs.setCheckConfig(false);
clientCommandArgs.setJobName(Paths.get(configFile).getFileName().toString());
// Change Execution Mode to CLUSTER to use client mode, before do this, you should start
// SeaTunnelEngineServerExample
clientCommandArgs.setMasterType(MasterType.LOCAL);
clientCommandArgs.setMasterType(MasterType.CLUSTER);
SeaTunnel.run(clientCommandArgs.buildCommand());
}

Expand Down

0 comments on commit 061e1ae

Please sign in to comment.