[#10131] Migrate HBase1 API to HBase2 API
emeroad committed Jul 19, 2023
1 parent e27b1ad commit 0c5cc14
Showing 10 changed files with 177 additions and 163 deletions.
@@ -16,21 +16,19 @@

package com.navercorp.pinpoint.common.hbase;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import com.navercorp.pinpoint.common.util.ArrayUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import com.navercorp.pinpoint.common.util.BytesUtils;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.logging.log4j.Logger;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* @author emeroad
@@ -78,23 +76,19 @@ public boolean createNamespaceIfNotExists(String namespace, Map<String, String>
}

@Override
public List<HTableDescriptor> getTableDescriptors(String namespace) {
public List<TableDescriptor> getTableDescriptors(String namespace) {
return execute(admin -> {
HTableDescriptor[] htds = admin.listTableDescriptorsByNamespace(namespace);
if (ArrayUtils.isEmpty(htds)) {
return Collections.emptyList();
}
return Arrays.asList(htds);
return admin.listTableDescriptorsByNamespace(BytesUtils.toBytes(namespace));
});
}

@Override
public HTableDescriptor getTableDescriptor(TableName tableName) {
return execute(admin -> admin.getTableDescriptor(tableName));
public TableDescriptor getTableDescriptor(TableName tableName) {
return execute(admin -> admin.getDescriptor(tableName));
}
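
For context on the two read-side changes above: the HBase 2 Admin API takes the namespace as bytes and returns a List<TableDescriptor> (empty rather than a possibly-null array when there are no tables), which is why the old ArrayUtils.isEmpty guard disappears, and getDescriptor(TableName) replaces getTableDescriptor(TableName). A minimal sketch of the raw calls, assuming an already-connected Admin (helper names here are illustrative, not from the commit; the commit itself uses Pinpoint's BytesUtils where this sketch uses HBase's Bytes):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;

final class AdminReadSketch {
    // HBase 2: namespace goes in as bytes, a List comes back directly.
    static List<TableDescriptor> listTables(Admin admin, String namespace) throws IOException {
        return admin.listTableDescriptorsByNamespace(Bytes.toBytes(namespace));
    }

    // HBase 2: getDescriptor(TableName) replaces getTableDescriptor(TableName).
    static TableDescriptor describe(Admin admin, TableName tableName) throws IOException {
        return admin.getDescriptor(tableName);
    }
}
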

@Override
public void createTable(HTableDescriptor htd) {
public void createTable(TableDescriptor htd) {
execute(admin -> {
admin.createTable(htd);
logger.info("{} table created, htd : {}", htd.getTableName(), htd);
@@ -103,7 +97,7 @@ public void createTable(HTableDescriptor htd) {
}

@Override
public void createTable(HTableDescriptor htd, byte[][] splitKeys) {
public void createTable(TableDescriptor htd, byte[][] splitKeys) {
execute(admin -> {
admin.createTable(htd, splitKeys);
logger.info("{} table created with {} split keys, htd : {}", htd.getTableName(), splitKeys.length + 1, htd);
@@ -112,7 +106,7 @@ public void createTable(HTableDescriptor htd, byte[][] splitKeys) {
}

@Override
public boolean createTableIfNotExists(HTableDescriptor htd) {
public boolean createTableIfNotExists(TableDescriptor htd) {
return execute(admin -> {
TableName tableName = htd.getTableName();
if (!admin.tableExists(tableName)) {
@@ -166,20 +160,19 @@ public void dropTable(TableName tableName) {
}

@Override
public void modifyTable(HTableDescriptor htd) {
final TableName tableName = htd.getTableName();
public void modifyTable(TableDescriptor htd) {
execute(admin -> {
admin.modifyTable(tableName, htd);
logger.info("{} table modified, htd : {}", tableName, htd);
admin.modifyTable(htd);
logger.info("table modified, htd : {}", htd);
return null;
});
}
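
The single-argument Admin.modifyTable(TableDescriptor) applies the descriptor to the table named inside it, so the separate TableName parameter required by the HBase 1 overload goes away. Because HBase 2 descriptors are built rather than mutated, the typical flow is fetch, rebuild, submit. A small illustrative sketch (class, method, and the max-file-size example are not from the commit):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

import java.io.IOException;

final class ModifyTableSketch {
    // Fetch the current descriptor, rebuild it with the change,
    // and hand the result to the single-argument modifyTable.
    static void raiseMaxFileSize(Admin admin, TableName tableName, long maxFileSizeBytes) throws IOException {
        TableDescriptor current = admin.getDescriptor(tableName);
        TableDescriptor updated = TableDescriptorBuilder.newBuilder(current)
                .setMaxFileSize(maxFileSizeBytes)
                .build();
        admin.modifyTable(updated); // the table name travels inside the descriptor
    }
}
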

@Override
public void addColumn(TableName tableName, HColumnDescriptor hcd) {
public void addColumn(TableName tableName, ColumnFamilyDescriptor cfDescriptor) {
execute(admin -> {
admin.addColumn(tableName, hcd);
logger.info("{} table added column : {}", tableName, hcd);
admin.addColumnFamily(tableName, cfDescriptor);
logger.info("{} table added column : {}", tableName, cfDescriptor);
return null;
});
}
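
addColumnFamily takes a ColumnFamilyDescriptor, which in HBase 2 is assembled through ColumnFamilyDescriptorBuilder instead of being constructed like the old HColumnDescriptor. A small illustrative sketch (the family name and settings are examples, not values from the commit):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

final class AddColumnFamilySketch {
    static void addFamily(Admin admin, TableName tableName) throws IOException {
        // HBase 1: new HColumnDescriptor("S").setCompressionType(...)
        // HBase 2: build the descriptor, then pass it to addColumnFamily.
        ColumnFamilyDescriptor cf = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("S"))
                .setCompressionType(Compression.Algorithm.SNAPPY)
                .setTimeToLive(5184000) // 60 days, purely illustrative
                .build();
        admin.addColumnFamily(tableName, cf);
    }
}
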
@@ -16,9 +16,9 @@

package com.navercorp.pinpoint.common.hbase;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;

import java.util.List;
import java.util.Map;
@@ -32,15 +32,15 @@ public interface HbaseAdminOperation {

boolean createNamespaceIfNotExists(String namespace, Map<String, String> configurations);

List<HTableDescriptor> getTableDescriptors(String namespace);
List<TableDescriptor> getTableDescriptors(String namespace);

HTableDescriptor getTableDescriptor(TableName tableName);
TableDescriptor getTableDescriptor(TableName tableName);

void createTable(HTableDescriptor hTableDescriptor);
void createTable(TableDescriptor hTableDescriptor);

void createTable(HTableDescriptor hTableDescriptor, byte[][] splitKeys);
void createTable(TableDescriptor hTableDescriptor, byte[][] splitKeys);

boolean createTableIfNotExists(HTableDescriptor hTableDescriptor);
boolean createTableIfNotExists(TableDescriptor hTableDescriptor);

boolean tableExists(TableName tableName);

@@ -50,9 +50,9 @@

void dropTable(TableName tableName);

void modifyTable(HTableDescriptor hTableDescriptor);
void modifyTable(TableDescriptor hTableDescriptor);

void addColumn(TableName tableName, HColumnDescriptor hColumnDescriptor);
void addColumn(TableName tableName, ColumnFamilyDescriptor hColumnDescriptor);

<T> T execute(AdminCallback<T> action);
}
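
Callers of the updated interface now build the TableDescriptor up front. A minimal sketch of preparing one for createTable(TableDescriptor, byte[][]); the table name, family, and split point below are illustrative only:

import com.navercorp.pinpoint.common.hbase.HbaseAdminOperation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;

final class CreateTableSketch {
    static void createExample(HbaseAdminOperation adminOperation) {
        TableDescriptor htd = TableDescriptorBuilder
                .newBuilder(TableName.valueOf("default", "ExampleTable"))
                .setColumnFamily(ColumnFamilyDescriptorBuilder.of("E"))
                .build();
        byte[][] splitKeys = {Bytes.toBytes("m")}; // two regions, split at "m"
        adminOperation.createTable(htd, splitKeys);
    }
}
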
@@ -18,11 +18,11 @@

import com.navercorp.pinpoint.common.hbase.AdminCallback;
import com.navercorp.pinpoint.common.hbase.HbaseAdminOperation;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.logging.log4j.Logger;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
@@ -60,27 +60,27 @@ public boolean createNamespaceIfNotExists(String namespace, Map<String, String>
}

@Override
public List<HTableDescriptor> getTableDescriptors(String namespace) {
public List<TableDescriptor> getTableDescriptors(String namespace) {
return delegate.getTableDescriptors(namespace);
}

@Override
public HTableDescriptor getTableDescriptor(TableName tableName) {
public TableDescriptor getTableDescriptor(TableName tableName) {
return delegate.getTableDescriptor(tableName);
}

@Override
public void createTable(HTableDescriptor htd) {
public void createTable(TableDescriptor htd) {
logger.info("Creating table : {}.", htd);
}

@Override
public void createTable(HTableDescriptor htd, byte[][] splitKeys) {
public void createTable(TableDescriptor htd, byte[][] splitKeys) {
logger.info("Creating table : {} with {} splitKeys.", htd, splitKeys.length);
}

@Override
public boolean createTableIfNotExists(HTableDescriptor htd) {
public boolean createTableIfNotExists(TableDescriptor htd) {
TableName tableName = htd.getTableName();
boolean tableExists = delegate.tableExists(tableName);
if (tableExists) {
@@ -127,12 +127,12 @@ public void dropTable(TableName tableName) {
}

@Override
public void modifyTable(HTableDescriptor htd) {
public void modifyTable(TableDescriptor htd) {
logger.info("Modifying table : {}, desc : {}", htd.getTableName(), htd);
}

@Override
public void addColumn(TableName tableName, HColumnDescriptor hcd) {
public void addColumn(TableName tableName, ColumnFamilyDescriptor hcd) {
logger.info("Adding column to table : {}, column : {}", tableName, hcd);
}

@@ -18,11 +18,11 @@

import com.navercorp.pinpoint.common.hbase.HbaseAdminOperation;
import com.navercorp.pinpoint.common.util.ArrayUtils;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Arrays;
import java.util.Objects;
@@ -37,13 +37,13 @@ public class CreateTableCommand extends TableCommand {
private final byte[][] splitKeys;

CreateTableCommand(TableName tableName, Compression.Algorithm compressionAlgorithm, byte[][] splitKeys) {
super(new HTableDescriptor(Objects.requireNonNull(tableName, "tableName")), compressionAlgorithm);
super(tableName, compressionAlgorithm);
this.splitKeys = Objects.requireNonNull(splitKeys, "splitKeys");
}

@Override
public boolean execute(HbaseAdminOperation hbaseAdminOperation) {
HTableDescriptor htd = getHtd();
TableDescriptor htd = buildDescriptor();
TableName tableName = htd.getTableName();
if (hbaseAdminOperation.tableExists(tableName)) {
return false;
@@ -61,11 +61,9 @@ public boolean execute(HbaseAdminOperation hbaseAdminOperation) {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("CreateTableCommand{");
sb.append("htd=").append(getHtd());
sb.append("compressionAlgorithm=").append(getCompressionAlgorithm().getName());
sb.append(", splitKeys=").append(Arrays.toString(splitKeys));
sb.append('}');
return sb.toString();
return "CreateTableCommand{htd=" + buildDescriptor() +
"compressionAlgorithm=" + getCompressionAlgorithm().getName() +
", splitKeys=" + Arrays.toString(splitKeys) +
'}';
}
}
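
TableCommand itself is not among the files shown here; only its new surface is visible. The constructor now takes a TableName plus a Compression.Algorithm instead of a pre-built HTableDescriptor (ModifyTableCommand passes an existing TableDescriptor, suggesting an overload), and buildDescriptor() assembles the HBase 2 descriptor on demand. The following is a speculative sketch of that shape, not the commit's actual code; the columnFamilies field is assumed for illustration:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

abstract class TableCommandSketch {
    private final TableName tableName;
    private final Compression.Algorithm compressionAlgorithm;
    private final List<ColumnFamilyDescriptor> columnFamilies = new ArrayList<>(); // assumed field

    TableCommandSketch(TableName tableName, Compression.Algorithm compressionAlgorithm) {
        this.tableName = Objects.requireNonNull(tableName, "tableName");
        this.compressionAlgorithm = Objects.requireNonNull(compressionAlgorithm, "compressionAlgorithm");
    }

    // Rebuilds each family with the configured compression and returns an HBase 2 descriptor.
    TableDescriptor buildDescriptor() {
        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
        for (ColumnFamilyDescriptor family : columnFamilies) {
            builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family)
                    .setCompressionType(compressionAlgorithm)
                    .build());
        }
        return builder.build();
    }

    Compression.Algorithm getCompressionAlgorithm() {
        return compressionAlgorithm;
    }
}
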
@@ -21,9 +21,9 @@
import com.navercorp.pinpoint.hbase.schema.reader.core.ChangeSet;
import com.navercorp.pinpoint.hbase.schema.reader.core.ChangeType;
import com.navercorp.pinpoint.hbase.schema.reader.core.TableChange;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.springframework.util.CollectionUtils;

@@ -51,10 +51,10 @@ public HbaseSchemaCommandManager(String namespace, String compression) {
this(namespace, compression, Collections.emptyList());
}

public HbaseSchemaCommandManager(String namespace, String compression, List<HTableDescriptor> currentHtds) {
public HbaseSchemaCommandManager(String namespace, String compression, List<TableDescriptor> currentHtds) {
this.namespace = StringUtils.defaultIfEmpty(namespace, NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR);
this.compressionAlgorithm = getCompressionAlgorithm(compression);
for (HTableDescriptor htd : filterTablesByNamespace(currentHtds)) {
for (TableDescriptor htd : filterTablesByNamespace(currentHtds)) {
tableCommandMap.put(htd.getTableName(), new ModifyTableCommand(htd, this.compressionAlgorithm));
}
}
@@ -71,12 +71,12 @@ private Compression.Algorithm getCompressionAlgorithm(String compression) {
throw new IllegalArgumentException("Unknown compression option : " + compression);
}

private List<HTableDescriptor> filterTablesByNamespace(List<HTableDescriptor> htds) {
private List<TableDescriptor> filterTablesByNamespace(List<TableDescriptor> htds) {
if (CollectionUtils.isEmpty(htds)) {
return Collections.emptyList();
}
List<HTableDescriptor> filteredHtds = new ArrayList<>();
for (HTableDescriptor htd : htds) {
List<TableDescriptor> filteredHtds = new ArrayList<>();
for (TableDescriptor htd : htds) {
TableName tableName = htd.getTableName();
String namespace = tableName.getNamespaceAsString();
if (this.namespace.equalsIgnoreCase(namespace)) {
@@ -138,12 +138,11 @@ public List<TableCommand> getCommands() {
.collect(Collectors.toList());
}

public List<HTableDescriptor> getSchemaSnapshot() {
public List<TableDescriptor> getSchemaSnapshot() {
return tableCommandMap.entrySet().stream()
.filter(e -> affectedTables.contains(e.getKey()))
.map(Map.Entry::getValue)
.map(TableCommand::getHtd)
.map(HTableDescriptor::new)
.map(TableCommand::buildDescriptor)
.collect(Collectors.toList());
}
}
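
The dropped .map(HTableDescriptor::new) step in getSchemaSnapshot was a defensive copy; the HBase 2 TableDescriptor interface exposes no setters, so the descriptors returned by buildDescriptor() cannot be altered through it. If an explicit copy were ever needed, the builder can produce one, as in this small illustrative snippet (not part of the commit):

import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

final class DescriptorCopySketch {
    // HBase 1: new HTableDescriptor(htd) was the idiomatic defensive copy.
    // HBase 2: rebuild through the builder instead.
    static TableDescriptor copy(TableDescriptor source) {
        return TableDescriptorBuilder.newBuilder(source).build();
    }
}
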
@@ -17,13 +17,12 @@
package com.navercorp.pinpoint.hbase.schema.core.command;

import com.navercorp.pinpoint.common.hbase.HbaseAdminOperation;
import com.navercorp.pinpoint.common.util.ArrayUtils;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
* This command is solely for adding new column families to an existing table.
@@ -35,30 +34,27 @@ public class ModifyTableCommand extends TableCommand {

private final Logger logger = LogManager.getLogger(this.getClass());

ModifyTableCommand(HTableDescriptor htd, Compression.Algorithm compressionAlgorithm) {
ModifyTableCommand(TableDescriptor htd, Compression.Algorithm compressionAlgorithm) {
super(htd, compressionAlgorithm);
}

@Override
public boolean execute(HbaseAdminOperation hbaseAdminOperation) {
HTableDescriptor htd = getHtd();
HColumnDescriptor[] hcds = htd.getColumnFamilies();
if (ArrayUtils.isEmpty(hcds)) {
return false;
}
TableDescriptor htd = buildDescriptor();
ColumnFamilyDescriptor[] cfDescriptors = htd.getColumnFamilies();

TableName tableName = htd.getTableName();
HTableDescriptor currentHtd = hbaseAdminOperation.getTableDescriptor(tableName);
TableDescriptor currentHtd = hbaseAdminOperation.getTableDescriptor(tableName);

// Filter existing column families as column family modification is not supported.
// We could use modifyTable(HTableDescriptor) to add column families, but this deletes existing column families
// if they are not specified in HTableDescriptor and this may be dangerous.
// Instead, use addColumn.
boolean changesMade = false;
for (HColumnDescriptor hcd : hcds) {
if (!currentHtd.hasFamily(hcd.getName())) {
logger.info("Adding {} to {} table.", hcd, tableName);
hbaseAdminOperation.addColumn(tableName, hcd);
for (ColumnFamilyDescriptor cf : cfDescriptors) {
if (!currentHtd.hasColumnFamily(cf.getName())) {
logger.info("Adding {} to {} table.", cf, tableName);
hbaseAdminOperation.addColumn(tableName, cf);
changesMade = true;
}
}
@@ -67,10 +63,8 @@ public boolean execute(HbaseAdminOperation hbaseAdminOperation) {

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ModifyTableCommand{");
sb.append("htd=").append(getHtd());
sb.append(", compressionAlgorithm=").append(getCompressionAlgorithm());
sb.append('}');
return sb.toString();
return "ModifyTableCommand{TableDesc=" + buildDescriptor() +
", compressionAlgorithm=" + getCompressionAlgorithm() +
'}';
}
}
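
For completeness, the same add-only-missing-families pattern expressed directly against the HBase 2 Admin API (a standalone sketch, not the project's HbaseAdminOperation wrapper; names are illustrative):

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptor;

import java.io.IOException;

final class AddMissingFamiliesSketch {
    // Adds only the families from 'desired' that the live table does not have yet;
    // existing families are left untouched, mirroring the commit's rationale for
    // preferring addColumnFamily over a blanket modifyTable.
    static boolean addMissingFamilies(Admin admin, TableDescriptor desired) throws IOException {
        TableName tableName = desired.getTableName();
        TableDescriptor current = admin.getDescriptor(tableName);
        boolean changesMade = false;
        for (ColumnFamilyDescriptor cf : desired.getColumnFamilies()) {
            if (!current.hasColumnFamily(cf.getName())) {
                admin.addColumnFamily(tableName, cf);
                changesMade = true;
            }
        }
        return changesMade;
    }
}
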