Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRAFT: Dead letter implementations #244

Closed
wants to merge 1 commit into from

Conversation

fqtab
Copy link
Contributor

@fqtab fqtab commented Apr 23, 2024

It might be a little hard to see WriteExceptionHandler is all we really need to add to fit all the use cases.
This PR should demonstrate how you should be able to fit the most common usecases within the WriteExceptionHandler framework.

I want to write out the original byte array to a dead-letter-table

In theory (i.e. NOT definitively proven yet) ...

For the usecase of writing out the original Kafka key/value byte arrays, we have to do a little bit of work:
- My idea here would be to write a Converter/SMT which preserves the original key/value byte arrays somewhere.
- I'm thinking in the SinkRecord's key or the SinkRecord's headers or SinkRecords schemas parameters
- This SinkRecord is passed to the SinkTask.put method
- When an exception is thrown, our WriteExceptionHandler.handle method would pull out the key/value byte arrays from the SinkRecord's parameters/headers/key and return a new SinkRecord with those values
- The connector would then write that new SinkRecord to the dead-letter table

Importantly, what I'm trying to avoid is having special record structure that the connector code needs to understand in order to do the right thing. The ideal situation for me is the connector is oblivious to "good" and "dead-letter-records-from-Converter/SMT-failures."

I want to write out the original JSON to the dead-letter-table

Same approach as above

I want to capture deserialization/SMT failures somehere!

Two options:

  • Use kafka-connect's built-in DLQ
  • Use dynamic-tables mode + an SMT that on error produces a SinkRecord destined for your dead-letter-table (see sample implementation in this PR)

I want to capture null records somewhere!

Two options:

  • Use kafka-connect's built-in DLQ by throwing a NullRecordException :D
  • Use dynamic-tables mode + an SMT that for null records produces a SinkRecord destined for your null-records table

I don't want to write bad records to Iceberg, I want to write to Kafka!

See KafkaDLQ for a sample implementation.

@fqtab fqtab marked this pull request as draft April 23, 2024 14:37
@fqtab fqtab mentioned this pull request Apr 23, 2024
@fqtab fqtab changed the base branch from main to error_handler April 23, 2024 14:51
@fqtab fqtab force-pushed the dead_letter_implementations branch from fb10fea to 524bb76 Compare April 23, 2024 14:53

// TODO: bring in Lists and Maps classes library to get rid of SuppressWarning annotation
@SuppressWarnings("RegexpSingleline")
public class ExceptionHandlingTransform implements Transformation<SinkRecord> {
Copy link
Contributor Author

@fqtab fqtab Apr 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very basic implementation just to demonstrate what I'm thinking of in terms of using an exception handling transform to produce dead-letter-table records directly.

We will probably want all the bells and whistles @tabmatfournier implemented in his PR: #233

@fqtab fqtab force-pushed the dead_letter_implementations branch from 524bb76 to 13a6337 Compare April 23, 2024 14:59
@fqtab
Copy link
Contributor Author

fqtab commented Apr 23, 2024

@tabmatfournier the other PR just FYI

@fqtab fqtab mentioned this pull request Apr 23, 2024
@tabmatfournier
Copy link
Contributor

tabmatfournier commented Apr 23, 2024

Importantly, what I'm trying to avoid is having special record structure that the connector code needs to understand in order to do the right thing. The ideal situation for me is the connector is oblivious to "good" and "dead-letter-records-from-Converter/SMT-failures."

TBH you are still doing this, you're just changing the shape of what the special record is. The connector has to know about dead-letter-records-from-converter during failure in order to dig out the original bytes.

@tabmatfournier
Copy link
Contributor

tabmatfournier commented Apr 23, 2024

  • I'm thinking in the SinkRecord's key or the SinkRecord's headers or SinkRecords schemas parameters

Not this simple.

  • the key may be null, a string, a json, a struct, a map, or some other value. You would have to overwrite the key and assume no one is using it downstream if you want to use the key.
  • SinkRecord may not have a schema (it's possible it's a Map for instance, in which case the schema is null). Even if it does have a schema, the parameters are a Map<String, String> so you have have to base64 encode your bytes in order to put them in (and decode them out later) if you wanted to do this.

Headers are probably the best bet but least nice to work with. This is why I went for the custom struct approach. Either way you are writing some custom code that knows how to dig this out of the record, and putting that into the connector. In my approach this was determined by a config option --not using dead letter/error transform? nothing goes looking for this (no runtime cost).

@fqtab
Copy link
Contributor Author

fqtab commented Apr 24, 2024

The connector has to know about dead-letter-records-from-converter during failure in order to dig out the original bytes.

The connector code doesn't need to know.
The WriteExceptionHandler code will need to know.

SinkRecord may not have a schema (it's possible it's a Map for instance, in which case the schema is null). Even if it does have a schema, the parameters are a Map<String, String> so you have have to base64 encode your bytes in order to put them in (and decode them out later) if you wanted to do this.

Let's definitely not use schema parameters then lol (that's why I had it last in the list)

You would have to overwrite the key and assume no one is using it downstream if you want to use the key.

Yea absolutely, but I thought the idea with the ExceptionHandlingTransform was it would be the one and only SMT involved (it can run user supplied SMTs internally). So there would be no downstream (other than SinkTask.put obviously)?

Even with your approach, if another SMT is involved downstream of the ExceptionHandlingTransform, it could break things, no?

Headers are probably the best bet but least nice to work with.

I would prefer key for that reason personally.

In my approach this was determined by a config option --not using dead letter/error transform? nothing goes looking for this (no runtime cost).

This approach should also have no runtime cost if you're not using a WriteExceptionHandler.
(The code will use the MultiTableWriter instead of wrapping that in an ExceptionHandlingTableWriter if no WriteExceptionHandler is supplied).

@tabmatfournier
Copy link
Contributor

The connector has to know about dead-letter-records-from-converter during failure in order to dig out the original bytes.

The connector code doesn't need to know. The WriteExceptionHandler code will need to know.

SinkRecord may not have a schema (it's possible it's a Map for instance, in which case the schema is null). Even if it does have a schema, the parameters are a Map<String, String> so you have have to base64 encode your bytes in order to put them in (and decode them out later) if you wanted to do this.

Let's definitely not use schema parameters then lol (that's why I had it last in the list)

You would have to overwrite the key and assume no one is using it downstream if you want to use the key.

Yea absolutely, but I thought the idea with the ExceptionHandlingTransform was it would be the one and only SMT involved (it can run user supplied SMTs internally). So there would be no downstream (other than SinkTask.put obviously)?

Even with your approach, if another SMT is involved downstream of the ExceptionHandlingTransform, it could break things, no?

Headers are probably the best bet but least nice to work with.

I would prefer key for that reason personally.

In my approach this was determined by a config option --not using dead letter/error transform? nothing goes looking for this (no runtime cost).

This approach should also have no runtime cost if you're not using a WriteExceptionHandler. (The code will use the MultiTableWriter instead of wrapping that in an ExceptionHandlingTableWriter if no WriteExceptionHandler is supplied).

I'm hesitant to override the key. We are now providing a lot of customization downstream, people may be digging information out of the key. I don't think we can override/replace what is there.

@fqtab fqtab closed this May 30, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants