From ae9dbfe9c5f1862af664cb51efaf7446276c16a2 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Wed, 6 Sep 2017 11:24:42 +0800 Subject: [PATCH] ISSUE #166: Code cleanup for 0.5.0 release Descriptions of the changes in this PR: - add InterfaceAudience and InterfaceStability for public API to inform people what changes would be expected for those interface. - avoid using guava classes in public API since we will provide a shaded jar for distributedlog-core - enable ImportOrder checkstyle rule in some modules - move `org.apache.distributedlog.io` to `distributedlog-common` module - rename `setReadyToFlush` to `flush` and rename `flushAndSync` to `commit` for the new API Author: Sijie Guo Reviewers: Jia Zhai , Leigh Stewart This closes #172 from sijie/finalize_api, closes #166 --- pom.xml | 33 ++++ .../bookkeeper/client/BookKeeperAccessor.java | 2 +- .../bookkeeper/client/LedgerReader.java | 82 +++++---- .../bookkeeper/client/package-info.java | 22 +++ .../distributedlog/BKAsyncLogWriter.java | 3 +- .../BKDistributedLogManager.java | 43 +++-- .../BKDistributedLogNamespace.java | 39 +++-- .../distributedlog/BKLogSegmentWriter.java | 2 +- .../distributedlog/BKSyncLogWriter.java | 8 +- .../DistributedLogConfiguration.java | 24 ++- .../apache/distributedlog/EntryBuffer.java | 1 - .../distributedlog/EnvelopedEntryWriter.java | 2 +- .../distributedlog/api/AsyncLogReader.java | 4 + .../distributedlog/api/AsyncLogWriter.java | 22 ++- .../api/DistributedLogManager.java | 158 ++++++++++++++---- .../apache/distributedlog/api/LogReader.java | 11 +- .../apache/distributedlog/api/LogWriter.java | 19 +-- .../distributedlog/api/MetadataAccessor.java | 13 +- .../api/namespace/Namespace.java | 20 +-- .../api/namespace/NamespaceBuilder.java | 9 +- .../distributedlog/TestAsyncReaderWriter.java | 16 +- .../TestBKDistributedLogManager.java | 28 ++-- .../TestBKDistributedLogNamespace.java | 2 +- .../distributedlog/TestBKSyncLogReader.java | 16 +- .../distributedlog/TestRollLogSegments.java | 12 +- 25 files changed, 394 insertions(+), 197 deletions(-) create mode 100644 src/main/java/org/apache/bookkeeper/client/package-info.java diff --git a/pom.xml b/pom.xml index d561c5e2a45..d439cb21300 100644 --- a/pom.xml +++ b/pom.xml @@ -323,6 +323,39 @@ + + org.apache.maven.plugins + maven-checkstyle-plugin + ${maven-checkstyle-plugin.version} + + + com.puppycrawl.tools + checkstyle + ${puppycrawl.checkstyle.version} + + + org.apache.distributedlog + distributedlog-build-tools + ${project.version} + + + + distributedlog/checkstyle.xml + distributedlog/suppressions-core.xml + true + true + false + true + + + + validate + + check + + + + diff --git a/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java b/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java index dcba24eaedf..8978ff3f1e7 100644 --- a/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java +++ b/src/main/java/org/apache/bookkeeper/client/BookKeeperAccessor.java @@ -21,7 +21,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; /** - * Accessor to protected methods in bookkeeper + * Accessor to protected methods in bookkeeper. */ public class BookKeeperAccessor { diff --git a/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/src/main/java/org/apache/bookkeeper/client/LedgerReader.java index 2ab2bca9731..b1029129312 100644 --- a/src/main/java/org/apache/bookkeeper/client/LedgerReader.java +++ b/src/main/java/org/apache/bookkeeper/client/LedgerReader.java @@ -35,14 +35,15 @@ import org.slf4j.LoggerFactory; /** - * Reader used for DL tools to read entries - * - * TODO: move this to bookkeeper project? + * Reader used for DL tools to read entries. */ public class LedgerReader { - static final Logger logger = LoggerFactory.getLogger(LedgerReader.class); + private static final Logger logger = LoggerFactory.getLogger(LedgerReader.class); + /** + * Read Result Holder. + */ public static class ReadResult { final long entryId; final int rc; @@ -79,7 +80,7 @@ public LedgerReader(BookKeeper bkc) { bookieClient = bkc.getBookieClient(); } - static public SortedMap> bookiesForLedger(final LedgerHandle lh) { + public static SortedMap> bookiesForLedger(final LedgerHandle lh) { return lh.getLedgerMetadata().getEnsembles(); } @@ -102,7 +103,8 @@ public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ByteBuf toRet = Unpooled.copiedBuffer(content); rr = new ReadResult<>(eid, BKException.Code.OK, toRet, bookieAddress.getSocketAddress()); } catch (BKException.BKDigestMatchException e) { - rr = new ReadResult<>(eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress()); + rr = new ReadResult<>( + eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress()); } finally { buffer.release(); } @@ -151,27 +153,24 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration entri } }; - ReadLastConfirmedOp.LastConfirmedDataCallback readLACCallback = new ReadLastConfirmedOp.LastConfirmedDataCallback() { - @Override - public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData recoveryData) { - if (BKException.Code.OK != rc) { - callback.operationComplete(rc, resultList); - return; - } + ReadLastConfirmedOp.LastConfirmedDataCallback readLACCallback = (rc, recoveryData) -> { + if (BKException.Code.OK != rc) { + callback.operationComplete(rc, resultList); + return; + } - if (LedgerHandle.INVALID_ENTRY_ID >= recoveryData.lastAddConfirmed) { - callback.operationComplete(BKException.Code.OK, resultList); - return; - } + if (LedgerHandle.INVALID_ENTRY_ID >= recoveryData.lastAddConfirmed) { + callback.operationComplete(BKException.Code.OK, resultList); + return; + } - long entryId = recoveryData.lastAddConfirmed; - PendingReadOp readOp = new PendingReadOp(lh, lh.bk.scheduler, entryId, entryId, readCallback, entryId); - try { - readOp.initiate(); - } catch (Throwable t) { - logger.error("Failed to initialize pending read entry {} for ledger {} : ", - new Object[] { entryId, lh.getLedgerMetadata(), t }); - } + long entryId = recoveryData.lastAddConfirmed; + PendingReadOp readOp = new PendingReadOp(lh, lh.bk.scheduler, entryId, entryId, readCallback, entryId); + try { + readOp.initiate(); + } catch (Throwable t) { + logger.error("Failed to initialize pending read entry {} for ledger {} : ", + new Object[] { entryId, lh.getLedgerMetadata(), t }); } }; // Read Last AddConfirmed @@ -183,26 +182,23 @@ public void readLacs(final LedgerHandle lh, long eid, List writeSet = lh.distributionSchedule.getWriteSet(eid); final AtomicInteger numBookies = new AtomicInteger(writeSet.size()); final Set> readResults = new HashSet>(); - ReadEntryCallback readEntryCallback = new ReadEntryCallback() { - @Override - public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) { - InetSocketAddress bookieAddress = (InetSocketAddress) ctx; - ReadResult rr; - if (BKException.Code.OK != rc) { - rr = new ReadResult(eid, rc, null, bookieAddress); - } else { - try { - DigestManager.RecoveryData data = lh.macManager.verifyDigestAndReturnLastConfirmed(buffer); - rr = new ReadResult(eid, BKException.Code.OK, data.lastAddConfirmed, bookieAddress); - } catch (BKException.BKDigestMatchException e) { - rr = new ReadResult(eid, BKException.Code.DigestMatchException, null, bookieAddress); - } - } - readResults.add(rr); - if (numBookies.decrementAndGet() == 0) { - callback.operationComplete(BKException.Code.OK, readResults); + ReadEntryCallback readEntryCallback = (rc, lid, eid1, buffer, ctx) -> { + InetSocketAddress bookieAddress = (InetSocketAddress) ctx; + ReadResult rr; + if (BKException.Code.OK != rc) { + rr = new ReadResult(eid1, rc, null, bookieAddress); + } else { + try { + DigestManager.RecoveryData data = lh.macManager.verifyDigestAndReturnLastConfirmed(buffer); + rr = new ReadResult(eid1, BKException.Code.OK, data.lastAddConfirmed, bookieAddress); + } catch (BKException.BKDigestMatchException e) { + rr = new ReadResult(eid1, BKException.Code.DigestMatchException, null, bookieAddress); } } + readResults.add(rr); + if (numBookies.decrementAndGet() == 0) { + callback.operationComplete(BKException.Code.OK, readResults); + } }; ArrayList ensemble = lh.getLedgerMetadata().getEnsemble(eid); diff --git a/src/main/java/org/apache/bookkeeper/client/package-info.java b/src/main/java/org/apache/bookkeeper/client/package-info.java new file mode 100644 index 00000000000..ca571079fdb --- /dev/null +++ b/src/main/java/org/apache/bookkeeper/client/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * DistributedLog overrides on bookkeeper client. + */ +package org.apache.bookkeeper.client; \ No newline at end of file diff --git a/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java b/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java index 62b32f24803..abcc4c4be91 100644 --- a/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java +++ b/src/main/java/org/apache/distributedlog/BKAsyncLogWriter.java @@ -463,7 +463,8 @@ CompletableFuture flushAndCommit() { return writerFuture.thenCompose(writer -> writer.flushAndCommit()); } - CompletableFuture markEndOfStream() { + @Override + public CompletableFuture markEndOfStream() { final Stopwatch stopwatch = Stopwatch.createStarted(); CompletableFuture logSegmentWriterFuture; synchronized (this) { diff --git a/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java b/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java index c837ad2b487..02896240b49 100644 --- a/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java +++ b/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java @@ -17,9 +17,18 @@ */ package org.apache.distributedlog; +import static org.apache.distributedlog.namespace.NamespaceDriver.Role.READER; +import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import java.io.IOException; +import java.net.URI; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -62,16 +71,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutorService; - -import static org.apache.distributedlog.namespace.NamespaceDriver.Role.READER; -import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER; - /** *

