Skip to content

Commit

Permalink
[improve][pip] PIP-297: Support terminating Function & Connector with…
Browse files Browse the repository at this point in the history
… the fatal exception (#21079)

### Motivation

This PIP is to improve the current function exception handler. It will be applied to both Pulsar Function and Pulsar IO Connector.

For more details, please refer to `pip-297.md`
  • Loading branch information
RobertIndie authored Sep 6, 2023
1 parent 209f222 commit 88231f7
Showing 1 changed file with 213 additions and 0 deletions.
213 changes: 213 additions & 0 deletions pip/pip-297.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# Title: Support terminating Function & Connector with the fatal exception

# Background knowledge

The **Pulsar Function** is a serverless computing framework that runs on top of Pulsar and processes messages.

The **Pulsar IO Connector** is a framework that allows users to easily integrate Pulsar with external systems, such as
databases, messaging systems, and data pipelines. With Pulsar IO Connector, you can create, deploy, and manage
connectors that read data from or write data to Pulsar topics. There are two types of Pulsar IO Connectors: source and
sink. A **source connector** imports data from another system to Pulsar, while a **sink connector** exports data from
Pulsar to another system. The Pulsar IO Connector is implemented based on the Pulsar Function framework. So in
the following, we treat the connector as a special kind of function. The `function` refers to both function and
connector.

**Function Instance** is a running instance of a Pulsar IO Connector that interacts with a specific external system or a
Pulsar Function that processes messages from the topic.

**Function Framework** is a framework for running the Function instance.

**Function Context** is an interface that provides access to various information and resources for the connector or the
function. The function context is passed to the connector or the function when it is initialized, and then can be used
to interact with the Pulsar system.

## The current implementation of the exception handler

**Function instance thread**: The function framework initializes a thread for each function instance to handle the
core logic of the function/connector, including consuming messages from the Pulsar topic for the sink connector,
executing the logic of the function, producing messages to the Pulsar topic for the source connector, handling the
exception, etc. And let's define the **Connector thread/Function thread** as a thread that is created by the connector
or function itself.

**Exception handling logic**: The function itself can throw exceptions, and this thread will catch the exception and
then close the function. This means that the function will stop working until it is restarted manually or
automatically by the function framework.

Even though it is not explicitly defined, there are two types of exceptions that should be handled by the function or
the framework:

- **Fatal exception**: This is an exception that the function cannot recover from by itself and needs to notify the
framework to terminate it. These are fatal exceptions that indicate a configuration issue, a logic error, or an
incompatible system. The function framework will catch these exceptions, report them to users, and terminate the
function.
- **Non-fatal exception** is an exception that the function instance don't need to be terminated for. It could be
handled by the connector or function itself. Or be thrown by the function. This exception won't cause the function
instance to be terminated.

### How to handle exceptions thrown from connectors

All the exceptions thrown form the connector are treated as fatal exceptions.

If the exception is thrown from the function instance thread, the function framework will catch the exception and
terminate the function instance.

If the exception is thrown from the connector thread that is created by the connector itself, the function framework
will not be able to catch the exception and terminate the function instance. The connector will hang forever.
The `Motivation` part will talk more about this case.

If the exception is thrown from the external system, the connector implementation could treat it as a retryable
exception and retry to process the message later, or throw it to indicate it as a fatal exception.

### How to handle exceptions thrown from functions

All the exceptions thrown from the pulsar function are treated as non-fatal exceptions. The function framework will
catch the exception and log it. But it will not terminate the function instance.

There is no way for the function developer to throw a fatal exception to the function framework to terminate the
function instance.

# Motivation

Currently, the connector and function cannot terminate the function instance if there are fatal exceptions thrown
outside the function instance thread. The current implementation of the connector and Pulsar Function exception handler
cannot handle the fatal exceptions that are thrown outside the function instance thread.

For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If
any fatal exceptions occur in those threads, the function instance thread will not be aware of them and will
not be able to terminate the connector. This will cause the connector to hang indefinitely. There is a related issue
here: https://github.com/apache/pulsar/issues/9464

The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from
an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has
been observed for the Kafka source connector: https://github.com/apache/pulsar/issues/9464. We have fixed it by adding
the notifyError method to the `PushSource` class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this
does not solve the same problem that all source connectors face because not all connectors are implemented based on
the `PushSource` class.

The problem is same for the Pulsar Function. Currently, the function can't throw fatal exceptions to the function
framework. We need to provide a way for the function developer to implement it.

We need a way for the connector and function developers to throw fatal exceptions outside the function instance
thread. The function framework should catch these exceptions and terminate the function accordingly.

# Goals

## In Scope

- Support terminating the function instance with fatal exceptions
- This proposal will apply both to the Pulsar Function and the Pulsar Connector.

## Out of Scope

- The fixes of the exception-raising issue mentioned in the Motivation part for all the connectors are not included in
this PIP. This PIP only provides the feature for the connector developer to terminate the function instance. The fixes
should be in several different PRs.

# High Level Design

Introduce a new method `fatal` to the context. All the connector implementation code and the function code
can use this context and call the `fatal` method to terminate the instance while raising a fatal exception.

After the connector or function raises the fatal exception, the function instance thread will be interrupted.
The function framework then could catch the exception, log it, and then terminate the function instance.

# Detailed Design

## Design & Implementation Details

This PIP proposes to add a new method`fatal`to the context `BaseContext`. This method allows the connector or the
function code to report a fatal exception to the function framework and terminate the instance. The `SinkContext`
and `SourceContext` are all inherited from `BaseContext`. Therefore, all the sink connectors and source connectors can
invoke this new method. The pulsar function context class `Context` is also inherited from `BaseContext`. Therefore, the
function code can also invoke this new method.

In the `fatal` method, the function instance thread will be interrupted. The function instance thread can then
catch the interrupt exception and get the fatal exception. The function framework then logs this exception,
reports to the metrics, and finally terminates the function instance.

Tbe behavior when invoking the `fatal` method:

- For the connector thread or function thread:
- Invoke the `fatal` method
- Send the exception to the function framework. There is a field `deathException` in the
class `JavaInstanceRunnable` that is used to store the fatal exception.
- Interrupt the function instance thread
- For the function instance thread:
- Catch the interrupt exception
- Get the exception from the function framework
- Report the log and metrics
- Close the function instance

## Public-facing Changes

### Public API

Introduce `fatal` method to the `BaseContext`:

```java
public interface BaseContext {
/**
* Terminate the function instance with a fatal exception.
*
* @param t the fatal exception to be raised
*/
void fatal(Throwable t);
}
```

### Binary protocol

No changes for this part.

### Configuration

No changes for this part.

### CLI

No changes for this part.

### Metrics

No changes for this part.

# Monitoring

No changes for this part.

# Security Considerations

No security-related changes.
The new method `fatal` will only take effect on the current function instance. It won't affect other function instances
even they are in the same function worker.

# Backward & Forward Compatibility

## Revert

No operation required.

## Upgrade

No operation required.

# Alternatives

## Using futures to handle results or exceptions returned the connector

The benefit of this solution is that it makes the use of exception throwing more intuitive to the connector developer.

But it requires changes to existing interfaces, including `Source` and `Sink`, which can complicate connector
development. And we still need the `fatal` method to handle some cases such as terminating the instance in code outside
of the message processing logic. This alternative solution can't handle this case.

Meanwhile, the implementation of this solution will also be more complex, involving changes to the core message
processing logic of the function framework. We need to turn the entire message processing logic into an asynchronous
pattern.

# General Notes

# Links

* Mailing List discussion thread: https://lists.apache.org/thread/j59gzzwjp8c48lwv5poddm9qzlp2hol0
* Mailing List voting thread: https://lists.apache.org/thread/ggok3c2601mnbdomr65v3pjth3lk6fr8

0 comments on commit 88231f7

Please sign in to comment.