Skip to content

Commit

Permalink
Merge pull request #245 from tulios/expose-network-queue-size-event
Browse files Browse the repository at this point in the history
Expose network queue size event
  • Loading branch information
tulios committed Jan 17, 2019
2 parents f7b76e3 + eb647f7 commit a12a00e
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 4 deletions.
27 changes: 24 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ const producer = client.producer({ idempotent: true })
// Begin a transaction
const transaction = await producer.transaction()

try {
try {
// Call one of the transaction's send methods
await transaction.send({ topic, messages })

Expand All @@ -449,8 +449,8 @@ try {
To send offsets as part of a transaction, meaning they will be committed only if the transaction succeeds, use the `transaction.sendOffsets()` method. This is necessary whenever we want a transaction to produce messages derived from a consumer, in a "consume-transform-produce" loop.

```javascript
await transaction.sendOffsets({
consumerGroupId, topics
await transaction.sendOffsets({
consumerGroupId, topics
})
```

Expand Down Expand Up @@ -1281,6 +1281,13 @@ List of available events:
`apiVersion`
}

* consumer.events.REQUEST_QUEUE_SIZE
payload: {
`broker`,
`clientId`,
`queueSize`
}

### <a name="instrumentation-producer"></a> Producer

* producer.events.CONNECT
Expand Down Expand Up @@ -1315,6 +1322,13 @@ List of available events:
`apiVersion`
}

* producer.events.REQUEST_QUEUE_SIZE
payload: {
`broker`,
`clientId`,
`queueSize`
}

### <a name="instrumentation-admin"></a> Admin

* admin.events.CONNECT
Expand Down Expand Up @@ -1349,6 +1363,13 @@ List of available events:
`apiVersion`
}

* admin.events.REQUEST_QUEUE_SIZE
payload: {
`broker`,
`clientId`,
`queueSize`
}

## <a name="custom-logging"></a> Custom logging

The logger is customized using log creators. A log creator is a function which receives a log level and returns a log function. The log function receives namespace, level, label, and log.
Expand Down
40 changes: 40 additions & 0 deletions src/admin/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,46 @@ describe('Admin', () => {
})
})

test('emits the request queue size event', async () => {
const emitter = new InstrumentationEventEmitter()
const cluster = createCluster({
instrumentationEmitter: emitter,
maxInFlightRequests: 1,
})

const admin = createAdmin({
cluster,
logger: newLogger(),
instrumentationEmitter: emitter,
})

const requestListener = jest.fn().mockName('request_queue_size')
admin.on(admin.events.REQUEST_QUEUE_SIZE, requestListener)

await admin.connect()
await Promise.all([
admin.createTopics({
waitForLeaders: false,
topics: [{ topic: `test-topic-${secureRandom()}`, partitions: 8 }],
}),
admin.createTopics({
waitForLeaders: false,
topics: [{ topic: `test-topic-${secureRandom()}`, partitions: 8 }],
}),
])

expect(requestListener).toHaveBeenCalledWith({
id: expect.any(Number),
timestamp: expect.any(Number),
type: 'admin.network.request_queue_size',
payload: {
broker: expect.any(String),
clientId: expect.any(String),
queueSize: expect.any(Number),
},
})
})

