8000 fix(replay): Handle compression failures more robustly (#6988) · akaedu2012/sentry-javascript@5c41ab7 · GitHub
[go: up one dir, main page]

Skip to content

Commit 5c41ab7

Browse files
authored
fix(replay): Handle compression failures more robustly (getsentry#6988)
We now stop the recording when either `addEvent` or `finish` fail.
1 parent a8d8dfa commit 5c41ab7

21 files changed

+537
-460
lines changed

packages/replay/.eslintrc.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ module.exports = {
1313
// TODO: figure out if we need a worker-specific tsconfig
1414
project: ['tsconfig.worker.json'],
1515
},
16+
rules: {
17+
// We cannot use backticks, as that conflicts with the stringified worker
18+
'prefer-template': 'off',
19+
},
1620
},
1721
{
1822
files: ['src/worker/**/*.js'],

packages/replay/src/eventBuffer/EventBufferArray.ts

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,38 +5,31 @@ import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
55
* Used as fallback if the compression worker cannot be loaded or is disabled.
66
*/
77
export class EventBufferArray implements EventBuffer {
8-
private _events: RecordingEvent[];
8+
/** All the events that are buffered to be sent. */
9+
public events: RecordingEvent[];
910

1011
public constructor() {
11-
this._events = [];
12+
this.events = [];
1213
}
1314

1415
/** @inheritdoc */
15-
public get pendingLength(): number {
16-
return this._events.length;
17-
}
18-
19-
/**
20-
* Returns the raw events that are buffered. In `EventBufferArray`, this is the
21-
* same as `this._events`.
22-
*/
23-
public get pendingEvents(): RecordingEvent[] {
24-
return this._events;
16+
public get hasEvents(): boolean {
17+
return this.events.length > 0;
2518
}
2619

2720
/** @inheritdoc */
2821
public destroy(): void {
29-
this._events = [];
22+
this.events = [];
3023
}
3124

3225
/** @inheritdoc */
3326
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
3427
if (isCheckout) {
35-
this._events = [event];
28+
this.events = [event];
3629
return;
3730
}
3831

39-
this._events.push(event);
32+
this.events.push(event);
4033
return;
4134
}
4235

@@ -46,8 +39,8 @@ export class EventBufferArray implements EventBuffer {
4639
// Make a copy of the events array reference and immediately clear the
4740
// events member so that we do not lose new events while uploading
4841
// attachment.
49-
const eventsRet = this._events;
50-
this._events = [];
42+
const eventsRet = this.events;
43+
this.events = [];
5144
resolve(JSON.stringify(eventsRet));
5245
});
5346
}
Lines changed: 23 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,36 @@
11
import type { ReplayRecordingData } from '@sentry/types';
2-
import { logger } from '@sentry/utils';
32

4-
import type { AddEventResult, EventBuffer, RecordingEvent, WorkerRequest, WorkerResponse } from '../types';
3+
import type { AddEventResult, EventBuffer, RecordingEvent } from '../types';
4+
import { WorkerHandler } from './WorkerHandler';
55

66
/**
77
* Event buffer that uses a web worker to compress events.
88
* Exported only for testing.
99
*/
1010
export class EventBufferCompressionWorker implements EventBuffer {
11-
/**
12-
* Keeps track of the list of events since the last flush that have not been compressed.
13-
* For example, page is reloaded and a flush attempt is made, but
14-
* `finish()` (and thus the flush), does not complete.
15-
*/
16-
public _pendingEvents: RecordingEvent[] = [];
11+
/** @inheritdoc */
12+
public hasEvents: boolean;
1713

18-
private _worker: Worker;
19-
private _eventBufferItemLength: number = 0;
20-
private _id: number = 0;
21-
private _ensureReadyPromise?: Promise<void>;
14+
private _worker: WorkerHandler;
2215

2316
public constructor(worker: Worker) {
24-
this._worker = worker;
25-
}
26-
27-
/**
28-
* The number of raw events that are buffered. This may not be the same as
29-
* the number of events that have been compresed in the worker because
30-
* `addEvent` is async.
31-
*/
32-
public get pendingLength(): number {
33-
return this._eventBufferItemLength;
34-
}
35-
36-
/**
37-
* Returns a list of the raw recording events that are being compressed.
38-
*/
39-
public get pendingEvents(): RecordingEvent[] {
40-
return this._pendingEvents;
17+
this._worker = new WorkerHandler(worker);
18+
this.hasEvents = false;
4119
}
4220

4321
/**
4422
* Ensure the worker is ready (or not).
4523
* This will either resolve when the worker is ready, or reject if an error occured.
4624
*/
4725
public ensureReady(): Promise<void> {
48-
// Ensure we only check once
49-
if (this._ensureReadyPromise) {
50-
return this._ensureReadyPromise;
51-
}
52-
53-
this._ensureReadyPromise = new Promise((resolve, reject) => {
54-
this._worker.addEventListener(
55-
'message',
56-
({ data }: MessageEvent) => {
57-
if ((data as WorkerResponse).success) {
58-
resolve();
59-
} else {
60-
reject();
61-
}
62-
},
63-
{ once: true },
64-
);
65-
66-
this._worker.addEventListener(
67-
'error',
68-
error => {
69-
reject(error);
70-
},
71-
{ once: true },
72-
);
73-
});
74-
75-
return this._ensureReadyPromise;
26+
return this._worker.ensureReady();
7627
}
7728

7829
/**
7930
* Destroy the event buffer.
8031
*/
8132
public destroy(): void {
82-
__DEBUG_BUILD__ && logger.log('[Replay] Destroying compression worker');
83-
this._worker.terminate();
33+
this._worker.destroy();
8434
}
8535

8636
/**
@@ -89,19 +39,12 @@ export class EventBufferCompressionWorker implements EventBuffer {
8939
* Returns true if event was successfuly received and processed by worker.
9040
*/
9141
public async addEvent(event: RecordingEvent, isCheckout?: boolean): Promise<AddEventResult> {
42+
this.hasEvents = true;
43+
9244
if (isCheckout) {
9345
// This event is a checkout, make sure worker buffer is cleared before
9446
// proceeding.
95-
await this._postMessage({
96-
id: this._getAndIncrementId(),
97-
method: 'init',
98-
args: [],
99-
});
100-
}
101-
102-
// Don't store checkout events in `_pendingEvents` because they are too large
103-
if (!isCheckout) {
104-
this._pendingEvents.push(event);
47+
await this._clear();
10548
}
10649

10750
return this._sendEventToWorker(event);
@@ -110,97 +53,30 @@ export class EventBufferCompressionWorker implements EventBuffer {
11053
/**
11154
* Finish the event buffer and return the compressed data.
11255
*/
113-
public async finish(): Promise<ReplayRecordingData> {
114-
try {
115-
return await this._finishRequest(this._getAndIncrementId());
116-
} catch (error) {
117-
__DEBUG_BUILD__ && logger.error('[Replay] Error when trying to compress events', error);
118-
// fall back to uncompressed
119-
const events = this.pendingEvents;
120-
return JSON.stringify(events);
121-
}
122-
}
123-
124-
/**
125-
* Post message to worker and wait for response before resolving promise.
126-
*/
127-
private _postMessage<T>({ id, method, args }: WorkerRequest): Promise<T> {
128-
return new Promise((resolve, reject) => {
129-
const listener = ({ data }: MessageEvent): void => {
130-
const response = data as WorkerResponse;
131-
if (response.method !== method) {
132-
return;
133-
}
134-
135-
// There can be multiple listeners for a single method, the id ensures
136-
// that the response matches the caller.
137-
if (response.id !== id) {
138-
return;
139-
}
140-
141-
// At this point, we'll always want to remove listener regardless of result status
142-
this._worker.removeEventListener('message', listener);
143-
144-
if (!response.success) {
145-
// TODO: Do some error handling, not sure what
146-
__DEBUG_BUILD__ && logger.error('[Replay]', response.response);
147-
148-
reject(new Error('Error in compression worker'));
149-
return;
150-
}
151-
152-
resolve(response.response as T);
153-
};
154-
155-
let stringifiedArgs;
156-
try {
157-
stringifiedArgs = JSON.stringify(args);
158-
} catch (err) {
159-
__DEBUG_BUILD__ && logger.error('[Replay] Error when trying to stringify args', err);
160-
stringifiedArgs = '[]';
161-
}
162-
163-
// Note: we can't use `once` option because it's possible it needs to
164-
// listen to multiple messages
165-
this._worker.addEventListener('message', listener);
166-
this._worker.postMessage({ id, method, args: stringifiedArgs });
167-
});
56+
public finish(): Promise<ReplayRecordingData> {
57+
return this._finishRequest();
16858
}
16959

17060
/**
17161
* Send the event to the worker.
17262
*/
173-
private async _sendEventToWorker(event: RecordingEvent): Promise<AddEventResult> {
174-
const promise = this._postMessage<void>({
175-
id: this._getAndIncrementId(),
176-
method: 'addEvent',
177-
args: [event],
178-
});
179-
180-
// XXX: See note in `get length()`
181-
this._eventBufferItemLength++;
182-
183-
return promise;
63+
private _sendEventToWorker(event: RecordingEvent): Promise<AddEventResult> {
64+
return this._worker.postMessage<void>('addEvent', JSON.stringify(event));
18465
}
18566

18667
/**
18768
* Finish the request and return the compressed data from the worker.
18869
*/
189-
private async _finishRequest(id: number): Promise<Uint8Array> {
190-
const promise = this._postMessage<Uint8Array>({ id, method: 'finish', args: [] });
191-
192-
// XXX: See note in `get length()`
193-
this._eventBufferItemLength = 0;
194-
195-
await promise;
70+
private async _finishRequest(): Promise<Uint8Array> {
71+
const response = await this._worker.postMessage<Uint8Array>('finish');
19672

197-
this._pendingEvents = [];
73+
this.hasEvents = false;
19874

199-
return promise;
75+
return response;
20076
}
20177

202-
/** Get the current ID and increment it for the next call. */
203-
private _getAndIncrementId(): number {
204-
return this._id++;
78+
/** Clear any pending events from the worker. */
79+
private _clear(): Promise<void> {
80+
return this._worker.postMessage('clear');
20581
}
20682
}

packages/replay/src/eventBuffer/EventBufferProxy.ts

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,12 @@ export class EventBufferProxy implements EventBuffer {
2121
this._compression = new EventBufferCompressionWorker(worker);
2222
this._used = this._fallback;
2323

24-
this._ensureWorkerIsLoadedPromise = this._ensureWorkerIsLoaded().catch(() => {
25-
// Ignore errors here
26-
});
24+
this._ensureWorkerIsLoadedPromise = this._ensureWorkerIsLoaded();
2725
}
2826

2927
/** @inheritDoc */
30-
public get pendingLength(): number {
31-
return this._used.pendingLength;
32-
}
33-
34-
/** @inheritDoc */
35-
public get pendingEvents(): RecordingEvent[] {
36-
return this._used.pendingEvents;
28+
public get hasEvents(): boolean {
29+
return this._used.hasEvents;
3730
}
3831

3932
/** @inheritDoc */
@@ -75,18 +68,28 @@ export class EventBufferProxy implements EventBuffer {
7568
return;
7669
}
7770

78-
// Compression worker is ready, we can use it
7971
// Now we need to switch over the array buffer to the compression worker
72+
await this._switchToCompressionWorker();
73+
}
74+
75+
/** Switch the used buffer to the compression worker. */
76+
private async _switchToCompressionWorker(): Promise<void> {
77+
const { events } = this._fallback;
78+
8079
const addEventPromises: Promise<void>[] = [];
81-
for (const event of this._fallback.pendingEvents) {
80+
for (const event of events) {
8281
addEventPromises.push(this._compression.addEvent(event));
8382
}
8483

85-
// We switch over to the compression buffer immediately - any further events will be added
84+
// We switch over to the new buffer immediately - any further events will be added
8685
// after the previously buffered ones
8786
this._used = this._compression;
8887

8988
// Wait for original events to be re-added before resolving
90-
await Promise.all(addEventPromises);
89+
try {
90+
await Promise.all(addEventPromises);
91+
} catch (error) {
92+
__DEBUG_BUILD__ && logger.warn('[Replay] Failed to add events when switching buffers.', error);
93+
}
9194
}
9295
}

0 commit comments

Comments
 (0)
0