Skip to content

Commit

Permalink
fix(replay): Fixes potential out-of-order segments (#13609)
Browse files Browse the repository at this point in the history
This fixes a potential issue where segments can come in out of order due
to our flush "lock" was not being respected in two cases:

1) No current flush in progress, but flush throws an error (this should
be rare as the common errors that get thrown should stop the replay
completely)
2) Flush is in progress, which skips the code block that releases lock
and then calls debouncedFlush. This leaves the lock always set to a
resolved (or rejected) promise.

This ultimately should not change too much as the flush calls are
debounced anyway, but this cleans up the code a bit and also logs any
exceptions that may occur. However this can fix issues where segments
can come in out of order depending on how long the send request takes.
e.g.


![image](https://github.com/user-attachments/assets/ea304892-1c72-4e96-acc6-c714d263980c)


where ideally it looks like

![image](https://github.com/user-attachments/assets/8c3e706c-d3b2-43bd-a970-561b32b05458)
  • Loading branch information
billyvg authored Sep 10, 2024
1 parent d4a2fcd commit 8fcee67
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 13 deletions.
28 changes: 15 additions & 13 deletions packages/replay-internal/src/replay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1226,27 +1226,29 @@ export class ReplayContainer implements ReplayContainerInterface {
// TODO FN: Evaluate if we want to stop here, or remove this again?
}

// this._flushLock acts as a lock so that future calls to `_flush()`
// will be blocked until this promise resolves
const _flushInProgress = !!this._flushLock;

// this._flushLock acts as a lock so that future calls to `_flush()` will
// be blocked until current flush is finished (i.e. this promise resolves)
if (!this._flushLock) {
this._flushLock = this._runFlush();
await this._flushLock;
this._flushLock = undefined;
return;
}

// Wait for previous flush to finish, then call the debounced `_flush()`.
// It's possible there are other flush requests queued and waiting for it
// to resolve. We want to reduce all outstanding requests (as well as any
// new flush requests that occur within a second of the locked flush
// completing) into a single flush.

try {
await this._flushLock;
} catch (err) {
DEBUG_BUILD && logger.error(err);
this.handleException(err);
} finally {
this._debouncedFlush();
this._flushLock = undefined;

if (_flushInProgress) {
// Wait for previous flush to finish, then call the debounced
// `_flush()`. It's possible there are other flush requests queued and
// waiting for it to resolve. We want to reduce all outstanding
// requests (as well as any new flush requests that occur within a
// second of the locked flush completing) into a single flush.
this._debouncedFlush();
}
}
};

Expand Down
58 changes: 58 additions & 0 deletions packages/replay-internal/test/integration/flush.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,64 @@ describe('Integration | flush', () => {
await replay.start();
});

it('resets flush lock if runFlush rejects/throws', async () => {
mockRunFlush.mockImplementation(
() =>
new Promise((resolve, reject) => {
reject(new Error('runFlush'));
}),
);
try {
await replay['_flush']();
} catch {
// do nothing
}
expect(replay['_flushLock']).toBeUndefined();
});

it('resets flush lock when flush is called multiple times before it resolves', async () => {
let _resolve;
mockRunFlush.mockImplementation(
() =>
new Promise(resolve => {
_resolve = resolve;
}),
);
const mockDebouncedFlush: MockedFunction<ReplayContainer['_debouncedFlush']> = vi.spyOn(replay, '_debouncedFlush');
mockDebouncedFlush.mockImplementation(vi.fn);
mockDebouncedFlush.cancel = vi.fn();

const results = [replay['_flush'](), replay['_flush']()];
expect(replay['_flushLock']).not.toBeUndefined();

_resolve && _resolve();
await Promise.all(results);
expect(replay['_flushLock']).toBeUndefined();
mockDebouncedFlush.mockRestore();
});

it('resets flush lock when flush is called multiple times before it rejects', async () => {
let _reject;
mockRunFlush.mockImplementation(
() =>
new Promise((_, reject) => {
_reject = reject;
}),
);
const mockDebouncedFlush: MockedFunction<ReplayContainer['_debouncedFlush']> = vi.spyOn(replay, '_debouncedFlush');
mockDebouncedFlush.mockImplementation(vi.fn);
mockDebouncedFlush.cancel = vi.fn();
expect(replay['_flushLock']).toBeUndefined();
replay['_flush']();
const result = replay['_flush']();
expect(replay['_flushLock']).not.toBeUndefined();

_reject && _reject(new Error('Throw runFlush'));
await result;
expect(replay['_flushLock']).toBeUndefined();
mockDebouncedFlush.mockRestore();
});

/**
* Assuming the user wants to record a session
* when calling flush() without replay being enabled
Expand Down

0 comments on commit 8fcee67

Please sign in to comment.