From 71df2ae0c0a6a4a0277e1131227fb59eb7b265da Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Tue, 5 Nov 2024 08:19:22 +0800 Subject: [PATCH] [fix][broker] Increase readBuffer size for bookkeeper.DLOutputStream (#23548) (cherry picked from commit 7a4788895e31dcd794fcb89b3af2bc36fa221343) --- .../storage/bookkeeper/DLOutputStream.java | 17 +++++++++++++---- .../storage/bookkeeper/DLOutputStreamTest.java | 2 +- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java index 67345ebd47e31..f446961c1d8fe 100644 --- a/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java +++ b/pulsar-package-management/bookkeeper-storage/src/main/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStream.java @@ -36,7 +36,16 @@ class DLOutputStream { private final DistributedLogManager distributedLogManager; private final AsyncLogWriter writer; - private final byte[] readBuffer = new byte[8192]; + /* + * The LogRecord structure is: + * ------------------- + * Bytes 0 - 7 : Metadata (Long) + * Bytes 8 - 15 : TxId (Long) + * Bytes 16 - 19 : Payload length (Integer) + * Bytes 20 - 20+payload.length-1 : Payload (Byte[]) + * So the max buffer size should be LogRecord.MAX_LOGRECORD_SIZE - 2 * (Long.SIZE / 8) - Integer.SIZE / 8 + */ + private final byte[] readBuffer = new byte[LogRecord.MAX_LOGRECORD_SIZE - 2 * (Long.SIZE / 8) - Integer.SIZE / 8]; private long offset = 0L; private DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter writer) { @@ -51,9 +60,9 @@ static CompletableFuture openWriterAsync(DistributedLogManager d private void writeAsyncHelper(InputStream is, CompletableFuture result) { try { - int read = is.read(readBuffer); - if (read != -1) { - log.info("write something into the ledgers offset: {}, length: {}", offset, read); + int read = is.readNBytes(readBuffer, 0, readBuffer.length); + if (read > 0) { + log.debug("write something into the ledgers offset: {}, length: {}", offset, read); final ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read); offset += writeBuf.readableBytes(); final LogRecord record = new LogRecord(offset, writeBuf); diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java index b55e0e0d34a4f..235cb4fefc0c3 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java @@ -99,7 +99,7 @@ public void writeBytesArrayData() throws ExecutionException, InterruptedExceptio @Test public void writeLongBytesArrayData() throws ExecutionException, InterruptedException { - byte[] data = new byte[8192 * 3 + 4096]; + byte[] data = new byte[1040364 * 3 + 4096]; DLOutputStream.openWriterAsync(dlm) .thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data)) .thenCompose(DLOutputStream::closeAsync)).get();