Skip to content

Commit

Permalink
Make the faker connector generate multiple splits per node
Browse files Browse the repository at this point in the history
  • Loading branch information
nineinchnick committed Oct 17, 2024
1 parent 3a6c7fc commit 7039a53
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class FakerConfig
{
private double nullProbability = 0.5;
private long defaultLimit = 1000L;
private int minSplits = 1;

@Max(1)
@Min(0)
Expand Down Expand Up @@ -52,4 +53,19 @@ public FakerConfig setDefaultLimit(long value)
this.defaultLimit = value;
return this;
}

@Min(1)
@Max(1_000)
public int getMinSplits()
{
return minSplits;
}

@Config("faker.min-splits")
@ConfigDescription("Minimum number of splits per node")
public FakerConfig setMinSplits(int value)
{
this.minSplits = value;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class FakerMetadata
private final List<SchemaInfo> schemas = new ArrayList<>();
private final double nullProbability;
private final long defaultLimit;
private final int minSplits;

private final AtomicLong nextTableId = new AtomicLong();
@GuardedBy("this")
Expand All @@ -92,6 +93,7 @@ public FakerMetadata(FakerConfig config)
this.schemas.add(new SchemaInfo(SCHEMA_NAME));
this.nullProbability = config.getNullProbability();
this.defaultLimit = config.getDefaultLimit();
this.minSplits = config.getMinSplits();
}

@Override
Expand Down Expand Up @@ -141,6 +143,7 @@ public synchronized ConnectorTableHandle getTableHandle(ConnectorSession session
}
long schemaLimit = (long) schema.getProperties().getOrDefault(SchemaInfo.DEFAULT_LIMIT_PROPERTY, defaultLimit);
long tableLimit = (long) tables.get(id).getProperties().getOrDefault(TableInfo.DEFAULT_LIMIT_PROPERTY, schemaLimit);
tableLimit = (tableLimit + minSplits - 1) / minSplits;
return new FakerTableHandle(id, tableName, TupleDomain.all(), tableLimit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.faker;

import io.trino.spi.NodeManager;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
Expand All @@ -21,12 +22,27 @@
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import jakarta.inject.Inject;

import java.util.List;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class FakerSplitManager
implements ConnectorSplitManager
{
private final int minSplits;
private final NodeManager nodeManager;

@Inject
public FakerSplitManager(FakerConfig config, NodeManager nodeManager)
{
this.minSplits = requireNonNull(config, "config is null").getMinSplits();
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
}

@Override
public ConnectorSplitSource getSplits(
ConnectorTransactionHandle transaction,
Expand All @@ -35,6 +51,11 @@ public ConnectorSplitSource getSplits(
DynamicFilter dynamicFilter,
Constraint constraint)
{
return new FixedSplitSource(List.of(new FakerSplit()));
List<FakerSplit> splits = nodeManager.getRequiredWorkerNodes().stream()
.flatMap(_ -> Stream.generate(FakerSplit::new)
.limit(minSplits))
.collect(toImmutableList());

return new FixedSplitSource(splits);
}
}

0 comments on commit 7039a53

Please sign in to comment.