10000 Merge pull request #4688 from aramissennyeydd/sennyeya/weighted-opera… · nirshar/rushstack@b7704b4 · GitHub
[go: up one dir, main page]

Skip to content

Commit b7704b4

Browse files
authored
Merge pull request microsoft#4688 from aramissennyeydd/sennyeya/weighted-operation-reentry
fix(node-core-library): iterator weighting isn't fully respected by `Async#forEachAsync`
2 parents 890322a + 71d5e24 commit b7704b4

File tree

7 files changed

+95
-18
lines changed

7 files changed

+95
-18
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"changes": [
3+
{
4+
"packageName": "@microsoft/rush",
5+
"comment": "Fix an issue where operation weights were not respected.",
6+
"type": "none"
7+
}
8+
],
9+
"packageName": "@microsoft/rush"
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"changes": [
3+
{
4+
"packageName": "@rushstack/node-core-library",
5+
"comment": "Fix a bug in `Async.forEachAsync` where weight wasn't respected.",
6+
"type": "patch"
7+
}
8+
],
9+
"packageName": "@rushstack/node-core-library"
10+
}

libraries/node-core-library/src/Async.ts

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ export class Async {
151151
}
152152

153153
private static async _forEachWeightedAsync<TReturn, TEntry extends { weight: number; element: TReturn }>(
154-
iterable: Iterable<TEntry> | AsyncIterable<TEntry>,
154+
iterable: AsyncIterable<TEntry>,
155155
callback: (entry: TReturn, arrayIndex: number) => Promise<void>,
156156
options?: IAsyncParallelismOptions | undefined
157157
): Promise<void> {
@@ -160,10 +160,9 @@ export class Async {
160160
options?.concurrency && options.concurrency > 0 ? options.concurrency : Infinity;
161161
let concurrentUnitsInProgress: number = 0;
162162

163-
const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (
164-
(iterable as Iterable<TEntry>)[Symbol.iterator] ||
165-
(iterable as AsyncIterable<TEntry>)[Symbol.asyncIterator]
166-
).call(iterable);
163+
const iterator: Iterator<TEntry> | AsyncIterator<TEntry> = (iterable as AsyncIterable<TEntry>)[
164+
Symbol.asyncIterator
165+
].call(iterable);
167166

168167
let arrayIndex: number = 0;
169168
let iteratorIsComplete: boolean = false;
@@ -175,23 +174,29 @@ export class Async {
175174
!iteratorIsComplete &&
176175
!promiseHasResolvedOrRejected
177176
) {
178-
// Increment the concurrency while waiting for the iterator.
179-
// This function is reentrant, so this ensures that at most `concurrency` executions are waiting
180-
concurrentUnitsInProgress++;
177+
// Increment the current concurrency units in progress by the concurrency limit before fetching the iterator weight.
178+
// This function is reentrant, so this if concurrency is finite, at most 1 operation will be waiting. If it's infinite,
179+
// there will be effectively no cap on the number of operations waiting.
180+
const limitedConcurrency: number = !Number.isFinite(concurrency) ? 1 : concurrency;
181+
concurrentUnitsInProgress += limitedConcurrency;
181182
const currentIteratorResult: IteratorResult<TEntry> = await iterator.next();
182183
// eslint-disable-next-line require-atomic-updates
183184
iteratorIsComplete = !!currentIteratorResult.done;
184185

185186
if (!iteratorIsComplete) {
186187
const currentIteratorValue: TEntry = currentIteratorResult.value;
187188
Async.validateWeightedIterable(currentIteratorValue);
189+
// Cap the weight to concurrency, this allows 0 weight items to execute despite the concurrency limit.
188190
const weight: number = Math.min(currentIteratorValue.weight, concurrency);
189-
// If it's a weighted operation then add the rest of the weight, removing concurrent units if weight < 1.
190-
// Cap it to the concurrency limit, otherwise higher weights can cause issues in the case where 0 weighted
191-
// operations are present.
192-
concurrentUnitsInProgress += weight - 1;
191+
192+
// Remove the "lock" from the concurrency check and only apply the current weight.
193+
// This should allow other operations to execute.
194+
concurrentUnitsInProgress += weight;
195+
concurrentUnitsInProgress -= limitedConcurrency;
196+
193197
Promise.resolve(callback(currentIteratorValue.element, arrayIndex++))
194198
.then(async () => {
199+
// Remove the operation completely from the in progress units.
195200
concurrentUnitsInProgress -= weight;
196201
await onOperationCompletionAsync();
197202
})
@@ -201,7 +206,7 @@ export class Async {
201206
});
202207
} else {
203208
// The iterator is complete and there wasn't a value, so untrack the waiting state.
204-
concurrentUnitsInProgress--;
209+
concurrentUnitsInProgress -= limitedConcurrency;
205210
}
206211
}
207212

libraries/node-core-library/src/test/Async.test.ts

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,20 +273,21 @@ describe(Async.name, () => {
273273
[Symbol.asyncIterator]: () => asyncIterator
274274
};
275275

276-
const expectedConcurrency: 4 = 4;
277276
const finalPromise: Promise<void> = Async.forEachAsync(
278277
asyncIterable,
279278
async (item) => {
280279
// Do nothing
281280
},
282281
{
283-
concurrency: expectedConcurrency
282+
concurrency: 4
284283
}
285284
);
286285

287286
// Wait for all the instant resolutions to be done
288287
await Async.sleep(0);
289-
expect(waitingIterators).toEqual(expectedConcurrency);
288+
289+
// The final iteration cycle is locked, so only 1 iterator is waiting.
290+
expect(waitingIterators).toEqual(1);
290291
resolve2({ done: true, value: undefined });
291292
await finalPromise;
292293
});
@@ -494,6 +495,50 @@ describe(Async.name, () => {
494495
expect(fn).toHaveBeenCalledTimes(10);
495496
expect(maxRunning).toEqual(9);
496497
});
498+
499+
it('does not exceed the maxiumum concurrency for an async iterator when weighted', async () => {
500+
let waitingIterators: number = 0;
501+
502+
let resolve2!: (value: { done: true; value: undefined }) => void;
503+
const signal2: Promise<{ done: true; value: undefined }> = new Promise((resolve, reject) => {
504+
resolve2 = resolve;
505+
});
506+
507+
let iteratorIndex: number = 0;
508+
const asyncIterator: AsyncIterator<{ element: number; weight: number }> = {
509+
next: () => {
510+
iteratorIndex++;
511+
if (iteratorIndex < 20) {
512+
return Promise.resolve({ done: false, value: { element: iteratorIndex, weight: 2 } });
513+
} else {
514+
++waitingIterators;
515+
return signal2;
516+
}
517+
}
518+
};
519+
const asyncIterable: AsyncIterable<{ element: number; weight: number }> = {
520+
[Symbol.asyncIterator]: () => asyncIterator
521+
};
522+
523+
const finalPromise: Promise<void> = Async.forEachAsync(
524+
asyncIterable,
525+
async (item) => {
526+
// Do nothing
527+
},
528+
{
529+
concurrency: 4,
530+
weighted: true
531+
}
532+
);
533+
534+
// Wait for all the instant resolutions to be done
535+
await Async.sleep(0);
536+
537+
// The final iteration cycle is locked, so only 1 iterator is waiting.
538+
expect(waitingIterators).toEqual(1);
539+
resolve2({ done: true, value: undefined });
540+
await finalPromise;
541+
});
497542
});
498543

