Skip to content

Commit

Permalink
Durable/catchup subscription model doesn't store keep subscription st…
Browse files Browse the repository at this point in the history
…ate if Subscription annotation specifies START_AT_BEGINNING and resume is same as start
  • Loading branch information
johanhaleby committed May 3, 2024
1 parent 456d041 commit 1832fa2
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
* </p>
* <br>
* <p>
* Also note that the if a the subscription crashes during catch-up mode it'll continue where it left-off on restart, given the no specific `StartAt` position is supplied.
* Also note that the if a the subscription crashes during catch-up mode it'll continue where it left-off on restart, given the no specific `StartAt` position is supplied (i.e. if {@code StartAt.subscriptionModelDefault() is used}).
* For this to work, the subscription must store the subscription position in a {@link SubscriptionPositionStorage} implementation periodically. It's possible to configure
* how often this should happen in the {@link CatchupSubscriptionModelConfig}.
* </p>
Expand Down Expand Up @@ -120,7 +120,7 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter,
if (startAtGeneratedByDynamic == null) {
// We're not allowed to start this subscription model, defer to parent!
runningCatchupSubscriptions.remove(subscriptionId);
return getDelegatedSubscriptionModelRecursively().subscribe(subscriptionId, filter, startAt, action);
return getDelegatedSubscriptionModel().subscribe(subscriptionId, filter, startAt, action);
} else {
firstStartAt = startAtGeneratedByDynamic;
}
Expand Down Expand Up @@ -161,10 +161,22 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter,
// Perform the catchup
runCatchupForStream(stream, subscriptionId, action, null);

// Here's the reason why we're forcing the wrapping subscription to be a PositionAwareBlockingSubscription.
// This is in order to be 100% safe since we need to take events that are published meanwhile the EventStoreQuery
// is executed. Thus, we need the global position of the subscription at the time of starting the query.
SubscriptionPosition globalSubscriptionPosition = subscriptionModel.globalSubscriptionPosition();

Class<? extends SubscriptionModel> delegatedSubscriptionModelType = getDelegatedSubscriptionModel().getClass();
StartAt delegatedStartAt = startAt.get(new SubscriptionModelContext(delegatedSubscriptionModelType));
final SubscriptionPosition globalSubscriptionPosition;
if (delegatedStartAt == null) {
globalSubscriptionPosition = null;
returnIfSubscriptionPositionStorageConfigIs(UseSubscriptionPositionInStorage.class, cfg -> {
cfg.storage().delete(subscriptionId);
return null;
});
} else {
// Here's the reason why we're forcing the wrapping subscription to be a PositionAwareBlockingSubscription.
// This is in order to be 100% safe since we need to take events that are published meanwhile the EventStoreQuery
// is executed. Thus, we need the global position of the subscription at the time of starting the query.
globalSubscriptionPosition = subscriptionModel.globalSubscriptionPosition();
}

// Here we check if new events have arrived during catchup phase, if so we stream/catch-up these events as well.
long numberOfEventsAfterCatchupSubscriptionCompleted = eventStoreQueries.count(catchupFilter);
Expand All @@ -185,8 +197,6 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter,
subscriptionsWasCancelledOrShutdown = true;
}

// TODO Should we remove the position from storage?! For example if the wrapping subscription is not storing the position?

// Be careful since the wrapping subscription has not yet saved the global position here...
// We need a durable cache in order to be 100% safe.

Expand All @@ -197,21 +207,28 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter,
// PersistSubscriptionPositionDuringCatchupPhase) is that if using a "storage" at all in the config, is to accommodate
// that the wrapping subscription continues from where we left off.
StartAt startAtToUse = StartAt.dynamic(this.<Supplier<StartAt>, UseSubscriptionPositionInStorage>returnIfSubscriptionPositionStorageConfigIs(UseSubscriptionPositionInStorage.class,
cfg -> () -> {
// It's important that we find the document inside the supplier so that we look up the latest resume token on retry
SubscriptionPosition position = cfg.storage().read(subscriptionId);
// If there is no position stored in storage, or if the stored position is time-based
// (i.e. written by the catch-up subscription), we save the globalSubscriptionPosition.
// The reason that we need to write the time-based subscription position in this case
// is that the wrapped subscription might not support time-based subscriptions.
if ((position == null || isTimeBasedSubscriptionPosition(position)) && globalSubscriptionPosition != null) {
position = cfg.storage().save(subscriptionId, globalSubscriptionPosition);
} else if (position == null) {
// Position can still be null here if globalSubscriptionPosition is null, if so, we start at the "subscriptionModelDefault"
return StartAt.subscriptionModelDefault();
cfg -> () -> {
// It's important that we find the document inside the supplier so that we look up the latest resume token on retry
SubscriptionPosition position = cfg.storage().read(subscriptionId);
// If there is no position stored in storage, or if the stored position is time-based
// (i.e. written by the catch-up subscription), we save the globalSubscriptionPosition.
// The reason that we need to write the time-based subscription position in this case
// is that the wrapped subscription might not support time-based subscriptions.
if ((position == null || isTimeBasedSubscriptionPosition(position)) && globalSubscriptionPosition != null) {
position = cfg.storage().save(subscriptionId, globalSubscriptionPosition);
} else if (position == null) {
// Position can still be null here if globalSubscriptionPosition is null, if so, we start at the "subscriptionModelDefault"
return delegatedStartAt == null ? startAt : StartAt.subscriptionModelDefault();
}
return StartAt.subscriptionPosition(position);
})
.orElse(() -> {
if (globalSubscriptionPosition == null) {
return delegatedStartAt == null ? startAt : StartAt.subscriptionModelDefault();
} else {
return StartAt.subscriptionPosition(globalSubscriptionPosition);
}
return StartAt.subscriptionPosition(position);
}).orElse(() -> globalSubscriptionPosition == null ? StartAt.subscriptionModelDefault() : StartAt.subscriptionPosition(globalSubscriptionPosition)));
}));

final Subscription subscription;
if (subscriptionsWasCancelledOrShutdown) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,10 @@ public DurableSubscriptionModel(PositionAwareSubscriptionModel subscriptionModel
public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, StartAt startAt, Consumer<CloudEvent> action) {
Objects.requireNonNull(startAt, StartAt.class.getSimpleName() + " supplier cannot be null");

// TODO DEtta funkar inte!! DEn sparar ner positionen ändå, dvs vi får aldrig null!
// Detta är i kontexten då Catchup har lämnat över hit. Det beror på att den sätter en
// startAt position explicit
StartAt startAtToUse = generateStartAtPositionFrom(subscriptionId, startAt);
if (startAtToUse == null) {
// We're not allowed to start this subscription, delegate to wrapped subscription instead
return getDelegatedSubscriptionModelRecursively().subscribe(subscriptionId, filter, startAt, action);
return getDelegatedSubscriptionModel().subscribe(subscriptionId, filter, startAt, action);
}

return subscriptionModel.subscribe(subscriptionId, filter, startAtToUse, cloudEvent -> {
Expand Down

0 comments on commit 1832fa2

Please sign in to comment.