Skip to content

Commit

Permalink
ISSUE apache#166: Code cleanup for 0.5.0 release
Browse files Browse the repository at this point in the history
Descriptions of the changes in this PR:

- add InterfaceAudience and InterfaceStability for public API to inform people what changes would be expected for those interface.
- avoid using guava classes in public API since we will provide a shaded jar for distributedlog-core
- enable ImportOrder checkstyle rule in some modules
- move `org.apache.distributedlog.io` to `distributedlog-common` module
- rename `setReadyToFlush` to `flush` and rename `flushAndSync` to `commit` for the new API

Author: Sijie Guo <sijie@apache.org>

Reviewers: Jia Zhai <None>, Leigh Stewart <None>

This closes apache#172 from sijie/finalize_api, closes apache#166
  • Loading branch information
sijie authored and jiazhai committed Sep 6, 2017
1 parent 5a5ad46 commit ae9dbfe
Show file tree
Hide file tree
Showing 25 changed files with 394 additions and 197 deletions.
33 changes: 33 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,39 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>${maven-checkstyle-plugin.version}</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>${puppycrawl.checkstyle.version}</version>
</dependency>
<dependency>
<groupId>org.apache.distributedlog</groupId>
<artifactId>distributedlog-build-tools</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<configuration>
<configLocation>distributedlog/checkstyle.xml</configLocation>
<suppressionsLocation>distributedlog/suppressions-core.xml</suppressionsLocation>
<consoleOutput>true</consoleOutput>
<failOnViolation>true</failOnViolation>
<includeResources>false</includeResources>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<executions>
<execution>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;

