Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(java-binding): support java binding on stream chunk #8517

Merged
merged 5 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions ci/scripts/java-binding-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,6 @@ cargo make ingest-data-and-run-java-binding

echo "--- Kill cluster"
cargo make ci-kill

echo "--- run stream chunk java binding"
cargo make run-java-binding-stream-chunk-demo
44 changes: 38 additions & 6 deletions java/com_risingwave_java_binding_Binding.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.risingwave.java.binding;

import static com.risingwave.java.binding.Utils.validateRow;

import com.risingwave.java.utils.MetaClient;
import com.risingwave.proto.Catalog.Table;
import com.risingwave.proto.Hummock.HummockVersion;
Expand All @@ -27,7 +29,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;

/** Hello world! */
public class Demo {
public class HummockReadDemo {
public static void main(String[] args) {
String objectStore = System.getenv("OBJECT_STORE");
String dbName = System.getenv("DB_NAME");
Expand Down Expand Up @@ -67,7 +69,7 @@ public static void main(String[] args) {
.addAllVnodeIds(vnodeList)
.build();

try (Iterator iter = new Iterator(readPlan)) {
try (HummockIterator iter = new HummockIterator(readPlan)) {
int count = 0;
while (true) {
try (KeyedRow row = iter.next()) {
Expand All @@ -92,43 +94,4 @@ public static void main(String[] args) {

scheduledThreadPool.shutdown();
}

static void validateRow(KeyedRow row) {
// The validation of row data are according to the data generation rule
// defined in ${REPO_ROOT}/src/java_binding/gen-demo-insert-data.py
short rowIndex = row.getShort(0);
if (row.getInt(1) != rowIndex) {
throw new RuntimeException(
String.format("invalid int value: %s %s", row.getInt(1), rowIndex));
}
if (row.getLong(2) != rowIndex) {
throw new RuntimeException(
String.format("invalid long value: %s %s", row.getLong(2), rowIndex));
}
if (row.getFloat(3) != (float) rowIndex) {
throw new RuntimeException(
String.format("invalid float value: %s %s", row.getFloat(3), rowIndex));
}
if (row.getDouble(4) != (double) rowIndex) {
throw new RuntimeException(
String.format("invalid double value: %s %s", row.getDouble(4), rowIndex));
}
if (row.getBoolean(5) != (rowIndex % 3 == 0)) {
throw new RuntimeException(
String.format(
"invalid bool value: %s %s", row.getBoolean(5), (rowIndex % 3 == 0)));
}
if (!row.getString(6).equals(((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))) {
throw new RuntimeException(
String.format(
"invalid string value: %s %s",
row.getString(6),
((Short) rowIndex).toString().repeat((rowIndex % 10) + 1)));
}
if (row.isNull(7) != (rowIndex % 5 == 0)) {
throw new RuntimeException(
String.format(
"invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.java.binding;
wenym1 marked this conversation as resolved.
Show resolved Hide resolved

import static com.risingwave.java.binding.Utils.validateRow;

import java.io.IOException;

public class StreamChunkDemo {

public static void main(String[] args) throws IOException {
byte[] payload = System.in.readAllBytes();
try (StreamChunkIterator iter = new StreamChunkIterator(payload)) {
int count = 0;
while (true) {
try (StreamChunkRow row = iter.next()) {
if (row == null) {
break;
}
count += 1;
validateRow(row);
}
}
int expectedCount = 30000;
if (count != expectedCount) {
throw new RuntimeException(
String.format("row count is %s, should be %s", count, expectedCount));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.java.binding;
wenym1 marked this conversation as resolved.
Show resolved Hide resolved

public class Utils {
public static void validateRow(BaseRow row) {
// The validation of row data are according to the data generation rule
// defined in ${REPO_ROOT}/src/java_binding/gen-demo-insert-data.py
short rowIndex = row.getShort(0);
if (row.getInt(1) != rowIndex) {
throw new RuntimeException(
String.format("invalid int value: %s %s", row.getInt(1), rowIndex));
}
if (row.getLong(2) != rowIndex) {
throw new RuntimeException(
String.format("invalid long value: %s %s", row.getLong(2), rowIndex));
}
if (row.getFloat(3) != (float) rowIndex) {
throw new RuntimeException(
String.format("invalid float value: %s %s", row.getFloat(3), rowIndex));
}
if (row.getDouble(4) != (double) rowIndex) {
throw new RuntimeException(
String.format("invalid double value: %s %s", row.getDouble(4), rowIndex));
}
if (row.getBoolean(5) != (rowIndex % 3 == 0)) {
throw new RuntimeException(
String.format(
"invalid bool value: %s %s", row.getBoolean(5), (rowIndex % 3 == 0)));
}
if (!row.getString(6).equals(((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))) {
throw new RuntimeException(
String.format(
"invalid string value: %s %s",
row.getString(6),
((Short) rowIndex).toString().repeat((rowIndex % 10) + 1)));
}
if (row.isNull(7) != (rowIndex % 5 == 0)) {
throw new RuntimeException(
String.format(
"invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.java.binding;
wenym1 marked this conversation as resolved.
Show resolved Hide resolved

public class BaseRow implements AutoCloseable {
protected final long pointer;
private boolean isClosed;

protected BaseRow(long pointer) {
this.pointer = pointer;
this.isClosed = false;
}

public boolean isNull(int index) {
return Binding.rowIsNull(pointer, index);
}

public short getShort(int index) {
return Binding.rowGetInt16Value(pointer, index);
}

public int getInt(int index) {
return Binding.rowGetInt32Value(pointer, index);
}

public long getLong(int index) {
return Binding.rowGetInt64Value(pointer, index);
}

public float getFloat(int index) {
return Binding.rowGetFloatValue(pointer, index);
}

public double getDouble(int index) {
return Binding.rowGetDoubleValue(pointer, index);
}

public boolean getBoolean(int index) {
return Binding.rowGetBooleanValue(pointer, index);
}

public String getString(int index) {
return Binding.rowGetStringValue(pointer, index);
}

@Override
public void close() {
if (!isClosed) {
isClosed = true;
Binding.rowClose(pointer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ public class Binding {

public static native int vnodeCount();

// iterator method
// hummock iterator method
// Return a pointer to the iterator
static native long iteratorNew(byte[] readPlan);
static native long hummockIteratorNew(byte[] readPlan);

// return a pointer to the next row
static native long iteratorNext(long pointer);
static native long hummockIteratorNext(long pointer);

// Since the underlying rust does not have garbage collection, we will have to manually call
// close on the iterator to release the iterator instance pointed by the pointer.
static native void iteratorClose(long pointer);
static native void hummockIteratorClose(long pointer);

// row method
static native byte[] rowGetKey(long pointer);

static native int rowGetOp(long pointer);

static native boolean rowIsNull(long pointer, int index);

static native short rowGetInt16Value(long pointer, int index);
Expand All @@ -54,4 +56,11 @@ public class Binding {
// Since the underlying rust does not have garbage collection, we will have to manually call
// close on the row to release the row instance pointed by the pointer.
static native void rowClose(long pointer);

// stream chunk iterator method
static native long streamChunkIteratorNew(byte[] streamChunkPayload);

static native long streamChunkIteratorNext(long pointer);

static native void streamChunkIteratorClose(long pointer);
}
Loading