Skip to content

Commit

Permalink
[bugfix][hive-reader] HiveBaseResultSet#getTimestamp method does not …
Browse files Browse the repository at this point in the history
  • Loading branch information
wgzhao committed Sep 20, 2023
1 parent de5c5c1 commit 2410eed
Showing 1 changed file with 36 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

package com.wgzhao.addax.plugin.reader.hivereader;

import com.wgzhao.addax.common.element.Column;
import com.wgzhao.addax.common.element.DoubleColumn;
import com.wgzhao.addax.common.element.TimestampColumn;
import com.wgzhao.addax.common.exception.AddaxException;
import com.wgzhao.addax.common.plugin.RecordSender;
import com.wgzhao.addax.common.spi.Reader;
Expand All @@ -30,6 +33,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;

import static com.wgzhao.addax.common.base.Constant.DEFAULT_FETCH_SIZE;
Expand All @@ -39,22 +47,19 @@
import static com.wgzhao.addax.common.base.Key.KERBEROS_PRINCIPAL;

public class HiveReader
extends Reader
{
extends Reader {

private static final DataBaseType DATABASE_TYPE = DataBaseType.Hive;

public static class Job
extends Reader.Job
{
extends Reader.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);

private Configuration originalConfig = null;
private CommonRdbmsReader.Job commonRdbmsReaderJob;

@Override
public void init()
{
public void init() {
this.originalConfig = getPluginJobConf();

boolean haveKerberos = originalConfig.getBool(HAVE_KERBEROS, false);
Expand All @@ -71,37 +76,31 @@ public void init()
}

@Override
public void preCheck()
{
public void preCheck() {
this.commonRdbmsReaderJob.preCheck(originalConfig, DATABASE_TYPE);
}

@Override
public List<Configuration> split(int adviceNumber)
{
public List<Configuration> split(int adviceNumber) {
return this.commonRdbmsReaderJob.split(originalConfig, adviceNumber);
}

@Override
public void post()
{
public void post() {
this.commonRdbmsReaderJob.post(originalConfig);
}

@Override
public void destroy()
{
public void destroy() {
this.commonRdbmsReaderJob.destroy(originalConfig);
}

private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf)
{
private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf) {
if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
UserGroupInformation.setConfiguration(hadoopConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
}
catch (Exception e) {
} catch (Exception e) {
String message = String.format("Auth failure with kerberos, Please check " +
"kerberosKeytabFilePath[%s] and kerberosPrincipal[%s]",
kerberosKeytabFilePath, kerberosPrincipal);
Expand All @@ -112,37 +111,45 @@ private void kerberosAuthentication(String kerberosPrincipal, String kerberosKey
}

public static class Task
extends Reader.Task
{
extends Reader.Task {

private Configuration readerSliceConfig;
private CommonRdbmsReader.Task commonRdbmsReaderTask;

@Override
public void init()
{
public void init() {
this.readerSliceConfig = getPluginJobConf();
this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId());
this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId()) {

@Override
protected Column createColumn(ResultSet rs, ResultSetMetaData metaData, int i)
throws SQLException, UnsupportedEncodingException {
if (metaData.getColumnType(i) == Types.TIMESTAMP ) {
// hive HiveBaseResultSet#getTimestamp(String columnName, Calendar cal) not support
return new TimestampColumn(rs.getTimestamp(i));
}
return super.createColumn(rs, metaData, i);
}

};

this.commonRdbmsReaderTask.init(this.readerSliceConfig);
}

@Override
public void startRead(RecordSender recordSender)
{
public void startRead(RecordSender recordSender) {
int fetchSize = this.readerSliceConfig.getInt(FETCH_SIZE, DEFAULT_FETCH_SIZE);

this.commonRdbmsReaderTask.startRead(readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize);
}

@Override
public void post()
{
public void post() {
this.commonRdbmsReaderTask.post(readerSliceConfig);
}

@Override
public void destroy()
{
public void destroy() {
this.commonRdbmsReaderTask.destroy(readerSliceConfig);
}
}
Expand Down

0 comments on commit 2410eed

Please sign in to comment.