Skip to content

Commit

Permalink
feat(lock): support basic distributed lock api (#178)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao authored Sep 3, 2024
1 parent 9e021bc commit 3961ef4
Show file tree
Hide file tree
Showing 14 changed files with 1,234 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public interface AsyncLock {

/** Represents the different status of a lock. */
enum LockStatus {
INIT,
ACQUIRING,
ACQUIRED,
RELEASING,
RELEASED;
}

LockStatus getStatus();

/**
* Asynchronously acquires the lock.
*
* @return a CompletableFuture that completes when the lock is acquired
*/
CompletableFuture<Void> lock();

/**
* Tries to acquire the lock asynchronously.
*
* @return a CompletableFuture that completes when the lock is acquired or not. The future will
* complete exceptionally with a {@link
* io.streamnative.oxia.client.api.exceptions.LockException.LockBusyException} if the lock is
* acquired by others, or with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException.IllegalLockStatusException} if the
* current lock status is not {@link LockStatus#INIT} or {@link LockStatus#RELEASED}.
* Additionally, the future will complete exceptionally with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException} in case of an unknown error.
*/
CompletableFuture<Void> tryLock();

/**
* Asynchronously releases the lock.
*
* @return a CompletableFuture that completes when the lock is acquired or not. The future will
* complete exceptionally with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException.IllegalLockStatusException} if the
* current lock status is {@link LockStatus#INIT}. In the case of an unknown error, the future
* will complete exceptionally with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException}.
*/
CompletableFuture<Void> unlock();

/**
* Asynchronously acquires the lock using a specified ExecutorService.
*
* @param executorService the ExecutorService to use for acquiring the lock
* @return a CompletableFuture that completes when the lock is acquired
*/
CompletableFuture<Void> lock(ExecutorService executorService);

/**
* Tries to acquire the lock asynchronously using a specified ExecutorService.
*
* @param executorService the ExecutorService to use for acquiring the lock
* @return a CompletableFuture that completes when the lock is acquired or not. The future will
* complete exceptionally with a {@link
* io.streamnative.oxia.client.api.exceptions.LockException.LockBusyException} if the lock is
* acquired by others, or with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException.IllegalLockStatusException} if the
* current lock status is not {@link LockStatus#INIT} or {@link LockStatus#RELEASED}.
* Additionally, the future will complete exceptionally with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException} in case of an unknown error.
*/
CompletableFuture<Void> tryLock(ExecutorService executorService);

/**
* Asynchronously releases the lock using a specified ExecutorService.
*
* @param executorService the ExecutorService to use for releasing the lock
* @return a CompletableFuture that completes when the lock is acquired or not. The future will
* complete exceptionally with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException.IllegalLockStatusException} if the
* current lock status is {@link LockStatus#INIT}. In the case of an unknown error, the future
* will complete exceptionally with an {@link
* io.streamnative.oxia.client.api.exceptions.LockException}.
*/
CompletableFuture<Void> unlock(ExecutorService executorService);
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,11 @@ void rangeScan(
* @param notificationCallback A callback to receive notifications.
*/
void notifications(@NonNull Consumer<Notification> notificationCallback);

/**
* Get the client identifier
*
* @return The client identifier
*/
String getClientIdentifier();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api;

import java.io.Closeable;

public interface LockManager extends Closeable {

/**
* Gets a lightweight asynchronous lock for the specified key with default backoff options.
*
* @param key the key associated with the lock
* @return an AsyncLock instance for the specified key
*/
default AsyncLock getLightWeightLock(String key) {
return getLightWeightLock(key, OptionBackoff.DEFAULT);
}

/**
* Gets a lightweight asynchronous lock for the specified key with custom backoff options.
*
* @param key the key associated with the lock
* @param optionBackoff the backoff options to be used for lock acquisition retries
* @return an AsyncLock instance for the specified key
*/
AsyncLock getLightWeightLock(String key, OptionBackoff optionBackoff);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api;

import java.util.concurrent.TimeUnit;

public record OptionAutoRevalidate(boolean enabled, long initDelay, long delay, TimeUnit unit) {

public static final OptionAutoRevalidate DEFAULT =
new OptionAutoRevalidate(true, 15, 15, TimeUnit.MINUTES);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.util.concurrent.TimeUnit;

public record OptionBackoff(
long initDelay, TimeUnit initDelayUnit, long maxDelay, TimeUnit maxDelayUnit) {
public static OptionBackoff DEFAULT = new OptionBackoff(10, MILLISECONDS, 500, MILLISECONDS);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.api.exceptions;

import io.streamnative.oxia.client.api.AsyncLock;
import lombok.Getter;

public sealed class LockException extends OxiaException {
LockException(String message) {
super(message);
}

LockException(Throwable cause) {
super("", cause);
}

public static LockException wrap(Throwable ex) {
if (ex instanceof LockException) {
return (LockException) ex;
} else {
return new LockException(ex);
}
}

public static final class LockBusyException extends LockException {
public LockBusyException() {
super("lock busy");
}
}

public static final class AcquireTimeoutException extends LockException {
public AcquireTimeoutException() {
super("lock acquire timeout");
}
}

@Getter
public static final class IllegalLockStatusException extends LockException {
private final AsyncLock.LockStatus expect;
private final AsyncLock.LockStatus actual;

public IllegalLockStatusException(AsyncLock.LockStatus expect, AsyncLock.LockStatus actual) {
super("illegal lock status. expect: " + expect.name() + ", actual: " + actual.name());
this.expect = expect;
this.actual = actual;
}
}

@Getter
public static final class UnknownLockStatusException extends LockException {
private final AsyncLock.LockStatus actual;

public UnknownLockStatusException(AsyncLock.LockStatus actual) {
super("unknown lock status: " + actual.name());
this.actual = actual;
}
}
}
Loading

0 comments on commit 3961ef4

Please sign in to comment.