Skip to content

Commit

Permalink
Merge pull request #2 from merlimat/master
Browse files Browse the repository at this point in the history
Configure Travis CI build
  • Loading branch information
merlimat authored Sep 7, 2016
2 parents e757999 + 94b941a commit d876f75
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 104 deletions.
8 changes: 8 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@

language: java
jdk:
- oraclejdk8

cache:
directories:
- $HOME/.m2
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -47,7 +47,6 @@

public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase {

Executor executor = Executors.newCachedThreadPool();
private static final Logger log = LoggerFactory.getLogger(ManagedCursorConcurrencyTest.class);

private final AsyncCallbacks.DeleteCallback deleteCallback = new AsyncCallbacks.DeleteCallback() {
Expand Down Expand Up @@ -208,7 +207,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
assertEquals(closeFuture.get(), CLOSED);
}

@Test
@Test(timeOut = 30000)
public void testAckAndClose() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger_test_ack_and_close",
new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
Expand All @@ -226,51 +225,44 @@ public void testAckAndClose() throws Exception {
final CountDownLatch counter = new CountDownLatch(2);
final AtomicBoolean gotException = new AtomicBoolean(false);

Thread deleter = new Thread() {
public void run() {
try {
barrier.await();
// Deleter thread
cachedExecutor.execute(() -> {
try {
barrier.await();

for (Position position : addedEntries) {
cursor.asyncDelete(position, deleteCallback, position);
}
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
for (Position position : addedEntries) {
cursor.asyncDelete(position, deleteCallback, position);
}
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
}
};
});

Thread reader = new Thread() {
public void run() {
try {
barrier.await();
// Reader thread
cachedExecutor.execute(() -> {
try {
barrier.await();

for (int i = 0; i < 1000; i++) {
cursor.readEntries(1).forEach(e -> e.release());
}
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
for (int i = 0; i < 1000; i++) {
cursor.readEntries(1).forEach(e -> e.release());
}
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
}
};
System.out.println("starting deleter and reader.." + System.currentTimeMillis());
deleter.start();
reader.start();
});

counter.await();

assertEquals(gotException.get(), false);
System.out.println("Finished.." + System.currentTimeMillis());

}

@Test
@Test(timeOut = 30000)
public void testConcurrentIndividualDeletes() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(100));

Expand All @@ -291,24 +283,22 @@ public void testConcurrentIndividualDeletes() throws Exception {

for (int thread = 0; thread < Threads; thread++) {
final int myThread = thread;
executor.execute(new Runnable() {
public void run() {
try {
barrier.await();

for (int i = 0; i < N; i++) {
int threadId = i % Threads;
if (threadId == myThread) {
cursor.delete(addedEntries.get(i));
}
}
cachedExecutor.execute(() -> {
try {
barrier.await();

} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
for (int i = 0; i < N; i++) {
int threadId = i % Threads;
if (threadId == myThread) {
cursor.delete(addedEntries.get(i));
}
}

} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
counter.countDown();
}
});
}
Expand All @@ -319,7 +309,7 @@ public void run() {
assertEquals(cursor.getMarkDeletedPosition(), addedEntries.get(addedEntries.size() - 1));
}

@Test
@Test(timeOut = 30000)
public void testConcurrentReadOfSameEntry() throws Exception {
ManagedLedger ledger = factory.open("testConcurrentReadOfSameEntry", new ManagedLedgerConfig());
final int numCursors = 5;
Expand All @@ -346,7 +336,7 @@ public void testConcurrentReadOfSameEntry() throws Exception {
for (int i = 0; i < numCursors; i++) {
final int cursorIndex = i;
final ManagedCursor cursor = cursors.get(cursorIndex);
executor.execute(() -> {
cachedExecutor.execute(() -> {
try {
barrier.await();
for (int j = 0; j < N; j++) {
Expand All @@ -369,7 +359,7 @@ public void testConcurrentReadOfSameEntry() throws Exception {
assertNull(result.get());
}

@Test
@Test(timeOut = 30000)
public void testConcurrentIndividualDeletesWithGetNthEntry() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger",
new ManagedLedgerConfig().setMaxEntriesPerLedger(100).setThrottleMarkDelete(0.5));
Expand All @@ -390,12 +380,12 @@ public void testConcurrentIndividualDeletesWithGetNthEntry() throws Exception {
final AtomicInteger iteration = new AtomicInteger(0);

for (int i = 0; i < deleteEntries; i++) {
executor.execute(() -> {
executor.submit(safeRun(() -> {
try {
cursor.asyncDelete(addedEntries.get(iteration.getAndIncrement()), new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
log.info("Successfully deleted cursor");
// Ok
}

@Override
Expand All @@ -410,8 +400,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
} finally {
counter.countDown();
}

});
}));
}

counter.await();
Expand All @@ -426,6 +415,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
public void readEntryComplete(Entry entry, Object ctx) {
successReadEntries.getAndIncrement();
entry.release();
readCounter.countDown();
}

@Override
Expand All @@ -437,14 +427,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
} catch (Exception e) {
e.printStackTrace();
gotException.set(true);
} finally {
readCounter.countDown();
}
}

readCounter.await();

assertEquals(gotException.get(), false);
assertEquals(readEntries, successReadEntries.get());
assertFalse(gotException.get());
assertEquals(successReadEntries.get(), readEntries);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package org.apache.bookkeeper.test;

import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.bookkeeper.client.MockBookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
Expand Down Expand Up @@ -47,6 +49,7 @@ public abstract class MockedBookKeeperTestCase {
protected ClientConfiguration baseClientConf = new ClientConfiguration();

protected OrderedSafeExecutor executor;
protected ExecutorService cachedExecutor;

public MockedBookKeeperTestCase() {
// By default start a 3 bookies cluster
Expand All @@ -69,19 +72,19 @@ public void setUp(Method method) throws Exception {
}

executor = new OrderedSafeExecutor(2, "test");
cachedExecutor = Executors.newCachedThreadPool();
factory = new ManagedLedgerFactoryImpl(bkc, zkc);
}

@AfterMethod
public void tearDown(Method method) throws Exception {
LOG.info("@@@@@@@@@ stopping " + method);
System.gc();
factory.shutdown();
factory = null;
stopBookKeeper();
stopZooKeeper();
executor.shutdown();
System.gc();
cachedExecutor.shutdown();
LOG.info("--------- stopped {}", method);
}

Expand Down
Loading

0 comments on commit d876f75

Please sign in to comment.