Add WriteExceptionHandler #243

Closed · wants to merge 1 commit into from

@@ -96,6 +96,9 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String NAME_PROP = "name";
private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers";

private static final String WRITE_EXCEPTION_HANDLER_CLASS_PROP =
"experimental.write-exception-handler.class";

private static final String DEFAULT_CATALOG_NAME = "iceberg";
private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg";
public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-";
@@ -237,6 +240,12 @@ private static ConfigDef newConfigDef() {
null,
Importance.MEDIUM,
"Coordinator threads to use for table commits, default is (cores * 2)");
configDef.define(
WRITE_EXCEPTION_HANDLER_CLASS_PROP,
Type.STRING,
null,
Importance.MEDIUM,
"The WriteExceptionHandler implementation to use to handle exceptions when writing records to files");
return configDef;
}

@@ -451,6 +460,10 @@ public JsonConverter jsonConverter() {
return jsonConverter;
}

public String writeExceptionHandlerClassName() {
return getString(WRITE_EXCEPTION_HANDLER_CLASS_PROP);
}

private Map<String, String> loadWorkerProps() {
String javaCmd = System.getProperty("sun.java.command");
if (javaCmd != null && !javaCmd.isEmpty()) {
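For context, here is a hypothetical configuration snippet showing the new property in use. The handler class name is purely illustrative and not part of this PR; the property key is the one defined above.

```properties
# Hypothetical: route write failures through a custom handler implementation
# that must be available on the connector's classpath.
experimental.write-exception-handler.class=com.example.DeadLetterExceptionHandler
```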
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

public interface WriteExceptionHandler {
void initialize(SinkTaskContext context, IcebergSinkConfig config);

class Result {
private final SinkRecord sinkRecord;
private final String tableName;

public Result(SinkRecord sinkRecord, String tableName) {
this.sinkRecord = sinkRecord;
this.tableName = tableName;
}

public SinkRecord sinkRecord() {
return sinkRecord;
}

public String tableName() {
return tableName;
}
}

/**
* This method will be invoked whenever the connector runs into an exception while trying to write
* SinkRecords to a table. Implementations of this method have 3 general options:
*
* <ol>
* <li>Return a SinkRecord and the name of the table to write to (wrapped inside a {@link
* Result})
* <li>Return null to drop the SinkRecord
* <li>Throw an exception to fail the task
* </ol>
*
* @param sinkRecord The SinkRecord that couldn't be written
* @param tableName The table the SinkRecord couldn't be written to
* @param exception The exception encountered while trying to write the SinkRecord
* @return A {@link Result} with the record and the table to write it to, or null to drop the
*     record
*/
Result handle(SinkRecord sinkRecord, String tableName, Exception exception) throws Exception;

/**
* This method will be invoked prior to committing, allowing advanced {@link WriteExceptionHandler}
* implementations to complete any in-flight work before the connector commits.
*
* <p>Note that there is no guarantee that the connector will successfully commit after this
* method is called.
*/
void preCommit();
}
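To make the contract above concrete, here is a minimal illustrative handler (not part of this PR) that exercises all three options. The class name, the dead-letter table name, and the exception types it reacts to are assumptions made only for the sake of the example.

```java
package com.example;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.WriteExceptionHandler;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

/** Illustrative only: routes some failures to an assumed dead-letter table. */
public class DeadLetterExceptionHandler implements WriteExceptionHandler {

  private static final String DEAD_LETTER_TABLE = "default.dead_letter"; // assumed table name

  @Override
  public void initialize(SinkTaskContext context, IcebergSinkConfig config) {
    // stateless sketch; a real handler might read its own properties from the config here
  }

  @Override
  public Result handle(SinkRecord sinkRecord, String tableName, Exception exception)
      throws Exception {
    // If the dead-letter table itself cannot be written, rethrow so the task fails
    // instead of looping (see the review discussion further down).
    if (DEAD_LETTER_TABLE.equals(tableName)) {
      throw exception;
    }
    if (exception instanceof DataException) {
      return null; // option 2: drop records that can never be converted
    }
    if (exception instanceof ValidationException) {
      return new Result(sinkRecord, DEAD_LETTER_TABLE); // option 1: redirect
    }
    throw exception; // option 3: anything else may be transient, so fail the task
  }

  @Override
  public void preCommit() {
    // nothing buffered in this sketch
  }
}
```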
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.channel;

import io.tabular.iceberg.connect.WriteExceptionHandler;
import io.tabular.iceberg.connect.data.Utilities;
import io.tabular.iceberg.connect.data.WriterResult;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

class ExceptionHandlingTableWriter implements TableWriter, AutoCloseable {

private final TableWriter underlying;
private final WriteExceptionHandler exceptionHandler;

ExceptionHandlingTableWriter(TableWriter underlying, WriteExceptionHandler exceptionHandler) {
this.underlying = underlying;
this.exceptionHandler = exceptionHandler;
}

@Override
public void write(SinkRecord sinkRecord, String tableName, boolean ignoreMissingTable) {
try {
underlying.write(sinkRecord, tableName, ignoreMissingTable);

@tabmatfournier (Contributor) commented on Apr 23, 2024:

This is a very broad net, which makes me uncomfortable.

In my PR there is a reason I introduced CatalogAPI to wrap a bunch of the calls to Iceberg, along with changes to IcebergWriterFactory, which is where most of this is coming from: is the schema failing? Is creating the table failing? Is evolving the table failing? Is the catalog failing? Is the partition spec failing? That gives much more fine-grained control over the errors. This is catching a ton of errors from many sources, and the opportunities for getting it wrong are large.

The PR author (Contributor) replied:

We can keep something like CatalogAPI that throws clear Exception classes, e.g. SchemaEvolveError, TableCreateError, PartitionEvolutionError. I'm not opposed to that.

Having the broad net here is necessary to enable WriteExceptionHandler implementations. It's down to the WriteExceptionHandler implementation to decide which errors it wants to recover from and which it doesn't. I most certainly don't recommend sending everything that throws an Exception to the dead-letter table, as some of those could be transient.

@tabmatfournier (Contributor) commented on Apr 24, 2024:

I raise a DeadLetterException wrapping the underlying exception in some cases. Again, it's not that easy/clean when you start putting it all together.

The reviewer (Contributor) continued:

This will also cause infinite loops if you fail to create the dead-letter table (possibly because the dead-letter route has been set to an invalid value). It's also an issue that this breaks depending on whether or not auto-create tables is configured on.

Again, this is why CatalogAPI exists in the other API:

  • It knows when you are creating or otherwise working with the dead-letter table, so those errors are thrown directly and not caught, instead of being routed into the dead-letter table.
  • When you are working with a regular record, i.e. not the dead-letter table, the error handler is applied.

I can still do the above with the error handler, but you can't have this broad write here: too many things happen downstream, some of which involve hard-coded configs and some of which will involve the dead-letter table. For example, we support partitioning the dead-letter table via the normal way of providing partition specs for any table; if that fails, you need to hard-fail the connector.

The above would infinite-loop in that case.

The PR author (Contributor) replied:

> This will also cause infinite loops

I deliberately wrote it recursively. If users (which could be us, the maintainers!) feel an exception is transient, they're welcome to retry via the API; I don't feel we should restrict that option.

> possibly because the dead-letter route has been set to an invalid value

There is no dead-letter-table route with this approach as far as the connector code is concerned. The WriteExceptionHandler may certainly try to write to a dead-letter table, but that is a WriteExceptionHandler implementation detail.

> I can still do the above with the error handler, but you can't have this broad write here: too many things happen downstream, some of which involve hard-coded configs and some of which will involve the dead-letter table. For example, we support partitioning the dead-letter table via the normal way of providing partition specs for any table; if that fails, you need to hard-fail the connector.

With this approach, a dead-letter table is like any other ordinary table, so you can still partition the dead-letter table via the normal way of providing partition specs for any table.

> if that fails, you need to hard-fail the connector.

No problem, you can still achieve that with the WriteExceptionHandler approach. The WriteExceptionHandler implementation just needs to ensure that if it sees a PartitionSpecEvolutionError for what it considers to be a dead-letter table, it should just hard-fail :)

> Again, this is why CatalogAPI exists in the other API:

Like I said, I'm not opposed to having a concept like the CatalogAPI in the connector code if it makes it easier to react to clear and actionable exceptions.

} catch (Exception exception) {
final WriteExceptionHandler.Result result;
try {
result = exceptionHandler.handle(sinkRecord, tableName, exception);
} catch (Exception e) {
throw new RuntimeException(e);
}
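// If the handler returned a Result, re-enter write() with the redirected record; the
// handler is responsible for eventually breaking this recursion (see discussion above).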

if (result != null) {
// TODO: does ignoreMissingTable=false make sense here? I _think_ so, but I could also
// expose it
write(result.sinkRecord(), result.tableName(), false);
}
}
}

@Override
public List<WriterResult> committable() {
exceptionHandler.preCommit();
return underlying.committable();
}

@Override
public void close() {
Utilities.close(exceptionHandler);
Utilities.close(underlying);
}
}
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.channel;

import io.tabular.iceberg.connect.data.IcebergWriterFactory;
import io.tabular.iceberg.connect.data.RecordWriter;
import io.tabular.iceberg.connect.data.Utilities;
import io.tabular.iceberg.connect.data.WriterResult;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.connect.sink.SinkRecord;

class MultiTableWriter implements TableWriter, AutoCloseable {

private final IcebergWriterFactory writerFactory;
private final Map<String, RecordWriter> writers;

MultiTableWriter(IcebergWriterFactory writerFactory) {
this.writerFactory = writerFactory;
this.writers = Maps.newHashMap();
}

private RecordWriter writerForTable(
String tableName, SinkRecord sample, boolean ignoreMissingTable) {
return writers.computeIfAbsent(
tableName, notUsed -> writerFactory.createWriter(tableName, sample, ignoreMissingTable));
}

@Override
public void write(SinkRecord sinkRecord, String tableName, boolean ignoreMissingTable) {
writerForTable(tableName, sinkRecord, ignoreMissingTable).write(sinkRecord);
}

private void closeWriters() {
writers.values().forEach(Utilities::close);
writers.clear();
}

@Override
public List<WriterResult> committable() {
List<WriterResult> writerResults =
writers.values().stream()
.flatMap(writer -> writer.complete().stream())
.collect(Collectors.toList());

closeWriters();

return writerResults;
}

@Override
public void close() {
closeWriters();
Utilities.close(writerFactory);
}
}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.channel;

import io.tabular.iceberg.connect.data.WriterResult;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

interface TableWriter {
void write(SinkRecord sinkRecord, String tableName, boolean ignoreMissingTable);

List<WriterResult> committable();
}
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.channel;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.WriteExceptionHandler;
import io.tabular.iceberg.connect.data.IcebergWriterFactory;
import io.tabular.iceberg.connect.data.Utilities;
import io.tabular.iceberg.connect.data.WriterResult;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

class TableWriterImpl implements TableWriter, AutoCloseable {

private final TableWriter underlying;

private WriteExceptionHandler loadHandler(String name) {
ClassLoader loader = this.getClass().getClassLoader();
final Object obj;
try {
Class<?> clazz = Class.forName(name, true, loader);
obj = clazz.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException(String.format("Could not initialize class %s", name), e);
}

final WriteExceptionHandler writeExceptionHandler = (WriteExceptionHandler) obj;

return writeExceptionHandler;
}

TableWriterImpl(
SinkTaskContext context, IcebergSinkConfig config, IcebergWriterFactory writerFactory) {
MultiTableWriter baseTableWriter = new MultiTableWriter(writerFactory);
if (config.writeExceptionHandlerClassName() == null) {
this.underlying = baseTableWriter;
} else {
WriteExceptionHandler handler = loadHandler(config.writeExceptionHandlerClassName());
handler.initialize(context, config);
this.underlying = new ExceptionHandlingTableWriter(baseTableWriter, handler);
}
}

@Override
public void write(SinkRecord sinkRecord, String tableName, boolean ignoreMissingTable) {
underlying.write(sinkRecord, tableName, ignoreMissingTable);
}

@Override
public List<WriterResult> committable() {
return underlying.committable();
}

@Override
public void close() {
Utilities.close(underlying);
}
}