Skip to content

Commit

Permalink
Merge pull request ibm-messaging#21 from ibm-messaging/19-remove-dead…
Browse files Browse the repository at this point in the history
…-code-and-packages

19 remove dead code and packages
  • Loading branch information
iamgollum authored Apr 20, 2020
2 parents dd16c44 + 46a5554 commit 56daad7
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import com.ibm.eventstreams.connect.jdbcsink.sink.datasource.database.IDatabase;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.beans.PropertyVetoException;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Map;
Expand Down Expand Up @@ -55,10 +56,10 @@ public class JDBCSinkTask extends SinkTask {
DatabaseFactory databaseFactory = new DatabaseFactory();
try {
this.database = databaseFactory.makeDatabase(this.config);
} catch (PropertyVetoException e) {
} catch (Exception e) {
log.error("Failed to build the database {} ", e);
e.printStackTrace();
// TODO: do something else here?
throw e;
}

log.trace("[{}] Exit {}.start", Thread.currentThread().getId(), classname);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@
*/
public class PooledDataSource implements IDataSource {

private ComboPooledDataSource datasource;
private ComboPooledDataSource dataSource;

private PooledDataSource(
ComboPooledDataSource dataSource
) {
this.datasource = dataSource;
this.dataSource = dataSource;
}

@Override public Connection getConnection() throws SQLException {
return datasource.getConnection();
return dataSource.getConnection();
}

public static class Builder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,41 +29,43 @@

public class DatabaseFactory {
private static final Logger log = LoggerFactory.getLogger(JDBCSinkTask.class);
private boolean databaseInsertMode; // TODO: handle this

public IDatabase makeDatabase(JDBCSinkConfig config) throws PropertyVetoException {
public IDatabase makeDatabase(JDBCSinkConfig config) {

log.warn("DatabaseFactory: makeDatabase");

String jdbcUrl = config.getString(JDBCSinkConfig.CONFIG_NAME_CONNECTION_URL);

DatabaseType database = DatabaseType.fromJdbcUrl(jdbcUrl);
DatabaseType databaseType = DatabaseType.fromJdbcUrl(jdbcUrl);

if (database == null) {
if (databaseType == null) {
throw new DatabaseNotSupportedException("Check " + jdbcUrl);
}

String databaseDriver = database.getDriver();
String databaseDriver = databaseType.getDriver();
try {
Class.forName(databaseDriver);
} catch (ClassNotFoundException cnf) {
log.error(database.name() + " JDBC driver not found", cnf);
log.error(databaseType.name() + " JDBC driver not found", cnf);
}

final String username = config.getString(JDBCSinkConfig.CONFIG_NAME_CONNECTION_USER);
final String password = config.getPassword(JDBCSinkConfig.CONFIG_NAME_CONNECTION_PASSWORD).toString();
final int poolSize = config.getInt(JDBCSinkConfig.CONFIG_NAME_CONNECTION_DS_POOL_SIZE);

IDataSource datasource = new PooledDataSource.Builder(
username,
password,
jdbcUrl,
databaseDriver
).withInitialPoolSize(poolSize).build();

databaseInsertMode = config.getBoolean(JDBCSinkConfig.CONFIG_NAME_INSERT_MODE_DATABASELEVEL);
IDataSource dataSource = null;
try {
dataSource = new PooledDataSource.Builder(
username,
password,
jdbcUrl,
databaseDriver
).withInitialPoolSize(poolSize).build();
} catch (PropertyVetoException e) {
log.error(e.toString());
}

return database.create(datasource);
return databaseType.create(dataSource);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.beans.PropertyVetoException;
import java.util.EnumSet;

/**
Expand All @@ -33,18 +32,18 @@
public enum DatabaseType {

db2("com.ibm.db2.jcc.DB2Driver") {
@Override public IDatabase create(IDataSource datasource) throws PropertyVetoException {
return new RelationalDatabase(this, datasource);
@Override public IDatabase create(IDataSource dataSource) {
return new RelationalDatabase(this, dataSource);
}
},
postgresql("org.postgresql.Driver") {
@Override public IDatabase create(IDataSource datasource) throws PropertyVetoException {
return new RelationalDatabase(this, datasource);
@Override public IDatabase create(IDataSource dataSource) {
return new RelationalDatabase(this, dataSource);
}
},
mysql("com.mysql.jdbc.Driver") {
@Override public IDatabase create(IDataSource datasource) throws PropertyVetoException {
return new RelationalDatabase(this, datasource);
@Override public IDatabase create(IDataSource dataSource) {
return new RelationalDatabase(this, dataSource);
}
};

Expand Down Expand Up @@ -82,7 +81,7 @@ public static DatabaseType fromJdbcUrl(String connectionUrl) {
return type;
}

public abstract IDatabase create(IDataSource datasource) throws PropertyVetoException;
public abstract IDatabase create(IDataSource dataSource);

public String getDriver() {
return driver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.ibm.eventstreams.connect.jdbcsink.sink.datasource.IDataSource;
import com.ibm.eventstreams.connect.jdbcsink.sink.datasource.database.writer.IDatabaseWriter;

public interface IDatabase extends IDataSource {
public interface IDatabase {
IDatabaseWriter getWriter();
DatabaseType getType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,17 @@
package com.ibm.eventstreams.connect.jdbcsink.sink.datasource.database;

import com.ibm.eventstreams.connect.jdbcsink.sink.datasource.IDataSource;
import com.ibm.eventstreams.connect.jdbcsink.sink.datasource.PooledDataSource;
import com.ibm.eventstreams.connect.jdbcsink.sink.datasource.database.writer.IDatabaseWriter;
import com.ibm.eventstreams.connect.jdbcsink.sink.datasource.database.writer.JDBCWriter;
import com.mchange.v2.c3p0.ComboPooledDataSource;

import javax.xml.crypto.Data;
import java.beans.PropertyVetoException;
import java.sql.Connection;
import java.sql.SQLException;

public class RelationalDatabase implements IDatabase {

private final IDataSource datasource;
private final IDatabaseWriter writer;
private final DatabaseType type;

public RelationalDatabase(DatabaseType type, IDataSource datasource) throws PropertyVetoException {
// TODO: find SQL statement creator by type and configure writer
this.datasource = datasource;
public RelationalDatabase(DatabaseType type, IDataSource dataSource) {
this.type = type;
this.writer = new JDBCWriter(true, this.datasource);;
}

@Override public Connection getConnection() throws SQLException {
return this.datasource.getConnection();
this.writer = new JDBCWriter(dataSource);;
}

@Override public IDatabaseWriter getWriter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,4 @@ public interface IDatabaseWriter {
// TODO: handle upserting / idempotency to prevent insertion of duplicate records
boolean insert(final String tableName, final Collection<SinkRecord> records) throws SQLException;


}
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,21 @@ public class JDBCWriter implements IDatabaseWriter{

private static Gson gson = new Gson();

private final boolean isInsertModeDatabaseLevelEnabled;

private static final Logger log = LoggerFactory.getLogger(JDBCSinkTask.class);

private final IDataSource datasource;
private final IDataSource dataSource;

// TODO: Use the strategy pattern with upsert strategies depending on the type of database being supported.
// Comma separated columns, values and relationships - nested json
private static final String INSERT_STATEMENT = "INSERT INTO %s(%, %s, %s) VALUES (%s, %s, %s)";

public JDBCWriter(final boolean insertModeDatabaseLevel, final IDataSource datasource) {
this.isInsertModeDatabaseLevelEnabled = insertModeDatabaseLevel;
this.datasource = datasource;
public JDBCWriter(final IDataSource dataSource) {
this.dataSource = dataSource;
}

private boolean checkTable(String tableName){

try (Connection connection = this.datasource.getConnection()) {
try (Connection connection = this.dataSource.getConnection()) {
connection.setAutoCommit(false);

DatabaseMetaData dbm = connection.getMetaData();
Expand All @@ -68,7 +65,7 @@ public boolean createTable(String tableName) throws SQLException {

final String CREATE_STATEMENT = "CREATE TABLE %s (id INTEGER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, key VARCHAR(32), timestamp VARCHAR(16), value TEXT);";

try (Connection connection = this.datasource.getConnection()){
try (Connection connection = this.dataSource.getConnection()){
Statement statement = connection.createStatement();

final String createQuery = String.format(CREATE_STATEMENT, tableName);
Expand All @@ -89,9 +86,10 @@ public boolean insert(String tableName, Collection<SinkRecord> records) throws S
// tableNameFormat must be processed
// TODO: determine if we should externalize and maintain open connections
// under certain circumstances.
try (Connection connection = this.datasource.getConnection()) {
try (Connection connection = this.dataSource.getConnection()) {
connection.setAutoCommit(false);


log.warn("TABLE EXISTS = " + checkTable("company") + " TABLE EXISTS ");
log.warn("TABLE EXISTS2 = " + checkTable("company2") + " TABLE EXISTS2 ");

Expand Down

This file was deleted.

0 comments on commit 56daad7

Please sign in to comment.