Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose network queue size event #245

Merged
merged 5 commits into from
Jan 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ const producer = client.producer({ idempotent: true })
// Begin a transaction
const transaction = await producer.transaction()

try {
try {
// Call one of the transaction's send methods
await transaction.send({ topic, messages })

Expand All @@ -449,8 +449,8 @@ try {
To send offsets as part of a transaction, meaning they will be committed only if the transaction succeeds, use the `transaction.sendOffsets()` method. This is necessary whenever we want a transaction to produce messages derived from a consumer, in a "consume-transform-produce" loop.

```javascript
await transaction.sendOffsets({
consumerGroupId, topics
await transaction.sendOffsets({
consumerGroupId, topics
})
```

Expand Down Expand Up @@ -1281,6 +1281,13 @@ List of available events:
`apiVersion`
}

* consumer.events.REQUEST_QUEUE_SIZE
payload: {
`broker`,
`clientId`,
`queueSize`
}

### <a name="instrumentation-producer"></a> Producer

* producer.events.CONNECT
Expand Down Expand Up @@ -1315,6 +1322,13 @@ List of available events:
`apiVersion`
}

* producer.events.REQUEST_QUEUE_SIZE
payload: {
`broker`,
`clientId`,
`queueSize`
}

### <a name="instrumentation-admin"></a> Admin

* admin.events.CONNECT
Expand Down Expand Up @@ -1349,6 +1363,13 @@ List of available events:
`apiVersion`
}

* admin.events.REQUEST_QUEUE_SIZE
payload: {
`broker`,
`clientId`,
`queueSize`
}

## <a name="custom-logging"></a> Custom logging

The logger is customized using log creators. A log creator is a function which receives a log level and returns a log function. The log function receives namespace, level, label, and log.
Expand Down
40 changes: 40 additions & 0 deletions src/admin/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,46 @@ describe('Admin', () => {
})
})

test('emits the request queue size event', async () => {
const emitter = new InstrumentationEventEmitter()
const cluster = createCluster({
instrumentationEmitter: emitter,
maxInFlightRequests: 1,
})

const admin = createAdmin({
cluster,
logger: newLogger(),
instrumentationEmitter: emitter,
})

const requestListener = jest.fn().mockName('request_queue_size')
admin.on(admin.events.REQUEST_QUEUE_SIZE, requestListener)

await admin.connect()
await Promise.all([
admin.createTopics({
waitForLeaders: false,
topics: [{ topic: `test-topic-${secureRandom()}`, partitions: 8 }],
}),
admin.createTopics({
waitForLeaders: false,
topics: [{ topic: `test-topic-${secureRandom()}`, partitions: 8 }],
}),
])

expect(requestListener).toHaveBeenCalledWith({
id: expect.any(Number),
timestamp: expect.any(Number),
type: 'admin.network.request_queue_size',
payload: {
broker: expect.any(String),
clientId: expect.any(String),
queueSize: expect.any(Number),
},
})
})

