10BC0 Prototype of the RxJS interop layer for signals by alxhub · Pull Request #49154 · angular/angular · GitHub
[go: up one dir, main page]

Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions goldens/public-api/core/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,11 +418,6 @@ export class DebugNode {
};
}

// @public
export type DeepReadonly<T> = T extends (infer R)[] ? ReadonlyArray<DeepReadonly<R>> : (T extends Function ? T : (T extends object ? {
readonly [P in keyof T]: DeepReadonly<T[P]>;
} : T));

// @public
export const DEFAULT_CURRENCY_CODE: InjectionToken<string>;

Expand Down Expand Up @@ -1366,7 +1361,7 @@ export interface SelfDecorator {
export function setTestabilityGetter(getter: GetTestability): void;

// @public
export type Signal<T> = (() => DeepReadonly<T>) & {
export type Signal<T> = (() => T) & {
[SIGNAL]: unknown;
};

Expand Down
32 changes: 32 additions & 0 deletions goldens/public-api/core/rxjs-interop/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
## API Report File for "@angular/core_rxjs-interop"

> Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/).

```ts

import { DestroyRef } from '@angular/core';
import { Injector } from '@angular/core';
import { MonoTypeOperatorFunction } from 'rxjs';
import { Observable } from 'rxjs';
import { Signal } from '@angular/core';

// @public
export function fromObservable<T>(source: Observable<T>): Signal<T>;

// @public
export function fromObservable<T, U extends T | null | undefined>(source: Observable<T>, initialValue: U): Signal<T | U>;

// @public
export function fromSignal<T>(source: Signal<T>, options?: FromSignalOptions): Observable<T>;

// @public
export interface FromSignalOptions {
injector?: Injector;
}

// @public
export function takeUntilDestroyed<T>(destroyRef?: DestroyRef): MonoTypeOperatorFunction<T>;

// (No @packageDocumentation comment for this package)

```
6 changes: 6 additions & 0 deletions packages/bazel/test/ng_package/core_package.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ describe('@angular/core ng_package', () => {
esm: './esm2022/core.mjs',
default: './fesm2022/core.mjs'
},
'./rxjs-interop': {
types: './rxjs-interop/index.d.ts',
esm2022: './esm2022/rxjs-interop/rxjs-interop.mjs',
esm: './esm2022/rxjs-interop/rxjs-interop.mjs',
8000 default: './fesm2022/rxjs-interop.mjs'
},
'./testing': {
types: './testing/index.d.ts',
esm2022: './esm2022/testing/testing.mjs',
Expand Down
1 change: 1 addition & 0 deletions packages/core/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ ng_package(
],
deps = [
":core",
"//packages/core/rxjs-interop",
"//packages/core/testing",
],
)
Expand Down
28 changes: 28 additions & 0 deletions packages/core/rxjs-interop/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
load("//tools:defaults.bzl", "ng_module")

package(default_visibility = ["//visibility:public"])

exports_files(["package.json"])

ng_module(
name = "rxjs-interop",
srcs = glob(
[
"*.ts",
"src/**/*.ts",
],
),
deps = [
"//packages:types",
"//packages/core",
"@npm//rxjs",
],
)

filegroup(
name = "files_for_docgen",
srcs = glob([
"*.ts",
"src/**/*.ts",
]) + ["PACKAGE.md"],
)
1 change: 1 addition & 0 deletions packages/core/rxjs-interop/PACKAGE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Includes utilities related to using the RxJS library in conjunction with Angular's signal-based reactivity system.
9 changes: 9 additions & 0 deletions packages/core/rxjs-interop/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

export * from './src/index';
16 changes: 16 additions & 0 deletions packages/core/rxjs-interop/public_api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

/**
* @module
* @description
* Entry point for all public APIs of this package.
*/
export * from './src/index';

// This file only reexports content of the `src` folder. Keep it that way.
118 changes: 118 additions & 0 deletions packages/core/rxjs-interop/src/from_observable.ts
9E88
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

import {assertInInjectionContext, computed, DestroyRef, inject, signal, Signal, WritableSignal} from '@angular/core';
import {Observable} from 'rxjs';

/**
* Get the current value of an `Observable` as a reactive `Signal`.
*
* `fromObservable` returns a `Signal` which provides synchronous reactive access to values produced
* by the given `Observable`, by subscribing to that `Observable`. The returned `Signal` will always
* have the most recent value emitted by the subscription, and will throw an error if the
* `Observable` errors.
*
* The subscription will last for the lifetime of the current injection context. That is, if
* `fromObservable` is called from a component context, the subscription will be cleaned up when the
* component is destroyed. When called outside of a component, the current `EnvironmentInjector`'s
* lifetime will be used (which is typically the lifetime of the application itself).
*
* If the `Observable` does not produce a value before the `Signal` is read, the `Signal` will throw
* an error. To avoid this, use a synchronous `Observable` (potentially created with the `startWith`
* operator) or pass an initial value to `fromObservable` as the second argument.
*
* `fromObservable` must be called in an injection context.
*/
export function fromObservable<T>(source: Observable<T>): Signal<T>;

/**
* Get the current value of an `Observable` as a reactive `Signal`.
*
* `fromObservable` returns a `Signal` which provides synchronous reactive access to values produced
* by the given `Observable`, by subscribing to that `Observable`. The returned `Signal` will always
* have the most recent value emitted by the subscription, and will throw an error if the
* `Observable` errors.
*
* The subscription will last for the lifetime of the current injection context. That is, if
* `fromObservable` is called from a component context, the subscription will be cleaned up when the
* component is destroyed. When called outside of a component, the current `EnvironmentInjector`'s
* lifetime will be used (which is typically the lifetime of the application itself).
*
* Before the `Observable` emits its first value, the `Signal` will return the configured
* `initialValue`. If the `Observable` is known to produce a value before the `Signal` will be read,
* `initialValue` does not need to be passed.
*
* `fromObservable` must be called in an injection context.
*
* @developerPreview
*/
export function fromObservable<T, U extends T|null|undefined>(
// fromObservable(Observable<Animal>) -> Signal<Cat>
source: Observable<T>, initialValue: U): Signal<T|U>;
export function fromObservable<T, U = never>(source: Observable<T>, initialValue?: U): Signal<T|U> {
assertInInjectionContext(fromObservable);

// Note: T is the Observable value type, and U is the initial value type. They don't have to be
// the same - the returned signal gives values of type `T`.
let state: WritableSignal<State<T|U>>;
if (initialValue === undefined && arguments.length !== 2) {
// No initial value was passed, so initially the signal is in a `NoValue` state and will throw
// if accessed.
state = signal({kind: StateKind.NoValue});
} else {
// An initial value was passed, so use it.
state = signal<State<T|U>>({kind: StateKind.Value, value: initialValue!});
}

const sub = source.subscribe({
next: value => state.set({kind: StateKind.Value, value}),
error: error => state.set({kind: StateKind.Error, error}),
// Completion of the Observable is meaningless to the signal. Signals don't have a concept of
// "complete".
});

// Unsubscribe when the current context is destroyed.
inject(DestroyRef).onDestroy(sub.unsubscribe.bind(sub));

// The actual returned signal is a `computed` of the `State` signal, which maps the various states
// to either values or errors.
return computed(() => {
const current = state();
switch (current.kind) {
case StateKind.Value:
return current.value;
case StateKind.Error:
throw current.error;
case StateKind.NoValue:
// TODO(alxhub): use a RuntimeError when we finalize the error semantics
throw new Error(`fromObservable() signal read before the Observable emitted`);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should it be RuntimeError?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: TODO is left in. This needs a runtime error code.

}
});
}

const enum StateKind {
NoValue,
Value,
Error,
}

interface NoValueState {
kind: StateKind.NoValue;
}

interface ValueState<T> {
kind: StateKind.Value;
value: T;
}

interface ErrorState {
kind: StateKind.Error;
error: unknown;
}

type State<T> = NoValueState|ValueState<T>|ErrorState;
54 changes: 54 additions & 0 deletions packages/core/rxjs-interop/src/from_signal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

import {assertInInjectionContext, effect, inject, Injector, Signal} from '@angular/core';
import {Observable} from 'rxjs';

/**
* Options for `fromSignal`.
*
* @developerPreview
*/
export interface FromSignalOptions {
/**
* The `Injector` to use when creating the effect.
*
* If this isn't specified, the current injection context will be used.
*/
injector?: Injector;
}

/**
* Exposes the value of an Angular `Signal` as an RxJS `Observable`.
*
* The signal's value will be propagated into the `Observable`'s subscribers using an `effect`.
*
* `fromSignal` must be called in an injection context.
*
* @developerPreview
*/
export function fromSignal<T>(
source: Signal<T>,
options?: FromSignalOptions,
): Observable<T> {
!options?.injector && assertInInjectionContext(fromSignal);
const injector = options?.injector ?? inject(Injector);

// Creating a new `Observable` allows the creation of the effect to be lazy. This allows for all
// references to `source` to be dropped if the `Observable` is fully unsubscribed and thrown away.
return new Observable(observer => {
const watcher = effect(() => {
try {
observer.next(source());
} catch (err) {
observer.error(err);
}
}, {injector, manualCleanup: true});
return () => watcher.destroy();
});
}
11 changes: 11 additions & 0 deletions packages/core/rxjs-interop/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

export {fromObservable} from './from_observable';
export {fromSignal, FromSignalOptions} from './from_signal';
export {takeUntilDestroyed} from './take_until_destroyed';
36 changes: 36 additions & 0 deletions packages/core/rxjs-interop/src/take_until_destroyed.ts
6B28
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* @license
* Copyright Google LLC All Rights Reserved.
*
* Use of this source code is governed by an MIT-style license that can be
* found in the LICENSE file at https://angular.io/license
*/

import {assertInInjectionContext, DestroyRef, inject} from '@angular/core';
import {MonoTypeOperatorFunction, Observable} from 'rxjs';
import {takeUntil} from 'rxjs/operators';

/**
* Operator which completes the Observable when the calling context (component, directive, service,
* etc) is destroyed.
*
* @param destroyRef optionally, the `DestroyRef` representing the current context. This can be
* passed explicitly to use `takeUntilDestroyed` outside of an injection context. Otherwise, the
* current `DestroyRef` is injected.
*
* @developerPreview
*/
export function takeUntilDestroyed<T>(destroyRef?: DestroyRef): MonoTypeOperatorFunction<T> {
if (!destroyRef) {
assertInInjectionContext(takeUntilDestroyed);
destroyRef = inject(DestroyRef);
}
3360
const destroyed$ = new Observable<void>(observer => {
destroyRef!.onDestroy(observer.next.bind(observer));
});

return <T>(source: Observable<T>) => {
return source.pipe(takeUntil(destroyed$));
};
}
38 changes: 38 additions & 0 deletions packages/core/rxjs-interop/test/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
load("//tools:defaults.bzl", "jasmine_node_test", "karma_web_test_suite", "ts_library")
load("//tools/circular_dependency_test:index.bzl", "circular_dependency_test")

circular_dependency_test(
name = "circular_deps_test",
entry_point = "angular/packages/core/rxjs-interop/index.mjs",
deps = ["//packages/core/rxjs-interop"],
)

ts_library(
name = "test_lib",
testonly = True,
srcs = glob(["**/*.ts"]),
deps = [
"//packages:types",
"//packages/core",
"//packages/core/rxjs-interop",
"//packages/core/src/signals",
"//packages/core/testing",
"//packages/private/testing",
"@npm//rxjs",
],
)

jasmine_node_test(
name = "test",
bootstrap = ["//tools/testing:node"],
deps = [
":test_lib",
],
)

karma_web_test_suite(
name = "test_web",
deps = [
":test_lib",
],
)
Loading
0