Skip to content

Commit

Permalink
feat(hive): hdfs获取连接时指定用户
Browse files Browse the repository at this point in the history
  • Loading branch information
chaorongzhi committed Jan 3, 2024
1 parent 4aab51c commit e09bac5
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
hadoopConf.setKerberosKeytabPath(
pluginConfig.getString(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key()));
}
if (pluginConfig.hasPath(BaseSinkConfig.HADOOP_USER_NAME.key())) {
hadoopConf.setHadoopUserName(
pluginConfig.getString(BaseSinkConfig.HADOOP_USER_NAME.key()));
} else {
hadoopConf.setHadoopUserName(BaseSinkConfig.HADOOP_USER_NAME.defaultValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class BaseSinkConfig {
public static final String DEFAULT_TMP_PATH = "/tmp/seatunnel";
public static final String DEFAULT_FILE_NAME_EXPRESSION = "${transactionId}";
public static final int DEFAULT_BATCH_SIZE = 1000000;
public static final String DEFAULT_HADOOP_USER_NAME = "hadoop";

public static final Option<CompressFormat> COMPRESS_CODEC =
Options.key("compress_codec")
Expand Down Expand Up @@ -242,6 +243,6 @@ public class BaseSinkConfig {
public static final Option<String> HADOOP_USER_NAME =
Options.key("hadoop_user_name")
.stringType()
.defaultValue("hadoop")
.defaultValue(DEFAULT_HADOOP_USER_NAME)
.withDescription("hadoop user name");
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.List;

public class BaseSourceConfig {
public static final String DEFAULT_HADOOP_USER_NAME = "hadoop";

public static final Option<FileFormat> FILE_FORMAT_TYPE =
Options.key("file_format_type")
.objectType(FileFormat.class)
Expand Down Expand Up @@ -132,4 +134,10 @@ public class BaseSourceConfig {
.stringType()
.noDefaultValue()
.withDescription("Krb5 file path");

public static final Option<String> HADOOP_USER_NAME =
Options.key("hadoop_user_name")
.stringType()
.defaultValue(DEFAULT_HADOOP_USER_NAME)
.withDescription("hadoop user name");
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,24 @@ public Configuration getConfiguration(HadoopConf hadoopConf) {
String keytabPath = hadoopConf.getKerberosKeytabPath();
String krb5Path = hadoopConf.getKrb5Path();
doKerberosAuthentication(configuration, principal, keytabPath, krb5Path);
configuration.set("hadoop.rpc.protection", "privacy");
return configuration;
}

public FileSystem getFileSystem(@NonNull String path) throws IOException {
if (configuration == null) {
configuration = getConfiguration(hadoopConf);
configuration.set("hadoop.rpc.protection", "privacy");
}
FileSystem fileSystem =
FileSystem.get(URI.create(path.replaceAll("\\\\", "/")), configuration);
FileSystem fileSystem = null;
try {
fileSystem =
FileSystem.get(
URI.create(path.replaceAll("\\\\", "/")),
configuration,
hadoopConf.getHadoopUserName());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
fileSystem.setWriteChecksum(false);
return fileSystem;
}
Expand Down

0 comments on commit e09bac5

Please sign in to comment.