diff --git a/README.md b/README.md
index 151423661..8bda0e40d 100644
--- a/README.md
+++ b/README.md
@@ -432,7 +432,7 @@ const producer = client.producer({ idempotent: true })
// Begin a transaction
const transaction = await producer.transaction()
-try {
+try {
// Call one of the transaction's send methods
await transaction.send({ topic, messages })
@@ -449,8 +449,8 @@ try {
To send offsets as part of a transaction, meaning they will be committed only if the transaction succeeds, use the `transaction.sendOffsets()` method. This is necessary whenever we want a transaction to produce messages derived from a consumer, in a "consume-transform-produce" loop.
```javascript
-await transaction.sendOffsets({
- consumerGroupId, topics
+await transaction.sendOffsets({
+ consumerGroupId, topics
})
```
@@ -1281,6 +1281,13 @@ List of available events:
`apiVersion`
}
+* consumer.events.REQUEST_QUEUE_SIZE
+ payload: {
+ `broker`,
+ `clientId`,
+ `queueSize`
+ }
+
### Producer
* producer.events.CONNECT
@@ -1315,6 +1322,13 @@ List of available events:
`apiVersion`
}
+* producer.events.REQUEST_QUEUE_SIZE
+ payload: {
+ `broker`,
+ `clientId`,
+ `queueSize`
+ }
+
### Admin
* admin.events.CONNECT
@@ -1349,6 +1363,13 @@ List of available events:
`apiVersion`
}
+* admin.events.REQUEST_QUEUE_SIZE
+ payload: {
+ `broker`,
+ `clientId`,
+ `queueSize`
+ }
+
## Custom logging
The logger is customized using log creators. A log creator is a function which receives a log level and returns a log function. The log function receives namespace, level, label, and log.
diff --git a/src/admin/index.spec.js b/src/admin/index.spec.js
index 0e3928fa0..88aac7e58 100644
--- a/src/admin/index.spec.js
+++ b/src/admin/index.spec.js
@@ -106,6 +106,46 @@ describe('Admin', () => {
})
})
+ test('emits the request queue size event', async () => {
+ const emitter = new InstrumentationEventEmitter()
+ const cluster = createCluster({
+ instrumentationEmitter: emitter,
+ maxInFlightRequests: 1,
+ })
+
+ const admin = createAdmin({
+ cluster,
+ logger: newLogger(),
+ instrumentationEmitter: emitter,
+ })
+
+ const requestListener = jest.fn().mockName('request_queue_size')
+ admin.on(admin.events.REQUEST_QUEUE_SIZE, requestListener)
+
+ await admin.connect()
+ await Promise.all([
+ admin.createTopics({
+ waitForLeaders: false,
+ topics: [{ topic: `test-topic-${secureRandom()}`, partitions: 8 }],
+ }),
+ admin.createTopics({
+ waitForLeaders: false,
+ topics: [{ topic: `test-topic-${secureRandom()}`, partitions: 8 }],
+ }),
+ ])
+
+ expect(requestListener).toHaveBeenCalledWith({
+ id: expect.any(Number),
+ timestamp: expect.any(Number),
+ type: 'admin.network.request_queue_size',
+ payload: {
+ broker: expect.any(String),
+ clientId: expect.any(String),
+ queueSize: expect.any(Number),
+ },
+ })
+ })
+
test('on throws an error when provided with an invalid event name', () => {
const admin = createAdmin({
cluster: createCluster(),
diff --git a/src/admin/instrumentationEvents.js b/src/admin/instrumentationEvents.js
index 2e3709c5d..cfc095c68 100644
--- a/src/admin/instrumentationEvents.js
+++ b/src/admin/instrumentationEvents.js
@@ -8,11 +8,13 @@ const events = {
DISCONNECT: adminType('disconnect'),
REQUEST: adminType(networkEvents.NETWORK_REQUEST),
REQUEST_TIMEOUT: adminType(networkEvents.NETWORK_REQUEST_TIMEOUT),
+ REQUEST_QUEUE_SIZE: adminType(networkEvents.NETWORK_REQUEST_QUEUE_SIZE),
}
const wrappedEvents = {
[events.REQUEST]: networkEvents.NETWORK_REQUEST,
[events.REQUEST_TIMEOUT]: networkEvents.NETWORK_REQUEST_TIMEOUT,
+ [events.REQUEST_QUEUE_SIZE]: networkEvents.NETWORK_REQUEST_QUEUE_SIZE,
}
const reversedWrappedEvents = swapObject(wrappedEvents)
diff --git a/src/consumer/__tests__/instrumentationEvents.spec.js b/src/consumer/__tests__/instrumentationEvents.spec.js
index 079b1808e..d4420e04a 100644
--- a/src/consumer/__tests__/instrumentationEvents.spec.js
+++ b/src/consumer/__tests__/instrumentationEvents.spec.js
@@ -323,7 +323,7 @@ describe('Consumer > Instrumentation Events', () => {
instrumentationEmitter: emitter,
})
- const requestListener = jest.fn().mockName('request')
+ const requestListener = jest.fn().mockName('request_timeout')
consumer.on(consumer.events.REQUEST_TIMEOUT, requestListener)
await consumer
@@ -348,4 +348,59 @@ describe('Consumer > Instrumentation Events', () => {
},
})
})
+
+ it('emits request queue size events', async () => {
+ const cluster = createCluster({
+ instrumentationEmitter: emitter,
+ maxInFlightRequests: 1,
+ })
+
+ consumer = createConsumer({
+ groupId,
+ cluster,
+ logger: newLogger(),
+ heartbeatInterval: 100,
+ maxWaitTimeInMs: 1,
+ maxBytesPerPartition: 180,
+ instrumentationEmitter: emitter,
+ })
+
+ const consumer2 = createConsumer({
+ groupId,
+ cluster,
+ logger: newLogger(),
+ heartbeatInterval: 100,
+ maxWaitTimeInMs: 1,
+ maxBytesPerPartition: 180,
+ instrumentationEmitter: emitter,
+ })
+
+ const requestListener = jest.fn().mockName('request_queue_size')
+ consumer.on(consumer.events.REQUEST_QUEUE_SIZE, requestListener)
+ consumer2.on(consumer2.events.REQUEST_QUEUE_SIZE, requestListener)
+
+ await Promise.all([
+ consumer
+ .connect()
+ .then(() => consumer.run({ eachMessage: () => true }))
+ .catch(e => e),
+ consumer2
+ .connect()
+ .then(() => consumer.run({ eachMessage: () => true }))
+ .catch(e => e),
+ ])
+
+ await consumer2.disconnect()
+
+ expect(requestListener).toHaveBeenCalledWith({
+ id: expect.any(Number),
+ timestamp: expect.any(Number),
+ type: 'consumer.network.request_queue_size',
+ payload: {
+ broker: expect.any(String),
+ clientId: expect.any(String),
+ queueSize: expect.any(Number),
+ },
+ })
+ })
})
diff --git a/src/consumer/instrumentationEvents.js b/src/consumer/instrumentationEvents.js
index 8a179d089..abcbe6716 100644
--- a/src/consumer/instrumentationEvents.js
+++ b/src/consumer/instrumentationEvents.js
@@ -16,11 +16,13 @@ const events = {
CRASH: consumerType('crash'),
REQUEST: consumerType(networkEvents.NETWORK_REQUEST),
REQUEST_TIMEOUT: consumerType(networkEvents.NETWORK_REQUEST_TIMEOUT),
+ REQUEST_QUEUE_SIZE: consumerType(networkEvents.NETWORK_REQUEST_QUEUE_SIZE),
}
const wrappedEvents = {
[events.REQUEST]: networkEvents.NETWORK_REQUEST,
[events.REQUEST_TIMEOUT]: networkEvents.NETWORK_REQUEST_TIMEOUT,
+ [events.REQUEST_QUEUE_SIZE]: networkEvents.NETWORK_REQUEST_QUEUE_SIZE,
}
const reversedWrappedEvents = swapObject(wrappedEvents)
diff --git a/src/producer/index.spec.js b/src/producer/index.spec.js
index 2d6387ca2..9d5fee288 100644
--- a/src/producer/index.spec.js
+++ b/src/producer/index.spec.js
@@ -296,6 +296,59 @@ describe('Producer', () => {
})
})
+ test('emits the request queue size event', async () => {
+ await createTopic({ topic: topicName, partitions: 8 })
+
+ const emitter = new InstrumentationEventEmitter()
+ const cluster = createCluster({
+ instrumentationEmitter: emitter,
+ maxInFlightRequests: 1,
+ clientId: 'test-client-id11111',
+ })
+
+ producer = createProducer({
+ cluster,
+ logger: newLogger(),
+ instrumentationEmitter: emitter,
+ })
+
+ const requestListener = jest.fn().mockName('request_queue_size')
+ producer.on(producer.events.REQUEST_QUEUE_SIZE, requestListener)
+
+ await producer.connect()
+ await Promise.all([
+ producer.send({
+ acks: -1,
+ topic: topicName,
+ messages: [
+ { partition: 0, value: 'value-0' },
+ { partition: 1, value: 'value-1' },
+ { partition: 2, value: 'value-2' },
+ ],
+ }),
+ producer.send({
+ acks: -1,
+ topic: topicName,
+ messages: [
+ { partition: 0, value: 'value-0' },
+ { partition: 1, value: 'value-1' },
+ { partition: 2, value: 'value-2' },
+ ],
+ }),
+ ])
+
+ expect(requestListener).toHaveBeenCalledWith({
+ id: expect.any(Number),
+ timestamp: expect.any(Number),
+ type: 'producer.network.request_queue_size',
+ payload: {
+ broker: expect.any(String),
+ clientId: expect.any(String),
+ queueSize: expect.any(Number),
+ },
+ })
+ })
+
describe('when acks=0', () => {
it('returns immediately', async () => {
const cluster = createCluster({
diff --git a/src/producer/instrumentationEvents.js b/src/producer/instrumentationEvents.js
index 84660449b..ea46915a1 100644
--- a/src/producer/instrumentationEvents.js
+++ b/src/producer/instrumentationEvents.js
@@ -8,11 +8,13 @@ const events = {
DISCONNECT: producerType('disconnect'),
REQUEST: producerType(networkEvents.NETWORK_REQUEST),
REQUEST_TIMEOUT: producerType(networkEvents.NETWORK_REQUEST_TIMEOUT),
+ REQUEST_QUEUE_SIZE: producerType(networkEvents.NETWORK_REQUEST_QUEUE_SIZE),
}
const wrappedEvents = {
[events.REQUEST]: networkEvents.NETWORK_REQUEST,
[events.REQUEST_TIMEOUT]: networkEvents.NETWORK_REQUEST_TIMEOUT,
+ [events.REQUEST_QUEUE_SIZE]: networkEvents.NETWORK_REQUEST_QUEUE_SIZE,
}
const reversedWrappedEvents = swapObject(wrappedEvents)