From a83e9ed70323b80319545326716340ce76d690eb Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 7 Feb 2024 14:10:33 -0800 Subject: [PATCH] [feat] PIP-335: Add support Oxia as a metadata store (#22007) --- .../server/src/assemble/LICENSE.bin.txt | 2 + pom.xml | 13 + pulsar-broker/pom.xml | 6 + pulsar-metadata/pom.xml | 12 + .../impl/MetadataStoreFactoryImpl.java | 3 + .../metadata/impl/oxia/OxiaMetadataStore.java | 283 ++++++++++++++++++ .../impl/oxia/OxiaMetadataStoreProvider.java | 75 +++++ .../metadata/impl/oxia/package-info.java | 19 ++ .../metadata/BaseMetadataStoreTest.java | 18 ++ .../pulsar/metadata/MetadataBenchmark.java | 2 +- .../pulsar/metadata/MetadataStoreTest.java | 30 +- 11 files changed, 450 insertions(+), 13 deletions(-) create mode 100644 pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java create mode 100644 pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStoreProvider.java create mode 100644 pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/package-info.java diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 0bf0fee823c7d..e3941c54a74b1 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -465,6 +465,8 @@ The Apache Software License, Version 2.0 - io.dropwizard.metrics-metrics-jmx-4.1.12.1.jar * Prometheus - io.prometheus-simpleclient_httpserver-0.16.0.jar + * Oxia + - io.streamnative.oxia-oxia-client-0.1.0-shaded.jar * Java JSON WebTokens - io.jsonwebtoken-jjwt-api-0.11.1.jar - io.jsonwebtoken-jjwt-impl-0.11.1.jar diff --git a/pom.xml b/pom.xml index 0c216c9dab8ac..4dfeb30821a3f 100644 --- a/pom.xml +++ b/pom.xml @@ -242,6 +242,7 @@ flexible messaging model and an intuitive client API. 4.5.13 4.4.15 0.7.5 + 0.1.0 2.0 1.10.12 5.3.3 @@ -1152,6 +1153,18 @@ flexible messaging model and an intuitive client API. ${sketches.version} + + io.streamnative.oxia + oxia-client + ${oxia.version} + shaded + + + io.streamnative.oxia + oxia-testcontainers + ${oxia.version} + + com.amazonaws aws-java-sdk-bom diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 9dd319f791191..c39de184b05cc 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -164,6 +164,12 @@ test + + io.streamnative.oxia + oxia-testcontainers + test + + io.dropwizard.metrics diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml index d232a1f5c000a..8600d0ea1919b 100644 --- a/pulsar-metadata/pom.xml +++ b/pulsar-metadata/pom.xml @@ -62,6 +62,18 @@ + + io.streamnative.oxia + oxia-client + shaded + + + + io.streamnative.oxia + oxia-testcontainers + test + + io.dropwizard.metrics metrics-core diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java index dd4df69fc430b..cb7bea718e4be 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER; import static org.apache.pulsar.metadata.impl.RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER; import static org.apache.pulsar.metadata.impl.ZKMetadataStore.ZK_SCHEME_IDENTIFIER; +import static org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStoreProvider.OXIA_SCHEME_IDENTIFIER; import com.google.common.base.Splitter; import java.util.HashMap; import java.util.Map; @@ -31,6 +32,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreProvider; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStoreProvider; @Slf4j public class MetadataStoreFactoryImpl { @@ -66,6 +68,7 @@ static Map loadProviders() { providers.put(MEMORY_SCHEME_IDENTIFIER, new MemoryMetadataStoreProvider()); providers.put(ROCKSDB_SCHEME_IDENTIFIER, new RocksdbMetadataStoreProvider()); providers.put(ETCD_SCHEME_IDENTIFIER, new EtcdMetadataStoreProvider()); + providers.put(OXIA_SCHEME_IDENTIFIER, new OxiaMetadataStoreProvider()); providers.put(ZK_SCHEME_IDENTIFIER, new ZkMetadataStoreProvider()); String factoryClasses = System.getProperty(METADATASTORE_PROVIDERS_PROPERTY, ""); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java new file mode 100644 index 0000000000000..2ab744e205320 --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.impl.oxia; + +import io.streamnative.oxia.client.OxiaClientBuilder; +import io.streamnative.oxia.client.api.AsyncOxiaClient; +import io.streamnative.oxia.client.api.DeleteOption; +import io.streamnative.oxia.client.api.KeyAlreadyExistsException; +import io.streamnative.oxia.client.api.Notification; +import io.streamnative.oxia.client.api.PutOption; +import io.streamnative.oxia.client.api.PutResult; +import io.streamnative.oxia.client.api.UnexpectedVersionIdException; +import io.streamnative.oxia.client.api.Version; +import java.time.Duration; +import java.util.EnumSet; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.metadata.api.GetResult; +import org.apache.pulsar.metadata.api.MetadataEventSynchronizer; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.NotificationType; +import org.apache.pulsar.metadata.api.Stat; +import org.apache.pulsar.metadata.api.extended.CreateOption; +import org.apache.pulsar.metadata.impl.AbstractMetadataStore; + +@Slf4j +public class OxiaMetadataStore extends AbstractMetadataStore { + + private final AsyncOxiaClient client; + + private final String identity; + private final Optional synchronizer; + + OxiaMetadataStore( + @NonNull String serviceAddress, + @NonNull String namespace, + @NonNull MetadataStoreConfig metadataStoreConfig, + boolean enableSessionWatcher) + throws Exception { + super("oxia-metadata"); + + var linger = metadataStoreConfig.getBatchingMaxDelayMillis(); + if (!metadataStoreConfig.isBatchingEnabled()) { + linger = 0; + } + this.synchronizer = Optional.ofNullable(metadataStoreConfig.getSynchronizer()); + identity = UUID.randomUUID().toString(); + client = + new OxiaClientBuilder(serviceAddress) + .clientIdentifier(identity) + .namespace(namespace) + .sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis())) + .batchLinger(Duration.ofMillis(linger)) + .maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations()) + .asyncClient() + .get(); + client.notifications(this::notificationCallback); + super.registerSyncListener(Optional.ofNullable(metadataStoreConfig.getSynchronizer())); + } + + private void notificationCallback(Notification notification) { + if (notification instanceof Notification.KeyCreated keyCreated) { + receivedNotification( + new org.apache.pulsar.metadata.api.Notification( + NotificationType.Created, keyCreated.key())); + notifyParentChildrenChanged(keyCreated.key()); + + } else if (notification instanceof Notification.KeyModified keyModified) { + receivedNotification( + new org.apache.pulsar.metadata.api.Notification( + NotificationType.Modified, keyModified.key())); + } else if (notification instanceof Notification.KeyDeleted keyDeleted) { + receivedNotification( + new org.apache.pulsar.metadata.api.Notification( + NotificationType.Deleted, keyDeleted.key())); + notifyParentChildrenChanged(keyDeleted.key()); + } else { + log.error("Unknown notification type {}", notification); + } + } + + Optional convertGetResult( + String path, io.streamnative.oxia.client.api.GetResult result) { + if (result == null) { + return Optional.empty(); + } + return Optional.of(result) + .map( + oxiaResult -> + new GetResult(oxiaResult.getValue(), convertStat(path, oxiaResult.getVersion()))); + } + + Stat convertStat(String path, Version version) { + return new Stat( + path, + version.versionId(), + version.createdTimestamp(), + version.modifiedTimestamp(), + version.sessionId().isPresent(), + version.clientIdentifier().stream().anyMatch(identity::equals), + version.modificationsCount() == 0); + } + + @Override + protected CompletableFuture> getChildrenFromStore(String path) { + var pathWithSlash = path + "/"; + + return client + .list(pathWithSlash, pathWithSlash + "/") + .thenApply( + children -> + children.stream().map(child -> child.substring(pathWithSlash.length())).toList()) + .exceptionallyCompose(this::convertException); + } + + @Override + protected CompletableFuture existsFromStore(String path) { + return client.get(path).thenApply(Objects::nonNull) + .exceptionallyCompose(this::convertException); + } + + @Override + protected CompletableFuture> storeGet(String path) { + return client.get(path).thenApply(res -> convertGetResult(path, res)) + .exceptionallyCompose(this::convertException); + } + + @Override + protected CompletableFuture storeDelete(String path, Optional expectedVersion) { + return getChildrenFromStore(path) + .thenCompose( + children -> { + if (children.size() > 0) { + return CompletableFuture.failedFuture( + new MetadataStoreException("Key '" + path + "' has children")); + } else { + var delOption = + expectedVersion + .map(DeleteOption::ifVersionIdEquals) + .orElse(DeleteOption.Unconditionally); + CompletableFuture result = client.delete(path, delOption); + return result + .thenCompose( + exists -> { + if (!exists) { + return CompletableFuture.failedFuture( + new MetadataStoreException.NotFoundException( + "Key '" + path + "' does not exist")); + } + return CompletableFuture.completedFuture((Void) null); + }) + .exceptionallyCompose(this::convertException); + } + }); + } + + @Override + protected CompletableFuture storePut( + String path, byte[] data, Optional optExpectedVersion, EnumSet options) { + CompletableFuture parentsCreated = createParents(path); + return parentsCreated.thenCompose( + __ -> { + var expectedVersion = optExpectedVersion; + if (expectedVersion.isPresent() + && expectedVersion.get() != -1L + && options.contains(CreateOption.Sequential)) { + return CompletableFuture.failedFuture( + new MetadataStoreException( + "Can't have expectedVersion and Sequential at the same time")); + } + CompletableFuture actualPath; + if (options.contains(CreateOption.Sequential)) { + var parent = parent(path); + var parentPath = parent == null ? "/" : parent; + + actualPath = + client + .put(parentPath, new byte[] {}) + .thenApply( + r -> String.format("%s%010d", path, r.version().modificationsCount())); + expectedVersion = Optional.of(-1L); + } else { + actualPath = CompletableFuture.completedFuture(path); + } + var versionCondition = + expectedVersion + .map( + ver -> { + if (ver == -1) { + return PutOption.IfRecordDoesNotExist; + } + return PutOption.ifVersionIdEquals(ver); + }) + .orElse(PutOption.Unconditionally); + var putOptions = + options.contains(CreateOption.Ephemeral) + ? new PutOption[] {PutOption.AsEphemeralRecord, versionCondition} + : new PutOption[] {versionCondition}; + return actualPath + .thenCompose( + aPath -> + client + .put(aPath, data, putOptions) + .thenApply(res -> new PathWithPutResult(aPath, res))) + .thenApply(res -> convertStat(res.path(), res.result().version())) + .exceptionallyCompose(this::convertException); + }); + } + + private CompletionStage convertException(Throwable ex) { + if (ex.getCause() instanceof UnexpectedVersionIdException + || ex.getCause() instanceof KeyAlreadyExistsException) { + return CompletableFuture.failedFuture( + new MetadataStoreException.BadVersionException(ex.getCause())); + } else if (ex.getCause() instanceof IllegalStateException) { + return CompletableFuture.failedFuture(new MetadataStoreException.AlreadyClosedException(ex.getCause())); + } else { + return CompletableFuture.failedFuture(ex.getCause()); + } + } + + private CompletableFuture createParents(String path) { + var parent = parent(path); + if (parent == null || parent.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + return exists(parent) + .thenCompose( + exists -> { + if (exists) { + return CompletableFuture.completedFuture(null); + } else { + return client + .put(parent, new byte[] {}, PutOption.IfRecordDoesNotExist) + .thenCompose(__ -> createParents(parent)); + } + }) + .exceptionallyCompose( + ex -> { + if (ex.getCause() instanceof KeyAlreadyExistsException) { + return CompletableFuture.completedFuture(null); + } + return CompletableFuture.failedFuture(ex.getCause()); + }); + } + + @Override + public void close() throws Exception { + if (client != null) { + client.close(); + } + super.close(); + } + + public Optional getMetadataEventSynchronizer() { + return synchronizer; + } + + private record PathWithPutResult(String path, PutResult result) {} +} diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStoreProvider.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStoreProvider.java new file mode 100644 index 0000000000000..a4c52134a8a75 --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStoreProvider.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.impl.oxia; + +import lombok.NonNull; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.metadata.api.MetadataStore; +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.MetadataStoreProvider; + +public class OxiaMetadataStoreProvider implements MetadataStoreProvider { + // declare the specific namespace to avoid any changes in the future. + public static final String DefaultNamespace = "default"; + + public static final String OXIA_SCHEME = "oxia"; + public static final String OXIA_SCHEME_IDENTIFIER = OXIA_SCHEME + ":"; + + @Override + public String urlScheme() { + return OXIA_SCHEME; + } + + @Override + public @NonNull MetadataStore create( + String metadataURL, MetadataStoreConfig metadataStoreConfig, boolean enableSessionWatcher) + throws MetadataStoreException { + var serviceAddress = getServiceAddressAndNamespace(metadataURL); + try { + return new OxiaMetadataStore( + serviceAddress.getLeft(), + serviceAddress.getRight(), + metadataStoreConfig, + enableSessionWatcher); + } catch (Exception e) { + throw new MetadataStoreException(e); + } + } + + @NonNull + Pair getServiceAddressAndNamespace(String metadataURL) + throws MetadataStoreException { + if (metadataURL == null || !metadataURL.startsWith(urlScheme() + "://")) { + throw new MetadataStoreException("Invalid metadata URL. Must start with 'oxia://'."); + } + final var addressWithNamespace = metadataURL.substring("oxia://".length()); + final var split = addressWithNamespace.split("/"); + if (split.length > 2) { + throw new MetadataStoreException( + "Invalid metadata URL." + + " the oxia metadata format should be 'oxia://host:port/[namespace]'."); + } + if (split.length == 1) { + // Use default namespace + return Pair.of(split[0], DefaultNamespace); + } + return Pair.of(split[0], split[1]); + } +} diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/package-info.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/package-info.java new file mode 100644 index 0000000000000..d63afa5b0a8f0 --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/package-info.java @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.impl.oxia; diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java index 411ee038c48b0..491e3d0b9640c 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java @@ -22,6 +22,7 @@ import static org.testng.Assert.assertTrue; import io.etcd.jetcd.launcher.EtcdCluster; import io.etcd.jetcd.test.EtcdClusterExtension; +import io.streamnative.oxia.testcontainers.OxiaContainer; import java.io.File; import java.net.URI; import java.util.UUID; @@ -39,6 +40,8 @@ public abstract class BaseMetadataStoreTest extends TestRetrySupport { protected TestZKServer zks; protected EtcdCluster etcdCluster; + protected OxiaContainer oxiaServer; + @BeforeClass(alwaysRun = true) @Override public void setup() throws Exception { @@ -59,6 +62,11 @@ public void cleanup() throws Exception { etcdCluster.close(); etcdCluster = null; } + + if (oxiaServer != null) { + oxiaServer.close(); + oxiaServer = null; + } } private static String createTempFolder() { @@ -79,6 +87,7 @@ public Object[][] implementations() { {"Memory", stringSupplier(() -> "memory:" + UUID.randomUUID())}, {"RocksDB", stringSupplier(() -> "rocksdb:" + createTempFolder())}, {"Etcd", stringSupplier(() -> "etcd:" + getEtcdClusterConnectString())}, + {"Oxia", stringSupplier(() -> "oxia://" + getOxiaServerConnectString())}, }; } @@ -87,9 +96,18 @@ public Object[][] distributedImplementations() { return new Object[][]{ {"ZooKeeper", stringSupplier(() -> zks.getConnectionString())}, {"Etcd", stringSupplier(() -> "etcd:" + getEtcdClusterConnectString())}, + {"Oxia", stringSupplier(() -> "oxia://" + getOxiaServerConnectString())}, }; } + private synchronized String getOxiaServerConnectString() { + if (oxiaServer == null) { + oxiaServer = new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME); + oxiaServer.start(); + } + return oxiaServer.getServiceAddress(); + } + private synchronized String getEtcdClusterConnectString() { if (etcdCluster == null) { etcdCluster = EtcdClusterExtension.builder().withClusterName("test").withNodes(1).withSsl(false).build() diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java index 227c0e2c9dc35..b3b95ddc58076 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java @@ -34,7 +34,7 @@ import org.testng.annotations.Test; @Slf4j -public class MetadataBenchmark extends MetadataStoreTest { +public class MetadataBenchmark extends BaseMetadataStoreTest { @Test(dataProvider = "impl", enabled = false) public void testGet(String provider, Supplier urlSupplier) throws Exception { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java index 244ed025e3ed9..b1578188c681d 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java @@ -62,27 +62,28 @@ public class MetadataStoreTest extends BaseMetadataStoreTest { @Test(dataProvider = "impl") public void emptyStoreTest(String provider, Supplier urlSupplier) throws Exception { + String prefix = newKey(); @Cleanup MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().fsyncEnable(false).build()); - assertFalse(store.exists("/non-existing-key").join()); - assertFalse(store.exists("/non-existing-key/child").join()); - assertFalse(store.get("/non-existing-key").join().isPresent()); - assertFalse(store.get("/non-existing-key/child").join().isPresent()); + assertFalse(store.exists(prefix + "/non-existing-key").join()); + assertFalse(store.exists(prefix + "/non-existing-key/child").join()); + assertFalse(store.get(prefix + "/non-existing-key").join().isPresent()); + assertFalse(store.get(prefix + "/non-existing-key/child").join().isPresent()); - assertEquals(store.getChildren("/non-existing-key").join(), Collections.emptyList()); - assertEquals(store.getChildren("/non-existing-key/child").join(), Collections.emptyList()); + assertEquals(store.getChildren(prefix + "/non-existing-key").join(), Collections.emptyList()); + assertEquals(store.getChildren(prefix + "/non-existing-key/child").join(), Collections.emptyList()); try { - store.delete("/non-existing-key", Optional.empty()).join(); + store.delete(prefix + "/non-existing-key", Optional.empty()).join(); fail("Should have failed"); } catch (CompletionException e) { assertException(e, NotFoundException.class); } try { - store.delete("/non-existing-key", Optional.of(1L)).join(); + store.delete(prefix + "/non-existing-key", Optional.of(1L)).join(); fail("Should have failed"); } catch (CompletionException e) { assertTrue(NotFoundException.class.isInstance(e.getCause()) || BadVersionException.class.isInstance( @@ -400,6 +401,10 @@ public void testDeleteRecursive(String provider, Supplier urlSupplier) t @Test(dataProvider = "impl") public void testDeleteUnusedDirectories(String provider, Supplier urlSupplier) throws Exception { + if (provider.equals("Oxia")) { + return; + } + @Cleanup MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().fsyncEnable(false).build()); @@ -710,10 +715,11 @@ public void testExistsDistributed(String provider, Supplier urlSupplier) assertTrue(store1.exists(parent).get()); assertFalse(store1.exists(parent + "/a").get()); store2.put(parent + "/a", value, Optional.empty()).get(); - assertTrue(store1.exists(parent + "/a").get()); + + Awaitility.await() + .untilAsserted(() -> assertTrue(store1.exists(parent + "/a").get())); + // There is a chance watcher event is not triggered before the store1.exists() call. - Awaitility.await().atMost(3, TimeUnit.SECONDS) - .pollInterval(100, TimeUnit.MILLISECONDS) - .untilAsserted(() -> assertFalse(store1.exists(parent + "/b").get())); + assertFalse(store1.exists(parent + "/b").get()); } }