Barrage/web UI support for BigDecimal and BigInteger. #1627

Merged · 14 commits · Dec 8, 2021
Changes from 13 commits
5 changes: 4 additions & 1 deletion Integrations/python/deephaven/Types.py
@@ -48,6 +48,7 @@
float64 = None
string = None
bigdecimal = None
biginteger = None
stringset = None
datetime = None
timeperiod = None
@@ -79,7 +80,7 @@ def _defineSymbols():
_qst_column_, _qst_newtable_, _qst_type_, _table_, \
DataType, bool_, byte, short, int16, char, int_, int32, long_, int64, \
float_, single, float32, double, float64, \
string, bigdecimal, stringset, datetime, timeperiod, \
string, bigdecimal, biginteger, stringset, datetime, timeperiod, \
byte_array, short_array, int16_array, int_array, int32_array, long_array, int64_array, \
float_array, single_array, float32_array, double_array, float64_array, string_array, \
_type2jtype
@@ -114,6 +115,7 @@ def _defineSymbols():
float64 = double # make life simple for people who are used to pyarrow
string = DataType(_qst_type_.stringType())
bigdecimal = _typeFromJavaClassName('java.math.BigDecimal')
biginteger = _typeFromJavaClassName('java.math.BigInteger')
stringset = _typeFromJavaClassName('io.deephaven.stringset.StringSet')
datetime = _typeFromJavaClassName('io.deephaven.time.DateTime')
timeperiod = _typeFromJavaClassName('io.deephaven.time.Period')
@@ -143,6 +145,7 @@ def _defineSymbols():
double : jpy.get_type('double'),
string : jpy.get_type('java.lang.String'),
bigdecimal : jpy.get_type('java.math.BigDecimal'),
biginteger : jpy.get_type('java.math.BigInteger'),
stringset : jpy.get_type('io.deephaven.stringset.StringSet'),
datetime : jpy.get_type('io.deephaven.time.DateTime'),
timeperiod : jpy.get_type('io.deephaven.time.Period'),
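For orientation: once these symbols are defined, BigDecimal and BigInteger columns can be declared when building a table from Python. A minimal sketch, mirroring the test added below; it assumes a Deephaven Python session where jpy and deephaven.Types resolve, and the biginteger column is illustrative, following the same pattern as the bigdecimal one:

import jpy
import deephaven.Types as dh

jbigdecimal = jpy.get_type('java.math.BigDecimal')
jbiginteger = jpy.get_type('java.math.BigInteger')

# One BigDecimal column and one BigInteger column, three rows.
table = dh.table_of(
    [[jbigdecimal.valueOf(301, 2), jbiginteger.valueOf(10)],
     [jbigdecimal.valueOf(201, 2), jbiginteger.valueOf(20)],
     [jbigdecimal.valueOf(101, 2), jbiginteger.valueOf(30)]],
    [('decimal_value', dh.bigdecimal),
     ('integer_value', dh.biginteger)])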
29 changes: 21 additions & 8 deletions Integrations/python/test/testParquetTools.py
@@ -13,6 +13,7 @@
import shutil

from deephaven import TableTools, ParquetTools
import deephaven.Types as dh


if sys.version_info[0] < 3:
@@ -49,24 +50,18 @@ def testCreation(self):
shutil.rmtree(fileLocation)
if os.path.exists(fileLocation2):
shutil.rmtree(fileLocation2)
time.sleep(0.01) # avoid race condition on file existence...

# Writing
with self.subTest(msg="writeTable(Table, String)"):
ParquetTools.writeTable(table, fileLocation)
time.sleep(0.01) # avoid race condition on file existence...
self.assertTrue(os.path.exists(fileLocation))
shutil.rmtree(baseDir)
time.sleep(0.01) # avoid race condition on file existence...
with self.subTest(msg="writeTable(Table, File)"):
ParquetTools.writeTable(table, ParquetTools.getFileObject(fileLocation))
time.sleep(0.01) # avoid race condition on file existence...
self.assertTrue(os.path.exists(fileLocation))
shutil.rmtree(baseDir)
time.sleep(0.01) # avoid race condition on file existence...
with self.subTest(msg="writeTables(Table[], TableDefinition, File[]"):
ParquetTools.writeTables([table, table], definition, [fileLocation, fileLocation2])
time.sleep(0.01) # avoid race condition on file existence...
self.assertTrue(os.path.exists(fileLocation))
self.assertTrue(os.path.exists(fileLocation2))

@@ -78,14 +73,32 @@ def testCreation(self):
with self.subTest(msg="delete(File)"):
if os.path.exists(fileLocation):
ParquetTools.deleteTable(fileLocation)
time.sleep(0.01) # avoid race condition on file existence...
self.assertFalse(os.path.exists(fileLocation))
if os.path.exists(fileLocation2):
ParquetTools.deleteTable(fileLocation2)
time.sleep(0.01) # avoid race condition on file existence...
self.assertFalse(os.path.exists(fileLocation2))
shutil.rmtree(baseDir)

def testDecimal(self):
jbigdecimal = jpy.get_type('java.math.BigDecimal')
table = dh.table_of([[jbigdecimal.valueOf(301, 2)],
[jbigdecimal.valueOf(201,2)],
[jbigdecimal.valueOf(101,2)]],
[('decimal_value', dh.bigdecimal)])
self.assertIsNotNone(table)
baseDir = os.path.join(self.rootDir, 'testCreation')
fileLocation = os.path.join(baseDir, 'table1.parquet')
if os.path.exists(fileLocation):
shutil.rmtree(fileLocation)

ParquetTools.writeTable(table, fileLocation)
Member:
Test seems incomplete. Shouldn't we read the data back in and compare it?

Member Author:
Ideally yes, but this tests something that wasn't being tested before, namely that writing the file doesn't blow up.
Ok, let's test more...

Member:
Better. Might be better still with TstUtils.assertTableEquals(table, table2) if that's easy to do from Python.

Member Author:
I tried, but it is not easy to do from Python:

ERROR [0.057s]: testDecimal (test.testParquetTools.TestParquetTools)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/python/test/testParquetTools.py", line 97, in testDecimal
    tstutils = jpy.get_type('io.deephaven.engine.table.impl.TstUtils')
ValueError: Java class 'io.deephaven.engine.table.impl.TstUtils' not found

My guess is we don't export the test packages when we build the docker image (which makes sense).
I used TableTools.diff instead, which I think should give a similar result.
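(The committed test below settles on an exactJoin-based check rather than TableTools.diff. For reference, a diff-based version might look like the following sketch; it assumes the legacy Python TableTools wrapper exposes the Java TableTools.diff(Table, Table, long) overload, where an empty result string means the tables match.)

table2 = ParquetTools.readTable(fileLocation)
# diff describes up to maxDiffLines differences; an empty string means equal.
diff_report = TableTools.diff(table, table2, 10)
self.assertEqual('', diff_report)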

table2 = ParquetTools.readTable(fileLocation)
joinRowCount = table.exactJoin(table2, 'decimal_value').size()
self.assertEquals(table.size(), joinRowCount)

self.assertTrue(os.path.exists(fileLocation))
shutil.rmtree(baseDir)

@classmethod
def tearDownClass(cls):
# remove the junk definitions created in the tests, if they exist...
Expand Up @@ -13,6 +13,7 @@
import org.jetbrains.annotations.Nullable;

import java.io.Externalizable;
import java.math.BigDecimal;

/**
* Utility class to concentrate {@link ObjectCodec} lookups.
@@ -37,8 +38,7 @@ public static boolean codecRequired(@NotNull final ColumnDefinition<?> columnDef
* @return Whether a codec is required
*/
public static boolean codecRequired(@NotNull final Class<?> dataType, @Nullable final Class<?> componentType) {
if (dataType.isPrimitive() || dataType == Boolean.class || dataType == DateTime.class
|| dataType == String.class || StringSet.class.isAssignableFrom(dataType)) {
if (dataType.isPrimitive() || noCodecRequired(dataType) || StringSet.class.isAssignableFrom(dataType)) {
// Primitive, basic, and special types do not require codecs
return false;
}
@@ -48,17 +48,15 @@ public static boolean codecRequired(@NotNull final Class<?> dataType, @Nullable
"Array type " + dataType + " does not match component type " + componentType);
}
// Arrays of primitives or basic types do not require codecs
return !(componentType.isPrimitive() || componentType == Boolean.class || componentType == DateTime.class
|| componentType == String.class);
return !(componentType.isPrimitive() || noCodecRequired(componentType));
}
if (Vector.class.isAssignableFrom(dataType)) {
if (componentType == null) {
throw new IllegalArgumentException("Vector type " + dataType + " requires a component type");
}
if (ObjectVector.class.isAssignableFrom(dataType)) {
// Vectors of basic types do not require codecs
return !(componentType == Boolean.class || componentType == DateTime.class
|| componentType == String.class);
return !noCodecRequired(componentType);
}
// VectorBases of primitive types do not require codecs
return false;
@@ -67,6 +65,17 @@ public static boolean codecRequired(@NotNull final Class<?> dataType, @Nullable
return true;
}

private static boolean noCodecRequired(@NotNull final Class<?> dataType) {
return dataType == Boolean.class ||
dataType == DateTime.class ||
dataType == String.class ||
// A BigDecimal column maps to a logical type of decimal, with
// appropriate precision and scale calculated from column data,
// unless the user explicitly requested something else
// via instructions.
dataType == BigDecimal.class;
}

/**
* Test whether an explicit codec has been set.
*
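A note on the comment embedded in noCodecRequired above: "precision and scale calculated from column data" means the Parquet writer scans the BigDecimal column and picks the smallest decimal(precision, scale) that can represent every value. The sketch below models that calculation in Python with the decimal module; it is a conceptual illustration, not the engine's actual implementation.

from decimal import Decimal

def compute_precision_and_scale(values):
    # Scale: the most fractional digits any value needs.
    # Precision: the widest integer part, plus that shared scale.
    max_scale = 0
    max_integer_digits = 1
    for v in values:
        if v is None:
            continue
        t = v.as_tuple()
        max_scale = max(max_scale, max(0, -t.exponent))
        max_integer_digits = max(max_integer_digits, max(1, len(t.digits) + t.exponent))
    return max_integer_digits + max_scale, max_scale

# The test values 3.01, 2.01, 1.01 all fit in decimal(3, 2).
print(compute_precision_and_scale([Decimal('3.01'), Decimal('2.01'), Decimal('1.01')]))  # (3, 2)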
@@ -2402,7 +2402,7 @@ static ColumnSource<?> maybeTransformToPrimitive(final ColumnSource<?> columnSou
} else {
// noinspection unchecked
final ColumnSource<DateTime> columnSourceAsDateTime = (ColumnSource<DateTime>) columnSource;
return new DatetimeAsLongColumnSource(columnSourceAsDateTime);
return new DateTimeAsLongColumnSource(columnSourceAsDateTime);
}
}

@@ -16,14 +16,14 @@
import org.jetbrains.annotations.NotNull;

/**
* Reinterpret result {@link ColumnSource} implementations that translates {@link Boolean} to {@code byte} values.
* Reinterpret result {@link ColumnSource} implementations that translates {@link DateTime} to {@code long} values.
*/
@AbstractColumnSource.IsSerializable(value = true)
public class DatetimeAsLongColumnSource extends AbstractColumnSource<Long> implements MutableColumnSourceGetDefaults.ForLong {
public class DateTimeAsLongColumnSource extends AbstractColumnSource<Long> implements MutableColumnSourceGetDefaults.ForLong {

private final ColumnSource<DateTime> alternateColumnSource;

public DatetimeAsLongColumnSource(@NotNull final ColumnSource<DateTime> alternateColumnSource) {
public DateTimeAsLongColumnSource(@NotNull final ColumnSource<DateTime> alternateColumnSource) {
super(long.class);
this.alternateColumnSource = alternateColumnSource;
}
@@ -18,7 +18,7 @@ public static ColumnSource<Long> dateTimeToLongSource(ColumnSource<?> source) {
return source.reinterpret(long.class);
} else {
// noinspection unchecked
return new DatetimeAsLongColumnSource((ColumnSource<DateTime>) source);
return new DateTimeAsLongColumnSource((ColumnSource<DateTime>) source);
}
}

@@ -5,6 +5,8 @@
package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import com.google.common.io.LittleEndianDataOutputStream;
import com.google.common.primitives.Ints;
import gnu.trove.iterator.TLongIterator;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSet;
@@ -18,6 +20,9 @@

import java.io.DataInput;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Iterator;

public interface ChunkInputStreamGenerator extends SafeCloseable {
@@ -44,12 +49,29 @@ static <T> ChunkInputStreamGenerator makeInputStreamGenerator(
case Object:
if (type.isArray()) {
return new VarListChunkInputStreamGenerator<>(type, chunk.asObjectChunk());
} else if (type == String.class) {
}
if (type == String.class) {
return new VarBinaryChunkInputStreamGenerator<>(String.class, chunk.asObjectChunk(), (out, str) -> {
out.write(str.getBytes(Charsets.UTF_8));
});
}
// TODO (core#513): BigDecimal, BigInteger
if (type == BigInteger.class) {
return new VarBinaryChunkInputStreamGenerator<>(BigInteger.class, chunk.asObjectChunk(), (out, item) -> {
out.write(item.toByteArray());
});
}
if (type == BigDecimal.class) {
return new VarBinaryChunkInputStreamGenerator<>(BigDecimal.class, chunk.asObjectChunk(), (out, item) -> {
final BigDecimal normal = item.stripTrailingZeros();
final int v = normal.scale();
// Write as little endian, arrow endianness.
out.write(0xFF & v);
out.write(0xFF & (v >> 8));
out.write(0xFF & (v >> 16));
out.write(0xFF & (v >> 24));
out.write(normal.unscaledValue().toByteArray());
});
}
// TODO (core#936): support column conversion modes

return new VarBinaryChunkInputStreamGenerator<>(type, chunk.asObjectChunk(), (out, item) -> {
@@ -109,15 +131,39 @@ static <T> Chunk<Values> extractChunkFromInputStream(
Double.BYTES, options,fieldNodeIter, bufferInfoIter, is);
case Object:
if (type.isArray()) {
return VarListChunkInputStreamGenerator.extractChunkFromInputStream(options, type, fieldNodeIter, bufferInfoIter, is) ;
return VarListChunkInputStreamGenerator.extractChunkFromInputStream(
options, type, fieldNodeIter, bufferInfoIter, is);
}

if (options.columnConversionMode().equals(BarrageSubscriptionOptions.ColumnConversionMode.Stringify)) {
if (type == BigInteger.class) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
BigInteger::new
);
}
if (type == BigDecimal.class) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(
is,
fieldNodeIter,
bufferInfoIter,
(final byte[] buf, final int offset, final int length) -> {
// read the int scale value as little endian, arrow's endianness.
final byte b1 = buf[offset];
final byte b2 = buf[offset + 1];
final byte b3 = buf[offset + 2];
final byte b4 = buf[offset + 3];
final int scale = b4 << 24 | (b3 & 0xFF) << 16 | (b2 & 0xFF) << 8 | (b1 & 0xFF);
return new BigDecimal(new BigInteger(buf, offset + 4, length - 4), scale);
}
);
}
if (type == String.class ||
options.columnConversionMode().equals(BarrageSubscriptionOptions.ColumnConversionMode.Stringify)) {
return VarBinaryChunkInputStreamGenerator.extractChunkFromInputStream(is, fieldNodeIter, bufferInfoIter,
(buf, off, len) -> new String(buf, off, len, Charsets.UTF_8));
} else {
throw new UnsupportedOperationException("Do not yet support column conversion mode: " + options.columnConversionMode());
}
throw new UnsupportedOperationException("Do not yet support column conversion mode: " + options.columnConversionMode());
default:
throw new UnsupportedOperationException();
}
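To make the wire format added above explicit: a BigInteger cell is exactly the bytes of BigInteger.toByteArray(), i.e. big-endian two's complement; a BigDecimal cell is a 4-byte little-endian scale (Arrow's endianness) followed by the unscaled value in the same toByteArray() form, written after stripTrailingZeros() normalization. A client-side round trip can be sketched in Python as follows; the helper names are hypothetical, with int and Decimal standing in for BigInteger and BigDecimal.

import struct
from decimal import Decimal

def decode_big_integer(buf: bytes) -> int:
    # The payload is BigInteger.toByteArray(): big-endian two's complement.
    return int.from_bytes(buf, 'big', signed=True)

def encode_big_decimal(value: Decimal) -> bytes:
    value = value.normalize()  # mirrors BigDecimal.stripTrailingZeros()
    scale = -value.as_tuple().exponent
    unscaled = int(value.scaleb(scale))
    # Two's-complement byte width; may be one byte wider than Java's minimal
    # toByteArray() for some negative values, which decodes to the same number.
    width = max(1, (unscaled.bit_length() + 8) // 8)
    return struct.pack('<i', scale) + unscaled.to_bytes(width, 'big', signed=True)

def decode_big_decimal(buf: bytes) -> Decimal:
    scale, = struct.unpack_from('<i', buf, 0)  # little-endian int32 scale
    return Decimal(decode_big_integer(buf[4:])).scaleb(-scale)

assert decode_big_decimal(encode_big_decimal(Decimal('3.01'))) == Decimal('3.01')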
@@ -4,7 +4,6 @@

package io.deephaven.extensions.barrage.chunk;

import com.google.common.base.Charsets;
import com.google.common.io.LittleEndianDataOutputStream;
import gnu.trove.iterator.TLongIterator;
import io.deephaven.UncheckedDeephavenException;
@@ -37,15 +36,13 @@ public class VarBinaryChunkInputStreamGenerator<T> extends BaseChunkInputStreamG

private byte[] bytes;
private WritableIntChunk<ChunkPositions> offsets;
private byte[] stringBytes;
private WritableIntChunk<ChunkPositions> stringOffsets;

public interface Appender<T> {
void append(OutputStream out, T item) throws IOException;
}

public interface Mapper<T> {
T constructFrom(final byte[] buf, int offset, int length) throws IOException;
T constructFrom(byte[] buf, int offset, int length) throws IOException;
}

VarBinaryChunkInputStreamGenerator(final Class<T> type, final ObjectChunk<T, Values> chunk,
@@ -76,25 +73,6 @@ private synchronized void computePayload() throws IOException {
}
}

private synchronized void computeStringPayload() throws IOException {
if (stringBytes != null) {
return;
}

stringOffsets = WritableIntChunk.makeWritableChunk(chunk.size() + 1);

try (final BarrageProtoUtil.ExposedByteArrayOutputStream baos = new BarrageProtoUtil.ExposedByteArrayOutputStream()) {
stringOffsets.set(0, 0);
for (int i = 0; i < chunk.size(); ++i) {
if (chunk.get(i) != null) {
baos.write(chunk.get(i).toString().getBytes(Charsets.UTF_8));
}
stringOffsets.set(i + 1, baos.size());
}
stringBytes = baos.peekBuffer();
}
}

@Override
public void close() {
if (REFERENCE_COUNT_UPDATER.decrementAndGet(this) == 0) {
@@ -104,21 +82,13 @@ public void close() {
if (offsets != null) {
offsets.close();
}
if (stringOffsets != null) {
stringOffsets.close();
}
}
}

@Override
public DrainableColumn getInputStream(final BarrageSubscriptionOptions options, final @Nullable RowSet subset) throws IOException {
if (type == String.class) {
computePayload();
return new ObjectChunkInputStream(options, offsets, bytes, subset);
}

computeStringPayload();
return new ObjectChunkInputStream(options, stringOffsets, stringBytes, subset);
computePayload();
return new ObjectChunkInputStream(options, offsets, bytes, subset);
}

private class ObjectChunkInputStream extends BaseChunkInputStream {
@@ -163,7 +163,7 @@ private static void pushColumnTypesFromAvroField(
}
final Schema.Type fieldType = fieldSchema.getType();
pushColumnTypesFromAvroField(
columnsOut, mappedOut, prefix, field, fieldName, fieldSchema, mappedName, fieldType,
columnsOut, mappedOut, prefix, fieldName, fieldSchema, mappedName, fieldType,
fieldNameToColumnName);

}
@@ -172,7 +172,6 @@ private static void pushColumnTypesFromAvroField(
final List<ColumnDefinition<?>> columnsOut,
final Map<String, String> mappedOut,
final String prefix,
final Schema.Field field,
final String fieldName,
final Schema fieldSchema,
final String mappedName,
@@ -207,13 +206,13 @@ private static void pushColumnTypesFromAvroField(
case UNION:
final Schema effectiveSchema = Utils.getEffectiveSchema(fieldName, fieldSchema);
pushColumnTypesFromAvroField(
columnsOut, mappedOut, prefix, field, fieldName, effectiveSchema, mappedName,
columnsOut, mappedOut, prefix, fieldName, effectiveSchema, mappedName,
effectiveSchema.getType(),
fieldNameToColumnName);
return;
case RECORD:
// Linearize any nesting.
for (final Schema.Field nestedField : field.schema().getFields()) {
for (final Schema.Field nestedField : fieldSchema.getFields()) {
pushColumnTypesFromAvroField(
columnsOut, mappedOut,
prefix + fieldName + NESTED_FIELD_NAME_SEPARATOR,
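For context on the Avro changes above: the bug they fix surfaces with a nullable nested record, i.e. a field declared as a UNION of null and a RECORD. The UNION branch recurses with the unwrapped effective schema, so the RECORD branch must read nested fields from that fieldSchema argument; field.schema() would still return the original union, which has no fields of its own. A schema of the shape involved, with hypothetical names, written as a Python dict:

# 'address' is declared as ["null", record]; only the unwrapped record
# branch of the union has fields to linearize.
person_schema = {
    "type": "record",
    "name": "Person",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "address", "type": ["null", {
            "type": "record",
            "name": "Address",
            "fields": [
                {"name": "city", "type": "string"},
                {"name": "zip", "type": "string"},
            ],
        }]},
    ],
}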