Skip to content

Commit

Permalink
feat(core): Make event bus subscriptions transaction-safe
Browse files Browse the repository at this point in the history
Fixes #1107. Relates to #520.
This commit introduces a new mechanism to guarantee that event subscribers do not run into issues
with database transactions. It works by postponing the publishing on an event until the
associated transaction has completed. Thus the subscriber can be assured that all data changes
in the transaction are available to read right away.
  • Loading branch information
michaelbromley committed Sep 29, 2021
1 parent d35306f commit f0fd662
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 44 deletions.
36 changes: 31 additions & 5 deletions packages/core/e2e/database-transactions.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ import { mergeConfig } from '@vendure/core';
import { createTestEnvironment } from '@vendure/testing';
import gql from 'graphql-tag';
import path from 'path';
import { ReplaySubject } from 'rxjs';

import { initialData } from '../../../e2e-common/e2e-initial-data';
import { testConfig, TEST_SETUP_TIMEOUT_MS } from '../../../e2e-common/test-config';

import { TransactionTestPlugin, TRIGGER_EMAIL } from './fixtures/test-plugins/transaction-test-plugin';
import {
TransactionTestPlugin,
TRIGGER_ATTEMPTED_READ_EMAIL,
TRIGGER_ATTEMPTED_UPDATE_EMAIL,
} from './fixtures/test-plugins/transaction-test-plugin';

