Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hand wrapped KafkaTools, ParquetTools, fixes #1643, #1546, #1892 #1825

Merged
merged 15 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Integrations/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ Closure composeConfig = { task ->

def pyTest = runInDocker("test-py-37", 'python', ['python3', '-m', 'xmlrunner', 'discover', '-v', '-o', '/out/report'], composeConfig)
def pyFunctionalTest = runInDocker('py-functional-test', 'python', ['/bin/bash', 'run-functional-tests.sh'])
def pyTest2 = runInDocker('test-py-deephaven2', '../pyintegration', ['python3', '-m', 'xmlrunner', 'discover', 'tests', '-v', '-o', '/out/report'])
def pyTest2 = runInDocker('test-py-deephaven2', '../pyintegration', ['python3', '-m', 'xmlrunner', 'discover', 'tests', '-v', '-o', '/out/report'], composeConfig)
pyTest.configure({
onlyIf { TestTools.shouldRunTests(project) }
})
Expand Down
22 changes: 13 additions & 9 deletions pyintegration/deephaven2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
"""Deephaven Python Integration Package provides the ability to access the Deephaven's query engine natively and thus
unlocks the unique and tremendous power of Deephaven to the Python community.
unlocks the unique power of Deephaven to the Python community.
"""
import jpy

__version__ = "0.9.0"

if not jpy.has_jvm():
from ._utils.bootstrap import build_py_session
from ._init.bootstrap import build_py_session
from .dherror import DHError

try:
build_py_session()
except Exception as e:
raise DHError(e, "deephaven initialization failed.") from e
else:
from .constants import SortDirection
from .csv import read as read_csv
from .csv import write as write_csv
from .table_factory import empty_table, time_table, merge, merge_sorted, new_table
from .stream.kafka import consumer as kafka_consumer
from .stream.kafka import producer as kafka_producer

from .dherror import DHError
from .constants import SortDirection
from .csv import read as read_csv
from .csv import write as write_csv
from .table_factory import empty_table, time_table, merge, merge_sorted, new_table
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
""" This module supports bootstrapping a Deephaven Python Script session from Python."""

import os

import jpy
Expand All @@ -14,6 +16,8 @@


def build_py_session():
""" This function uses the default DH property file to embed the Deephaven server and starts a Deephaven Python
Script session. """
if not jpy.has_jvm():
os.environ['JAVA_VERSION'] = '11'

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#

"""Utilities for starting the Deephaven JVM."""
""" Utilities for starting the Deephaven JVM. """

import os
import re
Expand Down
17 changes: 17 additions & 0 deletions pyintegration/deephaven2/_wrapper_abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
""" This module defines an Abstract Class for Java object wrappers.
The primary purpose of this ABC is to enable downstream code to retrieve the wrapped Java objects in a uniform way.
"""
from abc import ABC, abstractmethod

import jpy


class JObjectWrapper(ABC):
@property
@abstractmethod
def j_object(self) -> jpy.JType:
...
5 changes: 5 additions & 0 deletions pyintegration/deephaven2/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

_JColumnHeader = jpy.get_type("io.deephaven.qst.column.header.ColumnHeader")
_JColumn = jpy.get_type("io.deephaven.qst.column.Column")
_JColumnDefinition = jpy.get_type("io.deephaven.engine.table.ColumnDefinition")


class ColumnType(Enum):
Expand Down Expand Up @@ -41,6 +42,10 @@ class Column:
def j_column_header(self):
return _JColumnHeader.of(self.name, self.data_type.qst_type)

@property
def j_column_definition(self):
return _JColumnDefinition.fromGenericType(self.name, self.data_type.qst_type.clazz(), self.component_type)


@dataclass
class InputColumn(Column):
Expand Down
23 changes: 23 additions & 0 deletions pyintegration/deephaven2/config/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
""" This module provides access to the Deephaven server configuration. """
import jpy
from deephaven2.time import TimeZone

_JDHConfig = jpy.get_type("io.deephaven.configuration.Configuration")
_JDateTimeZone = jpy.get_type("org.joda.time.DateTimeZone")


def get_log_dir() -> str:
""" Returns the server's log directory. """
return _JDHConfig.getInstance().getLogDir()


def get_server_timezone() -> TimeZone:
""" Returns the server's time zone. """
j_timezone = _JDateTimeZone.forTimeZone(_JDHConfig.getInstance().getServerTimezone())
for tz in TimeZone:
if j_timezone == tz.value.getTimeZone():
return tz
return None
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
85 changes: 68 additions & 17 deletions pyintegration/deephaven2/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
#
""" This module defines the data types supported by the Deephaven engine.

Each data type is represented by a DType class which supports creating arrays of the same type and more.
Each data type is represented by a DType instance which supports creating arrays of the same type and more.
"""
from __future__ import annotations

from typing import Any, Sequence, Callable, Iterable
from typing import Iterable, Any, Sequence, Callable, Optional, Dict, Set

import jpy

Expand All @@ -27,7 +27,7 @@ class DType:
_j_name_type_map = {}

@classmethod
def from_jtype(cls, j_class: Any) -> DType:
def from_jtype(cls, j_class: Any) -> Optional[DType]:
if not j_class:
return None

Expand All @@ -50,10 +50,10 @@ def __repr__(self):
return self.j_name

def __call__(self, *args, **kwargs):
try:
if not self.is_primitive:
return self.j_type(*args, **kwargs)
except Exception as e:
raise DHError(e, f"failed to create an instance of {self.j_name}") from e
else:
raise DHError(message=f"primitive type {self.j_name} is not callable.")

def array(self, size: int):
""" Creates a Java array of the same data type of the specified size.
Expand Down Expand Up @@ -101,10 +101,7 @@ def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None):


class CharDType(DType):
""" The class for char type. """

def __init__(self):
super().__init__(j_name="char", qst_type=_JQstType.charType(), is_primitive=True)
""" The class that wraps Java char type. """

def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None):
if isinstance(seq, str) and not remap:
Expand All @@ -117,7 +114,7 @@ def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None):
int8 = byte
short = DType(j_name="short", qst_type=_JQstType.shortType(), is_primitive=True)
int16 = short
char = CharDType()
char = CharDType(j_name="char", qst_type=_JQstType.charType(), is_primitive=True)
int_ = DType(j_name="int", qst_type=_JQstType.intType(), is_primitive=True)
int32 = int_
long = DType(j_name="long", qst_type=_JQstType.longType(), is_primitive=True)
Expand All @@ -136,11 +133,65 @@ def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None):
Period = DType(j_name="io.deephaven.time.Period")


def j_array_list(values: Iterable):
j_list = jpy.get_type("java.util.ArrayList")(len(values))
try:
for v in values:
class PropertiesDType(DType):
""" The class that wraps java Properties. """

def __call__(self, d: Dict) -> Any:
if d is None:
return None
r = self.j_type()
for key, value in d.items():
if value is None:
value = ''
r.setProperty(key, value)
return r


class HashMapDType(DType):
""" The class that wraps java HashMap. """

def __call__(self, d: Dict) -> Any:
if d is None:
return None
r = self.j_type()
for key, value in d.items():
if value is None:
value = ''
r.put(key, value)
return r


class HashSetDType(DType):
""" The case the wraps java Set. """

def __call__(self, s: Set):
if s is None:
return None
r = self.j_type()
for v in s:
r.add(v)
return r


class ArrayListDType(DType):
""" The case the wraps java Set. """

def __call__(self, it: Iterable):
if not it:
return None

j_list = self.j_type()
for v in it:
j_list.add(v)
return j_list
except Exception as e:
raise DHError(e, "failed to create a Java ArrayList from the Python collection.") from e


Properties = PropertiesDType(j_name="java.util.Properties")
HashMap = HashMapDType(j_name="java.util.HashMap")
HashSet = HashSetDType(j_name="java.util.HashSet")
ArrayList = ArrayListDType(j_name="java.util.ArrayList")
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved


def is_java_type(obj: Any) -> bool:
""" Returns True if the object is originated in Java. """
return isinstance(obj, jpy.JType)
Loading