Skip to content

Commit

Permalink
[Improve][Test] Add test for ResourceManager to keep task will be dep…
Browse files Browse the repository at this point in the history
…loyed in different node (apache#5518)

* [Improve][Test] Add test for ResourceManager to keep task will be deployed in different node
  • Loading branch information
Hisoka-X authored Oct 7, 2023
1 parent fdb41d2 commit a6e5850
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.spi.impl.InternalCompletableFuture;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
Expand Down Expand Up @@ -75,7 +73,7 @@ private void initWorker() {
.map(Member::getAddress)
.collect(Collectors.toList());
log.info("initWorker live nodes: " + aliveWorker);
List<InternalCompletableFuture<Void>> futures =
List<CompletableFuture<Void>> futures =
aliveWorker.stream()
.map(
worker ->
Expand All @@ -86,7 +84,7 @@ private void initWorker() {
worker, (WorkerProfile) p);
}))
.collect(Collectors.toList());
futures.forEach(InternalCompletableFuture::join);
futures.forEach(CompletableFuture::join);
log.info("registerWorker: " + registerWorker);
}

Expand Down Expand Up @@ -155,7 +153,7 @@ public void close() {
isRunning = false;
}

protected <E> InvocationFuture<E> sendToMember(Operation operation, Address address) {
protected <E> CompletableFuture<E> sendToMember(Operation operation, Address address) {
return NodeEngineUtil.sendOperationToMemberNode(nodeEngine, operation, address);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.hazelcast.cluster.Address;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -139,7 +138,7 @@ private void addSlotToCacheMap(int index, SlotProfile slotProfile) {

private CompletableFuture<SlotAndWorkerProfile> singleResourceRequestToMember(
int i, ResourceProfile r, WorkerProfile workerProfile) {
InvocationFuture<SlotAndWorkerProfile> future =
CompletableFuture<SlotAndWorkerProfile> future =
resourceManager.sendToMember(
new RequestSlotOperation(jobId, r), workerProfile.getAddress());
return future.whenComplete(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.IOException;
Expand All @@ -33,6 +34,7 @@
* Used to describe the status of the current Worker, including address and resource assign status
*/
@Data
@AllArgsConstructor
public class WorkerProfile implements IdentifiedDataSerializable {

private Address address;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.resourcemanager;

import org.apache.seatunnel.engine.server.resourcemanager.opeartion.RequestSlotOperation;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.resourcemanager.worker.WorkerProfile;
import org.apache.seatunnel.engine.server.service.slot.SlotAndWorkerProfile;

import com.hazelcast.cluster.Address;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.Operation;

import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;

/** Used to test ResourceManager, override init method to register more workers. */
public class FakeResourceManager extends AbstractResourceManager {
public FakeResourceManager(NodeEngine nodeEngine) {
super(nodeEngine);
init();
}

@Override
public void init() {
try {
Address address1 = new Address("localhost", 5801);
WorkerProfile workerProfile1 =
new WorkerProfile(
address1,
new ResourceProfile(),
new ResourceProfile(),
new SlotProfile[] {},
new SlotProfile[] {});
this.registerWorker.put(address1, workerProfile1);

Address address2 = new Address("localhost", 5802);
WorkerProfile workerProfile2 =
new WorkerProfile(
address2,
new ResourceProfile(),
new ResourceProfile(),
new SlotProfile[] {},
new SlotProfile[] {});
this.registerWorker.put(address2, workerProfile2);
Address address3 = new Address("localhost", 5803);
WorkerProfile workerProfile3 =
new WorkerProfile(
address3,
new ResourceProfile(),
new ResourceProfile(),
new SlotProfile[] {},
new SlotProfile[] {});
this.registerWorker.put(address3, workerProfile3);
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}

@Override
protected <E> CompletableFuture<E> sendToMember(Operation operation, Address address) {
if (operation instanceof RequestSlotOperation) {
return (CompletableFuture<E>)
CompletableFuture.completedFuture(
new SlotAndWorkerProfile(
new WorkerProfile(
address,
new ResourceProfile(),
new ResourceProfile(),
new SlotProfile[] {},
new SlotProfile[] {}),
new SlotProfile(address, 1, new ResourceProfile(), "")));
} else {
return super.sendToMember(operation, address);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.resourcemanager;

import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.resourcemanager.resource.ResourceProfile;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.hazelcast.cluster.Address;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class ResourceManagerFunctionTest
extends AbstractSeaTunnelServerTest<ResourceManagerFunctionTest> {

@Test
public void testApplyResourceWithRandomResult()
throws ExecutionException, InterruptedException {
FakeResourceManager resourceManager = new FakeResourceManager(nodeEngine);

List<ResourceProfile> resourceProfiles = new ArrayList<>();
resourceProfiles.add(new ResourceProfile());
resourceProfiles.add(new ResourceProfile());
resourceProfiles.add(new ResourceProfile());
resourceProfiles.add(new ResourceProfile());
resourceProfiles.add(new ResourceProfile());
List<SlotProfile> slotProfiles = resourceManager.applyResources(1L, resourceProfiles).get();
Assertions.assertEquals(slotProfiles.size(), 5);

Set<Address> addresses =
slotProfiles.stream().map(SlotProfile::getWorker).collect(Collectors.toSet());
Assertions.assertTrue(addresses.size() > 1);
}
}

0 comments on commit a6e5850

Please sign in to comment.