Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): reject invalid options when creating sink #8757

Merged
merged 16 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions e2e_test/sink/append_only_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ statement ok
create table t (v1 int, v2 int);

statement ok
create sink s1 from t with (connector = 'console');
create sink s1 from t with (connector = 'blackhole');

statement ok
create sink s2 as select avg(v1), v2 from t group by v2 with (connector = 'console');
create sink s2 as select avg(v1), v2 from t group by v2 with (connector = 'blackhole');

statement error The sink cannot be append-only
create sink s3 from t with (connector = 'console', type = 'append-only');
create sink s3 from t with (connector = 'blackhole', type = 'append-only');

statement ok
create sink s3 from t with (connector = 'console', type = 'append-only', force_append_only = 'true');
create sink s3 from t with (connector = 'blackhole', type = 'append-only', force_append_only = 'true');

statement error Cannot force the sink to be append-only
create sink s4 from t with (connector = 'console', type = 'upsert', force_append_only = 'true');
create sink s4 from t with (connector = 'blackhole', type = 'upsert', force_append_only = 'true');

statement ok
drop sink s1
Expand Down
6 changes: 2 additions & 4 deletions e2e_test/sink/create_sink_as.slt
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
# the sink requires a primary index for efficient execution
# which will be enforced by schema precheck in the future

statement ok
CREATE TABLE t4 (v1 int primary key, v2 int);

Expand All @@ -11,7 +8,8 @@ statement ok
CREATE SINK s4 AS select mv4.v1 as v1, mv4.v2 as v2 from mv4 WITH (
connector = 'jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name = 't4'
table.name = 't4',
type = 'upsert'
);

statement ok
Expand Down
6 changes: 4 additions & 2 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ statement ok
CREATE SINK s_postgres FROM mv_remote WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='t_remote'
table.name='t_remote',
type='upsert'
);

statement ok
CREATE SINK s_mysql FROM mv_remote WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw',
table.name='t_remote'
table.name='t_remote',
type='upsert'
);

statement ok
Expand Down
3 changes: 2 additions & 1 deletion integration_tests/mysql-sink/create_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ FROM
target_count WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:mysql://mysql:3306/mydb?user=root&password=123456',
table.name = 'target_count'
table.name = 'target_count',
type = 'upsert'
);
3 changes: 2 additions & 1 deletion integration_tests/postgres-sink/create_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ FROM
target_count WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://postgres:5432/mydb?user=myuser&password=123456',
table.name = 'target_count'
table.name = 'target_count',
type = 'upsert'
);
3 changes: 2 additions & 1 deletion integration_tests/tidb-cdc-sink/create_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ CREATE SINK hot_hashtags_sink FROM hot_hashtags
WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://tidb:4000/test?user=root&password=',
table.name='hot_hashtags'
table.name='hot_hashtags',
type='upsert'
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector.api.sink;
xx01cyx marked this conversation as resolved.
Show resolved Hide resolved

public class CommonSinkConfig {
private String connector;

public String getConnector() {
return connector;
}

public void setConnector(String connector) {
this.connector = connector;
}
}
5 changes: 1 addition & 4 deletions java/connector-node/python-client/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,7 @@ def test_upsert_iceberg_sink(input_file):
def test_deltalake_sink(input_file):
test_sink("deltalake",
{"location":"minio://minioadmin:minioadmin@127.0.0.1:9000/bucket/delta",
"location.type":"minio",
"storage_options.s3a_endpoint":"http://127.0.0.1:9000",
"storage_options.s3a_access_key":"minioadmin",
"storage_options.s3a_secret_key":"minioadmin"},
"location.type":"minio"},
input_file)

