Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Input Table Interface. #4923

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSequenceFactory;
import io.deephaven.engine.util.config.InputTableStatusListener;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.table.ChunkSink;
Expand Down Expand Up @@ -88,7 +87,7 @@ private AppendOnlyArrayBackedMutableTable(@NotNull TableDefinition definition,
final Map<String, Object[]> enumValues, final ProcessPendingUpdater processPendingUpdater) {
// noinspection resource
super(RowSetFactory.empty().toTracking(), makeColumnSourceMap(definition),
enumValues, processPendingUpdater);
cpwright marked this conversation as resolved.
Show resolved Hide resolved
processPendingUpdater);
}

@Override
Expand Down Expand Up @@ -140,23 +139,10 @@ ArrayBackedMutableInputTable makeHandler() {
}

private class AppendOnlyArrayBackedMutableInputTable extends ArrayBackedMutableInputTable {
@Override
public void setRows(@NotNull Table defaultValues, int[] rowArray, Map<String, Object>[] valueArray,
InputTableStatusListener listener) {
throw new UnsupportedOperationException();
}

@Override
public void validateDelete(Table tableToDelete) {
throw new UnsupportedOperationException("Table doesn't support delete operation");
}

@Override
public void addRows(Map<String, Object>[] valueArray, boolean allowEdits, InputTableStatusListener listener) {
if (allowEdits) {
throw new UnsupportedOperationException();
}
super.addRows(valueArray, allowEdits, listener);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
package io.deephaven.engine.table.impl.util;

import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderSequential;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.TrackingRowSet;
Expand All @@ -17,7 +14,6 @@
import io.deephaven.engine.table.impl.sources.ArrayBackedColumnSource;
import io.deephaven.engine.util.config.InputTableStatusListener;
import io.deephaven.engine.util.config.MutableInputTable;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.engine.table.impl.UpdatableTable;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.util.annotations.TestUseOnly;
Expand All @@ -30,8 +26,6 @@

abstract class BaseArrayBackedMutableTable extends UpdatableTable {

private static final Object[] BOOLEAN_ENUM_ARRAY = new Object[] {true, false, null};

/**
* Queue of pending changes. Only synchronized access is permitted.
*/
Expand All @@ -45,18 +39,15 @@ abstract class BaseArrayBackedMutableTable extends UpdatableTable {
*/
private long processedSequence = 0L;

private final Map<String, Object[]> enumValues;

private String description = getDefaultDescription();
private Runnable onPendingChange = updateGraph::requestRefresh;

long nextRow = 0;
private long pendingProcessed = -1L;

public BaseArrayBackedMutableTable(TrackingRowSet rowSet, Map<String, ? extends ColumnSource<?>> nameToColumnSource,
Map<String, Object[]> enumValues, ProcessPendingUpdater processPendingUpdater) {
cpwright marked this conversation as resolved.
Show resolved Hide resolved
ProcessPendingUpdater processPendingUpdater) {
super(rowSet, nameToColumnSource, processPendingUpdater);
this.enumValues = enumValues;
MutableInputTable mutableInputTable = makeHandler();
setAttribute(Table.INPUT_TABLE_ATTRIBUTE, mutableInputTable);
setRefreshing(true);
Expand Down Expand Up @@ -350,84 +341,6 @@ void waitForSequence(long sequence) {
}
}

@Override
public void setRows(@NotNull Table defaultValues, int[] rowArray, Map<String, Object>[] valueArray,
InputTableStatusListener listener) {
Assert.neqNull(defaultValues, "defaultValues");
if (defaultValues.isRefreshing()) {
updateGraph.checkInitiateSerialTableOperation();
}

final List<ColumnDefinition<?>> columnDefinitions = getTableDefinition().getColumns();
final Map<String, WritableColumnSource<Object>> sources =
buildSourcesMap(valueArray.length, columnDefinitions);
final String[] kabmtColumns =
getTableDefinition().getColumnNames().toArray(CollectionUtil.ZERO_LENGTH_STRING_ARRAY);
// noinspection unchecked
final WritableColumnSource<Object>[] sourcesByPosition =
Arrays.stream(kabmtColumns).map(sources::get).toArray(WritableColumnSource[]::new);

final Set<String> missingColumns = new HashSet<>(getTableDefinition().getColumnNames());

for (final Map.Entry<String, ? extends ColumnSource<?>> entry : defaultValues.getColumnSourceMap()
.entrySet()) {
final String colName = entry.getKey();
if (!sources.containsKey(colName)) {
continue;
}
final ColumnSource<?> cs = Require.neqNull(entry.getValue(), "defaultValue column source: " + colName);
final WritableColumnSource<Object> dest =
Require.neqNull(sources.get(colName), "destination column source: " + colName);

final RowSet defaultValuesRowSet = defaultValues.getRowSet();
for (int rr = 0; rr < rowArray.length; ++rr) {
final long key = defaultValuesRowSet.get(rowArray[rr]);
dest.set(rr, cs.get(key));
}

missingColumns.remove(colName);
}

for (int ii = 0; ii < valueArray.length; ++ii) {
final Map<String, Object> passedInValues = valueArray[ii];

for (int cc = 0; cc < sourcesByPosition.length; cc++) {
final String colName = kabmtColumns[cc];
if (passedInValues.containsKey(colName)) {
sourcesByPosition[cc].set(ii, passedInValues.get(colName));
} else if (missingColumns.contains(colName)) {
throw new IllegalArgumentException("No value specified for " + colName + " row " + ii);
}
}
}

// noinspection resource
final QueryTable newData = new QueryTable(getTableDefinition(),
RowSetFactory.flat(valueArray.length).toTracking(), sources);
addAsync(newData, true, listener);
}

@Override
public void addRows(Map<String, Object>[] valueArray, boolean allowEdits, InputTableStatusListener listener) {
final List<ColumnDefinition<?>> columnDefinitions = getTableDefinition().getColumns();
final Map<String, WritableColumnSource<Object>> sources =
buildSourcesMap(valueArray.length, columnDefinitions);

for (int rowNumber = 0; rowNumber < valueArray.length; rowNumber++) {
final Map<String, Object> values = valueArray[rowNumber];
for (final ColumnDefinition<?> columnDefinition : columnDefinitions) {
sources.get(columnDefinition.getName()).set(rowNumber, values.get(columnDefinition.getName()));
}

}

// noinspection resource
final QueryTable newData = new QueryTable(getTableDefinition(),
RowSetFactory.flat(valueArray.length).toTracking(), sources);

addAsync(newData, allowEdits, listener);
}

@NotNull
private Map<String, WritableColumnSource<Object>> buildSourcesMap(int capacity,
List<ColumnDefinition<?>> columnDefinitions) {
Expand All @@ -443,14 +356,6 @@ private Map<String, WritableColumnSource<Object>> buildSourcesMap(int capacity,
return sources;
}

@Override
public Object[] getEnumsForColumn(String columnName) {
if (getTableDefinition().getColumn(columnName).getDataType().equals(Boolean.class)) {
return BOOLEAN_ENUM_ARRAY;
}
return enumValues.get(columnName);
}

@Override
public Table getTable() {
return BaseArrayBackedMutableTable.this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private KeyedArrayBackedMutableTable(@NotNull TableDefinition definition, final
final Map<String, Object[]> enumValues, final ProcessPendingUpdater processPendingUpdater) {
// noinspection resource
super(RowSetFactory.empty().toTracking(), makeColumnSourceMap(definition),
enumValues, processPendingUpdater);
cpwright marked this conversation as resolved.
Show resolved Hide resolved
processPendingUpdater);
final List<String> missingKeyColumns = new ArrayList<>(Arrays.asList(keyColumnNames));
missingKeyColumns.removeAll(definition.getColumnNames());
if (!missingKeyColumns.isEmpty()) {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* Implementations of this interface will make their own guarantees about how atomically changes will be applied and
* what operations they support.
*/
public interface MutableInputTable extends InputTableRowSetter, InputTableEnumGetter {
public interface MutableInputTable {
cpwright marked this conversation as resolved.
Show resolved Hide resolved

/**
* Gets the names of the key columns.
Expand Down
Loading
Loading