DRAFT: (do not merge) zombie fencing improvement #209

Closed · wants to merge 1 commit
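Summary of the change: the sink previously committed source offsets to a sink-managed control consumer group (derived from the connector name), passing a synthetic groupId-only `ConsumerGroupMetadata` to `sendOffsetsToTransaction`; the removed TODO below notes that this does not fence zombie tasks. This draft commits offsets to the Kafka Connect consumer group instead, using the live group metadata of the task's own consumer so the broker can reject commits from a stale generation. A minimal sketch of that pattern outside Connect (not code from this PR; broker address, topics, group id, and transactional id are made up):

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class FencedOffsetCommitSketch {
  public static void main(String[] args) {
    Properties cp = new Properties();
    cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    cp.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-my-sink"); // made-up group id
    cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Properties pp = new Properties();
    pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-sink-txn"); // made-up transactional id
    pp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    pp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cp);
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pp)) {
      consumer.subscribe(List.of("src-topic"));
      producer.initTransactions(); // fences older producers using the same transactional.id

      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
        if (records.isEmpty()) {
          continue;
        }
        producer.beginTransaction();
        records.forEach(r -> producer.send(new ProducerRecord<>("dst-topic", r.key(), r.value())));

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
          List<ConsumerRecord<byte[], byte[]>> rs = records.records(tp);
          offsets.put(tp, new OffsetAndMetadata(rs.get(rs.size() - 1).offset() + 1));
        }
        // Passing the consumer's live group metadata (generation and member id) lets the
        // broker reject a commit from a zombie whose generation was bumped by a rebalance;
        // a groupId-only ConsumerGroupMetadata skips that check.
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
      }
    }
  }
}
```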
File: IcebergSinkConfig.java
@@ -390,16 +390,6 @@ public String controlTopic() {
return getString(CONTROL_TOPIC_PROP);
}

- public String controlGroupId() {
- String result = getString(CONTROL_GROUP_ID_PROP);
- if (result != null) {
- return result;
- }
- String connectorName = connectorName();
- Preconditions.checkNotNull(connectorName, "Connector name cannot be null");
- return DEFAULT_CONTROL_GROUP_PREFIX + connectorName;
- }
-
public String connectGroupId() {
String result = getString(CONNECT_GROUP_ID_PROP);
if (result != null) {
File: IcebergSinkTask.java
@@ -89,7 +89,7 @@ public void open(Collection<TopicPartition> partitions) {
Collection<MemberDescription> members = groupDesc.members();
if (isLeader(members, partitions)) {
LOG.info("Task elected leader, starting commit coordinator");
- Coordinator coordinator = new Coordinator(catalog, config, members, clientFactory);
+ Coordinator coordinator = new Coordinator(catalog, config, members, clientFactory, context);
coordinatorThread = new CoordinatorThread(coordinator);
coordinatorThread.start();
}
@@ -98,7 +98,6 @@
LOG.info("Starting commit worker");
IcebergWriterFactory writerFactory = new IcebergWriterFactory(catalog, config);
worker = new Worker(config, clientFactory, writerFactory, context);
- worker.syncCommitOffsets();
worker.start();
}

@@ -169,10 +168,8 @@ private void processControlEvents() {
@Override
public Map<TopicPartition, OffsetAndMetadata> preCommit(
Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
- if (worker == null) {
- return ImmutableMap.of();
- }
- return worker.commitOffsets();
+ // offset commit is handled by the worker
+ return ImmutableMap.of();
}

@Override
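On the `preCommit` change above: the `SinkTask` contract treats an empty map as "Connect-managed offset commits are not desired", so the framework commits nothing itself and the offsets advance only when the worker's transactional producer commits them via `sendOffsetsToTransaction`. A minimal sketch of a task relying on that contract (class name is illustrative):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTask;

public abstract class TransactionalOffsetsTask extends SinkTask {
  @Override
  public Map<TopicPartition, OffsetAndMetadata> preCommit(
      Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    // Empty map: the framework skips its own offset commit for this cycle;
    // offsets are instead committed atomically inside the producer transaction.
    return Map.of();
  }
}
```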
File: Channel.java
@@ -32,12 +32,12 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
- import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
+ import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -46,10 +46,10 @@ public abstract class Channel {
private static final Logger LOG = LoggerFactory.getLogger(Channel.class);

private final String controlTopic;
- private final String controlGroupId;
- private final String groupId;
+ private final String connectGroupId;
private final Producer<String, byte[]> producer;
private final Consumer<String, byte[]> consumer;
+ private final SinkTaskContext context;
private final Admin admin;
private final Map<Integer, Long> controlTopicOffsets = Maps.newHashMap();
private final String producerId;
@@ -58,10 +58,11 @@ public Channel(
String name,
String consumerGroupId,
IcebergSinkConfig config,
- KafkaClientFactory clientFactory) {
+ KafkaClientFactory clientFactory,
+ SinkTaskContext context) {
this.controlTopic = config.controlTopic();
- this.controlGroupId = config.controlGroupId();
- this.groupId = config.controlGroupId();
+ this.connectGroupId = config.connectGroupId();
+ this.context = context;

String transactionalId = name + config.transactionalSuffix();
this.producer = clientFactory.createProducer(transactionalId);
@@ -95,9 +96,8 @@ protected void send(List<Event> events, Map<TopicPartition, Offset> sourceOffsets) {
try {
recordList.forEach(producer::send);
if (!sourceOffsets.isEmpty()) {
- // TODO: this doesn't fence zombies
producer.sendOffsetsToTransaction(
- offsetsToCommit, new ConsumerGroupMetadata(controlGroupId));
+ offsetsToCommit, KafkaUtils.getConsumerGroupMetadata(context, connectGroupId));
}
producer.commitTransaction();
} catch (Exception e) {
@@ -124,7 +124,7 @@ record -> {

Event event = Event.decode(record.value());

- if (event.groupId().equals(groupId)) {
+ if (event.groupId().equals(connectGroupId)) {
LOG.debug("Received event of type: {}", event.type().name());
if (receive(new Envelope(event, record.partition(), record.offset()))) {
LOG.info("Handled event of type: {}", event.type().name());
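The key line in `Channel.send` is the metadata passed to `sendOffsetsToTransaction`. A groupId-only `ConsumerGroupMetadata` (what the removed TODO complained about) carries no generation or member id, so the broker has nothing to validate and a fenced task's offsets can still land. A small illustration of the difference, assuming recent Kafka clients (group name is made up):

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;

public class GroupMetadataSketch {
  static void compare(Consumer<byte[], byte[]> liveConsumer) {
    // What the old code sent: groupId only, generationId -1, empty memberId;
    // the broker skips generation checks on the transactional offset commit.
    ConsumerGroupMetadata synthetic = new ConsumerGroupMetadata("cg-connect");
    System.out.printf("synthetic: gen=%d member=%s%n",
        synthetic.generationId(), synthetic.memberId());

    // What the new code resolves via KafkaUtils: the consumer's current group
    // metadata. After a rebalance bumps the generation, a stale task's
    // sendOffsetsToTransaction fails and its transaction aborts.
    ConsumerGroupMetadata live = liveConsumer.groupMetadata();
    System.out.printf("live: gen=%d member=%s%n",
        live.generationId(), live.memberId());
  }
}
```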
File: Coordinator.java
@@ -51,6 +51,7 @@
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.kafka.clients.admin.MemberDescription;
+ import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -74,16 +75,17 @@ public Coordinator(
Catalog catalog,
IcebergSinkConfig config,
Collection<MemberDescription> members,
- KafkaClientFactory clientFactory) {
+ KafkaClientFactory clientFactory,
+ SinkTaskContext context) {
// pass consumer group ID to which we commit low watermark offsets
super("coordinator", config.controlGroupId() + "-coord", config, clientFactory);
super("coordinator", config.connectGroupId() + "-coord", config, clientFactory, context);

this.catalog = catalog;
this.config = config;
this.totalPartitionCount =
members.stream().mapToInt(desc -> desc.assignment().topicPartitions().size()).sum();
this.snapshotOffsetsProp =
- String.format(OFFSETS_SNAPSHOT_PROP_FMT, config.controlTopic(), config.controlGroupId());
+ String.format(OFFSETS_SNAPSHOT_PROP_FMT, config.controlTopic(), config.connectGroupId());
this.exec = ThreadPools.newWorkerPool("iceberg-committer", config.commitThreads());
this.commitState = new CommitState(config);
}
@@ -94,7 +96,7 @@ public void process() {
commitState.startNewCommit();
Event event =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
EventType.COMMIT_REQUEST,
new CommitRequestPayload(commitState.currentCommitId()));
send(event);
@@ -153,7 +155,7 @@ private void doCommit(boolean partialCommit) {

Event event =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
EventType.COMMIT_COMPLETE,
new CommitCompletePayload(commitState.currentCommitId(), vtts));
send(event);
@@ -240,7 +242,7 @@ private void commitToTable(
Long snapshotId = latestSnapshot(table, branch.orElse(null)).snapshotId();
Event event =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
EventType.COMMIT_TABLE,
new CommitTablePayload(
commitState.currentCommitId(), TableName.of(tableIdentifier), snapshotId, vtts));
File: KafkaUtils.java
@@ -19,14 +19,21 @@
package io.tabular.iceberg.connect.channel;

import java.util.concurrent.ExecutionException;
+ import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+ import org.apache.kafka.clients.consumer.Consumer;
+ import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.connect.errors.ConnectException;
+ import org.apache.kafka.connect.sink.SinkTaskContext;

public class KafkaUtils {

+ private static final String CONTEXT_CLASS_NAME =
+ "org.apache.kafka.connect.runtime.WorkerSinkTaskContext";
+
public static ConsumerGroupDescription consumerGroupDescription(
String consumerGroupId, Admin admin) {
try {
@@ -40,5 +47,16 @@ public static ConsumerGroupDescription consumerGroupDescription(
}
}

+ @SuppressWarnings("unchecked")
+ public static ConsumerGroupMetadata getConsumerGroupMetadata(
+ SinkTaskContext context, String connectGroupId) {
+ if (CONTEXT_CLASS_NAME.equals(context.getClass().getName())) {
+ return ((Consumer<byte[], byte[]>)
+ DynFields.builder().hiddenImpl(CONTEXT_CLASS_NAME, "consumer").build(context).get())
+ .groupMetadata();
+ }
+ return new ConsumerGroupMetadata(connectGroupId);
+ }
+
private KafkaUtils() {}
}
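`SinkTaskContext` does not expose the underlying consumer, so the helper above reflects into Connect's runtime `WorkerSinkTaskContext` using Iceberg's `DynFields`. Anywhere else, such as unit tests with a mocked context, it falls back to groupId-only metadata, i.e. the old non-fencing behavior. A hypothetical use of the fallback path (group id is made up):

```java
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.mockito.Mockito;

public class KafkaUtilsFallbackSketch {
  public static void main(String[] args) {
    // A mock is not a WorkerSinkTaskContext, so no reflection is attempted.
    SinkTaskContext context = Mockito.mock(SinkTaskContext.class);
    ConsumerGroupMetadata metadata =
        KafkaUtils.getConsumerGroupMetadata(context, "connect-my-sink");
    System.out.println(metadata.generationId()); // -1: no generation to enforce
  }
}
```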
File: Worker.java
@@ -19,7 +19,6 @@
package io.tabular.iceberg.connect.channel;

import static java.util.stream.Collectors.toList;
- import static java.util.stream.Collectors.toMap;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.data.IcebergWriterFactory;
@@ -38,15 +37,10 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
- import java.util.Map.Entry;
import java.util.UUID;
- import java.util.concurrent.ExecutionException;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
- import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
- import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
- import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

@@ -55,7 +49,6 @@ public class Worker extends Channel {
private final IcebergSinkConfig config;
private final IcebergWriterFactory writerFactory;
private final SinkTaskContext context;
- private final String controlGroupId;
private final Map<String, RecordWriter> writers;
private final Map<TopicPartition, Offset> sourceOffsets;

@@ -69,34 +62,16 @@ public Worker(
"worker",
IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX + UUID.randomUUID(),
config,
- clientFactory);
+ clientFactory,
+ context);

this.config = config;
this.writerFactory = writerFactory;
this.context = context;
- this.controlGroupId = config.controlGroupId();
this.writers = Maps.newHashMap();
this.sourceOffsets = Maps.newHashMap();
}

- public void syncCommitOffsets() {
- Map<TopicPartition, Long> offsets =
- commitOffsets().entrySet().stream()
- .collect(toMap(Entry::getKey, entry -> entry.getValue().offset()));
- context.offset(offsets);
- }
-
- public Map<TopicPartition, OffsetAndMetadata> commitOffsets() {
- try {
- ListConsumerGroupOffsetsResult response = admin().listConsumerGroupOffsets(controlGroupId);
- return response.partitionsToOffsetAndMetadata().get().entrySet().stream()
- .filter(entry -> context.assignment().contains(entry.getKey()))
- .collect(toMap(Entry::getKey, Entry::getValue));
- } catch (InterruptedException | ExecutionException e) {
- throw new ConnectException(e);
- }
- }
-
public void process() {
consumeAvailable(Duration.ZERO);
}
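With `syncCommitOffsets` and `commitOffsets` removed, the sink no longer mirrors offsets from a sink-managed control group into the framework on startup: source offsets are committed transactionally straight to the Connect consumer group, which the framework already uses to resume after restarts and rebalances. If needed, the committed positions can still be inspected with the standard Admin API; a sketch, with a made-up group id:

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetInspectionSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
      // Offsets now live under the connector's Connect group,
      // not a sink-managed control group.
      Map<TopicPartition, OffsetAndMetadata> offsets =
          admin.listConsumerGroupOffsets("connect-my-sink")
              .partitionsToOffsetAndMetadata()
              .get();
      offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
    }
  }
}
```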
@@ -138,7 +113,7 @@ protected boolean receive(Envelope envelope) {
.map(
writeResult ->
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
EventType.COMMIT_RESPONSE,
new CommitResponsePayload(
writeResult.partitionStruct(),
@@ -150,7 +125,7 @@

Event readyEvent =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
EventType.COMMIT_READY,
new CommitReadyPayload(commitId, assignments));
events.add(readyEvent);
File: ChannelTestBase.java
@@ -53,7 +53,7 @@
public class ChannelTestBase {
protected static final String SRC_TOPIC_NAME = "src-topic";
protected static final String CTL_TOPIC_NAME = "ctl-topic";
- protected static final String CONTROL_CONSUMER_GROUP_ID = "cg-connector";
+ protected static final String CONNECT_CONSUMER_GROUP_ID = "cg-connect";
protected InMemoryCatalog catalog;
protected Table table;
protected IcebergSinkConfig config;
@@ -80,7 +80,7 @@ private InMemoryCatalog initInMemoryCatalog() {

protected static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id";
protected static final String OFFSETS_SNAPSHOT_PROP =
String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME, CONTROL_CONSUMER_GROUP_ID);
String.format("kafka.connect.offsets.%s.%s", CTL_TOPIC_NAME, CONNECT_CONSUMER_GROUP_ID);
protected static final String VTTS_SNAPSHOT_PROP = "kafka.connect.vtts";

@BeforeEach
@@ -93,7 +93,7 @@ public void before() {
config = mock(IcebergSinkConfig.class);
when(config.controlTopic()).thenReturn(CTL_TOPIC_NAME);
when(config.commitThreads()).thenReturn(1);
- when(config.controlGroupId()).thenReturn(CONTROL_CONSUMER_GROUP_ID);
+ when(config.connectGroupId()).thenReturn(CONNECT_CONSUMER_GROUP_ID);
when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));

TopicPartitionInfo partitionInfo = mock(TopicPartitionInfo.class);
File: CoordinatorTest.java
@@ -19,6 +19,7 @@
package io.tabular.iceberg.connect.channel;

import static org.assertj.core.api.Assertions.assertThat;
+ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import io.tabular.iceberg.connect.events.CommitCompletePayload;
@@ -44,6 +45,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types.StructType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+ import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@@ -161,7 +163,9 @@ private UUID coordinatorTest(List<DataFile> dataFiles, List<DeleteFile> deleteFi
when(config.commitIntervalMs()).thenReturn(0);
when(config.commitTimeoutMs()).thenReturn(Integer.MAX_VALUE);

- Coordinator coordinator = new Coordinator(catalog, config, ImmutableList.of(), clientFactory);
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ Coordinator coordinator =
+ new Coordinator(catalog, config, ImmutableList.of(), clientFactory, context);
coordinator.start();

// init consumer after subscribe()
@@ -180,7 +184,7 @@

Event commitResponse =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
EventType.COMMIT_RESPONSE,
new CommitResponsePayload(
StructType.of(),
@@ -193,7 +197,7 @@

Event commitReady =
new Event(
- config.controlGroupId(),
+ config.connectGroupId(),
EventType.COMMIT_READY,
new CommitReadyPayload(
commitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts))));
File: WorkerTest.java
@@ -95,7 +95,7 @@ private void workerTest(Map<String, Object> value) {
UUID commitId = UUID.randomUUID();
Event commitRequest =
new Event(
- config.controlGroupId(), EventType.COMMIT_REQUEST, new CommitRequestPayload(commitId));
+ config.connectGroupId(), EventType.COMMIT_REQUEST, new CommitRequestPayload(commitId));
byte[] bytes = Event.encode(commitRequest);
consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes));
