Skip to content

Commit

Permalink
Fix cacheMissPercentage metric
Browse files Browse the repository at this point in the history
Cherry-pick of existing commit.
orig-pr: #18208
orig-commit: b37a96d
orig-commit-author: juanjuan2 <45665083+juanjuan2@users.noreply.github.com>

			pr-link: #18589
			change-id: cid-d24a6f324f7148cab4a30fbbf819510a1a314ebc
  • Loading branch information
Xenorith authored Apr 25, 2024
1 parent 48b985f commit 33ea120
Show file tree
Hide file tree
Showing 12 changed files with 188 additions and 13 deletions.
15 changes: 15 additions & 0 deletions core/common/src/main/java/alluxio/metrics/MetricKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,13 @@ public static String getSyncMetricName(long mountId) {
.setDescription("Bytes read per minute throughput from all Alluxio UFSes by all workers")
.setMetricType(MetricType.GAUGE)
.build();

// Cluster-level counterpart of WORKER_BYTES_READ_CACHE: a COUNTER metric named
// "Cluster.BytesReadCache" for bytes read from worker caches across the cluster.
public static final MetricKey CLUSTER_BYTES_READ_CACHE =
new Builder("Cluster.BytesReadCache")
.setDescription("Total number of bytes read from all worker's cache")
.setMetricType(MetricType.COUNTER)
.build();

public static final MetricKey CLUSTER_BYTES_WRITTEN_REMOTE =
new Builder("Cluster.BytesWrittenRemote")
.setDescription("Total number of bytes written to workers via network (RPC). "
Expand Down Expand Up @@ -2003,6 +2010,14 @@ public static String getSyncMetricName(long mountId) {
.setMetricType(MetricType.METER)
.setIsClusterAggregated(false)
.build();

// Per-worker COUNTER metric named "Worker.BytesReadCache" for bytes read from the
// worker's cache. Flagged as cluster-aggregated via setIsClusterAggregated(true).
public static final MetricKey WORKER_BYTES_READ_CACHE =
new Builder("Worker.BytesReadCache")
.setDescription("Total number of bytes read from the worker's cache")
.setMetricType(MetricType.COUNTER)
.setIsClusterAggregated(true)
.build();

public static final MetricKey WORKER_BYTES_WRITTEN_DIRECT =
new Builder("Worker.BytesWrittenDirect")
.setDescription("Total number of bytes written to this worker "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -987,8 +987,11 @@ public Response getWebUIMetrics() {
.setTotalBytesReadRemote(FormatUtils.getSizeFromBytes(bytesReadRemote))
.setTotalBytesReadUfs(FormatUtils.getSizeFromBytes(bytesReadUfs));

Long bytesReadCache = counters.get(
MetricKey.CLUSTER_BYTES_READ_CACHE.getName()).getCount();

// cluster cache hit and miss
long bytesReadTotal = bytesReadLocal + bytesReadRemote + bytesReadDomainSocket;
long bytesReadTotal = bytesReadLocal + bytesReadCache + bytesReadUfs;
double cacheHitLocalPercentage =
(bytesReadTotal > 0)
? (100D * (bytesReadLocal + bytesReadDomainSocket) / bytesReadTotal) : 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ public void initMetricKeys() {
mClusterCounters.putIfAbsent(new ClusterCounterKey(InstanceType.WORKER,
MetricKey.WORKER_BYTES_READ_DOMAIN.getMetricName()),
MetricsSystem.counter(MetricKey.CLUSTER_BYTES_READ_DOMAIN.getName()));
mClusterCounters.putIfAbsent(new ClusterCounterKey(InstanceType.WORKER,
MetricKey.WORKER_BYTES_READ_CACHE.getMetricName()),
MetricsSystem.counter(MetricKey.CLUSTER_BYTES_READ_CACHE.getName()));
mClusterCounters.putIfAbsent(new ClusterCounterKey(InstanceType.WORKER,
MetricKey.WORKER_BYTES_WRITTEN_REMOTE.getMetricName()),
MetricsSystem.counter(MetricKey.CLUSTER_BYTES_WRITTEN_REMOTE.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ boolean heartbeat(
// Send the heartbeat and execute the response
Command cmdFromMaster = null;
List<alluxio.grpc.Metric> metrics = MetricsSystem.reportWorkerMetrics();

try {
cmdFromMaster = mMasterClient.heartbeat(workerId, storeMeta.getCapacityBytesOnTiers(),
storeMeta.getUsedBytesOnTiers(), blockReport.getRemovedBlocks(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.io.DelegatingBlockReader;
import alluxio.worker.block.io.LocalFileBlockReader;
import alluxio.worker.block.io.MetricAccountingBlockReader;
import alluxio.worker.block.io.StoreBlockReader;
import alluxio.worker.block.io.StoreBlockWriter;
import alluxio.worker.block.management.DefaultStoreLoadTracker;
Expand Down Expand Up @@ -221,10 +223,11 @@ public BlockReader createBlockReader(long sessionId, long blockId, long offset)
}

try {
BlockReader reader = new StoreBlockReader(sessionId, block);
LocalFileBlockReader reader = new StoreBlockReader(sessionId, block);
((FileChannel) reader.getChannel()).position(offset);
accessBlock(sessionId, blockId);
return new DelegatingBlockReader(reader, blockLock);
BlockReader mareader = new MetricAccountingBlockReader(reader);
return new DelegatingBlockReader(mareader, blockLock);
} catch (Exception e) {
blockLock.close();
throw new IOException(format("Failed to get local block reader, sessionId=%d, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,19 @@ public String toString() {
}
}

private static class BytesReadMetricKey {
private final AlluxioURI mUri;
private final String mUser;
/**
* Key made of a URI and a user, used to look up bytes-read metrics.
*/
public static class BytesReadMetricKey {
public final AlluxioURI mUri;
public final String mUser;

BytesReadMetricKey(AlluxioURI uri, String user) {
/**
* Creates an instance of the key class.
* @param uri the URI stored in this key
* @param user the user stored in this key
*/
public BytesReadMetricKey(AlluxioURI uri, String user) {
mUri = uri;
mUser = user;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.worker.block.io;

import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;

import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

/**
 * A {@link BlockReader} decorator around a {@link LocalFileBlockReader}.
 *
 * <p>Every call is forwarded to the wrapped reader. In addition, the number of bytes
 * obtained through {@link #read(long, long)}, {@link #transferTo(ByteBuf)} and the channel
 * returned by {@link #getChannel()} is added to the
 * {@link MetricKey#WORKER_BYTES_READ_CACHE} counter.
 */
public class MetricAccountingBlockReader extends BlockReader {
  private final LocalFileBlockReader mBlockReader;

  /**
   * Wraps the given reader so that the bytes it reads are counted.
   *
   * @param mblockReader the local file block reader to delegate to
   */
  public MetricAccountingBlockReader(LocalFileBlockReader mblockReader) {
    mBlockReader = mblockReader;
  }

  /**
   * Adds the given amount to the {@link MetricKey#WORKER_BYTES_READ_CACHE} counter.
   * The counter is looked up by name on every call.
   *
   * @param bytes the number of bytes to add
   */
  private static void recordBytesReadFromCache(long bytes) {
    MetricsSystem.counter(MetricKey.WORKER_BYTES_READ_CACHE.getName()).inc(bytes);
  }

  @Override
  public ByteBuffer read(long offset, long length) throws IOException {
    final ByteBuffer data = mBlockReader.read(offset, length);
    // Count the readable bytes between the buffer's position and its limit.
    recordBytesReadFromCache(data.limit() - data.position());
    return data;
  }

  @Override
  public long getLength() {
    return mBlockReader.getLength();
  }

  /**
   * @return a new channel wrapping the underlying reader's channel; reads through it are
   *         added to the counter
   */
  @Override
  public ReadableByteChannel getChannel() {
    return new CountingChannel(mBlockReader.getChannel());
  }

  @Override
  public int transferTo(ByteBuf buf) throws IOException {
    final int transferred = mBlockReader.transferTo(buf);
    if (transferred == -1) {
      // Nothing was transferred, so there is nothing to count.
      return transferred;
    }
    recordBytesReadFromCache(transferred);
    return transferred;
  }

  @Override
  public boolean isClosed() {
    return mBlockReader.isClosed();
  }

  @Override
  public String getLocation() {
    return mBlockReader.getLocation();
  }

  @Override
  public String toString() {
    return mBlockReader.toString();
  }

  @Override
  public void close() throws IOException {
    mBlockReader.close();
  }

  /**
   * Channel wrapper that counts the bytes read through it and otherwise forwards to
   * the wrapped channel.
   */
  private static final class CountingChannel implements ReadableByteChannel {
    private final ReadableByteChannel mDelegate;

    CountingChannel(ReadableByteChannel delegate) {
      mDelegate = delegate;
    }

    @Override
    public int read(ByteBuffer dst) throws IOException {
      final int count = mDelegate.read(dst);
      if (count == -1) {
        // End of stream: no bytes to account for.
        return count;
      }
      recordBytesReadFromCache(count);
      return count;
    }

    @Override
    public boolean isOpen() {
      return mDelegate.isOpen();
    }

    @Override
    public void close() throws IOException {
      mDelegate.close();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
*/
@NotThreadSafe
public class PagedBlockReader extends BlockReader {

private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
private final long mPageSize;
private final CacheManager mCacheManager;
Expand Down Expand Up @@ -122,6 +121,7 @@ private long read(ByteBuf byteBuf, long offset, long length) throws IOException
bytesRead += bytesReadFromCache;
MetricsSystem.meter(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE.getName()).mark(bytesRead);
mReadFromLocalCache = true;
MetricsSystem.counter(MetricKey.WORKER_BYTES_READ_CACHE.getName()).inc(bytesReadFromCache);
} else {
if (!mUfsBlockReader.isPresent()) {
throw new AlluxioRuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ private BlockReader getBlockReader(PagedBlockMeta blockMeta, long offset,
}
final Optional<PagedUfsBlockReader> ufsBlockReader =
readOptions.map(opt -> new PagedUfsBlockReader(
mUfsManager, mUfsInStreamCache, blockMeta, offset, opt, mPageSize));
mUfsManager, mUfsInStreamCache, blockMeta, offset, opt, mPageSize));
return new PagedBlockReader(mCacheManager, blockMeta, offset, ufsBlockReader, mPageSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,24 @@
package alluxio.worker.page;

import alluxio.conf.PropertyKey;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.status.NotFoundException;
import alluxio.exception.status.UnavailableException;
import alluxio.metrics.MetricInfo;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.network.protocol.databuffer.NioDirectBufferPool;
import alluxio.resource.CloseableResource;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.OpenOptions;
import alluxio.util.IdUtils;
import alluxio.worker.block.UfsInputStreamCache;
import alluxio.worker.block.UnderFileSystemBlockStore.BytesReadMetricKey;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.meta.BlockMeta;

import com.codahale.metrics.Counter;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;

Expand All @@ -31,6 +39,8 @@
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Block reader that reads from UFS.
Expand All @@ -47,6 +57,9 @@ public class PagedUfsBlockReader extends BlockReader {
private long mLastPageIndex = -1;
private boolean mClosed = false;
private long mPosition;
private final ConcurrentMap<BytesReadMetricKey, Counter> mUfsBytesReadMetrics =
new ConcurrentHashMap<>();
private final Counter mUfsBytesRead;

/**
* @param ufsManager
Expand All @@ -70,6 +83,23 @@ public PagedUfsBlockReader(UfsManager ufsManager,
mInitialOffset = offset;
mLastPage = ByteBuffer.allocateDirect((int) mPageSize);
mPosition = offset;
try {
UfsManager.UfsClient ufsClient = mUfsManager.get(mUfsBlockOptions.getMountId());
mUfsBytesRead = mUfsBytesReadMetrics.computeIfAbsent(
new BytesReadMetricKey(ufsClient.getUfsMountPointUri(), mUfsBlockOptions.getUser()),
key -> key.mUser == null
? MetricsSystem.counterWithTags(
MetricKey.WORKER_BYTES_READ_UFS.getName(),
MetricKey.WORKER_BYTES_READ_UFS.isClusterAggregated(),
MetricInfo.TAG_UFS, MetricsSystem.escape(key.mUri))
: MetricsSystem.counterWithTags(
MetricKey.WORKER_BYTES_READ_UFS.getName(),
MetricKey.WORKER_BYTES_READ_UFS.isClusterAggregated(),
MetricInfo.TAG_UFS, MetricsSystem.escape(key.mUri),
MetricInfo.TAG_USER, key.mUser));
} catch (UnavailableException | NotFoundException e) {
throw AlluxioRuntimeException.from(e);
}
}

@Override
Expand Down Expand Up @@ -145,6 +175,7 @@ public int readPageAtIndex(ByteBuffer buffer, long pageIndex) throws IOException
mLastPage.flip();
mLastPageIndex = pageIndex;
fillWithCachedPage(buffer, pageIndex * mPageSize, totalBytesRead);
mUfsBytesRead.inc(totalBytesRead);
return totalBytesRead;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.base.Preconditions;

import java.util.Objects;
import javax.annotation.Nullable;

/**
* Options for reading a block from UFS.
Expand All @@ -26,12 +27,15 @@ public final class UfsBlockReadOptions {
private final long mOffsetInFile;
private final String mUfsPath;
private final boolean mCacheIntoAlluxio;
@Nullable private final String mUser;

UfsBlockReadOptions(long mountId, long offsetInFile, String ufsPath, boolean cacheIntoAlluxio) {
UfsBlockReadOptions(long mountId, long offsetInFile, String ufsPath, boolean cacheIntoAlluxio,
@Nullable String user) {
mMountId = mountId;
mOffsetInFile = offsetInFile;
mUfsPath = ufsPath;
mCacheIntoAlluxio = cacheIntoAlluxio;
mUser = user;
}

/**
Expand All @@ -47,7 +51,7 @@ public static UfsBlockReadOptions fromProto(Protocol.OpenUfsBlockOptions options
"missing offset in file for UFS block read");
Preconditions.checkArgument(options.hasUfsPath(), "missing UFS path for UFS block read");
return new UfsBlockReadOptions(options.getMountId(),
options.getOffsetInFile(), options.getUfsPath(), !options.getNoCache());
options.getOffsetInFile(), options.getUfsPath(), !options.getNoCache(), options.getUser());
}

/**
Expand All @@ -71,6 +75,12 @@ public String getUfsPath() {
return mUfsPath;
}

/**
* Returns the user that was supplied when these options were created.
*
* @return the user, or {@code null} if none was provided (the backing field is nullable)
*/
public String getUser() { return mUser; }

/**
* @return whether the UFS block should be cached into Alluxio
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public void sequentialTransferMultipleTimes() throws Exception {
}

private static UfsBlockReadOptions createUfsBlockOptions(String ufsPath) {
return new UfsBlockReadOptions(MOUNT_ID, OFFSET_IN_FILE, ufsPath, true);
return new UfsBlockReadOptions(MOUNT_ID, OFFSET_IN_FILE, ufsPath, true, null);
}

private static void createTempUfsBlock(Path destPath, long blockSize) throws Exception {
Expand Down

0 comments on commit 33ea120

Please sign in to comment.