
Commit

Merge pull request #209 from tulios/eos-201-ListOffsets-v2
Eos 201 list offsets v2
tulios authored Nov 22, 2018
2 parents 8be8751 + bad3a94 commit 2703cd5
Showing 10 changed files with 211 additions and 16 deletions.
57 changes: 54 additions & 3 deletions src/broker/__tests__/listOffsets.spec.js
@@ -1,4 +1,5 @@
const Broker = require('../index')
const apiKeys = require('../../protocol/requests/apiKeys')
const {
secureRandom,
createConnection,
@@ -74,12 +75,14 @@ describe('Broker > ListOffsets', () => {
partitions: expect.arrayContaining([
{
errorCode: 0,
offsets: expect.arrayContaining([expect.stringMatching(/\d+/)]),
offset: expect.stringMatching(/\d+/),
partition: 0,
timestamp: '-1',
},
]),
},
],
throttleTime: 0,
})
})

@@ -117,12 +120,14 @@ describe('Broker > ListOffsets', () => {
partitions: expect.arrayContaining([
{
errorCode: 0,
offsets: expect.arrayContaining(['4']),
offset: '4',
partition: 0,
timestamp: '-1',
},
]),
},
],
throttleTime: 0,
})

topics = [
@@ -140,12 +145,58 @@ describe('Broker > ListOffsets', () => {
partitions: expect.arrayContaining([
{
errorCode: 0,
offsets: expect.arrayContaining(['0']),
offset: '0',
partition: 0,
timestamp: '-1',
},
]),
},
],
throttleTime: 0,
})
})

describe('v0', () => {
test('request', async () => {
broker.versions[apiKeys.ListOffsets].minVersion = 0
broker.versions[apiKeys.ListOffsets].maxVersion = 0

const produceData = [
{
topic: topicName,
partitions: [
{
partition: 0,
messages: [{ key: `key-0`, value: `some-value-0` }],
},
],
},
]

await broker.produce({ topicData: produceData })

const topics = [
{
topic: topicName,
partitions: [{ partition: 0 }],
},
]

const response = await broker.listOffsets({ topics })
expect(response).toEqual({
responses: [
{
topic: topicName,
partitions: expect.arrayContaining([
{
errorCode: 0,
offset: expect.stringMatching(/\d+/),
partition: 0,
},
]),
},
],
})
})
})
})
39 changes: 28 additions & 11 deletions src/broker/index.js
@@ -341,20 +341,37 @@ module.exports = class Broker {
/**
* @public
* @param {number} replicaId=-1 Broker id of the follower. For normal consumers, use -1
* @param {object} topics e.g:
* [
* {
* topic: 'topic-name',
* partitions: [
* { partition: 0 }
* ]
* }
* ]
* @param {number} isolationLevel=1 This setting controls the visibility of transactional records (default READ_COMMITTED, Kafka >0.11 only)
* @param {TopicPartitionOffset[]} topics e.g:
*
* @typedef {Object} TopicPartitionOffset
* @property {string} topic
* @property {PartitionOffset[]} partitions
*
* @typedef {Object} PartitionOffset
* @property {number} partition
* @property {number} [timestamp=-1]
*
*
* @returns {Promise}
*/
async listOffsets({ replicaId, topics }) {
async listOffsets({ replicaId, isolationLevel, topics }) {
const listOffsets = this.lookupRequest(apiKeys.ListOffsets, requests.ListOffsets)
return await this.connection.send(listOffsets({ replicaId, topics }))
const result = await this.connection.send(listOffsets({ replicaId, isolationLevel, topics }))

// Kafka >= 0.11 will return a single `offset` (ListOffsets V2),
// rather than an array of `offsets` (ListOffsets V0).
// Normalize to just return `offset`.
result.responses.forEach(response => {
response.partitions.map(partition => {
if (partition.offsets) {
partition.offset = partition.offsets.pop()
delete partition.offsets
}
})
})

return result
}

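For context, a minimal usage sketch of the normalized API documented above (the `latestOffsetFor` helper is hypothetical; it assumes an already-connected `broker` instance and an existing `topicName`, as in the spec file at the top of this diff):

async function latestOffsetFor(broker, topicName) {
  const topics = [{ topic: topicName, partitions: [{ partition: 0 }] }]
  const { responses } = await broker.listOffsets({ topics })

  // After the normalization above, every partition carries a single string
  // `offset`, whether the broker spoke ListOffsets v0 (array of `offsets`,
  // last entry kept) or v2 (single `offset`, plus a top-level `throttleTime`).
  const [{ partitions }] = responses
  return partitions.find(({ partition }) => partition === 0).offset
}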
4 changes: 2 additions & 2 deletions src/cluster/index.js
@@ -362,9 +362,9 @@ module.exports = class Cluster {

return keys(partitionsPerTopic).map(topic => ({
topic,
partitions: partitionsPerTopic[topic].map(({ partition, offsets }) => ({
partitions: partitionsPerTopic[topic].map(({ partition, offset }) => ({
partition,
offset: offsets.pop(),
offset,
})),
}))
}
1 change: 1 addition & 0 deletions src/protocol/requests/listOffsets/fixtures/v2_request.json
@@ -0,0 +1 @@
{"data": [255,255,255,255,0,0,0,0,1,0,31,116,101,115,116,45,116,111,112,105,99,45,55,50,55,55,48,53,99,101,54,56,99,50,57,102,101,100,100,100,102,52,0,0,0,1,0,0,0,0,0,0,1,95,104,110,35,204],"type": "Buffer"}
1 change: 1 addition & 0 deletions src/protocol/requests/listOffsets/fixtures/v2_response.json
@@ -0,0 +1 @@
{"type":"Buffer","data":[0,0,0,0,0,0,0,1,0,74,116,101,115,116,45,116,111,112,105,99,45,56,52,101,102,101,55,97,97,97,102,99,51,56,52,52,98,48,48,99,49,45,51,54,50,49,49,45,50,101,101,52,51,49,98,52,45,100,52,48,98,45,52,100,102,56,45,98,50,99,56,45,102,99,57,101,51,51,97,98,53,99,55,55,0,0,0,1,0,0,0,0,0,0,255,255,255,255,255,255,255,255,0,0,0,0,0,0,0,1]}
7 changes: 7 additions & 0 deletions src/protocol/requests/listOffsets/index.js
@@ -1,3 +1,5 @@
const ISOLATION_LEVEL = require('../../isolationLevel')

// For normal consumers, use -1
const REPLICA_ID = -1

@@ -7,6 +9,11 @@ const versions = {
const response = require('./v0/response')
return { request: request({ replicaId, topics }), response }
},
2: ({ replicaId = REPLICA_ID, isolationLevel = ISOLATION_LEVEL.READ_COMMITTED, topics }) => {
const request = require('./v2/request')
const response = require('./v2/response')
return { request: request({ replicaId, isolationLevel, topics }), response }
},
}

module.exports = {
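A note on how these version factories get selected (inferred from the v0 test at the top of this diff, which pins the advertised version range to force the old codepath — treat the exact rule as an assumption rather than documented behaviour):

// Sketch: broker.versions is keyed by api key and holds the range the broker
// advertised; lookupRequest then appears to choose the highest version
// supported on both sides.
broker.versions[apiKeys.ListOffsets] = { minVersion: 0, maxVersion: 2 }
const listOffsets = broker.lookupRequest(apiKeys.ListOffsets, requests.ListOffsets)
// => resolves to the `2:` factory above, so the request gains isolation_level
//    and the response gains throttle_time_ms and a single offset per partition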
32 changes: 32 additions & 0 deletions src/protocol/requests/listOffsets/v2/request.js
@@ -0,0 +1,32 @@
const Encoder = require('../../../encoder')
const { ListOffsets: apiKey } = require('../../apiKeys')

/**
* ListOffsets Request (Version: 2) => replica_id isolation_level [topics]
* replica_id => INT32
* isolation_level => INT8
* topics => topic [partitions]
* topic => STRING
* partitions => partition timestamp
* partition => INT32
* timestamp => INT64
*/
module.exports = ({ replicaId, isolationLevel, topics }) => ({
apiKey,
apiVersion: 2,
apiName: 'ListOffsets',
encode: async () => {
return new Encoder()
.writeInt32(replicaId)
.writeInt8(isolationLevel)
.writeArray(topics.map(encodeTopic))
},
})

const encodeTopic = ({ topic, partitions }) => {
return new Encoder().writeString(topic).writeArray(partitions.map(encodePartition))
}

const encodePartition = ({ partition, timestamp = -1 }) => {
return new Encoder().writeInt32(partition).writeInt64(timestamp)
}
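For reference, the v2_request.json fixture shown earlier lines up with this layout byte for byte (an annotated reading added for illustration, not part of the commit):

255,255,255,255              replica_id       => INT32  -1
0                            isolation_level  => INT8   0
0,0,0,1                      topics           => array of length 1
0,31, 116,…,52               topic            => STRING "test-topic-727705ce68c29fedddf4" (31 bytes)
0,0,0,1                      partitions       => array of length 1
0,0,0,0                      partition        => INT32  0
0,0,1,95,104,110,35,204      timestamp        => INT64  1509285569484

The timestamp is the same 1509285569484 used in request.spec.js below.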
16 changes: 16 additions & 0 deletions src/protocol/requests/listOffsets/v2/request.spec.js
@@ -0,0 +1,16 @@
const RequestV0Protocol = require('./request')

describe('Protocol > Requests > ListOffsets > v2', () => {
test('request', async () => {
const timestamp = 1509285569484
const topics = [
{
topic: 'test-topic-727705ce68c29fedddf4',
partitions: [{ partition: 0, timestamp }],
},
]

const { buffer } = await RequestV0Protocol({ replicaId: -1, topics }).encode()
expect(buffer).toEqual(Buffer.from(require('../fixtures/v2_request.json')))
})
})
52 changes: 52 additions & 0 deletions src/protocol/requests/listOffsets/v2/response.js
@@ -0,0 +1,52 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const flatten = require('../../../../utils/flatten')

/**
* ListOffsets Response (Version: 2) => throttle_time_ms [responses]
* throttle_time_ms => INT32
* responses => topic [partition_responses]
* topic => STRING
* partition_responses => partition error_code timestamp offset
* partition => INT32
* error_code => INT16
* timestamp => INT64
* offset => INT64
*/
const decode = async rawData => {
const decoder = new Decoder(rawData)

return {
throttleTime: decoder.readInt32(),
responses: decoder.readArray(decodeResponses),
}
}

const decodeResponses = decoder => ({
topic: decoder.readString(),
partitions: decoder.readArray(decodePartitions),
})

const decodePartitions = decoder => ({
partition: decoder.readInt32(),
errorCode: decoder.readInt16(),
timestamp: decoder.readInt64().toString(),
offset: decoder.readInt64().toString(),
})

const parse = async data => {
const partitionsWithError = data.responses.map(response =>
response.partitions.filter(partition => failure(partition.errorCode))
)
const partitionWithError = flatten(partitionsWithError)[0]
if (partitionWithError) {
throw createErrorFromCode(partitionWithError.errorCode)
}

return data
}

module.exports = {
decode,
parse,
}
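To complement the happy-path spec that follows, a hypothetical sketch of the error path `parse` implements (the payload below is fabricated for illustration; error code 3 is UNKNOWN_TOPIC_OR_PARTITION in the Kafka protocol):

const { parse } = require('./response')

test('rejects when any partition reports an error', async () => {
  const withError = {
    throttleTime: 0,
    responses: [
      {
        topic: 'some-topic',
        partitions: [{ partition: 0, errorCode: 3, timestamp: '-1', offset: '-1' }],
      },
    ],
  }

  // parse flattens every partition, takes the first failing one and throws
  // the protocol error created from its error code
  await expect(parse(withError)).rejects.toThrow()
})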
18 changes: 18 additions & 0 deletions src/protocol/requests/listOffsets/v2/response.spec.js
@@ -0,0 +1,18 @@
const { decode, parse } = require('./response')

describe('Protocol > Requests > ListOffsets > v2', () => {
test('response', async () => {
const data = await decode(Buffer.from(require('../fixtures/v2_response.json')))
expect(data).toEqual({
throttleTime: 0,
responses: [
{
topic: 'test-topic-84efe7aaafc3844b00c1-36211-2ee431b4-d40b-4df8-b2c8-fc9e33ab5c77',
partitions: [{ partition: 0, errorCode: 0, timestamp: '-1', offset: '1' }],
},
],
})

await expect(parse(data)).resolves.toBeTruthy()
})
})
