Skip to content

Commit

Permalink
ARTEMIS-3178 Page Limitting (max messages and max bytes)
Browse files Browse the repository at this point in the history
I am adding three attributes to Address-settings:

* page-limit-bytes: Number of bytes. We will convert this metric into max number of pages internally by dividing max-bytes / page-size. It will allow a max based on an estimate.
* page-limit-messages: Number of messages
* page-full-message-policy: fail or drop

We will now allow paging, until these max values and then fail or drop messages.

Once these values are retracted, the address will remain full until a period where cleanup is kicked in by paging. So these values may have a certain delay on being applied, but they should always be cleared once cleanup happened.
  • Loading branch information
clebertsuconic committed Jan 30, 2023
1 parent b5ae95c commit 764db34
Show file tree
Hide file tree
Showing 20 changed files with 1,219 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.settings.impl;

public enum PageFullMessagePolicy {
DROP, FAIL
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;

Expand Down Expand Up @@ -195,6 +196,18 @@ public void validate(final String name, final Object value) {
}
};

public static final Validator PAGE_FULL_MESSAGE_POLICY_TYPE = new Validator() {
@Override
public void validate(final String name, final Object value) {
String val = (String) value;
if (val == null ||
!val.equals(PageFullMessagePolicy.DROP.toString()) &&
!val.equals(PageFullMessagePolicy.FAIL.toString())) {
throw ActiveMQMessageBundle.BUNDLE.invalidAddressFullPolicyType(val);
}
}
};

