DO NOT MERGE: Refactor plan #230

Closed
wants to merge 3 commits
5 changes: 3 additions & 2 deletions gradle/libs.versions.toml
@@ -10,7 +10,7 @@ jackson-ver = "2.14.2"
junit-ver = "5.10.0"
kafka-ver = "3.5.1"
slf4j-ver = "1.7.36"
testcontainers-ver = "1.17.6"
testcontainers-ver = "1.19.7"


[libraries]
@@ -37,6 +37,7 @@ jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson-ver" }
kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka-ver" }
kafka-connect-api = { module = "org.apache.kafka:connect-api", version.ref = "kafka-ver" }
kafka-connect-runtime = { module = "org.apache.kafka:connect-runtime", version.ref = "kafka-ver" }
kafka-connect-json = { module = "org.apache.kafka:connect-json", version.ref = "kafka-ver" }
kafka-connect-transforms = { module = "org.apache.kafka:connect-transforms", version.ref = "kafka-ver" }
slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j-ver" }
@@ -60,7 +61,7 @@ palantir-gradle = "com.palantir.baseline:gradle-baseline-java:4.42.0"
iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet"]
iceberg-ext = ["iceberg-aws", "iceberg-aws-bundle", "iceberg-azure", "iceberg-azure-bundle", "iceberg-gcp","iceberg-gcp-bundle", "iceberg-nessie"]
jackson = ["jackson-core", "jackson-databind"]
kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-json", "kafka-connect-transforms"]
kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-runtime", "kafka-connect-json", "kafka-connect-transforms"]


[plugins]
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect;

import static io.tabular.iceberg.connect.TestEvent.TEST_SCHEMA;
import static io.tabular.iceberg.connect.TestEvent.TEST_SPEC;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.common.TopicPartition;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;

public class IntegrationControlClusterTest extends IntegrationTestBase {

private static final String TEST_DB = "test";
private static final String TEST_TABLE = "foobar";
private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(TEST_DB, TEST_TABLE);

@BeforeEach
public void before() {
createTopic(testTopic, TEST_TOPIC_PARTITIONS);
((SupportsNamespaces) catalog).createNamespace(Namespace.of(TEST_DB));
}

@AfterEach
public void after() {
context.stopConnector(connectorName);
deleteTopic(testTopic);
catalog.dropTable(TableIdentifier.of(TEST_DB, TEST_TABLE));
((SupportsNamespaces) catalog).dropNamespace(Namespace.of(TEST_DB));
}

@ParameterizedTest
@NullSource
public void testIcebergSinkPartitionedTable(String branch)
throws InterruptedException, ExecutionException {
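// format v2 is required for the row-level (equality/position) deletes written by the CDC path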
catalog.createTable(
TABLE_IDENTIFIER, TEST_SCHEMA, TEST_SPEC, ImmutableMap.of(FORMAT_VERSION, "2"));

boolean useSchema = branch == null; // use a schema for one of the tests
runTest(branch, useSchema);

List<DataFile> files = dataFiles(TABLE_IDENTIFIER, branch);
// partition may involve 1 or 2 workers
assertThat(files).hasSizeBetween(2, 3);
assertThat(files.stream().mapToLong(DataFile::recordCount).sum()).isEqualTo(4);

List<DeleteFile> deleteFiles = deleteFiles(TABLE_IDENTIFIER, branch);
// partition may involve 1 or 2 workers
assertThat(deleteFiles).hasSizeBetween(2, 3);
assertThat(deleteFiles.stream().mapToLong(DeleteFile::recordCount).sum()).isEqualTo(2);

assertSnapshotProps(TABLE_IDENTIFIER, branch);
}

private void runTest(String branch, boolean useSchema)
throws InterruptedException, ExecutionException {
// set offset reset to earliest so we don't miss any test messages
KafkaConnectContainer.Config connectorConfig =
new KafkaConnectContainer.Config(connectorName)
.config("topics", testTopic)
.config("connector.class", IcebergSinkConnector.class.getName())
.config("tasks.max", 2)
.config("consumer.override.auto.offset.reset", "earliest")
.config("key.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("key.converter.schemas.enable", false)
.config("value.converter", "org.apache.kafka.connect.json.JsonConverter")
.config("value.converter.schemas.enable", useSchema)
.config("iceberg.tables", String.format("%s.%s", TEST_DB, TEST_TABLE))
.config("iceberg.tables.cdc-field", "op")
.config("iceberg.control.commit.interval-ms", 1000)
.config("iceberg.control.commit.timeout-ms", 10000)
.config("iceberg.kafka.auto.offset.reset", "earliest")
.config(
IcebergSinkConfig.COMMITTER_FACTORY_CLASS_PROP,
IcebergSinkConfig.COMMITTER_FACTORY_V2)
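// point the control-plane Kafka clients at the second cluster; the address must be
// reachable from inside the Connect container, hence the network-alias form below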
.config(
String.format(
"iceberg.control.kafka.%s", CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
context.getRemoteControlBootstrapServers());

context.connectorCatalogProperties().forEach(connectorConfig::config);

if (branch != null) {
connectorConfig.config("iceberg.tables.default-commit-branch", branch);
}

if (!useSchema) {
connectorConfig.config("value.converter.schemas.enable", false);
}

context.startConnector(connectorConfig);

// start with 3 records, update 1, delete 1. Should be a total of 4 adds and 2 deletes
// (the update will be 1 add and 1 delete)

TestEvent event1 = new TestEvent(1, "type1", new Date(), "hello world!", "I");
TestEvent event2 = new TestEvent(2, "type2", new Date(), "having fun?", "I");

Date threeDaysAgo = Date.from(Instant.now().minus(Duration.ofDays(3)));
TestEvent event3 = new TestEvent(3, "type3", threeDaysAgo, "hello from the past!", "I");

TestEvent event4 = new TestEvent(1, "type1", new Date(), "hello world!", "D");
TestEvent event5 = new TestEvent(3, "type3", threeDaysAgo, "updated!", "U");

send(testTopic, 0, event1, useSchema);
send(testTopic, 1, event2, useSchema);
send(testTopic, 0, event3, useSchema);
send(testTopic, 1, event4, useSchema);
send(testTopic, 0, event5, useSchema);
flush();

Awaitility.await()
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(this::assertSnapshotAdded);

// TODO: assert visible behaviours:
// - control topic exists only on the control cluster
// - consumer offsets exist only on the control cluster
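// A hedged sketch of the first TODO item, assuming the connector's default control topic
// name ("control-iceberg"): the topic should be visible through the control cluster admin only.
assertThat(controlAdmin.listTopics().names().get()).contains("control-iceberg");
assertThat(sourceAdmin.listTopics().names().get()).doesNotContain("control-iceberg");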

Map<TopicPartition, Long> controlConsumerOffsets =
controlAdmin
.listConsumerGroupOffsets(
IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX + connectorName,
new ListConsumerGroupOffsetsOptions().requireStable(true))
.partitionsToOffsetAndMetadata().get().entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, (entry) -> entry.getValue().offset()));

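// the expected offsets follow from the sends above: partition 0 received event1, event3
// and event5 (next offset 3), partition 1 received event2 and event4 (next offset 2)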
assertThat(controlConsumerOffsets)
.isEqualTo(
ImmutableMap.of(
new TopicPartition(testTopic, 0), 3L,
new TopicPartition(testTopic, 1), 2L));
}

private void assertSnapshotAdded() {
Table table = catalog.loadTable(TABLE_IDENTIFIER);
assertThat(table.snapshots()).hasSize(1);
}
}
@@ -48,7 +48,8 @@ public class IntegrationTestBase {
protected final TestContext context = TestContext.INSTANCE;
protected S3Client s3;
protected Catalog catalog;
protected Admin admin;
protected Admin sourceAdmin;
protected Admin controlAdmin;

private KafkaProducer<String, String> producer;

@@ -62,7 +63,8 @@ public void baseBefore() {
s3 = context.initLocalS3Client();
catalog = context.initLocalCatalog();
producer = context.initLocalProducer();
admin = context.initLocalAdmin();
sourceAdmin = context.initLocalSourceAdmin();
controlAdmin = context.initLocalControlAdmin();

this.connectorName = "test_connector-" + UUID.randomUUID();
this.testTopic = "test-topic-" + UUID.randomUUID();
@@ -78,7 +80,8 @@ public void baseAfter() {
throw new RuntimeException(e);
}
producer.close();
admin.close();
sourceAdmin.close();
controlAdmin.close();
s3.close();
}

@@ -112,7 +115,7 @@ private Snapshot latestSnapshot(Table table, String branch) {

protected void createTopic(String topicName, int partitions) {
try {
admin
sourceAdmin
.createTopics(ImmutableList.of(new NewTopic(topicName, partitions, (short) 1)))
.all()
.get(10, TimeUnit.SECONDS);
@@ -123,7 +126,7 @@ protected void deleteTopic(String topicName) {

protected void deleteTopic(String topicName) {
try {
admin.deleteTopics(ImmutableList.of(topicName)).all().get(10, TimeUnit.SECONDS);
sourceAdmin.deleteTopics(ImmutableList.of(topicName)).all().get(10, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
@@ -134,6 +137,11 @@ protected void send(String topicName, TestEvent event, boolean useSchema) {
producer.send(new ProducerRecord<>(topicName, Long.toString(event.id()), eventStr));
}

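// overload that pins a record to an explicit partition, so tests can make
// per-partition assertions about committed consumer offsets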
protected void send(String topicName, int partition, TestEvent event, boolean useSchema) {
String eventStr = event.serialize(useSchema);
producer.send(new ProducerRecord<>(topicName, partition, Long.toString(event.id()), eventStr));
}

protected void flush() {
producer.flush();
}
@@ -54,8 +54,9 @@ public class TestContext {

public static final TestContext INSTANCE = new TestContext();

private final Network network;
private final KafkaContainer kafka;
final Network network;
private final KafkaContainer sourceKafka;
private final KafkaContainer controlKafka;
private final KafkaConnectContainer kafkaConnect;
private final GenericContainer catalog;
private final GenericContainer minio;
@@ -64,7 +65,7 @@ public class TestContext {
private static final String KC_PLUGIN_DIR = "/test/kafka-connect";

private static final String MINIO_IMAGE = "minio/minio";
private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:7.5.1";
static final String KAFKA_IMAGE = "confluentinc/cp-kafka:7.5.1";
private static final String CONNECT_IMAGE = "confluentinc/cp-kafka-connect:7.5.1";
private static final String REST_CATALOG_IMAGE = "tabulario/iceberg-rest:0.6.0";

@@ -96,18 +97,19 @@ private TestContext() {
.withEnv("CATALOG_S3_PATH__STYLE__ACCESS", "true")
.withEnv("AWS_REGION", AWS_REGION);

kafka = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE)).withNetwork(network);
sourceKafka = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE)).withNetwork(network);
controlKafka = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE)).withNetwork(network);
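// a second, independent Kafka cluster backs the connector's control topic and consumer
// group offsets, while sourceKafka carries the test records being ingested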

kafkaConnect =
new KafkaConnectContainer(DockerImageName.parse(CONNECT_IMAGE))
.withNetwork(network)
.dependsOn(catalog, kafka)
.dependsOn(catalog, sourceKafka)
.withFileSystemBind(LOCAL_INSTALL_DIR, KC_PLUGIN_DIR)
.withEnv("CONNECT_PLUGIN_PATH", KC_PLUGIN_DIR)
.withEnv("CONNECT_BOOTSTRAP_SERVERS", kafka.getNetworkAliases().get(0) + ":9092")
.withEnv("CONNECT_BOOTSTRAP_SERVERS", sourceKafka.getNetworkAliases().get(0) + ":9092")
.withEnv("CONNECT_OFFSET_FLUSH_INTERVAL_MS", "500");

Startables.deepStart(Stream.of(minio, catalog, kafka, kafkaConnect)).join();
Startables.deepStart(Stream.of(minio, catalog, sourceKafka, controlKafka, kafkaConnect)).join();

try (S3Client s3 = initLocalS3Client()) {
s3.createBucket(req -> req.bucket(BUCKET));
@@ -118,7 +120,8 @@ private TestContext() {

private void shutdown() {
kafkaConnect.close();
kafka.close();
controlKafka.close();
sourceKafka.close();
catalog.close();
minio.close();
network.close();
@@ -132,8 +135,16 @@ private int getLocalCatalogPort() {
return catalog.getMappedPort(CATALOG_PORT);
}

private String getLocalBootstrapServers() {
return kafka.getBootstrapServers();
private String getLocalSourceBootstrapServers() {
return sourceKafka.getBootstrapServers();
}

private String getLocalControlBootstrapServers() {
return controlKafka.getBootstrapServers();
}

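// address of the control cluster as seen from inside the shared Docker network (e.g. by the
// Kafka Connect container), as opposed to the host-mapped address from getBootstrapServers()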
public String getRemoteControlBootstrapServers() {
return controlKafka.getNetworkAliases().get(0) + ":9092";
}

public void startConnector(KafkaConnectContainer.Config config) {
@@ -198,15 +209,22 @@ public KafkaProducer<String, String> initLocalProducer() {
return new KafkaProducer<>(
ImmutableMap.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
getLocalBootstrapServers(),
getLocalSourceBootstrapServers(),
ProducerConfig.CLIENT_ID_CONFIG,
UUID.randomUUID().toString()),
new StringSerializer(),
new StringSerializer());
}

public Admin initLocalAdmin() {
public Admin initLocalSourceAdmin() {
return Admin.create(
ImmutableMap.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getLocalBootstrapServers()));
ImmutableMap.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getLocalSourceBootstrapServers()));
}

public Admin initLocalControlAdmin() {
return Admin.create(
ImmutableMap.of(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getLocalControlBootstrapServers()));
}
}
3 changes: 3 additions & 0 deletions kafka-connect/build.gradle
@@ -16,6 +16,9 @@ dependencies {
testImplementation libs.mockito
testImplementation libs.assertj

testImplementation libs.testcontainers
testImplementation libs.testcontainers.kafka

testImplementation 'ch.qos.logback:logback-classic:1.5.3'
}
