Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: subscriptions payloads _entity w/ relations fields #2626

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/node-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Updated send_notification PG function to include `_block height` and entity `_id` (#2626)

## [16.1.0] - 2024-12-11
### Changed
Expand Down
6 changes: 3 additions & 3 deletions packages/node-core/src/db/sync-helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ describe('sync helper test', () => {
// For that reason the behaviour is kept the same as before delete was fixed.
expect(listener).toHaveBeenNthCalledWith(
1,
`{"id": "1", "_entity": {"id": "1", "block_number": 1}, "mutation_type": "UPDATE"}`
`{"id": "1", "_entity": {"id": "1", "_id": "adde2f8c-cb87-4e84-9600-77f434556e6d", "block_number": 1}, "_block_height": 1, "mutation_type": "UPDATE"}`
);
expect(listener).toHaveBeenNthCalledWith(
2,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "UPDATE"}`
`{"id": "1", "_entity": {"id": "1", "_id": "9396aca4-cef2-4b52-98a7-c5f1ed3edb81", "block_number": 2}, "_block_height": 2, "mutation_type": "UPDATE"}`
);
expect(listener).toHaveBeenNthCalledWith(
3,
`{"id": "1", "_entity": {"id": "1", "block_number": 2}, "mutation_type": "DELETE"}`
`{"id": "1", "_entity": {"id": "1", "_id": "9396aca4-cef2-4b52-98a7-c5f1ed3edb81", "block_number": 2}, "_block_height": 2, "mutation_type": "DELETE"}`
);
}, 10_000);
});
Expand Down
6 changes: 3 additions & 3 deletions packages/node-core/src/db/sync-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,10 @@ BEGIN
'mutation_type', TG_OP,
'_entity', row);
IF payload -> '_entity' ? '_block_range' then
payload = payload #- '{"_entity","_id"}';
payload = payload #- '{"_entity","_block_range"}';
payload = payload #- '{"_entity","_block_range"}';
payload = payload || jsonb_build_object('_block_height', lower(row._block_range));
IF NOT upper_inf(row._block_range) then
-- Check if a newer version of the entity exists to determine operation
-- Check if a newer version of the entity exists to determine operation
EXECUTE FORMAT(
'SELECT EXISTS (SELECT 1 FROM "${schema}".%I WHERE id = $1 AND lower(_block_range) = upper($2))',
TG_TABLE_NAME
Expand Down
3 changes: 3 additions & 0 deletions packages/query/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- Subscriptions `_entity` field now returns all properties (#2626)

## [2.19.0] - 2024-12-11
### Added
- Support for ordering with fulltext search (#2623)
Expand Down
2 changes: 1 addition & 1 deletion packages/query/src/graphql/graphql.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ export class GraphqlModule implements OnModuleInit, OnModuleDestroy {
path: WS_ROUTE,
});

this.wsCleanup = useServer({schema}, wsServer);
this.wsCleanup = useServer({schema, context: {pgClient: this.pgPool}}, wsServer);
}

app.use(PinoLogger(PinoConfig));
Expand Down
24 changes: 21 additions & 3 deletions packages/query/src/graphql/plugins/PgSubscriptionPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import {hashName} from '@subql/utils';
import {PgIntrospectionResultsByKind} from '@subql/x-graphile-build-pg';
import {makeExtendSchemaPlugin, gql, embed} from 'graphile-utils';
import {makeExtendSchemaPlugin, gql, embed, Resolvers} from 'graphile-utils';
import {DocumentNode} from 'graphql';

const filter = (event, args) => {
Expand All @@ -30,7 +30,7 @@ function makePayload(entityType: string): {type: DocumentNode; name: string} {
}

export const PgSubscriptionPlugin = makeExtendSchemaPlugin((build) => {
const {inflection, pgIntrospectionResultsByKind} = build;
const {inflection, pgIntrospectionResultsByKind, pgSql: sql} = build;

const typeDefs = [
gql`
Expand All @@ -42,7 +42,7 @@ export const PgSubscriptionPlugin = makeExtendSchemaPlugin((build) => {
`,
];

const resolvers: Record<string, any> = {};
const resolvers: Resolvers = {};

// Generate subscription fields for all database tables
(pgIntrospectionResultsByKind as PgIntrospectionResultsByKind).class.forEach((table) => {
Expand All @@ -65,6 +65,24 @@ export const PgSubscriptionPlugin = makeExtendSchemaPlugin((build) => {
)
}`
);

resolvers[payloadName] = {
_entity: {
resolve: async ({_block_height, _entity}, args, context, resolveInfo) => {
const [row] = await resolveInfo.graphile.selectGraphQLResultFromTable(
sql.identifier(table.namespace.name, table.name),
(tableAlias, queryBuilder) => {
queryBuilder.context.args ??= {};
queryBuilder.context.args.blockHeight = sql.fragment`${sql.value(_block_height.toString())}::bigint`;
queryBuilder.where(sql.fragment`${tableAlias}._id = ${sql.value(_entity._id)}`);
queryBuilder.limit(1);
}
);

return row;
},
},
};
});

return {
Expand Down