Skip to content

Commit

Permalink
Add some sample handler implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
fqaiser94 committed Apr 23, 2024
1 parent 925dbcb commit 524bb76
Show file tree
Hide file tree
Showing 6 changed files with 398 additions and 0 deletions.
21 changes: 21 additions & 0 deletions kafka-connect-exception-handlers/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
plugins {
// test-fixtures plugin lets other modules depend on this module's shared test helpers
id "java-test-fixtures"
}

dependencies {
// compileOnly: the connector runtime and Kafka Connect API are provided by the
// Connect worker's classpath at runtime and must not be bundled with this jar
compileOnly project(":iceberg-kafka-connect")
compileOnly libs.bundles.kafka.connect

testImplementation libs.junit.api
testRuntimeOnly libs.junit.engine

testImplementation libs.assertj
}

// Publish this module as a plain Maven artifact (POM + jar from the java component)
publishing {
publications {
mavenJava(MavenPublication) {
from components.java
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

/**
 * Helpers for routing failed records to a dead-letter table.
 *
 * <p>Defines the dead-letter table's name and row schema, and converts a failed {@link SinkRecord}
 * into a record shaped for that table.
 */
public class DeadLetterTable {

  private DeadLetterTable() {}

  /** Name of the dead-letter table that failed records are written to. */
  public static final String NAME = "dead-letter";

  /**
   * Row schema for the dead-letter table: source topic/partition/offset, the failure as a string,
   * and the table the record was originally destined for.
   */
  public static final Schema SCHEMA =
      // This is just a simple dead letter table schema just for demonstration purposes
      // Users can get as complicated as they wish
      SchemaBuilder.struct()
          .field("topic", Schema.STRING_SCHEMA)
          .field("partition", Schema.INT32_SCHEMA)
          .field("offset", Schema.INT64_SCHEMA)
          .field("exception", Schema.STRING_SCHEMA)
          .field("target_table", Schema.STRING_SCHEMA)
          // build() returns an immutable Schema; the previous schema() call returned the
          // still-mutable SchemaBuilder itself, leaking a mutable object via a public constant
          .build();

  /**
   * Wraps a failed record into a dead-letter record carrying the source coordinates and failure.
   *
   * @param original the record that could not be written
   * @param exception the failure; only its {@code toString()} is captured (no stack trace)
   * @param deadLetterTableName value stored in the {@code target_table} column
   * @return a new record (same topic/partition/timestamp, null key) with the dead-letter schema
   */
  public static SinkRecord sinkRecord(
      SinkRecord original, Exception exception, String deadLetterTableName) {
    Struct struct = new Struct(SCHEMA);
    struct.put("topic", original.topic());
    struct.put("partition", original.kafkaPartition());
    struct.put("offset", original.kafkaOffset());
    struct.put("exception", exception.toString());
    struct.put("target_table", deadLetterTableName);
    return original.newRecord(
        original.topic(),
        original.kafkaPartition(),
        null,
        null,
        SCHEMA,
        struct,
        original.timestamp());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.handler;

import io.tabular.iceberg.connect.DeadLetterTable;
import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.WriteExceptionHandler;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

/** Sample DLQ implementation that sends records to a fixed dead-letter table. */
public class IcebergDLQ implements WriteExceptionHandler {

  @Override
  public void initialize(SinkTaskContext context, IcebergSinkConfig config) {
    // No state needed: the dead-letter table name and schema are fixed constants.
  }

  @Override
  public Result handle(SinkRecord sinkRecord, String tableName, Exception exception) {
    // Convert the failed record into a dead-letter row. Users could build a more
    // sophisticated SinkRecord here that also includes key/value bytes, connector
    // name, connector version, etc.
    SinkRecord deadLetterRecord =
        DeadLetterTable.sinkRecord(sinkRecord, exception, DeadLetterTable.NAME);
    // Users could also route to a dead-letter table per topic, or anything else,
    // instead of a single fixed table.
    return new WriteExceptionHandler.Result(deadLetterRecord, DeadLetterTable.NAME);
  }

  @Override
  public void preCommit() {
    // Nothing buffered, so nothing to flush before a commit.
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.handler;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.WriteExceptionHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

/**
 * Sends any record that can't be handled to Kafka.
 *
 * <p>Provides only at-least-once guarantees which is a limitation of the Kafka Connect {@link
 * ErrantRecordReporter} API.
 *
 * <p>Not thread-safe: {@code handle} and {@code preCommit} are expected to be invoked from the
 * single sink-task thread.
 */
// public (was package-private) for consistency with IcebergDLQ: Connect instantiates
// handler classes reflectively from configuration, which requires public access.
public class KafkaDLQ implements WriteExceptionHandler {
  private SinkTaskContext context;

  // Reports in flight; ErrantRecordReporter.report is async, so we must await these
  // before offsets are committed to keep the at-least-once guarantee.
  private List<Future<Void>> pendingFutures;

  @SuppressWarnings("RegexpSingleline")
  @Override
  public void initialize(SinkTaskContext sinkTaskContext, IcebergSinkConfig config) {
    this.context = sinkTaskContext;
    this.pendingFutures = new ArrayList<>();
  }

  /**
   * Returns true if the future has completed, surfacing any failure it carried.
   *
   * @throws RuntimeException wrapping the failure if the completed report failed
   */
  private boolean isDone(Future<Void> voidFuture) {
    if (voidFuture.isDone()) {
      try {
        voidFuture.get();
      } catch (InterruptedException e) {
        // Restore the interrupt status so callers up the stack can observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    } else {
      return false;
    }
  }

  @Override
  public Result handle(SinkRecord sinkRecord, String tableName, Exception exception) {
    // TODO: I would much rather do something smarter with callbacks
    // but I only get the old java.util.concurrent.Future interface here >_<
    pendingFutures.add(context.errantRecordReporter().report(sinkRecord, exception));
    // Opportunistically prune (and error-check) completed reports to bound the list.
    pendingFutures.removeIf(this::isDone);
    // null result: the record is fully handled here; nothing to route to a table.
    return null;
  }

  @Override
  public void preCommit() {
    // Block until every outstanding report has landed so committed offsets never
    // run ahead of unreported failures.
    for (Future<Void> voidFuture : pendingFutures) {
      try {
        voidFuture.get();
      } catch (InterruptedException e) {
        // Restore the interrupt status so callers up the stack can observe it.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
    }
    // All futures completed successfully; drop them so later commits don't re-await them.
    pendingFutures.clear();
  }
}
Loading

0 comments on commit 524bb76

Please sign in to comment.