describe('Transaction infrastructure', () => {
const { server, adminClient } = createTestEnvironment(
Expand Down Expand Up @@ -104,13 +109,25 @@ describe('Transaction infrastructure', () => {

// Testing https://github.com/vendure-ecommerce/vendure/issues/520
it('passing transaction via EventBus', async () => {
TransactionTestPlugin.errorHandler.mockClear();
TransactionTestPlugin.reset();
const { createTestAdministrator } = await adminClient.query(CREATE_ADMIN, {
emailAddress: TRIGGER_EMAIL,
emailAddress: TRIGGER_ATTEMPTED_UPDATE_EMAIL,
fail: false,
});
await TransactionTestPlugin.eventHandlerComplete$.toPromise();
expect(createTestAdministrator.emailAddress).toBe(TRIGGER_EMAIL);
expect(createTestAdministrator.emailAddress).toBe(TRIGGER_ATTEMPTED_UPDATE_EMAIL);
expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled();
});

// Testing https://github.com/vendure-ecommerce/vendure/issues/1107
it('passing transaction via EventBus with delay in committing transaction', async () => {
TransactionTestPlugin.reset();
const { createTestAdministrator4 } = await adminClient.query(CREATE_ADMIN4, {
emailAddress: TRIGGER_ATTEMPTED_READ_EMAIL,
fail: false,
});
await TransactionTestPlugin.eventHandlerComplete$.toPromise();
expect(createTestAdministrator4.emailAddress).toBe(TRIGGER_ATTEMPTED_READ_EMAIL);
expect(TransactionTestPlugin.errorHandler).not.toHaveBeenCalled();
});
});
Expand Down Expand Up @@ -145,14 +162,23 @@ const CREATE_ADMIN2 = gql`
`;

const CREATE_ADMIN3 = gql`
mutation CreateTestAdmin2($emailAddress: String!, $fail: Boolean!) {
mutation CreateTestAdmin3($emailAddress: String!, $fail: Boolean!) {
createTestAdministrator3(emailAddress: $emailAddress, fail: $fail) {
...CreatedAdmin
}
}
${ADMIN_FRAGMENT}
`;

const CREATE_ADMIN4 = gql`
mutation CreateTestAdmin4($emailAddress: String!, $fail: Boolean!) {
createTestAdministrator4(emailAddress: $emailAddress, fail: $fail) {
...CreatedAdmin
}
}
${ADMIN_FRAGMENT}
`;

const VERIFY_TEST = gql`
query VerifyTest {
verify {
Expand Down
8 changes: 4 additions & 4 deletions packages/core/e2e/default-search-plugin.e2e-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import {
UpdateCollection,
UpdateProduct,
UpdateProductVariants,
UpdateTaxRate
UpdateTaxRate,
} from './graphql/generated-e2e-admin-types';
import { LogicalOperator, SearchProductsShop } from './graphql/generated-e2e-shop-types';
import {
Expand Down Expand Up @@ -565,7 +565,7 @@ describe('Default search plugin', () => {
},
);
expect(result.search.collections).toEqual([
{collection: {id: 'T_2', name: 'Plants',},count: 3,},
{ collection: { id: 'T_2', name: 'Plants' }, count: 3 },
]);
});

Expand All @@ -579,8 +579,8 @@ describe('Default search plugin', () => {
},
);
expect(result.search.collections).toEqual([
{collection: {id: 'T_2', name: 'Plants',},count: 3,},
]);
{ collection: { id: 'T_2', name: 'Plants' }, count: 3 },
]);
});

it('encodes the productId and productVariantId', async () => {
Expand Down
36 changes: 32 additions & 4 deletions packages/core/e2e/fixtures/test-plugins/transaction-test-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ export class TestEvent extends VendureEvent {
}
}

export const TRIGGER_EMAIL = 'trigger-email';
export const TRIGGER_ATTEMPTED_UPDATE_EMAIL = 'trigger-attempted-update-email';
export const TRIGGER_ATTEMPTED_READ_EMAIL = 'trigger-attempted-read-email';

@Injectable()
class TestUserService {
Expand Down Expand Up @@ -99,6 +100,15 @@ class TestResolver {
return this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail);
}

@Mutation()
@Transaction()
async createTestAdministrator4(@Ctx() ctx: RequestContext, @Args() args: any) {
const admin = await this.testAdminService.createAdministrator(ctx, args.emailAddress, args.fail);
this.eventBus.publish(new TestEvent(ctx, admin));
await new Promise(resolve => setTimeout(resolve, 50));
return admin;
}

@Query()
async verify() {
const admins = await this.connection.getRepository(Administrator).find();
Expand All @@ -119,6 +129,7 @@ class TestResolver {
createTestAdministrator(emailAddress: String!, fail: Boolean!): Administrator
createTestAdministrator2(emailAddress: String!, fail: Boolean!): Administrator
createTestAdministrator3(emailAddress: String!, fail: Boolean!): Administrator
createTestAdministrator4(emailAddress: String!, fail: Boolean!): Administrator
}
type VerifyResult {
admins: [Administrator!]!
Expand All @@ -138,16 +149,33 @@ export class TransactionTestPlugin implements OnApplicationBootstrap {

constructor(private eventBus: EventBus, private connection: TransactionalConnection) {}

static reset() {
this.eventHandlerComplete$ = new ReplaySubject(1);
this.errorHandler.mockClear();
}

onApplicationBootstrap(): any {
// This part is used to test how RequestContext with transactions behave
// when used in an Event subscription
this.subscription = this.eventBus.ofType(TestEvent).subscribe(async event => {
const { ctx, administrator } = event;
if (administrator.emailAddress === TRIGGER_EMAIL) {
if (administrator.emailAddress === TRIGGER_ATTEMPTED_UPDATE_EMAIL) {
const adminRepository = this.connection.getRepository(ctx, Administrator);
await new Promise(resolve => setTimeout(resolve, 50));
administrator.lastName = 'modified';
try {
await new Promise(resolve => setTimeout(resolve, 50));
await this.connection.getRepository(ctx, Administrator).save(administrator);
await adminRepository.save(administrator);
} catch (e) {
TransactionTestPlugin.errorHandler(e);
} finally {
TransactionTestPlugin.eventHandlerComplete$.complete();
}
}
if (administrator.emailAddress === TRIGGER_ATTEMPTED_READ_EMAIL) {
// note the ctx is not passed here, so we are not inside the ongoing transaction
const adminRepository = this.connection.getRepository(Administrator);
try {
await adminRepository.findOneOrFail(administrator.id);
} catch (e) {
TransactionTestPlugin.errorHandler(e);
} finally {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/config/config.service.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export class MockConfigService implements MockClass<ConfigService> {
defaultLanguageCode: jest.Mock<any>;
roundingStrategy: {};
entityIdStrategy = new MockIdStrategy();
entityOptions = {};
assetOptions = {
assetNamingStrategy: {} as any,
assetStorageStrategy: {} as any,
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/connection/connection.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import { ConfigModule } from '../config/config.module';
import { ConfigService } from '../config/config.service';
import { TypeOrmLogger } from '../config/logger/typeorm-logger';

import { TransactionSubscriber } from './transaction-subscriber';
import { TransactionalConnection } from './transactional-connection';

let defaultTypeOrmModule: DynamicModule;

@Module({
imports: [],
providers: [TransactionalConnection],
exports: [TransactionalConnection],
providers: [TransactionalConnection, TransactionSubscriber],
exports: [TransactionalConnection, TransactionSubscriber],
})
export class ConnectionModule {
static forRoot(): DynamicModule {
Expand Down
67 changes: 67 additions & 0 deletions packages/core/src/connection/transaction-subscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { Injectable } from '@nestjs/common';
import { InjectConnection } from '@nestjs/typeorm';
import { merge, Subject } from 'rxjs';
import { filter, map, take } from 'rxjs/operators';
import { Connection, EntitySubscriberInterface } from 'typeorm';
import { EntityManager } from 'typeorm/entity-manager/EntityManager';
import { QueryRunner } from 'typeorm/query-runner/QueryRunner';
import { TransactionCommitEvent } from 'typeorm/subscriber/event/TransactionCommitEvent';
import { TransactionRollbackEvent } from 'typeorm/subscriber/event/TransactionRollbackEvent';

export interface TransactionSubscriberEvent {
/**
* Connection used in the event.
*/
connection: Connection;
/**
* QueryRunner used in the event transaction.
* All database operations in the subscribed event listener should be performed using this query runner instance.
*/
queryRunner: QueryRunner;
/**
* EntityManager used in the event transaction.
* All database operations in the subscribed event listener should be performed using this entity manager instance.
*/
manager: EntityManager;
}

/**
* This subscriber listens to all transaction commit/rollback events emitted by TypeORM
* so that we can be notified as soon as a particular queryRunner's transactions ends.
*
* This is used by the {@link EventBus} to prevent events from being published until their
* associated transactions are complete.
*/
@Injectable()
export class TransactionSubscriber implements EntitySubscriberInterface {
private commit$ = new Subject<TransactionSubscriberEvent>();
private rollback$ = new Subject<TransactionSubscriberEvent>();

constructor(@InjectConnection() private connection: Connection) {
if (!connection.subscribers.find(subscriber => subscriber.constructor === TransactionSubscriber)) {
connection.subscribers.push(this);
}
}

afterTransactionCommit(event: TransactionCommitEvent) {
this.commit$.next(event);
}

afterTransactionRollback(event: TransactionRollbackEvent) {
this.rollback$.next(event);
}

awaitRelease(queryRunner: QueryRunner): Promise<QueryRunner> {
if (queryRunner.isTransactionActive) {
return merge(this.commit$, this.rollback$)
.pipe(
filter(event => event.queryRunner === queryRunner),
take(1),
map(event => event.queryRunner),
)
.toPromise();
} else {
return Promise.resolve(queryRunner);
}
}
}
8 changes: 2 additions & 6 deletions packages/core/src/connection/transactional-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ export interface GetEntityOrThrowOptions<T = any> extends FindOneOptions<T> {
/**
* @description
* If set to a positive integer, it will retry getting the entity in case it is initially not
* found. This can be useful when working with the {@link EventBus} and subscribing to the
* creation of new Entities which may on first attempt be inaccessible due to an ongoing
* transaction.
* found.
*
* @since 1.1.0
* @default 0
Expand Down Expand Up @@ -157,9 +155,7 @@ export class TransactionalConnection {
/**
* @description
* Finds an entity of the given type by ID, or throws an `EntityNotFoundError` if none
* is found. Can be configured to retry (using the `retries` option) in the event of the
* entity not being found on the first attempt. This can be useful when attempting to access
* an entity which was just created and may be inaccessible due to an ongoing transaction.
* is found.
*/
async getEntityOrThrow<T extends VendureEntity>(
ctx: RequestContext,
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/event-bus/event-bus.module.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { Module } from '@nestjs/common';

import { ConnectionModule } from '../connection/connection.module';

import { EventBus } from './event-bus';

@Module({
imports: [ConnectionModule],
providers: [EventBus],
exports: [EventBus],
})
Expand Down
54 changes: 35 additions & 19 deletions packages/core/src/event-bus/event-bus.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { Injectable, OnModuleDestroy } from '@nestjs/common';
import { Type } from '@vendure/common/lib/shared-types';
import { Observable, Subject } from 'rxjs';
import { filter, takeUntil } from 'rxjs/operators';
import { filter, mergeMap, takeUntil } from 'rxjs/operators';
import { EntityManager } from 'typeorm';

import { RequestContext } from '../api/common/request-context';
import { TRANSACTION_MANAGER_KEY } from '../common/constants';
import { TransactionSubscriber } from '../connection/transaction-subscriber';

import { VendureEvent } from './vendure-event';

Expand Down Expand Up @@ -57,22 +58,30 @@ export class EventBus implements OnModuleDestroy {
private eventStream = new Subject<VendureEvent>();
private destroy$ = new Subject();

constructor(private transactionSubscriber: TransactionSubscriber) {}

/**
* @description
* Publish an event which any subscribers can react to.
*/
publish<T extends VendureEvent>(event: T): void {
this.eventStream.next(this.prepareRequestContext(event));
this.eventStream.next(event);
}

/**
* @description
* Returns an RxJS Observable stream of events of the given type.
* If the event contains a {@link RequestContext} object, the subscriber
* will only get called after any active database transactions are complete.
*
* This means that the subscriber function can safely access all updated
* data related to the event.
*/
ofType<T extends VendureEvent>(type: Type<T>): Observable<T> {
return this.eventStream.asObservable().pipe(
takeUntil(this.destroy$),
filter(e => (e as any).constructor === type),
mergeMap(event => this.awaitActiveTransactions(event)),
) as Observable<T>;
}

Expand All @@ -82,26 +91,33 @@ export class EventBus implements OnModuleDestroy {
}

/**
* If the Event includes a RequestContext property, we need to:
* If the Event includes a RequestContext property, we need to check for any active transaction
* associated with it, and if there is one, we await that transaction to either commit or rollback
* before publishing the event.
*
* The reason for this is that if the transaction is still active when event subscribers execute,
* this can cause a couple of issues:
*
* 1) Set it as a copy of the original
* 2) Remove the TRANSACTION_MANAGER_KEY from that copy
* 1. If the transaction hasn't completed by the time the subscriber runs, the new data inside
* the transaction will not be available to the subscriber.
* 2. If the subscriber gets a reference to the EntityManager which has an active transaction,
* and then the transaction completes, and then the subscriber attempts a DB operation using that
* EntityManager, a fatal QueryRunnerAlreadyReleasedError will be thrown.
*
* The TRANSACTION_MANAGER_KEY is used to track transactions across calls
* (this is why we always pass the `ctx` object to get TransactionalConnection.getRepository() method).
* However, allowing a transaction to continue in an async event subscriber function _will_ cause
* very confusing issues (see https://github.com/vendure-ecommerce/vendure/issues/520), which is why
* we simply remove the reference to the transaction manager from the context object altogether.
* For more context on these two issues, see:
*
* * https://github.com/vendure-ecommerce/vendure/issues/520
* * https://github.com/vendure-ecommerce/vendure/issues/1107
*/
private prepareRequestContext<T extends VendureEvent>(event: T): T {
for (const propertyName of Object.getOwnPropertyNames(event)) {
const property = event[propertyName as keyof T];
if (property instanceof RequestContext) {
const ctxCopy = property.copy();
delete (ctxCopy as any)[TRANSACTION_MANAGER_KEY];
(event[propertyName as keyof T] as any) = ctxCopy;
}
private async awaitActiveTransactions<T extends VendureEvent>(event: T): Promise<T> {
const ctx = Object.values(event).find(value => value instanceof RequestContext);
if (!ctx) {
return event;
}
const transactionManager: EntityManager | undefined = (ctx as any)[TRANSACTION_MANAGER_KEY];
if (!transactionManager?.queryRunner) {
return event;
}
return event;
return this.transactionSubscriber.awaitRelease(transactionManager.queryRunner).then(() => event);
}
}
Loading

0 comments on commit f0fd662

Please sign in to comment.