499544
describe(Async.runWithRetriesAsync.name, () => {

libraries/rush-lib/src/logic/operations/AsyncOperationQueue.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ export class AsyncOperationQueue
9393
}
9494
}
9595

96+
this.assignOperations();
97+
9698
if (this._completedOperations.size === this._totalOperations) {
9799
this._isDone = true;
98100
}

libraries/rush-lib/src/logic/operations/OperationExecutionManager.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,9 @@ export class OperationExecutionManager {
410410
if (record.status !== OperationStatus.RemoteExecuting) {
411411
// If the operation was not remote, then we can notify queue that it is complete
412412
this._executionQueue.complete(record);
413+
} else {
414+
// Attempt to requeue other operations if the operation was remote
415+
this._executionQueue.assignOperations();
413416
}
414417
}
415418
}

libraries/rush-lib/src/logic/operations/OperationExecutionRecord.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
9696
public readonly stdioSummarizer: StdioSummarizer = new StdioSummarizer();
9797

9898
public readonly runner: IOperationRunner;
99-
public readonly weight: number;
10099
public readonly associatedPhase: IPhase | undefined;
101100
public readonly associatedProject: RushConfigurationProject | undefined;
102101
public readonly _operationMetadataManager: OperationMetadataManager | undefined;
@@ -117,7 +116,6 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
117116

118117
this.operation = operation;
119118
this.runner = runner;
120-
this.weight = operation.weight;
121119
this.associatedPhase = associatedPhase;
122120
this.associatedProject = associatedProject;
123121
if (operation.associatedPhase && operation.associatedProject) {
@@ -134,6 +132,10 @@ export class OperationExecutionRecord implements IOperationRunnerContext {
134132
return this.runner.name;
135133
}
136134

135+
public get weight(): number {
136+
return this.operation.weight;
137+
}
138+
137139
public get debugMode(): boolean {
138140
return this._context.debugMode;
139141
}

0 commit comments

Comments
 (0)
0