From d169f135d9a289076300b9a77fab4447307e7db7 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 16 Mar 2023 15:33:05 +0800 Subject: [PATCH 1/5] feat(connector-node): specify sink payload format in start sink --- .../assembly/scripts/start-service.sh | 2 +- .../connector/api/sink/SinkRow.java | 6 +-- .../python-client/integration_tests.py | 2 + .../risingwave/connector/Deserializer.java | 5 +- .../connector/JsonDeserializer.java | 13 +++-- .../connector/SinkStreamObserver.java | 48 +++++++++---------- .../connector/DeserializerTest.java | 7 ++- .../connector/SinkStreamObserverTest.java | 3 ++ proto/connector_service.proto | 6 +++ src/rpc_client/src/connector_client.rs | 1 + 10 files changed, 57 insertions(+), 36 deletions(-) diff --git a/java/connector-node/assembly/scripts/start-service.sh b/java/connector-node/assembly/scripts/start-service.sh index 75792b745762..483c95057032 100755 --- a/java/connector-node/assembly/scripts/start-service.sh +++ b/java/connector-node/assembly/scripts/start-service.sh @@ -15,7 +15,7 @@ while getopts ":h:p:" o; do done shift $((OPTIND-1)) -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +DIR="$( cd "$( dirname "$0" )" && pwd )" MAIN='com.risingwave.connector.ConnectorService' PORT=50051 diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java index 6eb64b6a9586..dcddfc07479b 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java @@ -17,9 +17,9 @@ import com.risingwave.proto.Data; public interface SinkRow { - public Object get(int index); + Object get(int index); - public Data.Op getOp(); + Data.Op getOp(); - public int size(); + int size(); } diff --git a/java/connector-node/python-client/integration_tests.py b/java/connector-node/python-client/integration_tests.py index a64e3a79a720..c8364983a600 100644 --- a/java/connector-node/python-client/integration_tests.py +++ b/java/connector-node/python-client/integration_tests.py @@ -46,6 +46,7 @@ def test_upsert_sink(type, prop, input_file): stub = connector_service_pb2_grpc.ConnectorServiceStub(channel) request_list = [ connector_service_pb2.SinkStreamRequest(start=connector_service_pb2.SinkStreamRequest.StartSink( + format=connector_service_pb2.SinkPayloadFormat.JSON, sink_config=connector_service_pb2.SinkConfig( sink_type=type, properties=prop, @@ -85,6 +86,7 @@ def test_sink(type, prop, input_file): stub = connector_service_pb2_grpc.ConnectorServiceStub(channel) request_list = [ connector_service_pb2.SinkStreamRequest(start=connector_service_pb2.SinkStreamRequest.StartSink( + format=connector_service_pb2.SinkPayloadFormat.JSON, sink_config=connector_service_pb2.SinkConfig( sink_type=type, properties=prop, diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/Deserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/Deserializer.java index 932fb599f5e3..5aca6a956e7f 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/Deserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/Deserializer.java @@ -15,8 +15,11 @@ package com.risingwave.connector; import com.risingwave.connector.api.sink.SinkRow; +import com.risingwave.proto.ConnectorServiceProto; import java.util.Iterator; public interface Deserializer { - Iterator deserialize(Object payload); + Iterator deserialize(ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch); + + void close(); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java index a83a7013601e..3cea8fb72245 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java @@ -20,6 +20,7 @@ import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.ArraySinkrow; import com.risingwave.connector.api.sink.SinkRow; +import com.risingwave.proto.ConnectorServiceProto; import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload; import com.risingwave.proto.Data; import java.util.Iterator; @@ -33,13 +34,14 @@ public JsonDeserializer(TableSchema tableSchema) { } @Override - public Iterator deserialize(Object payload) { - if (!(payload instanceof JsonPayload)) { + public Iterator deserialize( + ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch) { + if (!writeBatch.hasJsonPayload()) { throw INVALID_ARGUMENT - .withDescription("expected JsonPayload, got " + payload.getClass().getName()) + .withDescription("expected JsonPayload, got " + writeBatch.getPayloadCase()) .asRuntimeException(); } - JsonPayload jsonPayload = (JsonPayload) payload; + JsonPayload jsonPayload = writeBatch.getJsonPayload(); return jsonPayload.getRowOpsList().stream() .map( rowOp -> { @@ -141,4 +143,7 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj .asRuntimeException(); } } + + @Override + public void close() {} } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java index 4d0ecfb59d7a..c7e8a8c5f992 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java @@ -63,7 +63,7 @@ public void onNext(ConnectorServiceProto.SinkStreamRequest sinkTask) { .withDescription("Sink is already initialized") .asRuntimeException(); } - bindSink(sinkTask.getStart().getSinkConfig()); + bindSink(sinkTask.getStart().getSinkConfig(), sinkTask.getStart().getFormat()); LOG.debug("Sink initialized"); responseObserver.onNext( ConnectorServiceProto.SinkResponse.newBuilder() @@ -123,26 +123,7 @@ public void onNext(ConnectorServiceProto.SinkStreamRequest sinkTask) { } Iterator rows; - switch (sinkTask.getWrite().getPayloadCase()) { - case JSON_PAYLOAD: - if (deserializer == null) { - deserializer = new JsonDeserializer(tableSchema); - } - - if (deserializer instanceof JsonDeserializer) { - rows = deserializer.deserialize(sinkTask.getWrite().getJsonPayload()); - } else { - throw INTERNAL.withDescription( - "invalid payload type: expected JSON, got " - + deserializer.getClass().getName()) - .asRuntimeException(); - } - break; - default: - throw INVALID_ARGUMENT - .withDescription("invalid payload type") - .asRuntimeException(); - } + rows = deserializer.deserialize(sinkTask.getWrite()); sink.write(new MonitoredRowIterator(rows)); currentBatchId = sinkTask.getWrite().getBatchId(); @@ -193,25 +174,40 @@ public void onNext(ConnectorServiceProto.SinkStreamRequest sinkTask) { @Override public void onError(Throwable throwable) { LOG.error("sink task error: ", throwable); - if (sink != null) { - sink.drop(); - } + cleanup(); responseObserver.onError(throwable); } @Override public void onCompleted() { LOG.debug("sink task completed"); + cleanup(); + responseObserver.onCompleted(); + } + + private void cleanup() { if (sink != null) { sink.drop(); } - responseObserver.onCompleted(); + if (deserializer != null) { + deserializer.close(); + } } - private void bindSink(SinkConfig sinkConfig) { + private void bindSink(SinkConfig sinkConfig, ConnectorServiceProto.SinkPayloadFormat format) { tableSchema = TableSchema.fromProto(sinkConfig.getTableSchema()); SinkFactory sinkFactory = SinkUtils.getSinkFactory(sinkConfig.getSinkType()); sink = sinkFactory.create(tableSchema, sinkConfig.getPropertiesMap()); + switch (format) { + case Unspecified: + case UNRECOGNIZED: + throw INVALID_ARGUMENT + .withDescription("should specify payload format in request") + .asRuntimeException(); + case JSON: + deserializer = new JsonDeserializer(tableSchema); + break; + } ConnectorNodeMetrics.incActiveConnections(sinkConfig.getSinkType(), "node1"); } } diff --git a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/DeserializerTest.java b/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/DeserializerTest.java index 54fe000aa2d1..2ee86ff95ad7 100644 --- a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/DeserializerTest.java +++ b/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/DeserializerTest.java @@ -16,6 +16,7 @@ import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkRow; +import com.risingwave.proto.ConnectorServiceProto; import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload; import com.risingwave.proto.Data; import junit.framework.TestCase; @@ -31,7 +32,11 @@ public void testJsonDeserializer() { .setLine("{\"id\": 1, \"name\": \"John\"}") .build()) .build(); - SinkRow outcome = deserializer.deserialize(jsonPayload).next(); + ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch = + ConnectorServiceProto.SinkStreamRequest.WriteBatch.newBuilder() + .setJsonPayload(jsonPayload) + .build(); + SinkRow outcome = deserializer.deserialize(writeBatch).next(); assertEquals(outcome.get(0), 1); assertEquals(outcome.get(1), "John"); } diff --git a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/SinkStreamObserverTest.java b/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/SinkStreamObserverTest.java index ddd310d9b8f7..4fba56f74339 100644 --- a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/SinkStreamObserverTest.java +++ b/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/SinkStreamObserverTest.java @@ -92,6 +92,7 @@ public void testOnNext_syncValidation() { .setStart( ConnectorServiceProto.SinkStreamRequest.StartSink.newBuilder() .setSinkConfig(fileSinkConfig) + .setFormat(ConnectorServiceProto.SinkPayloadFormat.JSON) .build()) .build(); ConnectorServiceProto.SinkStreamRequest firstSync = @@ -133,6 +134,7 @@ public void testOnNext_startEpochValidation() { .setStart( ConnectorServiceProto.SinkStreamRequest.StartSink.newBuilder() .setSinkConfig(fileSinkConfig) + .setFormat(ConnectorServiceProto.SinkPayloadFormat.JSON) .build()) .build(); ConnectorServiceProto.SinkStreamRequest firstSync = @@ -197,6 +199,7 @@ public void testOnNext_writeValidation() { ConnectorServiceProto.SinkStreamRequest.newBuilder() .setStart( ConnectorServiceProto.SinkStreamRequest.StartSink.newBuilder() + .setFormat(ConnectorServiceProto.SinkPayloadFormat.JSON) .setSinkConfig(fileSinkConfig)) .build(); ConnectorServiceProto.SinkStreamRequest firstStartEpoch = diff --git a/proto/connector_service.proto b/proto/connector_service.proto index 9cd736517eee..52a27f56959b 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -28,9 +28,15 @@ message SinkConfig { TableSchema table_schema = 3; } +enum SinkPayloadFormat { + Unspecified = 0; + JSON = 1; +} + message SinkStreamRequest { message StartSink { SinkConfig sink_config = 1; + SinkPayloadFormat format = 2; } message WriteBatch { diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index b10992e3e4e3..00bb068796a6 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -126,6 +126,7 @@ impl ConnectorClient { request_sender .send(SinkStreamRequest { request: Some(SinkRequest::Start(StartSink { + format: SinkPayloadFormat::Json as i32, sink_config: Some(SinkConfig { sink_type, properties, From 4c78b3143ea6371eaa0dd8947cb2afa0e7a41953 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 16 Mar 2023 16:04:05 +0800 Subject: [PATCH 2/5] fix ci --- dashboard/proto/gen/connector_service.ts | 45 ++++++++++++++++++- .../connector/SinkStreamObserver.java | 2 +- proto/connector_service.proto | 2 +- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/dashboard/proto/gen/connector_service.ts b/dashboard/proto/gen/connector_service.ts index 7b3b6b4295d7..17d86fa6eb14 100644 --- a/dashboard/proto/gen/connector_service.ts +++ b/dashboard/proto/gen/connector_service.ts @@ -10,6 +10,41 @@ import { export const protobufPackage = "connector_service"; +export const SinkPayloadFormat = { + UNSPECIFIED_FORMAT: "UNSPECIFIED_FORMAT", + JSON: "JSON", + UNRECOGNIZED: "UNRECOGNIZED", +} as const; + +export type SinkPayloadFormat = typeof SinkPayloadFormat[keyof typeof SinkPayloadFormat]; + +export function sinkPayloadFormatFromJSON(object: any): SinkPayloadFormat { + switch (object) { + case 0: + case "UNSPECIFIED_FORMAT": + return SinkPayloadFormat.UNSPECIFIED_FORMAT; + case 1: + case "JSON": + return SinkPayloadFormat.JSON; + case -1: + case "UNRECOGNIZED": + default: + return SinkPayloadFormat.UNRECOGNIZED; + } +} + +export function sinkPayloadFormatToJSON(object: SinkPayloadFormat): string { + switch (object) { + case SinkPayloadFormat.UNSPECIFIED_FORMAT: + return "UNSPECIFIED_FORMAT"; + case SinkPayloadFormat.JSON: + return "JSON"; + case SinkPayloadFormat.UNRECOGNIZED: + default: + return "UNRECOGNIZED"; + } +} + export const SourceType = { UNSPECIFIED: "UNSPECIFIED", MYSQL: "MYSQL", @@ -86,6 +121,7 @@ export interface SinkStreamRequest { export interface SinkStreamRequest_StartSink { sinkConfig: SinkConfig | undefined; + format: SinkPayloadFormat; } export interface SinkStreamRequest_WriteBatch { @@ -405,18 +441,22 @@ export const SinkStreamRequest = { }; function createBaseSinkStreamRequest_StartSink(): SinkStreamRequest_StartSink { - return { sinkConfig: undefined }; + return { sinkConfig: undefined, format: SinkPayloadFormat.UNSPECIFIED_FORMAT }; } export const SinkStreamRequest_StartSink = { fromJSON(object: any): SinkStreamRequest_StartSink { - return { sinkConfig: isSet(object.sinkConfig) ? SinkConfig.fromJSON(object.sinkConfig) : undefined }; + return { + sinkConfig: isSet(object.sinkConfig) ? SinkConfig.fromJSON(object.sinkConfig) : undefined, + format: isSet(object.format) ? sinkPayloadFormatFromJSON(object.format) : SinkPayloadFormat.UNSPECIFIED_FORMAT, + }; }, toJSON(message: SinkStreamRequest_StartSink): unknown { const obj: any = {}; message.sinkConfig !== undefined && (obj.sinkConfig = message.sinkConfig ? SinkConfig.toJSON(message.sinkConfig) : undefined); + message.format !== undefined && (obj.format = sinkPayloadFormatToJSON(message.format)); return obj; }, @@ -425,6 +465,7 @@ export const SinkStreamRequest_StartSink = { message.sinkConfig = (object.sinkConfig !== undefined && object.sinkConfig !== null) ? SinkConfig.fromPartial(object.sinkConfig) : undefined; + message.format = object.format ?? SinkPayloadFormat.UNSPECIFIED_FORMAT; return message; }, }; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java index c7e8a8c5f992..d3770524e47c 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java @@ -199,7 +199,7 @@ private void bindSink(SinkConfig sinkConfig, ConnectorServiceProto.SinkPayloadFo SinkFactory sinkFactory = SinkUtils.getSinkFactory(sinkConfig.getSinkType()); sink = sinkFactory.create(tableSchema, sinkConfig.getPropertiesMap()); switch (format) { - case Unspecified: + case UNSPECIFIED_FORMAT: case UNRECOGNIZED: throw INVALID_ARGUMENT .withDescription("should specify payload format in request") diff --git a/proto/connector_service.proto b/proto/connector_service.proto index 52a27f56959b..3c9a0d528dce 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -29,7 +29,7 @@ message SinkConfig { } enum SinkPayloadFormat { - Unspecified = 0; + UNSPECIFIED_FORMAT = 0; JSON = 1; } From 2d063a85876cb7514ce88d3ea31b14f5a242cff3 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 16 Mar 2023 17:09:02 +0800 Subject: [PATCH 3/5] impl Closeable for iterator and sink row --- .../{ArraySinkrow.java => ArraySinkRow.java} | 7 +- .../connector/api/sink/CloseableIterator.java | 5 + .../connector/api/sink}/Deserializer.java | 9 +- .../connector/api/sink/SinkRow.java | 2 +- .../api/sink/TrivialCloseIterator.java | 25 ++++ .../com/risingwave/connector/FileSink.java | 41 ++++--- .../connector/JsonDeserializer.java | 55 +++++---- .../com/risingwave/connector/PrintSink.java | 19 +-- .../connector/SinkStreamObserver.java | 17 +-- .../risingwave/connector/FileSinkTest.java | 10 +- .../risingwave/connector/PrintSinkTest.java | 10 +- .../risingwave/connector/DeltaLakeSink.java | 39 ++++--- .../connector/DeltaLakeLocalSinkTest.java | 10 +- .../com/risingwave/connector/IcebergSink.java | 109 +++++++++--------- .../connector/UpsertIcebergSink.java | 95 +++++++-------- .../connector/IcebergSinkLocalTest.java | 10 +- .../connector/IcebergSinkPartitionTest.java | 10 +- .../risingwave/connector/SinkRowMapTest.java | 22 ++-- .../connector/UpsertIcebergSinkLocalTest.java | 22 ++-- .../UpsertIcebergSinkPartitionTest.java | 22 ++-- .../com/risingwave/connector/JDBCSink.java | 33 +++--- .../risingwave/connector/JDBCSinkTest.java | 16 +-- proto/connector_service.proto | 2 +- 23 files changed, 319 insertions(+), 271 deletions(-) rename java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/{ArraySinkrow.java => ArraySinkRow.java} (86%) create mode 100644 java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/CloseableIterator.java rename java/connector-node/{risingwave-connector-service/src/main/java/com/risingwave/connector => connector-api/src/main/java/com/risingwave/connector/api/sink}/Deserializer.java (74%) create mode 100644 java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/TrivialCloseIterator.java diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkrow.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkRow.java similarity index 86% rename from java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkrow.java rename to java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkRow.java index 983f425c2893..e443a7d3e286 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkrow.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkRow.java @@ -16,11 +16,11 @@ import com.risingwave.proto.Data; -public class ArraySinkrow implements SinkRow { +public class ArraySinkRow implements SinkRow { public final Object[] values; public final Data.Op op; - public ArraySinkrow(Data.Op op, Object... value) { + public ArraySinkRow(Data.Op op, Object... value) { this.op = op; this.values = value; } @@ -39,4 +39,7 @@ public Data.Op getOp() { public int size() { return values.length; } + + @Override + public void close() throws Exception {} } diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/CloseableIterator.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/CloseableIterator.java new file mode 100644 index 000000000000..bea6348bd1bf --- /dev/null +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/CloseableIterator.java @@ -0,0 +1,5 @@ +package com.risingwave.connector.api.sink; + +import java.util.Iterator; + +public interface CloseableIterator extends AutoCloseable, Iterator {} diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/Deserializer.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/Deserializer.java similarity index 74% rename from java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/Deserializer.java rename to java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/Deserializer.java index 5aca6a956e7f..36bd9386f51e 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/Deserializer.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/Deserializer.java @@ -12,14 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package com.risingwave.connector; +package com.risingwave.connector.api.sink; -import com.risingwave.connector.api.sink.SinkRow; import com.risingwave.proto.ConnectorServiceProto; -import java.util.Iterator; public interface Deserializer { - Iterator deserialize(ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch); - - void close(); + CloseableIterator deserialize( + ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch); } diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java index dcddfc07479b..0ae0aa3facf7 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java @@ -16,7 +16,7 @@ import com.risingwave.proto.Data; -public interface SinkRow { +public interface SinkRow extends AutoCloseable { Object get(int index); Data.Op getOp(); diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/TrivialCloseIterator.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/TrivialCloseIterator.java new file mode 100644 index 000000000000..40bc7f3e3fc1 --- /dev/null +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/TrivialCloseIterator.java @@ -0,0 +1,25 @@ +package com.risingwave.connector.api.sink; + +import java.util.Iterator; + +public class TrivialCloseIterator implements CloseableIterator { + + private final Iterator inner; + + public TrivialCloseIterator(Iterator inner) { + this.inner = inner; + } + + @Override + public void close() throws Exception {} + + @Override + public boolean hasNext() { + return inner.hasNext(); + } + + @Override + public E next() { + return inner.next(); + } +} diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java index 1072bfa9c15c..085ae800bdb0 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java @@ -57,25 +57,28 @@ public FileSink(String sinkPath, TableSchema tableSchema) { @Override public void write(Iterator rows) { while (rows.hasNext()) { - SinkRow row = rows.next(); - switch (row.getOp()) { - case INSERT: - String buf = - new Gson() - .toJson( - IntStream.range(0, row.size()) - .mapToObj(row::get) - .toArray()); - try { - sinkWriter.write(buf + System.lineSeparator()); - } catch (IOException e) { - throw INTERNAL.withCause(e).asRuntimeException(); - } - break; - default: - throw UNIMPLEMENTED - .withDescription("unsupported operation: " + row.getOp()) - .asRuntimeException(); + try (SinkRow row = rows.next()) { + switch (row.getOp()) { + case INSERT: + String buf = + new Gson() + .toJson( + IntStream.range(0, row.size()) + .mapToObj(row::get) + .toArray()); + try { + sinkWriter.write(buf + System.lineSeparator()); + } catch (IOException e) { + throw INTERNAL.withCause(e).asRuntimeException(); + } + break; + default: + throw UNIMPLEMENTED + .withDescription("unsupported operation: " + row.getOp()) + .asRuntimeException(); + } + } catch (Exception e) { + throw new RuntimeException(e); } } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java index 3cea8fb72245..097872dcef75 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/JsonDeserializer.java @@ -18,12 +18,10 @@ import com.google.gson.Gson; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.ArraySinkrow; -import com.risingwave.connector.api.sink.SinkRow; +import com.risingwave.connector.api.sink.*; import com.risingwave.proto.ConnectorServiceProto; import com.risingwave.proto.ConnectorServiceProto.SinkStreamRequest.WriteBatch.JsonPayload; import com.risingwave.proto.Data; -import java.util.Iterator; import java.util.Map; public class JsonDeserializer implements Deserializer { @@ -34,7 +32,7 @@ public JsonDeserializer(TableSchema tableSchema) { } @Override - public Iterator deserialize( + public CloseableIterator deserialize( ConnectorServiceProto.SinkStreamRequest.WriteBatch writeBatch) { if (!writeBatch.hasJsonPayload()) { throw INVALID_ARGUMENT @@ -42,27 +40,31 @@ public Iterator deserialize( .asRuntimeException(); } JsonPayload jsonPayload = writeBatch.getJsonPayload(); - return jsonPayload.getRowOpsList().stream() - .map( - rowOp -> { - Map columnValues = new Gson().fromJson(rowOp.getLine(), Map.class); - Object[] values = new Object[columnValues.size()]; - for (String columnName : tableSchema.getColumnNames()) { - if (!columnValues.containsKey(columnName)) { - throw INVALID_ARGUMENT - .withDescription( - "column " + columnName + " not found in json") - .asRuntimeException(); - } - Data.DataType.TypeName typeName = - tableSchema.getColumnType(columnName); - values[tableSchema.getColumnIndex(columnName)] = - validateJsonDataTypes( - typeName, columnValues.get(columnName)); - } - return (SinkRow) new ArraySinkrow(rowOp.getOpType(), values); - }) - .iterator(); + return new TrivialCloseIterator<>( + jsonPayload.getRowOpsList().stream() + .map( + rowOp -> { + Map columnValues = + new Gson().fromJson(rowOp.getLine(), Map.class); + Object[] values = new Object[columnValues.size()]; + for (String columnName : tableSchema.getColumnNames()) { + if (!columnValues.containsKey(columnName)) { + throw INVALID_ARGUMENT + .withDescription( + "column " + + columnName + + " not found in json") + .asRuntimeException(); + } + Data.DataType.TypeName typeName = + tableSchema.getColumnType(columnName); + values[tableSchema.getColumnIndex(columnName)] = + validateJsonDataTypes( + typeName, columnValues.get(columnName)); + } + return (SinkRow) new ArraySinkRow(rowOp.getOpType(), values); + }) + .iterator()); } private static Long castLong(Object value) { @@ -143,7 +145,4 @@ private static Object validateJsonDataTypes(Data.DataType.TypeName typeName, Obj .asRuntimeException(); } } - - @Override - public void close() {} } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/PrintSink.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/PrintSink.java index eb582c121c97..3c9483b844a3 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/PrintSink.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/PrintSink.java @@ -40,13 +40,18 @@ public PrintSink(Map properties, TableSchema tableSchema, PrintS @Override public void write(Iterator rows) { while (rows.hasNext()) { - SinkRow row = rows.next(); - out.println( - "PrintSink: " - + row.getOp().name() - + " values " - + Arrays.toString( - IntStream.range(0, row.size()).mapToObj(row::get).toArray())); + try (SinkRow row = rows.next()) { + out.println( + "PrintSink: " + + row.getOp().name() + + " values " + + Arrays.toString( + IntStream.range(0, row.size()) + .mapToObj(row::get) + .toArray())); + } catch (Exception e) { + throw new RuntimeException(e); + } } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java index d3770524e47c..9d64211dd157 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkStreamObserver.java @@ -17,9 +17,7 @@ import static io.grpc.Status.*; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.SinkBase; -import com.risingwave.connector.api.sink.SinkFactory; -import com.risingwave.connector.api.sink.SinkRow; +import com.risingwave.connector.api.sink.*; import com.risingwave.metrics.ConnectorNodeMetrics; import com.risingwave.metrics.MonitoredRowIterator; import com.risingwave.proto.ConnectorServiceProto; @@ -28,7 +26,6 @@ import com.risingwave.proto.ConnectorServiceProto.SinkResponse.SyncResponse; import com.risingwave.proto.ConnectorServiceProto.SinkResponse.WriteResponse; import io.grpc.stub.StreamObserver; -import java.util.Iterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -122,9 +119,10 @@ public void onNext(ConnectorServiceProto.SinkStreamRequest sinkTask) { .asRuntimeException(); } - Iterator rows; - rows = deserializer.deserialize(sinkTask.getWrite()); - sink.write(new MonitoredRowIterator(rows)); + try (CloseableIterator rowIter = + deserializer.deserialize(sinkTask.getWrite())) { + sink.write(new MonitoredRowIterator(rowIter)); + } currentBatchId = sinkTask.getWrite().getBatchId(); LOG.debug( @@ -189,9 +187,6 @@ private void cleanup() { if (sink != null) { sink.drop(); } - if (deserializer != null) { - deserializer.close(); - } } private void bindSink(SinkConfig sinkConfig, ConnectorServiceProto.SinkPayloadFormat format) { @@ -199,7 +194,7 @@ private void bindSink(SinkConfig sinkConfig, ConnectorServiceProto.SinkPayloadFo SinkFactory sinkFactory = SinkUtils.getSinkFactory(sinkConfig.getSinkType()); sink = sinkFactory.create(tableSchema, sinkConfig.getPropertiesMap()); switch (format) { - case UNSPECIFIED_FORMAT: + case FORMAT_UNSPECIFIED: case UNRECOGNIZED: throw INVALID_ARGUMENT .withDescription("should specify payload format in request") diff --git a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/FileSinkTest.java b/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/FileSinkTest.java index 808480ec4705..1ec9eda586c9 100644 --- a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/FileSinkTest.java +++ b/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/FileSinkTest.java @@ -19,7 +19,7 @@ import com.google.common.collect.Iterators; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.ArraySinkrow; +import com.risingwave.connector.api.sink.ArraySinkRow; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -40,7 +40,7 @@ public void testSync() throws IOException { Path file = Paths.get(filePath); try { - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 1, "Alice"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 1, "Alice"))); sink.sync(); String[] expectedA = {"[1,\"Alice\"]"}; String[] actualA = Files.lines(file).toArray(String[]::new); @@ -48,7 +48,7 @@ public void testSync() throws IOException { IntStream.range(0, expectedA.length) .forEach(i -> assertEquals(expectedA[i], actualA[i])); - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 2, "Bob"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 2, "Bob"))); String[] expectedB = new String[] {"[1,\"Alice\"]"}; String[] actualB = Files.lines(file).toArray(String[]::new); assertEquals(expectedB.length, actualB.length); @@ -84,8 +84,8 @@ public void testWrite() throws IOException { String[] expected = {"[1,\"Alice\"]", "[2,\"Bob\"]"}; sink.write( Iterators.forArray( - new ArraySinkrow(Op.INSERT, 1, "Alice"), - new ArraySinkrow(Op.INSERT, 2, "Bob"))); + new ArraySinkRow(Op.INSERT, 1, "Alice"), + new ArraySinkRow(Op.INSERT, 2, "Bob"))); sink.sync(); String[] actual = Files.lines(Paths.get(filePath)).toArray(String[]::new); diff --git a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/PrintSinkTest.java b/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/PrintSinkTest.java index 2d4092ff2873..5d3e8a9c4cf0 100644 --- a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/PrintSinkTest.java +++ b/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/PrintSinkTest.java @@ -18,7 +18,7 @@ import com.google.common.collect.Iterators; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.ArraySinkrow; +import com.risingwave.connector.api.sink.ArraySinkRow; import java.io.PrintStream; import java.util.HashMap; import java.util.Iterator; @@ -66,10 +66,10 @@ public void print(String x) { sink.write( Iterators.forArray( - new ArraySinkrow(Op.INSERT, 1, "Alice"), - new ArraySinkrow(Op.UPDATE_DELETE, 1, "Alice"), - new ArraySinkrow(Op.UPDATE_INSERT, 2, "Bob"), - new ArraySinkrow(Op.DELETE, 2, "Bob"))); + new ArraySinkRow(Op.INSERT, 1, "Alice"), + new ArraySinkRow(Op.UPDATE_DELETE, 1, "Alice"), + new ArraySinkRow(Op.UPDATE_INSERT, 2, "Bob"), + new ArraySinkRow(Op.DELETE, 2, "Bob"))); if (!writeCalled[0]) { fail("write batch did not print messages"); } diff --git a/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java index a0a152e931f0..f4ceded0a055 100644 --- a/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java +++ b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java @@ -75,24 +75,27 @@ public void write(Iterator rows) { } } while (rows.hasNext()) { - SinkRow row = rows.next(); - switch (row.getOp()) { - case INSERT: - GenericRecord record = new GenericData.Record(this.sinkSchema); - for (int i = 0; i < this.sinkSchema.getFields().size(); i++) { - record.put(i, row.get(i)); - } - try { - this.parquetWriter.write(record); - this.numOutputRows += 1; - } catch (IOException ioException) { - throw INTERNAL.withCause(ioException).asRuntimeException(); - } - break; - default: - throw UNIMPLEMENTED - .withDescription("unsupported operation: " + row.getOp()) - .asRuntimeException(); + try (SinkRow row = rows.next()) { + switch (row.getOp()) { + case INSERT: + GenericRecord record = new GenericData.Record(this.sinkSchema); + for (int i = 0; i < this.sinkSchema.getFields().size(); i++) { + record.put(i, row.get(i)); + } + try { + this.parquetWriter.write(record); + this.numOutputRows += 1; + } catch (IOException ioException) { + throw INTERNAL.withCause(ioException).asRuntimeException(); + } + break; + default: + throw UNIMPLEMENTED + .withDescription("unsupported operation: " + row.getOp()) + .asRuntimeException(); + } + } catch (Exception e) { + throw new RuntimeException(e); } } } diff --git a/java/connector-node/risingwave-sink-deltalake/src/test/java/com/risingwave/connector/DeltaLakeLocalSinkTest.java b/java/connector-node/risingwave-sink-deltalake/src/test/java/com/risingwave/connector/DeltaLakeLocalSinkTest.java index 36a2ed65d97a..a3c614bdac30 100644 --- a/java/connector-node/risingwave-sink-deltalake/src/test/java/com/risingwave/connector/DeltaLakeLocalSinkTest.java +++ b/java/connector-node/risingwave-sink-deltalake/src/test/java/com/risingwave/connector/DeltaLakeLocalSinkTest.java @@ -20,7 +20,7 @@ import com.google.common.collect.Iterators; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.ArraySinkrow; +import com.risingwave.connector.api.sink.ArraySinkRow; import io.delta.standalone.DeltaLog; import java.io.IOException; import java.nio.file.Files; @@ -66,8 +66,8 @@ public void testWrite() throws IOException { sink.write( Iterators.forArray( - new ArraySinkrow(Op.INSERT, 1, "Alice"), - new ArraySinkrow(Op.INSERT, 2, "Bob"))); + new ArraySinkRow(Op.INSERT, 1, "Alice"), + new ArraySinkRow(Op.INSERT, 2, "Bob"))); sink.sync(); List rows = List.of(RowFactory.create(1, "Alice"), RowFactory.create(2, "Bob")); @@ -93,14 +93,14 @@ public void testSync() throws IOException { createStructField("name", StringType, false), }); - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 1, "Alice"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 1, "Alice"))); validateTableWithSpark(location, List.of(), schema); sink.sync(); List rows = List.of(RowFactory.create(1, "Alice")); validateTableWithSpark(location, rows, schema); - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 2, "Bob"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 2, "Bob"))); sink.sync(); rows = List.of(RowFactory.create(1, "Alice"), RowFactory.create(2, "Bob")); validateTableWithSpark(location, rows, schema); diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSink.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSink.java index a81d7aca9a7c..ab7965697b7a 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSink.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSink.java @@ -71,60 +71,65 @@ public IcebergSink( @Override public void write(Iterator rows) { while (rows.hasNext()) { - SinkRow row = rows.next(); - switch (row.getOp()) { - case INSERT: - Record record = GenericRecord.create(rowSchema); - if (row.size() != getTableSchema().getColumnNames().length) { - throw INTERNAL.withDescription("row values do not match table schema") - .asRuntimeException(); - } - for (int i = 0; i < rowSchema.columns().size(); i++) { - record.set(i, row.get(i)); - } - PartitionKey partitionKey = - new PartitionKey( - transaction.table().spec(), transaction.table().schema()); - partitionKey.partition(record); - DataWriter dataWriter; - if (dataWriterMap.containsKey(partitionKey)) { - dataWriter = dataWriterMap.get(partitionKey); - } else { - try { - String filename = fileFormat.addExtension(UUID.randomUUID().toString()); - OutputFile outputFile = - transaction - .table() - .io() - .newOutputFile( - transaction.table().location() - + "/data/" - + transaction - .table() - .spec() - .partitionToPath(partitionKey) - + "/" - + filename); - dataWriter = - Parquet.writeData(outputFile) - .schema(rowSchema) - .withSpec(transaction.table().spec()) - .withPartition(partitionKey) - .createWriterFunc(GenericParquetWriter::buildWriter) - .overwrite() - .build(); - } catch (Exception e) { - throw INTERNAL.withDescription("failed to create dataWriter") + try (SinkRow row = rows.next()) { + switch (row.getOp()) { + case INSERT: + Record record = GenericRecord.create(rowSchema); + if (row.size() != getTableSchema().getColumnNames().length) { + throw INTERNAL.withDescription("row values do not match table schema") .asRuntimeException(); } - dataWriterMap.put(partitionKey, dataWriter); - } - dataWriter.write(record); - break; - default: - throw UNIMPLEMENTED - .withDescription("unsupported operation: " + row.getOp()) - .asRuntimeException(); + for (int i = 0; i < rowSchema.columns().size(); i++) { + record.set(i, row.get(i)); + } + PartitionKey partitionKey = + new PartitionKey( + transaction.table().spec(), transaction.table().schema()); + partitionKey.partition(record); + DataWriter dataWriter; + if (dataWriterMap.containsKey(partitionKey)) { + dataWriter = dataWriterMap.get(partitionKey); + } else { + try { + String filename = + fileFormat.addExtension(UUID.randomUUID().toString()); + OutputFile outputFile = + transaction + .table() + .io() + .newOutputFile( + transaction.table().location() + + "/data/" + + transaction + .table() + .spec() + .partitionToPath( + partitionKey) + + "/" + + filename); + dataWriter = + Parquet.writeData(outputFile) + .schema(rowSchema) + .withSpec(transaction.table().spec()) + .withPartition(partitionKey) + .createWriterFunc(GenericParquetWriter::buildWriter) + .overwrite() + .build(); + } catch (Exception e) { + throw INTERNAL.withDescription("failed to create dataWriter") + .asRuntimeException(); + } + dataWriterMap.put(partitionKey, dataWriter); + } + dataWriter.write(record); + break; + default: + throw UNIMPLEMENTED + .withDescription("unsupported operation: " + row.getOp()) + .asRuntimeException(); + } + } catch (Exception e) { + throw new RuntimeException(e); } } } diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSink.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSink.java index 65d4e9de5dcf..a0522602a4c3 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSink.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSink.java @@ -155,52 +155,57 @@ private List> getKeyFromRow(SinkRow row) { @Override public void write(Iterator rows) { while (rows.hasNext()) { - SinkRow row = rows.next(); - if (row.size() != getTableSchema().getColumnNames().length) { - throw Status.FAILED_PRECONDITION - .withDescription("row values do not match table schema") - .asRuntimeException(); - } - Record record = newRecord(rowSchema, row); - PartitionKey partitionKey = - new PartitionKey(transaction.table().spec(), transaction.table().schema()); - partitionKey.partition(record); - SinkRowMap sinkRowMap; - if (sinkRowMapByPartition.containsKey(partitionKey)) { - sinkRowMap = sinkRowMapByPartition.get(partitionKey); - } else { - sinkRowMap = new SinkRowMap(); - sinkRowMapByPartition.put(partitionKey, sinkRowMap); - } - switch (row.getOp()) { - case INSERT: - sinkRowMap.insert(getKeyFromRow(row), row); - break; - case DELETE: - sinkRowMap.delete(getKeyFromRow(row), row); - break; - case UPDATE_DELETE: - if (updateBufferExists) { - throw Status.FAILED_PRECONDITION - .withDescription("an UPDATE_INSERT should precede an UPDATE_DELETE") - .asRuntimeException(); - } - sinkRowMap.delete(getKeyFromRow(row), row); - updateBufferExists = true; - break; - case UPDATE_INSERT: - if (!updateBufferExists) { - throw Status.FAILED_PRECONDITION - .withDescription("an UPDATE_INSERT should precede an UPDATE_DELETE") - .asRuntimeException(); - } - sinkRowMap.insert(getKeyFromRow(row), row); - updateBufferExists = false; - break; - default: - throw UNIMPLEMENTED - .withDescription("unsupported operation: " + row.getOp()) + try (SinkRow row = rows.next()) { + if (row.size() != getTableSchema().getColumnNames().length) { + throw Status.FAILED_PRECONDITION + .withDescription("row values do not match table schema") .asRuntimeException(); + } + Record record = newRecord(rowSchema, row); + PartitionKey partitionKey = + new PartitionKey(transaction.table().spec(), transaction.table().schema()); + partitionKey.partition(record); + SinkRowMap sinkRowMap; + if (sinkRowMapByPartition.containsKey(partitionKey)) { + sinkRowMap = sinkRowMapByPartition.get(partitionKey); + } else { + sinkRowMap = new SinkRowMap(); + sinkRowMapByPartition.put(partitionKey, sinkRowMap); + } + switch (row.getOp()) { + case INSERT: + sinkRowMap.insert(getKeyFromRow(row), row); + break; + case DELETE: + sinkRowMap.delete(getKeyFromRow(row), row); + break; + case UPDATE_DELETE: + if (updateBufferExists) { + throw Status.FAILED_PRECONDITION + .withDescription( + "an UPDATE_INSERT should precede an UPDATE_DELETE") + .asRuntimeException(); + } + sinkRowMap.delete(getKeyFromRow(row), row); + updateBufferExists = true; + break; + case UPDATE_INSERT: + if (!updateBufferExists) { + throw Status.FAILED_PRECONDITION + .withDescription( + "an UPDATE_INSERT should precede an UPDATE_DELETE") + .asRuntimeException(); + } + sinkRowMap.insert(getKeyFromRow(row), row); + updateBufferExists = false; + break; + default: + throw UNIMPLEMENTED + .withDescription("unsupported operation: " + row.getOp()) + .asRuntimeException(); + } + } catch (Exception e) { + throw new RuntimeException(e); } } } diff --git a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkLocalTest.java b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkLocalTest.java index 7652ba29ac18..78860f0801ce 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkLocalTest.java +++ b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkLocalTest.java @@ -21,7 +21,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.ArraySinkrow; +import com.risingwave.connector.api.sink.ArraySinkRow; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -115,7 +115,7 @@ public void testSync() throws IOException { FileFormat.PARQUET); try { - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 1, "Alice"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 1, "Alice"))); sink.sync(); Record record1 = GenericRecord.create(icebergTableSchema); @@ -125,7 +125,7 @@ public void testSync() throws IOException { validateTableWithIceberg(expected); validateTableWithSpark(expected); - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 2, "Bob"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 2, "Bob"))); validateTableWithIceberg(expected); validateTableWithSpark(expected); @@ -160,8 +160,8 @@ public void testWrite() throws IOException { try { sink.write( Iterators.forArray( - new ArraySinkrow(Op.INSERT, 1, "Alice"), - new ArraySinkrow(Op.INSERT, 2, "Bob"))); + new ArraySinkRow(Op.INSERT, 1, "Alice"), + new ArraySinkRow(Op.INSERT, 2, "Bob"))); sink.sync(); Record record1 = GenericRecord.create(icebergTableSchema); diff --git a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkPartitionTest.java b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkPartitionTest.java index b2e7ad7a9bda..54f2d47f154a 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkPartitionTest.java +++ b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkPartitionTest.java @@ -22,7 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.ArraySinkrow; +import com.risingwave.connector.api.sink.ArraySinkRow; import com.risingwave.proto.Data; import java.io.IOException; import java.nio.file.Files; @@ -134,7 +134,7 @@ public void testSync() throws IOException { FileFormat.PARQUET); try { - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 1, "Alice", "aaa"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 1, "Alice", "aaa"))); sink.sync(); Record record1 = GenericRecord.create(icebergTableSchema); @@ -145,7 +145,7 @@ public void testSync() throws IOException { validateTableWithIceberg(expected); validateTableWithSpark(expected); - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 2, "Bob", "bbb"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 2, "Bob", "bbb"))); validateTableWithIceberg(expected); validateTableWithSpark(expected); @@ -181,8 +181,8 @@ public void testWrite() throws IOException { try { sink.write( Iterators.forArray( - new ArraySinkrow(Op.INSERT, 1, "Alice", "aaa"), - new ArraySinkrow(Op.INSERT, 2, "Bob", "bbb"))); + new ArraySinkRow(Op.INSERT, 1, "Alice", "aaa"), + new ArraySinkRow(Op.INSERT, 2, "Bob", "bbb"))); sink.sync(); Record record1 = GenericRecord.create(icebergTableSchema); diff --git a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/SinkRowMapTest.java b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/SinkRowMapTest.java index 5b6fc14af23b..f4cdd8d988c7 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/SinkRowMapTest.java +++ b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/SinkRowMapTest.java @@ -16,7 +16,7 @@ import static org.junit.Assert.assertEquals; -import com.risingwave.connector.api.sink.ArraySinkrow; +import com.risingwave.connector.api.sink.ArraySinkRow; import com.risingwave.connector.api.sink.SinkRow; import com.risingwave.proto.Data; import java.util.ArrayList; @@ -28,7 +28,7 @@ public class SinkRowMapTest { @Test public void testInsert() { SinkRowMap sinkRowMap = new SinkRowMap(); - SinkRow row = new ArraySinkrow(Data.Op.OP_UNSPECIFIED, 1); + SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); @@ -42,10 +42,10 @@ public void testInsert() { public void testInsertAfterDelete() { SinkRowMap sinkRowMap = new SinkRowMap(); - SinkRow row1 = new ArraySinkrow(Data.Op.OP_UNSPECIFIED, 1, "Alice"); + SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice"); List> key1 = new ArrayList<>(); key1.add((Comparable) row1.get(0)); - SinkRow row2 = new ArraySinkrow(Data.Op.OP_UNSPECIFIED, 1, "Bob"); + SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Bob"); List> key2 = new ArrayList<>(); key2.add((Comparable) row2.get(0)); @@ -59,7 +59,7 @@ public void testInsertAfterDelete() { @Test public void testInsertAfterInsert() { SinkRowMap sinkRowMap = new SinkRowMap(); - SinkRow row = new ArraySinkrow(Data.Op.OP_UNSPECIFIED, 1); + SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); @@ -83,7 +83,7 @@ public void testInsertAfterInsert() { public void testDelete() { SinkRowMap sinkRowMap = new SinkRowMap(); - SinkRow row = new ArraySinkrow(Data.Op.OP_UNSPECIFIED, 1); + SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); @@ -96,7 +96,7 @@ public void testDelete() { @Test public void testDeleteAfterDelete() { SinkRowMap sinkRowMap = new SinkRowMap(); - SinkRow row = new ArraySinkrow(Data.Op.OP_UNSPECIFIED, 1); + SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); @@ -118,7 +118,7 @@ public void testDeleteAfterDelete() { public void testDeleteAfterInsert() { SinkRowMap sinkRowMap = new SinkRowMap(); - SinkRow row = new ArraySinkrow(Data.Op.OP_UNSPECIFIED, 1); + SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); @@ -131,10 +131,10 @@ public void testDeleteAfterInsert() { public void testDeleteAfterUpdate() { SinkRowMap sinkRowMap = new SinkRowMap(); - SinkRow row1 = new ArraySinkrow(Data.Op.OP_UNSPECIFIED, 1, "Alice"); + SinkRow row1 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Alice"); List> key1 = new ArrayList<>(); key1.add((Comparable) row1.get(0)); - SinkRow row2 = new ArraySinkrow(Data.Op.OP_UNSPECIFIED, 1, "Clare"); + SinkRow row2 = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1, "Clare"); List> key2 = new ArrayList<>(); key2.add((Comparable) row2.get(0)); @@ -150,7 +150,7 @@ public void testDeleteAfterUpdate() { public void testClear() { SinkRowMap sinkRowMap = new SinkRowMap(); - SinkRow row = new ArraySinkrow(Data.Op.OP_UNSPECIFIED, 1); + SinkRow row = new ArraySinkRow(Data.Op.OP_UNSPECIFIED, 1); List> key = new ArrayList<>(); key.add((Comparable) row.get(0)); sinkRowMap.insert(key, row); diff --git a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/UpsertIcebergSinkLocalTest.java b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/UpsertIcebergSinkLocalTest.java index 12da72424eba..3ee18f546c4f 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/UpsertIcebergSinkLocalTest.java +++ b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/UpsertIcebergSinkLocalTest.java @@ -21,7 +21,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Sets; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.ArraySinkrow; +import com.risingwave.connector.api.sink.ArraySinkRow; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; @@ -115,7 +115,7 @@ public void testSync() throws IOException { FileFormat.PARQUET); try { - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 1, "Alice"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 1, "Alice"))); sink.sync(); Record record1 = GenericRecord.create(icebergTableSchema); @@ -125,7 +125,7 @@ public void testSync() throws IOException { validateTableWithIceberg(expected); validateTableWithSpark(expected); - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 2, "Bob"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 2, "Bob"))); validateTableWithIceberg(expected); validateTableWithSpark(expected); @@ -160,11 +160,11 @@ public void testWrite() throws IOException { try { sink.write( Iterators.forArray( - new ArraySinkrow(Op.INSERT, 1, "Alice"), - new ArraySinkrow(Op.INSERT, 2, "Bob"), - new ArraySinkrow(Op.UPDATE_DELETE, 1, "Alice"), - new ArraySinkrow(Op.UPDATE_INSERT, 1, "Clare"), - new ArraySinkrow(Op.DELETE, 2, "Bob"))); + new ArraySinkRow(Op.INSERT, 1, "Alice"), + new ArraySinkRow(Op.INSERT, 2, "Bob"), + new ArraySinkRow(Op.UPDATE_DELETE, 1, "Alice"), + new ArraySinkRow(Op.UPDATE_INSERT, 1, "Clare"), + new ArraySinkRow(Op.DELETE, 2, "Bob"))); sink.sync(); Record record1 = GenericRecord.create(icebergTableSchema); @@ -176,9 +176,9 @@ public void testWrite() throws IOException { sink.write( Iterators.forArray( - new ArraySinkrow(Op.UPDATE_DELETE, 1, "Clare"), - new ArraySinkrow(Op.UPDATE_INSERT, 1, "Alice"), - new ArraySinkrow(Op.DELETE, 1, "Alice"))); + new ArraySinkRow(Op.UPDATE_DELETE, 1, "Clare"), + new ArraySinkRow(Op.UPDATE_INSERT, 1, "Alice"), + new ArraySinkRow(Op.DELETE, 1, "Alice"))); sink.sync(); validateTableWithIceberg(Sets.newHashSet()); diff --git a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/UpsertIcebergSinkPartitionTest.java b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/UpsertIcebergSinkPartitionTest.java index e0a3a9f38035..70207be05403 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/UpsertIcebergSinkPartitionTest.java +++ b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/UpsertIcebergSinkPartitionTest.java @@ -22,7 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.ArraySinkrow; +import com.risingwave.connector.api.sink.ArraySinkRow; import com.risingwave.proto.Data; import java.io.IOException; import java.nio.file.Files; @@ -129,7 +129,7 @@ public void testSync() throws IOException { FileFormat.PARQUET); try { - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 1, "Alice", "aaa"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 1, "Alice", "aaa"))); sink.sync(); Record record1 = GenericRecord.create(icebergTableSchema); @@ -140,7 +140,7 @@ public void testSync() throws IOException { validateTableWithIceberg(expected); validateTableWithSpark(expected); - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 2, "Bob", "bbb"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 2, "Bob", "bbb"))); validateTableWithIceberg(expected); validateTableWithSpark(expected); @@ -176,11 +176,11 @@ public void testWrite() throws IOException { try { sink.write( Iterators.forArray( - new ArraySinkrow(Op.INSERT, 1, "Alice", "aaa"), - new ArraySinkrow(Op.INSERT, 2, "Bob", "bbb"), - new ArraySinkrow(Op.UPDATE_DELETE, 1, "Alice", "aaa"), - new ArraySinkrow(Op.UPDATE_INSERT, 1, "Clare", "ccc"), - new ArraySinkrow(Op.DELETE, 2, "Bob", "bbb"))); + new ArraySinkRow(Op.INSERT, 1, "Alice", "aaa"), + new ArraySinkRow(Op.INSERT, 2, "Bob", "bbb"), + new ArraySinkRow(Op.UPDATE_DELETE, 1, "Alice", "aaa"), + new ArraySinkRow(Op.UPDATE_INSERT, 1, "Clare", "ccc"), + new ArraySinkRow(Op.DELETE, 2, "Bob", "bbb"))); sink.sync(); Record record1 = GenericRecord.create(icebergTableSchema); @@ -193,9 +193,9 @@ public void testWrite() throws IOException { sink.write( Iterators.forArray( - new ArraySinkrow(Op.UPDATE_DELETE, 1, "Clare", "ccc"), - new ArraySinkrow(Op.UPDATE_INSERT, 1, "Alice", "aaa"), - new ArraySinkrow(Op.DELETE, 1, "Alice", "aaa"))); + new ArraySinkRow(Op.UPDATE_DELETE, 1, "Clare", "ccc"), + new ArraySinkRow(Op.UPDATE_INSERT, 1, "Alice", "aaa"), + new ArraySinkRow(Op.DELETE, 1, "Alice", "aaa"))); sink.sync(); validateTableWithIceberg(Sets.newHashSet()); diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 43f99e511943..d110b351d5de 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -156,22 +156,25 @@ private PreparedStatement prepareStatement(SinkRow row) { @Override public void write(Iterator rows) { while (rows.hasNext()) { - SinkRow row = rows.next(); - PreparedStatement stmt = prepareStatement(row); - if (row.getOp() == Data.Op.UPDATE_DELETE) { - continue; - } - if (stmt != null) { - try { - LOG.debug("Executing statement: " + stmt); - stmt.executeUpdate(); - } catch (SQLException e) { - throw Status.INTERNAL.withCause(e).asRuntimeException(); + try (SinkRow row = rows.next()) { + PreparedStatement stmt = prepareStatement(row); + if (row.getOp() == Data.Op.UPDATE_DELETE) { + continue; } - } else { - throw Status.INTERNAL - .withDescription("empty statement encoded") - .asRuntimeException(); + if (stmt != null) { + try { + LOG.debug("Executing statement: " + stmt); + stmt.executeUpdate(); + } catch (SQLException e) { + throw Status.INTERNAL.withCause(e).asRuntimeException(); + } + } else { + throw Status.INTERNAL + .withDescription("empty statement encoded") + .asRuntimeException(); + } + } catch (Exception e) { + throw new RuntimeException(e); } } } diff --git a/java/connector-node/risingwave-sink-jdbc/src/test/java/com/risingwave/connector/JDBCSinkTest.java b/java/connector-node/risingwave-sink-jdbc/src/test/java/com/risingwave/connector/JDBCSinkTest.java index c089f6d9ac2a..62a8f63c207a 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/test/java/com/risingwave/connector/JDBCSinkTest.java +++ b/java/connector-node/risingwave-sink-jdbc/src/test/java/com/risingwave/connector/JDBCSinkTest.java @@ -18,7 +18,7 @@ import com.google.common.collect.Iterators; import com.risingwave.connector.api.TableSchema; -import com.risingwave.connector.api.sink.ArraySinkrow; +import com.risingwave.connector.api.sink.ArraySinkRow; import com.risingwave.proto.Data.Op; import java.sql.*; import org.junit.Test; @@ -33,7 +33,7 @@ public void testJDBCSync() throws SQLException { JDBCSink sink = new JDBCSink(conn, TableSchema.getMockTableSchema(), "test"); createMockTable(conn, sink.getTableName()); - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 1, "Alice"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 1, "Alice"))); sink.sync(); Statement stmt = conn.createStatement(); @@ -44,7 +44,7 @@ public void testJDBCSync() throws SQLException { } assertEquals(1, count); - sink.write(Iterators.forArray(new ArraySinkrow(Op.INSERT, 2, "Bob"))); + sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 2, "Bob"))); sink.sync(); stmt = conn.createStatement(); rs = stmt.executeQuery("SELECT * FROM test"); @@ -83,11 +83,11 @@ public void testJDBCWrite() throws SQLException { sink.write( Iterators.forArray( - new ArraySinkrow(Op.INSERT, 1, "Alice"), - new ArraySinkrow(Op.INSERT, 2, "Bob"), - new ArraySinkrow(Op.UPDATE_DELETE, 1, "Alice"), - new ArraySinkrow(Op.UPDATE_INSERT, 1, "Clare"), - new ArraySinkrow(Op.DELETE, 2, "Bob"))); + new ArraySinkRow(Op.INSERT, 1, "Alice"), + new ArraySinkRow(Op.INSERT, 2, "Bob"), + new ArraySinkRow(Op.UPDATE_DELETE, 1, "Alice"), + new ArraySinkRow(Op.UPDATE_INSERT, 1, "Clare"), + new ArraySinkRow(Op.DELETE, 2, "Bob"))); sink.sync(); Statement stmt = conn.createStatement(); diff --git a/proto/connector_service.proto b/proto/connector_service.proto index 3c9a0d528dce..f7912e7d9221 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -29,7 +29,7 @@ message SinkConfig { } enum SinkPayloadFormat { - UNSPECIFIED_FORMAT = 0; + FORMAT_UNSPECIFIED = 0; JSON = 1; } From 79f829ee1c61e7518c211b77213a9b33bef9cdf5 Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 16 Mar 2023 17:14:33 +0800 Subject: [PATCH 4/5] update typescript --- dashboard/proto/gen/connector_service.ts | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dashboard/proto/gen/connector_service.ts b/dashboard/proto/gen/connector_service.ts index 17d86fa6eb14..48d70c3d4092 100644 --- a/dashboard/proto/gen/connector_service.ts +++ b/dashboard/proto/gen/connector_service.ts @@ -11,7 +11,7 @@ import { export const protobufPackage = "connector_service"; export const SinkPayloadFormat = { - UNSPECIFIED_FORMAT: "UNSPECIFIED_FORMAT", + FORMAT_UNSPECIFIED: "FORMAT_UNSPECIFIED", JSON: "JSON", UNRECOGNIZED: "UNRECOGNIZED", } as const; @@ -21,8 +21,8 @@ export type SinkPayloadFormat = typeof SinkPayloadFormat[keyof typeof SinkPayloa export function sinkPayloadFormatFromJSON(object: any): SinkPayloadFormat { switch (object) { case 0: - case "UNSPECIFIED_FORMAT": - return SinkPayloadFormat.UNSPECIFIED_FORMAT; + case "FORMAT_UNSPECIFIED": + return SinkPayloadFormat.FORMAT_UNSPECIFIED; case 1: case "JSON": return SinkPayloadFormat.JSON; @@ -35,8 +35,8 @@ export function sinkPayloadFormatFromJSON(object: any): SinkPayloadFormat { export function sinkPayloadFormatToJSON(object: SinkPayloadFormat): string { switch (object) { - case SinkPayloadFormat.UNSPECIFIED_FORMAT: - return "UNSPECIFIED_FORMAT"; + case SinkPayloadFormat.FORMAT_UNSPECIFIED: + return "FORMAT_UNSPECIFIED"; case SinkPayloadFormat.JSON: return "JSON"; case SinkPayloadFormat.UNRECOGNIZED: @@ -441,14 +441,14 @@ export const SinkStreamRequest = { }; function createBaseSinkStreamRequest_StartSink(): SinkStreamRequest_StartSink { - return { sinkConfig: undefined, format: SinkPayloadFormat.UNSPECIFIED_FORMAT }; + return { sinkConfig: undefined, format: SinkPayloadFormat.FORMAT_UNSPECIFIED }; } export const SinkStreamRequest_StartSink = { fromJSON(object: any): SinkStreamRequest_StartSink { return { sinkConfig: isSet(object.sinkConfig) ? SinkConfig.fromJSON(object.sinkConfig) : undefined, - format: isSet(object.format) ? sinkPayloadFormatFromJSON(object.format) : SinkPayloadFormat.UNSPECIFIED_FORMAT, + format: isSet(object.format) ? sinkPayloadFormatFromJSON(object.format) : SinkPayloadFormat.FORMAT_UNSPECIFIED, }; }, @@ -465,7 +465,7 @@ export const SinkStreamRequest_StartSink = { message.sinkConfig = (object.sinkConfig !== undefined && object.sinkConfig !== null) ? SinkConfig.fromPartial(object.sinkConfig) : undefined; - message.format = object.format ?? SinkPayloadFormat.UNSPECIFIED_FORMAT; + message.format = object.format ?? SinkPayloadFormat.FORMAT_UNSPECIFIED; return message; }, }; From fc8d6b12cb8250b017696f06a8a2007ecb47dcfb Mon Sep 17 00:00:00 2001 From: William Wen Date: Thu, 16 Mar 2023 17:27:36 +0800 Subject: [PATCH 5/5] add license --- .../connector/api/sink/CloseableIterator.java | 16 ++++++++++++++++ .../connector/api/sink/TrivialCloseIterator.java | 16 ++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/CloseableIterator.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/CloseableIterator.java index bea6348bd1bf..b866e6dddcf5 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/CloseableIterator.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/CloseableIterator.java @@ -1,3 +1,19 @@ +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.risingwave.connector.api.sink; import java.util.Iterator; diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/TrivialCloseIterator.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/TrivialCloseIterator.java index 40bc7f3e3fc1..e89b9330bd9c 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/TrivialCloseIterator.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/TrivialCloseIterator.java @@ -1,3 +1,19 @@ +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package com.risingwave.connector.api.sink; import java.util.Iterator;