Barrage/web UI support for BigDecimal and BigInteger. #1627

Merged · 14 commits · Dec 8, 2021
Changes from 4 commits
17 changes: 5 additions & 12 deletions Integrations/python/test/testParquetTools.py
@@ -50,24 +50,18 @@ def testCreation(self):
shutil.rmtree(fileLocation)
if os.path.exists(fileLocation2):
shutil.rmtree(fileLocation2)
time.sleep(0.01) # avoid race condition on file existence...

# Writing
with self.subTest(msg="writeTable(Table, String)"):
ParquetTools.writeTable(table, fileLocation)
time.sleep(0.01) # avoid race condition on file existence...
self.assertTrue(os.path.exists(fileLocation))
shutil.rmtree(baseDir)
time.sleep(0.01) # avoid race condition on file existence...
with self.subTest(msg="writeTable(Table, File)"):
ParquetTools.writeTable(table, ParquetTools.getFileObject(fileLocation))
time.sleep(0.01) # avoid race condition on file existence...
self.assertTrue(os.path.exists(fileLocation))
shutil.rmtree(baseDir)
time.sleep(0.01) # avoid race condition on file existence...
with self.subTest(msg="writeTables(Table[], TableDefinition, File[]"):
ParquetTools.writeTables([table, table], definition, [fileLocation, fileLocation2])
time.sleep(0.01) # avoid race condition on file existence...
self.assertTrue(os.path.exists(fileLocation))
self.assertTrue(os.path.exists(fileLocation2))

@@ -79,11 +73,9 @@ def testCreation(self):
with self.subTest(msg="delete(File)"):
if os.path.exists(fileLocation):
ParquetTools.deleteTable(fileLocation)
time.sleep(0.01) # avoid race condition on file existence...
self.assertFalse(os.path.exists(fileLocation))
if os.path.exists(fileLocation2):
ParquetTools.deleteTable(fileLocation2)
time.sleep(0.01) # avoid race condition on file existence...
self.assertFalse(os.path.exists(fileLocation2))
shutil.rmtree(baseDir)

@@ -94,17 +86,18 @@ def testDecimal(self):
[jbigdecimal.valueOf(101,2)]],
[('decimal_value', dh.bigdecimal)])
self.assertIsNotNone(table)
baseDir = os.path.join(self.rootDir, "testCreation")
baseDir = os.path.join(self.rootDir, 'testCreation')
fileLocation = os.path.join(baseDir, 'table1.parquet')
if os.path.exists(fileLocation):
shutil.rmtree(fileLocation)
time.sleep(0.01) # avoid race condition on file existence...

ParquetTools.writeTable(table, fileLocation)
Member:

Test seems incomplete. Shouldn't we read the data back in and compare it?

Member Author:

Ideally yes, but this tests something that wasn't being tested before, namely that writing the file doesn't blow up.
Ok, let's test more...

Member:

Better. Might be better still with TstUtils.assertTableEquals(table, table2) if that's easy to do from Python.

Member Author:

I tried, but it is not easy to do from Python:

ERROR [0.057s]: testDecimal (test.testParquetTools.TestParquetTools)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/python/test/testParquetTools.py", line 97, in testDecimal
    tstutils = jpy.get_type('io.deephaven.engine.table.impl.TstUtils')
ValueError: Java class 'io.deephaven.engine.table.impl.TstUtils' not found

My guess is we don't export the test packages when we build the docker image (which makes sense).
I used TableTools.diff instead, which I think should give a similar value.

time.sleep(0.01) # avoid race condition on file existence...
table2 = ParquetTools.readTable(fileLocation)
joinRowCount = table.exactJoin(table2, 'decimal_value').size()
self.assertEquals(table.size(), joinRowCount)

self.assertTrue(os.path.exists(fileLocation))
shutil.rmtree(baseDir)
time.sleep(0.01) # avoid race condition on file existence...

@classmethod
def tearDownClass(cls):
@@ -38,8 +38,7 @@ public static boolean codecRequired(@NotNull final ColumnDefinition<?> columnDef
* @return Whether a codec is required
*/
public static boolean codecRequired(@NotNull final Class<?> dataType, @Nullable final Class<?> componentType) {
if (dataType.isPrimitive() || dataType == Boolean.class || dataType == DateTime.class
|| dataType == String.class || StringSet.class.isAssignableFrom(dataType)) {
if (dataType.isPrimitive() || noCodecRequired(dataType) || StringSet.class.isAssignableFrom(dataType)) {
// Primitive, basic, and special types do not require codecs
return false;
}
@@ -49,31 +48,34 @@ public static boolean codecRequired(@NotNull final Class<?> dataType, @Nullable
"Array type " + dataType + " does not match component type " + componentType);
}
// Arrays of primitives or basic types do not require codecs
return !(componentType.isPrimitive() || componentType == Boolean.class || componentType == DateTime.class
|| componentType == String.class);
return !(componentType.isPrimitive() || noCodecRequired(componentType));
}
if (Vector.class.isAssignableFrom(dataType)) {
if (componentType == null) {
throw new IllegalArgumentException("Vector type " + dataType + " requires a component type");
}
if (ObjectVector.class.isAssignableFrom(dataType)) {
// Vectors of basic types do not require codecs
return !(componentType == Boolean.class || componentType == DateTime.class
|| componentType == String.class);
return !noCodecRequired(componentType);
}
// VectorBases of primitive types do not require codecs
return false;
}
if (BigDecimal.class.equals(dataType)) {
// A BigDecimal column maps to a logical type of decimal with the appropriate
// precision and scale (unless the user explicitly requested something else
// via instructions).
return false;
}
// Anything else must have a codec
return true;
}

private static boolean noCodecRequired(@NotNull final Class<?> dataType) {
return dataType == Boolean.class ||
dataType == DateTime.class ||
dataType == String.class ||
// A BigDecimal column maps to a logical type of decimal, with
// appropriate precision and scale calculated from column data,
// unless the user explicitly requested something else
// via instructions.
dataType == BigDecimal.class;
}
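
A hedged usage sketch of the refactored check; the enclosing class is not named in this diff, so CodecLookup below is an assumption. Primitive and basic types, and now BigDecimal, resolve to false, while arbitrary object types still require a codec.

import java.math.BigDecimal;
import java.util.UUID;

class CodecRequiredDemo {
    public static void main(String[] args) {
        // Basic types are written natively, no codec needed.
        System.out.println(CodecLookup.codecRequired(String.class, null));     // false
        // BigDecimal now maps to Parquet's decimal logical type, no codec needed.
        System.out.println(CodecLookup.codecRequired(BigDecimal.class, null)); // false
        // An arbitrary object type still needs an explicit codec.
        System.out.println(CodecLookup.codecRequired(UUID.class, null));       // true
    }
}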

/**
* Test whether an explicit codec has been set.
*
@@ -149,8 +149,11 @@ static <T> Chunk<Values> extractChunkFromInputStream(
bufferInfoIter,
(final byte[] buf, final int offset, final int length) -> {
// read the int scale value as little endian, arrow's endianness.
final int bigEndianScale = ByteBuffer.wrap(buf, offset, 4).getInt();
final int scale = Integer.reverseBytes(bigEndianScale);
final byte b1 = buf[offset];
final byte b2 = buf[offset + 1];
final byte b3 = buf[offset + 2];
final byte b4 = buf[offset + 3];
final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
}
);
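
For comparison, a minimal sketch of the same wire-format decode using a little-endian ByteBuffer, assuming Java 9+ for the BigInteger offset/length constructor; the hand-rolled shifts above presumably avoid the temporary ByteBuffer wrapper on this hot path.

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class BigDecimalWireDecodeDemo {
    // The payload is a 4-byte little-endian scale followed by the
    // big-endian two's-complement unscaled value.
    static BigDecimal decode(final byte[] buf, final int offset, final int length) {
        final int scale = ByteBuffer.wrap(buf, offset, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
        return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
    }
}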
@@ -163,7 +163,7 @@ private static void pushColumnTypesFromAvroField(
}
final Schema.Type fieldType = fieldSchema.getType();
pushColumnTypesFromAvroField(
columnsOut, mappedOut, prefix, field, fieldName, fieldSchema, mappedName, fieldType,
columnsOut, mappedOut, prefix, fieldName, fieldSchema, mappedName, fieldType,
fieldNameToColumnName);

}
@@ -172,7 +171,6 @@ private static void pushColumnTypesFromAvroField(
final List<ColumnDefinition<?>> columnsOut,
final Map<String, String> mappedOut,
final String prefix,
final Schema.Field field,
final String fieldName,
final Schema fieldSchema,
final String mappedName,
@@ -207,7 +206,7 @@ private static void pushColumnTypesFromAvroField(
case UNION:
final Schema effectiveSchema = Utils.getEffectiveSchema(fieldName, fieldSchema);
pushColumnTypesFromAvroField(
columnsOut, mappedOut, prefix, field, fieldName, effectiveSchema, mappedName,
columnsOut, mappedOut, prefix, fieldName, effectiveSchema, mappedName,
effectiveSchema.getType(),
fieldNameToColumnName);
return;
@@ -82,7 +82,7 @@ private static FieldCopier makeFieldCopier(
return new GenericRecordLongFieldCopierWithMultiplier(fieldName, 1000L);
} else if (LogicalTypes.timestampMillis().equals(logicalType)) {
// millis to nanos
return new GenericRecordLongFieldCopierWithMultiplier(fieldName, 1000L * 1000L);
return new GenericRecordLongFieldCopierWithMultiplier(fieldName, 1000_000L);
}
throw new IllegalArgumentException(
"Can not map field with unknown logical type to DateTime: field=" + fieldName
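
Both the old and new multiplier expressions equal one million, so the change is cosmetic; a quick arithmetic check of the millis-to-nanos conversion:

class MillisToNanosDemo {
    public static void main(String[] args) {
        final long epochMillis = 1_638_921_600_000L;      // 2021-12-08T00:00:00Z
        final long epochNanos = epochMillis * 1_000_000L; // 1638921600000000000, still well within long range
        System.out.println(epochNanos);
    }
}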
@@ -1,6 +1,7 @@
package io.deephaven.util.codec;
package io.deephaven.parquet.table;

import io.deephaven.datastructures.util.CollectionUtil;
import io.deephaven.util.codec.ObjectCodec;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

@@ -78,9 +79,9 @@ public byte[] encode(@Nullable final BigDecimal input) {
}

final BigDecimal value = (input.scale() == scale) ? input : input.setScale(scale, roundingMode);
final BigInteger unescaledValue = value.unscaledValue();
final BigInteger unscaledValue = value.unscaledValue();

final byte[] bytes = unescaledValue.toByteArray();
final byte[] bytes = unscaledValue.toByteArray();
return bytes;
}

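
For orientation, a hedged round-trip sketch of this encoding and the implied decode (the read path is not shown in this hunk): the stored bytes are the two's-complement unscaled value, with the column's fixed scale reapplied on read.

import java.math.BigDecimal;
import java.math.BigInteger;

class BigDecimalBytesRoundTripDemo {
    public static void main(String[] args) {
        final int scale = 2;
        final BigDecimal original = new BigDecimal("123.45");
        // Encode as above: rescale, then keep only the unscaled two's-complement bytes.
        final byte[] bytes = original.setScale(scale).unscaledValue().toByteArray();
        // Decode counterpart: reattach the fixed column scale.
        final BigDecimal decoded = new BigDecimal(new BigInteger(bytes), scale);
        System.out.println(decoded); // 123.45
    }
}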
@@ -22,19 +22,19 @@ class MappedSchema {
static MappedSchema create(
final Map<String, Map<ParquetTableWriter.CacheTags, Object>> computedCache,
final TableDefinition definition,
final TrackingRowSet index,
final TrackingRowSet rowSet,
final Map<String, ? extends ColumnSource<?>> columnSourceMap,
final ParquetInstructions instructions,
final ColumnDefinition... extraColumns) {
final MessageTypeBuilder builder = Types.buildMessage();
for (final ColumnDefinition<?> columnDefinition : definition.getColumns()) {
TypeInfos.TypeInfo typeInfo =
getTypeInfo(computedCache, columnDefinition, index, columnSourceMap, instructions);
getTypeInfo(computedCache, columnDefinition, rowSet, columnSourceMap, instructions);
Type schemaType = typeInfo.createSchemaType(columnDefinition, instructions);
builder.addField(schemaType);
}
for (final ColumnDefinition<?> extraColumn : extraColumns) {
builder.addField(getTypeInfo(computedCache, extraColumn, index, columnSourceMap, instructions)
builder.addField(getTypeInfo(computedCache, extraColumn, rowSet, columnSourceMap, instructions)
.createSchemaType(extraColumn, instructions));
}
MessageType schema = builder.named("root");
@@ -28,7 +28,6 @@
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.util.QueryConstants;
import io.deephaven.util.codec.BigDecimalParquetBytesCodec;
import io.deephaven.util.codec.ObjectCodec;
import io.deephaven.util.type.TypeUtils;
import io.deephaven.parquet.base.ColumnWriter;
@@ -111,18 +111,18 @@ public PrecisionAndScale(final int precision, final int scale) {
}
}

private static PrecisionAndScale getPrecisionAndScale(final TrackingRowSet rowSet,
private static PrecisionAndScale computePrecisionAndScale(final TrackingRowSet rowSet,
final ColumnSource<BigDecimal> source) {
final int sz = 4096;
final ChunkSource.GetContext context = source.makeGetContext(sz);
// we first compute max(precision - scale) and max(scale), which corresponds to max(digits left of the decimal
// point),
// max(digits right of the decimal point). Then we convert to (precision, scale) before returning.
// we first compute max(precision - scale) and max(scale), which corresponds to
// max(digits left of the decimal point), max(digits right of the decimal point).
// Then we convert to (precision, scale) before returning.
int maxPrecisionMinusScale = 0;
int maxScale = 0;
try (final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) {
final RowSequence ok = it.getNextRowSequenceWithLength(sz);
final ObjectChunk<BigDecimal, ? extends Values> chunk = source.getChunk(context, ok).asObjectChunk();
try (final ChunkSource.GetContext context = source.makeGetContext(sz);
final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) {
final RowSequence rowSeq = it.getNextRowSequenceWithLength(sz);
final ObjectChunk<BigDecimal, ? extends Values> chunk = source.getChunk(context, rowSeq).asObjectChunk();
for (int i = 0; i < chunk.size(); ++i) {
final BigDecimal x = chunk.get(i);
final int precision = x.precision();
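
A worked example of why the two maxima are tracked separately: combining max(digits left of the point) with max(digits right of the point) yields the smallest (precision, scale) pair that can represent every value in the column.

import java.math.BigDecimal;

class PrecisionAndScaleDemo {
    public static void main(String[] args) {
        final BigDecimal a = new BigDecimal("123.45"); // precision 5, scale 2 -> 3 digits left of the point
        final BigDecimal b = new BigDecimal("0.6789"); // precision 4, scale 4 -> 0 digits left of the point
        final int maxLeft = Math.max(a.precision() - a.scale(), b.precision() - b.scale()); // 3
        final int maxScale = Math.max(a.scale(), b.scale());                                // 4
        // (precision 7, scale 4) covers both values: up to xxx.xxxx
        System.out.println("precision=" + (maxLeft + maxScale) + ", scale=" + maxScale);
    }
}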
@@ -144,21 +144,10 @@ static PrecisionAndScale getPrecisionAndScale(
final String columnName,
final TrackingRowSet rowSet,
Supplier<ColumnSource<BigDecimal>> columnSourceSupplier) {
Map<ParquetTableWriter.CacheTags, Object> columnCache = computedCache.get(columnName);
if (columnCache != null) {
final Object cached = columnCache.get(ParquetTableWriter.CacheTags.DECIMAL_ARGS);
if (cached != null) {
return (PrecisionAndScale) cached;
}
}
// noinspection unchecked
final PrecisionAndScale ans = getPrecisionAndScale(rowSet, columnSourceSupplier.get());
if (columnCache == null) {
columnCache = new HashMap<>();
computedCache.put(columnName, columnCache);
}
columnCache.put(ParquetTableWriter.CacheTags.DECIMAL_ARGS, ans);
return ans;
return (PrecisionAndScale) computedCache
.computeIfAbsent(columnName, unusedColumnName -> new HashMap<>())
.computeIfAbsent(ParquetTableWriter.CacheTags.DECIMAL_ARGS,
unusedCacheTag -> computePrecisionAndScale(rowSet, columnSourceSupplier.get()));
}
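
The replacement collapses the hand-rolled get/null-check/put sequence into nested computeIfAbsent calls; a minimal standalone illustration of the idiom, with placeholder names:

import java.util.HashMap;
import java.util.Map;

class NestedCacheDemo {
    public static void main(String[] args) {
        final Map<String, Map<String, Object>> cache = new HashMap<>();
        // Both cache levels are created lazily, and the value is computed
        // at most once per (column, tag) key pair.
        final Object value = cache
                .computeIfAbsent("someColumn", unusedKey -> new HashMap<>())
                .computeIfAbsent("DECIMAL_ARGS", unusedKey -> "expensively computed result");
        System.out.println(value);
    }
}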

static TypeInfo bigDecimalTypeInfo(
@@ -40,7 +40,7 @@
import io.deephaven.parquet.base.tempfix.ParquetMetadataConverter;
import io.deephaven.parquet.table.pagestore.topage.*;
import io.deephaven.parquet.table.region.*;
import io.deephaven.util.codec.BigDecimalParquetBytesCodec;
import io.deephaven.parquet.table.BigDecimalParquetBytesCodec;
import io.deephaven.util.codec.CodecCache;
import io.deephaven.util.codec.ObjectCodec;
import io.deephaven.util.codec.SimpleByteArrayCodec;
@@ -0,0 +1,36 @@
package io.deephaven.parquet.table.pagestore.topage;

import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.attributes.Any;
import org.jetbrains.annotations.NotNull;

import java.math.BigDecimal;

public abstract class ToBigDecimalBase<ATTR extends Any> implements ToPage<ATTR, BigDecimal[]> {
protected final byte scale;

protected ToBigDecimalBase(@NotNull final Class<?> nativeType, final int precision, final int scale) {
if (!BigDecimal.class.equals(nativeType)) {
throw new IllegalArgumentException(
"The native type for a BigDecimal column is " + nativeType.getCanonicalName());
}

this.scale = (byte) scale;
if (((int) this.scale) != scale) {
throw new IllegalArgumentException(
"precision=" + precision + " and scale=" + scale + " can't be represented");
}
}

@NotNull
@Override
public Class<?> getNativeType() {
return BigDecimal.class;
}

@Override
@NotNull
public final ChunkType getChunkType() {
return ChunkType.Object;
}
}
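
The constructor's narrowing check rejects any scale that does not survive the int-to-byte round trip; a quick illustration:

class ByteScaleGuardDemo {
    public static void main(String[] args) {
        final int scale = 300;
        final byte narrowed = (byte) scale;            // 44: only the low 8 bits of 300 survive
        System.out.println(((int) narrowed) == scale); // false -> the constructor above would throw
    }
}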
@@ -1,47 +1,41 @@
package io.deephaven.parquet.table.pagestore.topage;

import io.deephaven.chunk.attributes.Any;
import io.deephaven.vector.ObjectVector;
import io.deephaven.vector.ObjectVectorDirect;
import org.jetbrains.annotations.NotNull;

import java.math.BigDecimal;
import java.math.BigInteger;

public class ToBigDecimalFromIntPage<ATTR extends Any> extends ToIntPage<ATTR> {
private final byte scale;
import static io.deephaven.util.QueryConstants.NULL_INT_BOXED;

public class ToBigDecimalFromIntPage<ATTR extends Any> extends ToBigDecimalBase<ATTR> {

public static <ATTR extends Any> ToPage<ATTR, BigDecimal[]> create(
@NotNull final Class<?> nativeType,
final int precision,
final int scale
) {
if (!BigDecimal.class.equals(nativeType)) {
throw new IllegalArgumentException(
"The native type for a BigDecimal column is " + nativeType.getCanonicalName());
}

return new ToBigDecimalFromIntPage(precision, scale);
return new ToBigDecimalFromIntPage(nativeType, precision, scale);
}

protected ToBigDecimalFromIntPage(final int precision, final int scale) {
this.scale = (byte) scale;
if (((int) this.scale) != scale) {
throw new IllegalArgumentException("precision=" + precision + " and scale=" + scale + " can't be represented");
}
protected ToBigDecimalFromIntPage(@NotNull final Class<?> nativeType, final int precision, final int scale) {
super(nativeType, precision, scale);
}

@Override
@NotNull
public ObjectVector<BigDecimal> makeVector(final int[] result) {
final BigDecimal[] to = new BigDecimal[result.length];
for (int i = 0; i < result.length; ++i) {
to[i] = BigDecimal.valueOf(result[i], scale);
public BigDecimal[] convertResult(@NotNull final Object result) {
final int[] in = (int[]) result;
final int resultLength = in.length;
final BigDecimal[] out = new BigDecimal[resultLength];
for (int ri = 0; ri < resultLength; ++ri) {
out[ri] = new BigDecimal(BigInteger.valueOf(in[ri]), scale);
}
return new ObjectVectorDirect<>(to);
return out;
}

@Override
public int[] convertResult(@NotNull final Object result) {
return (int[]) result;
@NotNull
public final Object nullValue() {
return NULL_INT_BOXED;
}
}
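
convertResult treats each raw int from the Parquet page as an unscaled value at the column's fixed scale; a small standalone example of the reconstruction:

import java.math.BigDecimal;
import java.math.BigInteger;

class UnscaledIntDemo {
    public static void main(String[] args) {
        final int scale = 2;
        final int raw = 12345; // unscaled value as read from the page
        final BigDecimal value = new BigDecimal(BigInteger.valueOf(raw), scale);
        System.out.println(value); // 123.45
    }
}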