diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java index e9edf0e1..f0952bf3 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriter.java @@ -32,8 +32,13 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.sink.SinkRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IcebergWriter implements RecordWriter { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergWriter.class); + private final Table table; private final String tableName; private final IcebergSinkConfig config; @@ -91,6 +96,7 @@ private Record convertToRow(SinkRecord record) { flush(); // apply the schema updates, this will refresh the table SchemaUtils.applySchemaUpdates(table, updates); + LOG.info("Table schema evolution on table {} caused by record at topic: {}, partition: {}, offset: {}", table.name(), record.topic(), record.kafkaPartition(), record.kafkaOffset()); // initialize a new writer with the new schema initNewWriter(); // convert the row again, this time using the new table schema diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java index a11d1cf1..2bbf0788 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/IcebergWriterFactory.java @@ -105,6 +105,7 @@ Table autoCreateTable(String tableName, SinkRecord sample) { result.set( catalog.createTable( identifier, schema, partitionSpec, config.autoCreateProps())); +    LOG.info("Created new table {} from record at topic: {}, partition: {}, offset: {}", identifier, sample.topic(), sample.kafkaPartition(), sample.kafkaOffset()); } }); return result.get();