Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cacheMissPercentage metric #18208

Merged
merged 9 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/common/src/main/java/alluxio/metrics/MetricKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,13 @@ public static String getSyncMetricName(long mountId) {
.setDescription("Bytes read per minute throughput from all Alluxio UFSes by all workers")
.setMetricType(MetricType.GAUGE)
.build();

public static final MetricKey CLUSTER_BYTES_READ_CACHE =
new Builder("Cluster.BytesReadCache")
.setDescription("Total number of bytes read from all worker's cache")
.setMetricType(MetricType.COUNTER)
.build();

public static final MetricKey CLUSTER_BYTES_WRITTEN_REMOTE =
new Builder("Cluster.BytesWrittenRemote")
.setDescription("Total number of bytes written to workers via network (RPC). "
Expand Down Expand Up @@ -2003,6 +2010,14 @@ public static String getSyncMetricName(long mountId) {
.setMetricType(MetricType.METER)
.setIsClusterAggregated(false)
.build();

public static final MetricKey WORKER_BYTES_READ_CACHE =
new Builder("Worker.BytesReadCache")
.setDescription("Total number of bytes read from the worker's cache")
.setMetricType(MetricType.COUNTER)
.setIsClusterAggregated(true)
.build();

public static final MetricKey WORKER_BYTES_WRITTEN_DIRECT =
new Builder("Worker.BytesWrittenDirect")
.setDescription("Total number of bytes written to this worker "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,8 +988,11 @@ public Response getWebUIMetrics() {
.setTotalBytesReadRemote(FormatUtils.getSizeFromBytes(bytesReadRemote))
.setTotalBytesReadUfs(FormatUtils.getSizeFromBytes(bytesReadUfs));

Long bytesReadCache = counters.get(
MetricKey.CLUSTER_BYTES_READ_CACHE.getName()).getCount();

// cluster cache hit and miss
long bytesReadTotal = bytesReadLocal + bytesReadRemote + bytesReadDomainSocket;
long bytesReadTotal = bytesReadLocal + bytesReadCache + bytesReadUfs;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the metrics are updated in page store only. If the user uses the block store, then these bytesReadCache + bytesReadUfs will be zero. Can you also update the metrics in the block store?

double cacheHitLocalPercentage =
(bytesReadTotal > 0)
? (100D * (bytesReadLocal + bytesReadDomainSocket) / bytesReadTotal) : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ public void initMetricKeys() {
mClusterCounters.putIfAbsent(new ClusterCounterKey(InstanceType.WORKER,
MetricKey.WORKER_BYTES_READ_DOMAIN.getMetricName()),
MetricsSystem.counter(MetricKey.CLUSTER_BYTES_READ_DOMAIN.getName()));
mClusterCounters.putIfAbsent(new ClusterCounterKey(InstanceType.WORKER,
MetricKey.WORKER_BYTES_READ_CACHE.getMetricName()),
MetricsSystem.counter(MetricKey.CLUSTER_BYTES_READ_CACHE.getName()));
mClusterCounters.putIfAbsent(new ClusterCounterKey(InstanceType.WORKER,
MetricKey.WORKER_BYTES_WRITTEN_REMOTE.getMetricName()),
MetricsSystem.counter(MetricKey.CLUSTER_BYTES_WRITTEN_REMOTE.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ boolean heartbeat(
// Send the heartbeat and execute the response
Command cmdFromMaster = null;
List<alluxio.grpc.Metric> metrics = MetricsSystem.reportWorkerMetrics();

try {
cmdFromMaster = mMasterClient.heartbeat(workerId, storeMeta.getCapacityBytesOnTiers(),
storeMeta.getUsedBytesOnTiers(), blockReport.getRemovedBlocks(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.io.DelegatingBlockReader;
import alluxio.worker.block.io.LocalFileBlockReader;
import alluxio.worker.block.io.MetricAccountingBlockReader;
import alluxio.worker.block.io.StoreBlockReader;
import alluxio.worker.block.io.StoreBlockWriter;
import alluxio.worker.block.management.DefaultStoreLoadTracker;
Expand Down Expand Up @@ -221,10 +223,11 @@ public BlockReader createBlockReader(long sessionId, long blockId, long offset)
}

try {
BlockReader reader = new StoreBlockReader(sessionId, block);
LocalFileBlockReader reader = new StoreBlockReader(sessionId, block);
((FileChannel) reader.getChannel()).position(offset);
accessBlock(sessionId, blockId);
return new DelegatingBlockReader(reader, blockLock);
BlockReader mareader = new MetricAccountingBlockReader(reader);
return new DelegatingBlockReader(mareader, blockLock);
} catch (Exception e) {
blockLock.close();
throw new IOException(format("Failed to get local block reader, sessionId=%d, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,19 @@ public String toString() {
}
}

private static class BytesReadMetricKey {
private final AlluxioURI mUri;
private final String mUser;
/**
* create an BytesReadMetricKey.
*/
public static class BytesReadMetricKey {
public final AlluxioURI mUri;
public final String mUser;

BytesReadMetricKey(AlluxioURI uri, String user) {
/**
* create an instance of the key class.
* @param uri
* @param user
*/
public BytesReadMetricKey(AlluxioURI uri, String user) {
mUri = uri;
mUser = user;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.worker.block.io;

import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;

import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

/**
* An reader class with metrics.
*/
public class MetricAccountingBlockReader extends BlockReader {
private final LocalFileBlockReader mBlockReader;

/**
* A decorator of BlockReader.
* @param mblockReader block reader
*/
public MetricAccountingBlockReader(LocalFileBlockReader mblockReader) {
mBlockReader = mblockReader;
}

@Override
public ByteBuffer read(long offset, long length) throws IOException {
ByteBuffer buffer = mBlockReader.read(offset, length);
int bytesReadFromCache = buffer.limit() - buffer.position();
MetricsSystem.counter(MetricKey.WORKER_BYTES_READ_CACHE.getName()).inc(bytesReadFromCache);
return buffer;
}

@Override
public long getLength() {
return mBlockReader.getLength();
}

@Override
public ReadableByteChannel getChannel() {
return new ReadableByteChannel() {
private final ReadableByteChannel mDelegate = mBlockReader.getChannel();
@Override
public int read(ByteBuffer dst) throws IOException {
int bytesRead = mDelegate.read(dst);
if (bytesRead != -1) {
MetricsSystem.counter(MetricKey.WORKER_BYTES_READ_CACHE.getName()).inc(bytesRead);
}
return bytesRead;
}

@Override
public boolean isOpen() {
return mDelegate.isOpen();
}

@Override
public void close() throws IOException {
mDelegate.close();
}
};
}

@Override
public int transferTo(ByteBuf buf) throws IOException {
int bytesReadFromCache = mBlockReader.transferTo(buf);
if (bytesReadFromCache != -1) {
MetricsSystem.counter(MetricKey.WORKER_BYTES_READ_CACHE.getName()).inc(bytesReadFromCache);
}
return bytesReadFromCache;
}

@Override
public boolean isClosed() {
return mBlockReader.isClosed();
}

@Override
public String getLocation() {
return mBlockReader.getLocation();
}

@Override
public String toString() {
return mBlockReader.toString();
}

@Override
public void close() throws IOException {
mBlockReader.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
*/
@NotThreadSafe
public class PagedBlockReader extends BlockReader {

private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
private final long mPageSize;
private final CacheManager mCacheManager;
Expand Down Expand Up @@ -122,6 +121,7 @@ private long read(ByteBuf byteBuf, long offset, long length) throws IOException
bytesRead += bytesReadFromCache;
MetricsSystem.meter(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE.getName()).mark(bytesRead);
mReadFromLocalCache = true;
MetricsSystem.counter(MetricKey.WORKER_BYTES_READ_CACHE.getName()).inc(bytesReadFromCache);
} else {
if (!mUfsBlockReader.isPresent()) {
throw new AlluxioRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ private BlockReader getBlockReader(PagedBlockMeta blockMeta, long offset,
}
final Optional<PagedUfsBlockReader> ufsBlockReader =
readOptions.map(opt -> new PagedUfsBlockReader(
mUfsManager, mUfsInStreamCache, blockMeta, offset, opt, mPageSize));
mUfsManager, mUfsInStreamCache, blockMeta, offset, opt, mPageSize));
return new PagedBlockReader(mCacheManager, blockMeta, offset, ufsBlockReader, mPageSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,24 @@
package alluxio.worker.page;

import alluxio.conf.PropertyKey;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.status.NotFoundException;
import alluxio.exception.status.UnavailableException;
import alluxio.metrics.MetricInfo;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.network.protocol.databuffer.NioDirectBufferPool;
import alluxio.resource.CloseableResource;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.OpenOptions;
import alluxio.util.IdUtils;
import alluxio.worker.block.UfsInputStreamCache;
import alluxio.worker.block.UnderFileSystemBlockStore.BytesReadMetricKey;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.meta.BlockMeta;

import com.codahale.metrics.Counter;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;

Expand All @@ -31,6 +39,8 @@
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Block reader that reads from UFS.
Expand All @@ -47,6 +57,9 @@ public class PagedUfsBlockReader extends BlockReader {
private long mLastPageIndex = -1;
private boolean mClosed = false;
private long mPosition;
private final ConcurrentMap<BytesReadMetricKey, Counter> mUfsBytesReadMetrics =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elements are only inserted into this map, but never queried

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This map is used by mUfsBytesReadMetrics.computeIfAbsent().

new ConcurrentHashMap<>();
private final Counter mUfsBytesRead;

/**
* @param ufsManager
Expand All @@ -70,6 +83,23 @@ public PagedUfsBlockReader(UfsManager ufsManager,
mInitialOffset = offset;
mLastPage = ByteBuffer.allocateDirect((int) mPageSize);
mPosition = offset;
try {
UfsManager.UfsClient ufsClient = mUfsManager.get(mUfsBlockOptions.getMountId());
mUfsBytesRead = mUfsBytesReadMetrics.computeIfAbsent(
new BytesReadMetricKey(ufsClient.getUfsMountPointUri(), mUfsBlockOptions.getUser()),
key -> key.mUser == null
? MetricsSystem.counterWithTags(
MetricKey.WORKER_BYTES_READ_UFS.getName(),
MetricKey.WORKER_BYTES_READ_UFS.isClusterAggregated(),
MetricInfo.TAG_UFS, MetricsSystem.escape(key.mUri))
: MetricsSystem.counterWithTags(
MetricKey.WORKER_BYTES_READ_UFS.getName(),
MetricKey.WORKER_BYTES_READ_UFS.isClusterAggregated(),
MetricInfo.TAG_UFS, MetricsSystem.escape(key.mUri),
MetricInfo.TAG_USER, key.mUser));
} catch (UnavailableException | NotFoundException e) {
throw AlluxioRuntimeException.from(e);
}
}

@Override
Expand Down Expand Up @@ -145,6 +175,7 @@ public int readPageAtIndex(ByteBuffer buffer, long pageIndex) throws IOException
mLastPage.flip();
mLastPageIndex = pageIndex;
fillWithCachedPage(buffer, pageIndex * mPageSize, totalBytesRead);
mUfsBytesRead.inc(totalBytesRead);
return totalBytesRead;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.base.Preconditions;

import java.util.Objects;
import javax.annotation.Nullable;

/**
* Options for reading a block from UFS.
Expand All @@ -26,12 +27,15 @@ public final class UfsBlockReadOptions {
private final long mOffsetInFile;
private final String mUfsPath;
private final boolean mCacheIntoAlluxio;
@Nullable private final String mUser;

UfsBlockReadOptions(long mountId, long offsetInFile, String ufsPath, boolean cacheIntoAlluxio) {
UfsBlockReadOptions(long mountId, long offsetInFile, String ufsPath, boolean cacheIntoAlluxio,
@Nullable String user) {
mMountId = mountId;
mOffsetInFile = offsetInFile;
mUfsPath = ufsPath;
mCacheIntoAlluxio = cacheIntoAlluxio;
mUser = user;
}

/**
Expand All @@ -47,7 +51,7 @@ public static UfsBlockReadOptions fromProto(Protocol.OpenUfsBlockOptions options
"missing offset in file for UFS block read");
Preconditions.checkArgument(options.hasUfsPath(), "missing UFS path for UFS block read");
return new UfsBlockReadOptions(options.getMountId(),
options.getOffsetInFile(), options.getUfsPath(), !options.getNoCache());
options.getOffsetInFile(), options.getUfsPath(), !options.getNoCache(), options.getUser());
}

/**
Expand All @@ -71,6 +75,12 @@ public String getUfsPath() {
return mUfsPath;
}

/**
*
* @return user
*/
public String getUser() { return mUser; }

/**
* @return whether the UFS block should be cached into Alluxio
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public void sequentialTransferMultipleTimes() throws Exception {
}

private static UfsBlockReadOptions createUfsBlockOptions(String ufsPath) {
return new UfsBlockReadOptions(MOUNT_ID, OFFSET_IN_FILE, ufsPath, true);
return new UfsBlockReadOptions(MOUNT_ID, OFFSET_IN_FILE, ufsPath, true, null);
}

private static void createTempUfsBlock(Path destPath, long blockSize) throws Exception {
Expand Down
Loading