/**
* Accessor to protected methods in bookkeeper
* Accessor to protected methods in bookkeeper.
*/
public class BookKeeperAccessor {

Expand Down
82 changes: 39 additions & 43 deletions src/main/java/org/apache/bookkeeper/client/LedgerReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,15 @@
import org.slf4j.LoggerFactory;

/**
* Reader used for DL tools to read entries
*
* TODO: move this to bookkeeper project?
* Reader used for DL tools to read entries.
*/
public class LedgerReader {

static final Logger logger = LoggerFactory.getLogger(LedgerReader.class);
private static final Logger logger = LoggerFactory.getLogger(LedgerReader.class);

/**
* Read Result Holder.
*/
public static class ReadResult<T> {
final long entryId;
final int rc;
Expand Down Expand Up @@ -79,7 +80,7 @@ public LedgerReader(BookKeeper bkc) {
bookieClient = bkc.getBookieClient();
}

static public SortedMap<Long, ArrayList<BookieSocketAddress>> bookiesForLedger(final LedgerHandle lh) {
public static SortedMap<Long, ArrayList<BookieSocketAddress>> bookiesForLedger(final LedgerHandle lh) {
return lh.getLedgerMetadata().getEnsembles();
}

Expand All @@ -102,7 +103,8 @@ public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object
ByteBuf toRet = Unpooled.copiedBuffer(content);
rr = new ReadResult<>(eid, BKException.Code.OK, toRet, bookieAddress.getSocketAddress());
} catch (BKException.BKDigestMatchException e) {
rr = new ReadResult<>(eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress());
rr = new ReadResult<>(
eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress());
} finally {
buffer.release();
}
Expand Down Expand Up @@ -151,27 +153,24 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entri
}
};

ReadLastConfirmedOp.LastConfirmedDataCallback readLACCallback = new ReadLastConfirmedOp.LastConfirmedDataCallback() {
@Override
public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData recoveryData) {
if (BKException.Code.OK != rc) {
callback.operationComplete(rc, resultList);
return;
}
ReadLastConfirmedOp.LastConfirmedDataCallback readLACCallback = (rc, recoveryData) -> {
if (BKException.Code.OK != rc) {
callback.operationComplete(rc, resultList);
return;
}

if (LedgerHandle.INVALID_ENTRY_ID >= recoveryData.lastAddConfirmed) {
callback.operationComplete(BKException.Code.OK, resultList);
return;
}
if (LedgerHandle.INVALID_ENTRY_ID >= recoveryData.lastAddConfirmed) {
callback.operationComplete(BKException.Code.OK, resultList);
return;
}

long entryId = recoveryData.lastAddConfirmed;
PendingReadOp readOp = new PendingReadOp(lh, lh.bk.scheduler, entryId, entryId, readCallback, entryId);
try {
readOp.initiate();
} catch (Throwable t) {
logger.error("Failed to initialize pending read entry {} for ledger {} : ",
new Object[] { entryId, lh.getLedgerMetadata(), t });
}
long entryId = recoveryData.lastAddConfirmed;
PendingReadOp readOp = new PendingReadOp(lh, lh.bk.scheduler, entryId, entryId, readCallback, entryId);
try {
readOp.initiate();
} catch (Throwable t) {
logger.error("Failed to initialize pending read entry {} for ledger {} : ",
new Object[] { entryId, lh.getLedgerMetadata(), t });
}
};
// Read Last AddConfirmed
Expand All @@ -183,26 +182,23 @@ public void readLacs(final LedgerHandle lh, long eid,
List<Integer> writeSet = lh.distributionSchedule.getWriteSet(eid);
final AtomicInteger numBookies = new AtomicInteger(writeSet.size());
final Set<ReadResult<Long>> readResults = new HashSet<ReadResult<Long>>();
ReadEntryCallback readEntryCallback = new ReadEntryCallback() {
@Override
public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) {
InetSocketAddress bookieAddress = (InetSocketAddress) ctx;
ReadResult<Long> rr;
if (BKException.Code.OK != rc) {
rr = new ReadResult<Long>(eid, rc, null, bookieAddress);
} else {
try {
DigestManager.RecoveryData data = lh.macManager.verifyDigestAndReturnLastConfirmed(buffer);
rr = new ReadResult<Long>(eid, BKException.Code.OK, data.lastAddConfirmed, bookieAddress);
} catch (BKException.BKDigestMatchException e) {
rr = new ReadResult<Long>(eid, BKException.Code.DigestMatchException, null, bookieAddress);
}
}
readResults.add(rr);
if (numBookies.decrementAndGet() == 0) {
callback.operationComplete(BKException.Code.OK, readResults);
ReadEntryCallback readEntryCallback = (rc, lid, eid1, buffer, ctx) -> {
InetSocketAddress bookieAddress = (InetSocketAddress) ctx;
ReadResult<Long> rr;
if (BKException.Code.OK != rc) {
rr = new ReadResult<Long>(eid1, rc, null, bookieAddress);
} else {
try {
DigestManager.RecoveryData data = lh.macManager.verifyDigestAndReturnLastConfirmed(buffer);
rr = new ReadResult<Long>(eid1, BKException.Code.OK, data.lastAddConfirmed, bookieAddress);
} catch (BKException.BKDigestMatchException e) {
rr = new ReadResult<Long>(eid1, BKException.Code.DigestMatchException, null, bookieAddress);
}
}
readResults.add(rr);
if (numBookies.decrementAndGet() == 0) {
callback.operationComplete(BKException.Code.OK, readResults);
}
};

ArrayList<BookieSocketAddress> ensemble = lh.getLedgerMetadata().getEnsemble(eid);
Expand Down
22 changes: 22 additions & 0 deletions src/main/java/org/apache/bookkeeper/client/package-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* DistributedLog overrides on bookkeeper client.
*/
package org.apache.bookkeeper.client;
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,8 @@ CompletableFuture<Long> flushAndCommit() {
return writerFuture.thenCompose(writer -> writer.flushAndCommit());
}

CompletableFuture<Long> markEndOfStream() {
@Override
public CompletableFuture<Long> markEndOfStream() {
final Stopwatch stopwatch = Stopwatch.createStarted();
CompletableFuture<BKLogSegmentWriter> logSegmentWriterFuture;
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,18 @@
*/
package org.apache.distributedlog;

import static org.apache.distributedlog.namespace.NamespaceDriver.Role.READER;
import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
Expand Down Expand Up @@ -62,27 +71,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;

import static org.apache.distributedlog.namespace.NamespaceDriver.Role.READER;
import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;

/**
* <h3>Metrics</h3>
* <ul>
* <li> `log_writer/*`: all asynchronous writer related metrics are exposed under scope `log_writer`.
* See {@link BKAsyncLogWriter} for detail stats.
* <li> `async_reader/*`: all asyncrhonous reader related metrics are exposed under scope `async_reader`.
* See {@link BKAsyncLogReader} for detail stats.
* <li> `writer_future_pool/*`: metrics about the future pools that used by writers are exposed under
* scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats.
* <li> `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under
* scope `reader_future_pool`. See {@link MonitoredFuturePool} for detail stats.
* <li> `lock/*`: metrics about the locks used by writers. See {@link ZKDistributedLock} for detail
* stats.
* <li> `read_lock/*`: metrics about the locks used by readers. See {@link ZKDistributedLock} for
Expand Down Expand Up @@ -476,6 +471,11 @@ public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException {
*/
@Override
public BKSyncLogWriter startLogSegmentNonPartitioned() throws IOException {
return openLogWriter();
}

@Override
public BKSyncLogWriter openLogWriter() throws IOException {
checkClosedOrInError("startLogSegmentNonPartitioned");
BKSyncLogWriter writer = new BKSyncLogWriter(conf, dynConf, this);
boolean success = false;
Expand Down Expand Up @@ -606,16 +606,27 @@ private CompletableFuture<DLSN> getDLSNNotLessThanTxIdInSegment(final long fromT
* @throws IOException if a stream cannot be found.
*/
@Override
public LogReader getInputStream(long fromTxnId)
public LogReader openLogReader(long fromTxnId)
throws IOException {
return getInputStreamInternal(fromTxnId);
}

@Override
public LogReader getInputStream(DLSN fromDLSN) throws IOException {
public LogReader openLogReader(DLSN fromDLSN) throws IOException {
return getInputStreamInternal(fromDLSN, Optional.<Long>absent());
}

@Override
public LogReader getInputStream(long fromTxnId)
throws IOException {
return openLogReader(fromTxnId);
}

@Override
public LogReader getInputStream(DLSN fromDLSN) throws IOException {
return openLogReader(fromDLSN);
}

@Override
public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException {
return Utils.ioResult(openAsyncLogReader(fromTxnId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,16 @@
*/
package org.apache.distributedlog;

import com.google.common.base.Optional;
import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;
import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName;

import com.google.common.base.Ticker;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.distributedlog.acl.AccessControlManager;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
Expand All @@ -28,7 +36,6 @@
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.injector.AsyncFailureInjector;
import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.util.ConfUtils;
Expand All @@ -41,15 +48,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;
import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName;

/**
* BKDistributedLogNamespace is the default implementation of {@link Namespace}. It uses
* zookeeper for metadata storage and bookkeeper for data storage.
Expand Down Expand Up @@ -155,25 +153,25 @@ public void deleteLog(String logName)
throws InvalidStreamNameException, LogNotFoundException, IOException {
checkState();
logName = validateAndNormalizeName(logName);
Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
com.google.common.base.Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
if (!uri.isPresent()) {
throw new LogNotFoundException("Log " + logName + " isn't found.");
}
DistributedLogManager dlm = openLogInternal(
uri.get(),
logName,
Optional.<DistributedLogConfiguration>absent(),
Optional.<DynamicDistributedLogConfiguration>absent());
Optional.empty(),
Optional.empty());
dlm.delete();
}

@Override
public DistributedLogManager openLog(String logName)
throws InvalidStreamNameException, IOException {
return openLog(logName,
Optional.<DistributedLogConfiguration>absent(),
Optional.<DynamicDistributedLogConfiguration>absent(),
Optional.<StatsLogger>absent());
Optional.empty(),
Optional.empty(),
Optional.empty());
}

@Override
Expand All @@ -184,7 +182,7 @@ public DistributedLogManager openLog(String logName,
throws InvalidStreamNameException, IOException {
checkState();
logName = validateAndNormalizeName(logName);
Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
com.google.common.base.Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
if (!uri.isPresent()) {
throw new LogNotFoundException("Log " + logName + " isn't found.");
}
Expand All @@ -199,7 +197,7 @@ public DistributedLogManager openLog(String logName,
public boolean logExists(String logName)
throws IOException, IllegalArgumentException {
checkState();
Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
com.google.common.base.Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
if (uri.isPresent()) {
try {
Utils.ioResult(driver.getLogStreamMetadataStore(WRITER)
Expand Down Expand Up @@ -281,7 +279,8 @@ protected DistributedLogManager openLogInternal(
failureInjector, /* Failure Injector */
statsLogger, /* Stats Logger */
perLogStatsLogger, /* Per Log Stats Logger */
Optional.<AsyncCloseable>absent() /* shared resources, we don't need to close any resources in dlm */
com.google.common.base.Optional.absent()
/* shared resources, we don't need to close any resources in dlm */
);
}

Expand Down
Loading

0 comments on commit ae9dbfe

Please sign in to comment.