From ce3412f0433616a264d26d15fb3a997a5e9dcada Mon Sep 17 00:00:00 2001 From: Johan Haleby Date: Fri, 19 Apr 2024 16:08:59 +0200 Subject: [PATCH] Trying to avoid storing beginning of time subscriptions in durable storage (doesn't work yet) --- .../OccurrentAnnotationBeanPostProcessor.java | 33 +++++++++++-------- .../OccurrentMongoAutoConfiguration.java | 4 +++ .../org/occurrent/subscription/StartAt.java | 3 -- .../catchup/CatchupSubscriptionModel.java | 3 +- .../durable/DurableSubscriptionModel.java | 29 +++++++--------- 5 files changed, 37 insertions(+), 35 deletions(-) diff --git a/framework/spring-boot-starter-mongodb/src/main/java/org/occurrent/springboot/mongo/blocking/OccurrentAnnotationBeanPostProcessor.java b/framework/spring-boot-starter-mongodb/src/main/java/org/occurrent/springboot/mongo/blocking/OccurrentAnnotationBeanPostProcessor.java index 76dc45053..d3a165bec 100644 --- a/framework/spring-boot-starter-mongodb/src/main/java/org/occurrent/springboot/mongo/blocking/OccurrentAnnotationBeanPostProcessor.java +++ b/framework/spring-boot-starter-mongodb/src/main/java/org/occurrent/springboot/mongo/blocking/OccurrentAnnotationBeanPostProcessor.java @@ -26,7 +26,7 @@ import org.occurrent.dsl.subscription.blocking.Subscriptions; import org.occurrent.filter.Filter; import org.occurrent.subscription.StartAt; -import org.occurrent.subscription.SubscriptionPosition; +import org.occurrent.subscription.blocking.durable.DurableSubscriptionModel; import org.occurrent.subscription.blocking.durable.catchup.CatchupSubscriptionModel; import org.occurrent.subscription.blocking.durable.catchup.TimeBasedSubscriptionPosition; import org.springframework.beans.BeansException; @@ -164,23 +164,28 @@ private void processSubscribeAnnotation(Object bean, Method method, Subscrip private static @NotNull StartAt generateStartAt(Subscription subscription) { return switch (subscription.startAt()) { case BEGINNING_OF_TIME -> switch (subscription.resumeBehavior()) { - case SAME_AS_START_AT -> StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime()); -// case DEFAULT -> StartAt.dynamic(ctx -> { -// Object subscriptionPosition = ctx.subscriptionModelData().get("subscriptionPosition"); -// if (subscriptionPosition instanceof SubscriptionPosition sp) { -// // This means that the subscription model has recorded a subscription position, -// // and it means that we should start from this position. -// return StartAt.subscriptionPosition(sp); -// } else { -// // If no subscription position is persisted, we start from the "startAt" position -// // defined in the annotation. -// return subscription.startAt(); -// } -// }); + case SAME_AS_START_AT -> StartAt.dynamic(ctx -> { + boolean isDurableSubscription = DurableSubscriptionModel.class.isAssignableFrom(ctx.subscriptionModelType()); + if (isDurableSubscription) { + // Since we now know that we always start AND resume from the beginning of time for this subscription, + // we don't need to store the position in a durable storage, because we will always stream all events + // each time the subscription restarts anyway. Thus, we return null to instruct the DurableSubscriptionModel + // to simply delegate to the parent subscription. Note that this works because the parent of the CatchupSubscriptionModel + // is a DurableSubscriptionModel, so after the catch-up phase it'll hand over to the DurableSubscriptionModel. + // Unfortunately the CatchupSubscriptionModel is configured to write the position when the catch-up phase is completed, + // but it's harder to get around that. + return null; + } else { + return StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime()); + } + }); case DEFAULT -> StartAt.subscriptionModelDefault(); }; case NOW -> StartAt.now(); case DEFAULT -> StartAt.dynamic(ctx -> { + // By default, we don't want to run the "default" behavior of the CatchupSubscriptionModel, which is to + // start streaming from the beginning of time. We want to instruct the CatchupSubscriptionModel to simply + // delegate to the parent subscription, which is what we do if we return null. boolean isCatchupSubscription = CatchupSubscriptionModel.class.isAssignableFrom(ctx.subscriptionModelType()); return isCatchupSubscription ? null : StartAt.subscriptionModelDefault(); }); diff --git a/framework/spring-boot-starter-mongodb/src/main/java/org/occurrent/springboot/mongo/blocking/OccurrentMongoAutoConfiguration.java b/framework/spring-boot-starter-mongodb/src/main/java/org/occurrent/springboot/mongo/blocking/OccurrentMongoAutoConfiguration.java index 432df1846..f9f3296da 100644 --- a/framework/spring-boot-starter-mongodb/src/main/java/org/occurrent/springboot/mongo/blocking/OccurrentMongoAutoConfiguration.java +++ b/framework/spring-boot-starter-mongodb/src/main/java/org/occurrent/springboot/mongo/blocking/OccurrentMongoAutoConfiguration.java @@ -126,6 +126,10 @@ public SubscriptionModel occurrentCompetingDurableSubscriptionModel(MongoTemplat SpringMongoSubscriptionModel mongoSubscriptionModel = new SpringMongoSubscriptionModel(mongoTemplate, withConfig(eventStoreProperties.getCollection(), eventStoreProperties.getTimeRepresentation()) .restartSubscriptionsOnChangeStreamHistoryLost(occurrentProperties.getSubscription().isRestartOnChangeStreamHistoryLost())); DurableSubscriptionModel durableSubscriptionModel = new DurableSubscriptionModel(mongoSubscriptionModel, storage); + // TODO THINK ABOUT BACKWARD COMPATIBILTIY! ALla subscriptions med StartAt default kommer efter denna ändring att börja läsa från tidernas begynnelse (om det inte finns en position sparad). + // Det vill man nog inte. Vad händer om man gör subscribe "now"?! Kan vi komma på ett bättre sett, tex att Catchup INTE börjar läsa upp vid "default", utan endast om man explicit + // säger beginning of time?! Det är nog enda lösningen. Vid default med CatchupSubscriptionModel, delegera alltid till wrapped!!! Gör också så att även om man valt "beggingin of time" så ska den fortsätta + // från vad CatchupSubscriptionModel har skrivit ner (alltså om position är TimeBased), det är ju bara time-based i storage om man kraschat, och då vill man fortsätta därifrån. CatchupSubscriptionModel catchupSubscriptionModel = new CatchupSubscriptionModel(durableSubscriptionModel, eventStoreQueries, new CatchupSubscriptionModelConfig(useSubscriptionPositionStorage(storage) .andPersistSubscriptionPositionDuringCatchupPhaseForEveryNEvents(1000))); diff --git a/subscription/core/src/main/java/org/occurrent/subscription/StartAt.java b/subscription/core/src/main/java/org/occurrent/subscription/StartAt.java index e0a30cd98..20761b34e 100644 --- a/subscription/core/src/main/java/org/occurrent/subscription/StartAt.java +++ b/subscription/core/src/main/java/org/occurrent/subscription/StartAt.java @@ -93,9 +93,6 @@ public StartAt get(SubscriptionModelContext context) { if (startAt instanceof Dynamic) { return startAt.get(context); } -// else if (startAt == null) { -// throw new IllegalArgumentException("Dynamic \"start at\" was null which is not supported."); -// } return startAt; } } diff --git a/subscription/util/blocking/catchup-subscription/src/main/java/org/occurrent/subscription/blocking/durable/catchup/CatchupSubscriptionModel.java b/subscription/util/blocking/catchup-subscription/src/main/java/org/occurrent/subscription/blocking/durable/catchup/CatchupSubscriptionModel.java index 4186ca307..0f7dba79f 100644 --- a/subscription/util/blocking/catchup-subscription/src/main/java/org/occurrent/subscription/blocking/durable/catchup/CatchupSubscriptionModel.java +++ b/subscription/util/blocking/catchup-subscription/src/main/java/org/occurrent/subscription/blocking/durable/catchup/CatchupSubscriptionModel.java @@ -117,7 +117,7 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, } else if (startAt.isDynamic()) { StartAt startAtGeneratedByDynamic = startAt.get(generateSubscriptionModelContext()); if (startAtGeneratedByDynamic == null) { - // We're not allowed to use start this subscription model, defer to parent! + // We're not allowed to start this subscription model, defer to parent! runningCatchupSubscriptions.remove(subscriptionId); return getDelegatedSubscriptionModelRecursively().subscribe(subscriptionId, filter, startAt, action); } else { @@ -129,6 +129,7 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, // We want to continue from the wrapping subscription if it has something stored in its position storage. if (!isTimeBasedSubscriptionPosition(firstStartAt)) { + runningCatchupSubscriptions.remove(subscriptionId); return subscriptionModel.subscribe(subscriptionId, filter, firstStartAt, action); } diff --git a/subscription/util/blocking/durable-subscription/src/main/java/org/occurrent/subscription/blocking/durable/DurableSubscriptionModel.java b/subscription/util/blocking/durable-subscription/src/main/java/org/occurrent/subscription/blocking/durable/DurableSubscriptionModel.java index bdd8f66ab..9aa815b2d 100644 --- a/subscription/util/blocking/durable-subscription/src/main/java/org/occurrent/subscription/blocking/durable/DurableSubscriptionModel.java +++ b/subscription/util/blocking/durable-subscription/src/main/java/org/occurrent/subscription/blocking/durable/DurableSubscriptionModel.java @@ -24,7 +24,6 @@ import org.occurrent.subscription.SubscriptionPosition; import org.occurrent.subscription.api.blocking.*; -import java.util.Map; import java.util.Objects; import java.util.function.Consumer; @@ -81,7 +80,14 @@ public DurableSubscriptionModel(PositionAwareSubscriptionModel subscriptionModel public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, StartAt startAt, Consumer action) { Objects.requireNonNull(startAt, StartAt.class.getSimpleName() + " supplier cannot be null"); + // TODO DEtta funkar inte!! DEn sparar ner positionen ändå, dvs vi får aldrig null! + // Detta är i kontexten då Catchup har lämnat över hit. Det beror på att den sätter en + // startAt position explicit StartAt startAtToUse = generateStartAtPositionFrom(subscriptionId, startAt); + if (startAtToUse == null) { + // We're not allowed to start this subscription, delegate to wrapped subscription instead + return getDelegatedSubscriptionModelRecursively().subscribe(subscriptionId, filter, startAt, action); + } return subscriptionModel.subscribe(subscriptionId, filter, startAtToUse, cloudEvent -> { action.accept(cloudEvent); @@ -110,29 +116,18 @@ private StartAt generateStartAtPositionFrom(String subscriptionId, StartAt origi return subscriptionPosition == null ? startAtIfNoSubscriptionFound : StartAt.subscriptionPosition(subscriptionPosition); }); } else if (originalStartAt.isDynamic()) { - startAtToUse = StartAt.dynamic(() -> { - var subscriptionModelContext = generateSubscriptionModelContext(subscriptionId); - var nextStartAt = originalStartAt.get(subscriptionModelContext); + var subscriptionModelContext = new SubscriptionModelContext(DurableSubscriptionModel.class); + var nextStartAt = originalStartAt.get(subscriptionModelContext); + if (nextStartAt != null) { return generateStartAtPositionFrom(subscriptionId, nextStartAt); - }); - + } + return null; } else { startAtToUse = originalStartAt; } return startAtToUse; } - private SubscriptionModelContext generateSubscriptionModelContext(String subscriptionId) { - SubscriptionPosition subscriptionPosition = storage.read(subscriptionId); - final Map data; - if (subscriptionPosition == null) { - data = Map.of(); - } else { - data = Map.of("subscriptionPosition", subscriptionPosition); - } - return new SubscriptionModelContext(DurableSubscriptionModel.class, data); - } - @Override public void stop() { getDelegatedSubscriptionModel().stop();