[INLONG-8464][Sort] Add JDBC connector on Flink 1.15 #10491

Merged · 10 commits · Jun 26, 2024
108 changes: 108 additions & 0 deletions inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/pom.xml
@@ -0,0 +1,108 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connectors-v1.15</artifactId>
<version>1.13.0-SNAPSHOT</version>
</parent>

<artifactId>sort-connector-jdbc-v1.15</artifactId>
<name>Apache InLong - Sort-connector-jdbc</name>
<properties>
<inlong.root.dir>${project.parent.parent.parent.parent.parent.basedir}</inlong.root.dir>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-flink-dependencies-v1.15</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>audit-sdk</artifactId>
<version>${project.version}</version>
</dependency>
<!--for clickhouse-->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
</dependency>
<!--for jdbc-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<goals>
<goal>shade</goal>
</goals>
<phase>package</phase>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
<filter>
<artifact>org.apache.inlong:sort-connector-*</artifact>
<includes>
<include>org/apache/inlong/**</include>
<include>META-INF/services/org.apache.flink.table.factories.Factory</include>
<include>META-INF/services/org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory</include>
</includes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
38 changes: 38 additions & 0 deletions inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/converter/clickhouse/ClickHouseRowConverter.java
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.jdbc.converter.clickhouse;

import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.RowType;

/**
* Runtime converter that is responsible for converting between JDBC objects and Flink internal objects for ClickHouse.
*/
public class ClickHouseRowConverter extends AbstractJdbcRowConverter {

private static final long serialVersionUID = 1L;

public ClickHouseRowConverter(RowType rowType) {
super(rowType);
}

@Override
public String converterName() {
return "ClickHouse";
}
}
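Since ClickHouseRowConverter inherits all of its conversion logic from AbstractJdbcRowConverter and only supplies a name, wiring it up only requires the RowType of the target table. A minimal sketch, not part of this diff, using a made-up BIGINT/STRING schema:

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

// Hypothetical schema for illustration only.
RowType rowType = RowType.of(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
ClickHouseRowConverter converter = new ClickHouseRowConverter(rowType);
// The JDBC sink uses toExternal(...) to bind RowData fields to a prepared statement
// and toInternal(resultSet) to read a JDBC row back into Flink's RowData.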
51 changes: 51 additions & 0 deletions inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/JdbcDialectFactory.java
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.jdbc.dialect;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;

/**
* A factory for creating a specific {@link JdbcDialect}. This factory is used with Java's Service
* Provider Interface (SPI) for discovery.
*
* <p>Classes that implement this interface can be added to the
* "META-INF/services/org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory" file of a JAR file
* in the current classpath to be found.
*
* <p>Copied from {@link org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory}, not modified.
*
* @see JdbcDialect
*/
@PublicEvolving
public interface JdbcDialectFactory {

/**
* Retrieves whether the dialect thinks that it can open a connection to the given URL.
* Typically, dialects will return <code>true</code> if they understand the sub-protocol
* specified in the URL and <code>false</code> if they do not.
*
* @param url the URL of the database
* @return <code>true</code> if this dialect understands the given URL; <code>false</code>
* otherwise.
*/
boolean acceptsURL(String url);

/** @return a new instance of the {@link JdbcDialect}. */
JdbcDialect create();
}
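For reference, a hedged sketch of what an SPI implementation of this interface might look like; the class and package names below are hypothetical, and ClickHouseDialect is assumed to be provided elsewhere in the connector. The implementation must also be registered in META-INF/services/org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory to be discovered.

package org.apache.inlong.sort.jdbc.dialect.clickhouse; // hypothetical package

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;

import org.apache.inlong.sort.jdbc.dialect.JdbcDialectFactory;

/** Hypothetical factory sketch; assumes a ClickHouseDialect exists in this connector. */
public class ClickHouseDialectFactory implements JdbcDialectFactory {

    @Override
    public boolean acceptsURL(String url) {
        // Accept only URLs using the ClickHouse JDBC sub-protocol.
        return url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public JdbcDialect create() {
        return new ClickHouseDialect(); // assumed to be implemented elsewhere
    }
}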
110 changes: 110 additions & 0 deletions inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/JdbcDialectLoader.java
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.jdbc.dialect;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.stream.Collectors;

/**
* Utility for working with {@link JdbcDialect}.
*
* <p>Copied from {@link org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader}, not modified.
*/
@Internal
public final class JdbcDialectLoader {

private static final Logger LOG = LoggerFactory.getLogger(JdbcDialectLoader.class);

private JdbcDialectLoader() {
}

/**
* Loads the unique JDBC Dialect that can handle the given database url.
*
* @param url A database URL.
* @throws IllegalStateException if the loader cannot find exactly one dialect that can
* unambiguously process the given database URL.
* @return The loaded dialect.
*/
public static JdbcDialect load(String url) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();

List<JdbcDialectFactory> foundFactories = discoverFactories(cl);

if (foundFactories.isEmpty()) {
throw new IllegalStateException(
String.format(
"Could not find any jdbc dialect factories that implement '%s' in the classpath.",
JdbcDialectFactory.class.getName()));
}

final List<JdbcDialectFactory> matchingFactories =
foundFactories.stream().filter(f -> f.acceptsURL(url)).collect(Collectors.toList());

if (matchingFactories.isEmpty()) {
throw new IllegalStateException(
String.format(
"Could not find any jdbc dialect factory that can handle url '%s' that implements '%s' in the classpath.\n\n"
+ "Available factories are:\n\n"
+ "%s",
url,
JdbcDialectFactory.class.getName(),
foundFactories.stream()
.map(f -> f.getClass().getName())
.distinct()
.sorted()
.collect(Collectors.joining("\n"))));
}
if (matchingFactories.size() > 1) {
throw new IllegalStateException(
String.format(
"Multiple jdbc dialect factories can handle url '%s' that implement '%s' found in the classpath.\n\n"
+ "Ambiguous factory classes are:\n\n"
+ "%s",
url,
JdbcDialectFactory.class.getName(),
matchingFactories.stream()
.map(f -> f.getClass().getName())
.sorted()
.collect(Collectors.joining("\n"))));
}

return matchingFactories.get(0).create();
}

private static List<JdbcDialectFactory> discoverFactories(ClassLoader classLoader) {
try {
final List<JdbcDialectFactory> result = new LinkedList<>();
ServiceLoader.load(JdbcDialectFactory.class, classLoader)
.iterator()
.forEachRemaining(result::add);
return result;
} catch (ServiceConfigurationError e) {
LOG.error("Could not load service provider for jdbc dialects factory.", e);
throw new RuntimeException(
"Could not load service provider for jdbc dialects factory.", e);
}
}
}
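A minimal usage sketch, not part of the diff: the table factory would typically resolve the dialect once from the configured connection URL (the URL below is only an example).

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;

// Example URL; any JDBC URL accepted by exactly one registered factory works.
JdbcDialect dialect = JdbcDialectLoader.load("jdbc:clickhouse://localhost:8123/default");
// The resolved dialect then provides database-specific SQL and the row converter,
// e.g. identifier quoting:
String quotedTable = dialect.quoteIdentifier("user_behavior");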