Skip to content

Commit

Permalink
[Fix-14008][registry] Fix etcd memory leak due to leaseId (#14034)
Browse files Browse the repository at this point in the history
  • Loading branch information
eye-gu authored Jul 28, 2023
1 parent 9ce8871 commit d87a0d8
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.etcd;

import org.apache.dolphinscheduler.registry.api.RegistryException;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

import lombok.extern.slf4j.Slf4j;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.grpc.stub.StreamObserver;

@Slf4j
public class EtcdKeepAliveLeaseManager {

private final Map<String, Long> keyLeaseCache = new ConcurrentHashMap<>();

private final Client client;

EtcdKeepAliveLeaseManager(Client client) {
this.client = client;
}

long getOrCreateKeepAliveLease(String key, long timeToLive) {
return keyLeaseCache.computeIfAbsent(key, $ -> {
try {
long leaseId = client.getLeaseClient().grant(timeToLive).get().getID();
client.getLeaseClient().keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {

@Override
public void onNext(LeaseKeepAliveResponse value) {
}

@Override
public void onError(Throwable t) {
log.error("Lease {} keep alive error, remove cache with key:{}", leaseId, key, t);
keyLeaseCache.remove(key);
}

@Override
public void onCompleted() {
log.error("Lease {} keep alive complete, remove cache with key:{}", leaseId, key);
keyLeaseCache.remove(key);
}
});
log.info("Lease {} keep alive create with key:{}", leaseId, key);
return leaseId;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RegistryException("Failed to create lease key: " + key, e);
} catch (ExecutionException e) {
throw new RegistryException("Failed to create lease key: " + key, e);
}
});
}

Optional<Long> getKeepAliveLease(String key) {
return Optional.ofNullable(keyLeaseCache.get(key));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public class EtcdRegistry implements Registry {

private final Client client;
private EtcdConnectionStateListener etcdConnectionStateListener;

private EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager;

public static final String FOLDER_SEPARATOR = "/";
// save the lock info for thread
// key:lockKey Value:leaseId
Expand Down Expand Up @@ -120,6 +123,7 @@ public EtcdRegistry(EtcdRegistryProperties registryProperties) throws SSLExcepti
client = clientBuilder.build();
log.info("Started Etcd Registry...");
etcdConnectionStateListener = new EtcdConnectionStateListener(client);
etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client);
}

/**
Expand Down Expand Up @@ -206,9 +210,7 @@ public void put(String key, String value, boolean deleteOnDisconnect) {
try {
if (deleteOnDisconnect) {
// keep the key by lease, if disconnected, the lease will expire and the key will delete
long leaseId = client.getLeaseClient().grant(TIME_TO_LIVE_SECONDS).get().getID();
client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
}));
long leaseId = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease(key, TIME_TO_LIVE_SECONDS);
PutOption putOption = PutOption.newBuilder().withLeaseId(leaseId).build();
client.getKVClient().put(byteSequence(key), byteSequence(value), putOption).get();
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.plugin.registry.etcd;

import java.io.IOException;
import java.util.Optional;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import io.etcd.jetcd.Client;
import io.etcd.jetcd.launcher.EtcdCluster;
import io.etcd.jetcd.test.EtcdClusterExtension;

class EtcdKeepAliveLeaseManagerTest {

static EtcdClusterExtension server;

static Client client;

static EtcdKeepAliveLeaseManager etcdKeepAliveLeaseManager;
@BeforeAll
public static void before() throws Exception {
server = EtcdClusterExtension.builder()
.withNodes(1)
.withImage("ibmcom/etcd:3.2.24")
.build();
server.restart();

client = Client.builder().endpoints(server.clientEndpoints()).build();

etcdKeepAliveLeaseManager = new EtcdKeepAliveLeaseManager(client);
}

@Test
void getOrCreateKeepAliveLeaseTest() throws Exception {
long first = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3);
long second = etcdKeepAliveLeaseManager.getOrCreateKeepAliveLease("/test", 3);
Assertions.assertEquals(first, second);

client.getLeaseClient().revoke(first).get();

// wait for lease expire
Thread.sleep(3000);
Optional<Long> keepAliveLease = etcdKeepAliveLeaseManager.getKeepAliveLease("/test");
Assertions.assertFalse(keepAliveLease.isPresent());
}

@AfterAll
public static void after() throws IOException {
try (EtcdCluster closeServer = server.cluster()) {
client.close();
}
}
}

0 comments on commit d87a0d8

Please sign in to comment.