Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hand wrapped KafkaTools, ParquetTools, fixes #1643, #1546, #1892 #1825

Merged
merged 15 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pyintegration/deephaven2/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
#
from dataclasses import dataclass, field
from enum import Enum
from typing import Sequence, Any
from typing import Sequence

import jpy
from deephaven2.dtypes import DType

import deephaven2.dtypes as dtypes
from deephaven2 import DHError
from deephaven2.dtypes import DType

_JColumnHeader = jpy.get_type("io.deephaven.qst.column.header.ColumnHeader")
_JColumn = jpy.get_type("io.deephaven.qst.column.Column")
Expand Down Expand Up @@ -58,9 +58,9 @@ def __post_init__(self):
self.j_column = _JColumn.empty(self.j_column_header)
else:
if self.data_type.is_primitive:
self.j_column = _JColumn.ofUnsafe(self.name, self.data_type.array_from(self.input_data))
self.j_column = _JColumn.ofUnsafe(self.name, dtypes.array(self.data_type, self.input_data))
else:
self.j_column = _JColumn.of(self.j_column_header, self.data_type.array_from(self.input_data))
self.j_column = _JColumn.of(self.j_column_header, dtypes.array(self.data_type, self.input_data))
except Exception as e:
raise DHError(e, "failed to create an InputColumn.") from e

Expand Down
20 changes: 14 additions & 6 deletions pyintegration/deephaven2/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#
""" This module provides access to the Deephaven server configuration. """
import jpy

from deephaven2 import DHError
from deephaven2.time import TimeZone

_JDHConfig = jpy.get_type("io.deephaven.configuration.Configuration")
Expand All @@ -11,13 +13,19 @@

def get_log_dir() -> str:
""" Returns the server's log directory. """
return _JDHConfig.getInstance().getLogDir()
try:
return _JDHConfig.getInstance().getLogDir()
except Exception as e:
raise DHError(e, "failed to get the server's log directory.") from e


def get_server_timezone() -> TimeZone:
""" Returns the server's time zone. """
j_timezone = _JDateTimeZone.forTimeZone(_JDHConfig.getInstance().getServerTimezone())
for tz in TimeZone:
if j_timezone == tz.value.getTimeZone():
return tz
return None
try:
j_timezone = _JDateTimeZone.forTimeZone(_JDHConfig.getInstance().getServerTimezone())
for tz in TimeZone:
if j_timezone == tz.value.getTimeZone():
return tz
raise NotImplementedError("can't find the time zone in the TImeZone Enum.")
except Exception as e:
raise DHError(e, message=f"failed to find a recognized time zone") from e
258 changes: 124 additions & 134 deletions pyintegration/deephaven2/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,22 @@
#
""" This module defines the data types supported by the Deephaven engine.

Each data type is represented by a DType instance which supports creating arrays of the same type and more.
Each data type is represented by a DType class which supports creating arrays of the same type and more.
"""
from __future__ import annotations

from typing import Iterable, Any, Sequence, Callable, Optional, Dict, Set
from typing import Iterable, Any, Sequence, Callable, Dict, Type, Set, Union

import jpy
import numpy as np

from deephaven2 import DHError

_JQstType = jpy.get_type("io.deephaven.qst.type.Type")
_JTableTools = jpy.get_type("io.deephaven.engine.util.TableTools")

_j_name_type_map: Dict[str, DType] = {}


def _qst_custom_type(cls_name: str):
return _JQstType.find(_JTableTools.typeFromName(cls_name))
Expand All @@ -24,174 +27,161 @@ def _qst_custom_type(cls_name: str):
class DType:
""" A class representing a data type in Deephaven."""

_j_name_type_map = {}

@classmethod
def from_jtype(cls, j_class: Any) -> Optional[DType]:
if not j_class:
return None

j_name = j_class.getName()
dtype = DType._j_name_type_map.get(j_name)
if not dtype:
return cls(j_name=j_name, j_type=j_class)
else:
return dtype

def __init__(self, j_name: str, j_type: Any = None, qst_type: Any = None, is_primitive: bool = False):
def __init__(self, j_name: str, j_type: Type = None, qst_type: jpy.JType = None, is_primitive: bool = False,
np_type=None, factory: Callable = None):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
"""
Args:
j_name (str): the full qualified name of the Java class
j_type (Type): the mapped Python class created by JPY
qst_type (JType): the JPY wrapped object for a instance of QST Type
is_primitive (bool): whether this instance represents a primitive Java type
np_type: an instance of numpy dtype (dtype("int64")or numpy class (e.g. np.int16)
factory: a callable that returns an instance of the wrapped Java class
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
"""
self.j_name = j_name
self.j_type = j_type if j_type else jpy.get_type(j_name)
self.qst_type = qst_type if qst_type else _qst_custom_type(j_name)
self.is_primitive = is_primitive
self.np_type = np_type
self._factory = factory

