Skip to content

Commit

Permalink
Merge pull request #113 from openfoodfacts/112-support-new-multi-flav…
Browse files Browse the repository at this point in the history
…our-stream-name

feat: Add support for new stream name
  • Loading branch information
john-gom authored Dec 12, 2024
2 parents 963a4ae + 5c8b639 commit bfa83f1
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 6 deletions.
83 changes: 81 additions & 2 deletions src/domain/services/messages.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ describe('create', () => {
});
});

it('should not call importwithfilter for initialImport', async () => {
it('should not call importWithFilter for initialImport', async () => {
await createTestingModule([DomainModule], async (app) => {
const importService = app.get(ImportService);
const importSpy = jest
Expand Down Expand Up @@ -311,7 +311,7 @@ describe('create', () => {
});
});

it('should call importwithfilter for normal imports', async () => {
it('should not include non-food products in call to importWithFilter', async () => {
await createTestingModule([DomainModule], async (app) => {
const importService = app.get(ImportService);
const importSpy = jest
Expand All @@ -327,12 +327,91 @@ describe('create', () => {
id: nextId(),
message: {
code: code1,
product_type: 'food',
},
},
{
id: nextId(),
message: {
code: code2,
product_type: 'beauty',
},
},
];

const messagesService = app.get(MessagesService);
await messagesService.create(messages);

// Then the import is called
expect(importSpy).toHaveBeenCalledTimes(1);

// Update events are created for all codes
const events =
await sql`SELECT * FROM product_update_event WHERE message->>'code' IN ${sql(
[code1, code2],
)}`;

expect(events).toHaveLength(2);

// Import with filter only called for the food product
const importWithFilterIn = importSpy.mock.calls[0][0].code.$in;
expect(importWithFilterIn).toHaveLength(1);
expect(importWithFilterIn[0]).toBe(code1);
});
});

it('should not call importWithFilter for updates to only non-food products', async () => {
await createTestingModule([DomainModule], async (app) => {
const importService = app.get(ImportService);
const importSpy = jest
.spyOn(importService, 'importWithFilter')
.mockImplementation();

const code1 = randomCode();
let idCount = 0;
const nextId = () => `${Date.now()}-${idCount++}`;
const messages = [
{
id: nextId(),
message: {
code: code1,
product_type: 'beauty',
},
},
];

const messagesService = app.get(MessagesService);
await messagesService.create(messages);

// Then the import is not called
expect(importSpy).toHaveBeenCalledTimes(0);
});
});

it('should call importWithFilter for normal imports', async () => {
await createTestingModule([DomainModule], async (app) => {
const importService = app.get(ImportService);
const importSpy = jest
.spyOn(importService, 'importWithFilter')
.mockImplementation();

const code1 = randomCode();
const code2 = randomCode();
let idCount = 0;
const nextId = () => `${Date.now()}-${idCount++}`;
const messages = [
{
id: nextId(),
message: {
code: code1,
product_type: 'food',
},
},
{
id: nextId(),
message: {
code: code2,
product_type: 'food',
},
},
];
Expand Down
15 changes: 12 additions & 3 deletions src/domain/services/messages.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,18 @@ export class MessagesService {
do nothing`;

if (!initialImport) {
const productCodes = [...new Set(messages.map((m) => m.message.code))];
const filter = { code: { $in: productCodes } };
await this.importService.importWithFilter(filter, ProductSource.EVENT);
const productCodes = [
...new Set(
messages
// At the moment we only import food products. This can be removed when we import all flavours
.filter((m) => m.message.product_type === 'food')
.map((m) => m.message.code),
),
];
if (productCodes.length) {
const filter = { code: { $in: productCodes } };
await this.importService.importWithFilter(filter, ProductSource.EVENT);
}
}

// Update counts on product_update after products have been imported
Expand Down
75 changes: 74 additions & 1 deletion src/domain/services/redis.listener.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import { MessagesService } from './messages.service';
jest.setTimeout(300000);

describe('receiveMessages', () => {
it('should call importWithFilter when a message is received', async () => {
// Note following code can be removed when the new event is being published
it('should call importWithFilter when a message is received on the old stream', async () => {
await createTestingModule([DomainModule], async (app) => {
// GIVEN: Redis is running
const redis = await new GenericContainer('redis')
Expand Down Expand Up @@ -43,6 +44,77 @@ describe('receiveMessages', () => {
const messageId = await client.xAdd('product_updates_off', '*', {
code: code1,
rev: '1',
product_type: 'food',
});

// Wait for message to be delivered
await setTimeout(100);

// Then the import is called
expect(importSpy).toHaveBeenCalledTimes(1);
expect(await settings.getLastMessageId()).toBe(messageId);

// If a new message is added
importSpy.mockClear();
await client.xAdd('product_updates_off', '*', {
code: code2,
product_type: 'food',
});

// Wait for message to be delivered
await setTimeout(100);

// Then import is called again but only with the new code
expect(importSpy).toHaveBeenCalledTimes(1);
const codes = importSpy.mock.calls[0][0].code.$in;
expect(codes).toHaveLength(1);
expect(codes[0]).toBe(code2);

// Update events are created
const events =
await sql`SELECT * FROM product_update_event WHERE message->>'code' = ${code1}`;

expect(events).toHaveLength(1);
expect(events[0].message_id).toBe(messageId);
} finally {
await client.quit();
await redisListener.stopRedisConsumer();
await redis.stop();
}
});
});

it('should call importWithFilter when a message is received', async () => {
await createTestingModule([DomainModule], async (app) => {
// GIVEN: Redis is running
const redis = await new GenericContainer('redis')
.withExposedPorts(6379)
.start();
const redisUrl = `redis://localhost:${redis.getMappedPort(6379)}`;
const settings = app.get(SettingsService);
jest.spyOn(settings, 'getRedisUrl').mockImplementation(() => redisUrl);

// And lastmessageid is zero
await settings.setLastMessageId('0');
const importService = app.get(ImportService);
const importSpy = jest
.spyOn(importService, 'importWithFilter')
.mockImplementation();

const redisListener = app.get(RedisListener);
await redisListener.startRedisConsumer();

const client = createClient({ url: redisUrl });
await client.connect();
try {
const code1 = randomCode();
const code2 = randomCode();

// When: A message is sent
const messageId = await client.xAdd('product_updates', '*', {
code: code1,
product_type: 'food',
rev: '1',
});

// Wait for message to be delivered
Expand All @@ -56,6 +128,7 @@ describe('receiveMessages', () => {
importSpy.mockClear();
await client.xAdd('product_updates_off', '*', {
code: code2,
product_type: 'food',
});

// Wait for message to be delivered
Expand Down
10 changes: 10 additions & 0 deletions src/domain/services/redis.listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export class RedisListener {
[
// XREAD can read from multiple streams, starting at a
// different ID for each...
{
key: 'product_updates',
id: lastMessageId,
},
// Following can be deleted after PO is updated to use generic stream name
// Note should strictly have a different message id but PO will take more than one millisecond to switch
// so shouldn't be an issue
{
key: 'product_updates_off',
id: lastMessageId,
Expand All @@ -53,8 +60,11 @@ export class RedisListener {
if (messages?.length) {
/** Message looks like this:
{
timestamp: 123456789,
code: "0850026029062",
rev: 2,
flavor: "off",
product_type: "food",
user_id: "stephane",
action: "updated",
comment: "Modification : Remove changes",
Expand Down

0 comments on commit bfa83f1

Please sign in to comment.