Skip to content

Commit

Permalink
[fix][broker] Increase readBuffer size for bookkeeper.DLOutputStream (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng authored Nov 5, 2024
1 parent 570cb44 commit 7a47888
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,16 @@ class DLOutputStream {

private final DistributedLogManager distributedLogManager;
private final AsyncLogWriter writer;
private final byte[] readBuffer = new byte[8192];
/*
* The LogRecord structure is:
* -------------------
* Bytes 0 - 7 : Metadata (Long)
* Bytes 8 - 15 : TxId (Long)
* Bytes 16 - 19 : Payload length (Integer)
* Bytes 20 - 20+payload.length-1 : Payload (Byte[])
* So the max buffer size should be LogRecord.MAX_LOGRECORD_SIZE - 2 * (Long.SIZE / 8) - Integer.SIZE / 8
*/
private final byte[] readBuffer = new byte[LogRecord.MAX_LOGRECORD_SIZE - 2 * (Long.SIZE / 8) - Integer.SIZE / 8];
private long offset = 0L;

private DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter writer) {
Expand All @@ -51,9 +60,9 @@ static CompletableFuture<DLOutputStream> openWriterAsync(DistributedLogManager d

private void writeAsyncHelper(InputStream is, CompletableFuture<DLOutputStream> result) {
try {
int read = is.read(readBuffer);
if (read != -1) {
log.info("write something into the ledgers offset: {}, length: {}", offset, read);
int read = is.readNBytes(readBuffer, 0, readBuffer.length);
if (read > 0) {
log.debug("write something into the ledgers offset: {}, length: {}", offset, read);
final ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read);
offset += writeBuf.readableBytes();
final LogRecord record = new LogRecord(offset, writeBuf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void writeBytesArrayData() throws ExecutionException, InterruptedExceptio

@Test
public void writeLongBytesArrayData() throws ExecutionException, InterruptedException {
byte[] data = new byte[8192 * 3 + 4096];
byte[] data = new byte[1040364 * 3 + 4096];
DLOutputStream.openWriterAsync(dlm)
.thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
.thenCompose(DLOutputStream::closeAsync)).get();
Expand Down

0 comments on commit 7a47888

Please sign in to comment.