DType._j_name_type_map[j_name] = self
_j_name_type_map[j_name] = self

def __repr__(self):
return self.j_name

def __call__(self, *args, **kwargs):
if not self.is_primitive:
return self.j_type(*args, **kwargs)
else:
if self.is_primitive:
raise DHError(message=f"primitive type {self.j_name} is not callable.")

def array(self, size: int):
""" Creates a Java array of the same data type of the specified size.

Args:
size (int): the size of the array

Returns:
a Java array

Raises:
DHError
"""
try:
return jpy.array(self.j_name, size)
except Exception as e:
raise DHError("failed to create a Java array.") from e

def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None):
""" Creates a Java array of the same data type populated with values from a sequence.

Note:
this method does unsafe casting, meaning precision and values might be lost with down cast

Args:
seq: a sequence of compatible data, e.g. list, tuple, numpy array, Pandas series, etc.
remap (optional): a callable that takes one value and maps it to another, for handling the translation of
special DH values such as NULL_INT, NAN_INT between Python and the DH engine

Returns:
a Java array

Raises:
DHError
"""
try:
if remap:
if not callable(remap):
raise ValueError("Not a callable")
seq = [remap(v) for v in seq]

return jpy.array(self.j_name, seq)
if self._factory:
return self._factory(*args, **kwargs)
return self.j_type(*args, **kwargs)
except Exception as e:
raise DHError(e, f"failed to create a Java {self.j_name} array.") from e


class CharDType(DType):
""" The class that wraps Java char type. """

def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None):
if isinstance(seq, str) and not remap:
return super().array_from(seq, remap=ord)
return super().array_from(seq, remap)
raise DHError(e, f"failed to create an instance of {self.j_name}") from e


bool_ = DType(j_name="java.lang.Boolean", qst_type=_JQstType.booleanType())
byte = DType(j_name="byte", qst_type=_JQstType.byteType(), is_primitive=True)
bool_ = DType(j_name="java.lang.Boolean", qst_type=_JQstType.booleanType(), np_type=np.bool_)
byte = DType(j_name="byte", qst_type=_JQstType.byteType(), is_primitive=True, np_type=np.int8)
int8 = byte
short = DType(j_name="short", qst_type=_JQstType.shortType(), is_primitive=True)
short = DType(j_name="short", qst_type=_JQstType.shortType(), is_primitive=True, np_type=np.int16)
int16 = short
char = CharDType(j_name="char", qst_type=_JQstType.charType(), is_primitive=True)
int_ = DType(j_name="int", qst_type=_JQstType.intType(), is_primitive=True)
int32 = int_
long = DType(j_name="long", qst_type=_JQstType.longType(), is_primitive=True)
char = DType(j_name="char", qst_type=_JQstType.charType(), is_primitive=True, np_type=np.dtype('uint16'))
int32 = DType(j_name="int", qst_type=_JQstType.intType(), is_primitive=True, np_type=np.int32)
long = DType(j_name="long", qst_type=_JQstType.longType(), is_primitive=True, np_type=np.int64)
int64 = long
float_ = DType(j_name="float", qst_type=_JQstType.floatType(), is_primitive=True)
int_ = int32
float_ = DType(j_name="float", qst_type=_JQstType.floatType(), is_primitive=True, np_type=np.float32)
single = float_
float32 = float_
double = DType(j_name="double", qst_type=_JQstType.doubleType(), is_primitive=True)
double = DType(j_name="double", qst_type=_JQstType.doubleType(), is_primitive=True, np_type=np.float64)
float64 = double
string = DType(j_name="java.lang.String", qst_type=_JQstType.stringType())
BigDecimal = DType(j_name="java.math.BigDecimal")
StringSet = DType(j_name="io.deephaven.stringset.StringSet")
PyObject = DType(j_name="org.jpy.PyObject")
JObject = DType(j_name="java.lang.Object")
DateTime = DType(j_name="io.deephaven.time.DateTime")
Period = DType(j_name="io.deephaven.time.Period")
string = DType(j_name="java.lang.String", qst_type=_JQstType.stringType(), np_type=np.object_)
BigDecimal = DType(j_name="java.math.BigDecimal", np_type=np.object_)
StringSet = DType(j_name="io.deephaven.stringset.StringSet", np_type=np.object_)
DateTime = DType(j_name="io.deephaven.time.DateTime", np_type=np.dtype("datetime64[ns]"))
Period = DType(j_name="io.deephaven.time.Period", np_type=np.object_)
PyObject = DType(j_name="org.jpy.PyObject", np_type=np.object_)
JObject = DType(j_name="java.lang.Object", np_type=np.object_)


