Skip to content

Commit

Permalink
Refactor MessageArgs (#3358)
Browse files Browse the repository at this point in the history
* remove binary commandMap by including binary-payload field in MsgTuple. Fix misleading indentation in serverDaemon. Remove repeated error-handling code from ServerDaemon.

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* remove commandMapArray by adding payload field to MessageArgs

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* fix messageArgs serialization bug

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* remove unnecessary string.doFormat, replacing with string.format

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* remove symbols duplicated across IOCompat versions

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* start refactor of ParameterObj. Simplified argument parsing API to include 'toScalar', 'toScalarTuple', 'toScalarList', 'toScalarArray'. Other methods were removed or "deprecated". Added 'this()' indexing to msgArgs

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* remove objType enum from Message.chpl (and message.py) - resolves #2522. Only 'efunc2Msg' was using this functionality, so put a temporary fix in place until it can be refactored

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* fix dep modules wrt IOCompat changes

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* remove objType from message.py tests

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* fix tuple and array deserialization for array-api-builds. Fix a bug in the array-api 'tolist' introduced in #3242

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* remove duplicate serialize method from MessageArgs

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

* fix formatting in Message.chpl

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>

---------

Signed-off-by: Jeremiah Corrado <jeremiah.corrado@hpe.com>
Co-authored-by: ajpotts <amanda.j.potts@gmail.com>
  • Loading branch information
jeremiah-corrado and ajpotts authored Jun 27, 2024
1 parent 2bb0045 commit b8c3e9a
Show file tree
Hide file tree
Showing 42 changed files with 455 additions and 726 deletions.
29 changes: 8 additions & 21 deletions PROTO_tests/tests/message_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,13 @@ def test_scalar_args(self, dtype):
json.dumps(
{
"key": "arg1",
"objType": "VALUE",
"dtype": ak.resolve_scalar_dtype(val1),
"val": str(val1),
}
),
json.dumps(
{
"key": "arg2",
"objType": "VALUE",
"dtype": ak.resolve_scalar_dtype(val2),
"val": str(val2),
}
Expand All @@ -127,7 +125,7 @@ def test_addl_str(self):
expected = json.dumps(
[
json.dumps(
{"key": "arg", "objType": "VALUE", "dtype": ak.resolve_scalar_dtype(val), "val": val}
{"key": "arg", "dtype": ak.resolve_scalar_dtype(val), "val": val}
),
]
)
Expand All @@ -144,15 +142,13 @@ def test_list_arg(self, dtype):
json.dumps(
{
"key": "list1",
"objType": "LIST",
"dtype": ak.resolve_scalar_dtype(l1[0]),
"val": json.dumps([str(x) for x in l1]),
}
),
json.dumps(
{
"key": "list2",
"objType": "LIST",
"dtype": ak.resolve_scalar_dtype(l2[0]),
"val": json.dumps([str(x) for x in l2]),
}
Expand All @@ -170,7 +166,6 @@ def test_list_addl_str(self):
json.dumps(
{
"key": "str_list",
"objType": "LIST",
"dtype": ak.resolve_scalar_dtype(string_list[0]),
"val": json.dumps(string_list),
}
Expand All @@ -184,7 +179,7 @@ def test_datetime_arg(self):
size, args = _json_args_to_str({"datetime": dt})

expected = json.dumps(
[json.dumps({"key": "datetime", "objType": "PDARRAY", "dtype": "int64", "val": dt.name})]
[json.dumps({"key": "datetime", "dtype": "int64", "val": dt.name})]
)
assert args == expected

Expand All @@ -193,7 +188,7 @@ def test_ip_arg(self):
ip = ak.ip_address(a)
size, args = _json_args_to_str({"ip": ip})
expected = json.dumps(
[json.dumps({"key": "ip", "objType": "PDARRAY", "dtype": "uint64", "val": ip.name})]
[json.dumps({"key": "ip", "dtype": "uint64", "val": ip.name})]
)
assert args == expected

Expand All @@ -202,7 +197,7 @@ def test_fields_arg(self):
f = ak.Fields(a, names="ABCD")
size, args = _json_args_to_str({"fields": f})
expected = json.dumps(
[json.dumps({"key": "fields", "objType": "PDARRAY", "dtype": "uint64", "val": f.name})]
[json.dumps({"key": "fields", "dtype": "uint64", "val": f.name})]
)
assert args == expected

Expand All @@ -214,10 +209,10 @@ def test_pda_arg(self, dtype):
expected = json.dumps(
[
json.dumps(
{"key": "pda1", "objType": "PDARRAY", "dtype": str(pda1.dtype), "val": pda1.name}
{"key": "pda1", "dtype": str(pda1.dtype), "val": pda1.name}
),
json.dumps(
{"key": "pda2", "objType": "PDARRAY", "dtype": str(pda2.dtype), "val": pda2.name}
{"key": "pda2", "dtype": str(pda2.dtype), "val": pda2.name}
),
]
)
Expand All @@ -229,7 +224,6 @@ def test_pda_arg(self, dtype):
json.dumps(
{
"key": "pda_list",
"objType": "LIST",
"dtype": ak.pdarray.objType,
"val": json.dumps([pda1.name, pda2.name]),
}
Expand All @@ -244,8 +238,8 @@ def test_segstr_arg(self):
size, args = _json_args_to_str({"str1": str1, "str2": str2})
expected = json.dumps(
[
json.dumps({"key": "str1", "objType": "SEGSTRING", "dtype": "str", "val": str1.name}),
json.dumps({"key": "str2", "objType": "SEGSTRING", "dtype": "str", "val": str2.name}),
json.dumps({"key": "str1", "dtype": "str", "val": str1.name}),
json.dumps({"key": "str2", "dtype": "str", "val": str2.name}),
]
)
assert args == expected
Expand All @@ -256,7 +250,6 @@ def test_segstr_arg(self):
json.dumps(
{
"key": "str_list",
"objType": "LIST",
"dtype": ak.Strings.objType,
"val": json.dumps([str1.name, str2.name]),
}
Expand All @@ -280,46 +273,40 @@ def test_dict_arg(self):
json.dumps(
{
"key": "json_1",
"objType": "DICT",
"dtype": "dict",
"val": json.dumps(
[
json.dumps(
{
"key": "param1",
"objType": "VALUE",
"dtype": ak.resolve_scalar_dtype(json_1["param1"]),
"val": "1",
}
),
json.dumps(
{
"key": "param2",
"objType": "VALUE",
"dtype": ak.resolve_scalar_dtype(json_1["param2"]),
"val": "abc",
}
),
json.dumps(
{
"key": "param3",
"objType": "LIST",
"dtype": ak.resolve_scalar_dtype(json_1["param3"][0]),
"val": json.dumps([str(x) for x in json_1["param3"]]),
}
),
json.dumps(
{
"key": "param4",
"objType": "PDARRAY",
"dtype": str(json_1["param4"].dtype),
"val": json_1["param4"].name,
}
),
json.dumps(
{
"key": "param5",
"objType": "SEGSTRING",
"dtype": "str",
"val": json_1["param5"].name,
}
Expand Down
2 changes: 1 addition & 1 deletion ServerModules.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ IndexingMsg
JoinEqWithDTMsg
KExtremeMsg
LogMsg
# ManipulationMsg
ManipulationMsg
OperatorMsg
ParquetMsg
RandMsg
Expand Down
2 changes: 1 addition & 1 deletion arkouda/array_api/array_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def tolist(self):
:func:`~arkouda.client.maxTransferBytes`)
"""
x = self._array.to_list()
if self._has_single_elem():
if self.shape == ():
# to match numpy, return a scalar for a 0-dimensional array
return x[0]
else:
Expand Down
69 changes: 28 additions & 41 deletions arkouda/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,55 +10,22 @@
from arkouda.dtypes import isSupportedNumber, resolve_scalar_dtype


class ObjectType(Enum):
"""
Class used for assigning object types in the JSON string
sent to the server for processing
"""

PDARRAY = "PDARRAY"
STRINGS = "SEGSTRING"
SEGARRAY = "SEGARRAY"
LIST = "LIST"
DICT = "DICT"
VALUE = "VALUE"
DATETIME = "DATETIME"
TIMEDELTA = "TIMEDELTA"

def __str__(self) -> str:
"""
Overridden method returns value, which is useful in outputting
a MessageType object to JSON.
"""
return self.value

def __repr__(self) -> str:
"""
Overridden method returns value, which is useful in outputting
a MessageType object to JSON.
"""
return self.value


class ParameterObject:
__slots__ = ("key", "objType", "dtype", "val")
__slots__ = ("key", "dtype", "val")

key: str
objType: MessageFormat
dtype: str
val: str

def __init__(self, key, objType, dtype, val):
def __init__(self, key, dtype, val):
object.__setattr__(self, "key", key)
object.__setattr__(self, "objType", objType)
object.__setattr__(self, "dtype", dtype)
object.__setattr__(self, "val", val)

@property
def dict(self):
return {
"key": self.key,
"objType": str(self.objType),
"dtype": self.dtype,
"val": self.val,
}
Expand All @@ -80,7 +47,7 @@ def _build_pdarray_param(key: str, val) -> ParameterObject:
-------
ParameterObject
"""
return ParameterObject(key, ObjectType.PDARRAY, str(val.dtype), val.name)
return ParameterObject(key, str(val.dtype), val.name)

@staticmethod
@typechecked
Expand All @@ -101,7 +68,7 @@ def _build_strings_param(key: str, val) -> ParameterObject:
"""
# empty string if name of String obj is none
name = val.name if val.name else ""
return ParameterObject(key, ObjectType.STRINGS, "str", name)
return ParameterObject(key, "str", name)

@staticmethod
@typechecked
Expand All @@ -121,7 +88,7 @@ def _build_segarray_param(key: str, val) -> ParameterObject:
ParameterObject
"""
data = json.dumps({"segments": val.segments.name, "values": val.values.name})
return ParameterObject(key, ObjectType.SEGARRAY, str(val.values.dtype), data)
return ParameterObject(key, str(val.values.dtype), data)

@staticmethod
def _is_supported_value(val):
Expand All @@ -141,6 +108,25 @@ def _format_param(p):
else p.name
)

@staticmethod
@typechecked
def _build_tuple_param(key: str, val: tuple) -> ParameterObject:
"""
Create a ParameterObject from a tuple
Parameters
----------
key : str
key from the dictionary object
val : tuple
tuple object to format as string
Returns
-------
ParameterObject
"""
return ParameterObject._build_list_param(key, list(val))

@staticmethod
@typechecked
def _build_list_param(key: str, val: list) -> ParameterObject:
Expand Down Expand Up @@ -184,7 +170,7 @@ def _build_list_param(key: str, val: list) -> ParameterObject:
str(p) if ParameterObject._is_supported_value(p) else ParameterObject._format_param(p)
for p in val
]
return ParameterObject(key, ObjectType.LIST, t, json.dumps(data))
return ParameterObject(key, t, json.dumps(data))

@staticmethod
@typechecked
Expand All @@ -195,7 +181,7 @@ def _build_dict_param(key: str, val: Dict) -> ParameterObject:
raise TypeError(f"Argument keys are required to be str. Found {type(k)}")
param = ParameterObject.factory(k, v)
j.append(json.dumps(param.dict))
return ParameterObject(key, ObjectType.DICT, str(dict.__name__), json.dumps(j))
return ParameterObject(key, str(dict.__name__), json.dumps(j))

@staticmethod
@typechecked
Expand All @@ -215,7 +201,7 @@ def _build_gen_param(key: str, val) -> ParameterObject:
ParameterObject
"""
v = val if isinstance(val, str) else str(val)
return ParameterObject(key, ObjectType.VALUE, resolve_scalar_dtype(val), v)
return ParameterObject(key, resolve_scalar_dtype(val), v)

@staticmethod
def generate_dispatch() -> Dict:
Expand All @@ -234,6 +220,7 @@ def generate_dispatch() -> Dict:
SegArray.__name__: ParameterObject._build_segarray_param,
list.__name__: ParameterObject._build_list_param,
dict.__name__: ParameterObject._build_dict_param,
tuple.__name__: ParameterObject._build_tuple_param,
}

@classmethod
Expand Down
3 changes: 1 addition & 2 deletions dep/checkHDF5.chpl
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use HDF5, CTypes;
use ArkoudaIOCompat;

proc main() {
var H5major: c_uint, H5minor: c_uint, H5micro: c_uint;
C_HDF5.H5get_libversion(H5major, H5minor, H5micro);
writefCompat("Found HDF5 version: %?.%?.%?\n", H5major, H5minor, H5micro);
writef("Found HDF5 version: %?.%?.%?\n", H5major, H5minor, H5micro);
return 0;
}
3 changes: 1 addition & 2 deletions dep/checkZMQ.chpl
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use ZMQ;
use ArkoudaIOCompat;

proc main() {
var (Zmajor, Zminor, Zmicro) = ZMQ.version;
writefCompat("Found ZMQ version: %?.%?.%?\n", Zmajor, Zminor, Zmicro);
writef("Found ZMQ version: %?.%?.%?\n", Zmajor, Zminor, Zmicro);
return 0;
}
4 changes: 2 additions & 2 deletions src/AryUtil.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ module AryUtil
// ranges of flat indices owned by each locale
const flatLocRanges = [loc in Locales] d.localSubdomain(loc).dim(0);

coforall loc in Locales do on loc {
coforall loc in Locales with (ref unflat) do on loc {
const lduf = unflat.domain.localSubdomain(),
lastRank = lduf.dim(N-1);

Expand Down Expand Up @@ -807,7 +807,7 @@ module AryUtil
// ranges of flat indices owned by each locale
const flatLocRanges = [loc in Locales] flat.domain.localSubdomain(loc).dim(0);

coforall loc in Locales do on loc {
coforall loc in Locales with (ref flat) do on loc {
const ld = d.localSubdomain(),
lastRank = ld.dim(d.rank-1);

Expand Down
3 changes: 1 addition & 2 deletions src/BigIntMsg.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ module BigIntMsg {
use ServerErrorStrings;
use BigInteger;
use List;

use ArkoudaIOCompat;
use IOUtils;

private config const logLevel = ServerConfig.logLevel;
private config const logChannel = ServerConfig.logChannel;
Expand Down
3 changes: 1 addition & 2 deletions src/CSVMsg.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ module CSVMsg {
use Sort;
use FileIO;
use Set;

use ArkoudaIOCompat;
use IOUtils;

const CSV_HEADER_OPEN = "**HEADER**";
const CSV_HEADER_CLOSE = "*/HEADER/*";
Expand Down
Loading

0 comments on commit b8c3e9a

Please sign in to comment.