Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replicator stop retrying to read when cursor is already closed #239

Merged
merged 1 commit into from
Feb 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public ConcurrentFindCursorPositionException(String msg) {
}
}

public static class CursorAlreadyClosedException extends ManagedLedgerException {
public CursorAlreadyClosedException(String msg) {
super(msg);
}
}

public static class TooManyRequestsException extends ManagedLedgerException {
public TooManyRequestsException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -47,6 +50,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound;
Expand Down Expand Up @@ -451,7 +455,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx) {
checkArgument(numberOfEntriesToRead > 0);
if (STATE_UPDATER.get(this) == State.Closed) {
callback.readEntriesFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,22 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.util.Rate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yahoo.pulsar.common.policies.data.ReplicatorStats;
import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import com.yahoo.pulsar.client.api.MessageId;
import com.yahoo.pulsar.client.api.ProducerConfiguration;
import com.yahoo.pulsar.client.impl.Backoff;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.client.impl.MessageImpl;
import com.yahoo.pulsar.client.impl.ProducerImpl;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.client.impl.SendCallback;
import com.yahoo.pulsar.common.policies.data.ReplicatorStats;

import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
Expand Down Expand Up @@ -488,7 +489,13 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

long waitTimeMillis = readFailureBackoff.next();

if (!(exception instanceof TooManyRequestsException)) {
if(exception instanceof CursorAlreadyClosedException) {
log.error("[{}][{} -> {}] Error reading entries because replicator is already deleted and cursor is already closed {}, ({})", topic, localCluster,
remoteCluster, ctx, exception.getMessage(), exception);
// replicator is already deleted and cursor is already closed so, producer should also be stopped
closeProducerAsync();
return;
}else if (!(exception instanceof TooManyRequestsException)) {
log.error("[{}][{} -> {}] Error reading entries at {}. Retrying to read in {}s. ({})", topic, localCluster,
remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(), exception);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
import org.mockito.Mockito;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.slf4j.Logger;
Expand Down Expand Up @@ -741,6 +744,67 @@ public void testResumptionAfterBacklogRelaxed() throws Exception {
}
}

/**
* It verifies that PersistentReplicator considers CursorAlreadyClosedException as non-retriable-read exception and
* it should closed the producer as cursor is already closed because replicator is already deleted.
*
* @throws Exception
*/
@Test(timeOut = 5000)
public void testCloseReplicatorStartProducer() throws Exception {

DestinationName dest = DestinationName.get("persistent://pulsar/global/ns1/closeCursor");
// Producer on r1
MessageProducer producer1 = new MessageProducer(url1, dest);
// Consumer on r1
MessageConsumer consumer1 = new MessageConsumer(url1, dest);
// Consumer on r2
MessageConsumer consumer2 = new MessageConsumer(url2, dest);

// Replicator for r1 -> r2
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString());
PersistentReplicator replicator = topic.getPersistentReplicator("r2");

// close the cursor
Field cursorField = PersistentReplicator.class.getDeclaredField("cursor");
cursorField.setAccessible(true);
ManagedCursor cursor = (ManagedCursor) cursorField.get(replicator);
cursor.close();
// try to read entries
CountDownLatch latch = new CountDownLatch(1);
producer1.produce(10);
cursor.asyncReadEntriesOrWait(10, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
latch.countDown();
fail("it should have been failed");
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
assertTrue(exception instanceof CursorAlreadyClosedException);
}
}, null);

// replicator-readException: cursorAlreadyClosed
replicator.readEntriesFailed(new CursorAlreadyClosedException("Cursor already closed exception"), null);

// wait replicator producer to be closed
Thread.sleep(1000);

// Replicator producer must be closed
Field producerField = PersistentReplicator.class.getDeclaredField("producer");
producerField.setAccessible(true);
ProducerImpl replicatorProducer = (ProducerImpl) producerField.get(replicator);
assertEquals(replicatorProducer, null);

producer1.close();
consumer1.close();
consumer2.close();

}

private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

}