def array(dtype: DType, seq: Sequence, remap: Callable[[Any], Any] = None) -> jpy.JType:
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
""" Creates a Java array of the specified data type populated with values from a sequence.

Note:
this method does unsafe casting, meaning precision and values might be lost with down cast

Args:
dtype (DType): the component type of the array
seq (Sequence): a sequence of compatible data, e.g. list, tuple, numpy array, Pandas series, etc.
remap (optional): a callable that takes one value and maps it to another, for handling the translation of
special DH values such as NULL_INT, NAN_INT between Python and the DH engine

Returns:
a Java array

Raises:
DHError
"""
try:
if remap:
if not callable(remap):
raise ValueError("Not a callable")
seq = [remap(v) for v in seq]
else:
if isinstance(seq, str) and dtype == char:
return array(char, seq, remap=ord)

return jpy.array(dtype.j_type, seq)
except Exception as e:
raise DHError(e, f"failed to create a Java {dtype.j_name} array.") from e


class PropertiesDType(DType):
""" The class that wraps java Properties. """
def from_jtype(j_class: Any) -> DType:
""" look up a DType that matches the java type, if not found, create a DType for it. """
if not j_class:
return None

def __call__(self, d: Dict) -> Any:
if d is None:
return None
r = self.j_type()
for key, value in d.items():
if value is None:
value = ''
r.setProperty(key, value)
return r
j_name = j_class.getName()
dtype = _j_name_type_map.get(j_name)
if not dtype:
return DType(j_name=j_name, j_type=j_class, np_type=np.object_)
else:
return dtype


class HashMapDType(DType):
""" The class that wraps java HashMap. """
def from_np_dtype(np_dtype: np.dtype) -> DType:
""" Look up a DType that matches the numpy.dtype, if not found, return PyObject. """
for _, dtype in _j_name_type_map.items():
if np.dtype(dtype.np_type) == np_dtype and dtype.np_type != np.object_:
return dtype

def __call__(self, d: Dict) -> Any:
if d is None:
return None
r = self.j_type()
for key, value in d.items():
if value is None:
value = ''
r.put(key, value)
return r
return PyObject


class HashSetDType(DType):
""" The case the wraps java Set. """
def is_java_type(obj: Any) -> bool:
""" Returns True if the object is originated in Java. """
return isinstance(obj, jpy.JType)


def __call__(self, s: Set):
if s is None:
return None
r = self.j_type()
for v in s:
r.add(v)
return r
def j_array_list(values: Iterable):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if values is None:
return None
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
r = jpy.get_type("java.util.ArrayList")(len(list(values)))
for v in values:
r.add(v)
return r


class ArrayListDType(DType):
""" The case the wraps java Set. """
def j_hashmap(d: Dict):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if d is None:
return None
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved

def __call__(self, it: Iterable):
if not it:
return None
r = jpy.get_type("java.util.HashMap")()
for key, value in d.items():
if value is None:
value = ''
r.put(key, value)
return r

j_list = self.j_type()
for v in it:
j_list.add(v)
return j_list

def j_hashset(s: Set):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if s is None:
return None
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved

Properties = PropertiesDType(j_name="java.util.Properties")
HashMap = HashMapDType(j_name="java.util.HashMap")
HashSet = HashSetDType(j_name="java.util.HashSet")
ArrayList = ArrayListDType(j_name="java.util.ArrayList")
r = jpy.get_type("java.util.HashSet")()
for v in s:
r.add(v)
return r


def is_java_type(obj: Any) -> bool:
""" Returns True if the object is originated in Java. """
return isinstance(obj, jpy.JType)
def j_properties(d: Dict):
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
if d is None:
return None
jmao-denver marked this conversation as resolved.
Show resolved Hide resolved
r = jpy.get_type("java.util.Properties")()
for key, value in d.items():
if value is None:
value = ''
r.setProperty(key, value)
return r
Loading