Skip to content

Commit

Permalink
Adds deliverAt and deliverAfter support to Producer (apache#123)
Browse files Browse the repository at this point in the history
Updates Message.cc to include support for delayed delivery from the C++ client. (pulsar_message_set_deliver_after & pulsar_message_set_deliver_at)

Co-authored-by: savearray2 <savearray2>
  • Loading branch information
savearray2 authored Sep 23, 2020
1 parent dc0b4c8 commit 52777a8
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
12 changes: 12 additions & 0 deletions src/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ static const std::string CFG_EVENT_TIME = "eventTimestamp";
static const std::string CFG_SEQUENCE_ID = "sequenceId";
static const std::string CFG_PARTITION_KEY = "partitionKey";
static const std::string CFG_REPL_CLUSTERS = "replicationClusters";
static const std::string CFG_DELIVER_AFTER = "deliverAfter";
static const std::string CFG_DELIVER_AT = "deliverAt";

Napi::FunctionReference Message::constructor;

Expand Down Expand Up @@ -194,6 +196,16 @@ pulsar_message_t *Message::BuildMessage(Napi::Object conf) {
FreeStringArray(arr, length);
}
}

if (conf.Has(CFG_DELIVER_AFTER) && conf.Get(CFG_DELIVER_AFTER).IsNumber()) {
Napi::Number deliverAfter = conf.Get(CFG_DELIVER_AFTER).ToNumber();
pulsar_message_set_deliver_after(cMessage, deliverAfter.Int64Value());
}

if (conf.Has(CFG_DELIVER_AT) && conf.Get(CFG_DELIVER_AT).IsNumber()) {
Napi::Number deliverAt = conf.Get(CFG_DELIVER_AT).ToNumber();
pulsar_message_set_deliver_at(cMessage, deliverAt.Int64Value());
}
return cMessage;
}

Expand Down
10 changes: 10 additions & 0 deletions tests/conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ maxUnackedMessagesPerConsumer=50000

subscriptionRedeliveryTrackerEnabled=true

# Whether to enable the delayed delivery for messages.
# If disabled, messages will be immediately delivered and there will
# be no tracking overhead.
delayedDeliveryEnabled=true

# Control the tick time for when retrying on delayed delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000

### --- Authentication --- ###

# Enable authentication
Expand Down
56 changes: 56 additions & 0 deletions tests/end_to_end.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,62 @@ const Pulsar = require('../index.js');
await client.close();
});

test('Produce-Delayed/Consume', async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
operationTimeoutSeconds: 30,
});
expect(client).not.toBeNull();

const topic = 'persistent://public/default/produce-read-delayed';
const producer = await client.createProducer({
topic,
sendTimeoutMs: 30000,
batchingEnabled: true,
});
expect(producer).not.toBeNull();

const consumer = await client.subscribe({
topic,
subscription: 'sub',
subscriptionType: 'Shared',
});
expect(consumer).not.toBeNull();

const messages = [];
const time = (new Date()).getTime();
for (let i = 0; i < 5; i += 1) {
const msg = `my-message-${i}`;
producer.send({
data: Buffer.from(msg),
deliverAfter: 3000,
});
messages.push(msg);
}
for (let i = 5; i < 10; i += 1) {
const msg = `my-message-${i}`;
producer.send({
data: Buffer.from(msg),
deliverAt: (new Date()).getTime() + 3000,
});
messages.push(msg);
}
await producer.flush();

const results = [];
for (let i = 0; i < 10; i += 1) {
const msg = await consumer.receive();
results.push(msg.getData().toString());
consumer.acknowledge(msg);
}
expect(lodash.difference(messages, results)).toEqual([]);
expect((new Date()).getTime() - time).toBeGreaterThan(3000);

await producer.close();
await consumer.close();
await client.close();
});

test('Produce/Consume/Unsubscribe', async () => {
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
Expand Down

0 comments on commit 52777a8

Please sign in to comment.