From 88231f732a804fe1c75c9f082500762b3d1faa6c Mon Sep 17 00:00:00 2001 From: Zike Yang Date: Wed, 6 Sep 2023 19:18:18 +0800 Subject: [PATCH] [improve][pip] PIP-297: Support terminating Function & Connector with the fatal exception (#21079) ### Motivation This PIP is to improve the current function exception handler. It will be applied to both Pulsar Function and Pulsar IO Connector. For more details, please refer to `pip-297.md` --- pip/pip-297.md | 213 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 213 insertions(+) create mode 100644 pip/pip-297.md diff --git a/pip/pip-297.md b/pip/pip-297.md new file mode 100644 index 0000000000000..2985864beed43 --- /dev/null +++ b/pip/pip-297.md @@ -0,0 +1,213 @@ +# Title: Support terminating Function & Connector with the fatal exception + +# Background knowledge + +The **Pulsar Function** is a serverless computing framework that runs on top of Pulsar and processes messages. + +The **Pulsar IO Connector** is a framework that allows users to easily integrate Pulsar with external systems, such as +databases, messaging systems, and data pipelines. With Pulsar IO Connector, you can create, deploy, and manage +connectors that read data from or write data to Pulsar topics. There are two types of Pulsar IO Connectors: source and +sink. A **source connector** imports data from another system to Pulsar, while a **sink connector** exports data from +Pulsar to another system. The Pulsar IO Connector is implemented based on the Pulsar Function framework. So in +the following, we treat the connector as a special kind of function. The `function` refers to both function and +connector. + +**Function Instance** is a running instance of a Pulsar IO Connector that interacts with a specific external system or a +Pulsar Function that processes messages from the topic. + +**Function Framework** is a framework for running the Function instance. 
+ +**Function Context** is an interface that provides access to various information and resources for the connector or the +function. The function context is passed to the connector or the function when it is initialized, and can then be used +to interact with the Pulsar system. + +## The current implementation of the exception handler + +**Function instance thread**: The function framework initializes a thread for each function instance to handle the +core logic of the function/connector, including consuming messages from the Pulsar topic for the sink connector, +executing the logic of the function, producing messages to the Pulsar topic for the source connector, and handling +exceptions. We define the **Connector thread/Function thread** as a thread that is created by the connector +or function itself. + +**Exception handling logic**: The function itself can throw exceptions, and this thread will catch the exception and +then close the function. This means that the function will stop working until it is restarted manually or +automatically by the function framework. + +Even though it is not explicitly defined, there are two types of exceptions that should be handled by the function or +the framework: + +- **Fatal exception**: An exception that the function cannot recover from by itself, so it needs to notify the + framework to terminate it. Such exceptions indicate a configuration issue, a logic error, or an + incompatible system. The function framework will catch these exceptions, report them to users, and terminate the + function. +- **Non-fatal exception**: An exception that does not require terminating the function instance. It can be + handled by the connector or function itself, or be thrown by the function, and it won't cause the function + instance to be terminated. + +### How to handle exceptions thrown from connectors + +All the exceptions thrown from the connector are treated as fatal exceptions. 
+ +If the exception is thrown from the function instance thread, the function framework will catch the exception and +terminate the function instance. + +If the exception is thrown from the connector thread that is created by the connector itself, the function framework +will not be able to catch the exception or terminate the function instance, so the connector will hang forever. +The `Motivation` section discusses this case in more detail. + +If the exception is thrown from the external system, the connector implementation can treat it as a retryable +exception and retry processing the message later, or throw it to indicate that it is a fatal exception. + +### How to handle exceptions thrown from functions + +All the exceptions thrown from the Pulsar Function are treated as non-fatal exceptions. The function framework will +catch the exception and log it, but it will not terminate the function instance. + +There is no way for the function developer to throw a fatal exception to the function framework to terminate the +function instance. + +# Motivation + +Currently, the connector and function cannot terminate the function instance if a fatal exception is thrown +outside the function instance thread, because the current exception handler for connectors and Pulsar Functions +only handles exceptions thrown inside the function instance thread. + +For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If +any fatal exceptions occur in those threads, the function instance thread will not be aware of them and will +not be able to terminate the connector. This will cause the connector to hang indefinitely. There is a related issue +here: https://github.com/apache/pulsar/issues/9464 + +The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from +an external system. 
If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has +been observed for the Kafka source connector: https://github.com/apache/pulsar/issues/9464. We have fixed it by adding +the `notifyError` method to the `PushSource` class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this +does not solve the problem for all source connectors, because not all connectors are implemented based on +the `PushSource` class. + +The problem is the same for the Pulsar Function. Currently, the function can't throw fatal exceptions to the function +framework. We need to provide a way for the function developer to do so. + +We need a way for the connector and function developers to throw fatal exceptions outside the function instance +thread. The function framework should catch these exceptions and terminate the function accordingly. + +# Goals + +## In Scope + +- Support terminating the function instance with fatal exceptions +- This proposal will apply to both the Pulsar Function and the Pulsar IO Connector. + +## Out of Scope + +- Fixing the exception-raising issue mentioned in the Motivation section for every connector is not included in + this PIP. This PIP only provides the feature for the connector developer to terminate the function instance. The fixes + should be made in separate PRs. + +# High Level Design + +Introduce a new method `fatal` to the context. All the connector implementation code and the function code +can use this context and call the `fatal` method to terminate the instance while raising a fatal exception. + +After the connector or function raises the fatal exception, the function instance thread will be interrupted. +The function framework can then catch the exception, log it, and terminate the function instance. + +# Detailed Design + +## Design & Implementation Details + +This PIP proposes to add a new method `fatal` to the context `BaseContext`. 
This method allows the connector or the +function code to report a fatal exception to the function framework and terminate the instance. The `SinkContext` +and `SourceContext` both inherit from `BaseContext`. Therefore, all the sink connectors and source connectors can +invoke this new method. The Pulsar Function context class `Context` also inherits from `BaseContext`. Therefore, the +function code can also invoke this new method. + +When the `fatal` method is invoked, the function instance thread is interrupted. The function instance thread can then +catch the interrupt exception and get the fatal exception. The function framework then logs this exception, +reports it to the metrics, and finally terminates the function instance. + +The behavior when invoking the `fatal` method: + +- For the connector thread or function thread: + - Invoke the `fatal` method + - Send the exception to the function framework. There is a field `deathException` in the + class `JavaInstanceRunnable` that is used to store the fatal exception. + - Interrupt the function instance thread +- For the function instance thread: + - Catch the interrupt exception + - Get the exception from the function framework + - Log the exception and report it to the metrics + - Close the function instance + +## Public-facing Changes + +### Public API + +Introduce the `fatal` method to `BaseContext`: + +```java +public interface BaseContext { + /** + * Terminate the function instance with a fatal exception. + * + * @param t the fatal exception to be raised + */ + void fatal(Throwable t); +} +``` + +### Binary protocol + +No changes for this part. + +### Configuration + +No changes for this part. + +### CLI + +No changes for this part. + +### Metrics + +No changes for this part. + +# Monitoring + +No changes for this part. + +# Security Considerations + +No security-related changes. +The new method `fatal` will only take effect on the current function instance. 
It won't affect other function instances +even if they are in the same function worker. + +# Backward & Forward Compatibility + +## Revert + +No operation required. + +## Upgrade + +No operation required. + +# Alternatives + +## Using futures to handle results or exceptions returned by the connector + +The benefit of this solution is that it makes the use of exception throwing more intuitive to the connector developer. + +However, it requires changes to existing interfaces, including `Source` and `Sink`, which can complicate connector +development. We would also still need the `fatal` method to handle some cases, such as terminating the instance in code +outside of the message processing logic, which this alternative solution can't handle. + +Meanwhile, the implementation of this solution would also be more complex, involving changes to the core message +processing logic of the function framework. We would need to turn the entire message processing logic into an +asynchronous pattern. + +# General Notes + +# Links + +* Mailing List discussion thread: https://lists.apache.org/thread/j59gzzwjp8c48lwv5poddm9qzlp2hol0 +* Mailing List voting thread: https://lists.apache.org/thread/ggok3c2601mnbdomr65v3pjth3lk6fr8