if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@
public class FileSink extends SinkBase {
private final FileWriter sinkWriter;

private String sinkPath;
private FileSinkConfig config;

private boolean closed = false;

public FileSink(String sinkPath, TableSchema tableSchema) {
public FileSink(FileSinkConfig config, TableSchema tableSchema) {
super(tableSchema);
this.sinkPath = sinkPath;

String sinkPath = config.getSinkPath();
try {
new File(sinkPath).mkdirs();
Path path = Paths.get(sinkPath, UUID.randomUUID() + ".dat");
Expand All @@ -48,10 +49,12 @@ public FileSink(String sinkPath, TableSchema tableSchema) {
throw INTERNAL.withDescription("failed to create file: " + path)
.asRuntimeException();
}
this.sinkPath = path.toString();
config.setSinkPath(path.toString());
} catch (IOException e) {
throw INTERNAL.withCause(e).asRuntimeException();
}

this.config = config;
}

@Override
Expand Down Expand Up @@ -103,7 +106,7 @@ public void drop() {
}

public String getSinkPath() {
return sinkPath;
return config.getSinkPath();
}

public boolean isClosed() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector;
xx01cyx marked this conversation as resolved.
Show resolved Hide resolved

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.risingwave.connector.api.sink.CommonSinkConfig;

public class FileSinkConfig extends CommonSinkConfig {
private String sinkPath;

@JsonCreator
public FileSinkConfig(@JsonProperty(value = "output.path") String sinkPath) {
this.sinkPath = sinkPath;
}

public String getSinkPath() {
return sinkPath;
}

public void setSinkPath(String sinkPath) {
this.sinkPath = sinkPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,27 @@

package com.risingwave.connector;

import static io.grpc.Status.*;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkBase;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.proto.Catalog.SinkType;
import java.util.Map;

public class FileSinkFactory implements SinkFactory {
public static final String OUTPUT_PATH_PROP = "output.path";

@Override
public SinkBase create(TableSchema tableSchema, Map<String, String> tableProperties) {
String sinkPath = tableProperties.get(OUTPUT_PATH_PROP);
return new FileSink(sinkPath, tableSchema);
ObjectMapper mapper = new ObjectMapper();
FileSinkConfig config = mapper.convertValue(tableProperties, FileSinkConfig.class);
return new FileSink(config, tableSchema);
}

@Override
public void validate(
TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType) {
if (!tableProperties.containsKey(OUTPUT_PATH_PROP)) {
throw INVALID_ARGUMENT
.withDescription(String.format("%s is not specified", OUTPUT_PATH_PROP))
.asRuntimeException();
}
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
mapper.convertValue(tableProperties, FileSinkConfig.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.SinkConfig;
import io.grpc.stub.StreamObserver;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +39,30 @@ public void handle(ConnectorServiceProto.ValidateSinkRequest request) {
TableSchema tableSchema = TableSchema.fromProto(sinkConfig.getTableSchema());
SinkFactory sinkFactory = SinkUtils.getSinkFactory(sinkConfig.getConnectorType());
sinkFactory.validate(tableSchema, sinkConfig.getPropertiesMap(), request.getSinkType());

} catch (IllegalArgumentException e) {
LOG.error("sink validation failed", e);
// Extract useful information from the error thrown by Jackson and convert it into a
// more concise message.
String errorMessage = e.getLocalizedMessage();
Pattern missingFieldPattern = Pattern.compile("Missing creator property '([^']*)'");
Pattern unrecognizedFieldPattern = Pattern.compile("Unrecognized field \"([^\"]*)\"");
Matcher missingFieldMatcher = missingFieldPattern.matcher(errorMessage);
Matcher unrecognizedFieldMatcher = unrecognizedFieldPattern.matcher(errorMessage);
if (missingFieldMatcher.find()) {
errorMessage = "missing field `" + missingFieldMatcher.group(1) + "`";
} else if (unrecognizedFieldMatcher.find()) {
errorMessage = "unknown field `" + unrecognizedFieldMatcher.group(1) + "`";
}
responseObserver.onNext(
ConnectorServiceProto.ValidateSinkResponse.newBuilder()
.setError(
ConnectorServiceProto.ValidationError.newBuilder()
.setErrorMessage(errorMessage)
.build())
.build());
responseObserver.onCompleted();

} catch (Exception e) {
LOG.error("sink validation failed", e);
responseObserver.onNext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public void testSync() throws IOException {
Files.createDirectories(Paths.get(path));
}

FileSink sink = new FileSink(path, TableSchema.getMockTableSchema());
FileSinkConfig config = new FileSinkConfig(path);
FileSink sink = new FileSink(config, TableSchema.getMockTableSchema());
String filePath = sink.getSinkPath();

Path file = Paths.get(filePath);
Expand Down Expand Up @@ -76,7 +77,8 @@ public void testWrite() throws IOException {
if (!Paths.get(path).toFile().isDirectory()) {
Files.createDirectories(Paths.get(path));
}
FileSink sink = new FileSink(path, TableSchema.getMockTableSchema());
FileSinkConfig config = new FileSinkConfig(path);
FileSink sink = new FileSink(config, TableSchema.getMockTableSchema());

String filePath = sink.getSinkPath();
try {
Expand Down Expand Up @@ -104,7 +106,8 @@ public void testDrop() throws IOException {
if (!Paths.get(path).toFile().isDirectory()) {
Files.createDirectories(Paths.get(path));
}
FileSink sink = new FileSink(path, TableSchema.getMockTableSchema());
FileSinkConfig config = new FileSinkConfig(path);
FileSink sink = new FileSink(config, TableSchema.getMockTableSchema());

sink.drop();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2023 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector;
xx01cyx marked this conversation as resolved.
Show resolved Hide resolved

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.risingwave.connector.api.sink.CommonSinkConfig;

public class DeltaLakeSinkConfig extends CommonSinkConfig {
private String location;

private String locationType;

private String sinkType;

@JsonProperty(value = "force_append_only")
private Boolean forceAppendOnly;

@JsonCreator
public DeltaLakeSinkConfig(
@JsonProperty(value = "location") String location,
@JsonProperty(value = "location.type") String locationType,
@JsonProperty(value = "type") String sinkType) {
this.location = location;
this.locationType = locationType;
this.sinkType = sinkType;
}

public String getLocation() {
return location;
}

public void setLocation(String location) {
this.location = location;
}

public String getLocationType() {
return locationType;
}

public void setLocationType(String locationType) {
this.locationType = locationType;
}
}
Loading