diff --git a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java index 97b9315c6e6..5e1899c73a6 100644 --- a/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/manager/balancer/BalanceParamsImpl.java @@ -20,16 +20,17 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.stream.Collectors; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.TabletIdImpl; import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.TServerInstance; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.TabletBalancer; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; @@ -41,39 +42,44 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters { private final List migrationsOut; private final SortedMap thriftCurrentStatus; private final Set thriftCurrentMigrations; - private final DataLevel currentDataLevel; + private final String partition; + private final Map tablesToBalance; public static BalanceParamsImpl fromThrift(SortedMap currentStatus, SortedMap thriftCurrentStatus, - Set thriftCurrentMigrations, DataLevel currentLevel) { + Set thriftCurrentMigrations, String partition, + Map tablesToBalance) { Set currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new) .collect(Collectors.toUnmodifiableSet()); return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(), - thriftCurrentStatus, thriftCurrentMigrations, currentLevel); + thriftCurrentStatus, thriftCurrentMigrations, partition, tablesToBalance); } public BalanceParamsImpl(SortedMap currentStatus, - Set currentMigrations, List migrationsOut, - DataLevel currentLevel) { + Set currentMigrations, List migrationsOut, String partition, + Map tablesToBalance) { this.currentStatus = currentStatus; this.currentMigrations = currentMigrations; this.migrationsOut = migrationsOut; this.thriftCurrentStatus = null; this.thriftCurrentMigrations = null; - this.currentDataLevel = currentLevel; + this.partition = partition; + this.tablesToBalance = tablesToBalance; } private BalanceParamsImpl(SortedMap currentStatus, Set currentMigrations, List migrationsOut, SortedMap thriftCurrentStatus, - Set thriftCurrentMigrations, DataLevel currentLevel) { + Set thriftCurrentMigrations, String partition, + Map tablesToBalance) { this.currentStatus = currentStatus; this.currentMigrations = currentMigrations; this.migrationsOut = migrationsOut; this.thriftCurrentStatus = thriftCurrentStatus; this.thriftCurrentMigrations = thriftCurrentMigrations; - this.currentDataLevel = currentLevel; + this.partition = partition; + this.tablesToBalance = tablesToBalance; } @Override @@ -107,7 +113,12 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns } @Override - public String currentLevel() { - return currentDataLevel.name(); + public String partitionName() { + return partition; + } + + @Override + public Map getTablesToBalance() { + return tablesToBalance; } } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java index 733e847fe56..2d648ad3397 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/BalancerEnvironment.java @@ -40,6 +40,12 @@ public interface BalancerEnvironment extends ServiceEnvironment { * Many Accumulo plugins are given table IDs as this is what Accumulo uses internally to identify * tables. This provides a mapping of table names to table IDs for the purposes of translating * and/or enumerating the existing tables. + * + *

+ * This returns all tables that exists in the system. Each request to balance should limit itself + * to {@link TabletBalancer.BalanceParameters#getTablesToBalance()} and not balance everything + * returned by this. + *

*/ Map getTableIdMap(); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java index dc34e704440..cbfa37e128f 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/GroupBalancer.java @@ -70,7 +70,7 @@ public abstract class GroupBalancer implements TabletBalancer { protected BalancerEnvironment environment; private final TableId tableId; - protected final Map lastRunTimes = new HashMap<>(DataLevel.values().length); + protected final Map lastRunTimes = new HashMap<>(DataLevel.values().length); @Override public void init(BalancerEnvironment balancerEnvironment) { @@ -213,9 +213,8 @@ public long balance(BalanceParameters params) { return 5000; } - final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel()); - - if (System.currentTimeMillis() - lastRunTimes.getOrDefault(currentLevel, 0L) < getWaitTime()) { + if (System.currentTimeMillis() - lastRunTimes.getOrDefault(params.partitionName(), 0L) + < getWaitTime()) { return 5000; } @@ -279,7 +278,7 @@ public long balance(BalanceParameters params) { populateMigrations(tservers.keySet(), params.migrationsOut(), moves); - lastRunTimes.put(currentLevel, System.currentTimeMillis()); + lastRunTimes.put(params.partitionName(), System.currentTimeMillis()); return 5000; } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java index cb88ce320c4..22b87aa3e41 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancer.java @@ -51,7 +51,6 @@ import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; import org.apache.accumulo.core.manager.balancer.TableStatisticsImpl; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TableStatistics; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; @@ -182,7 +181,7 @@ static class HrtlbConf { } private static final Set EMPTY_MIGRATIONS = Collections.emptySet(); - protected final Map lastOOBCheckTimes = new HashMap<>(DataLevel.values().length); + protected final Map lastOOBCheckTimes = new HashMap<>(); private Map> pools = new HashMap<>(); private final Map migrationsFromLastPass = new HashMap<>(); private final Map tableToTimeSinceNoMigrations = new HashMap<>(); @@ -380,7 +379,7 @@ public void getAssignments(AssignmentParameters params) { public long balance(BalanceParameters params) { long minBalanceTime = 20_000; // Iterate over the tables and balance each of them - Map tableIdMap = environment.getTableIdMap(); + Map tableIdMap = params.getTablesToBalance(); Map tableIdToTableName = tableIdMap.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey)); tableIdToTableName.keySet().forEach(this::checkTableConfig); @@ -395,9 +394,9 @@ public long balance(BalanceParameters params) { Map> currentGrouped = splitCurrentByRegex(params.currentStatus()); - final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel()); - if ((now - this.lastOOBCheckTimes.getOrDefault(currentLevel, System.currentTimeMillis())) + if ((now + - this.lastOOBCheckTimes.getOrDefault(params.partitionName(), System.currentTimeMillis())) > myConf.oobCheckMillis) { try { // Check to see if a tablet is assigned outside the bounds of the pool. If so, migrate it. @@ -458,7 +457,7 @@ public long balance(BalanceParameters params) { } } finally { // this could have taken a while...get a new time - this.lastOOBCheckTimes.put(currentLevel, System.currentTimeMillis()); + this.lastOOBCheckTimes.put(params.partitionName(), System.currentTimeMillis()); } } @@ -511,8 +510,8 @@ public long balance(BalanceParameters params) { continue; } ArrayList newMigrations = new ArrayList<>(); - getBalancerForTable(tableId).balance( - new BalanceParamsImpl(currentView, migrations, newMigrations, DataLevel.of(tableId))); + getBalancerForTable(tableId).balance(new BalanceParamsImpl(currentView, migrations, + newMigrations, params.partitionName() + ":" + tableId, Map.of(tableName, tableId))); if (newMigrations.isEmpty()) { tableToTimeSinceNoMigrations.remove(tableId); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java index b593a104fe1..146c3ea28d9 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancer.java @@ -77,6 +77,7 @@ public class SimpleLoadBalancer implements TabletBalancer { public SimpleLoadBalancer() {} + // TODO drop this constructor and use new getTablesToBalance() method public SimpleLoadBalancer(TableId table) { tableToBalance = table; } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java index 55a24c30943..6a995f03d43 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancer.java @@ -30,7 +30,6 @@ import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.slf4j.Logger; @@ -125,12 +124,13 @@ public void getAssignments(AssignmentParameters params) { public long balance(BalanceParameters params) { long minBalanceTime = 5_000; // Iterate over the tables and balance each of them - final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel()); - for (TableId tableId : environment.getTableIdMap().values()) { + for (Entry entry : params.getTablesToBalance().entrySet()) { + String tableName = entry.getKey(); + TableId tableId = entry.getValue(); ArrayList newMigrations = new ArrayList<>(); - long tableBalanceTime = - getBalancerForTable(tableId).balance(new BalanceParamsImpl(params.currentStatus(), - params.currentMigrations(), newMigrations, currentDataLevel)); + long tableBalanceTime = getBalancerForTable(tableId) + .balance(new BalanceParamsImpl(params.currentStatus(), params.currentMigrations(), + newMigrations, params.partitionName() + ":" + tableId, Map.of(tableName, tableId))); if (tableBalanceTime < minBalanceTime) { minBalanceTime = tableBalanceTime; } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java index 06235a10a1f..bdbdc781853 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/balancer/TabletBalancer.java @@ -24,6 +24,7 @@ import java.util.SortedMap; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; @@ -95,13 +96,24 @@ interface BalanceParameters { List migrationsOut(); /** - * Return the DataLevel name for which the Manager is currently balancing. Balancers should - * return migrations for tables within the current DataLevel. + * Accumulo may partition tables in different ways and pass subsets of tables to the balancer + * via {@link #getTablesToBalance()}. Each partition is given a unique name that is always the + * same for a given partition. Balancer can use this to determine if they are being called for + * the same or a different partition if tracking state between balance calls. * - * @return name of current balancing iteration data level + * @return name of current partition of tables to balance. * @since 2.1.4 */ - String currentLevel(); + String partitionName(); + + /** + * This is the set of tables the balancer should consider. Balancing any tables outside of this + * set will be ignored and result in an error in the logs. + * + * @return map of table names to table ids that. + * @since 2.1.4 + */ + Map getTablesToBalance(); } /** diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java index e55eb379d23..1c8abdec982 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/GroupBalancerTest.java @@ -40,7 +40,6 @@ import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -124,7 +123,7 @@ public void balance(TabletBalancer balancer, int maxMigrations, TableId tid) { } balancer - .balance(new BalanceParamsImpl(current, migrations, migrationsOut, DataLevel.of(tid))); + .balance(new BalanceParamsImpl(current, migrations, migrationsOut, "USER", Map.of())); assertTrue(migrationsOut.size() <= (maxMigrations + 5), "Max Migration exceeded " + maxMigrations + " " + migrationsOut.size()); diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java index 58a89ec6260..0004055345f 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerReconfigurationTest.java @@ -43,7 +43,6 @@ import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; import org.apache.accumulo.core.spi.balancer.data.TabletStatistics; @@ -108,7 +107,7 @@ public void testConfigurationChanges() { // getOnlineTabletsForTable UtilWaitThread.sleep(3000); this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), - migrations, migrationsOut, DataLevel.USER)); + migrations, migrationsOut, "USER", tables)); assertEquals(0, migrationsOut.size()); // Change property, simulate call by TableConfWatcher @@ -118,9 +117,9 @@ public void testConfigurationChanges() { // in the HostRegexTableLoadBalancer. For this test we want // to get into the out of bounds checking code, so we need to // populate the map with an older time value - this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2); + this.lastOOBCheckTimes.put("USER", System.currentTimeMillis() / 2); this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers), - migrations, migrationsOut, DataLevel.USER)); + migrations, migrationsOut, "USER", tables)); assertEquals(5, migrationsOut.size()); for (TabletMigration migration : migrationsOut) { assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1") diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java index 4d3162e02d2..3ec645bcd82 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/HostRegexTableLoadBalancerTest.java @@ -48,7 +48,6 @@ import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -98,7 +97,7 @@ public void testBalance() { List migrationsOut = new ArrayList<>(); long wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut, DataLevel.USER)); + migrations, migrationsOut, "USER", environment.getTableIdMap())); assertEquals(20000, wait); // should balance four tablets in one of the tables before reaching max assertEquals(4, migrationsOut.size()); @@ -109,7 +108,7 @@ public void testBalance() { } migrationsOut.clear(); wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut, DataLevel.USER)); + migrations, migrationsOut, "USER", environment.getTableIdMap())); assertEquals(20000, wait); // should balance four tablets in one of the other tables before reaching max assertEquals(4, migrationsOut.size()); @@ -120,7 +119,7 @@ public void testBalance() { } migrationsOut.clear(); wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut, DataLevel.USER)); + migrations, migrationsOut, "USER", environment.getTableIdMap())); assertEquals(20000, wait); // should balance four tablets in one of the other tables before reaching max assertEquals(4, migrationsOut.size()); @@ -131,7 +130,7 @@ public void testBalance() { } migrationsOut.clear(); wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut, DataLevel.USER)); + migrations, migrationsOut, "USER", environment.getTableIdMap())); assertEquals(20000, wait); // no more balancing to do assertEquals(0, migrationsOut.size()); @@ -148,7 +147,7 @@ public void testBalanceWithTooManyOutstandingMigrations() { migrations.addAll(tableTablets.get(BAR.getTableName())); long wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)), - migrations, migrationsOut, DataLevel.USER)); + migrations, migrationsOut, "USER", environment.getTableIdMap())); assertEquals(20000, wait); // no migrations should have occurred as 10 is the maxOutstandingMigrations assertEquals(0, migrationsOut.size()); @@ -491,12 +490,12 @@ public void testOutOfBoundsTablets() { // in the HostRegexTableLoadBalancer. For this test we want // to get into the out of bounds checking code, so we need to // populate the map with an older time value - this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2); + this.lastOOBCheckTimes.put("USER", System.currentTimeMillis() / 2); init(DEFAULT_TABLE_PROPERTIES); Set migrations = new HashSet<>(); List migrationsOut = new ArrayList<>(); - this.balance( - new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, DataLevel.USER)); + this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, "USER", + environment.getTableIdMap())); assertEquals(2, migrationsOut.size()); } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java index 055898928b3..0aa98371554 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/SimpleLoadBalancerTest.java @@ -42,7 +42,6 @@ import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -204,7 +203,7 @@ public void testUnevenAssignment() { while (true) { List migrationsOut = new ArrayList<>(); balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut, - DataLevel.USER)); + "USER", Map.of())); if (migrationsOut.isEmpty()) { break; } @@ -247,7 +246,7 @@ public void testUnevenAssignment2() { while (true) { List migrationsOut = new ArrayList<>(); balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut, - DataLevel.USER)); + "USER", Map.of())); if (migrationsOut.isEmpty()) { break; } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java index 8e9aefd0283..ba8b96039df 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/balancer/TableLoadBalancerTest.java @@ -44,7 +44,6 @@ import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -142,13 +141,13 @@ public void test() { List migrationsOut = new ArrayList<>(); TableLoadBalancer tls = new TableLoadBalancer(); tls.init(environment); - tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER)); + tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, "USER", tableIdMap)); assertEquals(0, migrationsOut.size()); state.put(mkts("10.0.0.2", 2345, "0x02030405"), status()); tls = new TableLoadBalancer(); tls.init(environment); - tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, DataLevel.USER)); + tls.balance(new BalanceParamsImpl(state, migrations, migrationsOut, "USER", tableIdMap)); int count = 0; Map movedByTable = new HashMap<>(); movedByTable.put(TableId.of(t1Id), 0); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java index dbb611303f8..c9ae4528877 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataTableScanner.java @@ -96,6 +96,7 @@ public static void configureScanner(ScannerBase scanner, CurrentState state) { TabletStateChangeIterator.setCurrentServers(tabletChange, state.onlineTabletServers()); TabletStateChangeIterator.setOnlineTables(tabletChange, state.onlineTables()); TabletStateChangeIterator.setMerges(tabletChange, state.merges()); + // TODO this scan is for a level, only get the migrations for the level TabletStateChangeIterator.setMigrations(tabletChange, state.migrationsSnapshot()); TabletStateChangeIterator.setManagerState(tabletChange, state.getManagerState()); TabletStateChangeIterator.setShuttingDown(tabletChange, state.shutdownServers()); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index a3540fa062a..99b1da278f5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -32,7 +32,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.EnumMap; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -188,8 +187,7 @@ public class Manager extends AbstractServer final Map badServers = Collections.synchronizedMap(new HashMap<>()); final Set serversToShutdown = Collections.synchronizedSet(new HashSet<>()); - final SortedMap migrations = - Collections.synchronizedSortedMap(new TreeMap<>()); + final Migrations migrations = new Migrations(); final EventCoordinator nextEvent = new EventCoordinator(); private final Object mergeLock = new Object(); private Thread replicationWorkThread; @@ -419,6 +417,7 @@ public static void main(String[] args) throws Exception { protected Manager(ServerOpts opts, String[] args) throws IOException { super("manager", opts, args); + ServerContext context = super.getContext(); balancerEnvironment = new BalancerEnvironmentImpl(context); @@ -559,9 +558,7 @@ public boolean hasCycled(long time) { } public void clearMigrations(TableId tableId) { - synchronized (migrations) { - migrations.keySet().removeIf(extent -> extent.tableId().equals(tableId)); - } + migrations.removeTable(tableId); } public MetricsProducer getBalancerMetrics() { @@ -716,11 +713,7 @@ public void run() { */ private void cleanupNonexistentMigrations(final ClientContext clientContext) { - Map> notSeen; - - synchronized (migrations) { - notSeen = partitionMigrations(migrations.keySet()); - } + Map> notSeen = migrations.mutableCopy(); // for each level find the set of migrating tablets that do not exists in metadata store for (DataLevel dataLevel : DataLevel.values()) { @@ -740,7 +733,7 @@ private void cleanupNonexistentMigrations(final ClientContext clientContext) { // remove any tablets that previously existed in migrations for this level but were not seen // in the metadata table for the level - migrations.keySet().removeAll(notSeenForLevel); + migrations.removeExtents(notSeenForLevel); } } @@ -802,23 +795,6 @@ public void run() { } - /** - * balanceTablets() balances tables by DataLevel. Return the current set of migrations partitioned - * by DataLevel - */ - private static Map> - partitionMigrations(final Set migrations) { - final Map> partitionedMigrations = new EnumMap<>(DataLevel.class); - // populate to prevent NPE - for (DataLevel dl : DataLevel.values()) { - partitionedMigrations.put(dl, new HashSet<>()); - } - migrations.forEach(ke -> { - partitionedMigrations.get(DataLevel.of(ke.tableId())).add(ke); - }); - return partitionedMigrations; - } - private class StatusThread implements Runnable { private boolean goodStats() { @@ -1000,20 +976,22 @@ private SortedMap createTServerStatusView( final Map newTableMap = new HashMap<>(dl == DataLevel.USER ? oldTableMap.size() : 1); if (dl == DataLevel.ROOT) { - if (oldTableMap.containsKey(RootTable.NAME)) { - newTableMap.put(RootTable.NAME, oldTableMap.get(RootTable.NAME)); + if (oldTableMap.containsKey(RootTable.ID.canonical())) { + newTableMap.put(RootTable.ID.canonical(), oldTableMap.get(RootTable.ID.canonical())); } } else if (dl == DataLevel.METADATA) { - if (oldTableMap.containsKey(MetadataTable.NAME)) { - newTableMap.put(MetadataTable.NAME, oldTableMap.get(MetadataTable.NAME)); + if (oldTableMap.containsKey(MetadataTable.ID.canonical())) { + newTableMap.put(MetadataTable.ID.canonical(), + oldTableMap.get(MetadataTable.ID.canonical())); } } else if (dl == DataLevel.USER) { - if (!oldTableMap.containsKey(MetadataTable.NAME) - && !oldTableMap.containsKey(RootTable.NAME)) { + if (!oldTableMap.containsKey(MetadataTable.ID.canonical()) + && !oldTableMap.containsKey(RootTable.ID.canonical())) { newTableMap.putAll(oldTableMap); } else { oldTableMap.forEach((table, info) -> { - if (!table.equals(RootTable.NAME) && !table.equals(MetadataTable.NAME)) { + if (!table.equals(RootTable.ID.canonical()) + && !table.equals(MetadataTable.ID.canonical())) { newTableMap.put(table, info); } }); @@ -1027,14 +1005,29 @@ private SortedMap createTServerStatusView( return tserverStatusForLevel; } + private Map getTablesForLevel(DataLevel dataLevel) { + switch (dataLevel) { + case ROOT: + return Map.of(RootTable.NAME, RootTable.ID); + case METADATA: + return Map.of(MetadataTable.NAME, MetadataTable.ID); + case USER: { + Map userTables = new HashMap<>(getContext().getTableNameToIdMap()); + userTables.remove(RootTable.NAME); + userTables.remove(MetadataTable.NAME); + return Collections.unmodifiableMap(userTables); + } + default: + throw new IllegalArgumentException("Unknown data level " + dataLevel); + } + } + private long balanceTablets() { final int tabletsNotHosted = notHosted(); BalanceParamsImpl params = null; long wait = 0; long totalMigrationsOut = 0; - final Map> partitionedMigrations = - partitionMigrations(migrationsSnapshot()); int levelsCompleted = 0; for (DataLevel dl : DataLevel.values()) { @@ -1043,6 +1036,18 @@ private long balanceTablets() { tabletsNotHosted); continue; } + + if ((dl == DataLevel.METADATA || dl == DataLevel.USER) + && !migrations.isEmpty(DataLevel.ROOT)) { + log.debug("Not balancing {} because {} has migrations", dl, DataLevel.ROOT); + continue; + } + + if (dl == DataLevel.USER && !migrations.isEmpty(DataLevel.METADATA)) { + log.debug("Not balancing {} because {} has migrations", dl, DataLevel.METADATA); + continue; + } + // Create a view of the tserver status such that it only contains the tables // for this level in the tableMap. SortedMap tserverStatusForLevel = @@ -1053,44 +1058,25 @@ private long balanceTablets() { tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel .put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status))); - long migrationsOutForLevel = 0; - int attemptNum = 0; - do { - log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, ++attemptNum); - - SortedMap statusForBalancerLevel = - tserverStatusForBalancerLevel; - if (attemptNum > 1 && (dl == DataLevel.ROOT || dl == DataLevel.METADATA)) { - // If we are still migrating then perform a re-check on the tablet - // servers to make sure non of them have failed. - Set currentServers = tserverSet.getCurrentServers(); - tserverStatus = gatherTableInformation(currentServers); - // Create a view of the tserver status such that it only contains the tables - // for this level in the tableMap. - tserverStatusForLevel = createTServerStatusView(dl, tserverStatus); - final SortedMap tserverStatusForBalancerLevel2 = - new TreeMap<>(); - tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel2 - .put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status))); - statusForBalancerLevel = tserverStatusForBalancerLevel2; - } + log.debug("Balancing for tables at level {}", dl); - params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, tserverStatusForLevel, - partitionedMigrations.get(dl), dl); - wait = Math.max(tabletBalancer.balance(params), wait); - migrationsOutForLevel = 0; - for (TabletMigration m : checkMigrationSanity(statusForBalancerLevel.keySet(), - params.migrationsOut(), dl)) { - final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet()); - if (migrations.containsKey(ke)) { - log.warn("balancer requested migration more than once, skipping {}", m); - continue; - } - migrationsOutForLevel++; - migrations.put(ke, TabletServerIdImpl.toThrift(m.getNewTabletServer())); - log.debug("migration {}", m); + SortedMap statusForBalancerLevel = + tserverStatusForBalancerLevel; + params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, tserverStatusForLevel, + migrations.snapshot(dl), dl.name(), getTablesForLevel(dl)); + wait = Math.max(tabletBalancer.balance(params), wait); + long migrationsOutForLevel = 0; + for (TabletMigration m : checkMigrationSanity(statusForBalancerLevel.keySet(), + params.migrationsOut(), dl)) { + final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet()); + if (migrations.contains(ke)) { + log.warn("balancer requested migration more than once, skipping {}", m); + continue; } - } while (migrationsOutForLevel > 0 && (dl == DataLevel.ROOT || dl == DataLevel.METADATA)); + migrationsOutForLevel++; + migrations.put(ke, TabletServerIdImpl.toThrift(m.getNewTabletServer())); + log.debug("migration {}", m); + } totalMigrationsOut += migrationsOutForLevel; // increment this at end of loop to signal complete run w/o any continue @@ -1116,7 +1102,7 @@ private List checkMigrationSanity(Set current, if (m.getTablet() == null) { log.error("Balancer gave back a null tablet {}", m); } else if (DataLevel.of(m.getTablet().getTable()) != level) { - log.trace( + log.warn( "Balancer wants to move a tablet ({}) outside of the current processing level ({}), " + "ignoring and should be processed at the correct level ({})", m.getTablet(), level, DataLevel.of(m.getTablet().getTable())); @@ -1768,16 +1754,7 @@ public void update(LiveTServerSet current, Set deleted, cleanListByHostAndPort(serversToShutdown, deleted, added); } - synchronized (migrations) { - Iterator> iter = migrations.entrySet().iterator(); - while (iter.hasNext()) { - Entry entry = iter.next(); - if (deleted.contains(entry.getValue())) { - log.info("Canceling migration of {} to {}", entry.getKey(), entry.getValue()); - iter.remove(); - } - } - } + migrations.removeServers(deleted); nextEvent.event("There are now %d tablet servers", current.size()); } @@ -1942,11 +1919,7 @@ public boolean delegationTokensAvailable() { @Override public Set migrationsSnapshot() { - Set migrationKeys; - synchronized (migrations) { - migrationKeys = new HashSet<>(migrations.keySet()); - } - return Collections.unmodifiableSet(migrationKeys); + return migrations.snapshotAll(); } @Override diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java index ee914703ccf..3f3cc61a113 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java @@ -351,7 +351,7 @@ public void reportSplitExtent(TInfo info, TCredentials credentials, String serve } KeyExtent oldTablet = KeyExtent.fromThrift(split.oldTablet); - if (manager.migrations.remove(oldTablet) != null) { + if (manager.migrations.removeExtent(oldTablet) != null) { Manager.log.info("Canceled migration of {}", split.oldTablet); } for (TServerInstance instance : manager.tserverSet.getCurrentServers()) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Migrations.java b/server/manager/src/main/java/org/apache/accumulo/manager/Migrations.java new file mode 100644 index 00000000000..cf2d74e7235 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Migrations.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager; + +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.function.Predicate; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Migrations { + private final EnumMap> migrations; + + private static final Logger log = LoggerFactory.getLogger(Migrations.class); + + public Migrations() { + this.migrations = new EnumMap<>(DataLevel.class); + for (var dataLevel : DataLevel.values()) { + migrations.put(dataLevel, Collections.synchronizedSortedMap(new TreeMap<>())); + } + } + + private void removeIf(Predicate> removalTest) { + for (var dataLevel : DataLevel.values()) { + var mmap = migrations.get(dataLevel); + mmap.entrySet().removeIf(entry -> { + if (removalTest.test(entry)) { + log.trace("Removed migration {} -> {}", entry.getKey(), entry.getValue()); + return true; + } + return false; + }); + } + } + + public void removeTable(TableId tableId) { + removeIf(entry -> entry.getKey().tableId().equals(tableId)); + } + + public void removeExtents(Set extents) { + extents.forEach(this::removeExtent); + } + + public TServerInstance removeExtent(KeyExtent extent) { + var tserver = migrations.get(DataLevel.of(extent.tableId())).remove(extent); + if (tserver != null) { + log.trace("Removed migration {} -> {}", extent, tserver); + } + return tserver; + } + + public void removeServers(Set servers) { + removeIf(entry -> servers.contains(entry.getValue())); + } + + public void put(KeyExtent extent, TServerInstance tServerInstance) { + migrations.get(DataLevel.of(extent.tableId())).put(extent, tServerInstance); + log.trace("Added migration {} -> {}", extent, tServerInstance); + } + + public TServerInstance get(KeyExtent extent) { + return migrations.get(DataLevel.of(extent.tableId())).get(extent); + } + + public Set snapshotAll() { + var copy = new HashSet(); + migrations.values().forEach(mset -> { + synchronized (mset) { + copy.addAll(mset.keySet()); + } + }); + return Collections.unmodifiableSet(copy); + } + + public Set snapshot(DataLevel dl) { + return Set.copyOf(migrations.get(dl).keySet()); + } + + public Map> mutableCopy() { + Map> copy = new EnumMap<>(DataLevel.class); + for (var dataLevel : DataLevel.values()) { + copy.put(dataLevel, new HashSet<>(migrations.get(dataLevel).keySet())); + } + return copy; + } + + public boolean isEmpty() { + for (var mset : migrations.values()) { + if (!mset.isEmpty()) { + return false; + } + } + return true; + } + + public int size() { + int size = 0; + for (var mset : migrations.values()) { + size += mset.size(); + } + return size; + } + + public boolean isEmpty(DataLevel dataLevel) { + return migrations.get(dataLevel).isEmpty(); + } + + public boolean contains(KeyExtent extent) { + return migrations.get(DataLevel.of(extent.tableId())).containsKey(extent); + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index 8e4eb0768ef..b306f6ef7b0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -326,7 +326,7 @@ public void run() { switch (state) { case HOSTED: if (location.getServerInstance().equals(manager.migrations.get(tls.extent))) { - manager.migrations.remove(tls.extent); + manager.migrations.removeExtent(tls.extent); } break; case ASSIGNED_TO_DEAD_SERVER: @@ -453,7 +453,7 @@ private void hostUnassignedTablet(TabletLists tLists, KeyExtent tablet, tLists.assignments.add(new Assignment(tablet, dest, unassignedTablet.getLastLocation())); } else { // get rid of this migration - manager.migrations.remove(tablet); + manager.migrations.removeExtent(tablet); tLists.unassigned.put(tablet, unassignedTablet); } } else { @@ -492,7 +492,7 @@ private void hostDeadTablet(TabletLists tLists, TabletLocationState tls, Locatio WalStateManager wals) throws WalMarkerException { tLists.assignedToDeadServers.add(tls); if (location.getServerInstance().equals(manager.migrations.get(tls.extent))) { - manager.migrations.remove(tls.extent); + manager.migrations.removeExtent(tls.extent); } TServerInstance tserver = tls.futureOrCurrentServer(); if (!tLists.logsForDeadServers.containsKey(tserver)) { @@ -504,7 +504,7 @@ private void cancelOfflineTableMigrations(KeyExtent extent) { TServerInstance dest = manager.migrations.get(extent); TableState tableState = manager.getTableManager().getTableState(extent.tableId()); if (dest != null && tableState == TableState.OFFLINE) { - manager.migrations.remove(extent); + manager.migrations.removeExtent(extent); } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/MigrationsTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/MigrationsTest.java new file mode 100644 index 00000000000..bb55d03f876 --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/MigrationsTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.util.HostAndPort; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class MigrationsTest { + + private final TServerInstance TS1 = + new TServerInstance(HostAndPort.fromParts("server1", 9997), "s001"); + private final TServerInstance TS2 = + new TServerInstance(HostAndPort.fromParts("server2", 1234), "s002"); + private final TServerInstance TS3 = + new TServerInstance(HostAndPort.fromParts("server3", 6789), "s002"); + + private final TableId TID1 = TableId.of("1"); + private final TableId TID2 = TableId.of("2"); + private final TableId TID3 = TableId.of("3"); + + private final KeyExtent ME = new KeyExtent(MetadataTable.ID, null, null); + + private final KeyExtent KE1 = new KeyExtent(TID1, null, null); + private final KeyExtent KE2 = new KeyExtent(TID2, null, null); + private final KeyExtent KE3 = new KeyExtent(TID3, null, null); + + Migrations migrations; + + @BeforeEach + public void setup() { + migrations = new Migrations(); + migrations.put(RootTable.EXTENT, TS2); + migrations.put(ME, TS1); + migrations.put(KE1, TS2); + migrations.put(KE2, TS1); + } + + @Test + public void testInitial() { + assertEquals(4, migrations.size()); + assertEquals(Set.of(RootTable.EXTENT, ME, KE1, KE2), migrations.snapshotAll()); + assertEquals(Set.of(RootTable.EXTENT), migrations.snapshot(DataLevel.ROOT)); + assertEquals(Set.of(ME), migrations.snapshot(DataLevel.METADATA)); + assertEquals(Set.of(KE1, KE2), migrations.snapshot(DataLevel.USER)); + assertTrue(migrations.contains(RootTable.EXTENT)); + assertTrue(migrations.contains(ME)); + assertTrue(migrations.contains(KE1)); + assertTrue(migrations.contains(KE2)); + assertFalse(migrations.contains(KE3)); + assertEquals(TS2, migrations.get(RootTable.EXTENT)); + assertEquals(TS1, migrations.get(ME)); + assertEquals(TS2, migrations.get(KE1)); + assertEquals(TS1, migrations.get(KE2)); + assertNull(migrations.get(KE3)); + assertFalse(migrations.isEmpty()); + assertFalse(migrations.isEmpty(DataLevel.ROOT)); + assertFalse(migrations.isEmpty(DataLevel.METADATA)); + assertFalse(migrations.isEmpty(DataLevel.USER)); + assertEquals(Map.of(DataLevel.ROOT, Set.of(RootTable.EXTENT), DataLevel.METADATA, Set.of(ME), + DataLevel.USER, Set.of(KE1, KE2)), migrations.mutableCopy()); + } + + @Test + public void testRemoveServer() { + migrations.removeServers(Set.of(TS1)); + + assertEquals(2, migrations.size()); + assertEquals(Set.of(RootTable.EXTENT, KE1), migrations.snapshotAll()); + assertEquals(Set.of(RootTable.EXTENT), migrations.snapshot(DataLevel.ROOT)); + assertEquals(Set.of(), migrations.snapshot(DataLevel.METADATA)); + assertEquals(Set.of(KE1), migrations.snapshot(DataLevel.USER)); + assertTrue(migrations.contains(RootTable.EXTENT)); + assertFalse(migrations.contains(ME)); + assertTrue(migrations.contains(KE1)); + assertFalse(migrations.contains(KE3)); + assertEquals(TS2, migrations.get(RootTable.EXTENT)); + assertNull(migrations.get(ME)); + assertEquals(TS2, migrations.get(KE1)); + assertNull(migrations.get(KE2)); + assertNull(migrations.get(KE3)); + assertFalse(migrations.isEmpty()); + assertFalse(migrations.isEmpty(DataLevel.ROOT)); + assertTrue(migrations.isEmpty(DataLevel.METADATA)); + assertFalse(migrations.isEmpty(DataLevel.USER)); + assertEquals(Map.of(DataLevel.ROOT, Set.of(RootTable.EXTENT), DataLevel.METADATA, Set.of(), + DataLevel.USER, Set.of(KE1)), migrations.mutableCopy()); + } + + @Test + public void testRemoveExtent() { + migrations.removeExtent(KE1); + + assertEquals(3, migrations.size()); + assertEquals(Set.of(RootTable.EXTENT, ME, KE2), migrations.snapshotAll()); + assertEquals(Set.of(RootTable.EXTENT), migrations.snapshot(DataLevel.ROOT)); + assertEquals(Set.of(ME), migrations.snapshot(DataLevel.METADATA)); + assertEquals(Set.of(KE2), migrations.snapshot(DataLevel.USER)); + assertTrue(migrations.contains(RootTable.EXTENT)); + assertTrue(migrations.contains(ME)); + assertFalse(migrations.contains(KE1)); + assertTrue(migrations.contains(KE2)); + assertFalse(migrations.contains(KE3)); + assertEquals(TS2, migrations.get(RootTable.EXTENT)); + assertEquals(TS1, migrations.get(ME)); + assertNull(migrations.get(KE1)); + assertEquals(TS1, migrations.get(KE2)); + assertNull(migrations.get(KE3)); + assertFalse(migrations.isEmpty()); + assertFalse(migrations.isEmpty(DataLevel.ROOT)); + assertFalse(migrations.isEmpty(DataLevel.METADATA)); + assertFalse(migrations.isEmpty(DataLevel.USER)); + assertEquals(Map.of(DataLevel.ROOT, Set.of(RootTable.EXTENT), DataLevel.METADATA, Set.of(ME), + DataLevel.USER, Set.of(KE2)), migrations.mutableCopy()); + } + + @Test + public void testRemoveExtents() { + migrations.removeExtents(Set.of(KE1, RootTable.EXTENT)); + + assertEquals(2, migrations.size()); + assertEquals(Set.of(ME, KE2), migrations.snapshotAll()); + assertEquals(Set.of(), migrations.snapshot(DataLevel.ROOT)); + assertEquals(Set.of(ME), migrations.snapshot(DataLevel.METADATA)); + assertEquals(Set.of(KE2), migrations.snapshot(DataLevel.USER)); + assertFalse(migrations.contains(RootTable.EXTENT)); + assertTrue(migrations.contains(ME)); + assertFalse(migrations.contains(KE1)); + assertTrue(migrations.contains(KE2)); + assertFalse(migrations.contains(KE3)); + assertNull(migrations.get(RootTable.EXTENT)); + assertEquals(TS1, migrations.get(ME)); + assertNull(migrations.get(KE1)); + assertEquals(TS1, migrations.get(KE2)); + assertNull(migrations.get(KE3)); + assertFalse(migrations.isEmpty()); + assertTrue(migrations.isEmpty(DataLevel.ROOT)); + assertFalse(migrations.isEmpty(DataLevel.METADATA)); + assertFalse(migrations.isEmpty(DataLevel.USER)); + assertEquals(Map.of(DataLevel.ROOT, Set.of(), DataLevel.METADATA, Set.of(ME), DataLevel.USER, + Set.of(KE2)), migrations.mutableCopy()); + } + + @Test + public void testRemoveTable() { + migrations.removeTable(TID1); + + assertEquals(3, migrations.size()); + assertEquals(Set.of(RootTable.EXTENT, ME, KE2), migrations.snapshotAll()); + assertEquals(Set.of(RootTable.EXTENT), migrations.snapshot(DataLevel.ROOT)); + assertEquals(Set.of(ME), migrations.snapshot(DataLevel.METADATA)); + assertEquals(Set.of(KE2), migrations.snapshot(DataLevel.USER)); + assertTrue(migrations.contains(RootTable.EXTENT)); + assertTrue(migrations.contains(ME)); + assertFalse(migrations.contains(KE1)); + assertTrue(migrations.contains(KE2)); + assertFalse(migrations.contains(KE3)); + assertEquals(TS2, migrations.get(RootTable.EXTENT)); + assertEquals(TS1, migrations.get(ME)); + assertNull(migrations.get(KE1)); + assertEquals(TS1, migrations.get(KE2)); + assertNull(migrations.get(KE3)); + assertFalse(migrations.isEmpty()); + assertFalse(migrations.isEmpty(DataLevel.ROOT)); + assertFalse(migrations.isEmpty(DataLevel.METADATA)); + assertFalse(migrations.isEmpty(DataLevel.USER)); + assertEquals(Map.of(DataLevel.ROOT, Set.of(RootTable.EXTENT), DataLevel.METADATA, Set.of(ME), + DataLevel.USER, Set.of(KE2)), migrations.mutableCopy()); + } + + @Test + public void testEmpty() { + migrations.removeServers(Set.of(TS1, TS2)); + + assertEquals(0, migrations.size()); + assertEquals(Set.of(), migrations.snapshotAll()); + assertEquals(Set.of(), migrations.snapshot(DataLevel.ROOT)); + assertEquals(Set.of(), migrations.snapshot(DataLevel.METADATA)); + assertEquals(Set.of(), migrations.snapshot(DataLevel.USER)); + assertFalse(migrations.contains(RootTable.EXTENT)); + assertFalse(migrations.contains(ME)); + assertFalse(migrations.contains(KE1)); + assertFalse(migrations.contains(KE2)); + assertFalse(migrations.contains(KE3)); + assertNull(migrations.get(RootTable.EXTENT)); + assertNull(migrations.get(ME)); + assertNull(migrations.get(KE1)); + assertNull(migrations.get(KE2)); + assertNull(migrations.get(KE3)); + assertTrue(migrations.isEmpty()); + assertTrue(migrations.isEmpty(DataLevel.ROOT)); + assertTrue(migrations.isEmpty(DataLevel.METADATA)); + assertTrue(migrations.isEmpty(DataLevel.USER)); + assertEquals( + Map.of(DataLevel.ROOT, Set.of(), DataLevel.METADATA, Set.of(), DataLevel.USER, Set.of()), + migrations.mutableCopy()); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceIT.java b/test/src/main/java/org/apache/accumulo/test/BalanceIT.java index 01644639030..a282388ce7d 100644 --- a/test/src/main/java/org/apache/accumulo/test/BalanceIT.java +++ b/test/src/main/java/org/apache/accumulo/test/BalanceIT.java @@ -18,27 +18,40 @@ */ package org.apache.accumulo.test; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + import java.time.Duration; +import java.util.HashMap; import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BalanceIT extends AccumuloClusterHarness { +public class BalanceIT extends ConfigurableMacBase { private static final Logger log = LoggerFactory.getLogger(BalanceIT.class); @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { Map siteConfig = cfg.getSiteConfig(); siteConfig.put(Property.TSERV_MAXMEM.getKey(), "10K"); siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "50ms"); @@ -46,7 +59,7 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoo siteConfig.put("general.custom.metrics.opts.logging.step", "0.5s"); cfg.setSiteConfig(siteConfig); // ensure we have two tservers - if (cfg.getNumTservers() < 2) { + if (cfg.getNumTservers() != 2) { cfg.setNumTservers(2); } } @@ -59,7 +72,7 @@ protected Duration defaultTimeout() { @Test public void testBalance() throws Exception { String tableName = getUniqueNames(1)[0]; - try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { log.info("Creating table"); c.tableOperations().create(tableName); SortedSet splits = new TreeSet<>(); @@ -72,4 +85,77 @@ public void testBalance() throws Exception { c.instanceOperations().waitForBalance(); } } + + @Test + public void testBalanceMetadata() throws Exception { + String tableName = getUniqueNames(1)[0]; + try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { + SortedSet splits = new TreeSet<>(); + for (int i = 0; i < 10; i++) { + splits.add(new Text("" + i)); + } + c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); + + var metaSplits = IntStream.range(1, 100).mapToObj(i -> Integer.toString(i, 36)).map(Text::new) + .collect(Collectors.toCollection(TreeSet::new)); + c.tableOperations().addSplits(MetadataTable.NAME, metaSplits); + + var locCounts = countLocations(c, MetadataTable.NAME); + + c.instanceOperations().waitForBalance(); + + locCounts = countLocations(c, MetadataTable.NAME); + var stats = locCounts.values().stream().mapToInt(i -> i).summaryStatistics(); + assertTrue(stats.getMax() <= 51, locCounts.toString()); + assertTrue(stats.getMin() >= 50, locCounts.toString()); + assertEquals(2, stats.getCount(), locCounts.toString()); + + assertEquals(2, getCluster().getConfig().getNumTservers()); + getCluster().getConfig().setNumTservers(4); + getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + + Wait.waitFor(() -> { + var lc = countLocations(c, MetadataTable.NAME); + log.info("locations:{}", lc); + return lc.size() == 4; + }); + + c.instanceOperations().waitForBalance(); + + locCounts = countLocations(c, MetadataTable.NAME); + stats = locCounts.values().stream().mapToInt(i -> i).summaryStatistics(); + assertTrue(stats.getMax() <= 26, locCounts.toString()); + assertTrue(stats.getMin() >= 25, locCounts.toString()); + assertEquals(4, stats.getCount(), locCounts.toString()); + + // The user table should eventually balance + Wait.waitFor(() -> { + var lc = countLocations(c, tableName); + log.info("locations:{}", lc); + return lc.size() == 4; + }); + + locCounts = countLocations(c, tableName); + stats = locCounts.values().stream().mapToInt(i -> i).summaryStatistics(); + assertTrue(stats.getMax() <= 3, locCounts.toString()); + assertTrue(stats.getMin() >= 2, locCounts.toString()); + assertEquals(4, stats.getCount(), locCounts.toString()); + } + } + + private Map countLocations(AccumuloClient client, String tableName) + throws Exception { + var ctx = ((ClientContext) client); + var ample = ctx.getAmple(); + try (var tabletsMeta = + ample.readTablets().forTable(ctx.getTableId(tableName)).fetch(LOCATION, PREV_ROW).build()) { + Map locCounts = new HashMap<>(); + for (var tabletMeta : tabletsMeta) { + var loc = tabletMeta.getLocation(); + locCounts.merge(loc == null ? " none" : loc.toString(), 1, Integer::sum); + } + return locCounts; + } + } } diff --git a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java index 57fbd33247f..3bb4a1e0f73 100644 --- a/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java +++ b/test/src/test/java/org/apache/accumulo/test/ChaoticLoadBalancerTest.java @@ -40,7 +40,6 @@ import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl; import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl; import org.apache.accumulo.core.master.thrift.TableInfo; -import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.spi.balancer.data.TServerStatus; import org.apache.accumulo.core.spi.balancer.data.TabletMigration; import org.apache.accumulo.core.spi.balancer.data.TabletServerId; @@ -159,7 +158,7 @@ public void testUnevenAssignment() { List migrationsOut = new ArrayList<>(); while (!migrationsOut.isEmpty()) { balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut, - DataLevel.USER)); + "USER", Map.of())); } }