public static final Validator SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = new Validator() {
@Override
public void validate(final String name, final Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
Expand Down Expand Up @@ -218,6 +219,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {

private static final String ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME = "address-full-policy";

private static final String PAGE_FULL_MESSAGE_POLICY_NODE_NAME = "page-full-policy";

private static final String MAX_READ_PAGE_BYTES_NODE_NAME = "max-read-page-bytes";

private static final String MAX_READ_PAGE_MESSAGES_NODE_NAME = "max-read-page-messages";
Expand All @@ -226,6 +229,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {

private static final String PAGE_MAX_CACHE_SIZE_NODE_NAME = "page-max-cache-size";

private static final String PAGE_LIMIT_BYTES_NODE_NAME = "page-limit-bytes";

private static final String PAGE_LIMIT_MESSAGES_NODE_NAME = "page-limit-messages";

private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME = "message-counter-history-day-limit";

private static final String LVQ_NODE_NAME = "last-value-queue";
Expand Down Expand Up @@ -1281,13 +1288,26 @@ protected Pair<String, AddressSettings> parseAddressSettings(final Node node) {
ActiveMQServerLogger.LOGGER.pageMaxSizeUsed();
}
addressSettings.setPageCacheMaxSize(XMLUtil.parseInt(child));
} else if (PAGE_LIMIT_BYTES_NODE_NAME.equalsIgnoreCase(name)) {
long pageLimitBytes = ByteUtil.convertTextBytes(getTrimmedTextContent(child));
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PAGE_LIMIT_BYTES_NODE_NAME, pageLimitBytes);
addressSettings.setPageLimitBytes(pageLimitBytes);
} else if (PAGE_LIMIT_MESSAGES_NODE_NAME.equalsIgnoreCase(name)) {
long pageLimitMessages = ByteUtil.convertTextBytes(getTrimmedTextContent(child));
Validators.MINUS_ONE_OR_POSITIVE_INT.validate(PAGE_LIMIT_MESSAGES_NODE_NAME, pageLimitMessages);
addressSettings.setPageLimitMessages(pageLimitMessages);
} else if (MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setMessageCounterHistoryDayLimit(XMLUtil.parseInt(child));
} else if (ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.ADDRESS_FULL_MESSAGE_POLICY_TYPE.validate(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME, value);
AddressFullMessagePolicy policy = Enum.valueOf(AddressFullMessagePolicy.class, value);
addressSettings.setAddressFullMessagePolicy(policy);
} else if (PAGE_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) {
String value = getTrimmedTextContent(child);
Validators.PAGE_FULL_MESSAGE_POLICY_TYPE.validate(PAGE_FULL_MESSAGE_POLICY_NODE_NAME, value);
PageFullMessagePolicy policy = Enum.valueOf(PageFullMessagePolicy.class, value);
addressSettings.setPageFullMessagePolicy(policy);
} else if (LVQ_NODE_NAME.equalsIgnoreCase(name) || DEFAULT_LVQ_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setDefaultLastValueQueue(XMLUtil.parseBoolean(child));
} else if (DEFAULT_LVQ_KEY_NODE_NAME.equalsIgnoreCase(name)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;

Expand Down Expand Up @@ -60,6 +62,19 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener

AddressFullMessagePolicy getAddressFullMessagePolicy();

PageFullMessagePolicy getPageFullMessagePolicy();

Long getPageLimitMessages();

Long getPageLimitBytes();

/** Callback to be used by a counter when the Page is full for that counter */
void pageFull(PageSubscription subscription);

boolean isPageFull();

void checkPageLimit(long numberOfMessages);

long getFirstPage();

int getPageSizeBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.paging.cursor;

import java.util.concurrent.Future;
import java.util.function.Consumer;

import org.apache.activemq.artemis.core.filter.Filter;
Expand Down Expand Up @@ -46,7 +47,7 @@ public interface PageCursorProvider {

void flushExecutors();

void scheduleCleanup();
Future<Boolean> scheduleCleanup();

void disableCleanup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -37,12 +38,14 @@
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.ArtemisCloseable;
import org.apache.activemq.artemis.utils.SimpleFutureImpl;
import org.apache.activemq.artemis.utils.collections.ConcurrentLongHashMap;
import org.apache.activemq.artemis.utils.collections.LinkedList;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class PageCursorProviderImpl implements PageCursorProvider {
Expand Down Expand Up @@ -179,11 +182,14 @@ public void close(PageSubscription cursor) {
}

@Override
public void scheduleCleanup() {
public Future<Boolean> scheduleCleanup() {
final SimpleFutureImpl<Boolean> future = new SimpleFutureImpl<>();
if (!cleanupEnabled || scheduledCleanup.intValue() > 2) {
// Scheduled cleanup was already scheduled before.. never mind!
// or we have cleanup disabled
return;
// Scheduled cleanup was already scheduled before.
// On that case just flush the executor returning the future.set(true)
// after any previous scheduled cleanup is finished.
pagingStore.execute(() -> future.set(true));
return future;
}

scheduledCleanup.incrementAndGet();
Expand All @@ -199,9 +205,12 @@ public void run() {
} finally {
storageManager.clearContext();
scheduledCleanup.decrementAndGet();
future.set(true);
}
}
});

return future;
}

/**
Expand Down Expand Up @@ -241,6 +250,22 @@ public void resumeCleanup() {
scheduleCleanup();
}

private long getNumberOfMessagesOnSubscriptions() {
AtomicLong largerCounter = new AtomicLong();
activeCursors.forEach((id, sub) -> {
long value = sub.getCounter().getValue();
if (value > largerCounter.get()) {
largerCounter.set(value);
}
});

return largerCounter.get();
}

void checkClearPageLimit() {
pagingStore.checkPageLimit(getNumberOfMessagesOnSubscriptions());
}

protected void cleanup() {

if (!countersRebuilt) {
Expand Down Expand Up @@ -299,6 +324,10 @@ protected void cleanup() {
// Then we do some check on eventual pages that can be already removed but they are away from the streaming
cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList, minPage, firstPage);

if (pagingStore.isPageFull()) {
checkClearPageLimit();
}

assert pagingStore.getNumberOfPages() >= 0;

if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() == null || pagingStore.getCurrentPage().getNumberOfMessages() == 0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
Expand Down Expand Up @@ -56,6 +57,8 @@ public class PageSubscriptionCounterImpl extends BasePagingCounter {

private PageSubscription subscription;

private PagingStore pagingStore;

private final StorageManager storage;

private volatile long value;
Expand Down Expand Up @@ -187,11 +190,16 @@ private void process(int add, long size) {
if (logger.isTraceEnabled()) {
logger.trace("process subscription={} add={}, size={}", subscriptionID, add, size);
}
valueUpdater.addAndGet(this, add);
long value = valueUpdater.addAndGet(this, add);
persistentSizeUpdater.addAndGet(this, size);
if (add > 0) {
addedUpdater.addAndGet(this, add);
addedPersistentSizeUpdater.addAndGet(this, size);

/// we could have pagingStore null on tests, so we need to validate if pagingStore != null before anything...
if (pagingStore != null && pagingStore.getPageFullMessagePolicy() != null && !pagingStore.isPageFull()) {
checkAdd(value);
}
}

if (isRebuilding()) {
Expand All @@ -200,6 +208,15 @@ private void process(int add, long size) {
}
}

private void checkAdd(long numberOfMessages) {
Long pageLimitMessages = pagingStore.getPageLimitMessages();
if (pageLimitMessages != null) {
if (numberOfMessages >= pageLimitMessages.longValue()) {
pagingStore.pageFull(this.subscription);
}
}
}

@Override
public void delete() throws Exception {
Transaction tx = new TransactionImpl(storage);
Expand Down Expand Up @@ -420,6 +437,7 @@ public void addAndGet(int count, long persistentSize) {
@Override
public PageSubscriptionCounter setSubscription(PageSubscription subscription) {
this.subscription = subscription;
this.pagingStore = subscription.getPagingStore();
return this;
}
}
Loading

0 comments on commit 764db34

Please sign in to comment.