test('on throws an error when provided with an invalid event name', () => {
const admin = createAdmin({
cluster: createCluster(),
Expand Down
2 changes: 2 additions & 0 deletions src/admin/instrumentationEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ const events = {
DISCONNECT: adminType('disconnect'),
REQUEST: adminType(networkEvents.NETWORK_REQUEST),
REQUEST_TIMEOUT: adminType(networkEvents.NETWORK_REQUEST_TIMEOUT),
REQUEST_QUEUE_SIZE: adminType(networkEvents.NETWORK_REQUEST_QUEUE_SIZE),
}

const wrappedEvents = {
[events.REQUEST]: networkEvents.NETWORK_REQUEST,
[events.REQUEST_TIMEOUT]: networkEvents.NETWORK_REQUEST_TIMEOUT,
[events.REQUEST_QUEUE_SIZE]: networkEvents.NETWORK_REQUEST_QUEUE_SIZE,
}

const reversedWrappedEvents = swapObject(wrappedEvents)
Expand Down
57 changes: 56 additions & 1 deletion src/consumer/__tests__/instrumentationEvents.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ describe('Consumer > Instrumentation Events', () => {
instrumentationEmitter: emitter,
})

const requestListener = jest.fn().mockName('request')
const requestListener = jest.fn().mockName('request_timeout')
consumer.on(consumer.events.REQUEST_TIMEOUT, requestListener)

await consumer
Expand All @@ -348,4 +348,59 @@ describe('Consumer > Instrumentation Events', () => {
},
})
})

it('emits request queue size events', async () => {
const cluster = createCluster({
instrumentationEmitter: emitter,
maxInFlightRequests: 1,
})

consumer = createConsumer({
groupId,
cluster,
logger: newLogger(),
heartbeatInterval: 100,
maxWaitTimeInMs: 1,
maxBytesPerPartition: 180,
instrumentationEmitter: emitter,
})

const consumer2 = createConsumer({
groupId,
cluster,
logger: newLogger(),
heartbeatInterval: 100,
maxWaitTimeInMs: 1,
maxBytesPerPartition: 180,
instrumentationEmitter: emitter,
})

const requestListener = jest.fn().mockName('request_queue_size')
consumer.on(consumer.events.REQUEST_QUEUE_SIZE, requestListener)
consumer2.on(consumer2.events.REQUEST_QUEUE_SIZE, requestListener)

await Promise.all([
consumer
.connect()
.then(() => consumer.run({ eachMessage: () => true }))
.catch(e => e),
consumer2
.connect()
.then(() => consumer.run({ eachMessage: () => true }))
.catch(e => e),
])

await consumer2.disconnect()

expect(requestListener).toHaveBeenCalledWith({
id: expect.any(Number),
timestamp: expect.any(Number),
type: 'consumer.network.request_queue_size',
payload: {
broker: expect.any(String),
clientId: expect.any(String),
queueSize: expect.any(Number),
},
})
})
})
2 changes: 2 additions & 0 deletions src/consumer/instrumentationEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ const events = {
CRASH: consumerType('crash'),
REQUEST: consumerType(networkEvents.NETWORK_REQUEST),
REQUEST_TIMEOUT: consumerType(networkEvents.NETWORK_REQUEST_TIMEOUT),
REQUEST_QUEUE_SIZE: consumerType(networkEvents.NETWORK_REQUEST_QUEUE_SIZE),
}

const wrappedEvents = {
[events.REQUEST]: networkEvents.NETWORK_REQUEST,
[events.REQUEST_TIMEOUT]: networkEvents.NETWORK_REQUEST_TIMEOUT,
[events.REQUEST_QUEUE_SIZE]: networkEvents.NETWORK_REQUEST_QUEUE_SIZE,
}

const reversedWrappedEvents = swapObject(wrappedEvents)
Expand Down
53 changes: 53 additions & 0 deletions src/producer/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,59 @@ describe('Producer', () => {
})
})

test('emits the request queue size event', async () => {
await createTopic({ topic: topicName, partitions: 8 })

const emitter = new InstrumentationEventEmitter()
const cluster = createCluster({
instrumentationEmitter: emitter,
maxInFlightRequests: 1,
clientId: 'test-client-id11111',
})

producer = createProducer({
cluster,
logger: newLogger(),
instrumentationEmitter: emitter,
})

const requestListener = jest.fn().mockName('request_queue_size')
producer.on(producer.events.REQUEST_QUEUE_SIZE, requestListener)

await producer.connect()
await Promise.all([
producer.send({
acks: -1,
topic: topicName,
messages: [
{ partition: 0, value: 'value-0' },
{ partition: 1, value: 'value-1' },
{ partition: 2, value: 'value-2' },
],
}),
producer.send({
acks: -1,
topic: topicName,
messages: [
{ partition: 0, value: 'value-0' },
{ partition: 1, value: 'value-1' },
{ partition: 2, value: 'value-2' },
],
}),
])

expect(requestListener).toHaveBeenCalledWith({
id: expect.any(Number),
timestamp: expect.any(Number),
type: 'producer.network.request_queue_size',
payload: {
broker: expect.any(String),
clientId: expect.any(String),
queueSize: expect.any(Number),
},
})
})

describe('when acks=0', () => {
it('returns immediately', async () => {
const cluster = createCluster({
Expand Down
2 changes: 2 additions & 0 deletions src/producer/instrumentationEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ const events = {
DISCONNECT: producerType('disconnect'),
REQUEST: producerType(networkEvents.NETWORK_REQUEST),
REQUEST_TIMEOUT: producerType(networkEvents.NETWORK_REQUEST_TIMEOUT),
REQUEST_QUEUE_SIZE: producerType(networkEvents.NETWORK_REQUEST_QUEUE_SIZE),
}

const wrappedEvents = {
[events.REQUEST]: networkEvents.NETWORK_REQUEST,
[events.REQUEST_TIMEOUT]: networkEvents.NETWORK_REQUEST_TIMEOUT,
[events.REQUEST_QUEUE_SIZE]: networkEvents.NETWORK_REQUEST_QUEUE_SIZE,
}

const reversedWrappedEvents = swapObject(wrappedEvents)
Expand Down