diff --git a/src/protocol/recordBatch/v0/decoder.js b/src/protocol/recordBatch/v0/decoder.js index 99bb4f973..f8166e4f9 100644 --- a/src/protocol/recordBatch/v0/decoder.js +++ b/src/protocol/recordBatch/v0/decoder.js @@ -1,6 +1,5 @@ const Decoder = require('../../decoder') const { lookupCodecByRecordBatchAttributes } = require('../../message/compression') -const { KafkaJSPartialMessageError } = require('../../../errors') const RecordDecoder = require('../record/v0/decoder') const TRANSACTIONAL_FLAG_MASK = 0x10 @@ -24,18 +23,9 @@ const CONTROL_FLAG_MASK = 0x20 * Records => [Record] */ -module.exports = async decoder => { - const firstOffset = decoder.readInt64().toString() - const length = decoder.readInt32() - - const remainingBytes = Buffer.byteLength(decoder.slice(length).buffer) - - if (remainingBytes < length) { - throw new KafkaJSPartialMessageError( - `Tried to decode a partial record batch: remainingBytes(${remainingBytes}) < recordBatchLength(${length})` - ) - } - +module.exports = async fetchDecoder => { + const firstOffset = fetchDecoder.readInt64().toString() + const decoder = new Decoder(fetchDecoder.readBytes()) const partitionLeaderEpoch = decoder.readInt32() // The magic byte was read by the Fetch protocol to distinguish between @@ -58,10 +48,8 @@ module.exports = async decoder => { const isControlBatch = (attributes & CONTROL_FLAG_MASK) > 0 const codec = lookupCodecByRecordBatchAttributes(attributes) - const recordsSize = Buffer.byteLength(decoder.buffer) - const recordsDecoder = decoder.slice(recordsSize) const recordContext = { firstOffset, firstTimestamp, magicByte } - const records = await decodeRecords(codec, recordsDecoder, recordContext) + const records = await decodeRecords(codec, decoder, recordContext) return { firstOffset, @@ -85,7 +73,7 @@ const decodeRecords = async (codec, recordsDecoder, recordContext) => { const length = recordsDecoder.readInt32() - if (length === -1) { + if (length <= 0) { return [] } diff --git a/src/protocol/requests/fetch/fixtures/v4_from_scala_producer_response.json b/src/protocol/requests/fetch/fixtures/v4_from_scala_producer_response.json new file mode 100644 index 000000000..9b08e5fd0 --- /dev/null +++ b/src/protocol/requests/fetch/fixtures/v4_from_scala_producer_response.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,0,0,0,0,0,0,1,0,47,116,101,115,116,45,116,111,112,105,99,45,98,101,99,50,56,101,57,53,45,48,99,50,102,45,52,57,100,51,45,97,50,51,48,45,50,52,49,56,100,99,101,97,99,56,56,53,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,6,0,0,0,0,0,0,0,6,0,0,0,0,0,0,2,130,0,0,0,0,0,0,0,0,0,0,0,95,0,0,0,0,2,34,238,77,243,0,0,0,0,0,0,0,0,1,102,121,250,49,32,0,0,1,102,121,250,49,32,255,255,255,255,255,255,255,255,255,255,255,255,255,255,0,0,0,1,90,0,0,0,10,75,69,89,45,49,68,86,65,76,85,69,45,76,111,114,101,109,32,105,112,115,117,109,32,100,111,108,111,114,32,115,105,116,32,97,109,101,116,45,49,0,0,0,0,0,0,0,0,1,0,0,0,95,0,0,0,0,2,113,182,15,37,0,0,0,0,0,0,0,0,1,102,121,250,51,34,0,0,1,102,121,250,51,34,255,255,255,255,255,255,255,255,255,255,255,255,255,255,0,0,0,1,90,0,0,0,10,75,69,89,45,50,68,86,65,76,85,69,45,76,111,114,101,109,32,105,112,115,117,109,32,100,111,108,111,114,32,115,105,116,32,97,109,101,116,45,50,0,0,0,0,0,0,0,0,2,0,0,0,95,0,0,0,0,2,248,41,134,167,0,0,0,0,0,0,0,0,1,102,121,250,53,27,0,0,1,102,121,250,53,27,255,255,255,255,255,255,255,255,255,255,255,255,255,255,0,0,0,1,90,0,0,0,10,75,69,89,45,51,68,86,65,76,85,69,45,76,111,114,101,109,32,105,112,115,117,109,32,100,111,108,111,114,32,115,105,116,32,97,109,101,116,45,51,0,0,0,0,0,0,0,0,3,0,0,0,95,0,0,0,0,2,28,128,203,214,0,0,0,0,0,0,0,0,1,102,121,250,55,19,0,0,1,102,121,250,55,19,255,255,255,255,255,255,255,255,255,255,255,255,255,255,0,0,0,1,90,0,0,0,10,75,69,89,45,52,68,86,65,76,85,69,45,76,111,114,101,109,32,105,112,115,117,109,32,100,111,108,111,114,32,115,105,116,32,97,109,101,116,45,52,0,0,0,0,0,0,0,0,4,0,0,0,95,0,0,0,0,2,206,134,1,74,0,0,0,0,0,0,0,0,1,102,121,250,57,12,0,0,1,102,121,250,57,12,255,255,255,255,255,255,255,255,255,255,255,255,255,255,0,0,0,1,90,0,0,0,10,75,69,89,45,53,68,86,65,76,85,69,45,76,111,114,101,109,32,105,112,115,117,109,32,100,111,108,111,114,32,115,105,116,32,97,109,101,116,45,53,0,0,0,0,0,0,0,0,5,0,0,0,95,0,0,0,0,2,194,123,198,41,0,0,0,0,0,0,0,0,1,102,121,250,59,5,0,0,1,102,121,250,59,5,255,255,255,255,255,255,255,255,255,255,255,255,255,255,0,0,0,1,90,0,0,0,10,75,69,89,45,54,68,86,65,76,85,69,45,76,111,114,101,109,32,105,112,115,117,109,32,100,111,108,111,114,32,115,105,116,32,97,109,101,116,45,54,0]} diff --git a/src/protocol/requests/fetch/v4/response.js b/src/protocol/requests/fetch/v4/response.js index 4ea2b20de..139f0eacc 100644 --- a/src/protocol/requests/fetch/v4/response.js +++ b/src/protocol/requests/fetch/v4/response.js @@ -7,6 +7,7 @@ const { MAGIC_BYTE } = require('../../../recordBatch/v0') // the magic offset is at the same offset for all current message formats, but the 4 bytes // between the size and the magic is dependent on the version. const MAGIC_OFFSET = 16 +const RECORD_BATCH_OVERHEAD = 49 /** * Fetch Response (Version: 4) => throttle_time_ms [responses] @@ -37,8 +38,14 @@ const decodeMessages = async decoder => { const magicByte = messagesBuffer.slice(MAGIC_OFFSET).readInt8() if (magicByte === MAGIC_BYTE) { - const recordBatch = await RecordBatchDecoder(messagesDecoder) - return recordBatch.records + let records = [] + + while (messagesDecoder.canReadBytes(RECORD_BATCH_OVERHEAD)) { + const recordBatch = await RecordBatchDecoder(messagesDecoder) + records = [...records, ...recordBatch.records] + } + + return records } return MessageSetDecoder(messagesDecoder, messagesSize) diff --git a/src/protocol/requests/fetch/v4/response.spec.js b/src/protocol/requests/fetch/v4/response.spec.js index beb2c18a7..d234a0853 100644 --- a/src/protocol/requests/fetch/v4/response.spec.js +++ b/src/protocol/requests/fetch/v4/response.spec.js @@ -95,6 +95,33 @@ describe('Protocol > Requests > Fetch > v4', () => { key: Buffer.from('key-2'), value: Buffer.from('some-value-2'), }, + { + magicByte: 2, + attributes: 0, + timestamp: '1509827900073', + offset: '3', + headers: {}, + key: Buffer.from('key-1'), + value: Buffer.from('some-value-1'), + }, + { + magicByte: 2, + attributes: 0, + timestamp: '1509827900073', + offset: '4', + headers: {}, + key: Buffer.from('key-2'), + value: Buffer.from('some-value-2'), + }, + { + magicByte: 2, + attributes: 0, + timestamp: '1509827900073', + offset: '5', + headers: {}, + key: Buffer.from('key-3'), + value: Buffer.from('some-value-3'), + }, ], }, ], @@ -143,4 +170,85 @@ describe('Protocol > Requests > Fetch > v4', () => { await expect(parse(data)).resolves.toBeTruthy() }) + + test('response with several RecordBatch (from Scala producer)', async () => { + const data = await decode( + Buffer.from(require('../fixtures/v4_from_scala_producer_response.json')) + ) + expect(data).toEqual({ + throttleTime: 0, + responses: [ + { + topicName: 'test-topic-bec28e95-0c2f-49d3-a230-2418dceac885', + partitions: [ + { + partition: 0, + errorCode: 0, + highWatermark: '6', + lastStableOffset: '6', + abortedTransactions: [], + messages: [ + { + magicByte: 2, + attributes: 0, + timestamp: '1539644731680', + offset: '0', + key: Buffer.from('KEY-1'), + value: Buffer.from('VALUE-Lorem ipsum dolor sit amet-1'), + headers: {}, + }, + { + magicByte: 2, + attributes: 0, + timestamp: '1539644732194', + offset: '1', + key: Buffer.from('KEY-2'), + value: Buffer.from('VALUE-Lorem ipsum dolor sit amet-2'), + headers: {}, + }, + { + magicByte: 2, + attributes: 0, + timestamp: '1539644732699', + offset: '2', + key: Buffer.from('KEY-3'), + value: Buffer.from('VALUE-Lorem ipsum dolor sit amet-3'), + headers: {}, + }, + { + magicByte: 2, + attributes: 0, + timestamp: '1539644733203', + offset: '3', + key: Buffer.from('KEY-4'), + value: Buffer.from('VALUE-Lorem ipsum dolor sit amet-4'), + headers: {}, + }, + { + magicByte: 2, + attributes: 0, + timestamp: '1539644733708', + offset: '4', + key: Buffer.from('KEY-5'), + value: Buffer.from('VALUE-Lorem ipsum dolor sit amet-5'), + headers: {}, + }, + { + magicByte: 2, + attributes: 0, + timestamp: '1539644734213', + offset: '5', + key: Buffer.from('KEY-6'), + value: Buffer.from('VALUE-Lorem ipsum dolor sit amet-6'), + headers: {}, + }, + ], + }, + ], + }, + ], + }) + + await expect(parse(data)).resolves.toBeTruthy() + }) })