Skip to content

Commit

Permalink
Issue #9387 has been implemented.
Browse files Browse the repository at this point in the history
  • Loading branch information
andrii0lomakin committed Sep 7, 2020
1 parent fd726ee commit bfa0d60
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1456,6 +1456,24 @@ public void change(final Object iCurrentValue, final Object iNewValue) {
true,
false),

SERVER_HEAP_USAGE_LIMIT(
"server.heapUsageLimit",
"Once server heap usage reaches provided limit (in percent), "
+ "sever will stop to accept new requests from client "
+ "till heap usage will drop bellow given limit",
Integer.class,
85,
false,
false),

SERVER_HEAP_USAGE_SLEEP_INTERVAL(
"server.heapUsageSleepInterval",
"Once heap usage reaches threshold server will wait given time in ms. before checking memory usage again",
Integer.class,
20,
false,
false),

// DISTRIBUTED
/** @Since 2.2.18 */
DISTRIBUTED_DUMP_STATS_EVERY(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
import com.orientechnologies.orient.server.distributed.ODistributedServerManager;
import com.orientechnologies.orient.server.distributed.config.ODistributedConfig;
import com.orientechnologies.orient.server.handler.OConfigurableHooksManager;
import com.orientechnologies.orient.server.memorymanager.MXBeanMemoryManager;
import com.orientechnologies.orient.server.memorymanager.MemoryManager;
import com.orientechnologies.orient.server.memorymanager.NoOpMemoryManager;
import com.orientechnologies.orient.server.network.OServerNetworkListener;
import com.orientechnologies.orient.server.network.OServerSocketFactory;
import com.orientechnologies.orient.server.network.protocol.ONetworkProtocol;
Expand Down Expand Up @@ -116,6 +119,7 @@ public class OServer {
private OClientConnectionManager clientConnectionManager;
private OHttpSessionManager httpSessionManager;
private OPushManager pushManager;
private volatile MemoryManager memoryManager;
private ClassLoader extensionClassLoader;
private OTokenHandler tokenHandler;
private OSystemDatabase systemDatabase;
Expand Down Expand Up @@ -448,6 +452,16 @@ public OServer activate()
final OServerConfiguration configuration = serverCfg.getConfiguration();

tokenHandler = new OTokenHandlerImpl(this);
if (OGlobalConfiguration.SERVER_HEAP_USAGE_LIMIT.getValueAsInteger() > 0) {
memoryManager =
new MXBeanMemoryManager(
OGlobalConfiguration.SERVER_HEAP_USAGE_LIMIT.getValueAsInteger(),
OGlobalConfiguration.SERVER_HEAP_USAGE_SLEEP_INTERVAL.getValueAsInteger());
} else {
memoryManager = new NoOpMemoryManager();
}

memoryManager.start();

if (configuration.network != null) {
// REGISTER/CREATE SOCKET FACTORIES
Expand Down Expand Up @@ -603,6 +617,7 @@ protected boolean deinit() {
pushManager.shutdown();
clientConnectionManager.shutdown();
httpSessionManager.shutdown();
memoryManager.shutdown();

if (pluginManager != null) pluginManager.shutdown();

Expand Down Expand Up @@ -1284,6 +1299,10 @@ public OrientDB getContext() {
return context;
}

public MemoryManager getMemoryManager() {
return memoryManager;
}

public void dropDatabase(String databaseName) {
if (databases.exists(databaseName, null, null)) {
databases.drop(databaseName, null, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package com.orientechnologies.orient.server.memorymanager;

import com.orientechnologies.common.log.OLogManager;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryNotificationInfo;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.LockSupport;
import javax.management.ListenerNotFoundException;
import javax.management.Notification;
import javax.management.NotificationEmitter;
import javax.management.NotificationListener;
import javax.management.openmbean.CompositeData;

public final class MXBeanMemoryManager implements NotificationListener, MemoryManager {
private final int memoryThreshold;
private final int sleepInterval;

private final ConcurrentLinkedQueue<String> poolsWithOverhead = new ConcurrentLinkedQueue<>();
private volatile HashMap<String, MemoryPoolMXBean> memoryBeans;

public MXBeanMemoryManager(final int memoryThreshold, final int sleepInterval) {
this.memoryThreshold = memoryThreshold;
this.sleepInterval = sleepInterval;

assert sleepInterval >= 0;
assert memoryThreshold >= 0 && memoryThreshold < 100;
}

@Override
public void start() {
if (memoryThreshold <= 0) {
return;
}

final HashMap<String, MemoryPoolMXBean> beansByName = new HashMap<>();

final List<MemoryPoolMXBean> mxBeans = ManagementFactory.getMemoryPoolMXBeans();
for (final MemoryPoolMXBean mxBean : mxBeans) {
if (mxBean.isUsageThresholdSupported() && mxBean.getType() == MemoryType.HEAP) {
final long maxMemory = mxBean.getUsage().getMax();
if (maxMemory > 0) {
final long threshold = maxMemory * memoryThreshold / 100;
mxBean.setUsageThreshold(threshold);

final NotificationEmitter emitter = (NotificationEmitter) mxBean;
emitter.addNotificationListener(this, null, null);

beansByName.put(mxBean.getName(), mxBean);

OLogManager.instance()
.infoNoDb(
this,
"Memory usage threshold for memory pool '%s' is set to %d bytes",
mxBean.getName(),
threshold);
}
}
}

this.memoryBeans = beansByName;
}

@Override
public void shutdown() {
for (final MemoryPoolMXBean mxBean : memoryBeans.values()) {
final NotificationEmitter emitter = (NotificationEmitter) mxBean;
try {
emitter.removeNotificationListener(this);
} catch (final ListenerNotFoundException e) {
throw new IllegalStateException(
"Memory bean "
+ mxBean.getName()
+ " was processed by memory manager but manager was not added as a listener",
e);
}
}
}

@Override
public void checkAndWaitMemoryThreshold() {
if (poolsWithOverhead.isEmpty()) {
return;
}

while (!poolsWithOverhead.isEmpty()) {
final String poolName = poolsWithOverhead.peek();
final MemoryPoolMXBean bean = memoryBeans.get(poolName);

while (bean.isUsageThresholdExceeded()) {
LockSupport.parkNanos(sleepInterval * 1_000_000L);
}

poolsWithOverhead.poll();
}
}

@Override
public void handleNotification(final Notification notification, final Object handback) {
final String notificationType = notification.getType();
if (notificationType.equals(MemoryNotificationInfo.MEMORY_THRESHOLD_EXCEEDED)) {
final CompositeData cd = (CompositeData) notification.getUserData();
final MemoryNotificationInfo info = MemoryNotificationInfo.from(cd);
poolsWithOverhead.add(info.getPoolName());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.orientechnologies.orient.server.memorymanager;

public interface MemoryManager {
void start();

void shutdown();

void checkAndWaitMemoryThreshold();
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.orientechnologies.orient.server.memorymanager;

public final class NoOpMemoryManager implements MemoryManager {
@Override
public void start() {}

@Override
public void shutdown() {}

@Override
public void checkAndWaitMemoryThreshold() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ protected void execute() throws Exception {
// do not remove this or we will get deadlock upon shutdown.
if (isShutdownFlag()) return;

server.getMemoryManager().checkAndWaitMemoryThreshold();

clientTxId = 0;
okSent = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,8 @@ protected void execute() throws Exception {
connection.getData().commandInfo = "Listening";
connection.getData().commandDetail = null;

server.getMemoryManager().checkAndWaitMemoryThreshold();

try {
channel.socket.setSoTimeout(socketTimeout);
connection.getStats().lastCommandReceived = -1;
Expand Down

0 comments on commit bfa0d60

Please sign in to comment.