Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changed some tests thread array list into executor and added timeout … #149

Merged
merged 15 commits into from
Mar 30, 2021
Merged
35 changes: 25 additions & 10 deletions core/src/test/java/com/yahoo/oak/ComputeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@

import com.yahoo.oak.common.OakCommonBuildersFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ComputeTest {
Expand All @@ -26,10 +29,18 @@ public class ComputeTest {
private static int keySize = 10;
private static int valSize = Math.round(5 * K);
private static int numOfEntries;
ExecutorService executor;


private static ArrayList<Thread> threads = new ArrayList<>(NUM_THREADS);
private static CountDownLatch latch = new CountDownLatch(1);
private CountDownLatch latch;



@Before
public void setup() {
executor = Executors.newFixedThreadPool(NUM_THREADS);
latch = new CountDownLatch(1);
}

private static Consumer<OakScopedWriteBuffer> computer = oakWBuffer -> {
if (oakWBuffer.getInt(0) == oakWBuffer.getInt(Integer.BYTES * keySize)) {
Expand Down Expand Up @@ -105,7 +116,7 @@ public void testMain() throws InterruptedException {


for (int i = 0; i < NUM_THREADS; i++) {
threads.add(new Thread(new RunThreads(latch)));
executor.execute(new RunThreads(latch));
}

for (int i = 0; i < (int) Math.round(numOfEntries * 0.5); i++) {
Expand All @@ -116,14 +127,18 @@ public void testMain() throws InterruptedException {
oak.zc().putIfAbsent(key, val);
}

for (int i = 0; i < NUM_THREADS; i++) {
threads.get(i).start();
}

latch.countDown();

for (int i = 0; i < NUM_THREADS; i++) {
threads.get(i).join();
executor.shutdown();
try {
executor.shutdown();
OrHayat marked this conversation as resolved.
Show resolved Hide resolved
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow();
Assert.fail("should have done all the tasks in time");
OrHayat marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (InterruptedException e) {
executor.shutdownNow();
Assert.fail("failed to run all the tasks in the executor service");
OrHayat marked this conversation as resolved.
Show resolved Hide resolved
}
OrHayat marked this conversation as resolved.
Show resolved Hide resolved

for (int i = 0; i < numOfEntries; i++) {
Expand Down
27 changes: 17 additions & 10 deletions core/src/test/java/com/yahoo/oak/ConcurrentPutRemoveTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import org.junit.Ignore;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -25,7 +27,8 @@ public class ConcurrentPutRemoveTest {
private static final int NUM_THREADS = 1;
private static final int K = 1024;
private static final int NUM_OF_ENTRIES = 10 * K;
private ArrayList<Thread> threads;
private ExecutorService executor;

private AtomicBoolean stop;
private AtomicInteger[] status;
private CyclicBarrier barrier;
Expand All @@ -36,7 +39,7 @@ public void initStuff() {
oak = builder.build();
barrier = new CyclicBarrier(NUM_THREADS + 1);
stop = new AtomicBoolean(false);
threads = new ArrayList<>(NUM_THREADS);
executor = Executors.newFixedThreadPool(NUM_THREADS);
status = new AtomicInteger[NUM_OF_ENTRIES];
for (int i = 0; i < status.length; i++) {
status[i] = new AtomicInteger(0);
Expand Down Expand Up @@ -79,7 +82,7 @@ public void run() {
@Test
public void testMain() throws InterruptedException {
for (int i = 0; i < NUM_THREADS; i++) {
threads.add(new Thread(new RunThread()));
executor.execute(new RunThread());
}
Random r = new Random();
for (int i = 0; i < (int) Math.round(NUM_OF_ENTRIES * 0.5); ) {
Expand All @@ -100,22 +103,26 @@ public void testMain() throws InterruptedException {
}
}

for (int i = 0; i < NUM_THREADS; i++) {
threads.get(i).start();
}

try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
Assert.fail("got unexpected exception");
}
OrHayat marked this conversation as resolved.
Show resolved Hide resolved

Thread.sleep(DURATION);

stop.set(true);

for (int i = 0; i < NUM_THREADS; i++) {
threads.get(i).join();
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
Assert.fail("should have done all the tasks in time");
}
} catch (InterruptedException e) {
executor.shutdownNow();
Assert.fail("failed to run all the tasks in the executor service");
}

for (int i = 0; i < NUM_OF_ENTRIES; i++) {
Expand Down
34 changes: 24 additions & 10 deletions core/src/test/java/com/yahoo/oak/FillTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@
import com.yahoo.oak.common.OakCommonBuildersFactory;
import com.yahoo.oak.common.integer.OakIntSerializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FillTest {

Expand All @@ -26,9 +29,16 @@ public class FillTest {
private static final int VALUE_SIZE = Math.round(5 * K);
private static final int NUM_OF_ENTRIES = 100;

private static ArrayList<Thread> threads = new ArrayList<>(NUM_THREADS);
private static CountDownLatch latch = new CountDownLatch(1);
private static CountDownLatch latch;
private static ExecutorService executor;
OrHayat marked this conversation as resolved.
Show resolved Hide resolved

liran-funaro marked this conversation as resolved.
Show resolved Hide resolved

@Before
public void setup() {
latch = new CountDownLatch(1);
executor = Executors.newFixedThreadPool(NUM_THREADS);

}
static class RunThreads implements Runnable {
CountDownLatch latch;

Expand Down Expand Up @@ -90,25 +100,29 @@ public void testMain() throws InterruptedException {


for (int i = 0; i < NUM_THREADS; i++) {
threads.add(new Thread(new RunThreads(latch)));
executor.execute(new RunThreads(latch));
}

for (int i = 0; i < (int) Math.round(NUM_OF_ENTRIES * 0.5); i++) {
oak.zc().putIfAbsent(i, i);
}

for (int i = 0; i < NUM_THREADS; i++) {
threads.get(i).start();
}

long startTime = System.currentTimeMillis();

latch.countDown();

for (int i = 0; i < NUM_THREADS; i++) {
threads.get(i).join();
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
Assert.fail("should have done all the tasks in time");
}
} catch (InterruptedException e) {
executor.shutdownNow();
Assert.fail("failed to run all the tasks in the executor service");
}

liran-funaro marked this conversation as resolved.
Show resolved Hide resolved

long stopTime = System.currentTimeMillis();

for (Integer i = 0; i < NUM_OF_ENTRIES / 2; i++) {
Expand Down
27 changes: 17 additions & 10 deletions core/src/test/java/com/yahoo/oak/MultiThreadComputeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class MultiThreadComputeTest {

private OakMap<Integer, Integer> oak;
private static final int NUM_THREADS = 31;
private ArrayList<Thread> threads;
private ExecutorService executor;
private CountDownLatch latch;
private Consumer<OakScopedWriteBuffer> computer;
private Consumer<OakScopedWriteBuffer> emptyComputer;
Expand All @@ -32,13 +34,12 @@ public void init() {
.setChunkMaxItems(MAX_ITEMS_PER_CHUNK);
oak = builder.build();
latch = new CountDownLatch(1);
threads = new ArrayList<>(NUM_THREADS);
executor = Executors.newFixedThreadPool(NUM_THREADS);
computer = oakWBuffer -> {
if (oakWBuffer.getInt(0) == 0) {
oakWBuffer.putInt(0, 1);
}
};

emptyComputer = oakWBuffer -> {
};
}
Expand Down Expand Up @@ -135,14 +136,20 @@ public void run() {
@Test
public void testThreadsCompute() throws InterruptedException {
for (int i = 0; i < NUM_THREADS; i++) {
threads.add(new Thread(new MultiThreadComputeTest.RunThreads(latch)));
}
for (int i = 0; i < NUM_THREADS; i++) {
threads.get(i).start();
executor.execute(new MultiThreadComputeTest.RunThreads(latch));
}

latch.countDown();
for (int i = 0; i < NUM_THREADS; i++) {
threads.get(i).join();

executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
Assert.fail("should have done all the tasks in time");
}
} catch (InterruptedException e) {
executor.shutdownNow();
Assert.fail("failed to run all the tasks in the executor service");
}
for (Integer i = 0; i < MAX_ITEMS_PER_CHUNK; i++) {
Integer value = oak.get(i);
Expand Down
27 changes: 18 additions & 9 deletions core/src/test/java/com/yahoo/oak/MultiThreadRangeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MultiThreadRangeTest {

private OakMap<Integer, Integer> oak;
private static final int NUM_THREADS = 31;
private ArrayList<Thread> threads;
private ExecutorService executor;

private CountDownLatch latch;
private static final int MAX_ITEMS_PER_CHUNK = 2048;

Expand All @@ -31,7 +34,8 @@ public void init() {
.setChunkMaxItems(MAX_ITEMS_PER_CHUNK);
oak = builder.build();
latch = new CountDownLatch(1);
threads = new ArrayList<>(NUM_THREADS);
executor = Executors.newFixedThreadPool(NUM_THREADS);

liran-funaro marked this conversation as resolved.
Show resolved Hide resolved
}

@After
Expand Down Expand Up @@ -69,7 +73,7 @@ public void run() {
@Test
public void testRange() throws InterruptedException {
for (int i = 0; i < NUM_THREADS; i++) {
threads.add(new Thread(new MultiThreadRangeTest.RunThreads(latch)));
executor.execute(new MultiThreadRangeTest.RunThreads(latch));
}

// fill
Expand All @@ -81,12 +85,17 @@ public void testRange() throws InterruptedException {
}
}

for (int i = 0; i < NUM_THREADS; i++) {
threads.get(i).start();
}
latch.countDown();
for (int i = 0; i < NUM_THREADS; i++) {
threads.get(i).join();

executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
Assert.fail("should have done all the tasks in time");
}
} catch (InterruptedException e) {
executor.shutdownNow();
Assert.fail("failed to run all the tasks in the executor service");
}

int size = 0;
Expand Down
Loading