Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CQL Vector type (upgrade Core Driver to 4.16.0, DSBulk to 1.10.0 and fork DSBulk Text Codec) #16

Merged
merged 2 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
<artifactId>dsbulk-codecs-api</artifactId>
</dependency>
<dependency>
<!-- temporarily forked in this repository -->
<groupId>com.datastax.oss</groupId>
<artifactId>dsbulk-codecs-text</artifactId>
<version>1.11.0-vectors-preview</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.datastax.oss.common.sink.config;

import com.datastax.oss.common.sink.ConfigException;
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;
import com.datastax.oss.driver.shaded.guava.common.base.Splitter;
import com.datastax.oss.dsbulk.codecs.api.ConversionContext;
import com.datastax.oss.dsbulk.codecs.api.ConvertingCodecFactory;
Expand Down Expand Up @@ -139,7 +140,7 @@ public String toString() {
}

@NonNull
public ConvertingCodecFactory createCodecFactory() {
public ConvertingCodecFactory createCodecFactory(DefaultCodecRegistry defaultCodecRegistry) {
ConversionContext context =
new TextConversionContext()
.setLocale(
Expand All @@ -150,7 +151,7 @@ public ConvertingCodecFactory createCodecFactory() {
.setTimeZone(ZoneId.of(getString(getTopicSettingPath(topicName, TIMEZONE_OPT))))
.setTimeUnit(
TimeUnit.valueOf(getString(getTopicSettingPath(topicName, TIME_UNIT_OPT))));
return new ConvertingCodecFactory(context);
return new ConvertingCodecFactory(defaultCodecRegistry, context);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,23 @@
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.type.CqlVectorType;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import com.datastax.oss.driver.internal.core.auth.PlainTextAuthProvider;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
import com.datastax.oss.driver.internal.core.type.codec.CqlVectorCodec;
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
import com.datastax.oss.dsbulk.codecs.api.ConvertingCodecFactory;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.Collection;
Expand All @@ -89,6 +96,21 @@ public class LifeCycleManager {
private static final ConcurrentMap<String, InstanceState> INSTANCE_STATES =
new ConcurrentHashMap<>();
private static MetricRegistry metricRegistry = new MetricRegistry();
/**
 * Codec registry that additionally knows how to create codecs for CQL vector types.
 *
 * <p>When a codec has to be created for a {@link CqlVectorType}, a {@link CqlVectorCodec} is built
 * around the codec this registry resolves for the vector's subtype. Every other request is
 * delegated unchanged to {@link DefaultCodecRegistry}.
 */
private static final DefaultCodecRegistry CODEC_REGISTRY =
    new DefaultCodecRegistry("default-registry") {

      /**
       * Creates a codec for the given CQL/Java type pair, handling vector types locally.
       *
       * @param cqlType the CQL type to create a codec for, possibly {@code null}
       * @param javaType the Java type to create a codec for, possibly {@code null}
       * @param isJavaCovariant forwarded as-is to the parent implementation
       * @return a {@link CqlVectorCodec} when {@code cqlType} is a {@link CqlVectorType},
       *     otherwise whatever the parent registry creates
       */
      @Override
      protected TypeCodec<?> createCodec(
          @Nullable DataType cqlType,
          @Nullable GenericType<?> javaType,
          boolean isJavaCovariant) {
        // instanceof is false for null, so a null cqlType falls through to the parent.
        if (cqlType instanceof CqlVectorType) {
          log.info("Automatically Registering codec for CqlVectorType {}", cqlType);
          CqlVectorType vectorType = (CqlVectorType) cqlType;
          // The element codec is resolved through this same registry.
          return new CqlVectorCodec<>(vectorType, codecFor(vectorType.getSubtype()));
        }
        return super.createCodec(cqlType, javaType, isJavaCovariant);
      }
    };

/** This is a utility class that no one should instantiate. */
private LifeCycleManager() {}
Expand Down Expand Up @@ -420,7 +442,8 @@ private static InstanceState buildInstanceState(CqlSession session, CassandraSin
.stream()
.map(
topicConfig -> {
ConvertingCodecFactory codecFactory = topicConfig.createCodecFactory();
ConvertingCodecFactory codecFactory =
topicConfig.createCodecFactory(CODEC_REGISTRY);
TopicState topicState = new TopicState(codecFactory);
topicStates.put(topicConfig.getTopicName(), topicState);

Expand Down Expand Up @@ -485,6 +508,7 @@ public static CqlSession buildCqlSession(
SslConfig sslConfig = config.getSslConfig();
CqlSessionBuilder builder =
new SessionBuilder(sslConfig)
.withCodecRegistry(CODEC_REGISTRY)
.withApplicationVersion(version)
.withApplicationName(applicationName)
.withClientId(generateClientId(config.getInstanceName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import com.datastax.oss.dsbulk.codecs.api.format.temporal.CqlTemporalFormat;
import com.datastax.oss.dsbulk.codecs.api.format.temporal.TemporalFormat;
import com.datastax.oss.dsbulk.codecs.api.format.temporal.ZonedTemporalFormat;
import com.datastax.oss.dsbulk.codecs.api.util.CodecUtils;
import com.datastax.oss.dsbulk.codecs.api.util.CqlTemporalFormat;
import com.datastax.oss.dsbulk.codecs.api.util.OverflowStrategy;
import com.datastax.oss.dsbulk.codecs.api.util.TemporalFormat;
import com.datastax.oss.dsbulk.codecs.api.util.ZonedTemporalFormat;
import com.datastax.oss.dsbulk.codecs.text.string.StringToIntegerCodec;
import com.datastax.oss.dsbulk.codecs.text.string.StringToLongCodec;
import com.datastax.oss.protocol.internal.response.result.ColumnSpec;
Expand Down
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<url>http://www.datastax.com</url>
</organization>
<modules>
<module>text</module>
<module>common</module>
</modules>
<properties>
Expand All @@ -40,8 +41,8 @@
<java.release.version>8</java.release.version>
<kafka.connect.version>2.4.0</kafka.connect.version>
<caffeine.version>2.6.2</caffeine.version>
<oss.driver.version>4.6.0</oss.driver.version>
<dsbulk.version>1.6.0</dsbulk.version>
<oss.driver.version>4.16.0</oss.driver.version>
<dsbulk.version>1.10.0</dsbulk.version>
<reactive-streams.version>1.0.3</reactive-streams.version>
<guava.version>25.1-jre</guava.version>
<slf4j.version>1.7.25</slf4j.version>
Expand Down
4 changes: 4 additions & 0 deletions text/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# DataStax Bulk Loader Codecs - Text

This module contains implementations of the ConvertingCodec API for Strings and JSON.
JSON conversion is done using the FasterXML Jackson library.
131 changes: 131 additions & 0 deletions text/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright DataStax, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>dsbulk-codecs</artifactId>
<groupId>com.datastax.oss</groupId>
<version>1.10.0</version>
<relativePath/>
</parent>
<version>1.11.0-vectors-preview</version>
<artifactId>dsbulk-codecs-text</artifactId>
<name>DataStax Bulk Loader - Codecs - Text - Vectors preview</name>
<description>Text codecs for the DataStax Bulk Loader (String and JSON).</description>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>dsbulk-bom</artifactId>
<version>1.10.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.16.0</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>dsbulk-codecs-api</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-shaded-guava</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>dsbulk-tests</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<compilerId>javac</compilerId>
<forceJavacCompilerUse>true</forceJavacCompilerUse>
<fork>true</fork>
<useIncrementalCompilation>false</useIncrementalCompilation>
<compilerArgs>
<arg>-Werror</arg>
</compilerArgs>
<release>8</release>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-maven-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.dsbulk.codecs.text;

import com.datastax.oss.dsbulk.codecs.api.CommonConversionContext;
import com.datastax.oss.dsbulk.codecs.text.json.JsonCodecUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;

/**
 * A {@link CommonConversionContext} that also carries a Jackson {@link ObjectMapper} attribute,
 * stored under the {@link #OBJECT_MAPPER} key.
 */
public class TextConversionContext extends CommonConversionContext {

  /** Attribute key under which the {@link ObjectMapper} is stored. */
  public static final String OBJECT_MAPPER = "OBJECT_MAPPER";

  /**
   * Creates a context whose {@link #OBJECT_MAPPER} attribute is initialized with the mapper
   * returned by {@link JsonCodecUtils#getObjectMapper()}.
   */
  public TextConversionContext() {
    ObjectMapper defaultMapper = JsonCodecUtils.getObjectMapper();
    addAttribute(OBJECT_MAPPER, defaultMapper);
  }

  /**
   * Replaces the {@link #OBJECT_MAPPER} attribute with the given mapper.
   *
   * @param objectMapper the mapper to store; must not be {@code null}
   * @return this context, to allow call chaining
   * @throws NullPointerException if {@code objectMapper} is {@code null}
   */
  public TextConversionContext setObjectMapper(@NonNull ObjectMapper objectMapper) {
    ObjectMapper checkedMapper = Objects.requireNonNull(objectMapper);
    addAttribute(OBJECT_MAPPER, checkedMapper);
    return this;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.dsbulk.codecs.text.json;

import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import com.fasterxml.jackson.core.json.JsonReadFeature;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.math.BigDecimal;

public class JsonCodecUtils {

public static final GenericType<JsonNode> JSON_NODE_TYPE = GenericType.of(JsonNode.class);

/**
* A {@link JsonNodeFactory} that preserves {@link BigDecimal} scales, used to generate Json
* nodes.
*/
public static final JsonNodeFactory JSON_NODE_FACTORY =
JsonNodeFactory.withExactBigDecimals(true);

/**
* The object mapper to use for converting Json nodes to and from Java types in Json codecs.
*
* <p>This is not the object mapper used by the Json connector to read and write Json files.
*
* @return The object mapper to use for converting Json nodes to and from Java types in Json
* codecs.
*/
public static ObjectMapper getObjectMapper() {
return JsonMapper.builder()
.nodeFactory(JSON_NODE_FACTORY)
// create a somewhat lenient mapper that recognizes a slightly relaxed Json syntax when
// parsing
.enable(JsonReadFeature.ALLOW_UNQUOTED_FIELD_NAMES)
.enable(JsonReadFeature.ALLOW_MISSING_VALUES)
.enable(JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS)
.enable(JsonReadFeature.ALLOW_SINGLE_QUOTES)
// fail on trailing tokens: the entire input must be parsed
.enable(DeserializationFeature.FAIL_ON_TRAILING_TOKENS)
.build();
}
}
Loading