diff --git a/Integrations/build.gradle b/Integrations/build.gradle index e612b984522..11481fc033a 100644 --- a/Integrations/build.gradle +++ b/Integrations/build.gradle @@ -133,7 +133,7 @@ Closure composeConfig = { task -> def pyTest = runInDocker("test-py-37", 'python', ['python3', '-m', 'xmlrunner', 'discover', '-v', '-o', '/out/report'], composeConfig) def pyFunctionalTest = runInDocker('py-functional-test', 'python', ['/bin/bash', 'run-functional-tests.sh']) -def pyTest2 = runInDocker('test-py-deephaven2', '../pyintegration', ['python3', '-m', 'xmlrunner', 'discover', 'tests', '-v', '-o', '/out/report']) +def pyTest2 = runInDocker('test-py-deephaven2', '../pyintegration', ['python3', '-m', 'xmlrunner', 'discover', 'tests', '-v', '-o', '/out/report'], composeConfig) pyTest.configure({ onlyIf { TestTools.shouldRunTests(project) } }) diff --git a/pyintegration/deephaven2/__init__.py b/pyintegration/deephaven2/__init__.py index 227fd001cdc..c8c49912f7f 100644 --- a/pyintegration/deephaven2/__init__.py +++ b/pyintegration/deephaven2/__init__.py @@ -2,20 +2,24 @@ # Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending # """Deephaven Python Integration Package provides the ability to access the Deephaven's query engine natively and thus -unlocks the unique and tremendous power of Deephaven to the Python community. +unlocks the unique power of Deephaven to the Python community. """ -import jpy __version__ = "0.9.0" -if not jpy.has_jvm(): - from ._utils.bootstrap import build_py_session +from ._init.bootstrap import build_py_session +from .dherror import DHError +try: build_py_session() +except Exception as e: + raise DHError(e, "deephaven initialization failed.") from e +else: + from .constants import SortDirection + from .csv import read as read_csv + from .csv import write as write_csv + from .table_factory import empty_table, time_table, merge, merge_sorted, new_table + from .stream.kafka import consumer as kafka_consumer + from .stream.kafka import producer as kafka_producer -from .dherror import DHError -from .constants import SortDirection -from .csv import read as read_csv -from .csv import write as write_csv -from .table_factory import empty_table, time_table, merge, merge_sorted, new_table diff --git a/pyintegration/deephaven2/_utils/bootstrap.py b/pyintegration/deephaven2/_init/bootstrap.py similarity index 89% rename from pyintegration/deephaven2/_utils/bootstrap.py rename to pyintegration/deephaven2/_init/bootstrap.py index a201a6a2fbf..f5991416a98 100644 --- a/pyintegration/deephaven2/_utils/bootstrap.py +++ b/pyintegration/deephaven2/_init/bootstrap.py @@ -1,6 +1,8 @@ # # Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending # +""" This module supports bootstrapping a Deephaven Python Script session from Python.""" + import os import jpy @@ -14,6 +16,8 @@ def build_py_session(): + """ This function uses the default DH property file to embed the Deephaven server and starts a Deephaven Python + Script session. 
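# --- Editor's note: illustrative usage sketch, not part of the diff above or below. ---
# The reworked deephaven2/__init__.py shown earlier now bootstraps the embedded server on
# import (wrapping any failure in DHError) and only then re-exports the table factories,
# CSV helpers and Kafka modules. A minimal sketch of what that buys a user; the column
# name and values are hypothetical.
from deephaven2 import new_table          # importing the package triggers build_py_session()
from deephaven2.column import int_col

t = new_table(cols=[int_col("X", [1, 2, 3])])
print(t.to_string())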
""" if not jpy.has_jvm(): os.environ['JAVA_VERSION'] = '11' diff --git a/pyintegration/deephaven2/_utils/start_jvm.py b/pyintegration/deephaven2/_init/start_jvm.py similarity index 99% rename from pyintegration/deephaven2/_utils/start_jvm.py rename to pyintegration/deephaven2/_init/start_jvm.py index 64988aa75be..cd178eac460 100644 --- a/pyintegration/deephaven2/_utils/start_jvm.py +++ b/pyintegration/deephaven2/_init/start_jvm.py @@ -1,8 +1,7 @@ # # Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending # - -"""Utilities for starting the Deephaven JVM.""" +""" Utilities for starting the Deephaven JVM. """ import os import re diff --git a/pyintegration/deephaven2/_jcompat.py b/pyintegration/deephaven2/_jcompat.py new file mode 100644 index 00000000000..5669fb67d3e --- /dev/null +++ b/pyintegration/deephaven2/_jcompat.py @@ -0,0 +1,59 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +""" This module provides Java compatibility support including convenience functions to create some widely used Java +data structures from corresponding Python ones in order to be able to call Java methods. """ +from typing import Any, Iterable, Dict, Set + +import jpy + + +def is_java_type(obj: Any) -> bool: + """ Returns True if the object is originated in Java. """ + return isinstance(obj, jpy.JType) + + +def j_array_list(values: Iterable = None) -> jpy.JType: + """ Creates a Java ArrayList instance from an iterable. """ + if values is None: + return None + r = jpy.get_type("java.util.ArrayList")(len(list(values))) + for v in values: + r.add(v) + return r + + +def j_hashmap(d: Dict = None) -> jpy.JType: + """ Creates a Java HashMap from a dict. """ + if d is None: + return None + + r = jpy.get_type("java.util.HashMap")() + for key, value in d.items(): + if value is None: + value = '' + r.put(key, value) + return r + + +def j_hashset(s: Set = None) -> jpy.JType: + """ Creates a Java HashSet from a set. """ + if s is None: + return None + + r = jpy.get_type("java.util.HashSet")() + for v in s: + r.add(v) + return r + + +def j_properties(d: Dict = None) -> jpy.JType: + """ Creates a Java Properties from a dict. """ + if d is None: + return None + r = jpy.get_type("java.util.Properties")() + for key, value in d.items(): + if value is None: + value = '' + r.setProperty(key, value) + return r \ No newline at end of file diff --git a/pyintegration/deephaven2/_wrapper_abc.py b/pyintegration/deephaven2/_wrapper_abc.py new file mode 100644 index 00000000000..3d0355b8964 --- /dev/null +++ b/pyintegration/deephaven2/_wrapper_abc.py @@ -0,0 +1,17 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +""" This module defines an Abstract Class for Java object wrappers. + +The primary purpose of this ABC is to enable downstream code to retrieve the wrapped Java objects in a uniform way. +""" +from abc import ABC, abstractmethod + +import jpy + + +class JObjectWrapper(ABC): + @property + @abstractmethod + def j_object(self) -> jpy.JType: + ... 
\ No newline at end of file diff --git a/pyintegration/deephaven2/column.py b/pyintegration/deephaven2/column.py index 52211b495fb..c22ab1f12eb 100644 --- a/pyintegration/deephaven2/column.py +++ b/pyintegration/deephaven2/column.py @@ -3,16 +3,17 @@ # from dataclasses import dataclass, field from enum import Enum -from typing import Sequence, Any +from typing import Sequence import jpy -from deephaven2.dtypes import DType import deephaven2.dtypes as dtypes from deephaven2 import DHError +from deephaven2.dtypes import DType _JColumnHeader = jpy.get_type("io.deephaven.qst.column.header.ColumnHeader") _JColumn = jpy.get_type("io.deephaven.qst.column.Column") +_JColumnDefinition = jpy.get_type("io.deephaven.engine.table.ColumnDefinition") class ColumnType(Enum): @@ -41,6 +42,10 @@ class Column: def j_column_header(self): return _JColumnHeader.of(self.name, self.data_type.qst_type) + @property + def j_column_definition(self): + return _JColumnDefinition.fromGenericType(self.name, self.data_type.qst_type.clazz(), self.component_type) + @dataclass class InputColumn(Column): @@ -53,9 +58,9 @@ def __post_init__(self): self.j_column = _JColumn.empty(self.j_column_header) else: if self.data_type.is_primitive: - self.j_column = _JColumn.ofUnsafe(self.name, self.data_type.array_from(self.input_data)) + self.j_column = _JColumn.ofUnsafe(self.name, dtypes.array(self.data_type, self.input_data)) else: - self.j_column = _JColumn.of(self.j_column_header, self.data_type.array_from(self.input_data)) + self.j_column = _JColumn.of(self.j_column_header, dtypes.array(self.data_type, self.input_data)) except Exception as e: raise DHError(e, "failed to create an InputColumn.") from e diff --git a/pyintegration/deephaven2/config/__init__.py b/pyintegration/deephaven2/config/__init__.py new file mode 100644 index 00000000000..9ef6ee89768 --- /dev/null +++ b/pyintegration/deephaven2/config/__init__.py @@ -0,0 +1,31 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +""" This module provides access to the Deephaven server configuration. """ +import jpy + +from deephaven2 import DHError +from deephaven2.time import TimeZone + +_JDHConfig = jpy.get_type("io.deephaven.configuration.Configuration") +_JDateTimeZone = jpy.get_type("org.joda.time.DateTimeZone") + + +def get_log_dir() -> str: + """ Returns the server's log directory. """ + try: + return _JDHConfig.getInstance().getLogDir() + except Exception as e: + raise DHError(e, "failed to get the server's log directory.") from e + + +def get_server_timezone() -> TimeZone: + """ Returns the server's time zone. 
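# --- Editor's note: illustrative sketch, not part of the diff. ---
# The new deephaven2.config module stays deliberately small: just the two accessors defined
# here, mirroring what tests/test_config.py (further down) asserts about them.
from deephaven2.config import get_log_dir, get_server_timezone

print(get_log_dir())           # an existing directory on the server (exact path is deployment-specific)
print(get_server_timezone())   # a member of the deephaven2.time.TimeZone enum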
""" + try: + j_timezone = _JDateTimeZone.forTimeZone(_JDHConfig.getInstance().getServerTimezone()) + for tz in TimeZone: + if j_timezone == tz.value.getTimeZone(): + return tz + raise NotImplementedError("can't find the time zone in the TImeZone Enum.") + except Exception as e: + raise DHError(e, message=f"failed to find a recognized time zone") from e diff --git a/pyintegration/deephaven2/dtypes.py b/pyintegration/deephaven2/dtypes.py index e61e5d00819..8875ffb3ab5 100644 --- a/pyintegration/deephaven2/dtypes.py +++ b/pyintegration/deephaven2/dtypes.py @@ -7,15 +7,18 @@ """ from __future__ import annotations -from typing import Any, Sequence, Callable, Iterable +from typing import Any, Sequence, Callable, Dict, Type import jpy +import numpy as np from deephaven2 import DHError _JQstType = jpy.get_type("io.deephaven.qst.type.Type") _JTableTools = jpy.get_type("io.deephaven.engine.util.TableTools") +_j_name_type_map: Dict[str, DType] = {} + def _qst_custom_type(cls_name: str): return _JQstType.find(_JTableTools.typeFromName(cls_name)) @@ -24,123 +27,111 @@ def _qst_custom_type(cls_name: str): class DType: """ A class representing a data type in Deephaven.""" - _j_name_type_map = {} - - @classmethod - def from_jtype(cls, j_class: Any) -> DType: - if not j_class: - return None - - j_name = j_class.getName() - dtype = DType._j_name_type_map.get(j_name) - if not dtype: - return cls(j_name=j_name, j_type=j_class) - else: - return dtype - - def __init__(self, j_name: str, j_type: Any = None, qst_type: Any = None, is_primitive: bool = False): + def __init__(self, j_name: str, j_type: Type = None, qst_type: jpy.JType = None, is_primitive: bool = False, + np_type: Any = np.object_): + """ + Args: + j_name (str): the full qualified name of the Java class + j_type (Type): the mapped Python class created by JPY + qst_type (JType): the JPY wrapped object for a instance of QST Type + is_primitive (bool): whether this instance represents a primitive Java type + np_type (Any): an instance of numpy dtype (dtype("int64") or numpy class (e.g. np.int16), default is + np.object_ + """ self.j_name = j_name self.j_type = j_type if j_type else jpy.get_type(j_name) self.qst_type = qst_type if qst_type else _qst_custom_type(j_name) self.is_primitive = is_primitive + self.np_type = np_type - DType._j_name_type_map[j_name] = self + _j_name_type_map[j_name] = self def __repr__(self): return self.j_name def __call__(self, *args, **kwargs): + if self.is_primitive: + raise DHError(message=f"primitive type {self.j_name} is not callable.") + try: return self.j_type(*args, **kwargs) except Exception as e: raise DHError(e, f"failed to create an instance of {self.j_name}") from e - def array(self, size: int): - """ Creates a Java array of the same data type of the specified size. - - Args: - size (int): the size of the array - - Returns: - a Java array - - Raises: - DHError - """ - try: - return jpy.array(self.j_name, size) - except Exception as e: - raise DHError("failed to create a Java array.") from e - - def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None): - """ Creates a Java array of the same data type populated with values from a sequence. - Note: - this method does unsafe casting, meaning precision and values might be lost with down cast - - Args: - seq: a sequence of compatible data, e.g. list, tuple, numpy array, Pandas series, etc. 
- remap (optional): a callable that takes one value and maps it to another, for handling the translation of - special DH values such as NULL_INT, NAN_INT between Python and the DH engine - - Returns: - a Java array - - Raises: - DHError - """ - try: - if remap: - if not callable(remap): - raise ValueError("Not a callable") - seq = [remap(v) for v in seq] - - return jpy.array(self.j_name, seq) - except Exception as e: - raise DHError(e, f"failed to create a Java {self.j_name} array.") from e - - -class CharDType(DType): - """ The class for char type. """ - - def __init__(self): - super().__init__(j_name="char", qst_type=_JQstType.charType(), is_primitive=True) - - def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None): - if isinstance(seq, str) and not remap: - return super().array_from(seq, remap=ord) - return super().array_from(seq, remap) - - -bool_ = DType(j_name="java.lang.Boolean", qst_type=_JQstType.booleanType()) -byte = DType(j_name="byte", qst_type=_JQstType.byteType(), is_primitive=True) +bool_ = DType(j_name="java.lang.Boolean", qst_type=_JQstType.booleanType(), np_type=np.bool_) +byte = DType(j_name="byte", qst_type=_JQstType.byteType(), is_primitive=True, np_type=np.int8) int8 = byte -short = DType(j_name="short", qst_type=_JQstType.shortType(), is_primitive=True) +short = DType(j_name="short", qst_type=_JQstType.shortType(), is_primitive=True, np_type=np.int16) int16 = short -char = CharDType() -int_ = DType(j_name="int", qst_type=_JQstType.intType(), is_primitive=True) -int32 = int_ -long = DType(j_name="long", qst_type=_JQstType.longType(), is_primitive=True) +char = DType(j_name="char", qst_type=_JQstType.charType(), is_primitive=True, np_type=np.dtype('uint16')) +int32 = DType(j_name="int", qst_type=_JQstType.intType(), is_primitive=True, np_type=np.int32) +long = DType(j_name="long", qst_type=_JQstType.longType(), is_primitive=True, np_type=np.int64) int64 = long -float_ = DType(j_name="float", qst_type=_JQstType.floatType(), is_primitive=True) +int_ = int32 +float_ = DType(j_name="float", qst_type=_JQstType.floatType(), is_primitive=True, np_type=np.float32) single = float_ float32 = float_ -double = DType(j_name="double", qst_type=_JQstType.doubleType(), is_primitive=True) +double = DType(j_name="double", qst_type=_JQstType.doubleType(), is_primitive=True, np_type=np.float64) float64 = double string = DType(j_name="java.lang.String", qst_type=_JQstType.stringType()) BigDecimal = DType(j_name="java.math.BigDecimal") StringSet = DType(j_name="io.deephaven.stringset.StringSet") +DateTime = DType(j_name="io.deephaven.time.DateTime", np_type=np.dtype("datetime64[ns]")) +Period = DType(j_name="io.deephaven.time.Period") PyObject = DType(j_name="org.jpy.PyObject") JObject = DType(j_name="java.lang.Object") -DateTime = DType(j_name="io.deephaven.time.DateTime") -Period = DType(j_name="io.deephaven.time.Period") -def j_array_list(values: Iterable): - j_list = jpy.get_type("java.util.ArrayList")(len(values)) +def array(dtype: DType, seq: Sequence, remap: Callable[[Any], Any] = None) -> jpy.JType: + """ Creates a Java array of the specified data type populated with values from a sequence. + + Note: + this method does unsafe casting, meaning precision and values might be lost with down cast + + Args: + dtype (DType): the component type of the array + seq (Sequence): a sequence of compatible data, e.g. list, tuple, numpy array, Pandas series, etc. 
+ remap (optional): a callable that takes one value and maps it to another, for handling the translation of + special DH values such as NULL_INT, NAN_INT between Python and the DH engine + + Returns: + a Java array + + Raises: + DHError + """ try: - for v in values: - j_list.add(v) - return j_list + if remap: + if not callable(remap): + raise ValueError("Not a callable") + seq = [remap(v) for v in seq] + else: + if isinstance(seq, str) and dtype == char: + return array(char, seq, remap=ord) + + return jpy.array(dtype.j_type, seq) except Exception as e: - raise DHError(e, "failed to create a Java ArrayList from the Python collection.") from e + raise DHError(e, f"failed to create a Java {dtype.j_name} array.") from e + + +def from_jtype(j_class: Any) -> DType: + """ look up a DType that matches the java type, if not found, create a DType for it. """ + if not j_class: + return None + + j_name = j_class.getName() + dtype = _j_name_type_map.get(j_name) + if not dtype: + return DType(j_name=j_name, j_type=j_class, np_type=np.object_) + else: + return dtype + + +def from_np_dtype(np_dtype: np.dtype) -> DType: + """ Look up a DType that matches the numpy.dtype, if not found, return PyObject. """ + for _, dtype in _j_name_type_map.items(): + if np.dtype(dtype.np_type) == np_dtype and dtype.np_type != np.object_: + return dtype + + return PyObject diff --git a/pyintegration/deephaven2/parquet.py b/pyintegration/deephaven2/parquet.py new file mode 100644 index 00000000000..cb351c980fd --- /dev/null +++ b/pyintegration/deephaven2/parquet.py @@ -0,0 +1,192 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +""" This module supports reading an external Parquet files into Deephaven tables and writing Deephaven tables out as +Parquet files. """ +from dataclasses import dataclass +from typing import List + +import jpy + +from deephaven2 import DHError +from deephaven2.column import Column +from deephaven2.table import Table + +_JParquetTools = jpy.get_type("io.deephaven.parquet.table.ParquetTools") +_JFile = jpy.get_type("java.io.File") +_JCompressionCodecName = jpy.get_type("org.apache.parquet.hadoop.metadata.CompressionCodecName") +_JParquetInstructions = jpy.get_type("io.deephaven.parquet.table.ParquetInstructions") +_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") + + +@dataclass +class ColumnInstruction: + """ This class specifies the instructions for reading/writing a Parquet column. 
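# --- Editor's note: illustrative sketch, not part of the diff. ---
# One way the new parquet module could be exercised end to end: write a table out, then read
# it back with a per-column ColumnInstruction. The file path and column name are hypothetical.
from deephaven2 import new_table
from deephaven2.column import int_col
from deephaven2.parquet import ColumnInstruction, read_table, write_table

t = new_table(cols=[int_col("X", [1, 2, 3])])
write_table(t, "/tmp/x.parquet")

# read_table requires parquet_column_name on each instruction; column_name maps it to the table column
ci = ColumnInstruction(column_name="X", parquet_column_name="X")
t2 = read_table("/tmp/x.parquet", col_instructions=[ci])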
""" + column_name: str = None + parquet_column_name: str = None + codec_name: str = None + codec_args: str = None + use_dictionary: bool = False + + +def _build_parquet_instructions(col_instructions: List[ColumnInstruction] = None, compression_codec_name: str = None, + max_dictionary_keys: int = None, is_legacy_parquet: bool = False, + for_read: bool = True): + if not any([col_instructions, compression_codec_name, max_dictionary_keys, is_legacy_parquet]): + return None + + builder = _JParquetInstructions.builder() + if col_instructions is not None: + for ci in col_instructions: + if for_read and not ci.parquet_column_name: + raise ValueError("must specify the parquet column name for read.") + if not for_read and not ci.column_name: + raise ValueError("must specify the table column name for write.") + + builder.addColumnNameMapping(ci.parquet_column_name, ci.column_name) + if ci.column_name: + if ci.codec_name: + builder.addColumnCodec(ci.column_name, ci.codec_name, ci.codec_args) + builder.useDictionary(ci.column_name, ci.use_dictionary) + + if compression_codec_name: + builder.setCompressionCodecName(compression_codec_name) + + if max_dictionary_keys is not None: + builder.setMaximumDictionaryKeys(max_dictionary_keys) + + if is_legacy_parquet: + builder.setIsLegacyParquet(True) + + return builder.build() + + +def read_table(path: str, col_instructions: List[ColumnInstruction] = None, is_legacy_parquet: bool = False) -> Table: + """ Reads in a table from a single parquet, metadata file, or directory with recognized layout. + + Args: + path (str): the file or directory to examine + col_instructions (List[ColumnInstruction]): instructions for customizations while reading + is_legacy_parquet (bool): if the parquet data is legacy + + Returns: + a table + + Raises: + DHError + """ + + try: + read_instructions = _build_parquet_instructions(col_instructions=col_instructions, + is_legacy_parquet=is_legacy_parquet, + for_read=True) + + if read_instructions: + return Table(j_table=_JParquetTools.readTable(path, read_instructions)) + else: + return Table(j_table=_JParquetTools.readTable(path)) + except Exception as e: + raise DHError(e, "failed to read parquet data.") from e + + +def _j_file_array(paths: List[str]): + return jpy.array("java.io.File", [_JFile(el) for el in paths]) + + +def delete_table(path: str) -> None: + """ Deletes a Parquet table on disk. + + Args: + path (str): path to delete + + Raises: + DHError + """ + try: + _JParquetTools.deleteTable(_JFile(path)) + except Exception as e: + raise DHError(e, f"failed to delete a parquet table: {path} on disk.") from e + + +def write_table(table: Table, destination: str, col_definitions: List[Column] = None, + col_instructions: List[ColumnInstruction] = None, compression_codec_name: str = None, + max_dictionary_keys: int = None) -> None: + """ Write a table to a Parquet file. + + Args: + table (Table): the source table + destination (str): destination file path; the file name should end in a ".parquet" extension. If the path + includes non-existing directories they are created. 
If there is an error, any intermediate directories + previously created are removed; note this makes this method unsafe for concurrent use + col_definitions (List[Column]): the column definitions to use, default is None + col_instructions (List[ColumnInstruction]): instructions for customizations while writing, default is None + compression_codec_name (str): the default compression codec to use, if not specified, defaults to SNAPPY + max_dictionary_keys (int): the maximum dictionary keys allowed, if not specified, defaults to 2^20 (1,048,576) + + Raises: + DHError + """ + try: + write_instructions = _build_parquet_instructions(col_instructions=col_instructions, + compression_codec_name=compression_codec_name, + max_dictionary_keys=max_dictionary_keys, + for_read=False) + + table_definition = None + if col_definitions is not None: + table_definition = _JTableDefinition.of([col.j_column_definition for col in col_definitions]) + + if table_definition: + if write_instructions: + _JParquetTools.writeTable(table.j_table, destination, table_definition, write_instructions) + else: + _JParquetTools.writeTable(table.j_table, _JFile(destination), table_definition) + else: + if write_instructions: + _JParquetTools.writeTable(table.j_table, _JFile(destination), write_instructions) + else: + _JParquetTools.writeTable(table.j_table, destination) + except Exception as e: + raise DHError(e, "failed to write to parquet data.") from e + + +def write_tables(tables: List[Table], destinations: List[str], col_definitions: List[Column], + col_instructions: List[ColumnInstruction] = None, compression_codec_name: str = None, + max_dictionary_keys: int = None, grouping_cols: List[str] = None): + """ Writes tables to disk in parquet format to a supplied set of destinations. + + If you specify grouping columns, there must already be grouping information for those columns in the sources. + This can be accomplished with .groupBy().ungroup() or .sort(). + + Note that either all the tables are written out successfully or none is. + + Args: + tables (List[Table]): the source tables + destinations (List[str]): the destinations paths. Any non existing directories in the paths provided are + created. 
If there is an error, any intermediate directories previously created are removed; note this makes + this method unsafe for concurrent use + col_definitions (List[Column]): the column definitions to use + col_instructions (List[ColumnInstruction]): instructions for customizations while writing + compression_codec_name (str): the compression codec to use, if not specified, defaults to SNAPPY + max_dictionary_keys (int): the maximum dictionary keys allowed, if not specified, defaults to 2^20 (1,048,576) + grouping_cols (List[str]): the group column names + + Raises: + DHError + """ + try: + write_instructions = _build_parquet_instructions(col_instructions=col_instructions, + compression_codec_name=compression_codec_name, + max_dictionary_keys=max_dictionary_keys, + for_read=False) + + table_definition = _JTableDefinition.of([col.j_column_definition for col in col_definitions]) + + if grouping_cols: + _JParquetTools.writeParquetTables([t.j_table for t in tables], table_definition, write_instructions, + _j_file_array(destinations), grouping_cols) + else: + _JParquetTools.writeTables([t.j_table for t in tables], table_definition, + _j_file_array(destinations)) + except Exception as e: + raise DHError(e, "write multiple tables to parquet data failed.") from e diff --git a/pyintegration/deephaven2/server/script_session/__init__.py b/pyintegration/deephaven2/server/script_session/__init__.py index a4337d0b8b8..251df454675 100644 --- a/pyintegration/deephaven2/server/script_session/__init__.py +++ b/pyintegration/deephaven2/server/script_session/__init__.py @@ -40,9 +40,8 @@ def unwrap_to_java_type(object): """ if isinstance(object, JType): return object - # TODO(deephaven-core#1892): Create standard ABC for wrapped Java objects - if isinstance(object, deephaven2.table.Table): - return object.j_table + if isinstance(object, deephaven2._wrapper_abc.JObjectWrapper): + return object.j_object if isinstance(object, FigureWrapper): return object.figure # add more here when/if necessary diff --git a/pyintegration/deephaven2/stream/__init__.py b/pyintegration/deephaven2/stream/__init__.py new file mode 100644 index 00000000000..31a35238fbd --- /dev/null +++ b/pyintegration/deephaven2/stream/__init__.py @@ -0,0 +1,28 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +""" Utility module for the stream subpackage. """ +import jpy + +from deephaven2 import DHError +from deephaven2.table import Table + +_JStreamTableTools = jpy.get_type("io.deephaven.engine.table.impl.StreamTableTools") + + +def stream_to_append_only(table: Table) -> Table: + """ Creates an 'append only' table from the stream table. + + Args: + table (Table): a stream table + + Returns: + an append-only table + + Raises: + DHError + """ + try: + return Table(j_table=_JStreamTableTools.streamToAppendOnlyTable(table.j_table)) + except Exception as e: + raise DHError(e, "failed to create an append-only table.") from e diff --git a/pyintegration/deephaven2/stream/kafka/consumer.py b/pyintegration/deephaven2/stream/kafka/consumer.py new file mode 100644 index 00000000000..10eb972d138 --- /dev/null +++ b/pyintegration/deephaven2/stream/kafka/consumer.py @@ -0,0 +1,239 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +""" The kafka.consumer module supports consuming a Kakfa topic as a Deephaven live table. 
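# --- Editor's note: illustrative sketch, not part of the diff. ---
# Putting the consumer pieces below together with the stream utility above: subscribe with
# consume(), describe the message value with json_spec(), and optionally turn the resulting
# stream table into an append-only table. Broker address, topic and columns are hypothetical;
# the real wiring is exercised in tests/test_kafka_consumer.py later in this diff.
from deephaven2 import dtypes
from deephaven2.stream import stream_to_append_only
from deephaven2.stream.kafka import consumer as ck

orders = ck.consume(
    {"bootstrap.servers": "localhost:9092"},
    "orders",
    key_spec=ck.KeyValueSpec.IGNORE,
    value_spec=ck.json_spec([("Symbol", dtypes.string), ("Price", dtypes.double)]),
    table_type=ck.TableType.Stream,
)
orders_history = stream_to_append_only(orders)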
""" +from enum import Enum +from typing import Dict, Tuple, List, Callable + +import jpy + +from deephaven2 import dtypes +from deephaven2._jcompat import j_hashmap, j_properties +from deephaven2._wrapper_abc import JObjectWrapper +from deephaven2.column import Column +from deephaven2.dherror import DHError +from deephaven2.dtypes import DType +from deephaven2.table import Table + +_JKafkaTools = jpy.get_type("io.deephaven.kafka.KafkaTools") +_JKafkaTools_Consume = jpy.get_type("io.deephaven.kafka.KafkaTools$Consume") +_JPythonTools = jpy.get_type("io.deephaven.integrations.python.PythonTools") +_JTableType = jpy.get_type("io.deephaven.kafka.KafkaTools$TableType") +_ALL_PARTITIONS = _JKafkaTools.ALL_PARTITIONS + + +class TableType(Enum): + """ A Enum that defines the supported Table Type for consuming Kafka. """ + Stream = _JTableType.Stream + """ Consume all partitions into a single interleaved stream table, which will present only newly-available rows + to downstream operations and visualizations.""" + Append = _JTableType.Append + """ Consume all partitions into a single interleaved in-memory append-only table.""" + StreamMap = _JTableType.StreamMap + """ Similar to Stream, but each partition is mapped to a distinct stream table.""" + AppendMap = _JTableType.AppendMap + """ Similar to Append, but each partition is mapped to a distinct in-memory append-only table. """ + + +SEEK_TO_BEGINNING = _JKafkaTools.SEEK_TO_BEGINNING +""" Start consuming at the beginning of a partition. """ +DONT_SEEK = _JKafkaTools.DONT_SEEK +""" Start consuming at the current position of a partition. """ +SEEK_TO_END = _JKafkaTools.SEEK_TO_END +""" Start consuming at the end of a partition. """ + +ALL_PARTITIONS_SEEK_TO_BEGINNING = {-1: SEEK_TO_BEGINNING} +""" For all partitions, start consuming at the beginning. """ +ALL_PARTITIONS_DONT_SEEK = {-1: DONT_SEEK} +""" For all partitions, start consuming at the current position.""" +ALL_PARTITIONS_SEEK_TO_END = {-1: SEEK_TO_END} +""" For all partitions, start consuming at the end. """ + +_ALL_PARTITIONS_SEEK_TO_BEGINNING = _JKafkaTools.ALL_PARTITIONS_SEEK_TO_BEGINNING +_ALL_PARTITIONS_DONT_SEEK = _JKafkaTools.ALL_PARTITIONS_DONT_SEEK +_ALL_PARTITIONS_SEEK_TO_END = _JKafkaTools.ALL_PARTITIONS_SEEK_TO_END + + +class KeyValueSpec(JObjectWrapper): + _j_spec: jpy.JType + + def __init__(self, j_spec): + self._j_spec = j_spec + + @property + def j_object(self) -> jpy.JType: + return self._j_spec + + +KeyValueSpec.IGNORE = KeyValueSpec(_JKafkaTools_Consume.IGNORE) +""" The spec for explicitly ignoring either key or value in a Kafka message when consuming a Kafka stream. """ + +KeyValueSpec.FROM_PROPERTIES = KeyValueSpec(_JKafkaTools.FROM_PROPERTIES) +""" The spec for specifying that when consuming a Kafka stream, the names for the key or value columns can be provided +in the properties as "key.column.name" or "value.column.name" in the config, and otherwise default to "key" or "value". +""" + + +def _dict_to_j_func(dict_mapping: Dict, mapped_only: bool) -> Callable[[str], str]: + java_map = j_hashmap(dict_mapping) + if not mapped_only: + return _JPythonTools.functionFromMapWithIdentityDefaults(java_map) + return _JPythonTools.functionFromMapWithDefault(java_map, None) + + +def _build_column_definitions(ts: List[Tuple[str, DType]]) -> List[Column]: + """ Converts a list of two-element tuples in the form of (name, DType) to a list of Columns. 
""" + cols = [] + for t in ts: + cols.append(Column(*t)) + return cols + + +def consume(kafka_config: Dict, topic: str, partitions: List[int] = None, offsets: Dict[int, int] = None, + key_spec: KeyValueSpec = None, value_spec: KeyValueSpec = None, + table_type: TableType = TableType.Stream) -> Table: + """ Consume from Kafka to a Deephaven table. + + Args: + kafka_config (Dict): configuration for the associated Kafka consumer and also the resulting table. + Once the table-specific properties are stripped, the remaining one is used to call the constructor of + org.apache.kafka.clients.consumer.KafkaConsumer; pass any KafkaConsumer specific desired configuration here + topic (str): the Kafka topic name + partitions (List[int]) : a list of integer partition numbers, default is None which means all partitions + offsets (Dict[int, int]) : a mapping between partition numbers and offset numbers, and can be one of the + predefined ALL_PARTITIONS_SEEK_TO_BEGINNING, ALL_PARTITIONS_SEEK_TO_END or ALL_PARTITIONS_DONT_SEEK. + The default is None which works the same as ALL_PARTITIONS_DONT_SEEK. The offset numbers may be one + of the predefined SEEK_TO_BEGINNING, SEEK_TO_END, or DONT_SEEK. + key_spec (KeyValueSpec): specifies how to map the Key field in Kafka messages to Deephaven column(s). + It can be the result of calling one of the functions: simple_spec(),avro_spec() or json_spec() in this + module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. The default is None which + works the same as KeyValueSpec.FROM_PROPERTIES, in which case, the kafka_config param should include values + for dictionary keys 'deephaven.key.column.name' and 'deephaven.key.column.type', for the single resulting + column name and type + value_spec (KeyValueSpec): specifies how to map the Value field in Kafka messages to Deephaven column(s). + It can be the result of calling one of the functions: simple_spec(),avro_spec() or json_spec() in this + module, or the predefined KeyValueSpec.IGNORE or KeyValueSpec.FROM_PROPERTIES. 
The default is None which + works the same as KeyValueSpec.FROM_PROPERTIES, in which case, the kafka_config param should include values + for dictionary keys 'deephaven.key.column.name' and 'deephaven.key.column.type', for the single resulting + column name and type + table_type (TableType): a TableType enum, default is TableType.Stream + + Returns: + a Deephaven live table that will update based on Kafka messages consumed for the given topic + + Raises: + DHError + """ + + try: + if partitions is None: + partitions = _ALL_PARTITIONS + else: + j_array = dtypes.array(dtypes.int32, partitions) + partitions = _JKafkaTools.partitionFilterFromArray(j_array) + + if offsets is None or offsets == ALL_PARTITIONS_DONT_SEEK: + offsets = _ALL_PARTITIONS_DONT_SEEK + elif offsets == ALL_PARTITIONS_SEEK_TO_BEGINNING: + offsets = _ALL_PARTITIONS_SEEK_TO_BEGINNING + elif offsets == ALL_PARTITIONS_SEEK_TO_END: + offsets = _ALL_PARTITIONS_SEEK_TO_END + else: + partitions_array = jpy.array('int', list(offsets.keys())) + offsets_array = jpy.array('long', list(offsets.values())) + offsets = _JKafkaTools.partitionToOffsetFromParallelArrays(partitions_array, offsets_array) + + key_spec = KeyValueSpec.FROM_PROPERTIES if key_spec is None else key_spec + value_spec = KeyValueSpec.FROM_PROPERTIES if value_spec is None else value_spec + + if key_spec is KeyValueSpec.IGNORE and value_spec is KeyValueSpec.IGNORE: + raise ValueError( + "at least one argument for 'key' or 'value' must be different from KeyValueSpec.IGNORE") + + kafka_config = j_properties(kafka_config) + return Table(j_table=_JKafkaTools.consumeToTable(kafka_config, topic, partitions, offsets, key_spec.j_object, + value_spec.j_object, + table_type.value)) + except Exception as e: + raise DHError(e, "failed to consume a Kafka stream.") from e + + +def avro_spec(schema: str, schema_version: str = "latest", mapping: Dict[str, str] = None, + mapped_only: bool = False) -> KeyValueSpec: + """ Creates a spec for how to use an Avro schema when consuming a Kafka stream to a Deephaven table. + + Args: + schema (str): the name for a schema registered in a Confluent compatible Schema Server. The associated + 'kafka_config' parameter in the call to consume() should include the key 'schema.registry.url' with + the value of the Schema Server URL for fetching the schema definition + schema_version (str): the schema version to fetch from schema service, default is 'latest' + mapping (Dict[str, str]): a mapping from Avro field name to Deephaven table column name; the fields specified in + the mapping will have their column names defined by it; if 'mapped_only' parameter is False, any other fields + not mentioned in the mapping will use the same Avro field name for Deephaven table column; otherwise, these + unmapped fields will be ignored and will not be present in the resulting table. 
default is None + mapped_only (bool): whether to ignore Avro fields not present in the 'mapping' argument, default is False + + Returns: + a KeyValueSpec + + Raises: + DHError + """ + try: + if mapping is not None: + mapping = _dict_to_j_func(mapping, mapped_only) + + if mapping: + return KeyValueSpec(j_spec=_JKafkaTools_Consume.avroSpec(schema, schema_version, mapping)) + else: + return KeyValueSpec(j_spec=_JKafkaTools_Consume.avroSpec(schema, schema_version)) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec") from e + + +def json_spec(col_defs: List[Tuple[str, DType]], mapping: Dict = None) -> KeyValueSpec: + """ Creates a spec for how to use JSON data when consuming a Kafka stream to a Deephaven table. + + Args: + col_defs (List[Tuple[str, DType]]): a list of tuples specifying names and types for columns to be + created on the resulting Deephaven table. Tuples contain two elements, a string for column name + and a Deephaven type for column data type. + mapping (Dict): a map from JSON field names to column names defined in the col_defs argument, default is None, + meaning a 1:1 mapping between JSON fields and Deephaven table column names + + Returns: + a KeyValueSpec + + Raises: + DHError + """ + try: + col_defs = [c.j_column_definition for c in _build_column_definitions(col_defs)] + if mapping is None: + return KeyValueSpec(j_spec=_JKafkaTools_Consume.jsonSpec(col_defs)) + mapping = j_hashmap(mapping) + return KeyValueSpec(j_spec=_JKafkaTools_Consume.jsonSpec(col_defs, mapping)) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec") from e + + +def simple_spec(col_name: str, data_type: DType = None) -> KeyValueSpec: + """ Creates a spec that defines a single column to receive the key or value of a Kafka message when consuming a + Kafka stream to a Deephaven table. + + Args: + col_name (str): the Deephaven column name + data_type (DType): the column data type + + Returns: + a KeyValueSpec + + Raises: + DHError + """ + try: + if data_type is None: + return KeyValueSpec(j_spec=_JKafkaTools_Consume.simpleSpec(col_name)) + return KeyValueSpec(j_spec=_JKafkaTools_Consume.simpleSpec(col_name, data_type.qst_type.clazz())) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec") from e diff --git a/pyintegration/deephaven2/stream/kafka/producer.py b/pyintegration/deephaven2/stream/kafka/producer.py new file mode 100644 index 00000000000..223a851007c --- /dev/null +++ b/pyintegration/deephaven2/stream/kafka/producer.py @@ -0,0 +1,190 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +""" The kafka.producer module supports publishing Deephaven tables to Kafka streams. 
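# --- Editor's note: illustrative sketch, not part of the diff. ---
# The producer mirrors the consumer: produce() publishes a table to a topic and hands back a
# callback that stops publishing and releases resources when invoked. Broker address, topic
# and column values are hypothetical.
from deephaven2 import new_table
from deephaven2.column import double_col, string_col
from deephaven2.stream.kafka import producer as pk

quotes = new_table(cols=[string_col("Symbol", ["MSFT", "GOOG"]), double_col("Price", [210.0, 310.5])])
cancel_publish = pk.produce(
    quotes,
    {"bootstrap.servers": "localhost:9092"},
    "quotes",
    key_spec=pk.simple_spec("Symbol"),
    value_spec=pk.json_spec(include_columns=["Price"]),
)
# later, once publishing is no longer needed:
cancel_publish()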
""" +from typing import Dict, Callable, List + +import jpy + +from deephaven2._jcompat import j_hashmap, j_hashset, j_properties +from deephaven2 import DHError +from deephaven2._wrapper_abc import JObjectWrapper +from deephaven2.table import Table + +_JKafkaTools = jpy.get_type("io.deephaven.kafka.KafkaTools") +_JAvroSchema = jpy.get_type("org.apache.avro.Schema") +_JKafkaTools_Produce = jpy.get_type("io.deephaven.kafka.KafkaTools$Produce") + + +class KeyValueSpec(JObjectWrapper): + _j_spec: jpy.JType + + def __init__(self, j_spec): + self._j_spec = j_spec + + @property + def j_object(self) -> jpy.JType: + return self._j_spec + + +KeyValueSpec.IGNORE = KeyValueSpec(_JKafkaTools_Produce.IGNORE) + + +def produce(table: Table, kafka_config: Dict, topic: str, key_spec: KeyValueSpec, value_spec: KeyValueSpec, + last_by_key_columns: bool = False) -> Callable[[], None]: + """ Produce to Kafka from a Deephaven table. + + Args: + table (Table): the source table to publish to Kafka + kafka_config (Dict): configuration for the associated kafka producer + topic (str): the topic name + key_spec (KeyValueSpec): specifies how to map table column(s) to the Key field in produced Kafka messages. + This should be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this + module, or the constant KeyValueSpec.IGNORE + value_spec (KeyValueSpec): specifies how to map table column(s) to the Value field in produced Kafka messages. + This should be the result of calling one of the functions simple_spec(), avro_spec() or json_spec() in this, + or the constant KeyValueSpec.IGNORE + last_by_key_columns (bool): whether to publish only the last record for each unique key, Ignored if key_spec is + KeyValueSpec.IGNORE. Otherwise, if last_by_key_columns is true this method will internally perform a last_by + aggregation on table grouped by the input columns of key_spec and publish to Kafka from the result. + + Returns: + a callback that, when invoked, stops publishing and cleans up subscriptions and resources. + Users should hold to this callback to ensure liveness for publishing for as long as this + publishing is desired, and once not desired anymore they should invoke it + + Raises: + DHError + """ + try: + if key_spec is KeyValueSpec.IGNORE and value_spec is KeyValueSpec.IGNORE: + raise ValueError( + "at least one argument for 'key_spec' or 'value_spec' must be different from KeyValueSpec.IGNORE") + + kafka_config = j_properties(kafka_config) + runnable = _JKafkaTools.produceFromTable(table.j_table, kafka_config, topic, key_spec.j_object, + value_spec.j_object, + last_by_key_columns) + + def cleanup(): + try: + runnable.run() + except Exception as ex: + raise DHError(ex, "failed to stop publishing to Kafka and the clean-up.") from ex + + return cleanup + except Exception as e: + raise DHError(e, "failed to start producing Kafka messages.") from e + + +def avro_spec(schema: str, schema_version: str = "latest", field_to_col_mapping: Dict[str, str] = None, + timestamp_field: str = None, include_only_columns: List[str] = None, exclude_columns: List[str] = None, + publish_schema: bool = False, schema_namespace: str = None, + column_properties: Dict[str, str] = None) -> KeyValueSpec: + """ Creates a spec for how to use an Avro schema to produce a Kafka stream from a Deephaven table. + + Args: + schema (str): the name for a schema registered in a Confluent compatible Schema Server. 
The associated + 'kafka_config' parameter in the call to produce() should include the key 'schema.registry.url' with + the value of the Schema Server URL for fetching the schema definition + schema_version (str): the schema version to fetch from schema service, default is 'latest' + field_to_col_mapping (Dict[str, str]): a mapping from Avro field names in the schema to column names in + the Deephaven table. Any fields in the schema not present in the dict as keys are mapped to columns of the + same name. The default is None, meaning all schema fields are mapped to columns of the same name. + timestamp_field (str): the name of an extra timestamp field to be included in the produced Kafka message body, + it is used mostly for debugging slowdowns, default is None. + include_only_columns (List[str]): the list of column names in the source table to include in the generated + output, default is None. When not None, the 'exclude_columns' parameter must be None + exclude_columns (List[str]): the list of column names to exclude from the generated output (every other column + will be included), default is None. When not None, the 'include_only_columns' must be None + publish_schema (bool): when True, publish the given schema name to Schema Registry Server, according to an Avro + schema generated from the table definition, for the columns and fields implied by field_to_col_mapping, + include_only_columns, and exclude_columns; if a schema_version is provided and the resulting version after + publishing does not match, an exception results. The default is False. + schema_namespace (str): when 'publish_schema' is True, the namespace for the generated schema to be registered + in the Schema Registry Server. + column_properties (Dict[str, str]): when 'publish_schema' is True, specifies the properties of the columns + implying particular Avro type mappings for them. In particular, column X of BigDecimal type should specify + properties 'x.precision' and 'x.scale'. + + Returns: + a KeyValueSpec + + Raises: + DHError + """ + try: + field_to_col_mapping = j_hashmap(field_to_col_mapping) + column_properties = j_properties(column_properties) + include_only_columns = j_hashset(include_only_columns) + include_only_columns = _JKafkaTools.predicateFromSet(include_only_columns) + exclude_columns = j_hashset(exclude_columns) + exclude_columns = _JKafkaTools.predicateFromSet(exclude_columns) + + return KeyValueSpec(_JKafkaTools_Produce.avroSpec(schema, schema_version, field_to_col_mapping, timestamp_field, + include_only_columns, exclude_columns, publish_schema, + schema_namespace, column_properties)) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec.") from e + + +def json_spec(include_columns: List[str] = None, exclude_columns: List[str] = None, mapping: Dict[str, str] = None, + nested_delim: str = None, output_nulls: bool = False, timestamp_field: str = None) -> KeyValueSpec: + """ Creates a spec for how to generate JSON data when producing a Kafka stream from a Deephaven table. + + Because JSON is a nested structure, a Deephaven column can be specified to map to a top level JSON field or + a field nested inside another JSON object many levels deep, e.g. X.Y.Z.field. The parameter 'nested_delim' controls + how a JSON nested field name should be delimited in the mapping. + + Args: + include_columns (List[str]): the list of Deephaven column names to include in the JSON output as fields, + default is None, meaning all except the ones mentioned in the 'exclude_columns' argument . 
If not None, + the 'exclude_columns' must be None. + exclude_columns (List[str]): the list of Deephaven column names to omit in the JSON output as fields, default + is None, meaning no column is omitted. If not None, include_columns must be None. + mapping (Dict[str, str]): a mapping from column names to JSON field names. Any column name implied by earlier + arguments and not included as a key in the map implies a field of the same name. default is None, + meaning all columns will be mapped to JSON fields of the same name. + nested_delim (str): if nested JSON fields are desired, the field separator that is used for the field names + parameter, or None for no nesting (default). For instance, if a particular column should be mapped + to JSON field X nested inside field Y, the corresponding field name value for the column key + in the mapping dict can be the string "X.Y", in which case the value for nested_delim should be "." + output_nulls (bool): when False (default), do not output a field for null column values + timestamp_field (str): the name of an extra timestamp field to be included in the produced Kafka message body, + it is used mostly for debugging slowdowns, default is None. + + Returns: + a KeyValueSpec + + Raises: + DHError + """ + try: + if include_columns is not None and exclude_columns is not None: + raise ValueError("One of include_columns and exclude_columns must be None.") + exclude_columns = j_hashset(exclude_columns) + mapping = j_hashmap(mapping) + return KeyValueSpec( + _JKafkaTools_Produce.jsonSpec(include_columns, exclude_columns, mapping, nested_delim, output_nulls, + timestamp_field)) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec.") from e + + +def simple_spec(col_name: str) -> KeyValueSpec: + """ Creates a spec that defines a single column to be published as either the key or value of a Kafka message when + producing a Kafka stream from a Deephaven table. + + Args: + col_name (str): the Deephaven column name + + Returns: + a KeyValueSpec + + Raises: + DHError + """ + try: + return KeyValueSpec(_JKafkaTools_Produce.simpleSpec(col_name)) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec.") from e diff --git a/pyintegration/deephaven2/table.py b/pyintegration/deephaven2/table.py index 3924422e37b..146550e7d29 100644 --- a/pyintegration/deephaven2/table.py +++ b/pyintegration/deephaven2/table.py @@ -9,10 +9,11 @@ import jpy from deephaven2 import DHError, dtypes +from deephaven2._jcompat import j_array_list +from deephaven2._wrapper_abc import JObjectWrapper from deephaven2.agg import Aggregation from deephaven2.column import Column, ColumnType from deephaven2.constants import SortDirection -from deephaven2.dtypes import DType _JTableTools = jpy.get_type("io.deephaven.engine.util.TableTools") _JColumnName = jpy.get_type("io.deephaven.api.ColumnName") @@ -21,7 +22,7 @@ _JFilterOr = jpy.get_type("io.deephaven.api.filter.FilterOr") -class Table: +class Table(JObjectWrapper): """ A Table represents a Deephaven table. It allows applications to perform powerful Deephaven table operations. Note: It should not be instantiated directly by user code. 
Tables are mostly created by factory methods, @@ -75,11 +76,15 @@ def columns(self): for i in range(j_col_list.size()): j_col = j_col_list.get(i) self._schema.append(Column(name=j_col.getName(), - data_type=DType.from_jtype(j_col.getDataType()), - component_type=DType.from_jtype(j_col.getComponentType()), + data_type=dtypes.from_jtype(j_col.getDataType()), + component_type=dtypes.from_jtype(j_col.getComponentType()), column_type=ColumnType(j_col.getColumnType()))) return self._schema + @property + def j_object(self) -> jpy.JType: + return self.j_table + def to_string(self, num_rows: int = 10, cols: List[str] = []) -> str: """ Returns the first few rows of a table as a pipe-delimited string. @@ -546,7 +551,7 @@ def sort_column(col, dir_): try: if order: sort_columns = [sort_column(col, dir_) for col, dir_ in zip(order_by, order)] - j_sc_list = dtypes.j_array_list(sort_columns) + j_sc_list = j_array_list(sort_columns) return Table(j_table=self.j_table.sort(j_sc_list)) else: return Table(j_table=self.j_table.sort(*order_by)) @@ -996,7 +1001,7 @@ def agg_by(self, aggs: List[Aggregation], by: List[str]) -> Table: DHError """ try: - j_agg_list = dtypes.j_array_list([agg.j_agg for agg in aggs]) + j_agg_list = j_array_list([agg.j_agg for agg in aggs]) return Table(j_table=self.j_table.aggBy(j_agg_list, *by)) except Exception as e: raise DHError(e, "table agg_by operation failed.") from e diff --git a/pyintegration/setup.py b/pyintegration/setup.py index 66f06eaa9d4..80e13cda515 100644 --- a/pyintegration/setup.py +++ b/pyintegration/setup.py @@ -4,7 +4,7 @@ import os import pathlib from setuptools.extern import packaging -from setuptools import find_packages, setup +from setuptools import find_namespace_packages, setup # The directory containing this file HERE = pathlib.Path(__file__).parent @@ -29,7 +29,7 @@ def normalize_version(version): version=__normalized_version__, description='Deephaven Engine Python Package', long_description=README, - packages=find_packages(exclude=("tests",)), + packages=find_namespace_packages(exclude=("tests",)), url='https://deephaven.io/', author='Deephaven Data Labs', author_email='python@deephaven.io', diff --git a/pyintegration/tests/test_config.py b/pyintegration/tests/test_config.py new file mode 100644 index 00000000000..cb1a22775be --- /dev/null +++ b/pyintegration/tests/test_config.py @@ -0,0 +1,24 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +import os.path +import unittest + +from deephaven2.config import get_server_timezone, get_log_dir +from deephaven2.time import TimeZone +from tests.testbase import BaseTestCase + + +class ConfigTestCase(BaseTestCase): + + def test_get_log_dir(self): + log_dir = get_log_dir() + self.assertTrue(os.path.exists(log_dir)) + + def test_get_server_timezone(self): + tz = get_server_timezone() + self.assertIn(tz, [tz for tz in TimeZone]) + + +if __name__ == '__main__': + unittest.main() diff --git a/pyintegration/tests/test_dtypes.py b/pyintegration/tests/test_dtypes.py index 3bc7809379f..f25b9056444 100644 --- a/pyintegration/tests/test_dtypes.py +++ b/pyintegration/tests/test_dtypes.py @@ -6,7 +6,6 @@ import time import unittest -import numpy import numpy as np import pandas as pd @@ -40,6 +39,7 @@ def test_j_type(self): self.assertEqual(dtypes.char.j_type, jpy.get_type("char")) self.assertEqual(dtypes.int_.j_type, jpy.get_type("int")) self.assertEqual(dtypes.long.j_type, jpy.get_type("long")) + self.assertEqual(dtypes.int_.j_type, jpy.get_type("int")) self.assertEqual(dtypes.float_.j_type, 
jpy.get_type("float")) self.assertEqual(dtypes.double.j_type, jpy.get_type("double")) self.assertEqual(dtypes.string.j_type, jpy.get_type("java.lang.String")) @@ -61,23 +61,20 @@ def test_callable(self): big_decimal2 = dtypes.BigDecimal("12.88") self.assertIn("12.88", str(big_decimal2)) + j_string = dtypes.string("abc") + self.assertEqual(j_string.toString(), "abc") + def test_array(self): - j_array = dtypes.int_.array(5) - for i in range(5): - j_array[i] = i - np_array = numpy.frombuffer(j_array, numpy.int32) - self.assertTrue((np_array == numpy.array([0, 1, 2, 3, 4], dtype=numpy.int32)).all()) - - def test_array_from(self): - j_array = dtypes.int_.array_from(range(5)) - np_array = numpy.frombuffer(j_array, numpy.int32) - self.assertTrue((np_array == numpy.array([0, 1, 2, 3, 4], dtype=numpy.int32)).all()) - - j_array = dtypes.int_.array_from([0, 1, 2, 3, 4]) - np_array = numpy.frombuffer(j_array, numpy.int32) - self.assertTrue((np_array == numpy.array([0, 1, 2, 3, 4], dtype=numpy.int32)).all()) - - def test_integer_array_from(self): + j_array = dtypes.array(dtypes.int_, range(5)) + np_array = np.frombuffer(j_array, np.int32) + expected = np.array([0, 1, 2, 3, 4], dtype=np.int32) + self.assertTrue(np.array_equal(np_array, expected)) + + j_array = dtypes.array(dtypes.int64, [0, 1, 2, 3, 4]) + np_array = np.frombuffer(j_array, dtype=np.int64) + self.assertTrue(np.array_equal(np_array, expected)) + + def test_integer_array(self): np_array = np.array([float('nan'), NULL_DOUBLE, 1.123, np.inf], dtype=np.float64) nulls = {dtypes.int64: NULL_LONG, dtypes.int32: NULL_INT, dtypes.short: NULL_SHORT, dtypes.byte: NULL_BYTE} @@ -85,25 +82,25 @@ def test_integer_array_from(self): map_fn = functools.partial(remap_double, null_value=nv) with self.subTest(f"numpy double array to {dt}"): expected = [nv, nv, 1, nv] - j_array = dt.array_from(np_array, remap=map_fn) + j_array = dtypes.array(dt, np_array, remap=map_fn) py_array = [x for x in j_array] self.assertEqual(expected, py_array) with self.subTest("int array from Python list"): expected = [1, 2, 3] - j_array = dtypes.int32.array_from([1.1, 2.2, 3.3]) + j_array = dtypes.array(dtypes.int32, [1.1, 2.2, 3.3]) self.assertIn("[I", str(type(j_array))) py_array = [x for x in j_array] self.assertEqual(expected, py_array) with self.subTest("byte array from Python list, down cast"): expected = [1000, 2000, 3000] - j_array = dtypes.byte.array_from(expected) + j_array = dtypes.array(dtypes.byte, expected) self.assertIn("[B", str(type(j_array))) py_array = [x for x in j_array] self.assertNotEqual(expected, py_array) - def test_floating_array_from(self): + def test_floating_array(self): nulls = {dtypes.float_: NULL_FLOAT, dtypes.double: NULL_DOUBLE} @@ -112,7 +109,7 @@ def test_floating_array_from(self): map_fn = functools.partial(remap_double, null_value=nv) with self.subTest(f"numpy double array to {dt} with mapping"): expected = [nv, 1.7976931348623157e+300, nv, 1.1, nv] - j_array = dt.array_from(np_array, remap=map_fn) + j_array = dtypes.array(dt, np_array, remap=map_fn) py_array = [x for x in j_array] for i in range(4): # downcast from double to float results in inf when the value is outside of float range @@ -121,7 +118,7 @@ def test_floating_array_from(self): with self.subTest("double array from numpy array"): np_array = np.array([float('nan'), NULL_DOUBLE, 1.1, float('inf')], dtype=np.float64) pd_series = pd.Series(np_array) - j_array = dtypes.double.array_from(pd_series) + j_array = dtypes.array(dtypes.double, pd_series) py_array = [x for x in j_array] 
expected = [float('nan'), NULL_DOUBLE, 1.1, float('inf')] self.assertTrue(math.isnan(py_array[0])) @@ -130,7 +127,7 @@ def test_floating_array_from(self): with self.subTest("double array from numpy long array"): expected = [NULL_LONG, 1, 2, 3] np_array = np.array(expected, dtype=np.int64) - j_array = dtypes.float64.array_from(np_array) + j_array = dtypes.array(dtypes.float64, np_array) self.assertIn("[D", str(type(j_array))) py_array = [x for x in j_array] for i in range(4): @@ -138,12 +135,12 @@ def test_floating_array_from(self): with self.subTest("double array from Python list of integer"): expected = [NULL_LONG, 1, 2, 3] - j_array = dtypes.float64.array_from(expected) + j_array = dtypes.array(dtypes.float64, expected) py_array = [x for x in j_array] for i in range(3): self.assertAlmostEqual(expected[i], py_array[i]) - def test_char_array_from(self): + def test_char_array(self): def remap_char(v): if v is None: return NULL_CHAR @@ -159,14 +156,14 @@ def remap_char(v): return NULL_CHAR test_str = "abcdefg0123456" - j_array = dtypes.char.array_from(test_str) + j_array = dtypes.array(dtypes.char, test_str) self.assertIn("[C", str(type(j_array))) py_array = [chr(x) for x in j_array] self.assertEqual(test_str, "".join(py_array)) test_list = [None, "abc", {}, 69] expected = [NULL_CHAR, ord("a"), NULL_CHAR, ord("E")] - j_array = dtypes.char.array_from(test_list, remap=remap_char) + j_array = dtypes.array(dtypes.char, test_list, remap=remap_char) py_array = [x for x in j_array] self.assertIn("[C", str(type(j_array))) self.assertEqual(expected, py_array) @@ -175,7 +172,7 @@ def test_datetime(self): dt1 = DateTime(round(time.time())) dt2 = now() values = [dt1, dt2, None] - j_array = DateTime.array_from(values) + j_array = dtypes.array(DateTime, values) self.assertTrue(all(x == y for x, y in zip(j_array, values))) diff --git a/pyintegration/tests/test_kafka_consumer.py b/pyintegration/tests/test_kafka_consumer.py new file mode 100644 index 00000000000..c6b92bd090e --- /dev/null +++ b/pyintegration/tests/test_kafka_consumer.py @@ -0,0 +1,199 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +import os +import unittest + +from deephaven2 import kafka_consumer as ck +from deephaven2.stream.kafka.consumer import TableType, KeyValueSpec +from tests.testbase import BaseTestCase +from deephaven2 import dtypes + + +class KafkaConsumerTestCase(BaseTestCase): + + def _assert_common_cols(self, cols): + self.assertEqual("KafkaPartition", cols[0].name) + self.assertEqual(dtypes.int32, cols[0].data_type) + self.assertEqual("KafkaOffset", cols[1].name) + self.assertEqual(dtypes.long, cols[1].data_type) + self.assertEqual("KafkaTimestamp", cols[2].name) + self.assertEqual(dtypes.DateTime, cols[2].data_type) + + def test_basic_constants(self): + """ + Check that the basic constants are imported and visible. + """ + self.assertIsNotNone(ck.SEEK_TO_BEGINNING) + self.assertIsNotNone(ck.DONT_SEEK) + self.assertIsNotNone(ck.SEEK_TO_END) + self.assertIsNotNone(ck.ALL_PARTITIONS_SEEK_TO_BEGINNING) + self.assertIsNotNone(ck.ALL_PARTITIONS_SEEK_TO_END) + self.assertIsNotNone(ck.ALL_PARTITIONS_DONT_SEEK) + + def test_simple_spec(self): + """ + Check a simple Kafka subscription creates the right table. 
+ """ + t = ck.consume( + {'bootstrap.servers': 'redpanda:29092'}, + 'orders', + key_spec=KeyValueSpec.IGNORE, + value_spec=ck.simple_spec('Price', dtypes.double)) + + cols = t.columns + self.assertEqual(4, len(cols)) + self._assert_common_cols(cols) + self.assertEqual("Price", cols[3].name) + self.assertEqual(dtypes.double, cols[3].data_type) + + def test_json_spec(self): + """ + Check a JSON Kafka subscription creates the right table. + """ + + t = ck.consume( + {'bootstrap.servers': 'redpanda:29092'}, + 'orders', + key_spec=KeyValueSpec.IGNORE, + value_spec=ck.json_spec( + [('Symbol', dtypes.string), + ('Side', dtypes.string), + ('Price', dtypes.double), + ('Qty', dtypes.int_), + ('Tstamp', dtypes.DateTime)], + mapping={ + 'jsymbol': 'Symbol', + 'jside': 'Side', + 'jprice': 'Price', + 'jqty': 'Qty', + 'jts': 'Tstamp' + } + ), + table_type=TableType.Append + ) + + cols = t.columns + self.assertEqual(8, len(cols)) + self._assert_common_cols(cols) + + self.assertEqual("Symbol", cols[3].name) + self.assertEqual(dtypes.string, cols[3].data_type) + self.assertEqual("Side", cols[4].name) + self.assertEqual(dtypes.string, cols[4].data_type) + self.assertEqual("Price", cols[5].name) + self.assertEqual(dtypes.double, cols[5].data_type) + self.assertEqual("Qty", cols[6].name) + self.assertEqual(dtypes.int_, cols[6].data_type) + self.assertEqual("Tstamp", cols[7].name) + self.assertEqual(dtypes.DateTime, cols[7].data_type) + + def test_avro_spec(self): + """ + Check an Avro Kafka subscription creates the right table. + """ + + schema = \ + """ + { "type" : "record", + "namespace" : "io.deephaven.examples", + "name" : "share_price", + "fields" : [ + { "name" : "Symbol", "type" : "string" }, + { "name" : "Side", "type" : "string" }, + { "name" : "Qty", "type" : "int" }, + { "name" : "Price", "type" : "double" } + ] + } + """ + + schema_str = '{ "schema" : "%s" }' % \ + schema.replace('\n', ' ').replace('"', '\\"') + + sys_str = \ + """ + curl -X POST \ + -H 'Content-type: application/vnd.schemaregistry.v1+json; artifactType=AVRO' \ + --data-binary '%s' \ + http://redpanda:8081/subjects/share_price_record/versions + """ % schema_str + + r = os.system(sys_str) + self.assertEqual(0, r) + + with self.subTest(msg='straight schema, no mapping'): + t = ck.consume( + { + 'bootstrap.servers': 'redpanda:29092', + 'schema.registry.url': 'http://redpanda:8081' + }, + 'share_price', + key_spec=KeyValueSpec.IGNORE, + value_spec=ck.avro_spec('share_price_record', schema_version='1'), + table_type=TableType.Append + ) + + cols = t.columns + self.assertEqual(7, len(cols)) + self._assert_common_cols(cols) + + self.assertEqual("Symbol", cols[3].name) + self.assertEqual(dtypes.string, cols[3].data_type) + self.assertEqual("Side", cols[4].name) + self.assertEqual(dtypes.string, cols[4].data_type) + self.assertEqual("Qty", cols[5].name) + self.assertEqual(dtypes.int_, cols[5].data_type) + self.assertEqual("Price", cols[6].name) + self.assertEqual(dtypes.double, cols[6].data_type) + + with self.subTest(msg='mapping_only (filter out some schema fields)'): + m = {'Symbol': 'Ticker', 'Price': 'Dollars'} + t = ck.consume( + { + 'bootstrap.servers': 'redpanda:29092', + 'schema.registry.url': 'http://redpanda:8081' + }, + 'share_price', + key_spec=KeyValueSpec.IGNORE, + value_spec=ck.avro_spec('share_price_record', mapping=m, mapped_only=True), + table_type=TableType.Append + ) + + cols = t.columns + self.assertEqual(5, len(cols)) + self._assert_common_cols(cols) + + self.assertEqual("Ticker", cols[3].name) + 
            self.assertEqual(dtypes.string, cols[3].data_type)
+            self.assertEqual("Dollars", cols[4].name)
+            self.assertEqual(dtypes.double, cols[4].data_type)
+
+        with self.subTest(msg='mapping (rename some fields)'):
+            m = {'Symbol': 'Ticker', 'Qty': 'Quantity'}
+            t = ck.consume(
+                {
+                    'bootstrap.servers': 'redpanda:29092',
+                    'schema.registry.url': 'http://redpanda:8081'
+                },
+                'share_price',
+                key_spec=KeyValueSpec.IGNORE,
+                value_spec=ck.avro_spec('share_price_record', mapping=m),
+                table_type=TableType.Append
+            )
+
+            cols = t.columns
+            self.assertEqual(7, len(cols))
+            self._assert_common_cols(cols)
+
+            self.assertEqual("Ticker", cols[3].name)
+            self.assertEqual(dtypes.string, cols[3].data_type)
+            self.assertEqual("Side", cols[4].name)
+            self.assertEqual(dtypes.string, cols[4].data_type)
+            self.assertEqual("Quantity", cols[5].name)
+            self.assertEqual(dtypes.int_, cols[5].data_type)
+            self.assertEqual("Price", cols[6].name)
+            self.assertEqual(dtypes.double, cols[6].data_type)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/pyintegration/tests/test_kafka_producer.py b/pyintegration/tests/test_kafka_producer.py
new file mode 100644
index 00000000000..95837e55867
--- /dev/null
+++ b/pyintegration/tests/test_kafka_producer.py
@@ -0,0 +1,139 @@
+#
+# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
+#
+import os
+import unittest
+
+from deephaven2 import kafka_producer as pk, new_table
+from deephaven2.column import string_col, int_col, double_col
+from deephaven2.stream.kafka.producer import KeyValueSpec
+from tests.testbase import BaseTestCase
+
+
+def table_helper():
+    columns = [
+        string_col('Symbol', ['MSFT', 'GOOG', 'AAPL', 'AAPL']),
+        string_col('Side', ['B', 'B', 'S', 'B']),
+        int_col('Qty', [200, 100, 300, 50]),
+        double_col('Price', [210.0, 310.5, 411.0, 411.5])
+    ]
+    t = new_table(cols=columns)
+    return t
+
+
+class KafkaProducerTestCase(BaseTestCase):
+    """
+    Test cases for the deephaven2.stream.kafka.producer module (performed locally).
+    """
+
+    def test_basic_constants(self):
+        """
+        Check that the basic constants are imported and visible.
+        """
+        self.assertIsNotNone(KeyValueSpec.IGNORE)
+
+    def test_simple_spec(self):
+        """
+        Check that producing a table to Kafka with a simple value spec works.
+ """ + t = new_table(cols=[double_col('Price', [10.0, 10.5, 11.0, 11.5])]) + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + 'orders', + key_spec=KeyValueSpec.IGNORE, + value_spec=pk.simple_spec('Price') + ) + + self.assertIsNotNone(cleanup) + cleanup() + + def test_json_spec_only_columns(self): + t = table_helper() + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + 'orders', + key_spec=KeyValueSpec.IGNORE, + value_spec=pk.json_spec(['Symbol', 'Price']), + last_by_key_columns=False + ) + + self.assertIsNotNone(cleanup) + cleanup() + + def test_json_spec_all_arguments(self): + t = table_helper() + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + 'orders', + key_spec=KeyValueSpec.IGNORE, + value_spec=pk.json_spec( + ['Symbol', 'Price'], + mapping={'Symbol': 'jSymbol', 'Price': 'jPrice'}, + timestamp_field='jTs' + ), + last_by_key_columns=False + ) + + self.assertIsNotNone(cleanup) + cleanup() + + def test_avro_spec(self): + schema = \ + """ + { "type" : "record", + "namespace" : "io.deephaven.examples", + "name" : "share_price_timestamped", + "fields" : [ + { "name" : "Symbol", "type" : "string" }, + { "name" : "Side", "type" : "string" }, + { "name" : "Qty", "type" : "int" }, + { "name" : "Price", "type" : "double" }, + { "name" : "Timestamp", + "type" : { + "type" : "long", + "logicalType" : "timestamp-micros" + } + } + ] + } + """ + + schema_str = '{ "schema" : "%s" }' % \ + schema.replace('\n', ' ').replace('"', '\\"') + + sys_str = \ + """ + curl -X POST \ + -H 'Content-type: application/vnd.schemaregistry.v1+json; artifactType=AVRO' \ + --data-binary '%s' \ + http://redpanda:8081/subjects/share_price_timestamped_record/versions + """ % schema_str + + r = os.system(sys_str) + self.assertEqual(0, r) + + t = table_helper() + cleanup = pk.produce( + t, + { + 'bootstrap.servers': 'redpanda:29092', + 'schema.registry.url': 'http://redpanda:8081' + }, + 'share_price_timestamped', + key_spec=KeyValueSpec.IGNORE, + value_spec=pk.avro_spec( + 'share_price_timestamped_record', + timestamp_field='Timestamp' + ), + last_by_key_columns=False + ) + + self.assertIsNotNone(cleanup) + cleanup() + + +if __name__ == '__main__': + unittest.main() diff --git a/pyintegration/tests/test_parquet.py b/pyintegration/tests/test_parquet.py new file mode 100644 index 00000000000..d213c54f612 --- /dev/null +++ b/pyintegration/tests/test_parquet.py @@ -0,0 +1,146 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +import os +import shutil +import unittest +import tempfile + +from deephaven2 import empty_table, dtypes, new_table +from deephaven2.column import InputColumn +from deephaven2.parquet import write_table, write_tables, read_table, delete_table, ColumnInstruction + +from tests.testbase import BaseTestCase + + +class ParquetTestCase(BaseTestCase): + """ Test cases for the deephaven.ParquetTools module (performed locally) """ + + @classmethod + def setUpClass(cls): + # define a junk table workspace directory + cls.temp_dir = tempfile.TemporaryDirectory() + + @classmethod + def tearDownClass(cls): + cls.temp_dir.cleanup() + + def test_crd(self): + """ Test suite for reading, writing, and deleting a table to disk """ + + table = empty_table(3).update(formulas=["x=i", "y=(double)(i/10.0)", "z=(double)(i*i)"]) + definition = table.columns + base_dir = os.path.join(self.temp_dir.name, "testCreation") + file_location = os.path.join(base_dir, 'table1.parquet') + file_location2 = os.path.join(base_dir, 'table2.parquet') + + # 
make sure that the test workspace is clean + if os.path.exists(file_location): + shutil.rmtree(file_location) + if os.path.exists(file_location2): + shutil.rmtree(file_location2) + + # Writing + with self.subTest(msg="write_table(Table, str)"): + write_table(table, file_location) + self.assertTrue(os.path.exists(file_location)) + table2 = read_table(file_location) + self.assertEqual(table, table2) + shutil.rmtree(base_dir) + + with self.subTest(msg="write_tables(Table[], destinations, col_definitions"): + write_tables([table, table], [file_location, file_location2], definition) + self.assertTrue(os.path.exists(file_location)) + self.assertTrue(os.path.exists(file_location2)) + table2 = read_table(file_location) + self.assertEqual(table, table2) + + # Delete + with self.subTest(msg="delete(str)"): + if os.path.exists(file_location): + delete_table(file_location) + self.assertFalse(os.path.exists(file_location)) + if os.path.exists(file_location2): + delete_table(file_location2) + self.assertFalse(os.path.exists(file_location2)) + shutil.rmtree(base_dir) + + def test_crd_with_instructions(self): + """ Test suite for reading, writing, and deleting a table to disk """ + + table = empty_table(3).update(formulas=["x=i", "y=String.valueOf((double)(i/10.0))", "z=(double)(i*i)"]) + col_definitions = table.columns + base_dir = os.path.join(self.temp_dir.name, "testCreation") + file_location = os.path.join(base_dir, 'table1.parquet') + file_location2 = os.path.join(base_dir, 'table2.parquet') + + # make sure that the test workspace is clean + if os.path.exists(file_location): + shutil.rmtree(file_location) + if os.path.exists(file_location2): + shutil.rmtree(file_location2) + + # Writing + col_inst = ColumnInstruction(column_name="x", parquet_column_name="px") + col_inst1 = ColumnInstruction(column_name="y", parquet_column_name="py") + + with self.subTest(msg="write_table(Table, str, max_dictionary_keys)"): + write_table(table, file_location, max_dictionary_keys=10) + self.assertTrue(os.path.exists(file_location)) + shutil.rmtree(base_dir) + + with self.subTest(msg="write_table(Table, str, col_instructions, max_dictionary_keys)"): + write_table(table, file_location, col_instructions=[col_inst, col_inst1], max_dictionary_keys=10) + self.assertTrue(os.path.exists(file_location)) + shutil.rmtree(base_dir) + + with self.subTest(msg="write_tables(Table[], destinations, col_definitions, "): + write_tables([table, table], [file_location, file_location2], col_definitions, + col_instructions=[col_inst, col_inst1]) + self.assertTrue(os.path.exists(file_location)) + self.assertTrue(os.path.exists(file_location2)) + shutil.rmtree(base_dir) + + with self.subTest(msg="write_table(Table, destination, col_definitions, "): + write_table(table, file_location, col_instructions=[col_inst, col_inst1]) + # self.assertTrue(os.path.exists(file_location)) + + # Reading + with self.subTest(msg="read_table(str)"): + table2 = read_table(path=file_location, col_instructions=[col_inst, col_inst1]) + self.assertEqual(table, table2) + + # Delete + with self.subTest(msg="delete(str)"): + if os.path.exists(file_location): + delete_table(file_location) + self.assertFalse(os.path.exists(file_location)) + if os.path.exists(file_location2): + delete_table(file_location2) + self.assertFalse(os.path.exists(file_location2)) + shutil.rmtree(base_dir) + + def test_big_decimal(self): + j_type = dtypes.BigDecimal.j_type + big_decimal_list = [j_type.valueOf(301, 2), + j_type.valueOf(201, 2), + j_type.valueOf(101, 2)] + bd_col = 
InputColumn(name='decimal_value', data_type=dtypes.BigDecimal, input_data=big_decimal_list) + table = new_table([bd_col]) + self.assertIsNotNone(table) + base_dir = os.path.join(self.temp_dir.name, 'testCreation') + file_location = os.path.join(base_dir, 'table1.parquet') + if os.path.exists(file_location): + shutil.rmtree(file_location) + + write_table(table, file_location) + table2 = read_table(file_location) + self.assertEqual(table.size, table2.size) + self.assertEqual(table, table2) + + self.assertTrue(os.path.exists(file_location)) + shutil.rmtree(base_dir) + + +if __name__ == '__main__': + unittest.main()
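For reference, the smallest consumer call these tests exercise reduces to the sketch below; the broker address, topic name, and column definitions are assumptions carried over from the test environment, not fixed values.

from deephaven2 import dtypes, kafka_consumer as ck
from deephaven2.stream.kafka.consumer import TableType, KeyValueSpec

# Subscribe to the 'orders' topic, ignore the Kafka key, and decode the value as JSON.
orders = ck.consume(
    {'bootstrap.servers': 'redpanda:29092'},      # assumed local broker, as in the tests
    'orders',                                     # assumed topic name, as in the tests
    key_spec=KeyValueSpec.IGNORE,
    value_spec=ck.json_spec([('Symbol', dtypes.string), ('Price', dtypes.double)]),
    table_type=TableType.Append,
)
# The resulting table starts with KafkaPartition, KafkaOffset, and KafkaTimestamp,
# followed by the columns declared in the value spec.
print([col.name for col in orders.columns])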
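The producer tests rely on pk.produce handing back a no-argument callable that they invoke as a cleanup step; a sketch under the same assumptions (local redpanda broker, 'orders' topic) follows.

from deephaven2 import kafka_producer as pk, new_table
from deephaven2.column import double_col, string_col
from deephaven2.stream.kafka.producer import KeyValueSpec

t = new_table(cols=[string_col('Symbol', ['MSFT', 'GOOG']), double_col('Price', [210.0, 310.5])])
cleanup = pk.produce(
    t,
    {'bootstrap.servers': 'redpanda:29092'},      # assumed local broker, as in the tests
    'orders',                                     # assumed topic name, as in the tests
    key_spec=KeyValueSpec.IGNORE,
    value_spec=pk.json_spec(['Symbol', 'Price']),
    last_by_key_columns=False,
)
cleanup()                                         # the tests invoke the returned callable when done publishing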
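The Parquet coverage boils down to a write/read round trip; the sketch below mirrors it with an illustrative temporary path, and the ColumnInstruction rename is the same mapping the tests apply on both the write and the read side.

import os
import tempfile

from deephaven2 import empty_table
from deephaven2.parquet import write_table, read_table, delete_table, ColumnInstruction

table = empty_table(3).update(formulas=["x=i", "y=(double)(i/10.0)"])
rename_x = ColumnInstruction(column_name="x", parquet_column_name="px")

with tempfile.TemporaryDirectory() as tmp:
    dest = os.path.join(tmp, "table1.parquet")    # illustrative destination path
    write_table(table, dest, col_instructions=[rename_x])
    round_tripped = read_table(path=dest, col_instructions=[rename_x])
    assert table == round_tripped                 # same equality check the tests make
    delete_table(dest)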