This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

ISSUE-5866: Flink event time based windowing functions not working with pulsar #8

Closed
sijie opened this issue Dec 26, 2019 · 0 comments

Comments

@sijie
Member

sijie commented Dec 26, 2019

Original Issue: apache/pulsar#5866
We are using event-time-based windowing functions in Flink. The source is a partitioned Pulsar topic. The timestamp field is determined by the processing function in Flink.
The event time is extracted from each message by a TimeStampExtractor attached to the stream.

Expected behavior

The window should get triggered once the watermark is crossed.

Actual behavior

We noticed that the window is not triggered even though the watermark has been crossed.

Steps to reproduce

Attaching the sample code.
pulsar-flink-preload-new.zip

System configuration

Pulsar version: 2.4.2
Flink version: 1.8.2

@sijie sijie closed this as completed Dec 26, 2019
@sijie sijie reopened this Dec 26, 2019
@sijie sijie closed this as completed Dec 26, 2019
streamnativebot pushed a commit that referenced this issue Nov 12, 2020
)

### Motivation

The timer callback of `AckGroupingTrackerEnabled` captures only `this`, which is a raw, non-owning pointer to the `AckGroupingTrackerEnabled` instance. If the instance goes out of scope and is destroyed, `this` points to invalid memory.

Even if the destructor of `AckGroupingTrackerEnabled` cancels the timer, the callback is not necessarily triggered immediately. It is still possible that, when the callback is triggered, the error code is 0 but accessing `this` is invalid. For example, the callback caused the following crash in a production environment, which is hard to reproduce:

```
#6 <signal handler called>
#7 0x00007fb4e67c5cb8 in ?? ()
#8 0x00007fb604981adb in operator() (ec=..., __closure=0x7fb52b0fb230)
   at /usr/local/src/apache-pulsar-microfocus/pulsar-client-cpp/lib/AckGroupingTrackerEnabled.cc:148
#9 operator() (this=0x7fb52b0fb230) at /usr/local/include/boost/asio/detail/bind_handler.hpp:47
```

### Modifications

- Use `std::shared_ptr` instead of `std::unique_ptr` for `AckGroupingTrackerEnabled`, then capture the shared pointer in the timer callback's lambda expression to extend the lifetime of the instance that `this` points to.
- Add a `start()` method to `AckGroupingTracker` to avoid `std::bad_weak_ptr`, because no `std::shared_ptr` owns the object yet when `shared_from_this()` is called in a constructor.
- Use `std::weak_ptr` to reference `HandlerBase` in case the handler is no longer valid when the timer callback is triggered.
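
The three modifications above can be sketched in isolation. The following is a minimal illustration using only the standard library, not the actual Pulsar client code: `FakeTimerQueue`, `Handler`, and `Tracker` are hypothetical stand-ins for the asio timer, `HandlerBase`, and `AckGroupingTrackerEnabled`, and their members are assumptions made for the example.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for an asio timer: stores callbacks and runs them later.
class FakeTimerQueue {
   public:
    void schedule(std::function<void()> cb) { pending_.push(std::move(cb)); }

    // Runs every pending callback once and returns how many were run.
    int runAll() {
        int count = 0;
        while (!pending_.empty()) {
            auto cb = std::move(pending_.front());
            pending_.pop();
            cb();
            ++count;
        }  // each callback, and whatever it captured, is destroyed here
        return count;
    }

   private:
    std::queue<std::function<void()>> pending_;
};

// Hypothetical stand-in for HandlerBase.
struct Handler {
    int flushCount = 0;
};

// Hypothetical stand-in for AckGroupingTrackerEnabled.
class Tracker : public std::enable_shared_from_this<Tracker> {
   public:
    Tracker(FakeTimerQueue& timers, const std::shared_ptr<Handler>& handler)
        : timers_(timers), handler_(handler) {}

    // Scheduling happens here rather than in the constructor, because
    // shared_from_this() requires that a std::shared_ptr already owns *this.
    void start() {
        auto self = shared_from_this();
        // Capturing `self` (not `this`) keeps the tracker alive until the
        // callback has run and been destroyed.
        timers_.schedule([self] { self->flush(); });
    }

    // Returns true if the handler was still alive and the flush was applied.
    bool flush() {
        if (auto handler = handler_.lock()) {
            ++handler->flushCount;
            return true;
        }
        return false;  // handler already destroyed: skip instead of crashing
    }

   private:
    FakeTimerQueue& timers_;         // must outlive the tracker in this sketch
    std::weak_ptr<Handler> handler_; // non-owning: the handler may go away first
};
```

A caller would create the tracker with `std::make_shared<Tracker>(timers, handler)` and then call `start()`. Dropping the caller's `std::shared_ptr` afterwards does not destroy the tracker while a callback is still pending, and a callback that fires after the handler is gone does nothing.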
codelipenghui pushed a commit that referenced this issue Nov 13, 2020
)

(cherry picked from commit cfa65d0)
(cherry picked from commit 98591c4)
zymap pushed a commit that referenced this issue Dec 4, 2020
)

(cherry picked from commit cfa65d0)