Skip to content

Commit

Permalink
feat(java-binding): support java binding on stream chunk (#8517)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Mar 15, 2023
1 parent 8be23f4 commit 8be4734
Show file tree
Hide file tree
Showing 20 changed files with 635 additions and 188 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions ci/scripts/java-binding-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,6 @@ cargo make ingest-data-and-run-java-binding

echo "--- Kill cluster"
cargo make ci-kill

echo "--- run stream chunk java binding"
cargo make run-java-binding-stream-chunk-demo
44 changes: 38 additions & 6 deletions java/com_risingwave_java_binding_Binding.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.risingwave.java.binding;

import static com.risingwave.java.binding.Utils.validateRow;

import com.risingwave.java.utils.MetaClient;
import com.risingwave.proto.Catalog.Table;
import com.risingwave.proto.Hummock.HummockVersion;
Expand All @@ -27,7 +29,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;

/** Hello world! */
public class Demo {
public class HummockReadDemo {
public static void main(String[] args) {
String objectStore = System.getenv("OBJECT_STORE");
String dbName = System.getenv("DB_NAME");
Expand Down Expand Up @@ -67,7 +69,7 @@ public static void main(String[] args) {
.addAllVnodeIds(vnodeList)
.build();

try (Iterator iter = new Iterator(readPlan)) {
try (HummockIterator iter = new HummockIterator(readPlan)) {
int count = 0;
while (true) {
try (KeyedRow row = iter.next()) {
Expand All @@ -92,43 +94,4 @@ public static void main(String[] args) {

scheduledThreadPool.shutdown();
}

static void validateRow(KeyedRow row) {
// The validation of row data are according to the data generation rule
// defined in ${REPO_ROOT}/src/java_binding/gen-demo-insert-data.py
short rowIndex = row.getShort(0);
if (row.getInt(1) != rowIndex) {
throw new RuntimeException(
String.format("invalid int value: %s %s", row.getInt(1), rowIndex));
}
if (row.getLong(2) != rowIndex) {
throw new RuntimeException(
String.format("invalid long value: %s %s", row.getLong(2), rowIndex));
}
if (row.getFloat(3) != (float) rowIndex) {
throw new RuntimeException(
String.format("invalid float value: %s %s", row.getFloat(3), rowIndex));
}
if (row.getDouble(4) != (double) rowIndex) {
throw new RuntimeException(
String.format("invalid double value: %s %s", row.getDouble(4), rowIndex));
}
if (row.getBoolean(5) != (rowIndex % 3 == 0)) {
throw new RuntimeException(
String.format(
"invalid bool value: %s %s", row.getBoolean(5), (rowIndex % 3 == 0)));
}
if (!row.getString(6).equals(((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))) {
throw new RuntimeException(
String.format(
"invalid string value: %s %s",
row.getString(6),
((Short) rowIndex).toString().repeat((rowIndex % 10) + 1)));
}
if (row.isNull(7) != (rowIndex % 5 == 0)) {
throw new RuntimeException(
String.format(
"invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.java.binding;

import static com.risingwave.java.binding.Utils.validateRow;

import java.io.IOException;

public class StreamChunkDemo {

public static void main(String[] args) throws IOException {
byte[] payload = System.in.readAllBytes();
try (StreamChunkIterator iter = new StreamChunkIterator(payload)) {
int count = 0;
while (true) {
try (StreamChunkRow row = iter.next()) {
if (row == null) {
break;
}
count += 1;
validateRow(row);
}
}
int expectedCount = 30000;
if (count != expectedCount) {
throw new RuntimeException(
String.format("row count is %s, should be %s", count, expectedCount));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.java.binding;

public class Utils {
public static void validateRow(BaseRow row) {
// The validation of row data are according to the data generation rule
// defined in ${REPO_ROOT}/src/java_binding/gen-demo-insert-data.py
short rowIndex = row.getShort(0);
if (row.getInt(1) != rowIndex) {
throw new RuntimeException(
String.format("invalid int value: %s %s", row.getInt(1), rowIndex));
}
if (row.getLong(2) != rowIndex) {
throw new RuntimeException(
String.format("invalid long value: %s %s", row.getLong(2), rowIndex));
}
if (row.getFloat(3) != (float) rowIndex) {
throw new RuntimeException(
String.format("invalid float value: %s %s", row.getFloat(3), rowIndex));
}
if (row.getDouble(4) != (double) rowIndex) {
throw new RuntimeException(
String.format("invalid double value: %s %s", row.getDouble(4), rowIndex));
}
if (row.getBoolean(5) != (rowIndex % 3 == 0)) {
throw new RuntimeException(
String.format(
"invalid bool value: %s %s", row.getBoolean(5), (rowIndex % 3 == 0)));
}
if (!row.getString(6).equals(((Short) rowIndex).toString().repeat((rowIndex % 10) + 1))) {
throw new RuntimeException(
String.format(
"invalid string value: %s %s",
row.getString(6),
((Short) rowIndex).toString().repeat((rowIndex % 10) + 1)));
}
if (row.isNull(7) != (rowIndex % 5 == 0)) {
throw new RuntimeException(
String.format(
"invalid isNull value: %s %s", row.isNull(7), (rowIndex % 5 == 0)));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.java.binding;

public class BaseRow implements AutoCloseable {
protected final long pointer;
private boolean isClosed;

protected BaseRow(long pointer) {
this.pointer = pointer;
this.isClosed = false;
}

public boolean isNull(int index) {
return Binding.rowIsNull(pointer, index);
}

public short getShort(int index) {
return Binding.rowGetInt16Value(pointer, index);
}

public int getInt(int index) {
return Binding.rowGetInt32Value(pointer, index);
}

public long getLong(int index) {
return Binding.rowGetInt64Value(pointer, index);
}

public float getFloat(int index) {
return Binding.rowGetFloatValue(pointer, index);
}

public double getDouble(int index) {
return Binding.rowGetDoubleValue(pointer, index);
}

public boolean getBoolean(int index) {
return Binding.rowGetBooleanValue(pointer, index);
}

public String getString(int index) {
return Binding.rowGetStringValue(pointer, index);
}

@Override
public void close() {
if (!isClosed) {
isClosed = true;
Binding.rowClose(pointer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,22 @@ public class Binding {

public static native int vnodeCount();

// iterator method
// hummock iterator method
// Return a pointer to the iterator
static native long iteratorNew(byte[] readPlan);
static native long hummockIteratorNew(byte[] readPlan);

// return a pointer to the next row
static native long iteratorNext(long pointer);
static native long hummockIteratorNext(long pointer);

// Since the underlying rust does not have garbage collection, we will have to manually call
// close on the iterator to release the iterator instance pointed by the pointer.
static native void iteratorClose(long pointer);
static native void hummockIteratorClose(long pointer);

// row method
static native byte[] rowGetKey(long pointer);

static native int rowGetOp(long pointer);

static native boolean rowIsNull(long pointer, int index);

static native short rowGetInt16Value(long pointer, int index);
Expand All @@ -54,4 +56,11 @@ public class Binding {
// Since the underlying rust does not have garbage collection, we will have to manually call
// close on the row to release the row instance pointed by the pointer.
static native void rowClose(long pointer);

// stream chunk iterator method
static native long streamChunkIteratorNew(byte[] streamChunkPayload);

static native long streamChunkIteratorNext(long pointer);

static native void streamChunkIteratorClose(long pointer);
}
Loading

0 comments on commit 8be4734

Please sign in to comment.