Skip to content

Commit

Permalink
feat(store): implement DataStore#trimStream
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <admin@lv5.moe>
  • Loading branch information
ShadowySpirits committed Nov 6, 2023
1 parent 7b0a839 commit 15f3397
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.util.Lifecycle;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.ControllerServiceImpl;
import com.automq.rocketmq.controller.server.MetadataStoreBuilder;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.metadata.DefaultProxyMetadataService;
import com.automq.rocketmq.metadata.DefaultStoreMetadataService;
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
Expand Down Expand Up @@ -75,7 +75,7 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
MessageStoreImpl messageStore = MessageStoreBuilder.build(brokerConfig.store(), brokerConfig.s3Stream(), storeMetadataService, dlqService);
this.messageStore = messageStore;

DataStore dataStore = new DataStoreFacade(messageStore.getS3ObjectOperator(), messageStore.getTopicQueueManager());
DataStore dataStore = new DataStoreFacade(messageStore.streamStore(), messageStore.s3ObjectOperator(), messageStore.topicQueueManager());
metadataStore.setDataStore(dataStore);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ public interface DataStore {
*
* @param streamId ID of the stream to trim
* @param offset Minimum offset of the stream after trim operation
* @return True if successful; failure future otherwise.
*/
CompletableFuture<Boolean> trimStream(long streamId, long offset);
CompletableFuture<Void> trimStream(long streamId, long offset);

/**
* Delete a list of S3 objects by object id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.common.model.generated.FlatMessage;
import com.automq.rocketmq.store.api.LogicQueue;
import com.automq.rocketmq.store.api.LogicQueueManager;
import com.automq.rocketmq.store.api.MessageStore;
import com.automq.rocketmq.store.api.S3ObjectOperator;
import com.automq.rocketmq.store.model.message.AckResult;
import com.automq.rocketmq.store.model.message.ChangeInvisibleDurationResult;
import com.automq.rocketmq.store.model.message.ClearRetryMessagesResult;
Expand All @@ -40,7 +38,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;

public class MockMessageStore implements MessageStore {
Expand All @@ -65,16 +62,6 @@ public void shutdown() {

}

@Override
public LogicQueueManager getTopicQueueManager() {
throw new NotImplementedException();
}

@Override
public S3ObjectOperator getS3ObjectOperator() {
throw new NotImplementedException();
}

@Override
public CompletableFuture<PopResult> pop(long consumerGroupId, long topicId, int queueId, Filter filter,
int batchSize, boolean fifo, boolean retry, long invisibleDuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.store.api.LogicQueueManager;
import com.automq.rocketmq.store.api.S3ObjectOperator;
import com.automq.rocketmq.store.api.StreamStore;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DataStoreFacade implements DataStore {

private final StreamStore streamStore;
private final S3ObjectOperator s3ObjectOperator;

private final LogicQueueManager logicQueueManager;

public DataStoreFacade(S3ObjectOperator s3ObjectOperator, LogicQueueManager logicQueueManager) {
public DataStoreFacade(StreamStore streamStore, S3ObjectOperator s3ObjectOperator,
LogicQueueManager logicQueueManager) {
this.streamStore = streamStore;

Check warning on line 35 in store/src/main/java/com/automq/rocketmq/store/DataStoreFacade.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/DataStoreFacade.java#L34-L35

Added lines #L34 - L35 were not covered by tests
this.s3ObjectOperator = s3ObjectOperator;
this.logicQueueManager = logicQueueManager;
}
Expand All @@ -40,8 +43,8 @@ public CompletableFuture<Void> closeQueue(long topicId, int queueId) {
}

@Override
public CompletableFuture<Boolean> trimStream(long streamId, long offset) {
return null;
public CompletableFuture<Void> trimStream(long streamId, long offset) {
return streamStore.trim(streamId, offset);

Check warning on line 47 in store/src/main/java/com/automq/rocketmq/store/DataStoreFacade.java

View check run for this annotation

Codecov / codecov/patch

store/src/main/java/com/automq/rocketmq/store/DataStoreFacade.java#L47

Added line #L47 was not covered by tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ public MessageStoreImpl(StoreConfig config, StreamStore streamStore,
this.s3ObjectOperator = s3ObjectOperator;
}

@Override
public LogicQueueManager getTopicQueueManager() {
public LogicQueueManager topicQueueManager() {
return logicQueueManager;
}

Expand All @@ -93,8 +92,7 @@ public StreamStore streamStore() {
/**
* @return {@link S3ObjectOperator} instance
*/
@Override
public S3ObjectOperator getS3ObjectOperator() {
public S3ObjectOperator s3ObjectOperator() {
return s3ObjectOperator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@

public interface MessageStore extends Lifecycle {

LogicQueueManager getTopicQueueManager();

S3ObjectOperator getS3ObjectOperator();

/**
* Pop message from specified topic and queue.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void run() {
waitForRunning(config.periodicExporterIntervalInMills());

StreamStore streamStore = messageStore.streamStore();
DefaultLogicQueueManager manager = (DefaultLogicQueueManager) messageStore.getTopicQueueManager();
DefaultLogicQueueManager manager = (DefaultLogicQueueManager) messageStore.topicQueueManager();
Set<LagRecord> newLagRecordSet = Sets.newConcurrentHashSet();
manager.logicQueueMap().forEach((topicQueueId, logicQueueFuture) -> {
if (!logicQueueFuture.isDone() || logicQueueFuture.isCompletedExceptionally()) {
Expand Down Expand Up @@ -148,7 +148,6 @@ public void initStaticMetrics(Meter meter) {

@Override
public void initDynamicMetrics(Meter meter) {
StoreMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
consumerLagMessages = meter.gaugeBuilder(GAUGE_CONSUMER_LAG_MESSAGES)
.setDescription("Consumer lag messages")
.ofLongs()
Expand Down

0 comments on commit 15f3397

Please sign in to comment.