Skip to content

Commit

Permalink
ZOOKEEPER-4655: Communicate the Zxid that triggered a WatchEvent to f…
Browse files Browse the repository at this point in the history
…ire (#1950)

* Fix a race condition in WatcherCleanerTest.testDeadWatcherMetrics

Because the metrics are updated _after_ the listener is invoked, the listener does not always see
the fresh metric value. This commit fixes the test so that it waits for the value to become what we expect.

* Leverage an existing method and refactor the rest of the code to match

Since there was an existing waitFor method in ZKTestCase, along with an existing implementation of
waitForMetric in LearnerMetricsTest, this commit moves waitForMetric to ZKTestCase and refactors the
metric-related usages of waitFor.

* Communicate the Zxid that triggered a WatchEvent to fire

With the recent addition of persistent watches, many doors have opened up to significantly more
performant and intuitive local caches of remote state, but the actual implementation can be
difficult because to cache data locally, one needs to execute the following steps:

1. Set the watch
2. Bootstrap the watched subtree
3. Catch up on the events that fired during the bootstrap

The issue is that it is now very difficult to deduplicate events and sanely resolve the remote state during
step 3, because it is unknown whether an event arrived during the bootstrap or after it. For example,
imagine that between steps 1 and 2, a node /a was deleted then re-created. By the time step 3 is
executed, there will be a NodeDeleted event queued up followed by a NodeCreated, causing at best a
double read (one from the bootstrap, one from the NodeCreated) or at worst some data inconsistencies
in the local cache.

This change sets the Zxid in the response header whenever the watch event type is NodeCreated,
NodeDeleted, NodeDataChanged or NodeChildrenChanged.
  • Loading branch information
PapaCharlie authored Jun 16, 2023
1 parent b3487c5 commit 880f606
Show file tree
Hide file tree
Showing 22 changed files with 360 additions and 205 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerConcentrateWatch(InvocationState state) throws Exception {
for (String path : state.paths) {
state.watchManager.triggerWatch(path, event);
state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}

Expand Down Expand Up @@ -225,7 +225,7 @@ public void tearDown() {

// clear all the watches
for (String path : paths) {
watchManager.triggerWatch(path, event);
watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}
}
Expand Down Expand Up @@ -294,7 +294,7 @@ public void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception {
for (String path : state.paths) {
state.watchManager.triggerWatch(path, event);
state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {
event.setPath(clientPath);
}

WatchedEvent we = new WatchedEvent(event);
WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid());
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
eventThread.queueEvent(we);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,38 @@
*/
@InterfaceAudience.Public
public class WatchedEvent {
public static final long NO_ZXID = -1L;

private final KeeperState keeperState;
private final EventType eventType;
private String path;
private final String path;
private final long zxid;

/**
* Create a WatchedEvent with specified type, state and path
* Create a WatchedEvent with specified type, state, path and zxid
*/
public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
public WatchedEvent(EventType eventType, KeeperState keeperState, String path, long zxid) {
this.keeperState = keeperState;
this.eventType = eventType;
this.path = path;
this.zxid = zxid;
}

/**
* Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent
* Create a WatchedEvent with specified type, state and path
*/
public WatchedEvent(WatcherEvent eventMessage) {
public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
this(eventType, keeperState, path, NO_ZXID);
}

/**
* Convert a WatcherEvent sent over the wire into a full-fledged WatchedEvent
*/
public WatchedEvent(WatcherEvent eventMessage, long zxid) {
keeperState = KeeperState.fromInt(eventMessage.getState());
eventType = EventType.fromInt(eventMessage.getType());
path = eventMessage.getPath();
this.zxid = zxid;
}

public KeeperState getState() {
Expand All @@ -66,9 +77,24 @@ public String getPath() {
return path;
}

/**
* Returns the zxid of the transaction that triggered this watch if it is
* of one of the following types:<ul>
* <li>{@link EventType#NodeCreated}</li>
* <li>{@link EventType#NodeDeleted}</li>
* <li>{@link EventType#NodeDataChanged}</li>
* <li>{@link EventType#NodeChildrenChanged}</li>
* </ul>
* Otherwise, returns {@value #NO_ZXID}. Note that {@value #NO_ZXID} is also
* returned by old servers that do not support this feature.
*
* @return the zxid this event was constructed with, or {@value #NO_ZXID}
*         when the event was created without one
*/
public long getZxid() {
return zxid;
}

@Override
public String toString() {
return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path;
return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path + " zxid: " + zxid;
}

/**
Expand All @@ -77,5 +103,4 @@ public String toString() {
public WatcherEvent getWrapper() {
// Only the type, state and path are copied into the wrapper; the zxid is not part of it.
return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
* server it connects to. An application using such a client handles these
* events by registering a callback object with the client. The callback object
* is expected to be an instance of a class that implements Watcher interface.
* When {@link #process} is triggered by a watch firing, such as
* {@link Event.EventType#NodeDataChanged}, {@link WatchedEvent#getZxid()} will
* return the zxid of the transaction that caused said watch to fire. If
* {@value WatchedEvent#NO_ZXID} is returned then the server must be updated to
* support this feature.
*
*/
@InterfaceAudience.Public
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ public void createNode(final String path, byte[] data, List<ACL> acl, long ephem
updateQuotaStat(lastPrefix, bytes, 1);
}
updateWriteStat(path, bytes);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
}

/**
Expand Down Expand Up @@ -615,9 +615,9 @@ public void deleteNode(String path, long zxid) throws NoNodeException {
"childWatches.triggerWatch " + parentName);
}

WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted);
childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged);
WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
}

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
Expand Down Expand Up @@ -649,7 +649,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time)
nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastData));

updateWriteStat(path, dataBytes);
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
return s;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
public class DumbWatcher extends ServerCnxn {

private long sessionId;
private String mostRecentPath;
private Event.EventType mostRecentEventType;
private long mostRecentZxid = WatchedEvent.NO_ZXID;

public DumbWatcher() {
this(0);
Expand All @@ -49,8 +52,24 @@ void setSessionTimeout(int sessionTimeout) {

/**
 * Remembers the path, type and zxid of the given event so that they can be
 * read back through the {@code getMostRecent*} accessors. Each call overwrites
 * the values stored by the previous one.
 *
 * @param event the event delivered to this watcher
 */
@Override
public void process(WatchedEvent event) {
    this.mostRecentPath = event.getPath();
    this.mostRecentEventType = event.getType();
    this.mostRecentZxid = event.getZxid();
}

/**
* Returns the path stored by the most recent call to {@link #process}.
*
* @return the stored path; the backing field has no initializer, so this is
*         {@code null} until it has been assigned
*/
public String getMostRecentPath() {
return mostRecentPath;
}

/**
* Returns the event type stored by the most recent call to {@link #process}.
*
* @return the stored event type; the backing field has no initializer, so this is
*         {@code null} until it has been assigned
*/
public Event.EventType getMostRecentEventType() {
return mostRecentEventType;
}

/**
* Returns the zxid stored by the most recent call to {@link #process}.
*
* @return the stored zxid; the backing field is initialized to
*         {@link WatchedEvent#NO_ZXID}
*/
public long getMostRecentZxid() {
return mostRecentZxid;
}


@Override
int getSessionTimeout() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St
*/
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public int getSessionTimeout() {

@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,22 +113,24 @@ default boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherM
*
* @param path znode path
* @param type the watch event type
* @param zxid the zxid for the corresponding change that triggered this event
*
* @return the watchers that have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type);
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid);

/**
* Distribute the watch event for the given path, but ignore those
* suppressed ones.
*
* @param path znode path
* @param type the watch event type
* @param zxid the zxid for the corresponding change that triggered this event
* @param suppress the suppressed watcher set
*
* @return the watchers that have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress);
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress);

/**
* Get the size of watchers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ public synchronized void removeWatcher(Watcher watcher) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
return triggerWatch(path, type, zxid, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
Set<Watcher> watchers = new HashSet<>();
synchronized (this) {
PathParentIterator pathParentIterator = getPathParentIterator(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ public void processDeadWatchers(Set<Integer> deadWatchers) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
return triggerWatch(path, type, zxid, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);

BitHashSet watchers = remove(path);
if (watchers == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.time.Instant;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.util.ServiceUtils;
import org.hamcrest.CustomMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.StringDescription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -40,6 +45,7 @@ public class ZKTestCase {

protected static final File testBaseDir = new File(System.getProperty("build.test.dir", "build"));
private static final Logger LOG = LoggerFactory.getLogger(ZKTestCase.class);
public static final int DEFAULT_METRIC_TIMEOUT = 30;

static {
// Disable System.exit in tests.
Expand Down Expand Up @@ -103,7 +109,7 @@ public interface WaitForCondition {
* @param timeout timeout in seconds
* @throws InterruptedException
*/
public void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException {
public static void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException {
final Instant deadline = Instant.now().plusSeconds(timeout);
while (Instant.now().isBefore(deadline)) {
if (condition.evaluate()) {
Expand All @@ -114,4 +120,36 @@ public void waitFor(String msg, WaitForCondition condition, int timeout) throws
fail(msg);
}

}
/**
* Waits for the server metric {@code metricKey} to satisfy {@code matcher}, using
* {@link #DEFAULT_METRIC_TIMEOUT} seconds as the timeout.
*
* @param metricKey name of the metric to look up
* @param matcher   matcher the metric value must satisfy
* @throws InterruptedException propagated from the three-argument overload
*/
public static <T> void waitForMetric(String metricKey, Matcher<T> matcher) throws InterruptedException {
waitForMetric(metricKey, matcher, DEFAULT_METRIC_TIMEOUT);
}

/**
 * Repeatedly looks up the server metric {@code metricKey} until {@code matcher} accepts its
 * current value. Every rejected value is logged together with the matcher's mismatch
 * description. If the matcher never accepts a value, {@code waitFor} fails the test with a
 * message naming the metric and the timeout.
 *
 * @param metricKey        name of the metric to look up
 * @param matcher          matcher the metric value must satisfy
 * @param timeoutInSeconds how long to keep retrying, in seconds
 * @throws InterruptedException propagated from {@code waitFor}
 */
public static <T> void waitForMetric(String metricKey, Matcher<T> matcher, int timeoutInSeconds) throws InterruptedException {
    final String timeoutMessage = String.format("metric \"%s\" failed to match after %d seconds",
            metricKey, timeoutInSeconds);
    waitFor(timeoutMessage, () -> {
        // Matcher#matches and Matcher#describeMismatch both accept Object, so the raw
        // metric value can be handed over without casting it to T.
        final Object current = MetricsUtils.currentServerMetrics().get(metricKey);
        if (matcher.matches(current)) {
            return true;
        }
        final Description mismatch = new StringDescription();
        matcher.describeMismatch(current, mismatch);
        LOG.info("match failed for metric {}: {}", metricKey, mismatch);
        return false;
    }, timeoutInSeconds);
}

/**
 * Functionally identical to {@link org.hamcrest.Matchers#closeTo} except that it accepts all numerical types
 * instead of failing if the value is not a {@link Double}.
 *
 * <p>A {@code null} value, or a value that is not a {@link Number}, is reported as a mismatch
 * instead of raising a {@link NullPointerException} or {@link ClassCastException}.
 *
 * @param operand the expected value
 * @param error   the maximum allowed absolute difference between the actual value and {@code operand}
 * @return a matcher that accepts any {@link Number} whose {@code doubleValue()} is within
 *         {@code error} of {@code operand}
 */
public static Matcher<Number> closeTo(double operand, double error) {
    return new CustomMatcher<Number>(String.format("A number within %s of %s", error, operand)) {
        @Override
        public boolean matches(Object actual) {
            // instanceof is false for null, so this guard covers both the missing-value
            // and the wrong-type case before the cast below.
            if (!(actual instanceof Number)) {
                return false;
            }
            return Math.abs(operand - ((Number) actual).doubleValue()) <= error;
        }
    };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.zookeeper.server;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.File;
Expand Down Expand Up @@ -75,10 +77,6 @@ public class RequestThrottlerTest extends ZKTestCase {
ZooKeeper zk = null;
int connectionLossCount = 0;

private long getCounterMetric(String name) {
return (long) MetricsUtils.currentServerMetrics().get(name);
}

@BeforeEach
public void setup() throws Exception {
// start a server and create a client
Expand Down Expand Up @@ -222,11 +220,8 @@ public void testRequestThrottler() throws Exception {
submitted.await(5, TimeUnit.SECONDS);

// but only two requests can get into the pipeline because of the throttler
WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") == 2;
waitFor("request not queued", requestQueued, 5);

WaitForCondition throttleWait = () -> getCounterMetric("request_throttle_wait_count") >= 1;
waitFor("no throttle wait", throttleWait, 5);
waitForMetric("prep_processor_request_queued", is(2L));
waitForMetric("request_throttle_wait_count", greaterThanOrEqualTo(1L));

// let the requests go through the pipeline and the throttler will be woken up to allow more requests
// to enter the pipeline
Expand Down Expand Up @@ -387,8 +382,7 @@ public void testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled()
// be GLOBAL_OUTSTANDING_LIMIT + 2.
//
// But due to the lack of a consistent view of the number of outstanding requests, the number could be larger.
WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") >= Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2;
waitFor("no enough requests queued", requestQueued, 5);
waitForMetric("prep_processor_request_queued", greaterThanOrEqualTo(Long.parseLong(GLOBAL_OUTSTANDING_LIMIT) + 2));

resumeProcess.countDown();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ public void testFileTxnSnapLogMetrics() throws Exception {
}

// It is possible that above writes will trigger more than one snapshot due to randomization.
WaitForCondition newSnapshot = () -> (long) MetricsUtils.currentServerMetrics().get("cnt_snapshottime") >= 2L;
waitFor("no snapshot in 10s", newSnapshot, 10);
waitForMetric("cnt_snapshottime", greaterThanOrEqualTo(2L), 10);

// Pauses snapshot and logs more txns.
cnxnFactory.getZooKeeperServer().getTxnLogFactory().snapLog.close();
Expand Down
Loading

0 comments on commit 880f606

Please sign in to comment.