Skip to content

Commit

Permalink
Remove table id from Faker
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Oct 25, 2024
1 parent 31278de commit a066f3b
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -79,11 +78,8 @@ public class FakerMetadata
private final double nullProbability;
private final long defaultLimit;

private final AtomicLong nextTableId = new AtomicLong();
@GuardedBy("this")
private final Map<SchemaTableName, Long> tableIds = new HashMap<>();
@GuardedBy("this")
private final Map<Long, TableInfo> tables = new HashMap<>();
private final Map<SchemaTableName, TableInfo> tables = new HashMap<>();

@Inject
public FakerMetadata(FakerConfig config)
Expand Down Expand Up @@ -134,13 +130,13 @@ public synchronized ConnectorTableHandle getTableHandle(ConnectorSession session
throw new TrinoException(NOT_SUPPORTED, "This connector does not support versioned tables");
}
SchemaInfo schema = getSchema(tableName.getSchemaName());
Long id = tableIds.get(tableName);
if (id == null) {
TableInfo tableInfo = tables.get(tableName);
if (tableInfo == null) {
return null;
}
long schemaLimit = (long) schema.properties().getOrDefault(SchemaInfo.DEFAULT_LIMIT_PROPERTY, defaultLimit);
long tableLimit = (long) tables.get(id).properties().getOrDefault(TableInfo.DEFAULT_LIMIT_PROPERTY, schemaLimit);
return new FakerTableHandle(id, tableName, TupleDomain.all(), tableLimit);
long tableLimit = (long) tables.get(tableName).properties().getOrDefault(TableInfo.DEFAULT_LIMIT_PROPERTY, schemaLimit);
return new FakerTableHandle(tableName, TupleDomain.all(), tableLimit);
}

@Override
Expand All @@ -149,15 +145,20 @@ public synchronized ConnectorTableMetadata getTableMetadata(
ConnectorTableHandle connectorTableHandle)
{
FakerTableHandle tableHandle = (FakerTableHandle) connectorTableHandle;
return tables.get(tableHandle.id()).metadata();
SchemaTableName name = tableHandle.schemaTableName();
TableInfo tableInfo = tables.get(tableHandle.schemaTableName());
return new ConnectorTableMetadata(
name,
tableInfo.columns().stream().map(ColumnInfo::metadata).collect(toImmutableList()),
tableInfo.properties(),
tableInfo.comment());
}

@Override
public synchronized List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
{
return tables.values().stream()
.filter(table -> schemaName.map(table.schemaName()::contentEquals).orElse(true))
.map(TableInfo::schemaTableName)
return tables.keySet().stream()
.filter(table -> schemaName.map(table.getSchemaName()::contentEquals).orElse(true))
.collect(toImmutableList());
}

Expand All @@ -167,7 +168,7 @@ public synchronized Map<String, ColumnHandle> getColumnHandles(
ConnectorTableHandle connectorTableHandle)
{
FakerTableHandle tableHandle = (FakerTableHandle) connectorTableHandle;
return tables.get(tableHandle.id())
return tables.get(tableHandle.schemaTableName())
.columns().stream()
.collect(toImmutableMap(ColumnInfo::name, ColumnInfo::handle));
}
Expand All @@ -179,30 +180,27 @@ public synchronized ColumnMetadata getColumnMetadata(
ColumnHandle columnHandle)
{
FakerTableHandle tableHandle = (FakerTableHandle) connectorTableHandle;
return tables.get(tableHandle.id())
return tables.get(tableHandle.schemaTableName())
.column(columnHandle)
.metadata();
}

@Override
public synchronized Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
return tables.values().stream()
.filter(table -> prefix.matches(table.schemaTableName()))
.map(table -> TableColumnsMetadata.forTable(
table.schemaTableName(),
table.metadata().getColumns()))
return tables.entrySet().stream()
.filter(entry -> prefix.matches(entry.getKey()))
.map(entry -> TableColumnsMetadata.forTable(
entry.getKey(),
entry.getValue().columns().stream().map(ColumnInfo::metadata).collect(toImmutableList())))
.iterator();
}

@Override
public synchronized void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
FakerTableHandle handle = (FakerTableHandle) tableHandle;
TableInfo info = tables.remove(handle.id());
if (info != null) {
tableIds.remove(info.schemaTableName());
}
tables.remove(handle.schemaTableName());
}