test('on throws an error when provided with an invalid event name', () => {
const admin = createAdmin({
cluster: createCluster(),
Expand Down
2 changes: 2 additions & 0 deletions src/admin/instrumentationEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ const events = {
DISCONNECT: adminType('disconnect'),
REQUEST: adminType(networkEvents.NETWORK_REQUEST),
REQUEST_TIMEOUT: adminType(networkEvents.NETWORK_REQUEST_TIMEOUT),
REQUEST_QUEUE_SIZE: adminType(networkEvents.NETWORK_REQUEST_QUEUE_SIZE),
}

const wrappedEvents = {
[events.REQUEST]: networkEvents.NETWORK_REQUEST,
[events.REQUEST_TIMEOUT]: networkEvents.NETWORK_REQUEST_TIMEOUT,
[events.REQUEST_QUEUE_SIZE]: networkEvents.NETWORK_REQUEST_QUEUE_SIZE,
}

const reversedWrappedEvents = swapObject(wrappedEvents)
Expand Down
57 changes: 56 additions & 1 deletion src/consumer/__tests__/instrumentationEvents.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ describe('Consumer > Instrumentation Events', () => {
instrumentationEmitter: emitter,
})

const requestListener = jest.fn().mockName('request')
const requestListener = jest.fn().mockName('request_timeout')
consumer.on(consumer.events.REQUEST_TIMEOUT, requestListener)

await consumer
Expand All @@ -348,4 +348,59 @@ describe('Consumer > Instrumentation Events', () => {
},
})
})

it('emits request queue size events', async () => {
const cluster = createCluster({
instrumentationEmitter: emitter,
maxInFlightRequests: 1,
})

consumer = createConsumer({
groupId,
cluster,
logger: newLogger(),
heartbeatInterval: 100,
maxWaitTimeInMs: 1,
maxBytesPerPartition: 180,
instrumentationEmitter: emitter,
})

const consumer2 = createConsumer({
groupId,
cluster,
logger: newLogger(),
heartbeatInterval: 100,
maxWaitTimeInMs: 1,
maxBytesPerPartition: 180,
instrumentationEmitter: emitter,
})

const requestListener = jest.fn().mockName('request_queue_size')
consumer.on(consumer.events.REQUEST_QUEUE_SIZE, requestListener)
consumer2.on(consumer2.events.REQUEST_QUEUE_SIZE, requestListener)

await Promise.all([
consumer
.connect()
.then(() => consumer.run({ eachMessage: () => true }))
.catch(e => e),
consumer2
.connect()
.then(() => consumer.run({ eachMessage: () => true }))
.catch(e => e),
])

await consumer2.disconnect()

expect(requestListener).toHaveBeenCalledWith({
id: expect.any(Number),
timestamp: expect.any(Number),
type: 'consumer.network.request_queue_size',
payload: {
broker: expect.any(String),
clientId: expect.any(String),
queueSize: expect.any(Number),
},
})
})
})
2 changes: 2 additions & 0 deletions src/consumer/instrumentationEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ const events = {
CRASH: consumerType('crash'),
REQUEST: consumerType(networkEvents.NETWORK_REQUEST),
REQUEST_TIMEOUT: consumerType(networkEvents.NETWORK_REQUEST_TIMEOUT),
REQUEST_QUEUE_SIZE: consumerType(networkEvents.NETWORK_REQUEST_QUEUE_SIZE),
}

const wrappedEvents = {
[events.REQUEST]: networkEvents.NETWORK_REQUEST,
[events.REQUEST_TIMEOUT]: networkEvents.NETWORK_REQUEST_TIMEOUT,
[events.REQUEST_QUEUE_SIZE]: networkEvents.NETWORK_REQUEST_QUEUE_SIZE,
}

const reversedWrappedEvents = swapObject(wrappedEvents)
Expand Down
53 changes: 53 additions & 0 deletions src/producer/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,59 @@ describe('Producer', () => {
})
})

test('emits the request queue size event', async () => {
await createTopic({ topic: topicName, partitions: 8 })

const emitter = new InstrumentationEventEmitter()
const cluster = createCluster({
instrumentationEmitter: emitter,
maxInFlightRequests: 1,
clientId: 'test-client-id11111',
})

producer = createProducer({
cluster,
logger: newLogger(),
instrumentationEmitter: emitter,
})

const requestListener = jest.fn().mockName('request_queue_size')
producer.on(producer.events.REQUEST_QUEUE_SIZE, requestListener)

await producer.connect()
await Promise.all([
producer.send({
acks: -1,
topic: topicName,
messages: [
{ partition: 0, value: 'value-0' },
{ partition: 1, value: 'value-1' },
{ partition: 2, value: 'value-2' },
],
}),
producer.send({
acks: -1,
topic: topicName,
messages: [
{ partition: 0, value: 'value-0' },
{ partition: 1, value: 'value-1' },
{ partition: 2, value: 'value-2' },
],
}),
])

expect(requestListener).toHaveBeenCalledWith({
id: expect.any(Number),
timestamp: expect.any(Number),
type: 'producer.network.request_queue_size',
payload: {
broker: expect.any(String),
clientId: expect.any(String),
queueSize: expect.any(Number),
},
})
})

describe('when acks=0', () => {
it('returns immediately', async () => {
const cluster = createCluster({
Expand Down
2 changes: 2 additions & 0 deletions src/producer/instrumentationEvents.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ const events = {
DISCONNECT: producerType('disconnect'),
REQUEST: producerType(networkEvents.NETWORK_REQUEST),
REQUEST_TIMEOUT: producerType(networkEvents.NETWORK_REQUEST_TIMEOUT),
REQUEST_QUEUE_SIZE: producerType(networkEvents.NETWORK_REQUEST_QUEUE_SIZE),
}

const wrappedEvents = {
[events.REQUEST]: networkEvents.NETWORK_REQUEST,
[events.REQUEST_TIMEOUT]: networkEvents.NETWORK_REQUEST_TIMEOUT,
[events.REQUEST_QUEUE_SIZE]: networkEvents.NETWORK_REQUEST_QUEUE_SIZE,
}

const reversedWrappedEvents = swapObject(wrappedEvents)
Expand Down

0 comments on commit a12a00e

Please sign in to comment.