diff --git a/src/broker/__tests__/listOffsets.spec.js b/src/broker/__tests__/listOffsets.spec.js index 46674614d..31e9b0887 100644 --- a/src/broker/__tests__/listOffsets.spec.js +++ b/src/broker/__tests__/listOffsets.spec.js @@ -1,4 +1,5 @@ const Broker = require('../index') +const apiKeys = require('../../protocol/requests/apiKeys') const { secureRandom, createConnection, @@ -74,12 +75,14 @@ describe('Broker > ListOffsets', () => { partitions: expect.arrayContaining([ { errorCode: 0, - offsets: expect.arrayContaining([expect.stringMatching(/\d+/)]), + offset: expect.stringMatching(/\d+/), partition: 0, + timestamp: '-1', }, ]), }, ], + throttleTime: 0, }) }) @@ -117,12 +120,14 @@ describe('Broker > ListOffsets', () => { partitions: expect.arrayContaining([ { errorCode: 0, - offsets: expect.arrayContaining(['4']), + offset: '4', partition: 0, + timestamp: '-1', }, ]), }, ], + throttleTime: 0, }) topics = [ @@ -140,12 +145,58 @@ describe('Broker > ListOffsets', () => { partitions: expect.arrayContaining([ { errorCode: 0, - offsets: expect.arrayContaining(['0']), + offset: '0', partition: 0, + timestamp: '-1', }, ]), }, ], + throttleTime: 0, + }) + }) + + describe('v0', () => { + test('request', async () => { + broker.versions[apiKeys.ListOffsets].minVersion = 0 + broker.versions[apiKeys.ListOffsets].maxVersion = 0 + + const produceData = [ + { + topic: topicName, + partitions: [ + { + partition: 0, + messages: [{ key: `key-0`, value: `some-value-0` }], + }, + ], + }, + ] + + await broker.produce({ topicData: produceData }) + + const topics = [ + { + topic: topicName, + partitions: [{ partition: 0 }], + }, + ] + + const response = await broker.listOffsets({ topics }) + expect(response).toEqual({ + responses: [ + { + topic: topicName, + partitions: expect.arrayContaining([ + { + errorCode: 0, + offset: expect.stringMatching(/\d+/), + partition: 0, + }, + ]), + }, + ], + }) }) }) }) diff --git a/src/broker/index.js b/src/broker/index.js index 3850d0b21..66cd45183 100644 --- a/src/broker/index.js +++ b/src/broker/index.js @@ -341,20 +341,37 @@ module.exports = class Broker { /** * @public * @param {number} replicaId=-1 Broker id of the follower. For normal consumers, use -1 - * @param {object} topics e.g: - * [ - * { - * topic: 'topic-name', - * partitions: [ - * { partition: 0 } - * ] - * } - * ] + * @param {number} isolationLevel=1 This setting controls the visibility of transactional records (default READ_COMMITTED, Kafka >0.11 only) + * @param {TopicPartitionOffset[]} topics e.g: + * + * @typedef {Object} TopicPartitionOffset + * @property {string} topic + * @property {PartitionOffset[]} partitions + * + * @typedef {Object} PartitionOffset + * @property {number} partition + * @property {number} [timestamp=-1] + * + * * @returns {Promise} */ - async listOffsets({ replicaId, topics }) { + async listOffsets({ replicaId, isolationLevel, topics }) { const listOffsets = this.lookupRequest(apiKeys.ListOffsets, requests.ListOffsets) - return await this.connection.send(listOffsets({ replicaId, topics })) + const result = await this.connection.send(listOffsets({ replicaId, isolationLevel, topics })) + + // Kafka >= 0.11 will return a single `offset` (ListOffsets V2), + // rather than an array of `offsets` (ListOffsets V0). + // Normalize to just return `offset`. + result.responses.forEach(response => { + response.partitions.map(partition => { + if (partition.offsets) { + partition.offset = partition.offsets.pop() + delete partition.offsets + } + }) + }) + + return result } /** diff --git a/src/cluster/index.js b/src/cluster/index.js index d44ab0086..b2ff3f2fb 100644 --- a/src/cluster/index.js +++ b/src/cluster/index.js @@ -362,9 +362,9 @@ module.exports = class Cluster { return keys(partitionsPerTopic).map(topic => ({ topic, - partitions: partitionsPerTopic[topic].map(({ partition, offsets }) => ({ + partitions: partitionsPerTopic[topic].map(({ partition, offset }) => ({ partition, - offset: offsets.pop(), + offset, })), })) } diff --git a/src/protocol/requests/listOffsets/fixtures/v2_request.json b/src/protocol/requests/listOffsets/fixtures/v2_request.json new file mode 100644 index 000000000..65bb3db89 --- /dev/null +++ b/src/protocol/requests/listOffsets/fixtures/v2_request.json @@ -0,0 +1 @@ +{"data": [255,255,255,255,0,0,0,0,1,0,31,116,101,115,116,45,116,111,112,105,99,45,55,50,55,55,48,53,99,101,54,56,99,50,57,102,101,100,100,100,102,52,0,0,0,1,0,0,0,0,0,0,1,95,104,110,35,204],"type": "Buffer"} \ No newline at end of file diff --git a/src/protocol/requests/listOffsets/fixtures/v2_response.json b/src/protocol/requests/listOffsets/fixtures/v2_response.json new file mode 100644 index 000000000..e79238200 --- /dev/null +++ b/src/protocol/requests/listOffsets/fixtures/v2_response.json @@ -0,0 +1 @@ +{"type":"Buffer","data":[0,0,0,0,0,0,0,1,0,74,116,101,115,116,45,116,111,112,105,99,45,56,52,101,102,101,55,97,97,97,102,99,51,56,52,52,98,48,48,99,49,45,51,54,50,49,49,45,50,101,101,52,51,49,98,52,45,100,52,48,98,45,52,100,102,56,45,98,50,99,56,45,102,99,57,101,51,51,97,98,53,99,55,55,0,0,0,1,0,0,0,0,0,0,255,255,255,255,255,255,255,255,0,0,0,0,0,0,0,1]} \ No newline at end of file diff --git a/src/protocol/requests/listOffsets/index.js b/src/protocol/requests/listOffsets/index.js index 1d97fef30..4b3d9c944 100644 --- a/src/protocol/requests/listOffsets/index.js +++ b/src/protocol/requests/listOffsets/index.js @@ -1,3 +1,5 @@ +const ISOLATION_LEVEL = require('../../isolationLevel') + // For normal consumers, use -1 const REPLICA_ID = -1 @@ -7,6 +9,11 @@ const versions = { const response = require('./v0/response') return { request: request({ replicaId, topics }), response } }, + 2: ({ replicaId = REPLICA_ID, isolationLevel = ISOLATION_LEVEL.READ_COMMITTED, topics }) => { + const request = require('./v2/request') + const response = require('./v2/response') + return { request: request({ replicaId, isolationLevel, topics }), response } + }, } module.exports = { diff --git a/src/protocol/requests/listOffsets/v2/request.js b/src/protocol/requests/listOffsets/v2/request.js new file mode 100644 index 000000000..3a8caa0d1 --- /dev/null +++ b/src/protocol/requests/listOffsets/v2/request.js @@ -0,0 +1,32 @@ +const Encoder = require('../../../encoder') +const { ListOffsets: apiKey } = require('../../apiKeys') + +/** + * ListOffsets Request (Version: 2) => replica_id isolation_level [topics] + * replica_id => INT32 + * isolation_level => INT8 + * topics => topic [partitions] + * topic => STRING + * partitions => partition timestamp + * partition => INT32 + * timestamp => INT64 + */ +module.exports = ({ replicaId, isolationLevel, topics }) => ({ + apiKey, + apiVersion: 2, + apiName: 'ListOffsets', + encode: async () => { + return new Encoder() + .writeInt32(replicaId) + .writeInt8(isolationLevel) + .writeArray(topics.map(encodeTopic)) + }, +}) + +const encodeTopic = ({ topic, partitions }) => { + return new Encoder().writeString(topic).writeArray(partitions.map(encodePartition)) +} + +const encodePartition = ({ partition, timestamp = -1 }) => { + return new Encoder().writeInt32(partition).writeInt64(timestamp) +} diff --git a/src/protocol/requests/listOffsets/v2/request.spec.js b/src/protocol/requests/listOffsets/v2/request.spec.js new file mode 100644 index 000000000..930970c75 --- /dev/null +++ b/src/protocol/requests/listOffsets/v2/request.spec.js @@ -0,0 +1,16 @@ +const RequestV0Protocol = require('./request') + +describe('Protocol > Requests > ListOffsets > v2', () => { + test('request', async () => { + const timestamp = 1509285569484 + const topics = [ + { + topic: 'test-topic-727705ce68c29fedddf4', + partitions: [{ partition: 0, timestamp }], + }, + ] + + const { buffer } = await RequestV0Protocol({ replicaId: -1, topics }).encode() + expect(buffer).toEqual(Buffer.from(require('../fixtures/v2_request.json'))) + }) +}) diff --git a/src/protocol/requests/listOffsets/v2/response.js b/src/protocol/requests/listOffsets/v2/response.js new file mode 100644 index 000000000..4c6fdbfb2 --- /dev/null +++ b/src/protocol/requests/listOffsets/v2/response.js @@ -0,0 +1,52 @@ +const Decoder = require('../../../decoder') +const { failure, createErrorFromCode } = require('../../../error') +const flatten = require('../../../../utils/flatten') + +/** + * ListOffsets Response (Version: 2) => throttle_time_ms [responses] + * throttle_time_ms => INT32 + * responses => topic [partition_responses] + * topic => STRING + * partition_responses => partition error_code timestamp offset + * partition => INT32 + * error_code => INT16 + * timestamp => INT64 + * offset => INT64 + */ +const decode = async rawData => { + const decoder = new Decoder(rawData) + + return { + throttleTime: decoder.readInt32(), + responses: decoder.readArray(decodeResponses), + } +} + +const decodeResponses = decoder => ({ + topic: decoder.readString(), + partitions: decoder.readArray(decodePartitions), +}) + +const decodePartitions = decoder => ({ + partition: decoder.readInt32(), + errorCode: decoder.readInt16(), + timestamp: decoder.readInt64().toString(), + offset: decoder.readInt64().toString(), +}) + +const parse = async data => { + const partitionsWithError = data.responses.map(response => + response.partitions.filter(partition => failure(partition.errorCode)) + ) + const partitionWithError = flatten(partitionsWithError)[0] + if (partitionWithError) { + throw createErrorFromCode(partitionWithError.errorCode) + } + + return data +} + +module.exports = { + decode, + parse, +} diff --git a/src/protocol/requests/listOffsets/v2/response.spec.js b/src/protocol/requests/listOffsets/v2/response.spec.js new file mode 100644 index 000000000..01d57e727 --- /dev/null +++ b/src/protocol/requests/listOffsets/v2/response.spec.js @@ -0,0 +1,18 @@ +const { decode, parse } = require('./response') + +describe('Protocol > Requests > ListOffsets > v2', () => { + test('response', async () => { + const data = await decode(Buffer.from(require('../fixtures/v2_response.json'))) + expect(data).toEqual({ + throttleTime: 0, + responses: [ + { + topic: 'test-topic-84efe7aaafc3844b00c1-36211-2ee431b4-d40b-4df8-b2c8-fc9e33ab5c77', + partitions: [{ partition: 0, errorCode: 0, timestamp: '-1', offset: '1' }], + }, + ], + }) + + await expect(parse(data)).resolves.toBeTruthy() + }) +})