Skip to content

Commit

Permalink
[feat] PIP-335: Add support Oxia as a metadata store (apache#22007)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Feb 7, 2024
1 parent 1c5b4b8 commit a83e9ed
Show file tree
Hide file tree
Showing 11 changed files with 450 additions and 13 deletions.
2 changes: 2 additions & 0 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,8 @@ The Apache Software License, Version 2.0
- io.dropwizard.metrics-metrics-jmx-4.1.12.1.jar
* Prometheus
- io.prometheus-simpleclient_httpserver-0.16.0.jar
* Oxia
- io.streamnative.oxia-oxia-client-0.1.0-shaded.jar
* Java JSON WebTokens
- io.jsonwebtoken-jjwt-api-0.11.1.jar
- io.jsonwebtoken-jjwt-impl-0.11.1.jar
Expand Down
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ flexible messaging model and an intuitive client API.</description>
<apache-http-client.version>4.5.13</apache-http-client.version>
<apache-httpcomponents.version>4.4.15</apache-httpcomponents.version>
<jetcd.version>0.7.5</jetcd.version>
<oxia.version>0.1.0</oxia.version>
<snakeyaml.version>2.0</snakeyaml.version>
<ant.version>1.10.12</ant.version>
<seancfoley.ipaddress.version>5.3.3</seancfoley.ipaddress.version>
Expand Down Expand Up @@ -1152,6 +1153,18 @@ flexible messaging model and an intuitive client API.</description>
<version>${sketches.version}</version>
</dependency>

<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-client</artifactId>
<version>${oxia.version}</version>
<classifier>shaded</classifier>
</dependency>
<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-testcontainers</artifactId>
<version>${oxia.version}</version>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bom</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-testcontainers</artifactId>
<scope>test</scope>
</dependency>

<!-- zookeeper server -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
Expand Down
12 changes: 12 additions & 0 deletions pulsar-metadata/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@
</exclusions>
</dependency>

<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-client</artifactId>
<classifier>shaded</classifier>
</dependency>

<dependency>
<groupId>io.streamnative.oxia</groupId>
<artifactId>oxia-testcontainers</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore.MEMORY_SCHEME_IDENTIFIER;
import static org.apache.pulsar.metadata.impl.RocksdbMetadataStore.ROCKSDB_SCHEME_IDENTIFIER;
import static org.apache.pulsar.metadata.impl.ZKMetadataStore.ZK_SCHEME_IDENTIFIER;
import static org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStoreProvider.OXIA_SCHEME_IDENTIFIER;
import com.google.common.base.Splitter;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -31,6 +32,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreProvider;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStoreProvider;

@Slf4j
public class MetadataStoreFactoryImpl {
Expand Down Expand Up @@ -66,6 +68,7 @@ static Map<String, MetadataStoreProvider> loadProviders() {
providers.put(MEMORY_SCHEME_IDENTIFIER, new MemoryMetadataStoreProvider());
providers.put(ROCKSDB_SCHEME_IDENTIFIER, new RocksdbMetadataStoreProvider());
providers.put(ETCD_SCHEME_IDENTIFIER, new EtcdMetadataStoreProvider());
providers.put(OXIA_SCHEME_IDENTIFIER, new OxiaMetadataStoreProvider());
providers.put(ZK_SCHEME_IDENTIFIER, new ZkMetadataStoreProvider());

String factoryClasses = System.getProperty(METADATASTORE_PROVIDERS_PROPERTY, "");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.impl.oxia;

import io.streamnative.oxia.client.OxiaClientBuilder;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.DeleteOption;
import io.streamnative.oxia.client.api.KeyAlreadyExistsException;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.api.PutOption;
import io.streamnative.oxia.client.api.PutResult;
import io.streamnative.oxia.client.api.UnexpectedVersionIdException;
import io.streamnative.oxia.client.api.Version;
import java.time.Duration;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;

@Slf4j
public class OxiaMetadataStore extends AbstractMetadataStore {

private final AsyncOxiaClient client;

private final String identity;
private final Optional<MetadataEventSynchronizer> synchronizer;

OxiaMetadataStore(
@NonNull String serviceAddress,
@NonNull String namespace,
@NonNull MetadataStoreConfig metadataStoreConfig,
boolean enableSessionWatcher)
throws Exception {
super("oxia-metadata");

var linger = metadataStoreConfig.getBatchingMaxDelayMillis();
if (!metadataStoreConfig.isBatchingEnabled()) {
linger = 0;
}
this.synchronizer = Optional.ofNullable(metadataStoreConfig.getSynchronizer());
identity = UUID.randomUUID().toString();
client =
new OxiaClientBuilder(serviceAddress)
.clientIdentifier(identity)
.namespace(namespace)
.sessionTimeout(Duration.ofMillis(metadataStoreConfig.getSessionTimeoutMillis()))
.batchLinger(Duration.ofMillis(linger))
.maxRequestsPerBatch(metadataStoreConfig.getBatchingMaxOperations())
.asyncClient()
.get();
client.notifications(this::notificationCallback);
super.registerSyncListener(Optional.ofNullable(metadataStoreConfig.getSynchronizer()));
}

private void notificationCallback(Notification notification) {
if (notification instanceof Notification.KeyCreated keyCreated) {
receivedNotification(
new org.apache.pulsar.metadata.api.Notification(
NotificationType.Created, keyCreated.key()));
notifyParentChildrenChanged(keyCreated.key());

} else if (notification instanceof Notification.KeyModified keyModified) {
receivedNotification(
new org.apache.pulsar.metadata.api.Notification(
NotificationType.Modified, keyModified.key()));
} else if (notification instanceof Notification.KeyDeleted keyDeleted) {
receivedNotification(
new org.apache.pulsar.metadata.api.Notification(
NotificationType.Deleted, keyDeleted.key()));
notifyParentChildrenChanged(keyDeleted.key());
} else {
log.error("Unknown notification type {}", notification);
}
}

Optional<GetResult> convertGetResult(
String path, io.streamnative.oxia.client.api.GetResult result) {
if (result == null) {
return Optional.empty();
}
return Optional.of(result)
.map(
oxiaResult ->
new GetResult(oxiaResult.getValue(), convertStat(path, oxiaResult.getVersion())));
}

Stat convertStat(String path, Version version) {
return new Stat(
path,
version.versionId(),
version.createdTimestamp(),
version.modifiedTimestamp(),
version.sessionId().isPresent(),
version.clientIdentifier().stream().anyMatch(identity::equals),
version.modificationsCount() == 0);
}

@Override
protected CompletableFuture<List<String>> getChildrenFromStore(String path) {
var pathWithSlash = path + "/";

return client
.list(pathWithSlash, pathWithSlash + "/")
.thenApply(
children ->
children.stream().map(child -> child.substring(pathWithSlash.length())).toList())
.exceptionallyCompose(this::convertException);
}

@Override
protected CompletableFuture<Boolean> existsFromStore(String path) {
return client.get(path).thenApply(Objects::nonNull)
.exceptionallyCompose(this::convertException);
}

@Override
protected CompletableFuture<Optional<GetResult>> storeGet(String path) {
return client.get(path).thenApply(res -> convertGetResult(path, res))
.exceptionallyCompose(this::convertException);
}

@Override
protected CompletableFuture<Void> storeDelete(String path, Optional<Long> expectedVersion) {
return getChildrenFromStore(path)
.thenCompose(
children -> {
if (children.size() > 0) {
return CompletableFuture.failedFuture(
new MetadataStoreException("Key '" + path + "' has children"));
} else {
var delOption =
expectedVersion
.map(DeleteOption::ifVersionIdEquals)
.orElse(DeleteOption.Unconditionally);
CompletableFuture<Boolean> result = client.delete(path, delOption);
return result
.thenCompose(
exists -> {
if (!exists) {
return CompletableFuture.failedFuture(
new MetadataStoreException.NotFoundException(
"Key '" + path + "' does not exist"));
}
return CompletableFuture.completedFuture((Void) null);
})
.exceptionallyCompose(this::convertException);
}
});
}

@Override
protected CompletableFuture<Stat> storePut(
String path, byte[] data, Optional<Long> optExpectedVersion, EnumSet<CreateOption> options) {
CompletableFuture<Void> parentsCreated = createParents(path);
return parentsCreated.thenCompose(
__ -> {
var expectedVersion = optExpectedVersion;
if (expectedVersion.isPresent()
&& expectedVersion.get() != -1L
&& options.contains(CreateOption.Sequential)) {
return CompletableFuture.failedFuture(
new MetadataStoreException(
"Can't have expectedVersion and Sequential at the same time"));
}
CompletableFuture<String> actualPath;
if (options.contains(CreateOption.Sequential)) {
var parent = parent(path);
var parentPath = parent == null ? "/" : parent;

actualPath =
client
.put(parentPath, new byte[] {})
.thenApply(
r -> String.format("%s%010d", path, r.version().modificationsCount()));
expectedVersion = Optional.of(-1L);
} else {
actualPath = CompletableFuture.completedFuture(path);
}
var versionCondition =
expectedVersion
.map(
ver -> {
if (ver == -1) {
return PutOption.IfRecordDoesNotExist;
}
return PutOption.ifVersionIdEquals(ver);
})
.orElse(PutOption.Unconditionally);
var putOptions =
options.contains(CreateOption.Ephemeral)
? new PutOption[] {PutOption.AsEphemeralRecord, versionCondition}
: new PutOption[] {versionCondition};
return actualPath
.thenCompose(
aPath ->
client
.put(aPath, data, putOptions)
.thenApply(res -> new PathWithPutResult(aPath, res)))
.thenApply(res -> convertStat(res.path(), res.result().version()))
.exceptionallyCompose(this::convertException);
});
}

private <T> CompletionStage<T> convertException(Throwable ex) {
if (ex.getCause() instanceof UnexpectedVersionIdException
|| ex.getCause() instanceof KeyAlreadyExistsException) {
return CompletableFuture.failedFuture(
new MetadataStoreException.BadVersionException(ex.getCause()));
} else if (ex.getCause() instanceof IllegalStateException) {
return CompletableFuture.failedFuture(new MetadataStoreException.AlreadyClosedException(ex.getCause()));
} else {
return CompletableFuture.failedFuture(ex.getCause());
}
}

private CompletableFuture<Void> createParents(String path) {
var parent = parent(path);
if (parent == null || parent.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return exists(parent)
.thenCompose(
exists -> {
if (exists) {
return CompletableFuture.completedFuture(null);
} else {
return client
.put(parent, new byte[] {}, PutOption.IfRecordDoesNotExist)
.thenCompose(__ -> createParents(parent));
}
})
.exceptionallyCompose(
ex -> {
if (ex.getCause() instanceof KeyAlreadyExistsException) {
return CompletableFuture.completedFuture(null);
}
return CompletableFuture.failedFuture(ex.getCause());
});
}

@Override
public void close() throws Exception {
if (client != null) {
client.close();
}
super.close();
}

public Optional<MetadataEventSynchronizer> getMetadataEventSynchronizer() {
return synchronizer;
}

private record PathWithPutResult(String path, PutResult result) {}
}
Loading

0 comments on commit a83e9ed

Please sign in to comment.