Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unack message tracker for pre-fetched messages #68

Merged
merged 1 commit into from
Oct 17, 2016

Conversation

merlimat
Copy link
Contributor

@merlimat merlimat commented Oct 17, 2016

Rebase of @sboobna PR at #55

Motivation

Currently, when an application consumer gets stuck, the pre-fetched messages remain in the queue forever and might never reach the application again if they are redelivered to the same consumer (since it will keep lying in the pre-fetched queue), until the application restarts.

Modifications

The UnackedMessageTracker will also track the messages lying in the pre-fetched queue in the client.

Result

The messages will never lie in the pre-fetched queue forever, since they will be redelivered after the ack timeout.

@yahoocla
Copy link

CLA is valid!

@merlimat merlimat added this to the 1.15 milestone Oct 17, 2016
@merlimat merlimat added type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages area/client labels Oct 17, 2016
@merlimat merlimat merged commit 1e9fa9f into apache:master Oct 17, 2016
@merlimat merlimat deleted the sboobna-ack-timeout branch October 18, 2016 17:38
sijie pushed a commit to sijie/pulsar that referenced this pull request Mar 4, 2018
…ache#68)

* Added FunctionActioner to start/stop functions in seperate thread

* Fixed a bug
massakam pushed a commit to massakam/pulsar that referenced this pull request Aug 6, 2020
codelipenghui added a commit to codelipenghui/incubator-pulsar that referenced this pull request Aug 10, 2021
…ader

If we have many ledgers in a managed ledger, for checking if need to delete the offloaded ledger from bookies,
for each ledger, will call getOffloadPolicies() from the Offloader. For the BlobStoreManagedLedgerOffloader we
are generate the offload policies from the properties for each getting operation(Maybe need another PR to find way to optimize this part).
This will lead high CPU usage

Stack:

```
"bookkeeper-ml-workers-OrderedExecutor-4-0" apache#68 prio=5 os_prio=0 tid=0x00007f23663d8000 nid=0xae runnable [0x00007f22b8ac2000]
   java.lang.Thread.State: RUNNABLE
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.pulsar.common.util.FieldParser.convert(FieldParser.java:119)
	at org.apache.pulsar.common.util.FieldParser.value(FieldParser.java:194)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.lambda$create$0(OffloadPolicies.java:265)
	at org.apache.pulsar.common.policies.data.OffloadPolicies$$Lambda$127/540923243.accept(Unknown Source)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.create(OffloadPolicies.java:261)
	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.getOffloadPolicies(BlobStoreManagedLedgerOffloader.java:303)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isOffloadedNeedsDelete(ManagedLedgerImpl.java:2091)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalTrimConsumedLedgers(ManagedLedgerImpl.java:2176)
	- locked <0x00000006a3f2c000> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$trimConsumedLedgersInBackground$24(ManagedLedgerImpl.java:1997)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$$Lambda$722/254878114.run(Unknown Source)
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
```
codelipenghui added a commit to codelipenghui/incubator-pulsar that referenced this pull request Aug 11, 2021
…ader

If we have many ledgers in a managed ledger, for checking if need to delete the offloaded ledger from bookies,
for each ledger, will call getOffloadPolicies() from the Offloader. For the BlobStoreManagedLedgerOffloader we
are generate the offload policies from the properties for each getting operation(Maybe need another PR to find way to optimize this part).
This will lead high CPU usage

Stack:

```
"bookkeeper-ml-workers-OrderedExecutor-4-0" apache#68 prio=5 os_prio=0 tid=0x00007f23663d8000 nid=0xae runnable [0x00007f22b8ac2000]
   java.lang.Thread.State: RUNNABLE
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.pulsar.common.util.FieldParser.convert(FieldParser.java:119)
	at org.apache.pulsar.common.util.FieldParser.value(FieldParser.java:194)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.lambda$create$0(OffloadPolicies.java:265)
	at org.apache.pulsar.common.policies.data.OffloadPolicies$$Lambda$127/540923243.accept(Unknown Source)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.create(OffloadPolicies.java:261)
	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.getOffloadPolicies(BlobStoreManagedLedgerOffloader.java:303)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isOffloadedNeedsDelete(ManagedLedgerImpl.java:2091)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalTrimConsumedLedgers(ManagedLedgerImpl.java:2176)
	- locked <0x00000006a3f2c000> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$trimConsumedLedgersInBackground$24(ManagedLedgerImpl.java:1997)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$$Lambda$722/254878114.run(Unknown Source)
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
```
codelipenghui added a commit that referenced this pull request Aug 11, 2021
…ader (#11629)

* Avoid redundant calls for getting the offload policies from the offloader

If we have many ledgers in a managed ledger, for checking if need to delete the offloaded ledger from bookies,
for each ledger, will call getOffloadPolicies() from the Offloader. For the BlobStoreManagedLedgerOffloader we
are generate the offload policies from the properties for each getting operation(Maybe need another PR to find way to optimize this part).
This will lead high CPU usage

Stack:

```
"bookkeeper-ml-workers-OrderedExecutor-4-0" #68 prio=5 os_prio=0 tid=0x00007f23663d8000 nid=0xae runnable [0x00007f22b8ac2000]
   java.lang.Thread.State: RUNNABLE
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.pulsar.common.util.FieldParser.convert(FieldParser.java:119)
	at org.apache.pulsar.common.util.FieldParser.value(FieldParser.java:194)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.lambda$create$0(OffloadPolicies.java:265)
	at org.apache.pulsar.common.policies.data.OffloadPolicies$$Lambda$127/540923243.accept(Unknown Source)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.create(OffloadPolicies.java:261)
	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.getOffloadPolicies(BlobStoreManagedLedgerOffloader.java:303)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isOffloadedNeedsDelete(ManagedLedgerImpl.java:2091)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalTrimConsumedLedgers(ManagedLedgerImpl.java:2176)
	- locked <0x00000006a3f2c000> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$trimConsumedLedgersInBackground$24(ManagedLedgerImpl.java:1997)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$$Lambda$722/254878114.run(Unknown Source)
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
```
hangc0276 pushed a commit that referenced this pull request Aug 12, 2021
…ader (#11629)

* Avoid redundant calls for getting the offload policies from the offloader

If we have many ledgers in a managed ledger, for checking if need to delete the offloaded ledger from bookies,
for each ledger, will call getOffloadPolicies() from the Offloader. For the BlobStoreManagedLedgerOffloader we
are generate the offload policies from the properties for each getting operation(Maybe need another PR to find way to optimize this part).
This will lead high CPU usage

Stack:

```
"bookkeeper-ml-workers-OrderedExecutor-4-0" #68 prio=5 os_prio=0 tid=0x00007f23663d8000 nid=0xae runnable [0x00007f22b8ac2000]
   java.lang.Thread.State: RUNNABLE
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.pulsar.common.util.FieldParser.convert(FieldParser.java:119)
	at org.apache.pulsar.common.util.FieldParser.value(FieldParser.java:194)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.lambda$create$0(OffloadPolicies.java:265)
	at org.apache.pulsar.common.policies.data.OffloadPolicies$$Lambda$127/540923243.accept(Unknown Source)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.create(OffloadPolicies.java:261)
	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.getOffloadPolicies(BlobStoreManagedLedgerOffloader.java:303)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isOffloadedNeedsDelete(ManagedLedgerImpl.java:2091)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalTrimConsumedLedgers(ManagedLedgerImpl.java:2176)
	- locked <0x00000006a3f2c000> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$trimConsumedLedgersInBackground$24(ManagedLedgerImpl.java:1997)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$$Lambda$722/254878114.run(Unknown Source)
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
```

(cherry picked from commit af9b800)
codelipenghui added a commit that referenced this pull request Dec 11, 2021
…ader (#11629)

* Avoid redundant calls for getting the offload policies from the offloader

If we have many ledgers in a managed ledger, for checking if need to delete the offloaded ledger from bookies,
for each ledger, will call getOffloadPolicies() from the Offloader. For the BlobStoreManagedLedgerOffloader we
are generate the offload policies from the properties for each getting operation(Maybe need another PR to find way to optimize this part).
This will lead high CPU usage

Stack:

```
"bookkeeper-ml-workers-OrderedExecutor-4-0" #68 prio=5 os_prio=0 tid=0x00007f23663d8000 nid=0xae runnable [0x00007f22b8ac2000]
   java.lang.Thread.State: RUNNABLE
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.pulsar.common.util.FieldParser.convert(FieldParser.java:119)
	at org.apache.pulsar.common.util.FieldParser.value(FieldParser.java:194)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.lambda$create$0(OffloadPolicies.java:265)
	at org.apache.pulsar.common.policies.data.OffloadPolicies$$Lambda$127/540923243.accept(Unknown Source)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.create(OffloadPolicies.java:261)
	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.getOffloadPolicies(BlobStoreManagedLedgerOffloader.java:303)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isOffloadedNeedsDelete(ManagedLedgerImpl.java:2091)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalTrimConsumedLedgers(ManagedLedgerImpl.java:2176)
	- locked <0x00000006a3f2c000> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$trimConsumedLedgersInBackground$24(ManagedLedgerImpl.java:1997)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$$Lambda$722/254878114.run(Unknown Source)
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
```

(cherry picked from commit af9b800)
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…ader (apache#11629)

* Avoid redundant calls for getting the offload policies from the offloader

If we have many ledgers in a managed ledger, for checking if need to delete the offloaded ledger from bookies,
for each ledger, will call getOffloadPolicies() from the Offloader. For the BlobStoreManagedLedgerOffloader we
are generate the offload policies from the properties for each getting operation(Maybe need another PR to find way to optimize this part).
This will lead high CPU usage

Stack:

```
"bookkeeper-ml-workers-OrderedExecutor-4-0" apache#68 prio=5 os_prio=0 tid=0x00007f23663d8000 nid=0xae runnable [0x00007f22b8ac2000]
   java.lang.Thread.State: RUNNABLE
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.pulsar.common.util.FieldParser.convert(FieldParser.java:119)
	at org.apache.pulsar.common.util.FieldParser.value(FieldParser.java:194)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.lambda$create$0(OffloadPolicies.java:265)
	at org.apache.pulsar.common.policies.data.OffloadPolicies$$Lambda$127/540923243.accept(Unknown Source)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:647)
	at org.apache.pulsar.common.policies.data.OffloadPolicies.create(OffloadPolicies.java:261)
	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader.getOffloadPolicies(BlobStoreManagedLedgerOffloader.java:303)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.isOffloadedNeedsDelete(ManagedLedgerImpl.java:2091)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalTrimConsumedLedgers(ManagedLedgerImpl.java:2176)
	- locked <0x00000006a3f2c000> (a org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$trimConsumedLedgersInBackground$24(ManagedLedgerImpl.java:1997)
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl$$Lambda$722/254878114.run(Unknown Source)
	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
	at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
```
cbornet pushed a commit to cbornet/pulsar that referenced this pull request Apr 11, 2022
nicoloboschi pushed a commit to nicoloboschi/pulsar that referenced this pull request Apr 12, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/client type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants