Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eos 201 list offsets v2 #209

Merged
merged 4 commits into from
Nov 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions src/broker/__tests__/listOffsets.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const Broker = require('../index')
const apiKeys = require('../../protocol/requests/apiKeys')
const {
secureRandom,
createConnection,
Expand Down Expand Up @@ -74,12 +75,14 @@ describe('Broker > ListOffsets', () => {
partitions: expect.arrayContaining([
{
errorCode: 0,
offsets: expect.arrayContaining([expect.stringMatching(/\d+/)]),
offset: expect.stringMatching(/\d+/),
partition: 0,
timestamp: '-1',
},
]),
},
],
throttleTime: 0,
})
})

Expand Down Expand Up @@ -117,12 +120,14 @@ describe('Broker > ListOffsets', () => {
partitions: expect.arrayContaining([
{
errorCode: 0,
offsets: expect.arrayContaining(['4']),
offset: '4',
partition: 0,
timestamp: '-1',
},
]),
},
],
throttleTime: 0,
})

topics = [
Expand All @@ -140,12 +145,58 @@ describe('Broker > ListOffsets', () => {
partitions: expect.arrayContaining([
{
errorCode: 0,
offsets: expect.arrayContaining(['0']),
offset: '0',
partition: 0,
timestamp: '-1',
},
]),
},
],
throttleTime: 0,
})
})

describe('v0', () => {
test('request', async () => {
broker.versions[apiKeys.ListOffsets].minVersion = 0
broker.versions[apiKeys.ListOffsets].maxVersion = 0

const produceData = [
{
topic: topicName,
partitions: [
{
partition: 0,
messages: [{ key: `key-0`, value: `some-value-0` }],
},
],
},
]

await broker.produce({ topicData: produceData })

const topics = [
{
topic: topicName,
partitions: [{ partition: 0 }],
},
]

const response = await broker.listOffsets({ topics })
expect(response).toEqual({
responses: [
{
topic: topicName,
partitions: expect.arrayContaining([
{
errorCode: 0,
offset: expect.stringMatching(/\d+/),
partition: 0,
},
]),
},
],
})
})
})
})
39 changes: 28 additions & 11 deletions src/broker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,20 +341,37 @@ module.exports = class Broker {
/**
* @public
* @param {number} replicaId=-1 Broker id of the follower. For normal consumers, use -1
* @param {object} topics e.g:
* [
* {
* topic: 'topic-name',
* partitions: [
* { partition: 0 }
* ]
* }
* ]
* @param {number} isolationLevel=1 This setting controls the visibility of transactional records (default READ_COMMITTED, Kafka >0.11 only)
* @param {TopicPartitionOffset[]} topics e.g:
*
* @typedef {Object} TopicPartitionOffset
* @property {string} topic
* @property {PartitionOffset[]} partitions
*
* @typedef {Object} PartitionOffset
* @property {number} partition
* @property {number} [timestamp=-1]
*
*
* @returns {Promise}
*/
async listOffsets({ replicaId, topics }) {
async listOffsets({ replicaId, isolationLevel, topics }) {
const listOffsets = this.lookupRequest(apiKeys.ListOffsets, requests.ListOffsets)
return await this.connection.send(listOffsets({ replicaId, topics }))
const result = await this.connection.send(listOffsets({ replicaId, isolationLevel, topics }))

// Kafka >= 0.11 will return a single `offset` (ListOffsets V2),
// rather than an array of `offsets` (ListOffsets V0).
// Normalize to just return `offset`.
result.responses.forEach(response => {
response.partitions.map(partition => {
if (partition.offsets) {
partition.offset = partition.offsets.pop()
delete partition.offsets
}
})
})

return result
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,9 @@ module.exports = class Cluster {

return keys(partitionsPerTopic).map(topic => ({
topic,
partitions: partitionsPerTopic[topic].map(({ partition, offsets }) => ({
partitions: partitionsPerTopic[topic].map(({ partition, offset }) => ({
partition,
offset: offsets.pop(),
offset,
})),
}))
}
Expand Down
1 change: 1 addition & 0 deletions src/protocol/requests/listOffsets/fixtures/v2_request.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"data": [255,255,255,255,0,0,0,0,1,0,31,116,101,115,116,45,116,111,112,105,99,45,55,50,55,55,48,53,99,101,54,56,99,50,57,102,101,100,100,100,102,52,0,0,0,1,0,0,0,0,0,0,1,95,104,110,35,204],"type": "Buffer"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"Buffer","data":[0,0,0,0,0,0,0,1,0,74,116,101,115,116,45,116,111,112,105,99,45,56,52,101,102,101,55,97,97,97,102,99,51,56,52,52,98,48,48,99,49,45,51,54,50,49,49,45,50,101,101,52,51,49,98,52,45,100,52,48,98,45,52,100,102,56,45,98,50,99,56,45,102,99,57,101,51,51,97,98,53,99,55,55,0,0,0,1,0,0,0,0,0,0,255,255,255,255,255,255,255,255,0,0,0,0,0,0,0,1]}
7 changes: 7 additions & 0 deletions src/protocol/requests/listOffsets/index.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const ISOLATION_LEVEL = require('../../isolationLevel')

// For normal consumers, use -1
const REPLICA_ID = -1

Expand All @@ -7,6 +9,11 @@ const versions = {
const response = require('./v0/response')
return { request: request({ replicaId, topics }), response }
},
2: ({ replicaId = REPLICA_ID, isolationLevel = ISOLATION_LEVEL.READ_COMMITTED, topics }) => {
const request = require('./v2/request')
const response = require('./v2/response')
return { request: request({ replicaId, isolationLevel, topics }), response }
},
}

module.exports = {
Expand Down
32 changes: 32 additions & 0 deletions src/protocol/requests/listOffsets/v2/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
const Encoder = require('../../../encoder')
const { ListOffsets: apiKey } = require('../../apiKeys')

/**
* ListOffsets Request (Version: 2) => replica_id isolation_level [topics]
* replica_id => INT32
* isolation_level => INT8
* topics => topic [partitions]
* topic => STRING
* partitions => partition timestamp
* partition => INT32
* timestamp => INT64
*/
module.exports = ({ replicaId, isolationLevel, topics }) => ({
apiKey,
apiVersion: 2,
apiName: 'ListOffsets',
encode: async () => {
return new Encoder()
.writeInt32(replicaId)
.writeInt8(isolationLevel)
.writeArray(topics.map(encodeTopic))
},
})

const encodeTopic = ({ topic, partitions }) => {
return new Encoder().writeString(topic).writeArray(partitions.map(encodePartition))
}

const encodePartition = ({ partition, timestamp = -1 }) => {
return new Encoder().writeInt32(partition).writeInt64(timestamp)
}
16 changes: 16 additions & 0 deletions src/protocol/requests/listOffsets/v2/request.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
const RequestV0Protocol = require('./request')

describe('Protocol > Requests > ListOffsets > v2', () => {
test('request', async () => {
const timestamp = 1509285569484
const topics = [
{
topic: 'test-topic-727705ce68c29fedddf4',
partitions: [{ partition: 0, timestamp }],
},
]

const { buffer } = await RequestV0Protocol({ replicaId: -1, topics }).encode()
expect(buffer).toEqual(Buffer.from(require('../fixtures/v2_request.json')))
})
})
52 changes: 52 additions & 0 deletions src/protocol/requests/listOffsets/v2/response.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')
const flatten = require('../../../../utils/flatten')

/**
* ListOffsets Response (Version: 2) => throttle_time_ms [responses]
* throttle_time_ms => INT32
* responses => topic [partition_responses]
* topic => STRING
* partition_responses => partition error_code timestamp offset
* partition => INT32
* error_code => INT16
* timestamp => INT64
* offset => INT64
*/
const decode = async rawData => {
const decoder = new Decoder(rawData)

return {
throttleTime: decoder.readInt32(),
responses: decoder.readArray(decodeResponses),
}
}

const decodeResponses = decoder => ({
topic: decoder.readString(),
partitions: decoder.readArray(decodePartitions),
})

const decodePartitions = decoder => ({
partition: decoder.readInt32(),
errorCode: decoder.readInt16(),
timestamp: decoder.readInt64().toString(),
offset: decoder.readInt64().toString(),
})

const parse = async data => {
const partitionsWithError = data.responses.map(response =>
response.partitions.filter(partition => failure(partition.errorCode))
)
const partitionWithError = flatten(partitionsWithError)[0]
if (partitionWithError) {
throw createErrorFromCode(partitionWithError.errorCode)
}

return data
}

module.exports = {
decode,
parse,
}
18 changes: 18 additions & 0 deletions src/protocol/requests/listOffsets/v2/response.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
const { decode, parse } = require('./response')

describe('Protocol > Requests > ListOffsets > v2', () => {
test('response', async () => {
const data = await decode(Buffer.from(require('../fixtures/v2_response.json')))
expect(data).toEqual({
throttleTime: 0,
responses: [
{
topic: 'test-topic-84efe7aaafc3844b00c1-36211-2ee431b4-d40b-4df8-b2c8-fc9e33ab5c77',
partitions: [{ partition: 0, errorCode: 0, timestamp: '-1', offset: '1' }],
},
],
})

await expect(parse(data)).resolves.toBeTruthy()
})
})