Skip to content

Commit

Permalink
Changed default behavior of CatchupSubscriptionModel to only replay f…
Browse files Browse the repository at this point in the history
…rom beginning of time if explicitly specified to do so
  • Loading branch information
johanhaleby committed May 10, 2024
1 parent 807db4e commit 9d43565
Show file tree
Hide file tree
Showing 6 changed files with 375 additions and 20 deletions.
21 changes: 18 additions & 3 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
### Changelog next version
* Major improvements to CatchupSubscriptionModel, it now handles and includes events that have been written while the catch-up subscription phase runs. Also, the "idempotency cache" is only used while switching from catch-up to continuous mode, and not during the entire catch-up phase.
* Major improvements to `CatchupSubscriptionModel`, it now handles and includes events that have been written while the catch-up subscription phase runs. Also, the "idempotency cache" is only used while switching from catch-up to continuous mode, and not during the entire catch-up phase.
* Major changes to the `spring-boot-starter-mongodb` module. It now includes a `CatchupSubscriptionModel` which allows you to start subscriptions from an historic date more easily.
* `StartAt.Dynamic(..)` now takes a `SubscriptionModelContext` as a parameter. This means that subscription models can add a "context" that can be useful for dynamic behavior. For example, you can prevent a certain subscription model to start (and instead delegate to its parent) if you return `null` as `StartAt` from a dynamic position.
* Added annotation support for subscriptions when using the `spring-boot-starter-mongodb` module. You can now do:
Expand All @@ -9,8 +9,23 @@
System.out.println("Received event: " + event);
}
```
It also allows you to easily start the subscription from a moment in the past (such as beginning of time). See javadoc in `org.occurrent.annotation.Subscription` for more info.

It also allows you to easily start the subscription from a moment in the past (such as beginning of time). See javadoc in `org.occurrent.annotation.Subscription` for more info.
* Added `org.occurrent.subscription.blocking.durable.catchup.StartAtTime` as a help to the `CatchupSubscriptionModel` to easier specify an `OffsetDateTime` or "beginning of time" when starting a subscription catchup subscription model. Before you had to do:
```java
subscriptionModel.subscribe("myId", StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime()), System.out::println);
```
but now you can do:
```java
subscriptionModel.subscribe("myId", StartAtTime.beginningOfTime(), System.out::println);
```
which is shorter. You're using Kotlin you can import `org.occurrent.subscription.blocking.durable.catchup.beginningOfTime` and do:
```kotlin
subscriptionModel.subscribe("myId", StartAt.beginningOfTime(), ::println)
```
* Changed the default behavior of `CatchupSubscriptionModel`. Before it replayed all historic events by default if no specific start at position was supplied, but now it delegates to the wrapped subscription and no historic events will be replayed. Instead, you need to explicitly specify `beggingOfTime` or an `OffsetDateTime` as the start position. For example:
```java
subscriptionModel.subscribe("myId", StartAtTime.beginningOfTime(), System.out::println);
```
### 0.17.2 (2024-02-27)
* Fixed issue in CompetingConsumerSubscriptionModel in which it failed to reacquire consumption rights in some cases where MongoDB connection was lost.
Expand Down
86 changes: 85 additions & 1 deletion subscription/util/blocking/catchup-subscription/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>subscription-util-blocking</artifactId>
<groupId>org.occurrent</groupId>
Expand Down Expand Up @@ -54,6 +55,11 @@
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib</artifactId>
<optional>true</optional>
</dependency>

<!-- Test -->
<dependency>
Expand Down Expand Up @@ -127,4 +133,82 @@
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<version>${kotlin.version}</version>
<executions>
<execution>
<id>compile</id>
<phase>process-sources</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<sourceDirs>
<source>src/main/java</source>
<source>src/main/kotlin</source>
<source>src/main/resources</source>
</sourceDirs>
</configuration>
</execution>
<execution>
<id>test-compile</id>
<phase>test-compile</phase>
<goals>
<goal>test-compile</goal>
</goals>
<configuration>
<sourceDirs>
<source>src/test/java</source>
<source>src/test/kotlin</source>
<source>src/test/resources</source>
</sourceDirs>
</configuration>
</execution>
</executions>
<configuration>
<args>
<arg>-Xjsr305=strict</arg>
</args>
<jvmTarget>17</jvmTarget>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>release</id>
<build>
<plugins>
<plugin>
<!-- See https://github.com/Kotlin/dokka#using-the-maven-plugin for more examples-->
<groupId>org.jetbrains.dokka</groupId>
<artifactId>dokka-maven-plugin</artifactId>
<version>${dokka.version}</version>
<executions>
<execution>
<phase>prepare-package</phase>
<goals>
<goal>javadocJar</goal>
</goals>
</execution>
</executions>
<configuration>
<dokkaPlugins>
<plugin>
<groupId>org.jetbrains.dokka</groupId>
<artifactId>kotlin-as-java-plugin</artifactId>
<version>${dokka.version}</version>
</plugin>
</dokkaPlugins>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,26 @@
import static org.occurrent.time.internal.RFC3339.RFC_3339_DATE_TIME_FORMATTER;

/**
* A {@link SubscriptionModel} that reads historic cloud events from the all event streams (see {@link EventStoreQueries#all()}) until caught up with the
* A {@link SubscriptionModel} that can read historic cloud events from the all event streams (see {@link EventStoreQueries#all()}) until caught up with the
* {@link PositionAwareSubscriptionModel#globalSubscriptionPosition()} of the {@code subscription} (you probably want to narrow the historic set events of events
* by using a {@link Filter} when subscribing). It'll automatically switch over to the supplied {@code subscription} when all history events are read and the subscription has caught-up.
* <br>
* <br>
* by using a {@link Filter} when subscribing). It'll automatically switch over to the wrapped {@code subscription model} when all history events are read and the subscription has caught-up.
* <br><b>Important:</b>&nbsp;The subscription model will only stream historic events if started with a {@link TimeBasedSubscriptionPosition}, by default (i.e. if {@code StartAt.subscriptionModelDefault() is used}),
* it'll NOT replay historic events, but instead delegate to the wrapped subscription model. Thus, to start the {@link CatchupSubscriptionModel} and make it replay historic events you can start it like this:
* <pre>
* var subscriptionModel = new CatchupSubscriptionModel(..);
* // All examples below are equivalent:
* subscriptionModel.subscribeFromBeginningOfTime("subscriptionId", e -> System.out.println("Event: " + e);
* subscriptionModel.subscribe("subscriptionId", StartAtTime.beginningOfTime(), e -> System.out.println("Event: " + e);
* subscriptionModel.subscribe("subscriptionId", StartAt.subscriptionPosition(TimeBasedSubscription.beginningOfTime()), e -> System.out.println("Event: " + e);
* </pre>
*
* If you're using Kotlin you can import the extension functions from {@code org.occurrent.subscription.blocking.durable.catchup.CatchupSubscriptionModelExtensions.kt} and do:
* <pre>
* subscriptionModel.subscribe("subscriptionId", StartAt.beginningOfTime()) { e ->
* println("Event: $e")
* }
* </pre>
*
* <p>
* Note that the implementation uses an in-memory cache (default size is {@value #DEFAULT_CACHE_SIZE} but this can be configured using a {@link CatchupSubscriptionModelConfig})
* to reduce the number of duplicate event when switching from historic events to the current cloud event position. It's highly recommended that the application logic is idempotent if the
Expand Down Expand Up @@ -100,6 +115,28 @@ public CatchupSubscriptionModel(PositionAwareSubscriptionModel subscriptionModel
this.config = config;
}

/**
* Shortcut to start subscribing to events matching the supplied filter from begging of time. Same as doing:
*
* <pre>
* subscriptionModel.subscribe(&lt;subscriptionId&gt;, &lt;filter&gt;, StartAtTime.beginningOfTime(), &lt;action&gt;);
* </pre>
*/
public Subscription subscribeFromBeginningOfTime(String subscriptionId, SubscriptionFilter filter, Consumer<CloudEvent> action) {
return subscribe(subscriptionId, filter, StartAtTime.beginningOfTime(), action);
}

/**
* Shortcut to start subscribing to <i>all</i> events from begging of time. Same as doing:
*
* <pre>
* subscriptionModel.subscribe(&lt;subscriptionId&gt;, StartAtTime.beginningOfTime(), &lt;action&gt;);
* </pre>
*/
public Subscription subscribeFromBeginningOfTime(String subscriptionId, Consumer<CloudEvent> action) {
return subscribe(subscriptionId, StartAtTime.beginningOfTime(), action);
}

@Override
public Subscription subscribe(String subscriptionId, SubscriptionFilter filter, StartAt startAt, Consumer<CloudEvent> action) {
Objects.requireNonNull(startAt, "Start at supplier cannot be null");
Expand All @@ -110,10 +147,14 @@ public Subscription subscribe(String subscriptionId, SubscriptionFilter filter,

final StartAt firstStartAt;
if (startAt.isDefault()) {
firstStartAt = StartAt.dynamic(() -> {
SubscriptionPosition subscriptionPosition = returnIfSubscriptionPositionStorageConfigIs(UseSubscriptionPositionInStorage.class, cfg -> cfg.storage().read(subscriptionId)).orElse(null);
return subscriptionPosition == null ? StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime()) : StartAt.subscriptionPosition(subscriptionPosition);
});
// By default, we check if there's a subscription position stored for this subscription, if so we resume from there, otherwise,
// delegate to the parent subscription model.
SubscriptionPosition subscriptionPosition = returnIfSubscriptionPositionStorageConfigIs(UseSubscriptionPositionInStorage.class, cfg -> cfg.storage().read(subscriptionId)).orElse(null);
if (subscriptionPosition == null) {
return getDelegatedSubscriptionModel().subscribe(subscriptionId, filter, startAt, action);
} else {
firstStartAt = StartAt.subscriptionPosition(subscriptionPosition);
}
} else if (startAt.isDynamic()) {
StartAt startAtGeneratedByDynamic = startAt.get(generateSubscriptionModelContext());
if (startAtGeneratedByDynamic == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
*
* Copyright 2024 Johan Haleby
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.occurrent.subscription.blocking.durable.catchup;

import java.time.OffsetDateTime;


/**
* Utility functions for easily specifying start position for the {@link CatchupSubscriptionModel}.
* <p>
* If you're using Kotlin, use the extension functions in {@code org.occurrent.subscription.blocking.durable.catchup.CatchupSubscriptionModelExtensions.kt} file instead.
* </p>
*/
public class StartAtTime {
public static org.occurrent.subscription.StartAt beginningOfTime() {
return org.occurrent.subscription.StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.beginningOfTime());
}

public static org.occurrent.subscription.StartAt offsetDateTime(OffsetDateTime offsetDateTime) {
return org.occurrent.subscription.StartAt.subscriptionPosition(TimeBasedSubscriptionPosition.from(offsetDateTime));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
*
* Copyright 2024 Johan Haleby
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.occurrent.subscription.blocking.durable.catchup

import org.occurrent.subscription.StartAt
import java.time.OffsetDateTime

/**
* Start at beginning of time.
*/
fun StartAt.beginningOfTime(): StartAt = StartAtTime.beginningOfTime()

/**
* Start at a specific `offsetDateTime`
*/
fun StartAt.offsetDateTime(offsetDateTime: OffsetDateTime): StartAt = StartAtTime.offsetDateTime(offsetDateTime)
Loading

0 comments on commit 9d43565

Please sign in to comment.