Hand wrapped KafkaTools, ParquetTools
jmao-denver committed Jan 31, 2022
1 parent 70568c4 commit eef89e3
Showing 16 changed files with 1,317 additions and 18 deletions.
20 changes: 13 additions & 7 deletions pyintegration/deephaven2/__init__.py
@@ -9,13 +9,19 @@

__version__ = "0.9.0"

if not jpy.has_jvm():
from ._utils.bootstrap import build_py_session
from .dherror import DHError
from ._init.bootstrap import build_py_session

try:
build_py_session()
except Exception as e:
raise DHError(e, "deephaven initialization failed.") from e
else:
from deephaven2.constants import SortDirection
from deephaven2.csv import read as read_csv
from deephaven2.csv import write as write_csv
from deephaven2.table_factory import empty_table, time_table, merge, merge_sorted, new_table
from deephaven2.utils import get_workspace_root
import deephaven2.stream.kafka.consumer as kafka_consumer
import deephaven2.stream.kafka.producer as kafka_producer

from .dherror import DHError
from .constants import SortDirection
from .csv import read as read_csv
from .csv import write as write_csv
from .table_factory import empty_table, time_table, merge, merge_sorted, new_table
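As a rough usage sketch of the reworked initialization (assuming the existing empty_table/time_table signatures are unchanged): importing the package now runs build_py_session() eagerly and surfaces any failure as DHError before the convenience imports become available.

    import deephaven2 as dh

    # The import above already ran build_py_session(); a failure would have
    # been raised as DHError at import time.
    t = dh.empty_table(5)              # 5-row table with no columns
    tt = dh.time_table("00:00:01")     # ticking table, one new row per second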
@@ -1,6 +1,10 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
""" This module supports bootstrapping a Deephaven Python session from Python.
It is used mainly by Deephaven internally for running Python tests.
"""

import os

import jpy
@@ -14,6 +18,7 @@


def build_py_session():
""" This function uses the default DH property file to initialize a Python session. """
if not jpy.has_jvm():
os.environ['JAVA_VERSION'] = '11'

@@ -1,8 +1,7 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#

"""Utilities for starting the Deephaven JVM."""
""" Utilities for starting the Deephaven JVM. """

import os
import re
5 changes: 5 additions & 0 deletions pyintegration/deephaven2/column.py
@@ -13,6 +13,7 @@

_JColumnHeader = jpy.get_type("io.deephaven.qst.column.header.ColumnHeader")
_JColumn = jpy.get_type("io.deephaven.qst.column.Column")
_JColumnDefinition = jpy.get_type("io.deephaven.engine.table.ColumnDefinition")


class ColumnType(Enum):
@@ -41,6 +42,10 @@ class Column:
def j_column_header(self):
return _JColumnHeader.of(self.name, self.data_type.qst_type)

@property
def j_column_definition(self):
return _JColumnDefinition.fromGenericType(self.name, self.data_type.qst_type.clazz(), self.component_type)


@dataclass
class InputColumn(Column):
18 changes: 11 additions & 7 deletions pyintegration/deephaven2/dtypes.py
@@ -3,11 +3,11 @@
#
""" This module defines the data types supported by the Deephaven engine.
Each data type is represented by a DType class which supports creating arrays of the same type and more.
Each data type is represented by a DType instance which supports creating arrays of the same type and more.
"""
from __future__ import annotations

from typing import Any, Sequence, Callable, Iterable
from typing import Iterable, Any, Sequence, Callable, Optional

import jpy

@@ -27,7 +27,7 @@ class DType:
_j_name_type_map = {}

@classmethod
def from_jtype(cls, j_class: Any) -> DType:
def from_jtype(cls, j_class: Any) -> Optional[DType]:
if not j_class:
return None

@@ -50,10 +50,10 @@ def __repr__(self):
return self.j_name

def __call__(self, *args, **kwargs):
try:
if not self.is_primitive:
return self.j_type(*args, **kwargs)
except Exception as e:
raise DHError(e, f"failed to create an instance of {self.j_name}") from e
else:
raise DHError(message="primitive types are not callable.")

def array(self, size: int):
""" Creates a Java array of the same data type of the specified size.
@@ -137,10 +137,14 @@ def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None):


def j_array_list(values: Iterable):
j_list = jpy.get_type("java.util.ArrayList")(len(values))
j_list = jpy.get_type("java.util.ArrayList")()
try:
for v in values:
j_list.add(v)
return j_list
except Exception as e:
raise DHError(e, "failed to create a Java ArrayList from the Python collection.") from e


def is_java_type(obj):
return isinstance(obj, jpy.JType)
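A brief sketch of the tightened DType call behavior and the hardened j_array_list (the string/int_ names are assumed to be DType instances this module already defines):

    from deephaven2 import dtypes
    from deephaven2.dherror import DHError

    s = dtypes.string("hello")     # non-primitive DType: constructs a java.lang.String
    try:
        dtypes.int_(42)            # primitive DType: no longer callable
    except DHError as e:
        print(e)

    # j_array_list accepts any iterable and now wraps failures in DHError.
    j_list = dtypes.j_array_list(["a", "b", "c"])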
206 changes: 206 additions & 0 deletions pyintegration/deephaven2/parquet.py
@@ -0,0 +1,206 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
""" The Parquet integration module. """
from dataclasses import dataclass
from typing import List

import jpy

from deephaven2 import DHError
from deephaven2.column import Column
from deephaven2.dtypes import is_java_type
from deephaven2.table import Table

_JParquetTools = jpy.get_type("io.deephaven.parquet.table.ParquetTools")
_JFile = jpy.get_type("java.io.File")
_JCompressionCodecName = jpy.get_type("org.apache.parquet.hadoop.metadata.CompressionCodecName")
_JParquetInstructions = jpy.get_type("io.deephaven.parquet.table.ParquetInstructions")
_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition")


@dataclass
class ColumnInstruction:
column_name: str = None
parquet_column_name: str = None
codec_name: str = None
codec_args: str = None
use_dictionary: bool = False


def _build_parquet_instructions(col_instructions: List[ColumnInstruction] = None, compression_codec_name: str = None,
max_dictionary_keys: int = None, is_legacy_parquet: bool = False,
for_read: bool = True):
if not any([col_instructions, compression_codec_name, max_dictionary_keys, is_legacy_parquet]):
return None

builder = _JParquetInstructions.builder()
if col_instructions is not None:
for ci in col_instructions:
if for_read and not ci.parquet_column_name:
raise ValueError("must specify the parquet column name for read.")
if not for_read and not ci.column_name:
raise ValueError("must specify the table column name for write.")

builder.addColumnNameMapping(ci.column_name, ci.parquet_column_name)
if ci.column_name:
if ci.codec_name:
builder.addColumnCodec(ci.column_name, ci.codec_name, ci.codec_args)
builder.useDictionary(ci.column_name, ci.use_dictionary)
if compression_codec_name:
builder.setCompressionCodecName(compression_codec_name)

if max_dictionary_keys is not None:
builder.setMaximumDictionaryKeys(max_dictionary_keys)

if is_legacy_parquet:
builder.setIsLegacyParquet(True)

return builder.build()


def read_table(path: str, col_instructions: List[ColumnInstruction] = None, is_legacy_parquet: bool = False) -> Table:
""" Reads in a table from a single parquet, metadata file, or directory with recognized layout.
Args:
path (str): the file or directory to examine
col_instructions (List[ColumnInstruction]): instructions for customizations while reading
is_legacy_parquet (bool): if the parquet data is legacy
Returns:
a table
Raises:
DHError
"""

read_instructions = _build_parquet_instructions(col_instructions=col_instructions,
is_legacy_parquet=is_legacy_parquet)

try:
if read_instructions:
return Table(j_table=_JParquetTools.readTable(path, read_instructions))
else:
return Table(j_table=_JParquetTools.readTable(path))
except Exception as e:
raise DHError(e, "failed to read parquet data.") from e


def _get_file_object(arg):
""" Helper function for easily creating a java file object from a path string
Args:
arg: path string, or list of path strings
Returns:
a java File object, or java array of File objects
Raises:
ValueError
"""
if is_java_type(arg):
return arg
elif isinstance(arg, str):
return _JFile(arg)
elif isinstance(arg, list):
# NB: map() returns an iterator in python 3, so list comprehension is appropriate here
return jpy.array("java.io.File", [_JFile(el) for el in arg])
else:
raise ValueError("Method accepts only a java type, string, or list of strings as input. "
"Got {}".format(type(arg)))


def delete_table(path: str):
""" Deletes a Parquet table on disk.
Args:
path (str): path to delete
Raises:
DHError
"""
try:
_JParquetTools.deleteTable(_get_file_object(path))
except Exception as e:
raise DHError(e, "failed to delate a parquet table on disk.") from e


def write_table(table: Table, destination: str, col_definitions: List[Column] = None,
col_instructions: List[ColumnInstruction] = None, compression_codec_name: str = None,
max_dictionary_keys: int = None):
""" Write a table to a Parquet file.
Args:
table (Table): the source table
destination (str): destination file path; the file name should end in the ".parquet" extension. If the path includes
non-existing directories they are created. If there is an error any intermediate directories previously
created are removed; note this makes this method unsafe for concurrent use
col_definitions (List[Column]): the column definitions to use
col_instructions (List[ColumnInstruction]): instructions for customizations while writing
compression_codec_name (str): the default compression codec to use, default is SNAPPY
max_dictionary_keys (int): the maximum dictionary keys allowed
Raises:
DHError
"""
write_instructions = _build_parquet_instructions(col_instructions=col_instructions,
compression_codec_name=compression_codec_name,
max_dictionary_keys=max_dictionary_keys,
for_read=False)

table_definition = None
if col_definitions is not None:
table_definition = _JTableDefinition.of([col.j_column_definition for col in col_definitions])

try:
if table_definition:
if write_instructions:
_JParquetTools.writeTable(table.j_table, destination, table_definition, write_instructions)
else:
_JParquetTools.writeTable(table.j_table, _get_file_object(destination), table_definition)
else:
if write_instructions:
_JParquetTools.writeTable(table.j_table, _get_file_object(destination), write_instructions)
else:
_JParquetTools.writeTable(table.j_table, destination)
except Exception as e:
raise DHError(e, "failed to write to parquet data.") from e


def write_tables(tables: List[Table], destinations: List[str], col_definitions: List[Column],
col_instructions: List[ColumnInstruction] = None, compression_codec_name: str = None,
max_dictionary_keys: int = None, grouping_cols: List[str] = None):
""" Writes tables to disk in parquet format to a supplied set of destinations. If you specify grouping columns,
there must already be grouping information for those columns in the sources. This can be accomplished with
.groupBy(<grouping columns>).ungroup() or .sort(<grouping column>).
Args:
tables (List[Table]): the source tables
destinations (List[str]): the destination paths. Any non-existing directories in the paths provided are
created. If there is an error any intermediate directories previously created are removed; note this makes
this method unsafe for concurrent use
col_definitions (List[Column]): the column definitions to use
col_instructions (List[ColumnInstruction]): instructions for customizations while writing
compression_codec_name (str): the compression codec to use, default is SNAPPY
max_dictionary_keys (int): the maximum dictionary keys allowed
grouping_cols (List[str]): the group column names
Raises:
DHError
"""
write_instructions = _build_parquet_instructions(col_instructions=col_instructions,
compression_codec_name=compression_codec_name,
max_dictionary_keys=max_dictionary_keys,
for_read=False)

table_definition = _JTableDefinition.of([col.j_column_definition for col in col_definitions])

try:
if grouping_cols:
_JParquetTools.writeParquetTables([t.j_table for t in tables], table_definition, write_instructions,
_get_file_object(destinations), grouping_cols)
else:
_JParquetTools.writeTables([t.j_table for t in tables], table_definition,
_get_file_object(destinations))
except Exception as e:
raise DHError(e, "write multiple table to parquet data failed.") from e
47 changes: 47 additions & 0 deletions pyintegration/deephaven2/stream/kafka/_utils.py
@@ -0,0 +1,47 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
""" The private utility module for kafka. """
import jpy

_JProperties = jpy.get_type("java.util.Properties")
_JHashMap = jpy.get_type("java.util.HashMap")
_JHashSet = jpy.get_type("java.util.HashSet")
_JPythonTools = jpy.get_type("io.deephaven.integrations.python.PythonTools")
IDENTITY = object() # Ensure IDENTITY is unique.


def _dict_to_j_properties(d):
r = _JProperties()
for key, value in d.items():
if value is None:
value = ''
r.setProperty(key, value)
return r


def _dict_to_j_map(d):
if d is None:
return None
r = _JHashMap()
for key, value in d.items():
if value is None:
value = ''
r.put(key, value)
return r


def _dict_to_j_func(dict_mapping, default_value):
java_map = _dict_to_j_map(dict_mapping)
if default_value is IDENTITY:
return _JPythonTools.functionFromMapWithIdentityDefaults(java_map)
return _JPythonTools.functionFromMapWithDefault(java_map, default_value)


def _seq_to_j_set(s):
if s is None:
return None
r = _JHashSet()
for v in s:
r.add(v)
return r
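A small sketch of what these private helpers produce for a typical Kafka configuration (user code would normally pass plain dicts to the consumer/producer wrappers; this is internal plumbing):

    from deephaven2.stream.kafka._utils import _dict_to_j_properties, _seq_to_j_set

    props = _dict_to_j_properties({"bootstrap.servers": "localhost:9092", "group.id": None})
    assert props.getProperty("group.id") == ""   # None values become empty strings

    topics = _seq_to_j_set(["orders", "quotes"]) # java.util.HashSet of topic names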