Skip to content

Commit

Permalink
Optimizing By Using Precomputed Set in Partition Assignment Routine (#…
Browse files Browse the repository at this point in the history
…960)

Co-authored-by: Shrinand Thakkar <sthakkar@sthakkar-mn2.linkedin.biz>
  • Loading branch information
shrinandthakkar and Shrinand Thakkar authored Oct 25, 2023
1 parent 8f58370 commit 178956c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -630,15 +630,15 @@ public void testFetchPartitionChange() throws Exception {
Map<String, Optional<DatastreamGroupPartitionsMetadata>> partitionInfo = connector.getDatastreamPartitions();
Assert.assertEquals(partitionInfo.get(group.getTaskPrefix()).get().getDatastreamGroup().getName(),
group.getTaskPrefix());
Assert.assertEquals(new HashSet<>(partitionInfo.get(group.getTaskPrefix()).get().getPartitions()),
Assert.assertEquals(partitionInfo.get(group.getTaskPrefix()).get().getPartitions(),
ImmutableSet.of(yummyTopic + "-0"));

String saltyTopic = "SaltyPizza";
createTopic(_adminClient, saltyTopic, 2);

Assert.assertTrue(PollUtils.poll(() -> partitionChangeCalls.get() == 2, POLL_PERIOD_MS, POLL_TIMEOUT_MS));
partitionInfo = connector.getDatastreamPartitions();
Assert.assertEquals(new HashSet<>(partitionInfo.get(group.getTaskPrefix()).get().getPartitions()),
Assert.assertEquals(partitionInfo.get(group.getTaskPrefix()).get().getPartitions(),
ImmutableSet.of(yummyTopic + "-0", saltyTopic + "-0", saltyTopic + "-1"));
connector.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
*/
package com.linkedin.datastream.server;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.linkedin.datastream.common.LogUtils;

Expand All @@ -16,14 +19,23 @@
public class DatastreamGroupPartitionsMetadata {

private final DatastreamGroup _datastreamGroup;
private final List<String> _partitions;
private final Set<String> _partitions;

/**
* constructor
* @param datastreamGroup datastream group which handle the partitions
* @param partitions the partitions that belong to this datastream
* @param partitions the partitions in a list that belong to this datastream
*/
public DatastreamGroupPartitionsMetadata(DatastreamGroup datastreamGroup, List<String> partitions) {
this(datastreamGroup, new HashSet<>(partitions));
}

/**
* constructor
* @param datastreamGroup datastream group which handle the partitions
* @param partitions the partitions in a set that belong to this datastream
*/
public DatastreamGroupPartitionsMetadata(DatastreamGroup datastreamGroup, Set<String> partitions) {
_datastreamGroup = datastreamGroup;
_partitions = partitions;
}
Expand All @@ -32,12 +44,13 @@ public DatastreamGroup getDatastreamGroup() {
return _datastreamGroup;
}

public List<String> getPartitions() {
return Collections.unmodifiableList(_partitions);
public Set<String> getPartitions() {
return Collections.unmodifiableSet(_partitions);
}

@Override
public String toString() {
return String.format("datastream %s, partitions %s", _datastreamGroup.getName(), LogUtils.logSummarizedTopicPartitionsMapping(_partitions));
return String.format("datastream %s, partitions %s", _datastreamGroup.getName(),
LogUtils.logSummarizedTopicPartitionsMapping(new ArrayList<>(_partitions)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(
tasks.forEach(task -> {
if (task.getTaskPrefix().equals(datastreamGroupName)) {
Set<String> retainedPartitions = new HashSet<>(task.getPartitionsV2());
retainedPartitions.retainAll(new HashSet<>(partitionMetadata.getPartitions()));
retainedPartitions.retainAll(partitionMetadata.getPartitions());
newPartitionAssignmentMap.put(task.getId(), retainedPartitions);
if (retainedPartitions.size() != task.getPartitionsV2().size()) {
tasksWithChangedPartition.add(task.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String,
return task;
} else {
Set<String> newPartitions = new HashSet<>(task.getPartitionsV2());
newPartitions.retainAll(new HashSet<>(datastreamPartitions.getPartitions()));
newPartitions.retainAll(datastreamPartitions.getPartitions());

//We need to create new task if the partition is changed
boolean partitionChanged = newPartitions.size() != task.getPartitionsV2().size();
Expand Down Expand Up @@ -332,7 +332,7 @@ public Map<String, Set<DatastreamTask>> movePartitions(Map<String, Set<Datastrea

Set<String> allToReassignPartitions = new HashSet<>();
targetAssignment.values().forEach(allToReassignPartitions::addAll);
allToReassignPartitions.retainAll(new HashSet<>(partitionsMetadata.getPartitions()));
allToReassignPartitions.retainAll(partitionsMetadata.getPartitions());

// construct a map to store the tasks and if it contain the partitions that can be released
// map: <source taskName, partitions that need to be released>
Expand Down

0 comments on commit 178956c

Please sign in to comment.