Skip to content

Commit

Permalink
Trying to avoid storing beginning of time subscriptions in durable st…
Browse files Browse the repository at this point in the history
…orage (doesn't work yet)
  • Loading branch information
johanhaleby committed Apr 19, 2024
1 parent de0608d commit ce3412f
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.occurrent.dsl.subscription.blocking.Subscriptions;
import org.occurrent.filter.Filter;
import org.occurrent.subscription.StartAt;
import org.occurrent.subscription.SubscriptionPosition;
import org.occurrent.subscription.blocking.durable.DurableSubscriptionModel;
import org.occurrent.subscription.blocking.durable.catchup.CatchupSubscriptionModel;
import org.occurrent.subscription.blocking.durable.catchup.TimeBasedSubscriptionPosition;
import org.springframework.beans.BeansException;
Expand Down Expand Up @@ -164,23 +164,28 @@ private <E> void processSubscribeAnnotation(Object bean, Method method, Subscrip
private static @NotNull StartAt generateStartAt(Subscription subscription) {
return switch (subscription.startAt()) {
case BEGINNING_OF_TIME -> switch (subscription.resumeBehavior()) {
case SAME_AS_START_AT -> StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime());
// case DEFAULT -> StartAt.dynamic(ctx -> {
// Object subscriptionPosition = ctx.subscriptionModelData().get("subscriptionPosition");
// if (subscriptionPosition instanceof SubscriptionPosition sp) {
// // This means that the subscription model has recorded a subscription position,
// // and it means that we should start from this position.
// return StartAt.subscriptionPosition(sp);
// } else {
// // If no subscription position is persisted, we start from the "startAt" position
// // defined in the annotation.
// return subscription.startAt();
// }
// });
case SAME_AS_START_AT -> StartAt.dynamic(ctx -> {
boolean isDurableSubscription = DurableSubscriptionModel.class.isAssignableFrom(ctx.subscriptionModelType());
if (isDurableSubscription) {
// Since we now know that we always start AND resume from the beginning of time for this subscription,
// we don't need to store the position in a durable storage, because we will always stream all events
// each time the subscription restarts anyway. Thus, we return null to instruct the DurableSubscriptionModel
// to simply delegate to the parent subscription. Note that this works because the parent of the CatchupSubscriptionModel
// is a DurableSubscriptionModel, so after the catch-up phase it'll hand over to the DurableSubscriptionModel.
// Unfortunately the CatchupSubscriptionModel is configured to write the position when the catch-up phase is completed,
// but it's harder to get around that.
return null;
} else {
return StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime());
}
});
case DEFAULT -> StartAt.subscriptionModelDefault();
};
case NOW -> StartAt.now();
case DEFAULT -> StartAt.dynamic(ctx -> {
// By default, we don't want to run the "default" behavior of the CatchupSubscriptionModel, which is to
// start streaming from the beginning of time. We want to instruct the CatchupSubscriptionModel to simply
// delegate to the parent subscription, which is what we do if we return null.
boolean isCatchupSubscription = CatchupSubscriptionModel.class.isAssignableFrom(ctx.subscriptionModelType());
return isCatchupSubscription ? null : StartAt.subscriptionModelDefault();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public SubscriptionModel occurrentCompetingDurableSubscriptionModel(MongoTemplat
SpringMongoSubscriptionModel mongoSubscriptionModel = new SpringMongoSubscriptionModel(mongoTemplate, withConfig(eventStoreProperties.getCollection(), eventStoreProperties.getTimeRepresentation())
.restartSubscriptionsOnChangeStreamHistoryLost(occurrentProperties.getSubscription().isRestartOnChangeStreamHistoryLost()));
DurableSubscriptionModel durableSubscriptionModel = new DurableSubscriptionModel(mongoSubscriptionModel, storage);
// TODO THINK ABOUT BACKWARD COMPATIBILTIY! ALla subscriptions med StartAt default kommer efter denna ändring att börja läsa från tidernas begynnelse (om det inte finns en position sparad).
// Det vill man nog inte. Vad händer om man gör subscribe "now"?! Kan vi komma på ett bättre sett, tex att Catchup INTE börjar läsa upp vid "default", utan endast om man explicit
// säger beginning of time?! Det är nog enda lösningen. Vid default med CatchupSubscriptionModel, delegera alltid till wrapped!!! Gör också så att även om man valt "beggingin of time" så ska den fortsätta
// från vad CatchupSubscriptionModel har skrivit ner (alltså om position är TimeBased), det är ju bara time-based i storage om man kraschat, och då vill man fortsätta därifrån.
CatchupSubscriptionModel catchupSubscriptionModel = new CatchupSubscriptionModel(durableSubscriptionModel, eventStoreQueries,
new CatchupSubscriptionModelConfig(useSubscriptionPositionStorage(storage)
.andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents(1000)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@ public StartAt get(SubscriptionModelContext context) {
if (startAt instanceof Dynamic) {
return startAt.get(context);
}
// else if (startAt == null) {
// throw new IllegalArgumentException("Dynamic \"start at\" was null which is not supported.");
// }
return startAt;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter,
} else if (startAt.isDynamic()) {
StartAt startAtGeneratedByDynamic = startAt.get(generateSubscriptionModelContext());
if (startAtGeneratedByDynamic == null) {
// We're not allowed to use start this subscription model, defer to parent!
// We're not allowed to start this subscription model, defer to parent!
runningCatchupSubscriptions.remove(subscriptionId);
return getDelegatedSubscriptionModelRecursively().subscribe(subscriptionId, filter, startAt, action);
} else {
Expand All @@ -129,6 +129,7 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter,

// We want to continue from the wrapping subscription if it has something stored in its position storage.
if (!isTimeBasedSubscriptionPosition(firstStartAt)) {
runningCatchupSubscriptions.remove(subscriptionId);
return subscriptionModel.subscribe(subscriptionId, filter, firstStartAt, action);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.occurrent.subscription.SubscriptionPosition;
import org.occurrent.subscription.api.blocking.*;

import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

Expand Down Expand Up @@ -81,7 +80,14 @@ public DurableSubscriptionModel(PositionAwareSubscriptionModel subscriptionModel
public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, StartAt startAt, Consumer<CloudEvent> action) {
Objects.requireNonNull(startAt, StartAt.class.getSimpleName() + " supplier cannot be null");

// TODO DEtta funkar inte!! DEn sparar ner positionen ändå, dvs vi får aldrig null!
// Detta är i kontexten då Catchup har lämnat över hit. Det beror på att den sätter en
// startAt position explicit
StartAt startAtToUse = generateStartAtPositionFrom(subscriptionId, startAt);
if (startAtToUse == null) {
// We're not allowed to start this subscription, delegate to wrapped subscription instead
return getDelegatedSubscriptionModelRecursively().subscribe(subscriptionId, filter, startAt, action);
}

return subscriptionModel.subscribe(subscriptionId, filter, startAtToUse, cloudEvent -> {
action.accept(cloudEvent);
Expand Down Expand Up @@ -110,29 +116,18 @@ private StartAt generateStartAtPositionFrom(String subscriptionId, StartAt origi
return subscriptionPosition == null ? startAtIfNoSubscriptionFound : StartAt.subscriptionPosition(subscriptionPosition);
});
} else if (originalStartAt.isDynamic()) {
startAtToUse = StartAt.dynamic(() -> {
var subscriptionModelContext = generateSubscriptionModelContext(subscriptionId);
var nextStartAt = originalStartAt.get(subscriptionModelContext);
var subscriptionModelContext = new SubscriptionModelContext(DurableSubscriptionModel.class);
var nextStartAt = originalStartAt.get(subscriptionModelContext);
if (nextStartAt != null) {
return generateStartAtPositionFrom(subscriptionId, nextStartAt);
});

}
return null;
} else {
startAtToUse = originalStartAt;
}
return startAtToUse;
}

private SubscriptionModelContext generateSubscriptionModelContext(String subscriptionId) {
SubscriptionPosition subscriptionPosition = storage.read(subscriptionId);
final Map<String, Object> data;
if (subscriptionPosition == null) {
data = Map.of();
} else {
data = Map.of("subscriptionPosition", subscriptionPosition);
}
return new SubscriptionModelContext(DurableSubscriptionModel.class, data);
}

@Override
public void stop() {
getDelegatedSubscriptionModel().stop();
Expand Down

0 comments on commit ce3412f

Please sign in to comment.