diff --git a/core/common/src/main/java/alluxio/metrics/MetricKey.java b/core/common/src/main/java/alluxio/metrics/MetricKey.java index 100c914ca94e..8e40eefef70a 100644 --- a/core/common/src/main/java/alluxio/metrics/MetricKey.java +++ b/core/common/src/main/java/alluxio/metrics/MetricKey.java @@ -1711,6 +1711,13 @@ public static String getSyncMetricName(long mountId) { .setDescription("Bytes read per minute throughput from all Alluxio UFSes by all workers") .setMetricType(MetricType.GAUGE) .build(); + + public static final MetricKey CLUSTER_BYTES_READ_CACHE = + new Builder("Cluster.BytesReadCache") + .setDescription("Total number of bytes read from all worker's cache") + .setMetricType(MetricType.COUNTER) + .build(); + public static final MetricKey CLUSTER_BYTES_WRITTEN_REMOTE = new Builder("Cluster.BytesWrittenRemote") .setDescription("Total number of bytes written to workers via network (RPC). " @@ -2003,6 +2010,14 @@ public static String getSyncMetricName(long mountId) { .setMetricType(MetricType.METER) .setIsClusterAggregated(false) .build(); + + public static final MetricKey WORKER_BYTES_READ_CACHE = + new Builder("Worker.BytesReadCache") + .setDescription("Total number of bytes read from the worker's cache") + .setMetricType(MetricType.COUNTER) + .setIsClusterAggregated(true) + .build(); + public static final MetricKey WORKER_BYTES_WRITTEN_DIRECT = new Builder("Worker.BytesWrittenDirect") .setDescription("Total number of bytes written to this worker " diff --git a/core/server/master/src/main/java/alluxio/master/meta/AlluxioMasterRestServiceHandler.java b/core/server/master/src/main/java/alluxio/master/meta/AlluxioMasterRestServiceHandler.java index f09279100267..d7b5ad2bd7cc 100644 --- a/core/server/master/src/main/java/alluxio/master/meta/AlluxioMasterRestServiceHandler.java +++ b/core/server/master/src/main/java/alluxio/master/meta/AlluxioMasterRestServiceHandler.java @@ -989,8 +989,11 @@ public Response getWebUIMetrics() { .setTotalBytesReadRemote(FormatUtils.getSizeFromBytes(bytesReadRemote)) .setTotalBytesReadUfs(FormatUtils.getSizeFromBytes(bytesReadUfs)); + Long bytesReadCache = counters.get( + MetricKey.CLUSTER_BYTES_READ_CACHE.getName()).getCount(); + // cluster cache hit and miss - long bytesReadTotal = bytesReadLocal + bytesReadRemote + bytesReadDomainSocket; + long bytesReadTotal = bytesReadLocal + bytesReadCache + bytesReadUfs; double cacheHitLocalPercentage = (bytesReadTotal > 0) ? (100D * (bytesReadLocal + bytesReadDomainSocket) / bytesReadTotal) : 0; diff --git a/core/server/master/src/main/java/alluxio/master/metrics/MetricsStore.java b/core/server/master/src/main/java/alluxio/master/metrics/MetricsStore.java index cca38f792671..9ded436a8af0 100644 --- a/core/server/master/src/main/java/alluxio/master/metrics/MetricsStore.java +++ b/core/server/master/src/main/java/alluxio/master/metrics/MetricsStore.java @@ -185,6 +185,9 @@ public void initMetricKeys() { mClusterCounters.putIfAbsent(new ClusterCounterKey(InstanceType.WORKER, MetricKey.WORKER_BYTES_READ_DOMAIN.getMetricName()), MetricsSystem.counter(MetricKey.CLUSTER_BYTES_READ_DOMAIN.getName())); + mClusterCounters.putIfAbsent(new ClusterCounterKey(InstanceType.WORKER, + MetricKey.WORKER_BYTES_READ_CACHE.getMetricName()), + MetricsSystem.counter(MetricKey.CLUSTER_BYTES_READ_CACHE.getName())); mClusterCounters.putIfAbsent(new ClusterCounterKey(InstanceType.WORKER, MetricKey.WORKER_BYTES_WRITTEN_REMOTE.getMetricName()), MetricsSystem.counter(MetricKey.CLUSTER_BYTES_WRITTEN_REMOTE.getName())); diff --git a/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSyncHelper.java b/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSyncHelper.java index b43cc378ace1..20dc2ded5550 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSyncHelper.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/BlockMasterSyncHelper.java @@ -135,7 +135,6 @@ boolean heartbeat( // Send the heartbeat and execute the response Command cmdFromMaster = null; List metrics = MetricsSystem.reportWorkerMetrics(); - try { cmdFromMaster = mMasterClient.heartbeat(workerId, storeMeta.getCapacityBytesOnTiers(), storeMeta.getUsedBytesOnTiers(), blockReport.getRemovedBlocks(), diff --git a/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java index 855203f9859d..ce447d94abce 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/TieredBlockStore.java @@ -30,6 +30,8 @@ import alluxio.worker.block.io.BlockReader; import alluxio.worker.block.io.BlockWriter; import alluxio.worker.block.io.DelegatingBlockReader; +import alluxio.worker.block.io.LocalFileBlockReader; +import alluxio.worker.block.io.MetricAccountingBlockReader; import alluxio.worker.block.io.StoreBlockReader; import alluxio.worker.block.io.StoreBlockWriter; import alluxio.worker.block.management.DefaultStoreLoadTracker; @@ -221,10 +223,11 @@ public BlockReader createBlockReader(long sessionId, long blockId, long offset) } try { - BlockReader reader = new StoreBlockReader(sessionId, block); + LocalFileBlockReader reader = new StoreBlockReader(sessionId, block); ((FileChannel) reader.getChannel()).position(offset); accessBlock(sessionId, blockId); - return new DelegatingBlockReader(reader, blockLock); + BlockReader mareader = new MetricAccountingBlockReader(reader); + return new DelegatingBlockReader(mareader, blockLock); } catch (Exception e) { blockLock.close(); throw new IOException(format("Failed to get local block reader, sessionId=%d, " diff --git a/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockStore.java index 07df037ddc23..b52f428c78f3 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockStore.java @@ -387,11 +387,19 @@ public String toString() { } } - private static class BytesReadMetricKey { - private final AlluxioURI mUri; - private final String mUser; + /** + * create an BytesReadMetricKey. + */ + public static class BytesReadMetricKey { + public final AlluxioURI mUri; + public final String mUser; - BytesReadMetricKey(AlluxioURI uri, String user) { + /** + * create an instance of the key class. + * @param uri + * @param user + */ + public BytesReadMetricKey(AlluxioURI uri, String user) { mUri = uri; mUser = user; } diff --git a/core/server/worker/src/main/java/alluxio/worker/block/io/MetricAccountingBlockReader.java b/core/server/worker/src/main/java/alluxio/worker/block/io/MetricAccountingBlockReader.java new file mode 100644 index 000000000000..1e58c31aef1f --- /dev/null +++ b/core/server/worker/src/main/java/alluxio/worker/block/io/MetricAccountingBlockReader.java @@ -0,0 +1,103 @@ +/* + * The Alluxio Open Foundation licenses this work under the Apache License, version 2.0 + * (the "License"). You may not use this work except in compliance with the License, which is + * available at www.apache.org/licenses/LICENSE-2.0 + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied, as more fully set forth in the License. + * + * See the NOTICE file distributed with this work for information regarding copyright ownership. + */ + +package alluxio.worker.block.io; + +import alluxio.metrics.MetricKey; +import alluxio.metrics.MetricsSystem; + +import io.netty.buffer.ByteBuf; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; + +/** + * An reader class with metrics. + */ +public class MetricAccountingBlockReader extends BlockReader { + private final LocalFileBlockReader mBlockReader; + + /** + * A decorator of BlockReader. + * @param mblockReader block reader + */ + public MetricAccountingBlockReader(LocalFileBlockReader mblockReader) { + mBlockReader = mblockReader; + } + + @Override + public ByteBuffer read(long offset, long length) throws IOException { + ByteBuffer buffer = mBlockReader.read(offset, length); + int bytesReadFromCache = buffer.limit() - buffer.position(); + MetricsSystem.counter(MetricKey.WORKER_BYTES_READ_CACHE.getName()).inc(bytesReadFromCache); + return buffer; + } + + @Override + public long getLength() { + return mBlockReader.getLength(); + } + + @Override + public ReadableByteChannel getChannel() { + return new ReadableByteChannel() { + private final ReadableByteChannel mDelegate = mBlockReader.getChannel(); + @Override + public int read(ByteBuffer dst) throws IOException { + int bytesRead = mDelegate.read(dst); + if (bytesRead != -1) { + MetricsSystem.counter(MetricKey.WORKER_BYTES_READ_CACHE.getName()).inc(bytesRead); + } + return bytesRead; + } + + @Override + public boolean isOpen() { + return mDelegate.isOpen(); + } + + @Override + public void close() throws IOException { + mDelegate.close(); + } + }; + } + + @Override + public int transferTo(ByteBuf buf) throws IOException { + int bytesReadFromCache = mBlockReader.transferTo(buf); + if (bytesReadFromCache != -1) { + MetricsSystem.counter(MetricKey.WORKER_BYTES_READ_CACHE.getName()).inc(bytesReadFromCache); + } + return bytesReadFromCache; + } + + @Override + public boolean isClosed() { + return mBlockReader.isClosed(); + } + + @Override + public String getLocation() { + return mBlockReader.getLocation(); + } + + @Override + public String toString() { + return mBlockReader.toString(); + } + + @Override + public void close() throws IOException { + mBlockReader.close(); + } +} diff --git a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java index a0a09d2b8b41..41b7a95d4016 100644 --- a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java +++ b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java @@ -39,7 +39,6 @@ */ @NotThreadSafe public class PagedBlockReader extends BlockReader { - private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0); private final long mPageSize; private final CacheManager mCacheManager; @@ -122,6 +121,7 @@ private long read(ByteBuf byteBuf, long offset, long length) throws IOException bytesRead += bytesReadFromCache; MetricsSystem.meter(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE.getName()).mark(bytesRead); mReadFromLocalCache = true; + MetricsSystem.counter(MetricKey.WORKER_BYTES_READ_CACHE.getName()).inc(bytesReadFromCache); } else { if (!mUfsBlockReader.isPresent()) { throw new AlluxioRuntimeException( diff --git a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockStore.java index 2dba6f52ce61..59ac5af9daeb 100644 --- a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockStore.java @@ -300,7 +300,7 @@ private BlockReader getBlockReader(PagedBlockMeta blockMeta, long offset, } final Optional ufsBlockReader = readOptions.map(opt -> new PagedUfsBlockReader( - mUfsManager, mUfsInStreamCache, blockMeta, offset, opt, mPageSize)); + mUfsManager, mUfsInStreamCache, blockMeta, offset, opt, mPageSize)); return new PagedBlockReader(mCacheManager, blockMeta, offset, ufsBlockReader, mPageSize); } diff --git a/core/server/worker/src/main/java/alluxio/worker/page/PagedUfsBlockReader.java b/core/server/worker/src/main/java/alluxio/worker/page/PagedUfsBlockReader.java index ef79f18348da..dd4d132a4bef 100644 --- a/core/server/worker/src/main/java/alluxio/worker/page/PagedUfsBlockReader.java +++ b/core/server/worker/src/main/java/alluxio/worker/page/PagedUfsBlockReader.java @@ -12,6 +12,12 @@ package alluxio.worker.page; import alluxio.conf.PropertyKey; +import alluxio.exception.runtime.AlluxioRuntimeException; +import alluxio.exception.status.NotFoundException; +import alluxio.exception.status.UnavailableException; +import alluxio.metrics.MetricInfo; +import alluxio.metrics.MetricKey; +import alluxio.metrics.MetricsSystem; import alluxio.network.protocol.databuffer.NioDirectBufferPool; import alluxio.resource.CloseableResource; import alluxio.underfs.UfsManager; @@ -19,9 +25,11 @@ import alluxio.underfs.options.OpenOptions; import alluxio.util.IdUtils; import alluxio.worker.block.UfsInputStreamCache; +import alluxio.worker.block.UnderFileSystemBlockStore.BytesReadMetricKey; import alluxio.worker.block.io.BlockReader; import alluxio.worker.block.meta.BlockMeta; +import com.codahale.metrics.Counter; import com.google.common.base.Preconditions; import io.netty.buffer.ByteBuf; @@ -31,6 +39,8 @@ import java.nio.channels.Channels; import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadableByteChannel; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Block reader that reads from UFS. @@ -47,6 +57,9 @@ public class PagedUfsBlockReader extends BlockReader { private long mLastPageIndex = -1; private boolean mClosed = false; private long mPosition; + private final ConcurrentMap mUfsBytesReadMetrics = + new ConcurrentHashMap<>(); + private final Counter mUfsBytesRead; /** * @param ufsManager @@ -70,6 +83,23 @@ public PagedUfsBlockReader(UfsManager ufsManager, mInitialOffset = offset; mLastPage = ByteBuffer.allocateDirect((int) mPageSize); mPosition = offset; + try { + UfsManager.UfsClient ufsClient = mUfsManager.get(mUfsBlockOptions.getMountId()); + mUfsBytesRead = mUfsBytesReadMetrics.computeIfAbsent( + new BytesReadMetricKey(ufsClient.getUfsMountPointUri(), mUfsBlockOptions.getUser()), + key -> key.mUser == null + ? MetricsSystem.counterWithTags( + MetricKey.WORKER_BYTES_READ_UFS.getName(), + MetricKey.WORKER_BYTES_READ_UFS.isClusterAggregated(), + MetricInfo.TAG_UFS, MetricsSystem.escape(key.mUri)) + : MetricsSystem.counterWithTags( + MetricKey.WORKER_BYTES_READ_UFS.getName(), + MetricKey.WORKER_BYTES_READ_UFS.isClusterAggregated(), + MetricInfo.TAG_UFS, MetricsSystem.escape(key.mUri), + MetricInfo.TAG_USER, key.mUser)); + } catch (UnavailableException | NotFoundException e) { + throw AlluxioRuntimeException.from(e); + } } @Override @@ -145,6 +175,7 @@ public int readPageAtIndex(ByteBuffer buffer, long pageIndex) throws IOException mLastPage.flip(); mLastPageIndex = pageIndex; fillWithCachedPage(buffer, pageIndex * mPageSize, totalBytesRead); + mUfsBytesRead.inc(totalBytesRead); return totalBytesRead; } diff --git a/core/server/worker/src/main/java/alluxio/worker/page/UfsBlockReadOptions.java b/core/server/worker/src/main/java/alluxio/worker/page/UfsBlockReadOptions.java index 4156c63334cb..d6b43a4c197e 100644 --- a/core/server/worker/src/main/java/alluxio/worker/page/UfsBlockReadOptions.java +++ b/core/server/worker/src/main/java/alluxio/worker/page/UfsBlockReadOptions.java @@ -16,6 +16,7 @@ import com.google.common.base.Preconditions; import java.util.Objects; +import javax.annotation.Nullable; /** * Options for reading a block from UFS. @@ -26,12 +27,15 @@ public final class UfsBlockReadOptions { private final long mOffsetInFile; private final String mUfsPath; private final boolean mCacheIntoAlluxio; + @Nullable private final String mUser; - UfsBlockReadOptions(long mountId, long offsetInFile, String ufsPath, boolean cacheIntoAlluxio) { + UfsBlockReadOptions(long mountId, long offsetInFile, String ufsPath, boolean cacheIntoAlluxio, + @Nullable String user) { mMountId = mountId; mOffsetInFile = offsetInFile; mUfsPath = ufsPath; mCacheIntoAlluxio = cacheIntoAlluxio; + mUser = user; } /** @@ -47,7 +51,7 @@ public static UfsBlockReadOptions fromProto(Protocol.OpenUfsBlockOptions options "missing offset in file for UFS block read"); Preconditions.checkArgument(options.hasUfsPath(), "missing UFS path for UFS block read"); return new UfsBlockReadOptions(options.getMountId(), - options.getOffsetInFile(), options.getUfsPath(), !options.getNoCache()); + options.getOffsetInFile(), options.getUfsPath(), !options.getNoCache(), options.getUser()); } /** @@ -71,6 +75,12 @@ public String getUfsPath() { return mUfsPath; } + /** + * + * @return user + */ + public String getUser() { return mUser; } + /** * @return whether the UFS block should be cached into Alluxio */ diff --git a/core/server/worker/src/test/java/alluxio/worker/page/PagedBlockReaderTest.java b/core/server/worker/src/test/java/alluxio/worker/page/PagedBlockReaderTest.java index 19d1fd222652..d8a938078b71 100644 --- a/core/server/worker/src/test/java/alluxio/worker/page/PagedBlockReaderTest.java +++ b/core/server/worker/src/test/java/alluxio/worker/page/PagedBlockReaderTest.java @@ -239,7 +239,7 @@ public void sequentialTransferMultipleTimes() throws Exception { } private static UfsBlockReadOptions createUfsBlockOptions(String ufsPath) { - return new UfsBlockReadOptions(MOUNT_ID, OFFSET_IN_FILE, ufsPath, true); + return new UfsBlockReadOptions(MOUNT_ID, OFFSET_IN_FILE, ufsPath, true, null); } private static void createTempUfsBlock(Path destPath, long blockSize) throws Exception {