Delivery improvements #1153
…he argument different from the envelope wrapped.
…a `RepositoryCache` (in progress).
Extract nested classes from `Inbox` and `InboxPart`.
… not been active for too long.
Add more tests for delivery.
Codecov Report
@@ Coverage Diff @@
## master #1153 +/- ##
============================================
- Coverage 91.73% 91.69% -0.05%
- Complexity 4112 4190 +78
============================================
Files 536 542 +6
Lines 12989 13210 +221
Branches 747 767 +20
============================================
+ Hits 11916 12113 +197
- Misses 838 850 +12
- Partials 235 247 +12
@armiol, please see my comments.
}

@VisibleForTesting
protected A doLoadOrCreate(I id) {
Can we make these methods package-private? For a user, it is strange to see them when subclassing `AggregateRepository`.
@@ -59,17 +58,19 @@
 *
 * <b>Configuration</b>
 *
- * <p>By default, a shard is assigned according to the identifier of the target entity. The messages
+ * <p>By default, a shard is assigned according to the identifier of the target entity. The
+ * messages
Please fix the alignment here and further down the doc.
* {@linkplain DeliveryBuilder#setPageSize(int) configured}.
*
* <p>After each {@code DeliveryStage} it is possible to stop the delivery by
* {@link DeliveryBuilder#setMonitor(DeliveryMonitor) supplying} the custom delivery monitor.
- * {@link DeliveryBuilder#setMonitor(DeliveryMonitor) supplying} the custom delivery monitor.
+ * {@link DeliveryBuilder#setMonitor(DeliveryMonitor) supplying} a custom delivery monitor.
try {

Please kill this line.
int deliveredMsgCount;
do {
    deliveredMsgCount = doDeliver(session);
} while (deliveredMsgCount > 0);
} catch (DeliveryStoppedByMonitorException ignored) {
    // do nothing.
Let's group the `deliveredMsgCount` and the instruction on whether to continue the delivery into an object. Something of this sort:
StageResult result;
do {
result = doDeliver(session);
} while (result.shouldDeliverMore());
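One way such a result object could look is sketched below. Only the `StageResult` name and `shouldDeliverMore()` come from the comment above; the fields, the constructor, and the toy `DeliveryLoopSketch` (where each "page" is just a message count and the monitor rule is a simple total limit) are assumptions made for illustration, not the code of this PR.

```java
import java.util.Deque;

/** Hypothetical result of one delivery stage (fields are assumptions). */
final class StageResult {

    private final int deliveredCount;
    private final boolean stoppedByMonitor;

    StageResult(int deliveredCount, boolean stoppedByMonitor) {
        this.deliveredCount = deliveredCount;
        this.stoppedByMonitor = stoppedByMonitor;
    }

    int deliveredCount() {
        return deliveredCount;
    }

    /** Continue only if the stage delivered something and the monitor did not stop it. */
    boolean shouldDeliverMore() {
        return deliveredCount > 0 && !stoppedByMonitor;
    }
}

/** Toy stand-in for the delivery loop; each page is the number of messages it holds. */
final class DeliveryLoopSketch {

    static int deliverAll(Deque<Integer> pages, int stopAfterTotal) {
        int total = 0;
        StageResult result;
        do {
            // An exhausted queue counts as an empty page.
            int delivered = pages.isEmpty() ? 0 : pages.poll();
            total += delivered;
            // Assumed monitor rule: stop once the running total reaches the limit.
            result = new StageResult(delivered, total >= stopAfterTotal);
        } while (result.shouldDeliverMore());
        return total;
    }
}
```

With this shape the loop no longer needs an exception to signal that the monitor stopped the delivery; whether that fits the actual `DeliveryMonitor` contract is for the author to judge.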
@@ -32,12 +36,26 @@
*
* <p>Uses a hash code of the entity identifier and the remainder of its division by the total
* number of shards to determine the index of a shard, at which the modification is allowed.
*
* <p>To reach the consistency of the calculated values across different nodes,
* the identifier is transformed into a {@code String}, and this value is then hashed
> the identifier is transformed into a `String`

This is not true for `Message` identifiers. I'd just say that it is transformed into bytes.
* The hash function to use for the shard index calculation.
*/
@SuppressWarnings("UnstableApiUsage") // See the class-level docs.
private static final HashFunction HASHER = Hashing.murmur3_32(0);
The seed `0` may just be avoided (and the no-argument method may be called instead).
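The idea behind the strategy under review (hash the identifier bytes with a function whose output does not depend on the JVM, then take the remainder by the number of shards) can be sketched with the JDK alone. Note that this sketch swaps in `java.util.zip.CRC32` for Guava's Murmur3 so that it has no external dependencies; `ShardIndexSketch` and its methods are hypothetical names, not part of the PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical sketch: a shard index that is stable across JVMs. */
final class ShardIndexSketch {

    /** Hashes the identifier bytes and maps the hash onto [0, totalShards). */
    static int shardIndex(byte[] idBytes, int totalShards) {
        if (totalShards <= 0) {
            throw new IllegalArgumentException("totalShards must be positive");
        }
        // CRC32 stands in for Murmur3 here; both depend only on the input bytes.
        CRC32 crc = new CRC32();
        crc.update(idBytes);
        // getValue() is a non-negative long, so the remainder is a valid index.
        return (int) (crc.getValue() % totalShards);
    }

    /** Convenience overload for String identifiers, encoded as UTF-8. */
    static int shardIndex(String id, int totalShards) {
        return shardIndex(id.getBytes(StandardCharsets.UTF_8), totalShards);
    }
}
```

Unlike `Object.hashCode()`, which is not guaranteed to return the same value in different JVM runs for many types, the result here depends only on the bytes passed in, which is the property the review thread is concerned with.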
* @param <E>
*         the type of entity
*/
public interface Store<I, E extends Entity<I, ?>> extends Consumer<E> {
Why do we need `I`?

if (!cache.containsKey(idInTenant)) {
    E entity = loadFn.apply(idInTenant.value());
    cache.put(idInTenant, entity);
Can the cache grow too quickly and fill the memory?
Not in our use cases. The maximum number of cached entities equals the total number of shards, and that is only when a single node serves the whole app. This is because each shard delivery is a single-threaded process, and the delivery caches only one entity at a time.
End users will hardly ever have 100k shards and just a single instance processing them all.
I don't want to use Guava's cache for such a simple use case. It is also much slower than the concurrent `HashMap`.
I have added a note on usage, though.
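For readers following the thread, here is a minimal sketch of the kind of map-backed cache being discussed: loads hit the storage once per identifier, stores stay in memory, and everything is written back when caching stops. The class name, method names, and the `loadFn`/`storeFn` shapes are assumptions for illustration; the real `RepositoryCache` in this PR also deals with tenants and may differ in other ways.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical write-back cache over load and store functions. */
final class RepositoryCacheSketch<I, E> {

    private final Map<I, E> cache = new ConcurrentHashMap<>();
    private final Function<I, E> loadFn;     // reads an entity from the storage
    private final BiConsumer<I, E> storeFn;  // writes an entity to the storage

    RepositoryCacheSketch(Function<I, E> loadFn, BiConsumer<I, E> storeFn) {
        this.loadFn = loadFn;
        this.storeFn = storeFn;
    }

    /** Returns the cached entity, loading it from the storage on the first access. */
    E load(I id) {
        return cache.computeIfAbsent(id, loadFn);
    }

    /** Keeps the update in memory; the storage is not touched yet. */
    void store(I id, E entity) {
        cache.put(id, entity);
    }

    /** Writes every cached entity back to the storage and empties the cache. */
    void stopCaching() {
        cache.forEach(storeFn);
        cache.clear();
    }
}
```

Since the cache is emptied on every `stopCaching()`, its size is bounded by the number of entities touched between two flushes, which matches the reasoning given in the reply above.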
super.configure(tester);
tester.setDefault(InboxId.class, InboxId.getDefaultInstance())
      .setDefault(TypeUrl.class, TypeUrl.of(Calc.class));
tester.testStaticMethods(getUtilityClass(), NullPointerTester.Visibility.PACKAGE);
The `configure` method is not meant to execute tests.
Update the license report one more time.
@armiol, please consider making `Store` and `Load` functional interfaces. Other than that, LGTM.
Update the license report.
In this changeset, several improvements to the `Delivery` mechanism are made.

Consistent shard indexes

The default sharding strategy now relies on the Murmur3 hashing algorithm rather than on the native Java `hashCode()`. This allows determining the shard index consistently across different JVMs.

Delivery stages and monitoring

Since the messages are read page by page during the delivery, the `Delivery` may now be configured with the page size. Additionally, an API user can now control whether the delivery should continue after each page is delivered. In environments such as AppEngine Standard, there are time restrictions on the maximum duration of request processing.

Using a `DeliveryMonitor` gives more fine-grained control and allows stopping the execution before the instance gets killed.

Batch dispatching

When consecutive messages in the same shard are headed to the same target, they are now grouped in a batch and dispatched together. Also, instead of reading and writing the `Entity` per message, the `RepositoryCache` is used to cache `load` and `store` calls.

It starts caching before the batch of `InboxMessage`s is delivered and stops caching afterward, propagating all the updates to the underlying storage.

Cleaning `ShardedWorkRegistry`

When an application node picks up a shard, a respective record is created in the `ShardedWorkRegistry`, telling that the node picked up the shard at a certain time. However, as application nodes may get killed, some of the records may contain stale data and prevent other nodes from picking up the shards.

The new `ShardedWorkRegistry.releaseExpiredSessions(ShardIndex, Duration)` call allows manually clearing the registry of the nodes which picked up their sessions a long time ago (as specified by the `duration` parameter).

Additionally, some polishing has been done in the API of `Endpoint`s, removing the parameter discrepancy and streamlining the execution.
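The registry cleanup described above can be illustrated with a small in-memory sketch. Everything here is hypothetical: the class, the `int` shard index, the `String` node ID, and the explicit `now` argument (passed in only to keep the example deterministic) are assumptions and do not reflect the actual `ShardedWorkRegistry` API.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory registry of which node works on which shard. */
final class WorkRegistrySketch {

    /** A record telling which node picked up a shard and when. */
    private static final class Session {
        final String nodeId;
        final Instant pickedUpAt;

        Session(String nodeId, Instant pickedUpAt) {
            this.nodeId = nodeId;
            this.pickedUpAt = pickedUpAt;
        }
    }

    private final Map<Integer, Session> sessions = new HashMap<>();

    /** Registers the node for the shard unless another session already holds it. */
    boolean pickUp(int shardIndex, String nodeId, Instant now) {
        if (sessions.containsKey(shardIndex)) {
            return false;
        }
        sessions.put(shardIndex, new Session(nodeId, now));
        return true;
    }

    /** Removes the session for the shard if it was picked up longer ago than the given period. */
    boolean releaseExpiredSession(int shardIndex, Duration period, Instant now) {
        Session session = sessions.get(shardIndex);
        if (session == null) {
            return false;
        }
        boolean expired = session.pickedUpAt.plus(period).isBefore(now);
        if (expired) {
            sessions.remove(shardIndex);
        }
        return expired;
    }
}
```

In this sketch a stale record left by a killed node blocks `pickUp` until `releaseExpiredSession` is called with a period shorter than the record's age, after which another node can take the shard.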