Metrics

*
    @@ -79,10 +78,6 @@ * See {@link BKAsyncLogWriter} for detail stats. *
  • `async_reader/*`: all asyncrhonous reader related metrics are exposed under scope `async_reader`. * See {@link BKAsyncLogReader} for detail stats. - *
  • `writer_future_pool/*`: metrics about the future pools that used by writers are exposed under - * scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats. - *
  • `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under - * scope `reader_future_pool`. See {@link MonitoredFuturePool} for detail stats. *
  • `lock/*`: metrics about the locks used by writers. See {@link ZKDistributedLock} for detail * stats. *
  • `read_lock/*`: metrics about the locks used by readers. See {@link ZKDistributedLock} for @@ -476,6 +471,11 @@ public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException { */ @Override public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException { + return openLogWriter(); + } + + @Override + public BKSyncLogWriter openLogWriter() throws IOException { checkClosedOrInError("startLogSegmentNonPartitioned"); BKSyncLogWriter writer = new BKSyncLogWriter(conf, dynConf, this); boolean success = false; @@ -606,16 +606,27 @@ private CompletableFuture getDLSNNotLessThanTxIdInSegment(final long fromT * @throws IOException if a stream cannot be found. */ @Override - public LogReader getInputStream(long fromTxnId) + public LogReader openLogReader(long fromTxnId) throws IOException { return getInputStreamInternal(fromTxnId); } @Override - public LogReader getInputStream(DLSN fromDLSN) throws IOException { + public LogReader openLogReader(DLSN fromDLSN) throws IOException { return getInputStreamInternal(fromDLSN, Optional.absent()); } + @Override + public LogReader getInputStream(long fromTxnId) + throws IOException { + return openLogReader(fromTxnId); + } + + @Override + public LogReader getInputStream(DLSN fromDLSN) throws IOException { + return openLogReader(fromDLSN); + } + @Override public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException { return Utils.ioResult(openAsyncLogReader(fromTxnId)); diff --git a/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java index 60ad91668d5..0264178ecbc 100644 --- a/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java +++ b/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java @@ -17,8 +17,16 @@ */ package org.apache.distributedlog; -import com.google.common.base.Optional; +import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER; +import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName; + import com.google.common.base.Ticker; +import java.io.IOException; +import java.net.URI; +import java.util.Iterator; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.distributedlog.acl.AccessControlManager; import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.api.namespace.Namespace; @@ -28,7 +36,6 @@ import org.apache.distributedlog.exceptions.InvalidStreamNameException; import org.apache.distributedlog.exceptions.LogNotFoundException; import org.apache.distributedlog.injector.AsyncFailureInjector; -import org.apache.distributedlog.io.AsyncCloseable; import org.apache.distributedlog.logsegment.LogSegmentMetadataCache; import org.apache.distributedlog.namespace.NamespaceDriver; import org.apache.distributedlog.util.ConfUtils; @@ -41,15 +48,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; -import java.util.Iterator; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER; -import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName; - /** * BKDistributedLogNamespace is the default implementation of {@link Namespace}. It uses * zookeeper for metadata storage and bookkeeper for data storage. @@ -155,15 +153,15 @@ public void deleteLog(String logName) throws InvalidStreamNameException, LogNotFoundException, IOException { checkState(); logName = validateAndNormalizeName(logName); - Optional uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName)); + com.google.common.base.Optional uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName)); if (!uri.isPresent()) { throw new LogNotFoundException("Log " + logName + " isn't found."); } DistributedLogManager dlm = openLogInternal( uri.get(), logName, - Optional.absent(), - Optional.absent()); + Optional.empty(), + Optional.empty()); dlm.delete(); } @@ -171,9 +169,9 @@ public void deleteLog(String logName) public DistributedLogManager openLog(String logName) throws InvalidStreamNameException, IOException { return openLog(logName, - Optional.absent(), - Optional.absent(), - Optional.absent()); + Optional.empty(), + Optional.empty(), + Optional.empty()); } @Override @@ -184,7 +182,7 @@ public DistributedLogManager openLog(String logName, throws InvalidStreamNameException, IOException { checkState(); logName = validateAndNormalizeName(logName); - Optional uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName)); + com.google.common.base.Optional uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName)); if (!uri.isPresent()) { throw new LogNotFoundException("Log " + logName + " isn't found."); } @@ -199,7 +197,7 @@ public DistributedLogManager openLog(String logName, public boolean logExists(String logName) throws IOException, IllegalArgumentException { checkState(); - Optional uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName)); + com.google.common.base.Optional uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName)); if (uri.isPresent()) { try { Utils.ioResult(driver.getLogStreamMetadataStore(WRITER) @@ -281,7 +279,8 @@ protected DistributedLogManager openLogInternal( failureInjector, /* Failure Injector */ statsLogger, /* Stats Logger */ perLogStatsLogger, /* Per Log Stats Logger */ - Optional.absent() /* shared resources, we don't need to close any resources in dlm */ + com.google.common.base.Optional.absent() + /* shared resources, we don't need to close any resources in dlm */ ); } diff --git a/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java index 48a0dafd963..e2178dca29e 100644 --- a/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java +++ b/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java @@ -995,7 +995,7 @@ private void checkWriteLock() throws LockingException { /** * Transmit the current buffer to bookkeeper. - * Synchronised at the class. #write() and #setReadyToFlush() + * Synchronised at the class. #write() and #flush() * are never called at the same time. * * NOTE: This method should only throw known exceptions so that we don't accidentally diff --git a/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java b/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java index 15296b24092..f16914f64c4 100644 --- a/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java +++ b/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java @@ -68,8 +68,8 @@ public void markEndOfStream() throws IOException { * New data can be still written to the stream while flush is ongoing. */ @Override - public long setReadyToFlush() throws IOException { - checkClosedOrInError("setReadyToFlush"); + public long flush() throws IOException { + checkClosedOrInError("flush"); long highestTransactionId = 0; BKLogSegmentWriter writer = getCachedLogWriter(); if (null != writer) { @@ -86,8 +86,8 @@ public long setReadyToFlush() throws IOException { * becomes full or a certain period of time is elapsed. */ @Override - public long flushAndSync() throws IOException { - checkClosedOrInError("flushAndSync"); + public long commit() throws IOException { + checkClosedOrInError("commit"); LOG.debug("FlushAndSync Started"); long highestTransactionId = 0; diff --git a/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java b/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java index 3269f57f14e..613e60c48ea 100644 --- a/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java +++ b/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java @@ -20,6 +20,10 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import java.net.URL; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Set; import org.apache.distributedlog.bk.QuorumConfig; import org.apache.distributedlog.feature.DefaultFeatureProvider; import org.apache.distributedlog.api.namespace.NamespaceBuilder; @@ -39,13 +43,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.net.URL; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.Set; - /** * DistributedLog Configuration. + * *

    * DistributedLog configuration is basically a properties based configuration, which extends from * Apache commons {@link CompositeConfiguration}. All the DL settings are in camel case and prefixed @@ -527,8 +527,22 @@ public void loadConf(Configuration otherConf) { * Load whitelisted stream configuration from another configuration object * * @param streamConfiguration stream configuration overrides + * @Deprecated since 0.5.0, in favor of using {@link #loadStreamConf(java.util.Optional)} */ public void loadStreamConf(Optional streamConfiguration) { + if (streamConfiguration.isPresent()) { + loadStreamConf(java.util.Optional.of(streamConfiguration.get())); + } else { + loadStreamConf(java.util.Optional.empty()); + } + } + + /** + * Load whitelisted stream configuration from another configuration object. + * + * @param streamConfiguration stream configuration overrides + */ + public void loadStreamConf(java.util.Optional streamConfiguration) { if (!streamConfiguration.isPresent()) { return; } diff --git a/src/main/java/org/apache/distributedlog/EntryBuffer.java b/src/main/java/org/apache/distributedlog/EntryBuffer.java index a881df877e2..68d76ea4da8 100644 --- a/src/main/java/org/apache/distributedlog/EntryBuffer.java +++ b/src/main/java/org/apache/distributedlog/EntryBuffer.java @@ -20,7 +20,6 @@ import io.netty.buffer.ByteBuf; import java.io.IOException; import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException; -import org.apache.distributedlog.io.TransmitListener; /** * Write representation of a {@link Entry}. diff --git a/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java index cc6e9410869..86cc56ebb69 100644 --- a/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java +++ b/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; /** - * {@link org.apache.distributedlog.io.Buffer} based log record set writer. + * {@link ByteBuf} based log record set writer. */ class EnvelopedEntryWriter implements Writer { diff --git a/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java b/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java index 3838bf75b05..217236e7f1b 100644 --- a/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java +++ b/src/main/java/org/apache/distributedlog/api/AsyncLogReader.java @@ -20,9 +20,13 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; import org.apache.distributedlog.LogRecordWithDLSN; import org.apache.distributedlog.io.AsyncCloseable; +@Public +@Evolving public interface AsyncLogReader extends AsyncCloseable { /** diff --git a/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java b/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java index 9e12de22f4e..8bb45a213f2 100644 --- a/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java +++ b/src/main/java/org/apache/distributedlog/api/AsyncLogWriter.java @@ -19,11 +19,15 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.LogRecord; import org.apache.distributedlog.io.AsyncAbortable; import org.apache.distributedlog.io.AsyncCloseable; +@Public +@Evolving public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable { /** @@ -31,7 +35,7 @@ public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable { * * @return last committed transaction id. */ - public long getLastTxId(); + long getLastTxId(); /** * Write a log record to the stream. @@ -40,7 +44,7 @@ public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable { * @return A Future which contains a DLSN if the record was successfully written * or an exception if the write fails */ - public CompletableFuture write(LogRecord record); + CompletableFuture write(LogRecord record); /** * Write log records to the stream in bulk. Each future in the list represents the result of @@ -51,7 +55,7 @@ public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable { * @return A Future which contains a list of Future DLSNs if the record was successfully written * or an exception if the operation fails. */ - public CompletableFuture>> writeBulk(List record); + CompletableFuture>> writeBulk(List record); /** * Truncate the log until dlsn. @@ -61,10 +65,18 @@ public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable { * @return A Future indicates whether the operation succeeds or not, or an exception * if the truncation fails. */ - public CompletableFuture truncate(DLSN dlsn); + CompletableFuture truncate(DLSN dlsn); + + /** + * Seal the log stream. + * + * @return a future indicates whether the stream is sealed or not. The final transaction id is returned + * if the stream is sealed, otherwise an exception is returned. + */ + CompletableFuture markEndOfStream(); /** * Get the name of the stream this writer writes data to */ - public String getStreamName(); + String getStreamName(); } diff --git a/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java b/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java index 60f629de943..46f8b35fb5e 100644 --- a/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java +++ b/src/main/java/org/apache/distributedlog/api/DistributedLogManager.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; import org.apache.distributedlog.AppendOnlyStreamReader; import org.apache.distributedlog.AppendOnlyStreamWriter; import org.apache.distributedlog.DLSN; @@ -38,6 +40,8 @@ * each conceptual place of storage corresponds to exactly one instance of * this class, which is created when the EditLog is first opened. */ +@Public +@Evolving public interface DistributedLogManager extends AsyncCloseable, Closeable { /** @@ -53,6 +57,10 @@ public interface DistributedLogManager extends AsyncCloseable, Closeable { */ public NamespaceDriver getNamespaceDriver(); + // + // Log Segment Related Operations + // + /** * Get log segments. * @@ -77,6 +85,10 @@ public interface DistributedLogManager extends AsyncCloseable, Closeable { */ public void unregisterListener(LogSegmentListener listener); + // + // Writer & Reader Operations + // + /** * Open async log writer to write records to the log stream. * @@ -84,10 +96,19 @@ public interface DistributedLogManager extends AsyncCloseable, Closeable { */ public CompletableFuture openAsyncLogWriter(); + /** + * Open sync log writer to write records to the log stream. + * + * @return sync log writer + * @throws IOException when fails to open a sync log writer. + */ + public LogWriter openLogWriter() throws IOException; + /** * Begin writing to the log stream identified by the name * * @return the writer interface to generate log records + * @Deprecated since 0.5.0, in favor of using {@link #openLogWriter()} */ public LogWriter startLogSegmentNonPartitioned() throws IOException; @@ -95,23 +116,45 @@ public interface DistributedLogManager extends AsyncCloseable, Closeable { * Begin writing to the log stream identified by the name * * @return the writer interface to generate log records + * @Deprecated since 0.5.0, in favor of using {@link #openAsyncLogWriter()} */ - // @Deprecated public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException; /** - * Begin appending to the end of the log stream which is being treated as a sequence of bytes + * Open an sync log reader to read records from a log starting from fromTxnId. * - * @return the writer interface to generate log records + * @param fromTxnId + * transaction id to start reading from + * @return sync log reader */ - public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException; + public LogReader openLogReader(long fromTxnId) throws IOException; /** - * Get a reader to read a log stream as a sequence of bytes + * Open an async log reader to read records from a log starting from fromDLSN * - * @return the writer interface to generate log records + * @param fromDLSN + * dlsn to start reading from + * @return async log reader */ - public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException; + public LogReader openLogReader(DLSN fromDLSN) throws IOException; + + /** + * Open an async log reader to read records from a log starting from fromTxnId. + * + * @param fromTxnId + * transaction id to start reading from + * @return async log reader + */ + public CompletableFuture openAsyncLogReader(long fromTxnId); + + /** + * Open an async log reader to read records from a log starting from fromDLSN + * + * @param fromDLSN + * dlsn to start reading from + * @return async log reader + */ + public CompletableFuture openAsyncLogReader(DLSN fromDLSN); /** * Get the input stream starting with fromTxnId for the specified log @@ -119,41 +162,61 @@ public interface DistributedLogManager extends AsyncCloseable, Closeable { * @param fromTxnId - the first transaction id we want to read * @return the stream starting with transaction fromTxnId * @throws IOException if a stream cannot be found. + * @Deprecated since 0.5.0, in favor of using {@link #openLogReader(long)} */ public LogReader getInputStream(long fromTxnId) throws IOException; + /** + * Get the input stream starting with fromTxnId for the specified log + * + * @param fromDLSN - the first DLSN we want to read + * @return the stream starting with DLSN + * @throws IOException if a stream cannot be found. + * @Deprecated since 0.5.0, in favor of using {@link #openLogReader(DLSN)} + */ public LogReader getInputStream(DLSN fromDLSN) throws IOException; /** - * Open an async log reader to read records from a log starting from fromTxnId. + * Get an async log reader to read records from a log starting from fromTxnId. * * @param fromTxnId * transaction id to start reading from * @return async log reader + * @throws IOException when fails to open an async log reader. + * @see #openAsyncLogReader(long) + * @Deprecated it is deprecated since 0.5.0, in favor of using {@link #openAsyncLogReader(long)} */ - public CompletableFuture openAsyncLogReader(long fromTxnId); + public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException; /** - * Open an async log reader to read records from a log starting from fromDLSN + * Get an async log reader to read records from a log starting from fromDLSN. * * @param fromDLSN * dlsn to start reading from * @return async log reader + * @throws IOException when fails to open an async log reader. + * @see #openAsyncLogReader(DLSN) + * @Deprecated it is deprecated since 0.5.0, in favor of using {@link #openAsyncLogReader(DLSN)} */ - public CompletableFuture openAsyncLogReader(DLSN fromDLSN); - - // @Deprecated - public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException; - - // @Deprecated public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException; + /** + * Get a log reader with lock starting from fromDLSN. + * + *

    If two readers tried to open using same subscriberId, one would succeed, while the other + * will be blocked until it gets the lock. + * + * @param fromDLSN + * start dlsn + * @return async log reader + */ public CompletableFuture getAsyncLogReaderWithLock(DLSN fromDLSN); /** * Get a log reader with lock starting from fromDLSN and using subscriberId. - * If two readers tried to open using same subscriberId, one would succeed, while the other + * + *

    If two readers tried to open using same subscriberId, one would succeed, while the other * will be blocked until it gets the lock. * * @param fromDLSN @@ -169,7 +232,7 @@ public LogReader getInputStream(long fromTxnId) * its last commit position recorded in subscription store. If no last commit position found * in subscription store, it would start reading from head of the stream. * - * If the two readers tried to open using same subscriberId, one would succeed, while the other + *

    If the two readers tried to open using same subscriberId, one would succeed, while the other * will be blocked until it gets the lock. * * @param subscriberId @@ -178,6 +241,33 @@ public LogReader getInputStream(long fromTxnId) */ public CompletableFuture getAsyncLogReaderWithLock(String subscriberId); + // + // Stream writer and reader + // + + /** + * Begin appending to the end of the log stream which is being treated as a sequence of bytes + * + * @return the writer interface to generate log records + */ + public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException; + + /** + * Get a reader to read a log stream as a sequence of bytes + * + * @return the writer interface to generate log records + */ + public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException; + + // + // Metadata Operations: + // + // - retrieve head or tail records + // - get log record count + // - delete logs + // - recover logs + // + /** * Get the {@link DLSN} of first log record whose transaction id is not less than transactionId. * @@ -196,6 +286,13 @@ public LogReader getInputStream(long fromTxnId) public LogRecordWithDLSN getLastLogRecord() throws IOException; + /** + * Get Latest log record with DLSN in the log - async + * + * @return latest log record with DLSN + */ + public CompletableFuture getLastLogRecordAsync(); + /** * Get the earliest Transaction Id available in the log * @@ -212,21 +309,6 @@ public LogRecordWithDLSN getLastLogRecord() */ public long getLastTxId() throws IOException; - /** - * Get Latest DLSN in the log - * - * @return last dlsn - * @throws IOException - */ - public DLSN getLastDLSN() throws IOException; - - /** - * Get Latest log record with DLSN in the log - async - * - * @return latest log record with DLSN - */ - public CompletableFuture getLastLogRecordAsync(); - /** * Get Latest Transaction Id in the log - async * @@ -241,6 +323,14 @@ public LogRecordWithDLSN getLastLogRecord() */ public CompletableFuture getFirstDLSNAsync(); + /** + * Get Latest DLSN in the log + * + * @return last dlsn + * @throws IOException + */ + public DLSN getLastDLSN() throws IOException; + /** * Get Latest DLSN in the log - async * @@ -286,6 +376,8 @@ public LogRecordWithDLSN getLastLogRecord() * Delete the log. * * @throws IOException if the deletion fails + * @Deprecated since 0.5.0, in favor of using + * {@link org.apache.distributedlog.api.namespace.Namespace#deleteLog(String)} */ public void delete() throws IOException; diff --git a/src/main/java/org/apache/distributedlog/api/LogReader.java b/src/main/java/org/apache/distributedlog/api/LogReader.java index 631a8a990ac..eb1ef9c41bf 100644 --- a/src/main/java/org/apache/distributedlog/api/LogReader.java +++ b/src/main/java/org/apache/distributedlog/api/LogReader.java @@ -17,15 +17,16 @@ */ package org.apache.distributedlog.api; +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; import org.apache.distributedlog.DLSN; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.LogRecordWithDLSN; import org.apache.distributedlog.io.AsyncCloseable; -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - /** * LogReader is a `synchronous` reader reading records from a DL log. * @@ -163,6 +164,8 @@ * * @see AsyncLogReader */ +@Public +@Evolving public interface LogReader extends Closeable, AsyncCloseable { /** diff --git a/src/main/java/org/apache/distributedlog/api/LogWriter.java b/src/main/java/org/apache/distributedlog/api/LogWriter.java index 46ad1f07a87..e72b36829a7 100644 --- a/src/main/java/org/apache/distributedlog/api/LogWriter.java +++ b/src/main/java/org/apache/distributedlog/api/LogWriter.java @@ -17,17 +17,20 @@ */ package org.apache.distributedlog.api; -import org.apache.distributedlog.LogRecord; -import org.apache.distributedlog.io.Abortable; - import java.io.Closeable; import java.io.IOException; import java.util.List; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.io.Abortable; /* * A generic interface class to support writing log records into * a persistent distributed log. */ +@Public +@Evolving public interface LogWriter extends Closeable, Abortable { /** * Write a log record to the stream. @@ -52,19 +55,15 @@ public interface LogWriter extends Closeable, Abortable { * persistent storage. * The transmission is asynchronous and new data can be still written to the * stream while flushing is performed. - * - * TODO: rename this to flush() */ - public long setReadyToFlush() throws IOException; + public long flush() throws IOException; /** * Flush and sync all data that is ready to be flush - * {@link #setReadyToFlush()} into underlying persistent store. + * {@link #flush()} into underlying persistent store. * @throws IOException - * - * TODO: rename this to commit() */ - public long flushAndSync() throws IOException; + public long commit() throws IOException; /** * Flushes all the data up to this point, diff --git a/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java b/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java index 76ef700e924..d73c99b5b2c 100644 --- a/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java +++ b/src/main/java/org/apache/distributedlog/api/MetadataAccessor.java @@ -17,11 +17,20 @@ */ package org.apache.distributedlog.api; -import org.apache.distributedlog.io.AsyncCloseable; - import java.io.Closeable; import java.io.IOException; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; +import org.apache.distributedlog.io.AsyncCloseable; +/** + * Provide a metadata accessor to access customized metadata associated with logs. + * + * @Deprecated this class is here for legacy reason. It is not recommended to use this class for storing customized + * metadata. + */ +@LimitedPrivate +@Evolving public interface MetadataAccessor extends Closeable, AsyncCloseable { /** * Get the name of the stream managed by this log manager diff --git a/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java b/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java index 818824dbccb..fc3629f29fa 100644 --- a/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java +++ b/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java @@ -17,20 +17,19 @@ */ package org.apache.distributedlog.api.namespace; -import com.google.common.annotations.Beta; -import com.google.common.base.Optional; +import java.io.IOException; +import java.util.Iterator; +import java.util.Optional; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving; +import org.apache.bookkeeper.stats.StatsLogger; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.api.DistributedLogManager; -import org.apache.distributedlog.exceptions.LogNotFoundException; import org.apache.distributedlog.acl.AccessControlManager; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.callback.NamespaceListener; import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.exceptions.LogNotFoundException; import org.apache.distributedlog.exceptions.InvalidStreamNameException; - -import java.io.IOException; -import java.util.Iterator; - -import org.apache.bookkeeper.stats.StatsLogger; import org.apache.distributedlog.namespace.NamespaceDriver; /** @@ -68,7 +67,8 @@ * @see DistributedLogManager * @since 0.3.32 */ -@Beta +@Public +@Evolving public interface Namespace { /** diff --git a/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java b/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java index 45dc0219d0a..6b01731b0cf 100644 --- a/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java +++ b/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java @@ -18,6 +18,10 @@ package org.apache.distributedlog.api.namespace; import com.google.common.base.Preconditions; +import java.io.IOException; +import java.net.URI; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Stable; import org.apache.distributedlog.BKDistributedLogNamespace; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.DistributedLogConstants; @@ -40,9 +44,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.net.URI; - /** * Builder to construct a Namespace. * The builder takes the responsibility of loading backend according to the uri. @@ -50,6 +51,8 @@ * @see Namespace * @since 0.3.32 */ +@Public +@Stable public class NamespaceBuilder { private static final Logger logger = LoggerFactory.getLogger(NamespaceBuilder.class); diff --git a/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java b/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java index 781e8d0f704..c1181e8a73c 100644 --- a/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java +++ b/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java @@ -316,12 +316,12 @@ private static long writeLogSegment(DistributedLogManager dlm, writer.write(DLMTestUtil.getLargeLogRecordInstance(txid++)); } if (j % flushPerNumRecords == 0 ) { - writer.setReadyToFlush(); - writer.flushAndSync(); + writer.flush(); + writer.commit(); } } - writer.setReadyToFlush(); - writer.flushAndSync(); + writer.flush(); + writer.commit(); writer.close(); return txid; } @@ -1384,7 +1384,7 @@ public void run() { for (int iter = 1; iter <= (2 * idleReaderErrorThreshold / threadSleepTime) ; iter++) { Thread.sleep(threadSleepTime); writer.write(DLMTestUtil.getLargeLogRecordInstance(txid, true)); - writer.setReadyToFlush(); + writer.flush(); } Thread.sleep(threadSleepTime); } @@ -1994,9 +1994,9 @@ public void testCreateLogStreamWithDifferentReplicationFactor() throws Exception // use customized configuration dlm = namespace.openLog( name + "-custom", - Optional.absent(), - Optional.of(dynConf), - Optional.absent()); + java.util.Optional.empty(), + java.util.Optional.of(dynConf), + java.util.Optional.empty()); writer = dlm.startAsyncLogSegmentNonPartitioned(); Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L))); segments = dlm.getLogSegments(); diff --git a/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java index d35389456ec..5fd8fe39b75 100644 --- a/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java +++ b/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java @@ -104,8 +104,8 @@ private void testNonPartitionedWritesInternal(String name, DistributedLogConfigu for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { writer.write(DLMTestUtil.getLogRecordInstance(txid++)); } - writer.setReadyToFlush(); - writer.flushAndSync(); + writer.flush(); + writer.commit(); writer.close(); LogReader reader = dlm.getInputStream(1); @@ -183,8 +183,8 @@ public void testContinuousReaders() throws Exception { LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); out.write(op); } - out.setReadyToFlush(); - out.flushAndSync(); + out.flush(); + out.commit(); out.close(); dlm.close(); @@ -326,8 +326,8 @@ public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); out.write(op); } - out.setReadyToFlush(); - out.flushAndSync(); + out.flush(); + out.commit(); out.close(); long numTrans = DLMTestUtil.getNumberofLogRecords(createNewDLM(conf, name), 1); @@ -353,8 +353,8 @@ public void testContinuousReaderBulk() throws Exception { LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); out.write(op); } - out.setReadyToFlush(); - out.flushAndSync(); + out.flush(); + out.commit(); out.close(); dlm.close(); @@ -410,8 +410,8 @@ public void testContinuousReadersWithEmptyLedgers() throws Exception { LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); out.write(op); } - out.setReadyToFlush(); - out.flushAndSync(); + out.flush(); + out.commit(); out.close(); dlm.close(); @@ -461,8 +461,8 @@ public void testCheckLogExists() throws Exception { for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { writer.write(DLMTestUtil.getLogRecordInstance(txid++)); } - writer.setReadyToFlush(); - writer.flushAndSync(); + writer.flush(); + writer.commit(); writer.close(); dlm.close(); @@ -834,8 +834,8 @@ public void testLastLogRecordWithEmptyLedgers() throws Exception { LogRecord op = DLMTestUtil.getLogRecordInstance(txid); op.setControl(); out.write(op); - out.setReadyToFlush(); - out.flushAndSync(); + out.flush(); + out.commit(); out.abort(); dlm.close(); diff --git a/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java b/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java index 2078a88ff29..ca3e3325088 100644 --- a/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java +++ b/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java @@ -105,7 +105,7 @@ private void createLogPathTest(String logName) throws Exception { try { writer = dlm.startLogSegmentNonPartitioned(); writer.write(DLMTestUtil.getLogRecordInstance(1L)); - writer.flushAndSync(); + writer.commit(); fail("Should fail to write data if stream doesn't exist."); } catch (IOException ioe) { // expected diff --git a/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java b/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java index 07f0db56325..6b2bfada2b4 100644 --- a/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java +++ b/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java @@ -154,8 +154,8 @@ public void testReadRecordsAfterReadAheadCaughtUp() throws Exception { LogRecord record = DLMTestUtil.getLogRecordInstance(i); out.write(record); } - out.setReadyToFlush(); - out.flushAndSync(); + out.flush(); + out.commit(); logger.info("Write first 10 records"); @@ -174,8 +174,8 @@ public void testReadRecordsAfterReadAheadCaughtUp() throws Exception { LogRecord record = DLMTestUtil.getLogRecordInstance(i); out.write(record); } - out.setReadyToFlush(); - out.flushAndSync(); + out.flush(); + out.commit(); logger.info("Write another 10 records"); @@ -222,8 +222,8 @@ public void testReadRecordsWhenReadAheadCatchingUp() throws Exception { LogRecord record = DLMTestUtil.getLogRecordInstance(i); out.write(record); } - out.setReadyToFlush(); - out.flushAndSync(); + out.flush(); + out.commit(); logger.info("Write first 10 records"); @@ -263,8 +263,8 @@ public void testReadRecordsWhenReadAheadCatchingUp2() throws Exception { LogRecord record = DLMTestUtil.getLogRecordInstance(i); out.write(record); } - out.setReadyToFlush(); - out.flushAndSync(); + out.flush(); + out.commit(); final AtomicLong nextTxId = new AtomicLong(11L); logger.info("Write first 10 records"); diff --git a/src/test/java/org/apache/distributedlog/TestRollLogSegments.java b/src/test/java/org/apache/distributedlog/TestRollLogSegments.java index 0111e4d6dbf..7111e074e99 100644 --- a/src/test/java/org/apache/distributedlog/TestRollLogSegments.java +++ b/src/test/java/org/apache/distributedlog/TestRollLogSegments.java @@ -357,8 +357,8 @@ public void testCaughtUpReaderOnLogSegmentRolling() throws Exception { final int numEntries = 5; for (int i = 1; i <= numEntries; i++) { writer.write(DLMTestUtil.getLogRecordInstance(i)); - writer.setReadyToFlush(); - writer.flushAndSync(); + writer.flush(); + writer.commit(); } BKDistributedLogManager readDLM = (BKDistributedLogManager) createNewDLM(confLocal, name); @@ -382,13 +382,13 @@ public void testCaughtUpReaderOnLogSegmentRolling() throws Exception { // write 6th record writer.write(DLMTestUtil.getLogRecordInstance(numEntries + 1)); - writer.setReadyToFlush(); + writer.flush(); // Writer moved to lac = 10, while reader knows lac = 9 and moving to wait on 10 checkAndWaitWriterReaderPosition(perStreamWriter, 10, reader, 10, readLh, 9); // write records without commit to simulate similar failure cases writer.write(DLMTestUtil.getLogRecordInstance(numEntries + 2)); - writer.setReadyToFlush(); + writer.flush(); // Writer moved to lac = 11, while reader knows lac = 10 and moving to wait on 11 checkAndWaitWriterReaderPosition(perStreamWriter, 11, reader, 11, readLh, 10); @@ -415,8 +415,8 @@ public void testCaughtUpReaderOnLogSegmentRolling() throws Exception { BKSyncLogWriter anotherWriter = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); anotherWriter.write(DLMTestUtil.getLogRecordInstance(numEntries + 3)); - anotherWriter.setReadyToFlush(); - anotherWriter.flushAndSync(); + anotherWriter.flush(); + anotherWriter.commit(); anotherWriter.closeAndComplete(); for (long i = numEntries + 1; i <= numEntries + 3; i++) {