Skip to content

Commit

Permalink
EventHubs integration part 2 - spotbugs (Azure#3105)
Browse files Browse the repository at this point in the history
This PR includes fix for SpotBugs and also corrects some CheckStyle errors left from previous PR (EventHubs integration part 1 Azure#3100):

* fix<SpotBugs>: all 6 P1 bugs

* fix<SpotBugs>: supression and refactor

* fix<SpotBugs>: update spotbugs-exclude with comment for EH

* refactor(SpotBugs): import static

* fix<checkstyle>: access can be package-private

* fix<checkstyle>: modifier public is redundant for interface methods

* fix<checkstyle>: modifier public is redundant for inner class of interface and modifier static is redundant for inner class of interface and inner enum

* fix<checkstyle>: modifier final is redundant for final class

* fix(stylecheck): EmptyBlock

* fix(CheckStyle): EmptyBlock and ConstantName

* fix(CheckStyle): MethodName

* fix(CheckStyle): ConstantName

* fix(CheckStyle): checkstyle suppressions file

* fix(CheckStyle): refactor Inner Assignment

* fix(CheckStyle): refactor Empty Block

* fix(CheckStyle):  update Check Style Suppressions

* fix(CheckStyle): after update to checkstyle 8.18, final modifier modifier

* docs(CheckStyle): clarity the comment, no functional things involved
  • Loading branch information
mssfang authored and sima-zhu committed Mar 21, 2019
1 parent 34f1e57 commit 30e48df
Show file tree
Hide file tree
Showing 41 changed files with 212 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@
<Bug pattern="UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD"/>
</Match>

<!-- Public field already exists in the public API surface area even
though it should be final. -->
<Match>
<Class name="com.microsoft.azure.eventhubs.impl.EventHubClientImpl"/>
<Field name="USER_AGENT"/>
<Bug pattern="MS_SHOULD_BE_FINAL"/>
</Match>

<!-- These KeyVault classes are publicly released APIs that intentionally return null rather
than an empty array. -->
<Match>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,9 +525,7 @@ private boolean renewLeaseInternal(CompleteLease lease) throws StorageException
leaseBlob.renewLease(AccessCondition.generateLeaseCondition(azLease.getToken()), this.renewRequestOptions, null);
result = true;
} catch (StorageException se) {
if (wasLeaseLost(se, azLease.getPartitionId())) {
// leave result as false
} else {
if (!wasLeaseLost(se, azLease.getPartitionId())) {
throw se;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*/
public final class EventProcessorHost {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorHost.class);
private static final Object uuidSynchronizer = new Object();
private static final Object UUID_SYNCHRONIZER = new Object();
// weOwnExecutor exists to support user-supplied thread pools.
private final boolean weOwnExecutor;
private final ScheduledExecutorService executorService;
Expand Down Expand Up @@ -330,7 +330,7 @@ public static String createHostName(String prefix) {
* @return A string UUID with dashes but no curly brackets.
*/
public static String safeCreateUUID() {
synchronized (EventProcessorHost.uuidSynchronizer) {
synchronized (EventProcessorHost.UUID_SYNCHRONIZER) {
final UUID newUuid = UUID.randomUUID();
return newUuid.toString();
}
Expand Down Expand Up @@ -530,15 +530,15 @@ public CompletableFuture<Void> unregisterEventProcessor() {
}

static class EventProcessorHostThreadPoolFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final String namePrefix;
private final String hostName;
private final String entityName;
private final String consumerGroupName;

public EventProcessorHostThreadPoolFactory(
EventProcessorHostThreadPoolFactory(
String hostName,
String entityName,
String consumerGroupName) {
Expand All @@ -561,7 +561,7 @@ public Thread newThread(Runnable r) {

private String getNamePrefix() {
return String.format("[%s|%s|%s]-%s-",
this.entityName, this.consumerGroupName, this.hostName, poolNumber.getAndIncrement());
this.entityName, this.consumerGroupName, this.hostName, POOL_NUMBER.getAndIncrement());
}

static class ThreadUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ public interface ICheckpointManager {
*
* @return CompletableFuture {@literal ->} true if it exists, false if not
*/
public CompletableFuture<Boolean> checkpointStoreExists();
CompletableFuture<Boolean> checkpointStoreExists();

/***
* Create the checkpoint store if it doesn't exist. Do nothing if it does exist.
*
* @return CompletableFuture {@literal ->} null on success, completes exceptionally on error.
*/
public CompletableFuture<Void> createCheckpointStoreIfNotExists();
CompletableFuture<Void> createCheckpointStoreIfNotExists();

/**
* Deletes the checkpoint store.
*
* @return CompletableFuture {@literal ->} null on success, completes exceptionally on error.
*/
public CompletableFuture<Void> deleteCheckpointStore();
CompletableFuture<Void> deleteCheckpointStore();

/***
* Get the checkpoint data associated with the given partition. Could return null if no checkpoint has
Expand All @@ -51,7 +51,7 @@ public interface ICheckpointManager {
*
* @return CompletableFuture {@literal ->} checkpoint info, or null. Completes exceptionally on error.
*/
public CompletableFuture<Checkpoint> getCheckpoint(String partitionId);
CompletableFuture<Checkpoint> getCheckpoint(String partitionId);

/***
* Creates the checkpoint HOLDERs for the given partitions. Does nothing for any checkpoint HOLDERs
Expand All @@ -67,7 +67,7 @@ public interface ICheckpointManager {
* @param partitionIds List of partitions to create checkpoint HOLDERs for.
* @return CompletableFuture {@literal ->} null on success, completes exceptionally on error.
*/
public CompletableFuture<Void> createAllCheckpointsIfNotExists(List<String> partitionIds);
CompletableFuture<Void> createAllCheckpointsIfNotExists(List<String> partitionIds);

/***
* Update the checkpoint in the store with the offset/sequenceNumber in the provided checkpoint.
Expand All @@ -81,7 +81,7 @@ public interface ICheckpointManager {
* @param checkpoint offset/sequenceNumber and partition id to update the store with.
* @return CompletableFuture {@literal ->} null on success. Completes exceptionally on error.
*/
public CompletableFuture<Void> updateCheckpoint(CompleteLease lease, Checkpoint checkpoint);
CompletableFuture<Void> updateCheckpoint(CompleteLease lease, Checkpoint checkpoint);

/***
* Delete the stored checkpoint data for the given partition. If there is no stored checkpoint for the
Expand All @@ -91,5 +91,5 @@ public interface ICheckpointManager {
* @param partitionId id of partition to delete checkpoint from store
* @return CompletableFuture {@literal ->} null on success. Completes exceptionally on error.
*/
public CompletableFuture<Void> deleteCheckpoint(String partitionId);
CompletableFuture<Void> deleteCheckpoint(String partitionId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface IEventProcessor {
* @param context Information about the partition that this event processor will process events from.
* @throws Exception to indicate failure.
*/
public void onOpen(PartitionContext context) throws Exception;
void onOpen(PartitionContext context) throws Exception;

/**
* Called by processor host to indicate that the event processor is being stopped.
Expand All @@ -43,7 +43,7 @@ public interface IEventProcessor {
* @param reason Reason why the event processor is being stopped.
* @throws Exception to indicate failure.
*/
public void onClose(PartitionContext context, CloseReason reason) throws Exception;
void onClose(PartitionContext context, CloseReason reason) throws Exception;

/**
* Called by the processor host when a batch of events has arrived.
Expand All @@ -58,7 +58,7 @@ public interface IEventProcessor {
* @param events The events to be processed. May be empty.
* @throws Exception to indicate failure.
*/
public void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception;
void onEvents(PartitionContext context, Iterable<EventData> events) throws Exception;

/**
* Called when the underlying client experiences an error while receiving. EventProcessorHost will take
Expand All @@ -68,5 +68,5 @@ public interface IEventProcessor {
* @param context Information about the partition.
* @param error The error that occured.
*/
public void onError(PartitionContext context, Throwable error);
void onError(PartitionContext context, Throwable error);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ public interface IEventProcessorFactory<T extends IEventProcessor> {
* @throws Exception to indicate failure.
* @return The event processor object.
*/
public T createEventProcessor(PartitionContext context) throws Exception;
T createEventProcessor(PartitionContext context) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface ILeaseManager {
*
* @return Duration of a lease before it expires unless renewed, specified in milliseconds.
*/
public int getLeaseDurationInMilliseconds();
int getLeaseDurationInMilliseconds();

/**
* Does the lease store exist?
Expand All @@ -35,29 +35,29 @@ public interface ILeaseManager {
*
* @return CompletableFuture {@literal ->} true if it exists, false if not
*/
public CompletableFuture<Boolean> leaseStoreExists();
CompletableFuture<Boolean> leaseStoreExists();

/**
* Create the lease store if it does not exist, do nothing if it does exist.
*
* @return CompletableFuture {@literal ->} null on success, completes exceptionally on error.
*/
public CompletableFuture<Void> createLeaseStoreIfNotExists();
CompletableFuture<Void> createLeaseStoreIfNotExists();

/**
* Deletes the lease store.
*
* @return CompletableFuture {@literal ->} null on success, completes exceptionally on error.
*/
public CompletableFuture<Void> deleteLeaseStore();
CompletableFuture<Void> deleteLeaseStore();

/**
* Returns the lease info for the given partition..
*
* @param partitionId Get the lease info for this partition.
* @return CompletableFuture {@literal ->} Lease, completes exceptionally on error.
*/
public CompletableFuture<CompleteLease> getLease(String partitionId);
CompletableFuture<CompleteLease> getLease(String partitionId);

/**
* Returns lightweight BaseLease for all leases, which includes name of owning host and whether lease
Expand All @@ -67,7 +67,7 @@ public interface ILeaseManager {
*
* @return CompletableFuture {@literal ->} list of BaseLease, completes exceptionally on error.
*/
public CompletableFuture<List<BaseLease>> getAllLeases();
CompletableFuture<List<BaseLease>> getAllLeases();


/**
Expand All @@ -77,7 +77,7 @@ public interface ILeaseManager {
* @param partitionIds ids of partitions to create lease info for
* @return CompletableFuture {@literal ->} null on success, completes exceptionally on error
*/
public CompletableFuture<Void> createAllLeasesIfNotExists(List<String> partitionIds);
CompletableFuture<Void> createAllLeasesIfNotExists(List<String> partitionIds);

/**
* Delete the lease info for a partition from the store. If there is no stored lease for the given partition,
Expand All @@ -86,7 +86,7 @@ public interface ILeaseManager {
* @param lease the currently existing lease info for the partition
* @return CompletableFuture {@literal ->} null on success, completes exceptionally on error.
*/
public CompletableFuture<Void> deleteLease(CompleteLease lease);
CompletableFuture<Void> deleteLease(CompleteLease lease);

/**
* Acquire the lease on the desired partition for this EventProcessorHost.
Expand All @@ -103,7 +103,7 @@ public interface ILeaseManager {
* @param lease Lease info for the desired partition
* @return CompletableFuture {@literal ->} true if the lease was acquired, false if not, completes exceptionally on error.
*/
public CompletableFuture<Boolean> acquireLease(CompleteLease lease);
CompletableFuture<Boolean> acquireLease(CompleteLease lease);

/**
* Renew a lease currently held by this host instance.
Expand All @@ -116,7 +116,7 @@ public interface ILeaseManager {
* @param lease Lease to be renewed
* @return true if the lease was renewed, false as described above, completes exceptionally on error.
*/
public CompletableFuture<Boolean> renewLease(CompleteLease lease);
CompletableFuture<Boolean> renewLease(CompleteLease lease);

/**
* Give up a lease currently held by this host.
Expand All @@ -127,7 +127,7 @@ public interface ILeaseManager {
* @param lease Lease to be given up
* @return CompletableFuture {@literal ->} null on success, completes exceptionally on error.
*/
public CompletableFuture<Void> releaseLease(CompleteLease lease);
CompletableFuture<Void> releaseLease(CompleteLease lease);

/**
* Update the store with the information in the provided lease.
Expand All @@ -139,5 +139,5 @@ public interface ILeaseManager {
* @param lease New lease info to be stored
* @return true if the update was successful, false if lease was lost and could not be updated, completes exceptionally on error.
*/
public CompletableFuture<Boolean> updateLease(CompleteLease lease);
CompletableFuture<Boolean> updateLease(CompleteLease lease);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,29 @@ public void initialize(HostContext hostContext) {

@Override
public CompletableFuture<Boolean> checkpointStoreExists() {
boolean exists = InMemoryCheckpointStore.singleton.existsMap();
boolean exists = InMemoryCheckpointStore.SINGLETON.existsMap();
TRACE_LOGGER.debug(this.hostContext.withHost("checkpointStoreExists() " + exists));
return CompletableFuture.completedFuture(exists);
}

@Override
public CompletableFuture<Void> createCheckpointStoreIfNotExists() {
TRACE_LOGGER.debug(this.hostContext.withHost("createCheckpointStoreIfNotExists()"));
InMemoryCheckpointStore.singleton.initializeMap();
InMemoryCheckpointStore.SINGLETON.initializeMap();
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> deleteCheckpointStore() {
TRACE_LOGGER.debug(this.hostContext.withHost("deleteCheckpointStore()"));
InMemoryCheckpointStore.singleton.deleteMap();
InMemoryCheckpointStore.SINGLETON.deleteMap();
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Checkpoint> getCheckpoint(String partitionId) {
Checkpoint returnCheckpoint = null;
Checkpoint checkpointInStore = InMemoryCheckpointStore.singleton.getCheckpoint(partitionId);
Checkpoint checkpointInStore = InMemoryCheckpointStore.SINGLETON.getCheckpoint(partitionId);
if (checkpointInStore == null) {
TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(partitionId,
"getCheckpoint() no existing Checkpoint"));
Expand All @@ -87,7 +87,7 @@ public CompletableFuture<Checkpoint> getCheckpoint(String partitionId) {
@Override
public CompletableFuture<Void> createAllCheckpointsIfNotExists(List<String> partitionIds) {
for (String id : partitionIds) {
Checkpoint checkpointInStore = InMemoryCheckpointStore.singleton.getCheckpoint(id);
Checkpoint checkpointInStore = InMemoryCheckpointStore.SINGLETON.getCheckpoint(id);
if (checkpointInStore != null) {
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(id,
"createCheckpointIfNotExists() found existing checkpoint, OK"));
Expand All @@ -99,7 +99,7 @@ public CompletableFuture<Void> createAllCheckpointsIfNotExists(List<String> part
// and put it in the store, but the values are set to indicate that it is not initialized.
newStoreCheckpoint.setOffset(null);
newStoreCheckpoint.setSequenceNumber(-1);
InMemoryCheckpointStore.singleton.setOrReplaceCheckpoint(newStoreCheckpoint);
InMemoryCheckpointStore.SINGLETON.setOrReplaceCheckpoint(newStoreCheckpoint);
}
}
return CompletableFuture.completedFuture(null);
Expand All @@ -109,7 +109,7 @@ public CompletableFuture<Void> createAllCheckpointsIfNotExists(List<String> part
public CompletableFuture<Void> updateCheckpoint(CompleteLease lease, Checkpoint checkpoint) {
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(checkpoint.getPartitionId(),
"updateCheckpoint() " + checkpoint.getOffset() + "//" + checkpoint.getSequenceNumber()));
Checkpoint checkpointInStore = InMemoryCheckpointStore.singleton.getCheckpoint(checkpoint.getPartitionId());
Checkpoint checkpointInStore = InMemoryCheckpointStore.SINGLETON.getCheckpoint(checkpoint.getPartitionId());
if (checkpointInStore != null) {
checkpointInStore.setOffset(checkpoint.getOffset());
checkpointInStore.setSequenceNumber(checkpoint.getSequenceNumber());
Expand All @@ -123,13 +123,13 @@ public CompletableFuture<Void> updateCheckpoint(CompleteLease lease, Checkpoint
@Override
public CompletableFuture<Void> deleteCheckpoint(String partitionId) {
TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId, "deleteCheckpoint()"));
InMemoryCheckpointStore.singleton.removeCheckpoint(partitionId);
InMemoryCheckpointStore.SINGLETON.removeCheckpoint(partitionId);
return CompletableFuture.completedFuture(null);
}


private static class InMemoryCheckpointStore {
static final InMemoryCheckpointStore singleton = new InMemoryCheckpointStore();
static final InMemoryCheckpointStore SINGLETON = new InMemoryCheckpointStore();

private ConcurrentHashMap<String, Checkpoint> inMemoryCheckpointsPrivate = null;

Expand Down
Loading

0 comments on commit 30e48df

Please sign in to comment.