Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delivery improvements #1153

Merged
merged 43 commits into from
Aug 31, 2019
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
9d80eec
Use Guava's `Hashing` to calculate the shard index consistently.
armiol Aug 23, 2019
a8798e5
Remove a redundant suppression of the warning.
armiol Aug 23, 2019
195a2f7
Extract the `Delivery.Builder` into a top-level class.
armiol Aug 23, 2019
094d885
Introduce a `DeliveryMonitor`.
armiol Aug 23, 2019
2967153
Remove a redundant line.
armiol Aug 23, 2019
a63a3cf
Fix the Javadoc method reference.
armiol Aug 23, 2019
b78ef8b
Do not allow to dispatch any other message than what's wrapped by the…
armiol Aug 24, 2019
8df19e0
Deliver messages to the same target/label/tenant in a batch (in progr…
armiol Aug 24, 2019
a244403
Do not allow to call `EntityMessageEndpoint.onEmptyResult(..)` with t…
armiol Aug 25, 2019
63384d9
Implement a batch dispatching of message from `Inbox` by introducing …
armiol Aug 25, 2019
67c08c6
Call proper `load`/`store` methods for `ProcessManager` and `Projecti…
armiol Aug 26, 2019
f72b9c9
Resolve test issues related to over-mocking.
armiol Aug 26, 2019
ad46c9c
Start testing the batched delivery (in progress).
armiol Aug 26, 2019
d16d8e0
Test the delivery in batches.
armiol Aug 26, 2019
056b24c
Fix the display name.
armiol Aug 26, 2019
2be21c2
Clear the stat counters.
armiol Aug 26, 2019
18c142f
Get rid of `System.out` call.
armiol Aug 26, 2019
917b87d
Calculate the shard index without using `Stringifiers`.
armiol Aug 28, 2019
f4f6ef9
Allow to release the node values from the shard indexes if a node has…
armiol Aug 28, 2019
e38ae60
Allow to customize the page size for a `DeliveryStage`.
armiol Aug 29, 2019
7a24405
Add the tests for `InboxStorage` and `InboxIds`.
armiol Aug 30, 2019
18d72ad
Allow to monitor the delivery stages after each page is delivered.
armiol Aug 30, 2019
7572ad8
Rename and document the class.
armiol Aug 30, 2019
4503891
Document the class.
armiol Aug 30, 2019
71a819c
Kill the redundant lines.
armiol Aug 30, 2019
cf30b35
Merge remote-tracking branch 'origin/master' into delivery-improvements
armiol Aug 30, 2019
dc54ab9
Use the charset explicitly.
armiol Aug 30, 2019
beb8315
Update the Spine version.
armiol Aug 30, 2019
2602fd4
Improve the code layout.
armiol Aug 30, 2019
be073df
Pass the proper value to the monitor.
armiol Aug 30, 2019
e917e82
Avoid the flow-control exception.
armiol Aug 30, 2019
e08ee1e
Update the license report and pom.xml
armiol Aug 30, 2019
6fd3083
Fix the alignment.
armiol Aug 30, 2019
3fa8534
Fix the tag and format properly.
armiol Aug 30, 2019
2e07e49
Get rid of a space character.
armiol Aug 30, 2019
fe4c6c1
Address the discrepancy between the docs and the implementation.
armiol Aug 30, 2019
270e6f2
Use the default factory method instead of seeding the default value.
armiol Aug 30, 2019
3de308e
Remove the unused generic parameter.
armiol Aug 30, 2019
637da8f
Add a note on cache usage.
armiol Aug 30, 2019
360aba1
Fix the test flow.
armiol Aug 30, 2019
3153ff2
Turn the `DeliveryStage` into Protobuf message.
armiol Aug 30, 2019
feb7f61
Mark `RepositoryCache`'s nested interfaces as functional.
armiol Aug 31, 2019
0ab5620
Notify the batch delivery listener in any case.
armiol Aug 31, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ final class AggregateCommandEndpoint<I, A extends Aggregate<I, ?, ?>>
}

@Override
protected DispatchOutcome invokeDispatcher(A aggregate, CommandEnvelope envelope) {
protected DispatchOutcome invokeDispatcher(A aggregate) {
EntityLifecycle lifecycle = repository().lifecycleOf(aggregate.id());
DispatchCommand<I> dispatch = operationFor(lifecycle, aggregate, envelope);
DispatchCommand<I> dispatch = operationFor(lifecycle, aggregate, envelope());
return dispatch.perform();
}

Expand All @@ -64,7 +64,8 @@ protected void afterDispatched(I entityId) {
* @throws IllegalStateException always
*/
@Override
protected void onEmptyResult(A aggregate, CommandEnvelope cmd) throws IllegalStateException {
protected void onEmptyResult(A aggregate) throws IllegalStateException {
CommandEnvelope cmd = envelope();
String entityId = aggregate.idAsString();
String entityClass = aggregate.getClass()
.getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private void storeAndPost(A aggregate, DispatchOutcome outcome) {
} else if (success.hasRejection()) {
post(success.getRejection());
} else {
onEmptyResult(aggregate, envelope());
onEmptyResult(aggregate);
}
afterDispatched(aggregate.id());
}
Expand All @@ -118,7 +118,7 @@ private A loadOrCreate(I aggregateId) {

@CanIgnoreReturnValue
final DispatchOutcome handleAndApplyEvents(A aggregate) {
DispatchOutcome outcome = invokeDispatcher(aggregate, envelope());
DispatchOutcome outcome = invokeDispatcher(aggregate);
Success successfulOutcome = outcome.getSuccess();
return successfulOutcome.hasProducedEvents()
? applyProducedEvents(aggregate, outcome)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ final class AggregateEventReactionEndpoint<I, A extends Aggregate<I, ?, ?>>
}

@Override
protected DispatchOutcome invokeDispatcher(A aggregate, EventEnvelope event) {
return aggregate.reactOn(event);
protected DispatchOutcome invokeDispatcher(A aggregate) {
return aggregate.reactOn(envelope());
}

@Override
Expand All @@ -52,7 +52,7 @@ protected void afterDispatched(I entityId) {
* updated upon reacting on an event.
*/
@Override
protected void onEmptyResult(A aggregate, EventEnvelope event) {
protected void onEmptyResult(A aggregate) {
// Do nothing.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package io.spine.server.aggregate;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets.SetView;
import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
Expand All @@ -33,11 +34,13 @@
import io.spine.server.ServerEnvironment;
import io.spine.server.aggregate.model.AggregateClass;
import io.spine.server.commandbus.CommandDispatcher;
import io.spine.server.delivery.BatchDeliveryListener;
import io.spine.server.delivery.Delivery;
import io.spine.server.delivery.Inbox;
import io.spine.server.delivery.InboxLabel;
import io.spine.server.entity.EntityLifecycle;
import io.spine.server.entity.Repository;
import io.spine.server.entity.RepositoryCache;
import io.spine.server.event.EventBus;
import io.spine.server.event.EventDispatcherDelegate;
import io.spine.server.event.RejectionEnvelope;
Expand Down Expand Up @@ -104,6 +107,8 @@ public abstract class AggregateRepository<I, A extends Aggregate<I, ?, ?>>
*/
private @MonotonicNonNull Inbox<I> inbox;

private @MonotonicNonNull RepositoryCache<I, A> cache;

/** The number of events to store between snapshots. */
private int snapshotTrigger = DEFAULT_SNAPSHOT_TRIGGER;

Expand Down Expand Up @@ -147,10 +152,15 @@ public void registerWith(BoundedContext context) {
context.importBus()
.register(EventImportDispatcher.of(this));
}
initCache(context.isMultitenant());
initInbox();
initMirror();
}

private void initCache(boolean multitenant) {
cache = new RepositoryCache<>(multitenant, this::doLoadOrCreate, this::doStore);
}

/**
* Initializes the {@code Inbox}.
*/
Expand All @@ -159,6 +169,17 @@ private void initInbox() {
.delivery();
inbox = delivery
.<I>newInbox(entityStateType())
.withBatchListener(new BatchDeliveryListener<I>() {
@Override
public void onStart(I id) {
cache.startCaching(id);
}

@Override
public void onEnd(I id) {
cache.stopCaching(id);
}
})
.addEventEndpoint(InboxLabel.REACT_UPON_EVENT,
e -> new AggregateEventReactionEndpoint<>(this, e))
.addEventEndpoint(InboxLabel.IMPORT_EVENT,
Expand Down Expand Up @@ -264,6 +285,11 @@ protected AggregateClass<A> toModelClass(Class<A> cls) {
*/
@Override
protected final void store(A aggregate) {
cache.store(aggregate);
}

@VisibleForTesting
protected void doStore(A aggregate) {
Write<I> operation = Write.operationFor(this, aggregate);
operation.perform();
}
Expand Down Expand Up @@ -533,6 +559,11 @@ protected AggregateStorage<I> aggregateStorage() {
* @return loaded or created aggregate instance
*/
final A loadOrCreate(I id) {
return cache.load(id);
}

@VisibleForTesting
protected A doLoadOrCreate(I id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make these methods package-private? For a user, it is strange to see them when subclassing AggregateRepository.

A result = load(id).orElseGet(() -> createNew(id));
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ final class EventImportEndpoint<I, A extends Aggregate<I, ?, ?>>
* applied}.
*/
@Override
protected DispatchOutcome invokeDispatcher(A aggregate, EventEnvelope eventEnvelope) {
Event event = eventEnvelope.outerObject();
protected DispatchOutcome invokeDispatcher(A aggregate) {
Event event = envelope().outerObject();
Success.Builder success = Success.newBuilder();
success.getProducedEventsBuilder()
.addEvent(event);
Expand All @@ -72,8 +72,8 @@ protected void afterDispatched(I entityId) {
}

@Override
protected void onEmptyResult(A aggregate, EventEnvelope event) {
protected void onEmptyResult(A aggregate) {
_error().log("The aggregate `%s` was not modified during the import of the event `%s`.",
aggregate, event);
aggregate, envelope());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2019, TeamDev. All rights reserved.
*
* Redistribution and use in source and/or binary forms, with or without
* modification, must retain the above copyright notice and the following
* disclaimer.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

package io.spine.server.delivery;

/**
* Listens to start and end of the {@code Inbox} batch delivery.
*
* <p>When the consequent messages in the {@code Inbox} are targeting the same entity,
* the delivery may be optimized by using either the same transaction or caching the storage
* operations while the batch is delivered.
*
* <p>The implementing classes may define their own behavior and react upon such use cases.
*/
public interface BatchDeliveryListener<I> {

/**
* Invoked before the batch delivery to the target with the given ID is started.
*
* @param id
* the ID of the delivery target
*/
void onStart(I id);

/**
* Invoked after the batch delivery to the target with the given ID is ended.
*
* @param id
* the ID of the delivery target
*/
void onEnd(I id);
}
Loading