Skip to content

Commit

Permalink
Add delta in reassign process
Browse files Browse the repository at this point in the history
  • Loading branch information
kewang1024 authored and jessesleeping committed Dec 20, 2019
1 parent a60985c commit 5f781d3
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,12 @@ private ConnectorSplit createSplit(BucketShards bucketShards)
throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}
Node node = selectRandom(availableNodes);
shardManager.replaceShardAssignment(tableId, shardUuid, node.getNodeIdentifier(), true);
shardManager.replaceShardAssignment(
tableId,
shardUuid,
deltaShardUuid,
node.getNodeIdentifier(),
true);
addresses = ImmutableList.of(node.getHostAndPort());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ public ResultIterator<BucketShards> getShardNodesBucketed(long tableId, boolean
}

@Override
public void replaceShardAssignment(long tableId, UUID shardUuid, String nodeIdentifier, boolean gracePeriod)
public void replaceShardAssignment(long tableId, UUID shardUuid, Optional<UUID> deltaUuid, String nodeIdentifier, boolean gracePeriod)
{
if (gracePeriod && (nanosSince(startTime).compareTo(startupGracePeriod) < 0)) {
throw new PrestoException(SERVER_STARTING_UP, "Cannot reassign shards while server is starting");
Expand All @@ -1000,10 +1000,18 @@ public void replaceShardAssignment(long tableId, UUID shardUuid, String nodeIden
ShardDao dao = shardDaoSupplier.attach(handle);

Set<Integer> oldAssignments = new HashSet<>(fetchLockedNodeIds(handle, tableId, shardUuid));

// 1. Update index table
updateNodeIds(handle, tableId, shardUuid, ImmutableSet.of(nodeId));

// 2. Update shards table
dao.deleteShardNodes(shardUuid, oldAssignments);
dao.insertShardNode(shardUuid, nodeId);

if (deltaUuid.isPresent()) {
dao.deleteShardNodes(deltaUuid.get(), oldAssignments);
dao.insertShardNode(deltaUuid.get(), nodeId);
}
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public interface ShardManager
/**
* Remove all old shard assignments and assign a shard to a node
*/
void replaceShardAssignment(long tableId, UUID shardUuid, String nodeIdentifier, boolean gracePeriod);
void replaceShardAssignment(long tableId, UUID shardUuid, Optional<UUID> deltaUuid, String nodeIdentifier, boolean gracePeriod);

/**
* Get the number of bytes used by assigned shards per node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ void process()
// only include nodes that are below threshold
nodes = new HashMap<>(filterValues(nodes, size -> size <= averageSize));

// get non-bucketed node shards by size, largest to smallest
// get non-bucketed node shards(only) by size, largest to smallest
List<ShardMetadata> shards = shardManager.getNodeShards(currentNode).stream()
.filter(shard -> !shard.getBucketNumber().isPresent())
.sorted(comparingLong(ShardMetadata::getCompressedSize).reversed())
Expand All @@ -227,6 +227,7 @@ void process()
ShardMetadata shard = queue.remove();
long shardSize = shard.getCompressedSize();
UUID shardUuid = shard.getShardUuid();
Optional<UUID> deltaUuid = shard.getDeltaUuid();

// verify backup exists
if (!backupStore.get().shardExists(shardUuid)) {
Expand All @@ -250,7 +251,7 @@ void process()
nodeSize -= shardSize;

// move assignment
shardManager.replaceShardAssignment(shard.getTableId(), shardUuid, target, false);
shardManager.replaceShardAssignment(shard.getTableId(), shardUuid, deltaUuid, target, false);

// delete local file
Path file = storageService.getStorageFile(shardUuid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,21 +222,21 @@ public void testAssignShard()
assertEquals(actual, new ShardNodes(shard, Optional.empty(), ImmutableSet.of("node1")));

try {
shardManager.replaceShardAssignment(tableId, shard, "node2", true);
shardManager.replaceShardAssignment(tableId, shard, Optional.empty(), "node2", true);
fail("expected exception");
}
catch (PrestoException e) {
assertEquals(e.getErrorCode(), SERVER_STARTING_UP.toErrorCode());
}

// replace shard assignment to another node
shardManager.replaceShardAssignment(tableId, shard, "node2", false);
shardManager.replaceShardAssignment(tableId, shard, Optional.empty(), "node2", false);

actual = getOnlyElement(getShardNodes(tableId, TupleDomain.all()));
assertEquals(actual, new ShardNodes(shard, Optional.empty(), ImmutableSet.of("node2")));

// replacing shard assignment should be idempotent
shardManager.replaceShardAssignment(tableId, shard, "node2", false);
shardManager.replaceShardAssignment(tableId, shard, Optional.empty(), "node2", false);

actual = getOnlyElement(getShardNodes(tableId, TupleDomain.all()));
assertEquals(actual, new ShardNodes(shard, Optional.empty(), ImmutableSet.of("node2")));
Expand Down Expand Up @@ -266,7 +266,7 @@ public void testGetNodeBytes()

assertEquals(shardManager.getNodeBytes(), ImmutableMap.of("node1", 88L));

shardManager.replaceShardAssignment(tableId, shard1, "node2", false);
shardManager.replaceShardAssignment(tableId, shard1, Optional.empty(), "node2", false);

assertEquals(getShardNodes(tableId, TupleDomain.all()), ImmutableSet.of(
new ShardNodes(shard1, Optional.empty(), ImmutableSet.of("node2")),
Expand Down

0 comments on commit 5f781d3

Please sign in to comment.