@Override
Expand All @@ -212,54 +210,45 @@ public synchronized void renameTable(ConnectorSession session, ConnectorTableHan
checkTableNotExists(newTableName);

FakerTableHandle handle = (FakerTableHandle) tableHandle;
long tableId = handle.id();

TableInfo oldInfo = tables.get(tableId);
tables.put(tableId, new TableInfo(
tableId,
newTableName.getSchemaName(),
newTableName.getTableName(),
oldInfo.columns(),
oldInfo.properties(),
oldInfo.comment()));

tableIds.remove(oldInfo.schemaTableName());
tableIds.put(newTableName, tableId);
SchemaTableName oldTableName = handle.schemaTableName();

tables.remove(oldTableName);
tables.put(newTableName, tables.get(oldTableName));
}

@Override
public synchronized void setTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle, Map<String, Optional<Object>> properties)
{
FakerTableHandle handle = (FakerTableHandle) tableHandle;
long tableId = handle.id();
SchemaTableName tableName = handle.schemaTableName();

TableInfo oldInfo = tables.get(tableId);
TableInfo oldInfo = tables.get(tableName);
Map<String, Object> newProperties = Stream.concat(
oldInfo.properties().entrySet().stream()
.filter(entry -> !properties.containsKey(entry.getKey())),
properties.entrySet().stream()
.filter(entry -> entry.getValue().isPresent()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
tables.put(tableId, oldInfo.withProperties(newProperties));
tables.put(tableName, oldInfo.withProperties(newProperties));
}

@Override
public synchronized void setTableComment(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<String> comment)
{
FakerTableHandle handle = (FakerTableHandle) tableHandle;
long tableId = handle.id();
SchemaTableName tableName = handle.schemaTableName();

TableInfo oldInfo = requireNonNull(tables.get(tableId), "tableInfo is null");
tables.put(tableId, oldInfo.withComment(comment));
TableInfo oldInfo = requireNonNull(tables.get(tableName), "tableInfo is null");
tables.put(tableName, oldInfo.withComment(comment));
}

@Override
public synchronized void setColumnComment(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle column, Optional<String> comment)
{
FakerTableHandle handle = (FakerTableHandle) tableHandle;
long tableId = handle.id();
SchemaTableName tableName = handle.schemaTableName();

TableInfo oldInfo = tables.get(tableId);
TableInfo oldInfo = tables.get(tableName);
List<ColumnInfo> columns = oldInfo.columns().stream()
.map(columnInfo -> {
if (columnInfo.handle().equals(column)) {
Expand All @@ -268,7 +257,7 @@ public synchronized void setColumnComment(ConnectorSession session, ConnectorTab
return columnInfo;
})
.collect(toImmutableList());
tables.put(tableId, oldInfo.withColumns(columns));
tables.put(tableName, oldInfo.withColumns(columns));
}

@Override
Expand All @@ -287,9 +276,9 @@ public synchronized FakerOutputTableHandle beginCreateTable(ConnectorSession ses
if (replace) {
throw new TrinoException(NOT_SUPPORTED, "This connector does not support replacing tables");
}
SchemaInfo schema = getSchema(tableMetadata.getTable().getSchemaName());
SchemaTableName tableName = tableMetadata.getTable();
SchemaInfo schema = getSchema(tableName.getSchemaName());
checkTableNotExists(tableMetadata.getTable());
long tableId = nextTableId.getAndIncrement();

double schemaNullProbability = (double) schema.properties().getOrDefault(SchemaInfo.NULL_PROBABILITY_PROPERTY, nullProbability);
double tableNullProbability = (double) tableMetadata.getProperties().getOrDefault(TableInfo.NULL_PROBABILITY_PROPERTY, schemaNullProbability);
Expand All @@ -315,16 +304,12 @@ public synchronized FakerOutputTableHandle beginCreateTable(ConnectorSession ses
column));
}

tableIds.put(tableMetadata.getTable(), tableId);
tables.put(tableId, new TableInfo(
tableId,
tableMetadata.getTable().getSchemaName(),
tableMetadata.getTable().getTableName(),
tables.put(tableName, new TableInfo(
columns.build(),
tableMetadata.getProperties(),
tableMetadata.getComment()));

return new FakerOutputTableHandle(tableId);
return new FakerOutputTableHandle(tableName);
}

private boolean isCharacterColumn(ColumnMetadata column)
Expand All @@ -341,7 +326,7 @@ private synchronized void checkSchemaExists(String schemaName)

private synchronized void checkTableNotExists(SchemaTableName tableName)
{
if (tableIds.containsKey(tableName)) {
if (tables.containsKey(tableName)) {
throw new TrinoException(TABLE_ALREADY_EXISTS, format("Table '%s' already exists", tableName));
}
}
Expand All @@ -352,14 +337,14 @@ public synchronized Optional<ConnectorOutputMetadata> finishCreateTable(Connecto
requireNonNull(tableHandle, "tableHandle is null");
FakerOutputTableHandle fakerOutputHandle = (FakerOutputTableHandle) tableHandle;

long tableId = fakerOutputHandle.table();
SchemaTableName tableName = fakerOutputHandle.schemaTableName();

TableInfo info = tables.get(tableId);
TableInfo info = tables.get(tableName);
requireNonNull(info, "info is null");

// TODO ensure fragments is empty?

tables.put(tableId, new TableInfo(tableId, info.schemaName(), info.tableName(), info.columns(), info.properties(), info.comment()));
tables.put(tableName, new TableInfo(info.columns(), info.properties(), info.comment()));
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@
package io.trino.plugin.faker;

import io.trino.spi.connector.ConnectorOutputTableHandle;
import io.trino.spi.connector.SchemaTableName;

public record FakerOutputTableHandle(long table)
import static java.util.Objects.requireNonNull;

public record FakerOutputTableHandle(SchemaTableName schemaTableName)
implements ConnectorOutputTableHandle
{
public FakerOutputTableHandle
{
requireNonNull(schemaTableName, "schemaTableName is null");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import static java.util.Objects.requireNonNull;

public record FakerTableHandle(long id, SchemaTableName schemaTableName, TupleDomain<ColumnHandle> constraint, long limit)
public record FakerTableHandle(SchemaTableName schemaTableName, TupleDomain<ColumnHandle> constraint, long limit)
implements ConnectorTableHandle
{
public FakerTableHandle
Expand All @@ -32,11 +32,11 @@ public record FakerTableHandle(long id, SchemaTableName schemaTableName, TupleDo

public FakerTableHandle withConstraint(TupleDomain<ColumnHandle> constraint)
{
return new FakerTableHandle(id, schemaTableName, constraint, limit);
return new FakerTableHandle(schemaTableName, constraint, limit);
}

public FakerTableHandle withLimit(long limit)
{
return new FakerTableHandle(id, schemaTableName, constraint, limit);
return new FakerTableHandle(schemaTableName, constraint, limit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,25 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.SchemaTableName;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public record TableInfo(long id, String schemaName, String tableName, List<ColumnInfo> columns, Map<String, Object> properties, Optional<String> comment)
public record TableInfo(List<ColumnInfo> columns, Map<String, Object> properties, Optional<String> comment)
{
public static final String NULL_PROBABILITY_PROPERTY = "null_probability";
public static final String DEFAULT_LIMIT_PROPERTY = "default_limit";

public TableInfo
{
requireNonNull(schemaName, "schemaName is null");
requireNonNull(tableName, "tableName is null");
columns = ImmutableList.copyOf(columns);
properties = ImmutableMap.copyOf(requireNonNull(properties, "properties is null"));
requireNonNull(comment, "comment is null");
}

public SchemaTableName schemaTableName()
{
return new SchemaTableName(schemaName, tableName);
}

public ConnectorTableMetadata metadata()
{
return new ConnectorTableMetadata(
new SchemaTableName(schemaName, tableName),
columns.stream()
.map(ColumnInfo::metadata)
.collect(toImmutableList()),
properties,
comment);
}

public ColumnInfo column(ColumnHandle handle)
{
return columns.stream()
Expand All @@ -66,16 +45,16 @@ public ColumnInfo column(ColumnHandle handle)

public TableInfo withColumns(List<ColumnInfo> columns)
{
return new TableInfo(id, schemaName, tableName, columns, properties, comment);
return new TableInfo(columns, properties, comment);
}

public TableInfo withProperties(Map<String, Object> properties)
{
return new TableInfo(id, schemaName, tableName, columns, properties, comment);
return new TableInfo(columns, properties, comment);
}

public TableInfo withComment(Optional<String> comment)
{
return new TableInfo(id, schemaName, tableName, columns, properties, comment);
return new TableInfo(columns, properties, comment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void testSplits()
ConnectorSplitSource splitSource = new FakerSplitManager(nodeManager).getSplits(
TestingTransactionHandle.create(),
TestingConnectorSession.SESSION,
new FakerTableHandle(1L, new SchemaTableName("schema", "table"), TupleDomain.all(), expectedRows),
new FakerTableHandle(new SchemaTableName("schema", "table"), TupleDomain.all(), expectedRows),
DynamicFilter.EMPTY,
Constraint.alwaysTrue());
List<ConnectorSplit> splits = splitSource.getNextBatch(1_000_000).get().getSplits();
Expand Down

0 comments on commit a066f3b

Please sign in to comment.