8000 fix(cache): watch errors must call done handler · kubernetes-client/javascript@a88bb13 · GitHub
[go: up one dir, main page]

Skip to content

Commit a88bb13

Browse files
bbathabrendandburns
authored andcommitted
fix(cache): watch errors must call done handler
The type of `watchObject` was incorrect and has been updated to match the actual request body. Using this info it was clear that 'ERROR' events were not being handled correctly. When the watch receives an error it is not always an http status code, because the status code can only be sent when the stream is starting. This means that `410` resourceVersion out of date errors could only be handled if they were detected before the watch stream started leaving watches running on channels that would never receive more events and not notifying `ListWatch` consumers of the error.
1 parent 3bd21ad commit a88bb13

File tree

2 files changed

+98
-7
lines changed

2 files changed

+98
-7
lines changed

src/cache.ts

+13-5
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,10 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
140140

141141
private async doneHandler(err: any): Promise<void> {
142142
this._stop();
143-
if (err && err.statusCode === 410) {
143+
if (
144+
err &&
145+
((err as { statusCode?: number }).statusCode === 410 || (err as { code?: number }).code === 410)
146+
) {
144147
this.resourceVersion = '';
145148
} else if (err) {
146149
this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err));
@@ -192,7 +195,11 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
192195
});
193196
}
194197

195-
private watchHandler(phase: string, obj: T, watchObj?: any): void {
198+
private async watchHandler(
199+
phase: string,
200+
obj: T,
201+
watchObj?: { type: string; object: KubernetesObject },
202+
): Promise<void> {
196203
switch (phase) {
197204
case 'ERROR':
198205
if ((obj as { code?: number }).code === 410) {
@@ -214,10 +221,11 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
214221
case 'BOOKMARK':
215222
// nothing to do, here for documentation, mostly.
216223
break;
224+
case 'ERROR':
225+
await this.doneHandler(obj);
226+
return;
217227
}
218-
if (watchObj && watchObj.metadata) {
219-
this.resourceVersion = watchObj.metadata.resourceVersion;
220-
}
228+
this.resourceVersion = obj.metadata!.resourceVersion || '';
221229
}
222230
}
223231

src/cache_test.ts

+85-2
Original file line numberDiff line numberDiff line change
@@ -1205,9 +1205,10 @@ describe('ListWatchCache', () => {
12051205
{
12061206
metadata: {
12071207
name: 'name3',
1208+
resourceVersion: '23456',
12081209
} as V1ObjectMeta,
12091210
} as V1Namespace,
1210-
{ metadata: { resourceVersion: '23456' } },
1211+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
12111212
);
12121213

12131214
await informer.stop();
@@ -1259,9 +1260,91 @@ describe('ListWatchCache', () => {
12591260
{
12601261
metadata: {
12611262
name: 'name3',
1263+
resourceVersion: '23456',
12621264
} as V1ObjectMeta,
12631265
} as V1Namespace,
1264-
{ metadata: { resourceVersion: '23456' } },
1266+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
1267+
);
1268+
1269+
await informer.stop();
1270+
1271+
let errorEmitted = false;
1272+
informer.on('error', () => (errorEmitted = true));
1273+
1274+
promise = new Promise((resolve) => {
1275+
mock.when(
1276+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1277+
).thenCall(() => {
1278+
resolve(new FakeRequest());
1279+
});
1280+
});
1281+
1282+
informer.start();
1283+
await promise;
1284+
1285+
const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();
1286+
1287+
const object = {
1288+
kind: 'Status',
1289+
apiVersion: 'v1',
1290+
metadata: {},
1291+
status: 'Failure',
1292+
message: 'too old resource version: 12345 (1234)',
1293+
reason: 'Expired',
1294+
code: 410,
1295+
};
1296+
await watchHandler('ERROR', object, { type: 'ERROR', object });
1297+
1298+
mock.verify(
1299+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1300+
).thrice();
1301+
expect(errorEmitted).to.equal(false);
1302+
expect(listCalls).to.be.equal(2);
1303+
});
1304+
1305+
it('should list if the watch errors from the last version', async () => {
1306+
const fakeWatch = mock.mock(Watch);
1307+
const list: V1Pod[] = [];
1308+
const listObj = {
1309+
metadata: {
1310+
resourceVersion: '12345',
1311+
} as V1ListMeta,
1312+
items: list,
1313+
} as V1NamespaceList;
1314+
1315+
let listCalls = 0;
1316+
const listFn: ListPromise<V1Namespace> = function(): Promise<{
1317+
response: http.IncomingMessage;
1318+
body: V1NamespaceList;
1319+
}> {
1320+
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => {
1321+
listCalls++;
1322+
resolve({ response: {} as http.IncomingMessage, body: listObj });
1323+
});
1324+
};
1325+
let promise = new Promise((resolve) => {
1326+
mock.when(
1327+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1328+
).thenCall(() => {
1329+
resolve(new FakeRequest());
1330+
});
1331+
});
1332+
1333+
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
1334+
1335+
informer.start();
1336+
await promise;
1337+
1338+
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();
1339+
watchHandler(
1340+
'ADDED',
1341+
{
1342+
metadata: {
1343+
name: 'name3',
1344+
resourceVersion: '23456',
1345+
} as V1ObjectMeta,
1346+
} as V1Namespace,
1347+
{ type: 'ADDED', metadata: { resourceVersion: '23456' } },
12651348
);
12661349

12671350
await informer.stop();

0 commit comments

Comments
 (0)
0