Skip to content

Commit

Permalink
wrap dequeue and peek in runExclusive. Remove no longer needed code.
Browse files Browse the repository at this point in the history
  • Loading branch information
iartemiev committed Feb 5, 2021
1 parent 9ad562e commit f87c7b7
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 99 deletions.
142 changes: 46 additions & 96 deletions packages/datastore/src/sync/outbox.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { MutationEvent } from './index';
import { ModelPredicateCreator } from '../predicates';
import { ExclusiveStorage as Storage, StorageFacade } from '../storage/storage';
import {
ExclusiveStorage as Storage,
StorageFacade,
Storage as StorageClass,
} from '../storage/storage';
import {
InternalSchema,
NamespaceResolver,
Expand Down Expand Up @@ -28,9 +32,19 @@ class MutationEventOutbox {
mutationEvent: MutationEvent
): Promise<void> {
storage.runExclusive(async s => {
const predicate = this.currentPredicate(mutationEvent);
const existing = await s.query(this.MutationEvent, predicate);
const [first] = existing;
const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
'MutationEvent'
];

const predicate = ModelPredicateCreator.createFromExisting<MutationEvent>(
mutationEventModelDefinition,
c =>
c
.modelId('eq', mutationEvent.modelId)
.id('ne', this.inProgressMutationEventId)
);

const [first] = await s.query(this.MutationEvent, predicate);

if (first === undefined) {
await s.save(mutationEvent, undefined, this.ownSymbol);
Expand All @@ -41,12 +55,9 @@ class MutationEventOutbox {

if (first.operation === TransformerMutationType.CREATE) {
if (incomingMutationType === TransformerMutationType.DELETE) {
// get predicate again to avoid race condition with inProgressMutationEventId
const predicate = this.currentPredicate(mutationEvent);
// delete all for model
await s.delete(this.MutationEvent, predicate);
} else {
// first gets updated with incoming's data, condition intentionally skiped
// first gets updated with incoming's data, condition intentionally skipped
await s.save(
this.MutationEvent.copyOf(first, draft => {
draft.data = mutationEvent.data;
Expand All @@ -59,32 +70,19 @@ class MutationEventOutbox {
const { condition: incomingConditionJSON } = mutationEvent;
const incomingCondition = JSON.parse(incomingConditionJSON);

const updated = await this.reconcileOutboxOnEnqueue(
existing,
mutationEvent
);

// If no condition
if (Object.keys(incomingCondition).length === 0) {
// get predicate again to avoid race condition with inProgressMutationEventId
const predicate = this.currentPredicate(mutationEvent);
// delete all for model
await s.delete(this.MutationEvent, predicate);
}

if (updated) {
await s.save(updated, undefined, this.ownSymbol);
return;
}

// Enqueue new one
await s.save(mutationEvent, undefined, this.ownSymbol);
}
});
}

public async dequeue(
storage: Storage,
storage: StorageClass,
record?: PersistentModel
): Promise<MutationEvent> {
const head = await this.peek(storage);
Expand Down Expand Up @@ -141,94 +139,46 @@ class MutationEventOutbox {
return result;
}

private async reconcileOutboxOnEnqueue(
existing: MutationEvent[],
mutationEvent: MutationEvent
): Promise<MutationEvent | undefined> {
const { _version, _lastChangedAt } = existing.reduce(
(acc, cur) => {
const oldData = JSON.parse(cur.data);
const { _version: lastVersion } = acc;
const { _version: _v, _lastChangedAt: _lCA } = oldData;

if (_v > lastVersion) {
return { _version: _v, _lastChangedAt: _lCA };
}

return acc;
},
{
_version: 0,
_lastChangedAt: 0,
}
);

const currentData = JSON.parse(mutationEvent.data);
const currentVersion = currentData._version;

if (currentVersion < _version) {
const newData = { ...currentData, _version, _lastChangedAt };
const newMutation = new this.MutationEvent({
...mutationEvent,
data: JSON.stringify(newData),
});
return newMutation;
}
}

private async reconcileOutboxOnDequeue(
storage: Storage,
storage: StorageClass,
record: PersistentModel
): Promise<void> {
storage.runExclusive(async s => {
const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
'MutationEvent'
];
const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
'MutationEvent'
];

const predicate = ModelPredicateCreator.createFromExisting<MutationEvent>(
mutationEventModelDefinition,
c => c.modelId('eq', record.id).id('ne', this.inProgressMutationEventId)
);
const predicate = ModelPredicateCreator.createFromExisting<MutationEvent>(
mutationEventModelDefinition,
c => c.modelId('eq', record.id).id('ne', this.inProgressMutationEventId)
);

const outdatedMutations = await s.query(this.MutationEvent, predicate);
const outdatedMutations = await storage.query(
this.MutationEvent,
predicate
);

if (!outdatedMutations.length) {
return;
}
if (!outdatedMutations.length) {
return;
}

const { _version, _lastChangedAt } = record;
const { _version, _lastChangedAt } = record;

const reconciledMutations = outdatedMutations.map(m => {
const oldData = JSON.parse(m.data);
const reconciledMutations = outdatedMutations.map(m => {
const oldData = JSON.parse(m.data);

const newData = { ...oldData, _version, _lastChangedAt };
const newData = { ...oldData, _version, _lastChangedAt };

return this.MutationEvent.copyOf(m, draft => {
draft.data = JSON.stringify(newData);
});
return this.MutationEvent.copyOf(m, draft => {
draft.data = JSON.stringify(newData);
});

await s.delete(this.MutationEvent, predicate);

await Promise.all(
reconciledMutations.map(
async m => await s.save(m, undefined, this.ownSymbol)
)
);
});
}

private currentPredicate(mutationEvent: MutationEvent) {
const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[
'MutationEvent'
];
await storage.delete(this.MutationEvent, predicate);

return ModelPredicateCreator.createFromExisting<MutationEvent>(
mutationEventModelDefinition,
c =>
c
.modelId('eq', mutationEvent.modelId)
.id('ne', this.inProgressMutationEventId)
await Promise.all(
reconciledMutations.map(
async m => await storage.save(m, undefined, this.ownSymbol)
)
);
}
}
Expand Down
13 changes: 10 additions & 3 deletions packages/datastore/src/sync/processors/mutation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,21 @@ class MutationProcessor {

if (result === undefined) {
logger.debug('done retrying');
await this.outbox.dequeue(this.storage);
await this.storage.runExclusive(async storage => {
await this.outbox.dequeue(storage);
});
continue;
}

const record = result.data[opName];
await this.outbox.dequeue(this.storage, record);
let hasMore = false;

const hasMore = (await this.outbox.peek(this.storage)) !== undefined;
await this.storage.runExclusive(async storage => {
// using runExclusive to prevent possible race condition
// when another record gets enqueued between dequeue and peek
await this.outbox.dequeue(storage, record);
hasMore = (await this.outbox.peek(storage)) !== undefined;
});

this.observer.next({
operation,
Expand Down

0 comments on commit f87c7b7

Please sign in to comment.