Skip to content

Commit

Permalink
apache#2367 fix code style
Browse files Browse the repository at this point in the history
  • Loading branch information
laglangyue committed Aug 10, 2022
1 parent 048fded commit 871804a
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog;

import com.mysql.cj.MysqlType;
import com.mysql.cj.jdbc.result.ResultSetImpl;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
Expand All @@ -29,7 +28,6 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -43,129 +41,117 @@
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

/**
* dm database
*
* @author Lager.Tang
* create: 2022-08-06 13:06
**/
public class DMCatalog extends AbstractJdbcCatalog {

public DMCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
super(catalogName, defaultDatabase, username, pwd, baseUrl);
}

public DMCatalog(String catalogName, String username, String pwd, String defaultUrl) {
super(catalogName, username, pwd, defaultUrl);
}

private static final Set<String> SYS_DATABASES = new HashSet<>(4);
private static final Set<String> SYS_DATABASES = new HashSet<>(4);

static {
SYS_DATABASES.add("SYSSSO");
SYS_DATABASES.add("CTISYS");
SYS_DATABASES.add("SYS");
SYS_DATABASES.add("SYSAUDITOR");
}

@Override
public List<String> listDatabases() throws CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
static {
SYS_DATABASES.add("SYSSSO");
SYS_DATABASES.add("CTISYS");
SYS_DATABASES.add("SYS");
SYS_DATABASES.add("SYSAUDITOR");
}

PreparedStatement ps = conn.prepareStatement("SELECT DISTINCT object_name FROM ALL_OBJECTS WHERE OBJECT_TYPE = 'SCH';");
public DMCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
super(catalogName, defaultDatabase, username, pwd, baseUrl);
}

List<String> databases = new ArrayList<>();
ResultSet rs = ps.executeQuery();
public DMCatalog(String catalogName, String username, String pwd, String defaultUrl) {
super(catalogName, username, pwd, defaultUrl);
}

while (rs.next()) {
String databaseName = rs.getString(1);
if (!SYS_DATABASES.contains(databaseName)) {
databases.add(rs.getString(1));
@Override
public List<String> listDatabases() throws CatalogException {
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {

PreparedStatement ps = conn.prepareStatement("SELECT DISTINCT object_name FROM ALL_OBJECTS WHERE OBJECT_TYPE = 'SCH';");

List<String> databases = new ArrayList<>();
ResultSet rs = ps.executeQuery();

while (rs.next()) {
String databaseName = rs.getString(1);
if (!SYS_DATABASES.contains(databaseName)) {
databases.add(rs.getString(1));
}
}
return databases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", this.catalogName), e);
}
}
return databases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", this.catalogName), e);
}
}

@Override
public List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException {
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(this.catalogName, databaseName);
}
@Override
public List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException {
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(this.catalogName, databaseName);
}

try (Connection conn = DriverManager.getConnection(baseUrl + databaseName, username, pwd)) {
PreparedStatement ps =
conn.prepareStatement(String.format("select * from all_tables where owner = '%s';", databaseName));
try (Connection conn = DriverManager.getConnection(baseUrl + databaseName, username, pwd)) {
PreparedStatement ps =
conn.prepareStatement(String.format("select * from all_tables where owner = '%s';", databaseName));

ResultSet rs = ps.executeQuery();
ResultSet rs = ps.executeQuery();

List<String> tables = new ArrayList<>();
List<String> tables = new ArrayList<>();

while (rs.next()) {
tables.add(rs.getString(1));
}
while (rs.next()) {
tables.add(rs.getString(1));
}

return tables;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", catalogName), e);
return tables;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", catalogName), e);
}
}
}

@Override
public CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException {
if (!tableExists(tablePath)) {
throw new TableNotExistException(catalogName, tablePath);
}
@Override
public CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException {
if (!tableExists(tablePath)) {
throw new TableNotExistException(catalogName, tablePath);
}

String dbUrl = baseUrl + tablePath.getDatabaseName();
try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
DatabaseMetaData metaData = conn.getMetaData();
Optional<PrimaryKey> primaryKey =
getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName());
String dbUrl = baseUrl + tablePath.getDatabaseName();
try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
DatabaseMetaData metaData = conn.getMetaData();
Optional<PrimaryKey> primaryKey =
getPrimaryKey(metaData, tablePath.getDatabaseName(), tablePath.getTableName());

PreparedStatement ps =
conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1 = 0;", tablePath.getFullName()));
PreparedStatement ps =
conn.prepareStatement(String.format("SELECT * FROM %s WHERE 1 = 0;", tablePath.getFullName()));

ResultSetMetaData tableMetaData = ps.getMetaData();
ResultSetMetaData tableMetaData = ps.getMetaData();

TableSchema.Builder builder = TableSchema.builder();
for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
builder.physicalColumn(tableMetaData.getColumnName(i), type);
}
TableSchema.Builder builder = TableSchema.builder();
for (int i = 1; i <= tableMetaData.getColumnCount(); i++) {
SeaTunnelDataType<?> type = fromJdbcType(tableMetaData, i);
builder.physicalColumn(tableMetaData.getColumnName(i), type);
}

primaryKey.ifPresent(builder::primaryKey);
primaryKey.ifPresent(builder::primaryKey);

TableIdentifier tableIdentifier = TableIdentifier.of(catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
return CatalogTable.of(tableIdentifier, builder.build(), buildConnectorOptions(tablePath), Collections.emptyList(), "");
} catch (Exception e) {
throw new CatalogException(String.format("Failed getting table %s", tablePath.getFullName()), e);
}
}


/**
* @see com.mysql.cj.MysqlType
* @see ResultSetImpl#getObjectStoredProc(int, int)
*/
private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) throws SQLException {
return null;
}

TableIdentifier tableIdentifier = TableIdentifier.of(catalogName, tablePath.getDatabaseName(), tablePath.getTableName());
return CatalogTable.of(tableIdentifier, builder.build(), buildConnectorOptions(tablePath), Collections.emptyList(), "");
} catch (Exception e) {
throw new CatalogException(String.format("Failed getting table %s", tablePath.getFullName()), e);
@SuppressWarnings("MagicNumber")
private Map<String, String> buildConnectorOptions(TablePath tablePath) {
return null;
}
}


/**
* @see com.mysql.cj.MysqlType
* @see ResultSetImpl#getObjectStoredProc(int, int)
*/
private SeaTunnelDataType<?> fromJdbcType(ResultSetMetaData metadata, int colIndex) throws SQLException {
// todo:
return null;
}

@SuppressWarnings("MagicNumber")
private Map<String, String> buildConnectorOptions(TablePath tablePath) {
// todo:
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

/** Factory for {@link DMDialect}. */

/**
* Factory for {@link DMDialect}.
*/
@AutoService(JdbcDialectFactory.class)
public class DMDialectFactory implements JdbcDialectFactory {

@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:dm:");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

public class DMJdbcRowConverter extends AbstractJdbcRowConverter {

@Override
public String converterName() {
return "MySql";
Expand Down
Loading

0 comments on commit 871804a

Please sign in to comment.