Skip to content

Commit

Permalink
Hand wrapped KafkaTools, ParquetTools, fixes #1643, #1546, #1892 (#1825)
Browse files Browse the repository at this point in the history
 Hand wrapped KafkaTools, ParquetTools
  • Loading branch information
jmao-denver authored Feb 16, 2022
1 parent 701e8e3 commit 003f272
Show file tree
Hide file tree
Showing 21 changed files with 1,414 additions and 146 deletions.
2 changes: 1 addition & 1 deletion Integrations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Closure composeConfig = { task ->

def pyTest = runInDocker("test-py-37", 'python', ['python3', '-m', 'xmlrunner', 'discover', '-v', '-o', '/out/report'], composeConfig)
def pyFunctionalTest = runInDocker('py-functional-test', 'python', ['/bin/bash', 'run-functional-tests.sh'])
def pyTest2 = runInDocker('test-py-deephaven2', '../pyintegration', ['python3', '-m', 'xmlrunner', 'discover', 'tests', '-v', '-o', '/out/report'])
def pyTest2 = runInDocker('test-py-deephaven2', '../pyintegration', ['python3', '-m', 'xmlrunner', 'discover', 'tests', '-v', '-o', '/out/report'], composeConfig)
pyTest.configure({
onlyIf { TestTools.shouldRunTests(project) }
})
Expand Down
22 changes: 13 additions & 9 deletions pyintegration/deephaven2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
"""Deephaven Python Integration Package provides the ability to access the Deephaven's query engine natively and thus
unlocks the unique and tremendous power of Deephaven to the Python community.
unlocks the unique power of Deephaven to the Python community.
"""
import jpy

__version__ = "0.9.0"

if not jpy.has_jvm():
from ._utils.bootstrap import build_py_session
from ._init.bootstrap import build_py_session
from .dherror import DHError

try:
build_py_session()
except Exception as e:
raise DHError(e, "deephaven initialization failed.") from e
else:
from .constants import SortDirection
from .csv import read as read_csv
from .csv import write as write_csv
from .table_factory import empty_table, time_table, merge, merge_sorted, new_table
from .stream.kafka import consumer as kafka_consumer
from .stream.kafka import producer as kafka_producer

from .dherror import DHError
from .constants import SortDirection
from .csv import read as read_csv
from .csv import write as write_csv
from .table_factory import empty_table, time_table, merge, merge_sorted, new_table
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
""" This module supports bootstrapping a Deephaven Python Script session from Python."""

import os

import jpy
Expand All @@ -14,6 +16,8 @@


def build_py_session():
""" This function uses the default DH property file to embed the Deephaven server and starts a Deephaven Python
Script session. """
if not jpy.has_jvm():
os.environ['JAVA_VERSION'] = '11'

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#

"""Utilities for starting the Deephaven JVM."""
""" Utilities for starting the Deephaven JVM. """

import os
import re
Expand Down
59 changes: 59 additions & 0 deletions pyintegration/deephaven2/_jcompat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
""" This module provides Java compatibility support including convenience functions to create some widely used Java
data structures from corresponding Python ones in order to be able to call Java methods. """
from typing import Any, Iterable, Dict, Set

import jpy


def is_java_type(obj: Any) -> bool:
""" Returns True if the object is originated in Java. """
return isinstance(obj, jpy.JType)


def j_array_list(values: Iterable = None) -> jpy.JType:
""" Creates a Java ArrayList instance from an iterable. """
if values is None:
return None
r = jpy.get_type("java.util.ArrayList")(len(list(values)))
for v in values:
r.add(v)
return r


def j_hashmap(d: Dict = None) -> jpy.JType:
""" Creates a Java HashMap from a dict. """
if d is None:
return None

r = jpy.get_type("java.util.HashMap")()
for key, value in d.items():
if value is None:
value = ''
r.put(key, value)
return r


def j_hashset(s: Set = None) -> jpy.JType:
""" Creates a Java HashSet from a set. """
if s is None:
return None

r = jpy.get_type("java.util.HashSet")()
for v in s:
r.add(v)
return r


def j_properties(d: Dict = None) -> jpy.JType:
""" Creates a Java Properties from a dict. """
if d is None:
return None
r = jpy.get_type("java.util.Properties")()
for key, value in d.items():
if value is None:
value = ''
r.setProperty(key, value)
return r
17 changes: 17 additions & 0 deletions pyintegration/deephaven2/_wrapper_abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
""" This module defines an Abstract Class for Java object wrappers.
The primary purpose of this ABC is to enable downstream code to retrieve the wrapped Java objects in a uniform way.
"""
from abc import ABC, abstractmethod

import jpy


class JObjectWrapper(ABC):
@property
@abstractmethod
def j_object(self) -> jpy.JType:
...
13 changes: 9 additions & 4 deletions pyintegration/deephaven2/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
#
from dataclasses import dataclass, field
from enum import Enum
from typing import Sequence, Any
from typing import Sequence

import jpy
from deephaven2.dtypes import DType

import deephaven2.dtypes as dtypes
from deephaven2 import DHError
from deephaven2.dtypes import DType

_JColumnHeader = jpy.get_type("io.deephaven.qst.column.header.ColumnHeader")
_JColumn = jpy.get_type("io.deephaven.qst.column.Column")
_JColumnDefinition = jpy.get_type("io.deephaven.engine.table.ColumnDefinition")


class ColumnType(Enum):
Expand Down Expand Up @@ -41,6 +42,10 @@ class Column:
def j_column_header(self):
return _JColumnHeader.of(self.name, self.data_type.qst_type)

@property
def j_column_definition(self):
return _JColumnDefinition.fromGenericType(self.name, self.data_type.qst_type.clazz(), self.component_type)


@dataclass
class InputColumn(Column):
Expand All @@ -53,9 +58,9 @@ def __post_init__(self):
self.j_column = _JColumn.empty(self.j_column_header)
else:
if self.data_type.is_primitive:
self.j_column = _JColumn.ofUnsafe(self.name, self.data_type.array_from(self.input_data))
self.j_column = _JColumn.ofUnsafe(self.name, dtypes.array(self.data_type, self.input_data))
else:
self.j_column = _JColumn.of(self.j_column_header, self.data_type.array_from(self.input_data))
self.j_column = _JColumn.of(self.j_column_header, dtypes.array(self.data_type, self.input_data))
except Exception as e:
raise DHError(e, "failed to create an InputColumn.") from e

Expand Down
31 changes: 31 additions & 0 deletions pyintegration/deephaven2/config/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
""" This module provides access to the Deephaven server configuration. """
import jpy

from deephaven2 import DHError
from deephaven2.time import TimeZone

_JDHConfig = jpy.get_type("io.deephaven.configuration.Configuration")
_JDateTimeZone = jpy.get_type("org.joda.time.DateTimeZone")


def get_log_dir() -> str:
""" Returns the server's log directory. """
try:
return _JDHConfig.getInstance().getLogDir()
except Exception as e:
raise DHError(e, "failed to get the server's log directory.") from e


def get_server_timezone() -> TimeZone:
""" Returns the server's time zone. """
try:
j_timezone = _JDateTimeZone.forTimeZone(_JDHConfig.getInstance().getServerTimezone())
for tz in TimeZone:
if j_timezone == tz.value.getTimeZone():
return tz
raise NotImplementedError("can't find the time zone in the TImeZone Enum.")
except Exception as e:
raise DHError(e, message=f"failed to find a recognized time zone") from e
Loading

0 comments on commit 003f272

Please sign in to comment.