Skip to content

Commit

Permalink
[Feature][Connector V2] expose configurable options in Cassandra (#3681)
Browse files Browse the repository at this point in the history
  • Loading branch information
cason0126 authored May 19, 2023
1 parent 4151506 commit 73f63a5
Show file tree
Hide file tree
Showing 10 changed files with 341 additions and 160 deletions.
26 changes: 13 additions & 13 deletions docs/en/connector-v2/sink/Cassandra.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ Write data to Apache Cassandra.

## Options

| name | type | required | default value |
|-------------------|--------|----------|---------------|
| host | String | Yes | - |
| keyspace | String | Yes | - |
| table | String | Yes | - |
| username | String | No | - |
| password | String | No | - |
| datacenter | String | No | datacenter1 |
| consistency_level | String | No | LOCAL_ONE |
| fields | String | No | LOCAL_ONE |
| batch_size | String | No | 5000 |
| batch_type | String | No | UNLOGGER |
| async_write | String | No | true |
| name | type | required | default value |
|-------------------|---------|----------|---------------|
| host | String | Yes | - |
| keyspace | String | Yes | - |
| table | String | Yes | - |
| username | String | No | - |
| password | String | No | - |
| datacenter | String | No | datacenter1 |
| consistency_level | String | No | LOCAL_ONE |
| fields | Array | No | - |
| batch_size | int | No | 5000 |
| batch_type | String | No | UNLOGGED |
| async_write | boolean | No | true |

### host [string]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,99 +17,50 @@

package org.apache.seatunnel.connectors.seatunnel.cassandra.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.ToString;
public class CassandraConfig {

import java.io.Serializable;
import java.util.List;
public static final Integer DEFAULT_BATCH_SIZE = 5000;

@Data
@ToString
@NoArgsConstructor
public class CassandraConfig implements Serializable {
public static final Option<String> HOST =
Options.key("host").stringType().noDefaultValue().withDescription("");

public static final String HOST = "host";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String DATACENTER = "datacenter";
public static final String KEYSPACE = "keyspace";
public static final String TABLE = "table";
public static final String CQL = "cql";
public static final String FIELDS = "fields";
public static final String CONSISTENCY_LEVEL = "consistency_level";
public static final String BATCH_SIZE = "batch_size";
public static final String BATCH_TYPE = "batch_type";
public static final String ASYNC_WRITE = "async_write";
public static final Option<String> KEYSPACE =
Options.key("keyspace").stringType().noDefaultValue().withDescription("");

private String host;
private String username;
private String password;
private String datacenter;
private String keyspace;
private String table;
private String cql;
private List<String> fields;
private ConsistencyLevel consistencyLevel;
private Integer batchSize;
private DefaultBatchType batchType;
private Boolean asyncWrite;
public static final Option<String> USERNAME =
Options.key("username").stringType().noDefaultValue().withDescription("");
public static final Option<String> PASSWORD =
Options.key("password").stringType().noDefaultValue().withDescription("");
public static final Option<String> DATACENTER =
Options.key("datacenter").stringType().defaultValue("datacenter1").withDescription("");

public CassandraConfig(@NonNull String host, @NonNull String keyspace) {
this.host = host;
this.keyspace = keyspace;
}
public static final Option<String> CONSISTENCY_LEVEL =
Options.key("consistency_level")
.stringType()
.defaultValue("LOCAL_ONE")
.withDescription("");

public static CassandraConfig getCassandraConfig(Config config) {
CassandraConfig cassandraConfig =
new CassandraConfig(config.getString(HOST), config.getString(KEYSPACE));
if (config.hasPath(USERNAME)) {
cassandraConfig.setUsername(config.getString(USERNAME));
}
if (config.hasPath(PASSWORD)) {
cassandraConfig.setPassword(config.getString(PASSWORD));
}
if (config.hasPath(DATACENTER)) {
cassandraConfig.setDatacenter(config.getString(DATACENTER));
} else {
cassandraConfig.setDatacenter("datacenter1");
}
if (config.hasPath(TABLE)) {
cassandraConfig.setTable(config.getString(TABLE));
}
if (config.hasPath(CQL)) {
cassandraConfig.setCql(config.getString(CQL));
}
if (config.hasPath(FIELDS)) {
cassandraConfig.setFields(config.getStringList(FIELDS));
}
if (config.hasPath(CONSISTENCY_LEVEL)) {
cassandraConfig.setConsistencyLevel(
DefaultConsistencyLevel.valueOf(config.getString(CONSISTENCY_LEVEL)));
} else {
cassandraConfig.setConsistencyLevel(DefaultConsistencyLevel.LOCAL_ONE);
}
if (config.hasPath(BATCH_SIZE)) {
cassandraConfig.setBatchSize(config.getInt(BATCH_SIZE));
} else {
cassandraConfig.setBatchSize(Integer.parseInt("5000"));
}
if (config.hasPath(BATCH_TYPE)) {
cassandraConfig.setBatchType(DefaultBatchType.valueOf(config.getString(BATCH_TYPE)));
} else {
cassandraConfig.setBatchType(DefaultBatchType.UNLOGGED);
}
if (config.hasPath(ASYNC_WRITE)) {
cassandraConfig.setAsyncWrite(config.getBoolean(ASYNC_WRITE));
} else {
cassandraConfig.setAsyncWrite(true);
}
return cassandraConfig;
}
public static final Option<String> TABLE =
Options.key("table").stringType().noDefaultValue().withDescription("");

/**
 * Optional list of column names to use. {@code CassandraParameters#buildWithConfig} reads this
 * key with {@code getStringList}, and {@code CassandraSink#prepare} substitutes every column
 * of the table schema when the key is absent or empty, so the option has no default value.
 */
public static final Option<String> FIELDS =
        Options.key("fields").stringType().noDefaultValue().withDescription("");

public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
.intType()
.defaultValue(DEFAULT_BATCH_SIZE)
.withDescription("");

public static final Option<String> BATCH_TYPE =
Options.key("batch_type").stringType().defaultValue("UNLOGGED").withDescription("");

public static final Option<Boolean> ASYNC_WRITE =
Options.key("async_write").booleanType().defaultValue(true).withDescription("");

public static final Option<String> CQL =
Options.key("cql").stringType().noDefaultValue().withDescription("");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cassandra.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.List;

/**
 * Mutable, serializable holder for the resolved Cassandra connector settings.
 *
 * <p>Fields are populated from a plugin {@link Config} by {@link #buildWithConfig(Config)}.
 * Lombok's {@code @Getter}/{@code @Setter} generate an accessor pair for every field.
 */
@Setter
@Getter
public class CassandraParameters implements Serializable {
    private String host;
    private String username;
    private String password;
    private String datacenter;
    private String keyspace;
    private String table;
    private String cql;
    private List<String> fields;
    private ConsistencyLevel consistencyLevel;
    private Integer batchSize;
    private DefaultBatchType batchType;
    private Boolean asyncWrite;

    /**
     * Fills this instance from the given plugin configuration.
     *
     * <p>{@code host} and {@code keyspace} are read without a presence check, so the caller is
     * expected to have validated that they exist. Every other key is optional: when it is
     * missing, the field either stays unset ({@code username}, {@code password}, {@code table},
     * {@code cql}, {@code fields}) or takes the default declared on the matching {@link
     * CassandraConfig} option.
     *
     * @param config plugin configuration to read from
     */
    public void buildWithConfig(Config config) {
        // Mandatory keys: no hasPath guard here.
        this.host = config.getString(CassandraConfig.HOST.key());
        this.keyspace = config.getString(CassandraConfig.KEYSPACE.key());

        // Optional keys without a default: left untouched when absent.
        if (config.hasPath(CassandraConfig.USERNAME.key())) {
            this.username = config.getString(CassandraConfig.USERNAME.key());
        }
        if (config.hasPath(CassandraConfig.PASSWORD.key())) {
            this.password = config.getString(CassandraConfig.PASSWORD.key());
        }
        if (config.hasPath(CassandraConfig.TABLE.key())) {
            this.table = config.getString(CassandraConfig.TABLE.key());
        }
        if (config.hasPath(CassandraConfig.CQL.key())) {
            this.cql = config.getString(CassandraConfig.CQL.key());
        }
        if (config.hasPath(CassandraConfig.FIELDS.key())) {
            this.fields = config.getStringList(CassandraConfig.FIELDS.key());
        }

        // Optional keys with a default: fall back to the value declared on the option.
        if (config.hasPath(CassandraConfig.DATACENTER.key())) {
            this.datacenter = config.getString(CassandraConfig.DATACENTER.key());
        } else {
            this.datacenter = CassandraConfig.DATACENTER.defaultValue();
        }
        if (config.hasPath(CassandraConfig.CONSISTENCY_LEVEL.key())) {
            // The configured string is resolved as a DefaultConsistencyLevel enum constant name.
            this.consistencyLevel =
                    DefaultConsistencyLevel.valueOf(
                            config.getString(CassandraConfig.CONSISTENCY_LEVEL.key()));
        } else {
            this.consistencyLevel =
                    DefaultConsistencyLevel.valueOf(
                            CassandraConfig.CONSISTENCY_LEVEL.defaultValue());
        }
        if (config.hasPath(CassandraConfig.BATCH_SIZE.key())) {
            this.batchSize = config.getInt(CassandraConfig.BATCH_SIZE.key());
        } else {
            this.batchSize = CassandraConfig.BATCH_SIZE.defaultValue();
        }
        if (config.hasPath(CassandraConfig.BATCH_TYPE.key())) {
            // The configured string is resolved as a DefaultBatchType enum constant name.
            this.batchType =
                    DefaultBatchType.valueOf(config.getString(CassandraConfig.BATCH_TYPE.key()));
        } else {
            this.batchType = DefaultBatchType.valueOf(CassandraConfig.BATCH_TYPE.defaultValue());
        }
        if (config.hasPath(CassandraConfig.ASYNC_WRITE.key())) {
            this.asyncWrite = config.getBoolean(CassandraConfig.ASYNC_WRITE.key());
        } else {
            // Take the default from the option declaration instead of repeating the literal,
            // so the declared default and the effective default cannot drift apart.
            this.asyncWrite = CassandraConfig.ASYNC_WRITE.defaultValue();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorException;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
Expand All @@ -51,8 +51,7 @@
@AutoService(SeaTunnelSink.class)
public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {

private CassandraConfig cassandraConfig;

private final CassandraParameters cassandraParameters = new CassandraParameters();
private SeaTunnelRowType seaTunnelRowType;

private ColumnDefinitions tableSchema;
Expand All @@ -63,32 +62,35 @@ public String getPluginName() {
}

@Override
public void prepare(Config config) throws PrepareFailException {
CheckResult checkResult = CheckConfigUtil.checkAllExists(config, HOST, KEYSPACE, TABLE);
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult checkResult =
CheckConfigUtil.checkAllExists(
pluginConfig, HOST.key(), KEYSPACE.key(), TABLE.key());
if (!checkResult.isSuccess()) {
throw new CassandraConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, checkResult.getMsg()));
}
this.cassandraConfig = CassandraConfig.getCassandraConfig(config);
this.cassandraParameters.buildWithConfig(pluginConfig);
try (CqlSession session =
CassandraClient.getCqlSessionBuilder(
cassandraConfig.getHost(),
cassandraConfig.getKeyspace(),
cassandraConfig.getUsername(),
cassandraConfig.getPassword(),
cassandraConfig.getDatacenter())
cassandraParameters.getHost(),
cassandraParameters.getKeyspace(),
cassandraParameters.getUsername(),
cassandraParameters.getPassword(),
cassandraParameters.getDatacenter())
.build()) {
List<String> fields = cassandraConfig.getFields();
this.tableSchema = CassandraClient.getTableSchema(session, cassandraConfig.getTable());
List<String> fields = cassandraParameters.getFields();
this.tableSchema =
CassandraClient.getTableSchema(session, pluginConfig.getString(TABLE.key()));
if (fields == null || fields.isEmpty()) {
List<String> newFields = new ArrayList<>();
for (int i = 0; i < tableSchema.size(); i++) {
newFields.add(tableSchema.get(i).getName().asInternal());
}
cassandraConfig.setFields(newFields);
this.cassandraParameters.setFields(newFields);
} else {
for (String field : fields) {
if (!tableSchema.contains(field)) {
Expand All @@ -97,7 +99,7 @@ public void prepare(Config config) throws PrepareFailException {
"Field "
+ field
+ " does not exist in table "
+ config.getString(TABLE));
+ pluginConfig.getString(TABLE.key()));
}
}
}
Expand All @@ -123,6 +125,6 @@ public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
throws IOException {
return new CassandraSinkWriter(cassandraConfig, seaTunnelRowType, tableSchema);
return new CassandraSinkWriter(cassandraParameters, seaTunnelRowType, tableSchema);
}
}
Loading

0 comments on commit 73f63a5

Please sign in to comment.