Persist and recover individual deleted messages #192

Closed
wants to merge 2 commits

Conversation

sschepens
Contributor

Motivation

Issue #180

Modifications

Modified ManagedCursorInfo and PositionInfo to store a list of ranges of PositionInfo, just like individualDeletedMessages.

When persisting ManagedCursorInfo or PositionInfo, they get populated with the current individualDeletedMessages, which are then used to repopulate individualDeletedMessages on recovery.
Theoretically, it should be as simple as repopulating individualDeletedMessages; ManagedCursor should then skip already-acked messages when reading.

Changed MetaStoreImplZookeeper to store Protobuf's byte representation rather than the string representation, as this change produces exponential growth in the string representation due to the format of the Protobuf structures.
When reading, MetaStoreImplZookeeper will first attempt to decode the data as the byte representation; if that fails, it will fall back to parsing the string representation.

I realize this is a shortcoming, since the data stored in ZooKeeper wouldn't be human-readable, but we could maybe expose a command in pulsar-admin to ease reading it. This also benefits ZooKeeper, as the data written is now much smaller (9 bytes for a ManagedCursorInfo with empty individualDeletedMessages).
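
To make the format change concrete, here is a minimal sketch of the dual-format read path described above, assuming the generated classes live under MLDataFormats and using protobuf's TextFormat for the legacy string representation; the actual class and method names in the PR may differ.

import java.nio.charset.StandardCharsets;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;

import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;

// Hypothetical helper illustrating the encode/decode behavior, not the PR's exact code.
class CursorInfoCodec {

    static byte[] encode(ManagedCursorInfo info) {
        // New behavior: compact protobuf binary format instead of info.toString()
        return info.toByteArray();
    }

    static ManagedCursorInfo decode(byte[] content) throws Exception {
        try {
            // First try the new binary representation
            return ManagedCursorInfo.parseFrom(content);
        } catch (InvalidProtocolBufferException e) {
            // Fall back to data written by older brokers in protobuf text format
            ManagedCursorInfo.Builder builder = ManagedCursorInfo.newBuilder();
            TextFormat.merge(new String(content, StandardCharsets.UTF_8), builder);
            return builder.build();
        }
    }
}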

Maybe persisting all individualDeletedMessages every time PositionInfo is stored in BookKeeper doesn't make much sense, but users can tune writes through markDeleteThrottling. Also, setting an arbitrary limit on the number of messages between the first unacked message and the current read position doesn't make much sense.

Result

Should allow consumers to have unacked messages without affecting their backlog when bundles get unloaded.

I would like you guys @merlimat @rdhabalia to comment on this and tell me what you think, whether you see this bringing unforeseen issues, etc.

@merlimat merlimat added the type/enhancement The enhancements for the existing features or docs. e.g. reduce memory usage of the delayed messages label Feb 7, 2017
@merlimat merlimat added this to the 1.17 milestone Feb 7, 2017
@rdhabalia
Contributor

I am not in favor of storing the individualDeletedMessages list in ZooKeeper, as this list can sometimes be significantly large for some subscribers, and increasing the size of the ZooKeeper snapshot due to this data is something we want to avoid.
Another solution I can think of:

  • every time the broker starts, it creates a dedicated ledger that just stores the individualDeletedMessages list for each subscriber when the topic gets unloaded
  • the broker stores the ledger position in the cursor metadata while closing the cursor for a given subscriber
  • at topic load time, the broker can read the list from BookKeeper and recover it

It will raise a question: when can we delete the ledger?

  • Probably the broker can switch over to a new ledger after X hours, and we would need to introduce a ledger_TTL to purge it

@sschepens
Contributor Author

sschepens commented Feb 9, 2017

@rdhabalia this would require much more work and yet another ledger for every cursor.

Also, if a broker were to crash unexpectedly, it would not have persisted individualDeletedMessages and that would trigger the same behavior we see now.

Maybe we could use the same ledger we have now, and force a read of its last entry on recovery.

@merlimat
Contributor

merlimat commented Feb 9, 2017

What about persisting a max number of intervals, stopping after that?

@merlimat
Contributor

merlimat commented Feb 9, 2017

This way we can control how big the BK entry or z-node can grow. It will not be perfectly accurate, though it's better than today. I think storing 1k intervals should be pretty safe.
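
To make the cap concrete, here is a small self-contained sketch (plain Java with Guava, using simple entry IDs instead of the cursor's (ledgerId, entryId) positions); the method name mirrors the PR but the code is illustrative only. Ranges beyond the cap are simply not persisted and get replayed after recovery.

import java.util.ArrayList;
import java.util.List;

import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;

public class CappedRangesExample {

    // Keep at most maxRanges of the individually deleted ranges
    static List<Range<Long>> buildIndividualDeletedMessageRanges(RangeSet<Long> deleted, int maxRanges) {
        List<Range<Long>> ranges = new ArrayList<>();
        for (Range<Long> range : deleted.asRanges()) {
            if (ranges.size() >= maxRanges) {
                break;
            }
            ranges.add(range);
        }
        return ranges;
    }

    public static void main(String[] args) {
        RangeSet<Long> deleted = TreeRangeSet.create();
        deleted.add(Range.closed(60L, 1845L));
        deleted.add(Range.closed(1849L, 1851L));
        deleted.add(Range.closed(1855L, 1857L));

        // With a cap of 2, only the first two "holes" are persisted
        System.out.println(buildIndividualDeletedMessageRanges(deleted, 2));
    }
}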

@saandrews
Contributor

Can we persist this info into a ledger instead of storing it in ZooKeeper? Otherwise we will introduce far more reads to ZK during startup, which will be an issue as topics grow.

@merlimat
Contributor

merlimat commented Feb 9, 2017

@saandrews The mark-delete position is already stored either in a ledger or in a z-node.

Normally it just gets written into the cursor ledger. During cursor ledger roll-over it also gets written to the z-node (and used as a fallback, in case the cursor ledger fails to recover).

The other time the mark-delete position is stored in the z-node is when we do a graceful close of the topic. We store it in the z-node and throw away the cursor ledger.

With this change, the number of writes stays exactly the same. It only changes (potentially) the amount of information stored.

@sschepens
Contributor Author

@merlimat if we're concerned with storing too much in ZK, we can always force a read of the last position in the last ledger to get the list of individualDeletedMessages. This would require not setting the ledger to -1 when closing it.
If not, we can probably make the number of ranges stored configurable on brokers, or even allow it to be disabled.
We could also probably add configuration to specify the behavior when the maximum number of ranges is reached: some would want to discard the older ranges (meaning those ranges would effectively be acked), and some would want to discard the newer ones but face a potential backlog on bundle unload.

@rdhabalia
Contributor

this would require much more work and yet another ledger for every cursor.

No, the broker will create only one ledger where all the cursors will write their list when the bundle is unloaded or on some interval. And the broker can roll over that ledger, with the expiry time stored at some ZK location, and one of the tasks on the leader broker can purge expired ledgers.

Also, if a broker were to crash unexpectedly, it would not have persisted individualDeletedMessages and that would trigger the same behavior we see now.

Yes, when the broker crashes it also fails to update the metadata in ZK as well. So a crashing broker always fails to store the current state, and it falls back to recovery with the fallback information.

What about persisting a max number of intervals, stopping after that? This way we can control how big the BK entry or z-node can grow. It will not be perfectly accurate, though it's better than today. I think storing 1k intervals should be pretty safe.

Yes, but I am just thinking it may not solve the problem entirely: if there are many intervals in the initial range of messages, that will prevent storing the latest individualDeletedMessages list, and the cursor gets reset to the old message range only.

  • Also, a rollback strategy needs to be thought through for this change when initializing the cursor.

@sschepens
Contributor Author

No, the broker will create only one ledger where all the cursors will write their list when the bundle is unloaded or on some interval. And the broker can roll over that ledger, with the expiry time stored at some ZK location, and one of the tasks on the leader broker can purge expired ledgers.

But wouldn't this entry potentially grow very large and hit some limit?

Yes, when the broker crashes it also fails to update the metadata in ZK as well. So a crashing broker always fails to store the current state, and it falls back to recovery with the fallback information.

Yes, but with what I've currently done, this information is also stored on the cursor ledger.

@merlimat
Contributor

merlimat commented Feb 9, 2017

No, the broker will create only one ledger where all the cursors will write their list when the bundle is unloaded or on some interval. And the broker can roll over that ledger, with the expiry time stored at some ZK location, and one of the tasks on the leader broker can purge expired ledgers.

@rdhabalia That would require opening and recovering one more ledger when loading the topic.
I like the current approach more, though with a safety limit.

@sschepens
Contributor Author

@merlimat another approach to reduce the size a bit would be to store only the unacked messages of the ranges and then rebuild the ranges at runtime; this would require more logic than just picking up the ranges, though.

@saandrews
Contributor

I was mostly concerned about the amount of data stored in ZK. Storing ranges or having a limit is better for containing its growth, though it won't help with pockets of single unacked messages.

@merlimat
Contributor

merlimat commented Feb 9, 2017

I was mostly concerned about the amount of data stored in ZK. Storing ranges or having a limit is better for containing its growth, though it won't help with pockets of single unacked messages.

It will help, right? If you put a cap at 1000 (or whatever number) intervals, that means we can record 1000 disjoint "holes" and remember the acked messages. After that, the rest will be replayed in case of a broker restart (today we're replaying everything).

@saandrews
Contributor

It will help. In a bigger cluster, 1000 itself might be big. Knowing the overall size of individually deleted entries across topics might help us realize and contain its growth, but that's not straightforward. We can go with this for now.

Contributor

@merlimat merlimat left a comment


I like this approach. As I commented before, I'd put a configurable maximum on how many ranges to store, and after that fall back to the current behavior.

@@ -15,19 +15,12 @@
*/
package org.apache.bookkeeper.mledger;

import com.google.common.annotations.Beta;
import org.apache.bookkeeper.mledger.AsyncCallbacks.*;
Contributor

Can you keep the import in the same format as Eclipse? :)

Contributor Author

changed

@@ -1619,7 +1615,9 @@ public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object
// hence we write it as -1. The cursor ledger is deleted once the z-node write is confirmed.
ManagedCursorInfo info = ManagedCursorInfo.newBuilder().setCursorsLedgerId(-1)
.setMarkDeleteLedgerId(markDeletePosition.getLedgerId())
.setMarkDeleteEntryId(markDeletePosition.getEntryId()).build();
.setMarkDeleteEntryId(markDeletePosition.getEntryId())
.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges())
Contributor

Could we skip this line if we have no individually deleted messages? We should avoid creating an empty list in that case.

Contributor Author

addressed

@@ -1834,6 +1851,7 @@ void switchToNewLedger(final LedgerHandle lh, final VoidCallback callback) {
// ledger and delete the old one.
ManagedCursorInfo info = ManagedCursorInfo.newBuilder().setCursorsLedgerId(lh.getId())
.setMarkDeleteLedgerId(markDeletePosition.getLedgerId())
.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges())
Contributor

same here, avoid if possible

Contributor Author

addressed

@@ -233,84 +224,76 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa
info.getCursorsLedgerId(), info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());

String path = prefix + ledgerName + "/" + cursorName;
byte[] content = info.toString().getBytes(Encoding);
byte[] content = info.toByteArray();
Contributor

Is this maintaining the protobuf text format? I think toByteArray() is using the binary format

Contributor Author

Nope, this is using the binary format, and it's intentional; I mentioned it in the description of the PR.
The text format is REALLY verbose, and its size grows A LOT with each entry.
Here's what an entry with only 71 messages looks like:

cursorsLedgerId: -1 markDeleteLedgerId: 459 markDeleteEntryId: 59 individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 60 } upperEndpoint { ledgerId: 459 entryId: 1845 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1846 } upperEndpoint { ledgerId: 459 entryId: 1848 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1849 } upperEndpoint { ledgerId: 459 entryId: 1851 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1852 } upperEndpoint { ledgerId: 459 entryId: 1854 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1855 } upperEndpoint { ledgerId: 459 entryId: 1857 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1858 } upperEndpoint { ledgerId: 459 entryId: 1860 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1861 } upperEndpoint { ledgerId: 459 entryId: 1863 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1864 } upperEndpoint { ledgerId: 459 entryId: 1866 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1867 } upperEndpoint { ledgerId: 459 entryId: 1869 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1870 } upperEndpoint { ledgerId: 459 entryId: 1872 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1873 } upperEndpoint { ledgerId: 459 entryId: 1875 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1876 } upperEndpoint { ledgerId: 459 entryId: 1878 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1879 } upperEndpoint { ledgerId: 459 entryId: 1881 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1882 } upperEndpoint { ledgerId: 459 entryId: 1884 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1885 } upperEndpoint { ledgerId: 459 entryId: 1887 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1888 } upperEndpoint { ledgerId: 459 entryId: 1890 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1891 } upperEndpoint { ledgerId: 459 entryId: 1893 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1894 } upperEndpoint { ledgerId: 459 entryId: 1896 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1897 } upperEndpoint { ledgerId: 459 entryId: 1899 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1900 } upperEndpoint { ledgerId: 459 entryId: 1902 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1903 } upperEndpoint { ledgerId: 459 entryId: 1905 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1906 } upperEndpoint { ledgerId: 459 entryId: 1908 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1909 } upperEndpoint { ledgerId: 459 entryId: 1911 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1912 } upperEndpoint { ledgerId: 459 entryId: 1914 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1915 } upperEndpoint { ledgerId: 459 entryId: 1917 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1918 } upperEndpoint { ledgerId: 459 entryId: 1920 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1921 } upperEndpoint { ledgerId: 459 entryId: 1923 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1924 } upperEndpoint { ledgerId: 459 entryId: 1926 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1927 } 
upperEndpoint { ledgerId: 459 entryId: 1929 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1930 } upperEndpoint { ledgerId: 459 entryId: 1932 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1933 } upperEndpoint { ledgerId: 459 entryId: 1935 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1936 } upperEndpoint { ledgerId: 459 entryId: 1938 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1939 } upperEndpoint { ledgerId: 459 entryId: 1941 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1942 } upperEndpoint { ledgerId: 459 entryId: 1944 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1945 } upperEndpoint { ledgerId: 459 entryId: 1947 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1948 } upperEndpoint { ledgerId: 459 entryId: 1950 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1951 } upperEndpoint { ledgerId: 459 entryId: 1953 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1954 } upperEndpoint { ledgerId: 459 entryId: 1956 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1957 } upperEndpoint { ledgerId: 459 entryId: 1959 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1960 } upperEndpoint { ledgerId: 459 entryId: 1962 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1963 } upperEndpoint { ledgerId: 459 entryId: 1965 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1966 } upperEndpoint { ledgerId: 459 entryId: 1968 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1969 } upperEndpoint { ledgerId: 459 entryId: 1971 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1972 } upperEndpoint { ledgerId: 459 entryId: 1974 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1975 } upperEndpoint { ledgerId: 459 entryId: 1977 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1978 } upperEndpoint { ledgerId: 459 entryId: 1980 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1981 } upperEndpoint { ledgerId: 459 entryId: 1983 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1984 } upperEndpoint { ledgerId: 459 entryId: 1986 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1987 } upperEndpoint { ledgerId: 459 entryId: 1989 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1990 } upperEndpoint { ledgerId: 459 entryId: 1992 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1993 } upperEndpoint { ledgerId: 459 entryId: 1995 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1996 } upperEndpoint { ledgerId: 459 entryId: 1998 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 1999 } upperEndpoint { ledgerId: 459 entryId: 2001 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2002 } upperEndpoint { ledgerId: 459 entryId: 2004 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2005 } upperEndpoint { ledgerId: 459 entryId: 2007 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2008 } upperEndpoint { ledgerId: 459 entryId: 2010 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2011 } upperEndpoint { ledgerId: 459 entryId: 2013 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2014 } upperEndpoint { 
ledgerId: 459 entryId: 2016 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2017 } upperEndpoint { ledgerId: 459 entryId: 2019 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2020 } upperEndpoint { ledgerId: 459 entryId: 2022 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2023 } upperEndpoint { ledgerId: 459 entryId: 2025 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2026 } upperEndpoint { ledgerId: 459 entryId: 2028 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2029 } upperEndpoint { ledgerId: 459 entryId: 2031 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2032 } upperEndpoint { ledgerId: 459 entryId: 2034 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2035 } upperEndpoint { ledgerId: 459 entryId: 2037 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2038 } upperEndpoint { ledgerId: 459 entryId: 2040 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2041 } upperEndpoint { ledgerId: 459 entryId: 2043 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2044 } upperEndpoint { ledgerId: 459 entryId: 2046 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2047 } upperEndpoint { ledgerId: 459 entryId: 2048 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2050 } upperEndpoint { ledgerId: 459 entryId: 2051 } } individualDeletedMessages { lowerEndpoint { ledgerId: 459 entryId: 2053 } upperEndpoint { ledgerId: 459 entryId: 2054 } }

That's about 8 KB, and this is only going to get larger; larger IDs mean a larger string size.

I know that introducing a change in format is a breaking change, but I changed usages of this to parse both formats.

Contributor

Yes, the size concern for the text format is very real, but the breaking change would be dangerous in 2 ways:

  • While a rolling upgrade is happening, an updated broker could crash and its topics could end up on a non-updated broker
  • In case there is any issue with the release, we need to be able to roll back to the previous release

In general this problem can be solved in a couple of ways:

  • Do one release that understands both formats. The next release will start writing the new format.
  • Use a config switch to choose the format, then retire the config variable once the feature is widely activated.

For the sake of this PR, I would say not to mix it with format changes; we need to tackle that separately in a controlled way.

In this case, we can simply avoid snapshotting this information into the z-node. In normal operation the cursor position is appended in binary form to a ledger.
When doing a graceful topic close, as an optimization to save on the number of ZK writes, we write the information with the last mark-delete position in the z-node and throw the ledger away.

What we could do, when closing the topic, is:

  • If we have individually deleted messages, we close the cursor ledger
  • If not, continue with the current behavior of storing the snapshot in the z-node

In both cases the information is preserved.

Later, when we enable the binary format (btw: we should also do that for managed-ledger z-nodes), we can go back to a single, unified behavior again.
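
A hypothetical illustration of the close-time decision described above; all names here are stand-ins rather than the actual ManagedCursorImpl API.

import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet;

// Sketch only: models the proposed graceful-close behavior, not the real cursor code.
public class GracefulCloseSketch {

    private final RangeSet<Long> individualDeletedMessages = TreeRangeSet.create();

    void closeCursor(long markDeletePosition) {
        if (individualDeletedMessages.isEmpty()) {
            // Current behavior: snapshot the mark-delete position into the z-node
            // and throw the cursor ledger away.
            persistToZooKeeper(markDeletePosition);
            deleteCursorLedger();
        } else {
            // Proposed behavior: keep the cursor ledger (just close it), so the
            // individually deleted ranges appended to it survive the unload and
            // can be recovered when the topic is loaded again.
            closeCursorLedger();
        }
    }

    private void persistToZooKeeper(long markDeletePosition) { /* stub */ }
    private void deleteCursorLedger() { /* stub */ }
    private void closeCursorLedger() { /* stub */ }
}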

Contributor Author

@merlimat what do you suggest we do then?

I realized the broker also saves in ZK the load-balance info of all bundles it owns; this can potentially grow very large too, right?

@sschepens sschepens force-pushed the persist_unacked_messages branch 2 times, most recently from 317c1c8 to ea41da5 Compare February 20, 2017 14:53
@sschepens
Contributor Author

@merlimat I added a configuration field to specify the number of ranges to persist; it defaults to 1000.
I don't know if the naming is correct, please do suggest alternatives.

@sschepens sschepens force-pushed the persist_unacked_messages branch 2 times, most recently from fefbcfc to cb121b1 Compare February 20, 2017 20:58
@sschepens sschepens force-pushed the persist_unacked_messages branch from cb121b1 to ec53a06 Compare February 21, 2017 14:24
@sschepens
Contributor Author

@merlimat I pushed a new commit rearranging some of the locking and synchronization in ManagedCursorImpl. Some synchronization blocks were not necessary, I believe, and some covered far more code than actually needed the synchronization or lock. Can you give this a look?

We were getting deadlocks because buildIndividualDeletedMessageRanges now requires a lock (we had ConcurrentModificationExceptions before), but persistPosition is sometimes called from within a synchronized block, and this caused a deadlock. Here is the detail:

Found one Java-level deadlock:
=============================
"pulsar-io-39-8":
  waiting for ownable synchronizer 0x00000006c3e59278, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "pulsar-io-39-2"
"pulsar-io-39-2":
  waiting to lock monitor 0x00007f3980058158 (object 0x00000006c3e59320, a java.util.ArrayDeque),
  which is held by "bookkeeper-ml-workers-31-1"
"bookkeeper-ml-workers-31-1":
  waiting for ownable synchronizer 0x00000006c3e59278, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "pulsar-io-39-2"

Java stack information for the threads listed above:
===================================================
"pulsar-io-39-8":
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006c3e59278> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:943)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncDelete(ManagedCursorImpl.java:1409)
        at com.yahoo.pulsar.broker.service.persistent.PersistentSubscription.acknowledgeMessage(PersistentSubscription.java:169)
        at com.yahoo.pulsar.broker.service.Consumer.messageAcked(Consumer.java:306)
        at com.yahoo.pulsar.broker.service.ServerCnx.handleAck(ServerCnx.java:497)
        at com.yahoo.pulsar.common.api.PulsarDecoder.channelRead(PulsarDecoder.java:112)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:934)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:405)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:310)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)
"pulsar-io-39-2":
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.internalAsyncMarkDelete(ManagedCursorImpl.java:1251)
        - waiting to lock <0x00000006c3e59320> (a java.util.ArrayDeque)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncDelete(ManagedCursorImpl.java:1475)
        at com.yahoo.pulsar.broker.service.persistent.PersistentSubscription.acknowledgeMessage(PersistentSubscription.java:169)
        at com.yahoo.pulsar.broker.service.Consumer.messageAcked(Consumer.java:306)
        at com.yahoo.pulsar.broker.service.ServerCnx.handleAck(ServerCnx.java:497)
        at com.yahoo.pulsar.common.api.PulsarDecoder.channelRead(PulsarDecoder.java:112)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:934)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:405)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:310)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)
"bookkeeper-ml-workers-31-1":
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006c3e59278> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
        at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.buildIndividualDeletedMessageRanges(ManagedCursorImpl.java:1822)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.persistPosition(ManagedCursorImpl.java:1850)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.internalMarkDelete(ManagedCursorImpl.java:1290)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.internalFlushPendingMarkDeletes(ManagedCursorImpl.java:1770)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.flushPendingMarkDeletes(ManagedCursorImpl.java:1753)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.readOperationCompleted(ManagedCursorImpl.java:2001)
        - locked <0x00000006c3e59320> (a java.util.ArrayDeque)
        at org.apache.bookkeeper.mledger.impl.OpReadEntry.checkReadCompletion(OpReadEntry.java:120)
        at org.apache.bookkeeper.mledger.impl.OpReadEntry.readEntriesComplete(OpReadEntry.java:71)
        at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$null$2(EntryCacheImpl.java:274)
        at org.apache.bookkeeper.mledger.impl.EntryCacheImpl$$Lambda$245/787545477.run(Unknown Source)
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:27)
        at org.apache.bookkeeper.util.SafeRunnable.run(SafeRunnable.java:31)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
        at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.

@merlimat
Contributor

merlimat commented Mar 6, 2017

@sschepens I have added a few changes on top of this PR in #276. Please take a look.

I haven't included your second commit about the lock refactoring. I haven't seen any deadlock so far, and that's kind of "sensitive" code :). It's kind of easy to break something unexpected there.
How did you get the deadlock, just by running the unit tests?

@merlimat
Contributor

merlimat commented Mar 6, 2017

How did you get the deadlock, just by running the unit tests?

Correcting myself... I see the same deadlock.

@sschepens
Contributor Author

sschepens commented Mar 6, 2017

@merlimat we've been running with this code for almost two weeks and we have not seen any more deadlocks or unwanted behavior, I believe. Please check the lock refactoring I made; most changes make a lot of sense to me and shouldn't trigger any side effects.

This will also probably reduce contention a little bit, since some locks are no longer required and others are released more quickly.

In some cases we were synchronizing on pendingMarkDeleteOps just to perform a RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet, and this didn't make much sense, since these operations are already atomic and don't modify pendingMarkDeleteOps.

In other cases we were synchronizing on pendingMarkDeleteOps around blocks of code that didn't end up needing the lock at all. I modified these to only lock the blocks of code that actually need the lock.

Then, flushPendingMarkDeletes required the caller to synchronize on pendingMarkDeleteOps before calling the method; this also made little sense, so I moved the lock inside the method.

I don't see how these could cause any issues, but maybe you can see something I don't.
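
A simplified illustration of the flushPendingMarkDeletes change described above (the pending operations are not really Runnables, and this is not the actual ManagedCursorImpl code, just the shape of moving the lock inside the method):

import java.util.ArrayDeque;
import java.util.Deque;

public class FlushLockingSketch {

    private final Deque<Runnable> pendingMarkDeleteOps = new ArrayDeque<>();

    // Before: every caller was expected to hold the monitor already:
    //     synchronized (pendingMarkDeleteOps) { flushPendingMarkDeletes(); }
    //
    // After: the method acquires the lock itself, so callers no longer wrap it
    // and no longer hold the monitor around unrelated surrounding code.
    void flushPendingMarkDeletes() {
        synchronized (pendingMarkDeleteOps) {
            while (!pendingMarkDeleteOps.isEmpty()) {
                pendingMarkDeleteOps.poll().run();
            }
        }
    }
}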

@@ -1462,7 +1458,19 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba
newMarkDeletePosition = range.upperEndpoint();
}
}
} catch (Exception e) {
Contributor

Yes, I think this was the source of the deadlock; I had it fixed in my branch as well: ecf4501

// Resume normal mark-delete operations
STATE_UPDATER.set(ManagedCursorImpl.this, State.Open);
}
flushPendingMarkDeletes();
Contributor

@merlimat merlimat Mar 6, 2017


I'm not 100% sure why the two operations were grouped under the same lock, though I'm sure there was some reason :), maybe not a good one. In any case, this is unrelated to the specific change of persisting the individually deleted positions and should go into a separate PR.

@merlimat
Contributor

Closing this one since the change was carried over in #276

@merlimat merlimat closed this Mar 24, 2017
sijie pushed a commit to sijie/pulsar that referenced this pull request Mar 4, 2018
hangc0276 pushed a commit to hangc0276/pulsar that referenced this pull request May 26, 2021
fix apache#192

* remove unnecessary register producer

* exist check

* fix spotbug issue

* remove blank line