From eef89e36bb2f70994892baeb5a70cbacb72041b7 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Mon, 10 Jan 2022 21:00:24 -0700 Subject: [PATCH] Hand wrapped KafkaTools, ParquetTools --- pyintegration/deephaven2/__init__.py | 20 +- .../deephaven2/{_utils => _init}/bootstrap.py | 5 + .../deephaven2/{_utils => _init}/start_jvm.py | 3 +- pyintegration/deephaven2/column.py | 5 + pyintegration/deephaven2/dtypes.py | 18 +- pyintegration/deephaven2/parquet.py | 206 +++++++++++++ .../deephaven2/stream/kafka/_utils.py | 47 +++ .../deephaven2/stream/kafka/consumer.py | 289 ++++++++++++++++++ .../deephaven2/stream/kafka/producer.py | 195 ++++++++++++ pyintegration/deephaven2/stream/utils.py | 28 ++ pyintegration/deephaven2/utils/__init__.py | 14 + pyintegration/setup.py | 4 +- pyintegration/tests/test_consume_kafka.py | 201 ++++++++++++ pyintegration/tests/test_parquet.py | 144 +++++++++ pyintegration/tests/test_produce_kafka.py | 138 +++++++++ pyintegration/tests/test_utils.py | 18 ++ 16 files changed, 1317 insertions(+), 18 deletions(-) rename pyintegration/deephaven2/{_utils => _init}/bootstrap.py (89%) rename pyintegration/deephaven2/{_utils => _init}/start_jvm.py (99%) create mode 100644 pyintegration/deephaven2/parquet.py create mode 100644 pyintegration/deephaven2/stream/kafka/_utils.py create mode 100644 pyintegration/deephaven2/stream/kafka/consumer.py create mode 100644 pyintegration/deephaven2/stream/kafka/producer.py create mode 100644 pyintegration/deephaven2/stream/utils.py create mode 100644 pyintegration/deephaven2/utils/__init__.py create mode 100644 pyintegration/tests/test_consume_kafka.py create mode 100644 pyintegration/tests/test_parquet.py create mode 100644 pyintegration/tests/test_produce_kafka.py create mode 100644 pyintegration/tests/test_utils.py diff --git a/pyintegration/deephaven2/__init__.py b/pyintegration/deephaven2/__init__.py index 227fd001cdc..b9f9e7614b2 100644 --- a/pyintegration/deephaven2/__init__.py +++ b/pyintegration/deephaven2/__init__.py @@ -9,13 +9,19 @@ __version__ = "0.9.0" -if not jpy.has_jvm(): - from ._utils.bootstrap import build_py_session +from .dherror import DHError +from ._init.bootstrap import build_py_session +try: build_py_session() +except Exception as e: + raise DHError(e, "deephaven initialization failed.") from e +else: + from deephaven2.constants import SortDirection + from deephaven2.csv import read as read_csv + from deephaven2.csv import write as write_csv + from deephaven2.table_factory import empty_table, time_table, merge, merge_sorted, new_table + from deephaven2.utils import get_workspace_root + import deephaven2.stream.kafka.consumer as kafka_consumer + import deephaven2.stream.kafka.producer as kafka_producer -from .dherror import DHError -from .constants import SortDirection -from .csv import read as read_csv -from .csv import write as write_csv -from .table_factory import empty_table, time_table, merge, merge_sorted, new_table diff --git a/pyintegration/deephaven2/_utils/bootstrap.py b/pyintegration/deephaven2/_init/bootstrap.py similarity index 89% rename from pyintegration/deephaven2/_utils/bootstrap.py rename to pyintegration/deephaven2/_init/bootstrap.py index a201a6a2fbf..bc127bb34f1 100644 --- a/pyintegration/deephaven2/_utils/bootstrap.py +++ b/pyintegration/deephaven2/_init/bootstrap.py @@ -1,6 +1,10 @@ # # Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending # +""" This module supports bootstrapping a Deephaven Python session from Python. +It is used mainly by Deephaven internally for running Python tests. 
+""" + import os import jpy @@ -14,6 +18,7 @@ def build_py_session(): + """ This function uses the default DH property file to initialize a Python session. """ if not jpy.has_jvm(): os.environ['JAVA_VERSION'] = '11' diff --git a/pyintegration/deephaven2/_utils/start_jvm.py b/pyintegration/deephaven2/_init/start_jvm.py similarity index 99% rename from pyintegration/deephaven2/_utils/start_jvm.py rename to pyintegration/deephaven2/_init/start_jvm.py index 64988aa75be..cd178eac460 100644 --- a/pyintegration/deephaven2/_utils/start_jvm.py +++ b/pyintegration/deephaven2/_init/start_jvm.py @@ -1,8 +1,7 @@ # # Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending # - -"""Utilities for starting the Deephaven JVM.""" +""" Utilities for starting the Deephaven JVM. """ import os import re diff --git a/pyintegration/deephaven2/column.py b/pyintegration/deephaven2/column.py index 52211b495fb..65c41f54349 100644 --- a/pyintegration/deephaven2/column.py +++ b/pyintegration/deephaven2/column.py @@ -13,6 +13,7 @@ _JColumnHeader = jpy.get_type("io.deephaven.qst.column.header.ColumnHeader") _JColumn = jpy.get_type("io.deephaven.qst.column.Column") +_JColumnDefinition = jpy.get_type("io.deephaven.engine.table.ColumnDefinition") class ColumnType(Enum): @@ -41,6 +42,10 @@ class Column: def j_column_header(self): return _JColumnHeader.of(self.name, self.data_type.qst_type) + @property + def j_column_definition(self): + return _JColumnDefinition.fromGenericType(self.name, self.data_type.qst_type.clazz(), self.component_type) + @dataclass class InputColumn(Column): diff --git a/pyintegration/deephaven2/dtypes.py b/pyintegration/deephaven2/dtypes.py index e61e5d00819..3307b5b3411 100644 --- a/pyintegration/deephaven2/dtypes.py +++ b/pyintegration/deephaven2/dtypes.py @@ -3,11 +3,11 @@ # """ This module defines the data types supported by the Deephaven engine. -Each data type is represented by a DType class which supports creating arrays of the same type and more. +Each data type is represented by a DType instance which supports creating arrays of the same type and more. """ from __future__ import annotations -from typing import Any, Sequence, Callable, Iterable +from typing import Iterable, Any, Sequence, Callable, Optional import jpy @@ -27,7 +27,7 @@ class DType: _j_name_type_map = {} @classmethod - def from_jtype(cls, j_class: Any) -> DType: + def from_jtype(cls, j_class: Any) -> Optional[DType]: if not j_class: return None @@ -50,10 +50,10 @@ def __repr__(self): return self.j_name def __call__(self, *args, **kwargs): - try: + if not self.is_primitive: return self.j_type(*args, **kwargs) - except Exception as e: - raise DHError(e, f"failed to create an instance of {self.j_name}") from e + else: + raise DHError(message="primitive types are not callable.") def array(self, size: int): """ Creates a Java array of the same data type of the specified size. 
@@ -137,10 +137,14 @@ def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None): def j_array_list(values: Iterable): - j_list = jpy.get_type("java.util.ArrayList")(len(values)) + j_list = jpy.get_type("java.util.ArrayList")() try: for v in values: j_list.add(v) return j_list except Exception as e: raise DHError(e, "failed to create a Java ArrayList from the Python collection.") from e + + +def is_java_type(obj): + return isinstance(obj, jpy.JType) diff --git a/pyintegration/deephaven2/parquet.py b/pyintegration/deephaven2/parquet.py new file mode 100644 index 00000000000..8ac553c43ae --- /dev/null +++ b/pyintegration/deephaven2/parquet.py @@ -0,0 +1,206 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +""" The Parquet integration module. """ +from dataclasses import dataclass +from typing import List + +import jpy + +from deephaven2 import DHError +from deephaven2.column import Column +from deephaven2.dtypes import is_java_type +from deephaven2.table import Table + +_JParquetTools = jpy.get_type("io.deephaven.parquet.table.ParquetTools") +_JFile = jpy.get_type("java.io.File") +_JCompressionCodecName = jpy.get_type("org.apache.parquet.hadoop.metadata.CompressionCodecName") +_JParquetInstructions = jpy.get_type("io.deephaven.parquet.table.ParquetInstructions") +_JTableDefinition = jpy.get_type("io.deephaven.engine.table.TableDefinition") + + +@dataclass +class ColumnInstruction: + column_name: str = None + parquet_column_name: str = None + codec_name: str = None + codec_args: str = None + use_dictionary: bool = False + + +def _build_parquet_instructions(col_instructions: List[ColumnInstruction] = None, compression_codec_name: str = None, + max_dictionary_keys: int = None, is_legacy_parquet: bool = False, + for_read: bool = True): + if not any([col_instructions, compression_codec_name, max_dictionary_keys, is_legacy_parquet]): + return None + + builder = _JParquetInstructions.builder() + if col_instructions is not None: + for ci in col_instructions: + if for_read and not ci.parquet_column_name: + raise ValueError("must specify the parquet column name for read.") + if not for_read and not ci.column_name: + raise ValueError("must specify the table column name for write.") + + builder.addColumnNameMapping(ci.column_name, ci.parquet_column_name) + if ci.column_name: + if ci.codec_name: + builder.addColumnCodec(ci.column_name, ci.codec_name, ci.codec_args) + builder.useDictionary(ci.column_name, ci.use_dictionary) + if compression_codec_name: + builder.setCompressionCodecName(compression_codec_name) + + if max_dictionary_keys is not None: + builder.setMaximumDictionaryKeys(max_dictionary_keys) + + if is_legacy_parquet: + builder.setIsLegacyParquet(True) + + return builder.build() + + +def read_table(path: str, col_instructions: List[ColumnInstruction] = None, is_legacy_parquet: bool = False) -> Table: + """ Reads in a table from a single parquet, metadata file, or directory with recognized layout. 
+
+    Args:
+        path (str): the file or directory to examine
+        col_instructions (List[ColumnInstruction]): instructions for customizations while reading
+        is_legacy_parquet (bool): if the parquet data is legacy
+
+    Returns:
+        a table
+
+    Raises:
+        DHError
+    """
+
+    read_instructions = _build_parquet_instructions(col_instructions=col_instructions,
+                                                    is_legacy_parquet=is_legacy_parquet)
+
+    try:
+        if read_instructions:
+            return Table(j_table=_JParquetTools.readTable(path, read_instructions))
+        else:
+            return Table(j_table=_JParquetTools.readTable(path))
+    except Exception as e:
+        raise DHError(e, "failed to read parquet data.") from e
+
+
+def _get_file_object(arg):
+    """ Helper function for easily creating a java file object from a path string.
+
+    Args:
+        arg: path string, or list of path strings
+
+    Returns:
+        a java File object, or java array of File objects
+
+    Raises:
+        ValueError
+    """
+    if is_java_type(arg):
+        return arg
+    elif isinstance(arg, str):
+        return _JFile(arg)
+    elif isinstance(arg, list):
+        # NB: map() returns an iterator in python 3, so list comprehension is appropriate here
+        return jpy.array("java.io.File", [_JFile(el) for el in arg])
+    else:
+        raise ValueError("Method accepts only a java type, string, or list of strings as input. "
+                         "Got {}".format(type(arg)))
+
+
+def delete_table(path: str):
+    """ Deletes a Parquet table on disk.
+
+    Args:
+        path (str): path to delete
+
+    Raises:
+        DHError
+    """
+    try:
+        _JParquetTools.deleteTable(_get_file_object(path))
+    except Exception as e:
+        raise DHError(e, "failed to delete a parquet table on disk.") from e
+
+
+def write_table(table: Table, destination: str, col_definitions: List[Column] = None,
+                col_instructions: List[ColumnInstruction] = None, compression_codec_name: str = None,
+                max_dictionary_keys: int = None):
+    """ Write a table to a Parquet file.
+
+    Args:
+        table (Table): the source table
+        destination (str): destination file path; the file name should end with the ".parquet" extension. If the path
+            includes non-existing directories, they are created. If there is an error, any intermediate directories
+            previously created are removed; note this makes this method unsafe for concurrent use
+        col_definitions (List[Column]): the column definitions to use
+        col_instructions (List[ColumnInstruction]): instructions for customizations while writing
+        compression_codec_name (str): the default compression codec to use, default is SNAPPY
+        max_dictionary_keys (int): the maximum dictionary keys allowed
+
+    Raises:
+        DHError
+    """
+    write_instructions = _build_parquet_instructions(col_instructions=col_instructions,
+                                                     compression_codec_name=compression_codec_name,
+                                                     max_dictionary_keys=max_dictionary_keys,
+                                                     for_read=False)
+
+    table_definition = None
+    if col_definitions is not None:
+        table_definition = _JTableDefinition.of([col.j_column_definition for col in col_definitions])
+
+    try:
+        if table_definition:
+            if write_instructions:
+                _JParquetTools.writeTable(table.j_table, destination, table_definition, write_instructions)
+            else:
+                _JParquetTools.writeTable(table.j_table, _get_file_object(destination), table_definition)
+        else:
+            if write_instructions:
+                _JParquetTools.writeTable(table.j_table, _get_file_object(destination), write_instructions)
+            else:
+                _JParquetTools.writeTable(table.j_table, destination)
+    except Exception as e:
+        raise DHError(e, "failed to write parquet data.") from e
+
+
+def write_tables(tables: List[Table], destinations: List[str], col_definitions: List[Column],
+                 col_instructions: List[ColumnInstruction] = None, compression_codec_name: str = None,
+                 max_dictionary_keys: int = None, grouping_cols: List[str] = None):
+    """ Writes tables to disk in parquet format to a supplied set of destinations. If you specify grouping columns,
+    there must already be grouping information for those columns in the sources. This can be accomplished with
+    .groupBy().ungroup() or .sort().
+
+    Args:
+        tables (List[Table]): the source tables
+        destinations (List[str]): the destination paths. Any non-existing directories in the paths provided are
+            created. If there is an error, any intermediate directories previously created are removed; note this
+            makes this method unsafe for concurrent use
+        col_definitions (List[Column]): the column definitions to use
+        col_instructions (List[ColumnInstruction]): instructions for customizations while writing
+        compression_codec_name (str): the compression codec to use, default is SNAPPY
+        max_dictionary_keys (int): the maximum dictionary keys allowed
+        grouping_cols (List[str]): the group column names
+
+    Raises:
+        DHError
+    """
+    write_instructions = _build_parquet_instructions(col_instructions=col_instructions,
+                                                     compression_codec_name=compression_codec_name,
+                                                     max_dictionary_keys=max_dictionary_keys,
+                                                     for_read=False)
+
+    table_definition = _JTableDefinition.of([col.j_column_definition for col in col_definitions])
+
+    try:
+        if grouping_cols:
+            _JParquetTools.writeParquetTables([t.j_table for t in tables], table_definition, write_instructions,
+                                              _get_file_object(destinations), grouping_cols)
+        else:
+            _JParquetTools.writeTables([t.j_table for t in tables], table_definition,
+                                       _get_file_object(destinations))
+    except Exception as e:
+        raise DHError(e, "failed to write multiple tables to parquet data.") from e
diff --git a/pyintegration/deephaven2/stream/kafka/_utils.py b/pyintegration/deephaven2/stream/kafka/_utils.py
new file mode 100644
index 00000000000..4bb67b3d362
--- /dev/null
+++ b/pyintegration/deephaven2/stream/kafka/_utils.py
@@ -0,0 +1,47 @@
+#
+# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
+#
+""" The private utility module for kafka. """
+import jpy
+
+_JProperties = jpy.get_type("java.util.Properties")
+_JHashMap = jpy.get_type("java.util.HashMap")
+_JHashSet = jpy.get_type("java.util.HashSet")
+_JPythonTools = jpy.get_type("io.deephaven.integrations.python.PythonTools")
+IDENTITY = object()  # Ensure IDENTITY is unique.
+
+
+def _dict_to_j_properties(d):
+    r = _JProperties()
+    for key, value in d.items():
+        if value is None:
+            value = ''
+        r.setProperty(key, value)
+    return r
+
+
+def _dict_to_j_map(d):
+    if d is None:
+        return None
+    r = _JHashMap()
+    for key, value in d.items():
+        if value is None:
+            value = ''
+        r.put(key, value)
+    return r
+
+
+def _dict_to_j_func(dict_mapping, default_value):
+    java_map = _dict_to_j_map(dict_mapping)
+    if default_value is IDENTITY:
+        return _JPythonTools.functionFromMapWithIdentityDefaults(java_map)
+    return _JPythonTools.functionFromMapWithDefault(java_map, default_value)
+
+
+def _seq_to_j_set(s):
+    if s is None:
+        return None
+    r = _JHashSet()
+    for v in s:
+        r.add(v)
+    return r
diff --git a/pyintegration/deephaven2/stream/kafka/consumer.py b/pyintegration/deephaven2/stream/kafka/consumer.py
new file mode 100644
index 00000000000..6d29041e2a9
--- /dev/null
+++ b/pyintegration/deephaven2/stream/kafka/consumer.py
@@ -0,0 +1,289 @@
+#
+# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
+#
+""" The Kafka consumer module.
""" +import collections + +import jpy + +from deephaven2.column import Column +from deephaven2.dherror import DHError +from deephaven2.dtypes import DType +from deephaven2.stream.kafka._utils import _dict_to_j_properties, _dict_to_j_map, _dict_to_j_func, IDENTITY +from deephaven2.table import Table + +_JKafkaTools = jpy.get_type("io.deephaven.kafka.KafkaTools") +_JStreamTableTools = jpy.get_type("io.deephaven.engine.table.impl.StreamTableTools") +_JAvroSchema = jpy.get_type("org.apache.avro.Schema") +_JKafkaTools_Consume = jpy.get_type("io.deephaven.kafka.KafkaTools$Consume") +SEEK_TO_BEGINNING = getattr(_JKafkaTools, 'SEEK_TO_BEGINNING') +DONT_SEEK = getattr(_JKafkaTools, 'DONT_SEEK') +SEEK_TO_END = getattr(_JKafkaTools, 'SEEK_TO_END') +FROM_PROPERTIES = getattr(_JKafkaTools, 'FROM_PROPERTIES') +IGNORE = getattr(_JKafkaTools_Consume, 'IGNORE') +ALL_PARTITIONS = getattr(_JKafkaTools, 'ALL_PARTITIONS') +ALL_PARTITIONS_SEEK_TO_BEGINNING = getattr(_JKafkaTools, 'ALL_PARTITIONS_SEEK_TO_BEGINNING') +ALL_PARTITIONS_DONT_SEEK = getattr(_JKafkaTools, 'ALL_PARTITIONS_DONT_SEEK') +ALL_PARTITIONS_SEEK_TO_END = getattr(_JKafkaTools, 'ALL_PARTITIONS_SEEK_TO_END') + + +def _build_column_definition(col_name: str, data_type, component_type=None): + col = Column(name=col_name, data_type=data_type, component_type=component_type) + return col.j_column_definition + + +def _build_column_definitions(ts): + """ Convert a sequence of tuples of the form ('Price', double_type) + or ('Prices', double_array_type, double_type) TODO vector type, check with Cristian + to a list of ColumnDefinition objects. + + Args: + ts: a sequence of 2 or 3 element tuples of (str, type) or (str, type, type) specifying a column definition object. + + Returns: + a list of column definition objects. + """ + r = [] + for t in ts: + r.append(_build_column_definition(*t)) + return r + + +def consume( + kafka_config: dict, + topic: str, + partitions=None, + offsets=None, + key=None, + value=None, + table_type='stream' +): + """ Consume from Kafka to a Deephaven table. + + Args: + kafka_config: dictionary with properties to configure the associated kafka consumer and + also the resulting table. Once the table-specific properties are stripped, the result is + passed to the org.apache.kafka.clients.consumer.KafkaConsumer constructor; pass any + KafkaConsumer specific desired configuration here. + topic: the topic name + partitions: either a sequence of integer partition numbers or the predefined constant + ALL_PARTITIONS for all partitions. + offsets: either a dict mapping partition numbers to offset numbers, or one of the predefined constants + ALL_PARTITIONS_SEEK_TO_BEGINNING, ALL_PARTITIONS_SEEK_TO_END or ALL_PARTITIONS_DONT_SEEK. + If a dict, the values may be one of the predefined constants SEEK_TO_BEGINNING, SEEK_TO_END + or DONT_SEEK. + key: a specification for how to map the Key field in Kafka messages. This should be the result of calling one + of the methods simple, avro or json in this module, or None to obtain a single column specified in the + kafka _config param via the keys 'deephaven.key.column.name' for column name and 'deephaven.key.column.type' + for the column type; both should have string values associated to them. + value: a specification for how to map the Value field in Kafka messages. 
This should be the result of calling + one of the methods simple, avro or json in this module, or None to obtain a single column specified in the + kafka_config param via the keys 'deephaven.value.column.name' for column name and + 'deephaven.value.column.type' for the column type; both should have string values associated to them. + table_type: a string specifying the resulting table type: one of 'stream' (default), 'append', 'stream_map' + or 'append_map'. + + Returns: + a Deephaven live table that will update based on Kafka messages consumed for the given topic. + + Raises: + ValueError, TypeError, DHError + """ + + if not isinstance(topic, str): + raise ValueError("argument 'topic' has to be of str type, instead got " + topic) + + if partitions is None: + partitions = ALL_PARTITIONS + elif isinstance(partitions, collections.Sequence): + try: + jarr = jpy.array('int', partitions) + except Exception as e: + raise ValueError( + "when not one of the predefined constants, keyword argument 'partitions' has to " + + "represent a sequence of integer partition with values >= 0, instead got " + + str(partitions) + " of type " + type(partitions).__name__ + ) from e + partitions = _JKafkaTools.partitionFilterFromArray(jarr) + elif not isinstance(partitions, jpy.JType): + raise TypeError( + "argument 'partitions' has to be of str or sequence type, " + + "or a predefined compatible constant, instead got partitions " + + str(partitions) + " of type " + type(partitions).__name__) + + if offsets is None: + offsets = ALL_PARTITIONS_DONT_SEEK + elif isinstance(offsets, dict): + try: + partitions_array = jpy.array('int', list(offsets.keys())) + offsets_array = jpy.array('long', list(offsets.values())) + offsets = _JKafkaTools.partitionToOffsetFromParallelArrays(partitions_array, offsets_array) + except Exception as e: + raise ValueError( + "when of type dict, keyword argument 'offsets' has to map " + + "numeric partitions to either numeric offsets, or one of the constants { " + + "SEEK_TO_BEGINNING, DONT_SEEK, SEEK_TO_END }," + + "instead got offsets=" + str(offsets) + ) from e + elif not isinstance(offsets, jpy.JType): + raise TypeError( + "value " + str(offsets) + " of type " + type(offsets).__name__ + + " not recognized for argument 'offsets'; only str, dict like, or predefined constants allowed") + + if key is None: + key = FROM_PROPERTIES + if value is None: + value = FROM_PROPERTIES + if key is IGNORE and value is IGNORE: + raise ValueError( + "at least one argument for 'key' or 'value' must be different from IGNORE") + + if not isinstance(table_type, str): + raise TypeError( + "argument 'table_type' expected to be of type str, instead got " + + str(table_type) + " of type " + type(table_type).__name__) + table_type_enum = _JKafkaTools.friendlyNameToTableType(table_type) + if table_type_enum is None: + raise ValueError("unknown value " + table_type + " for argument 'table_type'") + + kafka_config = _dict_to_j_properties(kafka_config) + try: + return Table( + j_table=_JKafkaTools.consumeToTable(kafka_config, topic, partitions, offsets, key, value, table_type_enum)) + except Exception as e: + raise DHError(e, "failed to consume a Kafka stream.") from e + + +def avro(schema, schema_version: str = None, mapping: dict = None, mapping_only: dict = None): + """ Specify an Avro schema to use when consuming a Kafka stream to a Deephaven table. + + Args: + schema: either an Avro schema object or a string specifying a schema name for a schema + registered in a Confluent compatible Schema Server. 
When the latter is provided, the + associated kafka_config dict in the call to consumeToTable should include the key + 'schema.registry.url' with the associated value of the Schema Server URL for fetching the schema + definition. + schema_version: if a string schema name is provided, the version to fetch from schema + service; if not specified, a default of 'latest' is assumed. + mapping: a dict representing a string to string mapping from Avro field name to Deephaven table + column name; the fields mentioned in the mapping will have their column names defined by it; any other + fields not mentioned in the mapping with use the same Avro field name for Deephaven table column + name. Note that only one parameter between mapping and mapping_only can be provided. + mapping_only: a dict representing a string to string mapping from Avro field name to Deephaven + table column name; the fields mentioned in the mapping will have their column names defined by it; + any other fields not mentioned in the mapping will be ignored and will not be present in the resulting + table. Note that only one parameter between mapping and mapping_only can be provided. + + Returns: + a Kafka Key or Value spec object to use in a call to consumeToTable. + + Raises: + ValueError, TypeError, DHError + """ + if mapping is not None and mapping_only is not None: + raise Exception( + "only one argument between 'mapping' and " + + "'mapping_only' expected, instead got both") + if mapping is not None: + have_mapping = True + # when providing 'mapping', fields names not given are mapped as identity + mapping = _dict_to_j_func(mapping, default_value=IDENTITY) + elif mapping_only is not None: + have_mapping = True + # when providing 'mapping_only', fields not given are ignored. + mapping = _dict_to_j_func(mapping_only, default_value=None) + else: + have_mapping = False + if isinstance(schema, str): + have_actual_schema = False + if schema_version is None: + schema_version = "latest" + elif not isinstance(schema_version, str): + raise TypeError( + "argument 'schema_version' should be of str type, instead got " + + str(schema_version) + " of type " + type(schema_version).__name__) + elif isinstance(schema, _JAvroSchema): + have_actual_schema = True + if schema_version is not None: + raise Exception( + "argument 'schema_version' is only expected if schema is of str type") + else: + raise TypeError( + "'schema' argument expected to be of either " + + "str type or avro schema type, instead got " + str(schema)) + + try: + if have_mapping: + if have_actual_schema: + return _JKafkaTools_Consume.avroSpec(schema, mapping) + else: + return _JKafkaTools_Consume.avroSpec(schema, schema_version, mapping) + else: + if have_actual_schema: + return _JKafkaTools_Consume.avroSpec(schema) + else: + return _JKafkaTools_Consume.avroSpec(schema, schema_version) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec") from e + + +def json(col_defs, mapping: dict = None): + """ Specify how to use JSON data when consuming a Kafka stream to a Deephaven table. + + Args: + col_defs: a sequence of tuples specifying names and types for columns to be + created on the resulting Deephaven table. Tuples contain two elements, a + string for column name and a Deephaven type for column data type. + mapping: a dict mapping JSON field names to column names defined in the col_defs + argument. If not present or None, a 1:1 mapping between JSON fields and Deephaven + table column names is assumed. 
+ Returns: + a Kafka Key or Value spec object to use in a call to consumeToTable. + + Raises: + ValueError, TypeError, DHError + """ + if not isinstance(col_defs, collections.abc.Sequence) or isinstance(col_defs, str): + raise TypeError( + "'col_defs' argument needs to be a sequence of tuples, instead got " + + str(col_defs) + " of type " + type(col_defs).__name__) + try: + col_defs = _build_column_definitions(col_defs) + except Exception as e: + raise Exception("could not create column definitions from " + str(col_defs)) from e + if mapping is None: + return _JKafkaTools_Consume.jsonSpec(col_defs) + if not isinstance(mapping, dict): + raise TypeError( + "argument 'mapping' is expected to be of dict type, " + + "instead got " + str(mapping) + " of type " + type(mapping).__name__) + mapping = _dict_to_j_map(mapping) + try: + return _JKafkaTools_Consume.jsonSpec(col_defs, mapping) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec") from e + + +def simple(column_name: str, data_type: DType = None): + """ Specify a single value when consuming a Kafka stream to a Deephaven table. + + Args: + column_name: a string specifying the Deephaven column name to use. + data_type: a Deephaven type specifying the column data type to use. + + Returns: + a Kafka Key or Value spec object to use in a call to consumeToTable. + + Raises: + TypeError, DHError + """ + if not isinstance(column_name, str): + raise TypeError( + "'column_name' argument needs to be of str type, instead got " + str(column_name)) + try: + if data_type is None: + return _JKafkaTools_Consume.simpleSpec(column_name) + return _JKafkaTools_Consume.simpleSpec(column_name, data_type.qst_type.clazz()) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec") from e + diff --git a/pyintegration/deephaven2/stream/kafka/producer.py b/pyintegration/deephaven2/stream/kafka/producer.py new file mode 100644 index 00000000000..09657a2298a --- /dev/null +++ b/pyintegration/deephaven2/stream/kafka/producer.py @@ -0,0 +1,195 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +""" The Kafka producer module. """ +import jpy + +from deephaven2 import DHError +from deephaven2.stream.kafka._utils import _dict_to_j_properties, _dict_to_j_map, _seq_to_j_set + +_JKafkaTools = jpy.get_type("io.deephaven.kafka.KafkaTools") +_JAvroSchema = jpy.get_type("org.apache.avro.Schema") +_JKafkaTools_Produce = jpy.get_type("io.deephaven.kafka.KafkaTools$Produce") +IGNORE = getattr(_JKafkaTools_Produce, 'IGNORE') + + +def produce( + table, + kafka_config: dict, + topic: str, + key, + value, + last_by_key_columns: bool = False +): + """ Produce to Kafka from a Deephaven table. + + Args: + table: a Deephaven table used as a source of rows to publish to Kafka. + kafka_config: dictionary with properties to configure the associated kafka producer. + topic: the topic name + key: a specification for how to map table column(s) to the Key field in produced + Kafka messages. This should be the result of calling one of the methods + simple, avro or json in this module, or the constant IGNORE. + value: a specification for how to map table column(s) to the Value field in produced + Kafka messages. This should be the result of calling one of the methods + simple, avro or json in this module, or the constant IGNORE. + last_by_key_columns: Whether to publish only the last record for each unique key. + Ignored if key is IGNORE. 
If key is not IGNORE and last_by_key_columns is false, + it is expected that table updates will not produce any row shifts; that is, the publisher + expects keyed tables to be streams, add-only, or aggregated. + + Returns: + a callback that, when invoked, stops publishing and cleans up subscriptions and resources. + Users should hold to this callback to ensure liveness for publishing for as long as this + publishing is desired, and once not desired anymore they should invoke it. + + Raises: + ValueError, TypeError, DHError + """ + + if not isinstance(topic, str): + raise ValueError("argument 'topic' has to be of str type, instead got " + topic) + + if key is None: + raise ValueError("argument 'key' is None") + if value is None: + raise ValueError("argument 'value' is None") + if key is IGNORE and value is IGNORE: + raise ValueError( + "at least one argument for 'key' or 'value' must be different from IGNORE") + + kafka_config = _dict_to_j_properties(kafka_config) + try: + runnable = _JKafkaTools.produceFromTable(table.j_table, kafka_config, topic, key, value, last_by_key_columns) + except Exception as e: + raise DHError(e, "failed to start producing Kafka messages.") from e + + def cleanup(): + runnable.run() + + return cleanup + + +def avro(schema, schema_version: str = None, field_to_col_mapping=None, timestamp_field=None): + """ Specify an Avro schema to use when producing a Kafka stream from a Deephaven table. + + Args: + schema: either an Avro schema object or a string specifying a schema name for a schema + registered in a Confluent compatible Schema Server. When the latter is provided, the + associated kafka_config dict in the call to consumeToTable should include the key + 'schema.registry.url' with the associated value of the Schema Server URL for fetching the schema + definition. + schema_version: if a string schema name is provided, the version to fetch from schema + service; if not specified, a default of 'latest' is assumed. + field_to_col_mapping: a dict mapping field names in the schema to column names in the Deephaven table. + Any fields in the schema not present in the dict as keys are mapped to columns of the same name. + If this argument is None, all schema fields are mapped to columns of the same name. + timestamp_field: a string for the name of an additional timestamp field to include, + or None for no such field. + + Returns: + a Kafka Key or Value spec object to use in a call to produceFromTable. 
+ + Raises: + ValueError, TypeError, DHError + """ + if isinstance(schema, str): + have_actual_schema = False + if schema_version is None: + schema_version = "latest" + elif not isinstance(schema_version, str): + raise TypeError( + "argument 'schema_version' should be of str type, instead got " + + str(schema_version) + " of type " + type(schema_version).__name__) + elif isinstance(schema, _JAvroSchema): + have_actual_schema = True + if schema_version is not None: + raise Exception( + "argument 'schema_version' is only expected if schema is of str type") + else: + raise TypeError( + "'schema' argument expected to be of either " + + "str type or avro schema type, instead got " + str(schema)) + + if field_to_col_mapping is not None and not isinstance(field_to_col_mapping, dict): + raise TypeError( + "argument 'mapping' is expected to be of dict type, " + + "instead got " + str(field_to_col_mapping) + " of type " + type(field_to_col_mapping).__name__) + field_to_col_mapping = _dict_to_j_map(field_to_col_mapping) + try: + if have_actual_schema: + return _JKafkaTools_Produce.avroSpec(schema, field_to_col_mapping, timestamp_field) + return _JKafkaTools_Produce.avroSpec(schema, schema_version, field_to_col_mapping, timestamp_field) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec.") from e + + +def json(include_columns=None, + exclude_columns=None, + mapping=None, + nested_delim=None, + output_nulls=False, + timestamp_field=None): + """ Specify how to produce JSON data when producing a Kafka stream from a Deephaven table. + + Args: + include_columns: a sequence of Deephaven column names to include in the JSON output + as fields, or None to indicate all except the ones mentioned in the exclude_columns argument. + If include_columns is not None, exclude_columns should be None. + exclude_columns: a sequence of Deephaven column names to omit in the JSON output as fields. + If exclude_columns is not None, include_columns should be None. + mapping: a dict mapping column names to JSON field names. Any column name implied by earlier arguments + and not included as a key in the map implies a field of the same name; if this argument is None all + columns will be mapped to JSON fields of the same name. + nested_delim: if nested JSON fields are desired, the field separator that is used for the fieldNames + parameter, or None for no nesting (default). For instance, if a particular column should be mapped + to JSON field X nested inside field Y, the corresponding field name value for the column key + in the mapping dict can be the string "X.Y", in which case the value for nested_delim should be "." + output_nulls: if False (default), do not output a field for null column values. + timestamp_field: a string for the name of an additional timestamp field to include, or None for no such field. + + Returns: + a Kafka Key or Value spec object to use in a call to produceFromTable. + + Raises: + ValueError, TypeError, DHError + """ + if include_columns is not None and exclude_columns is not None: + raise Exception( + "Both include_columns (=" + str(include_columns) + + ") and exclude_columns (=" + str(exclude_columns) + + " are not None, at least one of them should be None." 
+ ) + if mapping is not None and not isinstance(mapping, dict): + raise TypeError( + "argument 'mapping' is expected to be of dict type, " + + "instead got " + str(mapping) + " of type " + type(mapping).__name__) + exclude_columns = _seq_to_j_set(exclude_columns) + mapping = _dict_to_j_map(mapping) + try: + return _JKafkaTools_Produce.jsonSpec(include_columns, exclude_columns, mapping, nested_delim, output_nulls, + timestamp_field) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec.") from e + + +def simple(column_name: str): + """ Specify a single column when producing to a Kafka Key or Value field. + + Args: + column_name: a string specifying the Deephaven column name to use. + + Returns: + a Kafka Key or Value spec object to use in a call to produceFromTable. + + Raises: + TypeError, DHError + """ + if not isinstance(column_name, str): + raise TypeError( + "'column_name' argument needs to be of str type, instead got " + str(column_name)) + + try: + return _JKafkaTools_Produce.simpleSpec(column_name) + except Exception as e: + raise DHError(e, "failed to create a Kafka key/value spec.") from e diff --git a/pyintegration/deephaven2/stream/utils.py b/pyintegration/deephaven2/stream/utils.py new file mode 100644 index 00000000000..31a35238fbd --- /dev/null +++ b/pyintegration/deephaven2/stream/utils.py @@ -0,0 +1,28 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +""" Utility module for the stream subpackage. """ +import jpy + +from deephaven2 import DHError +from deephaven2.table import Table + +_JStreamTableTools = jpy.get_type("io.deephaven.engine.table.impl.StreamTableTools") + + +def stream_to_append_only(table: Table) -> Table: + """ Creates an 'append only' table from the stream table. 
+ + Args: + table (Table): a stream table + + Returns: + an append-only table + + Raises: + DHError + """ + try: + return Table(j_table=_JStreamTableTools.streamToAppendOnlyTable(table.j_table)) + except Exception as e: + raise DHError(e, "failed to create an append-only table.") from e diff --git a/pyintegration/deephaven2/utils/__init__.py b/pyintegration/deephaven2/utils/__init__.py new file mode 100644 index 00000000000..a6caefcc3b9 --- /dev/null +++ b/pyintegration/deephaven2/utils/__init__.py @@ -0,0 +1,14 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +import jpy + + +_JDHConfig = jpy.get_type("io.deephaven.configuration.Configuration") + + +def get_workspace_root(): + """ + Helper function for extracting the root directory for the workspace configuration + """ + return _JDHConfig.getInstance().getWorkspacePath() diff --git a/pyintegration/setup.py b/pyintegration/setup.py index 66f06eaa9d4..80e13cda515 100644 --- a/pyintegration/setup.py +++ b/pyintegration/setup.py @@ -4,7 +4,7 @@ import os import pathlib from setuptools.extern import packaging -from setuptools import find_packages, setup +from setuptools import find_namespace_packages, setup # The directory containing this file HERE = pathlib.Path(__file__).parent @@ -29,7 +29,7 @@ def normalize_version(version): version=__normalized_version__, description='Deephaven Engine Python Package', long_description=README, - packages=find_packages(exclude=("tests",)), + packages=find_namespace_packages(exclude=("tests",)), url='https://deephaven.io/', author='Deephaven Data Labs', author_email='python@deephaven.io', diff --git a/pyintegration/tests/test_consume_kafka.py b/pyintegration/tests/test_consume_kafka.py new file mode 100644 index 00000000000..15939e7fd30 --- /dev/null +++ b/pyintegration/tests/test_consume_kafka.py @@ -0,0 +1,201 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +import os +import unittest + +from deephaven2 import kafka_consumer as ck +from tests.testbase import BaseTestCase +from deephaven2 import dtypes + + +class KafkaConsumerTestCase(BaseTestCase): + + def _assert_common_cols(self, cols): + self.assertEquals("KafkaPartition", cols[0].name) + self.assertEquals(dtypes.int32, cols[0].data_type) + self.assertEquals("KafkaOffset", cols[1].name) + self.assertEquals(dtypes.long, cols[1].data_type) + self.assertEquals("KafkaTimestamp", cols[2].name) + self.assertEquals(dtypes.DateTime, cols[2].data_type) + + def test_basic_constants(self): + """ + Check that the basic constants are imported and visible. + """ + self.assertIsNotNone(ck.SEEK_TO_BEGINNING) + self.assertIsNotNone(ck.DONT_SEEK) + self.assertIsNotNone(ck.SEEK_TO_END) + self.assertIsNotNone(ck.FROM_PROPERTIES) + self.assertIsNotNone(ck.IGNORE) + self.assertIsNotNone(ck.ALL_PARTITIONS) + self.assertIsNotNone(ck.ALL_PARTITIONS_SEEK_TO_BEGINNING) + self.assertIsNotNone(ck.ALL_PARTITIONS_SEEK_TO_END) + self.assertIsNotNone(ck.ALL_PARTITIONS_DONT_SEEK) + + def test_simple(self): + """ + Check a simple Kafka subscription creates the right table. + """ + t = ck.consume( + {'bootstrap.servers': 'redpanda:29092'}, + 'orders', + key=ck.IGNORE, + value=ck.simple('Price', dtypes.double)) + + cols = t.columns + self.assertEquals(4, len(cols)) + self._assert_common_cols(cols) + self.assertEquals("Price", cols[3].name) + self.assertEquals(dtypes.double, cols[3].data_type) + + def test_json(self): + """ + Check a JSON Kafka subscription creates the right table. 
+ """ + + t = ck.consume( + {'bootstrap.servers': 'redpanda:29092'}, + 'orders', + key=ck.IGNORE, + value=ck.json( + [('Symbol', dtypes.string), + ('Side', dtypes.string), + ('Price', dtypes.double), + ('Qty', dtypes.int_), + ('Tstamp', dtypes.DateTime)], + mapping={ + 'jsymbol': 'Symbol', + 'jside': 'Side', + 'jprice': 'Price', + 'jqty': 'Qty', + 'jts': 'Tstamp' + } + ), + table_type='append' + ) + + cols = t.columns + self.assertEquals(8, len(cols)) + self._assert_common_cols(cols) + + self.assertEquals("Symbol", cols[3].name) + self.assertEquals(dtypes.string, cols[3].data_type) + self.assertEquals("Side", cols[4].name) + self.assertEquals(dtypes.string, cols[4].data_type) + self.assertEquals("Price", cols[5].name) + self.assertEquals(dtypes.double, cols[5].data_type) + self.assertEquals("Qty", cols[6].name) + self.assertEquals(dtypes.int_, cols[6].data_type) + self.assertEquals("Tstamp", cols[7].name) + self.assertEquals(dtypes.DateTime, cols[7].data_type) + + def test_avro(self): + """ + Check an Avro Kafka subscription creates the right table. + """ + + schema = \ + """ + { "type" : "record", + "namespace" : "io.deephaven.examples", + "name" : "share_price", + "fields" : [ + { "name" : "Symbol", "type" : "string" }, + { "name" : "Side", "type" : "string" }, + { "name" : "Qty", "type" : "int" }, + { "name" : "Price", "type" : "double" } + ] + } + """ + + schema_str = '{ "schema" : "%s" }' % \ + schema.replace('\n', ' ').replace('"', '\\"') + + sys_str = \ + """ + curl -X POST \ + -H 'Content-type: application/vnd.schemaregistry.v1+json; artifactType=AVRO' \ + --data-binary '%s' \ + http://redpanda:8081/subjects/share_price_record/versions + """ % schema_str + + r = os.system(sys_str) + self.assertEqual(0, r) + + with self.subTest(msg='straight schema, no mapping'): + t = ck.consume( + { + 'bootstrap.servers': 'redpanda:29092', + 'schema.registry.url': 'http://redpanda:8081' + }, + 'share_price', + key=ck.IGNORE, + value=ck.avro('share_price_record', schema_version='1'), + table_type='append' + ) + + cols = t.columns + self.assertEquals(7, len(cols)) + self._assert_common_cols(cols) + + self.assertEquals("Symbol", cols[3].name) + self.assertEquals(dtypes.string, cols[3].data_type) + self.assertEquals("Side", cols[4].name) + self.assertEquals(dtypes.string, cols[4].data_type) + self.assertEquals("Qty", cols[5].name) + self.assertEquals(dtypes.int_, cols[5].data_type) + self.assertEquals("Price", cols[6].name) + self.assertEquals(dtypes.double, cols[6].data_type) + + with self.subTest(msg='mapping_only (filter out some schema fields)'): + m = {'Symbol': 'Ticker', 'Price': 'Dollars'} + t = ck.consume( + { + 'bootstrap.servers': 'redpanda:29092', + 'schema.registry.url': 'http://redpanda:8081' + }, + 'share_price', + key=ck.IGNORE, + value=ck.avro('share_price_record', mapping_only=m), + table_type='append' + ) + + cols = t.columns + self.assertEquals(5, len(cols)) + self._assert_common_cols(cols) + + self.assertEquals("Ticker", cols[3].name) + self.assertEquals(dtypes.string, cols[3].data_type) + self.assertEquals("Dollars", cols[4].name) + self.assertEquals(dtypes.double, cols[4].data_type) + + with self.subTest(msg='mapping (rename some fields)'): + m = {'Symbol': 'Ticker', 'Qty': 'Quantity'} + t = ck.consume( + { + 'bootstrap.servers': 'redpanda:29092', + 'schema.registry.url': 'http://redpanda:8081' + }, + 'share_price', + key=ck.IGNORE, + value=ck.avro('share_price_record', mapping=m), + table_type='append' + ) + + cols = t.columns + self.assertEquals(7, len(cols)) + 
self._assert_common_cols(cols) + + self.assertEquals("Ticker", cols[3].name) + self.assertEquals(dtypes.string, cols[3].data_type) + self.assertEquals("Side", cols[4].name) + self.assertEquals(dtypes.string, cols[4].data_type) + self.assertEquals("Quantity", cols[5].name) + self.assertEquals(dtypes.int_, cols[5].data_type) + self.assertEquals("Price", cols[6].name) + self.assertEquals(dtypes.double, cols[6].data_type) + + +if __name__ == "__main__": + unittest.main() diff --git a/pyintegration/tests/test_parquet.py b/pyintegration/tests/test_parquet.py new file mode 100644 index 00000000000..09e7c9aaec0 --- /dev/null +++ b/pyintegration/tests/test_parquet.py @@ -0,0 +1,144 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +import os +import shutil +import unittest + +from deephaven2 import empty_table, dtypes, new_table +from deephaven2.column import InputColumn +from deephaven2.parquet import write_table, write_tables, read_table, delete_table, ColumnInstruction + +from deephaven2.utils import get_workspace_root +from tests.testbase import BaseTestCase + + +class ParquetTestCase(BaseTestCase): + """ Test cases for the deephaven.ParquetTools module (performed locally) """ + root_dir = "" + + @classmethod + def setUpClass(cls): + # define a junk table workspace directory + cls.root_dir = os.path.join(get_workspace_root(), 'TestParquetTools') + + def test_crd(self): + """ Test suite for reading, writing, and deleting a table to disk """ + + table = empty_table(3).update(formulas=["x=i", "y=(double)(i/10.0)", "z=(double)(i*i)"]) + definition = table.columns + base_dir = os.path.join(self.root_dir, "testCreation") + file_location = os.path.join(base_dir, 'table1.parquet') + file_location2 = os.path.join(base_dir, 'table2.parquet') + + # make sure that the test workspace is clean + if os.path.exists(file_location): + shutil.rmtree(file_location) + if os.path.exists(file_location2): + shutil.rmtree(file_location2) + + # Writing + with self.subTest(msg="write_table(Table, str)"): + write_table(table, file_location) + self.assertTrue(os.path.exists(file_location)) + shutil.rmtree(base_dir) + with self.subTest(msg="write_tables(Table[], destinations, col_definitions"): + write_tables([table, table], [file_location, file_location2], definition) + self.assertTrue(os.path.exists(file_location)) + self.assertTrue(os.path.exists(file_location2)) + + # Reading + with self.subTest(msg="read_table(str)"): + table2 = read_table(file_location) + + # Delete + with self.subTest(msg="delete(str)"): + if os.path.exists(file_location): + delete_table(file_location) + self.assertFalse(os.path.exists(file_location)) + if os.path.exists(file_location2): + delete_table(file_location2) + self.assertFalse(os.path.exists(file_location2)) + shutil.rmtree(base_dir) + + def test_crd_with_instructions(self): + """ Test suite for reading, writing, and deleting a table to disk """ + + table = empty_table(3).update(formulas=["x=i", "y=String.valueOf((double)(i/10.0))", "z=(double)(i*i)"]) + col_definitions = table.columns + base_dir = os.path.join(self.root_dir, "testCreation") + file_location = os.path.join(base_dir, 'table1.parquet') + file_location2 = os.path.join(base_dir, 'table2.parquet') + + # make sure that the test workspace is clean + if os.path.exists(file_location): + shutil.rmtree(file_location) + if os.path.exists(file_location2): + shutil.rmtree(file_location2) + + # Writing + col_inst = ColumnInstruction(column_name="x", parquet_column_name="px") + col_inst1 = 
ColumnInstruction(column_name="y", parquet_column_name="py", codec_name="LZ4") + + with self.subTest(msg="write_table(Table, str, max_dictionary_keys)"): + write_table(table, file_location, max_dictionary_keys=10) + self.assertTrue(os.path.exists(file_location)) + shutil.rmtree(base_dir) + with self.subTest(msg="write_table(Table, str, col_instructions, max_dictionary_keys)"): + write_table(table, file_location, col_instructions=[col_inst, col_inst1], max_dictionary_keys=10) + self.assertTrue(os.path.exists(file_location)) + shutil.rmtree(base_dir) + with self.subTest(msg="write_tables(Table[], destinations, col_definitions, "): + write_tables([table, table], [file_location, file_location2], col_definitions, + col_instructions=[col_inst, col_inst1]) + self.assertTrue(os.path.exists(file_location)) + self.assertTrue(os.path.exists(file_location2)) + + # Reading + with self.subTest(msg="read_table(str)"): + table2 = read_table(path=file_location, col_instructions=[col_inst, col_inst1]) + + # Delete + with self.subTest(msg="delete(str)"): + if os.path.exists(file_location): + delete_table(file_location) + self.assertFalse(os.path.exists(file_location)) + if os.path.exists(file_location2): + delete_table(file_location2) + self.assertFalse(os.path.exists(file_location2)) + shutil.rmtree(base_dir) + + def test_big_decimal(self): + j_type = dtypes.BigDecimal.j_type + big_decimal_list = [j_type.valueOf(301, 2), + j_type.valueOf(201, 2), + j_type.valueOf(101, 2)] + bd_col = InputColumn(name='decimal_value', data_type=dtypes.BigDecimal, input_data=big_decimal_list) + table = new_table([bd_col]) + self.assertIsNotNone(table) + base_dir = os.path.join(self.root_dir, 'testCreation') + file_location = os.path.join(base_dir, 'table1.parquet') + if os.path.exists(file_location): + shutil.rmtree(file_location) + + write_table(table, file_location) + table2 = read_table(file_location) + self.assertEqual(table.size, table2.size) + self.assertEqual(table, table2) + + self.assertTrue(os.path.exists(file_location)) + shutil.rmtree(base_dir) + + @classmethod + def tearDownClass(cls): + # remove the junk definitions created in the tests, if they exist... + if os.path.exists(cls.root_dir): + try: + shutil.rmtree(cls.root_dir) + except Exception as e: + print("Tried removing directory {}, but failed with error {}. " + "Manual clean-up may be necessary".format(cls.root_dir, e)) + + +if __name__ == '__main__': + unittest.main() diff --git a/pyintegration/tests/test_produce_kafka.py b/pyintegration/tests/test_produce_kafka.py new file mode 100644 index 00000000000..a9d226b79bd --- /dev/null +++ b/pyintegration/tests/test_produce_kafka.py @@ -0,0 +1,138 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# +import os +import unittest + +from deephaven2 import kafka_producer as pk, new_table +from deephaven2.column import string_col, int_col, double_col +from tests.testbase import BaseTestCase + + +def table_helper(): + columns = [ + string_col('Symbol', ['MSFT', 'GOOG', 'AAPL', 'AAPL']), + string_col('Side', ['B', 'B', 'S', 'B']), + int_col('Qty', [200, 100, 300, 50]), + double_col('Price', [210.0, 310.5, 411.0, 411.5]) + ] + t = new_table(cols=columns) + return t + + +class KafkaProducerTestCase(BaseTestCase): + """ + Test cases for the deephaven.ConsumeKafka module (performed locally) - + """ + + def test_basic_constants(self): + """ + Check that the basic constants are imported and visible. 
+ """ + self.assertIsNotNone(pk.IGNORE) + + def test_simple(self): + """ + Check a simple Kafka subscription creates the right table. + """ + t = new_table(cols=[double_col('Price', [10.0, 10.5, 11.0, 11.5])]) + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + 'orders', + key=pk.IGNORE, + value=pk.simple('Price') + ) + + self.assertIsNotNone(cleanup) + cleanup() + + def test_json_only_columns(self): + t = table_helper() + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + 'orders', + key=pk.IGNORE, + value=pk.json(['Symbol', 'Price']), + last_by_key_columns=False + ) + + self.assertIsNotNone(cleanup) + cleanup() + + def test_json_all_arguments(self): + t = table_helper() + cleanup = pk.produce( + t, + {'bootstrap.servers': 'redpanda:29092'}, + 'orders', + key=pk.IGNORE, + value=pk.json( + ['Symbol', 'Price'], + mapping={'Symbol': 'jSymbol', 'Price': 'jPrice'}, + timestamp_field='jTs' + ), + last_by_key_columns=False + ) + + self.assertIsNotNone(cleanup) + cleanup() + + def test_avro(self): + schema = \ + """ + { "type" : "record", + "namespace" : "io.deephaven.examples", + "name" : "share_price_timestamped", + "fields" : [ + { "name" : "Symbol", "type" : "string" }, + { "name" : "Side", "type" : "string" }, + { "name" : "Qty", "type" : "int" }, + { "name" : "Price", "type" : "double" }, + { "name" : "Timestamp", + "type" : { + "type" : "long", + "logicalType" : "timestamp-micros" + } + } + ] + } + """ + + schema_str = '{ "schema" : "%s" }' % \ + schema.replace('\n', ' ').replace('"', '\\"') + + sys_str = \ + """ + curl -X POST \ + -H 'Content-type: application/vnd.schemaregistry.v1+json; artifactType=AVRO' \ + --data-binary '%s' \ + http://redpanda:8081/subjects/share_price_timestamped_record/versions + """ % schema_str + + r = os.system(sys_str) + self.assertEquals(0, r) + + t = table_helper() + cleanup = pk.produce( + t, + { + 'bootstrap.servers': 'redpanda:29092', + 'schema.registry.url': 'http://redpanda:8081' + }, + 'share_price_timestamped', + key=pk.IGNORE, + value=pk.avro( + 'share_price_timestamped_record', + timestamp_field='Timestamp' + ), + last_by_key_columns=False + ) + + self.assertIsNotNone(cleanup) + cleanup() + + +if __name__ == '__main__': + unittest.main() diff --git a/pyintegration/tests/test_utils.py b/pyintegration/tests/test_utils.py new file mode 100644 index 00000000000..ef907bf7a8f --- /dev/null +++ b/pyintegration/tests/test_utils.py @@ -0,0 +1,18 @@ +# +# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending +# + +import unittest + +from deephaven2 import get_workspace_root +from tests.testbase import BaseTestCase + + +class UtilsTestCase(BaseTestCase): + def test_get_workspace_root(self): + root_path = get_workspace_root() + self.assertNotEqual("", root_path) + + +if __name__ == '__main__': + unittest.main()